Compare commits

...

59 Commits

Author SHA1 Message Date
Fran Arbanas 7b8458630a bump version 2024-12-20 14:22:07 +01:00
Fran Arbanas cf2ab08b4d fix dockerfile 2024-12-20 14:20:43 +01:00
Fran Arbanas 2466112829 test version 2024-12-20 13:19:18 +01:00
Fran Arbanas e5306908e4 feat: add entrypoint script 2024-12-20 13:18:52 +01:00
Tommy Verrall 2fab3f11b6 Merge pull request #5274 from nymtech/feature/nyx-chain-watcher
Nyx Chain Watcher
2024-12-19 17:34:36 +00:00
Sachin Kamath d0722e5f63 chain-watcher: try fix windows path 2024-12-19 21:07:50 +05:30
Sachin Kamath 64373548e4 chain-watcher: windows workaround for db path, add sqlx 2024-12-19 20:30:11 +05:30
Sachin Kamath bad85abff3 chain-watcher: bump version 2024-12-19 14:10:28 +00:00
Sachin Kamath 6e66cc2467 validator-rewarder: fix errors 2024-12-19 14:10:28 +00:00
Sachin Kamath c805aa79a4 nyx-chain-watcher: fallback to env variable when reading config 2024-12-19 14:10:28 +00:00
Mark Sinclair f5ca1ee20a Bump version 2024-12-19 14:10:28 +00:00
Sachin Kamath 4f07343efd api: fetch addresses from config. 2024-12-19 14:10:28 +00:00
Mark Sinclair 94ab78606a Bump version 2024-12-19 14:10:28 +00:00
Sachin Kamath 7b92e471c8 bugfix: dont manually set last_processed_height for pruning=nothing strat. 2024-12-19 14:10:28 +00:00
Sachin Kamath a507ffe371 chain-scraper : use tx module for parsing transactions 2024-12-19 14:10:28 +00:00
Mark Sinclair c02e93004f nyx-chain-watcher: return average price over 24 hours 2024-12-19 14:10:28 +00:00
Mark Sinclair 1113e0c599 formatting 2024-12-19 14:10:28 +00:00
Mark Sinclair 06c7394861 change webhook payload to have a structured coin for funds 2024-12-19 14:10:28 +00:00
Mark Sinclair e20bea9d32 bump version 2024-12-19 14:10:28 +00:00
Mark Sinclair eeea32fdca add websocket rpcs to env files 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński b06349efd0 added env variable to nuke the db 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński 98a4cb4ae8 even more logs 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński be185824b4 extra logs 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński 60e8e53f3b explicitly build websocket client in 0.37 compat mode 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński 1890367bfc allow conversion from CometBFT block subscription 2024-12-19 14:10:28 +00:00
Mark Sinclair 2b26a88d6c Bump version 2024-12-19 14:10:28 +00:00
Mark Sinclair a6f4f017c7 Bump version 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński d8a6ca48c1 implemented starting block logic inside the chain scraper itself 2024-12-19 14:10:28 +00:00
Mark Sinclair 541d46e899 Fix docker entry point and bump version 2024-12-19 14:10:28 +00:00
Mark Sinclair 39f525e88e Add Dockerfile and workflow to build 2024-12-19 14:10:28 +00:00
Mark Sinclair 156e892baa parse message index and process all log entries 2024-12-19 14:10:28 +00:00
Mark Sinclair 5b6ae39dab init saves example config 2024-12-19 14:10:28 +00:00
Mark Sinclair df004f834f Add example to README 2024-12-19 14:10:28 +00:00
Mark Sinclair 235165171b Remove migration from seed app 2024-12-19 14:10:28 +00:00
Mark Sinclair 572875058d Add config, overrides and CLI 2024-12-19 14:10:28 +00:00
Mark Sinclair cf6f437187 Move nym-data-observatory (v0) to nyx-chain-watcher 2024-12-19 14:10:28 +00:00
Mark Sinclair 6010de978d data-observatory: renamed transactions to payments because there is already transaction in the base scraper schema 2024-12-19 14:10:28 +00:00
Mark Sinclair d951ea9548 nyxd-scraper: add optional starting height parameter to scrape before listening for new blocks 2024-12-19 14:10:28 +00:00
Sachin Kamath 868d7439ec observatory 0.1 2024-12-19 14:10:28 +00:00
Sachin Kamath a884aee1e9 fix review comments 2024-12-19 14:10:28 +00:00
Sachin Kamath 80f965a104 clippy 2024-12-19 14:10:28 +00:00
Sachin Kamath c99a240ed4 nyxd-scraper: add config to make pre-commit storage optional 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński 67976b1b30 feature: wireguard metrics (#5278)
* experimental log

* introduce wireguard metrics updates

* add wireguard traffic rates to console logger

* missing import

* changed order of displayed values

* expose bytes information via rest endpoint

* clippy
2024-12-19 10:49:56 +00:00
Jędrzej Stuczyński a2322d6cdf feature: nym topology revamp (#5271)
* revamped NymTopology

* wip

* working e2e client

* updated nym-api

* updated nym-node

* updated rest of non-test code

* updated the rest of the codebase

* additional tweaks

* linux clippy fixes + adding additional dummy ipr types for better linting on non-linux targets
2024-12-19 10:44:34 +00:00
Jędrzej Stuczyński ae346bb75b bugfix: remove unnecessary arguments for nym-api swagger endpoints (#5272)
* removed incorrect body argument for '/rewarded-set' endpoint

* removed incorrect pagination parameters for monitor run results
2024-12-19 10:42:52 +00:00
Jon Häggblad 53c28af847 Add close to credential storage (#5283) (#5293)
* Add close method to credential storage

* wip
2024-12-18 21:51:00 +01:00
Bogdan-Ștefan Neacşu 3521f36374 Include IPINFO_API_TOKEN in nightly CI (#5285)
* Include IPINFO_API_TOKEN in nightly CI

* Fix beta clippy
2024-12-18 16:46:28 +02:00
Bogdan-Ștefan Neacşu 3695332036 Move tun constants to network defaults (#5286) 2024-12-18 15:03:21 +02:00
Jon Häggblad d03302c391 http-api-client: deduplicate code (#5267)
* Deduplicate code

* Remove unneeded async
2024-12-18 12:36:10 +01:00
mfahampshire cd86110b2c Max/crunch patch docs (#5284)
* patch changelog done

* auto commit generated command files
2024-12-18 10:37:45 +00:00
Mark Sinclair ad0c135d4c Bump credential proxy version 2024-12-17 20:35:42 +00:00
Bogdan-Ștefan Neacşu 61e872f033 Add windows to CI builds (#5269)
* Add windows to CI builds

* Fix win build for node status api

* Fix win build for sdk

* Fix win build for cred proxy
2024-12-17 15:18:11 +02:00
dynco-nym b4f51baf94 Change sqlite journal mode to WAL (#5213)
* Change sqlite journal mode to WAL

* Synchronous mode & auto vacuum

* Bump probe git ref to 1.1.0
2024-12-16 16:40:02 +01:00
Drazen Urch a3f3d83c1b Shipping raw metrics to PG (#5216)
* Shipping raw metrics to PG

* Put cancel token back in its place

* fmt
2024-12-16 16:19:37 +01:00
Drazen Urch 84d7004cb2 Add control messages to GatewayTransciver (#5247)
* Add control messages to GatewayTransciver

* Add forget me flag to clients

* CI gate IPIINFO test

* Handle ForgetMe for client and stats db

* fmt
2024-12-16 15:18:04 +01:00
import this be063a36eb syntax hotfix (#5266) 2024-12-16 13:17:38 +00:00
windy-ux 0a712b9fce Fix/web 615 seo setup (#5265)
* + add header into Packet Mixing docs

* + add head changes for testing

* / updated version of metatags in theme.config

* + add env file

* / theme.config to use NEXT_PUBLIC_SITE_URL from env file

* @ Fix broken link in theme.config

* - remove favicon code

* + add desription for intro pages

* + add default book's desriptions

* Revert "+ add desription for intro pages"

This reverts commit 98c78242d4.
2024-12-16 13:17:25 +00:00
Bogdan-Ștefan Neacşu 88d6fb4e22 Add fd callback to client core (#5230)
* Add fd callback to client core

* Include in sdk

* Fix clippy many args

* Method in builder

* Replace Box with Arc
2024-12-16 13:57:34 +02:00
Jon Häggblad 04c2045d94 Add PATCH support to nym-http-api-client (#5260) 2024-12-16 12:28:44 +01:00
271 changed files with 5695 additions and 3852 deletions
+1 -1
View File
@@ -30,7 +30,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ arc-ubuntu-20.04, custom-runner-mac-m1 ]
os: [ arc-ubuntu-20.04, custom-windows-11, custom-runner-mac-m1 ]
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
+1
View File
@@ -15,6 +15,7 @@ jobs:
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
IPINFO_API_TOKEN: ${{ secrets.IPINFO_API_TOKEN }}
continue-on-error: true
steps:
- name: Check out repository code
+7 -1
View File
@@ -51,4 +51,10 @@ ppa-private-key.b64
ppa-private-key.asc
nym-network-monitor/topology.json
nym-network-monitor/__pycache__
nym-network-monitor/*.key
nym-network-monitor/*.key
nym-network-monitor/.envrc
nym-network-monitor/.envrc
*.sqlite
.build
Generated
+130 -41
View File
@@ -2475,6 +2475,12 @@ dependencies = [
"once_cell",
]
[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fancy_constructor"
version = "1.2.2"
@@ -4978,6 +4984,7 @@ dependencies = [
"log",
"nym-network-defaults",
"serde",
"thiserror",
"toml 0.8.14",
"url",
]
@@ -5021,7 +5028,7 @@ dependencies = [
[[package]]
name = "nym-credential-proxy"
version = "0.1.6"
version = "0.1.7"
dependencies = [
"anyhow",
"async-trait",
@@ -5215,31 +5222,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "nym-data-observatory"
version = "0.1.0"
dependencies = [
"anyhow",
"axum 0.7.7",
"chrono",
"clap 4.5.20",
"nym-bin-common",
"nym-network-defaults",
"nym-node-requests",
"nym-task",
"serde",
"serde_json",
"sqlx",
"tokio",
"tokio-util",
"tower-http",
"tracing",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
"utoipauto",
]
[[package]]
name = "nym-dkg"
version = "0.1.0"
@@ -5782,6 +5764,7 @@ dependencies = [
"nym-bin-common",
"nym-client-core",
"nym-crypto",
"nym-gateway-requests",
"nym-network-defaults",
"nym-sdk",
"nym-sphinx",
@@ -5795,6 +5778,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tokio-postgres",
"tokio-util",
"utoipa",
"utoipa-swagger-ui",
@@ -6551,10 +6535,7 @@ name = "nym-topology"
version = "0.1.0"
dependencies = [
"async-trait",
"bs58",
"log",
"nym-api-requests",
"nym-bin-common",
"nym-config",
"nym-crypto",
"nym-mixnet-contract-common",
@@ -6563,10 +6544,10 @@ dependencies = [
"nym-sphinx-types",
"rand",
"reqwest 0.12.4",
"semver 1.0.23",
"serde",
"serde_json",
"thiserror",
"tracing",
"tsify",
"wasm-bindgen",
"wasm-utils",
@@ -6799,6 +6780,7 @@ dependencies = [
"nym-crypto",
"nym-gateway-storage",
"nym-network-defaults",
"nym-node-metrics",
"nym-task",
"nym-wireguard-types",
"thiserror",
@@ -6854,6 +6836,40 @@ dependencies = [
"url",
]
[[package]]
name = "nyx-chain-watcher"
version = "0.1.8"
dependencies = [
"anyhow",
"async-trait",
"axum 0.7.7",
"chrono",
"clap 4.5.20",
"nym-bin-common",
"nym-config",
"nym-network-defaults",
"nym-node-requests",
"nym-task",
"nym-validator-client",
"nyxd-scraper",
"reqwest 0.12.4",
"rocket",
"schemars",
"serde",
"serde_json",
"sqlx",
"thiserror",
"time",
"tokio",
"tokio-util",
"tower-http",
"tracing",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
"utoipauto",
]
[[package]]
name = "nyxd-scraper"
version = "0.1.0"
@@ -7250,6 +7266,24 @@ dependencies = [
"indexmap 2.2.6",
]
[[package]]
name = "phf"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b"
dependencies = [
"siphasher 0.3.11",
]
[[package]]
name = "pin-project"
version = "1.1.6"
@@ -7388,6 +7422,35 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0"
[[package]]
name = "postgres-protocol"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23"
dependencies = [
"base64 0.22.1",
"byteorder",
"bytes",
"fallible-iterator",
"hmac",
"md-5",
"memchr",
"rand",
"sha2 0.10.8",
"stringprep",
]
[[package]]
name = "postgres-types"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f66ea23a2d0e5734297357705193335e0a957696f34bed2f2faefacb2fec336f"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@@ -9722,6 +9785,32 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "tokio-postgres"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b5d3742945bc7d7f210693b0c58ae542c6fd47b17adbbda0885f3dcb34a6bdb"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"log",
"parking_lot",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"rand",
"socket2",
"tokio",
"tokio-util",
"whoami",
]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
@@ -10678,9 +10767,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
[[package]]
name = "wasm-bindgen"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e"
checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396"
dependencies = [
"cfg-if",
"once_cell",
@@ -10689,13 +10778,12 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358"
checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.90",
@@ -10716,9 +10804,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56"
checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -10726,9 +10814,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68"
checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2"
dependencies = [
"proc-macro2",
"quote",
@@ -10739,9 +10827,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d"
checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6"
[[package]]
name = "wasm-bindgen-test"
@@ -10912,6 +11000,7 @@ checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d"
dependencies = [
"redox_syscall 0.5.1",
"wasite",
"web-sys",
]
[[package]]
+6 -4
View File
@@ -118,8 +118,8 @@ members = [
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-data-observatory",
"nym-network-monitor",
"nyx-chain-watcher",
"nym-node",
"nym-node/nym-node-requests",
"nym-node/nym-node-metrics",
@@ -147,7 +147,9 @@ members = [
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract", "common/verloc", "tools/internal/mixnet-connectivity-check",
"tools/internal/testnet-manager/dkg-bypass-contract",
"common/verloc",
"tools/internal/mixnet-connectivity-check",
]
default-members = [
@@ -156,11 +158,11 @@ default-members = [
"explorer-api",
"nym-api",
"nym-credential-proxy/nym-credential-proxy",
"nym-data-observatory",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
@@ -403,7 +405,7 @@ indexed_db_futures = { git = "https://github.com/TiemenSch/rust-indexed-db", bra
js-sys = "0.3.70"
serde-wasm-bindgen = "0.6.5"
tsify = "0.4.5"
wasm-bindgen = "0.2.95"
wasm-bindgen = "0.2.99"
wasm-bindgen-futures = "0.4.45"
wasmtimer = "0.2.0"
web-sys = "0.3.72"
+2 -2
View File
@@ -3,7 +3,7 @@ name = "nym-client-core"
version = "1.1.15"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
edition = "2021"
rust-version = "1.70"
rust-version = "1.76"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -45,7 +45,7 @@ nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
nym-statistics-common = { path = "../statistics" }
nym-pemstore = { path = "../pemstore" }
nym-topology = { path = "../topology", features = ["serializable"] }
nym-topology = { path = "../topology", features = ["persistence"] }
nym-mixnet-client = { path = "../client-libs/mixnet-client", default-features = false }
nym-validator-client = { path = "../client-libs/validator-client", default-features = false }
nym-task = { path = "../task" }
@@ -550,6 +550,15 @@ pub struct Topology {
/// Specifies a minimum performance of a gateway that is used on route construction.
/// This setting is only applicable when `NymApi` topology is used.
pub minimum_gateway_performance: u8,
/// Specifies whether this client should attempt to retrieve all available network nodes
/// as opposed to just active mixnodes/gateways.
/// Useless without `ignore_epoch_roles = true`
pub use_extended_topology: bool,
/// Specifies whether this client should ignore the current epoch role of the target egress node
/// when constructing the final hop packets.
pub ignore_egress_epoch_role: bool,
}
#[allow(clippy::large_enum_variant)]
@@ -586,6 +595,8 @@ impl Default for Topology {
topology_structure: TopologyStructure::default(),
minimum_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
minimum_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
use_extended_topology: false,
ignore_egress_epoch_role: false,
}
}
}
@@ -8,7 +8,10 @@ use crate::{
},
};
use log::{debug, error};
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
#[derive(Debug, Clone)]
@@ -30,6 +33,9 @@ impl StorageManager {
}
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -110,7 +116,7 @@ impl StorageManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO registered_gateway(gateway_id_bs58, registration_timestamp, gateway_type)
INSERT INTO registered_gateway(gateway_id_bs58, registration_timestamp, gateway_type)
VALUES (?, ?, ?)
"#,
registered_gateway.gateway_id_bs58,
@@ -224,7 +230,7 @@ impl StorageManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO custom_gateway_details(gateway_id_bs58, data)
INSERT INTO custom_gateway_details(gateway_id_bs58, data)
VALUES (?, ?)
"#,
custom.gateway_id_bs58,
@@ -112,7 +112,7 @@ where
source,
}
})?;
hardcoded_topology.get_gateways()
hardcoded_topology.entry_capable_nodes().cloned().collect()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::current_gateways(
@@ -128,7 +128,7 @@ where
// make sure the list of available gateways doesn't overlap the list of known gateways
let available_gateways = available_gateways
.into_iter()
.filter(|g| !registered_gateways.contains(g.identity()))
.filter(|g| !registered_gateways.contains(&g.identity()))
.collect::<Vec<_>>();
if available_gateways.is_empty() {
@@ -167,7 +167,7 @@ where
source,
}
})?;
hardcoded_topology.get_gateways()
hardcoded_topology.entry_capable_nodes().cloned().collect()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::current_gateways(
@@ -3,7 +3,6 @@
use super::received_buffer::ReceivedBufferMessage;
use super::statistics_control::StatisticsControl;
use super::topology_control::geo_aware_provider::GeoAwareTopologyProvider;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
@@ -24,7 +23,7 @@ use crate::client::replies::reply_storage::{
};
use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use crate::client::topology_control::{
nym_api_provider, TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
};
use crate::config::{Config, DebugConfig};
use crate::error::ClientCoreError;
@@ -32,7 +31,7 @@ use crate::init::{
setup_gateway,
types::{GatewaySetup, InitialisationResult},
};
use crate::{config, spawn_future};
use crate::{config, spawn_future, ForgetMe};
use futures::channel::mpsc;
use log::*;
use nym_bandwidth_controller::BandwidthController;
@@ -188,6 +187,11 @@ pub struct BaseClientBuilder<'a, C, S: MixnetClientStorage> {
user_agent: Option<UserAgent>,
setup_method: GatewaySetup,
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
forget_me: ForgetMe,
}
impl<'a, C, S> BaseClientBuilder<'a, C, S>
@@ -210,9 +214,18 @@ where
shutdown: None,
user_agent: None,
setup_method: GatewaySetup::MustLoad { gateway_id: None },
#[cfg(unix)]
connection_fd_callback: None,
forget_me: Default::default(),
}
}
#[must_use]
pub fn with_forget_me(mut self, forget_me: &ForgetMe) -> Self {
self.forget_me = forget_me.clone();
self
}
#[must_use]
pub fn with_gateway_setup(mut self, setup: GatewaySetup) -> Self {
self.setup_method = setup;
@@ -261,6 +274,15 @@ where
Ok(self)
}
#[cfg(unix)]
pub fn with_connection_fd_callback(
mut self,
callback: Arc<dyn Fn(RawFd) + Send + Sync>,
) -> Self {
self.connection_fd_callback = Some(callback);
self
}
// note: do **NOT** make this method public as its only valid usage is from within `start_base`
// because it relies on the crypto keys being already loaded
fn mix_address(details: &InitialisationResult) -> Recipient {
@@ -352,6 +374,7 @@ where
controller.start_with_shutdown(shutdown)
}
#[allow(clippy::too_many_arguments)]
async fn start_gateway_client(
config: &Config,
initialisation_result: InitialisationResult,
@@ -359,6 +382,7 @@ where
details_store: &S::GatewaysDetailsStore,
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
shutdown: TaskClient,
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
where
@@ -401,6 +425,8 @@ where
packet_router,
bandwidth_controller,
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown,
)
};
@@ -437,8 +463,8 @@ where
details_store
.upgrade_stored_remote_gateway_key(gateway_client.gateway_identity(), &updated_key)
.await.map_err(|err| {
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
})?
}
@@ -462,6 +488,7 @@ where
details_store: &S::GatewaysDetailsStore,
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
mut shutdown: TaskClient,
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
where
@@ -493,6 +520,8 @@ where
details_store,
packet_router,
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown,
)
.await?;
@@ -509,15 +538,15 @@ where
// if no custom provider was ... provided ..., create one using nym-api
custom_provider.unwrap_or_else(|| match config_topology.topology_structure {
config::TopologyStructure::NymApi => Box::new(NymApiTopologyProvider::new(
nym_api_provider::Config {
min_mixnode_performance: config_topology.minimum_mixnode_performance,
min_gateway_performance: config_topology.minimum_gateway_performance,
},
config_topology,
nym_api_urls,
user_agent,
)),
config::TopologyStructure::GeoAware(group_by) => {
Box::new(GeoAwareTopologyProvider::new(nym_api_urls, group_by))
warn!("using deprecated 'GeoAware' topology provider - this option will be removed very soon");
#[allow(deprecated)]
Box::new(crate::client::topology_control::GeoAwareTopologyProvider::new(nym_api_urls, group_by))
}
})
}
@@ -528,7 +557,7 @@ where
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
topology_config: config::Topology,
topology_accessor: TopologyAccessor,
local_gateway: &NodeIdentity,
local_gateway: NodeIdentity,
wait_for_gateway: bool,
mut shutdown: TaskClient,
) -> Result<(), ClientCoreError> {
@@ -560,7 +589,7 @@ where
};
if let Err(err) = topology_refresher
.ensure_contains_gateway(local_gateway)
.ensure_contains_routable_egress(local_gateway)
.await
{
if let Some(waiting_timeout) = gateway_wait_timeout {
@@ -615,9 +644,11 @@ where
fn start_mix_traffic_controller(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown: TaskClient,
forget_me: ForgetMe,
) -> BatchMixMessageSender {
info!("Starting mix traffic controller...");
let (mix_traffic_controller, mix_tx) = MixTrafficController::new(gateway_transceiver);
let (mix_traffic_controller, mix_tx) =
MixTrafficController::new(gateway_transceiver, forget_me);
mix_traffic_controller.start_with_shutdown(shutdown);
mix_tx
}
@@ -708,7 +739,8 @@ where
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
let shared_topology_accessor = TopologyAccessor::new();
let shared_topology_accessor =
TopologyAccessor::new(self.config.debug.topology.ignore_egress_epoch_role);
// Shutdown notifier for signalling tasks to stop
let shutdown = self
@@ -772,6 +804,8 @@ where
&details_store,
gateway_packet_router,
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
shutdown.fork("gateway_transceiver"),
)
.await?;
@@ -797,9 +831,11 @@ where
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
// The MixTrafficController then sends the actual traffic
let message_sender = Self::start_mix_traffic_controller(
gateway_transceiver,
shutdown.fork("mix_traffic_controller"),
self.forget_me,
);
// Channels that the websocket listener can use to signal downstream to the real traffic
@@ -163,6 +163,7 @@ impl LoopCoverTrafficStream<OsRng> {
// poisson delay, but is it really a problem?
let topology_permit = self.topology_access.get_read_permit().await;
// the ack is sent back to ourselves (and then ignored)
let topology_ref = match topology_permit.try_get_valid_topology_ref(
&self.our_full_destination,
Some(&self.our_full_destination),
@@ -28,7 +28,6 @@ pub enum InputMessage {
recipient: Recipient,
data: Vec<u8>,
lane: TransmissionLane,
mix_hops: Option<u8>,
},
/// Creates a message used for a duplex anonymous communication where the recipient
@@ -44,7 +43,6 @@ pub enum InputMessage {
data: Vec<u8>,
reply_surbs: u32,
lane: TransmissionLane,
mix_hops: Option<u8>,
},
/// Attempt to use our internally received and stored `ReplySurb` to send the message back
@@ -94,29 +92,6 @@ impl InputMessage {
recipient,
data,
lane,
mix_hops: None,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
} else {
message
}
}
// IMHO `new_regular` should take `mix_hops: Option<u8>` as an argument instead of creating
// this function, but that would potentially break backwards compatibility with the current API
pub fn new_regular_with_custom_hops(
recipient: Recipient,
data: Vec<u8>,
lane: TransmissionLane,
packet_type: Option<PacketType>,
mix_hops: Option<u8>,
) -> Self {
let message = InputMessage::Regular {
recipient,
data,
lane,
mix_hops,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -137,7 +112,6 @@ impl InputMessage {
data,
reply_surbs,
lane,
mix_hops: None,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -154,14 +128,12 @@ impl InputMessage {
reply_surbs: u32,
lane: TransmissionLane,
packet_type: Option<PacketType>,
mix_hops: Option<u8>,
) -> Self {
let message = InputMessage::Anonymous {
recipient,
data,
reply_surbs,
lane,
mix_hops,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -2,8 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::spawn_future;
use crate::{spawn_future, ForgetMe};
use log::*;
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
@@ -26,10 +27,14 @@ pub struct MixTrafficController {
// TODO: this is temporary work-around.
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
consecutive_gateway_failure_count: usize,
forget_me: ForgetMe,
}
impl MixTrafficController {
pub fn new<T>(gateway_transceiver: T) -> (MixTrafficController, BatchMixMessageSender)
pub fn new<T>(
gateway_transceiver: T,
forget_me: ForgetMe,
) -> (MixTrafficController, BatchMixMessageSender)
where
T: GatewayTransceiver + Send + 'static,
{
@@ -40,6 +45,7 @@ impl MixTrafficController {
gateway_transceiver: Box::new(gateway_transceiver),
mix_rx: message_receiver,
consecutive_gateway_failure_count: 0,
forget_me,
},
message_sender,
)
@@ -47,6 +53,7 @@ impl MixTrafficController {
pub fn new_dynamic(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
forget_me: ForgetMe,
) -> (MixTrafficController, BatchMixMessageSender) {
let (message_sender, message_receiver) =
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
@@ -55,6 +62,7 @@ impl MixTrafficController {
gateway_transceiver,
mix_rx: message_receiver,
consecutive_gateway_failure_count: 0,
forget_me,
},
message_sender,
)
@@ -111,7 +119,27 @@ impl MixTrafficController {
}
}
shutdown.recv_timeout().await;
if self.forget_me.any() {
log::info!("Sending forget me request to the gateway");
match self
.gateway_transceiver
.send_client_request(ClientRequest::ForgetMe {
client: self.forget_me.client(),
stats: self.forget_me.stats(),
})
.await
{
Ok(_) => {
log::info!("Successfully sent forget me request to the gateway");
}
Err(err) => {
log::error!("Failed to send forget me request to the gateway: {err}");
}
}
}
log::debug!("MixTrafficController: Exiting");
})
});
}
}
@@ -5,8 +5,10 @@ use async_trait::async_trait;
use log::{debug, error};
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::identity;
use nym_gateway_client::error::GatewayClientError;
use nym_gateway_client::GatewayClient;
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use std::fmt::Debug;
@@ -26,9 +28,14 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
}
/// This combines combines the functionalities of being able to send and receive mix packets.
#[async_trait]
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
fn gateway_identity(&self) -> identity::PublicKey;
fn ws_fd(&self) -> Option<RawFd>;
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError>;
}
/// This trait defines the functionality of sending `MixPacket` into the mixnet,
@@ -65,6 +72,7 @@ pub trait GatewayReceiver {
}
// to allow for dynamic dispatch
#[async_trait]
impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
#[inline]
fn gateway_identity(&self) -> identity::PublicKey {
@@ -73,6 +81,13 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
fn ws_fd(&self) -> Option<RawFd> {
(**self).ws_fd()
}
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
(**self).send_client_request(message).await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -91,7 +106,6 @@ impl<G: GatewaySender + ?Sized + Send> GatewaySender for Box<G> {
(**self).batch_send_mix_packets(packets).await
}
}
impl<G: GatewayReceiver + ?Sized> GatewayReceiver for Box<G> {
#[inline]
fn set_packet_router(&mut self, packet_router: PacketRouter) -> Result<(), ErasedGatewayError> {
@@ -111,6 +125,7 @@ impl<C, St> RemoteGateway<C, St> {
}
}
#[async_trait]
impl<C, St> GatewayTransceiver for RemoteGateway<C, St>
where
C: DkgQueryClient + Send + Sync,
@@ -123,6 +138,20 @@ where
fn ws_fd(&self) -> Option<RawFd> {
self.gateway_client.ws_fd()
}
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
if let Some(shared_key) = self.gateway_client.shared_key() {
self.gateway_client
.send_websocket_message(message.encrypt(&*shared_key)?)
.await?;
Ok(())
} else {
Err(GatewayClientError::ConnectionInInvalidState)
}
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -195,6 +224,7 @@ impl LocalGateway {
mod nonwasm_sealed {
use super::*;
#[async_trait]
impl GatewayTransceiver for LocalGateway {
fn gateway_identity(&self) -> identity::PublicKey {
self.local_identity
@@ -202,6 +232,13 @@ mod nonwasm_sealed {
fn ws_fd(&self) -> Option<RawFd> {
None
}
async fn send_client_request(
&mut self,
_message: ClientRequest,
) -> Result<(), GatewayClientError> {
Ok(())
}
}
#[async_trait]
@@ -269,6 +306,7 @@ impl GatewaySender for MockGateway {
}
}
#[async_trait]
impl GatewayTransceiver for MockGateway {
fn gateway_identity(&self) -> identity::PublicKey {
self.dummy_identity
@@ -276,4 +314,11 @@ impl GatewayTransceiver for MockGateway {
fn ws_fd(&self) -> Option<RawFd> {
None
}
async fn send_client_request(
&mut self,
_message: ClientRequest,
) -> Result<(), GatewayClientError> {
Ok(())
}
}
@@ -73,11 +73,10 @@ where
content: Vec<u8>,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) {
if let Err(err) = self
.message_handler
.try_send_plain_message(recipient, content, lane, packet_type, mix_hops)
.try_send_plain_message(recipient, content, lane, packet_type)
.await
{
warn!("failed to send a plain message - {err}")
@@ -91,18 +90,10 @@ where
reply_surbs: u32,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) {
if let Err(err) = self
.message_handler
.try_send_message_with_reply_surbs(
recipient,
content,
reply_surbs,
lane,
packet_type,
mix_hops,
)
.try_send_message_with_reply_surbs(recipient, content, reply_surbs, lane, packet_type)
.await
{
warn!("failed to send a repliable message - {err}")
@@ -115,9 +106,8 @@ where
recipient,
data,
lane,
mix_hops,
} => {
self.handle_plain_message(recipient, data, lane, PacketType::Mix, mix_hops)
self.handle_plain_message(recipient, data, lane, PacketType::Mix)
.await
}
InputMessage::Anonymous {
@@ -125,17 +115,9 @@ where
data,
reply_surbs,
lane,
mix_hops,
} => {
self.handle_repliable_message(
recipient,
data,
reply_surbs,
lane,
PacketType::Mix,
mix_hops,
)
.await
self.handle_repliable_message(recipient, data, reply_surbs, lane, PacketType::Mix)
.await
}
InputMessage::Reply {
recipient_tag,
@@ -153,9 +135,8 @@ where
recipient,
data,
lane,
mix_hops,
} => {
self.handle_plain_message(recipient, data, lane, packet_type, mix_hops)
self.handle_plain_message(recipient, data, lane, packet_type)
.await
}
InputMessage::Anonymous {
@@ -163,17 +144,9 @@ where
data,
reply_surbs,
lane,
mix_hops,
} => {
self.handle_repliable_message(
recipient,
data,
reply_surbs,
lane,
packet_type,
mix_hops,
)
.await
self.handle_repliable_message(recipient, data, reply_surbs, lane, packet_type)
.await
}
InputMessage::Reply {
recipient_tag,
@@ -70,7 +70,6 @@ pub(crate) struct PendingAcknowledgement {
message_chunk: Fragment,
delay: SphinxDelay,
destination: PacketDestination,
mix_hops: Option<u8>,
retransmissions: u32,
}
@@ -80,13 +79,11 @@ impl PendingAcknowledgement {
message_chunk: Fragment,
delay: SphinxDelay,
recipient: Recipient,
mix_hops: Option<u8>,
) -> Self {
PendingAcknowledgement {
message_chunk,
delay,
destination: PacketDestination::KnownRecipient(recipient.into()),
mix_hops,
retransmissions: 0,
}
}
@@ -104,9 +101,6 @@ impl PendingAcknowledgement {
recipient_tag,
extra_surb_request,
},
// Messages sent using SURBs are using the number of mix hops set by the recipient when
// they provided the SURBs, so it doesn't make sense to include it here.
mix_hops: None,
retransmissions: 0,
}
}
@@ -52,18 +52,12 @@ where
packet_recipient: Recipient,
chunk_data: Fragment,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, PreparationError> {
debug!("retransmitting normal packet...");
// TODO: Figure out retransmission packet type signaling
self.message_handler
.try_prepare_single_chunk_for_sending(
packet_recipient,
chunk_data,
packet_type,
mix_hops,
)
.try_prepare_single_chunk_for_sending(packet_recipient, chunk_data, packet_type)
.await
}
@@ -110,7 +104,6 @@ where
**recipient,
timed_out_ack.message_chunk.clone(),
packet_type,
timed_out_ack.mix_hops,
)
.await
}
@@ -15,11 +15,11 @@ use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessa
use nym_sphinx::anonymous_replies::{ReplySurb, SurbEncryptionKey};
use nym_sphinx::chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx::message::NymMessage;
use nym_sphinx::params::{PacketSize, PacketType, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
use nym_sphinx::Delay;
use nym_task::connections::TransmissionLane;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
use std::sync::Arc;
@@ -100,10 +100,6 @@ pub(crate) struct Config {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
average_ack_delay: Duration,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
/// Primary predefined packet size used for the encapsulated messages.
primary_packet_size: PacketSize,
@@ -125,19 +121,11 @@ impl Config {
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
primary_packet_size: PacketSize::default(),
secondary_packet_size: None,
}
}
/// Allows setting non-default number of expected mix hops in the network.
#[allow(dead_code)]
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
/// Allows setting non-default size of the sphinx packets sent out.
pub fn with_custom_primary_packet_size(mut self, packet_size: PacketSize) -> Self {
self.primary_packet_size = packet_size;
@@ -185,9 +173,7 @@ where
config.sender_address,
config.average_packet_delay,
config.average_ack_delay,
)
.with_mix_hops(config.num_mix_hops);
);
MessageHandler {
config,
rng,
@@ -216,7 +202,7 @@ where
fn get_topology<'a>(
&self,
permit: &'a TopologyReadPermit<'a>,
) -> Result<&'a NymTopology, PreparationError> {
) -> Result<&'a NymRouteProvider, PreparationError> {
match permit.try_get_valid_topology_ref(&self.config.sender_address, None) {
Ok(topology_ref) => Ok(topology_ref),
Err(err) => {
@@ -233,9 +219,8 @@ where
return self.config.primary_packet_size;
};
let primary_count =
msg.required_packets(self.config.primary_packet_size, self.config.num_mix_hops);
let secondary_count = msg.required_packets(secondary_packet, self.config.num_mix_hops);
let primary_count = msg.required_packets(self.config.primary_packet_size);
let secondary_count = msg.required_packets(secondary_packet);
trace!("This message would require: {primary_count} primary packets or {secondary_count} secondary packets...");
// if there would be no benefit in using the secondary packet - use the primary (duh)
@@ -424,10 +409,9 @@ where
message: Vec<u8>,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
let message = NymMessage::new_plain(message);
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type, mix_hops)
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type)
.await
}
@@ -437,7 +421,6 @@ where
recipient: Recipient,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
debug!("Sending non-reply message with packet type {packet_type}");
// TODO: I really dislike existence of this assertion, it implies code has to be re-organised
@@ -470,7 +453,6 @@ where
&self.config.ack_key,
&recipient,
packet_type,
mix_hops,
)?;
let real_message = RealMessage::new(
@@ -478,8 +460,7 @@ where
Some(fragment.fragment_identifier()),
);
let delay = prepared_fragment.total_delay;
let pending_ack =
PendingAcknowledgement::new_known(fragment, delay, recipient, mix_hops);
let pending_ack = PendingAcknowledgement::new_known(fragment, delay, recipient);
real_messages.push(real_message);
pending_acks.push(pending_ack);
@@ -496,7 +477,6 @@ where
recipient: Recipient,
amount: u32,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
debug!("Sending additional reply SURBs with packet type {packet_type}");
let sender_tag = self.get_or_create_sender_tag(&recipient);
@@ -513,7 +493,6 @@ where
recipient,
TransmissionLane::AdditionalReplySurbs,
packet_type,
mix_hops,
)
.await?;
@@ -530,7 +509,6 @@ where
num_reply_surbs: u32,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), SurbWrappedPreparationError> {
debug!("Sending message with reply SURBs with packet type {packet_type}");
let sender_tag = self.get_or_create_sender_tag(&recipient);
@@ -541,7 +519,7 @@ where
let message =
NymMessage::new_repliable(RepliableMessage::new_data(message, sender_tag, reply_surbs));
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type, mix_hops)
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type)
.await?;
log::trace!("storing {} reply keys", reply_keys.len());
@@ -555,23 +533,18 @@ where
recipient: Recipient,
chunk: Fragment,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, PreparationError> {
debug!("Sending single chunk with packet type {packet_type}");
let topology_permit = self.topology_access.get_read_permit().await;
let topology = self.get_topology(&topology_permit)?;
let prepared_fragment = self
.message_preparer
.prepare_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
&recipient,
packet_type,
mix_hops,
)
.unwrap();
let prepared_fragment = self.message_preparer.prepare_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
&recipient,
packet_type,
)?;
Ok(prepared_fragment)
}
@@ -624,16 +597,13 @@ where
Err(err) => return Err(err.return_surbs(vec![reply_surb])),
};
let prepared_fragment = self
.message_preparer
.prepare_reply_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
reply_surb,
PacketType::Mix,
)
.unwrap();
let prepared_fragment = self.message_preparer.prepare_reply_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
reply_surb,
PacketType::Mix,
)?;
Ok(prepared_fragment)
}
@@ -230,6 +230,7 @@ where
// poisson delay, but is it really a problem?
let topology_permit = self.topology_access.get_read_permit().await;
// the ack is sent back to ourselves (and then ignored)
let topology_ref = match topology_permit.try_get_valid_topology_ref(
&self.config.our_full_destination,
Some(&self.config.our_full_destination),
@@ -516,7 +516,6 @@ where
recipient,
to_send,
nym_sphinx::params::PacketType::Mix,
self.config.reply_surbs.surb_mix_hops,
)
.await
{
@@ -2,8 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::params::DEFAULT_NUM_MIX_HOPS;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError};
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -17,29 +16,36 @@ pub struct TopologyAccessorInner {
// few seconds, while reads are needed every single packet generated.
// However, proper benchmarks will be needed to determine if `RwLock` is indeed a better
// approach than a `Mutex`
topology: RwLock<Option<NymTopology>>,
topology: RwLock<NymRouteProvider>,
}
impl TopologyAccessorInner {
fn new() -> Self {
fn new(initial: NymRouteProvider) -> Self {
TopologyAccessorInner {
controlled_manually: AtomicBool::new(false),
released_manual_control: Notify::new(),
topology: RwLock::new(None),
topology: RwLock::new(initial),
}
}
async fn update(&self, new: Option<NymTopology>) {
*self.topology.write().await = new;
let mut guard = self.topology.write().await;
match new {
Some(updated) => {
guard.update(updated);
}
None => guard.clear_topology(),
}
}
}
pub struct TopologyReadPermit<'a> {
permit: RwLockReadGuard<'a, Option<NymTopology>>,
permit: RwLockReadGuard<'a, NymRouteProvider>,
}
impl Deref for TopologyReadPermit<'_> {
type Target = Option<NymTopology>;
type Target = NymRouteProvider;
fn deref(&self) -> &Self::Target {
&self.permit
@@ -53,43 +59,31 @@ impl<'a> TopologyReadPermit<'a> {
&'a self,
ack_recipient: &Recipient,
packet_recipient: Option<&Recipient>,
) -> Result<&'a NymTopology, NymTopologyError> {
) -> Result<&'a NymRouteProvider, NymTopologyError> {
let route_provider = self.permit.deref();
let topology = &route_provider.topology;
// 1. Have we managed to get anything from the refresher, i.e. have the nym-api queries gone through?
let topology = self
.permit
.as_ref()
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
topology.ensure_not_empty()?;
// 2. does it have any mixnode at all?
// 3. does it have any gateways at all?
// 4. does it have a mixnode on each layer?
topology.ensure_can_construct_path_through(DEFAULT_NUM_MIX_HOPS)?;
// 2. does the topology have a node on each mixing layer?
topology.ensure_minimally_routable()?;
// 5. does it contain OUR gateway (so that we could create an ack packet)?
if !topology.gateway_exists(ack_recipient.gateway()) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: ack_recipient.gateway().to_base58_string(),
});
}
// 3. does it contain OUR gateway (so that we could create an ack packet)?
let _ = route_provider.egress_by_identity(ack_recipient.gateway())?;
// 6. for our target recipient, does it contain THEIR gateway (so that we could create
// 4. for our target recipient, does it contain THEIR gateway (so that we send anything over?)
if let Some(recipient) = packet_recipient {
if !topology.gateway_exists(recipient.gateway()) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: recipient.gateway().to_base58_string(),
});
}
let _ = route_provider.egress_by_identity(recipient.gateway())?;
}
Ok(topology)
Ok(route_provider)
}
}
impl<'a> From<RwLockReadGuard<'a, Option<NymTopology>>> for TopologyReadPermit<'a> {
fn from(read_permit: RwLockReadGuard<'a, Option<NymTopology>>) -> Self {
TopologyReadPermit {
permit: read_permit,
}
impl<'a> From<RwLockReadGuard<'a, NymRouteProvider>> for TopologyReadPermit<'a> {
fn from(permit: RwLockReadGuard<'a, NymRouteProvider>) -> Self {
TopologyReadPermit { permit }
}
}
@@ -99,9 +93,11 @@ pub struct TopologyAccessor {
}
impl TopologyAccessor {
pub fn new() -> Self {
pub fn new(ignore_egress_epoch_roles: bool) -> Self {
TopologyAccessor {
inner: Arc::new(TopologyAccessorInner::new()),
inner: Arc::new(TopologyAccessorInner::new(NymRouteProvider::new_empty(
ignore_egress_epoch_roles,
))),
}
}
@@ -121,8 +117,21 @@ impl TopologyAccessor {
self.inner.released_manual_control.notified().await
}
#[deprecated(note = "use .current_route_provider instead")]
pub async fn current_topology(&self) -> Option<NymTopology> {
self.inner.topology.read().await.clone()
self.current_route_provider()
.await
.as_ref()
.map(|p| p.topology.clone())
}
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<NymRouteProvider>> {
let provider = self.inner.topology.read().await;
if provider.topology.is_empty() {
None
} else {
Some(provider)
}
}
pub async fn manually_change_topology(&self, new_topology: NymTopology) {
@@ -140,15 +149,11 @@ impl TopologyAccessor {
// only used by the client at startup to get a slightly more reasonable error message
// (currently displays as unused because health checker is disabled due to required changes)
pub async fn ensure_is_routable(&self) -> Result<(), NymTopologyError> {
match self.inner.topology.read().await.deref() {
None => Err(NymTopologyError::EmptyNetworkTopology),
Some(ref topology) => topology.ensure_can_construct_path_through(DEFAULT_NUM_MIX_HOPS),
}
}
}
impl Default for TopologyAccessor {
fn default() -> Self {
TopologyAccessor::new()
self.inner
.topology
.read()
.await
.topology
.ensure_minimally_routable()
}
}
@@ -3,7 +3,6 @@ use log::{debug, error};
use nym_explorer_client::{ExplorerClient, PrettyDetailedMixNodeBond};
use nym_network_defaults::var_names::EXPLORER_API;
use nym_topology::{
nym_topology_from_basic_info,
provider_trait::{async_trait, TopologyProvider},
NymTopology,
};
@@ -15,8 +14,6 @@ use url::Url;
pub use nym_country_group::CountryGroup;
const MIN_NODES_PER_LAYER: usize = 1;
fn create_explorer_client() -> Option<ExplorerClient> {
let Ok(explorer_api_url) = std::env::var(EXPLORER_API) else {
error!("Missing EXPLORER_API");
@@ -63,30 +60,20 @@ fn log_mixnode_distribution(mixnodes: &HashMap<CountryGroup, Vec<NodeId>>) {
}
fn check_layer_integrity(topology: NymTopology) -> Result<(), ()> {
let mixes = topology.mixes();
if mixes.keys().len() < 3 {
if topology.ensure_minimally_routable().is_err() {
error!("Layer is missing in topology!");
return Err(());
}
for (layer, mixnodes) in mixes {
debug!("Layer {:?} has {} mixnodes", layer, mixnodes.len());
if mixnodes.len() < MIN_NODES_PER_LAYER {
error!(
"There are only {} mixnodes in layer {:?}",
mixnodes.len(),
layer
);
return Err(());
}
}
Ok(())
}
#[deprecated(note = "use NymApiTopologyProvider instead as explorer API will soon be removed")]
pub struct GeoAwareTopologyProvider {
validator_client: nym_validator_client::client::NymApiClient,
filter_on: GroupBy,
}
#[allow(deprecated)]
impl GeoAwareTopologyProvider {
pub fn new(mut nym_api_urls: Vec<Url>, filter_on: GroupBy) -> GeoAwareTopologyProvider {
log::info!(
@@ -104,6 +91,15 @@ impl GeoAwareTopologyProvider {
}
async fn get_topology(&self) -> Option<NymTopology> {
let rewarded_set = self
.validator_client
.get_current_rewarded_set()
.await
.inspect_err(|err| error!("failed to get current rewarded set: {err}"))
.ok()?;
let mut topology = NymTopology::new_empty(rewarded_set);
let mixnodes = match self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
@@ -187,7 +183,8 @@ impl GeoAwareTopologyProvider {
.filter(|m| filtered_mixnode_ids.contains(&m.node_id))
.collect::<Vec<_>>();
let topology = nym_topology_from_basic_info(&mixnodes, &gateways);
topology.add_skimmed_nodes(&mixnodes);
topology.add_skimmed_nodes(&gateways);
// TODO: return real error type
check_layer_integrity(topology.clone()).ok()?;
@@ -196,6 +193,7 @@ impl GeoAwareTopologyProvider {
}
}
#[allow(deprecated)]
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl TopologyProvider for GeoAwareTopologyProvider {
@@ -205,6 +203,7 @@ impl TopologyProvider for GeoAwareTopologyProvider {
}
}
#[allow(deprecated)]
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl TopologyProvider for GeoAwareTopologyProvider {
@@ -19,6 +19,7 @@ mod accessor;
pub mod geo_aware_provider;
pub mod nym_api_provider;
#[allow(deprecated)]
pub use geo_aware_provider::GeoAwareTopologyProvider;
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
pub use nym_topology::provider_trait::TopologyProvider;
@@ -27,7 +28,7 @@ pub use nym_topology::provider_trait::TopologyProvider;
const MAX_FAILURE_COUNT: usize = 10;
pub struct TopologyRefresherConfig {
refresh_rate: Duration,
pub refresh_rate: Duration,
}
impl TopologyRefresherConfig {
@@ -96,28 +97,24 @@ impl TopologyRefresher {
self.topology_accessor.ensure_is_routable().await
}
pub async fn ensure_contains_gateway(
pub async fn ensure_contains_routable_egress(
&self,
gateway: &NodeIdentity,
egress: NodeIdentity,
) -> Result<(), NymTopologyError> {
let topology = self
.topology_accessor
.current_topology()
.current_route_provider()
.await
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
if !topology.gateway_exists(gateway) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: gateway.to_base58_string(),
});
}
let _ = topology.egress_by_identity(egress)?;
Ok(())
}
pub async fn wait_for_gateway(
&mut self,
gateway: &NodeIdentity,
gateway: NodeIdentity,
timeout_duration: Duration,
) -> Result<(), NymTopologyError> {
info!(
@@ -135,7 +132,7 @@ impl TopologyRefresher {
})
}
_ = self.try_refresh() => {
if self.ensure_contains_gateway(gateway).await.is_ok() {
if self.ensure_contains_routable_egress(gateway).await.is_ok() {
return Ok(())
}
info!("gateway '{gateway}' is still not online...");
@@ -4,32 +4,39 @@
use async_trait::async_trait;
use log::{debug, error, warn};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::NymTopology;
use nym_validator_client::UserAgent;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::cmp::min;
use url::Url;
// the same values as our current (10.06.24) blacklist
pub const DEFAULT_MIN_MIXNODE_PERFORMANCE: u8 = 50;
pub const DEFAULT_MIN_GATEWAY_PERFORMANCE: u8 = 50;
#[derive(Debug)]
pub struct Config {
pub min_mixnode_performance: u8,
pub min_gateway_performance: u8,
pub use_extended_topology: bool,
pub ignore_egress_epoch_role: bool,
}
impl Default for Config {
fn default() -> Self {
// old values that decided on blacklist membership
impl From<nym_client_core_config_types::Topology> for Config {
fn from(value: nym_client_core_config_types::Topology) -> Self {
Config {
min_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
min_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
min_mixnode_performance: value.minimum_mixnode_performance,
min_gateway_performance: value.minimum_gateway_performance,
use_extended_topology: value.use_extended_topology,
ignore_egress_epoch_role: value.ignore_egress_epoch_role,
}
}
}
impl Config {
// if we're using 'extended' topology, filter the nodes based on the lowest set performance
fn min_node_performance(&self) -> u8 {
min(self.min_mixnode_performance, self.min_gateway_performance)
}
}
pub struct NymApiTopologyProvider {
config: Config,
@@ -39,7 +46,11 @@ pub struct NymApiTopologyProvider {
}
impl NymApiTopologyProvider {
pub fn new(config: Config, mut nym_api_urls: Vec<Url>, user_agent: Option<UserAgent>) -> Self {
pub fn new(
config: impl Into<Config>,
mut nym_api_urls: Vec<Url>,
user_agent: Option<UserAgent>,
) -> Self {
nym_api_urls.shuffle(&mut thread_rng());
let validator_client = if let Some(user_agent) = user_agent {
@@ -52,7 +63,7 @@ impl NymApiTopologyProvider {
};
NymApiTopologyProvider {
config,
config: config.into(),
validator_client,
nym_api_urls,
currently_used_api: 0,
@@ -70,70 +81,69 @@ impl NymApiTopologyProvider {
.change_nym_api(self.nym_api_urls[self.currently_used_api].clone())
}
/// Verifies whether nodes a reasonably distributed among all mix layers.
///
/// In ideal world we would have 33% nodes on layer 1, 33% on layer 2 and 33% on layer 3.
/// However, this is a rather unrealistic expectation, instead we check whether there exists
/// a layer with more than 66% of nodes or with fewer than 15% and if so, we trigger a failure.
///
/// # Arguments
///
/// * `topology`: active topology constructed from validator api data
fn check_layer_distribution(
&self,
active_topology: &NymTopology,
) -> Result<(), NymTopologyError> {
let lower_threshold = 0.15;
let upper_threshold = 0.66;
active_topology.ensure_even_layer_distribution(lower_threshold, upper_threshold)
}
async fn get_current_compatible_topology(&mut self) -> Option<NymTopology> {
let mixnodes = match self
let rewarded_set = self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
.get_current_rewarded_set()
.await
{
Err(err) => {
error!("failed to get network mixnodes - {err}");
return None;
}
Ok(mixes) => mixes,
};
.inspect_err(|err| error!("failed to get current rewarded set: {err}"))
.ok()?;
let gateways = match self
.validator_client
.get_all_basic_entry_assigned_nodes()
.await
{
Err(err) => {
error!("failed to get network gateways - {err}");
return None;
}
Ok(gateways) => gateways,
};
let mut topology = NymTopology::new_empty(rewarded_set);
debug!(
"there are {} mixnodes and {} gateways in total (before performance filtering)",
mixnodes.len(),
gateways.len()
);
if self.config.use_extended_topology {
let all_nodes = self
.validator_client
.get_all_basic_nodes()
.await
.inspect_err(|err| error!("failed to get network nodes: {err}"))
.ok()?;
let topology = NymTopology::from_unordered(
mixnodes.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_mixnode_performance
}),
gateways.iter().filter(|g| {
g.performance.round_to_integer() >= self.config.min_gateway_performance
}),
);
if let Err(err) = self.check_layer_distribution(&topology) {
warn!("The current filtered active topology has extremely skewed layer distribution. It cannot be used: {err}");
self.use_next_nym_api();
None
debug!(
"there are {} nodes on the network (before filtering)",
all_nodes.len()
);
topology.add_additional_nodes(all_nodes.iter().filter(|n| {
n.performance.round_to_integer() >= self.config.min_node_performance()
}));
} else {
Some(topology)
// if we're not using extended topology, we're only getting active set mixnodes and gateways
let mixnodes = self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
.await
.inspect_err(|err| error!("failed to get network mixnodes: {err}"))
.ok()?;
// TODO: we really should be getting ACTIVE gateways only
let gateways = self
.validator_client
.get_all_basic_entry_assigned_nodes()
.await
.inspect_err(|err| error!("failed to get network gateways: {err}"))
.ok()?;
debug!(
"there are {} mixnodes and {} gateways in total (before performance filtering)",
mixnodes.len(),
gateways.len()
);
topology.add_additional_nodes(mixnodes.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_mixnode_performance
}));
topology.add_additional_nodes(gateways.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_gateway_performance
}));
};
if !topology.is_minimally_routable() {
error!("the current filtered active topology can't be used to construct any packets");
return None;
}
Some(topology)
}
}
@@ -142,7 +152,11 @@ impl NymApiTopologyProvider {
#[async_trait]
impl TopologyProvider for NymApiTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
self.get_current_compatible_topology().await
let Some(topology) = self.get_current_compatible_topology().await else {
self.use_next_nym_api();
return None;
};
Some(topology)
}
}
@@ -150,6 +164,10 @@ impl TopologyProvider for NymApiTopologyProvider {
#[async_trait(?Send)]
impl TopologyProvider for NymApiTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
self.get_current_compatible_topology().await
let Some(topology) = self.get_current_compatible_topology().await else {
self.use_next_nym_api();
return None;
};
Some(topology)
}
}
+7 -4
View File
@@ -4,8 +4,8 @@
use crate::client::mix_traffic::transceiver::ErasedGatewayError;
use nym_crypto::asymmetric::identity::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
use nym_topology::gateway::GatewayConversionError;
use nym_topology::NymTopologyError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_validator_client::ValidatorClientError;
use std::error::Error;
use std::path::PathBuf;
@@ -74,10 +74,10 @@ pub enum ClientCoreError {
#[error("the gateway id is invalid - {0}")]
UnableToCreatePublicKeyFromGatewayId(Ed25519RecoveryError),
#[error("The gateway is malformed: {source}")]
#[error("the node is malformed: {source}")]
MalformedGateway {
#[from]
source: GatewayConversionError,
source: Box<RoutingNodeError>,
},
#[error("failed to establish connection to gateway: {source}")]
@@ -159,6 +159,9 @@ pub enum ClientCoreError {
#[error("the specified gateway '{gateway}' does not support the wss protocol")]
UnsupportedWssProtocol { gateway: String },
#[error("node {id} ({identity}) does not support mixnet entry mode")]
UnsupportedEntry { id: NodeId, identity: String },
#[error(
"failed to load custom topology using path '{}'. detailed message: {source}", file_path.display()
)]
+40 -20
View File
@@ -7,7 +7,7 @@ use futures::{SinkExt, StreamExt};
use log::{debug, info, trace, warn};
use nym_crypto::asymmetric::identity;
use nym_gateway_client::GatewayClient;
use nym_topology::gateway;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::IdentityKeyRef;
use nym_validator_client::UserAgent;
use rand::{seq::SliceRandom, Rng};
@@ -15,6 +15,7 @@ use std::{sync::Arc, time::Duration};
use tungstenite::Message;
use url::Url;
use nym_topology::NodeId;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
@@ -25,7 +26,6 @@ use tokio::time::Instant;
use tokio_tungstenite::connect_async;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
#[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket;
#[cfg(target_arch = "wasm32")]
@@ -48,22 +48,30 @@ const PING_TIMEOUT: Duration = Duration::from_millis(1000);
// The abstraction that some of these helpers use
pub trait ConnectableGateway {
fn identity(&self) -> &identity::PublicKey;
fn clients_address(&self) -> String;
fn node_id(&self) -> NodeId;
fn identity(&self) -> identity::PublicKey;
fn clients_address(&self, prefer_ipv6: bool) -> Option<String>;
fn is_wss(&self) -> bool;
}
impl ConnectableGateway for gateway::LegacyNode {
fn identity(&self) -> &identity::PublicKey {
self.identity()
impl ConnectableGateway for RoutingNode {
fn node_id(&self) -> NodeId {
self.node_id
}
fn clients_address(&self) -> String {
self.clients_address()
fn identity(&self) -> identity::PublicKey {
self.identity_key
}
fn clients_address(&self, prefer_ipv6: bool) -> Option<String> {
self.ws_entry_address(prefer_ipv6)
}
fn is_wss(&self) -> bool {
self.clients_wss_port.is_some()
self.entry
.as_ref()
.map(|e| e.clients_wss_port.is_some())
.unwrap_or_default()
}
}
@@ -83,7 +91,7 @@ pub async fn current_gateways<R: Rng>(
nym_apis: &[Url],
user_agent: Option<UserAgent>,
minimum_performance: u8,
) -> Result<Vec<gateway::LegacyNode>, ClientCoreError> {
) -> Result<Vec<RoutingNode>, ClientCoreError> {
let nym_api = nym_apis
.choose(rng)
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
@@ -104,7 +112,7 @@ pub async fn current_gateways<R: Rng>(
.iter()
.filter(|g| g.performance.round_to_integer() >= minimum_performance)
.filter_map(|gateway| gateway.try_into().ok())
.collect::<Vec<gateway::LegacyNode>>();
.collect::<Vec<_>>();
log::debug!("After checking validity: {}", valid_gateways.len());
log::trace!("Valid gateways: {:#?}", valid_gateways);
@@ -134,7 +142,12 @@ async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<G>, Client
where
G: ConnectableGateway,
{
let addr = gateway.clients_address();
let Some(addr) = gateway.clients_address(false) else {
return Err(ClientCoreError::UnsupportedEntry {
id: gateway.node_id(),
identity: gateway.identity().to_string(),
});
};
trace!(
"establishing connection to {} ({addr})...",
gateway.identity(),
@@ -190,7 +203,7 @@ where
Ok(GatewayWithLatency::new(gateway, avg))
}
pub async fn choose_gateway_by_latency<'a, R: Rng, G: ConnectableGateway + Clone>(
pub async fn choose_gateway_by_latency<R: Rng, G: ConnectableGateway + Clone>(
rng: &mut R,
gateways: &[G],
must_use_tls: bool,
@@ -205,7 +218,7 @@ pub async fn choose_gateway_by_latency<'a, R: Rng, G: ConnectableGateway + Clone
let gateways_with_latency = Arc::new(tokio::sync::Mutex::new(Vec::new()));
futures::stream::iter(gateways)
.for_each_concurrent(CONCURRENT_GATEWAYS_MEASURED, |gateway| async {
let id = *gateway.identity();
let id = gateway.identity();
trace!("measuring latency to {id}...");
match measure_latency(gateway).await {
Ok(with_latency) => {
@@ -252,9 +265,9 @@ fn filter_by_tls<G: ConnectableGateway>(
pub(super) fn uniformly_random_gateway<R: Rng>(
rng: &mut R,
gateways: &[gateway::LegacyNode],
gateways: &[RoutingNode],
must_use_tls: bool,
) -> Result<gateway::LegacyNode, ClientCoreError> {
) -> Result<RoutingNode, ClientCoreError> {
filter_by_tls(gateways, must_use_tls)?
.choose(rng)
.ok_or(ClientCoreError::NoGatewaysOnNetwork)
@@ -263,9 +276,9 @@ pub(super) fn uniformly_random_gateway<R: Rng>(
pub(super) fn get_specified_gateway(
gateway_identity: IdentityKeyRef,
gateways: &[gateway::LegacyNode],
gateways: &[RoutingNode],
must_use_tls: bool,
) -> Result<gateway::LegacyNode, ClientCoreError> {
) -> Result<RoutingNode, ClientCoreError> {
log::debug!("Requesting specified gateway: {}", gateway_identity);
let user_gateway = identity::PublicKey::from_base58_string(gateway_identity)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
@@ -275,7 +288,14 @@ pub(super) fn get_specified_gateway(
.find(|gateway| gateway.identity_key == user_gateway)
.ok_or_else(|| ClientCoreError::NoGatewayWithId(gateway_identity.to_string()))?;
if must_use_tls && gateway.clients_wss_port.is_none() {
let Some(entry_details) = gateway.entry.as_ref() else {
return Err(ClientCoreError::UnsupportedEntry {
id: gateway.node_id,
identity: gateway.identity().to_string(),
});
};
if must_use_tls && entry_details.clients_wss_port.is_none() {
return Err(ClientCoreError::UnsupportedWssProtocol {
gateway: gateway_identity.to_string(),
});
+2 -2
View File
@@ -19,7 +19,7 @@ use crate::init::types::{
use nym_client_core_gateways_storage::GatewaysDetailsStore;
use nym_client_core_gateways_storage::{GatewayDetails, GatewayRegistration};
use nym_gateway_client::client::InitGatewayClient;
use nym_topology::gateway;
use nym_topology::node::RoutingNode;
use rand::rngs::OsRng;
use rand::{CryptoRng, RngCore};
use serde::Serialize;
@@ -50,7 +50,7 @@ async fn setup_new_gateway<K, D>(
key_store: &K,
details_store: &D,
selection_specification: GatewaySelectionSpecification,
available_gateways: Vec<gateway::LegacyNode>,
available_gateways: Vec<RoutingNode>,
) -> Result<InitialisationResult, ClientCoreError>
where
K: KeyStore,
+12 -5
View File
@@ -13,7 +13,7 @@ use nym_crypto::asymmetric::identity;
use nym_gateway_client::client::InitGatewayClient;
use nym_gateway_requests::shared_key::SharedGatewayKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::gateway;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::IdentityKey;
use nym_validator_client::nyxd::AccountId;
use serde::Serialize;
@@ -38,16 +38,23 @@ pub enum SelectedGateway {
impl SelectedGateway {
pub fn from_topology_node(
node: gateway::LegacyNode,
node: RoutingNode,
must_use_tls: bool,
) -> Result<Self, ClientCoreError> {
// for now, let's use 'old' behaviour, if you want to change it, you can pass it up the enum stack yourself : )
let prefer_ipv6 = false;
let gateway_listener = if must_use_tls {
node.clients_address_tls()
node.ws_entry_address_tls()
.ok_or(ClientCoreError::UnsupportedWssProtocol {
gateway: node.identity_key.to_base58_string(),
})?
} else {
node.clients_address()
node.ws_entry_address(prefer_ipv6)
.ok_or(ClientCoreError::UnsupportedEntry {
id: node.node_id,
identity: node.identity_key.to_base58_string(),
})?
};
let gateway_listener =
@@ -200,7 +207,7 @@ pub enum GatewaySetup {
specification: GatewaySelectionSpecification,
// TODO: seems to be a bit inefficient to pass them by value
available_gateways: Vec<gateway::LegacyNode>,
available_gateways: Vec<RoutingNode>,
},
ReuseConnection {
+46 -2
View File
@@ -14,8 +14,7 @@ pub mod error;
pub mod init;
pub use nym_topology::{
HardcodedTopologyProvider, NymTopology, NymTopologyError, SerializableNymTopology,
SerializableTopologyError, TopologyProvider,
HardcodedTopologyProvider, NymRouteProvider, NymTopology, NymTopologyError, TopologyProvider,
};
#[cfg(target_arch = "wasm32")]
@@ -34,3 +33,48 @@ where
{
tokio::spawn(future);
}
#[derive(Clone, Default, Debug)]
pub struct ForgetMe {
client: bool,
stats: bool,
}
impl ForgetMe {
pub fn new_all() -> Self {
Self {
client: true,
stats: true,
}
}
pub fn new_client() -> Self {
Self {
client: true,
stats: false,
}
}
pub fn new_stats() -> Self {
Self {
client: false,
stats: true,
}
}
pub fn new(client: bool, stats: bool) -> Self {
Self { client, stats }
}
pub fn any(&self) -> bool {
self.client || self.stats
}
pub fn client(&self) -> bool {
self.client
}
pub fn stats(&self) -> bool {
self.stats
}
}
@@ -9,7 +9,10 @@ use crate::backend::fs_backend::{
},
};
use log::{error, info};
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
#[derive(Debug, Clone)]
@@ -31,6 +34,9 @@ impl StorageManager {
}
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(fresh)
.disable_statement_logging();
@@ -101,6 +101,10 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
// currently unused (but populated)
negotiated_protocol: Option<u8>,
// Callback on the fd as soon as the connection has been established
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
/// Listen to shutdown messages and send notifications back to the task manager
task_client: TaskClient,
}
@@ -116,6 +120,7 @@ impl<C, St> GatewayClient<C, St> {
packet_router: PacketRouter,
bandwidth_controller: Option<BandwidthController<C, St>>,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
task_client: TaskClient,
) -> Self {
GatewayClient {
@@ -131,6 +136,8 @@ impl<C, St> GatewayClient<C, St> {
bandwidth_controller,
stats_reporter,
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
task_client,
}
}
@@ -205,6 +212,12 @@ impl<C, St> GatewayClient<C, St> {
};
self.connection = SocketState::Available(Box::new(ws_stream));
#[cfg(unix)]
if let (Some(callback), Some(fd)) = (self.connection_fd_callback.as_ref(), self.ws_fd()) {
callback.as_ref()(fd);
}
Ok(())
}
@@ -311,7 +324,7 @@ impl<C, St> GatewayClient<C, St> {
// If we want to send a message (with response), we need to have a full control over the socket,
// as we need to be able to write the request and read the subsequent response
async fn send_websocket_message(
pub async fn send_websocket_message(
&mut self,
msg: impl Into<Message>,
) -> Result<ServerResponse, GatewayClientError> {
@@ -1034,6 +1047,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
bandwidth_controller: None,
stats_reporter: ClientStatsSender::new(None),
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback: None,
task_client,
}
}
@@ -1064,6 +1079,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
bandwidth_controller,
stats_reporter,
negotiated_protocol: self.negotiated_protocol,
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback,
task_client,
}
}
@@ -32,10 +32,10 @@ use time::Date;
use url::Url;
pub use crate::nym_api::NymApiClientExt;
use nym_mixnet_contract_common::EpochRewardedSet;
pub use nym_mixnet_contract_common::{
mixnode::MixNodeDetails, GatewayBond, IdentityKey, IdentityKeyRef, NodeId, NymNodeDetails,
};
// re-export the type to not break existing imports
pub use crate::coconut::EcashApiClient;
@@ -367,6 +367,10 @@ impl NymApiClient {
Ok(self.nym_api.get_basic_gateways().await?.nodes)
}
pub async fn get_current_rewarded_set(&self) -> Result<EpochRewardedSet, ValidatorClientError> {
Ok(self.nym_api.get_rewarded_set().await?.into())
}
/// retrieve basic information for nodes are capable of operating as an entry gateway
/// this includes legacy gateways and nym-nodes
pub async fn get_all_basic_entry_assigned_nodes(
@@ -13,7 +13,7 @@ use nym_api_requests::ecash::models::{
use nym_api_requests::ecash::VerificationKeyResponse;
use nym_api_requests::models::{
AnnotationResponse, ApiHealthResponse, LegacyDescribedMixNode, NodePerformanceResponse,
NodeRefreshBody, NymNodeDescription,
NodeRefreshBody, NymNodeDescription, RewardedSetResponse,
};
use nym_api_requests::nym_nodes::PaginatedCachedNodesResponse;
use nym_api_requests::pagination::PaginatedResponse;
@@ -235,6 +235,15 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_set(&self) -> Result<RewardedSetResponse, NymAPIError> {
self.get_json(
&[routes::API_VERSION, "nym-nodes", "rewarded-set"],
NO_PARAMS,
)
.await
}
/// retrieve basic information for nodes are capable of operating as an entry gateway
/// this includes legacy gateways and nym-nodes
#[instrument(level = "debug", skip(self))]
@@ -912,6 +921,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn force_refresh_describe_cache(
&self,
request: &NodeRefreshBody,
@@ -924,6 +934,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_ticketbooks_for(
&self,
expiration_date: Date,
@@ -940,6 +951,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_ticketbooks_challenge(
&self,
expiration_date: Date,
@@ -26,10 +26,10 @@ use nym_mixnet_contract_common::{
reward_params::{Performance, RewardingParams},
rewarding::{EstimatedCurrentEpochRewardResponse, PendingRewardResponse},
ContractBuildInformation, ContractState, ContractStateParams, CurrentIntervalResponse,
CurrentNymNodeVersionResponse, Delegation, EpochEventId, EpochStatus, GatewayBond,
GatewayBondResponse, GatewayOwnershipResponse, HistoricalNymNodeVersionEntry, IdentityKey,
IdentityKeyRef, IntervalEventId, MixNodeBond, MixNodeDetails, MixOwnershipResponse,
MixnodeDetailsByIdentityResponse, MixnodeDetailsResponse, NodeId,
CurrentNymNodeVersionResponse, Delegation, EpochEventId, EpochRewardedSet, EpochStatus,
GatewayBond, GatewayBondResponse, GatewayOwnershipResponse, HistoricalNymNodeVersionEntry,
IdentityKey, IdentityKeyRef, IntervalEventId, MixNodeBond, MixNodeDetails,
MixOwnershipResponse, MixnodeDetailsByIdentityResponse, MixnodeDetailsResponse, NodeId,
NumberOfPendingEventsResponse, NymNodeBond, NymNodeDetails, NymNodeVersionHistoryResponse,
PagedAllDelegationsResponse, PagedDelegatorDelegationsResponse, PagedGatewayResponse,
PagedMixnodeBondsResponse, PagedNodeDelegationsResponse, PendingEpochEvent,
@@ -670,7 +670,7 @@ impl<T> PagedMixnetQueryClient for T where T: MixnetQueryClient {}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MixnetQueryClientExt: MixnetQueryClient {
async fn get_rewarded_set(&self) -> Result<RewardedSet, NyxdError> {
async fn get_rewarded_set(&self) -> Result<EpochRewardedSet, NyxdError> {
let error_response = |message| Err(NyxdError::extension_query_failure("mixnet", message));
let metadata = self.get_rewarded_set_metadata().await?;
@@ -711,13 +711,16 @@ pub trait MixnetQueryClientExt: MixnetQueryClient {
return error_response("the nodes assigned for 'standby' returned unexpected epoch_id");
}
Ok(RewardedSet {
entry_gateways: entry.nodes,
exit_gateways: exit.nodes,
layer1: layer1.nodes,
layer2: layer2.nodes,
layer3: layer3.nodes,
standby: standby.nodes,
Ok(EpochRewardedSet {
epoch_id: expected_epoch_id,
assignment: RewardedSet {
entry_gateways: entry.nodes,
exit_gateways: exit.nodes,
layer1: layer1.nodes,
layer2: layer2.nodes,
layer3: layer3.nodes,
standby: standby.nodes,
},
})
}
}
+2 -1
View File
@@ -12,7 +12,8 @@ dirs = { workspace = true, optional = true }
handlebars = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["derive"] }
toml = { workspace = true }
thiserror = { workspace = true }
toml = { workspace = true, features = ["display"] }
url = { workspace = true }
nym-network-defaults = { path = "../network-defaults", features = ["utoipa"] }
+10
View File
@@ -0,0 +1,10 @@
use std::io;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum NymConfigTomlError {
#[error(transparent)]
FileIoFailure(#[from] io::Error),
#[error(transparent)]
TomlSerializeFailure(#[from] toml::ser::Error),
}
+37
View File
@@ -13,6 +13,7 @@ pub use helpers::{parse_urls, OptionalSet};
pub use toml::de::Error as TomlDeError;
pub mod defaults;
pub mod error;
pub mod helpers;
pub mod legacy_helpers;
pub mod serde_helpers;
@@ -95,6 +96,42 @@ where
config.format_to_writer(file)
}
pub fn save_unformatted_config_to_file<C, P>(
config: &C,
path: P,
) -> Result<(), error::NymConfigTomlError>
where
C: Serialize + ?Sized,
P: AsRef<Path>,
{
let path = path.as_ref();
log::info!("saving config file to {}", path.display());
if let Some(parent) = path.parent() {
create_dir_all(parent)?;
}
let mut file = File::create(path)?;
// TODO: check for whether any of our configs store anything sensitive
// and change that to 0o644 instead
#[cfg(target_family = "unix")]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(path)?.permissions();
perms.set_mode(0o600);
fs::set_permissions(path, perms)?;
}
// let serde format the TOML in whatever ugly way it chooses
// TODO: in https://docs.rs/toml/latest/toml/fn.to_string_pretty.html it recommends using
// https://docs.rs/toml_edit/latest/toml_edit/struct.DocumentMut.html to preserve formatting
let toml_string = toml::to_string_pretty(config)?;
Ok(file.write_all(toml_string.as_bytes())?)
}
pub fn deserialize_config_from_toml_str<C>(raw: &str) -> Result<C, TomlDeError>
where
C: DeserializeOwned,
@@ -7,6 +7,7 @@
use crate::constants::{TOKEN_SUPPLY, UNIT_DELEGATION_BASE};
use crate::error::MixnetContractError;
use crate::helpers::IntoBaseDecimal;
use crate::nym_node::Role;
use crate::reward_params::{NodeRewardingParameters, RewardingParams};
use crate::rewarding::helpers::truncate_reward;
use crate::rewarding::RewardDistribution;
@@ -611,6 +612,16 @@ pub enum LegacyMixLayer {
Three = 3,
}
impl From<LegacyMixLayer> for Role {
fn from(layer: LegacyMixLayer) -> Self {
match layer {
LegacyMixLayer::One => Role::Layer1,
LegacyMixLayer::Two => Role::Layer2,
LegacyMixLayer::Three => Role::Layer3,
}
}
}
impl From<LegacyMixLayer> for String {
fn from(layer: LegacyMixLayer) -> Self {
(layer as u8).to_string()
@@ -3,6 +3,7 @@
use crate::config_score::{ConfigScoreParams, OutdatedVersionWeights, VersionScoreFormulaParams};
use crate::nym_node::Role;
use crate::EpochId;
use contracts_common::Percent;
use cosmwasm_schema::cw_serde;
use cosmwasm_std::Coin;
@@ -32,6 +33,23 @@ impl RoleAssignment {
}
}
#[cw_serde]
#[derive(Default)]
pub struct EpochRewardedSet {
pub epoch_id: EpochId,
pub assignment: RewardedSet,
}
impl From<(EpochId, RewardedSet)> for EpochRewardedSet {
fn from((epoch_id, assignment): (EpochId, RewardedSet)) -> Self {
EpochRewardedSet {
epoch_id,
assignment,
}
}
}
#[cw_serde]
#[derive(Default)]
pub struct RewardedSet {
@@ -69,6 +87,29 @@ impl RewardedSet {
pub fn rewarded_set_size(&self) -> usize {
self.active_set_size() + self.standby.len()
}
pub fn get_role(&self, node_id: NodeId) -> Option<Role> {
// given each role has ~100 entries in them, doing linear lookup with vec should be fine
if self.entry_gateways.contains(&node_id) {
return Some(Role::EntryGateway);
}
if self.exit_gateways.contains(&node_id) {
return Some(Role::ExitGateway);
}
if self.layer1.contains(&node_id) {
return Some(Role::Layer1);
}
if self.layer2.contains(&node_id) {
return Some(Role::Layer2);
}
if self.layer3.contains(&node_id) {
return Some(Role::Layer3);
}
if self.standby.contains(&node_id) {
return Some(Role::Standby);
}
None
}
}
#[cw_serde]
@@ -23,6 +23,11 @@ impl SqliteEcashTicketbookManager {
SqliteEcashTicketbookManager { connection_pool }
}
/// Closes the connection pool.
pub async fn close(&self) {
self.connection_pool.close().await
}
pub(crate) async fn cleanup_expired(&self, deadline: Date) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM ecash_ticketbook WHERE expiration_date <= ?",
@@ -43,6 +43,10 @@ impl Debug for EphemeralStorage {
impl Storage for EphemeralStorage {
type StorageError = StorageError;
async fn close(&self) {
// nothing to do here
}
async fn cleanup_expired(&self) -> Result<(), Self::StorageError> {
self.storage_manager.cleanup_expired().await;
Ok(())
@@ -33,7 +33,10 @@ use nym_credentials::{
IssuanceTicketBook, IssuedTicketBook,
};
use nym_ecash_time::{ecash_today, Date, EcashTime};
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use zeroize::Zeroizing;
@@ -56,6 +59,9 @@ impl PersistentStorage {
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -83,6 +89,10 @@ impl PersistentStorage {
impl Storage for PersistentStorage {
type StorageError = StorageError;
async fn close(&self) {
self.storage_manager.close().await
}
/// remove all expired ticketbooks and expiration date signatures
async fn cleanup_expired(&self) -> Result<(), Self::StorageError> {
let ecash_yesterday = ecash_today().date().previous_day().unwrap();
+2
View File
@@ -22,6 +22,8 @@ use std::error::Error;
pub trait Storage: Send + Sync {
type StorageError: Error;
async fn close(&self);
/// remove all expired ticketbooks and expiration date signatures
async fn cleanup_expired(&self) -> Result<(), Self::StorageError>;
@@ -9,7 +9,7 @@ use futures::{Sink, Stream};
use rand::{CryptoRng, RngCore};
use tungstenite::Message as WsMessage;
impl<'a, S, R> State<'a, S, R> {
impl<S, R> State<'_, S, R> {
async fn client_handshake_inner(&mut self) -> Result<(), HandshakeError>
where
S: Stream<Item = WsItem> + Sink<WsMessage> + Unpin,
@@ -10,7 +10,7 @@ use crate::registration::handshake::{error::HandshakeError, WsItem};
use futures::{Sink, Stream};
use tungstenite::Message as WsMessage;
impl<'a, S, R> State<'a, S, R> {
impl<S, R> State<'_, S, R> {
async fn gateway_handshake_inner(
&mut self,
raw_init_message: Vec<u8>,
@@ -20,6 +20,10 @@ pub enum ClientRequest {
hkdf_salt: Vec<u8>,
derived_key_digest: Vec<u8>,
},
ForgetMe {
client: bool,
stats: bool,
},
}
impl ClientRequest {
@@ -11,6 +11,7 @@ use tungstenite::Message;
#[non_exhaustive]
pub enum SensitiveServerResponse {
KeyUpgradeAck {},
ForgetMeAck {},
}
impl SensitiveServerResponse {
+17 -1
View File
@@ -6,7 +6,10 @@ use models::StoredFinishedSession;
use nym_node_metrics::entry::{ActiveSession, FinishedSession, SessionType};
use nym_sphinx::DestinationAddressBytes;
use sessions::SessionManager;
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use time::Date;
use tracing::{debug, error};
@@ -36,6 +39,9 @@ impl PersistentStatsStorage {
// TODO: we can inject here more stuff based on our gateway global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -116,6 +122,16 @@ impl PersistentStatsStorage {
.await?)
}
pub async fn delete_unique_user(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_unique_user(client_address.as_base58_string())
.await?)
}
pub async fn insert_active_session(
&self,
client_address: DestinationAddressBytes,
@@ -71,6 +71,16 @@ impl SessionManager {
Ok(())
}
pub(crate) async fn delete_unique_user(&self, client_address_b58: String) -> Result<()> {
sqlx::query!(
"DELETE FROM sessions_unique_users WHERE client_address = ?",
client_address_b58
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn get_unique_users(&self, date: Date) -> Result<Vec<String>> {
sqlx::query_scalar!(
"SELECT client_address as count FROM sessions_unique_users WHERE day = ?",
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "DELETE FROM message_store WHERE client_address_bs58 = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "3ea5542b21a41b14276a8fd6b870c61aa0ddd30fee2565803b88c6086bd2a734"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "DELETE FROM available_bandwidth WHERE client_id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "a3cc707995b8215fa77738cd1a55f9e8d251a3e764104d2a54153895dee1a118"
}
+10
View File
@@ -49,6 +49,16 @@ impl BandwidthManager {
Ok(())
}
pub(crate) async fn remove_client(&self, client_id: i64) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM available_bandwidth WHERE client_id = ?",
client_id
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
/// Set the expiration date of the particular client to the provided date.
pub(crate) async fn set_expiration(
&self,
+13
View File
@@ -133,4 +133,17 @@ impl InboxManager {
.await?;
Ok(())
}
pub(crate) async fn remove_messages_for_client(
&self,
client_address_bs58: &str,
) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM message_store WHERE client_address_bs58 = ?",
client_address_bs58
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
}
+49 -1
View File
@@ -12,7 +12,10 @@ use nym_credentials_interface::ClientTicket;
use nym_gateway_requests::shared_key::SharedGatewayKey;
use nym_sphinx::DestinationAddressBytes;
use shared_keys::SharedKeysManager;
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use tickets::TicketStorageManager;
use time::OffsetDateTime;
@@ -41,6 +44,33 @@ pub struct GatewayStorage {
}
impl GatewayStorage {
#[allow(dead_code)]
pub(crate) fn client_manager(&self) -> &ClientManager {
&self.client_manager
}
pub(crate) fn shared_key_manager(&self) -> &SharedKeysManager {
&self.shared_key_manager
}
pub(crate) fn inbox_manager(&self) -> &InboxManager {
&self.inbox_manager
}
pub(crate) fn bandwidth_manager(&self) -> &BandwidthManager {
&self.bandwidth_manager
}
#[allow(dead_code)]
pub(crate) fn ticket_manager(&self) -> &TicketStorageManager {
&self.ticket_manager
}
#[allow(dead_code)]
pub(crate) fn wireguard_peer_manager(&self) -> &wireguard_peers::WgPeerManager {
&self.wireguard_peer_manager
}
/// Initialises `PersistentStorage` using the provided path.
///
/// # Arguments
@@ -59,6 +89,9 @@ impl GatewayStorage {
// TODO: we can inject here more stuff based on our gateway global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -101,6 +134,21 @@ impl GatewayStorage {
.await?)
}
pub async fn handle_forget_me(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), GatewayStorageError> {
let client_id = self.get_mixnet_client_id(client_address).await?;
self.inbox_manager()
.remove_messages_for_client(&client_address.as_base58_string())
.await?;
self.bandwidth_manager().remove_client(client_id).await?;
self.shared_key_manager()
.remove_shared_keys(&client_address.as_base58_string())
.await?;
Ok(())
}
pub async fn insert_shared_keys(
&self,
client_address: DestinationAddressBytes,
+241 -138
View File
@@ -192,6 +192,28 @@ impl Client {
&self.base_url
}
pub fn create_request<B, K, V>(
&self,
method: reqwest::Method,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: Option<&B>,
) -> RequestBuilder
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
{
let url = sanitize_url(&self.base_url, path, params);
let mut request = self.reqwest_client.request(method.clone(), url);
if let Some(body) = json_body {
request = request.json(body);
}
request
}
pub fn create_get_request<K, V>(
&self,
path: PathSegments<'_>,
@@ -205,38 +227,6 @@ impl Client {
self.reqwest_client.get(url)
}
#[instrument(level = "debug", skip_all, fields(path=?path))]
async fn send_get_request<K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
) -> Result<Response, HttpClientError<E>>
where
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
tracing::trace!("Sending GET request");
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.get(url).send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??,
)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self.reqwest_client.get(url).send().await?)
}
}
pub fn create_post_request<B, K, V>(
&self,
path: PathSegments<'_>,
@@ -252,36 +242,6 @@ impl Client {
self.reqwest_client.post(url).json(json_body)
}
async fn send_post_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.post(url).json(json_body).send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self.reqwest_client.post(url).json(json_body).send().await?)
}
}
pub fn create_delete_request<K, V>(
&self,
path: PathSegments<'_>,
@@ -295,6 +255,88 @@ impl Client {
self.reqwest_client.delete(url)
}
pub fn create_patch_request<B, K, V>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> RequestBuilder
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
{
let url = sanitize_url(&self.base_url, path, params);
self.reqwest_client.patch(url).json(json_body)
}
async fn send_request<B, K, V, E>(
&self,
method: reqwest::Method,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: Option<&B>,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
let url = sanitize_url(&self.base_url, path, params);
let mut request = self.reqwest_client.request(method.clone(), url);
if let Some(body) = json_body {
request = request.json(body);
}
#[cfg(target_arch = "wasm32")]
{
Ok(
wasmtimer::tokio::timeout(self.request_timeout, request.send())
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??,
)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(request.send().await?)
}
}
#[instrument(level = "debug", skip_all, fields(path=?path))]
async fn send_get_request<K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
) -> Result<Response, HttpClientError<E>>
where
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::GET, path, params, None::<&()>)
.await
}
async fn send_post_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::POST, path, params, Some(json_body))
.await
}
pub async fn send_delete_request<K, V, E>(
&self,
path: PathSegments<'_>,
@@ -305,23 +347,24 @@ impl Client {
V: AsRef<str>,
E: Display,
{
tracing::trace!("Sending DELETE request");
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.delete(url).send(),
)
self.send_request(reqwest::Method::DELETE, path, params, None::<&()>)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??)
}
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self.reqwest_client.delete(url).send().await?)
}
pub async fn send_patch_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::PATCH, path, params, Some(json_body))
.await
}
#[instrument(level = "debug", skip_all)]
@@ -372,6 +415,56 @@ impl Client {
parse_response(res, false).await
}
pub async fn patch_json<B, T, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized,
for<'a> T: Deserialize<'a>,
K: AsRef<str>,
V: AsRef<str>,
E: Display + DeserializeOwned,
{
let res = self.send_patch_request(path, params, json_body).await?;
parse_response(res, true).await
}
async fn call_json_endpoint<B, T, S, E>(
&self,
method: reqwest::Method,
endpoint: S,
json_body: Option<&B>,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str>,
{
let mut request = self
.reqwest_client
.request(method.clone(), self.base_url.join(endpoint.as_ref())?);
if let Some(body) = json_body {
request = request.json(body);
}
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(self.request_timeout, request.send())
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = { request.send().await? };
parse_response(res, false).await
}
#[instrument(level = "debug", skip_all)]
pub async fn get_json_endpoint<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
@@ -379,27 +472,8 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.get(self.base_url.join(endpoint.as_ref())?)
.send(),
)
self.call_json_endpoint(reqwest::Method::GET, endpoint, None::<&()>)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.get(self.base_url.join(endpoint.as_ref())?)
.send()
.await?
};
parse_response(res, false).await
}
pub async fn post_json_endpoint<B, T, S, E>(
@@ -413,29 +487,8 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.post(self.base_url.join(endpoint.as_ref())?)
.json(json_body)
.send(),
)
self.call_json_endpoint(reqwest::Method::POST, endpoint, Some(json_body))
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.post(self.base_url.join(endpoint.as_ref())?)
.json(json_body)
.send()
.await?
};
parse_response(res, true).await
}
pub async fn delete_json_endpoint<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
@@ -444,27 +497,23 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.delete(self.base_url.join(endpoint.as_ref())?)
.send(),
)
self.call_json_endpoint(reqwest::Method::DELETE, endpoint, None::<&()>)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
}
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.delete(self.base_url.join(endpoint.as_ref())?)
.send()
.await?
};
parse_response(res, false).await
pub async fn patch_json_endpoint<B, T, S, E>(
&self,
endpoint: S,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str>,
{
self.call_json_endpoint(reqwest::Method::PATCH, endpoint, Some(json_body))
.await
}
}
@@ -509,6 +558,19 @@ pub trait ApiClient {
V: AsRef<str> + Sync,
E: Display + DeserializeOwned;
async fn patch_json<B, T, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
K: AsRef<str> + Sync,
V: AsRef<str> + Sync,
E: Display + DeserializeOwned;
/// `get` json data from the provided absolute endpoint, i.e. for example `"/api/v1/mixnodes?since=12345"`
async fn get_json_from<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
@@ -532,6 +594,17 @@ pub trait ApiClient {
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str> + Sync + Send;
async fn patch_json_data_at<B, T, S, E>(
&self,
endpoint: S,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str> + Sync + Send;
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -581,6 +654,22 @@ impl ApiClient for Client {
self.delete_json(path, params).await
}
async fn patch_json<B, T, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
K: AsRef<str> + Sync,
V: AsRef<str> + Sync,
E: Display + DeserializeOwned,
{
self.patch_json(path, params, json_body).await
}
async fn get_json_from<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
for<'a> T: Deserialize<'a>,
@@ -612,6 +701,20 @@ impl ApiClient for Client {
{
self.delete_json_endpoint(endpoint).await
}
async fn patch_json_data_at<B, T, S, E>(
&self,
endpoint: S,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str> + Sync + Send,
{
self.patch_json_endpoint(endpoint, json_body).await
}
}
// utility function that should solve the double slash problem in API urls forever.
@@ -63,6 +63,7 @@ impl From<v6::request::StaticConnectRequest> for v7::request::StaticConnectReque
}
}
#[allow(deprecated)]
impl From<v6::request::DynamicConnectRequest> for v7::request::DynamicConnectRequest {
fn from(dynamic_connect_request: v6::request::DynamicConnectRequest) -> Self {
Self {
@@ -51,6 +51,7 @@ impl IpPacketRequest {
)
}
#[allow(deprecated)]
pub fn new_dynamic_connect_request(
reply_to: Recipient,
reply_to_hops: Option<u8>,
@@ -285,6 +286,9 @@ pub struct DynamicConnectRequest {
// The number of mix node hops that responses should take, in addition to the entry and exit
// node. Zero means only client -> entry -> exit -> client.
#[deprecated(
note = "clients can no longer control number of hops to use. this field is scheduled for removal in V8"
)]
pub reply_to_hops: Option<u8>,
// The average delay at each mix node, in milliseconds. Currently this is not supported by the
+11
View File
@@ -57,3 +57,14 @@ pub mod wireguard {
pub const WG_TUN_DEVICE_IP_ADDRESS_V6: Ipv6Addr = Ipv6Addr::new(0xfc01, 0, 0, 0, 0, 0, 0, 0x1); // fc01::1
pub const WG_TUN_DEVICE_NETMASK_V6: u8 = 112;
}
pub mod mixnet_vpn {
use std::net::{Ipv4Addr, Ipv6Addr};
// The interface used to route traffic
pub const NYM_TUN_BASE_NAME: &str = "nymtun";
pub const NYM_TUN_DEVICE_ADDRESS_V4: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 1);
pub const NYM_TUN_DEVICE_NETMASK_V4: Ipv4Addr = Ipv4Addr::new(255, 255, 0, 0);
pub const NYM_TUN_DEVICE_ADDRESS_V6: Ipv6Addr = Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 0x1); // fc00::1
pub const NYM_TUN_DEVICE_NETMASK_V6: &str = "112";
}
+33 -31
View File
@@ -2,10 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::NetworkTestingError;
use crate::node::TestableNode;
use crate::NodeId;
use crate::node::{NodeType, TestableNode};
use nym_sphinx::message::NymMessage;
use nym_topology::{gateway, mix};
use nym_topology::node::RoutingNode;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
@@ -26,73 +25,76 @@ pub struct TestMessage<T = Empty> {
}
impl<T> TestMessage<T> {
pub fn new<N: Into<TestableNode>>(node: N, msg_id: u32, total_msgs: u32, ext: T) -> Self {
pub fn new(tested_node: TestableNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
TestMessage {
tested_node: node.into(),
tested_node,
msg_id,
total_msgs,
ext,
}
}
pub fn new_mix(node: &mix::LegacyNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
Self::new(node, msg_id, total_msgs, ext)
pub fn new_mix(node: &RoutingNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
Self::new(
TestableNode::new_routing(node, NodeType::Mixnode),
msg_id,
total_msgs,
ext,
)
}
// pub fn new_gateway(node: &gateway::Node, msg_id: u32, total_msgs: u32, ext: T) -> Self {
// Self::new(node, msg_id, total_msgs, ext)
// }
pub fn new_serialized<N>(
node: N,
msg_id: u32,
total_msgs: u32,
ext: T,
) -> Result<Vec<u8>, NetworkTestingError>
where
N: Into<TestableNode>,
T: Serialize,
{
Self::new(node, msg_id, total_msgs, ext).as_bytes()
pub fn new_gateway(node: &RoutingNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
Self::new(
TestableNode::new_routing(node, NodeType::Gateway),
msg_id,
total_msgs,
ext,
)
}
pub fn new_plaintexts<N>(
node: &N,
pub fn new_plaintexts(
node: TestableNode,
total_msgs: u32,
ext: T,
) -> Result<Vec<Vec<u8>>, NetworkTestingError>
where
for<'a> &'a N: Into<TestableNode>,
T: Serialize + Clone,
{
let mut msgs = Vec::with_capacity(total_msgs as usize);
for msg_id in 1..=total_msgs {
msgs.push(Self::new(node, msg_id, total_msgs, ext.clone()).as_bytes()?)
msgs.push(Self::new(node.clone(), msg_id, total_msgs, ext.clone()).as_bytes()?)
}
Ok(msgs)
}
pub fn mix_plaintexts(
node: &mix::LegacyNode,
node: &RoutingNode,
total_msgs: u32,
ext: T,
) -> Result<Vec<Vec<u8>>, NetworkTestingError>
where
T: Serialize + Clone,
{
Self::new_plaintexts(node, total_msgs, ext)
Self::new_plaintexts(
TestableNode::new_routing(node, NodeType::Mixnode),
total_msgs,
ext,
)
}
pub fn legacy_gateway_plaintexts(
node: &gateway::LegacyNode,
node_id: NodeId,
node: &RoutingNode,
total_msgs: u32,
ext: T,
) -> Result<Vec<Vec<u8>>, NetworkTestingError>
where
T: Serialize + Clone,
{
Self::new_plaintexts(&(node, node_id), total_msgs, ext)
Self::new_plaintexts(
TestableNode::new_routing(node, NodeType::Gateway),
total_msgs,
ext,
)
}
pub fn as_json_string(&self) -> Result<String, NetworkTestingError>
+9 -33
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::NodeId;
use nym_topology::{gateway, mix};
use nym_topology::node::RoutingNode;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
@@ -24,6 +24,14 @@ impl TestableNode {
}
}
pub fn new_routing(routing_node: &RoutingNode, typ: NodeType) -> Self {
TestableNode::new(
routing_node.identity_key.to_base58_string(),
typ,
routing_node.node_id,
)
}
pub fn new_mixnode(encoded_identity: String, node_id: NodeId) -> Self {
TestableNode::new(encoded_identity, NodeType::Mixnode, node_id)
}
@@ -37,38 +45,6 @@ impl TestableNode {
}
}
impl<'a> From<&'a mix::LegacyNode> for TestableNode {
fn from(value: &'a mix::LegacyNode) -> Self {
TestableNode {
encoded_identity: value.identity_key.to_base58_string(),
typ: NodeType::Mixnode,
node_id: value.mix_id,
}
}
}
impl<'a> From<(&'a gateway::LegacyNode, NodeId)> for TestableNode {
fn from((gateway, node_id): (&'a gateway::LegacyNode, NodeId)) -> Self {
(&(gateway, node_id)).into()
}
}
impl<'a> From<&'a (gateway::LegacyNode, NodeId)> for TestableNode {
fn from((gateway, node_id): &'a (gateway::LegacyNode, NodeId)) -> Self {
(gateway, *node_id).into()
}
}
impl<'a, 'b> From<&'a (&'b gateway::LegacyNode, NodeId)> for TestableNode {
fn from((gateway, node_id): &'a (&'b gateway::LegacyNode, NodeId)) -> Self {
TestableNode {
encoded_identity: gateway.identity_key.to_base58_string(),
typ: NodeType::Gateway,
node_id: *node_id,
}
}
}
impl Display for TestableNode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
+39 -93
View File
@@ -2,21 +2,22 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::NetworkTestingError;
use crate::Empty;
use crate::NodeId;
use crate::TestMessage;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::message::NymMessage;
use nym_sphinx::params::{PacketSize, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx::params::PacketSize;
use nym_sphinx::preparer::{FragmentPreparer, PreparedFragment};
use nym_sphinx_params::PacketType;
use nym_topology::{gateway, mix, NymTopology};
use nym_topology::node::RoutingNode;
use nym_topology::{NymRouteProvider, NymTopology, Role};
use rand::{CryptoRng, Rng};
use serde::Serialize;
use std::sync::Arc;
use std::time::Duration;
pub use nym_topology::node::LegacyMixLayer;
pub struct NodeTester<R> {
rng: R,
@@ -38,10 +39,6 @@ pub struct NodeTester<R> {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
average_ack_delay: Duration,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
// while acks are going to be ignored they still need to be constructed
// so that the gateway would be able to correctly process and forward the message
ack_key: Arc<AckKey>,
@@ -70,41 +67,27 @@ where
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
ack_key,
}
}
/// Allows setting non-default number of expected mix hops in the network.
#[allow(dead_code)]
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
pub fn testable_mix_topology(&self, node: &mix::LegacyNode) -> NymTopology {
pub fn testable_mix_topology(&self, layer: LegacyMixLayer, node: &RoutingNode) -> NymTopology {
let mut topology = self.base_topology.clone();
topology.set_mixes_in_layer(node.layer as u8, vec![node.clone()]);
topology.set_testable_node(layer.into(), node.clone());
topology
}
pub fn testable_gateway_topology(&self, gateway: &gateway::LegacyNode) -> NymTopology {
pub fn testable_gateway_topology(&self, node: &RoutingNode) -> NymTopology {
let mut topology = self.base_topology.clone();
topology.set_gateways(vec![gateway.clone()]);
topology.set_testable_node(Role::EntryGateway, node.clone());
topology.set_testable_node(Role::ExitGateway, node.clone());
topology
}
pub fn simple_mixnode_test_packets(
&mut self,
mix: &mix::LegacyNode,
test_packets: u32,
) -> Result<Vec<PreparedFragment>, NetworkTestingError> {
self.mixnode_test_packets(mix, Empty, test_packets, None)
}
pub fn mixnode_test_packets<T>(
&mut self,
mix: &mix::LegacyNode,
mix: &RoutingNode,
legacy_mix_layer: LegacyMixLayer,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -112,7 +95,9 @@ where
where
T: Serialize + Clone,
{
let ephemeral_topology = self.testable_mix_topology(mix);
let ephemeral_topology =
NymRouteProvider::from(self.testable_mix_topology(legacy_mix_layer, mix))
.with_ignore_egress_epoch_roles(true);
let mut packets = Vec::with_capacity(test_packets as usize);
for plaintext in TestMessage::mix_plaintexts(mix, test_packets, msg_ext)? {
@@ -128,7 +113,7 @@ where
pub fn mixnodes_test_packets<T>(
&mut self,
nodes: &[mix::LegacyNode],
nodes: &[(LegacyMixLayer, RoutingNode)],
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -137,9 +122,10 @@ where
T: Serialize + Clone,
{
let mut packets = Vec::new();
for node in nodes {
for (layer, node) in nodes {
packets.append(&mut self.mixnode_test_packets(
node,
*layer,
msg_ext.clone(),
test_packets,
custom_recipient,
@@ -149,26 +135,10 @@ where
Ok(packets)
}
pub fn existing_mixnode_test_packets<T>(
&mut self,
mix_id: NodeId,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
) -> Result<Vec<PreparedFragment>, NetworkTestingError>
where
T: Serialize + Clone,
{
let Some(node) = self.base_topology.find_mix(mix_id) else {
return Err(NetworkTestingError::NonExistentMixnode { mix_id });
};
self.mixnode_test_packets(&node.clone(), msg_ext, test_packets, custom_recipient)
}
pub fn existing_identity_mixnode_test_packets<T>(
&mut self,
encoded_mix_identity: String,
layer: LegacyMixLayer,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -176,22 +146,30 @@ where
where
T: Serialize + Clone,
{
let Some(node) = self
.base_topology
.find_mix_by_identity(&encoded_mix_identity)
else {
let Ok(identity) = encoded_mix_identity.parse() else {
return Err(NetworkTestingError::NonExistentMixnodeIdentity {
mix_identity: encoded_mix_identity,
});
};
self.mixnode_test_packets(&node.clone(), msg_ext, test_packets, custom_recipient)
let Some(node) = self.base_topology.find_node_by_identity(identity) else {
return Err(NetworkTestingError::NonExistentMixnodeIdentity {
mix_identity: encoded_mix_identity,
});
};
self.mixnode_test_packets(
&node.clone(),
layer,
msg_ext,
test_packets,
custom_recipient,
)
}
pub fn legacy_gateway_test_packets<T>(
&mut self,
gateway: &gateway::LegacyNode,
node_id: NodeId,
gateway: &RoutingNode,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -199,12 +177,11 @@ where
where
T: Serialize + Clone,
{
let ephemeral_topology = self.testable_gateway_topology(gateway);
let ephemeral_topology = NymRouteProvider::from(self.testable_gateway_topology(gateway))
.with_ignore_egress_epoch_roles(true);
let mut packets = Vec::with_capacity(test_packets as usize);
for plaintext in
TestMessage::legacy_gateway_plaintexts(gateway, node_id, test_packets, msg_ext)?
{
for plaintext in TestMessage::legacy_gateway_plaintexts(gateway, test_packets, msg_ext)? {
packets.push(self.wrap_plaintext_data(
plaintext,
&ephemeral_topology,
@@ -215,36 +192,10 @@ where
Ok(packets)
}
pub fn existing_gateway_test_packets<T>(
&mut self,
node_id: NodeId,
encoded_gateway_identity: String,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
) -> Result<Vec<PreparedFragment>, NetworkTestingError>
where
T: Serialize + Clone,
{
let Some(node) = self.base_topology.find_gateway(&encoded_gateway_identity) else {
return Err(NetworkTestingError::NonExistentGateway {
gateway_identity: encoded_gateway_identity,
});
};
self.legacy_gateway_test_packets(
&node.clone(),
node_id,
msg_ext,
test_packets,
custom_recipient,
)
}
pub fn wrap_plaintext_data(
&mut self,
plaintext: Vec<u8>,
topology: &NymTopology,
topology: &NymRouteProvider,
custom_recipient: Option<Recipient>,
) -> Result<PreparedFragment, NetworkTestingError> {
let message = NymMessage::new_plain(plaintext);
@@ -274,14 +225,13 @@ where
&address,
&address,
PacketType::Mix,
None,
)?)
}
pub fn create_test_packet<T>(
&mut self,
message: &TestMessage<T>,
topology: &NymTopology,
topology: &NymRouteProvider,
custom_recipient: Option<Recipient>,
) -> Result<PreparedFragment, NetworkTestingError>
where
@@ -307,10 +257,6 @@ impl<R: CryptoRng + Rng> FragmentPreparer for NodeTester<R> {
1
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
fn average_packet_delay(&self) -> Duration {
self.average_packet_delay
}
@@ -115,10 +115,7 @@ pub fn aggregate_signatures(
let params = ecash_group_parameters();
// aggregate the signature
let signature = match Aggregatable::aggregate(signatures, indices) {
Ok(res) => res,
Err(err) => return Err(err),
};
let signature = Aggregatable::aggregate(signatures, indices)?;
// Ensure the aggregated signature is not an infinity point
if bool::from(signature.is_at_infinity()) {
@@ -8,10 +8,10 @@ use nym_sphinx_addressing::nodes::{
NymNodeRoutingAddress, NymNodeRoutingAddressError, MAX_NODE_ADDRESS_UNPADDED_LEN,
};
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx_params::PacketType;
use nym_sphinx_types::delays::Delay;
use nym_sphinx_types::{NymPacket, NymPacketError, MIN_PACKET_SIZE};
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, RngCore};
use std::time;
@@ -43,14 +43,13 @@ impl SurbAck {
ack_key: &AckKey,
marshaled_fragment_id: [u8; 5],
average_delay: time::Duration,
topology: &NymTopology,
topology: &NymRouteProvider,
packet_type: PacketType,
) -> Result<Self, NymTopologyError>
where
R: RngCore + CryptoRng,
{
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, recipient.gateway())?;
let route = topology.random_route_to_egress(rng, recipient.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
let destination = recipient.as_sphinx_destination();
+2 -2
View File
@@ -131,8 +131,8 @@ impl Recipient {
&self.client_encryption_key
}
pub fn gateway(&self) -> &NodeIdentity {
&self.gateway
pub fn gateway(&self) -> NodeIdentity {
self.gateway
}
pub fn to_bytes(self) -> RecipientBytes {
@@ -6,9 +6,9 @@ use nym_crypto::{generic_array::typenum::Unsigned, Digest};
use nym_sphinx_addressing::clients::Recipient;
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, MAX_NODE_ADDRESS_UNPADDED_LEN};
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm};
use nym_sphinx_types::{NymPacket, SURBMaterial, SphinxError, SURB};
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, RngCore};
use serde::de::{Error as SerdeError, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -89,13 +89,12 @@ impl ReplySurb {
rng: &mut R,
recipient: &Recipient,
average_delay: time::Duration,
topology: &NymTopology,
topology: &NymRouteProvider,
) -> Result<Self, NymTopologyError>
where
R: RngCore + CryptoRng,
{
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, recipient.gateway())?;
let route = topology.random_route_to_egress(rng, recipient.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
let destination = recipient.as_sphinx_destination();
@@ -110,15 +109,12 @@ impl ReplySurb {
/// Returns the expected number of bytes the [`ReplySURB`] will take after serialization.
/// Useful for deserialization from a bytes stream.
pub fn serialized_len(mix_hops: u8) -> usize {
pub fn serialized_len() -> usize {
use nym_sphinx_types::{HEADER_SIZE, NODE_ADDRESS_LENGTH, PAYLOAD_KEY_SIZE};
// the SURB itself consists of SURB_header, first hop address and set of payload keys
// (note extra 1 for the gateway)
SurbEncryptionKeySize::USIZE
+ HEADER_SIZE
+ NODE_ADDRESS_LENGTH
+ (1 + mix_hops as usize) * PAYLOAD_KEY_SIZE
// for each hop (3x mix + egress)
SurbEncryptionKeySize::USIZE + HEADER_SIZE + NODE_ADDRESS_LENGTH + 4 * PAYLOAD_KEY_SIZE
}
pub fn encryption_key(&self) -> &SurbEncryptionKey {
@@ -169,10 +169,7 @@ impl RepliableMessage {
.collect()
}
pub fn try_from_bytes(
bytes: &[u8],
num_mix_hops: u8,
) -> Result<Self, InvalidReplyRequestError> {
pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, InvalidReplyRequestError> {
if bytes.len() < SENDER_TAG_SIZE + 1 {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
@@ -180,11 +177,8 @@ impl RepliableMessage {
AnonymousSenderTag::from_bytes(bytes[..SENDER_TAG_SIZE].try_into().unwrap());
let content_tag = RepliableMessageContentTag::try_from(bytes[SENDER_TAG_SIZE])?;
let content = RepliableMessageContent::try_from_bytes(
&bytes[SENDER_TAG_SIZE + 1..],
num_mix_hops,
content_tag,
)?;
let content =
RepliableMessageContent::try_from_bytes(&bytes[SENDER_TAG_SIZE + 1..], content_tag)?;
Ok(RepliableMessage {
sender_tag,
@@ -192,23 +186,20 @@ impl RepliableMessage {
})
}
pub fn serialized_size(&self, num_mix_hops: u8) -> usize {
pub fn serialized_size(&self) -> usize {
let content_type_size = 1;
SENDER_TAG_SIZE + content_type_size + self.content.serialized_size(num_mix_hops)
SENDER_TAG_SIZE + content_type_size + self.content.serialized_size()
}
}
// this recovery code is shared between all variants containing reply surbs
fn recover_reply_surbs(
bytes: &[u8],
num_mix_hops: u8,
) -> Result<(Vec<ReplySurb>, usize), InvalidReplyRequestError> {
fn recover_reply_surbs(bytes: &[u8]) -> Result<(Vec<ReplySurb>, usize), InvalidReplyRequestError> {
let mut consumed = mem::size_of::<u32>();
if bytes.len() < consumed {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
let num_surbs = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
let surb_size = ReplySurb::serialized_len(num_mix_hops);
let surb_size = ReplySurb::serialized_len();
if bytes[consumed..].len() < num_surbs as usize * surb_size {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
@@ -307,14 +298,13 @@ impl RepliableMessageContent {
fn try_from_bytes(
bytes: &[u8],
num_mix_hops: u8,
tag: RepliableMessageContentTag,
) -> Result<Self, InvalidReplyRequestError> {
if bytes.is_empty() {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
let (reply_surbs, n) = recover_reply_surbs(bytes, num_mix_hops)?;
let (reply_surbs, n) = recover_reply_surbs(bytes)?;
match tag {
RepliableMessageContentTag::Data => Ok(RepliableMessageContent::Data {
@@ -340,7 +330,7 @@ impl RepliableMessageContent {
}
}
fn serialized_size(&self, num_mix_hops: u8) -> usize {
fn serialized_size(&self) -> usize {
match self {
RepliableMessageContent::Data {
message,
@@ -348,19 +338,18 @@ impl RepliableMessageContent {
} => {
let num_reply_surbs_tag = mem::size_of::<u32>();
num_reply_surbs_tag
+ reply_surbs.len() * ReplySurb::serialized_len(num_mix_hops)
+ reply_surbs.len() * ReplySurb::serialized_len()
+ message.len()
}
RepliableMessageContent::AdditionalSurbs { reply_surbs } => {
let num_reply_surbs_tag = mem::size_of::<u32>();
num_reply_surbs_tag + reply_surbs.len() * ReplySurb::serialized_len(num_mix_hops)
num_reply_surbs_tag + reply_surbs.len() * ReplySurb::serialized_len()
}
RepliableMessageContent::Heartbeat {
additional_reply_surbs,
} => {
let num_reply_surbs_tag = mem::size_of::<u32>();
num_reply_surbs_tag
+ additional_reply_surbs.len() * ReplySurb::serialized_len(num_mix_hops)
num_reply_surbs_tag + additional_reply_surbs.len() * ReplySurb::serialized_len()
}
}
}
@@ -578,11 +567,11 @@ mod tests {
}
}
pub(super) fn reply_surb(rng: &mut ChaCha20Rng, num_mix_hops: u8) -> ReplySurb {
pub(super) fn reply_surb(rng: &mut ChaCha20Rng) -> ReplySurb {
// due to gateway
let num_hops = num_mix_hops + 1;
let route = (0..num_hops).map(|_| node(rng)).collect();
let delays = (0..num_hops)
const HOPS: u8 = 4;
let route = (0..HOPS).map(|_| node(rng)).collect();
let delays = (0..HOPS)
.map(|_| Delay::new_from_nanos(rng.next_u64()))
.collect();
let mut destination_bytes = [0u8; 32];
@@ -605,47 +594,40 @@ mod tests {
}
}
pub(super) fn reply_surbs(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
n: usize,
) -> Vec<ReplySurb> {
pub(super) fn reply_surbs(rng: &mut ChaCha20Rng, n: usize) -> Vec<ReplySurb> {
let mut surbs = Vec::with_capacity(n);
for _ in 0..n {
surbs.push(reply_surb(rng, num_mix_hops))
surbs.push(reply_surb(rng))
}
surbs
}
pub(super) fn repliable_content_data(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
msg_len: usize,
surbs: usize,
) -> RepliableMessageContent {
RepliableMessageContent::Data {
message: random_vec_u8(rng, msg_len),
reply_surbs: reply_surbs(rng, num_mix_hops, surbs),
reply_surbs: reply_surbs(rng, surbs),
}
}
pub(super) fn repliable_content_surbs(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
surbs: usize,
) -> RepliableMessageContent {
RepliableMessageContent::AdditionalSurbs {
reply_surbs: reply_surbs(rng, num_mix_hops, surbs),
reply_surbs: reply_surbs(rng, surbs),
}
}
pub(super) fn repliable_content_heartbeat(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
surbs: usize,
) -> RepliableMessageContent {
RepliableMessageContent::Heartbeat {
additional_reply_surbs: reply_surbs(rng, num_mix_hops, surbs),
additional_reply_surbs: reply_surbs(rng, surbs),
}
}
@@ -676,70 +658,54 @@ mod tests {
#[test]
fn serialized_size_matches_actual_serialization() {
let mut rng = fixtures::test_rng();
let num_mix_hops = 3;
let data1 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_data(&mut rng, num_mix_hops, 10000, 0),
content: fixtures::repliable_content_data(&mut rng, 10000, 0),
};
assert_eq!(
data1.serialized_size(num_mix_hops),
data1.into_bytes().len()
);
assert_eq!(data1.serialized_size(), data1.into_bytes().len());
let data2 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_data(&mut rng, num_mix_hops, 10, 100),
content: fixtures::repliable_content_data(&mut rng, 10, 100),
};
assert_eq!(
data2.serialized_size(num_mix_hops),
data2.into_bytes().len()
);
assert_eq!(data2.serialized_size(), data2.into_bytes().len());
let data3 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_data(&mut rng, num_mix_hops, 100000, 1000),
content: fixtures::repliable_content_data(&mut rng, 100000, 1000),
};
assert_eq!(
data3.serialized_size(num_mix_hops),
data3.into_bytes().len()
);
assert_eq!(data3.serialized_size(), data3.into_bytes().len());
let additional_surbs1 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1),
content: fixtures::repliable_content_surbs(&mut rng, 1),
};
assert_eq!(
additional_surbs1.serialized_size(num_mix_hops),
additional_surbs1.serialized_size(),
additional_surbs1.into_bytes().len()
);
let additional_surbs2 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1000),
content: fixtures::repliable_content_surbs(&mut rng, 1000),
};
assert_eq!(
additional_surbs2.serialized_size(num_mix_hops),
additional_surbs2.serialized_size(),
additional_surbs2.into_bytes().len()
);
let heartbeat1 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1),
content: fixtures::repliable_content_heartbeat(&mut rng, 1),
};
assert_eq!(
heartbeat1.serialized_size(num_mix_hops),
heartbeat1.into_bytes().len()
);
assert_eq!(heartbeat1.serialized_size(), heartbeat1.into_bytes().len());
let heartbeat2 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1000),
content: fixtures::repliable_content_heartbeat(&mut rng, 1000),
};
assert_eq!(
heartbeat2.serialized_size(num_mix_hops),
heartbeat2.into_bytes().len()
);
assert_eq!(heartbeat2.serialized_size(), heartbeat2.into_bytes().len());
}
}
@@ -750,49 +716,33 @@ mod tests {
#[test]
fn serialized_size_matches_actual_serialization() {
let mut rng = fixtures::test_rng();
let num_mix_hops = 3;
let data1 = fixtures::repliable_content_data(&mut rng, num_mix_hops, 10000, 0);
assert_eq!(
data1.serialized_size(num_mix_hops),
data1.into_bytes().len()
);
let data1 = fixtures::repliable_content_data(&mut rng, 10000, 0);
assert_eq!(data1.serialized_size(), data1.into_bytes().len());
let data2 = fixtures::repliable_content_data(&mut rng, num_mix_hops, 10, 100);
assert_eq!(
data2.serialized_size(num_mix_hops),
data2.into_bytes().len()
);
let data2 = fixtures::repliable_content_data(&mut rng, 10, 100);
assert_eq!(data2.serialized_size(), data2.into_bytes().len());
let data3 = fixtures::repliable_content_data(&mut rng, num_mix_hops, 100000, 1000);
assert_eq!(
data3.serialized_size(num_mix_hops),
data3.into_bytes().len()
);
let data3 = fixtures::repliable_content_data(&mut rng, 100000, 1000);
assert_eq!(data3.serialized_size(), data3.into_bytes().len());
let additional_surbs1 = fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1);
let additional_surbs1 = fixtures::repliable_content_surbs(&mut rng, 1);
assert_eq!(
additional_surbs1.serialized_size(num_mix_hops),
additional_surbs1.serialized_size(),
additional_surbs1.into_bytes().len()
);
let additional_surbs2 = fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1000);
let additional_surbs2 = fixtures::repliable_content_surbs(&mut rng, 1000);
assert_eq!(
additional_surbs2.serialized_size(num_mix_hops),
additional_surbs2.serialized_size(),
additional_surbs2.into_bytes().len()
);
let heartbeat1 = fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1);
assert_eq!(
heartbeat1.serialized_size(num_mix_hops),
heartbeat1.into_bytes().len()
);
let heartbeat1 = fixtures::repliable_content_heartbeat(&mut rng, 1);
assert_eq!(heartbeat1.serialized_size(), heartbeat1.into_bytes().len());
let heartbeat2 = fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1000);
assert_eq!(
heartbeat2.serialized_size(num_mix_hops),
heartbeat2.into_bytes().len()
);
let heartbeat2 = fixtures::repliable_content_heartbeat(&mut rng, 1000);
assert_eq!(heartbeat2.serialized_size(), heartbeat2.into_bytes().len());
}
}
+6 -17
View File
@@ -69,11 +69,11 @@ pub mod monitoring {
}
}
pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey, hops: u8) {
pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey) {
if enabled() {
let id = fragment.fragment_identifier().set_id();
let mut entry = FRAGMENTS_SENT.entry(id).or_default();
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination, hops);
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination);
entry.push(s);
}
}
@@ -82,16 +82,11 @@ pub mod monitoring {
#[derive(Debug, Clone)]
pub struct FragmentMixParams {
destination: PublicKey,
hops: u8,
}
impl FragmentMixParams {
pub fn destination(&self) -> &PublicKey {
&self.destination
}
pub fn hops(&self) -> u8 {
self.hops
pub fn destination(&self) -> PublicKey {
self.destination
}
}
@@ -105,14 +100,8 @@ pub struct SentFragment {
}
impl SentFragment {
fn new(
header: FragmentHeader,
at: u64,
client_nonce: i32,
destination: PublicKey,
hops: u8,
) -> Self {
let mixnet_params = FragmentMixParams { destination, hops };
fn new(header: FragmentHeader, at: u64, client_nonce: i32, destination: PublicKey) -> Self {
let mixnet_params = FragmentMixParams { destination };
SentFragment {
header,
at,
+5 -8
View File
@@ -10,11 +10,9 @@ use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_chunking::fragment::COVER_FRAG_ID;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{
PacketEncryptionAlgorithm, PacketHkdfAlgorithm, PacketType, DEFAULT_NUM_MIX_HOPS,
};
use nym_sphinx_params::{PacketEncryptionAlgorithm, PacketHkdfAlgorithm, PacketType};
use nym_sphinx_types::NymPacket;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, RngCore};
use std::time;
@@ -36,7 +34,7 @@ pub enum CoverMessageError {
pub fn generate_loop_cover_surb_ack<R>(
rng: &mut R,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
full_address: &Recipient,
average_ack_delay: time::Duration,
@@ -59,7 +57,7 @@ where
#[allow(clippy::too_many_arguments)]
pub fn generate_loop_cover_packet<R>(
rng: &mut R,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
full_address: &Recipient,
average_ack_delay: time::Duration,
@@ -118,8 +116,7 @@ where
.chain(cover_content)
.collect();
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, full_address.gateway())?;
let route = topology.random_route_to_egress(rng, full_address.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_packet_delay, route.len());
let destination = full_address.as_sphinx_destination();
-4
View File
@@ -16,10 +16,6 @@ pub mod packet_sizes;
pub mod packet_types;
pub mod packet_version;
// If somebody can provide an argument why it might be reasonable to have more than 255 mix hops,
// I will change this to [`usize`]
pub const DEFAULT_NUM_MIX_HOPS: u8 = 3;
// TODO: not entirely sure how to feel about those being defined here, ideally it'd be where [`Fragment`]
// is defined, but that'd introduce circular dependencies as the acknowledgements crate also needs
// access to that
+1 -33
View File
@@ -1,19 +1,10 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_sphinx_types::{delays, Delay};
use std::time::Duration;
use nym_sphinx_addressing::clients::Recipient;
use nym_sphinx_types::{delays, Delay, Node};
use thiserror::Error;
pub trait SphinxRouteMaker {
type Error;
fn sphinx_route(&mut self, hops: u8, destination: &Recipient)
-> Result<Vec<Node>, Self::Error>;
}
#[derive(Debug, Error, Clone, Copy)]
#[error("the route vector contains {available} nodes while {requested} hops are required")]
pub struct InvalidNumberOfHops {
@@ -21,29 +12,6 @@ pub struct InvalidNumberOfHops {
requested: u8,
}
// if one wants to provide a hardcoded route, they can
impl SphinxRouteMaker for Vec<Node> {
type Error = InvalidNumberOfHops;
fn sphinx_route(
&mut self,
hops: u8,
_destination: &Recipient,
) -> Result<Vec<Node>, InvalidNumberOfHops> {
// it's the responsibility of the caller to ensure the hardcoded route has correct number of hops
// and that it's final hop include the recipient's gateway.
if self.len() != hops as usize {
Err(InvalidNumberOfHops {
available: self.len(),
requested: hops,
})
} else {
Ok(self.clone())
}
}
}
pub fn generate_hop_delays(average_packet_delay: Duration, num_hops: usize) -> Vec<Delay> {
if average_packet_delay.is_zero() {
vec![nym_sphinx_types::Delay::new_from_millis(0); num_hops]
+11 -11
View File
@@ -149,7 +149,7 @@ impl NymMessage {
.collect()
}
fn try_from_bytes(bytes: &[u8], num_mix_hops: u8) -> Result<Self, NymMessageError> {
fn try_from_bytes(bytes: &[u8]) -> Result<Self, NymMessageError> {
if bytes.is_empty() {
return Err(NymMessageError::EmptyMessage);
}
@@ -158,7 +158,7 @@ impl NymMessage {
match typ_tag {
NymMessageType::Plain => Ok(NymMessage::Plain(bytes[1..].to_vec())),
NymMessageType::Repliable => Ok(NymMessage::Repliable(
RepliableMessage::try_from_bytes(&bytes[1..], num_mix_hops)?,
RepliableMessage::try_from_bytes(&bytes[1..])?,
)),
NymMessageType::Reply => Ok(NymMessage::Reply(ReplyMessage::try_from_bytes(
&bytes[1..],
@@ -166,10 +166,10 @@ impl NymMessage {
}
}
fn serialized_size(&self, num_mix_hops: u8) -> usize {
fn serialized_size(&self) -> usize {
let inner_size = match self {
NymMessage::Plain(msg) => msg.len(),
NymMessage::Repliable(msg) => msg.serialized_size(num_mix_hops),
NymMessage::Repliable(msg) => msg.serialized_size(),
NymMessage::Reply(msg) => msg.serialized_size(),
};
let message_type_size = 1;
@@ -207,9 +207,9 @@ impl NymMessage {
}
/// Determines the number of required packets of the provided size for the split message.
pub fn required_packets(&self, packet_size: PacketSize, num_mix_hops: u8) -> usize {
pub fn required_packets(&self, packet_size: PacketSize) -> usize {
let plaintext_per_packet = self.true_available_plaintext_per_packet(packet_size);
let serialized_len = self.serialized_size(num_mix_hops);
let serialized_len = self.serialized_size();
let (num_fragments, _) =
chunking::number_of_required_fragments(serialized_len, plaintext_per_packet);
@@ -279,11 +279,11 @@ impl PaddedMessage {
}
// reverse of NymMessage::pad_to_full_packet_lengths
pub fn remove_padding(self, num_mix_hops: u8) -> Result<NymMessage, NymMessageError> {
pub fn remove_padding(self) -> Result<NymMessage, NymMessageError> {
// we are looking for first occurrence of 1 in the tail and we get its index
if let Some(padding_end) = self.0.iter().rposition(|b| *b == 1) {
// and now we only take bytes until that point (but not including it)
NymMessage::try_from_bytes(&self.0[..padding_end], num_mix_hops)
NymMessage::try_from_bytes(&self.0[..padding_end])
} else {
Err(NymMessageError::InvalidMessagePadding)
}
@@ -304,7 +304,7 @@ mod tests {
fn serialized_size_matches_actual_serialization() {
// plain
let plain = NymMessage::new_plain(vec![1, 2, 3, 4, 5]);
assert_eq!(plain.serialized_size(3), plain.into_bytes().len());
assert_eq!(plain.serialized_size(), plain.into_bytes().len());
// a single variant for each repliable and reply is enough as they are more thoroughly tested
// internally
@@ -313,9 +313,9 @@ mod tests {
[42u8; 16].into(),
vec![],
));
assert_eq!(repliable.serialized_size(3), repliable.into_bytes().len());
assert_eq!(repliable.serialized_size(), repliable.into_bytes().len());
let reply = NymMessage::new_reply(ReplyMessage::new_data_message(vec![1, 2, 3, 4, 5]));
assert_eq!(reply.serialized_size(3), reply.into_bytes().len());
assert_eq!(reply.serialized_size(), reply.into_bytes().len());
}
}
+16 -37
View File
@@ -14,9 +14,9 @@ use nym_sphinx_anonymous_replies::reply_surb::ReplySurb;
use nym_sphinx_chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm};
use nym_sphinx_types::{Delay, NymPacket};
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
@@ -54,14 +54,13 @@ pub trait FragmentPreparer {
fn deterministic_route_selection(&self) -> bool;
fn rng(&mut self) -> &mut Self::Rng;
fn nonce(&self) -> i32;
fn num_mix_hops(&self) -> u8;
fn average_packet_delay(&self) -> Duration;
fn average_ack_delay(&self) -> Duration;
fn generate_reply_surbs(
&mut self,
amount: usize,
topology: &NymTopology,
topology: &NymRouteProvider,
reply_recipient: &Recipient,
) -> Result<Vec<ReplySurb>, NymTopologyError> {
let mut reply_surbs = Vec::with_capacity(amount);
@@ -79,7 +78,7 @@ pub trait FragmentPreparer {
&mut self,
recipient: &Recipient,
fragment_id: FragmentIdentifier,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
packet_type: PacketType,
) -> Result<SurbAck, NymTopologyError> {
@@ -109,7 +108,7 @@ pub trait FragmentPreparer {
fn prepare_reply_chunk_for_sending(
&mut self,
fragment: Fragment,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
reply_surb: ReplySurb,
packet_sender: &Recipient,
@@ -130,9 +129,8 @@ pub trait FragmentPreparer {
.expect("the message has been incorrectly fragmented");
// this is not going to be accurate by any means. but that's the best estimation we can do
let expected_forward_delay = Delay::new_from_millis(
(self.average_packet_delay().as_millis() * self.num_mix_hops() as u128) as u64,
);
let expected_forward_delay =
Delay::new_from_millis((self.average_packet_delay().as_millis() * 3) as u64);
let fragment_identifier = fragment.fragment_identifier();
@@ -190,12 +188,11 @@ pub trait FragmentPreparer {
fn prepare_chunk_for_sending(
&mut self,
fragment: Fragment,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
packet_sender: &Recipient,
packet_recipient: &Recipient,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, NymTopologyError> {
debug!("Preparing chunk for sending");
// each plain or repliable packet (i.e. not a reply) attaches an ephemeral public key so that the recipient
@@ -204,8 +201,7 @@ pub trait FragmentPreparer {
let fragment_header = fragment.header();
let destination = packet_recipient.gateway();
let hops = mix_hops.unwrap_or(self.num_mix_hops());
monitoring::fragment_sent(&fragment, self.nonce(), *destination, hops);
monitoring::fragment_sent(&fragment, self.nonce(), destination);
let non_reply_overhead = encryption::PUBLIC_KEY_SIZE;
let expected_plaintext = match packet_type {
@@ -240,16 +236,16 @@ pub trait FragmentPreparer {
};
// generate pseudorandom route for the packet
log::trace!("Preparing chunk for sending with {hops} mix hops");
log::trace!("Preparing chunk for sending");
let route = if self.deterministic_route_selection() {
log::trace!("using deterministic route selection");
let seed = fragment_header.seed().wrapping_mul(self.nonce());
let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);
topology.random_route_to_gateway(&mut rng, hops, destination)?
topology.random_route_to_egress(&mut rng, destination)?
} else {
log::trace!("using pseudorandom route selection");
let mut rng = self.rng();
topology.random_route_to_gateway(&mut rng, hops, destination)?
topology.random_route_to_egress(&mut rng, destination)?
};
let destination = packet_recipient.as_sphinx_destination();
@@ -335,10 +331,6 @@ pub struct MessagePreparer<R> {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
average_ack_delay: Duration,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
nonce: i32,
}
@@ -361,17 +353,10 @@ where
sender_address,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
nonce,
}
}
/// Allows setting non-default number of expected mix hops in the network.
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
/// Overwrites existing sender address with the provided value.
pub fn set_sender_address(&mut self, sender_address: Recipient) {
self.sender_address = sender_address;
@@ -380,7 +365,7 @@ where
pub fn generate_reply_surbs(
&mut self,
amount: usize,
topology: &NymTopology,
topology: &NymRouteProvider,
) -> Result<Vec<ReplySurb>, NymTopologyError> {
let mut reply_surbs = Vec::with_capacity(amount);
for _ in 0..amount {
@@ -399,7 +384,7 @@ where
pub fn prepare_reply_chunk_for_sending(
&mut self,
fragment: Fragment,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
reply_surb: ReplySurb,
packet_type: PacketType,
@@ -420,11 +405,10 @@ where
pub fn prepare_chunk_for_sending(
&mut self,
fragment: Fragment,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
packet_recipient: &Recipient,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, NymTopologyError> {
let sender = self.sender_address;
@@ -436,7 +420,6 @@ where
&sender,
packet_recipient,
packet_type,
mix_hops,
)
}
@@ -444,7 +427,7 @@ where
pub fn generate_surb_ack(
&mut self,
fragment_id: FragmentIdentifier,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
packet_type: PacketType,
) -> Result<SurbAck, NymTopologyError> {
@@ -483,10 +466,6 @@ impl<R: CryptoRng + Rng> FragmentPreparer for MessagePreparer<R> {
self.nonce
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
fn average_packet_delay(&self) -> Duration {
self.average_packet_delay
}
+2 -129
View File
@@ -14,7 +14,6 @@ use nym_sphinx_chunking::reconstruction::MessageReconstructor;
use nym_sphinx_chunking::ChunkingError;
use nym_sphinx_params::{
PacketEncryptionAlgorithm, PacketHkdfAlgorithm, ReplySurbEncryptionAlgorithm,
DEFAULT_NUM_MIX_HOPS,
};
use thiserror::Error;
@@ -79,7 +78,6 @@ pub enum MessageRecoveryError {
pub trait MessageReceiver {
fn new() -> Self;
fn reconstructor(&mut self) -> &mut MessageReconstructor;
fn num_mix_hops(&self) -> u8;
fn decrypt_raw_message<C>(
&self,
@@ -143,7 +141,7 @@ pub trait MessageReceiver {
fragment: Fragment,
) -> Result<Option<(NymMessage, Vec<i32>)>, MessageRecoveryError> {
if let Some((message, used_sets)) = self.reconstructor().insert_new_fragment(fragment) {
match PaddedMessage::new_reconstructed(message).remove_padding(self.num_mix_hops()) {
match PaddedMessage::new_reconstructed(message).remove_padding() {
Ok(message) => Ok(Some((message, used_sets))),
Err(err) => Err(MessageRecoveryError::MalformedReconstructedMessage {
source: err,
@@ -156,28 +154,11 @@ pub trait MessageReceiver {
}
}
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct SphinxMessageReceiver {
/// High level public structure used to buffer all received data [`Fragment`]s and eventually
/// returning original messages that they encapsulate.
reconstructor: MessageReconstructor,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
}
impl SphinxMessageReceiver {
/// Allows setting non-default number of expected mix hops in the network.
// IMPORTANT NOTE: this is among others used to deserialize SURBs. Meaning that this is a
// global setting and currently always set to the default value. The implication is that it is
// not currently possible to have different number of hops for different SURB messages. So,
// don't try to use <3 mix hops for SURBs until this is refactored.
#[must_use]
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
}
impl MessageReceiver for SphinxMessageReceiver {
@@ -201,112 +182,4 @@ impl MessageReceiver for SphinxMessageReceiver {
fn reconstructor(&mut self) -> &mut MessageReconstructor {
&mut self.reconstructor
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
}
impl Default for SphinxMessageReceiver {
fn default() -> Self {
SphinxMessageReceiver {
reconstructor: Default::default(),
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
}
}
}
#[cfg(test)]
mod message_receiver {
use super::*;
use nym_crypto::asymmetric::identity;
use nym_mixnet_contract_common::LegacyMixLayer;
use nym_topology::{gateway, mix, NymTopology};
use std::collections::BTreeMap;
// TODO: is it somehow maybe possible to move it to `topology` and have if conditionally
// available to other modules?
/// Returns a hardcoded, valid instance of [`NymTopology`] that is to be used in
/// tests requiring instance of topology.
#[allow(dead_code)]
fn topology_fixture() -> NymTopology {
let mut mixes = BTreeMap::new();
mixes.insert(
1,
vec![mix::LegacyNode {
mix_id: 123,
host: "10.20.30.40".parse().unwrap(),
mix_host: "10.20.30.40:1789".parse().unwrap(),
identity_key: identity::PublicKey::from_base58_string(
"3ebjp1Fb9hdcS1AR6AZihgeJiMHkB5jjJUsvqNnfQwU7",
)
.unwrap(),
sphinx_key: encryption::PublicKey::from_base58_string(
"B3GzG62aXAZNg14RoMCp3BhELNBrySLr2JqrwyfYFzRc",
)
.unwrap(),
layer: LegacyMixLayer::One,
version: "0.8.0-dev".into(),
}],
);
mixes.insert(
2,
vec![mix::LegacyNode {
mix_id: 234,
host: "11.21.31.41".parse().unwrap(),
mix_host: "11.21.31.41:1789".parse().unwrap(),
identity_key: identity::PublicKey::from_base58_string(
"D6YaMzLSY7mANtSQRKXsmMZpqgqiVkeiagKM4V4oFPFr",
)
.unwrap(),
sphinx_key: encryption::PublicKey::from_base58_string(
"5Z1VqYwM2xeKxd8H7fJpGWasNiDFijYBAee7MErkZ5QT",
)
.unwrap(),
layer: LegacyMixLayer::Two,
version: "0.8.0-dev".into(),
}],
);
mixes.insert(
3,
vec![mix::LegacyNode {
mix_id: 456,
host: "12.22.32.42".parse().unwrap(),
mix_host: "12.22.32.42:1789".parse().unwrap(),
identity_key: identity::PublicKey::from_base58_string(
"GkWDysw4AjESv1KiAiVn7JzzCMJeksxNSXVfr1PpX8wD",
)
.unwrap(),
sphinx_key: encryption::PublicKey::from_base58_string(
"9EyjhCggr2QEA2nakR88YHmXgpy92DWxoe2draDRkYof",
)
.unwrap(),
layer: LegacyMixLayer::Three,
version: "0.8.0-dev".into(),
}],
);
NymTopology::new(
// currently coco_nodes don't really exist so this is still to be determined
mixes,
vec![gateway::LegacyNode {
node_id: 789,
host: "1.2.3.4".parse().unwrap(),
mix_host: "1.2.3.4:1789".parse().unwrap(),
clients_ws_port: 9000,
clients_wss_port: None,
identity_key: identity::PublicKey::from_base58_string(
"FioFa8nMmPpQnYi7JyojoTuwGLeyNS8BF4ChPr29zUML",
)
.unwrap(),
sphinx_key: encryption::PublicKey::from_base58_string(
"EB42xvMFMD5rUCstE2CDazgQQJ22zLv8SPm1Luxni44c",
)
.unwrap(),
version: "0.8.0-dev".into(),
}],
)
}
}
+99 -12
View File
@@ -42,8 +42,43 @@ impl PendingSync {
}
}
#[derive(Debug, Clone)]
pub struct BlockProcessorConfig {
pub pruning_options: PruningOptions,
pub store_precommits: bool,
pub explicit_starting_block_height: Option<u32>,
pub use_best_effort_start_height: bool,
}
impl Default for BlockProcessorConfig {
fn default() -> Self {
Self {
pruning_options: PruningOptions::nothing(),
store_precommits: true,
explicit_starting_block_height: None,
use_best_effort_start_height: false,
}
}
}
impl BlockProcessorConfig {
pub fn new(
pruning_options: PruningOptions,
store_precommits: bool,
explicit_starting_block_height: Option<u32>,
use_best_effort_start_height: bool,
) -> Self {
Self {
pruning_options,
store_precommits,
explicit_starting_block_height,
use_best_effort_start_height,
}
}
}
pub struct BlockProcessor {
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
last_processed_height: u32,
@@ -65,9 +100,10 @@ pub struct BlockProcessor {
msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
#[allow(clippy::too_many_arguments)]
impl BlockProcessor {
pub async fn new(
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
@@ -81,8 +117,10 @@ impl BlockProcessor {
let last_pruned = storage.get_pruned_height().await?;
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
debug!(last_processed_height = %last_processed_height, pruned_height = %last_pruned_height, "setting up block processor...");
Ok(BlockProcessor {
pruning_options,
config,
cancel,
synced,
last_processed_height,
@@ -101,7 +139,7 @@ impl BlockProcessor {
}
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.pruning_options = pruning_options;
self.config.pruning_options = pruning_options;
self
}
@@ -128,7 +166,7 @@ impl BlockProcessor {
// we won't end up with a corrupted storage.
let mut tx = self.storage.begin_processing_tx().await?;
persist_block(&full_info, &mut tx).await?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
// let the modules do whatever they want
// the ones wanting the full block:
@@ -241,7 +279,7 @@ impl BlockProcessor {
#[instrument(skip(self))]
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = self.last_processed_height - keep_recent;
info!(
@@ -282,12 +320,12 @@ impl BlockProcessor {
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
debug!("checking for storage pruning");
if self.pruning_options.strategy.is_nothing() {
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}
let interval = self.pruning_options.strategy_interval();
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= self.last_processed_height {
self.prune_storage().await?;
}
@@ -363,20 +401,69 @@ impl BlockProcessor {
// but we need it to help the compiler figure out the future is `Send`
async fn startup_resync(&mut self) -> Result<(), ScraperError> {
assert!(self.pending_sync.is_empty());
info!("attempting to run startup resync...");
self.maybe_prune_storage().await?;
let latest_block = self.rpc_client.current_block_height().await? as u32;
info!("obtained latest block height: {latest_block}");
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
info!("we have already processed some blocks in the past - attempting to resume...");
// in case we were offline for a while,
// make sure we don't request blocks we'd have to prune anyway
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = latest_block - keep_recent;
self.last_processed_height = max(self.last_processed_height, last_to_keep);
if !self.config.pruning_options.strategy.is_nothing() {
self.last_processed_height = max(self.last_processed_height, last_to_keep);
}
let request_range = self.last_processed_height + 1..latest_block + 1;
info!("we need to request {request_range:?} to resync");
info!(
keep_recent = %keep_recent,
last_to_keep = %last_to_keep,
last_processed_height = %self.last_processed_height,
"we need to request {request_range:?} to resync"
);
self.request_missing_blocks(request_range).await?;
return Ok(());
}
// this is the first time starting up
if self.last_processed_height == 0 {
info!("this is the first time starting up");
let Some(starting_height) = self.config.explicit_starting_block_height else {
info!("no starting block height set - will use the default behaviour");
// nothing to do
return Ok(());
};
info!("attempting to start the scraper from block {starting_height}");
let earliest_available =
self.rpc_client.earliest_available_block_height().await? as u32;
info!("earliest available block height: {earliest_available}");
if earliest_available > starting_height && self.config.use_best_effort_start_height {
error!("the earliest available block is higher than the desired starting height");
return Err(ScraperError::BlocksUnavailable {
height: starting_height,
});
}
let starting_height = if earliest_available > starting_height {
// add few additional blocks to account for all the startup waiting
// because the node might have pruned few blocks since
earliest_available + 10
} else {
starting_height
};
let request_range = starting_height..latest_block + 1;
info!("going to start the scraper from block {starting_height}");
info!("we need to request {request_range:?} before properly starting up");
self.request_missing_blocks(request_range).await?;
}
@@ -384,7 +471,7 @@ impl BlockProcessor {
}
pub(crate) async fn run(&mut self) {
info!("starting processing loop");
info!("starting block processor processing loop");
// sure, we could be more efficient and reset it on every processed block,
// but the overhead is so minimal that it doesn't matter
@@ -84,13 +84,7 @@ impl TryFrom<Event> for BlockToProcess {
// TODO: we're losing `result_begin_block` and `result_end_block` here but maybe that's fine?
let maybe_block = match event.data {
// we don't care about `NewBlock` until CometBFT 0.38, i.e. until we upgrade to wasmd 0.50
EventData::NewBlock { .. } => {
return Err(ScraperError::InvalidSubscriptionEvent {
query,
kind: "NewBlock".to_string(),
})
}
EventData::NewBlock { block, .. } => block,
EventData::LegacyNewBlock { block, .. } => block,
EventData::Tx { .. } => {
return Err(ScraperError::InvalidSubscriptionEvent {
+3
View File
@@ -19,6 +19,9 @@ pub enum ScraperError {
#[error("the block scraper is already running")]
ScraperAlreadyRunning,
#[error("block information for height {height} is not available on the provided rpc endpoint")]
BlocksUnavailable { height: u32 },
#[error("failed to establish websocket connection to {url}: {source}")]
WebSocketConnectionFailure {
url: String,
+2 -1
View File
@@ -15,6 +15,7 @@ pub(crate) mod scraper;
pub mod storage;
pub use block_processor::pruning::{PruningOptions, PruningStrategy};
pub use block_processor::types::ParsedTransactionResponse;
pub use modules::{BlockModule, MsgModule, TxModule};
pub use scraper::{Config, NyxdScraper};
pub use scraper::{Config, NyxdScraper, StartingBlockOpts};
pub use storage::models;
+11
View File
@@ -117,6 +117,17 @@ impl RpcClient {
Ok(info.last_block_height.value())
}
pub(crate) async fn earliest_available_block_height(&self) -> Result<u64, ScraperError> {
debug!("getting earliest available block height");
let status = self
.inner
.status()
.await
.map_err(|source| ScraperError::AbciInfoQueryFailure { source })?;
Ok(status.sync_info.earliest_block_height.value())
}
async fn get_transaction_results(
&self,
raw: &[Vec<u8>],
+99 -30
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::BlockToProcess;
use crate::block_processor::BlockProcessor;
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
use crate::block_requester::{BlockRequest, BlockRequester};
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
@@ -24,6 +24,15 @@ use url::Url;
mod subscriber;
#[derive(Default, Clone, Copy)]
pub struct StartingBlockOpts {
pub start_block_height: Option<u32>,
/// If the scraper fails to start from the desired height, rather than failing,
/// attempt to use the next available height
pub use_best_effort_start_height: bool,
}
pub struct Config {
/// Url to the websocket endpoint of a validator, for example `wss://rpc.nymtech.net/websocket`
pub websocket_url: Url,
@@ -34,6 +43,10 @@ pub struct Config {
pub database_path: PathBuf,
pub pruning_options: PruningOptions,
pub store_precommits: bool,
pub start_block: StartingBlockOpts,
}
pub struct NyxdScraperBuilder {
@@ -60,8 +73,16 @@ impl NyxdScraperBuilder {
req_rx,
processing_tx.clone(),
);
let mut block_processor = BlockProcessor::new(
let block_processor_config = BlockProcessorConfig::new(
scraper.config.pruning_options,
scraper.config.store_precommits,
scraper.config.start_block.start_block_height,
scraper.config.start_block.use_best_effort_start_height,
);
let mut block_processor = BlockProcessor::new(
block_processor_config,
scraper.cancel_token.clone(),
scraper.startup_sync.clone(),
processing_rx,
@@ -118,7 +139,7 @@ pub struct NyxdScraper {
task_tracker: TaskTracker,
cancel_token: CancellationToken,
startup_sync: Arc<Notify>,
pub storage: ScraperStorage,
storage: ScraperStorage,
rpc_client: RpcClient,
}
@@ -142,6 +163,10 @@ impl NyxdScraper {
})
}
pub fn storage(&self) -> ScraperStorage {
self.storage.clone()
}
fn start_tasks(
&self,
mut block_requester: BlockRequester,
@@ -158,7 +183,10 @@ impl NyxdScraper {
self.task_tracker.close();
}
pub async fn process_single_block(&self, height: u32) -> Result<(), ScraperError> {
// DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
// AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
// YOU WILL BE FIRED IF YOU USE IT : )
pub async fn unsafe_process_single_block(&self, height: u32) -> Result<(), ScraperError> {
info!(height = height, "attempting to process a single block");
if !self.task_tracker.is_empty() {
return Err(ScraperError::ScraperAlreadyRunning);
@@ -177,7 +205,10 @@ impl NyxdScraper {
block_processor.process_block(block.into()).await
}
pub async fn process_block_range(
// DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
// AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
// YOU WILL BE FIRED IF YOU USE IT : )
pub async fn unsafe_process_block_range(
&self,
starting_height: Option<u32>,
end_height: Option<u32>,
@@ -194,10 +225,10 @@ impl NyxdScraper {
.await?
.with_pruning(PruningOptions::nothing());
let current_height = self.rpc_client.current_block_height().await? as u32;
let mut current_height = self.rpc_client.current_block_height().await? as u32;
let last_processed = block_processor.last_process_height();
let starting_height = match starting_height {
let mut starting_height = match starting_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
@@ -211,7 +242,8 @@ impl NyxdScraper {
}
};
let end_height = match end_height {
let must_catch_up = end_height.is_none();
let mut end_height = match end_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
@@ -226,32 +258,62 @@ impl NyxdScraper {
}
};
info!(
starting_height = starting_height,
end_height = end_height,
"attempting to process block range"
);
let mut last_processed = starting_height;
let range = (starting_height..=end_height).collect::<Vec<_>>();
while last_processed < current_height {
info!(
starting_height = starting_height,
end_height = end_height,
"attempting to process block range"
);
// the most likely bottleneck here are going to be the chain queries,
// so batch multiple requests
for batch in range.chunks(4) {
let batch_result = join_all(
batch
.iter()
.map(|height| self.rpc_client.get_basic_block_details(*height)),
)
.await;
for result in batch_result {
match result {
Ok(block) => block_processor.process_block(block.into()).await?,
Err(err) => {
error!("failed to retrieve the block: {err}. stopping...");
return Err(err);
let range = (starting_height..=end_height).collect::<Vec<_>>();
// the most likely bottleneck here are going to be the chain queries,
// so batch multiple requests
for batch in range.chunks(4) {
let batch_result = join_all(
batch
.iter()
.map(|height| self.rpc_client.get_basic_block_details(*height)),
)
.await;
for result in batch_result {
match result {
Ok(block) => block_processor.process_block(block.into()).await?,
Err(err) => {
error!("failed to retrieve the block: {err}. stopping...");
return Err(err);
}
}
}
}
// if we don't need to catch up, return early
if !must_catch_up {
return Ok(());
}
// check if we have caught up to the current block height
last_processed = end_height;
current_height = self.rpc_client.current_block_height().await? as u32;
info!(
last_processed = last_processed,
current_height = current_height,
"🏃 still need to catch up..."
);
starting_height = last_processed + 1;
end_height = current_height;
}
if must_catch_up {
info!(
last_processed = last_processed,
current_height = current_height,
"✅ block processing has caught up!"
);
}
Ok(())
@@ -275,8 +337,15 @@ impl NyxdScraper {
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> Result<BlockProcessor, ScraperError> {
BlockProcessor::new(
let block_processor_config = BlockProcessorConfig::new(
self.config.pruning_options,
self.config.store_precommits,
self.config.start_block.start_block_height,
self.config.start_block.use_best_effort_start_height,
);
BlockProcessor::new(
block_processor_config,
self.cancel_token.clone(),
self.startup_sync.clone(),
processing_rx,
+11 -1
View File
@@ -3,6 +3,7 @@
use crate::block_processor::types::BlockToProcess;
use crate::error::ScraperError;
use tendermint_rpc::client::CompatMode;
use tendermint_rpc::event::Event;
use tendermint_rpc::query::EventType;
use tendermint_rpc::{SubscriptionClient, WebSocketClient, WebSocketClientDriver};
@@ -38,7 +39,16 @@ impl ChainSubscriber {
) -> Result<Self, ScraperError> {
// sure, we could have just used websocket client entirely, but let's keep the logic for
// getting current blocks and historical blocks completely separate with the dual connection
let (client, driver) = WebSocketClient::new(websocket_endpoint.as_str())
let websocket_url = websocket_endpoint.as_str().try_into().map_err(|source| {
ScraperError::WebSocketConnectionFailure {
url: websocket_endpoint.to_string(),
source,
}
})?;
let (client, driver) = WebSocketClient::builder(websocket_url)
.compat_mode(CompatMode::V0_37)
.build()
.await
.map_err(|source| ScraperError::WebSocketConnectionFailure {
url: websocket_endpoint.to_string(),
+21 -6
View File
@@ -13,7 +13,11 @@ use crate::{
models::{CommitSignature, Validator},
},
};
use sqlx::{types::time::OffsetDateTime, ConnectOptions, Sqlite, Transaction};
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
types::time::OffsetDateTime,
ConnectOptions, Sqlite, Transaction,
};
use std::{fmt::Debug, path::Path};
use tendermint::{
block::{Commit, CommitSig},
@@ -50,7 +54,16 @@ pub(crate) fn log_db_operation_time(op_name: &str, start_time: Instant) {
impl ScraperStorage {
#[instrument]
pub async fn init<P: AsRef<Path> + Debug>(database_path: P) -> Result<Self, ScraperError> {
let database_path = database_path.as_ref();
debug!(
"initialising scraper database path to '{}'",
database_path.display()
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -212,6 +225,7 @@ impl ScraperStorage {
pub async fn persist_block(
block: &FullBlockInformation,
tx: &mut StorageTransaction,
store_precommits: bool,
) -> Result<(), ScraperError> {
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
@@ -224,11 +238,12 @@ pub async fn persist_block(
// persist block data
persist_block_data(&block.block, total_gas, tx).await?;
// persist commits
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
if store_precommits {
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
// persist txs
@@ -428,7 +428,7 @@ impl PacketStatisticsControl {
while self
.history
.front()
.map_or(false, |&(t, _)| t < recording_window)
.is_some_and(|&(t, _)| t < recording_window)
{
self.history.pop_front();
}
@@ -462,7 +462,7 @@ impl PacketStatisticsControl {
while self
.rates
.front()
.map_or(false, |&(t, _)| t < recording_window)
.is_some_and(|&(t, _)| t < recording_window)
{
self.rates.pop_front();
}
+8
View File
@@ -58,6 +58,10 @@ pub enum GatewaySessionEvent {
/// Address of the remote client opening the connection
client: DestinationAddressBytes,
},
SessionDelete {
/// Address of the remote client opening the connection
client: DestinationAddressBytes,
},
}
impl GatewaySessionEvent {
@@ -87,4 +91,8 @@ impl GatewaySessionEvent {
client,
}
}
pub fn new_session_delete(client: DestinationAddressBytes) -> GatewaySessionEvent {
GatewaySessionEvent::SessionDelete { client }
}
}
+4 -7
View File
@@ -12,15 +12,13 @@ documentation = { workspace = true }
[dependencies]
async-trait = { workspace = true, optional = true }
bs58 = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
semver = { workspace = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
# 'serializable' feature
serde = { workspace = true, features = ["derive"], optional = true }
# 'serde' feature
serde_json = { workspace = true, optional = true }
# 'wasm-serde-types' feature
@@ -28,7 +26,6 @@ tsify = { workspace = true, features = ["js"], optional = true }
wasm-bindgen = { workspace = true, optional = true }
## internal
nym-bin-common = { path = "../bin-common" }
nym-config = { path = "../config" }
nym-crypto = { path = "../crypto", features = ["sphinx", "outfox"] }
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
@@ -51,5 +48,5 @@ wasm-utils = { path = "../wasm/utils", default-features = false, optional = true
default = ["provider-trait"]
provider-trait = ["async-trait"]
wasm-serde-types = ["tsify", "wasm-bindgen", "wasm-utils"]
serializable = ["serde", "serde_json"]
persistence = ["serde_json"]
outfox = []
+12 -8
View File
@@ -4,18 +4,28 @@
use std::array::TryFromSliceError;
use crate::MixLayer;
use nym_sphinx_addressing::NodeIdentity;
use nym_sphinx_types::NymPacketError;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum NymTopologyError {
#[error("The provided network topology is empty - there are no mixnodes and no gateways on it - the network request(s) probably failed")]
#[error("the provided network topology is empty - there are no valid nodes on it - the network request(s) probably failed")]
EmptyNetworkTopology,
#[error("no node with identity {node_identity} is known")]
NonExistentNode { node_identity: Box<NodeIdentity> },
#[error("could not use node with identity {node_identity} as egress since it didn't get assigned valid role in the current epoch")]
InvalidEgressRole { node_identity: Box<NodeIdentity> },
#[error("one (or more) of mixing layers does not have any valid nodes available")]
InsufficientMixingNodes,
#[error("The provided network topology has no gateways available")]
NoGatewaysAvailable,
#[error("The provided network topology has no mixnodes available")]
#[error("The provided network topology has no valid mixnodes available")]
NoMixnodesAvailable,
#[error("Gateway with identity key {identity_key} doesn't exist")]
@@ -55,12 +65,6 @@ pub enum NymTopologyError {
#[error("{0}")]
ReqwestError(#[from] reqwest::Error),
#[error("{0}")]
MixnodeConversionError(#[from] crate::mix::MixnodeConversionError),
#[error("{0}")]
GatewayConversionError(#[from] crate::gateway::GatewayConversionError),
#[error("{0}")]
VarError(#[from] std::env::VarError),
}
-174
View File
@@ -1,174 +0,0 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{NetworkAddress, NodeVersion};
use nym_api_requests::nym_nodes::SkimmedNode;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnet_contract_common::NodeId;
use nym_sphinx_addressing::nodes::{NodeIdentity, NymNodeRoutingAddress};
use nym_sphinx_types::Node as SphinxNode;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::fmt;
use std::fmt::Formatter;
use std::io;
use std::net::AddrParseError;
use std::net::SocketAddr;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum GatewayConversionError {
#[error("gateway identity key was malformed - {0}")]
InvalidIdentityKey(#[from] identity::Ed25519RecoveryError),
#[error("gateway sphinx key was malformed - {0}")]
InvalidSphinxKey(#[from] encryption::KeyRecoveryError),
#[error("'{value}' is not a valid gateway address - {source}")]
InvalidAddress {
value: String,
#[source]
source: io::Error,
},
#[error("'{gateway}' has not provided any valid ip addresses")]
NoIpAddressesProvided { gateway: String },
#[error("'{gateway}' has provided a malformed ip address: {err}")]
MalformedIpAddress {
gateway: String,
#[source]
err: AddrParseError,
},
#[error("provided node is not an entry gateway in this epoch!")]
NotGateway,
}
#[derive(Clone)]
pub struct LegacyNode {
pub node_id: NodeId,
pub host: NetworkAddress,
// we're keeping this as separate resolved field since we do not want to be resolving the potential
// hostname every time we want to construct a path via this node
pub mix_host: SocketAddr,
// #[serde(alias = "clients_port")]
pub clients_ws_port: u16,
// #[serde(default)]
pub clients_wss_port: Option<u16>,
pub identity_key: identity::PublicKey,
pub sphinx_key: encryption::PublicKey, // TODO: or nymsphinx::PublicKey? both are x25519
// to be removed:
pub version: NodeVersion,
}
impl std::fmt::Debug for LegacyNode {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("gateway::Node")
.field("host", &self.host)
.field("mix_host", &self.mix_host)
.field("clients_ws_port", &self.clients_ws_port)
.field("clients_wss_port", &self.clients_wss_port)
.field("identity_key", &self.identity_key.to_base58_string())
.field("sphinx_key", &self.sphinx_key.to_base58_string())
.field("version", &self.version)
.finish()
}
}
impl LegacyNode {
pub fn parse_host(raw: &str) -> Result<NetworkAddress, GatewayConversionError> {
// safety: this conversion is infallible
// (but we retain result return type for legacy reasons)
Ok(raw.parse().unwrap())
}
pub fn extract_mix_host(
host: &NetworkAddress,
mix_port: u16,
) -> Result<SocketAddr, GatewayConversionError> {
Ok(host.to_socket_addrs(mix_port).map_err(|err| {
GatewayConversionError::InvalidAddress {
value: host.to_string(),
source: err,
}
})?[0])
}
pub fn identity(&self) -> &NodeIdentity {
&self.identity_key
}
pub fn clients_address(&self) -> String {
self.clients_address_tls()
.unwrap_or_else(|| self.clients_address_no_tls())
}
pub fn clients_address_no_tls(&self) -> String {
format!("ws://{}:{}", self.host, self.clients_ws_port)
}
pub fn clients_address_tls(&self) -> Option<String> {
self.clients_wss_port
.map(|p| format!("wss://{}:{p}", self.host))
}
}
impl fmt::Display for LegacyNode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "legacy gateway {} @ {}", self.node_id, self.host)
}
}
impl<'a> From<&'a LegacyNode> for SphinxNode {
fn from(node: &'a LegacyNode) -> Self {
let node_address_bytes = NymNodeRoutingAddress::from(node.mix_host)
.try_into()
.unwrap();
SphinxNode::new(node_address_bytes, (&node.sphinx_key).into())
}
}
impl<'a> TryFrom<&'a SkimmedNode> for LegacyNode {
type Error = GatewayConversionError;
fn try_from(value: &'a SkimmedNode) -> Result<Self, Self::Error> {
let Some(entry_details) = &value.entry else {
return Err(GatewayConversionError::NotGateway);
};
if value.ip_addresses.is_empty() {
return Err(GatewayConversionError::NoIpAddressesProvided {
gateway: value.ed25519_identity_pubkey.to_base58_string(),
});
}
// safety: we just checked the slice is not empty
#[allow(clippy::unwrap_used)]
let ip = value.ip_addresses.choose(&mut thread_rng()).unwrap();
let host = if let Some(hostname) = &entry_details.hostname {
NetworkAddress::Hostname(hostname.to_string())
} else {
NetworkAddress::IpAddr(*ip)
};
Ok(LegacyNode {
node_id: value.node_id,
host,
mix_host: SocketAddr::new(*ip, value.mix_port),
clients_ws_port: entry_details.ws_port,
clients_wss_port: entry_details.wss_port,
identity_key: value.ed25519_identity_pubkey,
sphinx_key: value.x25519_sphinx_pubkey,
version: NodeVersion::Unknown,
})
}
}
+440 -478
View File
File diff suppressed because it is too large Load Diff
-135
View File
@@ -1,135 +0,0 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{NetworkAddress, NodeVersion};
use nym_api_requests::nym_nodes::{NodeRole, SkimmedNode};
use nym_crypto::asymmetric::{encryption, identity};
pub use nym_mixnet_contract_common::LegacyMixLayer;
use nym_mixnet_contract_common::NodeId;
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_types::Node as SphinxNode;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::fmt::Formatter;
use std::io;
use std::net::SocketAddr;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum MixnodeConversionError {
#[error("mixnode identity key was malformed - {0}")]
InvalidIdentityKey(#[from] identity::Ed25519RecoveryError),
#[error("mixnode sphinx key was malformed - {0}")]
InvalidSphinxKey(#[from] encryption::KeyRecoveryError),
#[error("'{value}' is not a valid mixnode address - {source}")]
InvalidAddress {
value: String,
#[source]
source: io::Error,
},
#[error("invalid mix layer")]
InvalidLayer,
#[error("'{mixnode}' has not provided any valid ip addresses")]
NoIpAddressesProvided { mixnode: String },
#[error("provided node is not a mixnode in this epoch!")]
NotMixnode,
}
#[derive(Clone)]
pub struct LegacyNode {
pub mix_id: NodeId,
pub host: NetworkAddress,
// we're keeping this as separate resolved field since we do not want to be resolving the potential
// hostname every time we want to construct a path via this node
pub mix_host: SocketAddr,
pub identity_key: identity::PublicKey,
pub sphinx_key: encryption::PublicKey, // TODO: or nymsphinx::PublicKey? both are x25519
pub layer: LegacyMixLayer,
// to be removed:
pub version: NodeVersion,
}
impl std::fmt::Debug for LegacyNode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("mix::Node")
.field("mix_id", &self.mix_id)
.field("host", &self.host)
.field("mix_host", &self.mix_host)
.field("identity_key", &self.identity_key.to_base58_string())
.field("sphinx_key", &self.sphinx_key.to_base58_string())
.field("layer", &self.layer)
.field("version", &self.version)
.finish()
}
}
impl LegacyNode {
pub fn parse_host(raw: &str) -> Result<NetworkAddress, MixnodeConversionError> {
// safety: this conversion is infallible
// (but we retain result return type for legacy reasons)
Ok(raw.parse().unwrap())
}
pub fn extract_mix_host(
host: &NetworkAddress,
mix_port: u16,
) -> Result<SocketAddr, MixnodeConversionError> {
Ok(host.to_socket_addrs(mix_port).map_err(|err| {
MixnodeConversionError::InvalidAddress {
value: host.to_string(),
source: err,
}
})?[0])
}
}
impl<'a> From<&'a LegacyNode> for SphinxNode {
fn from(node: &'a LegacyNode) -> Self {
let node_address_bytes = NymNodeRoutingAddress::from(node.mix_host)
.try_into()
.unwrap();
SphinxNode::new(node_address_bytes, (&node.sphinx_key).into())
}
}
impl<'a> TryFrom<&'a SkimmedNode> for LegacyNode {
type Error = MixnodeConversionError;
fn try_from(value: &'a SkimmedNode) -> Result<Self, Self::Error> {
if value.ip_addresses.is_empty() {
return Err(MixnodeConversionError::NoIpAddressesProvided {
mixnode: value.ed25519_identity_pubkey.to_base58_string(),
});
}
let layer = match value.role {
NodeRole::Mixnode { layer } => layer
.try_into()
.map_err(|_| MixnodeConversionError::InvalidLayer)?,
_ => return Err(MixnodeConversionError::NotMixnode),
};
// safety: we just checked the slice is not empty
#[allow(clippy::unwrap_used)]
let ip = value.ip_addresses.choose(&mut thread_rng()).unwrap();
let host = NetworkAddress::IpAddr(*ip);
Ok(LegacyNode {
mix_id: value.node_id,
host,
mix_host: SocketAddr::new(*ip, value.mix_port),
identity_key: value.ed25519_identity_pubkey,
sphinx_key: value.x25519_sphinx_pubkey,
layer,
version: NodeVersion::Unknown,
})
}
}
+143
View File
@@ -0,0 +1,143 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_api_requests::models::DeclaredRoles;
use nym_api_requests::nym_nodes::SkimmedNode;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_mixnet_contract_common::NodeId;
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_types::Node as SphinxNode;
use serde::{Deserialize, Serialize};
use std::net::{IpAddr, SocketAddr};
use thiserror::Error;
pub use nym_mixnet_contract_common::LegacyMixLayer;
#[derive(Error, Debug)]
pub enum RoutingNodeError {
#[error("node {node_id} ('{identity}') has not provided any valid ip addresses")]
NoIpAddressesProvided { node_id: NodeId, identity: String },
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EntryDetails {
// to allow client to choose ipv6 preference, if available
pub ip_addresses: Vec<IpAddr>,
pub clients_ws_port: u16,
pub hostname: Option<String>,
pub clients_wss_port: Option<u16>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct SupportedRoles {
pub mixnode: bool,
pub mixnet_entry: bool,
pub mixnet_exit: bool,
}
impl From<DeclaredRoles> for SupportedRoles {
fn from(value: DeclaredRoles) -> Self {
SupportedRoles {
mixnode: value.mixnode,
mixnet_entry: value.entry,
mixnet_exit: value.exit_nr && value.exit_ipr,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RoutingNode {
pub node_id: NodeId,
pub mix_host: SocketAddr,
pub entry: Option<EntryDetails>,
pub identity_key: ed25519::PublicKey,
pub sphinx_key: x25519::PublicKey,
pub supported_roles: SupportedRoles,
}
impl RoutingNode {
pub fn ws_entry_address_tls(&self) -> Option<String> {
let entry = self.entry.as_ref()?;
let hostname = entry.hostname.as_ref()?;
let wss_port = entry.clients_wss_port?;
Some(format!("wss://{hostname}:{wss_port}"))
}
pub fn ws_entry_address_no_tls(&self, prefer_ipv6: bool) -> Option<String> {
let entry = self.entry.as_ref()?;
if let Some(hostname) = entry.hostname.as_ref() {
return Some(format!("ws://{hostname}:{}", entry.clients_ws_port));
}
if prefer_ipv6 {
if let Some(ipv6) = entry.ip_addresses.iter().find(|ip| ip.is_ipv6()) {
return Some(format!("ws://{ipv6}:{}", entry.clients_ws_port));
}
}
let any_ip = entry.ip_addresses.first()?;
Some(format!("ws://{any_ip}:{}", entry.clients_ws_port))
}
pub fn ws_entry_address(&self, prefer_ipv6: bool) -> Option<String> {
if let Some(tls) = self.ws_entry_address_tls() {
return Some(tls);
}
self.ws_entry_address_no_tls(prefer_ipv6)
}
pub fn identity(&self) -> ed25519::PublicKey {
self.identity_key
}
}
impl<'a> From<&'a RoutingNode> for SphinxNode {
fn from(node: &'a RoutingNode) -> Self {
// SAFETY: this conversion is infallible as all versions of socket addresses have
// sufficiently small bytes representation to fit inside `NodeAddressBytes`
#[allow(clippy::unwrap_used)]
let node_address_bytes = NymNodeRoutingAddress::from(node.mix_host)
.try_into()
.unwrap();
SphinxNode::new(node_address_bytes, (&node.sphinx_key).into())
}
}
impl<'a> TryFrom<&'a SkimmedNode> for RoutingNode {
type Error = RoutingNodeError;
fn try_from(value: &'a SkimmedNode) -> Result<Self, Self::Error> {
// IF YOU EVER ADD "performance" TO RoutingNode,
// MAKE SURE TO UPDATE THE LAZY IMPLEMENTATION OF
// `impl NodeDescriptionTopologyExt for NymNodeDescription`!!!
let Some(first_ip) = value.ip_addresses.first() else {
return Err(RoutingNodeError::NoIpAddressesProvided {
node_id: value.node_id,
identity: value.ed25519_identity_pubkey.to_string(),
});
};
let entry = value.entry.as_ref().map(|entry| EntryDetails {
ip_addresses: value.ip_addresses.clone(),
clients_ws_port: entry.ws_port,
hostname: entry.hostname.clone(),
clients_wss_port: entry.wss_port,
});
Ok(RoutingNode {
node_id: value.node_id,
mix_host: SocketAddr::new(*first_ip, value.mix_port),
entry,
identity_key: value.ed25519_identity_pubkey,
sphinx_key: value.x25519_sphinx_pubkey,
supported_roles: value.supported_roles.into(),
})
}
}
+1 -1
View File
@@ -22,7 +22,7 @@ pub struct HardcodedTopologyProvider {
}
impl HardcodedTopologyProvider {
#[cfg(feature = "serializable")]
#[cfg(feature = "persistence")]
pub fn new_from_file<P: AsRef<std::path::Path>>(path: P) -> std::io::Result<Self> {
NymTopology::new_from_file(path).map(Self::new)
}
@@ -1,30 +0,0 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{NymTopology, NymTopologyError};
use nym_sphinx_addressing::clients::Recipient;
use nym_sphinx_routing::SphinxRouteMaker;
use nym_sphinx_types::Node;
use rand::{CryptoRng, Rng};
#[allow(dead_code)]
pub struct NymTopologyRouteProvider<R> {
rng: R,
inner: NymTopology,
}
impl<R> SphinxRouteMaker for NymTopologyRouteProvider<R>
where
R: Rng + CryptoRng,
{
type Error = NymTopologyError;
fn sphinx_route(
&mut self,
hops: u8,
destination: &Recipient,
) -> Result<Vec<Node>, NymTopologyError> {
self.inner
.random_route_to_gateway(&mut self.rng, hops, destination.gateway())
}
}
+122
View File
@@ -0,0 +1,122 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_mixnet_contract_common::nym_node::Role;
use nym_mixnet_contract_common::{EpochId, EpochRewardedSet, NodeId, RewardedSet};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct CachedEpochRewardedSet {
pub epoch_id: EpochId,
pub entry_gateways: HashSet<NodeId>,
pub exit_gateways: HashSet<NodeId>,
pub layer1: HashSet<NodeId>,
pub layer2: HashSet<NodeId>,
pub layer3: HashSet<NodeId>,
pub standby: HashSet<NodeId>,
}
impl From<EpochRewardedSet> for CachedEpochRewardedSet {
fn from(value: EpochRewardedSet) -> Self {
CachedEpochRewardedSet {
epoch_id: value.epoch_id,
entry_gateways: value.assignment.entry_gateways.into_iter().collect(),
exit_gateways: value.assignment.exit_gateways.into_iter().collect(),
layer1: value.assignment.layer1.into_iter().collect(),
layer2: value.assignment.layer2.into_iter().collect(),
layer3: value.assignment.layer3.into_iter().collect(),
standby: value.assignment.standby.into_iter().collect(),
}
}
}
impl From<CachedEpochRewardedSet> for EpochRewardedSet {
fn from(value: CachedEpochRewardedSet) -> Self {
EpochRewardedSet {
epoch_id: value.epoch_id,
assignment: RewardedSet {
entry_gateways: value.entry_gateways.into_iter().collect(),
exit_gateways: value.exit_gateways.into_iter().collect(),
layer1: value.layer1.into_iter().collect(),
layer2: value.layer2.into_iter().collect(),
layer3: value.layer3.into_iter().collect(),
standby: value.standby.into_iter().collect(),
},
}
}
}
impl CachedEpochRewardedSet {
pub fn is_empty(&self) -> bool {
self.entry_gateways.is_empty()
&& self.exit_gateways.is_empty()
&& self.layer1.is_empty()
&& self.layer2.is_empty()
&& self.layer3.is_empty()
&& self.standby.is_empty()
}
pub fn role(&self, node_id: NodeId) -> Option<Role> {
if self.entry_gateways.contains(&node_id) {
Some(Role::EntryGateway)
} else if self.exit_gateways.contains(&node_id) {
Some(Role::ExitGateway)
} else if self.layer1.contains(&node_id) {
Some(Role::Layer1)
} else if self.layer2.contains(&node_id) {
Some(Role::Layer2)
} else if self.layer3.contains(&node_id) {
Some(Role::Layer3)
} else if self.standby.contains(&node_id) {
Some(Role::Standby)
} else {
None
}
}
pub fn legacy_mix_layer(&self, node_id: &NodeId) -> Option<u8> {
if self.layer1.contains(node_id) {
Some(1)
} else if self.layer2.contains(node_id) {
Some(2)
} else if self.layer3.contains(node_id) {
Some(3)
} else {
None
}
}
pub fn is_standby(&self, node_id: &NodeId) -> bool {
self.standby.contains(node_id)
}
pub fn is_active_mixnode(&self, node_id: &NodeId) -> bool {
self.layer1.contains(node_id)
|| self.layer2.contains(node_id)
|| self.layer3.contains(node_id)
}
pub fn gateways(&self) -> HashSet<NodeId> {
let mut gateways =
HashSet::with_capacity(self.entry_gateways.len() + self.exit_gateways.len());
gateways.extend(&self.entry_gateways);
gateways.extend(&self.exit_gateways);
gateways
}
pub fn active_mixnodes(&self) -> HashSet<NodeId> {
let mut mixnodes =
HashSet::with_capacity(self.layer1.len() + self.layer2.len() + self.layer3.len());
mixnodes.extend(&self.layer1);
mixnodes.extend(&self.layer2);
mixnodes.extend(&self.layer3);
mixnodes
}
}
-262
View File
@@ -1,262 +0,0 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(unknown_lints)]
// clippy::empty_docs is not on stable as of 1.77
// due to the code generated by Tsify
#![allow(clippy::empty_docs)]
use crate::gateway::GatewayConversionError;
use crate::mix::MixnodeConversionError;
use crate::{gateway, mix, MixLayer, NymTopology};
use nym_config::defaults::{DEFAULT_CLIENT_LISTENING_PORT, DEFAULT_MIX_LISTENING_PORT};
use nym_crypto::asymmetric::{encryption, identity};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::net::{IpAddr, SocketAddr};
use thiserror::Error;
#[cfg(feature = "wasm-serde-types")]
use tsify::Tsify;
use nym_mixnet_contract_common::NodeId;
#[cfg(feature = "wasm-serde-types")]
use wasm_bindgen::{prelude::wasm_bindgen, JsValue};
#[cfg(feature = "wasm-serde-types")]
use wasm_utils::error::simple_js_error;
#[derive(Debug, Error)]
pub enum SerializableTopologyError {
#[error("got invalid mix layer {value}. Expected 1, 2 or 3.")]
InvalidMixLayer { value: u8 },
#[error(transparent)]
GatewayConversion(#[from] GatewayConversionError),
#[error(transparent)]
MixnodeConversion(#[from] MixnodeConversionError),
#[error("The provided mixnode map was malformed: {msg}")]
MalformedMixnodeMap { msg: String },
#[error("The provided gateway list was malformed: {msg}")]
MalformedGatewayList { msg: String },
}
#[cfg(feature = "wasm-serde-types")]
impl From<SerializableTopologyError> for JsValue {
fn from(value: SerializableTopologyError) -> Self {
simple_js_error(value.to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "wasm-serde-types", derive(Tsify))]
#[cfg_attr(feature = "wasm-serde-types", tsify(into_wasm_abi, from_wasm_abi))]
#[serde(rename_all = "camelCase")]
#[serde(deny_unknown_fields)]
pub struct SerializableNymTopology {
pub mixnodes: BTreeMap<MixLayer, Vec<SerializableMixNode>>,
pub gateways: Vec<SerializableGateway>,
}
impl TryFrom<SerializableNymTopology> for NymTopology {
type Error = SerializableTopologyError;
fn try_from(value: SerializableNymTopology) -> Result<Self, Self::Error> {
let mut converted_mixes = BTreeMap::new();
for (layer, nodes) in value.mixnodes {
let layer_nodes = nodes
.into_iter()
.map(TryInto::try_into)
.collect::<Result<_, _>>()?;
converted_mixes.insert(layer, layer_nodes);
}
let gateways = value
.gateways
.into_iter()
.map(TryInto::try_into)
.collect::<Result<_, _>>()?;
Ok(NymTopology::new(converted_mixes, gateways))
}
}
impl From<NymTopology> for SerializableNymTopology {
fn from(value: NymTopology) -> Self {
SerializableNymTopology {
mixnodes: value
.mixes()
.iter()
.map(|(&l, nodes)| (l, nodes.iter().map(Into::into).collect()))
.collect(),
gateways: value.gateways().iter().map(Into::into).collect(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "wasm-serde-types", derive(Tsify))]
#[cfg_attr(feature = "wasm-serde-types", tsify(into_wasm_abi, from_wasm_abi))]
#[serde(rename_all = "camelCase")]
#[serde(deny_unknown_fields)]
pub struct SerializableMixNode {
// this is a `MixId` but due to typescript issue, we're using u32 directly.
#[serde(alias = "mix_id")]
pub mix_id: u32,
pub host: String,
#[cfg_attr(feature = "wasm-serde-types", tsify(optional))]
#[serde(alias = "mix_port")]
pub mix_port: Option<u16>,
#[serde(alias = "identity_key")]
pub identity_key: String,
#[serde(alias = "sphinx_key")]
pub sphinx_key: String,
// this is a `MixLayer` but due to typescript issue, we're using u8 directly.
pub layer: u8,
#[cfg_attr(feature = "wasm-serde-types", tsify(optional))]
pub version: Option<String>,
}
impl TryFrom<SerializableMixNode> for mix::LegacyNode {
type Error = SerializableTopologyError;
fn try_from(value: SerializableMixNode) -> Result<Self, Self::Error> {
let host = mix::LegacyNode::parse_host(&value.host)?;
let mix_port = value.mix_port.unwrap_or(DEFAULT_MIX_LISTENING_PORT);
let version = value.version.map(|v| v.as_str().into()).unwrap_or_default();
// try to completely resolve the host in the mix situation to avoid doing it every
// single time we want to construct a path
let mix_host = mix::LegacyNode::extract_mix_host(&host, mix_port)?;
Ok(mix::LegacyNode {
mix_id: value.mix_id,
host,
mix_host,
identity_key: identity::PublicKey::from_base58_string(&value.identity_key)
.map_err(MixnodeConversionError::from)?,
sphinx_key: encryption::PublicKey::from_base58_string(&value.sphinx_key)
.map_err(MixnodeConversionError::from)?,
layer: mix::LegacyMixLayer::try_from(value.layer)
.map_err(|_| SerializableTopologyError::InvalidMixLayer { value: value.layer })?,
version,
})
}
}
impl<'a> From<&'a mix::LegacyNode> for SerializableMixNode {
fn from(value: &'a mix::LegacyNode) -> Self {
SerializableMixNode {
mix_id: value.mix_id,
host: value.host.to_string(),
mix_port: Some(value.mix_host.port()),
identity_key: value.identity_key.to_base58_string(),
sphinx_key: value.sphinx_key.to_base58_string(),
layer: value.layer.into(),
version: Some(value.version.to_string()),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "wasm-serde-types", derive(Tsify))]
#[cfg_attr(feature = "wasm-serde-types", tsify(into_wasm_abi, from_wasm_abi))]
#[serde(rename_all = "camelCase")]
#[serde(deny_unknown_fields)]
pub struct SerializableGateway {
pub host: String,
pub node_id: NodeId,
// optional ip address in the case of host being a hostname that can't be resolved
// (thank you wasm)
#[cfg_attr(feature = "wasm-serde-types", tsify(optional))]
#[serde(alias = "explicit_ip")]
pub explicit_ip: Option<IpAddr>,
#[cfg_attr(feature = "wasm-serde-types", tsify(optional))]
#[serde(alias = "mix_port")]
pub mix_port: Option<u16>,
#[cfg_attr(feature = "wasm-serde-types", tsify(optional))]
#[serde(alias = "clients_port")]
#[serde(alias = "clients_ws_port")]
pub clients_ws_port: Option<u16>,
#[cfg_attr(feature = "wasm-serde-types", tsify(optional))]
#[serde(alias = "clients_wss_port")]
pub clients_wss_port: Option<u16>,
#[serde(alias = "identity_key")]
pub identity_key: String,
#[serde(alias = "sphinx_key")]
pub sphinx_key: String,
#[cfg_attr(feature = "wasm-serde-types", tsify(optional))]
pub version: Option<String>,
}
impl TryFrom<SerializableGateway> for gateway::LegacyNode {
type Error = SerializableTopologyError;
fn try_from(value: SerializableGateway) -> Result<Self, Self::Error> {
let host = gateway::LegacyNode::parse_host(&value.host)?;
let mix_port = value.mix_port.unwrap_or(DEFAULT_MIX_LISTENING_PORT);
let clients_ws_port = value
.clients_ws_port
.unwrap_or(DEFAULT_CLIENT_LISTENING_PORT);
let version = value.version.map(|v| v.as_str().into()).unwrap_or_default();
// try to completely resolve the host in the mix situation to avoid doing it every
// single time we want to construct a path
let mix_host = if let Some(explicit_ip) = value.explicit_ip {
SocketAddr::new(explicit_ip, mix_port)
} else {
gateway::LegacyNode::extract_mix_host(&host, mix_port)?
};
Ok(gateway::LegacyNode {
node_id: value.node_id,
host,
mix_host,
clients_ws_port,
clients_wss_port: value.clients_wss_port,
identity_key: identity::PublicKey::from_base58_string(&value.identity_key)
.map_err(GatewayConversionError::from)?,
sphinx_key: encryption::PublicKey::from_base58_string(&value.sphinx_key)
.map_err(GatewayConversionError::from)?,
version,
})
}
}
impl<'a> From<&'a gateway::LegacyNode> for SerializableGateway {
fn from(value: &'a gateway::LegacyNode) -> Self {
SerializableGateway {
host: value.host.to_string(),
node_id: value.node_id,
explicit_ip: Some(value.mix_host.ip()),
mix_port: Some(value.mix_host.port()),
clients_ws_port: Some(value.clients_ws_port),
clients_wss_port: value.clients_wss_port,
identity_key: value.identity_key.to_base58_string(),
sphinx_key: value.sphinx_key.to_base58_string(),
version: Some(value.version.to_string()),
}
}
}
+123
View File
@@ -0,0 +1,123 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// due to the code generated by Tsify
#![allow(clippy::empty_docs)]
use crate::node::{EntryDetails, RoutingNode, RoutingNodeError, SupportedRoles};
use crate::{CachedEpochRewardedSet, NymTopology};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use thiserror::Error;
use tsify::Tsify;
use wasm_bindgen::{prelude::wasm_bindgen, JsValue};
use wasm_utils::error::simple_js_error;
#[derive(Debug, Error)]
pub enum SerializableTopologyError {
#[error(transparent)]
NodeConversion(#[from] RoutingNodeError),
#[error("{provided} is not a valid ed25519 public key")]
MalformedIdentity { provided: String },
#[error("{provided} is not a valid x25519 public key")]
MalformedSphinxKey { provided: String },
}
#[cfg(feature = "wasm-serde-types")]
impl From<SerializableTopologyError> for JsValue {
fn from(value: SerializableTopologyError) -> Self {
simple_js_error(value.to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Tsify)]
#[tsify(into_wasm_abi, from_wasm_abi)]
#[serde(rename_all = "camelCase")]
#[serde(deny_unknown_fields)]
pub struct WasmFriendlyNymTopology {
pub rewarded_set: CachedEpochRewardedSet,
pub node_details: HashMap<u32, WasmFriendlyRoutingNode>,
}
impl TryFrom<WasmFriendlyNymTopology> for NymTopology {
type Error = SerializableTopologyError;
fn try_from(value: WasmFriendlyNymTopology) -> Result<Self, Self::Error> {
let node_details = value
.node_details
.into_values()
.map(|details| details.try_into())
.collect::<Result<_, _>>()?;
Ok(NymTopology::new(value.rewarded_set, node_details))
}
}
impl From<NymTopology> for WasmFriendlyNymTopology {
fn from(value: NymTopology) -> Self {
WasmFriendlyNymTopology {
rewarded_set: value.rewarded_set,
node_details: value
.node_details
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Tsify)]
#[tsify(into_wasm_abi, from_wasm_abi)]
#[serde(rename_all = "camelCase")]
#[serde(deny_unknown_fields)]
pub struct WasmFriendlyRoutingNode {
pub node_id: u32,
pub mix_host: SocketAddr,
pub entry: Option<EntryDetails>,
pub identity_key: String,
pub sphinx_key: String,
pub supported_roles: SupportedRoles,
}
impl TryFrom<WasmFriendlyRoutingNode> for RoutingNode {
type Error = SerializableTopologyError;
fn try_from(value: WasmFriendlyRoutingNode) -> Result<Self, Self::Error> {
Ok(RoutingNode {
node_id: value.node_id,
mix_host: value.mix_host,
entry: value.entry,
identity_key: value.identity_key.as_str().parse().map_err(|_| {
SerializableTopologyError::MalformedIdentity {
provided: value.identity_key,
}
})?,
sphinx_key: value.sphinx_key.as_str().parse().map_err(|_| {
SerializableTopologyError::MalformedIdentity {
provided: value.sphinx_key,
}
})?,
supported_roles: value.supported_roles,
})
}
}
impl From<RoutingNode> for WasmFriendlyRoutingNode {
fn from(node: RoutingNode) -> Self {
WasmFriendlyRoutingNode {
node_id: node.node_id,
mix_host: node.mix_host,
entry: node.entry,
identity_key: node.identity_key.to_string(),
sphinx_key: node.sphinx_key.to_string(),
supported_roles: node.supported_roles,
}
}
}
+2 -2
View File
@@ -29,10 +29,10 @@ nym-credential-storage = { path = "../../credential-storage" }
nym-crypto = { path = "../../crypto", features = ["asymmetric", "serde"] }
nym-gateway-client = { path = "../../client-libs/gateway-client", default-features = false, features = ["wasm"] }
nym-sphinx = { path = "../../nymsphinx" }
nym-sphinx-acknowledgements = { path = "../../nymsphinx/acknowledgements", features = ["serde"]}
nym-sphinx-acknowledgements = { path = "../../nymsphinx/acknowledgements", features = ["serde"] }
nym-statistics-common = { path = "../../statistics" }
nym-task = { path = "../../task" }
nym-topology = { path = "../../topology", features = ["serializable", "wasm-serde-types"] }
nym-topology = { path = "../../topology", features = ["wasm-serde-types"] }
nym-validator-client = { path = "../../client-libs/validator-client", default-features = false }
wasm-utils = { path = "../utils" }
wasm-storage = { path = "../storage" }
+13
View File
@@ -387,6 +387,15 @@ pub struct TopologyWasm {
/// Specifies a minimum performance of a gateway that is used on route construction.
/// This setting is only applicable when `NymApi` topology is used.
pub minimum_gateway_performance: u8,
/// Specifies whether this client should attempt to retrieve all available network nodes
/// as opposed to just active mixnodes/gateways.
/// Useless without `ignore_epoch_roles = true`
pub use_extended_topology: bool,
/// Specifies whether this client should ignore the current epoch role of the target egress node
/// when constructing the final hop packets.
pub ignore_egress_epoch_role: bool,
}
impl Default for TopologyWasm {
@@ -409,6 +418,8 @@ impl From<TopologyWasm> for ConfigTopology {
topology_structure: Default::default(),
minimum_mixnode_performance: topology.minimum_mixnode_performance,
minimum_gateway_performance: topology.minimum_gateway_performance,
use_extended_topology: topology.use_extended_topology,
ignore_egress_epoch_role: topology.ignore_egress_epoch_role,
}
}
}
@@ -424,6 +435,8 @@ impl From<ConfigTopology> for TopologyWasm {
disable_refreshing: topology.disable_refreshing,
minimum_mixnode_performance: topology.minimum_mixnode_performance,
minimum_gateway_performance: topology.minimum_gateway_performance,
use_extended_topology: topology.use_extended_topology,
ignore_egress_epoch_role: topology.ignore_egress_epoch_role,
}
}
}
@@ -271,6 +271,17 @@ pub struct TopologyWasmOverride {
/// This setting is only applicable when `NymApi` topology is used.
#[tsify(optional)]
pub minimum_gateway_performance: Option<u8>,
/// Specifies whether this client should attempt to retrieve all available network nodes
/// as opposed to just active mixnodes/gateways.
/// Useless without `ignore_epoch_roles = true`
#[tsify(optional)]
pub use_extended_topology: Option<bool>,
/// Specifies whether this client should ignore the current epoch role of the target egress node
/// when constructing the final hop packets.
#[tsify(optional)]
pub ignore_egress_epoch_role: Option<bool>,
}
impl From<TopologyWasmOverride> for TopologyWasm {
@@ -294,6 +305,12 @@ impl From<TopologyWasmOverride> for TopologyWasm {
minimum_gateway_performance: value
.minimum_gateway_performance
.unwrap_or(def.minimum_gateway_performance),
use_extended_topology: value
.use_extended_topology
.unwrap_or(def.use_extended_topology),
ignore_egress_epoch_role: value
.ignore_egress_epoch_role
.unwrap_or(def.ignore_egress_epoch_role),
}
}
}
+13 -7
View File
@@ -15,7 +15,8 @@ use nym_client_core::init::{
};
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_topology::{gateway, NymTopology, SerializableNymTopology};
use nym_topology::wasm_helpers::WasmFriendlyNymTopology;
use nym_topology::{NymTopology, RoutingNode};
use nym_validator_client::client::IdentityKey;
use nym_validator_client::NymApiClient;
use rand::thread_rng;
@@ -55,7 +56,7 @@ pub fn parse_sender_tag(tag: &str) -> Result<AnonymousSenderTag, WasmCoreError>
pub async fn current_network_topology_async(
nym_api_url: String,
) -> Result<SerializableNymTopology, WasmCoreError> {
) -> Result<WasmFriendlyNymTopology, WasmCoreError> {
let url: Url = match nym_api_url.parse() {
Ok(url) => url,
Err(source) => {
@@ -67,12 +68,17 @@ pub async fn current_network_topology_async(
};
let api_client = NymApiClient::new(url);
let rewarded_set = api_client.get_current_rewarded_set().await?;
let mixnodes = api_client
.get_all_basic_active_mixing_assigned_nodes()
.await?;
let gateways = api_client.get_all_basic_entry_assigned_nodes().await?;
Ok(NymTopology::from_basic(&mixnodes, &gateways).into())
let mut topology = NymTopology::new_empty(rewarded_set);
topology.add_skimmed_nodes(&mixnodes);
topology.add_skimmed_nodes(&gateways);
Ok(topology.into())
}
#[wasm_bindgen(js_name = "currentNetworkTopology")]
@@ -90,7 +96,7 @@ pub async fn setup_gateway_wasm(
client_store: &ClientStorage,
force_tls: bool,
chosen_gateway: Option<IdentityKey>,
gateways: &[gateway::LegacyNode],
gateways: Vec<RoutingNode>,
) -> Result<InitialisationResult, WasmCoreError> {
// TODO: so much optimization and extra features could be added here, but that's for the future
@@ -107,7 +113,7 @@ pub async fn setup_gateway_wasm(
GatewaySetup::New {
specification: selection_spec,
available_gateways: gateways.to_vec(),
available_gateways: gateways,
}
};
@@ -125,7 +131,7 @@ pub async fn setup_gateway_from_api(
) -> Result<InitialisationResult, WasmCoreError> {
let mut rng = thread_rng();
let gateways = current_gateways(&mut rng, nym_apis, None, minimum_performance).await?;
setup_gateway_wasm(client_store, force_tls, chosen_gateway, &gateways).await
setup_gateway_wasm(client_store, force_tls, chosen_gateway, gateways).await
}
pub async fn setup_from_topology(
@@ -134,6 +140,6 @@ pub async fn setup_from_topology(
topology: &NymTopology,
client_store: &ClientStorage,
) -> Result<InitialisationResult, WasmCoreError> {
let gateways = topology.gateways();
let gateways = topology.entry_capable_nodes().cloned().collect::<Vec<_>>();
setup_gateway_wasm(client_store, force_tls, explicit_gateway, gateways).await
}
+8 -33
View File
@@ -1,51 +1,26 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_topology::SerializableTopologyError;
use nym_topology::wasm_helpers::SerializableTopologyError;
use nym_validator_client::client::IdentityKeyRef;
use wasm_utils::console_log;
pub use nym_topology::{
gateway, mix, SerializableGateway, SerializableMixNode, SerializableNymTopology,
};
pub use nym_topology::wasm_helpers::{WasmFriendlyNymTopology, WasmFriendlyRoutingNode};
pub use nym_topology::{Role, RoutingNode};
// redeclare this as a type alias for easy of use
pub type WasmTopologyError = SerializableTopologyError;
// helper trait to define extra functionality on the external type that we used to have here before
pub trait SerializableTopologyExt {
fn print(&self);
// fn print(&self);
fn ensure_contains_gateway_id(&self, gateway_id: IdentityKeyRef) -> bool;
}
impl SerializableTopologyExt for SerializableNymTopology {
fn print(&self) {
if !self.mixnodes.is_empty() {
console_log!("mixnodes:");
for (layer, nodes) in &self.mixnodes {
console_log!("\tlayer {layer}:");
for node in nodes {
// console_log!("\t\t{} - {}", node.mix_id, node.identity_key)
console_log!("\t\t{} - {:?}", node.mix_id, node)
}
}
} else {
console_log!("NO MIXNODES")
}
if !self.gateways.is_empty() {
console_log!("gateways:");
for gateway in &self.gateways {
// console_log!("\t{}", gateway.identity_key)
console_log!("\t{:?}", gateway)
}
} else {
console_log!("NO GATEWAYS")
}
}
impl SerializableTopologyExt for WasmFriendlyNymTopology {
fn ensure_contains_gateway_id(&self, gateway_id: IdentityKeyRef) -> bool {
self.gateways.iter().any(|g| g.identity_key == gateway_id)
self.node_details
.values()
.any(|node| node.identity_key == gateway_id)
}
}

Some files were not shown because too many files have changed in this diff Show More