Compare commits

...

75 Commits

Author SHA1 Message Date
jmwample 0be860268f compile time config with some type troubles 2025-01-14 12:46:41 -07:00
mfahampshire 11d6ee2fdb update links readme (#5323) 2025-01-09 14:44:45 +00:00
mfahampshire d704c428fc update landing page colour highlight (#5322) 2025-01-09 14:44:21 +00:00
import this bca070c1bd [DOCs]: Readiness for nym-dot-com (#5319)
* url rewrites and redirects

* url rewrites and redirects
2025-01-09 14:44:12 +00:00
Jędrzej Stuczyński 226c040a13 feature: periodically remove stale gateway messages (#5312)
* add timestamp to stored client messages

* removed dead code

* starting node task to remove old messages

* added log for number of removed messages

* debug log on task finishing
2025-01-09 09:03:19 +00:00
dependabot[bot] 3d2914b3e5 build(deps): bump the patch-updates group across 1 directory with 35 updates (#5310)
* build(deps): bump the patch-updates group across 1 directory with 35 updates

Bumps the patch-updates group with 33 updates in the / directory:

| Package | From | To |
| --- | --- | --- |
| [anyhow](https://github.com/dtolnay/anyhow) | `1.0.90` | `1.0.95` |
| [async-trait](https://github.com/dtolnay/async-trait) | `0.1.83` | `0.1.84` |
| [blake3](https://github.com/BLAKE3-team/BLAKE3) | `1.5.4` | `1.5.5` |
| [chrono](https://github.com/chronotope/chrono) | `0.4.38` | `0.4.39` |
| [clap](https://github.com/clap-rs/clap) | `4.5.20` | `4.5.23` |
| [clap_complete](https://github.com/clap-rs/clap) | `4.5.33` | `4.5.40` |
| [comfy-table](https://github.com/nukesor/comfy-table) | `7.1.1` | `7.1.3` |
| [console](https://github.com/console-rs/console) | `0.15.8` | `0.15.10` |
| [const_format](https://github.com/rodrimati1992/const_format_crates) | `0.2.33` | `0.2.34` |
| [csv](https://github.com/BurntSushi/rust-csv) | `1.3.0` | `1.3.1` |
| [flate2](https://github.com/rust-lang/flate2-rs) | `1.0.34` | `1.0.35` |
| [futures-util](https://github.com/rust-lang/futures-rs) | `0.3.30` | `0.3.31` |
| [hyper-util](https://github.com/hyperium/hyper-util) | `0.1.9` | `0.1.10` |
| [indicatif](https://github.com/console-rs/indicatif) | `0.17.8` | `0.17.9` |
| [moka](https://github.com/moka-rs/moka) | `0.12.8` | `0.12.10` |
| [pin-project](https://github.com/taiki-e/pin-project) | `1.1.6` | `1.1.7` |
| [pin-project-lite](https://github.com/taiki-e/pin-project-lite) | `0.2.14` | `0.2.15` |
| [quote](https://github.com/dtolnay/quote) | `1.0.37` | `1.0.38` |
| [semver](https://github.com/dtolnay/semver) | `1.0.23` | `1.0.24` |
| [serde](https://github.com/serde-rs/serde) | `1.0.215` | `1.0.217` |
| [serde_json](https://github.com/serde-rs/json) | `1.0.132` | `1.0.134` |
| [tar](https://github.com/alexcrichton/tar-rs) | `0.4.42` | `0.4.43` |
| [time](https://github.com/time-rs/time) | `0.3.36` | `0.3.37` |
| [tokio-stream](https://github.com/tokio-rs/tokio) | `0.1.16` | `0.1.17` |
| [tokio-util](https://github.com/tokio-rs/tokio) | `0.7.12` | `0.7.13` |
| [toml](https://github.com/toml-rs/toml) | `0.8.14` | `0.8.19` |
| [tracing](https://github.com/tokio-rs/tracing) | `0.1.40` | `0.1.41` |
| [tracing-subscriber](https://github.com/tokio-rs/tracing) | `0.3.18` | `0.3.19` |
| [url](https://github.com/servo/rust-url) | `2.5.2` | `2.5.4` |
| [wasm-bindgen-test](https://github.com/rustwasm/wasm-bindgen) | `0.3.43` | `0.3.45` |
| [js-sys](https://github.com/rustwasm/wasm-bindgen) | `0.3.72` | `0.3.76` |
| [wasm-bindgen-futures](https://github.com/rustwasm/wasm-bindgen) | `0.4.45` | `0.4.49` |
| [env_logger](https://github.com/rust-cli/env_logger) | `0.11.5` | `0.11.6` |



Updates `anyhow` from 1.0.90 to 1.0.95
- [Release notes](https://github.com/dtolnay/anyhow/releases)
- [Commits](https://github.com/dtolnay/anyhow/compare/1.0.90...1.0.95)

Updates `async-trait` from 0.1.83 to 0.1.84
- [Release notes](https://github.com/dtolnay/async-trait/releases)
- [Commits](https://github.com/dtolnay/async-trait/compare/0.1.83...0.1.84)

Updates `blake3` from 1.5.4 to 1.5.5
- [Release notes](https://github.com/BLAKE3-team/BLAKE3/releases)
- [Commits](https://github.com/BLAKE3-team/BLAKE3/compare/1.5.4...1.5.5)

Updates `chrono` from 0.4.38 to 0.4.39
- [Release notes](https://github.com/chronotope/chrono/releases)
- [Changelog](https://github.com/chronotope/chrono/blob/main/CHANGELOG.md)
- [Commits](https://github.com/chronotope/chrono/compare/v0.4.38...v0.4.39)

Updates `clap` from 4.5.20 to 4.5.23
- [Release notes](https://github.com/clap-rs/clap/releases)
- [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md)
- [Commits](https://github.com/clap-rs/clap/compare/clap_complete-v4.5.20...clap_complete-v4.5.23)

Updates `clap_complete` from 4.5.33 to 4.5.40
- [Release notes](https://github.com/clap-rs/clap/releases)
- [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md)
- [Commits](https://github.com/clap-rs/clap/compare/clap_complete-v4.5.33...clap_complete-v4.5.40)

Updates `comfy-table` from 7.1.1 to 7.1.3
- [Release notes](https://github.com/nukesor/comfy-table/releases)
- [Changelog](https://github.com/Nukesor/comfy-table/blob/main/CHANGELOG.md)
- [Commits](https://github.com/nukesor/comfy-table/compare/v7.1.1...v7.1.3)

Updates `console` from 0.15.8 to 0.15.10
- [Release notes](https://github.com/console-rs/console/releases)
- [Changelog](https://github.com/console-rs/console/blob/main/CHANGELOG.md)
- [Commits](https://github.com/console-rs/console/compare/0.15.8...0.15.10)

Updates `const_format` from 0.2.33 to 0.2.34
- [Release notes](https://github.com/rodrimati1992/const_format_crates/releases)
- [Changelog](https://github.com/rodrimati1992/const_format_crates/blob/master/Changelog.md)
- [Commits](https://github.com/rodrimati1992/const_format_crates/commits/0.2.34)

Updates `csv` from 1.3.0 to 1.3.1
- [Commits](https://github.com/BurntSushi/rust-csv/compare/1.3.0...1.3.1)

Updates `flate2` from 1.0.34 to 1.0.35
- [Release notes](https://github.com/rust-lang/flate2-rs/releases)
- [Changelog](https://github.com/rust-lang/flate2-rs/blob/main/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/flate2-rs/compare/1.0.34...1.0.35)

Updates `futures-util` from 0.3.30 to 0.3.31
- [Release notes](https://github.com/rust-lang/futures-rs/releases)
- [Changelog](https://github.com/rust-lang/futures-rs/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/futures-rs/compare/0.3.30...0.3.31)

Updates `hyper-util` from 0.1.9 to 0.1.10
- [Release notes](https://github.com/hyperium/hyper-util/releases)
- [Changelog](https://github.com/hyperium/hyper-util/blob/master/CHANGELOG.md)
- [Commits](https://github.com/hyperium/hyper-util/compare/v0.1.9...v0.1.10)

Updates `indicatif` from 0.17.8 to 0.17.9
- [Release notes](https://github.com/console-rs/indicatif/releases)
- [Commits](https://github.com/console-rs/indicatif/compare/0.17.8...0.17.9)

Updates `moka` from 0.12.8 to 0.12.10
- [Changelog](https://github.com/moka-rs/moka/blob/main/CHANGELOG.md)
- [Commits](https://github.com/moka-rs/moka/compare/v0.12.8...v0.12.10)

Updates `pin-project` from 1.1.6 to 1.1.7
- [Release notes](https://github.com/taiki-e/pin-project/releases)
- [Changelog](https://github.com/taiki-e/pin-project/blob/main/CHANGELOG.md)
- [Commits](https://github.com/taiki-e/pin-project/compare/v1.1.6...v1.1.7)

Updates `pin-project-lite` from 0.2.14 to 0.2.15
- [Release notes](https://github.com/taiki-e/pin-project-lite/releases)
- [Changelog](https://github.com/taiki-e/pin-project-lite/blob/main/CHANGELOG.md)
- [Commits](https://github.com/taiki-e/pin-project-lite/compare/v0.2.14...v0.2.15)

Updates `quote` from 1.0.37 to 1.0.38
- [Release notes](https://github.com/dtolnay/quote/releases)
- [Commits](https://github.com/dtolnay/quote/compare/1.0.37...1.0.38)

Updates `semver` from 1.0.23 to 1.0.24
- [Release notes](https://github.com/dtolnay/semver/releases)
- [Commits](https://github.com/dtolnay/semver/compare/1.0.23...1.0.24)

Updates `serde` from 1.0.215 to 1.0.217
- [Release notes](https://github.com/serde-rs/serde/releases)
- [Commits](https://github.com/serde-rs/serde/compare/v1.0.215...v1.0.217)

Updates `serde_derive` from 1.0.215 to 1.0.217
- [Release notes](https://github.com/serde-rs/serde/releases)
- [Commits](https://github.com/serde-rs/serde/compare/v1.0.215...v1.0.217)

Updates `serde_json` from 1.0.132 to 1.0.134
- [Release notes](https://github.com/serde-rs/json/releases)
- [Commits](https://github.com/serde-rs/json/compare/v1.0.132...v1.0.134)

Updates `tar` from 0.4.42 to 0.4.43
- [Commits](https://github.com/alexcrichton/tar-rs/compare/0.4.42...0.4.43)

Updates `time` from 0.3.36 to 0.3.37
- [Release notes](https://github.com/time-rs/time/releases)
- [Changelog](https://github.com/time-rs/time/blob/main/CHANGELOG.md)
- [Commits](https://github.com/time-rs/time/compare/v0.3.36...v0.3.37)

Updates `tokio-stream` from 0.1.16 to 0.1.17
- [Release notes](https://github.com/tokio-rs/tokio/releases)
- [Commits](https://github.com/tokio-rs/tokio/compare/tokio-stream-0.1.16...tokio-stream-0.1.17)

Updates `tokio-util` from 0.7.12 to 0.7.13
- [Release notes](https://github.com/tokio-rs/tokio/releases)
- [Commits](https://github.com/tokio-rs/tokio/compare/tokio-util-0.7.12...tokio-util-0.7.13)

Updates `toml` from 0.8.14 to 0.8.19
- [Commits](https://github.com/toml-rs/toml/compare/toml-v0.8.14...toml-v0.8.19)

Updates `tracing` from 0.1.40 to 0.1.41
- [Release notes](https://github.com/tokio-rs/tracing/releases)
- [Commits](https://github.com/tokio-rs/tracing/compare/tracing-0.1.40...tracing-0.1.41)

Updates `tracing-subscriber` from 0.3.18 to 0.3.19
- [Release notes](https://github.com/tokio-rs/tracing/releases)
- [Commits](https://github.com/tokio-rs/tracing/compare/tracing-subscriber-0.3.18...tracing-subscriber-0.3.19)

Updates `url` from 2.5.2 to 2.5.4
- [Release notes](https://github.com/servo/rust-url/releases)
- [Commits](https://github.com/servo/rust-url/compare/v2.5.2...v2.5.4)

Updates `wasm-bindgen-test` from 0.3.43 to 0.3.45
- [Release notes](https://github.com/rustwasm/wasm-bindgen/releases)
- [Changelog](https://github.com/rustwasm/wasm-bindgen/blob/main/CHANGELOG.md)
- [Commits](https://github.com/rustwasm/wasm-bindgen/commits)

Updates `js-sys` from 0.3.72 to 0.3.76
- [Release notes](https://github.com/rustwasm/wasm-bindgen/releases)
- [Changelog](https://github.com/rustwasm/wasm-bindgen/blob/main/CHANGELOG.md)
- [Commits](https://github.com/rustwasm/wasm-bindgen/commits)

Updates `wasm-bindgen-futures` from 0.4.45 to 0.4.49
- [Release notes](https://github.com/rustwasm/wasm-bindgen/releases)
- [Changelog](https://github.com/rustwasm/wasm-bindgen/blob/main/CHANGELOG.md)
- [Commits](https://github.com/rustwasm/wasm-bindgen/commits)

Updates `web-sys` from 0.3.72 to 0.3.76
- [Release notes](https://github.com/rustwasm/wasm-bindgen/releases)
- [Changelog](https://github.com/rustwasm/wasm-bindgen/blob/main/CHANGELOG.md)
- [Commits](https://github.com/rustwasm/wasm-bindgen/commits)

Updates `env_logger` from 0.11.5 to 0.11.6
- [Release notes](https://github.com/rust-cli/env_logger/releases)
- [Changelog](https://github.com/rust-cli/env_logger/blob/main/CHANGELOG.md)
- [Commits](https://github.com/rust-cli/env_logger/compare/v0.11.5...v0.11.6)

---
updated-dependencies:
- dependency-name: anyhow
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: async-trait
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: blake3
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: chrono
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: clap
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: clap_complete
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: comfy-table
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: console
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: const_format
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: csv
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: flate2
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: futures-util
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: hyper-util
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: indicatif
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: moka
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: pin-project
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: pin-project-lite
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: quote
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: semver
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: serde
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: serde_derive
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: serde_json
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: tar
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: time
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: tokio-stream
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: tokio-util
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: toml
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: tracing
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: tracing-subscriber
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: url
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: wasm-bindgen-test
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: js-sys
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: wasm-bindgen-futures
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: web-sys
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: env_logger
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
...

Signed-off-by: dependabot[bot] <support@github.com>

* Use expect in geodata test to give error message on failure

I keep hitting this error on CI, from what I think is network hickup.
But it's hard to tell form the log since the error is swallowed.

Explicitly unwrap the result so we get a more detailed error output

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jon Häggblad <jon.haggblad@gmail.com>
2025-01-08 10:56:39 +01:00
Jon Häggblad 9b02de3e75 Use expect in geodata test to give error message on failure (#5314)
* Use expect in geodata test to give error message on failure

I keep hitting this error on CI, from what I think is network hickup.
But it's hard to tell form the log since the error is swallowed.

Explicitly unwrap the result so we get a more detailed error output

* Add nym-node-status-api to ci-build
2025-01-08 10:56:26 +01:00
benedetta davico f3d8aba82c Merge pull request #5288 from nymtech/release/2024.14-crunch-patched
Merge patched crunch to develop
2025-01-08 10:34:12 +01:00
import this 72a4a26c40 [DOCs/operators]: smooth operators (#5311)
* smooth minimum expectation

* simplify simplify

* quick fix

* feedback edits

* feedback edits

* feedback edits
2025-01-06 12:46:16 +00:00
import this 5d9b5a0d70 initialise minimum requirements page (#5304) 2025-01-06 10:50:43 +00:00
import this c070e4bfee [DOCs]: Fixes (#5299)
* correct url

* version fix
2025-01-06 10:50:31 +00:00
mfahampshire 4d3d60b78e tweak format (#5295)
* tweak format

* auto commit generated command files

* auto commit generated command files

* push components

* edit migration page (#5303)

---------

Co-authored-by: import this <97586125+serinko@users.noreply.github.com>
2024-12-23 11:51:24 +00:00
dynco-nym 41fb17a31b Extend swagger docs (#5235)
* WIP adding derive(ToSchema)

* Derive ToSchema for more types

* ContractBuildInformation on /nym_contracts_detailed

* rustfmt

* Add cfg_attr

* A bunch of annotations

* Compiles with utoipa 5.2

* WIP

* Post rebase fixes

* Gitattributes to ignore .sqlx diffs

* generate Sqlx schema files

* Improvements

* Move ecash schema out of ecash crate

* Move redocly config to nym-api/

* Move redocly config to nym-api/

* Remove ErrorResponse

* Move generated openapi spec to .gitignore

* Include BSL licence

* Remove utoipa from ecash toml file

* Remove placeholder annotations

* Chain-watcher rebase changes

* Update licence info

* Treat Scalar as String in OpenAPI
2024-12-20 12:18:45 +01:00
Jędrzej Stuczyński 7d5e3ef7d3 feature: expand nym-node prometheus metrics (#5298)
* fixed bearer auth for prometheus route

* basic prometheus metrics

* added rates on global values

* improved structure on the prometheus metrics

* added additional metrics for ingress websockets and egress mixnet connections

* some channel business metrics

* fixed metrics registration and added additional variants

* added counter for number of disk persisted packets

* counter for pending egress packets

* counter for pending egress forward packets

* clippy
2024-12-20 10:32:56 +00:00
Jon Häggblad 4f283f565c Add assignes for the root cargo ecosystem (#5297) 2024-12-20 01:16:39 +01:00
Tommy Verrall 2fab3f11b6 Merge pull request #5274 from nymtech/feature/nyx-chain-watcher
Nyx Chain Watcher
2024-12-19 17:34:36 +00:00
Sachin Kamath d0722e5f63 chain-watcher: try fix windows path 2024-12-19 21:07:50 +05:30
Sachin Kamath 64373548e4 chain-watcher: windows workaround for db path, add sqlx 2024-12-19 20:30:11 +05:30
Sachin Kamath bad85abff3 chain-watcher: bump version 2024-12-19 14:10:28 +00:00
Sachin Kamath 6e66cc2467 validator-rewarder: fix errors 2024-12-19 14:10:28 +00:00
Sachin Kamath c805aa79a4 nyx-chain-watcher: fallback to env variable when reading config 2024-12-19 14:10:28 +00:00
Mark Sinclair f5ca1ee20a Bump version 2024-12-19 14:10:28 +00:00
Sachin Kamath 4f07343efd api: fetch addresses from config. 2024-12-19 14:10:28 +00:00
Mark Sinclair 94ab78606a Bump version 2024-12-19 14:10:28 +00:00
Sachin Kamath 7b92e471c8 bugfix: dont manually set last_processed_height for pruning=nothing strat. 2024-12-19 14:10:28 +00:00
Sachin Kamath a507ffe371 chain-scraper : use tx module for parsing transactions 2024-12-19 14:10:28 +00:00
Mark Sinclair c02e93004f nyx-chain-watcher: return average price over 24 hours 2024-12-19 14:10:28 +00:00
Mark Sinclair 1113e0c599 formatting 2024-12-19 14:10:28 +00:00
Mark Sinclair 06c7394861 change webhook payload to have a structured coin for funds 2024-12-19 14:10:28 +00:00
Mark Sinclair e20bea9d32 bump version 2024-12-19 14:10:28 +00:00
Mark Sinclair eeea32fdca add websocket rpcs to env files 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński b06349efd0 added env variable to nuke the db 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński 98a4cb4ae8 even more logs 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński be185824b4 extra logs 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński 60e8e53f3b explicitly build websocket client in 0.37 compat mode 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński 1890367bfc allow conversion from CometBFT block subscription 2024-12-19 14:10:28 +00:00
Mark Sinclair 2b26a88d6c Bump version 2024-12-19 14:10:28 +00:00
Mark Sinclair a6f4f017c7 Bump version 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński d8a6ca48c1 implemented starting block logic inside the chain scraper itself 2024-12-19 14:10:28 +00:00
Mark Sinclair 541d46e899 Fix docker entry point and bump version 2024-12-19 14:10:28 +00:00
Mark Sinclair 39f525e88e Add Dockerfile and workflow to build 2024-12-19 14:10:28 +00:00
Mark Sinclair 156e892baa parse message index and process all log entries 2024-12-19 14:10:28 +00:00
Mark Sinclair 5b6ae39dab init saves example config 2024-12-19 14:10:28 +00:00
Mark Sinclair df004f834f Add example to README 2024-12-19 14:10:28 +00:00
Mark Sinclair 235165171b Remove migration from seed app 2024-12-19 14:10:28 +00:00
Mark Sinclair 572875058d Add config, overrides and CLI 2024-12-19 14:10:28 +00:00
Mark Sinclair cf6f437187 Move nym-data-observatory (v0) to nyx-chain-watcher 2024-12-19 14:10:28 +00:00
Mark Sinclair 6010de978d data-observatory: renamed transactions to payments because there is already transaction in the base scraper schema 2024-12-19 14:10:28 +00:00
Mark Sinclair d951ea9548 nyxd-scraper: add optional starting height parameter to scrape before listening for new blocks 2024-12-19 14:10:28 +00:00
Sachin Kamath 868d7439ec observatory 0.1 2024-12-19 14:10:28 +00:00
Sachin Kamath a884aee1e9 fix review comments 2024-12-19 14:10:28 +00:00
Sachin Kamath 80f965a104 clippy 2024-12-19 14:10:28 +00:00
Sachin Kamath c99a240ed4 nyxd-scraper: add config to make pre-commit storage optional 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński 67976b1b30 feature: wireguard metrics (#5278)
* experimental log

* introduce wireguard metrics updates

* add wireguard traffic rates to console logger

* missing import

* changed order of displayed values

* expose bytes information via rest endpoint

* clippy
2024-12-19 10:49:56 +00:00
Jędrzej Stuczyński a2322d6cdf feature: nym topology revamp (#5271)
* revamped NymTopology

* wip

* working e2e client

* updated nym-api

* updated nym-node

* updated rest of non-test code

* updated the rest of the codebase

* additional tweaks

* linux clippy fixes + adding additional dummy ipr types for better linting on non-linux targets
2024-12-19 10:44:34 +00:00
Jędrzej Stuczyński ae346bb75b bugfix: remove unnecessary arguments for nym-api swagger endpoints (#5272)
* removed incorrect body argument for '/rewarded-set' endpoint

* removed incorrect pagination parameters for monitor run results
2024-12-19 10:42:52 +00:00
Jon Häggblad 53c28af847 Add close to credential storage (#5283) (#5293)
* Add close method to credential storage

* wip
2024-12-18 21:51:00 +01:00
Bogdan-Ștefan Neacşu 3521f36374 Include IPINFO_API_TOKEN in nightly CI (#5285)
* Include IPINFO_API_TOKEN in nightly CI

* Fix beta clippy
2024-12-18 16:46:28 +02:00
Bogdan-Ștefan Neacşu 3695332036 Move tun constants to network defaults (#5286) 2024-12-18 15:03:21 +02:00
Jon Häggblad d03302c391 http-api-client: deduplicate code (#5267)
* Deduplicate code

* Remove unneeded async
2024-12-18 12:36:10 +01:00
mfahampshire cd86110b2c Max/crunch patch docs (#5284)
* patch changelog done

* auto commit generated command files
2024-12-18 10:37:45 +00:00
benedetta davico 8d5a41a790 Merge pull request #5277 from nymtech/feature/modify_changelog
Modify CHANGELOG
2024-12-18 11:07:49 +01:00
Mark Sinclair ad0c135d4c Bump credential proxy version 2024-12-17 20:35:42 +00:00
Bogdan-Ștefan Neacşu 039b05cf7e Modify CHANGELOG 2024-12-17 18:59:49 +02:00
benedetta davico 37b10b59aa update changelog for nym-node v1.2.1 2024-12-17 17:54:18 +01:00
benedetta davico a9ede22bbd update nym-node version 2024-12-17 17:41:12 +01:00
Bogdan-Ștefan Neacşu b656003306 Expect that previously regitrated clients don't have v6 addr 2024-12-17 16:59:01 +02:00
Bogdan-Ștefan Neacşu 61e872f033 Add windows to CI builds (#5269)
* Add windows to CI builds

* Fix win build for node status api

* Fix win build for sdk

* Fix win build for cred proxy
2024-12-17 15:18:11 +02:00
dynco-nym b4f51baf94 Change sqlite journal mode to WAL (#5213)
* Change sqlite journal mode to WAL

* Synchronous mode & auto vacuum

* Bump probe git ref to 1.1.0
2024-12-16 16:40:02 +01:00
Drazen Urch a3f3d83c1b Shipping raw metrics to PG (#5216)
* Shipping raw metrics to PG

* Put cancel token back in its place

* fmt
2024-12-16 16:19:37 +01:00
Drazen Urch 84d7004cb2 Add control messages to GatewayTransciver (#5247)
* Add control messages to GatewayTransciver

* Add forget me flag to clients

* CI gate IPIINFO test

* Handle ForgetMe for client and stats db

* fmt
2024-12-16 15:18:04 +01:00
import this be063a36eb syntax hotfix (#5266) 2024-12-16 13:17:38 +00:00
windy-ux 0a712b9fce Fix/web 615 seo setup (#5265)
* + add header into Packet Mixing docs

* + add head changes for testing

* / updated version of metatags in theme.config

* + add env file

* / theme.config to use NEXT_PUBLIC_SITE_URL from env file

* @ Fix broken link in theme.config

* - remove favicon code

* + add desription for intro pages

* + add default book's desriptions

* Revert "+ add desription for intro pages"

This reverts commit 98c78242d4.
2024-12-16 13:17:25 +00:00
Bogdan-Ștefan Neacşu 88d6fb4e22 Add fd callback to client core (#5230)
* Add fd callback to client core

* Include in sdk

* Fix clippy many args

* Method in builder

* Replace Box with Arc
2024-12-16 13:57:34 +02:00
Jon Häggblad 04c2045d94 Add PATCH support to nym-http-api-client (#5260) 2024-12-16 12:28:44 +01:00
495 changed files with 9985 additions and 6635 deletions
+1
View File
@@ -0,0 +1 @@
nym-validator-rewarder/.sqlx/** diff=nodiff
+2
View File
@@ -31,3 +31,5 @@ updates:
update-types:
- "patch"
open-pull-requests-limit: 10
assignees:
- "octol"
+8 -7
View File
@@ -9,15 +9,16 @@ on:
- 'gateway/**'
- 'integrations/**'
- 'mixnode/**'
- 'sdk/rust/**'
- 'sdk/lib/**'
- 'service-providers/**'
- 'nym-network-monitor/**'
- 'nym-api/**'
- 'nym-node/**'
- 'nym-outfox/**'
- 'nym-data-observatory/**'
- 'nym-network-monitor/**'
- 'nym-node/**'
- 'nym-node-status-api/**'
- 'nym-outfox/**'
- 'nym-validator-rewarder/**'
- 'sdk/lib/**'
- 'sdk/rust/**'
- 'service-providers/**'
- 'tools/**'
- 'wasm/**'
- 'Cargo.toml'
@@ -30,7 +31,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ arc-ubuntu-20.04, custom-runner-mac-m1 ]
os: [ arc-ubuntu-20.04, custom-windows-11, custom-runner-mac-m1 ]
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
+1
View File
@@ -15,6 +15,7 @@ jobs:
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
IPINFO_API_TOKEN: ${{ secrets.IPINFO_API_TOKEN }}
continue-on-error: true
steps:
- name: Check out repository code
+8 -1
View File
@@ -51,4 +51,11 @@ ppa-private-key.b64
ppa-private-key.asc
nym-network-monitor/topology.json
nym-network-monitor/__pycache__
nym-network-monitor/*.key
nym-network-monitor/*.key
nym-network-monitor/.envrc
nym-network-monitor/.envrc
nym-api/redocly/formatted-openapi.json
*.sqlite
.build
+5
View File
@@ -4,6 +4,11 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2024.14-crunch-patched] (2024-12-17)
- Fixes an issue to allow previously registred clients to connect to latest nym-nodes
- Fixes compatibility issues between nym-nodes and older clients
## [2024.14-crunch] (2024-12-11)
- Merge/release/2024.14-crunch ([#5242])
Generated
+796 -395
View File
File diff suppressed because it is too large Load Diff
+34 -32
View File
@@ -118,8 +118,8 @@ members = [
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-data-observatory",
"nym-network-monitor",
"nyx-chain-watcher",
"nym-node",
"nym-node/nym-node-requests",
"nym-node/nym-node-metrics",
@@ -147,7 +147,9 @@ members = [
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract", "common/verloc", "tools/internal/mixnet-connectivity-check",
"tools/internal/testnet-manager/dkg-bypass-contract",
"common/verloc",
"tools/internal/mixnet-connectivity-check",
]
default-members = [
@@ -156,11 +158,11 @@ default-members = [
"explorer-api",
"nym-api",
"nym-credential-proxy/nym-credential-proxy",
"nym-data-observatory",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
@@ -191,9 +193,9 @@ aes = "0.8.1"
aes-gcm = "0.10.1"
aes-gcm-siv = "0.11.1"
aead = "0.5.2"
anyhow = "1.0.90"
anyhow = "1.0.95"
argon2 = "0.5.0"
async-trait = "0.1.83"
async-trait = "0.1.84"
axum-client-ip = "0.6.1"
axum = "0.7.5"
axum-extra = "0.9.4"
@@ -202,7 +204,7 @@ bincode = "1.3.3"
bip39 = { version = "2.0.0", features = ["zeroize"] }
bit-vec = "0.7.0" # can we unify those?
bitvec = "1.0.0"
blake3 = "1.5.4"
blake3 = "1.5.5"
bloomfilter = "1.0.14"
bs58 = "0.5.1"
bytecodec = "0.4.15"
@@ -212,20 +214,20 @@ celes = "2.4.0"
cfg-if = "1.0.0"
chacha20 = "0.9.0"
chacha20poly1305 = "0.10.1"
chrono = "0.4.31"
chrono = "0.4.39"
cipher = "0.4.3"
clap = "4.5.20"
clap = "4.5.23"
clap_complete = "4.5"
clap_complete_fig = "4.5"
colored = "2.0"
comfy-table = "7.1.1"
console = "0.15.8"
comfy-table = "7.1.3"
console = "0.15.10"
console-subscriber = "0.1.1"
console_error_panic_hook = "0.1"
const-str = "0.5.6"
const_format = "0.2.33"
const_format = "0.2.34"
criterion = "0.4"
csv = "1.3.0"
csv = "1.3.1"
ctr = "0.9.1"
cupid = "0.6.1"
curve25519-dalek = "4.1"
@@ -242,7 +244,7 @@ etherparse = "0.13.0"
envy = "0.4"
eyre = "0.6.9"
fastrand = "2.1.1"
flate2 = "1.0.34"
flate2 = "1.0.35"
futures = "0.3.28"
futures-util = "0.3"
generic-array = "0.14.7"
@@ -262,7 +264,7 @@ humantime-serde = "1.1.1"
human-repr = "1.1.0"
hyper = "1.4.1"
hyper-util = "0.1"
indicatif = "0.17.8"
indicatif = "0.17.9"
inquire = "0.6.2"
ip_network = "0.4.1"
ipnetwork = "0.20"
@@ -287,7 +289,7 @@ parking_lot = "0.12.3"
pem = "0.8"
petgraph = "0.6.5"
pin-project = "1.1"
pin-project-lite = "0.2.14"
pin-project-lite = "0.2.15"
pretty_env_logger = "0.4.0"
publicsuffix = "2.2.3"
quote = "1"
@@ -305,11 +307,11 @@ rocket_cors = "0.6.0"
rocket_okapi = "0.8.0"
safer-ffi = "0.1.13"
schemars = "0.8.21"
semver = "1.0.23"
serde = "1.0.211"
semver = "1.0.24"
serde = "1.0.217"
serde_bytes = "0.11.15"
serde_derive = "1.0"
serde_json = "1.0.132"
serde_json = "1.0.134"
serde_json_path = "0.7.1"
serde_repr = "0.1"
serde_with = "3.9.0"
@@ -324,34 +326,34 @@ subtle-encoding = "0.5"
syn = "1"
sysinfo = "0.30.13"
tap = "1.0.1"
tar = "0.4.42"
tar = "0.4.43"
tempfile = "3.14"
thiserror = "1.0.64"
time = "0.3.30"
time = "0.3.37"
tokio = "1.39"
tokio-stream = "0.1.16"
tokio-stream = "0.1.17"
tokio-test = "0.4.4"
tokio-tun = "0.11.5"
tokio-tungstenite = { version = "0.20.1" }
tokio-util = "0.7.12"
toml = "0.8.14"
tokio-util = "0.7.13"
toml = "0.8.19"
tower = "0.4.13"
tower-http = "0.5.2"
tracing = "0.1.37"
tracing = "0.1.41"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = "0.3.16"
tracing-subscriber = "0.3.19"
tracing-tree = "0.2.2"
tracing-log = "0.2"
ts-rs = "10.0.0"
tungstenite = { version = "0.20.1", default-features = false }
url = "2.5"
utoipa = "4.2"
utoipa-swagger-ui = "7.1"
utoipauto = "0.1"
utoipa = "5.2"
utoipa-swagger-ui = "8.0"
utoipauto = "0.2"
uuid = "*"
vergen = { version = "=8.3.1", default-features = false }
walkdir = "2"
wasm-bindgen-test = "0.3.43"
wasm-bindgen-test = "0.3.45"
x25519-dalek = "2.0.0"
zeroize = "1.6.0"
@@ -400,11 +402,11 @@ gloo-net = "0.5.0"
# this is blocked until the upstream removes outdates `wasm_bindgen` feature usage
# indexed_db_futures = "0.4.1"
indexed_db_futures = { git = "https://github.com/TiemenSch/rust-indexed-db", branch = "update-uuid" }
js-sys = "0.3.70"
js-sys = "0.3.76"
serde-wasm-bindgen = "0.6.5"
tsify = "0.4.5"
wasm-bindgen = "0.2.95"
wasm-bindgen-futures = "0.4.45"
wasm-bindgen = "0.2.99"
wasm-bindgen-futures = "0.4.49"
wasmtimer = "0.2.0"
web-sys = "0.3.72"
+23
View File
@@ -0,0 +1,23 @@
Boost Software License - Version 1.0 - August 17th, 2003
Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:
The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
+6 -6
View File
@@ -14,7 +14,7 @@ The platform is composed of multiple Rust crates. Top-level executable binary cr
* `nym-socks5-client` - a Socks5 proxy you can run on your machine and use with existing applications.
* `nym-explorer` - a (projected) block explorer and (existing) mixnet viewer.
* `nym-wallet` - a desktop wallet implemented using the [Tauri](https://tauri.studio/en/docs/about/intro) framework.
* `nym-cli` - a tool for interacting with the network from the CLI.
* `nym-cli` - a tool for interacting with the network from the CLI.
<!-- coming soon
* `nym-network-monitor` - sends packets through the full system to check that they are working as expected, and stores node uptime histories as the basis of a rewards system ("mixmining" or "proof-of-mixing").
-->
@@ -42,10 +42,10 @@ client ───► Gateway ──┘ mix │ mix ┌─►mix ───►
References for developers:
* [Dev Docs](https://nymtech.net/docs/developers)
* [SDKs](https://nymtech.net/docs/developers/rust)
* [Network Docs](https://nymtech.net/docs/network)
* [Release Cycle - git flow](https://nymtech.net/docs/operators/release-cycle)
* [Dev Docs](https://nym.com/docs/developers)
* [SDKs](https://nym.com/docs/developers/rust)
* [Network Docs](https://nym.com/docs/network)
* [Release Cycle - git flow](https://nym.com/docs/operators/release-cycle)
### Developer chat
@@ -66,4 +66,4 @@ As a general approach, licensing is as follows this pattern:
- libraries and components are Apache 2.0 or MIT
- documentation is Apache 2.0 or CC0-1.0
Nym Node Operators and Validators Temrs and Conditions can be found [here](https://nymtech.net/terms-and-conditions/operators/v1.0.0).
Nym Node Operators and Validators Temrs and Conditions can be found [here](https://nym.com/terms-and-conditions/operators/v1.0.0).
+14 -3
View File
@@ -3,11 +3,18 @@ name = "nym-client-core"
version = "1.1.15"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
edition = "2021"
rust-version = "1.70"
rust-version = "1.76"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[build-dependencies]
nym-client-core-config-types = { path = "./config-types", features = [
"disk-persistence",
] }
toml.workspace = true
uneval = "0.2.4"
[dependencies]
async-trait = { workspace = true }
base64 = { workspace = true }
@@ -43,9 +50,10 @@ nym-gateway-requests = { path = "../gateway-requests" }
nym-metrics = { path = "../nym-metrics" }
nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
nym-sphinx-params = { path = "../nymsphinx/params" }
nym-statistics-common = { path = "../statistics" }
nym-pemstore = { path = "../pemstore" }
nym-topology = { path = "../topology", features = ["serializable"] }
nym-topology = { path = "../topology", features = ["persistence"] }
nym-mixnet-client = { path = "../client-libs/mixnet-client", default-features = false }
nym-validator-client = { path = "../client-libs/validator-client", default-features = false }
nym-task = { path = "../task" }
@@ -117,12 +125,15 @@ features = ["wasm-bindgen"]
[dev-dependencies]
tempfile = { workspace = true }
toml.workspace = true
[features]
default = []
default = ["enable-cfg"]
cli = ["clap", "comfy-table"]
fs-credentials-storage = ["nym-credential-storage/persistent-storage"]
fs-surb-storage = ["nym-client-core-surb-storage/fs-surb-storage"]
fs-gateways-storage = ["nym-client-core-gateways-storage/fs-gateways-storage"]
wasm = ["nym-gateway-client/wasm"]
metrics-server = []
enable-cfg = []
+109
View File
@@ -0,0 +1,109 @@
use nym_client_core_config_types::Config;
use std::io::{BufWriter, Write};
fn main() {
write_conditional_default();
println!("cargo:rerun-if-changed=build.rs");
}
#[allow(unused)]
const DEFAULT_CONFIG_FILE_NAME: &str = "nymvpn-config.toml";
const CONFIG_INIT_FN_PREAMBLE: &[u8] = br#"
pub fn new_bootstrapped<S1, S2>(id: S1, version: S2) -> Config
where
S1: Into<String>,
S2: Into<String>,
{
use nym_sphinx_params::{PacketSize, PacketType};
let mut cfg: Config = "#;
const CONFIG_INIT_FN_EPILOGUE: &[u8] = br#";
cfg.client.id = id.into();
cfg.client.version = version.into();
cfg
}"#;
// #[cfg(feature = "enable-cfg")]
// const CUSTOM_BREAK: &[u8] = br#"
// #[cfg(feature="enable-cfg")]
// "#;
// #[cfg(not(feature = "enable-cfg"))]
// const DEFAULT_BREAK: &[u8] = br#"
// #[cfg(not(feature="enable-cfg"))]
// "#;
// const CONFIG_INIT_FN_PREAMBLE: &[u8] = br#"
// impl Default for BaseClientConfig {
// fn default() -> Self {
// use config_types::Keys;
// Self("#;
// const DEFAULT_CONFIG_EPILOGUE: &[u8] = br#"
// )
// }
// }"#;
// #[cfg(feature = "enable-cfg")]
// const CUSTOM_BREAK: &[u8] = br#"
// #[cfg(feature="enable-cfg")]
// "#;
// #[cfg(not(feature = "enable-cfg"))]
// const DEFAULT_BREAK: &[u8] = br#"
// #[cfg(not(feature="enable-cfg"))]
// "#;
fn write_conditional_default() {
// creating a string with the values from our default Config object
let mut array_string = BufWriter::new(Vec::new());
array_string.write(CONFIG_INIT_FN_PREAMBLE).unwrap();
#[cfg(feature = "enable-cfg")]
write_custom_config(&mut array_string);
#[cfg(not(feature = "enable-cfg"))]
{
let default_config = Config::new("", "");
uneval::write(default_config, &mut array_string).unwrap();
}
array_string.write(CONFIG_INIT_FN_EPILOGUE).unwrap();
// write the string to a file. OUT_DIR environment variable is defined by cargo
let out_dir = std::env::var("OUT_DIR").unwrap();
let dest_path = std::path::Path::new(&out_dir).join("default.rs");
let out_str = String::from_utf8(array_string.into_inner().unwrap()).unwrap();
std::fs::write(&dest_path, out_str).unwrap();
}
#[cfg(feature = "enable-cfg")]
pub(crate) fn write_custom_config(out: impl Write) {
// allow the name of the file we draw hardcoded values from to be set by an
// environment variable at compile time.
let cfg_file_name = option_env!("NYMVPN_CONFIG_PATH").unwrap_or(DEFAULT_CONFIG_FILE_NAME);
// set reasons to rebuild
println!("cargo:rerun-if-changed={cfg_file_name}");
println!("cargo:rerun-if-env-changed=NYMVPN_CONFIG_PATH");
let path = std::path::PathBuf::from(cfg_file_name);
let cfg_file_path = if path.is_absolute() {
path
} else {
let workspace_path = std::env::var("CARGO_MANIFEST_DIR").unwrap();
std::path::Path::new(&workspace_path).join(cfg_file_name)
};
let config_str: String = std::fs::read_to_string(cfg_file_path).unwrap();
let config: Config = toml::from_str(&config_str).unwrap();
uneval::write(config, out).unwrap();
}
@@ -550,6 +550,15 @@ pub struct Topology {
/// Specifies a minimum performance of a gateway that is used on route construction.
/// This setting is only applicable when `NymApi` topology is used.
pub minimum_gateway_performance: u8,
/// Specifies whether this client should attempt to retrieve all available network nodes
/// as opposed to just active mixnodes/gateways.
/// Useless without `ignore_epoch_roles = true`
pub use_extended_topology: bool,
/// Specifies whether this client should ignore the current epoch role of the target egress node
/// when constructing the final hop packets.
pub ignore_egress_epoch_role: bool,
}
#[allow(clippy::large_enum_variant)]
@@ -586,6 +595,8 @@ impl Default for Topology {
topology_structure: TopologyStructure::default(),
minimum_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
minimum_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
use_extended_topology: false,
ignore_egress_epoch_role: false,
}
}
}
@@ -8,7 +8,10 @@ use crate::{
},
};
use log::{debug, error};
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
#[derive(Debug, Clone)]
@@ -30,6 +33,9 @@ impl StorageManager {
}
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -110,7 +116,7 @@ impl StorageManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO registered_gateway(gateway_id_bs58, registration_timestamp, gateway_type)
INSERT INTO registered_gateway(gateway_id_bs58, registration_timestamp, gateway_type)
VALUES (?, ?, ?)
"#,
registered_gateway.gateway_id_bs58,
@@ -224,7 +230,7 @@ impl StorageManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO custom_gateway_details(gateway_id_bs58, data)
INSERT INTO custom_gateway_details(gateway_id_bs58, data)
VALUES (?, ?)
"#,
custom.gateway_id_bs58,
+54
View File
@@ -0,0 +1,54 @@
[client]
version = ""
id = ""
disabled_credentials_mode = true
nyxd_urls = ["https://rpc.nymtech.net/"]
nym_api_urls = ["https://validator.nymtech.net/api/"]
[debug.traffic]
average_packet_delay = "50ms"
message_sending_average_delay = "20ms"
disable_main_poisson_packet_distribution = false
deterministic_route_selection = false
primary_packet_size = "regular"
packet_type = "mix"
[debug.cover_traffic]
loop_cover_traffic_average_delay = "200ms"
cover_traffic_primary_size_ratio = 0.7
disable_loop_cover_traffic_stream = false
[debug.gateway_connection]
gateway_response_timeout = "5m"
[debug.acknowledgements]
average_ack_delay = "50ms"
ack_wait_multiplier = 1.5
ack_wait_addition = "1s 500ms"
[debug.topology]
topology_refresh_rate = "5m"
topology_resolution_timeout = "5s"
disable_refreshing = false
max_startup_gateway_waiting_period = "1h 10m"
topology_structure = "NymApi"
minimum_mixnode_performance = 50
minimum_gateway_performance = 50
use_extended_topology = false
ignore_egress_epoch_role = false
[debug.reply_surbs]
minimum_reply_surb_storage_threshold = 10
maximum_reply_surb_storage_threshold = 200
minimum_reply_surb_request_size = 10
maximum_reply_surb_request_size = 100
maximum_allowed_reply_surb_request_size = 500
maximum_reply_surb_rerequest_waiting_period = "10s"
maximum_reply_surb_drop_waiting_period = "5m"
maximum_reply_surb_age = "12h"
maximum_reply_key_age = "1day"
[debug.stats_reporting]
enabled = true
reporting_interval = "5m"
@@ -112,7 +112,7 @@ where
source,
}
})?;
hardcoded_topology.get_gateways()
hardcoded_topology.entry_capable_nodes().cloned().collect()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::current_gateways(
@@ -128,7 +128,7 @@ where
// make sure the list of available gateways doesn't overlap the list of known gateways
let available_gateways = available_gateways
.into_iter()
.filter(|g| !registered_gateways.contains(g.identity()))
.filter(|g| !registered_gateways.contains(&g.identity()))
.collect::<Vec<_>>();
if available_gateways.is_empty() {
@@ -167,7 +167,7 @@ where
source,
}
})?;
hardcoded_topology.get_gateways()
hardcoded_topology.entry_capable_nodes().cloned().collect()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::current_gateways(
@@ -3,7 +3,6 @@
use super::received_buffer::ReceivedBufferMessage;
use super::statistics_control::StatisticsControl;
use super::topology_control::geo_aware_provider::GeoAwareTopologyProvider;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
@@ -24,7 +23,7 @@ use crate::client::replies::reply_storage::{
};
use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use crate::client::topology_control::{
nym_api_provider, TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
};
use crate::config::{Config, DebugConfig};
use crate::error::ClientCoreError;
@@ -32,7 +31,7 @@ use crate::init::{
setup_gateway,
types::{GatewaySetup, InitialisationResult},
};
use crate::{config, spawn_future};
use crate::{config, spawn_future, ForgetMe};
use futures::channel::mpsc;
use log::*;
use nym_bandwidth_controller::BandwidthController;
@@ -188,6 +187,11 @@ pub struct BaseClientBuilder<'a, C, S: MixnetClientStorage> {
user_agent: Option<UserAgent>,
setup_method: GatewaySetup,
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
forget_me: ForgetMe,
}
impl<'a, C, S> BaseClientBuilder<'a, C, S>
@@ -210,9 +214,18 @@ where
shutdown: None,
user_agent: None,
setup_method: GatewaySetup::MustLoad { gateway_id: None },
#[cfg(unix)]
connection_fd_callback: None,
forget_me: Default::default(),
}
}
#[must_use]
pub fn with_forget_me(mut self, forget_me: &ForgetMe) -> Self {
self.forget_me = forget_me.clone();
self
}
#[must_use]
pub fn with_gateway_setup(mut self, setup: GatewaySetup) -> Self {
self.setup_method = setup;
@@ -261,6 +274,15 @@ where
Ok(self)
}
#[cfg(unix)]
pub fn with_connection_fd_callback(
mut self,
callback: Arc<dyn Fn(RawFd) + Send + Sync>,
) -> Self {
self.connection_fd_callback = Some(callback);
self
}
// note: do **NOT** make this method public as its only valid usage is from within `start_base`
// because it relies on the crypto keys being already loaded
fn mix_address(details: &InitialisationResult) -> Recipient {
@@ -352,6 +374,7 @@ where
controller.start_with_shutdown(shutdown)
}
#[allow(clippy::too_many_arguments)]
async fn start_gateway_client(
config: &Config,
initialisation_result: InitialisationResult,
@@ -359,6 +382,7 @@ where
details_store: &S::GatewaysDetailsStore,
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
shutdown: TaskClient,
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
where
@@ -401,6 +425,8 @@ where
packet_router,
bandwidth_controller,
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown,
)
};
@@ -437,8 +463,8 @@ where
details_store
.upgrade_stored_remote_gateway_key(gateway_client.gateway_identity(), &updated_key)
.await.map_err(|err| {
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
})?
}
@@ -462,6 +488,7 @@ where
details_store: &S::GatewaysDetailsStore,
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
mut shutdown: TaskClient,
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
where
@@ -493,6 +520,8 @@ where
details_store,
packet_router,
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown,
)
.await?;
@@ -509,15 +538,15 @@ where
// if no custom provider was ... provided ..., create one using nym-api
custom_provider.unwrap_or_else(|| match config_topology.topology_structure {
config::TopologyStructure::NymApi => Box::new(NymApiTopologyProvider::new(
nym_api_provider::Config {
min_mixnode_performance: config_topology.minimum_mixnode_performance,
min_gateway_performance: config_topology.minimum_gateway_performance,
},
config_topology,
nym_api_urls,
user_agent,
)),
config::TopologyStructure::GeoAware(group_by) => {
Box::new(GeoAwareTopologyProvider::new(nym_api_urls, group_by))
warn!("using deprecated 'GeoAware' topology provider - this option will be removed very soon");
#[allow(deprecated)]
Box::new(crate::client::topology_control::GeoAwareTopologyProvider::new(nym_api_urls, group_by))
}
})
}
@@ -528,7 +557,7 @@ where
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
topology_config: config::Topology,
topology_accessor: TopologyAccessor,
local_gateway: &NodeIdentity,
local_gateway: NodeIdentity,
wait_for_gateway: bool,
mut shutdown: TaskClient,
) -> Result<(), ClientCoreError> {
@@ -560,7 +589,7 @@ where
};
if let Err(err) = topology_refresher
.ensure_contains_gateway(local_gateway)
.ensure_contains_routable_egress(local_gateway)
.await
{
if let Some(waiting_timeout) = gateway_wait_timeout {
@@ -615,9 +644,11 @@ where
fn start_mix_traffic_controller(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown: TaskClient,
forget_me: ForgetMe,
) -> BatchMixMessageSender {
info!("Starting mix traffic controller...");
let (mix_traffic_controller, mix_tx) = MixTrafficController::new(gateway_transceiver);
let (mix_traffic_controller, mix_tx) =
MixTrafficController::new(gateway_transceiver, forget_me);
mix_traffic_controller.start_with_shutdown(shutdown);
mix_tx
}
@@ -708,7 +739,8 @@ where
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
let shared_topology_accessor = TopologyAccessor::new();
let shared_topology_accessor =
TopologyAccessor::new(self.config.debug.topology.ignore_egress_epoch_role);
// Shutdown notifier for signalling tasks to stop
let shutdown = self
@@ -772,6 +804,8 @@ where
&details_store,
gateway_packet_router,
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
shutdown.fork("gateway_transceiver"),
)
.await?;
@@ -797,9 +831,11 @@ where
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
// The MixTrafficController then sends the actual traffic
let message_sender = Self::start_mix_traffic_controller(
gateway_transceiver,
shutdown.fork("mix_traffic_controller"),
self.forget_me,
);
// Channels that the websocket listener can use to signal downstream to the real traffic
@@ -163,6 +163,7 @@ impl LoopCoverTrafficStream<OsRng> {
// poisson delay, but is it really a problem?
let topology_permit = self.topology_access.get_read_permit().await;
// the ack is sent back to ourselves (and then ignored)
let topology_ref = match topology_permit.try_get_valid_topology_ref(
&self.our_full_destination,
Some(&self.our_full_destination),
@@ -28,7 +28,6 @@ pub enum InputMessage {
recipient: Recipient,
data: Vec<u8>,
lane: TransmissionLane,
mix_hops: Option<u8>,
},
/// Creates a message used for a duplex anonymous communication where the recipient
@@ -44,7 +43,6 @@ pub enum InputMessage {
data: Vec<u8>,
reply_surbs: u32,
lane: TransmissionLane,
mix_hops: Option<u8>,
},
/// Attempt to use our internally received and stored `ReplySurb` to send the message back
@@ -94,29 +92,6 @@ impl InputMessage {
recipient,
data,
lane,
mix_hops: None,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
} else {
message
}
}
// IMHO `new_regular` should take `mix_hops: Option<u8>` as an argument instead of creating
// this function, but that would potentially break backwards compatibility with the current API
pub fn new_regular_with_custom_hops(
recipient: Recipient,
data: Vec<u8>,
lane: TransmissionLane,
packet_type: Option<PacketType>,
mix_hops: Option<u8>,
) -> Self {
let message = InputMessage::Regular {
recipient,
data,
lane,
mix_hops,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -137,7 +112,6 @@ impl InputMessage {
data,
reply_surbs,
lane,
mix_hops: None,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -154,14 +128,12 @@ impl InputMessage {
reply_surbs: u32,
lane: TransmissionLane,
packet_type: Option<PacketType>,
mix_hops: Option<u8>,
) -> Self {
let message = InputMessage::Anonymous {
recipient,
data,
reply_surbs,
lane,
mix_hops,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -2,8 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::spawn_future;
use crate::{spawn_future, ForgetMe};
use log::*;
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
@@ -26,10 +27,14 @@ pub struct MixTrafficController {
// TODO: this is temporary work-around.
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
consecutive_gateway_failure_count: usize,
forget_me: ForgetMe,
}
impl MixTrafficController {
pub fn new<T>(gateway_transceiver: T) -> (MixTrafficController, BatchMixMessageSender)
pub fn new<T>(
gateway_transceiver: T,
forget_me: ForgetMe,
) -> (MixTrafficController, BatchMixMessageSender)
where
T: GatewayTransceiver + Send + 'static,
{
@@ -40,6 +45,7 @@ impl MixTrafficController {
gateway_transceiver: Box::new(gateway_transceiver),
mix_rx: message_receiver,
consecutive_gateway_failure_count: 0,
forget_me,
},
message_sender,
)
@@ -47,6 +53,7 @@ impl MixTrafficController {
pub fn new_dynamic(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
forget_me: ForgetMe,
) -> (MixTrafficController, BatchMixMessageSender) {
let (message_sender, message_receiver) =
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
@@ -55,6 +62,7 @@ impl MixTrafficController {
gateway_transceiver,
mix_rx: message_receiver,
consecutive_gateway_failure_count: 0,
forget_me,
},
message_sender,
)
@@ -111,7 +119,27 @@ impl MixTrafficController {
}
}
shutdown.recv_timeout().await;
if self.forget_me.any() {
log::info!("Sending forget me request to the gateway");
match self
.gateway_transceiver
.send_client_request(ClientRequest::ForgetMe {
client: self.forget_me.client(),
stats: self.forget_me.stats(),
})
.await
{
Ok(_) => {
log::info!("Successfully sent forget me request to the gateway");
}
Err(err) => {
log::error!("Failed to send forget me request to the gateway: {err}");
}
}
}
log::debug!("MixTrafficController: Exiting");
})
});
}
}
@@ -5,8 +5,10 @@ use async_trait::async_trait;
use log::{debug, error};
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::identity;
use nym_gateway_client::error::GatewayClientError;
use nym_gateway_client::GatewayClient;
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use std::fmt::Debug;
@@ -26,9 +28,14 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
}
/// This combines combines the functionalities of being able to send and receive mix packets.
#[async_trait]
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
fn gateway_identity(&self) -> identity::PublicKey;
fn ws_fd(&self) -> Option<RawFd>;
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError>;
}
/// This trait defines the functionality of sending `MixPacket` into the mixnet,
@@ -65,6 +72,7 @@ pub trait GatewayReceiver {
}
// to allow for dynamic dispatch
#[async_trait]
impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
#[inline]
fn gateway_identity(&self) -> identity::PublicKey {
@@ -73,6 +81,13 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
fn ws_fd(&self) -> Option<RawFd> {
(**self).ws_fd()
}
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
(**self).send_client_request(message).await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -91,7 +106,6 @@ impl<G: GatewaySender + ?Sized + Send> GatewaySender for Box<G> {
(**self).batch_send_mix_packets(packets).await
}
}
impl<G: GatewayReceiver + ?Sized> GatewayReceiver for Box<G> {
#[inline]
fn set_packet_router(&mut self, packet_router: PacketRouter) -> Result<(), ErasedGatewayError> {
@@ -111,6 +125,7 @@ impl<C, St> RemoteGateway<C, St> {
}
}
#[async_trait]
impl<C, St> GatewayTransceiver for RemoteGateway<C, St>
where
C: DkgQueryClient + Send + Sync,
@@ -123,6 +138,20 @@ where
fn ws_fd(&self) -> Option<RawFd> {
self.gateway_client.ws_fd()
}
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
if let Some(shared_key) = self.gateway_client.shared_key() {
self.gateway_client
.send_websocket_message(message.encrypt(&*shared_key)?)
.await?;
Ok(())
} else {
Err(GatewayClientError::ConnectionInInvalidState)
}
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -195,6 +224,7 @@ impl LocalGateway {
mod nonwasm_sealed {
use super::*;
#[async_trait]
impl GatewayTransceiver for LocalGateway {
fn gateway_identity(&self) -> identity::PublicKey {
self.local_identity
@@ -202,6 +232,13 @@ mod nonwasm_sealed {
fn ws_fd(&self) -> Option<RawFd> {
None
}
async fn send_client_request(
&mut self,
_message: ClientRequest,
) -> Result<(), GatewayClientError> {
Ok(())
}
}
#[async_trait]
@@ -269,6 +306,7 @@ impl GatewaySender for MockGateway {
}
}
#[async_trait]
impl GatewayTransceiver for MockGateway {
fn gateway_identity(&self) -> identity::PublicKey {
self.dummy_identity
@@ -276,4 +314,11 @@ impl GatewayTransceiver for MockGateway {
fn ws_fd(&self) -> Option<RawFd> {
None
}
async fn send_client_request(
&mut self,
_message: ClientRequest,
) -> Result<(), GatewayClientError> {
Ok(())
}
}
@@ -73,11 +73,10 @@ where
content: Vec<u8>,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) {
if let Err(err) = self
.message_handler
.try_send_plain_message(recipient, content, lane, packet_type, mix_hops)
.try_send_plain_message(recipient, content, lane, packet_type)
.await
{
warn!("failed to send a plain message - {err}")
@@ -91,18 +90,10 @@ where
reply_surbs: u32,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) {
if let Err(err) = self
.message_handler
.try_send_message_with_reply_surbs(
recipient,
content,
reply_surbs,
lane,
packet_type,
mix_hops,
)
.try_send_message_with_reply_surbs(recipient, content, reply_surbs, lane, packet_type)
.await
{
warn!("failed to send a repliable message - {err}")
@@ -115,9 +106,8 @@ where
recipient,
data,
lane,
mix_hops,
} => {
self.handle_plain_message(recipient, data, lane, PacketType::Mix, mix_hops)
self.handle_plain_message(recipient, data, lane, PacketType::Mix)
.await
}
InputMessage::Anonymous {
@@ -125,17 +115,9 @@ where
data,
reply_surbs,
lane,
mix_hops,
} => {
self.handle_repliable_message(
recipient,
data,
reply_surbs,
lane,
PacketType::Mix,
mix_hops,
)
.await
self.handle_repliable_message(recipient, data, reply_surbs, lane, PacketType::Mix)
.await
}
InputMessage::Reply {
recipient_tag,
@@ -153,9 +135,8 @@ where
recipient,
data,
lane,
mix_hops,
} => {
self.handle_plain_message(recipient, data, lane, packet_type, mix_hops)
self.handle_plain_message(recipient, data, lane, packet_type)
.await
}
InputMessage::Anonymous {
@@ -163,17 +144,9 @@ where
data,
reply_surbs,
lane,
mix_hops,
} => {
self.handle_repliable_message(
recipient,
data,
reply_surbs,
lane,
packet_type,
mix_hops,
)
.await
self.handle_repliable_message(recipient, data, reply_surbs, lane, packet_type)
.await
}
InputMessage::Reply {
recipient_tag,
@@ -70,7 +70,6 @@ pub(crate) struct PendingAcknowledgement {
message_chunk: Fragment,
delay: SphinxDelay,
destination: PacketDestination,
mix_hops: Option<u8>,
retransmissions: u32,
}
@@ -80,13 +79,11 @@ impl PendingAcknowledgement {
message_chunk: Fragment,
delay: SphinxDelay,
recipient: Recipient,
mix_hops: Option<u8>,
) -> Self {
PendingAcknowledgement {
message_chunk,
delay,
destination: PacketDestination::KnownRecipient(recipient.into()),
mix_hops,
retransmissions: 0,
}
}
@@ -104,9 +101,6 @@ impl PendingAcknowledgement {
recipient_tag,
extra_surb_request,
},
// Messages sent using SURBs are using the number of mix hops set by the recipient when
// they provided the SURBs, so it doesn't make sense to include it here.
mix_hops: None,
retransmissions: 0,
}
}
@@ -52,18 +52,12 @@ where
packet_recipient: Recipient,
chunk_data: Fragment,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, PreparationError> {
debug!("retransmitting normal packet...");
// TODO: Figure out retransmission packet type signaling
self.message_handler
.try_prepare_single_chunk_for_sending(
packet_recipient,
chunk_data,
packet_type,
mix_hops,
)
.try_prepare_single_chunk_for_sending(packet_recipient, chunk_data, packet_type)
.await
}
@@ -110,7 +104,6 @@ where
**recipient,
timed_out_ack.message_chunk.clone(),
packet_type,
timed_out_ack.mix_hops,
)
.await
}
@@ -15,11 +15,11 @@ use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessa
use nym_sphinx::anonymous_replies::{ReplySurb, SurbEncryptionKey};
use nym_sphinx::chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx::message::NymMessage;
use nym_sphinx::params::{PacketSize, PacketType, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
use nym_sphinx::Delay;
use nym_task::connections::TransmissionLane;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
use std::sync::Arc;
@@ -100,10 +100,6 @@ pub(crate) struct Config {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
average_ack_delay: Duration,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
/// Primary predefined packet size used for the encapsulated messages.
primary_packet_size: PacketSize,
@@ -125,19 +121,11 @@ impl Config {
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
primary_packet_size: PacketSize::default(),
secondary_packet_size: None,
}
}
/// Allows setting non-default number of expected mix hops in the network.
#[allow(dead_code)]
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
/// Allows setting non-default size of the sphinx packets sent out.
pub fn with_custom_primary_packet_size(mut self, packet_size: PacketSize) -> Self {
self.primary_packet_size = packet_size;
@@ -185,9 +173,7 @@ where
config.sender_address,
config.average_packet_delay,
config.average_ack_delay,
)
.with_mix_hops(config.num_mix_hops);
);
MessageHandler {
config,
rng,
@@ -216,7 +202,7 @@ where
fn get_topology<'a>(
&self,
permit: &'a TopologyReadPermit<'a>,
) -> Result<&'a NymTopology, PreparationError> {
) -> Result<&'a NymRouteProvider, PreparationError> {
match permit.try_get_valid_topology_ref(&self.config.sender_address, None) {
Ok(topology_ref) => Ok(topology_ref),
Err(err) => {
@@ -233,9 +219,8 @@ where
return self.config.primary_packet_size;
};
let primary_count =
msg.required_packets(self.config.primary_packet_size, self.config.num_mix_hops);
let secondary_count = msg.required_packets(secondary_packet, self.config.num_mix_hops);
let primary_count = msg.required_packets(self.config.primary_packet_size);
let secondary_count = msg.required_packets(secondary_packet);
trace!("This message would require: {primary_count} primary packets or {secondary_count} secondary packets...");
// if there would be no benefit in using the secondary packet - use the primary (duh)
@@ -424,10 +409,9 @@ where
message: Vec<u8>,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
let message = NymMessage::new_plain(message);
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type, mix_hops)
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type)
.await
}
@@ -437,7 +421,6 @@ where
recipient: Recipient,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
debug!("Sending non-reply message with packet type {packet_type}");
// TODO: I really dislike existence of this assertion, it implies code has to be re-organised
@@ -470,7 +453,6 @@ where
&self.config.ack_key,
&recipient,
packet_type,
mix_hops,
)?;
let real_message = RealMessage::new(
@@ -478,8 +460,7 @@ where
Some(fragment.fragment_identifier()),
);
let delay = prepared_fragment.total_delay;
let pending_ack =
PendingAcknowledgement::new_known(fragment, delay, recipient, mix_hops);
let pending_ack = PendingAcknowledgement::new_known(fragment, delay, recipient);
real_messages.push(real_message);
pending_acks.push(pending_ack);
@@ -496,7 +477,6 @@ where
recipient: Recipient,
amount: u32,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
debug!("Sending additional reply SURBs with packet type {packet_type}");
let sender_tag = self.get_or_create_sender_tag(&recipient);
@@ -513,7 +493,6 @@ where
recipient,
TransmissionLane::AdditionalReplySurbs,
packet_type,
mix_hops,
)
.await?;
@@ -530,7 +509,6 @@ where
num_reply_surbs: u32,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), SurbWrappedPreparationError> {
debug!("Sending message with reply SURBs with packet type {packet_type}");
let sender_tag = self.get_or_create_sender_tag(&recipient);
@@ -541,7 +519,7 @@ where
let message =
NymMessage::new_repliable(RepliableMessage::new_data(message, sender_tag, reply_surbs));
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type, mix_hops)
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type)
.await?;
log::trace!("storing {} reply keys", reply_keys.len());
@@ -555,23 +533,18 @@ where
recipient: Recipient,
chunk: Fragment,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, PreparationError> {
debug!("Sending single chunk with packet type {packet_type}");
let topology_permit = self.topology_access.get_read_permit().await;
let topology = self.get_topology(&topology_permit)?;
let prepared_fragment = self
.message_preparer
.prepare_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
&recipient,
packet_type,
mix_hops,
)
.unwrap();
let prepared_fragment = self.message_preparer.prepare_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
&recipient,
packet_type,
)?;
Ok(prepared_fragment)
}
@@ -624,16 +597,13 @@ where
Err(err) => return Err(err.return_surbs(vec![reply_surb])),
};
let prepared_fragment = self
.message_preparer
.prepare_reply_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
reply_surb,
PacketType::Mix,
)
.unwrap();
let prepared_fragment = self.message_preparer.prepare_reply_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
reply_surb,
PacketType::Mix,
)?;
Ok(prepared_fragment)
}
@@ -230,6 +230,7 @@ where
// poisson delay, but is it really a problem?
let topology_permit = self.topology_access.get_read_permit().await;
// the ack is sent back to ourselves (and then ignored)
let topology_ref = match topology_permit.try_get_valid_topology_ref(
&self.config.our_full_destination,
Some(&self.config.our_full_destination),
@@ -516,7 +516,6 @@ where
recipient,
to_send,
nym_sphinx::params::PacketType::Mix,
self.config.reply_surbs.surb_mix_hops,
)
.await
{
@@ -2,8 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::params::DEFAULT_NUM_MIX_HOPS;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError};
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -17,29 +16,36 @@ pub struct TopologyAccessorInner {
// few seconds, while reads are needed every single packet generated.
// However, proper benchmarks will be needed to determine if `RwLock` is indeed a better
// approach than a `Mutex`
topology: RwLock<Option<NymTopology>>,
topology: RwLock<NymRouteProvider>,
}
impl TopologyAccessorInner {
fn new() -> Self {
fn new(initial: NymRouteProvider) -> Self {
TopologyAccessorInner {
controlled_manually: AtomicBool::new(false),
released_manual_control: Notify::new(),
topology: RwLock::new(None),
topology: RwLock::new(initial),
}
}
async fn update(&self, new: Option<NymTopology>) {
*self.topology.write().await = new;
let mut guard = self.topology.write().await;
match new {
Some(updated) => {
guard.update(updated);
}
None => guard.clear_topology(),
}
}
}
pub struct TopologyReadPermit<'a> {
permit: RwLockReadGuard<'a, Option<NymTopology>>,
permit: RwLockReadGuard<'a, NymRouteProvider>,
}
impl Deref for TopologyReadPermit<'_> {
type Target = Option<NymTopology>;
type Target = NymRouteProvider;
fn deref(&self) -> &Self::Target {
&self.permit
@@ -53,43 +59,31 @@ impl<'a> TopologyReadPermit<'a> {
&'a self,
ack_recipient: &Recipient,
packet_recipient: Option<&Recipient>,
) -> Result<&'a NymTopology, NymTopologyError> {
) -> Result<&'a NymRouteProvider, NymTopologyError> {
let route_provider = self.permit.deref();
let topology = &route_provider.topology;
// 1. Have we managed to get anything from the refresher, i.e. have the nym-api queries gone through?
let topology = self
.permit
.as_ref()
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
topology.ensure_not_empty()?;
// 2. does it have any mixnode at all?
// 3. does it have any gateways at all?
// 4. does it have a mixnode on each layer?
topology.ensure_can_construct_path_through(DEFAULT_NUM_MIX_HOPS)?;
// 2. does the topology have a node on each mixing layer?
topology.ensure_minimally_routable()?;
// 5. does it contain OUR gateway (so that we could create an ack packet)?
if !topology.gateway_exists(ack_recipient.gateway()) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: ack_recipient.gateway().to_base58_string(),
});
}
// 3. does it contain OUR gateway (so that we could create an ack packet)?
let _ = route_provider.egress_by_identity(ack_recipient.gateway())?;
// 6. for our target recipient, does it contain THEIR gateway (so that we could create
// 4. for our target recipient, does it contain THEIR gateway (so that we send anything over?)
if let Some(recipient) = packet_recipient {
if !topology.gateway_exists(recipient.gateway()) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: recipient.gateway().to_base58_string(),
});
}
let _ = route_provider.egress_by_identity(recipient.gateway())?;
}
Ok(topology)
Ok(route_provider)
}
}
impl<'a> From<RwLockReadGuard<'a, Option<NymTopology>>> for TopologyReadPermit<'a> {
fn from(read_permit: RwLockReadGuard<'a, Option<NymTopology>>) -> Self {
TopologyReadPermit {
permit: read_permit,
}
impl<'a> From<RwLockReadGuard<'a, NymRouteProvider>> for TopologyReadPermit<'a> {
fn from(permit: RwLockReadGuard<'a, NymRouteProvider>) -> Self {
TopologyReadPermit { permit }
}
}
@@ -99,9 +93,11 @@ pub struct TopologyAccessor {
}
impl TopologyAccessor {
pub fn new() -> Self {
pub fn new(ignore_egress_epoch_roles: bool) -> Self {
TopologyAccessor {
inner: Arc::new(TopologyAccessorInner::new()),
inner: Arc::new(TopologyAccessorInner::new(NymRouteProvider::new_empty(
ignore_egress_epoch_roles,
))),
}
}
@@ -121,8 +117,21 @@ impl TopologyAccessor {
self.inner.released_manual_control.notified().await
}
#[deprecated(note = "use .current_route_provider instead")]
pub async fn current_topology(&self) -> Option<NymTopology> {
self.inner.topology.read().await.clone()
self.current_route_provider()
.await
.as_ref()
.map(|p| p.topology.clone())
}
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<NymRouteProvider>> {
let provider = self.inner.topology.read().await;
if provider.topology.is_empty() {
None
} else {
Some(provider)
}
}
pub async fn manually_change_topology(&self, new_topology: NymTopology) {
@@ -140,15 +149,11 @@ impl TopologyAccessor {
// only used by the client at startup to get a slightly more reasonable error message
// (currently displays as unused because health checker is disabled due to required changes)
pub async fn ensure_is_routable(&self) -> Result<(), NymTopologyError> {
match self.inner.topology.read().await.deref() {
None => Err(NymTopologyError::EmptyNetworkTopology),
Some(ref topology) => topology.ensure_can_construct_path_through(DEFAULT_NUM_MIX_HOPS),
}
}
}
impl Default for TopologyAccessor {
fn default() -> Self {
TopologyAccessor::new()
self.inner
.topology
.read()
.await
.topology
.ensure_minimally_routable()
}
}
@@ -3,7 +3,6 @@ use log::{debug, error};
use nym_explorer_client::{ExplorerClient, PrettyDetailedMixNodeBond};
use nym_network_defaults::var_names::EXPLORER_API;
use nym_topology::{
nym_topology_from_basic_info,
provider_trait::{async_trait, TopologyProvider},
NymTopology,
};
@@ -15,8 +14,6 @@ use url::Url;
pub use nym_country_group::CountryGroup;
const MIN_NODES_PER_LAYER: usize = 1;
fn create_explorer_client() -> Option<ExplorerClient> {
let Ok(explorer_api_url) = std::env::var(EXPLORER_API) else {
error!("Missing EXPLORER_API");
@@ -63,30 +60,20 @@ fn log_mixnode_distribution(mixnodes: &HashMap<CountryGroup, Vec<NodeId>>) {
}
fn check_layer_integrity(topology: NymTopology) -> Result<(), ()> {
let mixes = topology.mixes();
if mixes.keys().len() < 3 {
if topology.ensure_minimally_routable().is_err() {
error!("Layer is missing in topology!");
return Err(());
}
for (layer, mixnodes) in mixes {
debug!("Layer {:?} has {} mixnodes", layer, mixnodes.len());
if mixnodes.len() < MIN_NODES_PER_LAYER {
error!(
"There are only {} mixnodes in layer {:?}",
mixnodes.len(),
layer
);
return Err(());
}
}
Ok(())
}
#[deprecated(note = "use NymApiTopologyProvider instead as explorer API will soon be removed")]
pub struct GeoAwareTopologyProvider {
validator_client: nym_validator_client::client::NymApiClient,
filter_on: GroupBy,
}
#[allow(deprecated)]
impl GeoAwareTopologyProvider {
pub fn new(mut nym_api_urls: Vec<Url>, filter_on: GroupBy) -> GeoAwareTopologyProvider {
log::info!(
@@ -104,6 +91,15 @@ impl GeoAwareTopologyProvider {
}
async fn get_topology(&self) -> Option<NymTopology> {
let rewarded_set = self
.validator_client
.get_current_rewarded_set()
.await
.inspect_err(|err| error!("failed to get current rewarded set: {err}"))
.ok()?;
let mut topology = NymTopology::new_empty(rewarded_set);
let mixnodes = match self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
@@ -187,7 +183,8 @@ impl GeoAwareTopologyProvider {
.filter(|m| filtered_mixnode_ids.contains(&m.node_id))
.collect::<Vec<_>>();
let topology = nym_topology_from_basic_info(&mixnodes, &gateways);
topology.add_skimmed_nodes(&mixnodes);
topology.add_skimmed_nodes(&gateways);
// TODO: return real error type
check_layer_integrity(topology.clone()).ok()?;
@@ -196,6 +193,7 @@ impl GeoAwareTopologyProvider {
}
}
#[allow(deprecated)]
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl TopologyProvider for GeoAwareTopologyProvider {
@@ -205,6 +203,7 @@ impl TopologyProvider for GeoAwareTopologyProvider {
}
}
#[allow(deprecated)]
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl TopologyProvider for GeoAwareTopologyProvider {
@@ -19,6 +19,7 @@ mod accessor;
pub mod geo_aware_provider;
pub mod nym_api_provider;
#[allow(deprecated)]
pub use geo_aware_provider::GeoAwareTopologyProvider;
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
pub use nym_topology::provider_trait::TopologyProvider;
@@ -27,7 +28,7 @@ pub use nym_topology::provider_trait::TopologyProvider;
const MAX_FAILURE_COUNT: usize = 10;
pub struct TopologyRefresherConfig {
refresh_rate: Duration,
pub refresh_rate: Duration,
}
impl TopologyRefresherConfig {
@@ -96,28 +97,24 @@ impl TopologyRefresher {
self.topology_accessor.ensure_is_routable().await
}
pub async fn ensure_contains_gateway(
pub async fn ensure_contains_routable_egress(
&self,
gateway: &NodeIdentity,
egress: NodeIdentity,
) -> Result<(), NymTopologyError> {
let topology = self
.topology_accessor
.current_topology()
.current_route_provider()
.await
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
if !topology.gateway_exists(gateway) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: gateway.to_base58_string(),
});
}
let _ = topology.egress_by_identity(egress)?;
Ok(())
}
pub async fn wait_for_gateway(
&mut self,
gateway: &NodeIdentity,
gateway: NodeIdentity,
timeout_duration: Duration,
) -> Result<(), NymTopologyError> {
info!(
@@ -135,7 +132,7 @@ impl TopologyRefresher {
})
}
_ = self.try_refresh() => {
if self.ensure_contains_gateway(gateway).await.is_ok() {
if self.ensure_contains_routable_egress(gateway).await.is_ok() {
return Ok(())
}
info!("gateway '{gateway}' is still not online...");
@@ -4,32 +4,39 @@
use async_trait::async_trait;
use log::{debug, error, warn};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::NymTopology;
use nym_validator_client::UserAgent;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::cmp::min;
use url::Url;
// the same values as our current (10.06.24) blacklist
pub const DEFAULT_MIN_MIXNODE_PERFORMANCE: u8 = 50;
pub const DEFAULT_MIN_GATEWAY_PERFORMANCE: u8 = 50;
#[derive(Debug)]
pub struct Config {
pub min_mixnode_performance: u8,
pub min_gateway_performance: u8,
pub use_extended_topology: bool,
pub ignore_egress_epoch_role: bool,
}
impl Default for Config {
fn default() -> Self {
// old values that decided on blacklist membership
impl From<nym_client_core_config_types::Topology> for Config {
fn from(value: nym_client_core_config_types::Topology) -> Self {
Config {
min_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
min_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
min_mixnode_performance: value.minimum_mixnode_performance,
min_gateway_performance: value.minimum_gateway_performance,
use_extended_topology: value.use_extended_topology,
ignore_egress_epoch_role: value.ignore_egress_epoch_role,
}
}
}
impl Config {
// if we're using 'extended' topology, filter the nodes based on the lowest set performance
fn min_node_performance(&self) -> u8 {
min(self.min_mixnode_performance, self.min_gateway_performance)
}
}
pub struct NymApiTopologyProvider {
config: Config,
@@ -39,7 +46,11 @@ pub struct NymApiTopologyProvider {
}
impl NymApiTopologyProvider {
pub fn new(config: Config, mut nym_api_urls: Vec<Url>, user_agent: Option<UserAgent>) -> Self {
pub fn new(
config: impl Into<Config>,
mut nym_api_urls: Vec<Url>,
user_agent: Option<UserAgent>,
) -> Self {
nym_api_urls.shuffle(&mut thread_rng());
let validator_client = if let Some(user_agent) = user_agent {
@@ -52,7 +63,7 @@ impl NymApiTopologyProvider {
};
NymApiTopologyProvider {
config,
config: config.into(),
validator_client,
nym_api_urls,
currently_used_api: 0,
@@ -70,70 +81,69 @@ impl NymApiTopologyProvider {
.change_nym_api(self.nym_api_urls[self.currently_used_api].clone())
}
/// Verifies whether nodes a reasonably distributed among all mix layers.
///
/// In ideal world we would have 33% nodes on layer 1, 33% on layer 2 and 33% on layer 3.
/// However, this is a rather unrealistic expectation, instead we check whether there exists
/// a layer with more than 66% of nodes or with fewer than 15% and if so, we trigger a failure.
///
/// # Arguments
///
/// * `topology`: active topology constructed from validator api data
fn check_layer_distribution(
&self,
active_topology: &NymTopology,
) -> Result<(), NymTopologyError> {
let lower_threshold = 0.15;
let upper_threshold = 0.66;
active_topology.ensure_even_layer_distribution(lower_threshold, upper_threshold)
}
async fn get_current_compatible_topology(&mut self) -> Option<NymTopology> {
let mixnodes = match self
let rewarded_set = self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
.get_current_rewarded_set()
.await
{
Err(err) => {
error!("failed to get network mixnodes - {err}");
return None;
}
Ok(mixes) => mixes,
};
.inspect_err(|err| error!("failed to get current rewarded set: {err}"))
.ok()?;
let gateways = match self
.validator_client
.get_all_basic_entry_assigned_nodes()
.await
{
Err(err) => {
error!("failed to get network gateways - {err}");
return None;
}
Ok(gateways) => gateways,
};
let mut topology = NymTopology::new_empty(rewarded_set);
debug!(
"there are {} mixnodes and {} gateways in total (before performance filtering)",
mixnodes.len(),
gateways.len()
);
if self.config.use_extended_topology {
let all_nodes = self
.validator_client
.get_all_basic_nodes()
.await
.inspect_err(|err| error!("failed to get network nodes: {err}"))
.ok()?;
let topology = NymTopology::from_unordered(
mixnodes.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_mixnode_performance
}),
gateways.iter().filter(|g| {
g.performance.round_to_integer() >= self.config.min_gateway_performance
}),
);
if let Err(err) = self.check_layer_distribution(&topology) {
warn!("The current filtered active topology has extremely skewed layer distribution. It cannot be used: {err}");
self.use_next_nym_api();
None
debug!(
"there are {} nodes on the network (before filtering)",
all_nodes.len()
);
topology.add_additional_nodes(all_nodes.iter().filter(|n| {
n.performance.round_to_integer() >= self.config.min_node_performance()
}));
} else {
Some(topology)
// if we're not using extended topology, we're only getting active set mixnodes and gateways
let mixnodes = self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
.await
.inspect_err(|err| error!("failed to get network mixnodes: {err}"))
.ok()?;
// TODO: we really should be getting ACTIVE gateways only
let gateways = self
.validator_client
.get_all_basic_entry_assigned_nodes()
.await
.inspect_err(|err| error!("failed to get network gateways: {err}"))
.ok()?;
debug!(
"there are {} mixnodes and {} gateways in total (before performance filtering)",
mixnodes.len(),
gateways.len()
);
topology.add_additional_nodes(mixnodes.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_mixnode_performance
}));
topology.add_additional_nodes(gateways.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_gateway_performance
}));
};
if !topology.is_minimally_routable() {
error!("the current filtered active topology can't be used to construct any packets");
return None;
}
Some(topology)
}
}
@@ -142,7 +152,11 @@ impl NymApiTopologyProvider {
#[async_trait]
impl TopologyProvider for NymApiTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
self.get_current_compatible_topology().await
let Some(topology) = self.get_current_compatible_topology().await else {
self.use_next_nym_api();
return None;
};
Some(topology)
}
}
@@ -150,6 +164,10 @@ impl TopologyProvider for NymApiTopologyProvider {
#[async_trait(?Send)]
impl TopologyProvider for NymApiTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
self.get_current_compatible_topology().await
let Some(topology) = self.get_current_compatible_topology().await else {
self.use_next_nym_api();
return None;
};
Some(topology)
}
}
+37
View File
@@ -7,3 +7,40 @@ pub use nym_client_core_config_types::old::{
old_config_v1_1_33,
};
pub use nym_client_core_config_types::*;
include!(concat!(env!("OUT_DIR"), "/default.rs"));
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn dump_default() {
let cfg = Config::new("","");
println!("{}", toml::to_string(&cfg).unwrap());
}
#[test]
fn bootstrapped_config() {
#[cfg(not(feature = "enable-cfg"))]
{
let config = new_bootstrapped("id1", "v0.0.0");
assert_eq!(config.client.id, "id1");
assert_eq!(config.client.version, "v0.0.0");
assert_eq!(config.client.disabled_credentials_mode, true);
assert_eq!(config.debug.topology.use_extended_topology, false);
assert_eq!(config.debug.stats_reporting.enabled, true);
}
#[cfg(feature = "enable-cfg")]
{
let config = new_bootstrapped("id2", "v0.0.0-beta");
assert_eq!(config.client.id, "id2");
assert_eq!(config.client.version, "v0.0.0-beta");
assert_eq!(config.client.disabled_credentials_mode, true);
}
}
}
+7 -4
View File
@@ -4,8 +4,8 @@
use crate::client::mix_traffic::transceiver::ErasedGatewayError;
use nym_crypto::asymmetric::identity::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
use nym_topology::gateway::GatewayConversionError;
use nym_topology::NymTopologyError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_validator_client::ValidatorClientError;
use std::error::Error;
use std::path::PathBuf;
@@ -74,10 +74,10 @@ pub enum ClientCoreError {
#[error("the gateway id is invalid - {0}")]
UnableToCreatePublicKeyFromGatewayId(Ed25519RecoveryError),
#[error("The gateway is malformed: {source}")]
#[error("the node is malformed: {source}")]
MalformedGateway {
#[from]
source: GatewayConversionError,
source: Box<RoutingNodeError>,
},
#[error("failed to establish connection to gateway: {source}")]
@@ -159,6 +159,9 @@ pub enum ClientCoreError {
#[error("the specified gateway '{gateway}' does not support the wss protocol")]
UnsupportedWssProtocol { gateway: String },
#[error("node {id} ({identity}) does not support mixnet entry mode")]
UnsupportedEntry { id: NodeId, identity: String },
#[error(
"failed to load custom topology using path '{}'. detailed message: {source}", file_path.display()
)]
+40 -20
View File
@@ -7,7 +7,7 @@ use futures::{SinkExt, StreamExt};
use log::{debug, info, trace, warn};
use nym_crypto::asymmetric::identity;
use nym_gateway_client::GatewayClient;
use nym_topology::gateway;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::IdentityKeyRef;
use nym_validator_client::UserAgent;
use rand::{seq::SliceRandom, Rng};
@@ -15,6 +15,7 @@ use std::{sync::Arc, time::Duration};
use tungstenite::Message;
use url::Url;
use nym_topology::NodeId;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
@@ -25,7 +26,6 @@ use tokio::time::Instant;
use tokio_tungstenite::connect_async;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
#[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket;
#[cfg(target_arch = "wasm32")]
@@ -48,22 +48,30 @@ const PING_TIMEOUT: Duration = Duration::from_millis(1000);
// The abstraction that some of these helpers use
pub trait ConnectableGateway {
fn identity(&self) -> &identity::PublicKey;
fn clients_address(&self) -> String;
fn node_id(&self) -> NodeId;
fn identity(&self) -> identity::PublicKey;
fn clients_address(&self, prefer_ipv6: bool) -> Option<String>;
fn is_wss(&self) -> bool;
}
impl ConnectableGateway for gateway::LegacyNode {
fn identity(&self) -> &identity::PublicKey {
self.identity()
impl ConnectableGateway for RoutingNode {
fn node_id(&self) -> NodeId {
self.node_id
}
fn clients_address(&self) -> String {
self.clients_address()
fn identity(&self) -> identity::PublicKey {
self.identity_key
}
fn clients_address(&self, prefer_ipv6: bool) -> Option<String> {
self.ws_entry_address(prefer_ipv6)
}
fn is_wss(&self) -> bool {
self.clients_wss_port.is_some()
self.entry
.as_ref()
.map(|e| e.clients_wss_port.is_some())
.unwrap_or_default()
}
}
@@ -83,7 +91,7 @@ pub async fn current_gateways<R: Rng>(
nym_apis: &[Url],
user_agent: Option<UserAgent>,
minimum_performance: u8,
) -> Result<Vec<gateway::LegacyNode>, ClientCoreError> {
) -> Result<Vec<RoutingNode>, ClientCoreError> {
let nym_api = nym_apis
.choose(rng)
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
@@ -104,7 +112,7 @@ pub async fn current_gateways<R: Rng>(
.iter()
.filter(|g| g.performance.round_to_integer() >= minimum_performance)
.filter_map(|gateway| gateway.try_into().ok())
.collect::<Vec<gateway::LegacyNode>>();
.collect::<Vec<_>>();
log::debug!("After checking validity: {}", valid_gateways.len());
log::trace!("Valid gateways: {:#?}", valid_gateways);
@@ -134,7 +142,12 @@ async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<G>, Client
where
G: ConnectableGateway,
{
let addr = gateway.clients_address();
let Some(addr) = gateway.clients_address(false) else {
return Err(ClientCoreError::UnsupportedEntry {
id: gateway.node_id(),
identity: gateway.identity().to_string(),
});
};
trace!(
"establishing connection to {} ({addr})...",
gateway.identity(),
@@ -190,7 +203,7 @@ where
Ok(GatewayWithLatency::new(gateway, avg))
}
pub async fn choose_gateway_by_latency<'a, R: Rng, G: ConnectableGateway + Clone>(
pub async fn choose_gateway_by_latency<R: Rng, G: ConnectableGateway + Clone>(
rng: &mut R,
gateways: &[G],
must_use_tls: bool,
@@ -205,7 +218,7 @@ pub async fn choose_gateway_by_latency<'a, R: Rng, G: ConnectableGateway + Clone
let gateways_with_latency = Arc::new(tokio::sync::Mutex::new(Vec::new()));
futures::stream::iter(gateways)
.for_each_concurrent(CONCURRENT_GATEWAYS_MEASURED, |gateway| async {
let id = *gateway.identity();
let id = gateway.identity();
trace!("measuring latency to {id}...");
match measure_latency(gateway).await {
Ok(with_latency) => {
@@ -252,9 +265,9 @@ fn filter_by_tls<G: ConnectableGateway>(
pub(super) fn uniformly_random_gateway<R: Rng>(
rng: &mut R,
gateways: &[gateway::LegacyNode],
gateways: &[RoutingNode],
must_use_tls: bool,
) -> Result<gateway::LegacyNode, ClientCoreError> {
) -> Result<RoutingNode, ClientCoreError> {
filter_by_tls(gateways, must_use_tls)?
.choose(rng)
.ok_or(ClientCoreError::NoGatewaysOnNetwork)
@@ -263,9 +276,9 @@ pub(super) fn uniformly_random_gateway<R: Rng>(
pub(super) fn get_specified_gateway(
gateway_identity: IdentityKeyRef,
gateways: &[gateway::LegacyNode],
gateways: &[RoutingNode],
must_use_tls: bool,
) -> Result<gateway::LegacyNode, ClientCoreError> {
) -> Result<RoutingNode, ClientCoreError> {
log::debug!("Requesting specified gateway: {}", gateway_identity);
let user_gateway = identity::PublicKey::from_base58_string(gateway_identity)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
@@ -275,7 +288,14 @@ pub(super) fn get_specified_gateway(
.find(|gateway| gateway.identity_key == user_gateway)
.ok_or_else(|| ClientCoreError::NoGatewayWithId(gateway_identity.to_string()))?;
if must_use_tls && gateway.clients_wss_port.is_none() {
let Some(entry_details) = gateway.entry.as_ref() else {
return Err(ClientCoreError::UnsupportedEntry {
id: gateway.node_id,
identity: gateway.identity().to_string(),
});
};
if must_use_tls && entry_details.clients_wss_port.is_none() {
return Err(ClientCoreError::UnsupportedWssProtocol {
gateway: gateway_identity.to_string(),
});
+2 -2
View File
@@ -19,7 +19,7 @@ use crate::init::types::{
use nym_client_core_gateways_storage::GatewaysDetailsStore;
use nym_client_core_gateways_storage::{GatewayDetails, GatewayRegistration};
use nym_gateway_client::client::InitGatewayClient;
use nym_topology::gateway;
use nym_topology::node::RoutingNode;
use rand::rngs::OsRng;
use rand::{CryptoRng, RngCore};
use serde::Serialize;
@@ -50,7 +50,7 @@ async fn setup_new_gateway<K, D>(
key_store: &K,
details_store: &D,
selection_specification: GatewaySelectionSpecification,
available_gateways: Vec<gateway::LegacyNode>,
available_gateways: Vec<RoutingNode>,
) -> Result<InitialisationResult, ClientCoreError>
where
K: KeyStore,
+12 -5
View File
@@ -13,7 +13,7 @@ use nym_crypto::asymmetric::identity;
use nym_gateway_client::client::InitGatewayClient;
use nym_gateway_requests::shared_key::SharedGatewayKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::gateway;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::IdentityKey;
use nym_validator_client::nyxd::AccountId;
use serde::Serialize;
@@ -38,16 +38,23 @@ pub enum SelectedGateway {
impl SelectedGateway {
pub fn from_topology_node(
node: gateway::LegacyNode,
node: RoutingNode,
must_use_tls: bool,
) -> Result<Self, ClientCoreError> {
// for now, let's use 'old' behaviour, if you want to change it, you can pass it up the enum stack yourself : )
let prefer_ipv6 = false;
let gateway_listener = if must_use_tls {
node.clients_address_tls()
node.ws_entry_address_tls()
.ok_or(ClientCoreError::UnsupportedWssProtocol {
gateway: node.identity_key.to_base58_string(),
})?
} else {
node.clients_address()
node.ws_entry_address(prefer_ipv6)
.ok_or(ClientCoreError::UnsupportedEntry {
id: node.node_id,
identity: node.identity_key.to_base58_string(),
})?
};
let gateway_listener =
@@ -200,7 +207,7 @@ pub enum GatewaySetup {
specification: GatewaySelectionSpecification,
// TODO: seems to be a bit inefficient to pass them by value
available_gateways: Vec<gateway::LegacyNode>,
available_gateways: Vec<RoutingNode>,
},
ReuseConnection {
+46 -2
View File
@@ -14,8 +14,7 @@ pub mod error;
pub mod init;
pub use nym_topology::{
HardcodedTopologyProvider, NymTopology, NymTopologyError, SerializableNymTopology,
SerializableTopologyError, TopologyProvider,
HardcodedTopologyProvider, NymRouteProvider, NymTopology, NymTopologyError, TopologyProvider,
};
#[cfg(target_arch = "wasm32")]
@@ -34,3 +33,48 @@ where
{
tokio::spawn(future);
}
#[derive(Clone, Default, Debug)]
pub struct ForgetMe {
client: bool,
stats: bool,
}
impl ForgetMe {
pub fn new_all() -> Self {
Self {
client: true,
stats: true,
}
}
pub fn new_client() -> Self {
Self {
client: true,
stats: false,
}
}
pub fn new_stats() -> Self {
Self {
client: false,
stats: true,
}
}
pub fn new(client: bool, stats: bool) -> Self {
Self { client, stats }
}
pub fn any(&self) -> bool {
self.client || self.stats
}
pub fn client(&self) -> bool {
self.client
}
pub fn stats(&self) -> bool {
self.stats
}
}
@@ -9,7 +9,10 @@ use crate::backend::fs_backend::{
},
};
use log::{error, info};
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
#[derive(Debug, Clone)]
@@ -31,6 +34,9 @@ impl StorageManager {
}
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(fresh)
.disable_statement_logging();
@@ -101,6 +101,10 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
// currently unused (but populated)
negotiated_protocol: Option<u8>,
// Callback on the fd as soon as the connection has been established
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
/// Listen to shutdown messages and send notifications back to the task manager
task_client: TaskClient,
}
@@ -116,6 +120,7 @@ impl<C, St> GatewayClient<C, St> {
packet_router: PacketRouter,
bandwidth_controller: Option<BandwidthController<C, St>>,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
task_client: TaskClient,
) -> Self {
GatewayClient {
@@ -131,6 +136,8 @@ impl<C, St> GatewayClient<C, St> {
bandwidth_controller,
stats_reporter,
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
task_client,
}
}
@@ -205,6 +212,12 @@ impl<C, St> GatewayClient<C, St> {
};
self.connection = SocketState::Available(Box::new(ws_stream));
#[cfg(unix)]
if let (Some(callback), Some(fd)) = (self.connection_fd_callback.as_ref(), self.ws_fd()) {
callback.as_ref()(fd);
}
Ok(())
}
@@ -311,7 +324,7 @@ impl<C, St> GatewayClient<C, St> {
// If we want to send a message (with response), we need to have a full control over the socket,
// as we need to be able to write the request and read the subsequent response
async fn send_websocket_message(
pub async fn send_websocket_message(
&mut self,
msg: impl Into<Message>,
) -> Result<ServerResponse, GatewayClientError> {
@@ -1034,6 +1047,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
bandwidth_controller: None,
stats_reporter: ClientStatsSender::new(None),
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback: None,
task_client,
}
}
@@ -1064,6 +1079,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
bandwidth_controller,
stats_reporter,
negotiated_protocol: self.negotiated_protocol,
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback,
task_client,
}
}
+3 -1
View File
@@ -8,10 +8,12 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dashmap = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true, features = ["time"] }
tokio = { workspace = true, features = ["time", "sync"] }
tokio-util = { workspace = true, features = ["codec"], optional = true }
tokio-stream = { workspace = true }
# internal
nym-sphinx = { path = "../../nymsphinx" }
+141 -86
View File
@@ -1,21 +1,24 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2021-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::channel::mpsc;
use dashmap::DashMap;
use futures::StreamExt;
use nym_sphinx::addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::params::PacketType;
use nym_sphinx::NymPacket;
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::ops::Deref;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::sleep;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::codec::Framed;
use tracing::*;
@@ -55,11 +58,37 @@ pub trait SendWithoutResponse {
}
pub struct Client {
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
active_connections: ActiveConnections,
connections_count: Arc<AtomicUsize>,
config: Config,
}
struct ConnectionSender {
#[derive(Default, Clone)]
pub struct ActiveConnections {
inner: Arc<DashMap<NymNodeRoutingAddress, ConnectionSender>>,
}
impl ActiveConnections {
pub fn pending_packets(&self) -> usize {
self.inner
.iter()
.map(|sender| {
let max_capacity = sender.channel.max_capacity();
let capacity = sender.channel.capacity();
max_capacity - capacity
})
.sum()
}
}
impl Deref for ActiveConnections {
type Target = DashMap<NymNodeRoutingAddress, ConnectionSender>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub struct ConnectionSender {
channel: mpsc::Sender<FramedNymPacket>,
current_reconnection_attempt: Arc<AtomicU32>,
}
@@ -73,46 +102,53 @@ impl ConnectionSender {
}
}
impl Client {
pub fn new(config: Config) -> Client {
Client {
conn_new: HashMap::new(),
config,
struct ManagedConnection {
address: SocketAddr,
message_receiver: ReceiverStream<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: Arc<AtomicU32>,
}
impl ManagedConnection {
fn new(
address: SocketAddr,
message_receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: Arc<AtomicU32>,
) -> Self {
ManagedConnection {
address,
message_receiver: ReceiverStream::new(message_receiver),
connection_timeout,
current_reconnection,
}
}
async fn manage_connection(
address: SocketAddr,
receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: &AtomicU32,
) {
async fn run(self) {
let address = self.address;
let connection_fut = TcpStream::connect(address);
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to {}", address);
debug!("Managed to establish connection to {}", self.address);
// if we managed to connect, reset the reconnection count (whatever it might have been)
current_reconnection.store(0, Ordering::Release);
self.current_reconnection.store(0, Ordering::Release);
Framed::new(stream, NymCodec)
}
Err(err) => {
debug!(
"failed to establish connection to {} (err: {})",
address, err
);
debug!("failed to establish connection to {address} (err: {err})",);
return;
}
},
Err(_) => {
debug!(
"failed to connect to {} within {:?}",
address, connection_timeout
"failed to connect to {address} within {:?}",
self.connection_timeout
);
// we failed to connect - increase reconnection attempt
current_reconnection.fetch_add(1, Ordering::SeqCst);
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
return;
}
};
@@ -120,15 +156,28 @@ impl Client {
// Take whatever the receiver channel produces and put it on the connection.
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
// about neither receiver nor the connection, it doesn't matter which one gets consumed
if let Err(err) = receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {} - {err}", address);
if let Err(err) = self.message_receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {address}: {err}");
}
debug!(
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
address
"connection manager to {address} is finished. Either the connection failed or mixnet client got dropped",
);
}
}
impl Client {
pub fn new(config: Config, connections_count: Arc<AtomicUsize>) -> Client {
Client {
active_connections: Default::default(),
connections_count,
config,
}
}
pub fn active_connections(&self) -> ActiveConnections {
self.active_connections.clone()
}
/// If we're trying to reconnect, determine how long we should wait.
fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
@@ -148,7 +197,7 @@ impl Client {
}
fn make_connection(&mut self, address: NymNodeRoutingAddress, pending_packet: FramedNymPacket) {
let (mut sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
let (sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
// this CAN'T fail because we just created the channel which has a non-zero capacity
if self.config.maximum_connection_buffer_size > 0 {
@@ -156,15 +205,16 @@ impl Client {
}
// if we already tried to connect to `address` before, grab the current attempt count
let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
existing.channel = sender;
Arc::clone(&existing.current_reconnection_attempt)
} else {
let new_entry = ConnectionSender::new(sender);
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
self.conn_new.insert(address, new_entry);
current_attempt
};
let current_reconnection_attempt =
if let Some(mut existing) = self.active_connections.get_mut(&address) {
existing.channel = sender;
Arc::clone(&existing.current_reconnection_attempt)
} else {
let new_entry = ConnectionSender::new(sender);
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
self.active_connections.insert(address, new_entry);
current_attempt
};
// load the actual value.
let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
@@ -173,6 +223,7 @@ impl Client {
// copy the value before moving into another task
let initial_connection_timeout = self.config.initial_connection_timeout;
let connections_count = self.connections_count.clone();
tokio::spawn(async move {
// before executing the manager, wait for what was specified, if anything
if let Some(backoff) = backoff {
@@ -180,13 +231,16 @@ impl Client {
sleep(backoff).await;
}
Self::manage_connection(
connections_count.fetch_add(1, Ordering::SeqCst);
ManagedConnection::new(
address.into(),
receiver,
initial_connection_timeout,
&current_reconnection_attempt,
current_reconnection_attempt,
)
.await
.run()
.await;
connections_count.fetch_sub(1, Ordering::SeqCst);
});
}
}
@@ -201,49 +255,47 @@ impl SendWithoutResponse for Client {
trace!("Sending packet to {address:?}");
let framed_packet = FramedNymPacket::new(packet, packet_type);
if let Some(sender) = self.conn_new.get_mut(&address) {
if let Err(err) = sender.channel.try_send(framed_packet) {
if err.is_full() {
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
Err(io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
))
} else if err.is_disconnected() {
debug!(
"Connection to {} seems to be dead. attempting to re-establish it...",
address
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, err.into_inner());
Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
"reconnection attempt is in progress",
))
} else {
// this can't really happen, but let's safe-guard against it in case something changes in futures library
Err(io::Error::new(
io::ErrorKind::Other,
"unknown connection buffer error",
))
}
} else {
Ok(())
}
} else {
let Some(sender) = self.active_connections.get_mut(&address) else {
// there was never a connection to begin with
debug!("establishing initial connection to {}", address);
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
self.make_connection(address, framed_packet);
Err(io::Error::new(
return Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection is in progress",
))
}
));
};
let sending_res = sender.channel.try_send(framed_packet);
drop(sender);
sending_res.map_err(|err| {
match err {
TrySendError::Full(_) => {
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
)
}
TrySendError::Closed(dropped) => {
debug!(
"Connection to {address} seems to be dead. attempting to re-establish it...",
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, dropped);
io::Error::new(
io::ErrorKind::ConnectionAborted,
"reconnection attempt is in progress",
)
}
}
} )
}
}
@@ -252,12 +304,15 @@ mod tests {
use super::*;
fn dummy_client() -> Client {
Client::new(Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
})
Client::new(
Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
},
Default::default(),
)
}
#[test]
@@ -32,10 +32,10 @@ use time::Date;
use url::Url;
pub use crate::nym_api::NymApiClientExt;
use nym_mixnet_contract_common::EpochRewardedSet;
pub use nym_mixnet_contract_common::{
mixnode::MixNodeDetails, GatewayBond, IdentityKey, IdentityKeyRef, NodeId, NymNodeDetails,
};
// re-export the type to not break existing imports
pub use crate::coconut::EcashApiClient;
@@ -367,6 +367,10 @@ impl NymApiClient {
Ok(self.nym_api.get_basic_gateways().await?.nodes)
}
pub async fn get_current_rewarded_set(&self) -> Result<EpochRewardedSet, ValidatorClientError> {
Ok(self.nym_api.get_rewarded_set().await?.into())
}
/// retrieve basic information for nodes are capable of operating as an entry gateway
/// this includes legacy gateways and nym-nodes
pub async fn get_all_basic_entry_assigned_nodes(
@@ -13,7 +13,7 @@ use nym_api_requests::ecash::models::{
use nym_api_requests::ecash::VerificationKeyResponse;
use nym_api_requests::models::{
AnnotationResponse, ApiHealthResponse, LegacyDescribedMixNode, NodePerformanceResponse,
NodeRefreshBody, NymNodeDescription,
NodeRefreshBody, NymNodeDescription, RewardedSetResponse,
};
use nym_api_requests::nym_nodes::PaginatedCachedNodesResponse;
use nym_api_requests::pagination::PaginatedResponse;
@@ -235,6 +235,15 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_set(&self) -> Result<RewardedSetResponse, NymAPIError> {
self.get_json(
&[routes::API_VERSION, "nym-nodes", "rewarded-set"],
NO_PARAMS,
)
.await
}
/// retrieve basic information for nodes are capable of operating as an entry gateway
/// this includes legacy gateways and nym-nodes
#[instrument(level = "debug", skip(self))]
@@ -912,6 +921,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn force_refresh_describe_cache(
&self,
request: &NodeRefreshBody,
@@ -924,6 +934,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_ticketbooks_for(
&self,
expiration_date: Date,
@@ -940,6 +951,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_ticketbooks_challenge(
&self,
expiration_date: Date,
@@ -26,10 +26,10 @@ use nym_mixnet_contract_common::{
reward_params::{Performance, RewardingParams},
rewarding::{EstimatedCurrentEpochRewardResponse, PendingRewardResponse},
ContractBuildInformation, ContractState, ContractStateParams, CurrentIntervalResponse,
CurrentNymNodeVersionResponse, Delegation, EpochEventId, EpochStatus, GatewayBond,
GatewayBondResponse, GatewayOwnershipResponse, HistoricalNymNodeVersionEntry, IdentityKey,
IdentityKeyRef, IntervalEventId, MixNodeBond, MixNodeDetails, MixOwnershipResponse,
MixnodeDetailsByIdentityResponse, MixnodeDetailsResponse, NodeId,
CurrentNymNodeVersionResponse, Delegation, EpochEventId, EpochRewardedSet, EpochStatus,
GatewayBond, GatewayBondResponse, GatewayOwnershipResponse, HistoricalNymNodeVersionEntry,
IdentityKey, IdentityKeyRef, IntervalEventId, MixNodeBond, MixNodeDetails,
MixOwnershipResponse, MixnodeDetailsByIdentityResponse, MixnodeDetailsResponse, NodeId,
NumberOfPendingEventsResponse, NymNodeBond, NymNodeDetails, NymNodeVersionHistoryResponse,
PagedAllDelegationsResponse, PagedDelegatorDelegationsResponse, PagedGatewayResponse,
PagedMixnodeBondsResponse, PagedNodeDelegationsResponse, PendingEpochEvent,
@@ -670,7 +670,7 @@ impl<T> PagedMixnetQueryClient for T where T: MixnetQueryClient {}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MixnetQueryClientExt: MixnetQueryClient {
async fn get_rewarded_set(&self) -> Result<RewardedSet, NyxdError> {
async fn get_rewarded_set(&self) -> Result<EpochRewardedSet, NyxdError> {
let error_response = |message| Err(NyxdError::extension_query_failure("mixnet", message));
let metadata = self.get_rewarded_set_metadata().await?;
@@ -711,13 +711,16 @@ pub trait MixnetQueryClientExt: MixnetQueryClient {
return error_response("the nodes assigned for 'standby' returned unexpected epoch_id");
}
Ok(RewardedSet {
entry_gateways: entry.nodes,
exit_gateways: exit.nodes,
layer1: layer1.nodes,
layer2: layer2.nodes,
layer3: layer3.nodes,
standby: standby.nodes,
Ok(EpochRewardedSet {
epoch_id: expected_epoch_id,
assignment: RewardedSet {
entry_gateways: entry.nodes,
exit_gateways: exit.nodes,
layer1: layer1.nodes,
layer2: layer2.nodes,
layer3: layer3.nodes,
standby: standby.nodes,
},
})
}
}
+2 -1
View File
@@ -12,7 +12,8 @@ dirs = { workspace = true, optional = true }
handlebars = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["derive"] }
toml = { workspace = true }
thiserror = { workspace = true }
toml = { workspace = true, features = ["display"] }
url = { workspace = true }
nym-network-defaults = { path = "../network-defaults", features = ["utoipa"] }
+10
View File
@@ -0,0 +1,10 @@
use std::io;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum NymConfigTomlError {
#[error(transparent)]
FileIoFailure(#[from] io::Error),
#[error(transparent)]
TomlSerializeFailure(#[from] toml::ser::Error),
}
+37
View File
@@ -13,6 +13,7 @@ pub use helpers::{parse_urls, OptionalSet};
pub use toml::de::Error as TomlDeError;
pub mod defaults;
pub mod error;
pub mod helpers;
pub mod legacy_helpers;
pub mod serde_helpers;
@@ -95,6 +96,42 @@ where
config.format_to_writer(file)
}
pub fn save_unformatted_config_to_file<C, P>(
config: &C,
path: P,
) -> Result<(), error::NymConfigTomlError>
where
C: Serialize + ?Sized,
P: AsRef<Path>,
{
let path = path.as_ref();
log::info!("saving config file to {}", path.display());
if let Some(parent) = path.parent() {
create_dir_all(parent)?;
}
let mut file = File::create(path)?;
// TODO: check for whether any of our configs store anything sensitive
// and change that to 0o644 instead
#[cfg(target_family = "unix")]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(path)?.permissions();
perms.set_mode(0o600);
fs::set_permissions(path, perms)?;
}
// let serde format the TOML in whatever ugly way it chooses
// TODO: in https://docs.rs/toml/latest/toml/fn.to_string_pretty.html it recommends using
// https://docs.rs/toml_edit/latest/toml_edit/struct.DocumentMut.html to preserve formatting
let toml_string = toml::to_string_pretty(config)?;
Ok(file.write_all(toml_string.as_bytes())?)
}
pub fn deserialize_config_from_toml_str<C>(raw: &str) -> Result<C, TomlDeError>
where
C: DeserializeOwned,
@@ -13,6 +13,7 @@ cosmwasm-std = { workspace = true }
cosmwasm-schema = { workspace = true }
cw-storage-plus = { workspace = true }
schemars = { workspace = true }
utoipa = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
@@ -23,4 +24,5 @@ serde_json = { workspace = true }
vergen = { workspace = true, features = ["build", "git", "gitcl", "rustc", "cargo"] }
[features]
naive_float = []
naive_float = []
utoipa = ["dep:utoipa"]
@@ -221,6 +221,7 @@ fn default_unknown() -> String {
// TODO: there's no reason this couldn't be used for proper binaries, but in that case
// perhaps the struct should get renamed and moved to a "more" common crate
#[cw_serde]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ContractBuildInformation {
/// Provides the name of the binary, i.e. the content of `CARGO_PKG_NAME` environmental variable.
#[serde(default = "default_unknown")]
@@ -42,9 +42,11 @@ pub struct Gateway {
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct GatewayBond {
/// Original amount pledged by the operator of this node.
#[cfg_attr(feature = "utoipa", schema(value_type = crate::CoinSchema))]
pub pledge_amount: Coin,
/// Address of the owner of this gateway.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub owner: Addr,
/// Block height at which this gateway has been bonded.
@@ -55,6 +57,7 @@ pub struct GatewayBond {
/// Entity who bonded this gateway on behalf of the owner.
/// If exists, it's most likely the address of the vesting contract.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub proxy: Option<Addr>,
}
@@ -7,6 +7,7 @@
use crate::constants::{TOKEN_SUPPLY, UNIT_DELEGATION_BASE};
use crate::error::MixnetContractError;
use crate::helpers::IntoBaseDecimal;
use crate::nym_node::Role;
use crate::reward_params::{NodeRewardingParameters, RewardingParams};
use crate::rewarding::helpers::truncate_reward;
use crate::rewarding::RewardDistribution;
@@ -81,20 +82,25 @@ impl MixNodeDetails {
// currently this struct is shared between mixnodes and nymnodes
#[cw_serde]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NodeRewarding {
/// Information provided by the operator that influence the cost function.
pub cost_params: NodeCostParams,
/// Total pledge and compounded reward earned by the node operator.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub operator: Decimal,
/// Total delegation and compounded reward earned by all node delegators.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub delegates: Decimal,
/// Cumulative reward earned by the "unit delegation" since the block 0.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub total_unit_reward: Decimal,
/// Value of the theoretical "unit delegation" that has delegated to this node at block 0.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub unit_delegation: Decimal,
/// Marks the epoch when this node was last rewarded so that we wouldn't accidentally attempt
@@ -491,14 +497,17 @@ impl NodeRewarding {
::cosmwasm_schema::schemars::JsonSchema,
)]
#[schemars(crate = "::cosmwasm_schema::schemars")]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct MixNodeBond {
/// Unique id assigned to the bonded mixnode.
pub mix_id: NodeId,
/// Address of the owner of this mixnode.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub owner: Addr,
/// Original amount pledged by the operator of this node.
#[cfg_attr(feature = "utoipa", schema(value_type = crate::CoinSchema))]
pub original_pledge: Coin,
// REMOVED (but might be needed due to legacy things, idk yet)
@@ -509,6 +518,7 @@ pub struct MixNodeBond {
/// Entity who bonded this mixnode on behalf of the owner.
/// If exists, it's most likely the address of the vesting contract.
#[cfg_attr(feature = "utoipa", schema(value_type = Option<String>))]
pub proxy: Option<Addr>,
/// Block height at which this mixnode has been bonded.
@@ -544,6 +554,7 @@ impl MixNodeBond {
feature = "generate-ts",
ts(export, export_to = "ts-packages/types/src/types/rust/Mixnode.ts")
)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct MixNode {
/// Network address of this mixnode, for example 1.1.1.1 or foo.mixnode.com
pub host: String,
@@ -570,11 +581,14 @@ pub struct MixNode {
/// The cost parameters, or the cost function, defined for the particular mixnode that influences
/// how the rewards should be split between the node operator and its delegators.
#[cw_serde]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NodeCostParams {
/// The profit margin of the associated node, i.e. the desired percent of the reward to be distributed to the operator.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub profit_margin_percent: Percent,
/// Operating cost of the associated node per the entire interval.
#[cfg_attr(feature = "utoipa", schema(value_type = crate::CoinSchema))]
pub interval_operating_cost: Coin,
}
@@ -611,6 +625,16 @@ pub enum LegacyMixLayer {
Three = 3,
}
impl From<LegacyMixLayer> for Role {
fn from(layer: LegacyMixLayer) -> Self {
match layer {
LegacyMixLayer::One => Role::Layer1,
LegacyMixLayer::Two => Role::Layer2,
LegacyMixLayer::Three => Role::Layer3,
}
}
}
impl From<LegacyMixLayer> for String {
fn from(layer: LegacyMixLayer) -> Self {
(layer as u8).to_string()
@@ -669,7 +693,9 @@ pub struct PendingMixNodeChanges {
}
#[derive(Default, Copy, Clone, Debug, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct LegacyPendingMixNodeChanges {
#[cfg_attr(feature = "utoipa", schema(value_type = Option<u32>))]
pub pledge_change: Option<EpochEventId>,
}
@@ -231,6 +231,7 @@ pub struct RoleMetadata {
/// Full details associated with given node.
#[cw_serde]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NymNodeDetails {
/// Basic bond information of this node, such as owner address, original pledge, etc.
pub bond_information: NymNodeBond,
@@ -288,14 +289,19 @@ impl NymNodeDetails {
}
#[cw_serde]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NymNodeBond {
/// Unique id assigned to the bonded node.
#[cfg_attr(feature = "utoipa", schema(value_type = u32))]
pub node_id: NodeId,
/// Address of the owner of this nym-node.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub owner: Addr,
/// Original amount pledged by the operator of this node.
#[cfg_attr(feature = "utoipa", schema(value_type = crate::CoinSchema))]
pub original_pledge: Coin,
/// Block height at which this nym-node has been bonded.
@@ -348,6 +354,7 @@ impl NymNodeBond {
feature = "generate-ts",
ts(export, export_to = "ts-packages/types/src/types/rust/NymNode.ts")
)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NymNode {
/// Network address of this nym-node, for example 1.1.1.1 or foo.mixnode.com
/// that is used to discover other capabilities of this node.
@@ -358,6 +365,7 @@ pub struct NymNode {
pub custom_http_port: Option<u16>,
/// Base58-encoded ed25519 EdDSA public key.
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub identity_key: IdentityKey,
// TODO: I don't think we want to include sphinx keys here,
// given we want to rotate them and keeping that in sync with contract will be a PITA
@@ -435,8 +443,11 @@ pub struct NodeConfigUpdate {
export_to = "ts-packages/types/src/types/rust/PendingNodeChanges.ts"
)
)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct PendingNodeChanges {
#[cfg_attr(feature = "utoipa", schema(value_type = Option<u32>))]
pub pledge_change: Option<EpochEventId>,
#[cfg_attr(feature = "utoipa", schema(value_type = Option<u32>))]
pub cost_params_change: Option<IntervalEventId>,
}
@@ -21,31 +21,37 @@ pub type WorkFactor = Decimal;
)]
#[cw_serde]
#[derive(Copy)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct IntervalRewardParams {
/// Current value of the rewarding pool.
/// It is expected to be constant throughout the interval.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub reward_pool: Decimal,
/// Current value of the staking supply.
/// It is expected to be constant throughout the interval.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub staking_supply: Decimal,
/// Defines the percentage of stake needed to reach saturation for all of the nodes in the rewarded set.
/// Also known as `beta`.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub staking_supply_scale_factor: Percent,
// computed values
/// Current value of the computed reward budget per epoch, per node.
/// It is expected to be constant throughout the interval.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub epoch_reward_budget: Decimal,
/// Current value of the stake saturation point.
/// It is expected to be constant throughout the interval.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub stake_saturation_point: Decimal,
// constants(-ish)
@@ -54,6 +60,7 @@ pub struct IntervalRewardParams {
/// It is not really expected to be changing very often.
/// As a matter of fact, unless there's a very specific reason, it should remain constant.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub sybil_resistance: Percent,
// default: 10
@@ -61,6 +68,7 @@ pub struct IntervalRewardParams {
/// It is not really expected to be changing very often.
/// As a matter of fact, unless there's a very specific reason, it should remain constant.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub active_set_work_factor: Decimal,
// default: 2%
@@ -70,6 +78,7 @@ pub struct IntervalRewardParams {
/// It is not really expected to be changing very often.
/// As a matter of fact, unless there's a very specific reason, it should remain constant.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub interval_pool_emission: Percent,
}
@@ -90,6 +99,7 @@ impl IntervalRewardParams {
)]
#[cw_serde]
#[derive(Copy)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct RewardingParams {
/// Parameters that should remain unchanged throughout an interval.
pub interval: IntervalRewardParams,
@@ -254,6 +264,7 @@ impl RewardingParams {
)]
#[cw_serde]
#[derive(Copy)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct RewardedSetParams {
/// The expected number of nodes assigned entry gateway role (i.e. [`Role::EntryGateway`])
pub entry_gateways: u32,
@@ -17,10 +17,12 @@ pub mod simulator;
)]
#[cw_serde]
#[derive(Copy, Default)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct RewardEstimate {
/// The amount of **decimal** coins that are going to get distributed to the node,
/// i.e. the operator and all its delegators.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub total_node_reward: Decimal,
// note that operator reward includes the operating_cost,
@@ -28,14 +30,17 @@ pub struct RewardEstimate {
// in that case the operator reward would still be `1nym` as opposed to 0
/// The share of the reward that is going to get distributed to the node operator.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub operator: Decimal,
/// The share of the reward that is going to get distributed among the node delegators.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub delegates: Decimal,
/// The operating cost of this node. Note: it's already included in the operator reward.
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
pub operating_cost: Decimal,
}
@@ -3,6 +3,7 @@
use crate::config_score::{ConfigScoreParams, OutdatedVersionWeights, VersionScoreFormulaParams};
use crate::nym_node::Role;
use crate::EpochId;
use contracts_common::Percent;
use cosmwasm_schema::cw_serde;
use cosmwasm_std::Coin;
@@ -32,6 +33,23 @@ impl RoleAssignment {
}
}
#[cw_serde]
#[derive(Default)]
pub struct EpochRewardedSet {
pub epoch_id: EpochId,
pub assignment: RewardedSet,
}
impl From<(EpochId, RewardedSet)> for EpochRewardedSet {
fn from((epoch_id, assignment): (EpochId, RewardedSet)) -> Self {
EpochRewardedSet {
epoch_id,
assignment,
}
}
}
#[cw_serde]
#[derive(Default)]
pub struct RewardedSet {
@@ -69,6 +87,29 @@ impl RewardedSet {
pub fn rewarded_set_size(&self) -> usize {
self.active_set_size() + self.standby.len()
}
pub fn get_role(&self, node_id: NodeId) -> Option<Role> {
// given each role has ~100 entries in them, doing linear lookup with vec should be fine
if self.entry_gateways.contains(&node_id) {
return Some(Role::EntryGateway);
}
if self.exit_gateways.contains(&node_id) {
return Some(Role::ExitGateway);
}
if self.layer1.contains(&node_id) {
return Some(Role::Layer1);
}
if self.layer2.contains(&node_id) {
return Some(Role::Layer2);
}
if self.layer3.contains(&node_id) {
return Some(Role::Layer3);
}
if self.standby.contains(&node_id) {
return Some(Role::Standby);
}
None
}
}
#[cw_serde]
@@ -134,6 +175,14 @@ where
}
}
#[cfg(feature = "utoipa")]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[cfg_attr(feature = "utoipa", schema(title = "Coin"))]
pub struct CoinSchema {
pub denom: String,
pub amount: String,
}
/// The current state of the mixnet contract.
#[cw_serde]
pub struct ContractState {
@@ -23,6 +23,11 @@ impl SqliteEcashTicketbookManager {
SqliteEcashTicketbookManager { connection_pool }
}
/// Closes the connection pool.
pub async fn close(&self) {
self.connection_pool.close().await
}
pub(crate) async fn cleanup_expired(&self, deadline: Date) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM ecash_ticketbook WHERE expiration_date <= ?",
@@ -43,6 +43,10 @@ impl Debug for EphemeralStorage {
impl Storage for EphemeralStorage {
type StorageError = StorageError;
async fn close(&self) {
// nothing to do here
}
async fn cleanup_expired(&self) -> Result<(), Self::StorageError> {
self.storage_manager.cleanup_expired().await;
Ok(())
@@ -33,7 +33,10 @@ use nym_credentials::{
IssuanceTicketBook, IssuedTicketBook,
};
use nym_ecash_time::{ecash_today, Date, EcashTime};
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use zeroize::Zeroizing;
@@ -56,6 +59,9 @@ impl PersistentStorage {
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -83,6 +89,10 @@ impl PersistentStorage {
impl Storage for PersistentStorage {
type StorageError = StorageError;
async fn close(&self) {
self.storage_manager.close().await
}
/// remove all expired ticketbooks and expiration date signatures
async fn cleanup_expired(&self) -> Result<(), Self::StorageError> {
let ecash_yesterday = ecash_today().date().previous_day().unwrap();
+2
View File
@@ -22,6 +22,8 @@ use std::error::Error;
pub trait Storage: Send + Sync {
type StorageError: Error;
async fn close(&self);
/// remove all expired ticketbooks and expiration date signatures
async fn cleanup_expired(&self) -> Result<(), Self::StorageError>;
+1
View File
@@ -16,6 +16,7 @@ serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
strum = { workspace = true, features = ["derive"] }
time = { workspace = true, features = ["serde"] }
utoipa = { workspace = true }
rand = { workspace = true }
nym-compact-ecash = { path = "../nym_offline_compact_ecash" }
@@ -86,7 +86,6 @@ impl Display for AddressPolicyAction {
/// ```
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[cfg_attr(feature = "openapi", aliases(ExitPolicy))]
pub struct AddressPolicy {
/// A list of rules to apply to find out whether an address is
/// contained by this policy.
@@ -727,10 +726,10 @@ mod test {
let policy = AddressPolicy::parse_from_torrc(
r#"
ExitPolicy reject 1.2.3.4/32:*
ExitPolicy reject 1.2.3.5:*
ExitPolicy reject 1.2.3.5:*
ExitPolicy reject 1.2.3.6/16:*
ExitPolicy reject 1.2.3.6/16:123-456
ExitPolicy accept *:53
ExitPolicy reject 1.2.3.6/16:123-456
ExitPolicy accept *:53
ExitPolicy accept6 *6:119
ExitPolicy accept *4:120
ExitPolicy reject6 [FC00::]/7:*
@@ -9,7 +9,7 @@ use futures::{Sink, Stream};
use rand::{CryptoRng, RngCore};
use tungstenite::Message as WsMessage;
impl<'a, S, R> State<'a, S, R> {
impl<S, R> State<'_, S, R> {
async fn client_handshake_inner(&mut self) -> Result<(), HandshakeError>
where
S: Stream<Item = WsItem> + Sink<WsMessage> + Unpin,
@@ -10,7 +10,7 @@ use crate::registration::handshake::{error::HandshakeError, WsItem};
use futures::{Sink, Stream};
use tungstenite::Message as WsMessage;
impl<'a, S, R> State<'a, S, R> {
impl<S, R> State<'_, S, R> {
async fn gateway_handshake_inner(
&mut self,
raw_init_message: Vec<u8>,
@@ -20,6 +20,10 @@ pub enum ClientRequest {
hkdf_salt: Vec<u8>,
derived_key_digest: Vec<u8>,
},
ForgetMe {
client: bool,
stats: bool,
},
}
impl ClientRequest {
@@ -11,6 +11,7 @@ use tungstenite::Message;
#[non_exhaustive]
pub enum SensitiveServerResponse {
KeyUpgradeAck {},
ForgetMeAck {},
}
impl SensitiveServerResponse {
+17 -1
View File
@@ -6,7 +6,10 @@ use models::StoredFinishedSession;
use nym_node_metrics::entry::{ActiveSession, FinishedSession, SessionType};
use nym_sphinx::DestinationAddressBytes;
use sessions::SessionManager;
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use time::Date;
use tracing::{debug, error};
@@ -36,6 +39,9 @@ impl PersistentStatsStorage {
// TODO: we can inject here more stuff based on our gateway global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -116,6 +122,16 @@ impl PersistentStatsStorage {
.await?)
}
pub async fn delete_unique_user(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_unique_user(client_address.as_base58_string())
.await?)
}
pub async fn insert_active_session(
&self,
client_address: DestinationAddressBytes,
@@ -71,6 +71,16 @@ impl SessionManager {
Ok(())
}
pub(crate) async fn delete_unique_user(&self, client_address_b58: String) -> Result<()> {
sqlx::query!(
"DELETE FROM sessions_unique_users WHERE client_address = ?",
client_address_b58
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn get_unique_users(&self, date: Date) -> Result<Vec<String>> {
sqlx::query_scalar!(
"SELECT client_address as count FROM sessions_unique_users WHERE day = ?",
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "DELETE FROM message_store WHERE client_address_bs58 = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "3ea5542b21a41b14276a8fd6b870c61aa0ddd30fee2565803b88c6086bd2a734"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "DELETE FROM available_bandwidth WHERE client_id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "a3cc707995b8215fa77738cd1a55f9e8d251a3e764104d2a54153895dee1a118"
}
-1
View File
@@ -11,7 +11,6 @@ license.workspace = true
[dependencies]
bincode = { workspace = true }
defguard_wireguard_rs = { workspace = true }
log = { workspace = true }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
@@ -0,0 +1,7 @@
/*
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: GPL-3.0-only
*/
ALTER TABLE message_store
ADD COLUMN timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP;
+10
View File
@@ -49,6 +49,16 @@ impl BandwidthManager {
Ok(())
}
pub(crate) async fn remove_client(&self, client_id: i64) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM available_bandwidth WHERE client_id = ?",
client_id
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
/// Set the expiration date of the particular client to the provided date.
pub(crate) async fn set_expiration(
&self,
+41 -39
View File
@@ -2,9 +2,11 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::models::StoredMessage;
use time::OffsetDateTime;
use tracing::debug;
#[derive(Clone)]
pub(crate) struct InboxManager {
pub struct InboxManager {
connection_pool: sqlx::SqlitePool,
/// Maximum number of messages that can be obtained from the database per operation.
/// It is used to prevent out of memory errors in the case of client receiving a lot of data while
@@ -71,44 +73,22 @@ impl InboxManager {
// get 1 additional message to check whether there will be more to grab
// next time
let limit = self.retrieval_limit + 1;
let mut res = if let Some(start_after) = start_after {
sqlx::query_as!(
StoredMessage,
r#"
SELECT
id as "id!",
client_address_bs58 as "client_address_bs58!",
content as "content!"
FROM message_store
WHERE client_address_bs58 = ? AND id > ?
ORDER BY id ASC
LIMIT ?;
"#,
client_address_bs58,
start_after,
limit
)
.fetch_all(&self.connection_pool)
.await?
} else {
sqlx::query_as!(
StoredMessage,
r#"
SELECT
id as "id!",
client_address_bs58 as "client_address_bs58!",
content as "content!"
FROM message_store
WHERE client_address_bs58 = ?
ORDER BY id ASC
LIMIT ?;
"#,
client_address_bs58,
limit
)
.fetch_all(&self.connection_pool)
.await?
};
let start_after = start_after.unwrap_or(-1);
let mut res: Vec<StoredMessage> = sqlx::query_as(
r#"
SELECT id, client_address_bs58, content, timestamp
FROM message_store
WHERE client_address_bs58 = ? AND id > ?
ORDER BY id ASC
LIMIT ?;
"#,
)
.bind(client_address_bs58)
.bind(start_after)
.bind(limit)
.fetch_all(&self.connection_pool)
.await?;
if res.len() > self.retrieval_limit as usize {
res.truncate(self.retrieval_limit as usize);
@@ -133,4 +113,26 @@ impl InboxManager {
.await?;
Ok(())
}
pub(crate) async fn remove_messages_for_client(
&self,
client_address_bs58: &str,
) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM message_store WHERE client_address_bs58 = ?",
client_address_bs58
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub async fn remove_stale(&self, cutoff: OffsetDateTime) -> Result<(), sqlx::Error> {
let affected = sqlx::query!("DELETE FROM message_store WHERE timestamp < ?", cutoff)
.execute(&self.connection_pool)
.await?
.rows_affected();
debug!("Removed {affected} stale messages");
Ok(())
}
}
+50 -2
View File
@@ -3,7 +3,6 @@
use bandwidth::BandwidthManager;
use clients::{ClientManager, ClientType};
use inboxes::InboxManager;
use models::{
Client, PersistedBandwidth, PersistedSharedKeys, RedemptionProposal, StoredMessage,
VerifiedTicket, WireguardPeer,
@@ -12,7 +11,10 @@ use nym_credentials_interface::ClientTicket;
use nym_gateway_requests::shared_key::SharedGatewayKey;
use nym_sphinx::DestinationAddressBytes;
use shared_keys::SharedKeysManager;
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use tickets::TicketStorageManager;
use time::OffsetDateTime;
@@ -28,6 +30,7 @@ mod tickets;
mod wireguard_peers;
pub use error::GatewayStorageError;
pub use inboxes::InboxManager;
// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
@@ -41,6 +44,33 @@ pub struct GatewayStorage {
}
impl GatewayStorage {
#[allow(dead_code)]
pub(crate) fn client_manager(&self) -> &ClientManager {
&self.client_manager
}
pub(crate) fn shared_key_manager(&self) -> &SharedKeysManager {
&self.shared_key_manager
}
pub fn inbox_manager(&self) -> &InboxManager {
&self.inbox_manager
}
pub(crate) fn bandwidth_manager(&self) -> &BandwidthManager {
&self.bandwidth_manager
}
#[allow(dead_code)]
pub(crate) fn ticket_manager(&self) -> &TicketStorageManager {
&self.ticket_manager
}
#[allow(dead_code)]
pub(crate) fn wireguard_peer_manager(&self) -> &wireguard_peers::WgPeerManager {
&self.wireguard_peer_manager
}
/// Initialises `PersistentStorage` using the provided path.
///
/// # Arguments
@@ -59,6 +89,9 @@ impl GatewayStorage {
// TODO: we can inject here more stuff based on our gateway global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -101,6 +134,21 @@ impl GatewayStorage {
.await?)
}
pub async fn handle_forget_me(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), GatewayStorageError> {
let client_id = self.get_mixnet_client_id(client_address).await?;
self.inbox_manager()
.remove_messages_for_client(&client_address.as_base58_string())
.await?;
self.bandwidth_manager().remove_client(client_id).await?;
self.shared_key_manager()
.remove_shared_keys(&client_address.as_base58_string())
.await?;
Ok(())
}
pub async fn insert_shared_keys(
&self,
client_address: DestinationAddressBytes,
+2
View File
@@ -48,11 +48,13 @@ impl TryFrom<PersistedSharedKeys> for SharedGatewayKey {
}
}
#[derive(FromRow)]
pub struct StoredMessage {
pub id: i64,
#[allow(dead_code)]
pub client_address_bs58: String,
pub content: Vec<u8>,
pub timestamp: OffsetDateTime,
}
#[derive(Debug, Clone, FromRow)]
+241 -138
View File
@@ -192,6 +192,28 @@ impl Client {
&self.base_url
}
pub fn create_request<B, K, V>(
&self,
method: reqwest::Method,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: Option<&B>,
) -> RequestBuilder
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
{
let url = sanitize_url(&self.base_url, path, params);
let mut request = self.reqwest_client.request(method.clone(), url);
if let Some(body) = json_body {
request = request.json(body);
}
request
}
pub fn create_get_request<K, V>(
&self,
path: PathSegments<'_>,
@@ -205,38 +227,6 @@ impl Client {
self.reqwest_client.get(url)
}
#[instrument(level = "debug", skip_all, fields(path=?path))]
async fn send_get_request<K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
) -> Result<Response, HttpClientError<E>>
where
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
tracing::trace!("Sending GET request");
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.get(url).send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??,
)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self.reqwest_client.get(url).send().await?)
}
}
pub fn create_post_request<B, K, V>(
&self,
path: PathSegments<'_>,
@@ -252,36 +242,6 @@ impl Client {
self.reqwest_client.post(url).json(json_body)
}
async fn send_post_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.post(url).json(json_body).send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self.reqwest_client.post(url).json(json_body).send().await?)
}
}
pub fn create_delete_request<K, V>(
&self,
path: PathSegments<'_>,
@@ -295,6 +255,88 @@ impl Client {
self.reqwest_client.delete(url)
}
pub fn create_patch_request<B, K, V>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> RequestBuilder
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
{
let url = sanitize_url(&self.base_url, path, params);
self.reqwest_client.patch(url).json(json_body)
}
async fn send_request<B, K, V, E>(
&self,
method: reqwest::Method,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: Option<&B>,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
let url = sanitize_url(&self.base_url, path, params);
let mut request = self.reqwest_client.request(method.clone(), url);
if let Some(body) = json_body {
request = request.json(body);
}
#[cfg(target_arch = "wasm32")]
{
Ok(
wasmtimer::tokio::timeout(self.request_timeout, request.send())
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??,
)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(request.send().await?)
}
}
#[instrument(level = "debug", skip_all, fields(path=?path))]
async fn send_get_request<K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
) -> Result<Response, HttpClientError<E>>
where
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::GET, path, params, None::<&()>)
.await
}
async fn send_post_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::POST, path, params, Some(json_body))
.await
}
pub async fn send_delete_request<K, V, E>(
&self,
path: PathSegments<'_>,
@@ -305,23 +347,24 @@ impl Client {
V: AsRef<str>,
E: Display,
{
tracing::trace!("Sending DELETE request");
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.delete(url).send(),
)
self.send_request(reqwest::Method::DELETE, path, params, None::<&()>)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??)
}
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self.reqwest_client.delete(url).send().await?)
}
pub async fn send_patch_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::PATCH, path, params, Some(json_body))
.await
}
#[instrument(level = "debug", skip_all)]
@@ -372,6 +415,56 @@ impl Client {
parse_response(res, false).await
}
pub async fn patch_json<B, T, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized,
for<'a> T: Deserialize<'a>,
K: AsRef<str>,
V: AsRef<str>,
E: Display + DeserializeOwned,
{
let res = self.send_patch_request(path, params, json_body).await?;
parse_response(res, true).await
}
async fn call_json_endpoint<B, T, S, E>(
&self,
method: reqwest::Method,
endpoint: S,
json_body: Option<&B>,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str>,
{
let mut request = self
.reqwest_client
.request(method.clone(), self.base_url.join(endpoint.as_ref())?);
if let Some(body) = json_body {
request = request.json(body);
}
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(self.request_timeout, request.send())
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = { request.send().await? };
parse_response(res, false).await
}
#[instrument(level = "debug", skip_all)]
pub async fn get_json_endpoint<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
@@ -379,27 +472,8 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.get(self.base_url.join(endpoint.as_ref())?)
.send(),
)
self.call_json_endpoint(reqwest::Method::GET, endpoint, None::<&()>)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.get(self.base_url.join(endpoint.as_ref())?)
.send()
.await?
};
parse_response(res, false).await
}
pub async fn post_json_endpoint<B, T, S, E>(
@@ -413,29 +487,8 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.post(self.base_url.join(endpoint.as_ref())?)
.json(json_body)
.send(),
)
self.call_json_endpoint(reqwest::Method::POST, endpoint, Some(json_body))
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.post(self.base_url.join(endpoint.as_ref())?)
.json(json_body)
.send()
.await?
};
parse_response(res, true).await
}
pub async fn delete_json_endpoint<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
@@ -444,27 +497,23 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.delete(self.base_url.join(endpoint.as_ref())?)
.send(),
)
self.call_json_endpoint(reqwest::Method::DELETE, endpoint, None::<&()>)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
}
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.delete(self.base_url.join(endpoint.as_ref())?)
.send()
.await?
};
parse_response(res, false).await
pub async fn patch_json_endpoint<B, T, S, E>(
&self,
endpoint: S,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str>,
{
self.call_json_endpoint(reqwest::Method::PATCH, endpoint, Some(json_body))
.await
}
}
@@ -509,6 +558,19 @@ pub trait ApiClient {
V: AsRef<str> + Sync,
E: Display + DeserializeOwned;
async fn patch_json<B, T, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
K: AsRef<str> + Sync,
V: AsRef<str> + Sync,
E: Display + DeserializeOwned;
/// `get` json data from the provided absolute endpoint, i.e. for example `"/api/v1/mixnodes?since=12345"`
async fn get_json_from<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
@@ -532,6 +594,17 @@ pub trait ApiClient {
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str> + Sync + Send;
async fn patch_json_data_at<B, T, S, E>(
&self,
endpoint: S,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str> + Sync + Send;
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -581,6 +654,22 @@ impl ApiClient for Client {
self.delete_json(path, params).await
}
async fn patch_json<B, T, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
K: AsRef<str> + Sync,
V: AsRef<str> + Sync,
E: Display + DeserializeOwned,
{
self.patch_json(path, params, json_body).await
}
async fn get_json_from<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
for<'a> T: Deserialize<'a>,
@@ -612,6 +701,20 @@ impl ApiClient for Client {
{
self.delete_json_endpoint(endpoint).await
}
async fn patch_json_data_at<B, T, S, E>(
&self,
endpoint: S,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str> + Sync + Send,
{
self.patch_json_endpoint(endpoint, json_body).await
}
}
// utility function that should solve the double slash problem in API urls forever.
-1
View File
@@ -10,7 +10,6 @@ use serde::{Deserialize, Serialize};
pub mod middleware;
#[derive(Debug, Clone)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub enum FormattedResponse<T> {
Json(Json<T>),
Yaml(Yaml<T>),
@@ -63,6 +63,7 @@ impl From<v6::request::StaticConnectRequest> for v7::request::StaticConnectReque
}
}
#[allow(deprecated)]
impl From<v6::request::DynamicConnectRequest> for v7::request::DynamicConnectRequest {
fn from(dynamic_connect_request: v6::request::DynamicConnectRequest) -> Self {
Self {
@@ -51,6 +51,7 @@ impl IpPacketRequest {
)
}
#[allow(deprecated)]
pub fn new_dynamic_connect_request(
reply_to: Recipient,
reply_to_hops: Option<u8>,
@@ -285,6 +286,9 @@ pub struct DynamicConnectRequest {
// The number of mix node hops that responses should take, in addition to the entry and exit
// node. Zero means only client -> entry -> exit -> client.
#[deprecated(
note = "clients can no longer control number of hops to use. this field is scheduled for removal in V8"
)]
pub reply_to_hops: Option<u8>,
// The average delay at each mix node, in milliseconds. Currently this is not supported by the
+11
View File
@@ -57,3 +57,14 @@ pub mod wireguard {
pub const WG_TUN_DEVICE_IP_ADDRESS_V6: Ipv6Addr = Ipv6Addr::new(0xfc01, 0, 0, 0, 0, 0, 0, 0x1); // fc01::1
pub const WG_TUN_DEVICE_NETMASK_V6: u8 = 112;
}
pub mod mixnet_vpn {
use std::net::{Ipv4Addr, Ipv6Addr};
// The interface used to route traffic
pub const NYM_TUN_BASE_NAME: &str = "nymtun";
pub const NYM_TUN_DEVICE_ADDRESS_V4: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 1);
pub const NYM_TUN_DEVICE_NETMASK_V4: Ipv4Addr = Ipv4Addr::new(255, 255, 0, 0);
pub const NYM_TUN_DEVICE_ADDRESS_V6: Ipv6Addr = Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 0x1); // fc00::1
pub const NYM_TUN_DEVICE_NETMASK_V6: &str = "112";
}
+33 -31
View File
@@ -2,10 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::NetworkTestingError;
use crate::node::TestableNode;
use crate::NodeId;
use crate::node::{NodeType, TestableNode};
use nym_sphinx::message::NymMessage;
use nym_topology::{gateway, mix};
use nym_topology::node::RoutingNode;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
@@ -26,73 +25,76 @@ pub struct TestMessage<T = Empty> {
}
impl<T> TestMessage<T> {
pub fn new<N: Into<TestableNode>>(node: N, msg_id: u32, total_msgs: u32, ext: T) -> Self {
pub fn new(tested_node: TestableNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
TestMessage {
tested_node: node.into(),
tested_node,
msg_id,
total_msgs,
ext,
}
}
pub fn new_mix(node: &mix::LegacyNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
Self::new(node, msg_id, total_msgs, ext)
pub fn new_mix(node: &RoutingNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
Self::new(
TestableNode::new_routing(node, NodeType::Mixnode),
msg_id,
total_msgs,
ext,
)
}
// pub fn new_gateway(node: &gateway::Node, msg_id: u32, total_msgs: u32, ext: T) -> Self {
// Self::new(node, msg_id, total_msgs, ext)
// }
pub fn new_serialized<N>(
node: N,
msg_id: u32,
total_msgs: u32,
ext: T,
) -> Result<Vec<u8>, NetworkTestingError>
where
N: Into<TestableNode>,
T: Serialize,
{
Self::new(node, msg_id, total_msgs, ext).as_bytes()
pub fn new_gateway(node: &RoutingNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
Self::new(
TestableNode::new_routing(node, NodeType::Gateway),
msg_id,
total_msgs,
ext,
)
}
pub fn new_plaintexts<N>(
node: &N,
pub fn new_plaintexts(
node: TestableNode,
total_msgs: u32,
ext: T,
) -> Result<Vec<Vec<u8>>, NetworkTestingError>
where
for<'a> &'a N: Into<TestableNode>,
T: Serialize + Clone,
{
let mut msgs = Vec::with_capacity(total_msgs as usize);
for msg_id in 1..=total_msgs {
msgs.push(Self::new(node, msg_id, total_msgs, ext.clone()).as_bytes()?)
msgs.push(Self::new(node.clone(), msg_id, total_msgs, ext.clone()).as_bytes()?)
}
Ok(msgs)
}
pub fn mix_plaintexts(
node: &mix::LegacyNode,
node: &RoutingNode,
total_msgs: u32,
ext: T,
) -> Result<Vec<Vec<u8>>, NetworkTestingError>
where
T: Serialize + Clone,
{
Self::new_plaintexts(node, total_msgs, ext)
Self::new_plaintexts(
TestableNode::new_routing(node, NodeType::Mixnode),
total_msgs,
ext,
)
}
pub fn legacy_gateway_plaintexts(
node: &gateway::LegacyNode,
node_id: NodeId,
node: &RoutingNode,
total_msgs: u32,
ext: T,
) -> Result<Vec<Vec<u8>>, NetworkTestingError>
where
T: Serialize + Clone,
{
Self::new_plaintexts(&(node, node_id), total_msgs, ext)
Self::new_plaintexts(
TestableNode::new_routing(node, NodeType::Gateway),
total_msgs,
ext,
)
}
pub fn as_json_string(&self) -> Result<String, NetworkTestingError>
+9 -33
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::NodeId;
use nym_topology::{gateway, mix};
use nym_topology::node::RoutingNode;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
@@ -24,6 +24,14 @@ impl TestableNode {
}
}
pub fn new_routing(routing_node: &RoutingNode, typ: NodeType) -> Self {
TestableNode::new(
routing_node.identity_key.to_base58_string(),
typ,
routing_node.node_id,
)
}
pub fn new_mixnode(encoded_identity: String, node_id: NodeId) -> Self {
TestableNode::new(encoded_identity, NodeType::Mixnode, node_id)
}
@@ -37,38 +45,6 @@ impl TestableNode {
}
}
impl<'a> From<&'a mix::LegacyNode> for TestableNode {
fn from(value: &'a mix::LegacyNode) -> Self {
TestableNode {
encoded_identity: value.identity_key.to_base58_string(),
typ: NodeType::Mixnode,
node_id: value.mix_id,
}
}
}
impl<'a> From<(&'a gateway::LegacyNode, NodeId)> for TestableNode {
fn from((gateway, node_id): (&'a gateway::LegacyNode, NodeId)) -> Self {
(&(gateway, node_id)).into()
}
}
impl<'a> From<&'a (gateway::LegacyNode, NodeId)> for TestableNode {
fn from((gateway, node_id): &'a (gateway::LegacyNode, NodeId)) -> Self {
(gateway, *node_id).into()
}
}
impl<'a, 'b> From<&'a (&'b gateway::LegacyNode, NodeId)> for TestableNode {
fn from((gateway, node_id): &'a (&'b gateway::LegacyNode, NodeId)) -> Self {
TestableNode {
encoded_identity: gateway.identity_key.to_base58_string(),
typ: NodeType::Gateway,
node_id: *node_id,
}
}
}
impl Display for TestableNode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
+39 -93
View File
@@ -2,21 +2,22 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::NetworkTestingError;
use crate::Empty;
use crate::NodeId;
use crate::TestMessage;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::message::NymMessage;
use nym_sphinx::params::{PacketSize, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx::params::PacketSize;
use nym_sphinx::preparer::{FragmentPreparer, PreparedFragment};
use nym_sphinx_params::PacketType;
use nym_topology::{gateway, mix, NymTopology};
use nym_topology::node::RoutingNode;
use nym_topology::{NymRouteProvider, NymTopology, Role};
use rand::{CryptoRng, Rng};
use serde::Serialize;
use std::sync::Arc;
use std::time::Duration;
pub use nym_topology::node::LegacyMixLayer;
pub struct NodeTester<R> {
rng: R,
@@ -38,10 +39,6 @@ pub struct NodeTester<R> {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
average_ack_delay: Duration,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
// while acks are going to be ignored they still need to be constructed
// so that the gateway would be able to correctly process and forward the message
ack_key: Arc<AckKey>,
@@ -70,41 +67,27 @@ where
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
ack_key,
}
}
/// Allows setting non-default number of expected mix hops in the network.
#[allow(dead_code)]
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
pub fn testable_mix_topology(&self, node: &mix::LegacyNode) -> NymTopology {
pub fn testable_mix_topology(&self, layer: LegacyMixLayer, node: &RoutingNode) -> NymTopology {
let mut topology = self.base_topology.clone();
topology.set_mixes_in_layer(node.layer as u8, vec![node.clone()]);
topology.set_testable_node(layer.into(), node.clone());
topology
}
pub fn testable_gateway_topology(&self, gateway: &gateway::LegacyNode) -> NymTopology {
pub fn testable_gateway_topology(&self, node: &RoutingNode) -> NymTopology {
let mut topology = self.base_topology.clone();
topology.set_gateways(vec![gateway.clone()]);
topology.set_testable_node(Role::EntryGateway, node.clone());
topology.set_testable_node(Role::ExitGateway, node.clone());
topology
}
pub fn simple_mixnode_test_packets(
&mut self,
mix: &mix::LegacyNode,
test_packets: u32,
) -> Result<Vec<PreparedFragment>, NetworkTestingError> {
self.mixnode_test_packets(mix, Empty, test_packets, None)
}
pub fn mixnode_test_packets<T>(
&mut self,
mix: &mix::LegacyNode,
mix: &RoutingNode,
legacy_mix_layer: LegacyMixLayer,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -112,7 +95,9 @@ where
where
T: Serialize + Clone,
{
let ephemeral_topology = self.testable_mix_topology(mix);
let ephemeral_topology =
NymRouteProvider::from(self.testable_mix_topology(legacy_mix_layer, mix))
.with_ignore_egress_epoch_roles(true);
let mut packets = Vec::with_capacity(test_packets as usize);
for plaintext in TestMessage::mix_plaintexts(mix, test_packets, msg_ext)? {
@@ -128,7 +113,7 @@ where
pub fn mixnodes_test_packets<T>(
&mut self,
nodes: &[mix::LegacyNode],
nodes: &[(LegacyMixLayer, RoutingNode)],
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -137,9 +122,10 @@ where
T: Serialize + Clone,
{
let mut packets = Vec::new();
for node in nodes {
for (layer, node) in nodes {
packets.append(&mut self.mixnode_test_packets(
node,
*layer,
msg_ext.clone(),
test_packets,
custom_recipient,
@@ -149,26 +135,10 @@ where
Ok(packets)
}
pub fn existing_mixnode_test_packets<T>(
&mut self,
mix_id: NodeId,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
) -> Result<Vec<PreparedFragment>, NetworkTestingError>
where
T: Serialize + Clone,
{
let Some(node) = self.base_topology.find_mix(mix_id) else {
return Err(NetworkTestingError::NonExistentMixnode { mix_id });
};
self.mixnode_test_packets(&node.clone(), msg_ext, test_packets, custom_recipient)
}
pub fn existing_identity_mixnode_test_packets<T>(
&mut self,
encoded_mix_identity: String,
layer: LegacyMixLayer,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -176,22 +146,30 @@ where
where
T: Serialize + Clone,
{
let Some(node) = self
.base_topology
.find_mix_by_identity(&encoded_mix_identity)
else {
let Ok(identity) = encoded_mix_identity.parse() else {
return Err(NetworkTestingError::NonExistentMixnodeIdentity {
mix_identity: encoded_mix_identity,
});
};
self.mixnode_test_packets(&node.clone(), msg_ext, test_packets, custom_recipient)
let Some(node) = self.base_topology.find_node_by_identity(identity) else {
return Err(NetworkTestingError::NonExistentMixnodeIdentity {
mix_identity: encoded_mix_identity,
});
};
self.mixnode_test_packets(
&node.clone(),
layer,
msg_ext,
test_packets,
custom_recipient,
)
}
pub fn legacy_gateway_test_packets<T>(
&mut self,
gateway: &gateway::LegacyNode,
node_id: NodeId,
gateway: &RoutingNode,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -199,12 +177,11 @@ where
where
T: Serialize + Clone,
{
let ephemeral_topology = self.testable_gateway_topology(gateway);
let ephemeral_topology = NymRouteProvider::from(self.testable_gateway_topology(gateway))
.with_ignore_egress_epoch_roles(true);
let mut packets = Vec::with_capacity(test_packets as usize);
for plaintext in
TestMessage::legacy_gateway_plaintexts(gateway, node_id, test_packets, msg_ext)?
{
for plaintext in TestMessage::legacy_gateway_plaintexts(gateway, test_packets, msg_ext)? {
packets.push(self.wrap_plaintext_data(
plaintext,
&ephemeral_topology,
@@ -215,36 +192,10 @@ where
Ok(packets)
}
pub fn existing_gateway_test_packets<T>(
&mut self,
node_id: NodeId,
encoded_gateway_identity: String,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
) -> Result<Vec<PreparedFragment>, NetworkTestingError>
where
T: Serialize + Clone,
{
let Some(node) = self.base_topology.find_gateway(&encoded_gateway_identity) else {
return Err(NetworkTestingError::NonExistentGateway {
gateway_identity: encoded_gateway_identity,
});
};
self.legacy_gateway_test_packets(
&node.clone(),
node_id,
msg_ext,
test_packets,
custom_recipient,
)
}
pub fn wrap_plaintext_data(
&mut self,
plaintext: Vec<u8>,
topology: &NymTopology,
topology: &NymRouteProvider,
custom_recipient: Option<Recipient>,
) -> Result<PreparedFragment, NetworkTestingError> {
let message = NymMessage::new_plain(plaintext);
@@ -274,14 +225,13 @@ where
&address,
&address,
PacketType::Mix,
None,
)?)
}
pub fn create_test_packet<T>(
&mut self,
message: &TestMessage<T>,
topology: &NymTopology,
topology: &NymRouteProvider,
custom_recipient: Option<Recipient>,
) -> Result<PreparedFragment, NetworkTestingError>
where
@@ -307,10 +257,6 @@ impl<R: CryptoRng + Rng> FragmentPreparer for NodeTester<R> {
1
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
fn average_packet_delay(&self) -> Duration {
self.average_packet_delay
}
@@ -65,6 +65,14 @@ impl<T> NonExhaustiveDelayQueue<T> {
pub fn remove(&mut self, key: &QueueKey) -> Expired<T> {
self.inner.remove(key)
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
impl<T> Default for NonExhaustiveDelayQueue<T> {
+1 -1
View File
@@ -12,6 +12,6 @@ license.workspace = true
[dependencies]
prometheus = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
dashmap = { workspace = true }
lazy_static = { workspace = true }
+390 -81
View File
@@ -1,14 +1,18 @@
use dashmap::DashMap;
pub use log::error;
use log::{debug, warn};
use std::fmt;
pub use std::time::Instant;
use tracing::{debug, error, warn};
use prometheus::{core::Collector, Encoder as _, IntCounter, IntGauge, Registry, TextEncoder};
use prometheus::{
core::Collector, Encoder as _, Gauge, Histogram, HistogramOpts, IntCounter, IntGauge, Registry,
TextEncoder,
};
pub use prometheus::HistogramTimer;
pub use std::time::Instant;
#[macro_export]
macro_rules! prepend_package_name {
($name: literal) => {
($name: tt) => {
&format!(
"{}_{}",
std::module_path!()
@@ -23,15 +27,29 @@ macro_rules! prepend_package_name {
#[macro_export]
macro_rules! inc_by {
($name:literal, $x:expr, $help: expr) => {
$crate::REGISTRY.maybe_register_and_inc_by(
$crate::prepend_package_name!($name),
$x as i64,
$help,
);
};
($name:literal, $x:expr) => {
$crate::REGISTRY.inc_by($crate::prepend_package_name!($name), $x as i64);
$crate::REGISTRY.maybe_register_and_inc_by(
$crate::prepend_package_name!($name),
$x as i64,
None,
);
};
}
#[macro_export]
macro_rules! inc {
($name:literal, $help: expr) => {
$crate::REGISTRY.maybe_register_and_inc($crate::prepend_package_name!($name), $help);
};
($name:literal) => {
$crate::REGISTRY.inc($crate::prepend_package_name!($name));
$crate::REGISTRY.maybe_register_and_inc($crate::prepend_package_name!($name), None);
};
}
@@ -42,6 +60,71 @@ macro_rules! metrics {
};
}
#[macro_export]
macro_rules! set_metric {
($name:literal, $x:expr, $help: expr) => {
$crate::REGISTRY.maybe_register_and_set(
$crate::prepend_package_name!($name),
$x as i64,
$help,
);
};
($name:literal, $x:expr) => {
$crate::REGISTRY.maybe_register_and_set(
$crate::prepend_package_name!($name),
$x as i64,
None,
);
};
}
#[macro_export]
macro_rules! set_metric_float {
($name:literal, $x:expr, $help: expr) => {
$crate::REGISTRY.maybe_register_and_set_float(
$crate::prepend_package_name!($name),
$x as f64,
$help,
);
};
($name:literal, $x:expr) => {
$crate::REGISTRY.maybe_register_and_set_float(
$crate::prepend_package_name!($name),
$x as f64,
None,
);
};
}
#[macro_export]
macro_rules! add_histogram_obs {
($name:expr, $x:expr, $b:expr, $help:expr) => {
$crate::REGISTRY.maybe_register_and_add_to_histogram(
$crate::prepend_package_name!($name),
$x as f64,
Some($b),
$help,
);
};
($name:expr, $x:expr, $b:expr) => {
$crate::REGISTRY.maybe_register_and_add_to_histogram(
$crate::prepend_package_name!($name),
$x as f64,
Some($b),
None,
);
};
($name:expr, $x:expr) => {
$crate::REGISTRY.maybe_register_and_add_to_histogram(
$crate::prepend_package_name!($name),
$x as f64,
None,
None,
);
};
}
#[macro_export]
macro_rules! nanos {
( $name:literal, $x:expr ) => {{
@@ -50,7 +133,7 @@ macro_rules! nanos {
let r = $x;
let duration = start.elapsed().as_nanos() as i64;
let name = $crate::prepend_package_name!($name);
$crate::REGISTRY.inc_by(&format!("{}_nanos", $name), duration);
$crate::REGISTRY.maybe_register_and_inc_by(&format!("{}_nanos", $name), duration, None);
r
}};
}
@@ -59,15 +142,100 @@ lazy_static::lazy_static! {
pub static ref REGISTRY: MetricsController = MetricsController::default();
}
pub fn metrics_registry() -> &'static MetricsController {
&REGISTRY
}
#[derive(Default)]
pub struct MetricsController {
registry: Registry,
registry_index: DashMap<String, Metric>,
}
enum Metric {
C(Box<IntCounter>),
G(Box<IntGauge>),
pub enum Metric {
IntCounter(Box<IntCounter>),
IntGauge(Box<IntGauge>),
FloatGauge(Box<Gauge>),
Histogram(Box<Histogram>),
}
impl Metric {
pub fn new_int_counter(name: &str, help: &str) -> Option<Self> {
match IntCounter::new(sanitize_metric_name(name), help) {
Ok(c) => Some(c.into()),
Err(err) => {
error!("Failed to create counter {name:?}: {err}");
None
}
}
}
pub fn new_int_gauge(name: &str, help: &str) -> Option<Self> {
match IntGauge::new(sanitize_metric_name(name), help) {
Ok(g) => Some(g.into()),
Err(err) => {
error!("Failed to create gauge {name:?}: {err}");
None
}
}
}
pub fn new_float_gauge(name: &str, help: &str) -> Option<Self> {
match Gauge::new(sanitize_metric_name(name), help) {
Ok(g) => Some(g.into()),
Err(err) => {
error!("Failed to create gauge {name:?}: {err}");
None
}
}
}
pub fn new_histogram(name: &str, help: &str, buckets: Option<&[f64]>) -> Option<Self> {
let mut opts = HistogramOpts::new(sanitize_metric_name(name), help);
if let Some(buckets) = buckets {
opts = opts.buckets(buckets.to_vec())
}
match Histogram::with_opts(opts) {
Ok(h) => Some(Metric::Histogram(Box::new(h))),
Err(err) => {
error!("failed to create histogram {name:?}: {err}");
None
}
}
}
fn as_collector(&self) -> Box<dyn Collector> {
match self {
Metric::IntCounter(c) => c.clone(),
Metric::IntGauge(g) => g.clone(),
Metric::FloatGauge(g) => g.clone(),
Metric::Histogram(h) => h.clone(),
}
}
}
impl From<IntCounter> for Metric {
fn from(v: IntCounter) -> Self {
Metric::IntCounter(Box::new(v))
}
}
impl From<IntGauge> for Metric {
fn from(v: IntGauge) -> Self {
Metric::IntGauge(Box::new(v))
}
}
impl From<Gauge> for Metric {
fn from(v: Gauge) -> Self {
Metric::FloatGauge(Box::new(v))
}
}
impl From<Histogram> for Metric {
fn from(v: Histogram) -> Self {
Metric::Histogram(Box::new(v))
}
}
fn fq_name(c: &dyn Collector) -> String {
@@ -81,34 +249,92 @@ impl Metric {
#[inline(always)]
fn fq_name(&self) -> String {
match self {
Metric::C(c) => fq_name(c.as_ref()),
Metric::G(g) => fq_name(g.as_ref()),
Metric::IntCounter(c) => fq_name(c.as_ref()),
Metric::IntGauge(g) => fq_name(g.as_ref()),
Metric::FloatGauge(g) => fq_name(g.as_ref()),
Metric::Histogram(h) => fq_name(h.as_ref()),
}
}
#[inline(always)]
fn inc(&self) {
match self {
Metric::C(c) => c.inc(),
Metric::G(g) => g.inc(),
Metric::IntCounter(c) => c.inc(),
Metric::IntGauge(g) => g.inc(),
Metric::FloatGauge(g) => g.inc(),
Metric::Histogram(_) => {
warn!("invalid operation: attempted to call increment on a histogram")
}
}
}
#[inline(always)]
fn inc_by(&self, value: i64) {
match self {
Metric::C(c) => c.inc_by(value as u64),
Metric::G(g) => g.add(value),
Metric::IntCounter(c) => c.inc_by(value as u64),
Metric::IntGauge(g) => g.add(value),
Metric::FloatGauge(g) => {
warn!("attempted to increment a float gauge ('{}') by an integer - this is most likely a bug", self.fq_name());
g.add(value as f64)
}
Metric::Histogram(_) => {
warn!("invalid operation: attempted to call increment on a histogram")
}
}
}
#[inline(always)]
fn set(&self, value: i64) {
match self {
Metric::C(_c) => {
Metric::IntCounter(_c) => {
warn!("Cannot set value for counter {:?}", self.fq_name());
}
Metric::G(g) => g.set(value),
Metric::IntGauge(g) => g.set(value),
Metric::FloatGauge(g) => {
warn!("attempted to set a float gauge ('{}') to an integer value - this is most likely a bug", self.fq_name());
g.set(value as f64)
}
Metric::Histogram(_) => {
warn!("invalid operation: attempted to call set on a histogram")
}
}
}
#[inline(always)]
fn set_float(&self, value: f64) {
match self {
Metric::IntCounter(_c) => {
warn!("Cannot set value for counter {:?}", self.fq_name());
}
Metric::IntGauge(g) => {
warn!("attempted to set a integer gauge ('{}') to a float value - this is most likely a bug", self.fq_name());
g.set(value as i64)
}
Metric::FloatGauge(g) => g.set(value),
Metric::Histogram(_) => {
warn!("invalid operation: attempted to call increment on a histogram")
}
}
}
#[inline(always)]
fn add_histogram_observation(&self, value: f64) {
match self {
Metric::Histogram(h) => {
h.observe(value);
}
_ => warn!("attempted to add histogram observation on a non-histogram metric"),
}
}
#[inline(always)]
fn start_timer(&self) -> Option<HistogramTimer> {
match self {
Metric::Histogram(h) => Some(h.start_timer()),
_ => {
warn!("attempted to start histogram observation on a non-histogram metric");
None
}
}
}
}
@@ -145,93 +371,165 @@ impl MetricsController {
}
}
pub fn set(&self, name: &str, value: i64) {
pub fn register_int_gauge<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
let Some(metric) = Metric::new_int_gauge(name, help.into().unwrap_or(name)) else {
return;
};
self.register_metric(metric);
}
pub fn register_float_gauge<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
let Some(metric) = Metric::new_float_gauge(name, help.into().unwrap_or(name)) else {
return;
};
self.register_metric(metric);
}
pub fn register_int_counter<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
let Some(metric) = Metric::new_int_counter(name, help.into().unwrap_or(name)) else {
return;
};
self.register_metric(metric);
}
pub fn register_histogram<'a>(
&self,
name: &str,
help: impl Into<Option<&'a str>>,
buckets: Option<&[f64]>,
) {
let Some(metric) = Metric::new_histogram(name, help.into().unwrap_or(name), buckets) else {
return;
};
self.register_metric(metric);
}
pub fn set(&self, name: &str, value: i64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.set(value);
true
} else {
let gauge = match IntGauge::new(sanitize_metric_name(name), name) {
Ok(g) => g,
Err(e) => {
debug!("Failed to create gauge {:?}:\n{}", name, e);
return;
}
};
self.register_gauge(Box::new(gauge));
self.set(name, value)
false
}
}
pub fn inc(&self, name: &str) {
pub fn set_float(&self, name: &str, value: f64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.set_float(value);
true
} else {
false
}
}
pub fn add_to_histogram(&self, name: &str, value: f64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.add_histogram_observation(value);
true
} else {
false
}
}
pub fn start_timer(&self, name: &str) -> Option<HistogramTimer> {
self.registry_index
.get(name)
.and_then(|metric| metric.start_timer())
}
pub fn inc(&self, name: &str) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.inc();
true
} else {
let counter = match IntCounter::new(sanitize_metric_name(name), name) {
Ok(c) => c,
Err(e) => {
debug!("Failed to create counter {:?}:\n{}", name, e);
return;
}
};
self.register_counter(Box::new(counter));
self.inc(name)
false
}
}
pub fn inc_by(&self, name: &str, value: i64) {
pub fn inc_by(&self, name: &str, value: i64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.inc_by(value);
true
} else {
let counter = match IntCounter::new(sanitize_metric_name(name), name) {
Ok(c) => c,
Err(e) => {
debug!("Failed to create counter {:?}:\n{}", name, e);
return;
}
};
self.register_counter(Box::new(counter));
self.inc_by(name, value)
false
}
}
fn register_gauge(&self, metric: Box<IntGauge>) {
let fq_name = metric
.desc()
.first()
.map(|d| d.fq_name.clone())
.unwrap_or_default();
pub fn maybe_register_and_set<'a>(
&self,
name: &str,
value: i64,
help: impl Into<Option<&'a str>>,
) {
if !self.set(name, value) {
let help = help.into();
self.register_int_gauge(name, help);
self.set(name, value);
}
}
pub fn maybe_register_and_set_float<'a>(
&self,
name: &str,
value: f64,
help: impl Into<Option<&'a str>>,
) {
if !self.set_float(name, value) {
let help = help.into();
self.register_float_gauge(name, help);
self.set_float(name, value);
}
}
pub fn maybe_register_and_add_to_histogram<'a>(
&self,
name: &str,
value: f64,
buckets: Option<&[f64]>,
help: impl Into<Option<&'a str>>,
) {
if !self.add_to_histogram(name, value) {
let help = help.into();
self.register_histogram(name, help, buckets);
self.add_to_histogram(name, value);
}
}
pub fn maybe_register_and_inc<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
if !self.inc(name) {
let help = help.into();
self.register_int_counter(name, help);
self.inc(name);
}
}
pub fn maybe_register_and_inc_by<'a>(
&self,
name: &str,
value: i64,
help: impl Into<Option<&'a str>>,
) {
if !self.inc_by(name, value) {
let help = help.into();
self.register_int_counter(name, help);
self.inc_by(name, value);
}
}
pub fn register_metric(&self, metric: impl Into<Metric>) {
let m = metric.into();
let fq_name = m.fq_name();
if self.registry_index.contains_key(&fq_name) {
return;
}
match self.registry.register(metric.clone()) {
match self.registry.register(m.as_collector()) {
Ok(_) => {
self.registry_index
.insert(fq_name, Metric::G(metric.clone()));
self.registry_index.insert(fq_name, m);
}
Err(e) => {
debug!("Failed to register {:?}:\n{}", fq_name, e)
}
}
}
fn register_counter(&self, metric: Box<IntCounter>) {
let fq_name = metric
.desc()
.first()
.map(|d| d.fq_name.clone())
.unwrap_or_default();
if self.registry_index.contains_key(&fq_name) {
return;
}
match self.registry.register(metric.clone()) {
Ok(_) => {
self.registry_index
.insert(fq_name, Metric::C(metric.clone()));
}
Err(e) => {
debug!("Failed to register {:?}:\n{}", fq_name, e)
Err(err) => {
debug!("Failed to register '{fq_name}': {err}")
}
}
}
@@ -275,4 +573,15 @@ mod tests {
"packets_sent_34_242_65_133:1789"
)
}
#[test]
fn prepend_package_name() {
let literal = prepend_package_name!("foo");
assert_eq!(literal, "nym_metrics_foo");
let bar = "bar";
let format = format!("foomp_{}", bar);
let formatted = prepend_package_name!(format);
assert_eq!(formatted, "nym_metrics_foomp_bar");
}
}
+1 -1
View File
@@ -63,4 +63,4 @@ par_signing = ["rayon"]
# but given it's not done very frequently, it shouldn't be too much of a problem
# furthermore, we can't and shouldn't dedicate the entire nym-api CPU just for verification,
# but this feature might potentially be desirable for clients.
par_verify = ["rayon"]
par_verify = ["rayon"]
@@ -4,10 +4,10 @@
use crate::ecash_group_parameters;
use crate::error::Result;
use crate::helpers::{g1_tuple_to_bytes, recover_g1_tuple};
use bls12_381::{G1Projective, Scalar};
use serde::{Deserialize, Serialize};
use subtle::Choice;
pub use bls12_381::{G1Projective, G2Projective, Scalar};
pub type SignerIndex = u64;
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
+1 -1
View File
@@ -36,7 +36,7 @@ pub mod common_types;
pub mod constants;
pub mod error;
mod helpers;
mod proofs;
pub mod proofs;
pub mod scheme;
pub mod tests;
mod traits;
@@ -115,10 +115,7 @@ pub fn aggregate_signatures(
let params = ecash_group_parameters();
// aggregate the signature
let signature = match Aggregatable::aggregate(signatures, indices) {
Ok(res) => res,
Err(err) => return Err(err),
};
let signature = Aggregatable::aggregate(signatures, indices)?;
// Ensure the aggregated signature is not an infinity point
if bool::from(signature.is_at_infinity()) {
@@ -8,10 +8,10 @@ use nym_sphinx_addressing::nodes::{
NymNodeRoutingAddress, NymNodeRoutingAddressError, MAX_NODE_ADDRESS_UNPADDED_LEN,
};
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx_params::PacketType;
use nym_sphinx_types::delays::Delay;
use nym_sphinx_types::{NymPacket, NymPacketError, MIN_PACKET_SIZE};
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, RngCore};
use std::time;
@@ -43,14 +43,13 @@ impl SurbAck {
ack_key: &AckKey,
marshaled_fragment_id: [u8; 5],
average_delay: time::Duration,
topology: &NymTopology,
topology: &NymRouteProvider,
packet_type: PacketType,
) -> Result<Self, NymTopologyError>
where
R: RngCore + CryptoRng,
{
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, recipient.gateway())?;
let route = topology.random_route_to_egress(rng, recipient.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
let destination = recipient.as_sphinx_destination();
+2 -2
View File
@@ -131,8 +131,8 @@ impl Recipient {
&self.client_encryption_key
}
pub fn gateway(&self) -> &NodeIdentity {
&self.gateway
pub fn gateway(&self) -> NodeIdentity {
self.gateway
}
pub fn to_bytes(self) -> RecipientBytes {
@@ -6,9 +6,9 @@ use nym_crypto::{generic_array::typenum::Unsigned, Digest};
use nym_sphinx_addressing::clients::Recipient;
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, MAX_NODE_ADDRESS_UNPADDED_LEN};
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm};
use nym_sphinx_types::{NymPacket, SURBMaterial, SphinxError, SURB};
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, RngCore};
use serde::de::{Error as SerdeError, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -89,13 +89,12 @@ impl ReplySurb {
rng: &mut R,
recipient: &Recipient,
average_delay: time::Duration,
topology: &NymTopology,
topology: &NymRouteProvider,
) -> Result<Self, NymTopologyError>
where
R: RngCore + CryptoRng,
{
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, recipient.gateway())?;
let route = topology.random_route_to_egress(rng, recipient.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
let destination = recipient.as_sphinx_destination();
@@ -110,15 +109,12 @@ impl ReplySurb {
/// Returns the expected number of bytes the [`ReplySURB`] will take after serialization.
/// Useful for deserialization from a bytes stream.
pub fn serialized_len(mix_hops: u8) -> usize {
pub fn serialized_len() -> usize {
use nym_sphinx_types::{HEADER_SIZE, NODE_ADDRESS_LENGTH, PAYLOAD_KEY_SIZE};
// the SURB itself consists of SURB_header, first hop address and set of payload keys
// (note extra 1 for the gateway)
SurbEncryptionKeySize::USIZE
+ HEADER_SIZE
+ NODE_ADDRESS_LENGTH
+ (1 + mix_hops as usize) * PAYLOAD_KEY_SIZE
// for each hop (3x mix + egress)
SurbEncryptionKeySize::USIZE + HEADER_SIZE + NODE_ADDRESS_LENGTH + 4 * PAYLOAD_KEY_SIZE
}
pub fn encryption_key(&self) -> &SurbEncryptionKey {
@@ -169,10 +169,7 @@ impl RepliableMessage {
.collect()
}
pub fn try_from_bytes(
bytes: &[u8],
num_mix_hops: u8,
) -> Result<Self, InvalidReplyRequestError> {
pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, InvalidReplyRequestError> {
if bytes.len() < SENDER_TAG_SIZE + 1 {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
@@ -180,11 +177,8 @@ impl RepliableMessage {
AnonymousSenderTag::from_bytes(bytes[..SENDER_TAG_SIZE].try_into().unwrap());
let content_tag = RepliableMessageContentTag::try_from(bytes[SENDER_TAG_SIZE])?;
let content = RepliableMessageContent::try_from_bytes(
&bytes[SENDER_TAG_SIZE + 1..],
num_mix_hops,
content_tag,
)?;
let content =
RepliableMessageContent::try_from_bytes(&bytes[SENDER_TAG_SIZE + 1..], content_tag)?;
Ok(RepliableMessage {
sender_tag,
@@ -192,23 +186,20 @@ impl RepliableMessage {
})
}
pub fn serialized_size(&self, num_mix_hops: u8) -> usize {
pub fn serialized_size(&self) -> usize {
let content_type_size = 1;
SENDER_TAG_SIZE + content_type_size + self.content.serialized_size(num_mix_hops)
SENDER_TAG_SIZE + content_type_size + self.content.serialized_size()
}
}
// this recovery code is shared between all variants containing reply surbs
fn recover_reply_surbs(
bytes: &[u8],
num_mix_hops: u8,
) -> Result<(Vec<ReplySurb>, usize), InvalidReplyRequestError> {
fn recover_reply_surbs(bytes: &[u8]) -> Result<(Vec<ReplySurb>, usize), InvalidReplyRequestError> {
let mut consumed = mem::size_of::<u32>();
if bytes.len() < consumed {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
let num_surbs = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
let surb_size = ReplySurb::serialized_len(num_mix_hops);
let surb_size = ReplySurb::serialized_len();
if bytes[consumed..].len() < num_surbs as usize * surb_size {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
@@ -307,14 +298,13 @@ impl RepliableMessageContent {
fn try_from_bytes(
bytes: &[u8],
num_mix_hops: u8,
tag: RepliableMessageContentTag,
) -> Result<Self, InvalidReplyRequestError> {
if bytes.is_empty() {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
let (reply_surbs, n) = recover_reply_surbs(bytes, num_mix_hops)?;
let (reply_surbs, n) = recover_reply_surbs(bytes)?;
match tag {
RepliableMessageContentTag::Data => Ok(RepliableMessageContent::Data {
@@ -340,7 +330,7 @@ impl RepliableMessageContent {
}
}
fn serialized_size(&self, num_mix_hops: u8) -> usize {
fn serialized_size(&self) -> usize {
match self {
RepliableMessageContent::Data {
message,
@@ -348,19 +338,18 @@ impl RepliableMessageContent {
} => {
let num_reply_surbs_tag = mem::size_of::<u32>();
num_reply_surbs_tag
+ reply_surbs.len() * ReplySurb::serialized_len(num_mix_hops)
+ reply_surbs.len() * ReplySurb::serialized_len()
+ message.len()
}
RepliableMessageContent::AdditionalSurbs { reply_surbs } => {
let num_reply_surbs_tag = mem::size_of::<u32>();
num_reply_surbs_tag + reply_surbs.len() * ReplySurb::serialized_len(num_mix_hops)
num_reply_surbs_tag + reply_surbs.len() * ReplySurb::serialized_len()
}
RepliableMessageContent::Heartbeat {
additional_reply_surbs,
} => {
let num_reply_surbs_tag = mem::size_of::<u32>();
num_reply_surbs_tag
+ additional_reply_surbs.len() * ReplySurb::serialized_len(num_mix_hops)
num_reply_surbs_tag + additional_reply_surbs.len() * ReplySurb::serialized_len()
}
}
}
@@ -578,11 +567,11 @@ mod tests {
}
}
pub(super) fn reply_surb(rng: &mut ChaCha20Rng, num_mix_hops: u8) -> ReplySurb {
pub(super) fn reply_surb(rng: &mut ChaCha20Rng) -> ReplySurb {
// due to gateway
let num_hops = num_mix_hops + 1;
let route = (0..num_hops).map(|_| node(rng)).collect();
let delays = (0..num_hops)
const HOPS: u8 = 4;
let route = (0..HOPS).map(|_| node(rng)).collect();
let delays = (0..HOPS)
.map(|_| Delay::new_from_nanos(rng.next_u64()))
.collect();
let mut destination_bytes = [0u8; 32];
@@ -605,47 +594,40 @@ mod tests {
}
}
pub(super) fn reply_surbs(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
n: usize,
) -> Vec<ReplySurb> {
pub(super) fn reply_surbs(rng: &mut ChaCha20Rng, n: usize) -> Vec<ReplySurb> {
let mut surbs = Vec::with_capacity(n);
for _ in 0..n {
surbs.push(reply_surb(rng, num_mix_hops))
surbs.push(reply_surb(rng))
}
surbs
}
pub(super) fn repliable_content_data(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
msg_len: usize,
surbs: usize,
) -> RepliableMessageContent {
RepliableMessageContent::Data {
message: random_vec_u8(rng, msg_len),
reply_surbs: reply_surbs(rng, num_mix_hops, surbs),
reply_surbs: reply_surbs(rng, surbs),
}
}
pub(super) fn repliable_content_surbs(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
surbs: usize,
) -> RepliableMessageContent {
RepliableMessageContent::AdditionalSurbs {
reply_surbs: reply_surbs(rng, num_mix_hops, surbs),
reply_surbs: reply_surbs(rng, surbs),
}
}
pub(super) fn repliable_content_heartbeat(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
surbs: usize,
) -> RepliableMessageContent {
RepliableMessageContent::Heartbeat {
additional_reply_surbs: reply_surbs(rng, num_mix_hops, surbs),
additional_reply_surbs: reply_surbs(rng, surbs),
}
}
@@ -676,70 +658,54 @@ mod tests {
#[test]
fn serialized_size_matches_actual_serialization() {
let mut rng = fixtures::test_rng();
let num_mix_hops = 3;
let data1 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_data(&mut rng, num_mix_hops, 10000, 0),
content: fixtures::repliable_content_data(&mut rng, 10000, 0),
};
assert_eq!(
data1.serialized_size(num_mix_hops),
data1.into_bytes().len()
);
assert_eq!(data1.serialized_size(), data1.into_bytes().len());
let data2 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_data(&mut rng, num_mix_hops, 10, 100),
content: fixtures::repliable_content_data(&mut rng, 10, 100),
};
assert_eq!(
data2.serialized_size(num_mix_hops),
data2.into_bytes().len()
);
assert_eq!(data2.serialized_size(), data2.into_bytes().len());
let data3 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_data(&mut rng, num_mix_hops, 100000, 1000),
content: fixtures::repliable_content_data(&mut rng, 100000, 1000),
};
assert_eq!(
data3.serialized_size(num_mix_hops),
data3.into_bytes().len()
);
assert_eq!(data3.serialized_size(), data3.into_bytes().len());
let additional_surbs1 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1),
content: fixtures::repliable_content_surbs(&mut rng, 1),
};
assert_eq!(
additional_surbs1.serialized_size(num_mix_hops),
additional_surbs1.serialized_size(),
additional_surbs1.into_bytes().len()
);
let additional_surbs2 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1000),
content: fixtures::repliable_content_surbs(&mut rng, 1000),
};
assert_eq!(
additional_surbs2.serialized_size(num_mix_hops),
additional_surbs2.serialized_size(),
additional_surbs2.into_bytes().len()
);
let heartbeat1 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1),
content: fixtures::repliable_content_heartbeat(&mut rng, 1),
};
assert_eq!(
heartbeat1.serialized_size(num_mix_hops),
heartbeat1.into_bytes().len()
);
assert_eq!(heartbeat1.serialized_size(), heartbeat1.into_bytes().len());
let heartbeat2 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1000),
content: fixtures::repliable_content_heartbeat(&mut rng, 1000),
};
assert_eq!(
heartbeat2.serialized_size(num_mix_hops),
heartbeat2.into_bytes().len()
);
assert_eq!(heartbeat2.serialized_size(), heartbeat2.into_bytes().len());
}
}
@@ -750,49 +716,33 @@ mod tests {
#[test]
fn serialized_size_matches_actual_serialization() {
let mut rng = fixtures::test_rng();
let num_mix_hops = 3;
let data1 = fixtures::repliable_content_data(&mut rng, num_mix_hops, 10000, 0);
assert_eq!(
data1.serialized_size(num_mix_hops),
data1.into_bytes().len()
);
let data1 = fixtures::repliable_content_data(&mut rng, 10000, 0);
assert_eq!(data1.serialized_size(), data1.into_bytes().len());
let data2 = fixtures::repliable_content_data(&mut rng, num_mix_hops, 10, 100);
assert_eq!(
data2.serialized_size(num_mix_hops),
data2.into_bytes().len()
);
let data2 = fixtures::repliable_content_data(&mut rng, 10, 100);
assert_eq!(data2.serialized_size(), data2.into_bytes().len());
let data3 = fixtures::repliable_content_data(&mut rng, num_mix_hops, 100000, 1000);
assert_eq!(
data3.serialized_size(num_mix_hops),
data3.into_bytes().len()
);
let data3 = fixtures::repliable_content_data(&mut rng, 100000, 1000);
assert_eq!(data3.serialized_size(), data3.into_bytes().len());
let additional_surbs1 = fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1);
let additional_surbs1 = fixtures::repliable_content_surbs(&mut rng, 1);
assert_eq!(
additional_surbs1.serialized_size(num_mix_hops),
additional_surbs1.serialized_size(),
additional_surbs1.into_bytes().len()
);
let additional_surbs2 = fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1000);
let additional_surbs2 = fixtures::repliable_content_surbs(&mut rng, 1000);
assert_eq!(
additional_surbs2.serialized_size(num_mix_hops),
additional_surbs2.serialized_size(),
additional_surbs2.into_bytes().len()
);
let heartbeat1 = fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1);
assert_eq!(
heartbeat1.serialized_size(num_mix_hops),
heartbeat1.into_bytes().len()
);
let heartbeat1 = fixtures::repliable_content_heartbeat(&mut rng, 1);
assert_eq!(heartbeat1.serialized_size(), heartbeat1.into_bytes().len());
let heartbeat2 = fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1000);
assert_eq!(
heartbeat2.serialized_size(num_mix_hops),
heartbeat2.into_bytes().len()
);
let heartbeat2 = fixtures::repliable_content_heartbeat(&mut rng, 1000);
assert_eq!(heartbeat2.serialized_size(), heartbeat2.into_bytes().len());
}
}
+6 -17
View File
@@ -69,11 +69,11 @@ pub mod monitoring {
}
}
pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey, hops: u8) {
pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey) {
if enabled() {
let id = fragment.fragment_identifier().set_id();
let mut entry = FRAGMENTS_SENT.entry(id).or_default();
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination, hops);
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination);
entry.push(s);
}
}
@@ -82,16 +82,11 @@ pub mod monitoring {
#[derive(Debug, Clone)]
pub struct FragmentMixParams {
destination: PublicKey,
hops: u8,
}
impl FragmentMixParams {
pub fn destination(&self) -> &PublicKey {
&self.destination
}
pub fn hops(&self) -> u8 {
self.hops
pub fn destination(&self) -> PublicKey {
self.destination
}
}
@@ -105,14 +100,8 @@ pub struct SentFragment {
}
impl SentFragment {
fn new(
header: FragmentHeader,
at: u64,
client_nonce: i32,
destination: PublicKey,
hops: u8,
) -> Self {
let mixnet_params = FragmentMixParams { destination, hops };
fn new(header: FragmentHeader, at: u64, client_nonce: i32, destination: PublicKey) -> Self {
let mixnet_params = FragmentMixParams { destination };
SentFragment {
header,
at,
+5 -8
View File
@@ -10,11 +10,9 @@ use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_chunking::fragment::COVER_FRAG_ID;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{
PacketEncryptionAlgorithm, PacketHkdfAlgorithm, PacketType, DEFAULT_NUM_MIX_HOPS,
};
use nym_sphinx_params::{PacketEncryptionAlgorithm, PacketHkdfAlgorithm, PacketType};
use nym_sphinx_types::NymPacket;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, RngCore};
use std::time;
@@ -36,7 +34,7 @@ pub enum CoverMessageError {
pub fn generate_loop_cover_surb_ack<R>(
rng: &mut R,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
full_address: &Recipient,
average_ack_delay: time::Duration,
@@ -59,7 +57,7 @@ where
#[allow(clippy::too_many_arguments)]
pub fn generate_loop_cover_packet<R>(
rng: &mut R,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
full_address: &Recipient,
average_ack_delay: time::Duration,
@@ -118,8 +116,7 @@ where
.chain(cover_content)
.collect();
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, full_address.gateway())?;
let route = topology.random_route_to_egress(rng, full_address.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_packet_delay, route.len());
let destination = full_address.as_sphinx_destination();
-4
View File
@@ -16,10 +16,6 @@ pub mod packet_sizes;
pub mod packet_types;
pub mod packet_version;
// If somebody can provide an argument why it might be reasonable to have more than 255 mix hops,
// I will change this to [`usize`]
pub const DEFAULT_NUM_MIX_HOPS: u8 = 3;
// TODO: not entirely sure how to feel about those being defined here, ideally it'd be where [`Fragment`]
// is defined, but that'd introduce circular dependencies as the acknowledgements crate also needs
// access to that
+7 -7
View File
@@ -59,30 +59,30 @@ pub enum InvalidPacketSize {
pub enum PacketSize {
// for example instant messaging use case
#[default]
#[serde(rename = "regular")]
#[serde(alias = "regular")]
RegularPacket = 1,
// for sending SURB-ACKs
#[serde(rename = "ack")]
#[serde(alias = "ack")]
AckPacket = 2,
// for example for streaming fast and furious in uncompressed 10bit 4K HDR quality
#[serde(rename = "extended32")]
#[serde(alias = "extended32")]
ExtendedPacket32 = 3,
// for example for streaming fast and furious in heavily compressed lossy RealPlayer quality
#[serde(rename = "extended8")]
#[serde(alias = "extended8")]
ExtendedPacket8 = 4,
// for example for streaming fast and furious in compressed XviD quality
#[serde(rename = "extended16")]
#[serde(alias = "extended16")]
ExtendedPacket16 = 5,
#[serde(rename = "outfox_regular")]
#[serde(alias = "outfox_regular")]
OutfoxRegularPacket = 6,
// for sending SURB-ACKs
#[serde(rename = "outfox_ack")]
#[serde(alias = "outfox_ack")]
OutfoxAckPacket = 7,
}
+3 -3
View File
@@ -23,17 +23,17 @@ pub enum PacketType {
/// Represents 'normal' packet sent through the network that should be delayed by an appropriate
/// value at each hop.
#[default]
#[serde(rename = "mix")]
#[serde(alias = "mix")]
#[serde(alias = "sphinx")]
Mix = 0,
/// Represents a packet that should be sent through the network as fast as possible.
#[deprecated]
#[serde(rename = "unsupported-mix-vpn")]
#[serde(alias = "unsupported-mix-vpn")]
Vpn = 1,
/// Abusing this to add Outfox support
#[serde(rename = "outfox")]
#[serde(alias = "outfox")]
Outfox = 2,
}

Some files were not shown because too many files have changed in this diff Show More