Compare commits

...

180 Commits

Author SHA1 Message Date
import this 340960d957 [DOCs/operators]: Release notes for v2025.19 kase (#6157)
* add release and operators notes

* bump up version

* fix location in csv to USA

* bump up stats

* typo fix
2025-10-31 09:02:13 +00:00
Mark Sinclair 554e446208 change migration and bump version 2025-10-31 09:02:13 +00:00
Mark Sinclair a6325b922a bump version to rc 2025-10-31 09:02:13 +00:00
Mark Sinclair fae4768b99 add tracing output 2025-10-31 09:02:13 +00:00
Mark Sinclair 2689a4dbd8 clippy 2025-10-31 09:02:13 +00:00
Mark Sinclair 02e40ccaef save custom_http_port to db 2025-10-31 09:02:13 +00:00
Mark Sinclair b216338364 allow NS API to run once for scraping for troubleshooting and debugging 2025-10-31 09:02:13 +00:00
Mark Sinclair 4a83bb9ba8 wip 2025-10-31 09:02:13 +00:00
Mark Sinclair 66ec3b037f ns-api: fix scraping bug when operator specifies custom node HTTP API port in bond 2025-10-31 09:02:13 +00:00
import this 168baa5071 [Feature/operators]: QUIC bridge deployment script v2 (#6145)
* new quick deployment script

* docs tweak

* update script to use .deb postinst

* final clean - ready to go

* correct nym-node config dir search with a fallback
2025-10-31 09:02:13 +00:00
Simon Wicky 7aef468839 remove unused deps (#6151) 2025-10-31 09:02:12 +00:00
Simon Wicky cb3ccd7f7e use typed builder (#6150) 2025-10-31 09:02:12 +00:00
Simon Wicky 16509dbace allow overwriting existing sdk shutdown manager 2025-10-31 09:02:12 +00:00
Simon Wicky b907ccbd5b typo 2025-10-31 09:02:12 +00:00
Simon Wicky 7b3194d7d2 calling for shutdown from the MixTrafficController 2025-10-31 09:02:12 +00:00
Jędrzej Stuczyński 0b81edfc66 using same hierarchy of trackers for client shutdown control 2025-10-31 09:02:12 +00:00
Tommy Verrall 8c6150e5ef Internal comments 2025-10-31 09:02:12 +00:00
Tommy Verrall 46164a389a Fix comments 2025-10-31 09:02:12 +00:00
Tommy Verrall 96d256b48f Better message to come in the PR description 2025-10-31 09:02:12 +00:00
Simon Wicky f98b93f60e tommy is too quick 2025-10-31 09:02:12 +00:00
Simon Wicky 0f6c356f39 configurable mixnet client startup timeout 2025-10-31 09:02:12 +00:00
p17o cf3f5d9a53 Update quic_bridge_deployment.sh for IPv4 and .deb package (#6138)
Updated ping commands to explicitly use IPv4 and adjusted file permission checks with sudo. Changed the forward address prompt to specify IPv4 and modified the binary download process to fetch and install the latest .deb release URL automatically.
2025-10-31 09:02:12 +00:00
Jędrzej Stuczyński 2f47e6349a bugfix: update internal owner address in transferred share (#6139) 2025-10-31 09:02:12 +00:00
Tommy Verrall 4b2b5390d3 Last failing test - fix 2025-10-31 09:02:12 +00:00
Tommy Verrall 734ff63f6d Use explicit Vec<ApiUrl> handling in BaseClientBuilder
- Replace NymNetworkDetails with explicit API URL handling
- Fix deprecated from_network() usage and improve fallback logic
- Add URL validation and remove unused backwards compatibility
2025-10-31 09:02:11 +00:00
Tommy Verrall 910c24e7da Actually commit the recommended changes 2025-10-31 09:02:11 +00:00
Tommy Verrall c96b73172a Fix broken tests in CI 2025-10-31 09:02:11 +00:00
Tommy Verrall 823c2c7262 Replace deprecated from_network() with new_with_fronted_urls() 2025-10-31 09:02:11 +00:00
Jędrzej Stuczyński 03d1b72a9e feat: expose more explicit new_with_fronted_urls builder for http API client (#6136) 2025-10-31 09:02:10 +00:00
Jędrzej Stuczyński 066669440f bugfix: update stored epoch share when changing ownership (#6135) 2025-10-31 09:01:27 +00:00
Jędrzej Stuczyński 9fb3443fd1 bugfix: update stored epoch share when changing announce address (#6131)
* bugfix: update stored epoch share when changing announce address

* chore: remove placeholder legacy mixnode bonding test [mixnet contract]
2025-10-31 09:01:27 +00:00
Tommy Verrall cd89a590b5 Fix new_from_env() to populate nym_api_urls for domain fronting 2025-10-31 09:01:27 +00:00
Tommy Verrall 06cb8bd969 fix all clippy messages 2025-10-31 09:01:27 +00:00
Tommy Verrall c72fef169c Add more tests for retry logic 2025-10-31 09:01:27 +00:00
Tommy Verrall 3c27eb41b8 Fix confusing tracing logs 2025-10-31 09:01:27 +00:00
Tommy Verrall 2428374fe7 Fix retries - this is working 2025-10-31 09:01:27 +00:00
Tommy Verrall 5a491f0a7e Add configuration-based domain fronting support
Changes:
- Add network_details field to BaseClientBuilder (optional, backwards compatible)
- Add with_network_details() method for opt-in domain fronting
- Update construct_nym_api_client() to use from_network() when network_details provided
- Enable network-defaults feature in nym-client-core Cargo.toml
- SDK passes network_details to BaseClientBuilder
2025-10-31 09:01:27 +00:00
Tommy Verrall 530f9ccb6f Fix CI issues 2025-10-31 09:01:26 +00:00
Tommy Verrall 2228a81d43 Allow clippy::enum_variant_names for BuilderConfigError 2025-10-31 09:01:26 +00:00
Tommy Verrall c8a1b53071 Improve error handling
Changes:
- Replace String error with BuilderConfigError enum in BuilderConfigBuilder
- Update tests to use pattern matching instead of string assertions
2025-10-31 09:01:26 +00:00
Tommy Verrall b896aaaed1 - Add DEFAULT_NYM_API_RETRIES constant (replaces magic number 3)
- Run cargo fmt on all affected packages
- All clippy warnings resolved
2025-10-31 09:01:26 +00:00
Tommy Verrall 828ffc6710 not sure what happened but it's fixed 2025-10-31 09:01:26 +00:00
Andy Duplain 05cdb27029 VPN-4262: Update Url to return url and front fields.
The VPN client is using the `Url` type alot now and in order to avoid
double URL-parsing we would like the content of the `Url` type exposed.
2025-10-31 09:01:26 +00:00
Tommy Verrall 3f29d3eba5 Add accessor methods for Url internals
Add inner_url() and fronts() accessor methods to nym_http_api_client::Url
for VPN client integration
2025-10-31 09:01:22 +00:00
Tommy Verrall aa2c41dc41 Merge resolution 2025-10-31 09:01:02 +00:00
Tommy Verrall 048de771ab Remove tests for removed with_nym_api_client method
These tests were referencing with_nym_api_client() which was removed when
cleaning domain fronting code from this branch
2025-10-31 09:01:02 +00:00
Tommy Verrall e1b06f02f3 Add optional builder pattern for BuilderConfig (non-breaking)
Addresses @jstuczyn's feedback about too many arguments by adding
BuilderConfigBuilder as an alternative to the existing new() method.
2025-10-31 09:01:02 +00:00
Tommy Verrall b5b8b8f224 fix conversion type && make the retry count configurable 2025-10-31 09:01:02 +00:00
Tommy Verrall 2d7141dfb1 Revert node filtering changes per Andrew's feedback
Andrew clarified that get_basic_entry_assigned_nodes_v2() already filters by
supported_roles.entry
2025-10-31 09:01:02 +00:00
Tommy Verrall a07522258f Remove domain fronting code to keep gateway changes only
This branch now contains only gateway registration improvements:
- Multiple URL fallback support in gateways_for_init()
- Get all entry-capable nodes for registration
- Performance and code quality improvements
2025-10-31 09:00:17 +00:00
Tommy Verrall 547a441002 Address PR feedback: simplify code and reduce log noise
- Reverted all changes to topology_control/nym_api_provider.rs
- Changed info/warn logs to debug for custom client messages
- Removed unused _rng parameter from gateways_for_init()
- Simplified URL builder to always use new_with_urls()
2025-10-31 09:00:17 +00:00
Tommy Verrall 93208fb5e0 Fix clippy warnings: use arrays instead of vec! in tests 2025-10-31 09:00:17 +00:00
Bogdan-Ștefan Neacşu c9b50dd979 Introduce event backchannel (#6119)
* Introduce even backchannel

* Rust fmt

* Rename Event to MixnetClientEvent

* Use unbounded_send for events

* Remove unused file

* Remove mut borrow

* Event hierarchy and mixnet client intermediary

* Export MixTrafficEvent in sdk
2025-10-31 09:00:15 +00:00
Jędrzej Stuczyński 74cdfd5d94 Merge pull request #6099 from nymtech/bugfix/incompatibility-fixes
Bugfix/incompatibility fixes
2025-10-31 08:59:40 +00:00
Jędrzej Stuczyński 953e813f0e Bugfix/bloomfilters purge (#6089)
* remove all old bloomfilters upon starting binary

* remove old bloomfilter file upon purging secondary data
2025-10-31 08:59:38 +00:00
Tommy Verrall 29cf5058a6 feat: pass custom HTTP client through SDK stack for domain fronting
- Add with_nym_api_client() to BaseClientBuilder, MixnetClientBuilder, and RegistrationClientBuilderConfig

- Modify nym_api_provider to fetch all nodes then filter by supported_roles.entry (fixes metadata inconsistency)

- Update helpers.rs to build HTTP client with all nym_apis URLs and retries for fallback support

- Fix SDK to use entry_capable_nodes() instead of entry_gateways() for broader gateway selection

This enables domain fronting and URL rotation throughout the entire SDK stack, improving censorship resistance and connection reliability. All changes are backward compatible - custom client is optional.
2025-10-31 08:57:08 +00:00
Tommy Verrall a2856552d8 enable URL rotation and retries for mixnet gateway init 2025-10-31 08:57:08 +00:00
Andrej Mihajlov a33c603471 Update dirs to 6.0 2025-10-31 08:57:08 +00:00
Jędrzej Stuczyński a9f9266992 bugfix: nym-credential-proxy query params parsing regression (#6121) 2025-10-31 08:57:08 +00:00
Tommy Verrall cf34d0d24a Skip ipv6 metadata endpoint request (#6118)
Co-authored-by: Tommy Verrall <tommy@nymtech.net>
2025-10-31 08:57:07 +00:00
Jędrzej Stuczyński 5fa7b0a709 bugfix: revert some dep updates introduced in #6043 (#6120) 2025-10-31 08:57:07 +00:00
Andrej Mihajlov e232b4fd24 Revert "Propagate cancel token to mixnet client"
This reverts commit 50a259d454.
2025-10-31 08:57:07 +00:00
Jędrzej Stuczyński 609f174e8d chore: restore pending dkg contract state migration (#6116)
since it has not yet been run on mainnet
2025-10-31 08:57:07 +00:00
benedetta davico a0f4627647 Update lib.go 2025-10-31 08:57:07 +00:00
Jędrzej Stuczyński 258f8f5f5d bugfix: retrieve and update ticketbook in the same query (#6101)
* bugfix: retrieve and update ticketbook in the same query

* bump up NS version

* Update Cargo.toml

* remove SKIP LOCKED part of the query

---------

Co-authored-by: benedetta davico <46782255+benedettadavico@users.noreply.github.com>
2025-10-31 08:57:07 +00:00
mfahampshire 38220e05f1 DOCS Jarlsberg Release (#6111)
* First pass release notes

* build info
2025-10-31 08:57:07 +00:00
Andrej Mihajlov 6250ebe235 Propagate cancel token to mixnet client 2025-10-31 08:57:07 +00:00
mfahampshire a55323c0e2 Patch for operators to open wg metadata port (#6106) 2025-10-31 08:57:07 +00:00
Jędrzej Stuczyński baa8ac3610 bugfix: use custom topology provider for list of init gateways (#6092) 2025-10-31 08:57:07 +00:00
Jędrzej Stuczyński 933da11e8f bugfix: include network name in the default gateway probe config path (#6100) 2025-10-31 08:57:07 +00:00
Jędrzej Stuczyński 0469036da4 feat: expose obtaining reference to Mnemonic from DirectSecp256k1HdWallet (#6083)
* feat: expose obtaining reference to Mnemonic from DirectSecp256k1HdWallet

* updated getters for stringified mnemonic
2025-10-31 08:57:07 +00:00
Georgio Nicolas 63f9a856fa Another offering for Clippy 2025-10-31 08:57:07 +00:00
Georgio Nicolas c068948c62 Offerings for clippy 2025-10-31 08:57:06 +00:00
Georgio Nicolas 0105f9fa5e Precompute BSGS table 2025-10-31 08:57:06 +00:00
Georgio Nicolas 004c737965 Use LazyLock to precompute generators 2025-10-31 08:57:06 +00:00
Georgio Nicolas 549121ca32 Fix clippy suggestion 2025-10-31 08:57:06 +00:00
Georgio Nicolas 3f2278dafc Fix zeroization 2025-10-31 08:57:06 +00:00
Georgio Nicolas 25ce0ac814 replace unsafe static values by function calls 2025-10-31 08:57:06 +00:00
Mark Sinclair 38d313a101 ns-api: add descriptions to dVPN gateway responses (#6102)
* ns-api: add descriptions to dVPN gateway responses

* clippy

* fmt

---------

Co-authored-by: Mark Sinclair <mmsinclair@users.noreply.github.com>
2025-10-31 08:57:06 +00:00
import this f67bc0ead5 [DOCs/operators] QUIC deployment script & docs (#6098)
* add quic_bridge_deployment.sh

* create a snippet with quick install steps

* add quic deployment to changelog

* add quic deployment to node config page

* add version compatibility callout

* last edits and scraped stats update

* correct name of QUIC snippet

* fix naming

* fix naming

* re-run python-prebuild.sh aka time-now updated

* attempt to fix vercel build the hard way

* rerun npm

* build with pnpm

* restore lock file and rebuild w pnpm

* chore: update pnpm lockfile

* attempt to fix build

* attempt to fix runtime builds

* update ci-docs run OS
2025-10-31 08:57:06 +00:00
Mark Sinclair 90aaa3572d Update ci-docs.yml 2025-10-31 08:57:06 +00:00
Mark Sinclair ecc61e4a4a NS API: use new probe download filesize and milliseconds field (#6097)
* use milliseconds field

* change score thresholds

* bump to version 4.0.8

* NS API: adjust score categories (#6103)

* testing scores

* test version

* Update Cargo.toml

---------

Co-authored-by: Mark Sinclair <mmsinclair@users.noreply.github.com>
Co-authored-by: benedetta davico <46782255+benedettadavico@users.noreply.github.com>
2025-10-31 08:57:06 +00:00
Jędrzej Stuczyński 22ac4919e5 bugfix: testnet manager 02sql migration (#6096) 2025-10-31 08:57:06 +00:00
Jędrzej Stuczyński a1e7cc8e87 chore: remove unnecessary closure in 'calculate_score' inside node-status-api 2025-10-31 08:57:06 +00:00
Mark Sinclair 57df00637c ns-api: use download files size from probes instead of parsing filenames 2025-10-31 08:57:06 +00:00
Jędrzej Stuczyński c7eb3bdb7b moved nym-gateway-probe to monorepo and updated rust-edition to 2024 (#6094)
dont build netstack in CI

additional rust 2024 fixes

fixes

removed temp.rs

first round of cleanup

removed duplicated NS types

moved gateway probe to the monorepo
2025-10-31 08:57:05 +00:00
Mark Sinclair 8f9b704541 ns-api: add new fields for probe output for query_metadata and download file size and duration in ms (#6091)
Co-authored-by: Mark Sinclair <mmsinclair@users.noreply.github.com>
2025-10-31 08:56:49 +00:00
Mark Sinclair 6a956c790a NS API: clamp load to offline when score is offline and add mixnet_score field to preformance_v2 (#6076)
* ns-api: when `score` is `Offline`, clamp `load` to `Offline`

* ns-api: bump version

* ns-api: add mixnet score field to performance_v2 struct

---------

Co-authored-by: Mark Sinclair <mmsinclair@users.noreply.github.com>
2025-10-31 08:56:49 +00:00
mfahampshire 2ff5c7221a Max/fix wasm client + build commands (#6043)
* Debug logging 

* Yield based logging

* Reintroduce non-dummy task manager, try add counting for
BatchMessageSender, a couple of compiler target introductions on use
statements.

* Fixed time runtime err

* Uncomment forgetme/rememberme

* remove diffs from debug

* missed commented out forgetme

* yet more forgetme comments

* * Added missing clientreqestsender clone to wasm client to stop
  premature drop & busyloop
* Removed hacky mem::forget fix

* Remove debug panic_hook

* Conditional import + use of wasm_utils::console_log

* add wasm_util dep

* Commenting out or removing debug logging

* Remove missed comment

* cleanup gitignore

* clippy

* update go version in ci

* removed unused deps

* add clippy ignore

* remove mixfetch from ci build

* add minifetch fix

* comment out unused ts builds

* stop contract clients killing ci for the moment

* wasm target locking for imports

* Either remove console_log! macro or introduce cfg(debug_assertions)

* downgrade netlink

* debug assertions for console_log import

* modify config logging (debug -> normal)

* remove clone for client_request_sender + grab directly in struct
  creation

* reintroduce debug print for config in debug mode

* remove ood / unused custom topology from worker example file

* clippy

* clippy - ignore todo() tests

* modified humantime test in line with new parsing rules
2025-10-31 08:56:49 +00:00
benedetta davico 2235a6e147 Merge pull request #6113 from nymtech/release/2025.18-jarlsberg
Merge release/2025.18-jarlsberg to master
2025-10-15 10:22:16 +02:00
benedettadavico db6defa122 update changelog 2025-10-14 12:07:26 +02:00
Jędrzej Stuczyński df7768dec0 Bugfix/bloomfilters purge (#6089)
* remove all old bloomfilters upon starting binary

* remove old bloomfilter file upon purging secondary data
2025-10-06 14:02:32 +01:00
benedettadavico f3a449b7cc bump versions 2025-10-06 14:38:00 +02:00
benedetta davico 48c06545ab Merge pull request #6087 from nymtech/serinko/autorun-callout-msg 2025-10-05 12:15:23 +02:00
serinko f53e5fe8dd add a quick start message 2025-10-05 11:48:49 +02:00
Jędrzej Stuczyński fc98c497b4 feat: DKG contract method for updating announce address (#6050)
* added new dkg execute methods for ownership transfer and announce address update

* cherry-pick TestableNymContract for the dkg contract from #5091

* tests

* schema fixes

* removed old queued migrations
2025-10-02 17:19:03 +01:00
benedetta davico cf21593ffa Merge pull request #6080 from nymtech/release/2025.17-isabirra
Merge release/2025.17-isabirra to master
2025-10-02 16:06:41 +02:00
benedetta davico 92a88cdf9a Merge pull request #6079 from nymtech/release/2025.17-isabirra
Release/2025.17 isabirra
2025-10-02 16:00:53 +02:00
Bogdan-Ștefan Neacşu 026d3a6466 Get wireguard keypair as arg instead of reading it from disk (#6078)
* Get wireguard keypair as arg instead of reading it from disk

* Move keypair out of NymNode

* Remove legacy auth client
2025-10-02 16:27:48 +03:00
import this 53c4fde314 Hotfix: Update API source in node ping tester script (#6082) 2025-10-02 12:53:51 +00:00
Simon Wicky 3f55e62764 ci fixes
(cherry picked from commit caf40e7a37)
2025-10-02 14:05:43 +02:00
import this 00cc54f5c3 [DOCs/operators]: Release notes 2025.17-isabirra & New tools documentation (#6081)
* initialise release update notes

* add api changes

* create tools page and document nym-node-cli usage

* syntax fix

* document cmd tools

* add tools and ufw command to changelog

* add ufw 51830 to nym-node ports snippet

* ready for review except missing version hash info

* finished - ready for review

* add spectre delegation wizzard
2025-10-02 11:56:12 +00:00
import this c1904840e1 Feature: Node rewards tracker (#6064) 2025-10-02 08:52:46 +00:00
benedetta davico c652e3bdcd Benny/ci contract fix (#5962)
* use different runner

* Update Makefile

* Update Makefile

* Update ci-contracts-upload-binaries.yml

change to dtolnay

* Update ci-contracts-upload-binaries.yml

allow features alloc

* Update ci-contracts-upload-binaries.yml

try a specific cosmwasm-check

* Update ci-contracts-upload-binaries.yml

temp disable - until the right cosm check is found

* try new runner

* remove version check

* try to dockerize

* test

* remove rust install

* test

* change runner

* .

* set cargo path

* set path

* diff image

* error

* set path

* .

* aah

* .

* remove singlepass feature

* change runner

* Update ci-contracts-upload-binaries.yml

---------

Co-authored-by: Tommy Verrall <60836166+tommyv1987@users.noreply.github.com>
2025-10-02 09:15:48 +01:00
benedetta davico f9844416df Update ci-contracts.yml 2025-10-01 12:45:18 +02:00
benedetta davico bbea2ff9e9 Add nym-node binary 2025-10-01 12:06:07 +02:00
Simon Wicky 4acaec48b4 update runner for nym stats api build (#6077) 2025-10-01 09:48:16 +02:00
Simon Wicky 51779c06a4 Registration Client (#6059)
* removing wg-gateway-client

* bandwidth_provider trait

* authenticator client

* adapt ip-packet-client

* nit

* registration_client

* accomodate new shutdown and bugfix

* sdk changes

* cleanup and shutdown management

* remove credential mode

* error cleanup

* better error handling

* removing useless cover traffic delay

* wasm client stuff

* cfg unix

* more wasm stuff

* change authenticator client to not be blocked by mixnet client
2025-09-30 15:50:04 +02:00
import this 5cc650e901 Feature: Ping probe all nodes /described from a server (#6074)
* initialise test-nodes-pings.sh

* add retry
2025-09-30 13:30:54 +00:00
Simon Wicky a7ec178c9f [Stats API] Add flat table to stats API (#6073)
* add flat table to stats API

* remove day column
2025-09-30 14:30:05 +02:00
benedetta davico 4e97a2f871 Update push-credential-proxy.yaml 2025-09-30 12:03:04 +02:00
benedetta davico 5fbfc21fb2 Bump cred proxy version 2025-09-30 11:22:18 +02:00
benedettadavico 3d45801bb7 Fix swagger v2 endpoint 2025-09-30 10:26:55 +02:00
benedettadavico 3aea9f127b Update changelog 2025-09-30 10:23:55 +02:00
benedetta davico a26ff644cc Update mainnet.rs 2025-09-29 19:16:49 +02:00
Mark Sinclair a0e37e78e2 Node Status API: add bridge information to dVPN endpoint (#6069)
* ns api: add node scraper for bridge information and add to dVPN gateway output

* extra error reporting

* run sqlx-prepare

* fix clippy

---------

Co-authored-by: Mark Sinclair <mmsinclair@users.noreply.github.com>
Co-authored-by: benedettadavico <benedetta.davico@gmail.com>
2025-09-29 16:27:10 +01:00
Jędrzej Stuczyński b3d02e3ba7 feat: NS ticket faucet (#6047)
* ns-api: remove sqlite support

ns-api: add env var to skip migrations for local dev

ns-api: tidy up imports

ns-api: fix deserialisation fo node descriptions

update dockerfile

update README

fix up README and example env

ns-api: bump major version to 4

ns-api: add more geoip data and new performance field in dvpn responses

* ability to import partial ticketbooks

* wip: adding common ecash state to NS API

* buffering ticketbooks

* wip

* distribute tickets when getting testrun assignment

* passing ticketbook data to gateway probe

* wrapped around storage tx

* ticketbook query fixes

* clippy

* modified testrun assignment to always return tickets

* Update version

* Update push-node-status-agent.yaml

* Update Cargo.toml

* add entrypoint for ns agents

* sqlx prepare and cargo fmt

* clippy fixes

* Update ci-check-ns-api-version.yml

---------

Co-authored-by: Mark Sinclair <mmsinclair@users.noreply.github.com>
Co-authored-by: benedetta davico <46782255+benedettadavico@users.noreply.github.com>
Co-authored-by: benedettadavico <benedetta.davico@gmail.com>
2025-09-29 14:53:15 +01:00
Mark Sinclair f5b5177073 Update push-node-status-api.yaml 2025-09-26 20:14:30 +01:00
Simon Wicky a29df08463 frontdoor typo fix (#6067) 2025-09-26 12:06:40 +02:00
Simon Wicky 6a028417ad [chore] Clippy fix (#6060)
* clippy multiple of fix

* removed dead code?

* huh?

* ci fixes
2025-09-26 11:58:59 +02:00
benedetta davico 4cd4dc2d1c Merge pull request #6065 from nymtech/release/2025.17-isabirra 2025-09-26 10:34:09 +02:00
Jack Wampler 983cba21ba Bridge proto client params in Self-Described (#6035) 2025-09-25 11:24:21 -06:00
benedetta davico b9fb2c4e0a Merge pull request #6062 from nymtech/benny/gw-fixes
Bugfix | Fix the registration handshake
2025-09-24 15:28:15 +02:00
Simon Wicky 7bcd3fe754 fixy fix 2025-09-24 15:14:42 +02:00
import this eeb0278d13 Bugfix: Nym node CLI download nym-node exception (#6058)
* dowloand nym-node script fix

* ready for review

* ready for review

* fix landing page flow

* fix landing page flow
2025-09-24 12:49:48 +00:00
benedettadavico 7147ba56e2 adding log 2025-09-24 14:40:53 +02:00
benedetta davico 7bf5553fd1 Update ci-build-upload-binaries.yml 2025-09-24 14:15:23 +02:00
benedettadavico 810b0628bb testing fixes 2025-09-24 14:06:42 +02:00
Drazen Urch 8d28016e08 Run SM with cancel_on_panic (#6054) 2025-09-23 10:51:13 +02:00
Mark Sinclair fb0b55d540 Node Status API: remove sqlite support (#6004)
* ns-api: remove sqlite support

ns-api: add env var to skip migrations for local dev

ns-api: tidy up imports

ns-api: fix deserialisation fo node descriptions

update dockerfile

update README

fix up README and example env

ns-api: bump major version to 4

ns-api: add more geoip data and new performance field in dvpn responses

* ns-api: polyfill dVPN probe outcomes to make compatible with existing clients

* Use explicit transaction for testrun status change (#6046)

* Use explicit transaction for testrun status change

* Improve run scripts

* Skip locked rows

* bump version 4.0.2

* Fix build.rs

* Fix up .sqlx queries

* Bump agent version and change dockerfile to run the agent in a loop

* Make time between agents configurable by env var SLEEP_TIME

* Update entrypoint.sh

* Update Dockerfile with full path

* Force bigint to avoid postgres numeric cast

* Add override args to agent entry point, bump agent version and NS API version

---------

Co-authored-by: Mark Sinclair <mmsinclair@users.noreply.github.com>
Co-authored-by: dynco-nym <173912580+dynco-nym@users.noreply.github.com>
2025-09-19 17:00:54 +01:00
import this 1bb973e4a7 Feature: Nym node html landing page (#6053)
* add proper landing page and hook it to node autorun

* Update nym-node version

---------

Co-authored-by: benedetta davico <46782255+benedettadavico@users.noreply.github.com>
Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>
2025-09-19 13:15:16 +00:00
benedetta davico f0d8dabb9f Merge pull request #6042 from nymtech/release/2025.16-halloumi
Merge release/2025.16-halloumi to master
2025-09-17 14:20:19 +02:00
Drazen Urch aa1cad4422 Transparent ShutdownManager with cascading ShutdownTrackers (#6040)
* Idea for transparent ShutdownManager use

* Tracker hierarchies

* Fix wasm shutdown, convinience shutdown method
2025-09-17 12:51:00 +02:00
benedettadavico e388e67357 bump versions 2025-09-17 12:40:40 +02:00
benedetta davico 44ac5e1ced Merge pull request #6044 from nymtech/feature/merge-halloumi
Merge release/2025.16-halloumi to develop using separated branch
2025-09-17 11:54:08 +02:00
Bogdan-Ștefan Neacşu 78d3d78a8c Merge remote-tracking branch 'origin/develop' into release/2025.16-halloumi 2025-09-17 12:24:40 +03:00
Bogdan-Ștefan Neacşu 05734e6fe9 Revert "Backport metadata endpoint (#6010)"
This reverts commit d984d085a7.
2025-09-17 12:21:48 +03:00
import this d2d90160be [DOCs/operators]: Release notes for v2025.16-halloumi (#6039) 2025-09-16 11:38:07 +00:00
benedetta davico 6731b89714 update rust version 2025-09-16 11:41:49 +02:00
benedetta davico 55aea37b89 typo 2025-09-16 11:29:32 +02:00
Jędrzej Stuczyński 748478c89e chore: made http-api-client-macro doctest compile (#6037) 2025-09-16 09:58:09 +01:00
Simon Wicky 1286842c6d convenience for ShutdownTracker (#6038) 2025-09-16 09:50:52 +01:00
Jędrzej Stuczyński b6213bc016 chore: remove legacy nodes from nym api [and kinda-ish from node status api] (#6021)
* remove [most of] legacy data from nym-api endpoints

* chore: removed contamination with legacy nodes data

* added /v1/nym-nodes/stake-saturation/{node_id}

* added /v1/legacy/mixnodes and /v1/legacy/gateways

* removed scraping of legacy mixnodes in NS api

* remove export of removed types

* huge warnings on attempting to use removed commands in the wallet

* fixed reference to removed type in tests
2025-09-16 09:05:29 +01:00
benedettadavico 737c4d79e0 update changelog 2025-09-16 09:16:40 +02:00
benedetta davico b105e5a15d TEMP remove nym-node from publishing 2025-09-16 09:15:19 +02:00
Drazen Urch 90e9e3cff8 Domain fronting integration (#5974)
* feat: unify HTTP client creation and enable domain fronting

Enhanced the base nym_http_api_client to reduce fragmentation and enable domain fronting:

- Added SerializationFormat enum for explicit JSON/bincode choice (no auto-detection)
- Added from_network() method to create clients from NymNetworkDetails with domain fronting
- Added with_bincode() builder method for explicit serialization configuration
- Set Accept header based on serialization preference
- Added deprecation paths for NymApiClient wrapper and nym_api::Client re-export
- Enabled domain fronting support via network defaults feature

This is part of a broader effort to consolidate HTTP client implementations across the codebase,
reducing ~500 lines of wrapper code and providing automatic domain fronting for censorship resistance.

* feat: migrate NymApiClient usage to unified HTTP client

- Wire up domain fronting configuration in NymNetworkDetails
- Implement NymApiClientExt trait for base nym_http_api_client::Client
- Migrate direct NymApiClient usage in multiple components:
  - nym-network-monitor
  - verloc measurements
  - connection tester
  - coconut/ecash client
  - validator rewarder
- Add Copy derive to ApiUrlConst to enable iteration
- Update error handling and Display implementations

This enables automatic domain fronting for all Nym API calls via the configured CDN front hosts.

* fix: resolve all compilation errors after NymApiClient migration

- Add missing nym-http-api-client dependencies to multiple crates
- Add NymApiClientExt trait imports where needed
- Fix type mismatches from NymApiClient to unified Client
- Add error conversions for NymAPIError in various error enums
- Implement missing trait methods (get_current_rewarded_set, get_all_basic_nodes_with_metadata, get_all_described_nodes)
- Fix type conversions for RewardedSetResponse in network monitor
- Update all API client instantiation to use new unified HTTP client

* feat: complete migration to unified HTTP client and fix all compilation errors

- Added missing NymApiClientExt trait methods (get_all_expanded_nodes, change_base_urls)
- Fixed all compilation errors across the workspace
- Updated nym-node to use unified client instead of deprecated NymApiClient
- Fixed type conversions for RewardedSetResponse → EpochRewardedSet
- Added nym-http-api-client dependency where needed
- Updated all examples and documentation to use new client API

* fix: provide all API URLs for automatic failover in endpoint rotation

Previously, when rotating API endpoints, only a single URL was provided to the
HTTP client, defeating the purpose of having multiple URLs for resilience.

Changes:
- NymApiTopologyProvider now provides all URLs in rotated order when switching endpoints
- NymApisClient similarly provides all URLs starting from the working endpoint
- Added clarifying comments for broadcast/exhaustive query methods where single URLs are intentionally used
- This enables the HTTP client's built-in failover mechanism while maintaining endpoint rotation behavior

The fix ensures that if the primary endpoint fails, the client can automatically
failover to alternative endpoints without manual intervention, improving overall
network resilience.

* Update common/client-core/src/client/base_client/mod.rs

Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>

* Remove error generics, address PR comments

* Explicit warning on missing fronting configuration

* Assorted CI fixes

* Registry proc-macro

* Rename macro

* Syn workspace version

* Where do we need to put inventory

* Ergonomics and call sites, incept the builder

* fix: Address critical issues in client configuration registry implementation

- Fixed HeaderMapInit parsing bug that would cause compilation errors
- Added comprehensive documentation with usage examples and DSL reference
- Improved error handling with better error messages for invalid headers
- Added test coverage for both macro and registry functionality
- Added debug inspection capabilities for registered configurations
- Fixed module name conflicts in tests by using separate modules

All tests now passing:
- 7 macro tests validating DSL parsing and code generation
- 4 registry tests verifying configuration collection and application

* Use default value for the ports until api is deployed

* Feature/improved http error (#6025)

* use display impl for urls

* feat: attempt to add more details to reqwest errors

* temporarily restored GenericRequestFailure variant

* another restoration

* cleanup

* Some debug tooling, and default timeout fix

* Fix user-agent override

* Fix various wasm things

---------

Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>
Co-authored-by: Bogdan-Ștefan Neacşu <bogdan@nymtech.net>
2025-09-15 14:32:15 +02:00
benedetta davico 87a188ca06 Bump cred-proxy version 2025-09-11 14:28:52 +02:00
Jędrzej Stuczyński 0ee387d983 Feature/cancellation migration (#6014)
* squashing work on using cancellation in nym crates

making nym-task wasm compilable

removed sending of status messages

replaced TaskManager with ShutdownManager in the validator rewarder

additional helpers for ShutdownManager

simplified ShutdownToken by removing the name field

TaskClient => ShutdownToken within all client tasks

wip: remove TaskHandle

* track all long-living client tasks

* add task tracking for most top level tasks within nym-node

* improved default builder

* split up cancellation module

* module documentation and unit tests

* nym node fixes and naming consistency

* wasm fixes

* assert_eq => assert

* wasm fixes and made 'run_until_shutdown' take reference instead of ownership

* linux-specific fixes to IpPacketRouter

* post rebasing fixes for signing monitor

* add ShutdownManager constructor to build it from an external token

* applying PR review suggestions
2025-09-10 13:56:39 +01:00
Jędrzej Stuczyński d3cdaf373b Feature/credential proxy crate (#6018)
* moved storage and deposits buffer to the common lib

* move more of the state into the shared lib

* extracted the rest of the features into the shared lib

* fixed test imports

* clippy
2025-09-10 09:28:38 +01:00
Jędrzej Stuczyński 7c5f10a219 refresh mixnet contract on epoch progression (#6023) 2025-09-09 09:59:54 +01:00
Simon Wicky f90fc4f2f0 Moving clients crate from vpn-client repo to here (#6015)
* moving crates as is

* changes due to crate moving

* cargo fmt
2025-09-08 10:50:18 +02:00
Jędrzej Stuczyński e95aca715c feat: use ShutdownToken (CancellationToken inside) for nym-api (#5997)
* make nym-api use ShutdownToken instead of TaskClient

* ignore public-api tests if env is not set

* removed default features to avoid pulling in openssl
2025-09-08 09:45:28 +01:00
benedettadavico 4d0898c633 bump versions 2025-09-08 09:30:59 +02:00
Bogdan-Ștefan Neacşu d984d085a7 Backport metadata endpoint (#6010)
* Wireguard private metadata (#5915)

* Wireguard metadata client library (#5943)
2025-09-05 11:14:37 +03:00
Bogdan-Ștefan Neacşu 8e7d1d510d Use default value for the ports until api is deployed (#6007) 2025-09-04 15:55:56 +03:00
import this 4062734a31 [DOCs/operators]: NIP-2 tokenomics update & fix csv2md bug (#6008) 2025-09-04 11:29:44 +02:00
import this ccd8ff26a3 Feature: Delegation program stake checker and adjuster (#5980)
* initialise stake adjustment program

* add readme file with a simple guide

* syntax

* syntax

* FINISHED: faster and returning more data

* change dwl link to develop branch
2025-09-03 16:06:06 +00:00
import this 43d043a9cd Feature: Nym node autorun CLI (#5916)
* initial commit - add prereqs install script

* add env vars prompt

* automate latest binary url env var

* add install node script

* add modes to nym-node install script

* start main cli framework

* adding branch var for easier deployment and testing

* add systemd config

* add proxy and wss setup script

* add landing page stub and fix nginx script

* add nginx setup

* fix typo

* add checks for existing dir and wg prompt

* add nginx commands

* add service file check

* add service file check

* convention alignment

* add checks to nginx setup

* cleanup old code

* add bonding prompt and nym node run fns

* fix syntax

* fix syntax

* fix syntax

* fix syntax

* fix syntax

* fix syntax

* fix syntax

* fix syntax

* add service script to init

* fix syntax

* fix syntax

* add chmod

* fix script logic

* syntax fix

* syntax fix

* silent mode trial

* fix evn prompt script

* make scripts interactive

* indent fix

* correct node-install script

* initial mixnode setup working - gws need more love

* fix bonding function

* syntax fix

* improve run noide as service script

* improve service script

* improve run service fn

* fix logic

* beautify

* beautify

* create run node as service script

* syntax fix

* attempt to resolve memory running out issue

* attempt to resolve memory running out issue

* attempt to resolve memory running out issue

* attempt to resolve memory running out issue

* attempt to resolve memory running out issue

* attempt to resolve memory running out issue

* attempt to resolve memory running out issue

* attempt to resolve memory running out issue

* setting wireguard

* solved memory issues

* rename landing page template

* modify wireguard enabled fn

* layout change

* syntax fix

* modify node setup script

* sync up envs

* return missing function

* fix urls

* fix network manager script execution

* fix wss and nginx

* fix layout

* tweak WG contion

* syntax fix

* add init placeholder

* syntax fix

* redefine wireguard check logic

* check if node exists

* add argparse and dev option

* styling

* add panic

* add error message

* improve logic

* improve logic

* add arg

* add dev arg for all levels

* add confirmation loop

* styling

* fix bonding question

* syntax edit

* syntax edit

* syntax edit

* refactor for already bonded nodes

* add default branch on top and define metavar

* fix node install script

* clean and prepare for review

* indentation fix

* fix nginx setup

* fix nginx setup

* style cleanup

* fix try error logic

* tune --dev option to run before command correctly

* fix y/n convention across the modules

* add explorer URL to the message

* minor layout fixes
2025-09-02 20:34:24 +00:00
Drazen Urch 3d6cf730c2 NS-API: Cast to BIGINT to make i64 work (#6003) 2025-09-02 18:35:25 +01:00
Jędrzej Stuczyński c0f8d98b63 bugfix: return from MixTrafficController if client request channel has closed (#6002) 2025-09-02 10:23:25 +01:00
Jędrzej Stuczyński 91995da4f1 chore: use updated version of simulate endpoint (#5988) 2025-09-02 10:12:52 +01:00
Jędrzej Stuczyński 01fa1df66c feat: shared library for attempting to retrieve update mode attestation (#5954)
* feat: shared library for attempting to retrieve update mode attestation

* clippy

* add nym- prefix to the crate name

* use pure-rust impl for jwt-simple
2025-09-02 09:28:32 +01:00
Jędrzej Stuczyński baddaaac22 feat: nym signers monitor (#5933)
* initialise nym-signers-monitor

* creating nyxd client

* performing checks

* sending notifications on failure

* rate limitting on notifications + clippy
2025-09-02 09:27:09 +01:00
elsirion 2c4b5f168b fix: use WASM compatible time API in client (#5948) 2025-09-02 09:26:06 +01:00
Jędrzej Stuczyński 2e22cad074 bugfix: make sure tables are removed in correct order to not trigger FK constraint issue (#5987) 2025-08-29 09:02:58 +01:00
benedetta davico f105bcbafe Merge pull request #5968 from nymtech/release/2025.15-gruyere
merge gruyere to master
2025-08-21 12:20:35 +02:00
benedetta davico dc0f4af2c1 Merge pull request #5937 from nymtech/release/2025.14-feta 2025-08-13 11:12:19 +02:00
benedetta davico 2a621e07a8 Merge pull request #5907 from nymtech/release/2025.13-emmental
Merge release/2025.13-emmental to master
2025-07-22 16:23:44 +02:00
benedetta davico 485aeebabd Merge pull request #5886 from nymtech/release/2025.12-dolcelatte
Merge release/2025.12-dolcelatte to master
2025-07-09 15:25:16 +02:00
benedetta davico 3b726bada9 Merge pull request #5839 from nymtech/release/2025.11-cheddar
merge release/2025.11-cheddar to master
2025-06-11 13:09:43 +02:00
benedetta davico 1d1b2e17d2 Merge pull request #5807 from nymtech/release/2025.10-brie 2025-05-28 09:38:15 +02:00
benedetta davico b5b2dbdfd8 Merge pull request #5776 from nymtech/release/2025.9-appenzeller
Release/2025.9-appenzeller to master
2025-05-16 13:23:10 +02:00
benedetta davico 82806f47d8 Merge pull request #5735 from nymtech/release/2025.8-tourist
Merge release/2025.8-tourist to master
2025-05-05 12:11:39 +02:00
benedetta davico c6f85cf23e Merge pull request #5727 from nymtech/release/2025.7-tex
Merge tex to master
2025-04-22 10:50:43 +02:00
benedetta davico ed8de7234d Merge pull request #5672 from nymtech/release/2025.6-chuckles
Merge release/2025.6-chuckles into master
2025-04-02 10:34:51 +02:00
benedetta davico e25d83b047 Merge pull request #5641 from nymtech/release/2025.5-chokito
Merge chokito to master
2025-03-24 10:14:50 +01:00
Jędrzej Stuczyński 9974d480b5 Merge pull request #5574 from nymtech/release/2025.4-dorina-patched
Release/2025.4-dorina-patched to master
2025-03-11 10:37:06 +00:00
benedetta davico 2211f13cdd Merge pull request #5551 from nymtech/release/2025.4-dorina
Merge release/2025.4-dorina to master
2025-03-04 13:55:27 +01:00
benedetta davico 4505f18a02 Merge pull request #5485 from nymtech/release/2025.3-ruta
Release/2025.3 ruta to master
2025-02-18 10:08:08 +01:00
benedetta davico a717a18948 Merge pull request #5430 from nymtech/release/2025.2-hu
Merge release/2025.2-hu to master
2025-02-06 13:58:55 +01:00
987 changed files with 45519 additions and 25415 deletions
@@ -21,7 +21,7 @@ jobs:
strategy:
fail-fast: false
matrix:
platform: [ arc-ubuntu-22.04 ]
platform: [ arc-linux-latest ]
runs-on: ${{ matrix.platform }}
env:
+1 -1
View File
@@ -9,7 +9,7 @@ on:
jobs:
wasm:
runs-on: arc-ubuntu-22.04
runs-on: arc-linux-latest
env:
CARGO_TERM_COLOR: always
RUSTUP_PERMIT_COPY_RENAME: 1
+10 -1
View File
@@ -81,12 +81,21 @@ jobs:
command: fmt
args: --all -- --check
- name: Clippy
- name: Clippy (macos)
if: contains(matrix.os, 'mac')
uses: actions-rs/cargo@v1
with:
command: clippy
args: --workspace --all-targets --exclude nym-gateway-probe -- -D warnings
- name: Clippy (non-macos)
if: contains(matrix.os, 'linux') || contains(matrix.os, 'windows')
uses: actions-rs/cargo@v1
with:
command: clippy
args: --workspace --all-targets -- -D warnings
- name: Build all binaries
uses: actions-rs/cargo@v1
with:
@@ -10,7 +10,7 @@ env:
jobs:
check-if-tag-exists:
runs-on: arc-ubuntu-22.04-dind
runs-on: arc-linux-latest-dind
steps:
- name: Checkout repo
uses: actions/checkout@v4
@@ -11,7 +11,7 @@ jobs:
strategy:
fail-fast: false
matrix:
platform: [ arc-ubuntu-22.04 ]
platform: [ arc-linux-latest-dind ]
runs-on: ${{ matrix.platform }}
env:
@@ -28,18 +28,11 @@ jobs:
mkdir -p $OUTPUT_DIR
echo $OUTPUT_DIR
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
toolchain: stable
target: wasm32-unknown-unknown
override: true
- name: Build contracts
run: make optimize-contracts
- name: Install cosmwasm-check
run: cargo install cosmwasm-check
- name: Build release contracts
run: make publish-contracts
- name: Check optimized contracts
run: make docker-check-contracts
- name: Prepare build output
shell: bash
+2 -2
View File
@@ -17,7 +17,7 @@ jobs:
build:
# since it's going to be compiled into wasm, there's absolutely
# no point in running CI on different OS-es
runs-on: ubuntu-22.04
runs-on: arc-linux-latest
env:
CARGO_TERM_COLOR: always
RUSTUP_PERMIT_COPY_RENAME: 1
@@ -54,7 +54,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --lib --manifest-path contracts/Cargo.toml
args: --lib --manifest-path contracts/Cargo.toml --all-features
- name: Check formatting
uses: actions-rs/cargo@v1
+1 -1
View File
@@ -10,7 +10,7 @@ on:
jobs:
build:
runs-on: arc-ubuntu-22.04
runs-on: arc-linux-latest
env:
RUSTUP_PERMIT_COPY_RENAME: 1
defaults:
+1 -1
View File
@@ -11,7 +11,7 @@ on:
jobs:
build:
runs-on: arc-ubuntu-22.04
runs-on: arc-linux-latest
env:
CARGO_TERM_COLOR: always
RUSTUP_PERMIT_COPY_RENAME: 1
+7 -7
View File
@@ -4,14 +4,14 @@ on:
workflow_dispatch:
pull_request:
paths:
- 'wasm/**'
- 'clients/client-core/**'
- 'common/**'
- '.github/workflows/ci-sdk-wasm.yml'
- "wasm/**"
- "clients/client-core/**"
- "common/**"
- ".github/workflows/ci-sdk-wasm.yml"
jobs:
wasm:
runs-on: arc-ubuntu-22.04
runs-on: arc-linux-latest
env:
CARGO_TERM_COLOR: always
RUSTUP_PERMIT_COPY_RENAME: 1
@@ -33,7 +33,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.23.7"
go-version: "1.24.6"
- name: Install wasm-pack
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
@@ -41,7 +41,7 @@ jobs:
- name: Install wasm-opt
uses: ./.github/actions/install-wasm-opt
with:
version: '116'
version: "116"
- name: Install wasm-bindgen-cli
run: cargo install wasm-bindgen-cli
+2 -2
View File
@@ -21,7 +21,7 @@ jobs:
fail-fast: false
matrix:
include:
- os: arc-ubuntu-22.04
- os: arc-linux-latest
target: x86_64-unknown-linux-gnu
runs-on: ${{ matrix.os }}
@@ -56,7 +56,7 @@ jobs:
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
toolchain: 1.86.0
toolchain: 1.88.0
override: true
- name: Build all binaries
+1 -1
View File
@@ -8,7 +8,7 @@ env:
jobs:
build-container:
runs-on: arc-ubuntu-22.04-dind
runs-on: arc-linux-latest-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
@@ -20,7 +20,7 @@ env:
jobs:
build-container:
runs-on: arc-ubuntu-22.04-dind
runs-on: arc-linux-latest-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
+2 -2
View File
@@ -14,7 +14,7 @@ env:
jobs:
build-container:
runs-on: arc-ubuntu-22.04-dind
runs-on: arc-linux-latest-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
@@ -69,6 +69,6 @@ jobs:
- name: BuildAndPushImageOnHarbor
run: |
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile-pg . -t harbor.nymte.ch/nym/${{ env.IMAGE_NAME_AND_TAGS }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.IMAGE_NAME_AND_TAGS }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
@@ -8,7 +8,7 @@ env:
jobs:
build-container:
runs-on: arc-ubuntu-22.04-dind
runs-on: arc-linux-latest-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
+164
View File
@@ -4,6 +4,170 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2025.18-jarlsberg] (2025-10-14)
- ns-api: add descriptions to dVPN gateway responses ([#6102])
- NS API: use new probe download filesize and milliseconds field ([#6097])
- ns-api: use download files size from probes instead of parsing filenames ([#6095])
- ns-api: add new fields for probe output for query_metadata and download file size and duration in ms ([#6091])
- Bugfix/bloomfilters purge ([#6089])
- Hotfix: Update API source in node ping tester script ([#6082])
- Get wireguard keypair as arg instead of reading it from disk ([#6078])
- Feature: Ping probe all nodes /described nodes from a server ([#6074])
- Node Status API: add bridge information to dVPN endpoint ([#6069])
- frontdoor typo fix ([#6067])
- Feature: Node rewards tracker ([#6064])
- [chore] Clippy fix ([#6060])
- Registration Client ([#6059])
- Bugfix: Nym node CLI download nym-node exception ([#6058])
- Feature: Nym node html landing page ([#6053])
- feat: DKG contract method for updating announce address ([#6050])
- feat: NS ticket faucet ([#6047])
- Bridge proto client params in Self-Described ([#6035])
- Node Status API: remove sqlite support ([#6004])
- Benny/ci contract fix ([#5962])
[#6102]: https://github.com/nymtech/nym/pull/6102
[#6097]: https://github.com/nymtech/nym/pull/6097
[#6095]: https://github.com/nymtech/nym/pull/6095
[#6091]: https://github.com/nymtech/nym/pull/6091
[#6089]: https://github.com/nymtech/nym/pull/6089
[#6082]: https://github.com/nymtech/nym/pull/6082
[#6078]: https://github.com/nymtech/nym/pull/6078
[#6074]: https://github.com/nymtech/nym/pull/6074
[#6069]: https://github.com/nymtech/nym/pull/6069
[#6067]: https://github.com/nymtech/nym/pull/6067
[#6064]: https://github.com/nymtech/nym/pull/6064
[#6060]: https://github.com/nymtech/nym/pull/6060
[#6059]: https://github.com/nymtech/nym/pull/6059
[#6058]: https://github.com/nymtech/nym/pull/6058
[#6053]: https://github.com/nymtech/nym/pull/6053
[#6050]: https://github.com/nymtech/nym/pull/6050
[#6047]: https://github.com/nymtech/nym/pull/6047
[#6035]: https://github.com/nymtech/nym/pull/6035
[#6004]: https://github.com/nymtech/nym/pull/6004
[#5962]: https://github.com/nymtech/nym/pull/5962
## [2025.17-isabirra] (2025-09-29)
- Bugfix | Fix the registration handshake ([#6062])
- Convenience for ShutdownTracker ([#6038])
- chore: made http-api-client-macro doctest compile ([#6037])
- feat: refresh mixnet contract on epoch progression ([#6023])
- chore: remove legacy nodes from nym api [and kinda-ish from node status api] ([#6021])
- Feature/credential proxy crate ([#6018])
- Moving clients crate from vpn-client repo to here ([#6015])
- Feature/cancellation migration ([#6014])
- Use default value for the ports until api is deployed ([#6007])
- bugfix: return from MixTrafficController if client request channel has closed ([#6002])
- Revert "Create an axum_test client for more integrated unit testing (… ([#5999])
- chore: upgraded syn to 2.0 and removed nym-execute ([#5998])
- feat: use `ShutdownToken` (`CancellationToken` inside) for nym-api ([#5997])
- bugfix: Recipient deserialisation for deserialisers missing bytes specialisation ([#5991])
- chore: use updated version of simulate endpoint ([#5988])
- chore: purge temp databases on build ([#5984])
- Bump sha.js from 2.4.11 to 2.4.12 ([#5983])
- Feature: Delegation program stake checker and adjuster ([#5980])
- build(deps): bump actions/setup-java from 4 to 5 ([#5975])
- Domain fronting integration ([#5974])
- chore: internal hidden command to force advance nyx epoch ([#5964])
- Create an axum_test client for more integrated unit testing ([#5956])
- feat: shared library for attempting to retrieve update mode attestation ([#5954])
- Bump slab from 0.4.10 to 0.4.11 ([#5952])
- build(deps): bump actions/first-interaction from 1 to 3 ([#5950])
- fix: use WASM compatible time API in client ([#5948])
- feat: credential proxy deposit pool ([#5945])
- build(deps): bump actions/download-artifact from 4 to 5 ([#5939])
- feat: nym signers monitor ([#5933])
- Bump console from 0.15.11 to 0.16.0 ([#5931])
- Bump mock_instant from 0.5.3 to 0.6.0 ([#5930])
- Bump tokio from 1.46.1 to 1.47.1 ([#5929])
- Bump defguard_wireguard_rs from v0.4.7 to v0.7.5 ([#5928])
- Bump indicatif from 0.17.11 to 0.18.0 ([#5924])
- Feature: Nym node autorun CLI ([#5916])
- build(deps): bump mikefarah/yq from 4.45.4 to 4.47.1 ([#5911])
- build(deps): bump pbkdf2 from 3.1.2 to 3.1.3 ([#5869])
[#6062]: https://github.com/nymtech/nym/pull/6062
[#6038]: https://github.com/nymtech/nym/pull/6038
[#6037]: https://github.com/nymtech/nym/pull/6037
[#6023]: https://github.com/nymtech/nym/pull/6023
[#6021]: https://github.com/nymtech/nym/pull/6021
[#6018]: https://github.com/nymtech/nym/pull/6018
[#6015]: https://github.com/nymtech/nym/pull/6015
[#6014]: https://github.com/nymtech/nym/pull/6014
[#6007]: https://github.com/nymtech/nym/pull/6007
[#6002]: https://github.com/nymtech/nym/pull/6002
[#5999]: https://github.com/nymtech/nym/pull/5999
[#5998]: https://github.com/nymtech/nym/pull/5998
[#5997]: https://github.com/nymtech/nym/pull/5997
[#5991]: https://github.com/nymtech/nym/pull/5991
[#5988]: https://github.com/nymtech/nym/pull/5988
[#5984]: https://github.com/nymtech/nym/pull/5984
[#5983]: https://github.com/nymtech/nym/pull/5983
[#5980]: https://github.com/nymtech/nym/pull/5980
[#5975]: https://github.com/nymtech/nym/pull/5975
[#5974]: https://github.com/nymtech/nym/pull/5974
[#5964]: https://github.com/nymtech/nym/pull/5964
[#5956]: https://github.com/nymtech/nym/pull/5956
[#5954]: https://github.com/nymtech/nym/pull/5954
[#5952]: https://github.com/nymtech/nym/pull/5952
[#5950]: https://github.com/nymtech/nym/pull/5950
[#5948]: https://github.com/nymtech/nym/pull/5948
[#5945]: https://github.com/nymtech/nym/pull/5945
[#5939]: https://github.com/nymtech/nym/pull/5939
[#5933]: https://github.com/nymtech/nym/pull/5933
[#5931]: https://github.com/nymtech/nym/pull/5931
[#5930]: https://github.com/nymtech/nym/pull/5930
[#5929]: https://github.com/nymtech/nym/pull/5929
[#5928]: https://github.com/nymtech/nym/pull/5928
[#5924]: https://github.com/nymtech/nym/pull/5924
[#5916]: https://github.com/nymtech/nym/pull/5916
[#5911]: https://github.com/nymtech/nym/pull/5911
[#5869]: https://github.com/nymtech/nym/pull/5869
## [2025.16-halloumi] (2025-09-16)
- Backport metadata endpoint ([#6010])
- bugfix: make sure tables are removed in correct order to not trigger FK constraint issue ([#5987])
- chore: move authenticator into gateway crate ([#5982])
- Fix the ns api ci workflow ([#5981])
- Remove freshness check on testrun submit ([#5977])
- Update sysinfo to the latest ([#5976])
- bugfix: manually calculate per node work on rewarded set changes ([#5972])
- fixing the ci for ns agent ([#5965])
- Feature/testing utils ([#5963])
- bugfix: fix ci-build for linux (and use updated runner) ([#5958])
- chore: updated refs to cheddar rev of nym repo ([#5955])
- http api client adjustment ([#5953])
- chore: fix rust 1.89 clippy issues ([#5944])
- Wireguard metadata client library ([#5943])
- chore: remove unused import ([#5942])
- feat: introduce additional checks when attempting to send to bounded channels ([#5941])
- Move credential verifier in peer controller ([#5938])
- change PK/FK on expiration date signatures tables ([#5934])
- Wireguard private metadata ([#5915])
[#6010]: https://github.com/nymtech/nym/pull/6010
[#5987]: https://github.com/nymtech/nym/pull/5987
[#5982]: https://github.com/nymtech/nym/pull/5982
[#5981]: https://github.com/nymtech/nym/pull/5981
[#5977]: https://github.com/nymtech/nym/pull/5977
[#5976]: https://github.com/nymtech/nym/pull/5976
[#5972]: https://github.com/nymtech/nym/pull/5972
[#5965]: https://github.com/nymtech/nym/pull/5965
[#5963]: https://github.com/nymtech/nym/pull/5963
[#5958]: https://github.com/nymtech/nym/pull/5958
[#5955]: https://github.com/nymtech/nym/pull/5955
[#5953]: https://github.com/nymtech/nym/pull/5953
[#5944]: https://github.com/nymtech/nym/pull/5944
[#5943]: https://github.com/nymtech/nym/pull/5943
[#5942]: https://github.com/nymtech/nym/pull/5942
[#5941]: https://github.com/nymtech/nym/pull/5941
[#5938]: https://github.com/nymtech/nym/pull/5938
[#5934]: https://github.com/nymtech/nym/pull/5934
[#5915]: https://github.com/nymtech/nym/pull/5915
## [2025.15-gruyere] (2025-08-20)
- Migrate strum to 0.27.2 ([#5960])
Generated
+781 -256
View File
File diff suppressed because it is too large Load Diff
+28 -19
View File
@@ -31,6 +31,7 @@ members = [
"common/client-libs/mixnet-client",
"common/client-libs/validator-client",
"common/commands",
"common/nym-common",
"common/config",
"common/cosmwasm-smart-contracts/coconut-dkg",
"common/cosmwasm-smart-contracts/contracts-common",
@@ -43,6 +44,7 @@ members = [
"common/cosmwasm-smart-contracts/nym-performance-contract",
"common/cosmwasm-smart-contracts/nym-pool-contract",
"common/cosmwasm-smart-contracts/vesting-contract",
"common/credential-proxy",
"common/credential-storage",
"common/credential-utils",
"common/credential-verification",
@@ -58,6 +60,7 @@ members = [
"common/gateway-stats-storage",
"common/gateway-storage",
"common/http-api-client",
"common/http-api-client-macro",
"common/http-api-common",
"common/inclusion-probability",
"common/ip-packet-requests",
@@ -66,6 +69,8 @@ members = [
"common/network-defaults",
"common/node-tester-utils",
"common/nonexhaustive-delayqueue",
"common/nym-cache",
"common/nym-connection-monitor",
"common/nym-id",
"common/nym-metrics",
"common/nym_offline_compact_ecash",
@@ -84,6 +89,7 @@ members = [
"common/nymsphinx/types",
"common/nyxd-scraper",
"common/pemstore",
"common/registration",
"common/serde-helpers",
"common/service-provider-requests-common",
"common/socks5-client-core",
@@ -91,11 +97,13 @@ members = [
"common/socks5/requests",
"common/statistics",
"common/store-cipher",
"common/task", "common/test-utils",
"common/task",
"common/test-utils",
"common/ticketbooks-merkle",
"common/topology",
"common/tun",
"common/types",
"common/upgrade-mode-check",
"common/verloc",
"common/wasm/client-core",
"common/wasm/storage",
@@ -111,10 +119,12 @@ members = [
"gateway",
"nym-api",
"nym-api/nym-api-requests",
"nym-authenticator-client",
"nym-browser-extension/storage",
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-ip-packet-client",
"nym-network-monitor",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
@@ -123,6 +133,8 @@ members = [
"nym-node/nym-node-metrics",
"nym-node/nym-node-requests",
"nym-outfox",
"nym-registration-client",
"nym-signers-monitor",
"nym-statistics-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
@@ -141,7 +153,6 @@ members = [
# "tools/internal/sdk-version-bump",
"tools/internal/ssl-inject",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/internal/validator-status-check",
"tools/nym-cli",
@@ -154,6 +165,7 @@ members = [
"wasm/mix-fetch",
"wasm/node-tester",
"wasm/zknym-lib",
"nym-gateway-probe"
]
default-members = [
@@ -172,16 +184,16 @@ default-members = [
"tools/nymvisor",
]
exclude = ["explorer", "contracts", "nym-wallet", "cpu-cycles"]
exclude = ["contracts", "nym-wallet", "cpu-cycles"]
[workspace.package]
authors = ["Nym Technologies SA"]
repository = "https://github.com/nymtech/nym"
homepage = "https://nymtech.net"
documentation = "https://nymtech.net"
edition = "2021"
edition = "2024"
license = "Apache-2.0"
rust-version = "1.81"
rust-version = "1.85"
readme = "README.md"
[workspace.dependencies]
@@ -203,7 +215,6 @@ base64 = "0.22.1"
base85rs = "0.1.3"
bincode = "1.3.3"
bip39 = { version = "2.0.0", features = ["zeroize"] }
bit-vec = "0.7.0" # can we unify those?
bitvec = "1.0.0"
blake3 = "1.7.0"
bloomfilter = "3.0.1"
@@ -231,13 +242,11 @@ criterion = "0.5"
csv = "1.3.1"
ctr = "0.9.1"
cupid = "0.6.1"
curve25519-dalek = "4.1"
dashmap = "5.5.3"
# We want https://github.com/DefGuard/wireguard-rs/pull/64 , but there's no crates.io release being pushed out anymore
defguard_wireguard_rs = { git = "https://github.com/DefGuard/wireguard-rs.git", rev = "v0.4.7" }
digest = "0.10.7"
dirs = "5.0"
doc-comment = "0.3"
dirs = "6.0"
dotenvy = "0.15.6"
dyn-clone = "1.0.19"
ecdsa = "0.16"
@@ -253,11 +262,8 @@ futures = "0.3.31"
futures-util = "0.3"
generic-array = "0.14.7"
getrandom = "0.2.10"
getset = "0.1.5"
handlebars = "3.5.5"
headers = "0.4.0"
hex = "0.4.3"
hex-literal = "0.3.3"
hickory-resolver = "0.25"
hkdf = "0.12.3"
hmac = "0.12.1"
@@ -271,20 +277,20 @@ hyper = "1.6.0"
hyper-util = "0.1"
indicatif = "0.18.0"
inquire = "0.6.2"
inventory = "0.3.21"
ip_network = "0.4.1"
ipnetwork = "0.20"
itertools = "0.14.0"
jwt-simple = { version = "0.12.12", default-features = false, features = ["pure-rust"] }
k256 = "0.13"
lazy_static = "1.5.0"
ledger-transport = "0.10.0"
ledger-transport-hid = "0.10.0"
log = "0.4"
maxminddb = "0.23.0"
mime = "0.3.17"
moka = { version = "0.12", features = ["future"] }
nix = "0.27.1"
notify = "5.1.0"
okapi = "0.7.0"
once_cell = "1.21.3"
opentelemetry = "0.19.0"
opentelemetry-jaeger = "0.18.0"
@@ -292,7 +298,7 @@ parking_lot = "0.12.3"
pem = "0.8"
petgraph = "0.6.5"
pin-project = "1.1"
pin-project-lite = "0.2.16"
pnet_packet = "0.35.0"
publicsuffix = "2.3.0"
proc_pidinfo = "0.1.3"
quote = "1"
@@ -300,13 +306,10 @@ rand = "0.8.5"
rand_chacha = "0.3"
rand_core = "0.6.3"
rand_distr = "0.4"
rand_pcg = "0.3.1"
rand_seeder = "0.2.3"
rayon = "1.5.1"
regex = "1.10.6"
reqwest = { version = "0.12.15", default-features = false }
rs_merkle = "1.5.0"
safer-ffi = "0.1.13"
schemars = "0.8.22"
semver = "1.0.26"
serde = "1.0.219"
@@ -317,6 +320,7 @@ serde_json_path = "0.7.2"
serde_repr = "0.1"
serde_with = "3.9.0"
serde_yaml = "0.9.25"
serde_plain = "1.0.2"
sha2 = "0.10.9"
si-scale = "0.2.3"
snow = "0.9.6"
@@ -329,6 +333,7 @@ syn = "2"
sysinfo = "0.37.0"
tap = "1.0.1"
tar = "0.4.44"
test-with = { version = "0.15.4", default-features = false }
tempfile = "3.20"
thiserror = "2.0"
time = "0.3.41"
@@ -348,8 +353,10 @@ tracing-opentelemetry = "0.19.0"
tracing-subscriber = "0.3.19"
tracing-tree = "0.2.2"
tracing-indicatif = "0.3.9"
tracing-test = "0.2.5"
ts-rs = "10.1.0"
tungstenite = { version = "0.20.1", default-features = false }
typed-builder = "0.23.0"
uniffi = "0.29.2"
uniffi_build = "0.29.0"
url = "2.5"
@@ -358,6 +365,7 @@ utoipa-swagger-ui = "8.1"
utoipauto = "0.2"
uuid = "*"
vergen = { version = "=8.3.1", default-features = false }
vergen-gitcl = { version = "1.0.8", default-features = false }
walkdir = "2"
x25519-dalek = "2.0.0"
zeroize = "1.7.0"
@@ -397,18 +405,19 @@ prost = { version = "0.13", default-features = false }
# wasm-related dependencies
gloo-utils = "0.2.0"
gloo-net = "0.6.0"
gloo-timers = "0.3.0"
indexed_db_futures = "0.6.4"
js-sys = "0.3.76"
serde-wasm-bindgen = "0.6.5"
tsify = "0.4.5"
tokio_with_wasm = { version = "0.8.7" }
wasm-bindgen = "0.2.99"
wasm-bindgen-futures = "0.4.49"
wasm-bindgen-test = "0.3.49"
wasmtimer = "0.4.1"
web-sys = "0.3.76"
# for local development:
#[patch.crates-io]
#sphinx-packet = { path = "../sphinx" }
+13 -5
View File
@@ -107,16 +107,16 @@ sdk-wasm-build:
$(MAKE) -C nym-browser-extension/storage wasm-pack
$(MAKE) -C wasm/client
$(MAKE) -C wasm/node-tester
$(MAKE) -C wasm/mix-fetch
# $(MAKE) -C wasm/mix-fetch
$(MAKE) -C wasm/zknym-lib
#$(MAKE) -C wasm/full-nym-wasm
# $(MAKE) -C wasm/full-nym-wasm
# run this from npm/yarn to ensure tools are in the path, e.g. yarn build:sdk from root of repo
sdk-typescript-build:
npx lerna run --scope @nymproject/sdk build --stream
npx lerna run --scope @nymproject/mix-fetch build --stream
npx lerna run --scope @nymproject/node-tester build --stream
yarn --cwd sdk/typescript/codegen/contract-clients build
# npx lerna run --scope @nymproject/mix-fetch build --stream
# npx lerna run --scope @nymproject/node-tester build --stream
# yarn --cwd sdk/typescript/codegen/contract-clients build
# NOTE: These targets are part of the main workspace (but not as wasm32-unknown-unknown)
WASM_CRATES = extension-storage nym-client-wasm nym-node-tester-wasm zknym-lib
@@ -154,6 +154,7 @@ CONTRACTS_OUT_DIR = contracts/artifacts
#
COSMWASM_OPTIMIZER_IMAGE ?= cosmwasm/optimizer:0.17.0
COSMWASM_OPTIMIZER_PLATFORM ?= linux/amd64
COSMWASM_CHECK_IMAGE ?= rust:1.88
# Ensure clean build environment and run the optimizer
optimize-contracts:
@@ -179,6 +180,13 @@ optimize-contracts:
# Cleanup temporary artefacts directory
@rm -rf artifacts 2>/dev/null || true
# Check artifacts with cosmwasm-check inside the optimizer image
docker-check-contracts:
@docker run --rm --platform $(COSMWASM_OPTIMIZER_PLATFORM) \
-v $(CURDIR):/code --workdir /code \
--entrypoint /bin/sh \
$(COSMWASM_CHECK_IMAGE) -lc 'apt-get update && apt-get install -y --no-install-recommends llvm-dev libclang-dev pkg-config && export PATH="/usr/local/cargo/bin:/usr/local/rustup/bin:$$PATH" && cargo install cosmwasm-check --locked && WASMER_ENGINE=universal WASMER_COMPILER=singlepass cosmwasm-check contracts/artifacts/*.wasm'
wasm-opt-contracts:
@for WASM in $(WASM_CONTRACT_DIR)/*.wasm; do \
echo "Running wasm-opt on $$WASM"; \
+2 -2
View File
@@ -1,10 +1,10 @@
[package]
name = "nym-client"
version = "1.1.61"
version = "1.1.64"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
rust-version = "1.70"
rust-version = "1.85"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+15 -10
View File
@@ -11,7 +11,7 @@ use nym_client_core::client::base_client::{
BaseClientBuilder, ClientInput, ClientOutput, ClientState,
};
use nym_sphinx::params::PacketType;
use nym_task::TaskHandle;
use nym_task::ShutdownManager;
use nym_validator_client::QueryHttpRpcNyxdClient;
use std::error::Error;
use std::path::PathBuf;
@@ -29,6 +29,8 @@ pub struct SocketClient {
/// Optional path to a .json file containing standalone network details.
custom_mixnet: Option<PathBuf>,
shutdown_manager: ShutdownManager,
}
impl SocketClient {
@@ -40,6 +42,7 @@ impl SocketClient {
SocketClient {
config,
custom_mixnet,
shutdown_manager: Default::default(),
}
}
@@ -49,7 +52,7 @@ impl SocketClient {
client_output: ClientOutput,
client_state: ClientState,
self_address: &Recipient,
task_client: nym_task::TaskClient,
shutdown_token: nym_task::ShutdownToken,
packet_type: PacketType,
) {
info!("Starting websocket listener...");
@@ -57,6 +60,7 @@ impl SocketClient {
let ClientInput {
connection_command_sender,
input_sender,
..
} = client_input;
let ClientOutput {
@@ -77,24 +81,24 @@ impl SocketClient {
shared_lane_queue_lengths,
reply_controller_sender,
Some(packet_type),
task_client.fork("websocket_handler"),
shutdown_token.clone(),
);
websocket::Listener::new(
config.socket.host,
config.socket.listening_port,
task_client.with_suffix("websocket_listener"),
shutdown_token.child_token(),
)
.start(websocket_handler);
}
/// blocking version of `start_socket` method. Will run forever (or until SIGINT is sent)
pub async fn run_socket_forever(self) -> Result<(), Box<dyn Error + Send + Sync>> {
let shutdown = self.start_socket().await?;
let mut shutdown = self.start_socket().await?;
let res = shutdown.wait_for_shutdown().await;
shutdown.run_until_shutdown().await;
log::info!("Stopping nym-client");
res
Ok(())
}
async fn initialise_storage(&self) -> Result<OnDiskPersistent, ClientError> {
@@ -119,6 +123,7 @@ impl SocketClient {
let mut base_client =
BaseClientBuilder::new(self.config().base(), storage, dkg_query_client)
.with_shutdown(self.shutdown_manager.shutdown_tracker_owned())
.with_user_agent(user_agent);
if let Some(custom_mixnet) = &self.custom_mixnet {
@@ -128,7 +133,7 @@ impl SocketClient {
Ok(base_client)
}
pub async fn start_socket(self) -> Result<TaskHandle, ClientError> {
pub async fn start_socket(self) -> Result<ShutdownManager, ClientError> {
if !self.config.socket.socket_type.is_websocket() {
return Err(ClientError::InvalidSocketMode);
}
@@ -147,13 +152,13 @@ impl SocketClient {
client_output,
client_state,
&self_address,
started_client.task_handle.get_handle(),
self.shutdown_manager.child_shutdown_token(),
packet_type,
);
info!("Client startup finished!");
info!("The address of this client is: {self_address}");
Ok(started_client.task_handle)
Ok(self.shutdown_manager)
}
}
+21 -27
View File
@@ -19,7 +19,7 @@ use nym_sphinx::receiver::ReconstructedMessage;
use nym_task::connections::{
ConnectionCommand, ConnectionCommandSender, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::Instant;
@@ -44,7 +44,7 @@ pub(crate) struct HandlerBuilder {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
task_client: TaskClient,
shutdown_token: ShutdownToken,
}
impl HandlerBuilder {
@@ -57,7 +57,7 @@ impl HandlerBuilder {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
task_client: TaskClient,
shutdown_token: ShutdownToken,
) -> Self {
Self {
msg_input,
@@ -67,14 +67,13 @@ impl HandlerBuilder {
lane_queue_lengths,
reply_controller_sender,
packet_type,
task_client,
shutdown_token,
}
}
// TODO: make sure we only ever have one active handler
pub fn create_active_handler(&self) -> Handler {
let mut task_client = self.task_client.fork("active_handler");
task_client.disarm();
let shutdown_token = self.shutdown_token.clone();
Handler {
msg_input: self.msg_input.clone(),
client_connection_tx: self.client_connection_tx.clone(),
@@ -85,7 +84,7 @@ impl HandlerBuilder {
lane_queue_lengths: self.lane_queue_lengths.clone(),
reply_controller_sender: self.reply_controller_sender.clone(),
packet_type: self.packet_type,
task_client,
shutdown_token,
}
}
}
@@ -100,19 +99,14 @@ pub(crate) struct Handler {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
task_client: TaskClient,
shutdown_token: ShutdownToken,
}
impl Drop for Handler {
fn drop(&mut self) {
if let Err(err) = self
let _ = self
.buffer_requester
.unbounded_send(ReceivedBufferMessage::ReceiverDisconnect)
{
if !self.task_client.is_shutdown_poll() {
error!("failed to disconnect the receiver from the buffer: {err}");
}
}
.unbounded_send(ReceivedBufferMessage::ReceiverDisconnect);
}
}
@@ -142,7 +136,7 @@ impl Handler {
{
Ok(length) => length,
Err(err) => {
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!(
"Failed to get reply queue length for connection {connection_id}: {err}"
);
@@ -192,7 +186,7 @@ impl Handler {
// the ack control is now responsible for chunking, etc.
let input_msg = InputMessage::new_regular(recipient, message, lane, self.packet_type);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send message to the input buffer: {err}");
}
}
@@ -225,7 +219,7 @@ impl Handler {
let input_msg =
InputMessage::new_anonymous(recipient, message, reply_surbs, lane, self.packet_type);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send anonymous message to the input buffer: {err}");
}
}
@@ -253,7 +247,7 @@ impl Handler {
let input_msg = InputMessage::new_reply(recipient_tag, message, lane, self.packet_type);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send reply message to the input buffer: {err}");
}
}
@@ -275,7 +269,7 @@ impl Handler {
.client_connection_tx
.unbounded_send(ConnectionCommand::Close(connection_id))
{
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send close connection command: {err}");
}
}
@@ -394,11 +388,14 @@ impl Handler {
}
async fn listen_for_requests(&mut self, mut msg_receiver: ReconstructedMessagesReceiver) {
let mut task_client = self.task_client.fork("select");
task_client.disarm();
let shutdown_token = self.shutdown_token.clone();
while !task_client.is_shutdown() {
loop {
tokio::select! {
_ = shutdown_token.cancelled() => {
log::trace!("Websocket handler: Received shutdown");
break;
}
// we can either get a client request from the websocket
socket_msg = self.next_websocket_request() => {
if socket_msg.is_none() {
@@ -436,9 +433,6 @@ impl Handler {
break;
}
}
_ = task_client.recv() => {
log::trace!("Websocket handler: Received shutdown");
}
}
}
log::debug!("Websocket handler: Exiting");
@@ -464,7 +458,7 @@ impl Handler {
reconstructed_sender,
))
{
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("failed to announce the receiver to the buffer: {err}");
}
}
+7 -7
View File
@@ -3,7 +3,7 @@
use super::handler::HandlerBuilder;
use log::*;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use std::net::IpAddr;
use std::{net::SocketAddr, process, sync::Arc};
use tokio::io::AsyncWriteExt;
@@ -23,15 +23,15 @@ impl State {
pub(crate) struct Listener {
address: SocketAddr,
state: State,
task_client: TaskClient,
shutdown_token: ShutdownToken,
}
impl Listener {
pub(crate) fn new(host: IpAddr, port: u16, task_client: TaskClient) -> Self {
pub(crate) fn new(host: IpAddr, port: u16, shutdown_token: ShutdownToken) -> Self {
Listener {
address: SocketAddr::new(host, port),
state: State::AwaitingConnection,
task_client,
shutdown_token,
}
}
@@ -46,11 +46,11 @@ impl Listener {
let notify = Arc::new(Notify::new());
while !self.task_client.is_shutdown() {
while !self.shutdown_token.is_cancelled() {
tokio::select! {
// When the handler finishes we check if shutdown is signalled
_ = notify.notified() => {
if self.task_client.is_shutdown() {
if self.shutdown_token.is_cancelled() {
log::trace!("Websocket listener: detected shutdown after connection closed");
break;
}
@@ -59,7 +59,7 @@ impl Listener {
}
// ... but when there is no connected client at the time of shutdown being
// signalled, we handle it here.
_ = self.task_client.recv() => {
_ = self.shutdown_token.cancelled() => {
if !self.state.is_connected() {
log::trace!("Not connected: shutting down");
break;
+2 -2
View File
@@ -1,10 +1,10 @@
[package]
name = "nym-socks5-client"
version = "1.1.61"
version = "1.1.64"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
rust-version = "1.70"
rust-version = "1.85"
license.workspace = true
[dependencies]
+5 -5
View File
@@ -1,8 +1,8 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::channel::mpsc;
use futures::StreamExt;
use futures::channel::mpsc;
use notify::event::{DataChange, MetadataKind, ModifyKind};
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap;
@@ -96,10 +96,10 @@ impl AsyncFileWatcher {
// when testing I was consistently getting two `Modify(Data(Any))` events in quick succession
// (probably to modify content and metadata).
// we really only want to propagate one of them
if let Some(previous) = self.last_received.get(&event.kind) {
if now.duration_since(*previous) < self.tick_duration {
return false;
}
if let Some(previous) = self.last_received.get(&event.kind)
&& now.duration_since(*previous) < self.tick_duration
{
return false;
}
let Some(filters) = &self.filters else {
+2
View File
@@ -13,6 +13,8 @@ base64 = { workspace = true }
bincode = { workspace = true }
rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
semver = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
nym-credentials-interface = { path = "../credentials-interface" }
@@ -0,0 +1,273 @@
// Copyright 2025 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_sphinx::addressing::Recipient;
use nym_wireguard_types::PeerPublicKey;
use crate::{
AuthenticatorVersion, Error,
latest::registration::IpPair,
traits::{FinalMessage, InitMessage, QueryBandwidthMessage, TopUpMessage, Versionable},
v2, v3, v4, v5,
};
// This is very redundant with AuthenticatorRequest and I reckon they could be smooshed.
// It is a bit out of scope for me at the moment though
#[derive(Debug)]
pub enum ClientMessage {
Initial(Box<dyn InitMessage + Send + Sync + 'static>),
Final(Box<dyn FinalMessage + Send + Sync + 'static>),
Query(Box<dyn QueryBandwidthMessage + Send + Sync + 'static>),
TopUp(Box<dyn TopUpMessage + Send + Sync + 'static>),
}
impl ClientMessage {
// check if message is wasteful e.g. contains a credential
pub fn is_wasteful(&self) -> bool {
match self {
Self::Final(msg) => msg.credential().is_some(),
Self::TopUp(_) => true,
Self::Initial(_) | Self::Query(_) => false,
}
}
fn version(&self) -> AuthenticatorVersion {
match self {
ClientMessage::Initial(msg) => msg.version(),
ClientMessage::Final(msg) => msg.version(),
ClientMessage::Query(msg) => msg.version(),
ClientMessage::TopUp(msg) => msg.version(),
}
}
pub fn bytes(&self, reply_to: Recipient) -> Result<(Vec<u8>, u64), Error> {
match self.version() {
AuthenticatorVersion::V1 => Err(Error::UnsupportedVersion),
AuthenticatorVersion::V2 => {
use v2::{
registration::{ClientMac, FinalMessage, GatewayClient, InitMessage},
request::AuthenticatorRequest,
};
match self {
ClientMessage::Initial(init_message) => {
let (req, id) = AuthenticatorRequest::new_initial_request(
InitMessage {
pub_key: init_message.pub_key(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::Final(final_message) => {
let (req, id) = AuthenticatorRequest::new_final_request(
FinalMessage {
gateway_client: GatewayClient {
pub_key: final_message.gateway_client_pub_key(),
private_ip: final_message
.gateway_client_ipv4()
.ok_or(Error::UnsupportedMessage)?
.into(),
mac: ClientMac::new(final_message.gateway_client_mac()),
},
credential: final_message.credential(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::Query(query_message) => {
let (req, id) = AuthenticatorRequest::new_query_request(
query_message.pub_key(),
reply_to,
);
Ok((req.to_bytes()?, id))
}
_ => Err(Error::UnsupportedMessage),
}
}
AuthenticatorVersion::V3 => {
use v3::{
registration::{ClientMac, FinalMessage, GatewayClient, InitMessage},
request::AuthenticatorRequest,
topup::TopUpMessage,
};
match self {
ClientMessage::Initial(init_message) => {
let (req, id) = AuthenticatorRequest::new_initial_request(
InitMessage {
pub_key: init_message.pub_key(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::Final(final_message) => {
let (req, id) = AuthenticatorRequest::new_final_request(
FinalMessage {
gateway_client: GatewayClient {
pub_key: final_message.gateway_client_pub_key(),
private_ip: final_message
.gateway_client_ipv4()
.ok_or(Error::UnsupportedMessage)?
.into(),
mac: ClientMac::new(final_message.gateway_client_mac()),
},
credential: final_message.credential(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::Query(query_message) => {
let (req, id) = AuthenticatorRequest::new_query_request(
query_message.pub_key(),
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::TopUp(top_up_message) => {
let (req, id) = AuthenticatorRequest::new_topup_request(
TopUpMessage {
pub_key: top_up_message.pub_key(),
credential: top_up_message.credential(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
}
}
AuthenticatorVersion::V4 => {
use v4::{
registration::{ClientMac, FinalMessage, GatewayClient, InitMessage},
request::AuthenticatorRequest,
topup::TopUpMessage,
};
match self {
ClientMessage::Initial(init_message) => {
let (req, id) = AuthenticatorRequest::new_initial_request(
InitMessage {
pub_key: init_message.pub_key(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::Final(final_message) => {
let (req, id) = AuthenticatorRequest::new_final_request(
FinalMessage {
gateway_client: GatewayClient {
pub_key: final_message.gateway_client_pub_key(),
private_ips: IpPair {
ipv4: final_message
.gateway_client_ipv4()
.ok_or(Error::UnsupportedMessage)?,
ipv6: final_message
.gateway_client_ipv6()
.ok_or(Error::UnsupportedMessage)?,
}
.into(),
mac: ClientMac::new(final_message.gateway_client_mac()),
},
credential: final_message.credential(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::Query(query_message) => {
let (req, id) = AuthenticatorRequest::new_query_request(
query_message.pub_key(),
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::TopUp(top_up_message) => {
let (req, id) = AuthenticatorRequest::new_topup_request(
TopUpMessage {
pub_key: top_up_message.pub_key(),
credential: top_up_message.credential(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
}
}
AuthenticatorVersion::V5 => {
use v5::{
registration::{ClientMac, FinalMessage, GatewayClient, InitMessage},
request::AuthenticatorRequest,
topup::TopUpMessage,
};
match self {
ClientMessage::Initial(init_message) => {
let (req, id) = AuthenticatorRequest::new_initial_request(InitMessage {
pub_key: init_message.pub_key(),
});
Ok((req.to_bytes()?, id))
}
ClientMessage::Final(final_message) => {
let (req, id) = AuthenticatorRequest::new_final_request(FinalMessage {
gateway_client: GatewayClient {
pub_key: final_message.gateway_client_pub_key(),
private_ips: IpPair {
ipv4: final_message
.gateway_client_ipv4()
.ok_or(Error::UnsupportedMessage)?,
ipv6: final_message
.gateway_client_ipv6()
.ok_or(Error::UnsupportedMessage)?,
},
mac: ClientMac::new(final_message.gateway_client_mac()),
},
credential: final_message.credential(),
});
Ok((req.to_bytes()?, id))
}
ClientMessage::Query(query_message) => {
let (req, id) =
AuthenticatorRequest::new_query_request(query_message.pub_key());
Ok((req.to_bytes()?, id))
}
ClientMessage::TopUp(top_up_message) => {
let (req, id) = AuthenticatorRequest::new_topup_request(TopUpMessage {
pub_key: top_up_message.pub_key(),
credential: top_up_message.credential(),
});
Ok((req.to_bytes()?, id))
}
}
}
AuthenticatorVersion::UNKNOWN => Err(Error::UnknownVersion),
}
}
pub fn use_surbs(&self) -> bool {
use AuthenticatorVersion::*;
match self.version() {
V1 | V2 | V3 | V4 => false,
V5 => true,
UNKNOWN => true,
}
}
}
// Same comment as above struct
#[derive(Debug)]
pub struct QueryMessageImpl {
pub pub_key: PeerPublicKey,
pub version: AuthenticatorVersion,
}
impl Versionable for QueryMessageImpl {
fn version(&self) -> AuthenticatorVersion {
self.version
}
}
impl QueryBandwidthMessage for QueryMessageImpl {
fn pub_key(&self) -> PeerPublicKey {
self.pub_key
}
}
+13 -2
View File
@@ -23,6 +23,17 @@ pub enum Error {
#[error("conversion: {0}")]
Conversion(String),
#[error("failed to serialize response packet: {source}")]
FailedToSerializeResponsePacket { source: Box<bincode::ErrorKind> },
// TODO add version number for debugging
#[error("unknown version number")]
UnknownVersion,
// TODO add version number for debugging
#[error("unsupported request version")]
UnsupportedVersion,
#[error("gateway doesn't support this type of message")]
UnsupportedMessage,
#[error(transparent)]
Bincode(#[from] bincode::Error),
}
+6 -1
View File
@@ -1,6 +1,9 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod client_message;
pub mod request;
pub mod response;
pub mod traits;
pub mod v1;
pub mod v2;
@@ -10,11 +13,13 @@ pub mod v5;
mod error;
mod util;
mod version;
pub use error::Error;
pub use v5 as latest;
pub use version::AuthenticatorVersion;
pub const CURRENT_VERSION: u8 = 5;
pub const CURRENT_VERSION: u8 = latest::VERSION;
fn make_bincode_serializer() -> impl bincode::Options {
use bincode::Options;
@@ -0,0 +1,204 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use nym_sphinx::addressing::Recipient;
use crate::traits::{FinalMessage, InitMessage, QueryBandwidthMessage, TopUpMessage};
use crate::{v1, v2, v3, v4, v5};
#[derive(Debug)]
pub enum AuthenticatorRequest {
Initial {
msg: Box<dyn InitMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
Final {
msg: Box<dyn FinalMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
QueryBandwidth {
msg: Box<dyn QueryBandwidthMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
TopUpBandwidth {
msg: Box<dyn TopUpMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
}
impl From<v1::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v1::request::AuthenticatorRequest) -> Self {
match value.data {
v1::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: Protocol {
version: value.version,
service_provider_type: ServiceProviderType::Authenticator,
},
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v1::request::AuthenticatorRequestData::Final(gateway_client) => Self::Final {
msg: Box::new(gateway_client),
protocol: Protocol {
version: value.version,
service_provider_type: ServiceProviderType::Authenticator,
},
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v1::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: Protocol {
version: value.version,
service_provider_type: ServiceProviderType::Authenticator,
},
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
}
}
impl From<v2::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v2::request::AuthenticatorRequest) -> Self {
match value.data {
v2::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v2::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v2::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
}
}
impl From<v3::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v3::request::AuthenticatorRequest) -> Self {
match value.data {
v3::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v3::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v3::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
v3::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
Self::TopUpBandwidth {
msg: top_up_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
}
}
impl From<v4::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v4::request::AuthenticatorRequest) -> Self {
match value.data {
v4::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v4::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v4::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
v4::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
Self::TopUpBandwidth {
msg: top_up_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
}
}
impl From<v5::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v5::request::AuthenticatorRequest) -> Self {
match value.data {
v5::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
},
v5::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
},
v5::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
}
}
v5::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
Self::TopUpBandwidth {
msg: top_up_message,
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
}
}
}
}
}
@@ -0,0 +1,106 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::traits::{
Id, PendingRegistrationResponse, RegisteredResponse, RemainingBandwidthResponse,
TopUpBandwidthResponse,
};
use crate::{v2, v3, v4, v5};
#[derive(Debug)]
pub enum AuthenticatorResponse {
PendingRegistration(Box<dyn PendingRegistrationResponse + Send + Sync + 'static>),
Registered(Box<dyn RegisteredResponse + Send + Sync + 'static>),
RemainingBandwidth(Box<dyn RemainingBandwidthResponse + Send + Sync + 'static>),
TopUpBandwidth(Box<dyn TopUpBandwidthResponse + Send + Sync + 'static>),
}
impl Id for AuthenticatorResponse {
fn id(&self) -> u64 {
match self {
AuthenticatorResponse::PendingRegistration(pending_registration_response) => {
pending_registration_response.id()
}
AuthenticatorResponse::Registered(registered_response) => registered_response.id(),
AuthenticatorResponse::RemainingBandwidth(remaining_bandwidth_response) => {
remaining_bandwidth_response.id()
}
AuthenticatorResponse::TopUpBandwidth(top_up_bandwidth_response) => {
top_up_bandwidth_response.id()
}
}
}
}
impl From<v2::response::AuthenticatorResponse> for AuthenticatorResponse {
fn from(value: v2::response::AuthenticatorResponse) -> Self {
match value.data {
v2::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response,
) => Self::PendingRegistration(Box::new(pending_registration_response)),
v2::response::AuthenticatorResponseData::Registered(registered_response) => {
Self::Registered(Box::new(registered_response))
}
v2::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response,
) => Self::RemainingBandwidth(Box::new(remaining_bandwidth_response)),
}
}
}
impl From<v3::response::AuthenticatorResponse> for AuthenticatorResponse {
fn from(value: v3::response::AuthenticatorResponse) -> Self {
match value.data {
v3::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response,
) => Self::PendingRegistration(Box::new(pending_registration_response)),
v3::response::AuthenticatorResponseData::Registered(registered_response) => {
Self::Registered(Box::new(registered_response))
}
v3::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response,
) => Self::RemainingBandwidth(Box::new(remaining_bandwidth_response)),
v3::response::AuthenticatorResponseData::TopUpBandwidth(top_up_bandwidth_response) => {
Self::TopUpBandwidth(Box::new(top_up_bandwidth_response))
}
}
}
}
impl From<v4::response::AuthenticatorResponse> for AuthenticatorResponse {
fn from(value: v4::response::AuthenticatorResponse) -> Self {
match value.data {
v4::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response,
) => Self::PendingRegistration(Box::new(pending_registration_response)),
v4::response::AuthenticatorResponseData::Registered(registered_response) => {
Self::Registered(Box::new(registered_response))
}
v4::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response,
) => Self::RemainingBandwidth(Box::new(remaining_bandwidth_response)),
v4::response::AuthenticatorResponseData::TopUpBandwidth(top_up_bandwidth_response) => {
Self::TopUpBandwidth(Box::new(top_up_bandwidth_response))
}
}
}
}
impl From<v5::response::AuthenticatorResponse> for AuthenticatorResponse {
fn from(value: v5::response::AuthenticatorResponse) -> Self {
match value.data {
v5::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response,
) => Self::PendingRegistration(Box::new(pending_registration_response)),
v5::response::AuthenticatorResponseData::Registered(registered_response) => {
Self::Registered(Box::new(registered_response))
}
v5::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response,
) => Self::RemainingBandwidth(Box::new(remaining_bandwidth_response)),
v5::response::AuthenticatorResponseData::TopUpBandwidth(top_up_bandwidth_response) => {
Self::TopUpBandwidth(Box::new(top_up_bandwidth_response))
}
}
}
}
+437 -220
View File
@@ -1,49 +1,105 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::fmt;
use std::net::{Ipv4Addr, Ipv6Addr};
use nym_credentials_interface::CredentialSpendingData;
use nym_crypto::asymmetric::x25519::PrivateKey;
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use nym_sphinx::addressing::clients::Recipient;
use nym_wireguard_types::PeerPublicKey;
use crate::{
v1, v2, v3, v4,
v5::{self, registration::IpPair},
Error,
};
use crate::latest::registration::IpPair;
use crate::{AuthenticatorVersion, Error, v1, v2, v3, v4, v5};
#[derive(Copy, Clone, Debug)]
pub enum AuthenticatorVersion {
V1,
V2,
V3,
V4,
V5,
UNKNOWN,
pub trait Versionable {
fn version(&self) -> AuthenticatorVersion;
}
impl From<Protocol> for AuthenticatorVersion {
fn from(value: Protocol) -> Self {
if value.service_provider_type != ServiceProviderType::Authenticator {
AuthenticatorVersion::UNKNOWN
} else if value.version == v1::VERSION {
AuthenticatorVersion::V1
} else if value.version == v2::VERSION {
AuthenticatorVersion::V2
} else if value.version == v3::VERSION {
AuthenticatorVersion::V3
} else if value.version == v4::VERSION {
AuthenticatorVersion::V4
} else if value.version == v5::VERSION {
AuthenticatorVersion::V5
} else {
AuthenticatorVersion::UNKNOWN
}
impl Versionable for v1::GatewayClient {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V1
}
}
pub trait InitMessage {
impl Versionable for v1::registration::InitMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V1
}
}
impl Versionable for v2::registration::InitMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V2
}
}
impl Versionable for v3::registration::InitMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V3
}
}
impl Versionable for v4::registration::InitMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V4
}
}
impl Versionable for v5::registration::InitMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V5
}
}
impl Versionable for v2::registration::FinalMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V2
}
}
impl Versionable for v3::registration::FinalMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V3
}
}
impl Versionable for v4::registration::FinalMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V4
}
}
impl Versionable for v5::registration::FinalMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V5
}
}
impl Versionable for PeerPublicKey {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V3
}
}
impl Versionable for v3::topup::TopUpMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V3
}
}
impl Versionable for v4::topup::TopUpMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V4
}
}
impl Versionable for v5::topup::TopUpMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V5
}
}
pub trait InitMessage: Versionable + fmt::Debug {
fn pub_key(&self) -> PeerPublicKey;
}
@@ -77,15 +133,18 @@ impl InitMessage for v5::registration::InitMessage {
}
}
pub trait FinalMessage {
fn pub_key(&self) -> PeerPublicKey;
pub trait FinalMessage: Versionable + fmt::Debug {
fn gateway_client_pub_key(&self) -> PeerPublicKey;
fn verify(&self, private_key: &PrivateKey, nonce: u64) -> Result<(), Error>;
fn private_ips(&self) -> IpPair;
fn gateway_client_ipv4(&self) -> Option<Ipv4Addr>;
fn gateway_client_ipv6(&self) -> Option<Ipv6Addr>;
fn gateway_client_mac(&self) -> Vec<u8>;
fn credential(&self) -> Option<CredentialSpendingData>;
}
impl FinalMessage for v1::GatewayClient {
fn pub_key(&self) -> PeerPublicKey {
fn gateway_client_pub_key(&self) -> PeerPublicKey {
self.pub_key
}
@@ -97,13 +156,28 @@ impl FinalMessage for v1::GatewayClient {
self.private_ip.into()
}
fn gateway_client_ipv4(&self) -> Option<Ipv4Addr> {
match self.private_ip {
std::net::IpAddr::V4(ipv4_addr) => Some(ipv4_addr),
std::net::IpAddr::V6(_) => None,
}
}
fn gateway_client_ipv6(&self) -> Option<Ipv6Addr> {
None
}
fn gateway_client_mac(&self) -> Vec<u8> {
self.mac.to_vec()
}
fn credential(&self) -> Option<CredentialSpendingData> {
None
}
}
impl FinalMessage for v2::registration::FinalMessage {
fn pub_key(&self) -> PeerPublicKey {
fn gateway_client_pub_key(&self) -> PeerPublicKey {
self.gateway_client.pub_key
}
@@ -115,13 +189,28 @@ impl FinalMessage for v2::registration::FinalMessage {
self.gateway_client.private_ip.into()
}
fn gateway_client_ipv4(&self) -> Option<Ipv4Addr> {
match self.gateway_client.private_ip {
std::net::IpAddr::V4(ipv4_addr) => Some(ipv4_addr),
std::net::IpAddr::V6(_) => None,
}
}
fn gateway_client_ipv6(&self) -> Option<Ipv6Addr> {
None
}
fn gateway_client_mac(&self) -> Vec<u8> {
self.gateway_client.mac.to_vec()
}
fn credential(&self) -> Option<CredentialSpendingData> {
self.credential.clone()
}
}
impl FinalMessage for v3::registration::FinalMessage {
fn pub_key(&self) -> PeerPublicKey {
fn gateway_client_pub_key(&self) -> PeerPublicKey {
self.gateway_client.pub_key
}
@@ -133,13 +222,28 @@ impl FinalMessage for v3::registration::FinalMessage {
self.gateway_client.private_ip.into()
}
fn gateway_client_ipv4(&self) -> Option<Ipv4Addr> {
match self.gateway_client.private_ip {
std::net::IpAddr::V4(ipv4_addr) => Some(ipv4_addr),
std::net::IpAddr::V6(_) => None,
}
}
fn gateway_client_ipv6(&self) -> Option<Ipv6Addr> {
None
}
fn gateway_client_mac(&self) -> Vec<u8> {
self.gateway_client.mac.to_vec()
}
fn credential(&self) -> Option<CredentialSpendingData> {
self.credential.clone()
}
}
impl FinalMessage for v4::registration::FinalMessage {
fn pub_key(&self) -> PeerPublicKey {
fn gateway_client_pub_key(&self) -> PeerPublicKey {
self.gateway_client.pub_key
}
@@ -151,13 +255,25 @@ impl FinalMessage for v4::registration::FinalMessage {
self.gateway_client.private_ips.into()
}
fn gateway_client_ipv4(&self) -> Option<Ipv4Addr> {
Some(self.gateway_client.private_ips.ipv4)
}
fn gateway_client_ipv6(&self) -> Option<Ipv6Addr> {
Some(self.gateway_client.private_ips.ipv6)
}
fn gateway_client_mac(&self) -> Vec<u8> {
self.gateway_client.mac.to_vec()
}
fn credential(&self) -> Option<CredentialSpendingData> {
self.credential.clone()
}
}
impl FinalMessage for v5::registration::FinalMessage {
fn pub_key(&self) -> PeerPublicKey {
fn gateway_client_pub_key(&self) -> PeerPublicKey {
self.gateway_client.pub_key
}
@@ -169,12 +285,24 @@ impl FinalMessage for v5::registration::FinalMessage {
self.gateway_client.private_ips
}
fn gateway_client_ipv4(&self) -> Option<Ipv4Addr> {
Some(self.gateway_client.private_ips.ipv4)
}
fn gateway_client_ipv6(&self) -> Option<Ipv6Addr> {
Some(self.gateway_client.private_ips.ipv6)
}
fn gateway_client_mac(&self) -> Vec<u8> {
self.gateway_client.mac.to_vec()
}
fn credential(&self) -> Option<CredentialSpendingData> {
self.credential.clone()
}
}
pub trait QueryBandwidthMessage {
pub trait QueryBandwidthMessage: Versionable + fmt::Debug {
fn pub_key(&self) -> PeerPublicKey;
}
@@ -184,7 +312,7 @@ impl QueryBandwidthMessage for PeerPublicKey {
}
}
pub trait TopUpMessage {
pub trait TopUpMessage: Versionable + fmt::Debug {
fn pub_key(&self) -> PeerPublicKey;
fn credential(&self) -> CredentialSpendingData;
}
@@ -219,197 +347,286 @@ impl TopUpMessage for v5::topup::TopUpMessage {
}
}
pub enum AuthenticatorRequest {
Initial {
msg: Box<dyn InitMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
Final {
msg: Box<dyn FinalMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
QueryBandwidth {
msg: Box<dyn QueryBandwidthMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
TopUpBandwidth {
msg: Box<dyn TopUpMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
pub trait Id {
fn id(&self) -> u64;
}
impl From<v1::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v1::request::AuthenticatorRequest) -> Self {
match value.data {
v1::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: Protocol {
version: value.version,
service_provider_type: ServiceProviderType::Authenticator,
},
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v1::request::AuthenticatorRequestData::Final(gateway_client) => Self::Final {
msg: Box::new(gateway_client),
protocol: Protocol {
version: value.version,
service_provider_type: ServiceProviderType::Authenticator,
},
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v1::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: Protocol {
version: value.version,
service_provider_type: ServiceProviderType::Authenticator,
},
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
impl Id for v2::response::PendingRegistrationResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl From<v2::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v2::request::AuthenticatorRequest) -> Self {
match value.data {
v2::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v2::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v2::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
impl Id for v3::response::PendingRegistrationResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl From<v3::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v3::request::AuthenticatorRequest) -> Self {
match value.data {
v3::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v3::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v3::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
v3::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
Self::TopUpBandwidth {
msg: top_up_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
impl Id for v4::response::PendingRegistrationResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl From<v4::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v4::request::AuthenticatorRequest) -> Self {
match value.data {
v4::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v4::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v4::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
v4::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
Self::TopUpBandwidth {
msg: top_up_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
impl Id for v5::response::PendingRegistrationResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl From<v5::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v5::request::AuthenticatorRequest) -> Self {
match value.data {
v5::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
},
v5::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
},
v5::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
}
}
v5::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
Self::TopUpBandwidth {
msg: top_up_message,
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
}
}
}
impl Id for v2::response::RegisteredResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v3::response::RegisteredResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v4::response::RegisteredResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v5::response::RegisteredResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v2::response::RemainingBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v3::response::RemainingBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v4::response::RemainingBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v5::response::RemainingBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v3::response::TopUpBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v4::response::TopUpBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v5::response::TopUpBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
pub trait PendingRegistrationResponse: Id + fmt::Debug {
fn nonce(&self) -> u64;
fn verify(&self, gateway_key: &PrivateKey) -> std::result::Result<(), Error>;
fn pub_key(&self) -> PeerPublicKey;
fn private_ips(&self) -> IpPair;
}
impl PendingRegistrationResponse for v2::response::PendingRegistrationResponse {
fn nonce(&self) -> u64 {
self.reply.nonce
}
fn verify(&self, gateway_key: &PrivateKey) -> std::result::Result<(), Error> {
self.reply.gateway_data.verify(gateway_key, self.nonce())
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.gateway_data.pub_key
}
fn private_ips(&self) -> IpPair {
self.reply.gateway_data.private_ip.into()
}
}
impl PendingRegistrationResponse for v3::response::PendingRegistrationResponse {
fn nonce(&self) -> u64 {
self.reply.nonce
}
fn verify(&self, gateway_key: &PrivateKey) -> std::result::Result<(), Error> {
self.reply.gateway_data.verify(gateway_key, self.nonce())
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.gateway_data.pub_key
}
fn private_ips(&self) -> IpPair {
self.reply.gateway_data.private_ip.into()
}
}
impl PendingRegistrationResponse for v4::response::PendingRegistrationResponse {
fn nonce(&self) -> u64 {
self.reply.nonce
}
fn verify(&self, gateway_key: &PrivateKey) -> std::result::Result<(), Error> {
self.reply.gateway_data.verify(gateway_key, self.nonce())
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.gateway_data.pub_key
}
fn private_ips(&self) -> IpPair {
self.reply.gateway_data.private_ips.into()
}
}
impl PendingRegistrationResponse for v5::response::PendingRegistrationResponse {
fn nonce(&self) -> u64 {
self.reply.nonce
}
fn verify(&self, gateway_key: &PrivateKey) -> std::result::Result<(), Error> {
self.reply.gateway_data.verify(gateway_key, self.nonce())
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.gateway_data.pub_key
}
fn private_ips(&self) -> IpPair {
self.reply.gateway_data.private_ips
}
}
pub trait RegisteredResponse: Id + fmt::Debug {
fn private_ips(&self) -> IpPair;
fn pub_key(&self) -> PeerPublicKey;
fn wg_port(&self) -> u16;
}
impl RegisteredResponse for v2::response::RegisteredResponse {
fn private_ips(&self) -> IpPair {
self.reply.private_ip.into()
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.pub_key
}
fn wg_port(&self) -> u16 {
self.reply.wg_port
}
}
impl RegisteredResponse for v3::response::RegisteredResponse {
fn private_ips(&self) -> IpPair {
self.reply.private_ip.into()
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.pub_key
}
fn wg_port(&self) -> u16 {
self.reply.wg_port
}
}
impl RegisteredResponse for v4::response::RegisteredResponse {
fn private_ips(&self) -> IpPair {
self.reply.private_ips.into()
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.pub_key
}
fn wg_port(&self) -> u16 {
self.reply.wg_port
}
}
impl RegisteredResponse for v5::response::RegisteredResponse {
fn private_ips(&self) -> IpPair {
self.reply.private_ips
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.pub_key
}
fn wg_port(&self) -> u16 {
self.reply.wg_port
}
}
pub trait RemainingBandwidthResponse: Id + fmt::Debug {
fn available_bandwidth(&self) -> Option<i64>;
}
impl RemainingBandwidthResponse for v2::response::RemainingBandwidthResponse {
fn available_bandwidth(&self) -> Option<i64> {
self.reply.as_ref().map(|r| r.available_bandwidth)
}
}
impl RemainingBandwidthResponse for v3::response::RemainingBandwidthResponse {
fn available_bandwidth(&self) -> Option<i64> {
self.reply.as_ref().map(|r| r.available_bandwidth)
}
}
impl RemainingBandwidthResponse for v4::response::RemainingBandwidthResponse {
fn available_bandwidth(&self) -> Option<i64> {
self.reply.as_ref().map(|r| r.available_bandwidth)
}
}
impl RemainingBandwidthResponse for v5::response::RemainingBandwidthResponse {
fn available_bandwidth(&self) -> Option<i64> {
self.reply.as_ref().map(|r| r.available_bandwidth)
}
}
pub trait TopUpBandwidthResponse: Id + fmt::Debug {
fn available_bandwidth(&self) -> i64;
}
impl TopUpBandwidthResponse for v3::response::TopUpBandwidthResponse {
fn available_bandwidth(&self) -> i64 {
self.reply.available_bandwidth
}
}
impl TopUpBandwidthResponse for v4::response::TopUpBandwidthResponse {
fn available_bandwidth(&self) -> i64 {
self.reply.available_bandwidth
}
}
impl TopUpBandwidthResponse for v5::response::TopUpBandwidthResponse {
fn available_bandwidth(&self) -> i64 {
self.reply.available_bandwidth
}
}
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{engine::general_purpose, Engine};
use base64::{Engine, engine::general_purpose};
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{engine::general_purpose, Engine};
use base64::{Engine, engine::general_purpose};
use nym_credentials_interface::CredentialSpendingData;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{engine::general_purpose, Engine};
use base64::{Engine, engine::general_purpose};
use nym_credentials_interface::CredentialSpendingData;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{engine::general_purpose, Engine};
use base64::{Engine, engine::general_purpose};
use nym_credentials_interface::CredentialSpendingData;
use nym_network_defaults::constants::{WG_TUN_DEVICE_IP_ADDRESS_V4, WG_TUN_DEVICE_IP_ADDRESS_V6};
use nym_wireguard_types::PeerPublicKey;
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{engine::general_purpose, Engine};
use base64::{Engine, engine::general_purpose};
use nym_credentials_interface::CredentialSpendingData;
use nym_network_defaults::constants::{WG_TUN_DEVICE_IP_ADDRESS_V4, WG_TUN_DEVICE_IP_ADDRESS_V6};
use nym_wireguard_types::PeerPublicKey;
@@ -0,0 +1,195 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::{v1, v2, v3, v4, v5};
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
#[derive(Copy, Clone, Debug, PartialEq, strum_macros::Display)]
#[strum(serialize_all = "snake_case")]
pub enum AuthenticatorVersion {
/// introduced in wispa release (1.1.5)
V1,
/// introduced in aero release (1.1.9)
V2,
/// introduced in magura release (1.1.10)
V3,
/// introduced in crunch release (1.2.0)
V4,
/// introduced in dorina-patched release (1.6.1)
V5,
UNKNOWN,
}
impl AuthenticatorVersion {
pub const LATEST: Self = Self::V5;
pub const fn release_version(&self) -> semver::Version {
match self {
AuthenticatorVersion::V1 => semver::Version::new(1, 1, 5),
AuthenticatorVersion::V2 => semver::Version::new(1, 1, 9),
AuthenticatorVersion::V3 => semver::Version::new(1, 1, 10),
AuthenticatorVersion::V4 => semver::Version::new(1, 2, 0),
AuthenticatorVersion::V5 => semver::Version::new(1, 6, 1),
AuthenticatorVersion::UNKNOWN => semver::Version::new(0, 0, 0),
}
}
}
impl From<Protocol> for AuthenticatorVersion {
fn from(value: Protocol) -> Self {
if value.service_provider_type != ServiceProviderType::Authenticator {
AuthenticatorVersion::UNKNOWN
} else if value.version == v1::VERSION {
AuthenticatorVersion::V1
} else if value.version == v2::VERSION {
AuthenticatorVersion::V2
} else if value.version == v3::VERSION {
AuthenticatorVersion::V3
} else if value.version == v4::VERSION {
AuthenticatorVersion::V4
} else if value.version == v5::VERSION {
AuthenticatorVersion::V5
} else {
AuthenticatorVersion::UNKNOWN
}
}
}
impl From<u8> for AuthenticatorVersion {
fn from(value: u8) -> Self {
if value == v1::VERSION {
AuthenticatorVersion::V1
} else if value == v2::VERSION {
AuthenticatorVersion::V2
} else if value == v3::VERSION {
AuthenticatorVersion::V3
} else if value == v4::VERSION {
AuthenticatorVersion::V4
} else if value == v5::VERSION {
AuthenticatorVersion::V5
} else {
AuthenticatorVersion::UNKNOWN
}
}
}
impl From<&str> for AuthenticatorVersion {
fn from(value: &str) -> Self {
let Ok(semver) = semver::Version::parse(value) else {
return Self::UNKNOWN;
};
semver.into()
}
}
impl From<Option<&String>> for AuthenticatorVersion {
fn from(value: Option<&String>) -> Self {
match value {
None => Self::UNKNOWN,
Some(value) => value.as_str().into(),
}
}
}
impl From<String> for AuthenticatorVersion {
fn from(value: String) -> Self {
Self::from(value.as_str())
}
}
impl From<Option<String>> for AuthenticatorVersion {
fn from(value: Option<String>) -> Self {
value.as_ref().into()
}
}
impl From<semver::Version> for AuthenticatorVersion {
fn from(semver: semver::Version) -> Self {
if semver < AuthenticatorVersion::V1.release_version() {
return Self::UNKNOWN;
}
if semver < AuthenticatorVersion::V2.release_version() {
return Self::V1;
}
if semver < AuthenticatorVersion::V3.release_version() {
return Self::V2;
}
if semver < AuthenticatorVersion::V4.release_version() {
return Self::V3;
}
if semver < AuthenticatorVersion::V5.release_version() {
return Self::V4;
}
// if provided version is higher (or equal) to release version of V5,
// we return the latest (i.e. v5)
debug_assert_eq!(
Self::V5,
Self::LATEST,
"a new AuthenticatorVersion variant has been introduced without adjusting the `From<semver::Version>` trait"
);
Self::LATEST
}
}
#[cfg(test)]
mod tests {
use super::super::latest;
use super::*;
#[test]
fn strum_display() {
// sanity check on formatting and casing
assert_eq!("v1", AuthenticatorVersion::V1.to_string());
assert_eq!("v2", AuthenticatorVersion::V2.to_string());
assert_eq!("unknown", AuthenticatorVersion::UNKNOWN.to_string());
}
#[test]
fn u8_conversion() {
assert_eq!(AuthenticatorVersion::V1, AuthenticatorVersion::from(1u8));
assert_eq!(AuthenticatorVersion::V2, AuthenticatorVersion::from(2u8));
assert_eq!(
AuthenticatorVersion::UNKNOWN,
AuthenticatorVersion::from(latest::VERSION + 1)
);
assert_eq!(
AuthenticatorVersion::UNKNOWN,
AuthenticatorVersion::from(0u8)
);
assert_eq!(
AuthenticatorVersion::UNKNOWN,
AuthenticatorVersion::from(255u8)
);
}
#[test]
fn semver_checks() {
assert_eq!(AuthenticatorVersion::UNKNOWN, "1.1.4".into());
assert_eq!(AuthenticatorVersion::UNKNOWN, "0.1.0".into());
assert_eq!(AuthenticatorVersion::UNKNOWN, "1.0.4".into());
assert_eq!(AuthenticatorVersion::V1, "1.1.5".into());
assert_eq!(AuthenticatorVersion::V1, "1.1.6".into());
assert_eq!(AuthenticatorVersion::V1, "1.1.8".into());
assert_eq!(AuthenticatorVersion::V2, "1.1.9".into());
assert_eq!(AuthenticatorVersion::V3, "1.1.10".into());
assert_eq!(AuthenticatorVersion::V3, "1.1.11".into());
assert_eq!(AuthenticatorVersion::V3, "1.1.60".into());
assert_eq!(AuthenticatorVersion::V4, "1.2.0".into());
assert_eq!(AuthenticatorVersion::V4, "1.2.1".into());
assert_eq!(AuthenticatorVersion::V4, "1.5.1".into());
assert_eq!(AuthenticatorVersion::V4, "1.6.0".into());
assert_eq!(AuthenticatorVersion::V5, "1.6.1".into());
assert_eq!(AuthenticatorVersion::V5, "1.6.11".into());
assert_eq!(AuthenticatorVersion::V5, "1.7.0".into());
assert_eq!(AuthenticatorVersion::V5, "1.16.11".into());
assert_eq!(AuthenticatorVersion::V5, "1.17.0".into());
}
}
+1
View File
@@ -7,6 +7,7 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = { workspace = true }
bip39 = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
+2
View File
@@ -23,10 +23,12 @@ use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
pub use event::BandwidthStatusMessage;
pub use traits::{BandwidthTicketProvider, DEFAULT_TICKETS_TO_SPEND};
pub mod acquire;
pub mod error;
mod event;
mod traits;
mod utils;
#[derive(Debug)]
+42
View File
@@ -0,0 +1,42 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use nym_credential_storage::storage::Storage;
use nym_credentials_interface::TicketType;
use nym_crypto::asymmetric::ed25519;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use crate::{error::BandwidthControllerError, BandwidthController, PreparedCredential};
pub const DEFAULT_TICKETS_TO_SPEND: u32 = 1;
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait BandwidthTicketProvider: Send + Sync {
async fn get_ecash_ticket(
&self,
ticket_type: TicketType,
gateway_id: ed25519::PublicKey,
tickets_to_spend: u32,
) -> Result<PreparedCredential, BandwidthControllerError>;
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<C, St> BandwidthTicketProvider for BandwidthController<C, St>
where
C: DkgQueryClient + Sync + Send,
St: nym_credential_storage::storage::Storage,
<St as Storage>::StorageError: Send + Sync + 'static,
{
async fn get_ecash_ticket(
&self,
ticket_type: TicketType,
gateway_id: ed25519::PublicKey,
tickets_to_spend: u32,
) -> Result<PreparedCredential, BandwidthControllerError> {
self.prepare_ecash_ticket(ticket_type, gateway_id.to_bytes(), tickets_to_spend)
.await
}
}
+1 -1
View File
@@ -13,7 +13,7 @@ use nym_credentials_interface::{
};
use nym_ecash_time::Date;
use nym_validator_client::coconut::all_ecash_api_clients;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nym_api::{EpochId, NymApiClientExt};
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use nym_validator_client::EcashApiClient;
use rand::prelude::SliceRandom;
+2 -2
View File
@@ -1,8 +1,8 @@
use clap::Args;
use clap::builder::Command;
use clap::clap_derive::ValueEnum;
use clap::Args;
use clap_complete::generator::generate;
use clap_complete::Shell as ClapShell;
use clap_complete::generator::generate;
use std::io;
pub fn fig_generate(command: &mut Command, name: &str) {
+8 -4
View File
@@ -3,7 +3,7 @@ name = "nym-client-core"
version = "1.1.15"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
edition = "2021"
rust-version = "1.76"
rust-version = "1.85"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -36,7 +36,7 @@ nym-bandwidth-controller = { path = "../bandwidth-controller" }
nym-crypto = { path = "../crypto" }
nym-gateway-client = { path = "../client-libs/gateway-client" }
nym-gateway-requests = { path = "../gateway-requests" }
nym-http-api-client = { path = "../http-api-client" }
nym-http-api-client = { path = "../http-api-client", features = ["network-defaults"] }
nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
nym-statistics-common = { path = "../statistics" }
@@ -53,6 +53,7 @@ nym-client-core-config-types = { path = "./config-types", features = [
nym-client-core-surb-storage = { path = "./surb-storage" }
nym-client-core-gateways-storage = { path = "./gateways-storage" }
nym-ecash-time = { path = "../ecash-time" }
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
nym-mixnet-client = { path = "../client-libs/mixnet-client", default-features = false }
@@ -68,7 +69,6 @@ workspace = true
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.hyper-util]
workspace = true
features = ["tokio"]
###
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
workspace = true
@@ -102,7 +102,7 @@ workspace = true
features = ["tokio"]
[target."cfg(target_arch = \"wasm32\")".dependencies.gloo-timers]
version = "0.3.0"
workspace = true
features = ["futures"]
[target."cfg(target_arch = \"wasm32\")".dependencies.wasm-utils]
@@ -113,6 +113,10 @@ features = ["websocket"]
workspace = true
features = ["wasm-bindgen"]
[target."cfg(target_arch = \"wasm32\")".dependencies.tokio_with_wasm]
workspace = true
features = ["full"]
[dev-dependencies]
tempfile = { workspace = true }
@@ -707,13 +707,10 @@ pub struct DebugConfig {
/// Defines all configuration options related to reply SURBs.
pub reply_surbs: ReplySurbs,
/// Defines all configuration options related to stats reporting.
pub stats_reporting: StatsReporting,
/// Defines all configuration options related to the forget me flag.
pub forget_me: ForgetMe,
/// Defines all configuration options related to the remember me flag.
pub remember_me: RememberMe,
}
@@ -543,10 +543,8 @@ pub struct DebugConfigV6 {
/// Defines all configuration options related to reply SURBs.
pub reply_surbs: ReplySurbsV6,
/// Defines all configuration options related to stats reporting.
pub stats_reporting: StatsReportingV6,
/// Defines all configuration options related to the forget me flag.
pub forget_me: ForgetMeV6,
@@ -31,7 +31,6 @@ impl StorageManager {
}
})?;
}
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
@@ -114,13 +114,12 @@ where
})?;
hardcoded_topology.entry_capable_nodes().cloned().collect()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::gateways_for_init(
&mut rng,
&core.client.nym_api_urls,
user_agent,
core.debug.topology.minimum_gateway_performance,
core.debug.topology.ignore_ingress_epoch_role,
None,
)
.await?
};
@@ -173,13 +173,12 @@ where
})?;
hardcoded_topology.entry_capable_nodes().cloned().collect()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::gateways_for_init(
&mut rng,
&core.client.nym_api_urls,
user_agent,
core.debug.topology.minimum_gateway_performance,
core.debug.topology.ignore_ingress_epoch_role,
None,
)
.await?
};
+350 -95
View File
@@ -7,11 +7,12 @@ use super::statistics_control::StatisticsControl;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
use crate::client::event_control::EventControl;
use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
use crate::client::real_messages_control;
use crate::client::real_messages_control::RealMessagesController;
use crate::client::received_buffer::{
@@ -27,13 +28,13 @@ use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use crate::client::topology_control::{
TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
};
use crate::config;
use crate::config::{Config, DebugConfig};
use crate::error::ClientCoreError;
use crate::init::{
setup_gateway,
types::{GatewaySetup, InitialisationResult},
};
use crate::{config, spawn_future};
use futures::channel::mpsc;
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_config_types::{ForgetMe, RememberMe};
@@ -48,16 +49,15 @@ use nym_gateway_client::{
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_sphinx::params::PacketType;
use nym_sphinx::receiver::{ReconstructedMessage, SphinxMessageReceiver};
use nym_statistics_common::clients::ClientStatsSender;
use nym_statistics_common::generate_client_stats_id;
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use nym_task::{TaskClient, TaskHandle};
use nym_task::ShutdownTracker;
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_validator_client::nym_api::NymApiClientExt;
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, NymApiClient, UserAgent};
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
use rand::prelude::SliceRandom;
use rand::rngs::OsRng;
use rand::thread_rng;
@@ -67,9 +67,16 @@ use std::path::Path;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::mpsc::Sender;
use tracing::*;
use url::Url;
#[cfg(target_arch = "wasm32")]
#[cfg(debug_assertions)]
use wasm_utils::console_log;
/// Default number of retries for Nym API requests when using network details with domain fronting.
/// This allows the client to try alternative URLs if the primary endpoint is unavailable.
const DEFAULT_NYM_API_RETRIES: usize = 3;
#[cfg(all(
not(target_arch = "wasm32"),
feature = "fs-surb-storage",
@@ -80,10 +87,28 @@ pub mod non_wasm_helpers;
pub mod helpers;
pub mod storage;
#[derive(Clone, Copy, Debug)]
pub enum MixnetClientEvent {
Traffic(MixTrafficEvent),
}
pub type EventReceiver = mpsc::UnboundedReceiver<MixnetClientEvent>;
#[derive(Clone)]
pub struct EventSender(pub mpsc::UnboundedSender<MixnetClientEvent>);
impl EventSender {
pub fn send(&self, event: MixnetClientEvent) {
if let Err(err) = self.0.unbounded_send(event) {
tracing::warn!("Failed to send error event. The caller event reader was closed: {err}");
}
}
}
#[derive(Clone)]
pub struct ClientInput {
pub connection_command_sender: ConnectionCommandSender,
pub input_sender: InputMessageSender,
pub client_request_sender: ClientRequestSender,
}
impl ClientInput {
@@ -95,7 +120,6 @@ impl ClientInput {
}
}
#[derive(Clone)]
pub struct ClientOutput {
pub received_buffer_request_sender: ReceivedBufferRequestSender,
}
@@ -192,10 +216,14 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
client_store: S,
dkg_query_client: Option<C>,
// Optional API URLs for domain fronting support
nym_api_urls: Option<Vec<nym_network_defaults::ApiUrl>>,
wait_for_gateway: bool,
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
shutdown: Option<TaskClient>,
shutdown: Option<ShutdownTracker>,
event_tx: Option<EventSender>,
user_agent: Option<UserAgent>,
setup_method: GatewaySetup,
@@ -220,10 +248,12 @@ where
config: base_config,
client_store,
dkg_query_client,
nym_api_urls: None,
wait_for_gateway: false,
custom_topology_provider: None,
custom_gateway_transceiver: None,
shutdown: None,
event_tx: None,
user_agent: None,
setup_method: GatewaySetup::MustLoad { gateway_id: None },
#[cfg(unix)]
@@ -241,6 +271,16 @@ where
self
}
/// Set Nym API URLs for domain fronting support.
///
/// When provided, the client will use these API URLs (which include front_hosts)
/// to construct HTTP clients with domain fronting enabled.
#[must_use]
pub fn with_nym_api_urls(mut self, nym_api_urls: Vec<nym_network_defaults::ApiUrl>) -> Self {
self.nym_api_urls = Some(nym_api_urls);
self
}
#[must_use]
pub fn with_forget_me(mut self, forget_me: &ForgetMe) -> Self {
self.config.debug.forget_me = *forget_me;
@@ -281,11 +321,17 @@ where
}
#[must_use]
pub fn with_shutdown(mut self, shutdown: TaskClient) -> Self {
pub fn with_shutdown(mut self, shutdown: ShutdownTracker) -> Self {
self.shutdown = Some(shutdown);
self
}
#[must_use]
pub fn with_event_tx(mut self, event_tx: EventSender) -> Self {
self.event_tx = Some(event_tx);
self
}
#[must_use]
pub fn with_user_agent(mut self, user_agent: UserAgent) -> Self {
self.user_agent = Some(user_agent);
@@ -316,6 +362,18 @@ where
details.client_address()
}
fn start_event_control(
parent_event_tx: Option<EventSender>,
children_event_rx: EventReceiver,
shutdown_tracker: &ShutdownTracker,
) {
let event_control = EventControl::new(parent_event_tx, children_event_rx);
shutdown_tracker.try_spawn_named_with_shutdown(
async move { event_control.run().await },
"EventControl",
);
}
// future constantly pumping loop cover traffic at some specified average rate
// the pumped traffic goes to the MixTrafficController
fn start_cover_traffic_stream(
@@ -325,11 +383,11 @@ where
topology_accessor: TopologyAccessor,
mix_tx: BatchMixMessageSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
shutdown_tracker: &ShutdownTracker,
) {
info!("Starting loop cover traffic stream...");
tracing::info!("Starting loop cover traffic stream...");
let stream = LoopCoverTrafficStream::new(
let mut stream = LoopCoverTrafficStream::new(
ack_key,
debug_config.acknowledgements.average_ack_delay,
mix_tx,
@@ -338,10 +396,9 @@ where
debug_config.traffic,
debug_config.cover_traffic,
stats_tx,
task_client,
);
stream.start();
shutdown_tracker
.try_spawn_named_with_shutdown(async move { stream.run().await }, "CoverTrafficStream");
}
#[allow(clippy::too_many_arguments)]
@@ -357,13 +414,12 @@ where
reply_controller_receiver: ReplyControllerReceiver,
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
task_client: TaskClient,
packet_type: PacketType,
stats_tx: ClientStatsSender,
shutdown_tracker: &ShutdownTracker,
) {
info!("Starting real traffic stream...");
tracing::info!("Starting real traffic stream...");
RealMessagesController::new(
let real_messages_controller = RealMessagesController::new(
controller_config,
key_rotation_config,
ack_receiver,
@@ -376,9 +432,63 @@ where
lane_queue_lengths,
client_connection_rx,
stats_tx,
task_client,
)
.start(packet_type);
shutdown_tracker.clone_shutdown_token(),
);
// break out all the subtasks
let (mut out_queue_control, mut reply_controller, ack_controller) =
real_messages_controller.into_tasks();
let (
mut ack_listener,
mut input_listener,
mut retransmission_listener,
mut sent_notification_listener,
mut ack_action_controller,
) = ack_controller.into_tasks();
shutdown_tracker.try_spawn_named(
async move { out_queue_control.run().await },
"RealMessagesController::OutQueueControl",
);
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move { reply_controller.run(shutdown_token).await },
"RealMessagesController::ReplyController",
);
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move { ack_listener.run(shutdown_token).await },
"AcknowledgementController::AcknowledgementListener",
);
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move { input_listener.run(shutdown_token).await },
"AcknowledgementController::InputMessageListener",
);
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move { retransmission_listener.run(shutdown_token).await },
"AcknowledgementController::RetransmissionRequestListener",
);
shutdown_tracker.try_spawn_named_with_shutdown(
async move {
sent_notification_listener.run().await;
},
"AcknowledgementController::SentNotificationListener",
);
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move { ack_action_controller.run(shutdown_token).await },
"AcknowledgementController::ActionController",
);
// .start(packet_type);
}
// buffer controlling all messages fetched from provider
@@ -389,21 +499,29 @@ where
mixnet_receiver: MixnetMessageReceiver,
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
shutdown: TaskClient,
metrics_reporter: ClientStatsSender,
shutdown_tracker: &ShutdownTracker,
) {
info!("Starting received messages buffer controller...");
let controller: ReceivedMessagesBufferController<SphinxMessageReceiver> =
ReceivedMessagesBufferController::new(
local_encryption_keypair,
query_receiver,
mixnet_receiver,
reply_key_storage,
reply_controller_sender,
metrics_reporter,
shutdown,
);
controller.start()
tracing::info!("Starting received messages buffer controller...");
let controller = ReceivedMessagesBufferController::<SphinxMessageReceiver>::new(
local_encryption_keypair,
query_receiver,
mixnet_receiver,
reply_key_storage,
reply_controller_sender,
metrics_reporter,
shutdown_tracker.clone_shutdown_token(),
);
let (mut msg_receiver, mut req_receiver) = controller.into_tasks();
shutdown_tracker.try_spawn_named(
async move { msg_receiver.run().await },
"ReceivedMessagesBufferController::FragmentedMessageReceiver",
);
shutdown_tracker.try_spawn_named(
async move { req_receiver.run().await },
"ReceivedMessagesBufferController::RequestReceiver",
);
}
#[allow(clippy::too_many_arguments)]
@@ -415,7 +533,7 @@ where
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
shutdown: TaskClient,
shutdown_tracker: &ShutdownTracker,
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
where
<S::KeyStore as KeyStore>::StorageError: Send + Sync + 'static,
@@ -434,7 +552,7 @@ where
packet_router,
bandwidth_controller,
stats_reporter,
shutdown,
shutdown_tracker.clone_shutdown_token(),
)
} else {
let cfg = GatewayConfig::new(
@@ -459,7 +577,7 @@ where
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown,
shutdown_tracker.clone_shutdown_token(),
)
};
@@ -495,7 +613,7 @@ where
details_store
.upgrade_stored_remote_gateway_key(gateway_client.gateway_identity(), &updated_key)
.await.map_err(|err| {
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
tracing::error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
})?
}
@@ -522,7 +640,7 @@ where
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
mut shutdown: TaskClient,
shutdown_tracker: &ShutdownTracker,
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
where
<S::KeyStore as KeyStore>::StorageError: Send + Sync + 'static,
@@ -539,7 +657,6 @@ where
Err(ClientCoreError::CustomGatewaySelectionExpected)
} else {
// and make sure to invalidate the task client, so we wouldn't cause premature shutdown
shutdown.disarm();
custom_gateway_transceiver.set_packet_router(packet_router)?;
Ok(custom_gateway_transceiver)
};
@@ -555,7 +672,7 @@ where
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown,
shutdown_tracker,
)
.await?;
@@ -566,7 +683,7 @@ where
custom_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
config_topology: config::Topology,
nym_api_urls: Vec<Url>,
nym_api_client: NymApiClient,
nym_api_client: nym_http_api_client::Client,
) -> Box<dyn TopologyProvider + Send + Sync> {
// if no custom provider was ... provided ..., create one using nym-api
custom_provider.unwrap_or_else(|| {
@@ -586,26 +703,24 @@ where
topology_accessor: TopologyAccessor,
local_gateway: NodeIdentity,
wait_for_gateway: bool,
mut task_client: TaskClient,
shutdown_tracker: &ShutdownTracker,
) -> Result<(), ClientCoreError> {
let topology_refresher_config =
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The background topology refesher is not going to be started");
task_client.disarm();
tracing::info!("The background topology refresher is not going to be started");
}
let mut topology_refresher = TopologyRefresher::new(
topology_refresher_config,
topology_accessor,
topology_provider,
task_client,
);
// before returning, block entire runtime to refresh the current network view so that any
// components depending on topology would see a non-empty view
info!("Obtaining initial network topology");
tracing::info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;
if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
@@ -631,13 +746,13 @@ where
.wait_for_gateway(local_gateway, waiting_timeout)
.await
{
error!(
tracing::error!(
"the gateway did not come back online within the specified timeout: {err}"
);
return Err(err.into());
}
} else {
error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
tracing::error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
return Err(err.into());
}
}
@@ -645,8 +760,11 @@ where
if !topology_config.disable_refreshing {
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
topology_refresher.start();
tracing::info!("Starting topology refresher...");
shutdown_tracker.try_spawn_named_with_shutdown(
async move { topology_refresher.run().await },
"TopologyRefresher",
);
}
Ok(())
@@ -657,9 +775,9 @@ where
user_agent: Option<UserAgent>,
client_stats_id: String,
input_sender: Sender<InputMessage>,
task_client: TaskClient,
shutdown_tracker: &ShutdownTracker,
) -> ClientStatsSender {
info!("Starting statistics control...");
tracing::info!("Starting statistics control...");
StatisticsControl::create_and_start(
config.debug.stats_reporting,
user_agent
@@ -667,18 +785,30 @@ where
.unwrap_or("unknown".to_string()),
client_stats_id,
input_sender.clone(),
task_client,
shutdown_tracker,
)
}
fn start_mix_traffic_controller(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown: TaskClient,
shutdown_tracker: &ShutdownTracker,
event_tx: EventSender,
) -> (BatchMixMessageSender, ClientRequestSender) {
info!("Starting mix traffic controller...");
let (mix_traffic_controller, mix_tx, client_tx) =
MixTrafficController::new(gateway_transceiver, shutdown);
mix_traffic_controller.start();
tracing::info!("Starting mix traffic controller...");
let mut mix_traffic_controller = MixTrafficController::new(
gateway_transceiver,
shutdown_tracker.clone_shutdown_token(),
event_tx,
);
let mix_tx = mix_traffic_controller.mix_tx();
let client_tx = mix_traffic_controller.client_tx();
shutdown_tracker.try_spawn_named(
async move { mix_traffic_controller.run().await },
"MixTrafficController",
);
(mix_tx, client_tx)
}
@@ -686,7 +816,7 @@ where
async fn setup_persistent_reply_storage(
backend: S::ReplyStore,
key_rotation_config: KeyRotationConfig,
shutdown: TaskClient,
shutdown_tracker: &ShutdownTracker,
) -> Result<CombinedReplyStorage, ClientCoreError>
where
<S::ReplyStore as ReplyStorageBackend>::StorageError: Sync + Send,
@@ -711,13 +841,14 @@ where
})?;
let store_clone = mem_store.clone();
spawn_future!(
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move {
persistent_storage
.flush_on_shutdown(store_clone, shutdown)
.flush_on_shutdown(store_clone, shutdown_token)
.await
},
"PersistentReplyStorage::flush_on_shutdown"
"PersistentReplyStorage::flush_on_shutdown",
);
Ok(mem_store)
@@ -735,7 +866,7 @@ where
{
// if client keys do not exist already, create and persist them
if key_store.load_keys().await.is_err() {
info!("could not find valid client keys - a new set will be generated");
tracing::info!("could not find valid client keys - a new set will be generated");
let mut rng = OsRng;
let keys = if let Some(derivation_material) = derivation_material {
ClientKeys::from_master_key(&mut rng, &derivation_material)
@@ -749,21 +880,75 @@ where
setup_gateway(setup_method, key_store, details_store).await
}
fn construct_nym_api_client(config: &Config, user_agent: Option<UserAgent>) -> NymApiClient {
fn construct_nym_api_client(
nym_api_urls: Option<&Vec<nym_network_defaults::ApiUrl>>,
config: &Config,
user_agent: Option<UserAgent>,
) -> Result<nym_http_api_client::Client, ClientCoreError> {
tracing::debug!(
"construct_nym_api_client called with nym_api_urls: {}",
nym_api_urls.is_some()
);
// If API URLs are provided, use new_with_fronted_urls() which handles domain fronting
if let Some(nym_api_urls) = nym_api_urls {
if nym_api_urls.is_empty() {
tracing::warn!("Provided nym_api_urls is empty, falling back to config endpoints");
} else {
tracing::info!(
"Building nym-api client from provided URLs (with domain fronting support): {} URLs",
nym_api_urls.len()
);
let mut builder =
nym_http_api_client::ClientBuilder::new_with_fronted_urls(nym_api_urls.clone())
.map_err(ClientCoreError::from)?
.with_retries(DEFAULT_NYM_API_RETRIES);
if let Some(user_agent) = user_agent {
builder = builder.with_user_agent(user_agent);
}
return builder.build().map_err(ClientCoreError::from);
}
}
// Fallback to basic client for backwards compatibility
tracing::debug!("Building basic nym-api HTTP client from config endpoints");
let mut nym_api_urls = config.get_nym_api_endpoints();
if nym_api_urls.is_empty() {
tracing::warn!("No API endpoints configured in config, this may cause issues");
}
nym_api_urls.shuffle(&mut thread_rng());
// Convert config URLs to ApiUrl format for consistency
let api_urls: Vec<nym_network_defaults::ApiUrl> = nym_api_urls
.into_iter()
.map(|url| nym_network_defaults::ApiUrl {
url: url.to_string(),
front_hosts: None,
})
.collect();
tracing::debug!("Using {} config API endpoints", api_urls.len());
let mut builder = nym_http_api_client::ClientBuilder::new_with_fronted_urls(api_urls)
.map_err(ClientCoreError::from)?
.with_retries(DEFAULT_NYM_API_RETRIES)
.with_bincode();
if let Some(user_agent) = user_agent {
NymApiClient::new_with_user_agent(nym_api_urls[0].clone(), user_agent)
} else {
NymApiClient::new(nym_api_urls[0].clone())
builder = builder.with_user_agent(user_agent);
}
builder.build().map_err(ClientCoreError::from)
}
async fn determine_key_rotation_state(
client: &NymApiClient,
client: &nym_http_api_client::Client,
) -> Result<KeyRotationConfig, ClientCoreError> {
Ok(client.nym_api.get_key_rotation_info().await?.into())
Ok(client.get_key_rotation_info().await?.into())
}
pub async fn start_base(mut self) -> Result<BaseClient, ClientCoreError>
@@ -774,7 +959,12 @@ where
<S::CredentialStore as CredentialStorage>::StorageError: Send + Sync + 'static,
<S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
{
info!("Starting nym client");
tracing::info!("Starting nym client");
#[cfg(debug_assertions)]
#[cfg(target_arch = "wasm32")]
{
console_log!("Starting base Nym Client");
}
// derive (or load) client keys and gateway configuration
let init_res = Self::initialise_keys_and_gateway(
@@ -803,17 +993,22 @@ where
// channels responsible for controlling real messages
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);
// channels responsible for event management
let (event_sender, event_receiver) = mpsc::unbounded();
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
let shared_topology_accessor =
TopologyAccessor::new(self.config.debug.topology.ignore_egress_epoch_role);
// Shutdown notifier for signalling tasks to stop
let shutdown = self
.shutdown
.map(Into::<TaskHandle>::into)
.unwrap_or_default()
.name_if_unnamed("BaseNymClient");
// Create a shutdown tracker for this client - either as a child of provided tracker
// or get one from the registry
let shutdown_tracker = match self.shutdown {
Some(parent_tracker) => parent_tracker.clone(),
None => nym_task::create_sdk_shutdown_tracker()?,
};
Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker);
// channels responsible for dealing with reply-related fun
let (reply_controller_sender, reply_controller_receiver) =
@@ -830,7 +1025,11 @@ where
.dkg_query_client
.map(|client| BandwidthController::new(credential_store, client));
let nym_api_client = Self::construct_nym_api_client(&self.config, self.user_agent.clone());
let nym_api_client = Self::construct_nym_api_client(
self.nym_api_urls.as_ref(),
&self.config,
self.user_agent.clone(),
)?;
let key_rotation_config = Self::determine_key_rotation_state(&nym_api_client).await?;
let topology_provider = Self::setup_topology_provider(
@@ -845,7 +1044,7 @@ where
self.user_agent.clone(),
generate_client_stats_id(*self_address.identity()),
input_sender.clone(),
shutdown.fork("statistics_control"),
&shutdown_tracker.clone(),
);
// needs to be started as the first thing to block if required waiting for the gateway
@@ -855,14 +1054,14 @@ where
shared_topology_accessor.clone(),
self_address.gateway(),
self.wait_for_gateway,
shutdown.fork("topology_refresher"),
&shutdown_tracker.clone(),
)
.await?;
let gateway_packet_router = PacketRouter::new(
ack_sender,
mixnet_messages_sender,
shutdown.get_handle().named("gateway-packet-router"),
shutdown_tracker.clone_shutdown_token(),
);
let gateway_transceiver = Self::setup_gateway_transceiver(
@@ -875,7 +1074,7 @@ where
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
shutdown.fork("gateway_transceiver"),
&shutdown_tracker.clone(),
)
.await?;
let gateway_ws_fd = gateway_transceiver.ws_fd();
@@ -883,7 +1082,7 @@ where
let reply_storage = Self::setup_persistent_reply_storage(
reply_storage_backend,
key_rotation_config,
shutdown.fork("persistent_reply_storage"),
&shutdown_tracker.clone(),
)
.await?;
@@ -893,8 +1092,8 @@ where
mixnet_messages_receiver,
reply_storage.key_storage(),
reply_controller_sender.clone(),
shutdown.fork("received_messages_buffer"),
stats_reporter.clone(),
&shutdown_tracker.clone(),
);
// The message_sender is the transmitter for any component generating sphinx packets
@@ -904,7 +1103,8 @@ where
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
gateway_transceiver,
shutdown.fork("mix_traffic_controller"),
&shutdown_tracker.clone(),
EventSender(event_sender),
);
// Channels that the websocket listener can use to signal downstream to the real traffic
@@ -933,9 +1133,8 @@ where
reply_controller_receiver,
shared_lane_queue_lengths.clone(),
client_connection_rx,
shutdown.fork("real_traffic_controller"),
self.config.debug.traffic.packet_type,
stats_reporter.clone(),
&shutdown_tracker.clone(),
);
if !self
@@ -951,12 +1150,19 @@ where
shared_topology_accessor.clone(),
message_sender,
stats_reporter.clone(),
shutdown.fork("cover_traffic_stream"),
&shutdown_tracker.clone(),
);
}
debug!("Core client startup finished!");
debug!("The address of this client is: {self_address}");
tracing::debug!("Core client startup finished!");
tracing::debug!("The address of this client is: {self_address}");
#[cfg(debug_assertions)]
#[cfg(target_arch = "wasm32")]
{
console_log!("Core client startup finished!");
console_log!("Rust::start_base: the address of this client is: {self_address}");
}
Ok(BaseClient {
address: self_address,
@@ -965,6 +1171,7 @@ where
client_input: ClientInput {
connection_command_sender: client_connection_tx,
input_sender,
client_request_sender,
},
},
client_output: ClientOutputStatus::AwaitingConsumer {
@@ -979,8 +1186,7 @@ where
gateway_connection: GatewayConnection { gateway_ws_fd },
},
stats_reporter,
task_handle: shutdown,
client_request_sender,
shutdown_handle: shutdown_tracker, // The primary tracker for this client
forget_me: self.config.debug.forget_me,
remember_me: self.config.debug.remember_me,
})
@@ -994,8 +1200,57 @@ pub struct BaseClient {
pub client_output: ClientOutputStatus,
pub client_state: ClientState,
pub stats_reporter: ClientStatsSender,
pub client_request_sender: ClientRequestSender,
pub task_handle: TaskHandle,
pub shutdown_handle: ShutdownTracker,
pub forget_me: ForgetMe,
pub remember_me: RememberMe,
}
#[cfg(test)]
mod tests {
use super::*;
use nym_network_defaults::{ApiUrl, NymNetworkDetails};
#[test]
fn test_network_details_with_multiple_urls() {
// Verify that network details can be configured with multiple API URLs
let mut network_details = NymNetworkDetails::new_empty();
network_details.nym_api_urls = Some(vec![
ApiUrl {
url: "https://validator.nymtech.net/api/".to_string(),
front_hosts: None,
},
ApiUrl {
url: "https://nym-frontdoor.vercel.app/api/".to_string(),
front_hosts: Some(vec!["vercel.app".to_string(), "vercel.com".to_string()]),
},
]);
assert_eq!(network_details.nym_api_urls.as_ref().unwrap().len(), 2);
assert!(network_details.nym_api_urls.as_ref().unwrap()[1]
.front_hosts
.is_some());
}
#[test]
fn test_network_details_with_front_hosts() {
// Verify that ApiUrl can store domain fronting configuration
let api_url = ApiUrl {
url: "https://nym-frontdoor.vercel.app/api/".to_string(),
front_hosts: Some(vec!["vercel.app".to_string(), "vercel.com".to_string()]),
};
assert_eq!(api_url.url, "https://nym-frontdoor.vercel.app/api/");
assert_eq!(api_url.front_hosts.as_ref().unwrap().len(), 2);
assert!(api_url
.front_hosts
.as_ref()
.unwrap()
.contains(&"vercel.app".to_string()));
}
#[test]
fn test_default_nym_api_retries_constant() {
// Verify the retry constant is set correctly
assert_eq!(DEFAULT_NYM_API_RETRIES, 3);
}
}
@@ -3,7 +3,7 @@
use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::topology_control::TopologyAccessor;
use crate::{config, spawn_future};
use crate::config;
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use nym_sphinx::acknowledgements::AckKey;
@@ -12,7 +12,6 @@ use nym_sphinx::cover::generate_loop_cover_packet;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_task::TaskClient;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
@@ -69,8 +68,6 @@ where
packet_type: PacketType,
stats_tx: ClientStatsSender,
task_client: TaskClient,
}
impl<R> Stream for LoopCoverTrafficStream<R>
@@ -117,7 +114,6 @@ impl LoopCoverTrafficStream<OsRng> {
traffic_config: config::Traffic,
cover_config: config::CoverTraffic,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
let rng = OsRng;
@@ -137,7 +133,6 @@ impl LoopCoverTrafficStream<OsRng> {
use_legacy_sphinx_format: traffic_config.use_legacy_sphinx_format,
packet_type: traffic_config.packet_type,
stats_tx,
task_client,
}
}
@@ -210,7 +205,7 @@ impl LoopCoverTrafficStream<OsRng> {
TrySendError::Full(_) => {
// This isn't a problem, if the channel is full means we're already sending the
// max amount of messages downstream can handle.
tracing::debug!("Failed to send cover message - channel full");
tracing::trace!("Failed to send cover message - channel full");
}
TrySendError::Closed(_) => {
tracing::warn!("Failed to send cover message - channel closed");
@@ -230,17 +225,24 @@ impl LoopCoverTrafficStream<OsRng> {
// JS: due to identical logical structure to OutQueueControl::on_message(), this is also
// presumably required to prevent bugs in the future. Exact reason is still unknown to me.
// TODO: temporary and BAD workaround for wasm (we should find a way to yield here in wasm)
#[cfg(not(target_arch = "wasm32"))]
tokio::task::yield_now().await;
{
tokio::task::yield_now().await;
}
#[cfg(target_arch = "wasm32")]
{
tokio_with_wasm::task::yield_now().await;
}
}
// it's fine if cover traffic stream task gets killed whilst processing next message
#[allow(clippy::panic)]
pub fn start(mut self) {
pub async fn run(&mut self) {
if self.cover_traffic.disable_loop_cover_traffic_stream {
// we should have never got here in the first place - the task should have never been created to begin with
// so panic and review the code that lead to this branch
panic!("attempted to start LoopCoverTrafficStream while config explicitly disabled it.")
panic!("attempted to run LoopCoverTrafficStream while config explicitly disabled it.")
}
// we should set initial delay only when we actually start the stream
@@ -250,32 +252,11 @@ impl LoopCoverTrafficStream<OsRng> {
);
self.set_next_delay(sampled);
let mut shutdown = self.task_client.fork("select");
while self.next().await.is_some() {
self.on_new_message().await;
}
spawn_future!(
async move {
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
tracing::trace!("LoopCoverTrafficStream: Received shutdown");
}
next = self.next() => {
if next.is_some() {
self.on_new_message().await;
} else {
tracing::trace!("LoopCoverTrafficStream: Stopping since channel closed");
break;
}
}
}
}
shutdown.recv_timeout().await;
tracing::debug!("LoopCoverTrafficStream: Exiting");
},
"LoopCoverTrafficStream"
)
// this should never get triggered
error!("cover traffic stream has been exhausted!")
}
}
@@ -0,0 +1,40 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::StreamExt;
use crate::client::base_client::{EventReceiver, EventSender, MixnetClientEvent};
/// Launches and manages task events, propagating upwards what is not strictly internal.
pub(crate) struct EventControl {
parent_event_tx: Option<EventSender>,
children_event_rx: EventReceiver,
}
impl EventControl {
pub(crate) fn new(
parent_event_tx: Option<EventSender>,
children_event_rx: EventReceiver,
) -> Self {
EventControl {
parent_event_tx,
children_event_rx,
}
}
fn is_internal(event: MixnetClientEvent) -> bool {
match event {
MixnetClientEvent::Traffic(_) => false,
}
}
pub(crate) async fn run(mut self) {
while let Some(event) = self.children_event_rx.next().await {
if let Some(parent_event_tx) = &self.parent_event_tx {
if !Self::is_internal(event) {
parent_event_tx.send(event);
}
}
}
}
}
@@ -2,9 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
#![allow(unused_imports)]
use std::time::Duration;
#[cfg(target_arch = "wasm32")]
pub use wasmtimer::{std::Instant, tokio::*};
pub type IntervalStream = gloo_timers::future::IntervalStream;
@@ -1,12 +1,13 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::error::ClientCoreError;
use crate::spawn_future;
use crate::client::{
base_client::{EventSender, MixnetClientEvent},
mix_traffic::transceiver::GatewayTransceiver,
};
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use tracing::*;
use transceiver::ErasedGatewayError;
@@ -19,33 +20,41 @@ pub mod transceiver;
// We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer.
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32;
const MAX_FAILURE_COUNT: usize = 100;
/// Reduced from 100 to 20 to fail fast (~1-2 seconds instead of ~6 seconds).
/// If we can't send 20 packets in a row, the gateway is unreachable.
const MAX_FAILURE_COUNT: usize = 20;
// that's also disgusting.
pub struct Empty;
#[derive(Clone, Copy, Debug)]
pub enum MixTrafficEvent {
FailedSendingSphinx,
}
pub struct MixTrafficController {
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
mix_tx: BatchMixMessageSender,
mix_rx: BatchMixMessageReceiver,
client_rx: ClientRequestReceiver,
client_tx: ClientRequestSender,
// TODO: this is temporary work-around.
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
consecutive_gateway_failure_count: usize,
task_client: TaskClient,
shutdown_token: ShutdownToken,
event_tx: EventSender,
}
impl MixTrafficController {
pub fn new<T>(
gateway_transceiver: T,
task_client: TaskClient,
) -> (
MixTrafficController,
BatchMixMessageSender,
ClientRequestSender,
)
shutdown_token: ShutdownToken,
event_tx: EventSender,
) -> MixTrafficController
where
T: GatewayTransceiver + Send + 'static,
{
@@ -54,41 +63,32 @@ impl MixTrafficController {
let (client_sender, client_receiver) = tokio::sync::mpsc::channel(8);
(
MixTrafficController {
gateway_transceiver: Box::new(gateway_transceiver),
mix_rx: message_receiver,
client_rx: client_receiver,
consecutive_gateway_failure_count: 0,
task_client,
},
message_sender,
client_sender,
)
MixTrafficController {
gateway_transceiver: Box::new(gateway_transceiver),
mix_tx: message_sender,
mix_rx: message_receiver,
client_rx: client_receiver,
client_tx: client_sender,
consecutive_gateway_failure_count: 0,
shutdown_token,
event_tx,
}
}
pub fn new_dynamic(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
task_client: TaskClient,
) -> (
MixTrafficController,
BatchMixMessageSender,
ClientRequestSender,
) {
let (message_sender, message_receiver) =
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
let (client_sender, client_receiver) = tokio::sync::mpsc::channel(8);
(
MixTrafficController {
gateway_transceiver,
mix_rx: message_receiver,
client_rx: client_receiver,
consecutive_gateway_failure_count: 0,
task_client,
},
message_sender,
client_sender,
)
shutdown_token: ShutdownToken,
event_tx: EventSender,
) -> MixTrafficController {
Self::new(gateway_transceiver, shutdown_token, event_tx)
}
pub fn client_tx(&self) -> ClientRequestSender {
self.client_tx.clone()
}
pub fn mix_tx(&self) -> BatchMixMessageSender {
self.mix_tx.clone()
}
async fn on_messages(
@@ -107,7 +107,7 @@ impl MixTrafficController {
tokio::select! {
biased;
_ = self.task_client.recv() => {
_ = self.shutdown_token.cancelled() => {
trace!("received shutdown while handling messages");
Ok(())
}
@@ -127,7 +127,7 @@ impl MixTrafficController {
async fn on_client_request(&mut self, client_request: ClientRequest) {
tokio::select! {
biased;
_ = self.task_client.recv() => {
_ = self.shutdown_token.cancelled() => {
trace!("received shutdown while handling client request");
}
result = self.gateway_transceiver.send_client_request(client_request) => {
@@ -138,51 +138,42 @@ impl MixTrafficController {
}
}
pub fn start(mut self) {
spawn_future!(
async move {
debug!("Started MixTrafficController with graceful shutdown support");
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("MixTrafficController: Received shutdown");
pub async fn run(&mut self) {
debug!("Started MixTrafficController with graceful shutdown support");
loop {
tokio::select! {
biased;
_ = self.shutdown_token.cancelled() => {
trace!("MixTrafficController: Received shutdown");
break;
}
// mix_rx should never error out as we're holding one instance of the sender
Some(mix_packets) = self.mix_rx.recv() => {
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx));
// IMO it shouldn't be signalled from there but it is how it is
// TODO : report the failure upwards and shutdown from upwards
// Gateway is dead, we have to shut down currently
error!("Signalling shutdown from the MixTrafficController");
self.shutdown_token.cancel();
break;
}
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.task_client.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
break;
}
}
},
None => {
tracing::trace!("MixTrafficController: Stopping since channel closed");
break;
}
},
client_request = self.client_rx.recv() => match client_request {
Some(client_request) => {
self.on_client_request(client_request).await;
},
None => {
tracing::trace!("MixTrafficController, client request channel closed");
}
},
}
},
// client_rx should never error out as we're holding one instance of the sender
Some(client_request) = self.client_rx.recv() => {
self.on_client_request(client_request).await;
}
self.task_client.recv_timeout().await;
tracing::debug!("MixTrafficController: Exiting");
},
"MixTrafficController"
);
}
}
debug!("MixTrafficController: Exiting");
}
}
+1
View File
@@ -3,6 +3,7 @@
pub mod base_client;
pub mod cover_traffic_stream;
pub(crate) mod event_control;
pub(crate) mod helpers;
pub mod inbound_messages;
pub mod key_manager;
@@ -10,18 +10,17 @@ use nym_sphinx::{
acknowledgements::{identifier::recover_identifier, AckKey},
chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID},
};
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use std::sync::Arc;
use tracing::*;
/// Module responsible for listening for any data resembling acknowledgements from the network
/// and firing actions to remove them from the 'Pending' state.
pub(super) struct AcknowledgementListener {
pub(crate) struct AcknowledgementListener {
ack_key: Arc<AckKey>,
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
}
impl AcknowledgementListener {
@@ -30,14 +29,12 @@ impl AcknowledgementListener {
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
AcknowledgementListener {
ack_key,
ack_receiver,
action_sender,
stats_tx,
task_client,
}
}
@@ -68,14 +65,9 @@ impl AcknowledgementListener {
trace!("Received {frag_id} from the mix network");
self.stats_tx
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
if let Err(err) = self
let _ = self
.action_sender
.unbounded_send(Action::new_remove(frag_id))
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send remove action to action controller: {err}");
}
}
.unbounded_send(Action::new_remove(frag_id));
}
async fn handle_ack_receiver_item(&mut self, item: Vec<Vec<u8>>) {
@@ -85,11 +77,16 @@ impl AcknowledgementListener {
}
}
pub(super) async fn run(&mut self) {
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started AcknowledgementListener with graceful shutdown support");
while !self.task_client.is_shutdown() {
loop {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
tracing::trace!("AcknowledgementListener: Received shutdown");
break;
}
acks = self.ack_receiver.next() => match acks {
Some(acks) => self.handle_ack_receiver_item(acks).await,
None => {
@@ -97,12 +94,9 @@ impl AcknowledgementListener {
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("AcknowledgementListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("AcknowledgementListener: Exiting");
}
}
@@ -8,7 +8,7 @@ use futures::StreamExt;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_sphinx::Delay as SphinxDelay;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
@@ -82,7 +82,7 @@ impl Config {
}
}
pub(super) struct ActionController {
pub(crate) struct ActionController {
/// Configurable parameters of the `ActionController`
config: Config,
@@ -102,8 +102,6 @@ pub(super) struct ActionController {
/// Channel for notifying `RetransmissionRequestListener` about expired acknowledgements.
retransmission_sender: RetransmissionRequestSender,
task_client: TaskClient,
}
impl ActionController {
@@ -111,7 +109,6 @@ impl ActionController {
config: Config,
retransmission_sender: RetransmissionRequestSender,
incoming_actions: AckActionReceiver,
task_client: TaskClient,
) -> Self {
ActionController {
config,
@@ -119,7 +116,6 @@ impl ActionController {
pending_acks_timers: NonExhaustiveDelayQueue::new(),
incoming_actions,
retransmission_sender,
task_client,
}
}
@@ -226,14 +222,9 @@ impl ActionController {
// downgrading an arc and then upgrading vs cloning is difference of 30ns vs 15ns
// so it's literally a NO difference while it might prevent us from unnecessarily
// resending data (in maybe 1 in 1 million cases, but it's something)
if let Err(err) = self
let _ = self
.retransmission_sender
.unbounded_send(Arc::downgrade(pending_ack_data))
{
if !self.task_client.is_shutdown_poll() {
tracing::error!("Failed to send pending ack for retransmission: {err}");
}
}
.unbounded_send(Arc::downgrade(pending_ack_data));
} else {
// this shouldn't cause any issues but shouldn't have happened to begin with!
error!("An already removed pending ack has expired")
@@ -251,11 +242,16 @@ impl ActionController {
}
}
pub(super) async fn run(&mut self) {
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started ActionController with graceful shutdown support");
while !self.task_client.is_shutdown() {
loop {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
tracing::trace!("ActionController: Received shutdown");
break;
}
action = self.incoming_actions.next() => match action {
Some(action) => self.process_action(action),
None => {
@@ -272,13 +268,8 @@ impl ActionController {
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("ActionController: Received shutdown");
break;
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("ActionController: Exiting");
}
}
@@ -10,21 +10,20 @@ use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use rand::{CryptoRng, Rng};
use tracing::*;
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
/// putting everything into sphinx packets, etc.
/// It also makes an initial sending attempt for said messages.
pub(super) struct InputMessageListener<R>
pub(crate) struct InputMessageListener<R>
where
R: CryptoRng + Rng,
{
input_receiver: InputMessageReceiver,
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
}
impl<R> InputMessageListener<R>
@@ -38,13 +37,11 @@ where
input_receiver: InputMessageReceiver,
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
) -> Self {
InputMessageListener {
input_receiver,
message_handler,
reply_controller_sender,
task_client,
}
}
@@ -68,14 +65,9 @@ where
max_retransmissions: Option<u32>,
) {
// offload reply handling to the dedicated task
if let Err(err) =
let _ =
self.reply_controller_sender
.send_reply(recipient_tag, data, lane, max_retransmissions)
{
if !self.task_client.is_shutdown_poll() {
error!("failed to send a reply - {err}");
}
}
.send_reply(recipient_tag, data, lane, max_retransmissions);
}
async fn handle_plain_message(
@@ -221,13 +213,13 @@ where
};
}
pub(super) async fn run(&mut self) {
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started InputMessageListener with graceful shutdown support");
while !self.task_client.is_shutdown() {
loop {
tokio::select! {
biased;
_ = self.task_client.recv() => {
_ = shutdown_token.cancelled() => {
tracing::trace!("InputMessageListener: Received shutdown");
break;
}
@@ -243,7 +235,6 @@ where
}
}
self.task_client.recv_timeout().await;
tracing::debug!("InputMessageListener: Exiting");
}
}
@@ -10,7 +10,6 @@ use self::{
use crate::client::inbound_messages::InputMessageReceiver;
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::replies::reply_controller::ReplyControllerSender;
use crate::spawn_future;
use action_controller::AckActionReceiver;
use futures::channel::mpsc;
use nym_gateway_client::AcknowledgementReceiver;
@@ -23,13 +22,11 @@ use nym_sphinx::{
Delay as SphinxDelay,
};
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
use std::{
sync::{Arc, Weak},
time::Duration,
};
use tracing::*;
pub(crate) use action_controller::{AckActionSender, Action};
@@ -190,6 +187,9 @@ pub(super) struct Config {
/// Predefined packet size used for the encapsulated messages.
packet_size: PacketSize,
/// Type of packets used for retransmissions
packet_type: PacketType,
}
impl Config {
@@ -197,12 +197,14 @@ impl Config {
maximum_retransmissions: Option<u32>,
ack_wait_addition: Duration,
ack_wait_multiplier: f64,
packet_type: PacketType,
) -> Self {
Config {
maximum_retransmissions,
ack_wait_addition,
ack_wait_multiplier,
packet_size: Default::default(),
packet_type,
}
}
@@ -212,7 +214,7 @@ impl Config {
}
}
pub(super) struct AcknowledgementController<R>
pub(crate) struct AcknowledgementController<R>
where
R: CryptoRng + Rng,
{
@@ -234,7 +236,6 @@ where
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
let (retransmission_tx, retransmission_rx) = mpsc::unbounded();
@@ -244,7 +245,6 @@ where
action_config,
retransmission_tx,
connectors.ack_action_receiver,
task_client.fork("action_controller"),
);
// will listen for any acks coming from the network
@@ -253,7 +253,6 @@ where
connectors.ack_receiver,
connectors.ack_action_sender.clone(),
stats_tx,
task_client.fork("acknowledgement_listener"),
);
// will listen for any new messages from the client
@@ -261,7 +260,6 @@ where
connectors.input_receiver,
message_handler.clone(),
reply_controller_sender.clone(),
task_client.fork("input_message_listener"),
);
// will listen for any ack timeouts and trigger retransmission
@@ -271,16 +269,13 @@ where
message_handler,
retransmission_rx,
reply_controller_sender,
task_client.fork("retransmission_request_listener"),
config.packet_type,
);
// will listen for events indicating the packet was sent through the network so that
// the retransmission timer should be started.
let sent_notification_listener = SentNotificationListener::new(
connectors.sent_notifier,
connectors.ack_action_sender,
task_client.with_suffix("sent_notification_listener"),
);
let sent_notification_listener =
SentNotificationListener::new(connectors.sent_notifier, connectors.ack_action_sender);
AcknowledgementController {
acknowledgement_listener,
@@ -291,51 +286,21 @@ where
}
}
pub(super) fn start(self, packet_type: PacketType) {
let mut acknowledgement_listener = self.acknowledgement_listener;
let mut input_message_listener = self.input_message_listener;
let mut retransmission_request_listener = self.retransmission_request_listener;
let mut sent_notification_listener = self.sent_notification_listener;
let mut action_controller = self.action_controller;
spawn_future!(
async move {
acknowledgement_listener.run().await;
debug!("The acknowledgement listener has finished execution!");
},
"AcknowledgementController::AcknowledgementListener"
);
spawn_future!(
async move {
input_message_listener.run().await;
debug!("The input listener has finished execution!");
},
"AcknowledgementController::InputMessageListener"
);
spawn_future!(
async move {
retransmission_request_listener.run(packet_type).await;
debug!("The retransmission request listener has finished execution!");
},
"AcknowledgementController::RetransmissionRequestListener"
);
spawn_future!(
async move {
sent_notification_listener.run().await;
debug!("The sent notification listener has finished execution!");
},
"AcknowledgementController::SentNotificationListener"
);
spawn_future!(
async move {
action_controller.run().await;
debug!("The controller has finished execution!");
},
"AcknowledgementController::ActionController"
);
pub(crate) fn into_tasks(
self,
) -> (
AcknowledgementListener,
InputMessageListener<R>,
RetransmissionRequestListener<R>,
SentNotificationListener,
ActionController,
) {
(
self.acknowledgement_listener,
self.input_message_listener,
self.retransmission_request_listener,
self.sent_notification_listener,
self.action_controller,
)
}
}
@@ -13,19 +13,19 @@ use futures::StreamExt;
use nym_sphinx::chunking::fragment::Fragment;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::{addressing::clients::Recipient, params::PacketType};
use nym_task::{connections::TransmissionLane, TaskClient};
use nym_task::{connections::TransmissionLane, ShutdownToken};
use rand::{CryptoRng, Rng};
use std::sync::{Arc, Weak};
use tracing::*;
// responsible for packet retransmission upon fired timer
pub(super) struct RetransmissionRequestListener<R> {
pub(crate) struct RetransmissionRequestListener<R> {
maximum_retransmissions: Option<u32>,
action_sender: AckActionSender,
message_handler: MessageHandler<R>,
request_receiver: RetransmissionRequestReceiver,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
packet_type: PacketType,
}
impl<R> RetransmissionRequestListener<R>
@@ -38,7 +38,7 @@ where
message_handler: MessageHandler<R>,
request_receiver: RetransmissionRequestReceiver,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
packet_type: PacketType,
) -> Self {
RetransmissionRequestListener {
maximum_retransmissions,
@@ -46,7 +46,7 @@ where
message_handler,
request_receiver,
reply_controller_sender,
task_client,
packet_type,
}
}
@@ -67,7 +67,6 @@ where
async fn on_retransmission_request(
&mut self,
weak_timed_out_ack: Weak<PendingAcknowledgement>,
packet_type: PacketType,
) {
let timed_out_ack = match weak_timed_out_ack.upgrade() {
Some(timed_out_ack) => timed_out_ack,
@@ -97,22 +96,18 @@ where
} => {
// if this is retransmission for reply, offload it to the dedicated task
// that deals with all the surbs
if let Err(err) = self.reply_controller_sender.send_retransmission_data(
let _ = self.reply_controller_sender.send_retransmission_data(
*recipient_tag,
weak_timed_out_ack,
*extra_surb_request,
) {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send retransmission data to the reply controller: {err}");
}
}
);
return;
}
PacketDestination::KnownRecipient(recipient) => {
self.prepare_normal_retransmission_chunk(
**recipient,
timed_out_ack.message_chunk.clone(),
packet_type,
self.packet_type,
)
.await
}
@@ -153,14 +148,9 @@ where
// is sent to the `OutQueueControl` and has gone through its internal queue
// with the additional poisson delay.
// And since Actions are executed in order `UpdateTimer` will HAVE TO be executed before `StartTimer`
if let Err(err) = self
let _ = self
.action_sender
.unbounded_send(Action::new_update_pending_ack(frag_id, new_delay))
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send update pending ack action to the controller: {err}");
}
}
.unbounded_send(Action::new_update_pending_ack(frag_id, new_delay));
// send to `OutQueueControl` to eventually send to the mix network
self.message_handler
@@ -174,18 +164,18 @@ where
.await
}
pub(super) async fn run(&mut self, packet_type: PacketType) {
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started RetransmissionRequestListener with graceful shutdown support");
while !self.task_client.is_shutdown() {
loop {
tokio::select! {
biased;
_ = self.task_client.recv() => {
_ = shutdown_token.cancelled() => {
tracing::trace!("RetransmissionRequestListener: Received shutdown");
break;
}
timed_out_ack = self.request_receiver.next() => match timed_out_ack {
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await,
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack).await,
None => {
tracing::trace!("RetransmissionRequestListener: Stopping since channel closed");
break;
@@ -194,7 +184,6 @@ where
}
}
self.task_client.recv_timeout().await;
tracing::debug!("RetransmissionRequestListener: Exiting");
}
}
@@ -5,29 +5,25 @@ use super::action_controller::{AckActionSender, Action};
use super::SentPacketNotificationReceiver;
use futures::StreamExt;
use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
use nym_task::TaskClient;
use tracing::*;
/// Module responsible for starting up retransmission timers.
/// It is required because when we send our packet to the `real traffic stream` controlled
/// by a poisson timer, there's no guarantee the message will be sent immediately, so we might
/// accidentally fire retransmission way quicker than we should have.
pub(super) struct SentNotificationListener {
pub(crate) struct SentNotificationListener {
sent_notifier: SentPacketNotificationReceiver,
action_sender: AckActionSender,
task_client: TaskClient,
}
impl SentNotificationListener {
pub(super) fn new(
sent_notifier: SentPacketNotificationReceiver,
action_sender: AckActionSender,
task_client: TaskClient,
) -> Self {
SentNotificationListener {
sent_notifier,
action_sender,
task_client,
}
}
@@ -36,37 +32,18 @@ impl SentNotificationListener {
trace!("sent off a cover message - no need to start retransmission timer!");
return;
}
if let Err(err) = self
let _ = self
.action_sender
.unbounded_send(Action::new_start_timer(frag_id))
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send start timer action to action controller: {err}");
}
}
.unbounded_send(Action::new_start_timer(frag_id));
}
pub(super) async fn run(&mut self) {
pub(crate) async fn run(&mut self) {
debug!("Started SentNotificationListener with graceful shutdown support");
while !self.task_client.is_shutdown() {
tokio::select! {
frag_id = self.sent_notifier.next() => match frag_id {
Some(frag_id) => {
self.on_sent_message(frag_id).await;
}
None => {
tracing::trace!("SentNotificationListener: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("SentNotificationListener: Received shutdown");
break;
}
}
while let Some(frag_id) = self.sent_notifier.next().await {
self.on_sent_message(frag_id).await;
}
assert!(self.task_client.is_shutdown_poll());
tracing::debug!("SentNotificationListener: Exiting");
}
}
@@ -20,7 +20,7 @@ use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
use nym_sphinx::Delay;
use nym_task::connections::TransmissionLane;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
@@ -189,7 +189,7 @@ pub(crate) struct MessageHandler<R> {
topology_access: TopologyAccessor,
reply_key_storage: SentReplyKeys,
tag_storage: UsedSenderTags,
task_client: TaskClient,
shutdown_token: ShutdownToken,
}
impl<R> MessageHandler<R>
@@ -205,7 +205,7 @@ where
topology_access: TopologyAccessor,
reply_key_storage: SentReplyKeys,
tag_storage: UsedSenderTags,
task_client: TaskClient,
shutdown_token: ShutdownToken,
) -> Self
where
R: Copy,
@@ -228,7 +228,7 @@ where
topology_access,
reply_key_storage,
tag_storage,
task_client,
shutdown_token,
}
}
@@ -712,7 +712,7 @@ where
.action_sender
.unbounded_send(Action::UpdatePendingAck(id, new_delay))
{
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send update action to the controller: {err}");
}
}
@@ -723,7 +723,7 @@ where
.action_sender
.unbounded_send(Action::new_insert(pending_acks))
{
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send insert action to the controller: {err}");
}
}
@@ -737,7 +737,7 @@ where
) {
tokio::select! {
biased;
_ = self.task_client.recv() => {
_ = self.shutdown_token.cancelled() => {
trace!("received shutdown while attempting to forward mixnet messages");
}
sending_res = self.real_message_sender.send((messages, transmission_lane)) => {
@@ -14,26 +14,21 @@ use crate::client::replies::reply_controller::{
ReplyController, ReplyControllerReceiver, ReplyControllerSender,
};
use crate::client::replies::reply_storage::CombinedReplyStorage;
use crate::config;
use crate::{
client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors,
topology_control::TopologyAccessor,
},
spawn_future,
use crate::client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors,
topology_control::TopologyAccessor,
};
use crate::config;
use futures::channel::mpsc;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::params::PacketType;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::sync::Arc;
use tracing::*;
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
pub(crate) use acknowledgement_control::{AckActionSender, Action};
@@ -69,6 +64,7 @@ impl<'a> From<&'a Config> for acknowledgement_control::Config {
cfg.traffic.maximum_number_of_retransmissions,
cfg.acks.ack_wait_addition,
cfg.acks.ack_wait_multiplier,
cfg.traffic.packet_type,
)
.with_custom_packet_size(cfg.traffic.primary_packet_size)
}
@@ -146,7 +142,7 @@ impl RealMessagesController<OsRng> {
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: ClientStatsSender,
task_client: TaskClient,
shutdown_token: ShutdownToken,
) -> Self {
let rng = OsRng;
@@ -178,7 +174,7 @@ impl RealMessagesController<OsRng> {
topology_access.clone(),
reply_storage.key_storage(),
reply_storage.tags_storage(),
task_client.fork("message_handler"),
shutdown_token.clone(),
);
let ack_control = AcknowledgementController::new(
@@ -188,7 +184,6 @@ impl RealMessagesController<OsRng> {
message_handler.clone(),
reply_controller_sender,
stats_tx.clone(),
task_client.fork("ack_control"),
);
let reply_control = ReplyController::new(
@@ -196,7 +191,6 @@ impl RealMessagesController<OsRng> {
message_handler,
reply_storage,
reply_controller_receiver,
task_client.fork("reply_controller"),
);
let out_queue_control = OutQueueControl::new(
@@ -209,7 +203,7 @@ impl RealMessagesController<OsRng> {
lane_queue_lengths,
client_connection_rx,
stats_tx,
task_client.with_suffix("out_queue_control"),
shutdown_token.clone(),
);
RealMessagesController {
@@ -219,26 +213,13 @@ impl RealMessagesController<OsRng> {
}
}
pub fn start(self, packet_type: PacketType) {
let mut out_queue_control = self.out_queue_control;
let ack_control = self.ack_control;
let mut reply_control = self.reply_control;
spawn_future!(
async move {
out_queue_control.run().await;
debug!("The out queue controller has finished execution!");
},
"RealMessagesController::OutQueueControl)"
);
spawn_future!(
async move {
reply_control.run().await;
debug!("The reply controller has finished execution!");
},
"RealMessagesController::ReplyController"
);
ack_control.start(packet_type);
pub fn into_tasks(
self,
) -> (
OutQueueControl<OsRng>,
ReplyController<OsRng>,
AcknowledgementController<OsRng>,
) {
(self.out_queue_control, self.reply_control, self.ack_control)
}
}
@@ -21,7 +21,7 @@ use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, C
use nym_task::connections::{
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use rand::{CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
@@ -31,9 +31,9 @@ use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, Sleep};
// use wasm_utils::console_log;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::{sleep, Sleep};
mod sending_delay_controller;
/// Configurable parameters of the `OutQueueControl`
@@ -119,7 +119,7 @@ where
/// Channel used for sending metrics events (specifically `PacketStatistics` events) to the metrics tracker.
stats_tx: ClientStatsSender,
task_client: TaskClient,
shutdown_token: ShutdownToken,
}
#[derive(Debug)]
@@ -179,7 +179,7 @@ where
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: ClientStatsSender,
task_client: TaskClient,
shutdown_token: ShutdownToken,
) -> Self {
OutQueueControl {
config,
@@ -194,7 +194,7 @@ where
client_connection_rx,
lane_queue_lengths,
stats_tx,
task_client,
shutdown_token,
}
}
@@ -282,7 +282,7 @@ where
let sending_res = tokio::select! {
biased;
_ = self.task_client.recv() => {
_ = self.shutdown_token.cancelled() => {
trace!("received shutdown signal while attempting to send mix message");
return
}
@@ -293,11 +293,13 @@ where
match sending_res {
Err(_) => {
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
tracing::error!(
"failed to send mixnet packet due to closed channel (outside of shutdown!)"
);
}
// Early return to avoid further processing when channel is closed
return;
}
Ok(_) => {
let event = if fragment_id.is_some() {
@@ -325,9 +327,15 @@ where
// ready and hence was immediately re-scheduled causing other tasks to be starved;
// yield makes it go back the scheduling queue regardless of its value availability
// TODO: temporary and BAD workaround for wasm (we should find a way to yield here in wasm)
#[cfg(not(target_arch = "wasm32"))]
tokio::task::yield_now().await;
{
tokio::task::yield_now().await;
}
#[cfg(target_arch = "wasm32")]
{
tokio_with_wasm::task::yield_now().await;
}
}
fn on_close_connection(&mut self, connection_id: ConnectionId) {
@@ -536,9 +544,7 @@ where
}
#[cfg(not(target_arch = "wasm32"))]
fn log_status(&self, shutdown: &mut TaskClient) {
use crate::error::ClientCoreStatusMessage;
fn log_status(&self) {
let packets = self.transmission_buffer.total_size();
let lanes = self.transmission_buffer.lanes();
let mult = self.sending_delay_controller.current_multiplier();
@@ -567,32 +573,33 @@ where
tracing::debug!("{status_str}");
}
// Send status message to whoever is listening (possibly UI)
if mult == self.sending_delay_controller.max_multiplier() {
shutdown.send_status_msg(Box::new(ClientCoreStatusMessage::GatewayIsVerySlow));
} else if mult > self.sending_delay_controller.min_multiplier() {
shutdown.send_status_msg(Box::new(ClientCoreStatusMessage::GatewayIsSlow));
}
// leave the code commented in case somebody wanted to restore this logic with a different channel
// // Send status message to whoever is listening (possibly UI)
// if mult == self.sending_delay_controller.max_multiplier() {
// shutdown.send_status_msg(Box::new(ClientCoreStatusMessage::GatewayIsVerySlow));
// } else if mult > self.sending_delay_controller.min_multiplier() {
// shutdown.send_status_msg(Box::new(ClientCoreStatusMessage::GatewayIsSlow));
// }
}
pub(super) async fn run(&mut self) {
pub(crate) async fn run(&mut self) {
debug!("Started OutQueueControl with graceful shutdown support");
let mut shutdown = self.task_client.fork("select");
// avoid borrow on self
let shutdown_token = self.shutdown_token.clone();
#[cfg(not(target_arch = "wasm32"))]
{
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
while !shutdown.is_shutdown() {
loop {
tokio::select! {
biased;
_ = shutdown.recv() => {
_ = shutdown_token.cancelled() => {
tracing::trace!("OutQueueControl: Received shutdown");
break;
}
_ = status_timer.tick() => {
self.log_status(&mut shutdown);
self.log_status();
}
next_message = self.next() => if let Some(next_message) = next_message {
self.on_message(next_message).await;
@@ -602,16 +609,16 @@ where
}
}
}
shutdown.recv_timeout().await;
}
#[cfg(target_arch = "wasm32")]
{
while !shutdown.is_shutdown() {
loop {
tokio::select! {
biased;
_ = shutdown.recv() => {
_ = shutdown_token.cancelled() => {
tracing::trace!("OutQueueControl: Received shutdown");
break;
}
next_message = self.next() => if let Some(next_message) = next_message {
self.on_message(next_message).await;
@@ -83,11 +83,13 @@ impl SendingDelayController {
self.current_multiplier
}
#[allow(dead_code)]
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn min_multiplier(&self) -> u32 {
self.lower_bound
}
#[allow(dead_code)]
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn max_multiplier(&self) -> u32 {
self.upper_bound
@@ -1,10 +1,10 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::helpers::get_time_now;
use crate::client::replies::{
reply_controller::ReplyControllerSender, reply_storage::SentReplyKeys,
};
use crate::spawn_future;
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::StreamExt;
@@ -19,10 +19,10 @@ use nym_sphinx::message::{NymMessage, PlainMessage};
use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use tracing::*;
// The interval at which we check for stale buffers
@@ -54,7 +54,7 @@ struct ReceivedMessagesBufferInner<R: MessageReceiver> {
stats_tx: ClientStatsSender,
// Periodically check for stale buffers to clean up
last_stale_check: Instant,
last_stale_check: crate::client::helpers::Instant,
}
impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
@@ -154,7 +154,7 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
}
fn cleanup_stale_buffers(&mut self) {
let now = Instant::now();
let now = get_time_now();
if now - self.last_stale_check > STALE_BUFFER_CHECK_INTERVAL {
self.last_stale_check = now;
self.message_receiver
@@ -171,7 +171,7 @@ struct ReceivedMessagesBuffer<R: MessageReceiver> {
inner: Arc<Mutex<ReceivedMessagesBufferInner<R>>>,
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
shutdown_token: ShutdownToken,
}
impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
@@ -180,7 +180,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
shutdown_token: ShutdownToken,
) -> Self {
ReceivedMessagesBuffer {
inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
@@ -190,11 +190,11 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
message_sender: None,
recently_reconstructed: HashSet::new(),
stats_tx,
last_stale_check: Instant::now(),
last_stale_check: get_time_now(),
})),
reply_key_storage,
reply_controller_sender,
task_client,
shutdown_token,
}
}
@@ -315,7 +315,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
reply_surbs,
from_surb_request,
) {
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("{err}");
}
}
@@ -338,7 +338,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
.reply_controller_sender
.send_additional_surbs_request(*recipient, amount)
{
if !self.task_client.is_shutdown_poll() {
if !self.shutdown_token.is_cancelled() {
error!("{err}");
}
}
@@ -465,22 +465,22 @@ pub enum ReceivedBufferMessage {
ReceiverDisconnect,
}
struct RequestReceiver<R: MessageReceiver> {
pub(crate) struct RequestReceiver<R: MessageReceiver> {
received_buffer: ReceivedMessagesBuffer<R>,
query_receiver: ReceivedBufferRequestReceiver,
task_client: TaskClient,
shutdown_token: ShutdownToken,
}
impl<R: MessageReceiver> RequestReceiver<R> {
fn new(
received_buffer: ReceivedMessagesBuffer<R>,
query_receiver: ReceivedBufferRequestReceiver,
task_client: TaskClient,
shutdown_token: ShutdownToken,
) -> Self {
RequestReceiver {
received_buffer,
query_receiver,
task_client,
shutdown_token,
}
}
@@ -495,66 +495,70 @@ impl<R: MessageReceiver> RequestReceiver<R> {
}
}
async fn run(&mut self) {
pub(crate) async fn run(&mut self) {
debug!("Started RequestReceiver with graceful shutdown support");
while !self.task_client.is_shutdown() {
loop {
tokio::select! {
biased;
_ = self.task_client.recv() => {
_ = self.shutdown_token.cancelled() => {
tracing::trace!("RequestReceiver: Received shutdown");
break;
}
request = self.query_receiver.next() => {
if let Some(message) = request {
self.handle_message(message).await
} else {
tracing::trace!("RequestReceiver: Stopping since channel closed");
self.shutdown_token.cancelled().await;
break;
}
},
}
}
self.task_client.recv().await;
tracing::debug!("RequestReceiver: Exiting");
}
}
struct FragmentedMessageReceiver<R: MessageReceiver> {
pub(crate) struct FragmentedMessageReceiver<R: MessageReceiver> {
received_buffer: ReceivedMessagesBuffer<R>,
mixnet_packet_receiver: MixnetMessageReceiver,
task_client: TaskClient,
shutdown_token: ShutdownToken,
}
impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
fn new(
received_buffer: ReceivedMessagesBuffer<R>,
mixnet_packet_receiver: MixnetMessageReceiver,
task_client: TaskClient,
shutdown_token: ShutdownToken,
) -> Self {
FragmentedMessageReceiver {
received_buffer,
mixnet_packet_receiver,
task_client,
shutdown_token,
}
}
async fn run(&mut self) -> Result<(), MessageRecoveryError> {
pub(crate) async fn run(&mut self) -> Result<(), MessageRecoveryError> {
debug!("Started FragmentedMessageReceiver with graceful shutdown support");
while !self.task_client.is_shutdown() {
loop {
tokio::select! {
biased;
_ = self.shutdown_token.cancelled() => {
tracing::trace!("FragmentedMessageReceiver: Received shutdown");
break;
}
new_messages = self.mixnet_packet_receiver.next() => {
if let Some(new_messages) = new_messages {
self.received_buffer.handle_new_received(new_messages).await?;
} else {
tracing::trace!("FragmentedMessageReceiver: Stopping since channel closed");
self.shutdown_token.cancelled().await;
break;
}
},
_ = self.task_client.recv_with_delay() => {
tracing::trace!("FragmentedMessageReceiver: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("FragmentedMessageReceiver: Exiting");
Ok(())
}
@@ -573,48 +577,31 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
metrics_reporter: ClientStatsSender,
task_client: TaskClient,
shutdown_token: ShutdownToken,
) -> Self {
let received_buffer = ReceivedMessagesBuffer::new(
local_encryption_keypair,
reply_key_storage,
reply_controller_sender,
metrics_reporter,
task_client.fork("received_messages_buffer"),
shutdown_token.clone(),
);
ReceivedMessagesBufferController {
fragmented_message_receiver: FragmentedMessageReceiver::new(
received_buffer.clone(),
mixnet_packet_receiver,
task_client.fork("fragmented_message_receiver"),
shutdown_token.clone(),
),
request_receiver: RequestReceiver::new(
received_buffer,
query_receiver,
task_client.with_suffix("request_receiver"),
shutdown_token.clone(),
),
}
}
pub fn start(self) {
let mut fragmented_message_receiver = self.fragmented_message_receiver;
let mut request_receiver = self.request_receiver;
spawn_future!(
async move {
match fragmented_message_receiver.run().await {
Ok(_) => {}
Err(e) => error!("{e}"),
}
},
"ReceivedMessagesBufferController::FragmentedMessageReceiver"
);
spawn_future!(
async move {
request_receiver.run().await;
},
"ReceivedMessagesBufferController::RequestReceiver"
);
pub(crate) fn into_tasks(self) -> (FragmentedMessageReceiver<R>, RequestReceiver<R>) {
(self.fragmented_message_receiver, self.request_receiver)
}
}
@@ -7,7 +7,7 @@ use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationC
use crate::client::replies::reply_storage::CombinedReplyStorage;
use crate::config;
use futures::StreamExt;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use rand::rngs::OsRng;
use rand::{CryptoRng, Rng};
use std::time::Duration;
@@ -60,9 +60,6 @@ pub struct ReplyController<R> {
receiver_controller: ReceiverReplyController<R>,
request_receiver: ReplyControllerReceiver,
// Listen for shutdown signals
task_client: TaskClient,
}
impl ReplyController<OsRng> {
@@ -71,7 +68,6 @@ impl ReplyController<OsRng> {
message_handler: MessageHandler<OsRng>,
full_reply_storage: CombinedReplyStorage,
request_receiver: ReplyControllerReceiver,
task_client: TaskClient,
) -> Self {
ReplyController {
config,
@@ -86,7 +82,6 @@ impl ReplyController<OsRng> {
message_handler,
),
request_receiver,
task_client,
}
}
}
@@ -148,22 +143,21 @@ where
self.sender_controller.inspect_and_clear_stale_data(now)
}
pub(crate) async fn run(&mut self) {
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started ReplyController with graceful shutdown support");
let mut shutdown = self.task_client.fork("reply-controller");
let polling_rate = Duration::from_secs(5);
let mut stale_inspection = new_interval_stream(polling_rate);
let polling_rate = self.config.key_rotation.epoch_duration / 8;
let mut invalidation_inspection = new_interval_stream(polling_rate);
while !shutdown.is_shutdown() {
loop {
tokio::select! {
biased;
_ = shutdown.recv() => {
_ = shutdown_token.cancelled() => {
tracing::trace!("ReplyController: Received shutdown");
break;
},
req = self.request_receiver.next() => match req {
Some(req) => self.handle_request(req).await,
@@ -181,7 +175,6 @@ where
}
}
}
assert!(shutdown.is_shutdown_poll());
tracing::debug!("ReplyController: Exiting");
}
}
@@ -16,21 +16,17 @@
#![warn(clippy::todo)]
#![warn(clippy::dbg_macro)]
use crate::client::inbound_messages::{InputMessage, InputMessageSender};
use futures::StreamExt;
use nym_client_core_config_types::StatsReporting;
use nym_sphinx::addressing::Recipient;
use nym_statistics_common::clients::{
ClientStatsController, ClientStatsReceiver, ClientStatsSender,
};
use nym_task::{connections::TransmissionLane, TaskClient};
use nym_task::{connections::TransmissionLane, ShutdownToken, ShutdownTracker};
use std::time::Duration;
use crate::{
client::inbound_messages::{InputMessage, InputMessageSender},
spawn_future,
};
/// Time interval between reporting statistics locally (logging/task_client)
/// Time interval between reporting statistics locally (logging/shutdown_token)
const LOCAL_REPORT_INTERVAL: Duration = Duration::from_secs(2);
/// Interval for taking snapshots of the statistics
const SNAPSHOT_INTERVAL: Duration = Duration::from_millis(500);
@@ -51,9 +47,6 @@ pub(crate) struct StatisticsControl {
/// Config for stats reporting (enabled, address, interval)
reporting_config: StatsReporting,
/// Task client for listening for shutdown
task_client: TaskClient,
}
impl StatisticsControl {
@@ -62,24 +55,20 @@ impl StatisticsControl {
client_type: String,
client_stats_id: String,
report_tx: InputMessageSender,
task_client: TaskClient,
shutdown_token: ShutdownToken,
) -> (Self, ClientStatsSender) {
let (stats_tx, stats_rx) = tokio::sync::mpsc::unbounded_channel();
let stats = ClientStatsController::new(client_stats_id, client_type);
let mut task_client_stats_sender = task_client.fork("stats_sender");
task_client_stats_sender.disarm();
(
StatisticsControl {
stats,
stats_rx,
report_tx,
reporting_config,
task_client,
},
ClientStatsSender::new(Some(stats_tx), task_client_stats_sender),
ClientStatsSender::new(Some(stats_tx), shutdown_token),
)
}
@@ -99,7 +88,8 @@ impl StatisticsControl {
}
}
async fn run(&mut self) {
// manually control the shutdown mechanism as we don't want to get interrupted mid-snapshot
pub async fn run(&mut self, shutdown_token: ShutdownToken) {
tracing::debug!("Started StatisticsControl with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
@@ -129,10 +119,10 @@ impl StatisticsControl {
let mut snapshot_interval =
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);
while !self.task_client.is_shutdown() {
loop {
tokio::select! {
biased;
_ = self.task_client.recv() => {
_ = shutdown_token.cancelled() => {
tracing::trace!("StatisticsControl: Received shutdown");
break;
},
@@ -157,37 +147,34 @@ impl StatisticsControl {
}
_ = local_report_interval.next() => {
self.stats.local_report(&mut self.task_client);
self.stats.local_report();
}
}
}
tracing::debug!("StatisticsControl: Exiting");
}
pub(crate) fn start(mut self) {
spawn_future!(
async move {
self.run().await;
},
"StatisticsControl"
)
}
pub(crate) fn create_and_start(
reporting_config: StatsReporting,
client_type: String,
client_stats_id: String,
report_tx: InputMessageSender,
task_client: TaskClient,
shutdown_tracker: &ShutdownTracker,
) -> ClientStatsSender {
let (controller, sender) = Self::create(
let (mut controller, sender) = Self::create(
reporting_config,
client_type,
client_stats_id,
report_tx,
task_client,
shutdown_tracker.child_shutdown_token(),
);
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move {
controller.run(shutdown_token).await;
},
"StatisticsControl",
);
controller.start();
sender
}
}
@@ -1,11 +1,9 @@
// Copyright 2021-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::spawn_future;
pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit};
use futures::StreamExt;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_task::TaskClient;
use nym_topology::NymTopologyError;
use std::time::Duration;
use tracing::*;
@@ -41,8 +39,6 @@ pub struct TopologyRefresher {
refresh_rate: Duration,
consecutive_failure_count: usize,
task_client: TaskClient,
}
impl TopologyRefresher {
@@ -50,14 +46,12 @@ impl TopologyRefresher {
cfg: TopologyRefresherConfig,
topology_accessor: TopologyAccessor,
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
task_client: TaskClient,
) -> Self {
TopologyRefresher {
topology_provider,
topology_accessor,
refresh_rate: cfg.refresh_rate,
consecutive_failure_count: 0,
task_client,
}
}
@@ -144,40 +138,30 @@ impl TopologyRefresher {
}
}
pub fn start(mut self) {
spawn_future!(
async move {
debug!("Started TopologyRefresher with graceful shutdown support");
// it's perfectly fine if task is interrupted mid-refresh
// there's no data to persist or send over
pub async fn run(&mut self) {
debug!("Started TopologyRefresher with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
let mut interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(self.refresh_rate),
);
#[cfg(not(target_arch = "wasm32"))]
let mut interval =
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(self.refresh_rate));
#[cfg(target_arch = "wasm32")]
let mut interval =
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
#[cfg(target_arch = "wasm32")]
let mut interval =
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
// We already have an initial topology, so no need to refresh it immediately.
// My understanding is that js setInterval does not fire immediately, so it's not
// needed there.
#[cfg(not(target_arch = "wasm32"))]
interval.next().await;
// We already have an initial topology, so no need to refresh it immediately.
// My understanding is that js setInterval does not fire immediately, so it's not
// needed there.
#[cfg(not(target_arch = "wasm32"))]
interval.next().await;
while !self.task_client.is_shutdown() {
tokio::select! {
_ = interval.next() => {
self.try_refresh().await;
},
_ = self.task_client.recv() => {
tracing::trace!("TopologyRefresher: Received shutdown");
},
}
}
self.task_client.recv_timeout().await;
tracing::debug!("TopologyRefresher: Exiting");
},
"TopologyRefresher"
)
while interval.next().await.is_some() {
self.try_refresh().await;
}
// this should never get triggered
error!("topology refresher interval has been exhausted!")
}
}
@@ -2,8 +2,10 @@
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use nym_mixnet_contract_common::EpochRewardedSet;
use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
use nym_topology::NymTopology;
use nym_validator_client::nym_api::NymApiClientExt;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::cmp::min;
@@ -39,30 +41,43 @@ impl Config {
pub struct NymApiTopologyProvider {
config: Config,
validator_client: nym_validator_client::client::NymApiClient,
validator_client: nym_http_api_client::Client,
nym_api_urls: Vec<Url>,
currently_used_api: usize,
use_bincode: bool,
}
impl NymApiTopologyProvider {
pub fn new(
config: impl Into<Config>,
mut nym_api_urls: Vec<Url>,
mut validator_client: nym_validator_client::client::NymApiClient,
validator_client: nym_http_api_client::Client,
) -> Self {
nym_api_urls.shuffle(&mut thread_rng());
validator_client.change_nym_api(nym_api_urls[0].clone());
NymApiTopologyProvider {
let mut provider = NymApiTopologyProvider {
config: config.into(),
validator_client,
nym_api_urls,
currently_used_api: 0,
}
use_bincode: true,
};
// Set all API URLs - the client will try them in order with automatic failover
provider.validator_client.change_base_urls(
provider
.nym_api_urls
.iter()
.map(|u| u.clone().into())
.collect(),
);
provider
}
pub fn disable_bincode(&mut self) {
self.validator_client.use_bincode = false;
self.use_bincode = false;
// Note: The unified client doesn't support toggling bincode after creation.
// This would require recreating the client without bincode.
// For now, we'll track the preference but it won't take effect.
warn!("Disabling bincode on existing client is not currently supported");
}
fn use_next_nym_api(&mut self) {
@@ -72,8 +87,19 @@ impl NymApiTopologyProvider {
}
self.currently_used_api = (self.currently_used_api + 1) % self.nym_api_urls.len();
self.validator_client
.change_nym_api(self.nym_api_urls[self.currently_used_api].clone())
// Provide all URLs starting from the next one in rotation order
// This enables automatic failover to other endpoints
let rotated_urls: Vec<_> = self
.nym_api_urls
.iter()
.cycle()
.skip(self.currently_used_api)
.take(self.nym_api_urls.len())
.map(|u| u.clone().into())
.collect();
self.validator_client.change_base_urls(rotated_urls)
}
async fn get_current_compatible_topology(&mut self) -> Option<NymTopology> {
@@ -99,8 +125,13 @@ impl NymApiTopologyProvider {
.filter(|n| n.performance.round_to_integer() >= self.config.min_node_performance())
.collect::<Vec<_>>();
NymTopology::new(metadata.to_topology_metadata(), rewarded_set, Vec::new())
.with_skimmed_nodes(&nodes_filtered)
let epoch_rewarded_set: EpochRewardedSet = rewarded_set.into();
NymTopology::new(
metadata.to_topology_metadata(),
epoch_rewarded_set,
Vec::new(),
)
.with_skimmed_nodes(&nodes_filtered)
} else {
// if we're not using extended topology, we're only getting active set mixnodes and gateways
@@ -148,8 +179,13 @@ impl NymApiTopologyProvider {
}
}
NymTopology::new(metadata.to_topology_metadata(), rewarded_set, Vec::new())
.with_skimmed_nodes(&nodes)
let epoch_rewarded_set: EpochRewardedSet = rewarded_set.into();
NymTopology::new(
metadata.to_topology_metadata(),
epoch_rewarded_set,
Vec::new(),
)
.with_skimmed_nodes(&nodes)
};
if !topology.is_minimally_routable() {
+13 -4
View File
@@ -4,6 +4,7 @@
use crate::client::mix_traffic::transceiver::ErasedGatewayError;
use nym_crypto::asymmetric::ed25519::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
use nym_task::RegistryAccessError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_validator_client::nym_api::error::NymAPIError;
@@ -56,10 +57,7 @@ pub enum ClientCoreError {
ListOfNymApisIsEmpty,
#[error("failed to resolve a query to nym API: {source}")]
NymApiQueryFailure {
#[from]
source: NymAPIError,
},
NymApiQueryFailure { source: Box<NymAPIError> },
#[error(
"the current network topology seem to be insufficient to route any packets through:\n\t{0}"
@@ -245,6 +243,9 @@ pub enum ClientCoreError {
#[error("failed to select valid gateway due to incomputable latency")]
GatewaySelectionFailure { source: WeightedError },
#[error("Could not access task registry, {0}")]
RegistryAccess(#[from] RegistryAccessError),
}
impl From<tungstenite::Error> for ClientCoreError {
@@ -255,6 +256,14 @@ impl From<tungstenite::Error> for ClientCoreError {
}
}
impl From<NymAPIError> for ClientCoreError {
fn from(err: NymAPIError) -> ClientCoreError {
ClientCoreError::NymApiQueryFailure {
source: Box::new(err),
}
}
}
/// Set of messages that the client can send to listeners via the task manager
#[derive(Debug)]
pub enum ClientCoreStatusMessage {
+132 -20
View File
@@ -7,7 +7,8 @@ use futures::{SinkExt, StreamExt};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::GatewayClient;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::IdentityKeyRef;
use nym_validator_client::client::{IdentityKeyRef, NymApiClientExt};
use nym_validator_client::nym_nodes::SkimmedNodesWithMetadata;
use nym_validator_client::UserAgent;
use rand::{seq::SliceRandom, Rng};
#[cfg(unix)]
@@ -44,6 +45,7 @@ type WsConn = JSWebsocket;
const CONCURRENT_GATEWAYS_MEASURED: usize = 20;
const MEASUREMENTS: usize = 3;
const DEFAULT_NYM_API_RETRIES: usize = 3;
#[cfg(not(target_arch = "wasm32"))]
const CONN_TIMEOUT: Duration = Duration::from_millis(1500);
@@ -83,32 +85,88 @@ struct GatewayWithLatency<'a, G: ConnectableGateway> {
latency: Duration,
}
// Helper to collect all pages of entry nodes - replicates NymApiClient's convenience method
async fn get_all_basic_entry_nodes_with_metadata(
client: &nym_http_api_client::Client,
use_bincode: bool,
) -> Result<SkimmedNodesWithMetadata, ClientCoreError> {
// Get first page to obtain metadata
let mut page = 0;
let res = client
.get_basic_entry_assigned_nodes_v2(false, Some(page), None, use_bincode)
.await?;
let mut nodes = res.nodes.data;
let metadata = res.metadata;
if res.nodes.pagination.total == nodes.len() {
return Ok(SkimmedNodesWithMetadata::new(nodes, metadata));
}
page += 1;
// Collect remaining pages
loop {
let mut res = client
.get_basic_entry_assigned_nodes_v2(false, Some(page), None, use_bincode)
.await?;
if !metadata.consistency_check(&res.metadata) {
return Err(ClientCoreError::ValidatorClientError(
nym_validator_client::ValidatorClientError::InconsistentPagedMetadata,
));
}
nodes.append(&mut res.nodes.data);
if nodes.len() < res.nodes.pagination.total {
page += 1
} else {
break;
}
}
Ok(SkimmedNodesWithMetadata::new(nodes, metadata))
}
impl<'a, G: ConnectableGateway> GatewayWithLatency<'a, G> {
fn new(gateway: &'a G, latency: Duration) -> Self {
GatewayWithLatency { gateway, latency }
}
}
pub async fn gateways_for_init<R: Rng>(
rng: &mut R,
pub async fn gateways_for_init(
nym_apis: &[Url],
user_agent: Option<UserAgent>,
minimum_performance: u8,
ignore_epoch_roles: bool,
retry_count: Option<usize>,
) -> Result<Vec<RoutingNode>, ClientCoreError> {
let nym_api = nym_apis
.choose(rng)
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
let client = if let Some(user_agent) = user_agent {
nym_validator_client::client::NymApiClient::new_with_user_agent(nym_api.clone(), user_agent)
} else {
nym_validator_client::client::NymApiClient::new(nym_api.clone())
};
// Build client with ALL URLs for fallback support
let nym_api_urls: Vec<nym_http_api_client::Url> = nym_apis
.iter()
.map(|url| nym_http_api_client::Url::from(url.clone()))
.collect();
tracing::debug!("Fetching list of gateways from: {nym_api}");
if nym_api_urls.is_empty() {
return Err(ClientCoreError::ListOfNymApisIsEmpty);
}
let gateways = client
.get_all_basic_entry_assigned_nodes_with_metadata()
let retry_count = retry_count.unwrap_or(DEFAULT_NYM_API_RETRIES);
let mut builder = nym_http_api_client::ClientBuilder::new_with_urls(nym_api_urls.clone())?
.with_retries(retry_count)
.with_bincode();
if let Some(user_agent) = user_agent {
builder = builder.with_user_agent(user_agent);
}
let client = builder.build().map_err(|e| {
ClientCoreError::ValidatorClientError(nym_validator_client::ValidatorClientError::from(e))
})?;
tracing::debug!("Fetching list of gateways from: {:?}", nym_api_urls);
// Use our helper to handle pagination
let gateways = get_all_basic_entry_nodes_with_metadata(&client, true)
.await?
.nodes;
info!("nym api reports {} gateways", gateways.len());
@@ -117,17 +175,15 @@ pub async fn gateways_for_init<R: Rng>(
// filter out gateways below minimum performance and ones that could operate as a mixnode
// (we don't want instability)
let valid_gateways = gateways
let valid_gateways: Vec<RoutingNode> = gateways
.iter()
.filter(|g| ignore_epoch_roles || !g.supported_roles.mixnode)
.filter(|g| g.performance.round_to_integer() >= minimum_performance)
.filter_map(|gateway| gateway.try_into().ok())
.collect::<Vec<_>>();
tracing::debug!("After checking validity: {}", valid_gateways.len());
tracing::trace!("Valid gateways: {valid_gateways:#?}");
.collect();
tracing::info!(
"and {} after validity and performance filtering",
"Found {} valid gateways after filtering",
valid_gateways.len()
);
@@ -290,13 +346,20 @@ pub(super) fn get_specified_gateway(
must_use_tls: bool,
) -> Result<RoutingNode, ClientCoreError> {
tracing::debug!("Requesting specified gateway: {gateway_identity}");
let user_gateway = ed25519::PublicKey::from_base58_string(gateway_identity)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
let gateway = gateways
.iter()
.find(|gateway| gateway.identity_key == user_gateway)
.ok_or_else(|| ClientCoreError::NoGatewayWithId(gateway_identity.to_string()))?;
.ok_or_else(|| {
tracing::debug!(
"Gateway {gateway_identity} not found in {} available gateways",
gateways.len()
);
ClientCoreError::NoGatewayWithId(gateway_identity.to_string())
})?;
let Some(entry_details) = gateway.entry.as_ref() else {
return Err(ClientCoreError::UnsupportedEntry {
@@ -359,3 +422,52 @@ pub(super) async fn register_with_gateway(
authenticated_ephemeral_client: gateway_client,
})
}
#[cfg(test)]
mod tests {
use url::Url;
#[test]
fn test_single_url_builds_without_retries() {
let urls = [Url::parse("https://api.nym.com").unwrap()];
let nym_api_urls: Vec<nym_http_api_client::Url> = urls
.iter()
.map(|url| nym_http_api_client::Url::from(url.clone()))
.collect();
assert_eq!(nym_api_urls.len(), 1, "Should have exactly one URL");
}
#[test]
fn test_multiple_urls_prepared_for_retries() {
let urls = vec![
Url::parse("https://api1.nym.com").unwrap(),
Url::parse("https://api2.nym.com").unwrap(),
Url::parse("https://api3.nym.com").unwrap(),
];
let nym_api_urls: Vec<nym_http_api_client::Url> = urls
.iter()
.map(|url| nym_http_api_client::Url::from(url.clone()))
.collect();
assert_eq!(nym_api_urls.len(), 3, "Should have all three URLs");
assert!(
nym_api_urls.len() > 1,
"Multiple URLs trigger retry behavior"
);
}
#[test]
fn test_empty_url_list_is_detected() {
let urls: Vec<Url> = vec![];
let nym_api_urls: Vec<nym_http_api_client::Url> = urls
.iter()
.map(|url| nym_http_api_client::Url::from(url.clone()))
.collect();
assert!(nym_api_urls.is_empty(), "Empty list should remain empty");
}
}
+6 -36
View File
@@ -17,17 +17,19 @@ pub use nym_topology::{
HardcodedTopologyProvider, NymRouteProvider, NymTopology, NymTopologyError, TopologyProvider,
};
#[deprecated(note = "use spawn_future from nym_task crate instead")]
#[cfg(target_arch = "wasm32")]
#[track_caller]
pub fn spawn_future<F>(future: F)
where
F: Future<Output = ()> + 'static,
{
wasm_bindgen_futures::spawn_local(future);
wasm_bindgen_futures::spawn_local(async move {
future.await;
});
}
// TODO: expose similar API to the rest of the codebase,
// perhaps with some simple trait for a task to define its name
#[deprecated(note = "use spawn_future from nym_task crate instead")]
#[cfg(not(target_arch = "wasm32"))]
#[track_caller]
pub fn spawn_future<F>(future: F)
@@ -37,35 +39,3 @@ where
{
tokio::spawn(future);
}
#[cfg(not(target_arch = "wasm32"))]
#[track_caller]
pub fn spawn_named_future<F>(future: F, name: &str)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
cfg_if::cfg_if! {if #[cfg(tokio_unstable)] {
#[allow(clippy::expect_used)]
tokio::task::Builder::new().name(name).spawn(future).expect("failed to spawn future");
} else {
let _ = name;
tracing::debug!(r#"the underlying binary hasn't been built with `RUSTFLAGS="--cfg tokio_unstable"` - the future naming won't do anything"#);
spawn_future(future);
}}
}
#[macro_export]
macro_rules! spawn_future {
($future:expr) => {{
$crate::spawn_future($future)
}};
($future:expr, $name:expr) => {{
cfg_if::cfg_if! {if #[cfg(not(target_arch = "wasm32"))] {
$crate::spawn_named_future($future, $name)
} else {
let _ = $name;
$crate::spawn_future($future)
}}
}};
}
+2 -2
View File
@@ -40,7 +40,7 @@ where
pub async fn flush_on_shutdown(
mut self,
mem_state: CombinedReplyStorage,
mut shutdown: nym_task::TaskClient,
shutdown: nym_task::ShutdownToken,
) {
use tracing::{debug, error, info};
@@ -50,7 +50,7 @@ where
return;
}
shutdown.recv().await;
shutdown.cancelled().await;
info!("PersistentReplyStorage is flushing all reply-related data to underlying storage");
if let Err(err) = self.backend.flush_surb_storage(&mem_state).await {
@@ -12,7 +12,7 @@ use crate::socket_state::{ws_fd, PartiallyDelegatedHandle, SocketState};
use crate::traits::GatewayPacketRouter;
use crate::{cleanup_socket_message, try_decrypt_binary_message};
use futures::{SinkExt, StreamExt};
use nym_bandwidth_controller::{BandwidthController, BandwidthStatusMessage};
use nym_bandwidth_controller::BandwidthController;
use nym_credential_storage::ephemeral_storage::EphemeralStorage as EphemeralCredentialStorage;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_credentials::CredentialSpendingData;
@@ -27,7 +27,7 @@ use nym_gateway_requests::{
use nym_sphinx::forwarding::packet::MixPacket;
use nym_statistics_common::clients::connection::ConnectionStatsEvent;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use rand::rngs::OsRng;
use std::sync::Arc;
@@ -109,7 +109,7 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
/// Listen to shutdown messages and send notifications back to the task manager
task_client: TaskClient,
shutdown_token: ShutdownToken,
}
impl<C, St> GatewayClient<C, St> {
@@ -124,7 +124,7 @@ impl<C, St> GatewayClient<C, St> {
bandwidth_controller: Option<BandwidthController<C, St>>,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
task_client: TaskClient,
shutdown_token: ShutdownToken,
) -> Self {
GatewayClient {
cfg,
@@ -141,7 +141,7 @@ impl<C, St> GatewayClient<C, St> {
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
task_client,
shutdown_token,
}
}
@@ -293,7 +293,7 @@ impl<C, St> GatewayClient<C, St> {
loop {
tokio::select! {
_ = self.task_client.recv() => {
_ = self.shutdown_token.cancelled() => {
log::trace!("GatewayClient control response: Received shutdown");
log::debug!("GatewayClient control response: Exiting");
break Err(GatewayClientError::ConnectionClosedGatewayShutdown);
@@ -514,7 +514,7 @@ impl<C, St> GatewayClient<C, St> {
self.cfg.bandwidth.require_tickets,
derive_aes256_gcm_siv_key,
#[cfg(not(target_arch = "wasm32"))]
self.task_client.clone(),
self.shutdown_token.clone(),
)
.await
.map_err(GatewayClientError::RegistrationFailure),
@@ -631,9 +631,6 @@ impl<C, St> GatewayClient<C, St> {
self.negotiated_protocol = protocol_version;
log::debug!("authenticated: {status}, bandwidth remaining: {bandwidth_remaining}");
self.task_client.send_status_msg(Box::new(
BandwidthStatusMessage::RemainingBandwidth(bandwidth_remaining),
));
Ok(())
}
ServerResponse::Error { message } => Err(GatewayClientError::GatewayError(message)),
@@ -1069,7 +1066,7 @@ impl<C, St> GatewayClient<C, St> {
.expect("no shared key present even though we're authenticated!"),
),
self.bandwidth.clone(),
self.task_client.clone(),
self.shutdown_token.clone(),
)
}
_ => unreachable!(),
@@ -1143,8 +1140,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
// perfectly fine here, because it's not meant to be used
let (ack_tx, _) = mpsc::unbounded();
let (mix_tx, _) = mpsc::unbounded();
let task_client = TaskClient::dummy();
let packet_router = PacketRouter::new(ack_tx, mix_tx, task_client.clone());
let shutdown_token = ShutdownToken::default();
let packet_router = PacketRouter::new(ack_tx, mix_tx, shutdown_token.clone());
GatewayClient {
cfg: GatewayClientConfig::default().with_disabled_credentials_mode(true),
@@ -1157,11 +1154,11 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
connection: SocketState::NotConnected,
packet_router,
bandwidth_controller: None,
stats_reporter: ClientStatsSender::new(None, task_client.clone()),
stats_reporter: ClientStatsSender::new(None, shutdown_token.clone()),
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
task_client,
shutdown_token,
}
}
@@ -1170,7 +1167,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
packet_router: PacketRouter,
bandwidth_controller: Option<BandwidthController<C, St>>,
stats_reporter: ClientStatsSender,
task_client: TaskClient,
shutdown_token: ShutdownToken,
) -> GatewayClient<C, St> {
// invariants that can't be broken
// (unless somebody decided to expose some field that wasn't meant to be exposed)
@@ -1193,7 +1190,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
negotiated_protocol: self.negotiated_protocol,
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback,
task_client,
shutdown_token,
}
}
}
@@ -7,7 +7,7 @@
use crate::error::GatewayClientError;
use crate::GatewayPacketRouter;
use futures::channel::mpsc;
use nym_task::TaskClient;
use nym_task::ShutdownToken;
pub type MixnetMessageSender = mpsc::UnboundedSender<Vec<Vec<u8>>>;
pub type MixnetMessageReceiver = mpsc::UnboundedReceiver<Vec<Vec<u8>>>;
@@ -19,14 +19,14 @@ pub type AcknowledgementReceiver = mpsc::UnboundedReceiver<Vec<Vec<u8>>>;
pub struct PacketRouter {
ack_sender: AcknowledgementSender,
mixnet_message_sender: MixnetMessageSender,
shutdown: TaskClient,
shutdown: ShutdownToken,
}
impl PacketRouter {
pub fn new(
ack_sender: AcknowledgementSender,
mixnet_message_sender: MixnetMessageSender,
shutdown: TaskClient,
shutdown: ShutdownToken,
) -> Self {
PacketRouter {
ack_sender,
@@ -42,7 +42,7 @@ impl PacketRouter {
if let Err(err) = self.mixnet_message_sender.unbounded_send(received_messages) {
// check if the failure is due to the shutdown being in progress and thus the receiver channel
// having already been dropped
if self.shutdown.is_shutdown_poll() || self.shutdown.is_dummy() {
if self.shutdown.is_cancelled() {
// This should ideally not happen, but it's ok
tracing::warn!("Failed to send mixnet messages due to receiver task shutdown");
return Err(GatewayClientError::ShutdownInProgress);
@@ -58,7 +58,7 @@ impl PacketRouter {
if let Err(err) = self.ack_sender.unbounded_send(received_acks) {
// check if the failure is due to the shutdown being in progress and thus the receiver channel
// having already been dropped
if self.shutdown.is_shutdown_poll() || self.shutdown.is_dummy() {
if self.shutdown.is_cancelled() {
// This should ideally not happen, but it's ok
tracing::warn!("Failed to send acks due to receiver task shutdown");
return Err(GatewayClientError::ShutdownInProgress);
@@ -69,10 +69,6 @@ impl PacketRouter {
}
Ok(())
}
pub fn disarm(&mut self) {
self.shutdown.disarm();
}
}
impl GatewayPacketRouter for PacketRouter {
@@ -11,7 +11,7 @@ use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use nym_gateway_requests::shared_key::SharedGatewayKey;
use nym_gateway_requests::{SensitiveServerResponse, ServerResponse, SimpleGatewayRequestsError};
use nym_task::TaskClient;
use nym_task::ShutdownToken;
use si_scale::helpers::bibytes2;
use std::os::raw::c_int as RawFd;
use std::sync::Arc;
@@ -87,13 +87,13 @@ impl PartiallyDelegatedRouter {
}
}
async fn run(mut self, mut split_stream: SplitStream<WsConn>, mut task_client: TaskClient) {
async fn run(mut self, mut split_stream: SplitStream<WsConn>, shutdown_token: ShutdownToken) {
let mut chunked_stream = (&mut split_stream).ready_chunks(8);
let ret: Result<_, GatewayClientError> = loop {
tokio::select! {
biased;
// received system-wide shutdown
_ = task_client.recv() => {
_ = shutdown_token.cancelled() => {
log::trace!("GatewayClient listener: Received shutdown");
log::debug!("GatewayClient listener: Exiting");
return;
@@ -118,11 +118,7 @@ impl PartiallyDelegatedRouter {
let return_res = match ret {
Err(err) => self.stream_return.send(Err(err)),
Ok(_) => {
self.packet_router.disarm();
task_client.disarm();
self.stream_return.send(Ok(split_stream))
}
Ok(_) => self.stream_return.send(Ok(split_stream)),
};
if return_res.is_err() {
@@ -266,8 +262,8 @@ impl PartiallyDelegatedRouter {
Ok(plaintexts)
}
fn spawn(self, split_stream: SplitStream<WsConn>, task_client: TaskClient) {
let fut = async move { self.run(split_stream, task_client).await };
fn spawn(self, split_stream: SplitStream<WsConn>, shutdown_token: ShutdownToken) {
let fut = async move { self.run(split_stream, shutdown_token).await };
#[cfg(target_arch = "wasm32")]
wasm_bindgen_futures::spawn_local(fut);
@@ -283,7 +279,7 @@ impl PartiallyDelegatedHandle {
packet_router: PacketRouter,
shared_key: Arc<SharedGatewayKey>,
client_bandwidth: ClientBandwidth,
shutdown: TaskClient,
shutdown: ShutdownToken,
) -> Self {
// when called for, it NEEDS TO yield back the stream so that we could merge it and
// read control request responses.
@@ -3,7 +3,7 @@ name = "nym-validator-client"
version = "0.1.0"
authors = ["Jędrzej Stuczyński <andrew@nymtech.net>"]
edition = "2021"
rust-version = "1.56"
rust-version = "1.85"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+16 -145
View File
@@ -5,8 +5,8 @@ use crate::nyxd::{self, NyxdClient};
use crate::signing::direct_wallet::DirectSecp256k1HdWallet;
use crate::signing::signer::{NoSigner, OfflineSigner};
use crate::{
nym_api, DirectSigningReqwestRpcValidatorClient, QueryReqwestRpcValidatorClient,
ReqwestRpcClient, ValidatorClientError,
DirectSigningReqwestRpcValidatorClient, QueryReqwestRpcValidatorClient, ReqwestRpcClient,
ValidatorClientError,
};
use nym_api_requests::ecash::models::{
AggregatedCoinIndicesSignatureResponse, AggregatedExpirationDateSignatureResponse,
@@ -20,11 +20,9 @@ use nym_api_requests::ecash::{
PartialExpirationDateSignatureResponse, VerificationKeyResponse,
};
use nym_api_requests::models::{
ApiHealthResponse, GatewayBondAnnotated, GatewayCoreStatusResponse,
HistoricalPerformanceResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse,
NymNodeDescription, RewardEstimationResponse, StakeSaturationResponse,
ApiHealthResponse, GatewayCoreStatusResponse, HistoricalPerformanceResponse,
MixnodeCoreStatusResponse, NymNodeDescription,
};
use nym_api_requests::models::{LegacyDescribedGateway, MixNodeBondAnnotated};
use nym_api_requests::nym_nodes::{
NodesByAddressesResponse, SemiSkimmedNodesWithMetadata, SkimmedNode, SkimmedNodesWithMetadata,
};
@@ -153,7 +151,7 @@ impl Config {
pub struct Client<C, S = NoSigner> {
// ideally they would have been read-only, but unfortunately rust doesn't have such features
// #[deprecated(note = "please use `nym_api_client` instead")]
pub nym_api: nym_api::Client,
pub nym_api: nym_http_api_client::Client,
// pub nym_api_client: NymApiClient,
pub nyxd: NyxdClient<C, S>,
}
@@ -214,7 +212,7 @@ impl Client<ReqwestRpcClient> {
impl<C> Client<C> {
pub fn new_with_rpc_client(config: Config, rpc_client: C) -> Self {
let nym_api_client = nym_api::Client::new(config.api_url.clone(), None);
let nym_api_client = nym_http_api_client::Client::new(config.api_url.clone(), None);
Client {
nym_api: nym_api_client,
@@ -228,7 +226,7 @@ impl<C, S> Client<C, S> {
where
S: OfflineSigner,
{
let nym_api_client = nym_api::Client::new(config.api_url.clone(), None);
let nym_api_client = nym_http_api_client::Client::new(config.api_url.clone(), None);
Client {
nym_api: nym_api_client,
@@ -249,65 +247,6 @@ impl<C, S> Client<C, S> {
self.nym_api.change_base_urls(vec![new_endpoint.into()])
}
#[deprecated]
pub async fn get_cached_mixnodes(&self) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_mixnodes().await?)
}
#[deprecated]
pub async fn get_cached_mixnodes_detailed(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, ValidatorClientError> {
Ok(self.nym_api.get_mixnodes_detailed().await?)
}
#[deprecated]
pub async fn get_cached_mixnodes_detailed_unfiltered(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, ValidatorClientError> {
Ok(self.nym_api.get_mixnodes_detailed_unfiltered().await?)
}
#[deprecated]
pub async fn get_cached_rewarded_mixnodes(
&self,
) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_rewarded_mixnodes().await?)
}
#[deprecated]
pub async fn get_cached_rewarded_mixnodes_detailed(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, ValidatorClientError> {
Ok(self.nym_api.get_rewarded_mixnodes_detailed().await?)
}
#[deprecated]
pub async fn get_cached_active_mixnodes(
&self,
) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_active_mixnodes().await?)
}
#[deprecated]
pub async fn get_cached_active_mixnodes_detailed(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, ValidatorClientError> {
Ok(self.nym_api.get_active_mixnodes_detailed().await?)
}
#[deprecated]
pub async fn get_cached_gateways(&self) -> Result<Vec<GatewayBond>, ValidatorClientError> {
Ok(self.nym_api.get_gateways().await?)
}
#[deprecated]
pub async fn get_cached_gateways_detailed_unfiltered(
&self,
) -> Result<Vec<GatewayBondAnnotated>, ValidatorClientError> {
Ok(self.nym_api.get_gateways_detailed_unfiltered().await?)
}
pub async fn get_full_node_performance_history(
&self,
node_id: NodeId,
@@ -385,38 +324,25 @@ impl<C, S> Client<C, S> {
}
}
/// DEPRECATED: Use nym_http_api_client::Client with from_network() or with_bincode() instead
#[deprecated(
since = "1.2.0",
note = "Use nym_http_api_client::Client::from_network() or ClientBuilder::with_bincode() instead"
)]
#[derive(Clone)]
pub struct NymApiClient {
pub use_bincode: bool,
pub nym_api: nym_api::Client,
pub nym_api: nym_http_api_client::Client,
// TODO: perhaps if we really need it at some (currently I don't see any reasons for it)
// we could re-implement the communication with the REST API on port 1317
}
impl From<nym_api::Client> for NymApiClient {
fn from(nym_api: nym_api::Client) -> Self {
NymApiClient {
use_bincode: false,
nym_api,
}
}
}
// we have to allow the use of deprecated method here as they're calling the deprecated trait methods
#[allow(deprecated)]
impl NymApiClient {
pub fn new(api_url: Url) -> Self {
let nym_api = nym_api::Client::new(api_url, None);
NymApiClient {
use_bincode: true,
nym_api,
}
}
#[cfg(not(target_arch = "wasm32"))]
pub fn new_with_timeout(api_url: Url, timeout: std::time::Duration) -> Self {
let nym_api = nym_api::Client::new(api_url, Some(timeout));
let nym_api = nym_http_api_client::Client::new(api_url, Some(timeout));
NymApiClient {
use_bincode: true,
@@ -431,10 +357,10 @@ impl NymApiClient {
}
pub fn new_with_user_agent(api_url: Url, user_agent: impl Into<UserAgent>) -> Self {
let nym_api = nym_api::Client::builder::<_, ValidatorClientError>(api_url)
let nym_api = nym_http_api_client::Client::builder(api_url)
.expect("invalid api url")
.with_user_agent(user_agent.into())
.build::<ValidatorClientError>()
.build()
.expect("failed to build nym api client");
NymApiClient {
@@ -571,37 +497,6 @@ impl NymApiClient {
Ok(self.nym_api.health().await?)
}
#[deprecated]
pub async fn get_cached_active_mixnodes(
&self,
) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_active_mixnodes().await?)
}
#[deprecated]
pub async fn get_cached_rewarded_mixnodes(
&self,
) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_rewarded_mixnodes().await?)
}
#[deprecated]
pub async fn get_cached_mixnodes(&self) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_mixnodes().await?)
}
#[deprecated]
pub async fn get_cached_gateways(&self) -> Result<Vec<GatewayBond>, ValidatorClientError> {
Ok(self.nym_api.get_gateways().await?)
}
#[deprecated]
pub async fn get_cached_described_gateways(
&self,
) -> Result<Vec<LegacyDescribedGateway>, ValidatorClientError> {
Ok(self.nym_api.get_gateways_described().await?)
}
pub async fn get_all_described_nodes(
&self,
) -> Result<Vec<NymNodeDescription>, ValidatorClientError> {
@@ -668,30 +563,6 @@ impl NymApiClient {
.await?)
}
#[deprecated]
pub async fn get_mixnode_status(
&self,
mix_id: NodeId,
) -> Result<MixnodeStatusResponse, ValidatorClientError> {
Ok(self.nym_api.get_mixnode_status(mix_id).await?)
}
#[deprecated]
pub async fn get_mixnode_reward_estimation(
&self,
mix_id: NodeId,
) -> Result<RewardEstimationResponse, ValidatorClientError> {
Ok(self.nym_api.get_mixnode_reward_estimation(mix_id).await?)
}
#[deprecated]
pub async fn get_mixnode_stake_saturation(
&self,
mix_id: NodeId,
) -> Result<StakeSaturationResponse, ValidatorClientError> {
Ok(self.nym_api.get_mixnode_stake_saturation(mix_id).await?)
}
pub async fn blind_sign(
&self,
request_body: &BlindSignRequestBody,
@@ -3,7 +3,6 @@
use crate::nyxd::contract_traits::{DkgQueryClient, PagedDkgQueryClient};
use crate::nyxd::error::NyxdError;
use crate::NymApiClient;
use nym_coconut_dkg_common::types::{EpochId, NodeIndex};
use nym_coconut_dkg_common::verification_key::ContractVKShare;
use nym_compact_ecash::error::CompactEcashError;
@@ -15,7 +14,7 @@ use url::Url;
// TODO: it really doesn't feel like this should live in this crate.
#[derive(Clone)]
pub struct EcashApiClient {
pub api_client: NymApiClient,
pub api_client: nym_http_api_client::Client,
pub verification_key: VerificationKeyAuth,
pub node_id: NodeIndex,
pub cosmos_address: cosmrs::AccountId,
@@ -25,10 +24,15 @@ impl Display for EcashApiClient {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"[id: {}] {} @ {}",
"[id: {}] {} @ ({})",
self.node_id,
self.cosmos_address,
self.api_client.api_url()
self.api_client
.base_urls()
.iter()
.map(|url| url.to_string())
.collect::<Vec<String>>()
.join(", ")
)
}
}
@@ -60,6 +64,9 @@ pub enum EcashApiError {
source: CompactEcashError,
},
#[error("failed to create API client: {0}")]
ClientError(String),
#[error("the provided account address is malformed: {source}")]
MalformedAccountAddress {
#[from]
@@ -89,8 +96,13 @@ impl TryFrom<ContractVKShare> for EcashApiClient {
// In non-client applications this resolver can cause warning logs about H2 connection
// failure. This indicates that the long lived https connection was closed by the remote
// peer and the resolver will have to reconnect. It should not impact actual functionality
let api_client = nym_http_api_client::Client::builder(url_address)
.map_err(|e| EcashApiError::ClientError(e.to_string()))?
.build()
.map_err(|e| EcashApiError::ClientError(e.to_string()))?;
Ok(EcashApiClient {
api_client: NymApiClient::new(url_address),
api_client,
verification_key: VerificationKeyAuth::try_from_bs58(&share.share)?,
node_id: share.node_index,
cosmos_address: share.owner.as_str().parse()?,
@@ -1,7 +1,8 @@
use crate::nym_api::NymApiClientExt;
use crate::nyxd::contract_traits::MixnetQueryClient;
use crate::nyxd::error::NyxdError;
use crate::nyxd::Config as ClientConfig;
use crate::{NymApiClient, QueryHttpRpcNyxdClient, ValidatorClientError};
use crate::{QueryHttpRpcNyxdClient, ValidatorClientError};
use colored::Colorize;
use core::fmt;
use itertools::Itertools;
@@ -87,8 +88,17 @@ fn setup_connection_tests<H: BuildHasher + 'static>(
}
});
let api_connection_test_clients = api_urls.map(|(network, url)| {
ClientForConnectionTest::Api(network, url.clone(), NymApiClient::new(url))
let api_connection_test_clients = api_urls.filter_map(|(network, url)| {
match nym_http_api_client::Client::builder(url.clone()).and_then(|b| b.build()) {
Ok(client) => Some(ClientForConnectionTest::Api(network, url, client)),
Err(err) => {
eprintln!(
"Failed to create API client for {}: {err}",
network.network_name
);
None
}
}
});
nyxd_connection_test_clients.chain(api_connection_test_clients)
@@ -160,7 +170,7 @@ async fn test_nyxd_connection(
async fn test_nym_api_connection(
network: NymNetworkDetails,
url: &Url,
client: &NymApiClient,
client: &nym_http_api_client::Client,
) -> ConnectionResult {
let result = match timeout(
Duration::from_secs(CONNECTION_TEST_TIMEOUT_SEC),
@@ -186,7 +196,7 @@ async fn test_nym_api_connection(
enum ClientForConnectionTest {
Nyxd(NymNetworkDetails, Url, Box<QueryHttpRpcNyxdClient>),
Api(NymNetworkDetails, Url, NymApiClient),
Api(NymNetworkDetails, Url, nym_http_api_client::Client),
}
impl ClientForConnectionTest {
@@ -9,8 +9,7 @@ use thiserror::Error;
pub enum ValidatorClientError {
#[error("nym api request failed: {source}")]
NymAPIError {
#[from]
source: nym_api::error::NymAPIError,
source: Box<nym_api::error::NymAPIError>,
},
#[error("Tendermint RPC request failure: {0}")]
@@ -28,3 +27,11 @@ pub enum ValidatorClientError {
#[error("No validator API url has been provided")]
NoAPIUrlAvailable,
}
impl From<nym_api::error::NymAPIError> for ValidatorClientError {
fn from(source: nym_api::error::NymAPIError) -> Self {
ValidatorClientError::NymAPIError {
source: Box::new(source),
}
}
}
@@ -14,7 +14,6 @@ pub mod signing;
pub use crate::error::ValidatorClientError;
pub use crate::rpc::reqwest::ReqwestRpcClient;
pub use crate::signing::direct_wallet::DirectSecp256k1HdWallet;
pub use client::NymApiClient;
pub use client::{Client, Config, EcashApiClient};
pub use nym_api_requests::*;
pub use nym_http_api_client::UserAgent;
@@ -1,7 +1,6 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_api_requests::models::RequestError;
use nym_http_api_client::HttpClientError;
pub type NymAPIError = HttpClientError<RequestError>;
pub type NymAPIError = HttpClientError;
@@ -3,6 +3,7 @@
use crate::nym_api::error::NymAPIError;
use crate::nym_api::routes::{ecash, CORE_STATUS_COUNT, SINCE_ARG};
use crate::nym_nodes::SkimmedNodesWithMetadata;
use async_trait::async_trait;
use nym_api_requests::ecash::models::{
AggregatedCoinIndicesSignatureResponse, AggregatedExpirationDateSignatureResponse,
@@ -15,9 +16,8 @@ use nym_api_requests::ecash::models::{
use nym_api_requests::ecash::VerificationKeyResponse;
use nym_api_requests::models::{
AnnotationResponse, ApiHealthResponse, BinaryBuildInformationOwned, ChainBlocksStatusResponse,
ChainStatusResponse, KeyRotationInfoResponse, LegacyDescribedMixNode, NodePerformanceResponse,
NodeRefreshBody, NymNodeDescription, PerformanceHistoryResponse, RewardedSetResponse,
SignerInformationResponse,
ChainStatusResponse, KeyRotationInfoResponse, NodePerformanceResponse, NodeRefreshBody,
NymNodeDescription, PerformanceHistoryResponse, RewardedSetResponse, SignerInformationResponse,
};
use nym_api_requests::nym_nodes::{
NodesByAddressesRequestBody, NodesByAddressesResponse, PaginatedCachedNodesResponseV1,
@@ -31,26 +31,22 @@ pub use nym_api_requests::{
VerifyEcashCredentialBody,
},
models::{
ComputeRewardEstParam, GatewayBondAnnotated, GatewayCoreStatusResponse,
GatewayStatusReportResponse, GatewayUptimeHistoryResponse, LegacyDescribedGateway,
MixNodeBondAnnotated, MixnodeCoreStatusResponse, MixnodeStatusReportResponse,
MixnodeStatusResponse, MixnodeUptimeHistoryResponse, RewardEstimationResponse,
StakeSaturationResponse, UptimeResponse,
GatewayCoreStatusResponse, GatewayStatusReportResponse, GatewayUptimeHistoryResponse,
MixnodeCoreStatusResponse, MixnodeStatusReportResponse, MixnodeStatusResponse,
MixnodeUptimeHistoryResponse, StakeSaturationResponse, UptimeResponse,
},
nym_nodes::{CachedNodesResponse, SemiSkimmedNode, SkimmedNode},
nym_nodes::{CachedNodesResponse, SemiSkimmedNode, SemiSkimmedNodesWithMetadata, SkimmedNode},
NymNetworkDetailsResponse,
};
use nym_contracts_common::IdentityKey;
use nym_http_api_client::{ApiClient, NO_PARAMS};
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, NodeId, NymNodeDetails};
use nym_mixnet_contract_common::{IdentityKeyRef, NodeId, NymNodeDetails};
use std::net::IpAddr;
use time::format_description::BorrowedFormatItem;
use time::Date;
use tracing::instrument;
use crate::ValidatorClientError;
pub use nym_coconut_dkg_common::types::EpochId;
pub use nym_http_api_client::Client;
pub mod error;
pub mod routes;
@@ -62,6 +58,9 @@ pub fn rfc_3339_date() -> Vec<BorrowedFormatItem<'static>> {
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait NymApiClientExt: ApiClient {
/// Get the current API URL being used by the client
fn api_url(&self) -> &url::Url;
async fn health(&self) -> Result<ApiHealthResponse, NymAPIError> {
self.get_json(
&[
@@ -87,104 +86,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(&[routes::V1_API_VERSION, routes::MIXNODES], NO_PARAMS)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
routes::STATUS,
routes::MIXNODES,
routes::DETAILED,
],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateways_detailed(&self) -> Result<Vec<GatewayBondAnnotated>, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
routes::STATUS,
routes::GATEWAYS,
routes::DETAILED,
],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateways_detailed_unfiltered(
&self,
) -> Result<Vec<GatewayBondAnnotated>, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
routes::STATUS,
routes::GATEWAYS,
routes::DETAILED_UNFILTERED,
],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_detailed_unfiltered(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
routes::STATUS,
routes::MIXNODES,
routes::DETAILED_UNFILTERED,
],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateways(&self) -> Result<Vec<GatewayBond>, NymAPIError> {
self.get_json(&[routes::V1_API_VERSION, routes::GATEWAYS], NO_PARAMS)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateways_described(&self) -> Result<Vec<LegacyDescribedGateway>, NymAPIError> {
self.get_json(
&[routes::V1_API_VERSION, routes::GATEWAYS, routes::DESCRIBED],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_described(&self) -> Result<Vec<LegacyDescribedMixNode>, NymAPIError> {
self.get_json(
&[routes::V1_API_VERSION, routes::MIXNODES, routes::DESCRIBED],
NO_PARAMS,
)
.await
}
#[tracing::instrument(level = "debug", skip_all)]
async fn get_node_performance_history(
&self,
@@ -241,6 +142,156 @@ pub trait NymApiClientExt: ApiClient {
.await
}
async fn get_current_rewarded_set(&self) -> Result<RewardedSetResponse, NymAPIError> {
self.get_rewarded_set().await
}
async fn get_all_basic_nodes_with_metadata(
&self,
) -> Result<SkimmedNodesWithMetadata, NymAPIError> {
// unroll first loop iteration in order to obtain the metadata
let mut page = 0;
let res = self
.get_basic_nodes_v2(false, Some(page), None, true)
.await?;
let mut nodes = res.nodes.data;
let metadata = res.metadata;
if res.nodes.pagination.total == nodes.len() {
return Ok(SkimmedNodesWithMetadata::new(nodes, metadata));
}
page += 1;
loop {
let mut res = self
.get_basic_nodes_v2(false, Some(page), None, true)
.await?;
if !metadata.consistency_check(&res.metadata) {
// Create a custom error for inconsistent metadata
return Err(NymAPIError::InternalResponseInconsistency {
url: self.api_url().clone(),
details: "Inconsistent paged metadata".to_string(),
});
}
nodes.append(&mut res.nodes.data);
if nodes.len() >= res.nodes.pagination.total {
break;
} else {
page += 1
}
}
Ok(SkimmedNodesWithMetadata::new(nodes, metadata))
}
async fn get_all_basic_active_mixing_assigned_nodes_with_metadata(
&self,
) -> Result<SkimmedNodesWithMetadata, NymAPIError> {
// Get all mixing nodes that are in the active/rewarded set
let mut page = 0;
let res = self
.get_basic_active_mixing_assigned_nodes_v2(false, Some(page), None, false)
.await?;
let metadata = res.metadata;
let mut nodes = res.nodes.data;
if res.nodes.pagination.total == nodes.len() {
return Ok(SkimmedNodesWithMetadata::new(nodes, metadata));
}
page += 1;
loop {
let res = self
.get_basic_active_mixing_assigned_nodes_v2(false, Some(page), None, false)
.await?;
if !metadata.consistency_check(&res.metadata) {
return Err(NymAPIError::InternalResponseInconsistency {
url: self.api_url().clone(),
details: "Inconsistent paged metadata".to_string(),
});
}
nodes.append(&mut res.nodes.data.clone());
// Check if we've got all nodes
if nodes.len() >= res.nodes.pagination.total {
break;
} else {
page += 1;
}
}
Ok(SkimmedNodesWithMetadata::new(nodes, metadata))
}
async fn get_all_basic_entry_assigned_nodes_with_metadata(
&self,
) -> Result<SkimmedNodesWithMetadata, NymAPIError> {
// Get all nodes that can act as entry gateways
let mut page = 0;
let res = self
.get_basic_entry_assigned_nodes_v2(false, Some(page), None, false)
.await?;
let metadata = res.metadata;
let mut nodes = res.nodes.data;
if res.nodes.pagination.total == nodes.len() {
return Ok(SkimmedNodesWithMetadata::new(nodes, metadata));
}
page += 1;
loop {
let res = self
.get_basic_entry_assigned_nodes_v2(false, Some(page), None, false)
.await?;
if !metadata.consistency_check(&res.metadata) {
return Err(NymAPIError::InternalResponseInconsistency {
url: self.api_url().clone(),
details: "Inconsistent paged metadata".to_string(),
});
}
nodes.append(&mut res.nodes.data.clone());
// Check if we've got all nodes
if nodes.len() >= res.nodes.pagination.total {
break;
} else {
page += 1;
}
}
Ok(SkimmedNodesWithMetadata::new(nodes, metadata))
}
async fn get_all_described_nodes(&self) -> Result<Vec<NymNodeDescription>, NymAPIError> {
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
let mut page = 0;
let mut descriptions = Vec::new();
loop {
let mut res = self.get_nodes_described(Some(page), None).await?;
descriptions.append(&mut res.data);
if descriptions.len() < res.pagination.total {
page += 1
} else {
break;
}
}
Ok(descriptions)
}
#[tracing::instrument(level = "debug", skip_all)]
async fn get_nym_nodes(
&self,
@@ -268,6 +319,25 @@ pub trait NymApiClientExt: ApiClient {
.await
}
async fn get_all_bonded_nym_nodes(&self) -> Result<Vec<NymNodeDetails>, ValidatorClientError> {
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
let mut page = 0;
let mut bonds = Vec::new();
loop {
let mut res = self.get_nym_nodes(Some(page), None).await?;
bonds.append(&mut res.data);
if bonds.len() < res.pagination.total {
page += 1
} else {
break;
}
}
Ok(bonds)
}
#[deprecated]
#[tracing::instrument(level = "debug", skip_all)]
async fn get_basic_mixnodes(&self) -> Result<CachedNodesResponse<SkimmedNode>, NymAPIError> {
@@ -677,42 +747,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_active_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(
&[routes::V1_API_VERSION, routes::MIXNODES, routes::ACTIVE],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_active_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
routes::STATUS,
routes::MIXNODES,
routes::ACTIVE,
routes::DETAILED,
],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(
&[routes::V1_API_VERSION, routes::MIXNODES, routes::REWARDED],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_report(
@@ -789,24 +823,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_mixnodes_detailed(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
routes::STATUS,
routes::MIXNODES,
routes::REWARDED,
routes::DETAILED,
],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateway_core_status_count(
@@ -874,104 +890,6 @@ pub trait NymApiClientExt: ApiClient {
}
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_status(
&self,
mix_id: NodeId,
) -> Result<MixnodeStatusResponse, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
routes::STATUS_ROUTES,
routes::MIXNODE,
&mix_id.to_string(),
routes::STATUS,
],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_reward_estimation(
&self,
mix_id: NodeId,
) -> Result<RewardEstimationResponse, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
routes::STATUS_ROUTES,
routes::MIXNODE,
&mix_id.to_string(),
routes::REWARD_ESTIMATION,
],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn compute_mixnode_reward_estimation(
&self,
mix_id: NodeId,
request_body: &ComputeRewardEstParam,
) -> Result<RewardEstimationResponse, NymAPIError> {
self.post_json(
&[
routes::V1_API_VERSION,
routes::STATUS_ROUTES,
routes::MIXNODE,
&mix_id.to_string(),
routes::COMPUTE_REWARD_ESTIMATION,
],
NO_PARAMS,
request_body,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_stake_saturation(
&self,
mix_id: NodeId,
) -> Result<StakeSaturationResponse, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
routes::STATUS_ROUTES,
routes::MIXNODE,
&mix_id.to_string(),
routes::STAKE_SATURATION,
],
NO_PARAMS,
)
.await
}
#[deprecated]
#[allow(deprecated)]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_inclusion_probability(
&self,
mix_id: NodeId,
) -> Result<nym_api_requests::models::InclusionProbabilityResponse, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
routes::STATUS_ROUTES,
routes::MIXNODE,
&mix_id.to_string(),
routes::INCLUSION_CHANCE,
],
NO_PARAMS,
)
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_current_node_performance(
&self,
@@ -1020,34 +938,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_blacklisted(&self) -> Result<Vec<NodeId>, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
routes::MIXNODES,
routes::BLACKLISTED,
],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateways_blacklisted(&self) -> Result<Vec<IdentityKey>, NymAPIError> {
self.get_json(
&[
routes::V1_API_VERSION,
routes::GATEWAYS,
routes::BLACKLISTED,
],
NO_PARAMS,
)
.await
}
#[instrument(level = "debug", skip(self, request_body))]
async fn blind_sign(
&self,
@@ -1371,8 +1261,49 @@ pub trait NymApiClientExt: ApiClient {
)
.await
}
/// Method to change the base API URLs being used by the client
fn change_base_urls(&mut self, urls: Vec<url::Url>);
/// Retrieve expanded information for all bonded nodes on the network
async fn get_all_expanded_nodes(&self) -> Result<SemiSkimmedNodesWithMetadata, NymAPIError> {
// Unroll the first iteration to get the metadata
let mut page = 0;
let res = self.get_expanded_nodes(false, Some(page), None).await?;
let mut nodes = res.nodes.data;
let metadata = res.metadata;
if res.nodes.pagination.total == nodes.len() {
return Ok(SemiSkimmedNodesWithMetadata::new(nodes, metadata));
}
page += 1;
loop {
let mut res = self.get_expanded_nodes(false, Some(page), None).await?;
nodes.append(&mut res.nodes.data);
if nodes.len() < res.nodes.pagination.total {
page += 1
} else {
break;
}
}
Ok(SemiSkimmedNodesWithMetadata::new(nodes, metadata))
}
}
// Client is already nym_http_api_client::Client (re-exported above), so just one impl needed
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl NymApiClientExt for Client {}
impl NymApiClientExt for nym_http_api_client::Client {
fn api_url(&self) -> &url::Url {
self.current_url().as_ref()
}
fn change_base_urls(&mut self, urls: Vec<url::Url>) {
self.change_base_urls(urls.into_iter().map(|u| u.into()).collect());
}
}
@@ -136,6 +136,27 @@ pub trait DkgSigningClient {
self.execute_dkg_contract(fee, req, "trigger DKG resharing".to_string(), vec![])
.await
}
async fn transfer_ownership(
&self,
transfer_to: String,
fee: Option<Fee>,
) -> Result<ExecuteResult, NyxdError> {
let req = DkgExecuteMsg::TransferOwnership { transfer_to };
self.execute_dkg_contract(fee, req, "".to_string(), vec![])
.await
}
async fn update_announce_address(
&self,
new_address: String,
fee: Option<Fee>,
) -> Result<ExecuteResult, NyxdError> {
let req = DkgExecuteMsg::UpdateAnnounceAddress { new_address };
self.execute_dkg_contract(fee, req, "".to_string(), vec![])
.await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -168,6 +189,7 @@ where
mod tests {
use super::*;
use crate::nyxd::contract_traits::tests::IgnoreValue;
use nym_coconut_dkg_common::msg::ExecuteMsg;
// it's enough that this compiles and clippy is happy about it
#[allow(dead_code)]
@@ -210,6 +232,12 @@ mod tests {
DkgExecuteMsg::AdvanceEpochState {} => client.advance_dkg_epoch_state(None).ignore(),
DkgExecuteMsg::TriggerReset {} => client.trigger_dkg_reset(None).ignore(),
DkgExecuteMsg::TriggerResharing {} => client.trigger_dkg_resharing(None).ignore(),
ExecuteMsg::TransferOwnership { transfer_to } => {
client.transfer_ownership(transfer_to, None).ignore()
}
ExecuteMsg::UpdateAnnounceAddress { new_address } => {
client.update_announce_address(new_address, None).ignore()
}
};
}
}
@@ -28,7 +28,7 @@ use cosmrs::proto::cosmwasm::wasm::v1::{
QueryRawContractStateResponse, QuerySmartContractStateRequest, QuerySmartContractStateResponse,
};
use cosmrs::tendermint::{block, chain, Hash};
use cosmrs::{AccountId, Coin as CosmosCoin, Tx};
use cosmrs::{AccountId, Coin as CosmosCoin};
use prost::Message;
use serde::{Deserialize, Serialize};
@@ -556,23 +556,12 @@ pub trait CosmWasmClient: TendermintRpcClient {
Ok(serde_json::from_slice(&res.data)?)
}
// deprecation warning is due to the fact the protobuf files built were based on cosmos-sdk 0.44,
// where they prefer using tx_bytes directly. However, in 0.42, which we are using at the time
// of writing this, the option does not work
// TODO: we should really stop using the `tx` argument here and use `tx_bytes` exlusively,
// however, at the time of writing this update, while our QA and mainnet networks do support it,
// sandbox is still running old version of wasmd that lacks support for `tx_bytes`
#[allow(deprecated)]
async fn query_simulate(
&self,
tx: Option<Tx>,
tx_bytes: Vec<u8>,
) -> Result<SimulateResponse, NyxdError> {
async fn query_simulate(&self, tx_bytes: Vec<u8>) -> Result<SimulateResponse, NyxdError> {
let path = Some("/cosmos.tx.v1beta1.Service/Simulate".to_owned());
let req = SimulateRequest {
tx: tx.map(Into::into),
tx_bytes,
..Default::default()
};
let res = self
@@ -81,17 +81,14 @@ where
auth_info: single_unspecified_signer_auth(public_key, sequence_response.sequence),
signatures: vec![Vec::new()],
};
self.query_simulate(Some(partial_tx), Vec::new()).await
// for completion sake, once we're able to transition into using `tx_bytes`,
// we might want to use something like this instead:
// let tx_raw: tx::Raw = cosmrs::proto::cosmos::tx::v1beta1::TxRaw {
// body_bytes: partial_tx.body.into_bytes().unwrap(),
// auth_info_bytes: partial_tx.auth_info.into_bytes().unwrap(),
// signatures: partial_tx.signatures,
// }
// .into();
// self.query_simulate(None, tx_raw.to_bytes().unwrap()).await
let tx_raw: tx::Raw = cosmrs::proto::cosmos::tx::v1beta1::TxRaw {
body_bytes: partial_tx.body.into_bytes()?,
auth_info_bytes: partial_tx.auth_info.into_bytes()?,
signatures: partial_tx.signatures,
}
.into();
self.query_simulate(tx_raw.to_bytes()?).await
}
async fn upload(
@@ -10,7 +10,7 @@ use cosmrs::tx;
use cosmrs::tx::SignDoc;
use nym_config::defaults;
use thiserror::Error;
use zeroize::{Zeroize, ZeroizeOnDrop};
use zeroize::{Zeroize, ZeroizeOnDrop, Zeroizing};
type Secp256k1Keypair = (SigningKey, PublicKey);
@@ -128,9 +128,20 @@ impl DirectSecp256k1HdWallet {
Ok(accounts)
}
pub fn secret(&self) -> &bip39::Mnemonic {
&self.secret
}
#[deprecated(
note = "use either .secret() for obtaining &bip39::Mnemonic or .mnemonic_string() for Zeroizing wrapper around the String"
)]
pub fn mnemonic(&self) -> String {
self.secret.to_string()
}
pub fn mnemonic_string(&self) -> Zeroizing<String> {
Zeroizing::new(self.secret.to_string())
}
}
#[must_use]
+1
View File
@@ -38,6 +38,7 @@ cosmrs = { workspace = true }
cosmwasm-std = { workspace = true }
nym-validator-client = { path = "../client-libs/validator-client" }
nym-http-api-client = { path = "../http-api-client" }
nym-bin-common = { path = "../../common/bin-common", features = ["output_format"] }
nym-crypto = { path = "../../common/crypto", features = ["asymmetric"] }
nym-network-defaults = { path = "../network-defaults" }
+1 -1
View File
@@ -2,12 +2,12 @@
// SPDX-License-Identifier: Apache-2.0
use crate::context::errors::ContextError;
pub use nym_http_api_client::Client as NymApiClient;
use nym_network_defaults::{
setup_env,
var_names::{MIXNET_CONTRACT_ADDRESS, NYM_API, NYXD, VESTING_CONTRACT_ADDRESS},
NymNetworkDetails,
};
pub use nym_validator_client::nym_api::Client as NymApiClient;
use nym_validator_client::nyxd::{self, AccountId, NyxdClient};
use nym_validator_client::{
DirectSigningHttpRpcNyxdClient, DirectSigningHttpRpcValidatorClient, QueryHttpRpcNyxdClient,
@@ -18,6 +18,6 @@ pub fn create_account(args: Args, prefix: &str) {
let wallet = DirectSecp256k1HdWallet::from_mnemonic(prefix, mnemonic);
// Output address and mnemonics into separate lines for easier parsing
println!("{}", wallet.mnemonic());
println!("{}", wallet.mnemonic_string().as_str());
println!("{}", wallet.try_derive_accounts().unwrap()[0].address());
}
@@ -5,6 +5,16 @@ use crate::types::{EncodedBTEPublicKeyWithProof, NodeIndex};
use cosmwasm_schema::cw_serde;
use cosmwasm_std::Addr;
pub type BlockHeight = u64;
pub type TransactionIndex = u32;
#[cw_serde]
pub struct OwnershipTransfer {
pub node_index: NodeIndex,
pub from: Addr,
pub to: Addr,
}
#[cw_serde]
pub struct DealerDetails {
pub address: Addr,
@@ -73,6 +73,17 @@ pub enum ExecuteMsg {
TriggerReset {},
TriggerResharing {},
/// Transfers ownership of the epoch dealer to another address.
/// This assumes off-chain hand-over of keys
TransferOwnership {
transfer_to: String,
},
/// Update announce address of this signer
UpdateAnnounceAddress {
new_address: String,
},
}
#[cw_serde]
@@ -20,5 +20,7 @@ rand_chacha = { workspace = true }
rand = { workspace = true }
cw-multi-test = { workspace = true }
nym-contracts-common = { path = "../contracts-common" }
[lints]
workspace = true
@@ -1,20 +1,100 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use cosmwasm_std::testing::{message_info, MockApi, MockQuerier, MockStorage};
use cosmwasm_std::testing::{MockApi, MockQuerier, MockStorage, message_info};
use cosmwasm_std::{
coins, Addr, BankMsg, CosmosMsg, Empty, Env, MemoryStorage, MessageInfo, Order, OwnedDeps,
Response, StdResult, Storage,
Addr, BankMsg, CosmosMsg, Decimal, Empty, Env, MemoryStorage, MessageInfo, Order, OwnedDeps,
Response, StdResult, Storage, coins,
};
use cw_storage_plus::{KeyDeserialize, Map, Prefix, PrimaryKey};
use nym_contracts_common::events::may_find_attribute;
use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha20Rng;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::str::FromStr;
pub const TEST_DENOM: &str = "unym";
pub const TEST_PREFIX: &str = "n";
pub trait FindAttribute {
fn attribute<E, S>(&self, event_type: E, attribute: &str) -> String
where
E: Into<Option<S>>,
S: Into<String>;
fn any_attribute(&self, attribute: &str) -> String {
self.attribute::<_, String>(None, attribute)
}
fn any_parsed_attribute<T>(&self, attribute: &str) -> T
where
T: FromStr,
<T as FromStr>::Err: Debug,
{
self.parsed_attribute::<_, String, T>(None, attribute)
}
fn parsed_attribute<E, S, T>(&self, event_type: E, attribute: &str) -> T
where
E: Into<Option<S>>,
S: Into<String>,
T: FromStr,
<T as FromStr>::Err: Debug;
fn decimal<E, S>(&self, event_type: E, attribute: &str) -> Decimal
where
E: Into<Option<S>>,
S: Into<String>,
{
self.parsed_attribute(event_type, attribute)
}
}
#[track_caller]
pub fn find_attribute<S: Into<String>>(
event_type: Option<S>,
attribute: &str,
response: &Response,
) -> String {
let event_type = event_type.map(Into::into);
for event in &response.events {
if let Some(typ) = &event_type {
if &event.ty != typ {
continue;
}
}
if let Some(attr) = may_find_attribute(event, attribute) {
return attr;
}
}
// this is only used in tests so panic here is fine
panic!("did not find the attribute")
}
impl FindAttribute for Response {
fn attribute<E, S>(&self, event_type: E, attribute: &str) -> String
where
E: Into<Option<S>>,
S: Into<String>,
{
find_attribute(event_type.into(), attribute, self)
}
fn parsed_attribute<E, S, T>(&self, event_type: E, attribute: &str) -> T
where
E: Into<Option<S>>,
S: Into<String>,
T: FromStr,
<T as FromStr>::Err: Debug,
{
find_attribute(event_type.into(), attribute, self)
.parse()
.unwrap()
}
}
pub fn mock_api() -> MockApi {
MockApi::default().with_prefix(TEST_PREFIX)
}
@@ -4,12 +4,12 @@
use crate::{ContractTester, TestableNymContract};
use cosmwasm_std::testing::{message_info, mock_env};
use cosmwasm_std::{
from_json, Addr, Coin, ContractInfo, Deps, DepsMut, Env, MessageInfo, Response, StdResult,
Storage, Timestamp,
Addr, BlockInfo, Coin, ContractInfo, Deps, DepsMut, Env, MessageInfo, Response, StdResult,
Storage, Timestamp, from_json,
};
use cw_multi_test::{next_block, AppResponse, Executor};
use serde::de::DeserializeOwned;
use cw_multi_test::{AppResponse, Executor, next_block};
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::any::type_name;
use std::fmt::Debug;
@@ -62,6 +62,8 @@ pub trait ContractOpts {
coins: &[Coin],
message: Self::ExecuteMsg,
) -> Result<Response, Self::ContractError>;
fn unchecked_contract_address<D: TestableNymContract>(&self) -> Addr;
}
impl<C> ContractOpts for ContractTester<C>
@@ -130,14 +132,47 @@ where
C::execute()(self.deps_mut(), env, info, message)
}
fn unchecked_contract_address<D: TestableNymContract>(&self) -> Addr {
self.unchecked_contract_address::<D>()
}
}
pub trait ChainOpts: ContractOpts {
fn set_contract_balance(&mut self, balance: Coin);
fn next_block(&mut self);
fn update_block<F: Fn(&mut BlockInfo)>(&mut self, action: F);
fn set_to_epoch(&mut self) {
self.set_block_time(Timestamp::from_seconds(0))
}
fn set_block_time(&mut self, time: Timestamp);
fn set_to_genesis(&mut self) {
self.update_block(|block| {
block.height = 1;
})
}
fn next_block(&mut self) {
self.update_block(next_block)
}
fn advance_day_of_blocks(&mut self) {
self.update_block(|block| {
block.time = block.time.plus_seconds(24 * 60 * 60);
block.height += 17280;
})
}
fn advance_time_by(&mut self, delta_secs: u64) {
self.update_block(|block| {
block.time = block.time.plus_seconds(delta_secs);
block.height += 1
})
}
fn set_block_time(&mut self, time: Timestamp) {
self.update_block(|b| b.time = time)
}
fn execute_msg(
&mut self,
@@ -186,12 +221,9 @@ where
)
.unwrap();
}
fn next_block(&mut self) {
self.app.update_block(next_block)
}
fn set_block_time(&mut self, time: Timestamp) {
self.app.update_block(|b| b.time = time)
fn update_block<F: Fn(&mut BlockInfo)>(&mut self, action: F) {
self.app.update_block(action)
}
fn execute_msg(
@@ -2,22 +2,23 @@
// SPDX-License-Identifier: Apache-2.0
use crate::{
CommonStorageKeys, ContractOpts, ContractTester, StorageWrapper, TestableNymContract,
TEST_DENOM,
CommonStorageKeys, ContractOpts, ContractTester, StorageWrapper, TEST_DENOM,
TestableNymContract,
};
use cosmwasm_std::testing::message_info;
use cosmwasm_std::{
coin, coins, from_json, to_json_vec, Addr, Coin, MessageInfo, StdError, StdResult, Storage,
Addr, Coin, MessageInfo, StdError, StdResult, Storage, coin, coins, from_json, to_json_vec,
};
use cw_multi_test::Executor;
use cw_storage_plus::{Key, Path, PrimaryKey};
use rand::RngCore;
use rand_chacha::ChaCha20Rng;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::any::type_name;
use std::ops::Deref;
pub use rand::prelude::*;
pub trait StorageReader {
fn common_key(&self, key: CommonStorageKeys) -> Option<&[u8]>;
@@ -1,16 +1,16 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{mock_api, test_rng, TEST_DENOM};
use crate::{TEST_DENOM, mock_api, test_rng};
use cosmwasm_std::testing::MockApi;
use cosmwasm_std::{
coin, coins, Addr, Binary, Deps, DepsMut, Empty, Env, MessageInfo, Order, QuerierWrapper,
Record, Response, Storage,
Addr, Binary, Deps, DepsMut, Empty, Env, MessageInfo, Order, QuerierWrapper, Record, Response,
Storage, coin, coins,
};
use cw_multi_test::{App, AppBuilder, BankKeeper, Contract, ContractWrapper, Executor};
use rand_chacha::ChaCha20Rng;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::marker::PhantomData;
@@ -47,27 +47,11 @@ pub trait TestableNymContract {
fn query() -> QueryFn<Self::QueryMsg, Self::ContractError>;
fn migrate() -> PermissionedFn<Self::MigrateMsg, Self::ContractError>;
fn base_init_msg() -> Self::InitMsg;
// // for now we don't care about custom queriers
// fn contract_wrapper() -> ContractWrapper<
// Self::ExecuteMsg,
// Self::InitMsg,
// Self::QueryMsg,
// Self::ContractError,
// anyhow::Error,
// anyhow::Error,
// Empty,
// Empty,
// Empty,
// Self::ContractError,
// Self::ContractError,
// Self::MigrateMsg,
// Self::ContractError,
// > {
// ContractWrapper::new(Self::execute(), Self::instantiate(), Self::query())
// .with_migrate(Self::migrate())
// }
// not all instances will require default init message, some will always have to provide customised values
#[allow(clippy::unimplemented)]
fn base_init_msg() -> Self::InitMsg {
unimplemented!("attempted to instantiate contract without defining instantiate message")
}
fn dyn_contract() -> Box<dyn Contract<Empty>> {
Box::new(
@@ -92,6 +76,7 @@ pub struct ContractTesterBuilder<C> {
app: App<BankKeeper, MockApi, StorageWrapper>,
storage: StorageWrapper,
pub well_known_contracts: HashMap<&'static str, Addr>,
code_ids: HashMap<&'static str, u64>,
}
impl<C> ContractTesterBuilder<C> {
@@ -125,20 +110,33 @@ impl<C> ContractTesterBuilder<C> {
app,
storage,
well_known_contracts: Default::default(),
code_ids: Default::default(),
}
}
pub fn master_address(&self) -> Addr {
self.master_address.clone()
}
pub fn instantiate<D: TestableNymContract>(
mut self,
custom_init_msg: Option<D::InitMsg>,
) -> ContractTesterBuilder<C> {
self.instantiate_contract::<D>(custom_init_msg);
self
}
pub fn instantiate_contract<D: TestableNymContract>(
&mut self,
custom_init_msg: Option<D::InitMsg>,
) {
let code_id = self.app.store_code(D::dyn_contract());
let contract_address = self
.app
.instantiate_contract(
code_id,
self.master_address.clone(),
&custom_init_msg.unwrap_or(D::base_init_msg()),
&custom_init_msg.unwrap_or_else(|| D::base_init_msg()),
&[],
D::NAME,
Some(self.master_address.to_string()),
@@ -154,8 +152,28 @@ impl<C> ContractTesterBuilder<C> {
)
.unwrap();
self.code_ids.insert(D::NAME, code_id);
self.well_known_contracts.insert(D::NAME, contract_address);
self
}
// uses the SAME code
pub fn migrate_contract<D: TestableNymContract>(&mut self, migrate_msg: &D::MigrateMsg) {
self.app
.migrate_contract(
self.master_address.clone(),
self.unchecked_contract_address::<D>(),
migrate_msg,
self.unchecked_contract_code_id::<D>(),
)
.unwrap();
}
pub fn unchecked_contract_address<D: TestableNymContract>(&self) -> Addr {
self.well_known_contracts.get(D::NAME).unwrap().clone()
}
fn unchecked_contract_code_id<D: TestableNymContract>(&self) -> u64 {
*self.code_ids.get(D::NAME).unwrap()
}
pub fn build(self) -> ContractTester<C>
@@ -229,6 +247,10 @@ where
self.insert_common_storage_key(key, value);
self
}
pub fn unchecked_contract_address<D: TestableNymContract>(&self) -> Addr {
self.well_known_contracts.get(D::NAME).unwrap().clone()
}
}
impl<C> Storage for ContractTester<C>
@@ -0,0 +1,53 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use cosmwasm_std::{Binary, CustomQuery, QuerierWrapper, StdResult, from_json};
use serde::Serialize;
use serde::de::DeserializeOwned;
// re-expose methods from QuerierWrapper as traits so that we could more easily define extension traits
pub trait ContractQuerier {
fn query_contract<T: DeserializeOwned>(
&self,
address: impl Into<String>,
msg: &impl Serialize,
) -> StdResult<T>;
fn query_contract_storage(
&self,
address: impl Into<String>,
key: impl Into<Binary>,
) -> StdResult<Option<Vec<u8>>>;
fn query_contract_storage_value<T: DeserializeOwned>(
&self,
address: impl Into<String>,
key: impl Into<Binary>,
) -> StdResult<Option<T>> {
match self.query_contract_storage(address, key)? {
None => Ok(None),
Some(value) => Ok(Some(from_json(&value)?)),
}
}
}
impl<C> ContractQuerier for QuerierWrapper<'_, C>
where
C: CustomQuery,
{
fn query_contract<T: DeserializeOwned>(
&self,
address: impl Into<String>,
msg: &impl Serialize,
) -> StdResult<T> {
self.query_wasm_smart(address, msg)
}
fn query_contract_storage(
&self,
address: impl Into<String>,
key: impl Into<Binary>,
) -> StdResult<Option<Vec<u8>>> {
self.query_wasm_raw(address, key)
}
}
@@ -10,6 +10,7 @@ pub mod events;
pub mod signing;
pub mod types;
pub mod contract_querier;
pub mod helpers;
pub use types::*;

Some files were not shown because too many files have changed in this diff Show More