Compare commits

...

129 Commits

Author SHA1 Message Date
Drazen 5ed7da9bdb Fix orphan ! 2025-02-25 10:42:51 +01:00
Drazen 9bedda8a24 Remove prints 2025-02-25 10:39:20 +01:00
Drazen 7d601782fb Disable acks, friendlier SURB construction 2025-02-21 14:12:55 +01:00
dynco-nym 6b24f081e1 Add extra args for the probe (#5499) 2025-02-21 12:14:37 +01:00
Jędrzej Stuczyński 6e5d0dac1b feature: allow nym-nodes to understand future version of sphinx packets (#5496)
* use updated sphinx crate

* updated outfox usage of keygen in tests

* use x25519 in outfox

* remove redundant constructor

* adjusted key convertion traits
2025-02-21 11:06:07 +00:00
mfahampshire 5f2740bf66 add vercel config file: turn off autodeploy on master (#5490) 2025-02-19 11:03:04 +00:00
Tommy Verrall ecb15034d3 Merge pull request #5489 from nymtech/fix/contracts-cargo-lock
fix: Cargo.lock for contracts
2025-02-19 11:41:30 +01:00
Fran Arbanas bd49c222a3 fix: Cargo.lock for contracts 2025-02-19 09:06:34 +01:00
Jack Wampler 50b044a100 Support static routes for HTTP requests (#5487)
allow static dns override
2025-02-18 11:53:32 -07:00
Jack Wampler ba645694d4 Provide Interval context with node descriptor endpoints (#5456)
send interval with paginated cached node responses - if epoch_id is in params and current send noupdates
2025-02-18 09:02:34 -07:00
Jack Wampler be44811a65 centralize API request interface and add preffered compression in responses (#5450) 2025-02-18 08:58:35 -07:00
import this 62e1d32e4f [DOCs:/operators]: Update sgp locations (#5486) 2025-02-18 11:39:45 +00:00
benedetta davico 9a4bbe1d67 Merge pull request #5484 from nymtech/release/2025.3-ruta
Release/2025.3 ruta to develop
2025-02-18 09:54:04 +01:00
dependabot[bot] 98090d18b4 build(deps): bump the patch-updates group across 1 directory with 3 updates (#5482) 2025-02-18 01:21:46 +01:00
dependabot[bot] 79f8066c13 build(deps): bump http from 1.1.0 to 1.2.0 (#5472)
Bumps [http](https://github.com/hyperium/http) from 1.1.0 to 1.2.0.
- [Release notes](https://github.com/hyperium/http/releases)
- [Changelog](https://github.com/hyperium/http/blob/master/CHANGELOG.md)
- [Commits](https://github.com/hyperium/http/compare/v1.1.0...v1.2.0)

---
updated-dependencies:
- dependency-name: http
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-02-18 00:45:21 +01:00
dependabot[bot] d0209766a3 build(deps): bump celes from 2.4.0 to 2.5.0 (#5469)
Bumps [celes](https://github.com/mikelodder7/celes) from 2.4.0 to 2.5.0.
- [Commits](https://github.com/mikelodder7/celes/commits/2.5.0)

---
updated-dependencies:
- dependency-name: celes
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-02-18 00:24:50 +01:00
dependabot[bot] 844030091f build(deps): bump colored from 2.1.0 to 2.2.0 (#5470)
Bumps [colored](https://github.com/mackwic/colored) from 2.1.0 to 2.2.0.
- [Release notes](https://github.com/mackwic/colored/releases)
- [Changelog](https://github.com/colored-rs/colored/blob/master/CHANGELOG.md)
- [Commits](https://github.com/mackwic/colored/compare/v2.1.0...v2.2.0)

---
updated-dependencies:
- dependency-name: colored
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-02-18 00:24:36 +01:00
dependabot[bot] a7a421b006 build(deps): bump utoipa-swagger-ui from 8.0.3 to 8.1.0 (#5471)
Bumps [utoipa-swagger-ui](https://github.com/juhaku/utoipa) from 8.0.3 to 8.1.0.
- [Release notes](https://github.com/juhaku/utoipa/releases)
- [Changelog](https://github.com/juhaku/utoipa/blob/master/utoipa-rapidoc/CHANGELOG.md)
- [Commits](https://github.com/juhaku/utoipa/compare/utoipa-swagger-ui-8.0.3...utoipa-swagger-ui-8.1.0)

---
updated-dependencies:
- dependency-name: utoipa-swagger-ui
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-02-18 00:24:24 +01:00
import this 6680fbd61a [DOCs/operators]: Relase notes for v2025.3-ruta & SGPv2 form for public (#5481)
* new relase notess + SG2 rules

* PR ready to review

* PR ready to review

* fix review comments
2025-02-17 16:48:44 +00:00
Jack Wampler fe2d21cf88 Add a middleware layer to the nym api allowing for data compression (#5451) 2025-02-17 09:05:24 -07:00
Jon Häggblad eeaca9fc10 Run cargo autoinherit (#5460)
* cargo autoinherit

* sort
2025-02-17 15:05:27 +00:00
Jon Häggblad 7255f79b9c Merge pull request #5435 from nymtech/jon/task-all-stop
Remove all recv_with_delay and add shutdown condition to loops in client-core
2025-02-17 15:54:34 +01:00
Tommy Verrall 589069504a Merge pull request #5463 from nymtech/dependabot/npm_and_yarn/docker/typescript_client/upload_contract/elliptic-6.6.1
build(deps): bump elliptic from 6.5.4 to 6.6.1 in /docker/typescript_client/upload_contract
2025-02-17 14:48:09 +01:00
Jon Häggblad 4da7bc7442 Fix wasm client stats sender task client 2025-02-17 14:37:34 +01:00
Jon Häggblad 35be8de9f1 Update task fork names to be consistent 2025-02-17 14:37:34 +01:00
Jon Häggblad 2b14a9e6f8 Fix unexpected drop: 2025-02-17 14:37:34 +01:00
Jon Häggblad e9269da897 Fix using is_shutdown_poll 2025-02-17 14:37:34 +01:00
Jon Häggblad 7bceeadf16 Include MessageHandler 2025-02-17 14:37:34 +01:00
Jon Häggblad e72ce8fa92 Fix bug with ack control task client 2025-02-17 14:37:34 +01:00
Jon Häggblad 1ccdd5d660 Also remove a bunch of panics in the native client 2025-02-17 14:37:34 +01:00
Jon Häggblad c6d38d3c4f Also include topology refresher and mix traffic controller 2025-02-17 14:37:34 +01:00
Jon Häggblad e8e2bf107f Wrap more send errors in shutdown check 2025-02-17 14:37:34 +01:00
Jon Häggblad efe4e5c1c1 Move TaskClient to Self in few tasks 2025-02-17 14:37:34 +01:00
Jon Häggblad 2230609a72 Use a TaskClient in client stats sender 2025-02-17 14:37:34 +01:00
Jon Häggblad 6d80c37b21 Tweak logging 2025-02-17 14:37:34 +01:00
Jon Häggblad cb8b4c56af Remove a bunch of unwraps from client-core 2025-02-17 14:37:34 +01:00
Jon Häggblad 4d486abfef Remove all recv_with_delay and add shutdown condition to loops in client-core
Inside client-core we want to prepare the ground for moving a behaviour
close to what we have in the vpn client.

Remove all the recv_with_delay since we want to just stop

Add shutdown condition to all select loops to guard against the shutdown
listener being polled inside the select blocks.
2025-02-17 14:37:34 +01:00
Jędrzej Stuczyński b694845e4c added missing import to doctest (#5480) 2025-02-17 13:27:47 +00:00
Jon Häggblad 5cb2800d15 Trigger contracts CI on main workspace Cargo changes (#5477)
Since the contracts workspace depends on the common code in the main
workspace, and since the contracts are critical to not have regressions
in, trigger contracts CI on any changes to the workspace
Cargo.toml and lock files.
2025-02-17 13:00:40 +01:00
Jędrzej Stuczyński fd14394958 adjusted TestSetup::new_complex to ensure bonded node's existence (#5478) 2025-02-17 11:52:53 +00:00
Drazen Urch 134883522d Seedable clients (#5440)
* Seedable clients

* Finalize seedable PR

* Address PR comments

* More generic DerivationMaterials init

* Fix xoring the wrong index

* Tests
2025-02-17 00:00:17 +01:00
dependabot[bot] 221e01e9b8 build(deps): bump elliptic in /docker/typescript_client/upload_contract
Bumps [elliptic](https://github.com/indutny/elliptic) from 6.5.4 to 6.6.1.
- [Commits](https://github.com/indutny/elliptic/compare/v6.5.4...v6.6.1)

---
updated-dependencies:
- dependency-name: elliptic
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-02-14 05:30:38 +00:00
Jon Häggblad dcc48db301 Fix clippy::precedence (#5457)
* Fix clippy::precedence

* Fix clippy::useless_conversion
2025-02-13 11:05:39 +00:00
dainius-nym 7528109693 fix: update fx average rate calcs to ignore 0 values (#5454)
* fix: update fx average rate calcs to ignore 0 values

* chore: bump version and format the code
2025-02-13 09:50:32 +00:00
Jon Häggblad 203d682f2c Upgrade tower to 0.5.2 (#5446) 2025-02-13 10:43:39 +01:00
dependabot[bot] 589575eed8 build(deps): bump publicsuffix from 2.2.3 to 2.3.0 (#5367)
Bumps [publicsuffix](https://github.com/rushmorem/publicsuffix) from 2.2.3 to 2.3.0.
- [Release notes](https://github.com/rushmorem/publicsuffix/releases)
- [Commits](https://github.com/rushmorem/publicsuffix/compare/v2.2.3...v2.3.0)

---
updated-dependencies:
- dependency-name: publicsuffix
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-02-13 00:50:34 +01:00
Jon Häggblad 35bf1cc717 Disable debug in wasm and wallet workflows too (#5459) 2025-02-13 00:16:32 +01:00
dependabot[bot] f5e02d5652 build(deps): bump hickory-proto from 0.24.2 to 0.24.3 (#5444)
* build(deps): bump hickory-proto from 0.24.2 to 0.24.3

Bumps [hickory-proto](https://github.com/hickory-dns/hickory-dns) from 0.24.2 to 0.24.3.
- [Release notes](https://github.com/hickory-dns/hickory-dns/releases)
- [Changelog](https://github.com/hickory-dns/hickory-dns/blob/v0.24.3/CHANGELOG.md)
- [Commits](https://github.com/hickory-dns/hickory-dns/compare/v0.24.2...v0.24.3)

---
updated-dependencies:
- dependency-name: hickory-proto
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>

* Don't downgrade rand_core

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jon Häggblad <jon.haggblad@gmail.com>
2025-02-13 00:09:03 +01:00
dependabot[bot] 2fc641a7ff build(deps): bump hyper from 1.4.1 to 1.6.0 (#5416)
Bumps [hyper](https://github.com/hyperium/hyper) from 1.4.1 to 1.6.0.
- [Release notes](https://github.com/hyperium/hyper/releases)
- [Changelog](https://github.com/hyperium/hyper/blob/master/CHANGELOG.md)
- [Commits](https://github.com/hyperium/hyper/compare/v1.4.1...v1.6.0)

---
updated-dependencies:
- dependency-name: hyper
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-02-13 00:05:39 +01:00
dependabot[bot] 0ccca19cc2 build(deps): bump uniffi_build from 0.25.3 to 0.29.0 (#5448)
* build(deps): bump uniffi_build from 0.25.3 to 0.29.0

Bumps [uniffi_build](https://github.com/mozilla/uniffi-rs) from 0.25.3 to 0.29.0.
- [Changelog](https://github.com/mozilla/uniffi-rs/blob/main/CHANGELOG.md)
- [Commits](https://github.com/mozilla/uniffi-rs/compare/v0.25.3...v0.29.0)

---
updated-dependencies:
- dependency-name: uniffi_build
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Also update uniffi to match uniffi_build

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jon Häggblad <jon.haggblad@gmail.com>
2025-02-12 23:56:02 +01:00
Jon Häggblad a07e567eb2 Set debug to false in ci-build.yml (#5458) 2025-02-12 23:08:44 +01:00
Jon Häggblad f3400a0aa5 Add helper to extract a list of sqlite files with journal files wal/shm (#5452)
Co-authored-by: Andrej Mihajlov <andrej@nymtech.net>
2025-02-12 17:29:06 +01:00
dainius-nym bf8614a545 Feature/add gbp currency (#5453)
* features: add gbp currency to the fx price scrapper

* regenerated sqlx queries

* nump cargo version

---------

Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>
2025-02-12 13:16:34 +00:00
dynco-nym b7e3687757 Dz nym node stats (#5418)
* Remove blacklisted, inactive, reserve fields

* Remove gw.blacklisted

* Remove blacklisted and bonded count

* DB operations

* Improve logging

* Remove unused functions

* get_nym_nodes for scraping WIP

* Separate nym_nodes from mixnode stats
- fixes FOREIGN_KEY_CONSTRAINT error when storing
  stats for nym_nodes which aren't in mixnodes table

* Daily aggregation works

* mixnodes/stats exposes correct info

* Undo unnecessary tidbits

* Replace obsolete stats

* Add total_stake

* Bump cargo.toml version

* Rename MixingNodeKind for better clarity
2025-02-11 12:07:15 +01:00
windy-ux b9b969b7d3 + specify worker-src (#5443)
+ CSP from main website

Co-authored-by: benedetta davico <46782255+benedettadavico@users.noreply.github.com>
2025-02-11 10:19:12 +00:00
dependabot[bot] 47303e5b3b build(deps): bump openssl from 0.10.56 to 0.10.70 in /nym-wallet (#5422)
Bumps [openssl](https://github.com/sfackler/rust-openssl) from 0.10.56 to 0.10.70.
- [Release notes](https://github.com/sfackler/rust-openssl/releases)
- [Commits](https://github.com/sfackler/rust-openssl/compare/openssl-v0.10.56...openssl-v0.10.70)

---
updated-dependencies:
- dependency-name: openssl
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-02-10 22:27:42 +01:00
dependabot[bot] 6b38ffd4f3 build(deps): bump hickory-proto from 0.24.2 to 0.24.3 in /nym-wallet (#5445)
Bumps [hickory-proto](https://github.com/hickory-dns/hickory-dns) from 0.24.2 to 0.24.3.
- [Release notes](https://github.com/hickory-dns/hickory-dns/releases)
- [Changelog](https://github.com/hickory-dns/hickory-dns/blob/v0.24.3/CHANGELOG.md)
- [Commits](https://github.com/hickory-dns/hickory-dns/compare/v0.24.2...v0.24.3)

---
updated-dependencies:
- dependency-name: hickory-proto
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-02-10 22:19:43 +01:00
import this 169c313404 [DOCs/operators]: Email templates update (#5441)
* new intro template

* Update dmca_response.md
2025-02-10 19:11:03 +00:00
benedettadavico a3e19b4563 update changelog 2025-02-10 18:14:47 +01:00
dependabot[bot] ccf430ea62 build(deps): bump the patch-updates group across 1 directory with 10 updates (#5439)
Bumps the patch-updates group with 10 updates in the / directory:

| Package | From | To |
| --- | --- | --- |
| [async-trait](https://github.com/dtolnay/async-trait) | `0.1.85` | `0.1.86` |
| [clap](https://github.com/clap-rs/clap) | `4.5.27` | `4.5.28` |
| [comfy-table](https://github.com/nukesor/comfy-table) | `7.1.3` | `7.1.4` |
| [hickory-resolver](https://github.com/hickory-dns/hickory-dns) | `0.24.2` | `0.24.3` |
| [once_cell](https://github.com/matklad/once_cell) | `1.20.2` | `1.20.3` |
| [pin-project](https://github.com/taiki-e/pin-project) | `1.1.8` | `1.1.9` |
| [serde_json_path](https://github.com/hiltontj/serde_json_path) | `0.7.1` | `0.7.2` |
| [toml](https://github.com/toml-rs/toml) | `0.8.19` | `0.8.20` |
| [cosmrs](https://github.com/cosmos/cosmos-rust) | `0.21.0` | `0.21.1` |
| [tokio-postgres](https://github.com/sfackler/rust-postgres) | `0.7.12` | `0.7.13` |



Updates `async-trait` from 0.1.85 to 0.1.86
- [Release notes](https://github.com/dtolnay/async-trait/releases)
- [Commits](https://github.com/dtolnay/async-trait/compare/0.1.85...0.1.86)

Updates `clap` from 4.5.27 to 4.5.28
- [Release notes](https://github.com/clap-rs/clap/releases)
- [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md)
- [Commits](https://github.com/clap-rs/clap/compare/clap_complete-v4.5.27...clap_complete-v4.5.28)

Updates `comfy-table` from 7.1.3 to 7.1.4
- [Release notes](https://github.com/nukesor/comfy-table/releases)
- [Changelog](https://github.com/Nukesor/comfy-table/blob/main/CHANGELOG.md)
- [Commits](https://github.com/nukesor/comfy-table/compare/v7.1.3...v7.1.4)

Updates `hickory-resolver` from 0.24.2 to 0.24.3
- [Release notes](https://github.com/hickory-dns/hickory-dns/releases)
- [Changelog](https://github.com/hickory-dns/hickory-dns/blob/v0.24.3/CHANGELOG.md)
- [Commits](https://github.com/hickory-dns/hickory-dns/compare/v0.24.2...v0.24.3)

Updates `once_cell` from 1.20.2 to 1.20.3
- [Changelog](https://github.com/matklad/once_cell/blob/master/CHANGELOG.md)
- [Commits](https://github.com/matklad/once_cell/compare/v1.20.2...v1.20.3)

Updates `pin-project` from 1.1.8 to 1.1.9
- [Release notes](https://github.com/taiki-e/pin-project/releases)
- [Changelog](https://github.com/taiki-e/pin-project/blob/main/CHANGELOG.md)
- [Commits](https://github.com/taiki-e/pin-project/compare/v1.1.8...v1.1.9)

Updates `serde_json_path` from 0.7.1 to 0.7.2
- [Release notes](https://github.com/hiltontj/serde_json_path/releases)
- [Changelog](https://github.com/hiltontj/serde_json_path/blob/main/CHANGELOG.md)
- [Commits](https://github.com/hiltontj/serde_json_path/compare/v0.7.1...v0.7.2)

Updates `toml` from 0.8.19 to 0.8.20
- [Commits](https://github.com/toml-rs/toml/compare/toml-v0.8.19...toml-v0.8.20)

Updates `cosmrs` from 0.21.0 to 0.21.1
- [Commits](https://github.com/cosmos/cosmos-rust/compare/cosmrs/v0.21.0...cosmrs/v0.21.1)

Updates `tokio-postgres` from 0.7.12 to 0.7.13
- [Release notes](https://github.com/sfackler/rust-postgres/releases)
- [Commits](https://github.com/sfackler/rust-postgres/compare/tokio-postgres-v0.7.12...tokio-postgres-v0.7.13)

---
updated-dependencies:
- dependency-name: async-trait
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: clap
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: comfy-table
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: hickory-resolver
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: once_cell
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: pin-project
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: serde_json_path
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: toml
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: cosmrs
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: tokio-postgres
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-02-10 14:59:07 +01:00
import this cf13b79e93 [DOCs/operators]: Clarify SGPv2 program rules (#5434) 2025-02-07 11:31:34 +00:00
Jon Häggblad 134a0196f8 Disable the test for checking the remaining bandwidth in nym-node-status-api (#5425)
* Disable the test for checking the remaining bandwidth in nym-node-status-api

This check fails almost every time on CI, possibly due to rate limiting?
It's not good to disable the check, but it's blocking CI as it stands
now. Given that we have the check above for locating the ip, we at least
have a little coverage.

* Remove unused
2025-02-07 11:39:37 +01:00
benedettadavico 54aef7c242 bump binary versions 2025-02-07 10:21:16 +01:00
benedetta davico 6c45c9f0b0 Merge pull request #5396 from nymtech/fix/wallet-explorer-url
Change Explorer URL to new smooshed nodes
2025-02-06 16:47:26 +01:00
import this b5afae0916 [DOCs:operators]: Update nym-node specs (#5433)
* Update nym-node-specs.mdx

* update specs - PR finished
2025-02-06 15:43:33 +00:00
benedetta davico 988eca857f Merge pull request #5431 from nymtech/drazen/forget-cli-client
Push down forget me to client configs
2025-02-06 15:25:04 +01:00
benedetta davico 3c05db2874 Merge pull request #5428 from nymtech/release/2025.2-hu
Merge release/2025.2-hu to develop
2025-02-06 13:58:47 +01:00
durch a8e268f84a Push down forget me to client configs 2025-02-06 13:15:58 +01:00
benedetta davico ac22533ecd Merge pull request #5429 from nymtech/feature/fix_develop_merge
Feature/fix develop merge
2025-02-06 13:12:31 +01:00
Bogdan-Ștefan Neacşu bdc0b875a4 Merge remote-tracking branch 'origin/develop' into release/2025.2-hu 2025-02-06 13:16:51 +02:00
import this d7b67c1408 [DOCs]: hotfix relative path url (#5427) 2025-02-06 10:15:45 +00:00
import this 606e29ebb0 [DOCs/operators]: Release notes, new specs, legal pages (#5419)
* add legal support notes

* write dev release notes

* create new legal page and add templates

* remove node_api_check to backup

* templates page

* update specs

* update backup and restore node

* PR ready for review

* address review comment

* last tweaks - PR finished

* last tweaks - PR finished
2025-02-05 15:19:56 +00:00
Bogdan-Ștefan Neacşu 21e3c1538d Fix statistics shutdown (#5426) 2025-02-05 16:06:46 +02:00
mfahampshire 0fc7cc657d Max/openapi docs update (#5292)
* spacing + working openapi local for nymapi

* sandbox nyx rest api

* add now working nym-api openapi json url to component
2025-02-05 14:05:44 +00:00
dependabot[bot] 23a7f01c05 build(deps): bump tokio from 1.40.0 to 1.43.0 (#5370)
* build(deps): bump tokio from 1.40.0 to 1.43.0

Bumps [tokio](https://github.com/tokio-rs/tokio) from 1.40.0 to 1.43.0.
- [Release notes](https://github.com/tokio-rs/tokio/releases)
- [Commits](https://github.com/tokio-rs/tokio/compare/tokio-1.40.0...tokio-1.43.0)

---
updated-dependencies:
- dependency-name: tokio
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* wip: test if token is set

* Try with an artifical delay between calls

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Jon Häggblad <jon.haggblad@gmail.com>
2025-02-05 10:38:28 +01:00
Jon Häggblad 3a21cfa1ab Make wait_for_graceful_shutdown to be pub (#5424) 2025-02-05 08:58:25 +01:00
Jack Wampler 1d2e6d916c Use secure DNS for websocket connection establishment (#5386)
implementation of secure dns for websocket connection establishment. depends on #5355
2025-02-04 11:20:39 -07:00
benedettadavico 4c2bf3642e update changelong 2025-02-04 10:29:48 +01:00
Jędrzej Stuczyński 70e2e32385 Feature/remove double spending bloomfilter (#5417)
* removed all uses of the bloomfilter inside nym-api

* changed http status code on bf queries
2025-02-03 16:11:13 +00:00
Jon Häggblad 68a192daa3 Upgrade to thiserror 2.0 (#5414)
* Upgrade to thiserror 2.0

* Remove line macros in vesting contract error type

* Name positional arguments in GatewayRequestsError

* Named positional argument

* Revert "Remove line macros in vesting contract error type"

This reverts commit 49f937da3f.

* Use positional arguments for line
2025-02-03 10:50:11 +01:00
dependabot[bot] d6aacae14e build(deps): bump the patch-updates group across 1 directory with 9 updates (#5406)
Bumps the patch-updates group with 9 updates in the / directory:

| Package | From | To |
| --- | --- | --- |
| [clap](https://github.com/clap-rs/clap) | `4.5.26` | `4.5.27` |
| [clap_complete](https://github.com/clap-rs/clap) | `4.5.40` | `4.5.44` |
| [getset](https://github.com/jbaublitz/getset) | `0.1.3` | `0.1.4` |
| [indicatif](https://github.com/console-rs/indicatif) | `0.17.9` | `0.17.11` |
| [log](https://github.com/rust-lang/log) | `0.4.22` | `0.4.25` |
| [pin-project](https://github.com/taiki-e/pin-project) | `1.1.7` | `1.1.8` |
| [semver](https://github.com/dtolnay/semver) | `1.0.24` | `1.0.25` |
| [serde_json](https://github.com/serde-rs/json) | `1.0.135` | `1.0.138` |
| [bip32](https://github.com/iqlusioninc/crates) | `0.5.2` | `0.5.3` |



Updates `clap` from 4.5.26 to 4.5.27
- [Release notes](https://github.com/clap-rs/clap/releases)
- [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md)
- [Commits](https://github.com/clap-rs/clap/compare/clap_complete-v4.5.26...clap_complete-v4.5.27)

Updates `clap_complete` from 4.5.40 to 4.5.44
- [Release notes](https://github.com/clap-rs/clap/releases)
- [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md)
- [Commits](https://github.com/clap-rs/clap/compare/clap_complete-v4.5.40...clap_complete-v4.5.44)

Updates `getset` from 0.1.3 to 0.1.4
- [Release notes](https://github.com/jbaublitz/getset/releases)
- [Commits](https://github.com/jbaublitz/getset/compare/0.1.3...0.1.4)

Updates `indicatif` from 0.17.9 to 0.17.11
- [Release notes](https://github.com/console-rs/indicatif/releases)
- [Commits](https://github.com/console-rs/indicatif/compare/0.17.9...0.17.11)

Updates `log` from 0.4.22 to 0.4.25
- [Release notes](https://github.com/rust-lang/log/releases)
- [Changelog](https://github.com/rust-lang/log/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/log/compare/0.4.22...0.4.25)

Updates `pin-project` from 1.1.7 to 1.1.8
- [Release notes](https://github.com/taiki-e/pin-project/releases)
- [Changelog](https://github.com/taiki-e/pin-project/blob/main/CHANGELOG.md)
- [Commits](https://github.com/taiki-e/pin-project/compare/v1.1.7...v1.1.8)

Updates `semver` from 1.0.24 to 1.0.25
- [Release notes](https://github.com/dtolnay/semver/releases)
- [Commits](https://github.com/dtolnay/semver/compare/1.0.24...1.0.25)

Updates `serde_json` from 1.0.135 to 1.0.138
- [Release notes](https://github.com/serde-rs/json/releases)
- [Commits](https://github.com/serde-rs/json/compare/v1.0.135...v1.0.138)

Updates `bip32` from 0.5.2 to 0.5.3
- [Commits](https://github.com/iqlusioninc/crates/compare/bip32/v0.5.2...bip32/v0.5.3)

---
updated-dependencies:
- dependency-name: clap
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: clap_complete
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: getset
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: indicatif
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: log
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: pin-project
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: semver
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: serde_json
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
- dependency-name: bip32
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: patch-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-01-31 11:27:52 +01:00
Jon Häggblad 6f00023d09 Send shutdown instead of panic when reaching max fail (#5398)
* Send shutdown instead of panic when reaching max fail

* Stop quicker on failure

* Update comment
2025-01-31 10:39:37 +01:00
Tommy Verrall 982ec56874 Merge pull request #5300 from nymtech/feat/nymnode-entrypoint-docker
Nymnode entrypoint docker
2025-01-31 09:08:50 +01:00
Jack Wampler 5dcc1ed6dc Merge pull request #5401 from nymtech/jmwample/nym-api-route
Relocate a validator api function
2025-01-30 09:50:58 -07:00
Jon Häggblad d62bc0a10b Downgrade harmless log message from info to debug (#5403) 2025-01-30 13:36:06 +01:00
jmwample 882003c08c fmt 2025-01-29 14:58:04 -07:00
jmwample b71a491872 relocate a validator api function 2025-01-29 14:55:16 -07:00
Yana Matrosova 8f48ae08c4 Redirect from mixnode page to nodes page (#5397)
Co-authored-by: Yana <yanok87@users.noreply.github.com>
2025-01-28 17:30:45 +00:00
Yana 31b9623407 Change Explorer URL to new smooshed nodes 2025-01-28 13:00:01 +02:00
Jędrzej Stuczyński 6d90ffdd2c reduce log severity for checking topology validity (#5395) 2025-01-28 09:29:51 +00:00
Drazen Urch 9550934d1f Pre shutdown hooks for GatewayClient (#5381) 2025-01-27 20:00:37 +01:00
import this ff91d4619e [HOTFIX/DOCs]: Update pre-built-binaries.mdx (#5385) 2025-01-24 10:31:19 +00:00
Jack Wampler 9d01474277 Merge pull request #5355 from nymtech/jmwample/dot
DNS resolver configuration for internal HTTP client lookups
2025-01-23 10:41:39 -07:00
jmwample 8d10552d7c hickory dns error mgmt 2025-01-23 08:29:56 -07:00
import this 04fd197f5a [DOCs]: Add more backup guides, clean up deprecated, fix URLs, add sha verf (#5384)
* fix socks5 syntax

* reshape backup and restore and add proxy

* fix URLS

* remove deprecated node-api-check - archived for when there is time to maintain the tool

* add hash verification step
2025-01-23 15:14:31 +00:00
Jon Häggblad 4eadaf8292 Fix missing path triggers for CI (#5380)
* Fix missing path triggers for CI

* Sort alphabetically to make it easier to maintain
2025-01-22 23:46:07 +01:00
jmwample 32e39ebc6b square cargo.lock with upstream branch 2025-01-22 14:32:04 -07:00
jmwample 117eb83a0b managing returned iterators 2025-01-22 14:30:16 -07:00
jmwample c964c137f4 fmt 2025-01-22 14:30:16 -07:00
jmwample 35b43d5b20 missed Lookup strategy 2025-01-22 14:30:16 -07:00
jmwample bf88b34898 fix wasm compile (exclude wasm target from DoH / DoT) 2025-01-22 14:30:16 -07:00
jmwample 93140a1aa7 minor fixes for clarity, interface access, and wasm exclusion 2025-01-22 14:30:16 -07:00
jmwample f594bfc9ab remove h3 because it causes an error 2025-01-22 14:30:12 -07:00
jmwample 4327e2945a DNS-over-X for internal domain name (i.e. API client) lookups 2025-01-22 14:29:44 -07:00
Bogdan-Ștefan Neacşu 8670693952 Uncouple storage reference for bandwidth client (#5372) 2025-01-22 12:12:06 +01:00
mfahampshire 57c38ef222 temp remove cargodoc command (#5375) 2025-01-22 10:09:47 +00:00
mfahampshire 8e05386a0b Max/tssdk docs maintenance (#5364)
* add temp warning
2025-01-20 13:02:56 +00:00
Tommy Verrall 13cfa55e6c Merge pull request #5327 from nymtech/marcdbz-patch-1
Update README.md
2025-01-20 09:36:25 +01:00
Tommy Verrall 18e628acde Merge pull request #5328 from nymtech/marcdbz-patch-2
Update README.md
2025-01-20 09:35:58 +01:00
Tommy Verrall b163dba2d4 Merge pull request #5356 from nymtech/release/2025.1-reeses
2025.1-reeses to master
2025-01-20 09:35:09 +01:00
import this e67b2b020a [DOCs/operators]: Bump release version (#5362)
* bump release version

* bump version in setup guide

* PR finished
2025-01-17 18:12:12 +00:00
Fran Arbanas a0daabab03 fix version 2025-01-16 10:10:16 +01:00
Fran Arbanas b0a5b60945 update version 2025-01-16 10:06:34 +01:00
Marc 01c7ea72dd Update README.md
Fixed typo and updated operators link
2025-01-09 20:28:18 +01:00
Marc dfd1df5706 Update README.md
Updated the Tauri link
2025-01-09 20:26:04 +01:00
benedetta davico aa83501ed0 Merge pull request #5289 from nymtech/release/2024.14-crunch-patched
Merging patched crunch to master
2025-01-08 10:33:03 +01:00
Fran Arbanas 5f06414a12 bump version 2024-12-20 14:34:34 +01:00
Fran Arbanas 656838811a fix permissions 2024-12-20 14:34:10 +01:00
Fran Arbanas 7b8458630a bump version 2024-12-20 14:22:07 +01:00
Fran Arbanas cf2ab08b4d fix dockerfile 2024-12-20 14:20:43 +01:00
Fran Arbanas 2466112829 test version 2024-12-20 13:19:18 +01:00
Fran Arbanas e5306908e4 feat: add entrypoint script 2024-12-20 13:18:52 +01:00
benedetta davico b628a5f814 Merge pull request #5263 from nymtech/release/2024.14-crunch
Merge release/2024.14-crunch to master
2024-12-13 11:49:27 +01:00
benedetta davico 62045d76b3 Merge pull request #5172 from nymtech/release/2024.13-magura-patched
Update master with latest releases
2024-11-26 11:53:05 +01:00
benedetta davico f8317f5a03 Merge pull request #5025 from nymtech/release/2024.12-aero
Aero to master
2024-10-24 10:54:37 +02:00
benedetta davico c3ec970a37 Merge pull request #4928 from nymtech/release/2024.11-wedel
Release/2024.11-wedel to master
2024-09-26 08:24:53 +02:00
Jędrzej Stuczyński 5a573bc278 Merge pull request #4866 from nymtech/release/2024.10-caramello
Release/2024.10 caramello
2024-09-11 15:09:50 +01:00
benedetta davico 3d200db722 Merge pull request #4749 from nymtech/release/2024.9-topdeck-pre-develop-merge
release/2024.9 topdeck pre develop merge
2024-08-06 17:14:17 +02:00
Tommy Verrall e4139713cb Merge pull request #4724 from nymtech/release/2024.8-wispa
Merge release/2024.8-wispa into master
2024-07-24 08:25:52 +01:00
272 changed files with 7515 additions and 4682 deletions
+18 -3
View File
@@ -8,17 +8,18 @@ on:
- 'explorer-api/**'
- 'gateway/**'
- 'integrations/**'
- 'mixnode/**'
- 'nym-api/**'
- 'nym-data-observatory/**'
- 'nym-credential-proxy/**'
- 'nym-network-monitor/**'
- 'nym-node/**'
- 'nym-node-status-api/**'
- 'nym-outfox/**'
- 'nym-validator-rewarder/**'
- 'sdk/lib/**'
- 'nyx-chain-watcher/**'
- 'sdk/ffi/**'
- 'sdk/rust/**'
- 'service-providers/**'
- 'nym-browser-extension/storage/**'
- 'tools/**'
- 'wasm/**'
- 'Cargo.toml'
@@ -53,6 +54,20 @@ jobs:
override: true
components: rustfmt, clippy
# To avoid running out of disk space, skip generating debug symbols
- name: Set debug to false (unix)
if: contains(matrix.os, 'ubuntu') || contains(matrix.os, 'mac')
run: |
sed -i.bak 's/\[profile.dev\]/\[profile.dev\]\ndebug = false/' Cargo.toml
git diff
- name: Set debug to false (win)
if: contains(matrix.os, 'windows')
shell: pwsh
run: |
(Get-Content Cargo.toml) -replace '\[profile.dev\]', "`$&`ndebug = false" | Set-Content Cargo.toml
git diff
- name: Check formatting
uses: actions-rs/cargo@v1
with:
+2
View File
@@ -9,6 +9,8 @@ on:
paths:
- 'contracts/**'
- 'common/**'
- 'Cargo.lock'
- 'Cargo.toml'
- '.github/workflows/ci-contracts.yml'
jobs:
+6
View File
@@ -30,6 +30,12 @@ jobs:
override: true
components: rustfmt, clippy
- name: Set debug to false
working-directory: nym-wallet
run: |
sed -i.bak '1s/^/\[profile.dev\]\ndebug = false\n\n/' Cargo.toml
git diff
- name: Build all binaries
uses: actions-rs/cargo@v1
with:
+6
View File
@@ -1,6 +1,7 @@
name: ci-sdk-wasm
on:
workflow_dispatch:
pull_request:
paths:
- 'wasm/**'
@@ -44,6 +45,11 @@ jobs:
- name: Install wasm-bindgen-cli
run: cargo install wasm-bindgen-cli
- name: Set debug to false
run: |
sed -i.bak 's/\[profile.dev\]/\[profile.dev\]\ndebug = false/' Cargo.toml
git diff
- name: "Build"
run: make sdk-wasm-build
+47 -1
View File
@@ -4,8 +4,49 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2025.2-hu] (2025-01-28)
## [2025.3-ruta] (2025-02-10)
- Push down forget me to client configs ([#5431])
- Fix statistics shutdown ([#5426])
- Make wait_for_graceful_shutdown to be pub ([#5424])
- Upgrade to thiserror 2.0 ([#5414])
- build(deps): bump the patch-updates group across 1 directory with 9 updates ([#5406])
- Relocate a validator api function ([#5401])
- Send shutdown instead of panic when reaching max fail ([#5398])
- Change Explorer URL to new smooshed nodes ([#5396])
- reduce log severity for checking topology validity ([#5395])
- MixnetClient can send ClientRequests ([#5381])
- Fix missing path triggers for CI ([#5380])
- Uncouple storage reference for bandwidth client ([#5372])
- build(deps): bump tokio from 1.40.0 to 1.43.0 ([#5370])
- DNS resolver configuration for internal HTTP client lookups ([#5355])
- Update README.md ([#5328])
- Update README.md ([#5327])
[#5431]: https://github.com/nymtech/nym/pull/5431
[#5426]: https://github.com/nymtech/nym/pull/5426
[#5424]: https://github.com/nymtech/nym/pull/5424
[#5414]: https://github.com/nymtech/nym/pull/5414
[#5406]: https://github.com/nymtech/nym/pull/5406
[#5401]: https://github.com/nymtech/nym/pull/5401
[#5398]: https://github.com/nymtech/nym/pull/5398
[#5396]: https://github.com/nymtech/nym/pull/5396
[#5395]: https://github.com/nymtech/nym/pull/5395
[#5381]: https://github.com/nymtech/nym/pull/5381
[#5380]: https://github.com/nymtech/nym/pull/5380
[#5372]: https://github.com/nymtech/nym/pull/5372
[#5370]: https://github.com/nymtech/nym/pull/5370
[#5355]: https://github.com/nymtech/nym/pull/5355
[#5328]: https://github.com/nymtech/nym/pull/5328
[#5327]: https://github.com/nymtech/nym/pull/5327
## [2025.2-hu] (2025-02-04)
- Feature/remove double spending bloomfilter ([#5417])
- HU - Downgrade harmless log message from info to debug ([#5405])
- lower default ticket verification quorum to 0.7 ([#5404])
- Downgrade harmless log message from info to debug ([#5403])
- Redirect from mixnode page to nodes page ([#5397])
- chore :update version of chain watcher and validator rewarder ([#5394])
- bugfix: correctly handle ingore epoch roles flag ([#5390])
- bugfix: terminate mixnet socket listener on shutdown ([#5389])
@@ -40,6 +81,11 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
- NS API: add mixnet scraper ([#5200])
- build(deps): bump criterion from 0.4.0 to 0.5.1 ([#4911])
[#5417]: https://github.com/nymtech/nym/pull/5417
[#5405]: https://github.com/nymtech/nym/pull/5405
[#5404]: https://github.com/nymtech/nym/pull/5404
[#5403]: https://github.com/nymtech/nym/pull/5403
[#5397]: https://github.com/nymtech/nym/pull/5397
[#5394]: https://github.com/nymtech/nym/pull/5394
[#5390]: https://github.com/nymtech/nym/pull/5390
[#5389]: https://github.com/nymtech/nym/pull/5389
Generated
+1394 -1080
View File
File diff suppressed because it is too large Load Diff
+60 -54
View File
@@ -48,13 +48,12 @@ members = [
"common/credentials-interface",
"common/crypto",
"common/dkg",
"common/ecash-double-spending",
"common/ecash-time",
"common/execute",
"common/exit-policy",
"common/gateway-requests",
"common/gateway-storage",
"common/gateway-stats-storage",
"common/gateway-storage",
"common/http-api-client",
"common/http-api-common",
"common/inclusion-probability",
@@ -93,6 +92,7 @@ members = [
"common/topology",
"common/tun",
"common/types",
"common/verloc",
"common/wasm/client-core",
"common/wasm/storage",
"common/wasm/utils",
@@ -104,6 +104,22 @@ members = [
"explorer-api/explorer-client",
"gateway",
"integrations/bity",
"nym-api",
"nym-api/nym-api-requests",
"nym-browser-extension/storage",
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-network-monitor",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-node-status-api/nym-node-status-client",
"nym-node/nym-node-metrics",
"nym-node/nym-node-requests",
"nym-outfox",
"nym-validator-rewarder",
"nyx-chain-watcher",
"sdk/ffi/cpp",
"sdk/ffi/go",
"sdk/ffi/shared",
@@ -112,26 +128,16 @@ members = [
"service-providers/common",
"service-providers/ip-packet-router",
"service-providers/network-requester",
"nym-api",
"nym-api/nym-api-requests",
"nym-browser-extension/storage",
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-network-monitor",
"nyx-chain-watcher",
"nym-node",
"nym-node/nym-node-requests",
"nym-node/nym-node-metrics",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-node-status-api/nym-node-status-client",
"nym-outfox",
"nym-validator-rewarder",
"tools/echo-server",
"tools/internal/ssl-inject",
"tools/echo-server",
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/mixnet-connectivity-check",
# "tools/internal/sdk-version-bump",
"tools/internal/ssl-inject",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/nym-cli",
"tools/nym-id-cli",
@@ -143,13 +149,6 @@ members = [
"wasm/mix-fetch",
"wasm/node-tester",
"wasm/zknym-lib",
"tools/echo-server",
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
"common/verloc",
"tools/internal/mixnet-connectivity-check",
]
default-members = [
@@ -173,7 +172,6 @@ exclude = [
"explorer",
"contracts",
"nym-wallet",
"nym-vpn/ui/src-tauri",
"cpu-cycles",
]
@@ -189,18 +187,21 @@ readme = "README.md"
[workspace.dependencies]
addr = "0.15.6"
aead = "0.5.2"
aes = "0.8.1"
aes-gcm = "0.10.1"
aes-gcm-siv = "0.11.1"
aead = "0.5.2"
ammonia = "4"
anyhow = "1.0.95"
arc-swap = "1.7.1"
argon2 = "0.5.0"
async-trait = "0.1.85"
axum-client-ip = "0.6.1"
async-trait = "0.1.86"
axum = "0.7.5"
axum-client-ip = "0.6.1"
axum-extra = "0.9.4"
axum-test = "16.2.0"
base64 = "0.22.1"
base85rs = "0.1.3"
bincode = "1.3.3"
bip39 = { version = "2.0.0", features = ["zeroize"] }
bit-vec = "0.7.0" # can we unify those?
@@ -211,17 +212,17 @@ bs58 = "0.5.1"
bytecodec = "0.4.15"
bytes = "1.7.2"
cargo_metadata = "0.18.1"
celes = "2.4.0"
celes = "2.5.0"
cfg-if = "1.0.0"
chacha20 = "0.9.0"
chacha20poly1305 = "0.10.1"
chrono = "0.4.39"
cipher = "0.4.3"
clap = "4.5.26"
clap = "4.5.30"
clap_complete = "4.5"
clap_complete_fig = "4.5"
colored = "2.0"
comfy-table = "7.1.3"
colored = "2.2"
comfy-table = "7.1.4"
console = "0.15.10"
console-subscriber = "0.1.1"
console_error_panic_hook = "0.1"
@@ -241,8 +242,9 @@ doc-comment = "0.3"
dotenvy = "0.15.6"
ecdsa = "0.16"
ed25519-dalek = "2.1"
etherparse = "0.13.0"
env_logger = "0.11.6"
envy = "0.4"
etherparse = "0.13.0"
eyre = "0.6.9"
fastrand = "2.1.1"
flate2 = "1.0.35"
@@ -250,22 +252,23 @@ futures = "0.3.31"
futures-util = "0.3"
generic-array = "0.14.7"
getrandom = "0.2.10"
getset = "0.1.3"
getset = "0.1.4"
handlebars = "3.5.5"
headers = "0.4.0"
hex = "0.4.3"
hex-literal = "0.3.3"
hickory-resolver = "0.24.3"
hkdf = "0.12.3"
hmac = "0.12.1"
http = "1"
http-body-util = "0.1"
httpcodec = "0.2.3"
human-repr = "1.1.0"
humantime = "2.1.0"
humantime-serde = "1.1.1"
human-repr = "1.1.0"
hyper = "1.4.1"
hyper = "1.6.0"
hyper-util = "0.1"
indicatif = "0.17.9"
indicatif = "0.17.11"
inquire = "0.6.2"
ip_network = "0.4.1"
ipnetwork = "0.20"
@@ -277,13 +280,12 @@ ledger-transport = "0.10.0"
ledger-transport-hid = "0.10.0"
log = "0.4"
maxminddb = "0.23.0"
rs_merkle = "1.4.2"
mime = "0.3.17"
moka = { version = "0.12", features = ["future"] }
nix = "0.27.1"
notify = "5.1.0"
okapi = "0.7.0"
once_cell = "1.20.2"
once_cell = "1.20.3"
opentelemetry = "0.19.0"
opentelemetry-jaeger = "0.18.0"
parking_lot = "0.12.3"
@@ -292,7 +294,7 @@ petgraph = "0.6.5"
pin-project = "1.1"
pin-project-lite = "0.2.16"
pretty_env_logger = "0.4.0"
publicsuffix = "2.2.3"
publicsuffix = "2.3.0"
quote = "1"
rand = "0.8.5"
rand_chacha = "0.3"
@@ -306,20 +308,21 @@ reqwest = { version = "0.12.4", default-features = false }
rocket = "0.5.0"
rocket_cors = "0.6.0"
rocket_okapi = "0.8.0"
rs_merkle = "1.4.2"
safer-ffi = "0.1.13"
schemars = "0.8.21"
semver = "1.0.24"
semver = "1.0.25"
serde = "1.0.217"
serde_bytes = "0.11.15"
serde_derive = "1.0"
serde_json = "1.0.135"
serde_json_path = "0.7.1"
serde_json = "1.0.138"
serde_json_path = "0.7.2"
serde_repr = "0.1"
serde_with = "3.9.0"
serde_yaml = "0.9.25"
sha2 = "0.10.8"
si-scale = "0.2.3"
sphinx-packet = "0.1.1"
sphinx-packet = "0.3.1"
sqlx = "0.7.4"
strum = "0.26"
strum_macros = "0.26"
@@ -329,27 +332,30 @@ sysinfo = "0.33.0"
tap = "1.0.1"
tar = "0.4.43"
tempfile = "3.15"
thiserror = "1.0.64"
thiserror = "2.0"
time = "0.3.37"
tokio = "1.39"
tokio = "1.43"
tokio-postgres = "0.7"
tokio-stream = "0.1.17"
tokio-test = "0.4.4"
tokio-tun = "0.11.5"
tokio-tungstenite = { version = "0.20.1" }
tokio-util = "0.7.13"
toml = "0.8.19"
tower = "0.4.13"
toml = "0.8.20"
tower = "0.5.2"
tower-http = "0.5.2"
tracing = "0.1.41"
tracing-log = "0.2"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = "0.3.19"
tracing-tree = "0.2.2"
tracing-log = "0.2"
ts-rs = "10.1.0"
tungstenite = { version = "0.20.1", default-features = false }
uniffi = "0.29.0"
uniffi_build = "0.29.0"
url = "2.5"
utoipa = "5.2"
utoipa-swagger-ui = "8.0"
utoipa-swagger-ui = "8.1"
utoipauto = "0.2"
uuid = "*"
vergen = { version = "=8.3.1", default-features = false }
@@ -385,10 +391,10 @@ cw4 = { version = "=1.1.2" }
cw-controllers = { version = "=1.1.0" }
# cosmrs-related
bip32 = { version = "0.5.2", default-features = false }
bip32 = { version = "0.5.3", default-features = false }
cosmrs = { version = "0.21.0" }
cosmrs = { version = "0.21.1" }
tendermint = "0.40.0"
tendermint-rpc = "0.40.0"
prost = { version = "0.13", default-features = false }
+2 -2
View File
@@ -13,7 +13,7 @@ The platform is composed of multiple Rust crates. Top-level executable binary cr
* `nym-client` - an executable which you can build into your own applications. Use it for interacting with Nym nodes.
* `nym-socks5-client` - a Socks5 proxy you can run on your machine and use with existing applications.
* `nym-explorer` - a (projected) block explorer and (existing) mixnet viewer.
* `nym-wallet` - a desktop wallet implemented using the [Tauri](https://tauri.studio/en/docs/about/intro) framework.
* `nym-wallet` - a desktop wallet implemented using the [Tauri](https://tauri.app)) framework.
* `nym-cli` - a tool for interacting with the network from the CLI.
<!-- coming soon
* `nym-network-monitor` - sends packets through the full system to check that they are working as expected, and stores node uptime histories as the basis of a rewards system ("mixmining" or "proof-of-mixing").
@@ -66,4 +66,4 @@ As a general approach, licensing is as follows this pattern:
- libraries and components are Apache 2.0 or MIT
- documentation is Apache 2.0 or CC0-1.0
Nym Node Operators and Validators Temrs and Conditions can be found [here](https://nym.com/terms-and-conditions/operators/v1.0.0).
Nym Node Operators and Validators Terms and Conditions can be found [here](https://nym.com/operators-validators-terms).
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-client"
version = "1.1.47"
version = "1.1.48"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
+6 -2
View File
@@ -56,7 +56,7 @@ pub fn default_data_directory<P: AsRef<Path>>(id: P) -> PathBuf {
.join(DEFAULT_DATA_DIR)
}
#[derive(Debug, Deserialize, PartialEq, Serialize)]
#[derive(Debug, Deserialize, PartialEq, Serialize, Clone)]
pub struct Config {
#[serde(flatten)]
pub base: BaseClientConfig,
@@ -94,6 +94,10 @@ impl CliClientConfig for Config {
}
impl Config {
pub fn base(&self) -> BaseClientConfig {
self.base.clone()
}
pub fn new<S: AsRef<str>>(id: S) -> Self {
Config {
base: BaseClientConfig::new(id.as_ref(), env!("CARGO_PKG_VERSION")),
@@ -209,7 +213,7 @@ impl SocketType {
}
}
#[derive(Debug, Deserialize, PartialEq, Eq, Serialize)]
#[derive(Debug, Deserialize, PartialEq, Eq, Serialize, Clone)]
#[serde(default, deny_unknown_fields)]
pub struct Socket {
pub socket_type: SocketType,
@@ -107,5 +107,8 @@ enabled = {{ debug.stats_reporting.enabled }}
provider_address = '{{ debug.stats_reporting.provider_address }}'
reporting_interval = '{{ debug.stats_reporting.reporting_interval }}'
[debug.forget_me]
client = {{ debug.forget_me.client }}
stats = {{ debug.forget_me.stats }}
"#;
+16 -6
View File
@@ -20,7 +20,7 @@ pub use nym_sphinx::addressing::clients::Recipient;
pub mod config;
type NativeClientBuilder<'a> = BaseClientBuilder<'a, QueryHttpRpcNyxdClient, OnDiskPersistent>;
type NativeClientBuilder = BaseClientBuilder<QueryHttpRpcNyxdClient, OnDiskPersistent>;
pub struct SocketClient {
/// Client configuration options, including, among other things, packet sending rates,
@@ -32,6 +32,10 @@ pub struct SocketClient {
}
impl SocketClient {
pub fn config(&self) -> Config {
self.config.clone()
}
pub fn new(config: Config, custom_mixnet: Option<PathBuf>) -> Self {
SocketClient {
config,
@@ -45,7 +49,7 @@ impl SocketClient {
client_output: ClientOutput,
client_state: ClientState,
self_address: &Recipient,
shutdown: nym_task::TaskClient,
task_client: nym_task::TaskClient,
packet_type: PacketType,
) {
info!("Starting websocket listener...");
@@ -73,10 +77,15 @@ impl SocketClient {
shared_lane_queue_lengths,
reply_controller_sender,
Some(packet_type),
task_client.fork("websocket_handler"),
);
websocket::Listener::new(config.socket.host, config.socket.listening_port)
.start(websocket_handler, shutdown);
websocket::Listener::new(
config.socket.host,
config.socket.listening_port,
task_client.with_suffix("websocket_listener"),
)
.start(websocket_handler);
}
/// blocking version of `start_socket` method. Will run forever (or until SIGINT is sent)
@@ -108,8 +117,9 @@ impl SocketClient {
let storage = self.initialise_storage().await?;
let user_agent = nym_bin_common::bin_info!().into();
let mut base_client = BaseClientBuilder::new(&self.config.base, storage, dkg_query_client)
.with_user_agent(user_agent);
let mut base_client =
BaseClientBuilder::new(self.config().base(), storage, dkg_query_client)
.with_user_agent(user_agent);
if let Some(custom_mixnet) = &self.custom_mixnet {
base_client = base_client.with_stored_topology(custom_mixnet)?;
+1
View File
@@ -82,6 +82,7 @@ impl From<Init> for OverrideConfig {
nyxd_urls: init_config.common_args.nyxd_urls,
enabled_credentials_mode: init_config.common_args.enabled_credentials_mode,
stats_reporting_address: init_config.common_args.stats_reporting_address,
forget_me: init_config.common_args.forget_me.into(),
}
}
}
+3
View File
@@ -16,6 +16,7 @@ use nym_bin_common::completions::{fig_generate, ArgShell};
use nym_client::client::Recipient;
use nym_client_core::cli_helpers::CliClient;
use nym_client_core::client::base_client::storage::migration_helpers::v1_1_33;
use nym_client_core::config::ForgetMe;
use nym_config::OptionalSet;
use std::error::Error;
use std::net::IpAddr;
@@ -106,6 +107,7 @@ pub(crate) struct OverrideConfig {
nyxd_urls: Option<Vec<url::Url>>,
enabled_credentials_mode: Option<bool>,
stats_reporting_address: Option<Recipient>,
forget_me: ForgetMe,
}
pub(crate) async fn execute(args: Cli) -> Result<(), Box<dyn Error + Send + Sync>> {
@@ -133,6 +135,7 @@ pub(crate) fn override_config(config: Config, args: OverrideConfig) -> Config {
args.fastmode,
)
.with_base(BaseClientConfig::with_disabled_cover_traffic, args.no_cover)
.with_base(BaseClientConfig::with_forget_me, args.forget_me)
.with_optional(Config::with_port, args.port)
.with_optional(Config::with_host, args.host)
.with_optional_custom_env_ext(
+1
View File
@@ -41,6 +41,7 @@ impl From<Run> for OverrideConfig {
nyxd_urls: run_config.common_args.nyxd_urls,
enabled_credentials_mode: run_config.common_args.enabled_credentials_mode,
stats_reporting_address: run_config.common_args.stats_reporting_address,
forget_me: run_config.common_args.forget_me.into(),
}
}
}
+66 -40
View File
@@ -19,6 +19,7 @@ use nym_sphinx::receiver::ReconstructedMessage;
use nym_task::connections::{
ConnectionCommand, ConnectionCommandSender, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::TaskClient;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::Instant;
@@ -43,9 +44,11 @@ pub(crate) struct HandlerBuilder {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
task_client: TaskClient,
}
impl HandlerBuilder {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
msg_input: InputMessageSender,
client_connection_tx: ConnectionCommandSender,
@@ -54,6 +57,7 @@ impl HandlerBuilder {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
task_client: TaskClient,
) -> Self {
Self {
msg_input,
@@ -63,11 +67,14 @@ impl HandlerBuilder {
lane_queue_lengths,
reply_controller_sender,
packet_type,
task_client,
}
}
// TODO: make sure we only ever have one active handler
pub fn create_active_handler(&self) -> Handler {
let mut task_client = self.task_client.fork("active_handler");
task_client.disarm();
Handler {
msg_input: self.msg_input.clone(),
client_connection_tx: self.client_connection_tx.clone(),
@@ -78,6 +85,7 @@ impl HandlerBuilder {
lane_queue_lengths: self.lane_queue_lengths.clone(),
reply_controller_sender: self.reply_controller_sender.clone(),
packet_type: self.packet_type,
task_client,
}
}
}
@@ -92,16 +100,18 @@ pub(crate) struct Handler {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
task_client: TaskClient,
}
impl Drop for Handler {
fn drop(&mut self) {
if self
if let Err(err) = self
.buffer_requester
.unbounded_send(ReceivedBufferMessage::ReceiverDisconnect)
.is_err()
{
error!("we failed to disconnect the receiver from the buffer! presumably the shutdown procedure has been initiated!")
if !self.task_client.is_shutdown_poll() {
error!("failed to disconnect the receiver from the buffer: {err}");
}
}
}
}
@@ -125,10 +135,23 @@ impl Handler {
};
// get the number of pending replies waiting for reply surbs
let reply_queue_length = self
let reply_queue_length = match self
.reply_controller_sender
.get_lane_queue_length(connection_id)
.await;
.await
{
Ok(length) => length,
Err(err) => {
if !self.task_client.is_shutdown_poll() {
error!(
"Failed to get reply queue length for connection {connection_id}: {err}"
);
}
// We're just going to assume that the queue is empty, and I think that's okay
// during shutdown.
0
}
};
let queue_length = base_length + reply_queue_length;
@@ -168,10 +191,11 @@ impl Handler {
// the ack control is now responsible for chunking, etc.
let input_msg = InputMessage::new_regular(recipient, message, lane, self.packet_type);
self.msg_input
.send(input_msg)
.await
.expect("InputMessageReceiver has stopped receiving!");
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send message to the input buffer: {err}");
}
}
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
let TransmissionLane::ConnectionId(connection_id) = lane else {
@@ -200,10 +224,11 @@ impl Handler {
let input_msg =
InputMessage::new_anonymous(recipient, message, reply_surbs, lane, self.packet_type);
self.msg_input
.send(input_msg)
.await
.expect("InputMessageReceiver has stopped receiving!");
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send anonymous message to the input buffer: {err}");
}
}
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
let TransmissionLane::ConnectionId(connection_id) = lane else {
@@ -227,10 +252,11 @@ impl Handler {
});
let input_msg = InputMessage::new_reply(recipient_tag, message, lane, self.packet_type);
self.msg_input
.send(input_msg)
.await
.expect("InputMessageReceiver has stopped receiving!");
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send reply message to the input buffer: {err}");
}
}
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
let TransmissionLane::ConnectionId(connection_id) = lane else {
@@ -245,9 +271,14 @@ impl Handler {
}
fn handle_closed_connection(&self, connection_id: u64) -> Option<ServerResponse> {
self.client_connection_tx
if let Err(err) = self
.client_connection_tx
.unbounded_send(ConnectionCommand::Close(connection_id))
.unwrap();
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send close connection command: {err}");
}
}
None
}
@@ -362,11 +393,10 @@ impl Handler {
}
}
async fn listen_for_requests(
&mut self,
mut msg_receiver: ReconstructedMessagesReceiver,
mut task_client: nym_task::TaskClient,
) {
async fn listen_for_requests(&mut self, mut msg_receiver: ReconstructedMessagesReceiver) {
let mut task_client = self.task_client.fork("select");
task_client.disarm();
while !task_client.is_shutdown() {
tokio::select! {
// we can either get a client request from the websocket
@@ -415,15 +445,7 @@ impl Handler {
}
// consume self to make sure `drop` is called after this is done
pub(crate) async fn handle_connection(
mut self,
socket: TcpStream,
mut task_client: nym_task::TaskClient,
) {
// We don't want a crash in the connection handler to trigger a shutdown of the whole
// process.
task_client.disarm();
pub(crate) async fn handle_connection(mut self, socket: TcpStream) {
let ws_stream = match accept_async(socket).await {
Ok(ws_stream) => ws_stream,
Err(err) => {
@@ -436,14 +458,18 @@ impl Handler {
let (reconstructed_sender, reconstructed_receiver) = mpsc::unbounded();
// tell the buffer to start sending stuff to us
self.buffer_requester
.unbounded_send(ReceivedBufferMessage::ReceiverAnnounce(
reconstructed_sender,
))
.expect("the buffer request failed!");
if let Err(err) =
self.buffer_requester
.unbounded_send(ReceivedBufferMessage::ReceiverAnnounce(
reconstructed_sender,
))
{
if !self.task_client.is_shutdown_poll() {
error!("failed to announce the receiver to the buffer: {err}");
}
}
self.listen_for_requests(reconstructed_receiver, task_client)
.await;
self.listen_for_requests(reconstructed_receiver).await;
}
}
+11 -17
View File
@@ -3,6 +3,7 @@
use super::handler::HandlerBuilder;
use log::*;
use nym_task::TaskClient;
use std::net::IpAddr;
use std::{net::SocketAddr, process, sync::Arc};
use tokio::io::AsyncWriteExt;
@@ -22,21 +23,19 @@ impl State {
pub(crate) struct Listener {
address: SocketAddr,
state: State,
task_client: TaskClient,
}
impl Listener {
pub(crate) fn new(host: IpAddr, port: u16) -> Self {
pub(crate) fn new(host: IpAddr, port: u16, task_client: TaskClient) -> Self {
Listener {
address: SocketAddr::new(host, port),
state: State::AwaitingConnection,
task_client,
}
}
pub(crate) async fn run(
&mut self,
handler: HandlerBuilder,
mut task_client: nym_task::TaskClient,
) {
pub(crate) async fn run(&mut self, handler: HandlerBuilder) {
let tcp_listener = match tokio::net::TcpListener::bind(self.address).await {
Ok(listener) => listener,
Err(err) => {
@@ -47,11 +46,11 @@ impl Listener {
let notify = Arc::new(Notify::new());
loop {
while !self.task_client.is_shutdown() {
tokio::select! {
// When the handler finishes we check if shutdown is signalled
_ = notify.notified() => {
if task_client.is_shutdown() {
if self.task_client.is_shutdown() {
log::trace!("Websocket listener: detected shutdown after connection closed");
break;
}
@@ -60,7 +59,7 @@ impl Listener {
}
// ... but when there is no connected client at the time of shutdown being
// signalled, we handle it here.
_ = task_client.recv() => {
_ = self.task_client.recv() => {
if !self.state.is_connected() {
log::trace!("Not connected: shutting down");
break;
@@ -88,9 +87,8 @@ impl Listener {
// hanging because the executor doesn't come back here
let notify_clone = Arc::clone(&notify);
let fresh_handler = handler.create_active_handler();
let task_client_handler = task_client.clone();
tokio::spawn(async move {
fresh_handler.handle_connection(socket, task_client_handler).await;
fresh_handler.handle_connection(socket).await;
notify_clone.notify_one();
});
self.state = State::Connected;
@@ -104,13 +102,9 @@ impl Listener {
log::debug!("Websocket listener: Exiting");
}
pub(crate) fn start(
mut self,
handler: HandlerBuilder,
shutdown: nym_task::TaskClient,
) -> JoinHandle<()> {
pub(crate) fn start(mut self, handler: HandlerBuilder) -> JoinHandle<()> {
info!("Running websocket on {:?}", self.address.to_string());
tokio::spawn(async move { self.run(handler, shutdown).await })
tokio::spawn(async move { self.run(handler).await })
}
}
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-socks5-client"
version = "1.1.47"
version = "1.1.48"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
+1
View File
@@ -93,6 +93,7 @@ impl From<Init> for OverrideConfig {
enabled_credentials_mode: init_config.common_args.enabled_credentials_mode,
outfox: false,
stats_reporting_address: init_config.common_args.stats_reporting_address,
forget_me: init_config.common_args.forget_me.into(),
}
}
}
+3 -1
View File
@@ -17,7 +17,7 @@ use nym_bin_common::completions::{fig_generate, ArgShell};
use nym_client_core::cli_helpers::CliClient;
use nym_client_core::client::base_client::storage::migration_helpers::v1_1_33;
use nym_client_core::client::topology_control::geo_aware_provider::CountryGroup;
use nym_client_core::config::{GroupBy, TopologyStructure};
use nym_client_core::config::{ForgetMe, GroupBy, TopologyStructure};
use nym_config::OptionalSet;
use nym_sphinx::addressing::Recipient;
use nym_sphinx::params::{PacketSize, PacketType};
@@ -113,6 +113,7 @@ pub(crate) struct OverrideConfig {
enabled_credentials_mode: Option<bool>,
outfox: bool,
stats_reporting_address: Option<Recipient>,
forget_me: ForgetMe,
}
pub(crate) async fn execute(args: Cli) -> Result<(), Box<dyn Error + Send + Sync>> {
@@ -179,6 +180,7 @@ pub(crate) fn override_config(config: Config, args: OverrideConfig) -> Config {
BaseClientConfig::with_topology_structure,
topology_structure,
)
.with_base(BaseClientConfig::with_forget_me, args.forget_me)
.with_optional(Config::with_anonymous_replies, args.use_anonymous_replies)
.with_optional(Config::with_port, args.port)
.with_optional(Config::with_ip, args.ip)
+1
View File
@@ -65,6 +65,7 @@ impl From<Run> for OverrideConfig {
enabled_credentials_mode: run_config.common_args.enabled_credentials_mode,
outfox: run_config.outfox,
stats_reporting_address: run_config.common_args.stats_reporting_address,
forget_me: run_config.common_args.forget_me.into(),
}
}
}
+4
View File
@@ -113,4 +113,8 @@ enabled = {{ core.debug.stats_reporting.enabled }}
provider_address = '{{ core.debug.stats_reporting.provider_address }}'
reporting_interval = '{{ core.debug.stats_reporting.reporting_interval }}'
[core.debug.forget_me]
client = {{ core.debug.forget_me.client }}
stats = {{ core.debug.forget_me.stats }}
"#;
@@ -60,7 +60,7 @@ impl From<IpAddr> for IpPair {
std::net::IpAddr::V4(ipv4_addr) => (ipv4_addr.octets()[2], ipv4_addr.octets()[3]),
std::net::IpAddr::V6(ipv6_addr) => (ipv6_addr.octets()[14], ipv6_addr.octets()[15]),
};
let last_bytes = (before_last_byte as u16) << 8 | last_byte as u16;
let last_bytes = ((before_last_byte as u16) << 8) | last_byte as u16;
let ipv4 = Ipv4Addr::new(
WG_TUN_DEVICE_IP_ADDRESS_V4.octets()[0],
WG_TUN_DEVICE_IP_ADDRESS_V4.octets()[1],
+1
View File
@@ -40,6 +40,7 @@ nym-crypto = { path = "../crypto" }
nym-explorer-client = { path = "../../explorer-api/explorer-client" }
nym-gateway-client = { path = "../client-libs/gateway-client" }
nym-gateway-requests = { path = "../gateway-requests" }
nym-http-api-client = { path = "../http-api-client" }
nym-metrics = { path = "../nym-metrics" }
nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
@@ -145,6 +145,11 @@ impl Config {
self
}
pub fn with_forget_me(mut self, forget_me: ForgetMe) -> Self {
self.debug.forget_me = forget_me;
self
}
// TODO: this should be refactored properly
// as of 12.09.23 the below is true (not sure how this comment will rot in the future)
// medium_toggle:
@@ -719,6 +724,9 @@ pub struct DebugConfig {
/// Defines all configuration options related to stats reporting.
pub stats_reporting: StatsReporting,
/// Defines all configuration options related to the forget me flag.
pub forget_me: ForgetMe,
}
impl DebugConfig {
@@ -741,6 +749,69 @@ impl Default for DebugConfig {
topology: Default::default(),
reply_surbs: Default::default(),
stats_reporting: Default::default(),
forget_me: Default::default(),
}
}
}
#[derive(Clone, Default, Debug, Deserialize, PartialEq, Serialize, Copy)]
pub struct ForgetMe {
client: bool,
stats: bool,
}
impl From<bool> for ForgetMe {
fn from(value: bool) -> Self {
if value {
Self::new_all()
} else {
Self::new_none()
}
}
}
impl ForgetMe {
pub fn new_all() -> Self {
Self {
client: true,
stats: true,
}
}
pub fn new_client() -> Self {
Self {
client: true,
stats: false,
}
}
pub fn new_stats() -> Self {
Self {
client: false,
stats: true,
}
}
pub fn new(client: bool, stats: bool) -> Self {
Self { client, stats }
}
pub fn any(&self) -> bool {
self.client || self.stats
}
pub fn client(&self) -> bool {
self.client
}
pub fn stats(&self) -> bool {
self.stats
}
pub fn new_none() -> Self {
Self {
client: false,
stats: false,
}
}
}
@@ -182,7 +182,7 @@ impl From<ConfigV5> for Config {
maximum_reply_key_age: value.debug.reply_surbs.maximum_reply_key_age,
surb_mix_hops: value.debug.reply_surbs.surb_mix_hops,
},
stats_reporting: Default::default(),
..Default::default()
},
}
}
@@ -15,6 +15,7 @@ pub mod error;
mod manager;
mod models;
#[derive(Clone)]
pub struct OnDiskGatewaysDetails {
manager: StorageManager,
}
@@ -20,12 +20,12 @@ pub enum InMemStorageError {
MalformedGateway(#[from] BadGateway),
}
#[derive(Debug, Default)]
#[derive(Clone, Debug, Default)]
pub struct InMemGatewaysDetails {
inner: Arc<RwLock<InMemStorageInner>>,
}
#[derive(Debug, Default)]
#[derive(Clone, Debug, Default)]
struct InMemStorageInner {
active_gateway: Option<String>,
gateways: HashMap<String, GatewayRegistration>,
@@ -93,6 +93,10 @@ pub struct CommonClientInitArgs {
/// Sets the address to report statistics
#[cfg_attr(feature = "cli", clap(long, hide = true))]
pub stats_reporting_address: Option<Recipient>,
/// Sets the forget me flag
#[cfg_attr(feature = "cli", clap(long, hide = true, default_value_t = false))]
pub forget_me: bool,
}
pub struct InitResultsWithConfig<T> {
@@ -61,4 +61,8 @@ pub struct CommonClientRunArgs {
/// Sets the address to report statistics
#[cfg_attr(feature = "cli", clap(long, hide = true))]
pub stats_reporting_address: Option<Recipient>,
/// Sets the forget me flag
#[cfg_attr(feature = "cli", clap(long, hide = true, default_value_t = false))]
pub forget_me: bool,
}
@@ -1,6 +1,7 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::mix_traffic::ClientRequestSender;
use super::received_buffer::ReceivedBufferMessage;
use super::statistics_control::StatisticsControl;
use crate::client::base_client::storage::helpers::store_client_keys;
@@ -31,13 +32,15 @@ use crate::init::{
setup_gateway,
types::{GatewaySetup, InitialisationResult},
};
use crate::{config, spawn_future, ForgetMe};
use crate::{config, spawn_future};
use futures::channel::mpsc;
use log::*;
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_config_types::ForgetMe;
use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore};
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::{encryption, identity};
use nym_crypto::hkdf::DerivationMaterial;
use nym_gateway_client::client::config::GatewayClientConfig;
use nym_gateway_client::{
AcknowledgementReceiver, GatewayClient, GatewayConfig, MixnetMessageReceiver, PacketRouter,
@@ -175,8 +178,8 @@ impl From<bool> for CredentialsToggle {
}
}
pub struct BaseClientBuilder<'a, C, S: MixnetClientStorage> {
config: &'a Config,
pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
config: Config,
client_store: S,
dkg_query_client: Option<C>,
@@ -191,19 +194,19 @@ pub struct BaseClientBuilder<'a, C, S: MixnetClientStorage> {
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
forget_me: ForgetMe,
derivation_material: Option<DerivationMaterial>,
}
impl<'a, C, S> BaseClientBuilder<'a, C, S>
impl<C, S> BaseClientBuilder<C, S>
where
S: MixnetClientStorage + 'static,
C: DkgQueryClient + Send + Sync + 'static,
{
pub fn new(
base_config: &'a Config,
base_config: Config,
client_store: S,
dkg_query_client: Option<C>,
) -> BaseClientBuilder<'a, C, S> {
) -> BaseClientBuilder<C, S> {
BaseClientBuilder {
config: base_config,
client_store,
@@ -216,13 +219,22 @@ where
setup_method: GatewaySetup::MustLoad { gateway_id: None },
#[cfg(unix)]
connection_fd_callback: None,
forget_me: Default::default(),
derivation_material: None,
}
}
#[must_use]
pub fn with_derivation_material(
mut self,
derivation_material: Option<DerivationMaterial>,
) -> Self {
self.derivation_material = derivation_material;
self
}
#[must_use]
pub fn with_forget_me(mut self, forget_me: &ForgetMe) -> Self {
self.forget_me = forget_me.clone();
self.config.debug.forget_me = *forget_me;
self
}
@@ -298,7 +310,7 @@ where
topology_accessor: TopologyAccessor,
mix_tx: BatchMixMessageSender,
stats_tx: ClientStatsSender,
shutdown: TaskClient,
task_client: TaskClient,
) {
info!("Starting loop cover traffic stream...");
@@ -311,9 +323,10 @@ where
debug_config.traffic,
debug_config.cover_traffic,
stats_tx,
task_client,
);
stream.start_with_shutdown(shutdown);
stream.start();
}
#[allow(clippy::too_many_arguments)]
@@ -328,7 +341,7 @@ where
reply_controller_receiver: ReplyControllerReceiver,
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
shutdown: TaskClient,
task_client: TaskClient,
packet_type: PacketType,
stats_tx: ClientStatsSender,
) {
@@ -346,8 +359,9 @@ where
lane_queue_lengths,
client_connection_rx,
stats_tx,
task_client,
)
.start_with_shutdown(shutdown, packet_type);
.start(packet_type);
}
// buffer controlling all messages fetched from provider
@@ -370,8 +384,9 @@ where
reply_key_storage,
reply_controller_sender,
metrics_reporter,
shutdown,
);
controller.start_with_shutdown(shutdown)
controller.start()
}
#[allow(clippy::too_many_arguments)]
@@ -560,15 +575,22 @@ where
topology_accessor: TopologyAccessor,
local_gateway: NodeIdentity,
wait_for_gateway: bool,
mut shutdown: TaskClient,
mut task_client: TaskClient,
) -> Result<(), ClientCoreError> {
let topology_refresher_config =
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The background topology refesher is not going to be started");
task_client.disarm();
}
let mut topology_refresher = TopologyRefresher::new(
topology_refresher_config,
topology_accessor,
topology_provider,
task_client,
);
// before returning, block entire runtime to refresh the current network view so that any
// components depending on topology would see a non-empty view
@@ -609,15 +631,11 @@ where
}
}
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The topology refesher is not going to be started");
shutdown.disarm();
} else {
if !topology_config.disable_refreshing {
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
topology_refresher.start_with_shutdown(shutdown);
topology_refresher.start();
}
Ok(())
@@ -628,30 +646,29 @@ where
user_agent: Option<UserAgent>,
client_stats_id: String,
input_sender: Sender<InputMessage>,
shutdown: TaskClient,
task_client: TaskClient,
) -> ClientStatsSender {
info!("Starting statistics control...");
StatisticsControl::create_and_start_with_shutdown(
StatisticsControl::create_and_start(
config.debug.stats_reporting,
user_agent
.map(|u| u.application)
.unwrap_or("unknown".to_string()),
client_stats_id,
input_sender.clone(),
shutdown.with_suffix("controller"),
task_client,
)
}
fn start_mix_traffic_controller(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown: TaskClient,
forget_me: ForgetMe,
) -> BatchMixMessageSender {
) -> (BatchMixMessageSender, ClientRequestSender) {
info!("Starting mix traffic controller...");
let (mix_traffic_controller, mix_tx) =
MixTrafficController::new(gateway_transceiver, forget_me);
mix_traffic_controller.start_with_shutdown(shutdown);
mix_tx
let (mix_traffic_controller, mix_tx, client_tx) =
MixTrafficController::new(gateway_transceiver, shutdown);
mix_traffic_controller.start();
(mix_tx, client_tx)
}
// TODO: rename it as it implies the data is persistent whilst one can use InMemBackend
@@ -686,6 +703,7 @@ where
setup_method: GatewaySetup,
key_store: &S::KeyStore,
details_store: &S::GatewaysDetailsStore,
derivation_material: Option<DerivationMaterial>,
) -> Result<InitialisationResult, ClientCoreError>
where
<S::KeyStore as KeyStore>::StorageError: Sync + Send,
@@ -695,7 +713,12 @@ where
if key_store.load_keys().await.is_err() {
info!("could not find valid client keys - a new set will be generated");
let mut rng = OsRng;
let keys = ClientKeys::generate_new(&mut rng);
let keys = if let Some(derivation_material) = derivation_material {
ClientKeys::from_master_key(&mut rng, &derivation_material)
.map_err(|_| ClientCoreError::HkdfDerivationError {})?
} else {
ClientKeys::generate_new(&mut rng)
};
store_client_keys(keys, key_store).await?;
}
@@ -717,6 +740,7 @@ where
self.setup_method,
self.client_store.key_store(),
self.client_store.gateway_details_store(),
self.derivation_material,
)
.await?;
@@ -773,7 +797,7 @@ where
);
let stats_reporter = Self::start_statistics_control(
self.config,
&self.config,
self.user_agent.clone(),
generate_client_stats_id(*self_address.identity()),
input_sender.clone(),
@@ -799,7 +823,7 @@ where
let gateway_transceiver = Self::setup_gateway_transceiver(
self.custom_gateway_transceiver,
self.config,
&self.config,
init_res,
bandwidth_controller,
&details_store,
@@ -833,10 +857,9 @@ where
// traffic stream.
// The MixTrafficController then sends the actual traffic
let message_sender = Self::start_mix_traffic_controller(
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
gateway_transceiver,
shutdown.fork("mix_traffic_controller"),
self.forget_me,
);
// Channels that the websocket listener can use to signal downstream to the real traffic
@@ -911,6 +934,8 @@ where
},
stats_reporter,
task_handle: shutdown,
client_request_sender,
forget_me: self.config.debug.forget_me,
})
}
}
@@ -922,6 +947,7 @@ pub struct BaseClient {
pub client_output: ClientOutputStatus,
pub client_state: ClientState,
pub stats_reporter: ClientStatsSender,
pub client_request_sender: ClientRequestSender,
pub task_handle: TaskHandle,
pub forget_me: ForgetMe,
}
@@ -65,6 +65,7 @@ pub trait MixnetClientStorage {
fn gateway_details_store(&self) -> &Self::GatewaysDetailsStore;
}
#[derive(Clone)]
pub struct Ephemeral {
key_store: InMemEphemeralKeys,
reply_store: reply_storage::Empty,
@@ -120,6 +121,7 @@ impl MixnetClientStorage for Ephemeral {
}
}
#[derive(Clone)]
#[cfg(all(
not(target_arch = "wasm32"),
feature = "fs-surb-storage",
@@ -13,6 +13,7 @@ use nym_sphinx::cover::generate_loop_cover_packet;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_task::TaskClient;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
@@ -64,6 +65,8 @@ where
packet_type: PacketType,
stats_tx: ClientStatsSender,
task_client: TaskClient,
}
impl<R> Stream for LoopCoverTrafficStream<R>
@@ -110,6 +113,7 @@ impl LoopCoverTrafficStream<OsRng> {
traffic_config: config::Traffic,
cover_config: config::CoverTraffic,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
let rng = OsRng;
@@ -128,6 +132,7 @@ impl LoopCoverTrafficStream<OsRng> {
secondary_packet_size: traffic_config.secondary_packet_size,
packet_type: traffic_config.packet_type,
stats_tx,
task_client,
}
}
@@ -175,7 +180,7 @@ impl LoopCoverTrafficStream<OsRng> {
}
};
let cover_message = generate_loop_cover_packet(
let cover_message = match generate_loop_cover_packet(
&mut self.rng,
topology_ref,
&self.ack_key,
@@ -184,8 +189,15 @@ impl LoopCoverTrafficStream<OsRng> {
self.cover_traffic.loop_cover_traffic_average_delay,
cover_traffic_packet_size,
self.packet_type,
)
.expect("Somehow failed to generate a loop cover message with a valid topology");
) {
Ok(cover_message) => cover_message,
Err(err) => {
warn!(
"Somehow failed to generate a loop cover message with a valid topology: {err}"
);
return;
}
};
if let Err(err) = self.mix_tx.try_send(vec![cover_message]) {
match err {
@@ -217,7 +229,7 @@ impl LoopCoverTrafficStream<OsRng> {
tokio::task::yield_now().await;
}
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
pub fn start(mut self) {
if self.cover_traffic.disable_loop_cover_traffic_stream {
// we should have never got here in the first place - the task should have never been created to begin with
// so panic and review the code that lead to this branch
@@ -231,6 +243,8 @@ impl LoopCoverTrafficStream<OsRng> {
);
self.set_next_delay(sampled);
let mut shutdown = self.task_client.fork("select");
spawn_future(async move {
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
@@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0
use crate::client::key_manager::persistence::KeyStore;
use nym_crypto::asymmetric::{encryption, identity};
use nym_crypto::{
asymmetric::{encryption, identity},
hkdf::{DerivationMaterial, InvalidLength},
};
use nym_gateway_requests::shared_key::{LegacySharedKeys, SharedGatewayKey, SharedSymmetricKey};
use nym_sphinx::acknowledgements::AckKey;
use rand::{CryptoRng, RngCore};
@@ -10,6 +13,7 @@ use std::sync::Arc;
use zeroize::ZeroizeOnDrop;
pub mod persistence;
mod test;
// Note: to support key rotation in the future, all keys will require adding an extra smart pointer,
// most likely an AtomicCell, or if it doesn't work as I think it does, a Mutex. Although I think
@@ -43,6 +47,24 @@ impl ClientKeys {
}
}
pub fn from_master_key<R>(
rng: &mut R,
derivation_material: &DerivationMaterial,
) -> Result<Self, InvalidLength>
where
R: RngCore + CryptoRng,
{
let secret = derivation_material.derive_secret()?;
Ok(ClientKeys {
identity_keypair: Arc::new(identity::KeyPair::from_secret(
secret,
derivation_material.index(),
)),
encryption_keypair: Arc::new(encryption::KeyPair::new(rng)),
ack_key: Arc::new(AckKey::new(rng)),
})
}
pub fn from_keys(
id_keypair: identity::KeyPair,
enc_keypair: encryption::KeyPair,
@@ -5,6 +5,7 @@ use crate::client::key_manager::ClientKeys;
use async_trait::async_trait;
use rand::{CryptoRng, RngCore};
use std::error::Error;
use std::sync::Arc;
use tokio::sync::Mutex;
#[cfg(not(target_arch = "wasm32"))]
@@ -65,6 +66,7 @@ pub enum OnDiskKeysError {
},
}
#[derive(Clone)]
#[cfg(not(target_arch = "wasm32"))]
pub struct OnDiskKeys {
paths: ClientKeysPaths,
@@ -194,8 +196,9 @@ impl KeyStore for OnDiskKeys {
}
}
#[derive(Clone)]
pub struct InMemEphemeralKeys {
keys: Mutex<ClientKeys>,
keys: Arc<Mutex<ClientKeys>>,
}
impl InMemEphemeralKeys {
@@ -204,7 +207,7 @@ impl InMemEphemeralKeys {
R: RngCore + CryptoRng,
{
InMemEphemeralKeys {
keys: Mutex::new(ClientKeys::generate_new(rng)),
keys: Arc::new(Mutex::new(ClientKeys::generate_new(rng))),
}
}
}
@@ -0,0 +1,89 @@
#[cfg(test)]
mod tests {
use crate::client::key_manager::ClientKeys;
use nym_crypto::hkdf::DerivationMaterial;
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
#[test]
fn test_from_master_key_success() {
// Set up a deterministic RNG.
let seed = [33u8; 32];
let mut rng = ChaCha20Rng::from_seed(seed);
// Set up the derivation material.
let master_key = b"this is a secret master key";
let salt = b"unique-salt";
let derivation_material = DerivationMaterial::new(master_key, 0, salt);
// Generate ClientKeys from the master key.
let client_keys = ClientKeys::from_master_key(&mut rng, &derivation_material)
.expect("Failed to create client keys");
assert_eq!(
client_keys.identity_keypair().public_key().to_string(),
String::from("FX4Undr5LPPBA7zThWWpAKXKQTXSbW1C28PnxbCqUkU4")
);
assert_eq!(
client_keys.identity_keypair().private_key().to_string(),
String::from("6S3uMi2rU5SwyUUYCiMrF5qqdcYnEDMYLggBSvavVzEt")
);
}
#[test]
fn test_from_master_key_deterministic_identity() {
// Using identical derivation material should result in the exactly same identity keypair.
let seed = [1u8; 32];
let mut rng1 = ChaCha20Rng::from_seed(seed);
let mut rng2 = ChaCha20Rng::from_seed(seed);
let master_key = b"another secret master key";
let salt = b"deterministic-salt";
let index = 7u32;
let derivation_material = DerivationMaterial::new(master_key, index, salt);
let client_keys1 = ClientKeys::from_master_key(&mut rng1, &derivation_material)
.expect("Failed to create client keys (first instance)");
let client_keys2 = ClientKeys::from_master_key(&mut rng2, &derivation_material)
.expect("Failed to create client keys (second instance)");
assert_eq!(
client_keys1.identity_keypair().public_key().to_string(),
client_keys2.identity_keypair().public_key().to_string()
);
assert_eq!(
client_keys1.identity_keypair().private_key().to_string(),
client_keys2.identity_keypair().private_key().to_string()
);
}
#[test]
fn test_from_master_key_different_indices() {
// Changing the index should yield a different identity key.
let seed = [5u8; 32];
let mut rng = ChaCha20Rng::from_seed(seed);
let master_key = b"same secret key";
let salt = b"same-salt";
let derivation_material1 = DerivationMaterial::new(master_key, 1, salt);
let derivation_material2 = DerivationMaterial::new(master_key, 2, salt);
let client_keys1 = ClientKeys::from_master_key(&mut rng, &derivation_material1)
.expect("Failed to create client keys for index 1");
let client_keys2 = ClientKeys::from_master_key(&mut rng, &derivation_material2)
.expect("Failed to create client keys for index 2");
assert_ne!(
client_keys1.identity_keypair().public_key().to_string(),
client_keys2.identity_keypair().public_key().to_string()
);
assert_ne!(
client_keys1.identity_keypair().private_key().to_string(),
client_keys2.identity_keypair().private_key().to_string()
);
}
}
@@ -2,13 +2,18 @@
// SPDX-License-Identifier: Apache-2.0
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::{spawn_future, ForgetMe};
use crate::error::ClientCoreError;
use crate::spawn_future;
use log::*;
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::TaskClient;
use transceiver::ErasedGatewayError;
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
pub type ClientRequestReceiver = tokio::sync::mpsc::Receiver<ClientRequest>;
pub type ClientRequestSender = tokio::sync::mpsc::Sender<ClientRequest>;
pub mod transceiver;
@@ -23,52 +28,73 @@ pub struct MixTrafficController {
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
mix_rx: BatchMixMessageReceiver,
client_rx: ClientRequestReceiver,
// TODO: this is temporary work-around.
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
consecutive_gateway_failure_count: usize,
forget_me: ForgetMe,
task_client: TaskClient,
}
impl MixTrafficController {
pub fn new<T>(
gateway_transceiver: T,
forget_me: ForgetMe,
) -> (MixTrafficController, BatchMixMessageSender)
task_client: TaskClient,
) -> (
MixTrafficController,
BatchMixMessageSender,
ClientRequestSender,
)
where
T: GatewayTransceiver + Send + 'static,
{
let (message_sender, message_receiver) =
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
let (client_sender, client_receiver) = tokio::sync::mpsc::channel(1);
(
MixTrafficController {
gateway_transceiver: Box::new(gateway_transceiver),
mix_rx: message_receiver,
client_rx: client_receiver,
consecutive_gateway_failure_count: 0,
forget_me,
task_client,
},
message_sender,
client_sender,
)
}
pub fn new_dynamic(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
forget_me: ForgetMe,
) -> (MixTrafficController, BatchMixMessageSender) {
task_client: TaskClient,
) -> (
MixTrafficController,
BatchMixMessageSender,
ClientRequestSender,
) {
let (message_sender, message_receiver) =
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
let (client_sender, client_receiver) = tokio::sync::mpsc::channel(1);
(
MixTrafficController {
gateway_transceiver,
mix_rx: message_receiver,
client_rx: client_receiver,
consecutive_gateway_failure_count: 0,
forget_me,
task_client,
},
message_sender,
client_sender,
)
}
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
async fn on_messages(
&mut self,
mut mix_packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
debug_assert!(!mix_packets.is_empty());
let result = if mix_packets.len() == 1 {
@@ -80,64 +106,60 @@ impl MixTrafficController {
.await
};
match result {
Err(err) => {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
self.consecutive_gateway_failure_count += 1;
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// todo: in the future this should initiate a 'graceful' shutdown or try
// to reconnect?
panic!("failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead. Can't do anything about it yet :(")
}
}
Ok(_) => {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
result
}
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
pub fn start(mut self) {
spawn_future(async move {
debug!("Started MixTrafficController with graceful shutdown support");
loop {
while !self.task_client.is_shutdown() {
tokio::select! {
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
self.on_messages(mix_packets).await;
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.task_client.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
break;
}
}
},
None => {
log::trace!("MixTrafficController: Stopping since channel closed");
break;
}
},
_ = shutdown.recv_with_delay() => {
client_request = self.client_rx.recv() => match client_request {
Some(client_request) => {
match self.gateway_transceiver.send_client_request(client_request).await {
Ok(_) => (),
Err(e) => error!("Failed to send client request: {}", e),
};
},
None => {
log::trace!("MixTrafficController, client request channel closed");
}
},
_ = self.task_client.recv() => {
log::trace!("MixTrafficController: Received shutdown");
break;
}
}
}
shutdown.recv_timeout().await;
if self.forget_me.any() {
log::info!("Sending forget me request to the gateway");
match self
.gateway_transceiver
.send_client_request(ClientRequest::ForgetMe {
client: self.forget_me.client(),
stats: self.forget_me.stats(),
})
.await
{
Ok(_) => {
log::info!("Successfully sent forget me request to the gateway");
}
Err(err) => {
log::error!("Failed to send forget me request to the gateway: {err}");
}
}
}
self.task_client.recv_timeout().await;
log::debug!("MixTrafficController: Exiting");
});
@@ -86,7 +86,9 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
(**self).send_client_request(message).await
let _ = (**self).send_client_request(message.clone()).await?;
log::debug!("Sent client request: {:?}", message);
Ok(())
}
}
@@ -143,14 +145,7 @@ where
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
if let Some(shared_key) = self.gateway_client.shared_key() {
self.gateway_client
.send_websocket_message(message.encrypt(&*shared_key)?)
.await?;
Ok(())
} else {
Err(GatewayClientError::ConnectionInInvalidState)
}
self.gateway_client.send_client_request(message).await
}
}
@@ -11,6 +11,7 @@ use nym_sphinx::{
acknowledgements::{identifier::recover_identifier, AckKey},
chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID},
};
use nym_task::TaskClient;
use std::sync::Arc;
/// Module responsible for listening for any data resembling acknowledgements from the network
@@ -20,6 +21,7 @@ pub(super) struct AcknowledgementListener {
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
}
impl AcknowledgementListener {
@@ -28,12 +30,14 @@ impl AcknowledgementListener {
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
AcknowledgementListener {
ack_key,
ack_receiver,
action_sender,
stats_tx,
task_client,
}
}
@@ -64,9 +68,14 @@ impl AcknowledgementListener {
trace!("Received {} from the mix network", frag_id);
self.stats_tx
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
self.action_sender
if let Err(err) = self
.action_sender
.unbounded_send(Action::new_remove(frag_id))
.unwrap();
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send remove action to action controller: {err}");
}
}
}
async fn handle_ack_receiver_item(&mut self, item: Vec<Vec<u8>>) {
@@ -76,10 +85,10 @@ impl AcknowledgementListener {
}
}
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
pub(super) async fn run(&mut self) {
debug!("Started AcknowledgementListener with graceful shutdown support");
while !shutdown.is_shutdown() {
while !self.task_client.is_shutdown() {
tokio::select! {
acks = self.ack_receiver.next() => match acks {
Some(acks) => self.handle_ack_receiver_item(acks).await,
@@ -88,12 +97,12 @@ impl AcknowledgementListener {
break;
}
},
_ = shutdown.recv_with_delay() => {
_ = self.task_client.recv() => {
log::trace!("AcknowledgementListener: Received shutdown");
}
}
}
shutdown.recv_timeout().await;
self.task_client.recv_timeout().await;
log::debug!("AcknowledgementListener: Exiting");
}
}
@@ -9,6 +9,7 @@ use log::*;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_sphinx::Delay as SphinxDelay;
use nym_task::TaskClient;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
@@ -101,6 +102,8 @@ pub(super) struct ActionController {
/// Channel for notifying `RetransmissionRequestListener` about expired acknowledgements.
retransmission_sender: RetransmissionRequestSender,
task_client: TaskClient,
}
impl ActionController {
@@ -108,6 +111,7 @@ impl ActionController {
config: Config,
retransmission_sender: RetransmissionRequestSender,
incoming_actions: AckActionReceiver,
task_client: TaskClient,
) -> Self {
ActionController {
config,
@@ -115,6 +119,7 @@ impl ActionController {
pending_acks_timers: NonExhaustiveDelayQueue::new(),
incoming_actions,
retransmission_sender,
task_client,
}
}
@@ -216,11 +221,7 @@ impl ActionController {
}
// note: when the entry expires it's automatically removed from pending_acks_timers
fn handle_expired_ack_timer(
&mut self,
expired_ack: Expired<FragmentIdentifier>,
task_client: &mut nym_task::TaskClient,
) {
fn handle_expired_ack_timer(&mut self, expired_ack: Expired<FragmentIdentifier>) {
// I'm honestly not sure how to handle it, because getting it means other things in our
// system are already misbehaving. If we ever see this panic, then I guess we should worry
// about it. Perhaps just reschedule it at later point?
@@ -238,15 +239,13 @@ impl ActionController {
// downgrading an arc and then upgrading vs cloning is difference of 30ns vs 15ns
// so it's literally a NO difference while it might prevent us from unnecessarily
// resending data (in maybe 1 in 1 million cases, but it's something)
if self
if let Err(err) = self
.retransmission_sender
.unbounded_send(Arc::downgrade(pending_ack_data))
.is_err()
{
assert!(
task_client.is_shutdown_poll(),
"Failed to send pending ack for retransmission"
);
if !self.task_client.is_shutdown_poll() {
log::error!("Failed to send pending ack for retransmission: {err}");
}
}
} else {
// this shouldn't cause any issues but shouldn't have happened to begin with!
@@ -265,10 +264,10 @@ impl ActionController {
}
}
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
pub(super) async fn run(&mut self) {
debug!("Started ActionController with graceful shutdown support");
loop {
while !self.task_client.is_shutdown() {
tokio::select! {
action = self.incoming_actions.next() => match action {
Some(action) => self.process_action(action),
@@ -280,19 +279,19 @@ impl ActionController {
}
},
expired_ack = self.pending_acks_timers.next() => match expired_ack {
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack, &mut shutdown),
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack),
None => {
log::trace!("ActionController: Stopping since ack channel closed");
break;
}
},
_ = shutdown.recv_with_delay() => {
_ = self.task_client.recv() => {
log::trace!("ActionController: Received shutdown");
break;
}
}
}
shutdown.recv_timeout().await;
self.task_client.recv_timeout().await;
log::debug!("ActionController: Exiting");
}
}
@@ -11,6 +11,7 @@ use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
@@ -23,6 +24,7 @@ where
input_receiver: InputMessageReceiver,
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
}
impl<R> InputMessageListener<R>
@@ -36,11 +38,13 @@ where
input_receiver: InputMessageReceiver,
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
) -> Self {
InputMessageListener {
input_receiver,
message_handler,
reply_controller_sender,
task_client,
}
}
@@ -63,8 +67,14 @@ where
lane: TransmissionLane,
) {
// offload reply handling to the dedicated task
self.reply_controller_sender
if let Err(err) = self
.reply_controller_sender
.send_reply(recipient_tag, data, lane)
{
if !self.task_client.is_shutdown_poll() {
error!("failed to send a reply - {err}");
}
}
}
async fn handle_plain_message(
@@ -164,10 +174,10 @@ where
};
}
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
pub(super) async fn run(&mut self) {
debug!("Started InputMessageListener with graceful shutdown support");
while !shutdown.is_shutdown() {
while !self.task_client.is_shutdown() {
tokio::select! {
input_msg = self.input_receiver.recv() => match input_msg {
Some(input_msg) => {
@@ -178,12 +188,12 @@ where
break;
}
},
_ = shutdown.recv_with_delay() => {
_ = self.task_client.recv() => {
log::trace!("InputMessageListener: Received shutdown");
}
}
}
shutdown.recv_timeout().await;
self.task_client.recv_timeout().await;
log::debug!("InputMessageListener: Exiting");
}
}
@@ -24,6 +24,7 @@ use nym_sphinx::{
Delay as SphinxDelay,
};
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
use std::{
sync::{Arc, Weak},
@@ -66,7 +67,7 @@ pub(crate) enum PacketDestination {
/// Structure representing a data `Fragment` that is on-route to the specified `Recipient`
#[derive(Debug)]
pub(crate) struct PendingAcknowledgement {
pub struct PendingAcknowledgement {
message_chunk: Fragment,
delay: SphinxDelay,
destination: PacketDestination,
@@ -216,6 +217,7 @@ where
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
let (retransmission_tx, retransmission_rx) = mpsc::unbounded();
@@ -225,6 +227,7 @@ where
action_config,
retransmission_tx,
connectors.ack_action_receiver,
task_client.fork("action_controller"),
);
// will listen for any acks coming from the network
@@ -233,6 +236,7 @@ where
connectors.ack_receiver,
connectors.ack_action_sender.clone(),
stats_tx,
task_client.fork("acknowledgement_listener"),
);
// will listen for any new messages from the client
@@ -240,6 +244,7 @@ where
connectors.input_receiver,
message_handler.clone(),
reply_controller_sender.clone(),
task_client.fork("input_message_listener"),
);
// will listen for any ack timeouts and trigger retransmission
@@ -249,12 +254,16 @@ where
message_handler,
retransmission_rx,
reply_controller_sender,
task_client.fork("retransmission_request_listener"),
);
// will listen for events indicating the packet was sent through the network so that
// the retransmission timer should be started.
let sent_notification_listener =
SentNotificationListener::new(connectors.sent_notifier, connectors.ack_action_sender);
let sent_notification_listener = SentNotificationListener::new(
connectors.sent_notifier,
connectors.ack_action_sender,
task_client.with_suffix("sent_notification_listener"),
);
AcknowledgementController {
acknowledgement_listener,
@@ -265,53 +274,35 @@ where
}
}
pub(super) fn start_with_shutdown(
self,
shutdown: nym_task::TaskClient,
packet_type: PacketType,
) {
pub(super) fn start(self, packet_type: PacketType) {
let mut acknowledgement_listener = self.acknowledgement_listener;
let mut input_message_listener = self.input_message_listener;
let mut retransmission_request_listener = self.retransmission_request_listener;
let mut sent_notification_listener = self.sent_notification_listener;
let mut action_controller = self.action_controller;
let shutdown_handle = shutdown.fork("acknowledgement_listener");
spawn_future(async move {
acknowledgement_listener
.run_with_shutdown(shutdown_handle)
.await;
acknowledgement_listener.run().await;
debug!("The acknowledgement listener has finished execution!");
});
let shutdown_handle = shutdown.fork("input_message_listener");
spawn_future(async move {
input_message_listener
.run_with_shutdown(shutdown_handle)
.await;
input_message_listener.run().await;
debug!("The input listener has finished execution!");
});
let shutdown_handle = shutdown.fork("retransmission_request_listener");
spawn_future(async move {
retransmission_request_listener
.run_with_shutdown(shutdown_handle, packet_type)
.await;
retransmission_request_listener.run(packet_type).await;
debug!("The retransmission request listener has finished execution!");
});
let shutdown_handle = shutdown.fork("sent_notification_listener");
spawn_future(async move {
sent_notification_listener
.run_with_shutdown(shutdown_handle)
.await;
sent_notification_listener.run().await;
debug!("The sent notification listener has finished execution!");
});
spawn_future(async move {
action_controller
.run_with_shutdown(shutdown.with_suffix("action_controller"))
.await;
action_controller.run().await;
debug!("The controller has finished execution!");
});
}
@@ -14,7 +14,7 @@ use log::*;
use nym_sphinx::chunking::fragment::Fragment;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::{addressing::clients::Recipient, params::PacketType};
use nym_task::connections::TransmissionLane;
use nym_task::{connections::TransmissionLane, TaskClient};
use rand::{CryptoRng, Rng};
use std::sync::{Arc, Weak};
@@ -25,6 +25,7 @@ pub(super) struct RetransmissionRequestListener<R> {
message_handler: MessageHandler<R>,
request_receiver: RetransmissionRequestReceiver,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
}
impl<R> RetransmissionRequestListener<R>
@@ -37,6 +38,7 @@ where
message_handler: MessageHandler<R>,
request_receiver: RetransmissionRequestReceiver,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
) -> Self {
RetransmissionRequestListener {
maximum_retransmissions,
@@ -44,6 +46,7 @@ where
message_handler,
request_receiver,
reply_controller_sender,
task_client,
}
}
@@ -79,9 +82,12 @@ where
if let Some(limit) = self.maximum_retransmissions {
if timed_out_ack.retransmissions >= limit {
warn!("reached maximum number of allowed retransmissions for the packet");
self.action_sender
if let Err(err) = self
.action_sender
.unbounded_send(Action::new_remove(frag_id))
.unwrap();
{
error!("Failed to send remove action to the controller: {err}");
}
return;
}
}
@@ -93,11 +99,16 @@ where
} => {
// if this is retransmission for reply, offload it to the dedicated task
// that deals with all the surbs
return self.reply_controller_sender.send_retransmission_data(
if let Err(err) = self.reply_controller_sender.send_retransmission_data(
*recipient_tag,
weak_timed_out_ack,
*extra_surb_request,
);
) {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send retransmission data to the reply controller: {err}");
}
}
return;
}
PacketDestination::KnownRecipient(recipient) => {
self.prepare_normal_retransmission_chunk(
@@ -114,9 +125,12 @@ where
Err(err) => {
warn!("Could not retransmit the packet - {err}");
// we NEED to start timer here otherwise we will have this guy permanently stuck in memory
self.action_sender
if let Err(err) = self
.action_sender
.unbounded_send(Action::new_start_timer(frag_id))
.unwrap();
{
error!("Failed to send start timer action to the controller: {err}");
}
return;
}
};
@@ -141,9 +155,14 @@ where
// is sent to the `OutQueueControl` and has gone through its internal queue
// with the additional poisson delay.
// And since Actions are executed in order `UpdateTimer` will HAVE TO be executed before `StartTimer`
self.action_sender
if let Err(err) = self
.action_sender
.unbounded_send(Action::new_update_pending_ack(frag_id, new_delay))
.unwrap();
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send update pending ack action to the controller: {err}");
}
}
// send to `OutQueueControl` to eventually send to the mix network
self.message_handler
@@ -157,14 +176,10 @@ where
.await
}
pub(super) async fn run_with_shutdown(
&mut self,
mut shutdown: nym_task::TaskClient,
packet_type: PacketType,
) {
pub(super) async fn run(&mut self, packet_type: PacketType) {
debug!("Started RetransmissionRequestListener with graceful shutdown support");
while !shutdown.is_shutdown() {
while !self.task_client.is_shutdown() {
tokio::select! {
timed_out_ack = self.request_receiver.next() => match timed_out_ack {
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await,
@@ -173,12 +188,12 @@ where
break;
}
},
_ = shutdown.recv() => {
_ = self.task_client.recv() => {
log::trace!("RetransmissionRequestListener: Received shutdown");
}
}
}
shutdown.recv_timeout().await;
self.task_client.recv_timeout().await;
log::debug!("RetransmissionRequestListener: Exiting");
}
}
@@ -6,6 +6,7 @@ use super::SentPacketNotificationReceiver;
use futures::StreamExt;
use log::*;
use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
use nym_task::TaskClient;
/// Module responsible for starting up retransmission timers.
/// It is required because when we send our packet to the `real traffic stream` controlled
@@ -14,16 +15,19 @@ use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
pub(super) struct SentNotificationListener {
sent_notifier: SentPacketNotificationReceiver,
action_sender: AckActionSender,
task_client: TaskClient,
}
impl SentNotificationListener {
pub(super) fn new(
sent_notifier: SentPacketNotificationReceiver,
action_sender: AckActionSender,
task_client: TaskClient,
) -> Self {
SentNotificationListener {
sent_notifier,
action_sender,
task_client,
}
}
@@ -32,15 +36,20 @@ impl SentNotificationListener {
trace!("sent off a cover message - no need to start retransmission timer!");
return;
}
self.action_sender
if let Err(err) = self
.action_sender
.unbounded_send(Action::new_start_timer(frag_id))
.unwrap();
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send start timer action to action controller: {err}");
}
}
}
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
pub(super) async fn run(&mut self) {
debug!("Started SentNotificationListener with graceful shutdown support");
loop {
while !self.task_client.is_shutdown() {
tokio::select! {
frag_id = self.sent_notifier.next() => match frag_id {
Some(frag_id) => {
@@ -51,13 +60,13 @@ impl SentNotificationListener {
break;
}
},
_ = shutdown.recv_with_delay() => {
_ = self.task_client.recv() => {
log::trace!("SentNotificationListener: Received shutdown");
break;
}
}
}
assert!(shutdown.is_shutdown_poll());
assert!(self.task_client.is_shutdown_poll());
log::debug!("SentNotificationListener: Exiting");
}
}
@@ -19,6 +19,7 @@ use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
use nym_sphinx::Delay;
use nym_task::connections::TransmissionLane;
use nym_task::TaskClient;
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
@@ -149,12 +150,14 @@ pub(crate) struct MessageHandler<R> {
topology_access: TopologyAccessor,
reply_key_storage: SentReplyKeys,
tag_storage: UsedSenderTags,
task_client: TaskClient,
}
impl<R> MessageHandler<R>
where
R: CryptoRng + Rng,
{
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
config: Config,
rng: R,
@@ -163,6 +166,7 @@ where
topology_access: TopologyAccessor,
reply_key_storage: SentReplyKeys,
tag_storage: UsedSenderTags,
task_client: TaskClient,
) -> Self
where
R: Copy,
@@ -183,6 +187,7 @@ where
topology_access,
reply_key_storage,
tag_storage,
task_client,
}
}
@@ -609,15 +614,25 @@ where
}
pub(crate) fn update_ack_delay(&self, id: FragmentIdentifier, new_delay: Delay) {
self.action_sender
if let Err(err) = self
.action_sender
.unbounded_send(Action::UpdatePendingAck(id, new_delay))
.expect("action control task has died")
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send update action to the controller: {err}");
}
}
}
pub(crate) fn insert_pending_acks(&self, pending_acks: Vec<PendingAcknowledgement>) {
self.action_sender
if let Err(err) = self
.action_sender
.unbounded_send(Action::new_insert(pending_acks))
.expect("action control task has died")
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send insert action to the controller: {err}");
}
}
}
// tells real message sender (with the poisson timer) to send this to the mix network
@@ -626,9 +641,14 @@ where
messages: Vec<RealMessage>,
transmission_lane: TransmissionLane,
) {
self.real_message_sender
if let Err(err) = self
.real_message_sender
.send((messages, transmission_lane))
.await
.expect("real message receiver task (OutQueueControl) has died");
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to forward messages to the real message sender: {err}");
}
}
}
}
@@ -31,6 +31,7 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::params::PacketType;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use nym_task::TaskClient;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::sync::Arc;
@@ -147,6 +148,7 @@ impl RealMessagesController<OsRng> {
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
let rng = OsRng;
@@ -177,6 +179,7 @@ impl RealMessagesController<OsRng> {
topology_access.clone(),
reply_storage.key_storage(),
reply_storage.tags_storage(),
task_client.fork("message_handler"),
);
let ack_control = AcknowledgementController::new(
@@ -186,6 +189,7 @@ impl RealMessagesController<OsRng> {
message_handler.clone(),
reply_controller_sender,
stats_tx.clone(),
task_client.fork("ack_control"),
);
let reply_control = ReplyController::new(
@@ -193,6 +197,7 @@ impl RealMessagesController<OsRng> {
message_handler,
reply_storage,
reply_controller_receiver,
task_client.fork("reply_controller"),
);
let out_queue_control = OutQueueControl::new(
@@ -205,6 +210,7 @@ impl RealMessagesController<OsRng> {
lane_queue_lengths,
client_connection_rx,
stats_tx,
task_client.with_suffix("out_queue_control"),
);
RealMessagesController {
@@ -214,22 +220,20 @@ impl RealMessagesController<OsRng> {
}
}
pub fn start_with_shutdown(self, shutdown: nym_task::TaskClient, packet_type: PacketType) {
pub fn start(self, packet_type: PacketType) {
let mut out_queue_control = self.out_queue_control;
let ack_control = self.ack_control;
let mut reply_control = self.reply_control;
let shutdown_handle = shutdown.fork("out_queue_control");
spawn_future(async move {
out_queue_control.run_with_shutdown(shutdown_handle).await;
out_queue_control.run().await;
debug!("The out queue controller has finished execution!");
});
let shutdown_handle = shutdown.fork("reply_control");
spawn_future(async move {
reply_control.run_with_shutdown(shutdown_handle).await;
reply_control.run().await;
debug!("The reply controller has finished execution!");
});
ack_control.start_with_shutdown(shutdown.with_suffix("ack_control"), packet_type);
ack_control.start(packet_type);
}
}
@@ -22,6 +22,7 @@ use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, C
use nym_task::connections::{
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
@@ -117,6 +118,8 @@ where
/// Channel used for sending metrics events (specifically `PacketStatistics` events) to the metrics tracker.
stats_tx: ClientStatsSender,
task_client: TaskClient,
}
#[derive(Debug)]
@@ -176,6 +179,7 @@ where
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
OutQueueControl {
config,
@@ -190,6 +194,7 @@ where
client_connection_rx,
lane_queue_lengths,
stats_tx,
task_client,
}
}
@@ -198,7 +203,9 @@ where
// queues and client load rather than the required delay. So realistically we can treat
// whatever is about to happen as negligible additional delay.
trace!("{} is about to get sent to the mixnet", frag_id);
self.sent_notifier.unbounded_send(frag_id).unwrap();
if let Err(err) = self.sent_notifier.unbounded_send(frag_id) {
error!("Failed to notify about sent message: {err}");
}
}
fn loop_cover_message_size(&mut self) -> PacketSize {
@@ -271,7 +278,9 @@ where
};
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
log::error!("Failed to send: {err}");
if !self.task_client.is_shutdown_poll() {
log::error!("Failed to send: {err}");
}
} else {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
@@ -504,7 +513,7 @@ where
}
#[cfg(not(target_arch = "wasm32"))]
fn log_status(&self, shutdown: &mut nym_task::TaskClient) {
fn log_status(&self, shutdown: &mut TaskClient) {
use crate::error::ClientCoreStatusMessage;
let packets = self.transmission_buffer.total_size();
@@ -535,17 +544,19 @@ where
}
}
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
pub(super) async fn run(&mut self) {
debug!("Started OutQueueControl with graceful shutdown support");
let mut shutdown = self.task_client.fork("select");
#[cfg(not(target_arch = "wasm32"))]
{
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
loop {
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv_with_delay() => {
_ = shutdown.recv() => {
log::trace!("OutQueueControl: Received shutdown");
break;
}
@@ -20,6 +20,7 @@ use nym_sphinx::message::{NymMessage, PlainMessage};
use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_task::TaskClient;
use std::collections::HashSet;
use std::sync::Arc;
@@ -152,6 +153,7 @@ struct ReceivedMessagesBuffer<R: MessageReceiver> {
inner: Arc<Mutex<ReceivedMessagesBufferInner<R>>>,
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
}
impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
@@ -160,6 +162,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
ReceivedMessagesBuffer {
inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
@@ -172,6 +175,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
})),
reply_key_storage,
reply_controller_sender,
task_client,
}
}
@@ -257,11 +261,15 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
};
self.reply_controller_sender.send_additional_surbs(
if let Err(err) = self.reply_controller_sender.send_additional_surbs(
msg.sender_tag,
reply_surbs,
from_surb_request,
)
) {
if !self.task_client.is_shutdown_poll() {
error!("{err}");
}
}
}
reconstructed
}
@@ -276,8 +284,14 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
ReplyMessageContent::Data { message } => reconstructed.push(message.into()),
ReplyMessageContent::SurbRequest { recipient, amount } => {
debug!("received request for {amount} additional reply SURBs from {recipient}");
self.reply_controller_sender
.send_additional_surbs_request(*recipient, amount);
if let Err(err) = self
.reply_controller_sender
.send_additional_surbs_request(*recipient, amount)
{
if !self.task_client.is_shutdown_poll() {
error!("{err}");
}
}
}
}
}
@@ -399,16 +413,19 @@ pub enum ReceivedBufferMessage {
struct RequestReceiver<R: MessageReceiver> {
received_buffer: ReceivedMessagesBuffer<R>,
query_receiver: ReceivedBufferRequestReceiver,
task_client: TaskClient,
}
impl<R: MessageReceiver> RequestReceiver<R> {
fn new(
received_buffer: ReceivedMessagesBuffer<R>,
query_receiver: ReceivedBufferRequestReceiver,
task_client: TaskClient,
) -> Self {
RequestReceiver {
received_buffer,
query_receiver,
task_client,
}
}
@@ -423,12 +440,12 @@ impl<R: MessageReceiver> RequestReceiver<R> {
}
}
async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
async fn run(&mut self) {
debug!("Started RequestReceiver with graceful shutdown support");
while !shutdown.is_shutdown() {
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv_with_delay() => {
_ = self.task_client.recv() => {
log::trace!("RequestReceiver: Received shutdown");
}
request = self.query_receiver.next() => {
@@ -441,7 +458,7 @@ impl<R: MessageReceiver> RequestReceiver<R> {
},
}
}
shutdown.recv_timeout().await;
self.task_client.recv().await;
log::debug!("RequestReceiver: Exiting");
}
}
@@ -449,25 +466,25 @@ impl<R: MessageReceiver> RequestReceiver<R> {
struct FragmentedMessageReceiver<R: MessageReceiver> {
received_buffer: ReceivedMessagesBuffer<R>,
mixnet_packet_receiver: MixnetMessageReceiver,
task_client: TaskClient,
}
impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
fn new(
received_buffer: ReceivedMessagesBuffer<R>,
mixnet_packet_receiver: MixnetMessageReceiver,
task_client: TaskClient,
) -> Self {
FragmentedMessageReceiver {
received_buffer,
mixnet_packet_receiver,
task_client,
}
}
async fn run_with_shutdown(
&mut self,
mut shutdown: nym_task::TaskClient,
) -> Result<(), MessageRecoveryError> {
async fn run(&mut self) -> Result<(), MessageRecoveryError> {
debug!("Started FragmentedMessageReceiver with graceful shutdown support");
while !shutdown.is_shutdown() {
while !self.task_client.is_shutdown() {
tokio::select! {
new_messages = self.mixnet_packet_receiver.next() => {
if let Some(new_messages) = new_messages {
@@ -477,12 +494,12 @@ impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
break;
}
},
_ = shutdown.recv_with_delay() => {
_ = self.task_client.recv_with_delay() => {
log::trace!("FragmentedMessageReceiver: Received shutdown");
}
}
}
shutdown.recv_timeout().await;
self.task_client.recv_timeout().await;
log::debug!("FragmentedMessageReceiver: Exiting");
Ok(())
}
@@ -501,41 +518,42 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
metrics_reporter: ClientStatsSender,
task_client: TaskClient,
) -> Self {
let received_buffer = ReceivedMessagesBuffer::new(
local_encryption_keypair,
reply_key_storage,
reply_controller_sender,
metrics_reporter,
task_client.fork("received_messages_buffer"),
);
ReceivedMessagesBufferController {
fragmented_message_receiver: FragmentedMessageReceiver::new(
received_buffer.clone(),
mixnet_packet_receiver,
task_client.fork("fragmented_message_receiver"),
),
request_receiver: RequestReceiver::new(
received_buffer,
query_receiver,
task_client.with_suffix("request_receiver"),
),
request_receiver: RequestReceiver::new(received_buffer, query_receiver),
}
}
pub fn start_with_shutdown(self, shutdown: nym_task::TaskClient) {
pub fn start(self) {
let mut fragmented_message_receiver = self.fragmented_message_receiver;
let mut request_receiver = self.request_receiver;
let shutdown_handle = shutdown.fork("fragmented_message_receiver");
spawn_future(async move {
match fragmented_message_receiver
.run_with_shutdown(shutdown_handle)
.await
{
match fragmented_message_receiver.run().await {
Ok(_) => {}
Err(e) => error!("{e}"),
}
});
spawn_future(async move {
request_receiver
.run_with_shutdown(shutdown.with_suffix("request_receiver"))
.await;
request_receiver.run().await;
});
}
}
@@ -12,6 +12,7 @@ use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurb;
use nym_sphinx::chunking::fragment::{Fragment, FragmentIdentifier};
use nym_task::connections::{ConnectionId, TransmissionLane};
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
use std::cmp::{max, min};
use std::collections::btree_map::Entry;
@@ -68,6 +69,9 @@ pub struct ReplyController<R> {
message_handler: MessageHandler<R>,
full_reply_storage: CombinedReplyStorage,
// Listen for shutdown signals
task_client: TaskClient,
}
impl<R> ReplyController<R>
@@ -79,6 +83,7 @@ where
message_handler: MessageHandler<R>,
full_reply_storage: CombinedReplyStorage,
request_receiver: ReplyControllerReceiver,
task_client: TaskClient,
) -> Self {
ReplyController {
config,
@@ -87,6 +92,7 @@ where
pending_retransmissions: HashMap::new(),
message_handler,
full_reply_storage,
task_client,
}
}
@@ -846,9 +852,11 @@ where
// todo!()
// }
pub(crate) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
pub(crate) async fn run(&mut self) {
debug!("Started ReplyController with graceful shutdown support");
let mut shutdown = self.task_client.fork("select");
let polling_rate = Duration::from_secs(5);
let mut stale_inspection = new_interval_stream(polling_rate);
@@ -860,7 +868,7 @@ where
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv_with_delay() => {
_ = shutdown.recv() => {
log::trace!("ReplyController: Received shutdown");
},
req = self.request_receiver.next() => match req {
@@ -15,6 +15,27 @@ pub(crate) fn new_control_channels() -> (ReplyControllerSender, ReplyControllerR
(tx.into(), rx)
}
#[derive(Debug, thiserror::Error)]
pub enum ReplyControllerSenderError {
#[error("failed to send retransmission data to reply controller")]
SendRetransmissionData(#[source] mpsc::TrySendError<ReplyControllerMessage>),
#[error("failed to send reply to reply controller")]
SendReply(#[source] mpsc::TrySendError<ReplyControllerMessage>),
#[error("failed to send additional surbs to reply controller")]
AdditionalSurbs(#[source] mpsc::TrySendError<ReplyControllerMessage>),
#[error("failed to send additional surbs request to reply controller")]
AdditionalSurbsRequest(#[source] mpsc::TrySendError<ReplyControllerMessage>),
#[error("failed to request lane queue length from reply controller")]
LaneQueueLength(#[source] mpsc::TrySendError<ReplyControllerMessage>),
#[error("response channel was dropped before we could receive the response")]
ResponseChannelDropped(#[source] oneshot::Canceled),
}
#[derive(Debug, Clone)]
pub struct ReplyControllerSender(mpsc::UnboundedSender<ReplyControllerMessage>);
@@ -30,14 +51,14 @@ impl ReplyControllerSender {
recipient: AnonymousSenderTag,
timed_out_ack: Weak<PendingAcknowledgement>,
extra_surb_request: bool,
) {
) -> Result<(), ReplyControllerSenderError> {
self.0
.unbounded_send(ReplyControllerMessage::RetransmitReply {
recipient,
timed_out_ack,
extra_surb_request,
})
.expect("ReplyControllerReceiver has died!")
.map_err(ReplyControllerSenderError::SendRetransmissionData)
}
pub(crate) fn send_reply(
@@ -45,14 +66,14 @@ impl ReplyControllerSender {
recipient: AnonymousSenderTag,
message: Vec<u8>,
lane: TransmissionLane,
) {
) -> Result<(), ReplyControllerSenderError> {
self.0
.unbounded_send(ReplyControllerMessage::SendReply {
recipient,
message,
lane,
})
.expect("ReplyControllerReceiver has died!")
.map_err(ReplyControllerSenderError::SendReply)
}
pub(crate) fn send_additional_surbs(
@@ -60,42 +81,47 @@ impl ReplyControllerSender {
sender_tag: AnonymousSenderTag,
reply_surbs: Vec<ReplySurb>,
from_surb_request: bool,
) {
) -> Result<(), ReplyControllerSenderError> {
self.0
.unbounded_send(ReplyControllerMessage::AdditionalSurbs {
sender_tag,
reply_surbs,
from_surb_request,
})
.expect("ReplyControllerReceiver has died!")
.map_err(ReplyControllerSenderError::AdditionalSurbs)
}
pub(crate) fn send_additional_surbs_request(&self, recipient: Recipient, amount: u32) {
pub(crate) fn send_additional_surbs_request(
&self,
recipient: Recipient,
amount: u32,
) -> Result<(), ReplyControllerSenderError> {
self.0
.unbounded_send(ReplyControllerMessage::AdditionalSurbsRequest {
recipient: Box::new(recipient),
amount,
})
.expect("ReplyControllerReceiver has died!")
.map_err(ReplyControllerSenderError::AdditionalSurbsRequest)
}
pub async fn get_lane_queue_length(&self, connection_id: ConnectionId) -> usize {
pub async fn get_lane_queue_length(
&self,
connection_id: ConnectionId,
) -> Result<usize, ReplyControllerSenderError> {
let (response_tx, response_rx) = oneshot::channel();
self.0
if let Err(err) = self
.0
.unbounded_send(ReplyControllerMessage::LaneQueueLength {
connection_id,
response_channel: response_tx,
})
.expect("ReplyControllerReceiver has died!");
match response_rx.await {
Ok(length) => length,
Err(_) => {
error!("The reply controller has dropped our response channel!");
// TODO: should we panic here instead? this message implies something weird and unrecoverable has happened
0
}
{
return Err(ReplyControllerSenderError::LaneQueueLength(err));
}
response_rx
.await
.map_err(ReplyControllerSenderError::ResponseChannelDropped)
}
}
@@ -110,7 +136,10 @@ impl ReplyQueueLengths {
}
}
pub async fn get_lane_queue_length(&self, connection_id: ConnectionId) -> usize {
pub async fn get_lane_queue_length(
&self,
connection_id: ConnectionId,
) -> Result<usize, ReplyControllerSenderError> {
self.reply_controller_sender
.get_lane_queue_length(connection_id)
.await
@@ -120,7 +149,7 @@ impl ReplyQueueLengths {
pub(crate) type ReplyControllerReceiver = mpsc::UnboundedReceiver<ReplyControllerMessage>;
#[derive(Debug)]
pub(crate) enum ReplyControllerMessage {
pub enum ReplyControllerMessage {
RetransmitReply {
recipient: AnonymousSenderTag,
timed_out_ack: Weak<PendingAcknowledgement>,
@@ -22,7 +22,7 @@ use nym_sphinx::addressing::Recipient;
use nym_statistics_common::clients::{
ClientStatsController, ClientStatsReceiver, ClientStatsSender,
};
use nym_task::connections::TransmissionLane;
use nym_task::{connections::TransmissionLane, TaskClient};
use std::time::Duration;
use crate::{
@@ -51,6 +51,9 @@ pub(crate) struct StatisticsControl {
/// Config for stats reporting (enabled, address, interval)
reporting_config: StatsReporting,
/// Task client for listening for shutdown
task_client: TaskClient,
}
impl StatisticsControl {
@@ -59,19 +62,24 @@ impl StatisticsControl {
client_type: String,
client_stats_id: String,
report_tx: InputMessageSender,
task_client: TaskClient,
) -> (Self, ClientStatsSender) {
let (stats_tx, stats_rx) = tokio::sync::mpsc::unbounded_channel();
let stats = ClientStatsController::new(client_stats_id, client_type);
let mut task_client_stats_sender = task_client.fork("stats_sender");
task_client_stats_sender.disarm();
(
StatisticsControl {
stats,
stats_rx,
report_tx,
reporting_config,
task_client,
},
ClientStatsSender::new(Some(stats_tx)),
ClientStatsSender::new(Some(stats_tx), task_client_stats_sender),
)
}
@@ -91,7 +99,7 @@ impl StatisticsControl {
}
}
async fn run_with_shutdown(&mut self, mut task_client: nym_task::TaskClient) {
async fn run(&mut self) {
log::debug!("Started StatisticsControl with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
@@ -121,8 +129,13 @@ impl StatisticsControl {
let mut snapshot_interval =
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);
loop {
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
log::trace!("StatisticsControl: Received shutdown");
break;
},
stats_event = self.stats_rx.recv() => match stats_event {
Some(stats_event) => self.stats.handle_event(stats_event),
None => {
@@ -144,34 +157,34 @@ impl StatisticsControl {
}
_ = local_report_interval.next() => {
self.stats.local_report(&mut task_client);
self.stats.local_report(&mut self.task_client);
}
_ = task_client.recv_with_delay() => {
log::trace!("StatisticsControl: Received shutdown");
break;
},
}
}
task_client.recv_timeout().await;
log::debug!("StatisticsControl: Exiting");
}
pub(crate) fn start_with_shutdown(mut self, task_client: nym_task::TaskClient) {
pub(crate) fn start(mut self) {
spawn_future(async move {
self.run_with_shutdown(task_client).await;
self.run().await;
})
}
pub(crate) fn create_and_start_with_shutdown(
pub(crate) fn create_and_start(
reporting_config: StatsReporting,
client_type: String,
client_stats_id: String,
report_tx: InputMessageSender,
task_client: nym_task::TaskClient,
task_client: TaskClient,
) -> ClientStatsSender {
let (controller, sender) =
Self::create(reporting_config, client_type, client_stats_id, report_tx);
controller.start_with_shutdown(task_client);
let (controller, sender) = Self::create(
reporting_config,
client_type,
client_stats_id,
report_tx,
task_client,
);
controller.start();
sender
}
}
@@ -6,6 +6,7 @@ pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit};
use futures::StreamExt;
use log::*;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_task::TaskClient;
use nym_topology::NymTopologyError;
use std::time::Duration;
@@ -43,6 +44,8 @@ pub struct TopologyRefresher {
refresh_rate: Duration,
consecutive_failure_count: usize,
task_client: TaskClient,
}
impl TopologyRefresher {
@@ -50,12 +53,14 @@ impl TopologyRefresher {
cfg: TopologyRefresherConfig,
topology_accessor: TopologyAccessor,
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
task_client: TaskClient,
) -> Self {
TopologyRefresher {
topology_provider,
topology_accessor,
refresh_rate: cfg.refresh_rate,
consecutive_failure_count: 0,
task_client,
}
}
@@ -142,7 +147,7 @@ impl TopologyRefresher {
}
}
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
pub fn start(mut self) {
spawn_future(async move {
debug!("Started TopologyRefresher with graceful shutdown support");
@@ -155,17 +160,17 @@ impl TopologyRefresher {
let mut interval =
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
while !shutdown.is_shutdown() {
while !self.task_client.is_shutdown() {
tokio::select! {
_ = interval.next() => {
self.try_refresh().await;
},
_ = shutdown.recv() => {
_ = self.task_client.recv() => {
log::trace!("TopologyRefresher: Received shutdown");
},
}
}
shutdown.recv_timeout().await;
self.task_client.recv_timeout().await;
log::debug!("TopologyRefresher: Exiting");
})
}
+13
View File
@@ -36,6 +36,13 @@ pub enum ClientCoreError {
#[error("no gateway with id: {0}")]
NoGatewayWithId(String),
#[error("Invalid URL: {0}")]
InvalidUrl(String),
#[cfg(not(target_arch = "wasm32"))]
#[error("resolution failed: {0}")]
ResolutionFailed(#[from] nym_http_api_client::HickoryDnsError),
#[error("no gateways on network")]
NoGatewaysOnNetwork,
@@ -96,6 +103,9 @@ pub enum ClientCoreError {
#[error("timed out while trying to establish gateway connection")]
GatewayConnectionTimeout,
#[error("failed to forward mix messages to gateway")]
GatewayFailedToForwardMessages,
#[error("no ping measurements for the gateway ({identity}) performed")]
NoGatewayMeasurements { identity: String },
@@ -212,6 +222,9 @@ pub enum ClientCoreError {
"fresh registration with gateway {gateway_id} somehow requires an additional key upgrade!"
)]
UnexpectedKeyUpgrade { gateway_id: String },
#[error("failed to derive keys from master key")]
HkdfDerivationError {},
}
/// Set of messages that the client can send to listeners via the task manager
+4 -3
View File
@@ -15,6 +15,9 @@ use std::{sync::Arc, time::Duration};
use tungstenite::Message;
use url::Url;
#[cfg(not(target_arch = "wasm32"))]
use crate::init::websockets::connect_async;
use nym_topology::NodeId;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
@@ -23,8 +26,6 @@ use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
#[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket;
@@ -132,7 +133,7 @@ pub async fn gateways_for_init<R: Rng>(
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
match tokio::time::timeout(CONN_TIMEOUT, connect_async(endpoint)).await {
Err(_elapsed) => Err(ClientCoreError::GatewayConnectionTimeout),
Ok(Err(conn_failure)) => Err(conn_failure.into()),
Ok(Err(conn_failure)) => Err(conn_failure),
Ok(Ok((stream, _))) => Ok(stream),
}
}
+2
View File
@@ -26,6 +26,8 @@ use serde::Serialize;
pub mod helpers;
pub mod types;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) mod websockets;
// helpers for error wrapping
+44
View File
@@ -0,0 +1,44 @@
use crate::error::ClientCoreError;
use nym_http_api_client::HickoryDnsResolver;
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::handshake::client::Response;
use url::{Host, Url};
use std::net::SocketAddr;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn connect_async(
endpoint: &str,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), ClientCoreError> {
let resolver = HickoryDnsResolver::default();
let uri = Url::parse(endpoint).map_err(|_| ClientCoreError::InvalidUrl(endpoint.to_owned()))?;
let port: u16 = uri.port_or_known_default().unwrap_or(443);
let host = uri
.host()
.ok_or(ClientCoreError::InvalidUrl(endpoint.to_owned()))?;
// Get address for tcp connection, if a domain is provided use our preferred resolver rather than
// the default std resolve
let sock_addrs: Vec<SocketAddr> = match host {
Host::Ipv4(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Ipv6(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Domain(domain) => {
// Do a DNS lookup for the domain using our custom DNS resolver
resolver
.resolve_str(domain)
.await?
.into_iter()
.map(|a| SocketAddr::new(a, port))
.collect()
}
};
let stream = TcpStream::connect(&sock_addrs[..]).await?;
tokio_tungstenite::client_async_tls(endpoint, stream)
.await
.map_err(Into::into)
}
-45
View File
@@ -33,48 +33,3 @@ where
{
tokio::spawn(future);
}
#[derive(Clone, Default, Debug)]
pub struct ForgetMe {
client: bool,
stats: bool,
}
impl ForgetMe {
pub fn new_all() -> Self {
Self {
client: true,
stats: true,
}
}
pub fn new_client() -> Self {
Self {
client: true,
stats: false,
}
}
pub fn new_stats() -> Self {
Self {
client: false,
stats: true,
}
}
pub fn new(client: bool, stats: bool) -> Self {
Self { client, stats }
}
pub fn any(&self) -> bool {
self.client || self.stats
}
pub fn client(&self) -> bool {
self.client
}
pub fn stats(&self) -> bool {
self.stats
}
}
@@ -22,7 +22,7 @@ mod error;
mod manager;
mod models;
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Backend {
temporary_old_path: Option<PathBuf>,
database_path: PathBuf,
@@ -19,7 +19,7 @@ pub mod fs_backend;
#[error("no information provided")]
pub struct UndefinedError;
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Empty {
// we need to keep 'basic' metadata here to "load" the CombinedReplyStorage
pub min_surb_threshold: usize,
@@ -27,6 +27,7 @@ nym-credential-storage = { path = "../../credential-storage" }
nym-credentials-interface = { path = "../../credentials-interface" }
nym-crypto = { path = "../../crypto" }
nym-gateway-requests = { path = "../../gateway-requests" }
nym-http-api-client = { path = "../../http-api-client" }
nym-network-defaults = { path = "../../network-defaults" }
nym-sphinx = { path = "../../nymsphinx" }
nym-statistics-common = { path = "../../statistics" }
@@ -40,8 +40,6 @@ use url::Url;
use std::os::fd::RawFd;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async;
#[cfg(not(unix))]
use std::os::raw::c_int as RawFd;
@@ -53,6 +51,11 @@ use zeroize::Zeroizing;
pub mod config;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) mod websockets;
#[cfg(not(target_arch = "wasm32"))]
use websockets::connect_async;
pub struct GatewayConfig {
pub gateway_identity: identity::PublicKey,
@@ -201,15 +204,7 @@ impl<C, St> GatewayClient<C, St> {
"Attemting to establish connection to gateway at: {}",
self.gateway_address
);
let ws_stream = match connect_async(&self.gateway_address).await {
Ok((ws_stream, _)) => ws_stream,
Err(error) => {
return Err(GatewayClientError::NetworkConnectionFailed {
address: self.gateway_address.clone(),
source: error,
})
}
};
let (ws_stream, _) = connect_async(&self.gateway_address).await?;
self.connection = SocketState::Available(Box::new(ws_stream));
@@ -271,6 +266,19 @@ impl<C, St> GatewayClient<C, St> {
}
}
pub async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
if let Some(shared_key) = self.shared_key() {
let encrypted = message.encrypt(&*shared_key)?;
Box::pin(self.send_websocket_message(encrypted)).await?;
Ok(())
} else {
Err(GatewayClientError::ConnectionInInvalidState)
}
}
async fn read_control_response(&mut self) -> Result<ServerResponse, GatewayClientError> {
// we use the fact that all request responses are Message::Text and only pushed
// sphinx packets are Message::Binary
@@ -1045,7 +1053,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
connection: SocketState::NotConnected,
packet_router,
bandwidth_controller: None,
stats_reporter: ClientStatsSender::new(None),
stats_reporter: ClientStatsSender::new(None, task_client.clone()),
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback: None,
@@ -0,0 +1,53 @@
use crate::error::GatewayClientError;
use nym_http_api_client::HickoryDnsResolver;
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::handshake::client::Response;
use url::{Host, Url};
use std::net::SocketAddr;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn connect_async(
endpoint: &str,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
let resolver = HickoryDnsResolver::default();
let uri =
Url::parse(endpoint).map_err(|_| GatewayClientError::InvalidUrl(endpoint.to_owned()))?;
let port: u16 = uri.port_or_known_default().unwrap_or(443);
let host = uri
.host()
.ok_or(GatewayClientError::InvalidUrl(endpoint.to_owned()))?;
// Get address for tcp connection, if a domain is provided use our preferred resolver rather than
// the default std resolve
let sock_addrs: Vec<SocketAddr> = match host {
Host::Ipv4(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Ipv6(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Domain(domain) => {
// Do a DNS lookup for the domain using our custom DNS resolver
resolver
.resolve_str(domain)
.await?
.into_iter()
.map(|a| SocketAddr::new(a, port))
.collect()
}
};
let stream = TcpStream::connect(&sock_addrs[..]).await.map_err(|error| {
GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: error.into(),
}
})?;
tokio_tungstenite::client_async_tls(endpoint, stream)
.await
.map_err(|error| GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: error,
})
}
@@ -44,7 +44,11 @@ pub enum GatewayClientError {
NetworkConnectionFailed { address: String, source: WsError },
#[error("Invalid URL: {0}")]
InvalidURL(String),
InvalidUrl(String),
#[cfg(not(target_arch = "wasm32"))]
#[error("resolution failed: {0}")]
ResolutionFailed(#[from] nym_http_api_client::HickoryDnsError),
#[error("No shared key was provided or obtained")]
NoSharedKeyAvailable,
@@ -11,8 +11,7 @@ use crate::{
use nym_api_requests::ecash::models::{
AggregatedCoinIndicesSignatureResponse, AggregatedExpirationDateSignatureResponse,
BatchRedeemTicketsBody, EcashBatchTicketRedemptionResponse, EcashTicketVerificationResponse,
IssuedTicketbooksChallengeResponse, IssuedTicketbooksForResponse, SpentCredentialsResponse,
VerifyEcashTicketBody,
IssuedTicketbooksChallengeResponse, IssuedTicketbooksForResponse, VerifyEcashTicketBody,
};
use nym_api_requests::ecash::{
BlindSignRequestBody, BlindedSignatureResponse, PartialCoinIndicesSignatureResponse,
@@ -647,13 +646,6 @@ impl NymApiClient {
.await?)
}
#[deprecated]
pub async fn spent_credentials_filter(
&self,
) -> Result<SpentCredentialsResponse, ValidatorClientError> {
Ok(self.nym_api.double_spending_filter_v1().await?)
}
pub async fn partial_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
@@ -31,6 +31,7 @@ pub use nym_api_requests::{
StakeSaturationResponse, UptimeResponse,
},
nym_nodes::{CachedNodesResponse, SkimmedNode},
NymNetworkDetailsResponse,
};
pub use nym_coconut_dkg_common::types::EpochId;
use nym_contracts_common::IdentityKey;
@@ -849,20 +850,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn double_spending_filter_v1(&self) -> Result<SpentCredentialsResponse, NymAPIError> {
self.get_json(
&[
routes::API_VERSION,
routes::ECASH_ROUTES,
routes::DOUBLE_SPENDING_FILTER_V1,
],
NO_PARAMS,
)
.await
}
#[instrument(level = "debug", skip(self))]
async fn partial_expiration_date_signatures(
&self,
@@ -1027,6 +1014,15 @@ pub trait NymApiClientExt: ApiClient {
)
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_network_details(&self) -> Result<NymNetworkDetailsResponse, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::NETWORK, routes::DETAILS],
NO_PARAMS,
)
.await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -13,8 +13,6 @@ pub const DETAILED: &str = "detailed";
pub const DETAILED_UNFILTERED: &str = "detailed-unfiltered";
pub const ACTIVE: &str = "active";
pub const REWARDED: &str = "rewarded";
pub const DOUBLE_SPENDING_FILTER_V1: &str = "double-spending-filter-v1";
pub const ECASH_ROUTES: &str = "ecash";
pub use ecash::*;
@@ -67,6 +65,8 @@ pub const STAKE_SATURATION: &str = "stake-saturation";
pub const INCLUSION_CHANCE: &str = "inclusion-probability";
pub const SUBMIT_GATEWAY: &str = "submit-gateway-monitoring-results";
pub const SUBMIT_NODE: &str = "submit-node-monitoring-results";
pub const PERFORMANCE: &str = "performance";
pub const SERVICE_PROVIDERS: &str = "services";
pub const DETAILS: &str = "details";
pub const NETWORK: &str = "network";
@@ -8,81 +8,81 @@ use thiserror::Error;
#[derive(Error, Debug, PartialEq)]
pub enum VestingContractError {
#[error("VESTING ({}): {0}", line!())]
#[error("VESTING ({l}): {0}", l = line!())]
Std(#[from] StdError),
#[error("VESTING: {0}")]
OverflowError(#[from] OverflowError),
#[error("VESTING ({}): Account does not exist - {0}", line!())]
#[error("VESTING ({l}): Account does not exist - {0}", l = line!())]
NoAccountForAddress(String),
#[error("VESTING ({}): Only admin can perform this action, {0} is not admin", line!())]
#[error("VESTING ({l}): Only admin can perform this action, {0} is not admin", l = line!())]
NotAdmin(String),
#[error("VESTING ({}): Balance not found for existing account ({0}), this is a bug", line!())]
#[error("VESTING ({l}): Balance not found for existing account ({0}), this is a bug", l = line!())]
NoBalanceForAddress(String),
#[error("VESTING ({}): Insufficient balance for address {0} -> {1}", line!())]
#[error("VESTING ({l}): Insufficient balance for address {0} -> {1}", l = line!())]
InsufficientBalance(String, u128),
#[error("VESTING ({}): Insufficient spendable balance for address {0} -> {1}", line!())]
#[error("VESTING ({l}): Insufficient spendable balance for address {0} -> {1}", l = line!())]
InsufficientSpendable(String, u128),
#[error(
"VESTING ({}):Only delegation owner can perform delegation actions, {0} is not the delegation owner"
, line!())]
"VESTING ({l}):Only delegation owner can perform delegation actions, {0} is not the delegation owner"
, l = line!())]
NotDelegate(String),
#[error("VESTING ({}): Total vesting amount is inprobably low -> {0}, this is likely an error", line!())]
#[error("VESTING ({l}): Total vesting amount is inprobably low -> {0}, this is likely an error", l = line!())]
ImprobableVestingAmount(u128),
#[error("VESTING ({}): Address {0} has already bonded a node", line!())]
#[error("VESTING ({l}): Address {0} has already bonded a node", l = line!())]
AlreadyBonded(String),
#[error("VESTING ({}): Received empty funds vector", line!())]
#[error("VESTING ({l}): Received empty funds vector", l = line!())]
EmptyFunds,
#[error("VESTING ({}): Received wrong denom: {0}, expected {1}", line!())]
#[error("VESTING ({l}): Received wrong denom: {0}, expected {1}", l = line!())]
WrongDenom(String, String),
#[error("VESTING ({}): Received multiple denoms, expected 1", line!())]
#[error("VESTING ({l}): Received multiple denoms, expected 1", l = line!())]
MultipleDenoms,
#[error("VESTING ({}): No delegations found for account {0}, mix_identity {1}", line!())]
#[error("VESTING ({l}): No delegations found for account {0}, mix_identity {1}", l = line!())]
NoSuchDelegation(Addr, NodeId),
#[error("VESTING ({}): Only mixnet contract can perform this operation, got {0}", line!())]
#[error("VESTING ({l}): Only mixnet contract can perform this operation, got {0}", l = line!())]
NotMixnetContract(Addr),
#[error("VESTING ({}): Calculation underflowed", line!())]
#[error("VESTING ({l}): Calculation underflowed", l = line!())]
Underflow,
#[error("VESTING ({}): No bond found for account {0}", line!())]
#[error("VESTING ({l}): No bond found for account {0}", l = line!())]
NoBondFound(String),
#[error("VESTING: Attempted to reduce mixnode bond pledge below zero! The current pledge is {current} and we attempted to reduce it by {decrease_by}.")]
InvalidBondPledgeReduction { current: Coin, decrease_by: Coin },
#[error("VESTING ({}): Action can only be executed by account owner -> {0}", line!())]
#[error("VESTING ({l}): Action can only be executed by account owner -> {0}", l = line!())]
NotOwner(String),
#[error("VESTING ({}): Invalid address: {0}", line!())]
#[error("VESTING ({l}): Invalid address: {0}", l = line!())]
InvalidAddress(String),
#[error("VESTING ({}): Account already exists: {0}", line!())]
#[error("VESTING ({l}): Account already exists: {0}", l = line!())]
AccountAlreadyExists(String),
#[error("VESTING ({}): Staking account already exists: {0}", line!())]
#[error("VESTING ({l}): Staking account already exists: {0}", l = line!())]
StakingAccountAlreadyExists(String),
#[error("VESTING ({}): Too few coins sent for vesting account creation, sent {sent}, need at least {need}", line!())]
#[error("VESTING ({l}): Too few coins sent for vesting account creation, sent {sent}, need at least {need}", l = line!())]
MinVestingFunds { sent: u128, need: u128 },
#[error("VESTING ({}): Maximum amount of locked coins has already been pledged: {current}, cap is {cap}", line!())]
#[error("VESTING ({l}): Maximum amount of locked coins has already been pledged: {current}, cap is {cap}", l = line!())]
LockedPledgeCapReached { current: Uint128, cap: Uint128 },
#[error("VESTING: ({}: Account owned by {owner} has unpopulated vesting periods!", line!())]
#[error("VESTING: ({l}: Account owned by {owner} has unpopulated vesting periods!", l = line!())]
UnpopulatedVestingPeriods { owner: Addr },
#[error("VESTING: Vesting account associated with {0} already exists, only addresses with not existing vesting accounts can be added as staking addresses")]
+1 -1
View File
@@ -19,7 +19,7 @@ use std::error::Error;
// `SELECT total_tickets, used_tickets FROM ecash_ticketbook WHERE expiration_date >= ?`, today_date
// then for each calculate the diff total_tickets - used_tickets and multiply the result by the size of the ticket
#[async_trait]
pub trait Storage: Send + Sync {
pub trait Storage: Clone + Send + Sync {
type StorageError: Error;
async fn close(&self);
@@ -26,7 +26,6 @@ nym-api-requests = { path = "../../nym-api/nym-api-requests" }
nym-credentials = { path = "../credentials" }
nym-credentials-interface = { path = "../credentials-interface" }
nym-ecash-contract-common = { path = "../cosmwasm-smart-contracts/ecash-contract" }
nym-ecash-double-spending = { path = "../ecash-double-spending" }
nym-gateway-requests = { path = "../gateway-requests" }
nym-gateway-storage = { path = "../gateway-storage" }
nym-task = { path = "../task" }
+4 -4
View File
@@ -24,6 +24,7 @@ ed25519-dalek = { workspace = true, features = ["rand_core"], optional = true }
rand = { workspace = true, optional = true }
serde_bytes = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"], optional = true }
sha2 = { workspace = true, optional = true }
subtle-encoding = { workspace = true, features = ["bech32-preview"] }
thiserror = { workspace = true }
zeroize = { workspace = true, optional = true, features = ["zeroize_derive"] }
@@ -36,11 +37,10 @@ nym-pemstore = { path = "../../common/pemstore", version = "0.3.0" }
rand_chacha = { workspace = true }
[features]
default = ["sphinx"]
default = []
aead = ["dep:aead", "aead/std", "aes-gcm-siv", "generic-array"]
serde = ["dep:serde", "serde_bytes", "ed25519-dalek/serde", "x25519-dalek/serde"]
asymmetric = ["x25519-dalek", "ed25519-dalek", "zeroize"]
hashing = ["blake3", "digest", "hkdf", "hmac", "generic-array"]
hashing = ["blake3", "digest", "hkdf", "hmac", "generic-array", "sha2"]
stream_cipher = ["aes", "ctr", "cipher", "generic-array"]
sphinx = ["nym-sphinx-types/sphinx"]
outfox = ["nym-sphinx-types/outfox"]
sphinx = ["nym-sphinx-types/sphinx"]
+18 -100
View File
@@ -202,6 +202,18 @@ impl PemStorableKey for PublicKey {
}
}
impl From<x25519_dalek::PublicKey> for PublicKey {
fn from(public_key: x25519_dalek::PublicKey) -> Self {
PublicKey(public_key)
}
}
impl From<PublicKey> for x25519_dalek::PublicKey {
fn from(public_key: PublicKey) -> Self {
public_key.0
}
}
#[derive(Zeroize, ZeroizeOnDrop)]
pub struct PrivateKey(x25519_dalek::StaticSecret);
@@ -308,109 +320,15 @@ impl PemStorableKey for PrivateKey {
}
}
// compatibility with sphinx keys:
#[cfg(feature = "sphinx")]
impl From<PublicKey> for nym_sphinx_types::PublicKey {
fn from(key: PublicKey) -> Self {
nym_sphinx_types::PublicKey::from(key.to_bytes())
impl From<x25519_dalek::StaticSecret> for PrivateKey {
fn from(secret: x25519_dalek::StaticSecret) -> Self {
PrivateKey(secret)
}
}
#[cfg(feature = "sphinx")]
impl<'a> From<&'a PublicKey> for nym_sphinx_types::PublicKey {
fn from(key: &'a PublicKey) -> Self {
nym_sphinx_types::PublicKey::from((*key).to_bytes())
}
}
#[cfg(feature = "sphinx")]
impl From<nym_sphinx_types::PublicKey> for PublicKey {
fn from(pub_key: nym_sphinx_types::PublicKey) -> Self {
Self(x25519_dalek::PublicKey::from(*pub_key.as_bytes()))
}
}
#[cfg(feature = "sphinx")]
impl From<PrivateKey> for nym_sphinx_types::PrivateKey {
fn from(key: PrivateKey) -> Self {
nym_sphinx_types::PrivateKey::from(key.to_bytes())
}
}
#[cfg(feature = "sphinx")]
impl<'a> From<&'a PrivateKey> for nym_sphinx_types::PrivateKey {
fn from(key: &'a PrivateKey) -> Self {
nym_sphinx_types::PrivateKey::from(key.to_bytes())
}
}
#[cfg(feature = "sphinx")]
impl From<nym_sphinx_types::PrivateKey> for PrivateKey {
fn from(private_key: nym_sphinx_types::PrivateKey) -> Self {
let private_key_bytes = private_key.to_bytes();
assert_eq!(private_key_bytes.len(), PRIVATE_KEY_SIZE);
Self::from_bytes(&private_key_bytes).unwrap()
}
}
#[cfg(test)]
mod sphinx_key_conversion {
use super::*;
use rand_chacha::rand_core::SeedableRng;
use rand_chacha::ChaCha20Rng;
pub(super) fn test_rng() -> ChaCha20Rng {
let dummy_seed = [42u8; 32];
ChaCha20Rng::from_seed(dummy_seed)
}
const NUM_ITERATIONS: usize = 100;
#[test]
fn works_for_forward_conversion() {
let mut rng = test_rng();
for _ in 0..NUM_ITERATIONS {
let keys = KeyPair::new(&mut rng);
let private = &keys.private_key;
let public = &keys.public_key;
let dummy_remote = KeyPair::new(&mut rng);
let dh1 = private.diffie_hellman(&dummy_remote.public_key);
let public_bytes = public.to_bytes();
let sphinx_private: nym_sphinx_types::PrivateKey = private.into();
let recovered_private = PrivateKey::from(sphinx_private);
let dh2 = recovered_private.diffie_hellman(&dummy_remote.public_key);
let sphinx_public: nym_sphinx_types::PublicKey = public.into();
let recovered_public = PublicKey::from(sphinx_public);
assert_eq!(public_bytes, recovered_public.to_bytes());
// even though the byte representation of the private key changed, the resultant DH is the same
// which is what matters
assert_eq!(dh1, dh2);
}
}
#[test]
fn works_for_backward_conversion() {
for _ in 0..NUM_ITERATIONS {
let (sphinx_private, sphinx_public) = nym_sphinx_types::crypto::keygen();
let private_bytes = sphinx_private.to_bytes();
let public_bytes = sphinx_public.as_bytes();
let private: PrivateKey = sphinx_private.into();
let recovered_sphinx_private: nym_sphinx_types::PrivateKey = private.into();
let public: PublicKey = sphinx_public.into();
let recovered_sphinx_public: nym_sphinx_types::PublicKey = public.into();
assert_eq!(private_bytes, recovered_sphinx_private.to_bytes());
assert_eq!(public_bytes, recovered_sphinx_public.as_bytes());
}
impl AsRef<x25519_dalek::StaticSecret> for PrivateKey {
fn as_ref(&self) -> &x25519_dalek::StaticSecret {
&self.0
}
}
+37 -3
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
pub use ed25519_dalek::SignatureError;
use ed25519_dalek::{Signer, SigningKey};
use ed25519_dalek::{SecretKey, Signer, SigningKey};
pub use ed25519_dalek::{Verifier, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH, SIGNATURE_LENGTH};
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
use std::fmt::{self, Debug, Display, Formatter};
@@ -18,7 +18,7 @@ pub mod serde_helpers;
use nym_sphinx_types::{DestinationAddressBytes, DESTINATION_ADDRESS_LENGTH};
#[cfg(feature = "rand")]
use rand::{CryptoRng, RngCore};
use rand::{CryptoRng, Rng, RngCore};
#[cfg(feature = "serde")]
use serde::de::Error as SerdeError;
#[cfg(feature = "serde")]
@@ -62,16 +62,33 @@ pub struct KeyPair {
// nothing secret about public key
#[zeroize(skip)]
public_key: PublicKey,
#[zeroize(skip)]
index: u32,
}
/// All keys will always have an index field populated this is to prevent anyone from figuring out if
/// the keys are derived or random, and alter their behaviour based on that.
impl KeyPair {
#[cfg(feature = "rand")]
pub fn new<R: RngCore + CryptoRng>(rng: &mut R) -> Self {
let index = rng.gen();
let ed25519_signing_key = ed25519_dalek::SigningKey::generate(rng);
KeyPair {
private_key: PrivateKey(ed25519_signing_key.to_bytes()),
public_key: PublicKey(ed25519_signing_key.verifying_key()),
index,
}
}
pub fn from_secret(secret: SecretKey, index: u32) -> Self {
let ed25519_signing_key = SigningKey::from(secret);
KeyPair {
private_key: PrivateKey(ed25519_signing_key.to_bytes()),
public_key: PublicKey(ed25519_signing_key.verifying_key()),
index,
}
}
@@ -87,15 +104,31 @@ impl KeyPair {
Ok(KeyPair {
private_key: PrivateKey::from_bytes(priv_bytes)?,
public_key: PublicKey::from_bytes(pub_bytes)?,
index: fake_index(pub_bytes),
})
}
}
/// Reduces a byte slice into a u32 value by XOR-ing all its bytes into a 4-byte accumulator.
/// The process iterates over every byte in the input slice, XOR-ing each one into a slot based on its index modulo 4.
/// If the input slice contains fewer than 4 bytes, the remaining positions in the accumulator remain zero.
/// Finally, the accumulator is interpreted in big-endian order to produce the resulting u32.
/// Index is used to verify deterministic identity key, master key and salt are also requried for verification.
fn fake_index(input: &[u8]) -> u32 {
let mut accumulator = [0u8; 4];
for (i, &byte) in input.iter().enumerate() {
accumulator[i % 4] ^= byte;
}
u32::from_be_bytes(accumulator)
}
impl From<PrivateKey> for KeyPair {
fn from(private_key: PrivateKey) -> Self {
let public_key = (&private_key).into();
KeyPair {
public_key: (&private_key).into(),
public_key,
private_key,
index: fake_index(public_key.to_bytes().as_ref()),
}
}
}
@@ -115,6 +148,7 @@ impl PemStorableKeyPair for KeyPair {
KeyPair {
private_key,
public_key,
index: fake_index(public_key.to_bytes().as_ref()),
}
}
}
+81
View File
@@ -8,6 +8,10 @@ use hkdf::{
},
Hkdf,
};
use sha2::{Sha256, Sha512};
pub use hkdf::InvalidLength;
use zeroize::ZeroizeOnDrop;
/// Perform HKDF `extract` then `expand` as a single step.
pub fn extract_then_expand<D>(
@@ -28,3 +32,80 @@ where
Ok(okm)
}
/// `DerivationMaterial` encapsulates parameters for deterministic key derivation using
/// HKDF (SHA-512).
///
/// It consists of:
/// - A master key (`master_key`): the base secret.
/// - An index (`index`): ensures unique derivations.
/// - A salt (`salt`): adds additional uniqueness, should be application specific.
///
/// Use the `derive_secret()` method to generate a 32-byte secret. To prepare for a new derivation,
/// call the `next()` method, which increments the index. **It is the caller's responsibility to
/// track and persist the derivation index if keys need to be rederived.**
///
/// # Example
///
/// ```rust
/// use nym_crypto::hkdf::DerivationMaterial;
///
/// let master_key = [0u8; 32]; // your secret master key
/// let salt = b"unique-salt-value";
/// let material = DerivationMaterial::new(master_key, 0, salt);
///
/// // Derive a secret
/// let secret = material.derive_secret().expect("Failed to derive secret");
///
/// // Prepare for the next derivation
/// let next_material = material.next();
/// ```
#[derive(ZeroizeOnDrop)]
pub struct DerivationMaterial {
master_key: [u8; 32],
index: u32,
salt: Vec<u8>,
}
impl DerivationMaterial {
pub fn index(&self) -> u32 {
self.index
}
pub fn salt(&self) -> &[u8] {
&self.salt
}
/// Derives a 32-byte seed from a master seed and an index using HKDF (with SHA-512).
///
/// The `salt` and the use of the index (as info) bind this derivation to an application/client.
pub fn derive_secret(&self) -> Result<[u8; 32], hkdf::InvalidLength> {
let salt = &self.salt;
let info = self.index.to_be_bytes(); // Use the index as info
let hk = Hkdf::<Sha512>::new(Some(salt), &self.master_key);
let mut okm = [0u8; 32];
hk.expand(&info, &mut okm)?;
Ok(okm)
}
pub fn new<T: AsRef<[u8]>>(master_key: T, index: u32, salt: &[u8]) -> Self {
// Coerce master_key to [u8; 32]
let mut hasher = Sha256::new();
hasher.update(master_key.as_ref());
let master_key = hasher.finalize().into();
Self {
master_key,
index,
salt: salt.to_vec(),
}
}
pub fn next(&self) -> Self {
Self {
master_key: self.master_key,
index: self.index + 1,
salt: self.salt.clone(),
}
}
}
-14
View File
@@ -1,14 +0,0 @@
[package]
name = "nym-ecash-double-spending"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
bit-vec = { workspace = true }
bloomfilter = { workspace = true }
nym-network-defaults = { path = "../network-defaults" }
-136
View File
@@ -1,136 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use bit_vec::BitVec;
use bloomfilter::Bloom;
use nym_network_defaults::{BloomfilterParameters, ECASH_DS_BLOOMFILTER_PARAMS};
pub struct DoubleSpendingFilter {
params: BloomfilterParameters,
inner: Bloom<Vec<u8>>,
}
impl Default for DoubleSpendingFilter {
fn default() -> Self {
DoubleSpendingFilter::new_empty_ecash()
}
}
pub fn bloom_from_params<T>(params: &BloomfilterParameters, bitvec: BitVec) -> Bloom<Vec<T>> {
assert_eq!(params.bitmap_size, bitvec.len() as u64);
Bloom::from_bit_vec(
bitvec,
params.bitmap_size,
params.num_hashes,
params.sip_keys,
)
}
impl DoubleSpendingFilter {
pub fn new_empty(params: BloomfilterParameters) -> Self {
let bitvec = BitVec::from_elem(params.bitmap_size as usize, false);
DoubleSpendingFilter {
inner: bloom_from_params(&params, bitvec),
params,
}
}
pub fn params(&self) -> BloomfilterParameters {
self.params
}
pub fn rebuild(&self) -> DoubleSpendingFilterBuilder {
DoubleSpendingFilterBuilder::new(self.params)
}
pub fn reset(&mut self) {
self.inner.clear()
}
pub fn new_empty_ecash() -> Self {
DoubleSpendingFilter::new_empty(ECASH_DS_BLOOMFILTER_PARAMS)
}
pub fn builder(params: BloomfilterParameters) -> DoubleSpendingFilterBuilder {
DoubleSpendingFilterBuilder::new(params)
}
pub fn from_bytes(params: BloomfilterParameters, bitmap: &[u8]) -> Self {
DoubleSpendingFilter {
inner: bloom_from_params(&params, BitVec::from_bytes(bitmap)),
params,
}
}
pub fn replace_bitvec(&mut self, new: BitVec) {
self.inner = bloom_from_params(&self.params, new)
}
pub fn dump_bitmap(&self) -> Vec<u8> {
self.inner.bitmap()
}
pub fn set(&mut self, b: &Vec<u8>) {
self.inner.set(b);
}
pub fn check(&self, b: &Vec<u8>) -> bool {
self.inner.check(b)
}
}
pub struct DoubleSpendingFilterBuilder {
params: BloomfilterParameters,
bit_vec_builder: Option<BitVecBuilder>,
}
impl DoubleSpendingFilterBuilder {
pub fn new(params: BloomfilterParameters) -> Self {
DoubleSpendingFilterBuilder {
params,
bit_vec_builder: None,
}
}
pub fn add_bytes(&mut self, b: &[u8]) -> bool {
match &mut self.bit_vec_builder {
None => {
self.bit_vec_builder = Some(BitVecBuilder::new(b));
true
}
Some(builder) => builder.add_bytes(b),
}
}
pub fn build(self) -> DoubleSpendingFilter {
match self.bit_vec_builder {
None => DoubleSpendingFilter::new_empty(self.params),
Some(builder) => DoubleSpendingFilter {
inner: bloom_from_params(&self.params, builder.finish()),
params: self.params,
},
}
}
}
pub struct BitVecBuilder(BitVec);
impl BitVecBuilder {
pub fn new(initial_bitmap: &[u8]) -> Self {
BitVecBuilder(BitVec::from_bytes(initial_bitmap))
}
pub fn add_bytes(&mut self, b: &[u8]) -> bool {
let add = BitVec::from_bytes(b);
if self.0.len() != add.len() {
return false;
}
self.0.or(&add);
true
}
pub fn finish(self) -> BitVec {
self.0
}
}
+6 -6
View File
@@ -59,12 +59,12 @@ pub enum GatewayRequestsError {
source: NymNodeRoutingAddressError,
},
#[error("received request had invalid size. (actual: {0}, but expected one of: {} (ACK), {} (REGULAR), {}, {}, {} (EXTENDED))",
PacketSize::AckPacket.size(),
PacketSize::RegularPacket.size(),
PacketSize::ExtendedPacket8.size(),
PacketSize::ExtendedPacket16.size(),
PacketSize::ExtendedPacket32.size())
#[error("received request had invalid size. (actual: {0}, but expected one of: {a} (ACK), {r} (REGULAR), {e8}, {e16}, {e32} (EXTENDED))",
a = PacketSize::AckPacket.size(),
r = PacketSize::RegularPacket.size(),
e8 = PacketSize::ExtendedPacket8.size(),
e16 = PacketSize::ExtendedPacket16.size(),
e32 = PacketSize::ExtendedPacket32.size())
]
RequestOfInvalidSize(usize),
@@ -13,7 +13,7 @@ use std::str::FromStr;
use tungstenite::Message;
// wrapper for all encrypted requests for ease of use
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[non_exhaustive]
pub enum ClientRequest {
UpgradeKey {
+8 -1
View File
@@ -12,9 +12,10 @@ license.workspace = true
[dependencies]
async-trait = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
reqwest = { workspace = true, features = ["json", "gzip"] }
http.workspace = true
url = { workspace = true }
once_cell = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
@@ -22,7 +23,13 @@ tracing = { workspace = true }
nym-bin-common = { path = "../bin-common" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hickory-resolver = { workspace = true, features = ["dns-over-https-rustls", "webpki-roots"] }
# for request timeout until https://github.com/seanmonstar/reqwest/issues/1135 is fixed
[target."cfg(target_arch = \"wasm32\")".dependencies.wasmtimer]
workspace = true
features = ["tokio"]
[dev-dependencies]
tokio = { workspace = true, features=["rt", "macros"] }
+178
View File
@@ -0,0 +1,178 @@
//! DNS resolver configuration for internal lookups.
//!
//! The resolver itself is the set combination of the google, cloudflare, and quad9 endpoints
//! supporting DoH and DoT.
//!
//! This resolver implements a fallback mechanism where, should the DNS-over-TLS resolution fail, a
//! followup resolution will be done using the hosts configured default (e.g. `/etc/resolve.conf` on
//! linux).
//!
//! Requires the `dns-over-https-rustls`, `webpki-roots` feature for the
//! `hickory-resolver` crate
#![deny(missing_docs)]
use crate::ClientBuilder;
use std::{net::SocketAddr, sync::Arc};
use hickory_resolver::lookup_ip::LookupIp;
use hickory_resolver::{
config::{LookupIpStrategy, NameServerConfigGroup, ResolverConfig, ResolverOpts},
error::ResolveError,
lookup_ip::LookupIpIntoIter,
TokioAsyncResolver,
};
use once_cell::sync::OnceCell;
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
use tracing::warn;
impl ClientBuilder {
/// Override the DNS resolver implementation used by the underlying http client.
pub fn dns_resolver<R: Resolve + 'static>(mut self, resolver: Arc<R>) -> Self {
self.reqwest_client_builder = self.reqwest_client_builder.dns_resolver(resolver);
self
}
}
struct SocketAddrs {
iter: LookupIpIntoIter,
}
#[derive(Debug, thiserror::Error)]
#[error("hickory-dns resolver error: {hickory_error}")]
/// Error occurring while resolving a hostname into an IP address.
pub struct HickoryDnsError {
#[from]
hickory_error: ResolveError,
}
/// Wrapper around an `AsyncResolver`, which implements the `Resolve` trait.
#[derive(Debug, Default, Clone)]
pub struct HickoryDnsResolver {
/// Since we might not have been called in the context of a
/// Tokio Runtime in initialization, so we must delay the actual
/// construction of the resolver.
state: Arc<OnceCell<TokioAsyncResolver>>,
fallback: Arc<OnceCell<TokioAsyncResolver>>,
}
impl Resolve for HickoryDnsResolver {
fn resolve(&self, name: Name) -> Resolving {
let resolver = self.state.clone();
let fallback = self.fallback.clone();
Box::pin(async move {
let resolver = resolver.get_or_try_init(new_resolver)?;
// try the primary DNS resolver that we set up (DoH or DoT or whatever)
let lookup = match resolver.lookup_ip(name.as_str()).await {
Ok(res) => res,
Err(e) => {
// on failure use the fall back system configured DNS resolver
warn!("primary DNS failed w/ error {e}: using system fallback");
let resolver = fallback.get_or_try_init(new_resolver_system)?;
resolver.lookup_ip(name.as_str()).await?
}
};
let addrs: Addrs = Box::new(SocketAddrs {
iter: lookup.into_iter(),
});
Ok(addrs)
})
}
}
impl Iterator for SocketAddrs {
type Item = SocketAddr;
fn next(&mut self) -> Option<Self::Item> {
self.iter.next().map(|ip_addr| SocketAddr::new(ip_addr, 0))
}
}
impl HickoryDnsResolver {
/// Attempt to resolve a domain name to a set of ['IpAddr']s
pub async fn resolve_str(&self, name: &str) -> Result<LookupIp, HickoryDnsError> {
let resolver = self.state.get_or_try_init(new_resolver)?;
// try the primary DNS resolver that we set up (DoH or DoT or whatever)
let lookup = match resolver.lookup_ip(name).await {
Ok(res) => res,
Err(e) => {
// on failure use the fall back system configured DNS resolver
warn!("primary DNS failed w/ error {e}: using system fallback");
let resolver = self.fallback.get_or_try_init(new_resolver_system)?;
resolver.lookup_ip(name).await?
}
};
Ok(lookup)
}
}
/// Create a new resolver with a custom DoT based configuration. The options are overridden to look
/// up for both IPv4 and IPv6 addresses to work with "happy eyeballs" algorithm.
fn new_resolver() -> Result<TokioAsyncResolver, HickoryDnsError> {
let mut name_servers = NameServerConfigGroup::google_tls();
name_servers.merge(NameServerConfigGroup::google_https());
// name_servers.merge(NameServerConfigGroup::google_h3());
name_servers.merge(NameServerConfigGroup::quad9_tls());
name_servers.merge(NameServerConfigGroup::quad9_https());
name_servers.merge(NameServerConfigGroup::cloudflare_tls());
name_servers.merge(NameServerConfigGroup::cloudflare_https());
let config = ResolverConfig::from_parts(None, Vec::new(), name_servers);
let mut opts = ResolverOpts::default();
opts.ip_strategy = LookupIpStrategy::Ipv4AndIpv6;
// Would like to enable this when 0.25 stabilizes
// opts.server_ordering_strategy = ServerOrderingStrategy::RoundRobin;
Ok(TokioAsyncResolver::tokio(config, opts))
}
/// Create a new resolver with the default configuration, which reads from the system DNS config
/// (i.e. `/etc/resolve.conf` in unix). The options are overridden to look up for both IPv4 and IPv6
/// addresses to work with "happy eyeballs" algorithm.
fn new_resolver_system() -> Result<TokioAsyncResolver, HickoryDnsError> {
let (config, mut opts) = hickory_resolver::system_conf::read_system_conf()?;
opts.ip_strategy = LookupIpStrategy::Ipv4AndIpv6;
Ok(TokioAsyncResolver::tokio(config, opts))
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn reqwest_hickory_doh() {
let resolver = HickoryDnsResolver::default();
let client = reqwest::ClientBuilder::new()
.dns_resolver(resolver.into())
.build()
.unwrap();
let resp = client
.get("http://ifconfig.me:80")
.send()
.await
.unwrap()
.bytes()
.await
.unwrap();
assert!(!resp.is_empty());
}
#[tokio::test]
async fn dns_lookup() -> Result<(), HickoryDnsError> {
let resolver = HickoryDnsResolver::default();
let domain = "ifconfig.me";
let addrs = resolver.resolve_str(domain).await?;
assert!(addrs.into_iter().next().is_some());
Ok(())
}
}
File diff suppressed because it is too large Load Diff
+5
View File
@@ -7,11 +7,16 @@ use http::HeaderValue;
use nym_bin_common::build_information::{BinaryBuildInformation, BinaryBuildInformationOwned};
use serde::{Deserialize, Serialize};
/// Characteristic elements sent to the API providing basic context information of the requesting client.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UserAgent {
/// The internal crate / application / subsystem making use of API client
pub application: String,
/// version of the calling crate / application / subsystem
pub version: String,
/// client platform
pub platform: String,
/// source commit version for the calling calling crate / subsystem
pub git_commit: String,
}
+1 -1
View File
@@ -26,4 +26,4 @@ utoipa = [ "dep:utoipa" ]
[build-dependencies]
regex = { workspace = true }
cargo_metadata = { version = "0.18" }
cargo_metadata = { workspace = true }
+1 -1
View File
@@ -31,7 +31,7 @@ nym-pemstore = { path = "../pemstore" }
nym-network-defaults = { path = "../network-defaults", default-features = false }
[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports"] }
criterion = { workspace = true, features = ["html_reports"] }
[[bench]]
-2
View File
@@ -48,12 +48,10 @@ features = ["sync"]
[features]
default = ["sphinx"]
sphinx = [
"nym-crypto/sphinx",
"nym-sphinx-params/sphinx",
"nym-sphinx-types/sphinx",
]
outfox = [
"nym-crypto/outfox",
"nym-sphinx-params/outfox",
"nym-sphinx-types/outfox",
]
+1 -1
View File
@@ -8,7 +8,7 @@ license = { workspace = true }
repository = { workspace = true }
[dependencies]
nym-crypto = { path = "../../crypto", features = ["asymmetric"] } # all addresses are expressed in terms on their crypto keys
nym-crypto = { path = "../../crypto", features = ["asymmetric", "sphinx"] } # all addresses are expressed in terms on their crypto keys
nym-sphinx-types = { path = "../types", features = ["sphinx"] } # we need to be able to refer to some types defined inside sphinx crate
serde = { workspace = true } # implementing serialization/deserialization for some types, like `Recipient`
thiserror = { workspace = true }
@@ -12,6 +12,7 @@ rand = { workspace = true }
bs58 = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
sphinx-packet = { workspace = true }
nym-crypto = { path = "../../crypto", features = ["stream_cipher", "rand"] }
nym-sphinx-addressing = { path = "../addressing" }
@@ -7,11 +7,12 @@ use nym_sphinx_addressing::clients::Recipient;
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, MAX_NODE_ADDRESS_UNPADDED_LEN};
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm};
use nym_sphinx_types::{NymPacket, SURBMaterial, SphinxError, SURB};
use nym_sphinx_types::{Destination, NymPacket, SURBMaterial, SphinxError, SURB};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, RngCore};
use serde::de::{Error as SerdeError, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sphinx_packet::route::Node;
use std::fmt::{self, Formatter};
use std::time;
@@ -83,6 +84,25 @@ impl ReplySurb {
packet_size.plaintext_size() - ack_overhead - ReplySurbKeyDigestAlgorithm::output_size() - 1
}
pub fn construct_with_route<R>(
rng: &mut R,
destination: Destination,
average_delay: time::Duration,
route: &[Node],
) -> Result<Self, NymTopologyError>
where
R: RngCore + CryptoRng,
{
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
let surb_material = SURBMaterial::new(route.to_vec(), delays, destination);
// this can't fail as we know we have a valid route to gateway and have correct number of delays
Ok(ReplySurb {
surb: surb_material.construct_SURB().unwrap(),
encryption_key: SurbEncryptionKey::new(rng),
})
}
// TODO: should this return `ReplySURBError` for consistency sake
// or keep `NymTopologyError` because it's the only error it can actually return?
pub fn construct<R>(
@@ -123,11 +143,15 @@ impl ReplySurb {
pub fn to_bytes(&self) -> Vec<u8> {
// KEY || SURB_BYTES
self.encryption_key
let bytes: Vec<u8> = self.encryption_key
.to_bytes()
.into_iter()
.chain(self.surb.to_bytes())
.collect()
.collect();
assert_eq!(bytes.len(), ReplySurb::serialized_len());
bytes
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, ReplySurbError> {
@@ -170,6 +170,7 @@ impl RepliableMessage {
}
pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, InvalidReplyRequestError> {
// println!("Trying to deserialize message: {} bytes", bytes.len());
if bytes.len() < SENDER_TAG_SIZE + 1 {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
@@ -559,7 +560,7 @@ mod tests {
let mut address_bytes = [0; NODE_ADDRESS_LENGTH];
rng.fill_bytes(&mut address_bytes);
let dummy_private = PrivateKey::new_with_rng(rng);
let dummy_private = PrivateKey::random_from_rng(rng);
let pub_key = (&dummy_private).into();
Node {
address: NodeAddressBytes::from_bytes(address_bytes),
+4
View File
@@ -9,6 +9,7 @@ repository = { workspace = true }
[dependencies]
bytes = { workspace = true }
cfg-if = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
thiserror = { workspace = true }
log = { workspace = true }
@@ -20,5 +21,8 @@ nym-metrics = { path = "../../nym-metrics" }
nym-sphinx-addressing = { path = "../addressing" }
nym-sphinx-acknowledgements = { path = "../acknowledgements" }
[features]
no-acks = []
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
+14 -9
View File
@@ -130,28 +130,33 @@ impl Decoder for NymCodec {
mod packet_encoding {
use super::*;
use nym_sphinx_types::{
crypto, Delay as SphinxDelay, Destination, DestinationAddressBytes, Node, NodeAddressBytes,
DESTINATION_ADDRESS_LENGTH, IDENTIFIER_LENGTH, NODE_ADDRESS_LENGTH,
Delay as SphinxDelay, Destination, DestinationAddressBytes, Node, NodeAddressBytes,
PrivateKey, DESTINATION_ADDRESS_LENGTH, IDENTIFIER_LENGTH, NODE_ADDRESS_LENGTH,
};
fn random_pubkey() -> nym_sphinx_types::PublicKey {
let private_key = PrivateKey::random();
(&private_key).into()
}
fn make_valid_outfox_packet(size: PacketSize) -> NymPacket {
let (_, node1_pk) = crypto::keygen();
let node1_pk = random_pubkey();
let node1 = Node::new(
NodeAddressBytes::from_bytes([5u8; NODE_ADDRESS_LENGTH]),
node1_pk,
);
let (_, node2_pk) = crypto::keygen();
let node2_pk = random_pubkey();
let node2 = Node::new(
NodeAddressBytes::from_bytes([4u8; NODE_ADDRESS_LENGTH]),
node2_pk,
);
let (_, node3_pk) = crypto::keygen();
let node3_pk = random_pubkey();
let node3 = Node::new(
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
node3_pk,
);
let (_, node4_pk) = crypto::keygen();
let node4_pk = random_pubkey();
let node4 = Node::new(
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
node4_pk,
@@ -170,17 +175,17 @@ mod packet_encoding {
}
fn make_valid_sphinx_packet(size: PacketSize) -> NymPacket {
let (_, node1_pk) = crypto::keygen();
let node1_pk = random_pubkey();
let node1 = Node::new(
NodeAddressBytes::from_bytes([5u8; NODE_ADDRESS_LENGTH]),
node1_pk,
);
let (_, node2_pk) = crypto::keygen();
let node2_pk = random_pubkey();
let node2 = Node::new(
NodeAddressBytes::from_bytes([4u8; NODE_ADDRESS_LENGTH]),
node2_pk,
);
let (_, node3_pk) = crypto::keygen();
let node3_pk = random_pubkey();
let node3 = Node::new(
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
node3_pk,
+92 -27
View File
@@ -4,8 +4,10 @@ use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressE
use nym_sphinx_params::{PacketSize, PacketType};
use nym_sphinx_types::{
Delay as SphinxDelay, DestinationAddressBytes, NodeAddressBytes, NymPacket, NymPacketError,
NymProcessedPacket, OutfoxError, PrivateKey, ProcessedPacket, SphinxError,
NymProcessedPacket, OutfoxError, PrivateKey, ProcessedPacketData, SphinxError,
Version as SphinxPacketVersion,
};
use std::fmt::Display;
use thiserror::Error;
use crate::packet::FramedNymPacket;
@@ -13,12 +15,38 @@ use nym_metrics::nanos;
use nym_sphinx_forwarding::packet::MixPacket;
#[derive(Debug)]
pub enum MixProcessingResult {
pub enum MixProcessingResultData {
/// Contains unwrapped data that should first get delayed before being sent to next hop.
ForwardHop(MixPacket, Option<SphinxDelay>),
ForwardHop {
packet: MixPacket,
delay: Option<SphinxDelay>,
},
/// Contains all data extracted out of the final hop packet that could be forwarded to the destination.
FinalHop(ProcessedFinalHop),
FinalHop { final_hop_data: ProcessedFinalHop },
}
#[derive(Debug, Copy, Clone)]
pub enum MixPacketVersion {
Outfox,
Sphinx(SphinxPacketVersion),
}
impl Display for MixPacketVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
MixPacketVersion::Outfox => "outfox".fmt(f),
MixPacketVersion::Sphinx(sphinx_version) => {
write!(f, "sphinx-{}", sphinx_version.value())
}
}
}
}
#[derive(Debug)]
pub struct MixProcessingResult {
pub packet_version: MixPacketVersion,
pub processing_data: MixProcessingResultData,
}
type ForwardAck = MixPacket;
@@ -107,37 +135,63 @@ fn perform_final_processing(
) -> Result<MixProcessingResult, PacketProcessingError> {
match packet {
NymProcessedPacket::Sphinx(packet) => {
match packet {
ProcessedPacket::ForwardHop(packet, address, delay) => {
process_forward_hop(NymPacket::Sphinx(*packet), address, delay, packet_type)
}
let processing_data = match packet.data {
ProcessedPacketData::ForwardHop {
next_hop_packet,
next_hop_address,
delay,
} => process_forward_hop(
NymPacket::Sphinx(next_hop_packet),
next_hop_address,
delay,
packet_type,
),
// right now there's no use for the surb_id included in the header - probably it should get removed from the
// sphinx all together?
ProcessedPacket::FinalHop(destination, _, payload) => process_final_hop(
ProcessedPacketData::FinalHop {
destination,
identifier: _,
payload,
} => process_final_hop(
destination,
payload.recover_plaintext()?,
packet_size,
packet_type,
),
}
}?;
Ok(MixProcessingResult {
packet_version: MixPacketVersion::Sphinx(packet.version),
processing_data,
})
}
NymProcessedPacket::Outfox(packet) => {
let next_address = *packet.next_address();
let packet = packet.into_packet();
if packet.is_final_hop() {
process_final_hop(
let processing_data = process_final_hop(
DestinationAddressBytes::from_bytes(next_address),
packet.recover_plaintext()?.to_vec(),
packet_size,
packet_type,
)
)?;
Ok(MixProcessingResult {
packet_version: MixPacketVersion::Outfox,
processing_data,
})
} else {
let mix_packet = MixPacket::new(
let packet = MixPacket::new(
NymNodeRoutingAddress::try_from_bytes(&next_address)?,
NymPacket::Outfox(packet),
PacketType::Outfox,
);
Ok(MixProcessingResult::ForwardHop(mix_packet, None))
Ok(MixProcessingResult {
packet_version: MixPacketVersion::Outfox,
processing_data: MixProcessingResultData::ForwardHop {
packet,
delay: None,
},
})
}
}
}
@@ -148,14 +202,16 @@ fn process_final_hop(
payload: Vec<u8>,
packet_size: PacketSize,
packet_type: PacketType,
) -> Result<MixProcessingResult, PacketProcessingError> {
) -> Result<MixProcessingResultData, PacketProcessingError> {
let (forward_ack, message) = split_into_ack_and_message(payload, packet_size, packet_type)?;
Ok(MixProcessingResult::FinalHop(ProcessedFinalHop {
destination,
forward_ack,
message,
}))
Ok(MixProcessingResultData::FinalHop {
final_hop_data: ProcessedFinalHop {
destination,
forward_ack,
message,
},
})
}
fn split_into_ack_and_message(
@@ -174,8 +230,12 @@ fn split_into_ack_and_message(
| PacketSize::ExtendedPacket32
| PacketSize::OutfoxRegularPacket => {
trace!("received a normal packet!");
let (ack_data, message) = split_hop_data_into_ack_and_message(data, packet_type)?;
let (ack_first_hop, ack_packet) =
cfg_if::cfg_if! {
if #[cfg(feature = "no-acks")] {
return Ok((None, data));
} else {
let (ack_data, message) = split_hop_data_into_ack_and_message(data, packet_type)?;
let (ack_first_hop, ack_packet) =
match SurbAck::try_recover_first_hop_packet(&ack_data, packet_type) {
Ok((first_hop, packet)) => (first_hop, packet),
Err(err) => {
@@ -183,8 +243,10 @@ fn split_into_ack_and_message(
return Err(err.into());
}
};
let forward_ack = MixPacket::new(ack_first_hop, ack_packet, packet_type);
Ok((Some(forward_ack), message))
let forward_ack = MixPacket::new(ack_first_hop, ack_packet, packet_type);
Ok((Some(forward_ack), message))
}
}
}
}
}
@@ -211,11 +273,14 @@ fn process_forward_hop(
forward_address: NodeAddressBytes,
delay: SphinxDelay,
packet_type: PacketType,
) -> Result<MixProcessingResult, PacketProcessingError> {
) -> Result<MixProcessingResultData, PacketProcessingError> {
let next_hop_address = NymNodeRoutingAddress::try_from(forward_address)?;
let mix_packet = MixPacket::new(next_hop_address, packet, packet_type);
Ok(MixProcessingResult::ForwardHop(mix_packet, Some(delay)))
let packet = MixPacket::new(next_hop_address, packet, packet_type);
Ok(MixProcessingResultData::ForwardHop {
packet,
delay: Some(delay),
})
}
// TODO: what more could we realistically test here?
+2 -2
View File
@@ -16,5 +16,5 @@ nym-sphinx-types = { path = "../types" }
[features]
default = ["sphinx"]
sphinx = ["nym-crypto/sphinx", "nym-sphinx-types/outfox"]
outfox = ["nym-crypto/outfox", "nym-sphinx-types/outfox"]
sphinx = ["nym-sphinx-types/outfox"]
outfox = ["nym-sphinx-types/outfox"]
+17 -7
View File
@@ -1,14 +1,22 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::{array::TryFromSliceError, fmt};
use thiserror::Error;
#[cfg(feature = "outfox")]
use nym_outfox::packet::{OutfoxPacket, OutfoxProcessedPacket};
#[cfg(feature = "sphinx")]
use sphinx_packet::{SphinxPacket, SphinxPacketBuilder};
#[cfg(feature = "outfox")]
pub use nym_outfox::{
constants::MIN_PACKET_SIZE, constants::MIX_PARAMS_LEN, constants::OUTFOX_PACKET_OVERHEAD,
error::OutfoxError,
};
// re-exporting types and constants available in sphinx
#[cfg(feature = "outfox")]
use nym_outfox::packet::{OutfoxPacket, OutfoxProcessedPacket};
#[cfg(feature = "sphinx")]
pub use sphinx_packet::{
constants::{
@@ -21,12 +29,10 @@ pub use sphinx_packet::{
payload::{Payload, PAYLOAD_OVERHEAD_SIZE},
route::{Destination, DestinationAddressBytes, Node, NodeAddressBytes, SURBIdentifier},
surb::{SURBMaterial, SURB},
Error as SphinxError, ProcessedPacket,
version::Version,
version::UPDATED_LEGACY_VERSION,
Error as SphinxError, ProcessedPacket, ProcessedPacketData,
};
#[cfg(feature = "sphinx")]
use sphinx_packet::{SphinxPacket, SphinxPacketBuilder};
use std::{array::TryFromSliceError, fmt};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum NymPacketError {
@@ -85,8 +91,12 @@ impl NymPacket {
destination: &Destination,
delays: &[Delay],
) -> Result<NymPacket, NymPacketError> {
// FIXME:
// for now explicitly use the legacy version until sufficient number of nodes
// understand both variants
Ok(NymPacket::Sphinx(
SphinxPacketBuilder::new()
.with_version(UPDATED_LEGACY_VERSION)
.with_payload_size(size)
.build_packet(message, route, destination, delays)?,
))
@@ -30,6 +30,10 @@ pub struct Config {
}
impl Config {
pub fn base(&self) -> BaseClientConfig {
self.base.clone()
}
pub fn new<S: Into<String>>(id: S, version: S, provider_mix_address: S) -> Self {
Config {
base: BaseClientConfig::new(id, version),
+7 -3
View File
@@ -76,6 +76,10 @@ where
<S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
<S::KeyStore as KeyStore>::StorageError: Send + Sync,
{
pub fn config(&self) -> Config {
self.config.clone()
}
pub fn new(
config: Config,
storage: S,
@@ -233,7 +237,7 @@ where
};
let mut base_builder =
BaseClientBuilder::new(&self.config.base, self.storage, dkg_query_client)
BaseClientBuilder::new(self.config.base(), self.storage, dkg_query_client)
.with_gateway_setup(self.setup_method)
.with_user_agent(self.user_agent);
@@ -241,7 +245,7 @@ where
base_builder = base_builder.with_stored_topology(custom_mixnet)?;
}
let packet_type = self.config.base.debug.traffic.packet_type;
let packet_type = self.config.base().debug.traffic.packet_type;
let mut started_client = base_builder.start_base().await?;
let self_address = started_client.address;
let client_input = started_client.client_input.register_producer();
@@ -252,7 +256,7 @@ where
Self::start_socks5_listener(
&self.config.socks5,
self.config.base.debug,
self.config.base().debug,
client_input,
client_output,
client_state,
+12 -3
View File
@@ -25,19 +25,28 @@ pub type ClientStatsReceiver = tokio::sync::mpsc::UnboundedReceiver<ClientStatsE
#[derive(Clone)]
pub struct ClientStatsSender {
stats_tx: Option<UnboundedSender<ClientStatsEvents>>,
task_client: TaskClient,
}
impl ClientStatsSender {
/// Create a new statistics Sender
pub fn new(stats_tx: Option<UnboundedSender<ClientStatsEvents>>) -> Self {
ClientStatsSender { stats_tx }
pub fn new(
stats_tx: Option<UnboundedSender<ClientStatsEvents>>,
task_client: TaskClient,
) -> Self {
ClientStatsSender {
stats_tx,
task_client,
}
}
/// Report a statistics event using the sender.
pub fn report(&self, event: ClientStatsEvents) {
if let Some(tx) = &self.stats_tx {
if let Err(err) = tx.send(event) {
log::error!("Failed to send stats event: {:?}", err);
if !self.task_client.is_shutdown_poll() {
log::error!("Failed to send stats event: {err}");
}
}
}
}
+6 -2
View File
@@ -221,7 +221,7 @@ impl TaskManager {
}
}
pub(crate) async fn wait_for_graceful_shutdown(&mut self) {
pub async fn wait_for_graceful_shutdown(&mut self) {
if let Some(notify_rx) = self.notify_rx.take() {
drop(notify_rx);
}
@@ -299,6 +299,8 @@ impl Clone for TaskClient {
None
};
log::debug!("Cloned task client: {name:?}");
TaskClient {
name,
shutdown: AtomicBool::new(self.shutdown.load(Ordering::Relaxed)),
@@ -315,7 +317,7 @@ impl TaskClient {
const MAX_NAME_LENGTH: usize = 128;
const OVERFLOW_NAME: &'static str = "reached maximum TaskClient children name depth";
const SHUTDOWN_TIMEOUT_WAITING_FOR_SIGNAL_ON_EXIT: Duration = Duration::from_secs(5);
const SHUTDOWN_TIMEOUT_WAITING_FOR_SIGNAL_ON_EXIT: Duration = Duration::from_secs(10);
fn new(
notify: watch::Receiver<()>,
@@ -344,6 +346,7 @@ impl TaskClient {
format!("unknown-{suffix}")
};
log::debug!("Forked task client: {child_name}");
child.name = Some(child_name);
child
}
@@ -377,6 +380,7 @@ impl TaskClient {
} else {
format!("unknown-{suffix}")
};
log::debug!("Renamed task client: {name}");
self.named(name)
}
+1 -1
View File
@@ -27,7 +27,7 @@ wasm-bindgen = { workspace = true, optional = true }
## internal
nym-config = { path = "../config" }
nym-crypto = { path = "../crypto", features = ["sphinx", "outfox"] }
nym-crypto = { path = "../crypto" }
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
nym-sphinx-addressing = { path = "../nymsphinx/addressing" }
nym-sphinx-types = { path = "../nymsphinx/types", features = [
+2 -2
View File
@@ -11,7 +11,7 @@ use std::borrow::Borrow;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::net::IpAddr;
use tracing::{debug, warn};
use tracing::{debug, trace, warn};
pub use crate::node::{EntryDetails, RoutingNode, SupportedRoles};
pub use error::NymTopologyError;
@@ -293,7 +293,7 @@ impl NymTopology {
let has_exit_gateways = !self.rewarded_set.exit_gateways.is_empty();
let has_entry_gateways = !self.rewarded_set.entry_gateways.is_empty();
debug!(
trace!(
has_layer1 = %has_layer1,
has_layer2 = %has_layer2,
has_layer3 = %has_layer3,
+1 -1
View File
@@ -105,7 +105,7 @@ impl<'a> From<&'a RoutingNode> for SphinxNode {
.try_into()
.unwrap();
SphinxNode::new(node_address_bytes, (&node.sphinx_key).into())
SphinxNode::new(node_address_bytes, node.sphinx_key.into())
}
}

Some files were not shown because too many files have changed in this diff Show More