Compare commits

..

147 Commits

Author SHA1 Message Date
Jędrzej Stuczyński 3c48fa6b8c temp: dont setup hickory 2025-09-09 17:22:25 +01:00
Jędrzej Stuczyński 689cb3a9c2 TEMP TO REMOVE: extra log during deposit 2025-09-09 17:16:55 +01:00
Jędrzej Stuczyński 60870fc458 another restoration 2025-09-09 16:44:09 +01:00
Jędrzej Stuczyński b44997f412 temporarily restored GenericRequestFailure variant 2025-09-09 16:17:00 +01:00
Jędrzej Stuczyński 34aaa60a44 feat: attempt to add more details to reqwest errors 2025-09-09 14:47:44 +01:00
Jędrzej Stuczyński dfa4563d22 use display impl for urls 2025-09-09 13:35:40 +01:00
Bogdan-Ștefan Neacşu f30cc7a8a1 Use default value for the ports until api is deployed 2025-09-08 11:17:36 +03:00
durch 4f08161315 fix: Address critical issues in client configuration registry implementation
- Fixed HeaderMapInit parsing bug that would cause compilation errors
- Added comprehensive documentation with usage examples and DSL reference
- Improved error handling with better error messages for invalid headers
- Added test coverage for both macro and registry functionality
- Added debug inspection capabilities for registered configurations
- Fixed module name conflicts in tests by using separate modules

All tests now passing:
- 7 macro tests validating DSL parsing and code generation
- 4 registry tests verifying configuration collection and application
2025-09-05 20:58:27 +02:00
durch 23d4c75597 Ergonomics and call sites, incept the builder 2025-09-05 18:18:45 +02:00
durch c24ca29212 Where do we need to put inventory 2025-09-05 17:30:03 +02:00
durch 6cff49caf6 Syn workspace version 2025-09-05 17:16:56 +02:00
durch ea31aa6bcf Rename macro 2025-09-05 17:15:13 +02:00
durch 410683dae6 Registry proc-macro 2025-09-05 17:12:44 +02:00
Drazen Urch 043fb31c09 Merge branch 'develop' into domain-fronting-integration 2025-09-01 19:10:41 +02:00
durch e36573463c Merge branch 'domain-fronting-integration' of https://github.com/nymtech/nym into domain-fronting-integration 2025-09-01 14:50:05 +02:00
durch c79471e9ad Assorted CI fixes 2025-09-01 14:48:19 +02:00
Bogdan-Ștefan Neacşu a557ac22c7 Revert "Create an axum_test client for more integrated unit testing (#5956)" (#5999)
This reverts commit efd61eb47c.
2025-09-01 15:37:10 +03:00
Drazen Urch 1cd8713d26 Merge branch 'develop' into domain-fronting-integration 2025-09-01 14:04:10 +02:00
Jędrzej Stuczyński 55ef89178b chore: upgraded syn to 2.0 and removed nym-execute (#5998) 2025-09-01 12:59:13 +01:00
Jędrzej Stuczyński d97be2d8ef bugfix: Recipient deserialisation for deserialisers missing bytes specialisation (#5991)
* bugfix: Recipient deserialisation for deserialisers missing bytes specialisation

for example toml or json will just default to visit_seq ignoring bytes related optimisations

* clippy
2025-09-01 11:30:35 +01:00
Bogdan-Ștefan Neacşu efd61eb47c Create an axum_test client for more integrated unit testing (#5956) 2025-09-01 13:27:06 +03:00
durch edb797da68 Explicit warning on missing fronting configuration 2025-09-01 12:25:25 +02:00
durch 72e1290d8f Remove error generics, address PR comments 2025-09-01 11:28:17 +02:00
Drazen Urch 905d8ed7de Update common/client-core/src/client/base_client/mod.rs
Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>
2025-09-01 11:21:24 +02:00
benedetta davico 4a01973b31 Merge pull request #5981 from nymtech/benny/ns-api-ci-fix
Fix the ns api ci workflow
2025-09-01 11:02:21 +02:00
Mark Sinclair 9ad9c3b8e7 Bug fix: NS API monikers (#5990)
* node-status-api: fix missing monikers because of deserialisation issues from unstructured data

* node-status-api: bump version after bug fix monikers

---------

Co-authored-by: Mark Sinclair <mmsinclair@users.noreply.github.com>
2025-09-01 09:48:37 +01:00
dependabot[bot] 6706500132 build(deps): bump pbkdf2 from 3.1.2 to 3.1.3 (#5869)
Bumps [pbkdf2](https://github.com/crypto-browserify/pbkdf2) from 3.1.2 to 3.1.3.
- [Changelog](https://github.com/browserify/pbkdf2/blob/master/CHANGELOG.md)
- [Commits](https://github.com/crypto-browserify/pbkdf2/compare/v3.1.2...v3.1.3)

---
updated-dependencies:
- dependency-name: pbkdf2
  dependency-version: 3.1.3
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-29 15:52:10 +01:00
dependabot[bot] 33fe059c28 Bump console from 0.15.11 to 0.16.0 (#5931)
Bumps [console](https://github.com/console-rs/console) from 0.15.11 to 0.16.0.
- [Release notes](https://github.com/console-rs/console/releases)
- [Changelog](https://github.com/console-rs/console/blob/main/CHANGELOG.md)
- [Commits](https://github.com/console-rs/console/compare/0.15.11...0.16.0)

---
updated-dependencies:
- dependency-name: console
  dependency-version: 0.16.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-29 14:24:03 +01:00
dependabot[bot] d6ed2b770b Bump indicatif from 0.17.11 to 0.18.0 (#5924)
Bumps [indicatif](https://github.com/console-rs/indicatif) from 0.17.11 to 0.18.0.
- [Release notes](https://github.com/console-rs/indicatif/releases)
- [Commits](https://github.com/console-rs/indicatif/compare/0.17.11...0.18.0)

---
updated-dependencies:
- dependency-name: indicatif
  dependency-version: 0.18.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-29 14:23:39 +01:00
dependabot[bot] 7c18a3dced Bump mock_instant from 0.5.3 to 0.6.0 (#5930)
Bumps [mock_instant](https://github.com/museun/mock_instant) from 0.5.3 to 0.6.0.
- [Commits](https://github.com/museun/mock_instant/commits/v0.6.0)

---
updated-dependencies:
- dependency-name: mock_instant
  dependency-version: 0.6.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-29 14:20:22 +01:00
dependabot[bot] 09475ab4e0 build(deps): bump mikefarah/yq from 4.45.4 to 4.47.1 (#5911)
Bumps [mikefarah/yq](https://github.com/mikefarah/yq) from 4.45.4 to 4.47.1.
- [Release notes](https://github.com/mikefarah/yq/releases)
- [Changelog](https://github.com/mikefarah/yq/blob/master/release_notes.txt)
- [Commits](https://github.com/mikefarah/yq/compare/v4.45.4...v4.47.1)

---
updated-dependencies:
- dependency-name: mikefarah/yq
  dependency-version: 4.47.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-29 13:36:39 +01:00
dependabot[bot] b7606cd2ef Bump defguard_wireguard_rs from v0.4.7 to v0.7.5 (#5928)
Bumps [defguard_wireguard_rs](https://github.com/DefGuard/wireguard-rs) from v0.4.7 to v0.7.5.
- [Release notes](https://github.com/DefGuard/wireguard-rs/releases)
- [Commits](https://github.com/DefGuard/wireguard-rs/compare/ef1cf3714629bf5016fb38cbb7320451dc69fb09...d090d2249e5bb3d4154f07de098387e2ab69bfdc)

---
updated-dependencies:
- dependency-name: defguard_wireguard_rs
  dependency-version: d090d2249e5bb3d4154f07de098387e2ab69bfdc
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-29 13:35:42 +01:00
dependabot[bot] 006a57312d Bump tokio from 1.46.1 to 1.47.1 (#5929)
Bumps [tokio](https://github.com/tokio-rs/tokio) from 1.46.1 to 1.47.1.
- [Release notes](https://github.com/tokio-rs/tokio/releases)
- [Commits](https://github.com/tokio-rs/tokio/compare/tokio-1.46.1...tokio-1.47.1)

---
updated-dependencies:
- dependency-name: tokio
  dependency-version: 1.47.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-29 13:35:18 +01:00
dependabot[bot] 9b5aded8a5 build(deps): bump actions/download-artifact from 4 to 5 (#5939)
Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 4 to 5.
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](https://github.com/actions/download-artifact/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/download-artifact
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-29 13:33:31 +01:00
dependabot[bot] f4a69636fe build(deps): bump actions/first-interaction from 1 to 3 (#5950)
Bumps [actions/first-interaction](https://github.com/actions/first-interaction) from 1 to 3.
- [Release notes](https://github.com/actions/first-interaction/releases)
- [Commits](https://github.com/actions/first-interaction/compare/v1...v3)

---
updated-dependencies:
- dependency-name: actions/first-interaction
  dependency-version: '3'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-29 13:31:29 +01:00
dependabot[bot] 0463d88646 Bump slab from 0.4.10 to 0.4.11 (#5952)
Bumps [slab](https://github.com/tokio-rs/slab) from 0.4.10 to 0.4.11.
- [Release notes](https://github.com/tokio-rs/slab/releases)
- [Changelog](https://github.com/tokio-rs/slab/blob/master/CHANGELOG.md)
- [Commits](https://github.com/tokio-rs/slab/compare/v0.4.10...v0.4.11)

---
updated-dependencies:
- dependency-name: slab
  dependency-version: 0.4.11
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-29 13:31:02 +01:00
dependabot[bot] 534bf5d824 build(deps): bump actions/setup-java from 4 to 5 (#5975)
Bumps [actions/setup-java](https://github.com/actions/setup-java) from 4 to 5.
- [Release notes](https://github.com/actions/setup-java/releases)
- [Commits](https://github.com/actions/setup-java/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/setup-java
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-29 13:30:00 +01:00
dependabot[bot] 34684b14db Bump sha.js from 2.4.11 to 2.4.12 (#5983)
Bumps [sha.js](https://github.com/crypto-browserify/sha.js) from 2.4.11 to 2.4.12.
- [Changelog](https://github.com/browserify/sha.js/blob/master/CHANGELOG.md)
- [Commits](https://github.com/crypto-browserify/sha.js/compare/v2.4.11...v2.4.12)

---
updated-dependencies:
- dependency-name: sha.js
  dependency-version: 2.4.12
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-29 13:29:18 +01:00
durch 4e5ccf7926 fix: provide all API URLs for automatic failover in endpoint rotation
Previously, when rotating API endpoints, only a single URL was provided to the
HTTP client, defeating the purpose of having multiple URLs for resilience.

Changes:
- NymApiTopologyProvider now provides all URLs in rotated order when switching endpoints
- NymApisClient similarly provides all URLs starting from the working endpoint
- Added clarifying comments for broadcast/exhaustive query methods where single URLs are intentionally used
- This enables the HTTP client's built-in failover mechanism while maintaining endpoint rotation behavior

The fix ensures that if the primary endpoint fails, the client can automatically
failover to alternative endpoints without manual intervention, improving overall
network resilience.
2025-08-29 13:37:45 +02:00
durch 0a8eb940bb feat: complete migration to unified HTTP client and fix all compilation errors
- Added missing NymApiClientExt trait methods (get_all_expanded_nodes, change_base_urls)
- Fixed all compilation errors across the workspace
- Updated nym-node to use unified client instead of deprecated NymApiClient
- Fixed type conversions for RewardedSetResponse → EpochRewardedSet
- Added nym-http-api-client dependency where needed
- Updated all examples and documentation to use new client API
2025-08-29 13:36:22 +02:00
durch 34fb67602c fix: resolve all compilation errors after NymApiClient migration
- Add missing nym-http-api-client dependencies to multiple crates
- Add NymApiClientExt trait imports where needed
- Fix type mismatches from NymApiClient to unified Client
- Add error conversions for NymAPIError in various error enums
- Implement missing trait methods (get_current_rewarded_set, get_all_basic_nodes_with_metadata, get_all_described_nodes)
- Fix type conversions for RewardedSetResponse in network monitor
- Update all API client instantiation to use new unified HTTP client
2025-08-29 13:35:41 +02:00
durch 766ae8dd8e feat: migrate NymApiClient usage to unified HTTP client
- Wire up domain fronting configuration in NymNetworkDetails
- Implement NymApiClientExt trait for base nym_http_api_client::Client
- Migrate direct NymApiClient usage in multiple components:
  - nym-network-monitor
  - verloc measurements
  - connection tester
  - coconut/ecash client
  - validator rewarder
- Add Copy derive to ApiUrlConst to enable iteration
- Update error handling and Display implementations

This enables automatic domain fronting for all Nym API calls via the configured CDN front hosts.
2025-08-29 13:33:37 +02:00
durch bd1fd73ba0 feat: unify HTTP client creation and enable domain fronting
Enhanced the base nym_http_api_client to reduce fragmentation and enable domain fronting:

- Added SerializationFormat enum for explicit JSON/bincode choice (no auto-detection)
- Added from_network() method to create clients from NymNetworkDetails with domain fronting
- Added with_bincode() builder method for explicit serialization configuration
- Set Accept header based on serialization preference
- Added deprecation paths for NymApiClient wrapper and nym_api::Client re-export
- Enabled domain fronting support via network defaults feature

This is part of a broader effort to consolidate HTTP client implementations across the codebase,
reducing ~500 lines of wrapper code and providing automatic domain fronting for censorship resistance.
2025-08-29 13:33:37 +02:00
Jędrzej Stuczyński b2266d04ef chore: internal hidden command to force advance nyx epoch (#5964) 2025-08-29 11:41:22 +01:00
Jędrzej Stuczyński 911b365609 chore: purge temp databases on build (#5984)
* purge any temp databases on build

* updated min rust version

* fixed clippy::manual_abs_diff' in verloc due to updated msrv

* wasm
2025-08-29 11:41:08 +01:00
Jędrzej Stuczyński e9acc014ed feat: credential proxy deposit pool (#5945)
* chore: rename VpnApiError to CredentialProxyError

* reorganise deposit flow

* updated sql tables et al.

* insert information about deposit usage failure

* remove old deposit maker

* nym credential proxy to monitor quorum state to stop issuance if it'd fail

* clippy

* target lock new modules

* windows clippy

* renamed migration file due to rebasing
2025-08-29 09:39:57 +01:00
Jędrzej Stuczyński 0f66e5a154 bugfix: make sure tables are removed in correct order to not trigger FK constraint issue (#5987) 2025-08-29 09:03:22 +01:00
Bogdan-Ștefan Neacşu f8337d9b38 Wireguard metadata client library (#5943) 2025-08-28 15:43:46 +03:00
Bogdan-Ștefan Neacşu 4fb252c44b Wireguard private metadata (#5915) 2025-08-28 15:14:52 +03:00
Jędrzej Stuczyński 17708cdf92 bugfix: manually calculate per node work on rewarded set changes (#5972) 2025-08-27 12:33:24 +01:00
Andrej Mihajlov a9c56ef9ac Merge pull request #5976 from nymtech/am/update-sysinfo
Update sysinfo to the latest
2025-08-27 10:58:06 +02:00
Jędrzej Stuczyński 724420f97c chore: move authenticator into gateway crate (#5982)
* removed unused bits of authenticator config

* moved authenticator into gateway

* cleaned up imports

* clippy
2025-08-27 09:05:02 +01:00
benedettadavico 66d0296f47 update dockerfile to pg 2025-08-26 16:02:11 +02:00
benedettadavico 03bbbf44e9 ns api ci fix 2025-08-26 16:02:11 +02:00
dynco-nym 0a48fa6172 Remove freshness check on testrun submit (#5977)
* Remove freshness check on testrun submit
- freshness is enforced by a background task
  that marks testruns as stale after a
  configured amount of time

* Move code around

* Add humantime

* Update launch script

* Fix typo

* Adjust agent run script

* Configure user agent

* Bump version
2025-08-26 12:26:13 +02:00
Andrej Mihajlov 5c8749a2e1 Update sysinfo to the latest
Shakes out windows@0.57 from the tree
2025-08-23 09:29:47 +02:00
p17o 18d9d807f2 Added VPS provider hostraha.com (#5959) 2025-08-22 12:31:31 +00:00
import this 3a7393d316 [DOCs/operators]: Roll back from v2025.15-gruyere to v2025.14-feta (#5973)
* release notes and version bump up

* update stats

* roll version back and comment new notes
2025-08-22 11:23:16 +00:00
import this 6ce5f707c6 [DOCs/operators]: Release notes for v2025.15 gruyere (#5969)
* release notes and version bump up

* update stats
2025-08-21 11:49:52 +00:00
benedetta davico 766a1d4497 Merge pull request #5965 from nymtech/benny/fix-ns-agent-ci
fixing the ci for ns agent
2025-08-21 12:22:05 +02:00
benedetta davico 35c83f0a31 Merge pull request #5967 from nymtech/release/2025.15-gruyere
merge gruyere to develop
2025-08-21 12:20:43 +02:00
benedetta davico 01dd4a7972 Merge pull request #5958 from nymtech/bugfix/linux-build-ci
bugfix: fix ci-build for linux (and use updated runner)
2025-08-21 10:32:51 +02:00
Jędrzej Stuczyński c2e335557e Feature/testing utils (#5963)
* helper wrapper for stream-sink channel

* similar helper for async read/write

* example tests and clippy
2025-08-20 16:17:09 +01:00
benedettadavico 40e1cbc7a9 update changelog 2025-08-20 12:34:04 +02:00
Jędrzej Stuczyński c133e0e88b chore: updated refs to cheddar rev of nym repo (#5955)
* chore: updated refs to cheddar rev of nym repo

* update statistics-api version
2025-08-19 09:28:42 +01:00
Andrej Mihajlov 5b716633de Merge pull request #5960 from nymtech/am/update-strum
Migrate strum to 0.27.2
2025-08-18 18:28:22 +02:00
Andrej Mihajlov 834538300d Migrate to strum 0.27.2 2025-08-18 18:02:41 +02:00
Jędrzej Stuczyński bd0d70f7cd bugfix: fix ci-build for linux (and use updated runner) 2025-08-15 09:37:41 +01:00
Jędrzej Stuczyński 979485c582 http api client adjustment (#5953)
* missing feature lock for attempting to clone client

* added helper macro to generate user agent without additional imports
2025-08-13 12:52:16 +01:00
Bogdan-Ștefan Neacşu d95f66bd90 Move credential verifier in peer controller (#5938)
* Move credential verifier in peer controller

* Send back errors of peer controller
2025-08-13 13:09:44 +03:00
Jędrzej Stuczyński 906dfb2fb0 change PK/FK on expiration date signatures tables (#5934)
* update nym-credential-proxy

* update credential-storage

* update nym-api

* clippy
2025-08-12 09:03:53 +01:00
Jędrzej Stuczyński 7daa726626 feat: introduce additional checks when attempting to send to bounded channels (#5941)
* feat: introduce additional checks when attempting to send to bounded channels

or to a fallible gateway

* return error rather than panic when merging socket during shutdown
2025-08-11 09:15:12 +01:00
Jędrzej Stuczyński 067f492ad6 chore: fix rust 1.89 clippy issues (#5944) 2025-08-08 13:03:05 +01:00
Jędrzej Stuczyński ed73ec9ce6 chore: remove unused import (#5942) 2025-08-07 09:56:08 +01:00
import this 61606630bd [DOCs/operators]: Release notes v2025.14-feta (#5935)
* update release

* fix typos
2025-08-06 08:24:38 +00:00
benedettadavico 2d3deeb424 bump versions 2025-08-06 09:56:08 +02:00
benedetta davico 3827dc357d Merge pull request #5936 from nymtech/release/2025.14-feta
Merge release/2025.14-feta to develop
2025-08-06 09:54:29 +02:00
benedetta davico a70e9e23d3 Merge branch 'develop' into release/2025.14-feta 2025-08-06 09:48:22 +02:00
Jędrzej Stuczyński dc59149a5d squashed feature/ecash-liveness-check (#5890) (#5890)
delay to gruyere

chore: delay to Feta

added threshold information to the response

nym api test clippy

bugfixes and endpoint improvements

expose results on api endpoints

wip: making nym api monitor network signers

added fallback legacy queries to get basic support idea

refactored the code to expose bool-only methods for status

ecash-signer-check lib for obtaining basic ecash signer information
2025-08-05 12:28:42 +01:00
benedetta davico e418c7587a Merge pull request #5914 from nymtech/feature/nym-node-gw-reset
nym-node debug command to reset providers db
2025-08-05 12:05:44 +02:00
import this 33339c085d [DOCs/operators]: Update ISP list (#5918)
* update ISP list

* remove typo
2025-07-31 13:47:27 +00:00
Sachin Kamath 863f329106 docs: update validator instructions and waitlist callout (#5922) 2025-07-30 15:03:39 +00:00
import this 314a37cabe WG exit policy scripts update (#5921)
* add NIP-3 ports to WG manager script

* add monero ports to local testing script

* console output snippet update
2025-07-30 09:43:39 +00:00
Jack Wampler 917f391948 Make DNS Resolver fallback optional (#5920)
default to no dns system fallback, but keep support
2025-07-29 11:00:24 -06:00
Jędrzej Stuczyński 0b4deda621 nym-node debug command to reset providers db 2025-07-25 13:33:12 +01:00
benedetta davico d01867ca8d save version 2025-07-25 11:27:42 +02:00
benedetta davico 502c63b291 Fix broken CI 2025-07-25 11:19:27 +02:00
Jędrzej Stuczyński a4e674c98b basic zulip client for sending messages (#5913) 2025-07-24 16:22:35 +01:00
Jędrzej Stuczyński 7f97f13799 chore: nym node tokio console (#5909)
* conditionally enable console-subscriber within nym-node

* Update ci-build-upload-binaries.yml

* Update ci-build-upload-binaries.yml

add features console

* updated feature name

* fixed filtering on tracing layers

* add track_caller when spawning futures for better tokio-console support

* allow [client] tasks to specify their names when used within tokio console

* clippy

* pre-emptively fix wasm clippy

---------

Co-authored-by: Tommy Verrall <60836166+tommyv1987@users.noreply.github.com>
2025-07-24 11:00:58 +01:00
Bogdan-Ștefan Neacşu b975d08342 Remove old free credential handle (#5864)
* Set cached storage counters to 0 (#5812)

* Set cached storage counters to 0

* u64 to i64 log possible error

* Check addition too

Debug commit

Remove more data from wg storage peer

Put actual ticket type in storage

Simplify add peer

Finish rebase

Pass defguard Peer

Cache less data for consumption

GatewayStorage traits

Wg API trait

Mock test structures

Unit test for peer controller

EcashManager trait

Init test of Authenticator

Remove peer test

* Fix windows different API

* Use make_bincode_serializer like in other places

* Add log_slow_statements to gateway storage

* Use correct LevelFilter

* Fix clippy

* More win fix

* Win clippy

* Use two error variants more

* Use only one Arc<RwLock<T>> instead of many more

* Remove commented test

* Specific trait import
2025-07-23 17:07:12 +03:00
Jędrzej Stuczyński 8e44f9f07f chore: allow compatibility with 'CDLA-Permissive-2.0' (#5910) 2025-07-23 14:48:40 +01:00
benedettadavico 85604e8305 bump versions 2025-07-23 10:18:45 +02:00
benedetta davico 8461d085a5 Merge pull request #5906 from nymtech/release/2025.13-emmental
merge release/2025.13-emmental to develop
2025-07-22 16:23:29 +02:00
Drazen Urch af9f6e5ca0 Allow PG database backend (#5880)
* feat(db): add SQL query wrapper for PostgreSQL placeholder conversion

- Created query_wrapper module with functions to automatically convert
  SQLite ? placeholders to PostgreSQL $1, $2, ... format
- Updated build.rs to handle mutually exclusive feature flags
- Modified one query in mixnodes.rs as proof of concept
- Added type conversions for PostgreSQL compatibility (u32->i64, u16->i32)

This is a checkpoint commit before converting all queries to use the wrapper.

* feat(nym-node-status-api): add PostgreSQL database support via feature flags

Implement dual database support for SQLite and PostgreSQL through Cargo feature flags.
The implementation uses a query wrapper that automatically converts SQLite-style ?
placeholders to PostgreSQL-style $1, $2, ... placeholders at runtime.

Key changes:
- Add query wrapper functions that handle placeholder conversion
- Convert all sqlx::query\! macros to use wrapper functions
- Handle type conversions between databases (i64 vs i32)
- Add feature-gated implementations for database-specific SQL syntax
- Update Makefile with clippy targets for both database features
- Document database support in README

* feat(nym-node-status-agent): add multi-API support with random selection

Agents can now connect to multiple APIs and randomly select one for each testrun:
- Accept multiple --server arguments in format "address:port:auth_key"
- Randomly shuffle server list before attempting connections
- Try each server until a testrun is obtained
- Submit results back only to the API that provided the testrun
- Continue to next server if one is down or has no testruns available

* feat(nym-node-status): implement primary/secondary server architecture

- Agent now requests testruns only from primary server (first in list)
- Results are submitted to all configured servers in parallel
- Secondary servers accept external testruns via new v2 endpoint
- Added auto-creation of gateway and testrun records on secondary servers
- New database queries: get_or_create_gateway, insert_external_testrun
- Client library enhanced with submit_results_with_context method

* Bump Node status API version

* Fix build workdir

* Bump to 3.1.4

* Fix types and queries

* 3.1.6

* Fix gateway perf, bump 3.1.7

* NodeId -> i32, 3.1.8

* Bump agent version

* i64 -> i32

* Use image yq

* Migration and more types

* Update remaining JSONB columns

* Simplify server config

* Update build path

* Change delimiter

* bump agent

* Split up pg and sqlite builds

* More typing fixes, build-and-push script

* Fix Dockerfile-pg

* Bump node-status-api

* TYping

* Agent build script

* More logging around testruns

* Fail loudly on read errors

* Cleanup

* Debug get gateways query

* Fix get_gateways query

* Use pg cert, 3.1.16

* Submit regular results to primary server

* Bump freshenss cutoff

* Update Cargo.lock

* fix: resolve rebase conflicts and compilation errors

After rebasing onto develop, fixed several issues:
- Fixed borrowed data escapes error by using sqlx::query directly in transaction functions
- Removed unused imports and cleaned up code
- Maintained database-specific implementations for transaction functions

* fmt

* Make PG default to make lives easier

* Performance improvements for Explorer v2

* Fix sqlite build

* Fix PG migration

* Tests round 1

* DB tests

* More tests

* And some more tests

* And some more, more tests

* cargo fmt

* Fix some failing lints

* Fix lioness version problems

* Clippy in tests

---------

Co-authored-by: dynco-nym <173912580+dynco-nym@users.noreply.github.com>
2025-07-22 15:25:43 +02:00
import this a9ae2017f5 [DOCs/operators]: Release notes/v2025.13 emmental & NIP-3 announcement (#5908)
* initialise PR, add dev notes and bump node version

* add operators tool and update api stats
2025-07-22 12:10:43 +00:00
Bogdan-Ștefan Neacşu 09ebe7f9e9 Support mnemonic in the NS agent (#5883)
Co-authored-by: benedettadavico <benedetta.davico@gmail.com>
2025-07-22 14:21:12 +03:00
Andrej Mihajlov b72915c224 Merge pull request #5905 from nymtech/am/sqlx-guard-obtain-db-path-from-pool
sqlx-pool-guard: obtain filename from connect options
2025-07-22 11:57:55 +02:00
Andrej Mihajlov add3e864e3 sqlx-pool-guard: obtain filename from connect options 2025-07-22 11:09:39 +02:00
benedettadavico 578c9b0567 update changelog 2025-07-22 11:09:35 +02:00
Andrej Mihajlov 8f6f696f36 Merge pull request #5896 from nymtech/am/handle-table-allocate-more-memory 2025-07-22 11:09:11 +02:00
Jędrzej Stuczyński e9165763b6 Feature/dkg snapshot epoch (#5900)
* define storage item for holding historical DKG state

* make all epoch storage operations go through proxy functions

* make each saving action also apply to the historical item

* removed usage of update_epoch function

* test correct save heights

* exposed query for the epoch state at specified height

* regenerated contract schema

* restored default cw-plus behaviour as in hindsight it makes more sense
2025-07-21 17:32:57 +01:00
mfahampshire 6c1149708b GW Probe docs: Go dep. + new required mnemonic (#5897)
* add note on go dep

* updated -h and useage doc
2025-07-18 12:36:30 +00:00
Mark Sinclair aaf6931d78 nym-node-status-ui placeholder (#5902)
Co-authored-by: Mark Sinclair <mmsinclair@users.noreply.github.com>
2025-07-17 20:04:45 +01:00
Jędrzej Stuczyński 97804f2fe5 Feature/dkg epoch dealers query (#5899)
* feat: add GetEpochDealers and GetEpochDealersAddresses queries to the DKG contract

* extended DkgQueryClient with new queries

* updated contract schema

* unit tests
2025-07-17 12:26:01 +01:00
Jędrzej Stuczyński 802d9b69ca fix: don't allow mixnode running in exit mode (#5898)
* fix: don't allow mixnode running in exit mode

* fixed error message
2025-07-17 10:57:16 +01:00
Andrej Mihajlov 7313857bc8 Allocate more memory to account for a drift in handle table size in between calls 2025-07-16 13:29:45 +02:00
benedettadavico 779174ada5 update wallet changelog 2025-07-15 14:57:49 +02:00
benedettadavico 8771c1dfa6 bump wallet version 2025-07-15 14:47:49 +02:00
benedettadavico 329ad83fc0 bump versions 2025-07-15 10:04:51 +02:00
Jack Wampler aea5872ad0 bump h2 dependency to fix DoH connection close logging (#5893) 2025-07-14 12:56:56 -06:00
Mark Sinclair 9e9abd74d7 Update ci-sonar.yml
[skip ci]
2025-07-14 17:34:26 +01:00
Mark Sinclair 3832508af7 Update sonar-project.properties 2025-07-14 17:33:10 +01:00
Mark Sinclair 69a4e33b17 Create sonar-project.properties 2025-07-14 17:25:30 +01:00
Mark Sinclair 83385421ff Create ci-sonar.yml 2025-07-14 17:24:42 +01:00
Jędrzej Stuczyński ec53b570dc listen for shutdown signals during nym-node startup (#5879)
this is to avoid situation where the process can't be killed without 'kill -9' because the logic to listen to shutdown signals hasn't been hit yet
2025-07-14 12:13:40 +01:00
Jędrzej Stuczyński ebcc658f98 chain scraper: ignore precommits from missing validators (#5867) 2025-07-14 08:46:19 +01:00
Mark Sinclair 6a155721c6 Update push-node-status-agent.yaml 2025-07-11 13:51:10 +01:00
Mark Sinclair 1bb8b3a3ec Update push-node-status-api.yaml 2025-07-11 13:50:07 +01:00
Mark Sinclair 8d1a16eb02 Update push-node-status-api.yaml 2025-07-11 11:46:21 +01:00
Mark Sinclair 8d10cf70e9 Update push-node-status-api.yaml 2025-07-11 11:36:16 +01:00
Mark Sinclair e32df10b4d Update push-node-status-api.yaml 2025-07-11 11:30:26 +01:00
Mark Sinclair d1660c01e6 Update push-node-status-api.yaml 2025-07-11 11:12:09 +01:00
Sachin Kamath 14378b1db9 hotfix: fix contract build in Makefile (#5892) 2025-07-11 15:32:49 +05:30
dynco-nym 35bbf5fd84 Batch SQL writes for packet stats (#5874)
* Move stuff around

* Batch SQL operations

* Clippy

* Bump version

* Remove shared queue which was always re-initialized

* Make max_concurrent_tasks configurable

* fixed typo

* clippy

---------

Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>
2025-07-11 10:53:19 +01:00
Mark Sinclair c374a4935a Update push-node-status-agent.yaml (#5882)
* Update push-node-status-agent.yaml

* Update push-node-status-api.yaml

* Update push-node-status-api.yaml

Fix up typo

* Update push-node-status-agent.yaml

* Update push-node-status-api.yaml
2025-07-11 10:29:05 +01:00
Jędrzej Stuczyński 513f4f652d Merge pull request #5887 from nymtech/merge/release/2025.12-dolcelatte
merge: release/2025.12-dolcelatte into develop
2025-07-10 09:16:58 +01:00
Sachin Kamath 82b9425ca6 chore: build contracts with cw optimizer (#5888) 2025-07-09 21:45:10 +05:30
Jędrzej Stuczyński 615e98b166 Merge branch 'develop' into merge/release/2025.12-dolcelatte 2025-07-09 15:37:41 +01:00
import this b11f6c6c70 [DOCs/operators]: Release notes v2025.12-dolcelatte (#5881)
* initialise release update

* add dev features and bugfixes

* add version

---------

Co-authored-by: mfahampshire <maxhampshire@pm.me>
2025-07-09 13:32:46 +00:00
Jędrzej Stuczyński 2f5e8e0bcd feat: forbid running mixnode + entry on the same node (#5878) 2025-07-09 08:59:55 +01:00
Jędrzej Stuczyński 812a8782b4 ignore 'Send' responses when claiming bandwidth (#5884) 2025-07-08 09:09:18 +01:00
benedettadavico 089c47cce7 update changelog 2025-07-07 15:44:15 +02:00
Jędrzej Stuczyński 833114372a bugfix: key-rotation + reply SURBs (#5876)
* wip: changes to surb logic + stronger db typing

* surb invalidation logic

* chore: remove unused deps

* resolving todos

* a lot of additional bugfixes

* 1.88 clippy

* wasm fixes

* wasm clippy

* wallet clippy

* wait for epoch end when setting up new network

* split ReplyController into Sender and Receiver for easier reasoning

* additional reply surbs improvements

includes, but is not limited to: unconditionally reseting sender tag on restart, limiting number of surb re-requests, resetting stale surbs on load

* fixed calculation of number of removed surbs

* add additional calculated field to key rotation info

* DBG: 'request_reply_surbs_for_queue_clearing' temp logs

* fixes for silly mistakes

* conditionally reduce log severity
2025-07-04 16:29:03 +01:00
Jack Wampler a7b57d7e58 Make Mix hops optional for Mixnet Client SURBs (#5861)
* allow SURBs to be configured without mix hops

* gateways require consistency in surb format so if disabling mixnhops - use updated format
2025-07-03 09:21:50 -06:00
benedettadavico 84e10a654c Revert "Bump ns-api version"
This reverts commit d724f94319.
2025-07-01 15:26:55 +02:00
benedetta davico d724f94319 Bump ns-api version 2025-07-01 15:19:56 +02:00
Jędrzej Stuczyński d0692a567a feat: basic performance contract integration [within Nym API] (#5871)
* renamed nym-api config fields

* decouple rewarder startup from network monitor

* additional sections in nym-api config

* removed vesting queries in circulating supply calculator

* added memoized field for last submitted performance measurement

* wip: performance contract refresher

* cleaned up various contract caches

* modified cache refresher to allow passing update fn

* implement performance cache refreshing

* updated lefthook.yml to run cargo fmt

* impl NodePerformanceProvider trait

* dynamically using specific performance provider

* pre warm up performance contract cache and forbid the mode if its empty

* clippy

* introduce fallback setting for performance contract if value for given epoch is not available

* move some functions around
2025-07-01 11:29:50 +01:00
Jędrzej Stuczyński 2ae38b9e49 chore: 1.88 clippy (#5877)
* 1.88 clippy

* wasm clippy

* wallet clippy
2025-07-01 10:28:57 +01:00
benedetta davico ef5990658a Merge pull request #5873 from nymtech/wallet/fix-link 2025-06-26 13:26:36 +02:00
benedettadavico 658dec8299 fix the broken link 2025-06-26 12:44:47 +02:00
dynco-nym 447352b8d6 Set busy_timeout in sqlx (#5872)
* Set busy_timeout

* Bump version
2025-06-26 10:44:06 +02:00
Tommy Verrall d6bb0979d0 fix imports
- it was not compiling due to this
2025-06-24 16:12:06 +02:00
Jędrzej Stuczyński fa1d47e941 Bugfix/backwards compat (#5865)
* lowered log severity

* make nodes use legacy encoding for forwarding packets

* note regarding localnet noise
2025-06-19 09:57:46 +01:00
Jędrzej Stuczyński 44ec6d6bc8 bugfix: allow gateways to permit authentication from v4 clients (#5862) 2025-06-18 09:17:54 +01:00
Jędrzej Stuczyński 6d47046a38 fixed client route for obtaining v2 list of gateways (#5859) 2025-06-16 14:32:46 +01:00
Simon Wicky 5cfd09cd99 fix removal of qa env 2025-06-13 10:03:50 +02:00
benedettadavico 40b4670d80 bump versions 2025-06-12 12:21:02 +02:00
910 changed files with 41643 additions and 16463 deletions
@@ -38,15 +38,14 @@ jobs:
rm -rf ci-builds || true
mkdir -p $OUTPUT_DIR
echo $OUTPUT_DIR
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get -y install libudev-dev
- name: Sets env vars for tokio if set in manual dispatch inputs
run: |
echo 'RUSTFLAGS="--cfg tokio_unstable"' >> $GITHUB_ENV
if: github.event_name == 'workflow_dispatch' && inputs.add_tokio_unstable == true
run: |
echo "RUSTFLAGS=--cfg tokio_unstable" >> $GITHUB_ENV
echo "CARGO_FEATURES=--features tokio-console" >> $GITHUB_ENV
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
@@ -103,7 +102,6 @@ jobs:
if [ ${{ github.event_name == 'workflow_dispatch' && inputs.enable_deb == true }} = true ]; then
cp target/debian/*.deb $OUTPUT_DIR
fi
- name: Deploy branch to CI www
continue-on-error: true
uses: easingthemes/ssh-deploy@main
+7 -7
View File
@@ -38,7 +38,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ arc-ubuntu-22.04, custom-windows-11, custom-macos-15 ]
os: [ arc-linux-latest, custom-windows-11, custom-macos-15 ]
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
@@ -46,9 +46,9 @@ jobs:
RUSTUP_PERMIT_COPY_RENAME: 1
steps:
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get -y install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev libudev-dev squashfs-tools protobuf-compiler
run: sudo apt-get update && sudo apt-get -y install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev libudev-dev squashfs-tools protobuf-compiler cmake
continue-on-error: true
if: contains(matrix.os, 'ubuntu')
if: contains(matrix.os, 'linux')
- name: Check out repository code
uses: actions/checkout@v4
@@ -63,7 +63,7 @@ jobs:
# To avoid running out of disk space, skip generating debug symbols
- name: Set debug to false (unix)
if: contains(matrix.os, 'ubuntu') || contains(matrix.os, 'mac')
if: contains(matrix.os, 'linux') || contains(matrix.os, 'mac')
run: |
sed -i.bak 's/\[profile.dev\]/\[profile.dev\]\ndebug = false/' Cargo.toml
git diff
@@ -93,14 +93,14 @@ jobs:
command: build
- name: Build all examples
if: contains(matrix.os, 'ubuntu')
if: contains(matrix.os, 'linux')
uses: actions-rs/cargo@v1
with:
command: build
args: --workspace --examples
- name: Run all tests
if: contains(matrix.os, 'ubuntu')
if: contains(matrix.os, 'linux')
uses: actions-rs/cargo@v1
env:
NYM_API: https://sandbox-nym-api1.nymtech.net/api
@@ -109,7 +109,7 @@ jobs:
args: --workspace
- name: Run expensive tests
if: (github.ref == 'refs/heads/develop' || github.event.pull_request.base.ref == 'develop' || github.event.pull_request.base.ref == 'master') && contains(matrix.os, 'ubuntu')
if: (github.ref == 'refs/heads/develop' || github.event.pull_request.base.ref == 'develop' || github.event.pull_request.base.ref == 'master') && contains(matrix.os, 'linux')
uses: actions-rs/cargo@v1
with:
command: test
@@ -16,7 +16,7 @@ jobs:
uses: actions/checkout@v4
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
uses: mikefarah/yq@v4.47.1
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -16,7 +16,7 @@ jobs:
uses: actions/checkout@v4
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
uses: mikefarah/yq@v4.47.1
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -31,33 +31,26 @@ jobs:
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
toolchain: stable
target: wasm32-unknown-unknown
override: true
- name: Install wasm-opt
uses: ./.github/actions/install-wasm-opt
with:
version: '114'
- name: Install cosmwasm-check
run: cargo install cosmwasm-check
- name: Build release contracts
run: make contracts
run: make publish-contracts
- name: Prepare build output
shell: bash
env:
OUTPUT_DIR: ci-contract-builds/${{ github.ref_name }}
run: |
cp contracts/target/wasm32-unknown-unknown/release/mixnet_contract.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/vesting_contract.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/nym_coconut_dkg.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/cw3_flex_multisig.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/cw4_group.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/nym_ecash.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/nym_pool_contract.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/nym_performance_contract.wasm $OUTPUT_DIR
find contracts/artifacts -maxdepth 1 -type f -name '*.wasm' -exec cp {} $OUTPUT_DIR \;
# Also include the optimizer-generated checksums if present
if [ -f contracts/artifacts/checksums.txt ]; then
cp contracts/artifacts/checksums.txt $OUTPUT_DIR
fi
- name: Deploy branch to CI www
continue-on-error: true
@@ -0,0 +1,75 @@
name: ci-nym-wallet-storybook
on:
pull_request:
paths:
- 'nym-wallet/**'
- '.github/workflows/ci-nym-wallet-storybook.yml'
jobs:
build:
runs-on: custom-linux
steps:
- uses: actions/checkout@v4
- name: Install rsync
run: sudo apt-get install rsync
continue-on-error: true
- uses: rlespinasse/github-slug-action@v3.x
- uses: actions/setup-node@v4
with:
node-version: 20
- name: Setup yarn
run: npm install -g yarn
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: Install wasm-pack
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
- name: Build dependencies
run: yarn && yarn build
- name: Build storybook
run: yarn storybook:build
working-directory: ./nym-wallet
- name: Deploy branch to CI www (storybook)
continue-on-error: true
uses: easingthemes/ssh-deploy@main
env:
SSH_PRIVATE_KEY: ${{ secrets.CI_WWW_SSH_PRIVATE_KEY }}
ARGS: "-rltgoDzvO --delete"
SOURCE: "nym-wallet/storybook-static/"
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/wallet-${{ env.GITHUB_REF_SLUG }}
EXCLUDE: "/dist/, /node_modules/"
- name: Matrix - Node Install
run: npm install
working-directory: .github/workflows/support-files
- name: Matrix - Send Notification
env:
NYM_NOTIFICATION_KIND: nym-wallet
NYM_PROJECT_NAME: "nym-wallet"
NYM_CI_WWW_BASE: "${{ secrets.NYM_CI_WWW_BASE }}"
NYM_CI_WWW_LOCATION: "wallet-${{ env.GITHUB_REF_SLUG }}"
GIT_COMMIT_MESSAGE: "${{ github.event.head_commit.message }}"
GIT_BRANCH: "${GITHUB_REF##*/}"
IS_SUCCESS: "${{ job.status == 'success' }}"
MATRIX_SERVER: "${{ secrets.MATRIX_SERVER }}"
MATRIX_ROOM: "${{ secrets.MATRIX_ROOM }}"
MATRIX_USER_ID: "${{ secrets.MATRIX_USER_ID }}"
MATRIX_TOKEN: "${{ secrets.MATRIX_TOKEN }}"
MATRIX_DEVICE_ID: "${{ secrets.MATRIX_DEVICE_ID }}"
uses: docker://keybaseio/client:stable-node
with:
args: .github/workflows/support-files/notifications/entry_point.sh
+19
View File
@@ -0,0 +1,19 @@
name: Run SonarQube Scan
on:
push:
branches:
- develop
# pull_request:
# types: [opened, synchronize, reopened]
jobs:
sonarqube:
name: SonarQube
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis
- name: SonarQube Scan
uses: SonarSource/sonarqube-scan-action@v5
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
+1 -1
View File
@@ -6,7 +6,7 @@ jobs:
greeting:
runs-on: ubuntu-latest
steps:
- uses: actions/first-interaction@v1
- uses: actions/first-interaction@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
issue-message: 'Thank you for raising this issue'
+1 -1
View File
@@ -31,7 +31,7 @@ jobs:
- name: Check out repository code
uses: actions/checkout@v4
- name: Download report from previous job
uses: actions/download-artifact@v4
uses: actions/download-artifact@v5
with:
name: report
path: .github/workflows/support-files/notifications
@@ -25,7 +25,7 @@ jobs:
uses: actions/checkout@v4
- name: Install Java
uses: actions/setup-java@v4
uses: actions/setup-java@v5
with:
distribution: "temurin"
java-version: "17"
@@ -91,7 +91,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Download binary artifact
uses: actions/download-artifact@v4
uses: actions/download-artifact@v5
with:
name: nyms5-apk-arch64
path: apk
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
uses: mikefarah/yq@v4.47.1
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-credential-proxy/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
uses: mikefarah/yq@v4.47.1
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
uses: mikefarah/yq@v4.47.1
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-network-monitor/Cargo.toml
+38 -15
View File
@@ -5,8 +5,15 @@ on:
inputs:
gateway_probe_git_ref:
type: string
default: nym-vpn-core-v1.4.0
required: true
description: Which gateway probe git ref to build the image with
release_image:
description: 'Tag image as a release'
required: true
default: false
type: boolean
env:
WORKING_DIRECTORY: "nym-node-status-api/nym-node-status-agent"
CONTAINER_NAME: "node-status-agent"
@@ -31,10 +38,10 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
run: |
VERSION=$(yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml)
echo "result=$VERSION" >> $GITHUB_OUTPUT
- name: cleanup-gateway-probe-ref
id: cleanup_gateway_probe_ref
@@ -43,19 +50,35 @@ jobs:
GIT_REF_SLUG="${GATEWAY_PROBE_GIT_REF//\//-}"
echo "git_ref=${GIT_REF_SLUG}" >> $GITHUB_OUTPUT
- name: Remove existing tag if exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
fi
- name: Set GIT_TAG variable
run: echo "GIT_TAG=${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}" >> $GITHUB_ENV
- name: Create tag
run: |
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }} -m "Version ${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
- name: Initialize RELEASE_TAG
run: echo "RELEASE_TAG=" >> $GITHUB_ENV
- name: Set RELEASE_TAG for release
if: github.event.inputs.release_image == 'true'
run: echo "RELEASE_TAG=golden-" >> $GITHUB_ENV
- name: Set IMAGE_NAME_AND_TAGS variable
run: echo "IMAGE_NAME_AND_TAGS=${{ env.CONTAINER_NAME }}:${{ env.RELEASE_TAG }}${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}" >> $GITHUB_ENV
- name: New env vars
run: echo "RELEASE_TAG='$RELEASE_TAG' GIT_TAG='$GIT_TAG' IMAGE_NAME_AND_TAGS='$IMAGE_NAME_AND_TAGS'"
# - name: Remove existing tag if exists
# run: |
# if git rev-parse $${{ env.GIT_TAG }} >/dev/null 2>&1; then
# git push --delete origin $${{ env.GIT_TAG }}
# git tag -d $${{ env.GIT_TAG }}
# fi
# - name: Create tag
# run: |
# git tag -a $${{ env.GIT_TAG }} -m "Version ${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}"
# git push origin $${{ env.GIT_TAG }}
- name: BuildAndPushImageOnHarbor
run: |
docker build --build-arg GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }} -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
docker build --build-arg GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }} -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.IMAGE_NAME_AND_TAGS }}
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
+39 -20
View File
@@ -1,7 +1,13 @@
name: Build and upload Node Status API container to harbor.nymte.ch
on:
workflow_dispatch:
inputs:
release_image:
description: 'Tag image as a release'
required: true
default: false
type: boolean
env:
WORKING_DIRECTORY: "nym-node-status-api/nym-node-status-api"
CONTAINER_NAME: "node-status-api"
@@ -26,30 +32,43 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: Check if tag exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
echo "Tag ${{ steps.get_version.outputs.result }} already exists"
fi
VERSION=$(yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml)
echo "result=$VERSION" >> $GITHUB_OUTPUT
- name: Remove existing tag if exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
fi
- name: Set GIT_TAG variable
run: echo "GIT_TAG=${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}" >> $GITHUB_ENV
- name: Create tag
run: |
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
- name: Initialise RELEASE_TAG
run: echo "RELEASE_TAG=" >> $GITHUB_ENV
- name: Set RELEASE_TAG for release
if: github.event.inputs.release_image == 'true'
run: echo "RELEASE_TAG=golden-" >> $GITHUB_ENV
- name: Set IMAGE_NAME_AND_TAGS variable
run: echo "IMAGE_NAME_AND_TAGS=${{ env.CONTAINER_NAME }}:${{ env.RELEASE_TAG }}${{ steps.get_version.outputs.result }}" >> $GITHUB_ENV
- name: New env vars
run: echo "RELEASE_TAG='$RELEASE_TAG' GIT_TAG='$GIT_TAG' IMAGE_NAME_AND_TAGS='$IMAGE_NAME_AND_TAGS'"
# - name: Remove existing tag if exists, then create
# run: |
# if git rev-parse "$GIT_TAG" >/dev/null 2>&1; then
# echo "Tag '$GIT_TAG' already exists, deleting"
# git push --delete origin "$GIT_TAG"
# git tag -d "$GIT_TAG"
# echo "Tag '$GIT_TAG' deleted"
# else
# echo "Tag '$GIT_TAG' does not exist, creating it"
# git tag -a $GIT_TAG -m "Version ${{ steps.get_version.outputs.result }}"
# git push origin $GIT_TAG
# echo "Tag '$GIT_TAG' created"
# fi
- name: BuildAndPushImageOnHarbor
run: |
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile-pg . -t harbor.nymte.ch/nym/${{ env.IMAGE_NAME_AND_TAGS }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
uses: mikefarah/yq@v4.47.1
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-api/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
uses: mikefarah/yq@v4.47.1
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
uses: mikefarah/yq@v4.47.1
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
uses: mikefarah/yq@v4.47.1
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
uses: mikefarah/yq@v4.47.1
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -5,6 +5,8 @@
>
> ➡️➡️➡️➡️➡️ **View output:**
>
> `storybook`: https://{{ env.NYM_CI_WWW_LOCATION }}.{{ env.NYM_CI_WWW_BASE }}
>
> `branch` {{ env.GITHUB_SERVER_URL }}/{{ env.GITHUB_REPOSITORY }}/tree/{{ env.GIT_BRANCH_NAME }}
>
> `commit` {{ env.GITHUB_SERVER_URL }}/{{ env.GITHUB_REPOSITORY }}/commit/{{ env.GITHUB_SHA }}
+2
View File
@@ -35,6 +35,8 @@ validator-api/keypair
contracts/mixnet/code_id
contracts/mixnet/Justfile
contracts/mixnet/Makefile
artifacts
contracts/artifacts
validator-config
*.patch
validator-api-config.toml
+152
View File
@@ -4,6 +4,158 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2025.15-gruyere] (2025-08-20)
- Migrate strum to 0.27.2 ([#5960])
- WG exit policy scripts update ([#5921])
- Make DNS Resolver fallback optional ([#5920])
- nym-node debug command to reset providers db ([#5914])
- basic zulip client for sending messages ([#5913])
- chore: allow compatibility with 'CDLA-Permissive-2.0' ([#5910])
- feat: ecash liveness check ([#5890])
- Remove old free credential handle ([#5864])
[#5960]: https://github.com/nymtech/nym/pull/5960
[#5921]: https://github.com/nymtech/nym/pull/5921
[#5920]: https://github.com/nymtech/nym/pull/5920
[#5914]: https://github.com/nymtech/nym/pull/5914
[#5913]: https://github.com/nymtech/nym/pull/5913
[#5910]: https://github.com/nymtech/nym/pull/5910
[#5890]: https://github.com/nymtech/nym/pull/5890
[#5864]: https://github.com/nymtech/nym/pull/5864
## [2025.14-feta] (2025-08-05)
- chore: nym node tokio console ([#5909])
- Feature/dkg snapshot epoch ([#5900])
- Feature/dkg epoch dealers query ([#5899])
- sqlx-pool-guard: allocate more memory on windows ([#5896])
- Support mnemonic in the NS agent ([#5883])
- Allow PG database backend ([#5880])
[#5909]: https://github.com/nymtech/nym/pull/5909
[#5900]: https://github.com/nymtech/nym/pull/5900
[#5899]: https://github.com/nymtech/nym/pull/5899
[#5896]: https://github.com/nymtech/nym/pull/5896
[#5883]: https://github.com/nymtech/nym/pull/5883
[#5880]: https://github.com/nymtech/nym/pull/5880
## [2025.13-emmental] (2025-07-22)
- fix: don't allow mixnode running in exit mode ([#5898])
- fix contract build process in Makefile ([#5892])
- bugfix: ignore 'Send' responses when claiming bandwidth ([#5884])
- Update push-node-status-agent.yaml ([#5882])
- listen for shutdown signals during nym-node startup ([#5879])
- feat: forbid running mixnode + entry on the same node ([#5878])
- chore: 1.88 clippy ([#5877])
- Batch SQL writes for packet stats ([#5874])
- fix the broken link ([#5873])
- Set busy_timeout in sqlx ([#5872])
- feat: basic performance contract integration [within Nym API] ([#5871])
- scraper bugfix: ignore precommits from missing validators ([#5867])
- Return true remaining ([#5866])
- Make Mix hops optional for Mixnet Client SURBs ([#5861])
- Check gateway supported versions ([#5860])
- Add build info endpoints ([#5857])
- Clear out screaming logs ([#5856])
- fix removal of qa env ([#5855])
- Use display when printing paths ([#5853])
- feat: initial performance contract ([#5833])
- Security patches for the `dkg` crate ([#5828])
- HTTP Discovery objects & network defaults ([#5814])
[#5898]: https://github.com/nymtech/nym/pull/5898
[#5892]: https://github.com/nymtech/nym/pull/5892
[#5884]: https://github.com/nymtech/nym/pull/5884
[#5882]: https://github.com/nymtech/nym/pull/5882
[#5879]: https://github.com/nymtech/nym/pull/5879
[#5878]: https://github.com/nymtech/nym/pull/5878
[#5877]: https://github.com/nymtech/nym/pull/5877
[#5874]: https://github.com/nymtech/nym/pull/5874
[#5873]: https://github.com/nymtech/nym/pull/5873
[#5872]: https://github.com/nymtech/nym/pull/5872
[#5871]: https://github.com/nymtech/nym/pull/5871
[#5867]: https://github.com/nymtech/nym/pull/5867
[#5866]: https://github.com/nymtech/nym/pull/5866
[#5861]: https://github.com/nymtech/nym/pull/5861
[#5860]: https://github.com/nymtech/nym/pull/5860
[#5857]: https://github.com/nymtech/nym/pull/5857
[#5856]: https://github.com/nymtech/nym/pull/5856
[#5855]: https://github.com/nymtech/nym/pull/5855
[#5853]: https://github.com/nymtech/nym/pull/5853
[#5833]: https://github.com/nymtech/nym/pull/5833
[#5828]: https://github.com/nymtech/nym/pull/5828
[#5814]: https://github.com/nymtech/nym/pull/5814
## [2025.12-dolcelatte] (2025-07-07)
- bugfix: key-rotation + reply SURBs ([#5876])
- Bugfix/backwards compat ([#5865])
- bugfix: allow gateways to permit authentication from v4 clients ([#5862])
- fixed client route for obtaining v2 list of gateways ([#5859])
- Updated browser extension piece removal ([#5849])
- Remove/old env references ([#5848])
- Remove qa env ([#5847])
- remove not used old mock-api ([#5845])
- remove bity dir ([#5844])
- build(deps-dev): bump webpack-dev-server from 4.13.2 to 5.2.1 in /wasm/mix-fetch/internal-dev ([#5843])
- Amended the buy section ([#5841])
- Removing test-net faucet ([#5840])
- Feature/node status dvpn directory ([#5829])
- build(deps-dev): bump webpack-dev-server from 4.15.2 to 5.2.1 in /nym-credential-proxy/vpn-api-lib-wasm/internal-dev ([#5826])
- bugfix: fix swapped total and circulating supplies ([#5822])
- build(deps): bump tar-fs from 3.0.8 to 3.0.9 in /sdk/typescript/tests/integration-tests/mix-fetch ([#5821])
- Url scheme warning log ([#5819])
- chore: adjust heuristic for wireguard peer activity ([#5818])
- Use the same client bandwidth for top up ([#5813])
- Replace chrono with time in NS API ([#5811])
- build(deps-dev): bump http-proxy-middleware from 2.0.4 to 2.0.9 in /clients/native/examples/js-examples/websocket ([#5810])
- build(deps): bump tokio from 1.44.2 to 1.45.1 ([#5798])
- Close sqlite pool before moving or reopening databases ([#5796])
- HTTP Client Retries, Fallbacks, and Redirects ([#5789])
- feat: key rotation ([#5777])
- build(deps): bump next from 14.2.15 to 14.2.26 in /documentation/docs ([#5772])
- build(deps): bump undici from 5.28.5 to 5.29.0 in /.github/actions/nym-hash-releases/src ([#5771])
- build(deps): bump cargo_metadata from 0.18.1 to 0.19.2 ([#5765])
- build(deps): bump tempfile from 3.19.1 to 3.20.0 ([#5764])
- [Feature] Noise XKpsk3 integration (2025 version) ([#5692])
- feature: nympool contract ([#5464])
- chore: fixed typo in API endpoint parameter ([#5449])
[#5876]: https://github.com/nymtech/nym/pull/5876
[#5865]: https://github.com/nymtech/nym/pull/5865
[#5862]: https://github.com/nymtech/nym/pull/5862
[#5859]: https://github.com/nymtech/nym/pull/5859
[#5849]: https://github.com/nymtech/nym/pull/5849
[#5848]: https://github.com/nymtech/nym/pull/5848
[#5847]: https://github.com/nymtech/nym/pull/5847
[#5845]: https://github.com/nymtech/nym/pull/5845
[#5844]: https://github.com/nymtech/nym/pull/5844
[#5843]: https://github.com/nymtech/nym/pull/5843
[#5841]: https://github.com/nymtech/nym/pull/5841
[#5840]: https://github.com/nymtech/nym/pull/5840
[#5829]: https://github.com/nymtech/nym/pull/5829
[#5826]: https://github.com/nymtech/nym/pull/5826
[#5822]: https://github.com/nymtech/nym/pull/5822
[#5821]: https://github.com/nymtech/nym/pull/5821
[#5819]: https://github.com/nymtech/nym/pull/5819
[#5818]: https://github.com/nymtech/nym/pull/5818
[#5813]: https://github.com/nymtech/nym/pull/5813
[#5811]: https://github.com/nymtech/nym/pull/5811
[#5810]: https://github.com/nymtech/nym/pull/5810
[#5798]: https://github.com/nymtech/nym/pull/5798
[#5796]: https://github.com/nymtech/nym/pull/5796
[#5789]: https://github.com/nymtech/nym/pull/5789
[#5777]: https://github.com/nymtech/nym/pull/5777
[#5772]: https://github.com/nymtech/nym/pull/5772
[#5771]: https://github.com/nymtech/nym/pull/5771
[#5765]: https://github.com/nymtech/nym/pull/5765
[#5764]: https://github.com/nymtech/nym/pull/5764
[#5692]: https://github.com/nymtech/nym/pull/5692
[#5464]: https://github.com/nymtech/nym/pull/5464
[#5449]: https://github.com/nymtech/nym/pull/5449
## [2025.11-cheddar] (2025-06-10)
- No autoremoval of peers ([#5831])
+686
View File
@@ -0,0 +1,686 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Nym is a privacy platform that uses mixnet technology to protect against metadata surveillance. The platform consists of several key components:
- Mixnet nodes (mixnodes) for packet mixing
- Gateways (entry/exit points for the network)
- Clients for interacting with the network
- Network monitoring tools
- Validators for network consensus
- Various service providers and integrations
## Build Commands
### Rust Components
```bash
# Default build (debug)
cargo build
# Release build
cargo build --release
# Build a specific package
cargo build -p <package-name>
# Build main components
make build
# Build release versions of main binaries and contracts
make build-release
# Build specific binaries
make build-nym-cli
cargo build -p nym-node --release
cargo build -p nym-api --release
```
### Testing
```bash
# Run clippy, unit tests, and formatting
make test
# Run all tests including slow tests
make test-all
# Run clippy on all workspaces
make clippy
# Run unit tests for a specific package
cargo test -p <package-name>
# Run only expensive/ignored tests
cargo test --workspace -- --ignored
# Run API tests
dotenv -f envs/sandbox.env -- cargo test --test public-api-tests
# Run tests with specific log level
RUST_LOG=debug cargo test -p <package-name>
# Run specific test scripts
./nym-node/tests/test_apis.sh
./scripts/wireguard-exit-policy/exit-policy-tests.sh
```
### Linting and Formatting
```bash
# Run rustfmt on all code
make fmt
# Check formatting without modifying
cargo fmt --all -- --check
# Run clippy with all targets
cargo clippy --workspace --all-targets -- -D warnings
# TypeScript linting
yarn lint
yarn lint:fix
yarn types:lint:fix
# Check dependencies for security/licensing issues
cargo deny check
```
### WASM Components
```bash
# Build all WASM components
make sdk-wasm-build
# Build TypeScript SDK
yarn build:sdk
npx lerna run --scope @nymproject/sdk build --stream
# Build and test WASM components
make sdk-wasm
# Build specific WASM packages
cd wasm/client && make
cd wasm/mix-fetch && make
cd wasm/node-tester && make
```
### Contract Development
```bash
# Build all contracts
make contracts
# Build contracts in release mode
make build-release-contracts
# Generate contract schemas
make contract-schema
# Run wasm-opt on contracts
make wasm-opt-contracts
# Check contracts with cosmwasm-check
make cosmwasm-check-contracts
```
### Running Components
```bash
# Run nym-node as a mixnode
cargo run -p nym-node -- run --mode mixnode
# Run nym-node as a gateway
cargo run -p nym-node -- run --mode gateway
# Run the network monitor
cargo run -p nym-network-monitor
# Run the API server
cargo run -p nym-api
# Run with specific environment
dotenv -f envs/sandbox.env -- cargo run -p nym-api
# Start a local network
./scripts/localnet_start.sh
```
## Architecture
The Nym platform consists of various components organized as a monorepo:
1. **Core Mixnet Infrastructure**:
- `nym-node`: Core binary supporting mixnode and gateway modes
- `common/nymsphinx`: Implementation of the Sphinx packet format
- `common/topology`: Network topology management
- `common/types`: Shared data types across components
2. **Network Monitoring**:
- `nym-network-monitor`: Monitors the network's reliability and performance
- `nym-api`: API server for network stats and monitoring data
- Metrics tracking for nodes, routes, and overall network health
3. **Client Implementations**:
- `clients/native`: Native Rust client implementation
- `clients/socks5`: SOCKS5 proxy client for standard applications
- `wasm`: WebAssembly client implementations (for browsers)
- `nym-connect`: Desktop and mobile clients
4. **Blockchain & Smart Contracts**:
- `common/cosmwasm-smart-contracts`: Smart contract implementations
- `contracts`: CosmWasm contracts for the Nym network
- `common/ledger`: Blockchain integration
5. **Utilities & Tools**:
- `tools`: Various CLI tools and utilities
- `sdk`: SDKs for different languages and platforms
- `documentation`: Documentation generation and management
## Packet System
Nym uses a modified Sphinx packet format for its mixnet:
1. **Message Chunking**:
- Messages are divided into "sets" and "fragments"
- Each fragment fits in a single Sphinx packet
- The `common/nymsphinx/chunking` module handles message fragmentation
2. **Routing**:
- Packets traverse through 3 layers of mixnodes
- Routing information is encrypted in layers (onion routing)
- The final gateway receives and processes the messages
3. **Monitoring**:
- Monitoring system tracks packet delivery through the network
- Routes are analyzed for reliability statistics
- Node performance metrics are collected
## Network Protocol
Nym implements the Loopix mixnet design with several key privacy features:
1. **Continuous-time Mixing**:
- Each mixnode delays messages independently with an exponential distribution
- This creates random reordering of packets, destroying timing correlations
- Offers better anonymity properties than batch mixing approaches
2. **Cover Traffic**:
- Clients and nodes generate dummy "loop" packets that circulate through the network
- These packets are indistinguishable from real traffic
- Creates a baseline level of traffic that hides actual communication patterns
- Provides unobservability (hiding when and how much real traffic is being sent)
3. **Stratified Network Architecture**:
- Traffic flows through Entry Gateway → 3 Mixnode Layers → Exit Gateway
- Path selection is independent per-message (unlike Tor)
- Each node connects only to adjacent layers
4. **Anonymous Replies**:
- Single-Use Reply Blocks (SURBs) allow receiving messages without revealing identity
- Enables bidirectional communication while maintaining privacy
## Network Monitoring Architecture
The network monitoring system is a core component that measures mixnet reliability:
1. The `nym-network-monitor` sends test packets through the network
2. These packets follow predefined routes through multiple mixnodes
3. Metrics are collected about:
- Successful and failed packet deliveries
- Node reliability (percentage of successful packet handling)
- Route reliability (which specific route combinations work best)
4. Results are stored in the database and used by `nym-api` to:
- Present node performance statistics
- Determine network rewards
- Provide route selection guidance to clients
In the current branch, metrics collection is being enhanced with a fanout approach to submit to multiple API endpoints.
## Development Environment
### Required Dependencies
- Rust toolchain (stable, 1.80+)
- Node.js (v20+) and yarn for TypeScript components
- SQLite for local database development
- PostgreSQL for API database (optional, for full API functionality)
- CosmWasm tools for contract development
- For building contracts: `wasm-opt` tool from `binaryen`
- Python 3.8+ for some scripts
- Docker (optional, for containerized development)
- protoc (Protocol Buffers compiler) for some components
### Environment Configurations
The `envs/` directory contains pre-configured environments:
#### Available Environments
- **`local.env`**: Local development environment
- Points to local services (localhost)
- Uses test mnemonics and keys
- Ideal for testing without external dependencies
- **`sandbox.env`**: Sandbox test network
- Public test network with real nodes
- Test tokens available from faucet
- Contract addresses for sandbox deployment
- API: https://sandbox-nym-api1.nymtech.net
- **`mainnet.env`**: Production mainnet
- Real network with real tokens
- Production contract addresses
- API: https://validator.nymtech.net
- Use with caution!
- **`canary.env`**: Canary deployment
- Pre-release testing environment
- Tests new features before mainnet
- **`mainnet-local-api.env`**: Hybrid environment
- Uses mainnet contracts but local API
- Useful for API development against mainnet data
#### Key Environment Variables
```bash
# Network configuration
NETWORK_NAME=sandbox # Network identifier
BECH32_PREFIX=n # Address prefix (n for sandbox, n for mainnet)
NYM_API=https://sandbox-nym-api1.nymtech.net/api
NYXD=https://rpc.sandbox.nymtech.net
NYM_API_NETWORK=sandbox
# Contract addresses (network-specific)
MIXNET_CONTRACT_ADDRESS=n1xr3rq8yvd7qplsw5yx90ftsr2zdhg4e9z60h5duusgxpv72hud3sjkxkav
VESTING_CONTRACT_ADDRESS=n1unyuj8qnmygvzuex3dwmg9yzt9alhvyeat0uu0jedg2wj33efl5qackslz
# ... other contract addresses
# Mnemonic for testing (NEVER use in production)
MNEMONIC="clutch captain shoe salt awake harvest setup primary inmate ugly among become"
# API Keys and tokens
IPINFO_API_TOKEN=your_token_here
AUTHENTICATOR_PASSWORD=password_here
# Logging
RUST_LOG=info # Options: error, warn, info, debug, trace
RUST_BACKTRACE=1 # Enable backtraces
# Database
DATABASE_URL=postgresql://user:pass@localhost/nym_api
```
#### Using Environment Files
```bash
# Load environment and run command
dotenv -f envs/sandbox.env -- cargo run -p nym-api
# Export to shell
source envs/sandbox.env
# Use with make targets
dotenv -f envs/sandbox.env -- make run-api-tests
```
## Initial Setup
### First Time Setup
1. **Install Prerequisites**
```bash
# Install Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# Install Node.js and yarn
# Via nvm (recommended):
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.0/install.sh | bash
nvm install 20
npm install -g yarn
# Install build tools
# Ubuntu/Debian:
sudo apt-get install build-essential pkg-config libssl-dev protobuf-compiler libpq-dev
# macOS:
brew install protobuf postgresql
# Install wasm-opt for contract builds
npm install -g wasm-opt
# Add wasm target for Rust
rustup target add wasm32-unknown-unknown
```
2. **Clone and Setup Repository**
```bash
git clone https://github.com/nymtech/nym.git
cd nym/nym
# Install JavaScript dependencies
yarn install
# Build the project
make build
```
3. **Database Setup (Optional, for API development)**
```bash
# Install PostgreSQL
# Create database
createdb nym_api
# Run migrations (from nym-api directory)
cd nym-api
sqlx migrate run
```
### Quick Start
```bash
# Run a mixnode locally
dotenv -f envs/sandbox.env -- cargo run -p nym-node -- run --mode mixnode --id my-mixnode
# Run a gateway locally
dotenv -f envs/sandbox.env -- cargo run -p nym-node -- run --mode gateway --id my-gateway
# Run the API server
dotenv -f envs/sandbox.env -- cargo run -p nym-api
# Run a client
cargo run -p nym-client -- init --id my-client
cargo run -p nym-client -- run --id my-client
```
## CI/CD Pipeline
The project uses GitHub Actions for CI/CD with several key workflows:
1. **Build and Test**:
- `ci-build.yml`: Main build workflow for Rust components
- Tests are run on multiple platforms (Linux, Windows, macOS)
- Includes formatting check (rustfmt) and linting (clippy)
2. **Release Process**:
- Binary artifacts are published on release tags
- Multiple platform builds are created
3. **Documentation**:
- Documentation is automatically built and deployed
## Database Structure
The system uses SQLite databases with tables like:
- `mixnode_status`: Status information about mixnodes
- `gateway_status`: Status information about gateways
- `routes`: Route performance information (success/failure of specific paths)
- `monitor_run`: Information about monitoring test runs
## Development Workflows
### Running a Node
To run the mixnode or gateway:
```bash
# Run nym-node as a mixnode with specified identity
cargo run -p nym-node -- run --mode mixnode --id my-mixnode
# Run nym-node as a gateway
cargo run -p nym-node -- run --mode gateway --id my-gateway
```
### Configuration
Nodes can be configured with files in various locations:
- Command-line arguments
- Environment variables
- `.env` files specified with `--config-env-file`
### Monitoring
To monitor the health of your node:
- View logs for real-time information
- Use the node's HTTP API for status information
- Check the explorer for public node statistics
## Common Libraries
- `common/types`: Shared data types across all components
- `common/crypto`: Cryptographic primitives and wrappers
- `common/client-core`: Core client functionality
- `common/gateway-client`: Client-gateway communication
- `common/task`: Task management and concurrency utilities
- `common/nymsphinx`: Sphinx packet implementation for mixnet
- `common/topology`: Network topology management
- `common/credentials`: Credential system for privacy-preserving authentication
- `common/bandwidth-controller`: Bandwidth management and accounting
## Code Conventions
- Error handling: Use anyhow/thiserror for structured error handling
- Logging: Use the tracing framework for logging and diagnostics
- State management: Generally use Tokio/futures for async code
- Configuration: Use the config crate and env vars with defaults
- Database: Use sqlx for type-safe database queries
- Follow clippy recommendations and rustfmt formatting
- Use semantic commit messages: feat, fix, docs, refactor, test, chore
## When Making Changes
- Run `make test` before submitting PRs
- Follow Rust naming conventions
- Use `clippy` to check for common issues
- Update SQLx query caches when modifying DB queries: `cargo sqlx prepare`
- Consider backward compatibility for protocol changes
- Use lefthook pre-commit hooks for TypeScript formatting
- Run `cargo deny check` to verify dependency compliance
- Test against both sandbox and local environments when possible
- Update relevant documentation and CHANGELOG.md
## Development Tools
### Useful Cargo Commands
```bash
# Check for outdated dependencies
cargo outdated
# Analyze binary size
cargo bloat --release -p nym-node
# Generate dependency graph
cargo tree -p nym-api
# Run with instrumentation
cargo run --features profiling -p nym-node
# Check for security advisories
cargo audit
```
### Database Tools
```bash
# SQLx CLI for migrations
cargo install sqlx-cli
# Create new migration
cd nym-api && sqlx migrate add <migration_name>
# Prepare query metadata for offline compilation
cargo sqlx prepare --workspace
# View database schema
./nym-api/enter_db.sh
```
### Development Scripts
- `scripts/build_topology.py`: Generate network topology files
- `scripts/node_api_check.py`: Verify node API endpoints
- `scripts/network_tunnel_manager.sh`: Manage network tunnels
- `scripts/localnet_start.sh`: Start a local test network
- Various deployment scripts in `deployment/` for different environments
## Debugging
- Enable more verbose logging with the RUST_LOG environment variable:
```
RUST_LOG=debug,nym_node=trace cargo run -p nym-node -- run --mode mixnode
```
- Use the HTTP API endpoints for status information
- Check monitoring data in the database for network performance metrics
- For complex issues, use tracing tools to follow packet flow
- Enable backtraces: `RUST_BACKTRACE=full`
- For WASM debugging: Use browser developer tools with source maps
## Deployment and Advanced Configurations
### Deployment Structure
The `deployment/` directory contains Ansible playbooks and configurations for various deployment scenarios:
- **`aws/`**: AWS-specific deployment configurations
- **`mixnode/`**: Mixnode deployment playbooks
- **`gateway/`**: Gateway deployment playbooks
- **`validator/`**: Validator node deployment
- **`sandbox-v2/`**: Complete sandbox environment setup
- **`big-dipper-2/`**: Block explorer deployment
### Sandbox V2 Deployment
The sandbox-v2 deployment (`deployment/sandbox-v2/`) provides a complete test environment:
```bash
# Key playbooks:
- deploy.yaml # Main deployment orchestrator
- deploy-mixnodes.yaml # Deploy mixnodes
- deploy-gateways.yaml # Deploy gateways
- deploy-validators.yaml # Deploy validator nodes
- deploy-nym-api.yaml # Deploy API services
```
### Custom Environment Setup
To create a custom environment:
1. Copy an existing env file: `cp envs/sandbox.env envs/custom.env`
2. Modify the network endpoints and contract addresses
3. Update the `NETWORK_NAME` to your identifier
4. Set appropriate mnemonics and keys (use fresh ones for production!)
### Contract Addresses
Contract addresses are network-specific and defined in environment files:
- Mixnet contract: Manages mixnode/gateway registry
- Vesting contract: Handles token vesting schedules
- Coconut contracts: Privacy-preserving credentials
- Name service: Human-readable address mapping
- Ecash contract: Electronic cash functionality
### Local Network Setup
For a completely local network:
```bash
# Start local chain
./scripts/localnet_start.sh
# Deploy contracts
cd contracts
make deploy-local
# Start nodes with local config
dotenv -f envs/local.env -- cargo run -p nym-node -- run --mode mixnode
```
## Common Issues and Troubleshooting
### Database Issues
- When modifying database queries, you must update SQLx query caches:
```bash
cargo sqlx prepare
```
- If you see SQLx errors about missing query files, this is likely the cause
- For "database is locked" errors with SQLite, ensure only one process accesses the DB
- For PostgreSQL connection issues, verify DATABASE_URL and that the server is running
### API Connection Issues
- Check the environment variables pointing to the APIs (NYM_API, NYXD)
- Verify network connectivity and API health endpoints
- For authentication issues, check node keys and credentials
- Common endpoints to verify:
- API health: `$NYM_API/health`
- Chain status: `$NYXD/status`
- Contract info: `$NYXD/cosmwasm/wasm/v1/contract/$CONTRACT_ADDRESS`
### Build Problems
- Clean dependencies with `cargo clean` for a fresh build
- Check for compatible Rust version (1.80+ recommended)
- For smart contract builds, ensure wasm-opt is installed: `npm install -g wasm-opt`
- For cross-compilation issues, check target-specific dependencies
- WASM build issues: Ensure wasm32-unknown-unknown target is installed:
```bash
rustup target add wasm32-unknown-unknown
```
- For "cannot find -lpq" errors, install PostgreSQL development files:
```bash
# Ubuntu/Debian
sudo apt-get install libpq-dev
# macOS
brew install postgresql
```
### Environment Issues
- Contract address mismatches: Ensure you're using the correct environment file
- "Account sequence mismatch": The account nonce is out of sync, wait and retry
- Token decimal issues: Sandbox uses different decimal places than mainnet
- API version mismatches: Ensure your local API version matches the network
- "Insufficient funds": Get test tokens from faucet (sandbox) or check balance
- Gateway/mixnode bonding issues: Verify minimum stake requirements
## Working with Routes and Monitoring
1. Route monitoring metrics are stored in a `routes` table with:
- Layer node IDs (layer1, layer2, layer3, gw)
- Success flag (boolean)
- Timestamp
2. To analyze routes:
- Check `NetworkAccount` and `AccountingRoute` in `nym-network-monitor/src/accounting.rs`
- View monitoring logic in `common/nymsphinx/chunking/monitoring.rs`
- Observe how routes are submitted to the database in the `submit_accounting_routes_to_db` function
## Performance Optimization
### Profiling and Benchmarking
```bash
# Run benchmarks
cargo bench -p nym-node
# Profile with perf (Linux)
cargo build --release --features profiling
perf record --call-graph=dwarf ./target/release/nym-node run --mode mixnode
perf report
# Generate flamegraph
cargo install flamegraph
cargo flamegraph --bin nym-node -- run --mode mixnode
```
### Common Performance Considerations
- Use bounded channels for backpressure
- Batch database operations where possible
- Monitor memory usage with `RUST_LOG=nym_node::metrics=debug`
- Use connection pooling for database connections
- Consider using `jemalloc` for better memory allocation performance
Generated
+1721 -2435
View File
File diff suppressed because it is too large Load Diff
+26 -15
View File
@@ -39,7 +39,8 @@ members = [
"common/cosmwasm-smart-contracts/ecash-contract",
"common/cosmwasm-smart-contracts/group-contract",
"common/cosmwasm-smart-contracts/mixnet-contract",
"common/cosmwasm-smart-contracts/multisig-contract", "common/cosmwasm-smart-contracts/nym-performance-contract",
"common/cosmwasm-smart-contracts/multisig-contract",
"common/cosmwasm-smart-contracts/nym-performance-contract",
"common/cosmwasm-smart-contracts/nym-pool-contract",
"common/cosmwasm-smart-contracts/vesting-contract",
"common/credential-storage",
@@ -49,13 +50,14 @@ members = [
"common/credentials-interface",
"common/crypto",
"common/dkg",
"common/ecash-signer-check",
"common/ecash-signer-check-types",
"common/ecash-time",
"common/execute",
"common/exit-policy",
"common/gateway-requests",
"common/gateway-stats-storage",
"common/gateway-storage",
"common/http-api-client",
"common/http-api-client", "common/http-api-client-macro",
"common/http-api-common",
"common/inclusion-probability",
"common/ip-packet-requests",
@@ -89,7 +91,7 @@ members = [
"common/socks5/requests",
"common/statistics",
"common/store-cipher",
"common/task",
"common/task", "common/test-utils",
"common/ticketbooks-merkle",
"common/topology",
"common/tun",
@@ -99,7 +101,12 @@ members = [
"common/wasm/storage",
"common/wasm/utils",
"common/wireguard",
"common/wireguard-private-metadata/client",
"common/wireguard-private-metadata/server",
"common/wireguard-private-metadata/shared",
"common/wireguard-private-metadata/tests",
"common/wireguard-types",
"common/zulip-client",
"documentation/autodoc",
"gateway",
"nym-api",
@@ -123,7 +130,6 @@ members = [
"sdk/ffi/go",
"sdk/ffi/shared",
"sdk/rust/nym-sdk",
"service-providers/authenticator",
"service-providers/common",
"service-providers/ip-packet-router",
"service-providers/network-requester",
@@ -161,7 +167,6 @@ default-members = [
"nym-statistics-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
"tools/nymvisor",
@@ -176,7 +181,7 @@ homepage = "https://nymtech.net"
documentation = "https://nymtech.net"
edition = "2021"
license = "Apache-2.0"
rust-version = "1.80"
rust-version = "1.81"
readme = "README.md"
[workspace.dependencies]
@@ -217,8 +222,8 @@ clap_complete = "4.5"
clap_complete_fig = "4.5"
colored = "2.2"
comfy-table = "7.1.4"
console = "0.15.11"
console-subscriber = "0.1.1"
console = "0.16.0"
console-subscriber = "0.4.1"
console_error_panic_hook = "0.1"
const-str = "0.5.6"
const_format = "0.2.34"
@@ -234,6 +239,7 @@ digest = "0.10.7"
dirs = "5.0"
doc-comment = "0.3"
dotenvy = "0.15.6"
dyn-clone = "1.0.19"
ecdsa = "0.16"
ed25519-dalek = "2.1"
encoding_rs = "0.8.35"
@@ -263,8 +269,9 @@ humantime = "2.2.0"
humantime-serde = "1.1.1"
hyper = "1.6.0"
hyper-util = "0.1"
indicatif = "0.17.11"
indicatif = "0.18.0"
inquire = "0.6.2"
inventory = "0.3.21"
ip_network = "0.4.1"
ipnetwork = "0.20"
itertools = "0.14.0"
@@ -311,22 +318,23 @@ serde_json_path = "0.7.2"
serde_repr = "0.1"
serde_with = "3.9.0"
serde_yaml = "0.9.25"
serde_plain = "1.0.2"
sha2 = "0.10.9"
si-scale = "0.2.3"
snow = "0.9.6"
sphinx-packet = "=0.6.0"
sqlx = "0.8.6"
strum = "0.26"
strum_macros = "0.26"
strum = "0.27.2"
strum_macros = "0.27.2"
subtle-encoding = "0.5"
syn = "1"
sysinfo = "0.33.0"
syn = "2"
sysinfo = "0.37.0"
tap = "1.0.1"
tar = "0.4.44"
tempfile = "3.20"
thiserror = "2.0"
time = "0.3.41"
tokio = "1.45"
tokio = "1.47"
tokio-postgres = "0.7"
tokio-stream = "0.1.17"
tokio-test = "0.4.4"
@@ -434,6 +442,9 @@ opt-level = 'z'
# lto = true
opt-level = 'z'
[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
[workspace.lints.clippy]
unwrap_used = "deny"
expect_used = "deny"
+58 -10
View File
@@ -12,7 +12,11 @@ help:
@echo " clippy: run clippy for all workspaces"
@echo " test: run clippy, unit tests, and formatting."
@echo " test-all: like test, but also includes the expensive tests"
@echo " deb: build debian packages
@echo " deb: build debian packages"
@echo ""
@echo "Contract building targets:"
@echo " contracts: build contracts for development (includes wasm-opt)"
@echo " publish-contracts: build contracts using Docker optimizer (deterministic)"
# -----------------------------------------------------------------------------
# Meta targets
@@ -130,25 +134,69 @@ cargo-test: sdk-wasm-test
clippy: sdk-wasm-lint
# -----------------------------------------------------------------------------
# Build contracts ready for deploy
# Build CosmWasm contracts (deterministic docker build)
# -----------------------------------------------------------------------------
CONTRACTS=vesting_contract mixnet_contract nym_ecash cw3_flex_multisig cw4_group nym_coconut_dkg nym_pool_contract nym_performance_contract
CONTRACTS_WASM=$(addsuffix .wasm, $(CONTRACTS))
CONTRACTS_OUT_DIR=contracts/target/wasm32-unknown-unknown/release
contracts: build-release-contracts wasm-opt-contracts cosmwasm-check-contracts
WASM_CONTRACT_DIR := contracts/target/wasm32-unknown-unknown/release
# Find every direct contract folder that contains a Cargo.toml
CONTRACT_DIRS := $(shell find contracts -type f -name Cargo.toml \( ! -path "contracts/Cargo.toml" \) | grep -v integration-tests | xargs -n1 dirname | sort -u)
CONTRACTS_OUT_DIR = contracts/artifacts
# Build all contracts via the official CosmWasm optimizer image (one invocation per contract)
# See : https://github.com/CosmWasm/optimizer?tab=readme-ov-file#contracts-excluded-from-workspace
# The optimizer ships separate multi-arch images. ARM builds are *not* bit-for-bit identical to the
# canonical x86_64 build (see README notice in CosmWasm/optimizer). For reproducible artefacts we
# therefore always run the amd64 variant by default.
# Override with :
# $ COSMWASM_OPTIMIZER_IMAGE=cosmwasm/optimizer-arm64:0.17.0 make contracts-publish
#
COSMWASM_OPTIMIZER_IMAGE ?= cosmwasm/optimizer:0.17.0
COSMWASM_OPTIMIZER_PLATFORM ?= linux/amd64
# Ensure clean build environment and run the optimizer
optimize-contracts:
@rm -rf artifacts 2>/dev/null || true
@echo "=== Ensuring clean build environment"
docker volume rm nym_contracts_cache 2>/dev/null || true
docker volume rm registry_cache 2>/dev/null || true
@for DIR in $(CONTRACT_DIRS); do \
echo "=== Optimizing $${DIR}"; \
docker run --rm --platform $(COSMWASM_OPTIMIZER_PLATFORM) \
-v $(CURDIR):/code \
--mount type=volume,source=nym_contracts_cache,target=/target \
--mount type=volume,source=registry_cache,target=/usr/local/cargo/registry \
-e CARGO_BUILD_INCREMENTAL=false \
-e RUSTFLAGS="-C target-cpu=generic -C debuginfo=0" \
-e SOURCE_DATE_EPOCH=1 \
$(COSMWASM_OPTIMIZER_IMAGE) $${DIR}; \
done
@mkdir -p $(CONTRACTS_OUT_DIR)
@cp artifacts/*.wasm $(CONTRACTS_OUT_DIR)/ 2>/dev/null || true
@cd $(CONTRACTS_OUT_DIR) && sha256sum *.wasm > checksums.txt
# Cleanup temporary artefacts directory
@rm -rf artifacts 2>/dev/null || true
wasm-opt-contracts:
for contract in $(CONTRACTS_WASM); do \
wasm-opt --signext-lowering -Os $(CONTRACTS_OUT_DIR)/$$contract -o $(CONTRACTS_OUT_DIR)/$$contract; \
@for WASM in $(WASM_CONTRACT_DIR)/*.wasm; do \
echo "Running wasm-opt on $$WASM"; \
wasm-opt --signext-lowering -Os $$WASM -o $$WASM ; \
done
cosmwasm-check-contracts:
for contract in $(CONTRACTS_WASM); do \
cosmwasm-check $(CONTRACTS_OUT_DIR)/$$contract; \
@for WASM in $(WASM_CONTRACT_DIR)/*.wasm; do \
echo "Checking $$WASM"; \
cosmwasm-check $$WASM ; \
done
# Default development build
contracts: build-release-contracts wasm-opt-contracts cosmwasm-check-contracts
# Publishing build used by CI deterministic Docker optimiser
publish-contracts: optimize-contracts cosmwasm-check-contracts
# Consider adding 's' to make plural consistent (beware: used in github workflow)
contract-schema:
$(MAKE) -C contracts schema
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-client"
version = "1.1.57"
version = "1.1.61"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
+1 -1
View File
@@ -111,7 +111,7 @@ impl SocketClient {
let dkg_query_client = if self.config.base.client.disabled_credentials_mode {
None
} else {
Some(default_query_dkg_client_from_config(&self.config.base))
Some(default_query_dkg_client_from_config(&self.config.base)?)
};
let storage = self.initialise_storage().await?;
+1 -1
View File
@@ -318,7 +318,7 @@ impl Handler {
async fn handle_text_message(&mut self, msg: String) -> Option<WsMessage> {
debug!("Handling text message request");
trace!("Content: {:?}", msg);
trace!("Content: {msg:?}");
self.received_response_type = ReceivedResponseType::Text;
let client_request = ClientRequest::try_from_text(msg);
+2 -2
View File
@@ -68,9 +68,9 @@ impl Listener {
new_conn = tcp_listener.accept() => {
match new_conn {
Ok((mut socket, remote_addr)) => {
debug!("Received connection from {:?}", remote_addr);
debug!("Received connection from {remote_addr:?}");
if self.state.is_connected() {
warn!("Tried to open a duplicate websocket connection. The request came from {}", remote_addr);
warn!("Tried to open a duplicate websocket connection. The request came from {remote_addr}");
// if we've already got a connection, don't allow another one
// while we only ever want to accept a single connection, we don't want
// to leave clients hanging (and also allow for reconnection if it somehow
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-socks5-client"
version = "1.1.57"
version = "1.1.61"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
+1 -1
View File
@@ -137,7 +137,7 @@ impl AsyncFileWatcher {
log::error!("the file watcher receiver has been dropped!");
}
} else {
log::debug!("will not propagate information about {:?}", event);
log::debug!("will not propagate information about {event:?}");
}
}
Err(err) => {
@@ -28,8 +28,6 @@ pub type HmacSha256 = Hmac<Sha256>;
pub type Nonce = u64;
pub type Taken = Option<SystemTime>;
pub const BANDWIDTH_CAP_PER_DAY: u64 = 250 * 1024 * 1024 * 1024; // 250 GB
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct IpPair {
pub ipv4: Ipv4Addr,
@@ -5,7 +5,7 @@ use crate::error::BandwidthControllerError;
use crate::utils::{
get_aggregate_verification_key, get_coin_index_signatures, get_expiration_date_signatures,
};
use log::info;
use log::{error, info};
use nym_credential_storage::storage::Storage;
use nym_credentials::ecash::bandwidth::IssuanceTicketBook;
use nym_credentials::ecash::utils::obtain_aggregate_wallet;
@@ -42,7 +42,8 @@ where
deposit_amount.into(),
None,
)
.await?;
.await
.inspect_err(|err| error!("ticketbook deposit fail with: {err}"))?;
let deposit_id = result.parse_singleton_u32_contract_data()?;
+1 -1
View File
@@ -11,7 +11,7 @@ impl std::fmt::Display for BandwidthStatusMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BandwidthStatusMessage::RemainingBandwidth(b) => {
write!(f, "remaining bandwidth: {}", b)
write!(f, "remaining bandwidth: {b}")
}
BandwidthStatusMessage::NoBandwidth => write!(f, "no bandwidth left"),
}
+3 -3
View File
@@ -13,7 +13,7 @@ use nym_credentials_interface::{
};
use nym_ecash_time::Date;
use nym_validator_client::coconut::all_ecash_api_clients;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nym_api::{EpochId, NymApiClientExt};
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use nym_validator_client::EcashApiClient;
use rand::prelude::SliceRandom;
@@ -207,7 +207,7 @@ where
<St as Storage>::StorageError: Send + Sync + 'static,
{
if let Some(stored) = storage
.get_expiration_date_signatures(expiration_date)
.get_expiration_date_signatures(expiration_date, epoch_id)
.await
.map_err(BandwidthControllerError::credential_storage_error)?
{
@@ -220,7 +220,7 @@ where
ecash_apis,
|api| async move {
api.api_client
.global_expiration_date_signatures(Some(expiration_date))
.global_expiration_date_signatures(Some(expiration_date), Some(epoch_id))
.await
},
format!("aggregated coin index signatures for date {expiration_date}"),
+7 -5
View File
@@ -13,10 +13,10 @@ async-trait = { workspace = true }
base64 = { workspace = true }
bs58 = { workspace = true }
clap = { workspace = true, optional = true }
cfg-if = { workspace = true }
comfy-table = { workspace = true, optional = true }
futures = { workspace = true }
humantime-serde = { workspace = true }
log = { workspace = true }
humantime = { workspace = true }
rand = { workspace = true }
rand_chacha = { workspace = true }
serde = { workspace = true, features = ["derive"] }
@@ -25,20 +25,18 @@ sha2 = { workspace = true }
si-scale = { workspace = true }
thiserror = { workspace = true }
url = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = ["macros"] }
time = { workspace = true }
tokio = { workspace = true, features = ["sync", "macros"] }
tracing = { workspace = true }
zeroize = { workspace = true }
# internal
nym-id = { path = "../nym-id" }
nym-bandwidth-controller = { path = "../bandwidth-controller" }
nym-config = { path = "../config" }
nym-crypto = { path = "../crypto" }
nym-gateway-client = { path = "../client-libs/gateway-client" }
nym-gateway-requests = { path = "../gateway-requests" }
nym-http-api-client = { path = "../http-api-client" }
nym-metrics = { path = "../nym-metrics" }
nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
nym-statistics-common = { path = "../statistics" }
@@ -55,6 +53,7 @@ nym-client-core-config-types = { path = "./config-types", features = [
nym-client-core-surb-storage = { path = "./surb-storage" }
nym-client-core-gateways-storage = { path = "./gateways-storage" }
nym-ecash-time = { path = "../ecash-time" }
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
nym-mixnet-client = { path = "../client-libs/mixnet-client", default-features = false }
@@ -126,3 +125,6 @@ fs-surb-storage = ["nym-client-core-surb-storage/fs-surb-storage"]
fs-gateways-storage = ["nym-client-core-gateways-storage/fs-gateways-storage"]
wasm = ["nym-gateway-client/wasm"]
metrics-server = []
[lints]
workspace = true
+8 -12
View File
@@ -57,9 +57,7 @@ const DEFAULT_MAXIMUM_ALLOWED_SURB_REQUEST_SIZE: u32 = 500;
const DEFAULT_MAXIMUM_REPLY_SURB_REREQUEST_WAITING_PERIOD: Duration = Duration::from_secs(10);
const DEFAULT_MAXIMUM_REPLY_SURB_DROP_WAITING_PERIOD: Duration = Duration::from_secs(5 * 60);
// 12 hours
const DEFAULT_MAXIMUM_REPLY_SURB_AGE: Duration = Duration::from_secs(12 * 60 * 60);
const DEFAULT_MAXIMUM_REPLY_SURB_REREQUESTS: usize = 5;
// 24 hours
const DEFAULT_MAXIMUM_REPLY_KEY_AGE: Duration = Duration::from_secs(24 * 60 * 60);
@@ -418,6 +416,9 @@ pub struct Traffic {
/// will be routed as usual, to the entry gateway, through three mix nodes, egressing
/// through the exit gateway. If mix hops are disabled, traffic will be routed directly
/// from the entry gateway to the exit gateway, bypassing the mix nodes.
///
/// This overrides the `use_legacy_sphinx_format` setting as reduced mix hops
/// requires use of the updated SURB packet format.
pub disable_mix_hops: bool,
}
@@ -625,10 +626,9 @@ pub struct ReplySurbs {
#[serde(with = "humantime_serde")]
pub maximum_reply_surb_drop_waiting_period: Duration,
/// Defines maximum amount of time given reply surb is going to be valid for.
/// This is going to be superseded by key rotation once implemented.
#[serde(with = "humantime_serde")]
pub maximum_reply_surb_age: Duration,
/// Defines maximum number of times the client is going to re-request reply surbs
/// for clearing pending messages before giving up after making no progress.
pub maximum_reply_surbs_rerequests: usize,
/// Defines maximum amount of time given reply key is going to be valid for.
/// This is going to be superseded by key rotation once implemented.
@@ -638,9 +638,6 @@ pub struct ReplySurbs {
/// Specifies the number of mixnet hops the packet should go through. If not specified, then
/// the default value is used.
pub surb_mix_hops: Option<u8>,
/// Specifies if we should reset all the sender tags on startup
pub fresh_sender_tags: bool,
}
impl Default for ReplySurbs {
@@ -655,10 +652,9 @@ impl Default for ReplySurbs {
maximum_reply_surb_rerequest_waiting_period:
DEFAULT_MAXIMUM_REPLY_SURB_REREQUEST_WAITING_PERIOD,
maximum_reply_surb_drop_waiting_period: DEFAULT_MAXIMUM_REPLY_SURB_DROP_WAITING_PERIOD,
maximum_reply_surb_age: DEFAULT_MAXIMUM_REPLY_SURB_AGE,
maximum_reply_surbs_rerequests: DEFAULT_MAXIMUM_REPLY_SURB_REREQUESTS,
maximum_reply_key_age: DEFAULT_MAXIMUM_REPLY_KEY_AGE,
surb_mix_hops: None,
fresh_sender_tags: false,
}
}
}
@@ -189,14 +189,13 @@ impl From<ConfigV6> for Config {
.debug
.reply_surbs
.maximum_reply_surb_drop_waiting_period,
maximum_reply_surb_age: value.debug.reply_surbs.maximum_reply_surb_age,
maximum_reply_key_age: value.debug.reply_surbs.maximum_reply_key_age,
surb_mix_hops: value.debug.reply_surbs.surb_mix_hops,
minimum_reply_surb_threshold_buffer: value
.debug
.reply_surbs
.minimum_reply_surb_threshold_buffer,
fresh_sender_tags: value.debug.reply_surbs.fresh_sender_tags,
..Default::default()
},
stats_reporting: StatsReporting {
enabled: value.debug.stats_reporting.enabled,
@@ -3,17 +3,18 @@ name = "nym-client-core-gateways-storage"
version = "0.1.0"
edition = "2021"
license.workspace = true
rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait.workspace = true
cosmrs.workspace = true
log.workspace = true
serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
time.workspace = true
tokio = { workspace = true, features = ["sync"] }
tracing.workspace = true
url.workspace = true
zeroize = { workspace = true, features = ["zeroize_derive"] }
@@ -26,6 +27,7 @@ features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"]
optional = true
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
+13 -4
View File
@@ -2,23 +2,30 @@
// SPDX-License-Identifier: Apache-2.0
#[tokio::main]
async fn main() {
async fn main() -> anyhow::Result<()> {
#[cfg(feature = "fs-gateways-storage")]
{
use anyhow::Context;
use sqlx::{Connection, SqliteConnection};
use std::env;
let out_dir = env::var("OUT_DIR").unwrap();
let out_dir = env::var("OUT_DIR")?;
let database_path = format!("{out_dir}/gateways-storage-example.sqlite");
// remove the db file if it already existed from previous build
// in case it was from a different branch
if std::fs::exists(&database_path)? {
std::fs::remove_file(&database_path)?;
}
let mut conn = SqliteConnection::connect(&format!("sqlite://{database_path}?mode=rwc"))
.await
.expect("Failed to create SQLx database connection");
.context("Failed to create SQLx database connection")?;
sqlx::migrate!("./fs_gateways_migrations")
.run(&mut conn)
.await
.expect("Failed to perform SQLx migrations");
.context("Failed to perform SQLx migrations")?;
#[cfg(target_family = "unix")]
println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path);
@@ -28,4 +35,6 @@ async fn main() {
// not a valid windows path... but hey, it works...
println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
}
Ok(())
}
@@ -7,12 +7,12 @@ use crate::{
RawActiveGateway, RawCustomGatewayDetails, RawRegisteredGateway, RawRemoteGatewayDetails,
},
};
use log::{debug, error};
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use tracing::{debug, error};
#[derive(Debug, Clone)]
pub struct StorageManager {
@@ -12,12 +12,12 @@ use crate::{
error::ClientCoreError,
init::types::{GatewaySelectionSpecification, GatewaySetup},
};
use log::info;
use nym_client_core_gateways_storage::GatewayDetails;
use nym_crypto::asymmetric::ed25519;
use nym_topology::NymTopology;
use nym_validator_client::UserAgent;
use std::path::PathBuf;
use tracing::info;
#[cfg_attr(feature = "cli", derive(clap::Args))]
#[derive(Debug, Clone)]
@@ -81,14 +81,14 @@ where
// Attempt to use a user-provided gateway, if possible
let user_chosen_gateway_id = common_args.gateway_id;
log::debug!("User chosen gateway id: {user_chosen_gateway_id:?}");
tracing::debug!("User chosen gateway id: {user_chosen_gateway_id:?}");
let selection_spec = GatewaySelectionSpecification::new(
user_chosen_gateway_id.map(|id| id.to_base58_string()),
Some(common_args.latency_based_selection),
common_args.force_tls_gateway,
);
log::debug!("Gateway selection specification: {selection_spec:?}");
tracing::debug!("Gateway selection specification: {selection_spec:?}");
let registered_gateways = get_all_registered_identities(&details_store).await?;
@@ -58,6 +58,7 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.signatures_path.unwrap())?
}
};
@@ -64,6 +64,7 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.credential_path.unwrap())?
}
};
@@ -58,6 +58,7 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.signatures_path.unwrap())?
}
};
@@ -58,6 +58,7 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.key_path.unwrap())?
}
};
@@ -12,7 +12,6 @@ use crate::{
},
init::types::{GatewaySelectionSpecification, GatewaySetup, InitResults},
};
use log::info;
use nym_client_core_gateways_storage::GatewayDetails;
use nym_crypto::asymmetric::ed25519;
use nym_sphinx::addressing::Recipient;
@@ -20,6 +19,7 @@ use nym_topology::NymTopology;
use nym_validator_client::UserAgent;
use rand::rngs::OsRng;
use std::path::PathBuf;
use tracing::info;
// we can suppress this warning (as suggested by linter itself) since we're only using it in our own code
#[allow(async_fn_in_trait)]
@@ -130,23 +130,23 @@ where
// Attempt to use a user-provided gateway, if possible
let user_chosen_gateway_id = common_args.gateway;
log::debug!("User chosen gateway id: {user_chosen_gateway_id:?}");
tracing::debug!("User chosen gateway id: {user_chosen_gateway_id:?}");
let selection_spec = GatewaySelectionSpecification::new(
user_chosen_gateway_id.map(|id| id.to_base58_string()),
Some(common_args.latency_based_selection),
common_args.force_tls_gateway,
);
log::debug!("Gateway selection specification: {selection_spec:?}");
tracing::debug!("Gateway selection specification: {selection_spec:?}");
// Load and potentially override config
log::debug!("Init arguments: {init_args:#?}");
tracing::debug!("Init arguments: {init_args:#?}");
let config = C::construct_config(&init_args);
log::debug!("Constructed config: {config:#?}");
tracing::debug!("Constructed config: {config:#?}");
let paths = config.common_paths();
let core = config.core_config();
log::info!(
tracing::info!(
"Using nym-api: {}",
core.client
.nym_api_urls
@@ -18,6 +18,7 @@ use crate::client::received_buffer::{
ReceivedBufferRequestReceiver, ReceivedBufferRequestSender, ReceivedMessagesBufferController,
};
use crate::client::replies::reply_controller;
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
use crate::client::replies::reply_controller::{ReplyControllerReceiver, ReplyControllerSender};
use crate::client::replies::reply_storage::{
CombinedReplyStorage, PersistentReplyStorage, ReplyStorageBackend, SentReplyKeys,
@@ -34,7 +35,6 @@ use crate::init::{
};
use crate::{config, spawn_future};
use futures::channel::mpsc;
use log::*;
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_config_types::{ForgetMe, RememberMe};
use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore};
@@ -56,13 +56,18 @@ use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender,
use nym_task::{TaskClient, TaskHandle};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_validator_client::nym_api::NymApiClientExt;
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
use rand::prelude::SliceRandom;
use rand::rngs::OsRng;
use rand::thread_rng;
use std::fmt::Debug;
use std::os::raw::c_int as RawFd;
use std::path::Path;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::mpsc::Sender;
use tracing::*;
use url::Url;
#[cfg(all(
@@ -130,9 +135,11 @@ pub enum ClientInputStatus {
}
impl ClientInputStatus {
#[allow(clippy::panic)]
pub fn register_producer(&mut self) -> ClientInput {
match std::mem::replace(self, ClientInputStatus::Connected) {
ClientInputStatus::AwaitingProducer { client_input } => client_input,
// critical failure implying misuse of software
ClientInputStatus::Connected => panic!("producer was already registered before"),
}
}
@@ -144,9 +151,11 @@ pub enum ClientOutputStatus {
}
impl ClientOutputStatus {
#[allow(clippy::panic)]
pub fn register_consumer(&mut self) -> ClientOutput {
match std::mem::replace(self, ClientOutputStatus::Connected) {
ClientOutputStatus::AwaitingConsumer { client_output } => client_output,
// critical failure implying misuse of software
ClientOutputStatus::Connected => panic!("consumer was already registered before"),
}
}
@@ -338,6 +347,7 @@ where
#[allow(clippy::too_many_arguments)]
fn start_real_traffic_controller(
controller_config: real_messages_control::Config,
key_rotation_config: KeyRotationConfig,
topology_accessor: TopologyAccessor,
ack_receiver: AcknowledgementReceiver,
input_receiver: InputMessageReceiver,
@@ -355,6 +365,7 @@ where
RealMessagesController::new(
controller_config,
key_rotation_config,
ack_receiver,
input_receiver,
mix_sender,
@@ -453,7 +464,7 @@ where
};
let gateway_failure = |err| {
log::error!("Could not authenticate and start up the gateway connection - {err}");
tracing::error!("Could not authenticate and start up the gateway connection - {err}");
ClientCoreError::GatewayClientError {
gateway_id: details.gateway_id.to_base58_string(),
source: Box::new(err),
@@ -555,14 +566,14 @@ where
custom_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
config_topology: config::Topology,
nym_api_urls: Vec<Url>,
user_agent: Option<UserAgent>,
nym_api_client: nym_http_api_client::Client,
) -> Box<dyn TopologyProvider + Send + Sync> {
// if no custom provider was ... provided ..., create one using nym-api
custom_provider.unwrap_or_else(|| {
Box::new(NymApiTopologyProvider::new(
config_topology,
nym_api_urls,
user_agent,
nym_api_client,
))
})
}
@@ -598,7 +609,7 @@ where
topology_refresher.try_refresh().await;
if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
log::error!(
tracing::error!(
"The current network topology seem to be insufficient to route any packets through \
- check if enough nodes and a gateway are online - source: {err}"
);
@@ -674,27 +685,40 @@ where
// TODO: rename it as it implies the data is persistent whilst one can use InMemBackend
async fn setup_persistent_reply_storage(
backend: S::ReplyStore,
key_rotation_config: KeyRotationConfig,
shutdown: TaskClient,
) -> Result<CombinedReplyStorage, ClientCoreError>
where
<S::ReplyStore as ReplyStorageBackend>::StorageError: Sync + Send,
S::ReplyStore: Send + Sync,
{
log::trace!("Setup persistent reply storage");
tracing::trace!("Setup persistent reply storage");
let now = OffsetDateTime::now_utc();
let expected_current_key_rotation_start =
key_rotation_config.expected_current_key_rotation_start(now);
// time of the start of one epoch BEFORE the CURRENT rotation has begun
// this indicates the starting time of when packets with the current keys might have been constructed
// (i.e. any surbs OLDER than that MUST BE invalid)
let prior_epoch_start =
expected_current_key_rotation_start - key_rotation_config.epoch_duration;
let persistent_storage = PersistentReplyStorage::new(backend);
let mem_store = persistent_storage
.load_state_from_backend()
.load_state_from_backend(prior_epoch_start)
.await
.map_err(|err| ClientCoreError::SurbStorageError {
source: Box::new(err),
})?;
let store_clone = mem_store.clone();
spawn_future(async move {
persistent_storage
.flush_on_shutdown(store_clone, shutdown)
.await
});
spawn_future!(
async move {
persistent_storage
.flush_on_shutdown(store_clone, shutdown)
.await
},
"PersistentReplyStorage::flush_on_shutdown"
);
Ok(mem_store)
}
@@ -715,7 +739,7 @@ where
let mut rng = OsRng;
let keys = if let Some(derivation_material) = derivation_material {
ClientKeys::from_master_key(&mut rng, &derivation_material)
.map_err(|_| ClientCoreError::HkdfDerivationError {})?
.map_err(|_| ClientCoreError::HkdfDerivationError)?
} else {
ClientKeys::generate_new(&mut rng)
};
@@ -725,6 +749,33 @@ where
setup_gateway(setup_method, key_store, details_store).await
}
fn construct_nym_api_client(
config: &Config,
user_agent: Option<UserAgent>,
) -> Result<nym_http_api_client::Client, ClientCoreError> {
let mut nym_api_urls = config.get_nym_api_endpoints();
nym_api_urls.shuffle(&mut thread_rng());
let mut builder = nym_http_api_client::Client::builder(nym_api_urls[0].clone())
.map_err(|source| ClientCoreError::NymApiQueryFailure { source })?;
if let Some(user_agent) = user_agent {
builder = builder.with_user_agent(user_agent);
}
builder = builder.with_bincode();
builder
.build()
.map_err(|source| ClientCoreError::NymApiQueryFailure { source })
}
async fn determine_key_rotation_state(
client: &nym_http_api_client::Client,
) -> Result<KeyRotationConfig, ClientCoreError> {
Ok(client.get_key_rotation_info().await?.into())
}
pub async fn start_base(mut self) -> Result<BaseClient, ClientCoreError>
where
S::ReplyStore: Send + Sync,
@@ -789,11 +840,14 @@ where
.dkg_query_client
.map(|client| BandwidthController::new(credential_store, client));
let nym_api_client = Self::construct_nym_api_client(&self.config, self.user_agent.clone())?;
let key_rotation_config = Self::determine_key_rotation_state(&nym_api_client).await?;
let topology_provider = Self::setup_topology_provider(
self.custom_topology_provider.take(),
self.config.debug.topology,
self.config.get_nym_api_endpoints(),
self.user_agent.clone(),
nym_api_client,
);
let stats_reporter = Self::start_statistics_control(
@@ -838,6 +892,7 @@ where
let reply_storage = Self::setup_persistent_reply_storage(
reply_storage_backend,
key_rotation_config,
shutdown.fork("persistent_reply_storage"),
)
.await?;
@@ -878,6 +933,7 @@ where
Self::start_real_traffic_controller(
controller_config,
key_rotation_config,
shared_topology_accessor.clone(),
ack_receiver,
input_receiver,
@@ -7,13 +7,13 @@ use crate::{
config::Config,
error::ClientCoreError,
};
use log::{error, info, trace};
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient};
use std::{io, path::Path};
use time::OffsetDateTime;
use tracing::{error, info, trace};
use url::Url;
async fn setup_fresh_backend<P: AsRef<Path>>(
@@ -90,7 +90,7 @@ pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
let db_path = db_path.as_ref();
if db_path.exists() {
info!("Loading existing surb database");
match fs_backend::Backend::try_load(db_path, surb_config.fresh_sender_tags).await {
match fs_backend::Backend::try_load(db_path).await {
Ok(backend) => Ok(backend),
Err(err) => {
error!("setup_fs_reply_surb_backend: Failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future");
@@ -114,41 +114,32 @@ pub async fn setup_fs_gateways_storage<P: AsRef<Path>>(
})
}
pub fn create_bandwidth_controller<St: CredentialStorage>(
config: &Config,
storage: St,
) -> BandwidthController<QueryHttpRpcNyxdClient, St> {
let nyxd_url = config
.get_validator_endpoints()
.pop()
.expect("No nyxd validator endpoint provided");
create_bandwidth_controller_with_urls(nyxd_url, storage)
}
pub fn create_bandwidth_controller_with_urls<St: CredentialStorage>(
nyxd_url: Url,
storage: St,
) -> BandwidthController<QueryHttpRpcNyxdClient, St> {
let client = default_query_dkg_client(nyxd_url);
) -> Result<BandwidthController<QueryHttpRpcNyxdClient, St>, ClientCoreError> {
let client = default_query_dkg_client(nyxd_url)?;
BandwidthController::new(storage, client)
Ok(BandwidthController::new(storage, client))
}
pub fn default_query_dkg_client_from_config(config: &Config) -> QueryHttpRpcNyxdClient {
pub fn default_query_dkg_client_from_config(
config: &Config,
) -> Result<QueryHttpRpcNyxdClient, ClientCoreError> {
let nyxd_url = config
.get_validator_endpoints()
.pop()
.expect("No nyxd validator endpoint provided");
.ok_or(ClientCoreError::RpcClientMissingUrl)?;
default_query_dkg_client(nyxd_url)
}
pub fn default_query_dkg_client(nyxd_url: Url) -> QueryHttpRpcNyxdClient {
pub fn default_query_dkg_client(nyxd_url: Url) -> Result<QueryHttpRpcNyxdClient, ClientCoreError> {
let details = nym_network_defaults::NymNetworkDetails::new_from_env();
let client_config = nyxd::Config::try_from_nym_network_details(&details)
.expect("failed to construct validator client config");
.map_err(|source| ClientCoreError::InvalidNetworkDetails { source })?;
// overwrite env configuration with config URLs
QueryHttpRpcNyxdClient::connect(client_config, nyxd_url.as_str())
.expect("Could not construct query client")
.map_err(|source| ClientCoreError::RpcClientCreationFailure { source })
}
@@ -6,7 +6,6 @@ use crate::client::topology_control::TopologyAccessor;
use crate::{config, spawn_future};
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use log::*;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::cover::generate_loop_cover_packet;
@@ -19,6 +18,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::error::TrySendError;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, Sleep};
@@ -210,10 +210,10 @@ impl LoopCoverTrafficStream<OsRng> {
TrySendError::Full(_) => {
// This isn't a problem, if the channel is full means we're already sending the
// max amount of messages downstream can handle.
log::debug!("Failed to send cover message - channel full");
tracing::debug!("Failed to send cover message - channel full");
}
TrySendError::Closed(_) => {
log::warn!("Failed to send cover message - channel closed");
tracing::warn!("Failed to send cover message - channel closed");
}
}
} else {
@@ -235,6 +235,7 @@ impl LoopCoverTrafficStream<OsRng> {
tokio::task::yield_now().await;
}
#[allow(clippy::panic)]
pub fn start(mut self) {
if self.cover_traffic.disable_loop_cover_traffic_stream {
// we should have never got here in the first place - the task should have never been created to begin with
@@ -251,27 +252,30 @@ impl LoopCoverTrafficStream<OsRng> {
let mut shutdown = self.task_client.fork("select");
spawn_future(async move {
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
spawn_future!(
async move {
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
log::trace!("LoopCoverTrafficStream: Received shutdown");
}
next = self.next() => {
if next.is_some() {
self.on_new_message().await;
} else {
log::trace!("LoopCoverTrafficStream: Stopping since channel closed");
break;
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
tracing::trace!("LoopCoverTrafficStream: Received shutdown");
}
next = self.next() => {
if next.is_some() {
self.on_new_message().await;
} else {
tracing::trace!("LoopCoverTrafficStream: Stopping since channel closed");
break;
}
}
}
}
}
shutdown.recv_timeout().await;
log::debug!("LoopCoverTrafficStream: Exiting");
})
shutdown.recv_timeout().await;
tracing::debug!("LoopCoverTrafficStream: Exiting");
},
"LoopCoverTrafficStream"
)
}
}
@@ -135,7 +135,9 @@ impl InputMessage {
recipient_tag,
data,
lane,
max_retransmissions: None,
// \/ set it to SOME sane default so that if we run out of surbs and constantly
// fail to request more, we wouldn't be stuck in limbo
max_retransmissions: Some(10),
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -4,10 +4,10 @@
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::error::ClientCoreError;
use crate::spawn_future;
use log::*;
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::TaskClient;
use tracing::*;
use transceiver::ErasedGatewayError;
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
@@ -96,72 +96,93 @@ impl MixTrafficController {
mut mix_packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
debug_assert!(!mix_packets.is_empty());
let result = if mix_packets.len() == 1 {
let send_future = if mix_packets.len() == 1 {
// SAFETY: we just checked we have one packet
#[allow(clippy::unwrap_used)]
let mix_packet = mix_packets.pop().unwrap();
self.gateway_transceiver.send_mix_packet(mix_packet).await
self.gateway_transceiver.send_mix_packet(mix_packet)
} else {
self.gateway_transceiver
.batch_send_mix_packets(mix_packets)
.await
self.gateway_transceiver.batch_send_mix_packets(mix_packets)
};
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while handling messages");
Ok(())
}
result = send_future => {
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
result
result
}
}
}
async fn on_client_request(&mut self, client_request: ClientRequest) {
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while handling client request");
}
result = self.gateway_transceiver.send_client_request(client_request) => {
if let Err(err) = result {
error!("Failed to send client request: {err}")
}
}
}
}
pub fn start(mut self) {
spawn_future(async move {
debug!("Started MixTrafficController with graceful shutdown support");
while !self.task_client.is_shutdown() {
tokio::select! {
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.task_client.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
break;
}
}
},
None => {
log::trace!("MixTrafficController: Stopping since channel closed");
spawn_future!(
async move {
debug!("Started MixTrafficController with graceful shutdown support");
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("MixTrafficController: Received shutdown");
break;
}
},
client_request = self.client_rx.recv() => match client_request {
Some(client_request) => {
match self.gateway_transceiver.send_client_request(client_request).await {
Ok(_) => (),
Err(e) => error!("Failed to send client request: {}", e),
};
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.task_client.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
break;
}
}
},
None => {
tracing::trace!("MixTrafficController: Stopping since channel closed");
break;
}
},
client_request = self.client_rx.recv() => match client_request {
Some(client_request) => {
self.on_client_request(client_request).await;
},
None => {
tracing::trace!("MixTrafficController, client request channel closed");
}
},
None => {
log::trace!("MixTrafficController, client request channel closed");
}
},
_ = self.task_client.recv() => {
log::trace!("MixTrafficController: Received shutdown");
break;
}
}
}
self.task_client.recv_timeout().await;
log::debug!("MixTrafficController: Exiting");
});
self.task_client.recv_timeout().await;
tracing::debug!("MixTrafficController: Exiting");
},
"MixTrafficController"
);
}
}
@@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use log::{debug, error};
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::error::GatewayClientError;
@@ -14,6 +13,7 @@ use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use std::fmt::Debug;
use std::os::raw::c_int as RawFd;
use thiserror::Error;
use tracing::{debug, error};
#[cfg(not(target_arch = "wasm32"))]
use futures::channel::oneshot;
@@ -27,7 +27,7 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
ErasedGatewayError(Box::new(err))
}
/// This combines combines the functionalities of being able to send and receive mix packets.
/// This combines the functionalities of being able to send and receive mix packets.
#[async_trait]
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
fn gateway_identity(&self) -> ed25519::PublicKey;
@@ -87,7 +87,7 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
message: ClientRequest,
) -> Result<(), GatewayClientError> {
let _ = (**self).send_client_request(message.clone()).await?;
log::debug!("Sent client request: {:?}", message);
tracing::debug!("Sent client request: {:?}", message);
Ok(())
}
}
@@ -269,6 +269,8 @@ pub struct MockGateway {
}
impl Default for MockGateway {
// test code
#[allow(clippy::unwrap_used)]
fn default() -> Self {
MockGateway {
dummy_identity: "3ebjp1Fb9hdcS1AR6AZihgeJiMHkB5jjJUsvqNnfQwU7"
@@ -5,7 +5,6 @@ use super::action_controller::{AckActionSender, Action};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use futures::StreamExt;
use log::*;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::{
acknowledgements::{identifier::recover_identifier, AckKey},
@@ -13,6 +12,7 @@ use nym_sphinx::{
};
use nym_task::TaskClient;
use std::sync::Arc;
use tracing::*;
/// Module responsible for listening for any data resembling acknowledgements from the network
/// and firing actions to remove them from the 'Pending' state.
@@ -65,7 +65,7 @@ impl AcknowledgementListener {
return;
}
trace!("Received {} from the mix network", frag_id);
trace!("Received {frag_id} from the mix network");
self.stats_tx
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
if let Err(err) = self
@@ -93,16 +93,16 @@ impl AcknowledgementListener {
acks = self.ack_receiver.next() => match acks {
Some(acks) => self.handle_ack_receiver_item(acks).await,
None => {
log::trace!("AcknowledgementListener: Stopping since channel closed");
tracing::trace!("AcknowledgementListener: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv() => {
log::trace!("AcknowledgementListener: Received shutdown");
tracing::trace!("AcknowledgementListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
log::debug!("AcknowledgementListener: Exiting");
tracing::debug!("AcknowledgementListener: Exiting");
}
}
@@ -5,7 +5,6 @@ use super::PendingAcknowledgement;
use crate::client::real_messages_control::acknowledgement_control::RetransmissionRequestSender;
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_sphinx::Delay as SphinxDelay;
@@ -13,6 +12,7 @@ use nym_task::TaskClient;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tracing::*;
pub(crate) type AckActionSender = mpsc::UnboundedSender<Action>;
pub(crate) type AckActionReceiver = mpsc::UnboundedReceiver<Action>;
@@ -126,7 +126,7 @@ impl ActionController {
fn handle_insert(&mut self, pending_acks: Vec<PendingAcknowledgement>) {
for pending_ack in pending_acks {
let frag_id = pending_ack.message_chunk.fragment_identifier();
trace!("{} is inserted", frag_id);
trace!("{frag_id} is inserted");
if self
.pending_acks_data
@@ -161,22 +161,16 @@ impl ActionController {
let new_queue_key = self.pending_acks_timers.insert(frag_id, timeout);
*queue_key = Some(new_queue_key)
} else {
debug!(
"Tried to START TIMER on pending ack that is already gone! - {}",
frag_id
);
debug!("Tried to START TIMER on pending ack that is already gone! - {frag_id}");
}
}
fn handle_remove(&mut self, frag_id: FragmentIdentifier) {
trace!("{} is getting removed", frag_id);
trace!("{frag_id} is getting removed");
match self.pending_acks_data.remove(&frag_id) {
None => {
debug!(
"Tried to REMOVE pending ack that is already gone! - {}",
frag_id
);
debug!("Tried to REMOVE pending ack that is already gone! - {frag_id}");
}
Some((_, queue_key)) => {
if let Some(queue_key) = queue_key {
@@ -188,10 +182,7 @@ impl ActionController {
} else {
// I'm not 100% sure if having a `None` key is even possible here
// (REMOVE would have to be called before START TIMER),
debug!(
"Tried to REMOVE pending ack without TIMER active - {}",
frag_id
);
debug!("Tried to REMOVE pending ack without TIMER active - {frag_id}");
}
}
}
@@ -200,27 +191,26 @@ impl ActionController {
// initiated basically as a first step of retransmission. At first data has its delay updated
// (as new sphinx packet was created with new expected delivery time)
fn handle_update_pending_ack(&mut self, frag_id: FragmentIdentifier, delay: SphinxDelay) {
trace!("{} is updating its delay", frag_id);
trace!("{frag_id} is updating its delay");
// TODO: is it possible to solve this without either locking or temporarily removing the value?
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.remove(&frag_id) {
// this Action is triggered by `RetransmissionRequestListener` (for 'normal' packets)
// SAFETY: this Action is triggered by `RetransmissionRequestListener` (for 'normal' packets)
// or `ReplyController` (for 'reply' packets) which held the other potential
// reference to this Arc. HOWEVER, before the Action was pushed onto the queue, the reference
// was dropped hence this unwrap is safe.
#[allow(clippy::unwrap_used)]
let mut inner_data = Arc::try_unwrap(pending_ack_data).unwrap();
inner_data.update_retransmitted(delay);
self.pending_acks_data
.insert(frag_id, (Arc::new(inner_data), queue_key));
} else {
debug!(
"Tried to UPDATE TIMER on pending ack that is already gone! - {}",
frag_id
);
debug!("Tried to UPDATE TIMER on pending ack that is already gone! - {frag_id}");
}
}
// note: when the entry expires it's automatically removed from pending_acks_timers
#[allow(clippy::panic)]
fn handle_expired_ack_timer(&mut self, expired_ack: Expired<FragmentIdentifier>) {
let frag_id = expired_ack.into_inner();
@@ -241,7 +231,7 @@ impl ActionController {
.unbounded_send(Arc::downgrade(pending_ack_data))
{
if !self.task_client.is_shutdown_poll() {
log::error!("Failed to send pending ack for retransmission: {err}");
tracing::error!("Failed to send pending ack for retransmission: {err}");
}
}
} else {
@@ -269,7 +259,7 @@ impl ActionController {
action = self.incoming_actions.next() => match action {
Some(action) => self.process_action(action),
None => {
log::trace!(
tracing::trace!(
"ActionController: Stopping since incoming actions channel closed"
);
break;
@@ -278,17 +268,17 @@ impl ActionController {
expired_ack = self.pending_acks_timers.next() => match expired_ack {
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack),
None => {
log::trace!("ActionController: Stopping since ack channel closed");
tracing::trace!("ActionController: Stopping since ack channel closed");
break;
}
},
_ = self.task_client.recv() => {
log::trace!("ActionController: Received shutdown");
tracing::trace!("ActionController: Received shutdown");
break;
}
}
}
self.task_client.recv_timeout().await;
log::debug!("ActionController: Exiting");
tracing::debug!("ActionController: Exiting");
}
}
@@ -5,7 +5,6 @@ use crate::client::inbound_messages::{InputMessage, InputMessageReceiver};
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::real_messages_control::real_traffic_stream::RealMessage;
use crate::client::replies::reply_controller::ReplyControllerSender;
use log::*;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
@@ -13,6 +12,7 @@ use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
use tracing::*;
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
/// putting everything into sphinx packets, etc.
@@ -120,6 +120,7 @@ where
}
}
#[allow(clippy::panic)]
async fn on_input_message(&mut self, msg: InputMessage) {
match msg {
InputMessage::Regular {
@@ -213,7 +214,9 @@ where
self.handle_premade_packets(msgs, lane).await
}
// MessageWrappers can't be nested
InputMessage::MessageWrapper { .. } => unimplemented!(),
InputMessage::MessageWrapper { .. } => {
panic!("attempted to use nested MessageWrapper")
}
},
};
}
@@ -223,21 +226,24 @@ where
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("InputMessageListener: Received shutdown");
break;
}
input_msg = self.input_receiver.recv() => match input_msg {
Some(input_msg) => {
self.on_input_message(input_msg).await;
},
None => {
log::trace!("InputMessageListener: Stopping since channel closed");
tracing::trace!("InputMessageListener: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv() => {
log::trace!("InputMessageListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
log::debug!("InputMessageListener: Exiting");
tracing::debug!("InputMessageListener: Exiting");
}
}
@@ -13,7 +13,6 @@ use crate::client::replies::reply_controller::ReplyControllerSender;
use crate::spawn_future;
use action_controller::AckActionReceiver;
use futures::channel::mpsc;
use log::*;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::params::{PacketSize, PacketType};
@@ -30,6 +29,7 @@ use std::{
sync::{Arc, Weak},
time::Duration,
};
use tracing::*;
pub(crate) use action_controller::{AckActionSender, Action};
@@ -298,29 +298,44 @@ where
let mut sent_notification_listener = self.sent_notification_listener;
let mut action_controller = self.action_controller;
spawn_future(async move {
acknowledgement_listener.run().await;
debug!("The acknowledgement listener has finished execution!");
});
spawn_future!(
async move {
acknowledgement_listener.run().await;
debug!("The acknowledgement listener has finished execution!");
},
"AcknowledgementController::AcknowledgementListener"
);
spawn_future(async move {
input_message_listener.run().await;
debug!("The input listener has finished execution!");
});
spawn_future!(
async move {
input_message_listener.run().await;
debug!("The input listener has finished execution!");
},
"AcknowledgementController::InputMessageListener"
);
spawn_future(async move {
retransmission_request_listener.run(packet_type).await;
debug!("The retransmission request listener has finished execution!");
});
spawn_future!(
async move {
retransmission_request_listener.run(packet_type).await;
debug!("The retransmission request listener has finished execution!");
},
"AcknowledgementController::RetransmissionRequestListener"
);
spawn_future(async move {
sent_notification_listener.run().await;
debug!("The sent notification listener has finished execution!");
});
spawn_future!(
async move {
sent_notification_listener.run().await;
debug!("The sent notification listener has finished execution!");
},
"AcknowledgementController::SentNotificationListener"
);
spawn_future(async move {
action_controller.run().await;
debug!("The controller has finished execution!");
});
spawn_future!(
async move {
action_controller.run().await;
debug!("The controller has finished execution!");
},
"AcknowledgementController::ActionController"
);
}
}
@@ -10,13 +10,13 @@ use crate::client::real_messages_control::message_handler::{MessageHandler, Prep
use crate::client::real_messages_control::real_traffic_stream::RealMessage;
use crate::client::replies::reply_controller::ReplyControllerSender;
use futures::StreamExt;
use log::*;
use nym_sphinx::chunking::fragment::Fragment;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::{addressing::clients::Recipient, params::PacketType};
use nym_task::{connections::TransmissionLane, TaskClient};
use rand::{CryptoRng, Rng};
use std::sync::{Arc, Weak};
use tracing::*;
// responsible for packet retransmission upon fired timer
pub(super) struct RetransmissionRequestListener<R> {
@@ -179,19 +179,22 @@ where
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("RetransmissionRequestListener: Received shutdown");
break;
}
timed_out_ack = self.request_receiver.next() => match timed_out_ack {
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await,
None => {
log::trace!("RetransmissionRequestListener: Stopping since channel closed");
tracing::trace!("RetransmissionRequestListener: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv() => {
log::trace!("RetransmissionRequestListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
log::debug!("RetransmissionRequestListener: Exiting");
tracing::debug!("RetransmissionRequestListener: Exiting");
}
}
@@ -4,9 +4,9 @@
use super::action_controller::{AckActionSender, Action};
use super::SentPacketNotificationReceiver;
use futures::StreamExt;
use log::*;
use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
use nym_task::TaskClient;
use tracing::*;
/// Module responsible for starting up retransmission timers.
/// It is required because when we send our packet to the `real traffic stream` controlled
@@ -56,17 +56,17 @@ impl SentNotificationListener {
self.on_sent_message(frag_id).await;
}
None => {
log::trace!("SentNotificationListener: Stopping since channel closed");
tracing::trace!("SentNotificationListener: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv() => {
log::trace!("SentNotificationListener: Received shutdown");
tracing::trace!("SentNotificationListener: Received shutdown");
break;
}
}
}
assert!(self.task_client.is_shutdown_poll());
log::debug!("SentNotificationListener: Exiting");
tracing::debug!("SentNotificationListener: Exiting");
}
}
@@ -9,6 +9,7 @@ use crate::client::real_messages_control::{AckActionSender, Action};
use crate::client::replies::reply_controller::MaxRetransmissions;
use crate::client::replies::reply_storage::{ReceivedReplySurbsMap, SentReplyKeys, UsedSenderTags};
use crate::client::topology_control::{TopologyAccessor, TopologyReadPermit};
use nym_client_core_surb_storage::RetrievedReplySurb;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessage, ReplyMessage};
@@ -34,6 +35,9 @@ pub enum PreparationError {
#[error(transparent)]
NymTopologyError(#[from] NymTopologyError),
#[error("message wasn't split into any fragments!")]
EmptyFragments,
#[error("message too long for a single SURB, splitting into {fragments} fragments.")]
MessageTooLongForSingleSurb { fragments: usize },
@@ -44,10 +48,7 @@ pub enum PreparationError {
}
impl PreparationError {
fn return_surbs(
self,
returned_surbs: Vec<ReplySurbWithKeyRotation>,
) -> SurbWrappedPreparationError {
fn return_surbs(self, returned_surbs: Vec<RetrievedReplySurb>) -> SurbWrappedPreparationError {
SurbWrappedPreparationError {
source: self,
returned_surbs: Some(returned_surbs),
@@ -61,7 +62,7 @@ pub struct SurbWrappedPreparationError {
#[source]
source: PreparationError,
returned_surbs: Option<Vec<ReplySurbWithKeyRotation>>,
returned_surbs: Option<Vec<RetrievedReplySurb>>,
}
impl<T> From<T> for SurbWrappedPreparationError
@@ -83,7 +84,7 @@ impl SurbWrappedPreparationError {
target: &AnonymousSenderTag,
) -> PreparationError {
if let Some(reply_surbs) = self.returned_surbs {
surb_storage.insert_surbs(target, reply_surbs)
surb_storage.re_insert_reply_surbs(target, reply_surbs)
}
self.source
}
@@ -105,6 +106,9 @@ pub(crate) struct Config {
/// will be routed as usual, to the entry gateway, through three mix nodes, egressing
/// through the exit gateway. If mix hops are disabled, traffic will be routed directly
/// from the entry gateway to the exit gateway, bypassing the mix nodes.
///
/// This overrides the `use_legacy_sphinx_format` setting as reduced mix hops
/// requires use of the updated SURB packet format.
disable_mix_hops: bool,
/// Average delay a data packet is going to get delay at a single mixnode.
@@ -159,8 +163,12 @@ impl Config {
}
/// Configure whether messages senders using this config should use mix hops or not when sending messages.
///
/// This overrides the `use_legacy_sphinx_format` setting as disabled mix hops
/// requires use of the updated SURB packet format.
pub fn disable_mix_hops(mut self, disable_mix_hops: bool) -> Self {
self.disable_mix_hops = disable_mix_hops;
self.use_legacy_sphinx_format = false;
self
}
}
@@ -224,6 +232,10 @@ where
}
}
pub(crate) fn topology_access_handle(&self) -> &TopologyAccessor {
&self.topology_access
}
fn get_or_create_sender_tag(&mut self, recipient: &Recipient) -> AnonymousSenderTag {
if let Some(existing) = self.tag_storage.try_get_existing(recipient) {
trace!("we already had sender tag for {recipient}");
@@ -291,7 +303,7 @@ where
&mut self,
target: AnonymousSenderTag,
message: ReplyMessage,
reply_surb: ReplySurbWithKeyRotation,
reply_surb: RetrievedReplySurb,
is_extra_surb_request: bool,
) -> Result<(), SurbWrappedPreparationError> {
let msg = NymMessage::new_reply(message);
@@ -311,6 +323,16 @@ where
});
}
if fragment.is_empty() {
error!("CRITICAL FAILURE: our split message didn't result in any sendable fragments");
return Err(SurbWrappedPreparationError {
source: PreparationError::EmptyFragments,
returned_surbs: Some(vec![reply_surb]),
});
}
// SAFETY: we just checked we have one fragment
#[allow(clippy::unwrap_used)]
let chunk = fragment.pop().unwrap();
let chunk_clone = chunk.clone();
let prepared_fragment = self
@@ -322,7 +344,10 @@ where
Some(chunk.fragment_identifier()),
);
let delay = prepared_fragment.total_delay;
let max_retransmissions = None;
// we have to set a maximum number of retransmissions in case we fail to retrieve
// surbs for a long period of time; we don't want to be stuck constantly resending the data
let max_retransmissions = Some(10);
let pending_ack = PendingAcknowledgement::new_anonymous(
chunk,
delay,
@@ -345,7 +370,7 @@ where
pub(crate) async fn try_request_additional_reply_surbs(
&mut self,
from: AnonymousSenderTag,
reply_surb: ReplySurbWithKeyRotation,
reply_surb: RetrievedReplySurb,
amount: u32,
) -> Result<(), SurbWrappedPreparationError> {
debug!("requesting {amount} reply SURBs from {from}");
@@ -385,11 +410,9 @@ where
&mut self,
target: AnonymousSenderTag,
fragments: Vec<FragmentWithMaxRetransmissions>,
reply_surbs: Vec<ReplySurbWithKeyRotation>,
reply_surbs: impl IntoIterator<Item = RetrievedReplySurb>,
lane: TransmissionLane,
) -> Result<(), SurbWrappedPreparationError> {
// TODO: technically this is performing an unnecessary cloning, but in the grand scheme of things
// is it really that bad?
self.try_send_reply_chunks(
target,
fragments.into_iter().map(|f| (lane, f)).collect(),
@@ -402,7 +425,7 @@ where
&mut self,
target: AnonymousSenderTag,
fragments: Vec<(TransmissionLane, FragmentWithMaxRetransmissions)>,
reply_surbs: Vec<ReplySurbWithKeyRotation>,
reply_surbs: impl IntoIterator<Item = RetrievedReplySurb>,
) -> Result<(), SurbWrappedPreparationError> {
let prepared_fragments = self
.prepare_reply_chunks_for_sending(
@@ -525,6 +548,7 @@ where
pending_acks.push(pending_ack);
}
drop(topology_permit);
self.insert_pending_acks(pending_acks);
self.forward_messages(real_messages, lane).await;
@@ -564,7 +588,7 @@ where
)
.await?;
log::trace!("storing {} reply keys", reply_keys.len());
tracing::trace!("storing {} reply keys", reply_keys.len());
self.reply_key_storage.insert_multiple(reply_keys);
Ok(())
@@ -604,7 +628,7 @@ where
)
.await?;
log::trace!("storing {} reply keys", reply_keys.len());
tracing::trace!("storing {} reply keys", reply_keys.len());
self.reply_key_storage.insert_multiple(reply_keys);
Ok(())
@@ -634,20 +658,12 @@ where
pub(crate) async fn prepare_reply_chunks_for_sending(
&mut self,
fragments: Vec<Fragment>,
reply_surbs: Vec<ReplySurbWithKeyRotation>,
reply_surbs: impl IntoIterator<Item = RetrievedReplySurb>,
) -> Result<Vec<PreparedFragment>, SurbWrappedPreparationError> {
debug_assert_eq!(
fragments.len(),
reply_surbs.len(),
"attempted to send {} fragments with {} reply surbs",
fragments.len(),
reply_surbs.len()
);
let topology_permit = self.topology_access.get_read_permit().await;
let topology = match self.get_topology(&topology_permit) {
Ok(topology) => topology,
Err(err) => return Err(err.return_surbs(reply_surbs)),
Err(err) => return Err(err.return_surbs(reply_surbs.into_iter().collect())),
};
Ok(fragments
@@ -655,12 +671,13 @@ where
.zip(reply_surbs.into_iter())
.map(|(fragment, reply_surb)| {
// unwrap here is fine as we know we have a valid topology
#[allow(clippy::unwrap_used)]
self.message_preparer
.prepare_reply_chunk_for_sending(
fragment,
topology,
&self.config.ack_key,
reply_surb,
reply_surb.into(),
PacketType::Mix,
)
.unwrap()
@@ -670,7 +687,7 @@ where
pub(crate) async fn try_prepare_single_reply_chunk_for_sending(
&mut self,
reply_surb: ReplySurbWithKeyRotation,
reply_surb: RetrievedReplySurb,
chunk: Fragment,
) -> Result<PreparedFragment, SurbWrappedPreparationError> {
let topology_permit = self.topology_access.get_read_permit().await;
@@ -683,7 +700,7 @@ where
chunk,
topology,
&self.config.ack_key,
reply_surb,
reply_surb.into(),
PacketType::Mix,
)?;
@@ -714,17 +731,21 @@ where
// tells real message sender (with the poisson timer) to send this to the mix network
pub(crate) async fn forward_messages(
&self,
&mut self,
messages: Vec<RealMessage>,
transmission_lane: TransmissionLane,
) {
if let Err(err) = self
.real_message_sender
.send((messages, transmission_lane))
.await
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to forward messages to the real message sender: {err}");
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while attempting to forward mixnet messages");
}
sending_res = self.real_message_sender.send((messages, transmission_lane)) => {
if sending_res.is_err() {
error!(
"failed to forward mixnet messages due to closed channel (outside of shutdown!)"
);
}
}
}
}
@@ -24,7 +24,6 @@ use crate::{
spawn_future,
};
use futures::channel::mpsc;
use log::*;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
@@ -34,7 +33,9 @@ use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use nym_task::TaskClient;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::sync::Arc;
use tracing::*;
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
pub(crate) use acknowledgement_control::{AckActionSender, Action};
pub(crate) mod acknowledgement_control;
@@ -85,12 +86,6 @@ impl<'a> From<&'a Config> for real_traffic_stream::Config {
}
}
impl<'a> From<&'a Config> for reply_controller::Config {
fn from(cfg: &'a Config) -> Self {
reply_controller::Config::new(cfg.reply_surbs)
}
}
impl<'a> From<&'a Config> for message_handler::Config {
fn from(cfg: &'a Config) -> Self {
message_handler::Config::new(
@@ -139,6 +134,7 @@ impl RealMessagesController<OsRng> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
config: Config,
key_rotation_config: KeyRotationConfig,
ack_receiver: AcknowledgementReceiver,
input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender,
@@ -169,7 +165,8 @@ impl RealMessagesController<OsRng> {
// create all configs for the components
let ack_control_config = (&config).into();
let out_queue_config = (&config).into();
let reply_controller_config = (&config).into();
let reply_controller_config =
reply_controller::Config::new(config.reply_surbs, key_rotation_config);
let message_handler_config = (&config).into();
// create the actual components
@@ -227,14 +224,20 @@ impl RealMessagesController<OsRng> {
let ack_control = self.ack_control;
let mut reply_control = self.reply_control;
spawn_future(async move {
out_queue_control.run().await;
debug!("The out queue controller has finished execution!");
});
spawn_future(async move {
reply_control.run().await;
debug!("The reply controller has finished execution!");
});
spawn_future!(
async move {
out_queue_control.run().await;
debug!("The out queue controller has finished execution!");
},
"RealMessagesController::OutQueueControl)"
);
spawn_future!(
async move {
reply_control.run().await;
debug!("The reply controller has finished execution!");
},
"RealMessagesController::ReplyController"
);
ack_control.start(packet_type);
}
@@ -9,7 +9,6 @@ use crate::client::transmission_buffer::TransmissionBuffer;
use crate::config;
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use log::*;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::chunking::fragment::FragmentIdentifier;
@@ -27,6 +26,7 @@ use rand::{CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, Sleep};
@@ -202,7 +202,7 @@ where
// well technically the message was not sent just yet, but now it's up to internal
// queues and client load rather than the required delay. So realistically we can treat
// whatever is about to happen as negligible additional delay.
trace!("{} is about to get sent to the mixnet", frag_id);
trace!("{frag_id} is about to get sent to the mixnet");
if let Err(err) = self.sent_notifier.unbounded_send(frag_id) {
error!("Failed to notify about sent message: {err}");
}
@@ -249,6 +249,8 @@ where
}
};
// SAFETY: our topology must be valid at this point
#[allow(clippy::expect_used)]
(
generate_loop_cover_packet(
&mut self.rng,
@@ -278,17 +280,33 @@ where
}
};
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
if !self.task_client.is_shutdown_poll() {
log::error!("Failed to send: {err}");
let sending_res = tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown signal while attempting to send mix message");
return
}
sending_res = self.mix_tx.send(vec![next_message]) => {
sending_res
}
};
match sending_res {
Err(_) => {
if !self.task_client.is_shutdown_poll() {
tracing::error!(
"failed to send mixnet packet due to closed channel (outside of shutdown!)"
);
}
}
Ok(_) => {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
}
} else {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
}
// notify ack controller about sending our message only after we actually managed to push it
@@ -313,7 +331,7 @@ where
}
fn on_close_connection(&mut self, connection_id: ConnectionId) {
log::debug!("Removing lane for connection: {connection_id}");
tracing::debug!("Removing lane for connection: {connection_id}");
self.transmission_buffer
.remove(&TransmissionLane::ConnectionId(connection_id));
}
@@ -325,7 +343,7 @@ where
fn adjust_current_average_message_sending_delay(&mut self) {
let used_slots = self.mix_tx.max_capacity() - self.mix_tx.capacity();
log::trace!(
tracing::trace!(
"used_slots: {used_slots}, current_multiplier: {}",
self.sending_delay_controller.current_multiplier()
);
@@ -334,7 +352,7 @@ where
.sending_delay_controller
.is_backpressure_currently_detected(used_slots)
{
log::trace!("Backpressure detected");
tracing::trace!("Backpressure detected");
self.sending_delay_controller.record_backpressure_detected();
}
@@ -436,9 +454,11 @@ where
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some((real_messages, conn_id))) => {
log::trace!("handling real_messages: size: {}", real_messages.len());
tracing::trace!("handling real_messages: size: {}", real_messages.len());
self.transmission_buffer.store(&conn_id, real_messages);
// SAFETY: we just stored the message
#[allow(clippy::expect_used)]
let real_next = self.pop_next_message().expect("Just stored one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
@@ -483,10 +503,12 @@ where
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some((real_messages, conn_id))) => {
log::trace!("handling real_messages: size: {}", real_messages.len());
tracing::trace!("handling real_messages: size: {}", real_messages.len());
// First store what we got for the given connection id
self.transmission_buffer.store(&conn_id, real_messages);
// SAFETY: we just stored the message
#[allow(clippy::expect_used)]
let real_next = self.pop_next_message().expect("we just added one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
@@ -538,11 +560,11 @@ where
};
if packets > 1000 {
log::warn!("{status_str}");
tracing::warn!("{status_str}");
} else if packets > 0 {
log::info!("{status_str}");
tracing::info!("{status_str}");
} else {
log::debug!("{status_str}");
tracing::debug!("{status_str}");
}
// Send status message to whoever is listening (possibly UI)
@@ -566,7 +588,7 @@ where
tokio::select! {
biased;
_ = shutdown.recv() => {
log::trace!("OutQueueControl: Received shutdown");
tracing::trace!("OutQueueControl: Received shutdown");
break;
}
_ = status_timer.tick() => {
@@ -575,7 +597,7 @@ where
next_message = self.next() => if let Some(next_message) = next_message {
self.on_message(next_message).await;
} else {
log::trace!("OutQueueControl: Stopping since channel closed");
tracing::trace!("OutQueueControl: Stopping since channel closed");
break;
}
}
@@ -589,18 +611,18 @@ where
tokio::select! {
biased;
_ = shutdown.recv() => {
log::trace!("OutQueueControl: Received shutdown");
tracing::trace!("OutQueueControl: Received shutdown");
}
next_message = self.next() => if let Some(next_message) = next_message {
self.on_message(next_message).await;
} else {
log::trace!("OutQueueControl: Stopping since channel closed");
tracing::trace!("OutQueueControl: Stopping since channel closed");
break;
}
}
}
}
log::debug!("OutQueueControl: Exiting");
tracing::debug!("OutQueueControl: Exiting");
}
}
@@ -98,12 +98,12 @@ impl SendingDelayController {
self.current_multiplier =
(self.current_multiplier + 1).clamp(self.lower_bound, self.upper_bound);
self.time_when_changed = get_time_now();
log::debug!(
tracing::debug!(
"Increasing sending delay multiplier to: {}",
self.current_multiplier
);
} else {
log::warn!("Trying to increase delay multipler higher than allowed");
tracing::warn!("Trying to increase delay multipler higher than allowed");
}
}
@@ -112,7 +112,7 @@ impl SendingDelayController {
self.current_multiplier =
(self.current_multiplier - 1).clamp(self.lower_bound, self.upper_bound);
self.time_when_changed = get_time_now();
log::debug!(
tracing::debug!(
"Decreasing sending delay multiplier to: {}",
self.current_multiplier
);
@@ -164,11 +164,11 @@ impl SendingDelayController {
self.current_multiplier()
);
if self.current_multiplier() > 0 {
log::debug!("{}", status_str);
tracing::debug!("{status_str}");
} else if self.current_multiplier() > 1 {
log::info!("{}", status_str);
tracing::info!("{status_str}");
} else if self.current_multiplier() > 2 {
log::warn!("{}", status_str);
tracing::warn!("{status_str}");
}
self.time_when_logged_about_elevated_multiplier = now;
}
@@ -8,7 +8,6 @@ use crate::spawn_future;
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::StreamExt;
use log::*;
use nym_crypto::asymmetric::x25519;
use nym_crypto::Digest;
use nym_gateway_client::MixnetMessageReceiver;
@@ -24,6 +23,7 @@ use nym_task::TaskClient;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::*;
// The interval at which we check for stale buffers
const STALE_BUFFER_CHECK_INTERVAL: Duration = Duration::from_secs(10);
@@ -198,6 +198,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
}
#[allow(clippy::panic)]
async fn disconnect_sender(&mut self) {
let mut guard = self.inner.lock().await;
if guard.message_sender.is_none() {
@@ -208,6 +209,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
guard.message_sender = None;
}
#[allow(clippy::panic)]
async fn connect_sender(&mut self, sender: ReconstructedMessagesSender) {
let mut guard = self.inner.lock().await;
if guard.message_sender.is_some() {
@@ -221,10 +223,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
let stored_messages = std::mem::take(&mut guard.messages);
if !stored_messages.is_empty() {
if let Err(err) = sender.unbounded_send(stored_messages) {
error!(
"The sender channel we just received is already invalidated - {:?}",
err
);
error!("The sender channel we just received is already invalidated - {err:?}");
// put the values back to the buffer
// the returned error has two fields: err: SendError and val: T,
// where val is the value that was failed to get sent;
@@ -310,13 +309,15 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
};
if let Err(err) = self.reply_controller_sender.send_additional_surbs(
msg.sender_tag,
reply_surbs,
from_surb_request,
) {
if !self.task_client.is_shutdown_poll() {
error!("{err}");
if !reply_surbs.is_empty() {
if let Err(err) = self.reply_controller_sender.send_additional_surbs(
msg.sender_tag,
reply_surbs,
from_surb_request,
) {
if !self.task_client.is_shutdown_poll() {
error!("{err}");
}
}
}
}
@@ -500,20 +501,20 @@ impl<R: MessageReceiver> RequestReceiver<R> {
tokio::select! {
biased;
_ = self.task_client.recv() => {
log::trace!("RequestReceiver: Received shutdown");
tracing::trace!("RequestReceiver: Received shutdown");
}
request = self.query_receiver.next() => {
if let Some(message) = request {
self.handle_message(message).await
} else {
log::trace!("RequestReceiver: Stopping since channel closed");
tracing::trace!("RequestReceiver: Stopping since channel closed");
break;
}
},
}
}
self.task_client.recv().await;
log::debug!("RequestReceiver: Exiting");
tracing::debug!("RequestReceiver: Exiting");
}
}
@@ -544,17 +545,17 @@ impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
if let Some(new_messages) = new_messages {
self.received_buffer.handle_new_received(new_messages).await?;
} else {
log::trace!("FragmentedMessageReceiver: Stopping since channel closed");
tracing::trace!("FragmentedMessageReceiver: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv_with_delay() => {
log::trace!("FragmentedMessageReceiver: Received shutdown");
tracing::trace!("FragmentedMessageReceiver: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
log::debug!("FragmentedMessageReceiver: Exiting");
tracing::debug!("FragmentedMessageReceiver: Exiting");
Ok(())
}
}
@@ -600,14 +601,20 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
let mut fragmented_message_receiver = self.fragmented_message_receiver;
let mut request_receiver = self.request_receiver;
spawn_future(async move {
match fragmented_message_receiver.run().await {
Ok(_) => {}
Err(e) => error!("{e}"),
}
});
spawn_future(async move {
request_receiver.run().await;
});
spawn_future!(
async move {
match fragmented_message_receiver.run().await {
Ok(_) => {}
Err(e) => error!("{e}"),
}
},
"ReceivedMessagesBufferController::FragmentedMessageReceiver"
);
spawn_future!(
async move {
request_receiver.run().await;
},
"ReceivedMessagesBufferController::RequestReceiver"
);
}
}
@@ -0,0 +1,169 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_topology::NymTopologyMetadata;
use nym_validator_client::models::{
EpochId, KeyRotationId, KeyRotationInfoResponse, KeyRotationState,
};
use std::time::Duration;
use time::OffsetDateTime;
#[derive(Clone, Copy)]
pub(crate) enum SurbRefreshState {
WaitingForNextRotation { last_known: KeyRotationId },
ScheduledForNextInvocation,
}
#[derive(Clone, Copy)]
pub(crate) struct ReferenceEpoch {
pub(crate) absolute_epoch_id: EpochId,
pub(crate) start_time: OffsetDateTime,
}
#[derive(Clone, Copy)]
pub(crate) struct KeyRotationConfig {
pub(crate) epoch_duration: Duration,
pub(crate) rotation_state: KeyRotationState,
pub(crate) reference_epoch: ReferenceEpoch,
}
impl From<KeyRotationInfoResponse> for KeyRotationConfig {
fn from(value: KeyRotationInfoResponse) -> Self {
KeyRotationConfig {
epoch_duration: value.details.epoch_duration,
rotation_state: value.details.key_rotation_state,
reference_epoch: ReferenceEpoch {
absolute_epoch_id: value.details.current_absolute_epoch_id,
start_time: value.details.current_epoch_start,
},
}
}
}
impl KeyRotationConfig {
pub(crate) fn rotation_lifetime(&self) -> Duration {
(self.rotation_state.validity_epochs + 1) * self.epoch_duration
}
pub(crate) fn key_rotation_id(&self, current_absolute_epoch_id: EpochId) -> KeyRotationId {
self.rotation_state
.key_rotation_id(current_absolute_epoch_id)
}
// this is called with the assumption that now is always > reference epoch start
pub(crate) fn expected_current_epoch_id(&self, now: OffsetDateTime) -> EpochId {
let diff_secs = (now - self.reference_epoch.start_time).as_seconds_f64();
let epochs = (diff_secs / self.epoch_duration.as_secs_f64()).floor() as u32;
self.reference_epoch.absolute_epoch_id + epochs
}
fn initial_rotation_epoch_start(&self) -> OffsetDateTime {
let epochs_diff = self
.reference_epoch
.absolute_epoch_id
.saturating_sub(self.rotation_state.initial_epoch_id);
self.reference_epoch.start_time - epochs_diff * self.epoch_duration
}
pub(crate) fn key_rotation_start(&self, key_rotation_id: KeyRotationId) -> OffsetDateTime {
let rotation_duration = self.rotation_state.validity_epochs * self.epoch_duration;
let initial_start = self.initial_rotation_epoch_start();
// note: key rotation starts from 0
initial_start + rotation_duration * key_rotation_id
}
pub(crate) fn expected_current_key_rotation_id(&self, now: OffsetDateTime) -> KeyRotationId {
let expected_current_epoch = self.expected_current_epoch_id(now);
self.key_rotation_id(expected_current_epoch)
}
pub(crate) fn expected_current_key_rotation_start(
&self,
now: OffsetDateTime,
) -> OffsetDateTime {
let expected_current_key_rotation_id = self.expected_current_key_rotation_id(now);
self.key_rotation_start(expected_current_key_rotation_id)
}
pub(crate) fn epoch_stuck(&self, topology_metadata: NymTopologyMetadata) -> bool {
// add leeway of 2mins each direction since transition is not instantaneous
let lower_bound = topology_metadata.refreshed_at - Duration::from_secs(2);
let upper_bound = topology_metadata.refreshed_at + Duration::from_secs(2);
let expected_epoch_lower = self.expected_current_epoch_id(lower_bound);
let expected_epoch_upper = self.expected_current_epoch_id(upper_bound);
topology_metadata.absolute_epoch_id != expected_epoch_lower
&& topology_metadata.absolute_epoch_id != expected_epoch_upper
}
}
#[cfg(test)]
mod tests {
use super::*;
use time::macros::datetime;
fn mock_config() -> KeyRotationConfig {
KeyRotationConfig {
epoch_duration: Duration::from_secs(60 * 60),
rotation_state: KeyRotationState {
validity_epochs: 10,
initial_epoch_id: 80,
},
reference_epoch: ReferenceEpoch {
absolute_epoch_id: 100,
start_time: datetime!(2025-06-30 12:00:00+00:00),
},
}
}
#[test]
fn expected_current_key_rotation_start() {
// rot0: 80-89
// rot1: 90-99
// rot2: 100-109
// rot3: 110-119
// ... etc
let cfg = mock_config();
assert_eq!(
cfg.initial_rotation_epoch_start(),
datetime!(2025-06-29 16:00:00+00:00)
);
let fake_now = datetime!(2025-06-30 12:00:00+00:00);
assert_eq!(cfg.expected_current_epoch_id(fake_now), 100);
assert_eq!(cfg.expected_current_key_rotation_id(fake_now), 2);
assert_eq!(
cfg.expected_current_key_rotation_start(fake_now),
datetime!(2025-06-30 12:00:00+00:00)
);
let fake_now = datetime!(2025-06-30 12:30:00+00:00);
assert_eq!(cfg.expected_current_epoch_id(fake_now), 100);
assert_eq!(cfg.expected_current_key_rotation_id(fake_now), 2);
assert_eq!(
cfg.expected_current_key_rotation_start(fake_now),
datetime!(2025-06-30 12:00:00+00:00)
);
let fake_now = datetime!(2025-06-30 13:01:00+00:00);
assert_eq!(cfg.expected_current_epoch_id(fake_now), 101);
assert_eq!(cfg.expected_current_key_rotation_id(fake_now), 2);
assert_eq!(
cfg.expected_current_key_rotation_start(fake_now),
datetime!(2025-06-30 12:00:00+00:00)
);
let fake_now = datetime!(2025-06-30 22:02:00+00:00);
assert_eq!(cfg.expected_current_epoch_id(fake_now), 110);
assert_eq!(cfg.expected_current_key_rotation_id(fake_now), 3);
assert_eq!(
cfg.expected_current_key_rotation_start(fake_now),
datetime!(2025-06-30 22:00:00+00:00)
);
}
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,901 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
use crate::client::real_messages_control::message_handler::{
FragmentWithMaxRetransmissions, MessageHandler, PreparationError,
};
use crate::client::replies::reply_controller::key_rotation_helpers::SurbRefreshState;
use crate::client::replies::reply_controller::Config;
use crate::client::topology_control::TopologyAccessor;
use crate::client::transmission_buffer::TransmissionBuffer;
use futures::channel::oneshot;
use nym_client_core_surb_storage::{ReceivedReplySurb, ReceivedReplySurbsMap};
use nym_crypto::aes::cipher::crypto_common::rand_core::CryptoRng;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_task::connections::{ConnectionId, TransmissionLane};
use nym_topology::NymTopologyMetadata;
use rand::Rng;
use std::cmp::{max, min};
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::mem;
use std::sync::{Arc, Weak};
use time::OffsetDateTime;
use tracing::{debug, error, info, trace, warn};
struct SenderData {
current_clear_rerequest_counter: usize,
pending_replies: TransmissionBuffer<FragmentWithMaxRetransmissions>,
pending_retransmissions: BTreeMap<FragmentIdentifier, Weak<PendingAcknowledgement>>,
last_request_failure: OffsetDateTime,
}
impl Default for SenderData {
fn default() -> Self {
SenderData {
current_clear_rerequest_counter: 0,
pending_replies: Default::default(),
pending_retransmissions: Default::default(),
last_request_failure: OffsetDateTime::UNIX_EPOCH,
}
}
}
impl SenderData {
fn total_pending(&self) -> usize {
let pending_replies = self.pending_replies.total_size();
let pending_retransmissions = self.pending_retransmissions.len();
let total_pending = pending_retransmissions + pending_replies;
debug!("total queue size: {total_pending} = pending data {pending_replies} + pending retransmission {pending_retransmissions}");
total_pending
}
pub(crate) fn increment_current_clear_rerequest_counter(&mut self) {
self.current_clear_rerequest_counter += 1;
}
pub(crate) fn reset_current_clear_rerequest_counter(&mut self) {
self.current_clear_rerequest_counter = 0;
}
pub(crate) fn reset_last_request_failure(&mut self, now: OffsetDateTime) -> OffsetDateTime {
mem::replace(&mut self.last_request_failure, now)
}
}
/// Reply controller responsible for controlling receiver-related part
/// of replies, such as requesting additional reply SURBs
pub struct ReceiverReplyController<R> {
config: Config,
surb_refresh_state: SurbRefreshState,
topology_access: TopologyAccessor,
surb_senders: HashMap<AnonymousSenderTag, SenderData>,
unavailable: HashMap<AnonymousSenderTag, OffsetDateTime>,
surbs_storage: ReceivedReplySurbsMap,
// TODO: incorporate that field at some point
// and use binomial distribution to determine the expected required number
// of surbs required to send the message through
// expected_reliability: f32,
message_handler: MessageHandler<R>,
}
impl<R> ReceiverReplyController<R>
where
R: CryptoRng + Rng,
{
pub(crate) fn new(
config: Config,
storage: ReceivedReplySurbsMap,
message_handler: MessageHandler<R>,
) -> Self {
let topology_access = message_handler.topology_access_handle().clone();
ReceiverReplyController {
config,
surb_refresh_state: SurbRefreshState::WaitingForNextRotation {
last_known: config
.key_rotation
.expected_current_key_rotation_id(OffsetDateTime::now_utc()),
},
topology_access,
surb_senders: Default::default(),
unavailable: Default::default(),
surbs_storage: storage,
message_handler,
}
}
fn get_or_create_surb_sender(&mut self, tag: &AnonymousSenderTag) -> &mut SenderData {
self.surb_senders.entry(*tag).or_default()
}
async fn current_topology_metadata(&self) -> Option<NymTopologyMetadata> {
self.topology_access.current_metadata().await
}
fn insert_pending_replies<I: IntoIterator<Item = FragmentWithMaxRetransmissions>>(
&mut self,
recipient: &AnonymousSenderTag,
fragments: I,
lane: TransmissionLane,
) {
trace!("buffering pending replies for {recipient}");
self.surb_senders
.entry(*recipient)
.or_default()
.pending_replies
.store(&lane, fragments)
}
fn re_insert_pending_replies(
&mut self,
recipient: &AnonymousSenderTag,
fragments: Vec<(TransmissionLane, FragmentWithMaxRetransmissions)>,
) {
trace!("re-inserting pending replies for {recipient}");
// the buffer should ALWAYS exist at this point, if it doesn't, it's a bug...
self.surb_senders
.entry(*recipient)
.or_default()
.pending_replies
.store_multiple(fragments)
}
fn re_insert_pending_retransmission(
&mut self,
recipient: &AnonymousSenderTag,
data: Vec<Arc<PendingAcknowledgement>>,
) {
trace!("re-inserting pending retransmissions for {recipient}");
// SAFETY: the underlying entry MUST exist as we've just got data from there
// and we hold a mut reference
#[allow(clippy::expect_used)]
let map_entry = &mut self
.surb_senders
.get_mut(recipient)
.expect("our pending retransmission entry is somehow gone!")
.pending_retransmissions;
for pending in data {
// if it's 0, we don't need to do anything - we just got that ack!
if Arc::strong_count(&pending) > 1 {
let id = pending.inner_fragment_identifier();
let downgraded = Arc::downgrade(&pending);
map_entry.insert(id, downgraded);
}
}
}
fn should_request_more_surbs(&self, target: &AnonymousSenderTag) -> bool {
trace!("checking if we should request more surbs from {target}");
let total_queue = self
.surb_senders
.get(target)
.map(|pending| pending.total_pending())
.unwrap_or_default();
// only consider 'fresh' surbs
let available_surbs = self.surbs_storage.available_fresh_surbs(target);
let pending_surbs = self.surbs_storage.pending_reception(target) as usize;
let min_surbs_threshold = self.surbs_storage.min_surb_threshold();
let max_surbs_threshold = self.surbs_storage.max_surb_threshold();
let min_surbs_threshold_buffer =
self.config.reply_surbs.minimum_reply_surb_threshold_buffer;
// After clearing the queue, we want to have at least `min_surbs_threshold` surbs available
// and reserved for requesting additional surbs, and in addition to that we also want to
// have `min_surbs_threshold_buffer` surbs available proactively.
let target_surbs_after_clearing_queue = min_surbs_threshold + min_surbs_threshold_buffer;
// Check if we have enough surbs to handle the total queue and maintain minimum thresholds
let total_required_surbs = total_queue + target_surbs_after_clearing_queue;
let total_available_surbs = pending_surbs + available_surbs;
debug!("available surbs: {available_surbs} pending surbs: {pending_surbs} threshold range: {min_surbs_threshold}..+{min_surbs_threshold_buffer}..{max_surbs_threshold}");
// We should request more surbs if:
// 1. We haven't hit the maximum surb threshold, and
// 2. We don't have enough surbs to handle the queue plus minimum thresholds
let is_below_max_threshold = total_available_surbs < max_surbs_threshold;
let is_below_required_surbs = total_available_surbs < total_required_surbs;
is_below_max_threshold && is_below_required_surbs
}
pub(crate) async fn handle_send_reply(
&mut self,
recipient_tag: AnonymousSenderTag,
data: Vec<u8>,
lane: TransmissionLane,
max_retransmissions: Option<u32>,
) {
if !self.surbs_storage.contains_surbs_for(&recipient_tag) {
if self
.unavailable
.insert(recipient_tag, OffsetDateTime::now_utc())
.is_none()
{
// don't report it every single time
warn!("received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!");
} else {
trace!("received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!");
}
return;
}
trace!("handling reply to {recipient_tag}");
let mut fragments = self.message_handler.split_reply_message(data);
let total_size = fragments.len();
trace!("This reply requires {total_size} SURBs");
// for the purposes of sending reply, do allow using possibly stale entries
let available_surbs = self.surbs_storage.available_surbs(&recipient_tag);
let min_surbs_threshold = self.surbs_storage.min_surb_threshold();
let max_to_send = if available_surbs > min_surbs_threshold {
min(fragments.len(), available_surbs - min_surbs_threshold)
} else {
0
};
if max_to_send > 0 {
let (surbs, surbs_left) = self
.surbs_storage
.get_reply_surbs(&recipient_tag, max_to_send);
debug!(
"retrieved {} reply surbs. {surbs_left} surbs remaining in storage",
surbs.as_ref().map(|s| s.len()).unwrap_or_default()
);
if let Some(reply_surbs) = surbs {
let to_send = fragments
.drain(..reply_surbs.len())
.map(|f| FragmentWithMaxRetransmissions {
fragment: f,
max_retransmissions,
})
.collect::<Vec<_>>();
if let Err(err) = self
.message_handler
.try_send_reply_chunks_on_lane(
recipient_tag,
to_send.clone(),
reply_surbs,
lane,
)
.await
{
let err = err.return_unused_surbs(&self.surbs_storage, &recipient_tag);
warn!("failed to send reply to {recipient_tag}: {err}");
info!(
"buffering {no_fragments} fragments for {recipient_tag}",
no_fragments = to_send.len()
);
self.insert_pending_replies(&recipient_tag, to_send, lane);
}
}
}
// if there's leftover data we didn't send because we didn't have enough (or any) surbs - buffer it
if !fragments.is_empty() {
// Ideally we should have enough surbs above the minimum threshold to handle sending
// new replies without having to first request more surbs. That's why I'd like to log
// these cases as they might indicate a problem with the surb management.
debug!(
"buffering {no_fragments} fragments for {recipient_tag}",
no_fragments = fragments.len()
);
let fragments: Vec<_> = fragments
.into_iter()
.map(|fragment| FragmentWithMaxRetransmissions {
fragment,
max_retransmissions,
})
.collect();
self.insert_pending_replies(&recipient_tag, fragments, lane);
}
if self.should_request_more_surbs(&recipient_tag) {
self.request_reply_surbs_for_queue_clearing(recipient_tag)
.await;
}
}
async fn request_additional_reply_surbs(
&mut self,
target: AnonymousSenderTag,
amount: u32,
) -> Result<(), PreparationError> {
debug!("requesting {amount} additional reply surbs for {target}");
let (reply_surb, _) = self
.surbs_storage
.get_reply_surb_ignoring_threshold(&target);
let reply_surb = reply_surb.ok_or(PreparationError::NotEnoughSurbs {
available: 0,
required: 1,
})?;
if let Err(err) = self
.message_handler
.try_request_additional_reply_surbs(target, reply_surb, amount)
.await
{
let err = err.return_unused_surbs(&self.surbs_storage, &target);
warn!("failed to request additional surbs from {target}: {err}",);
return Err(err);
} else {
self.surbs_storage
.increment_pending_reception(&target, amount);
}
Ok(())
}
async fn try_clear_pending_retransmission(&mut self, target: AnonymousSenderTag) {
trace!("trying to clear pending retransmission queue");
let available_surbs = self.surbs_storage.available_surbs(&target);
let min_surbs_threshold = self.surbs_storage.min_surb_threshold();
let max_to_clear = if available_surbs > min_surbs_threshold {
available_surbs - min_surbs_threshold
} else {
trace!("we don't have enough surbs for retransmission queue clearing...");
return;
};
trace!("we can clear up to {max_to_clear} entries");
let Some(pending) = self.surb_senders.get_mut(&target) else {
trace!("no pending entry for {target}!");
return;
};
let mut to_take = Vec::new();
while to_take.len() < max_to_clear {
if let Some((_, data)) = pending.pending_retransmissions.pop_first() {
// no need to do anything if we failed to upgrade the reference,
// it means we got the ack while the data was waiting in the queue
if let Some(upgraded) = data.upgrade() {
to_take.push(upgraded)
}
} else {
// our map is empty!
break;
}
}
if to_take.is_empty() {
// no need to do anything
return;
}
let (surbs_for_reply, _) = self.surbs_storage.get_reply_surbs(&target, to_take.len());
let Some(surbs_for_reply) = surbs_for_reply else {
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
self.re_insert_pending_retransmission(&target, to_take);
return;
};
let to_send_vec = to_take.iter().map(|ack| ack.fragment_data()).collect();
let prepared_fragments = match self
.message_handler
.prepare_reply_chunks_for_sending(to_send_vec, surbs_for_reply)
.await
{
Ok(prepared) => prepared,
Err(err) => {
let err = err.return_unused_surbs(&self.surbs_storage, &target);
self.re_insert_pending_retransmission(&target, to_take);
warn!("failed to clear pending retransmission queue for {target}: {err}",);
return;
}
};
// we can't fail at this point, so drop all references to acks so that timer updates wouldn't blow up
drop(to_take);
self.message_handler
.send_retransmission_reply_chunks(prepared_fragments, TransmissionLane::Retransmission)
.await;
}
fn pop_at_most_pending_replies(
&mut self,
from: &AnonymousSenderTag,
amount: usize,
) -> Option<Vec<(TransmissionLane, FragmentWithMaxRetransmissions)>> {
// if possible, pop all pending replies, if not, pop only entries for which we'd have a reply surb
let pending = self.surb_senders.get_mut(from)?;
let total = pending.pending_replies.total_size();
trace!("pending queue has {total} elements");
if total == 0 {
return None;
}
pending
.pending_replies
.pop_at_most_n_next_messages_at_random(amount)
}
#[allow(clippy::panic)]
async fn try_clear_pending_queue(&mut self, target: AnonymousSenderTag) {
trace!("trying to clear pending queue");
let available_surbs = self.surbs_storage.available_surbs(&target);
let min_surbs_threshold = self.surbs_storage.min_surb_threshold();
let max_to_clear = if available_surbs > min_surbs_threshold {
available_surbs - min_surbs_threshold
} else {
trace!("we don't have enough surbs for queue clearing...");
return;
};
trace!("we can clear up to {max_to_clear} entries");
// we're guaranteed to not get more entries than we have reply surbs for
if let Some(to_send) = self.pop_at_most_pending_replies(&target, max_to_clear) {
let to_send_clone = to_send.clone();
if to_send_clone.is_empty() {
panic!(
"please let the devs know if you ever see this message (reply_controller.rs)"
);
}
let (surbs_for_reply, _) = self
.surbs_storage
.get_reply_surbs(&target, to_send_clone.len());
let Some(surbs_for_reply) = surbs_for_reply else {
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
self.re_insert_pending_replies(&target, to_send);
return;
};
if let Err(err) = self
.message_handler
.try_send_reply_chunks(target, to_send_clone, surbs_for_reply)
.await
{
let err = err.return_unused_surbs(&self.surbs_storage, &target);
self.re_insert_pending_replies(&target, to_send);
warn!("failed to clear pending queue for {target}: {err}");
}
} else {
trace!("the pending queue is empty");
}
}
fn reset_rerequest_counter(&mut self, from: &AnonymousSenderTag) {
if let Some(pending) = self.surb_senders.get_mut(from) {
pending.reset_current_clear_rerequest_counter()
}
}
pub(crate) async fn handle_received_surbs(
&mut self,
from: AnonymousSenderTag,
reply_surbs: Vec<ReplySurbWithKeyRotation>,
from_surb_request: bool,
) {
trace!("handling received surbs");
// clear the requesting flag since we should have been asking for surbs
if from_surb_request {
self.surbs_storage
.decrement_pending_reception(&from, reply_surbs.len() as u32);
}
// store received surbs
self.surbs_storage.insert_fresh_surbs(&from, reply_surbs);
// reset, if applicable, request counter
self.reset_rerequest_counter(&from);
// use as many as we can for clearing pending retransmission queue
self.try_clear_pending_retransmission(from).await;
// use as many as we can for clearing pending 'normal' queue
self.try_clear_pending_queue(from).await;
// if we have to, request more
if self.should_request_more_surbs(&from) {
self.request_reply_surbs_for_queue_clearing(from).await;
}
}
fn buffer_pending_ack(
&mut self,
recipient: AnonymousSenderTag,
ack_ref: Arc<PendingAcknowledgement>,
weak_ack_ref: Weak<PendingAcknowledgement>,
) {
let frag_id = ack_ref.inner_fragment_identifier();
let pending = self.surb_senders.entry(recipient).or_default();
if let Entry::Vacant(e) = pending.pending_retransmissions.entry(frag_id) {
e.insert(weak_ack_ref);
} else {
warn!(
"we're already trying to retransmit {frag_id}. We must be really behind in surbs!"
);
}
}
pub(crate) async fn handle_reply_retransmission(
&mut self,
recipient_tag: AnonymousSenderTag,
timed_out_ack: Weak<PendingAcknowledgement>,
extra_surbs_request: bool,
) {
// seems we got the ack in the end
let ack_ref = match timed_out_ack.upgrade() {
Some(ack) => ack,
None => {
debug!("we received the ack for one of the reply packets as we were putting it in the retransmission queue");
return;
}
};
// if this is retransmission for obtaining additional reply surbs,
// we can dip below the storage threshold
let (maybe_reply_surb, _) = if extra_surbs_request {
self.surbs_storage
.get_reply_surb_ignoring_threshold(&recipient_tag)
} else {
self.surbs_storage.get_reply_surb(&recipient_tag)
};
if let Some(reply_surb) = maybe_reply_surb {
match self
.message_handler
.try_prepare_single_reply_chunk_for_sending(reply_surb, ack_ref.fragment_data())
.await
{
Ok(prepared) => {
// drop the ack ref so that controller would not panic on `UpdateTimer` if that task
// got to handle the action before this function terminated (which is very much
// possible if `forward_messages` takes a while)
drop(ack_ref);
self.message_handler
.update_ack_delay(prepared.fragment_identifier, prepared.total_delay);
self.message_handler
.forward_messages(vec![prepared.into()], TransmissionLane::Retransmission)
.await;
}
Err(err) => {
let err = err.return_unused_surbs(&self.surbs_storage, &recipient_tag);
warn!("failed to prepare message for retransmission - {err}");
// we buffer that packet and to try another day
self.buffer_pending_ack(recipient_tag, ack_ref, timed_out_ack);
if self.should_request_more_surbs(&recipient_tag) {
self.request_reply_surbs_for_queue_clearing(recipient_tag)
.await;
}
}
};
} else {
self.buffer_pending_ack(recipient_tag, ack_ref, timed_out_ack);
if self.should_request_more_surbs(&recipient_tag) {
self.request_reply_surbs_for_queue_clearing(recipient_tag)
.await;
}
}
}
// to be honest this doesn't make a lot of sense in the context of `connection_id`,
// it should really be asked per tag
pub(crate) fn handle_lane_queue_length(
&self,
connection_id: ConnectionId,
response_channel: oneshot::Sender<usize>,
) {
// TODO: if we ever have duplicate ids for different senders, it means our rng is super weak
// thus I don't think we have to worry about it?
let lane = TransmissionLane::ConnectionId(connection_id);
for buf in self.surb_senders.values().map(|p| &p.pending_replies) {
if let Some(length) = buf.lane_length(&lane) {
if response_channel.send(length).is_err() {
error!("the requester for lane queue length has dropped the response channel!")
}
return;
}
}
// make sure that if we didn't find that lane, we reply with 0
if response_channel.send(0).is_err() {
error!("the requester for lane queue length has dropped the response channel!")
}
}
// TODO: modify this method to more accurately determine the amount of surbs it needs to request
// it should take into consideration the average latency, sending rate and queue size.
// it should request as many surbs as it takes to saturate its sending rate before next batch arrives
async fn request_reply_surbs_for_queue_clearing(&mut self, target: AnonymousSenderTag) {
trace!("requesting surbs for queue clearing");
let total_queue = self
.surb_senders
.get(&target)
.map(|pending| pending.total_pending() as u32)
.unwrap_or_default();
let min_surbs_buffer = self.config.reply_surbs.minimum_reply_surb_threshold_buffer as u32;
// To proactively request additional surbs, we aim to have a buffer of extra surbs in our
// storage.
let total_queue_with_buffer = total_queue + min_surbs_buffer;
let request_size = min(
self.config.reply_surbs.maximum_reply_surb_request_size,
max(
total_queue_with_buffer,
self.config.reply_surbs.minimum_reply_surb_request_size,
),
);
if let Err(err) = self
.request_additional_reply_surbs(target, request_size)
.await
{
let now = OffsetDateTime::now_utc();
let sender_info = self.get_or_create_surb_sender(&target);
let last_failure = sender_info.reset_last_request_failure(now);
// only log at higher level if it's the first time this error has occurred in a while
if now - last_failure > time::Duration::seconds(30) {
warn!("failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}")
} else {
debug!("failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}")
}
}
}
pub(crate) async fn inspect_stale_pending_data(&mut self) {
let mut to_request = Vec::new();
let mut to_remove = Vec::new();
let now = OffsetDateTime::now_utc();
for (pending_reply_target, vals) in self.surb_senders.iter_mut() {
// for now recreate old behaviour
let retransmission_buf = &vals.pending_replies;
if retransmission_buf.is_empty() {
continue;
}
let Some(last_received_time) = self
.surbs_storage
.surbs_last_received_at(pending_reply_target)
else {
error!("we have {} pending replies for {pending_reply_target}, but we somehow never received any reply surbs from them!", retransmission_buf.total_size());
to_remove.push(*pending_reply_target);
continue;
};
let diff = now - last_received_time;
let max_rerequest_wait = self
.config
.reply_surbs
.maximum_reply_surb_rerequest_waiting_period;
let max_drop_wait = self
.config
.reply_surbs
.maximum_reply_surb_drop_waiting_period;
let max_rerequests = self.config.reply_surbs.maximum_reply_surbs_rerequests;
// if we have already requested extra surbs because of the stale entry,
// don't do it again (otherwise we'll get stuck in a constant cycle of requesting more surbs
// if client is offline)
if vals.current_clear_rerequest_counter > max_rerequests {
to_remove.push(*pending_reply_target);
debug!("we have reached the maximum threshold of attempting to request surbs from {pending_reply_target}. dropping the sender");
continue;
}
if diff > max_rerequest_wait {
if diff > max_drop_wait {
to_remove.push(*pending_reply_target)
} else {
debug!("We haven't received any surbs in {} from {pending_reply_target}. Going to explicitly ask for more", humantime::format_duration(diff.unsigned_abs()));
vals.increment_current_clear_rerequest_counter();
to_request.push(*pending_reply_target);
}
}
}
for pending_reply_target in to_request {
self.request_reply_surbs_for_queue_clearing(pending_reply_target)
.await;
self.surbs_storage
.reset_pending_reception(&pending_reply_target)
}
for to_remove in to_remove {
// TODO: in the 'old' version we just removed pending messages,
// not retransmissions, but I think those should follow the same logic.
// if something breaks because of that. I guess here is your explanation, future reader
self.surb_senders.remove(&to_remove);
}
}
pub(crate) async fn check_surb_refresh(&mut self) {
let Some(current_rotation_id) = self.topology_access.current_key_rotation_id().await else {
warn!("failed to retrieve current key rotation id from the network topology");
return;
};
if let SurbRefreshState::WaitingForNextRotation { last_known } = self.surb_refresh_state {
if last_known == current_rotation_id {
trace!("no changes in key rotation id");
} else {
// key rotation actually changed and given the polling rate (1/8th epoch) we should have plenty
// of time to perform the upgrade.
// but wait for one more call before doing this so that the clients could also resync
// their topologies and discover new rotation
self.surb_refresh_state = SurbRefreshState::ScheduledForNextInvocation;
}
return;
}
// here we are in `SurbRefreshState::ScheduledForNextInvocation` state
let mut marked_as_stale = HashMap::new();
// 1. mark all existing surbs we have as possibly stale
for mut map_entry in self.surbs_storage.as_raw_iter_mut() {
let (sender, received) = map_entry.pair_mut();
let num_downgraded = received.downgrade_freshness();
trace!("{sender}: {num_downgraded} downgraded");
if num_downgraded != 0 {
marked_as_stale.insert(*sender, num_downgraded);
}
}
// 2. attempt to re-request the equivalent number of fresh surbs
// TODO PROBLEM: if our request gets lost, we might be in trouble...
// we need some sort of retry mechanism
for (sender, num_to_request) in marked_as_stale {
if self
.request_additional_reply_surbs(sender, num_to_request as u32)
.await
.is_err()
{
warn!("surb refresh request failed")
}
}
self.surb_refresh_state = SurbRefreshState::WaitingForNextRotation {
last_known: current_rotation_id,
};
}
pub(crate) async fn inspect_and_clear_stale_data(&mut self, now: OffsetDateTime) {
// technically we don't know if epoch is stuck, but we're flying in blind here,
// so we have to assume the worst and not purge anything depending on proper epoch progression
let is_epoch_stuck = self
.current_topology_metadata()
.await
.map(|m| self.config.key_rotation.epoch_stuck(m))
.unwrap_or(false);
// expected time of when the CURRENT key rotation has begun
let expected_current_key_rotation_start = self
.config
.key_rotation
.expected_current_key_rotation_start(now);
// expected ID of the CURRENT key rotation
let expected_current_key_rotation = self
.config
.key_rotation
.expected_current_key_rotation_id(now);
// time of the start of one epoch BEFORE the CURRENT rotation has begun
// this indicates the starting time of when packets with the current keys might have been constructed
let prior_epoch_start =
expected_current_key_rotation_start - self.config.key_rotation.epoch_duration;
// time of the start of one epoch AFTER the current rotation has begun
// this indicates the end of transition period and any packets constructed with keys different
// from the current one are definitely invalid
let following_epoch_start =
expected_current_key_rotation_start + self.config.key_rotation.epoch_duration;
// define a closure for validating individual surbs
// (we have to run it twice for different piles)
let basic_surb_retention_logic = |received_surb: &ReceivedReplySurb| {
if is_epoch_stuck {
let diff = now - received_surb.received_at();
return diff < self.config.key_rotation.rotation_lifetime();
}
if received_surb.received_at() < prior_epoch_start {
// it's definitely from previous rotation
return false;
}
let surb_rotation = received_surb.key_rotation();
if surb_rotation.is_unknown() {
// can't do anything, so just retain it
return true;
}
// TODO: will this backfire during transition period where we need surbs to refresh surbs
// and we failed to send a request?
if surb_rotation.is_even() && expected_current_key_rotation % 2 == 1 {
return false;
}
if surb_rotation.is_odd() && expected_current_key_rotation % 2 == 0 {
return false;
}
true
};
// 1. purge full old clients data (this applies to RECEIVER)
self.surbs_storage.retain(|_, received| {
if is_epoch_stuck {
// if epoch is stuck, we can't do much (because we don't know for certain if rotation has advanced)
// apart from the basic check of surbs being received more than maximum lifetime of a rotation
// because at that point we know they must be invalid
let diff = now - received.surbs_last_received_at();
return diff < self.config.key_rotation.rotation_lifetime();
}
// if surbs were received more than 1h before the start of the current rotation,
// they're DEFINITELY invalid.
// if it was up until 1h AFTER the start of the current rotation they MIGHT be valid -
// we don't know for sure, unless the client explicitly attached rotation information
// (which only applies to more recent versions of clients so we can't 100% rely on that)
if received.surbs_last_received_at() < prior_epoch_start {
return false;
}
// 1.1. check individual surbs (same basic logic applies)
received.retain_fresh_surbs(&basic_surb_retention_logic);
// 1.2. check the possibly stale entries
// 1.2.1. check if we're beyond the key rotation transition period,
// if so those surbs are definitely unusable
if now > following_epoch_start {
received.drop_possibly_stale_surbs();
}
// 1.2.2. otherwise continue with the same logic as the fresh ones
received.retain_possibly_stale_surbs(&basic_surb_retention_logic);
// no surbs left, we're not expecting any AND we haven't received anything in a while
// (i.e. sender probably abandoned us)
let max_drop_wait = self
.config
.reply_surbs
.maximum_reply_surb_drop_waiting_period;
let last_received = received.surbs_last_received_at();
let possibly_abandoned = last_received + max_drop_wait < now;
if received.is_empty() && received.pending_reception() == 0 && possibly_abandoned {
return false;
}
true
});
// 1.3 inspect old unavailable receivers to clear any stale data
self.unavailable
.retain(|_, last_reported| now - *last_reported < time::Duration::seconds(30));
}
}
@@ -3,12 +3,12 @@
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
use futures::channel::{mpsc, oneshot};
use log::error;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_task::connections::{ConnectionId, TransmissionLane};
use std::sync::Weak;
use tracing::error;
pub(crate) fn new_control_channels() -> (ReplyControllerSender, ReplyControllerReceiver) {
let (tx, rx) = mpsc::unbounded();
@@ -0,0 +1,101 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::replies::reply_controller::Config;
use nym_client_core_surb_storage::{CombinedReplyStorage, SentReplyKeys, UsedSenderTags};
use nym_crypto::aes::cipher::crypto_common::rand_core::CryptoRng;
use nym_sphinx::addressing::Recipient;
use rand::Rng;
use std::cmp::min;
use std::time::Duration;
use time::OffsetDateTime;
use tracing::{debug, trace, warn};
/// Reply controller responsible for controlling sender-related part
/// of replies, such as checking if any reply keys are stale
pub struct SenderReplyController<R> {
config: Config,
tags_storage: UsedSenderTags,
sent_reply_keys: SentReplyKeys,
message_handler: MessageHandler<R>,
}
impl<R> SenderReplyController<R>
where
R: CryptoRng + Rng,
{
pub(crate) fn new(
config: Config,
storage: &CombinedReplyStorage,
message_handler: MessageHandler<R>,
) -> Self {
SenderReplyController {
config,
tags_storage: storage.tags_storage(),
sent_reply_keys: storage.key_storage(),
message_handler,
}
}
pub(crate) async fn handle_surb_request(&mut self, recipient: Recipient, mut amount: u32) {
// 1. check whether we sent any surbs in the past to this recipient, otherwise
// they have no business in asking for more
if !self.tags_storage.exists(&recipient) {
warn!("{recipient} asked us for reply SURBs even though we never sent them any anonymous messages before!");
return;
}
// 2. check whether the requested amount is within sane range
if amount
> self
.config
.reply_surbs
.maximum_allowed_reply_surb_request_size
{
warn!("The requested reply surb amount is larger than our maximum allowed ({amount} > {}). Lowering it to a more sane value...", self.config.reply_surbs.maximum_allowed_reply_surb_request_size);
amount = self
.config
.reply_surbs
.maximum_allowed_reply_surb_request_size;
}
// 3. construct and send the surbs away
// (send them in smaller batches to make the experience a bit smoother
let mut remaining = amount;
while remaining > 0 {
let to_send = min(remaining, 100);
if let Err(err) = self
.message_handler
.try_send_additional_reply_surbs(
recipient,
to_send,
nym_sphinx::params::PacketType::Mix,
)
.await
{
warn!("failed to send additional surbs to {recipient} - {err}");
} else {
trace!("sent {to_send} reply SURBs to {recipient}");
}
remaining -= to_send;
}
}
pub(crate) fn inspect_and_clear_stale_data(&self, now: OffsetDateTime) {
// check reply keys (this applies to SENDER)
self.sent_reply_keys.retain(|_, reply_key| {
let diff = now - reply_key.sent_at;
if diff > self.config.reply_surbs.maximum_reply_key_age {
let std_diff = Duration::try_from(diff).unwrap_or_default();
let diff_formatted = humantime::format_duration(std_diff);
debug!("it's been {diff_formatted} since we created this reply key. it's probably never going to get used, so we're going to purge it...");
false
} else {
true
}
});
}
}
@@ -93,14 +93,14 @@ impl StatisticsControl {
None,
);
if let Err(err) = self.report_tx.send(report_message).await {
log::error!("Failed to report client stats: {:?}", err);
tracing::error!("Failed to report client stats: {err:?}");
} else {
self.stats.reset();
}
}
async fn run(&mut self) {
log::debug!("Started StatisticsControl with graceful shutdown support");
tracing::debug!("Started StatisticsControl with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
let mut stats_report_interval = tokio_stream::wrappers::IntervalStream::new(
@@ -133,13 +133,13 @@ impl StatisticsControl {
tokio::select! {
biased;
_ = self.task_client.recv() => {
log::trace!("StatisticsControl: Received shutdown");
tracing::trace!("StatisticsControl: Received shutdown");
break;
},
stats_event = self.stats_rx.recv() => match stats_event {
Some(stats_event) => self.stats.handle_event(stats_event),
None => {
log::trace!("StatisticsControl: shutting down due to closed stats channel");
tracing::trace!("StatisticsControl: shutting down due to closed stats channel");
break;
}
},
@@ -161,13 +161,16 @@ impl StatisticsControl {
}
}
}
log::debug!("StatisticsControl: Exiting");
tracing::debug!("StatisticsControl: Exiting");
}
pub(crate) fn start(mut self) {
spawn_future(async move {
self.run().await;
})
spawn_future!(
async move {
self.run().await;
},
"StatisticsControl"
)
}
pub(crate) fn create_and_start(
@@ -2,7 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError, NymTopologyMetadata};
use nym_validator_client::models::KeyRotationId;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -125,7 +126,7 @@ impl TopologyAccessor {
.map(|p| p.topology.clone())
}
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<NymRouteProvider>> {
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<'_, NymRouteProvider>> {
let provider = self.inner.topology.read().await;
if provider.topology.is_empty() {
None
@@ -134,6 +135,21 @@ impl TopologyAccessor {
}
}
pub async fn current_mixnet_epoch_id(&self) -> Option<u32> {
let route_provider = self.current_route_provider().await?;
Some(route_provider.absolute_epoch_id())
}
pub async fn current_key_rotation_id(&self) -> Option<KeyRotationId> {
let route_provider = self.current_route_provider().await?;
Some(route_provider.current_key_rotation())
}
pub async fn current_metadata(&self) -> Option<NymTopologyMetadata> {
let route_provider = self.current_route_provider().await?;
Some(route_provider.metadata())
}
pub async fn manually_change_topology(&self, new_topology: NymTopology) {
self.inner.controlled_manually.store(true, Ordering::SeqCst);
self.inner.update(Some(new_topology)).await;
@@ -4,11 +4,11 @@
use crate::spawn_future;
pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit};
use futures::StreamExt;
use log::*;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_task::TaskClient;
use nym_topology::NymTopologyError;
use std::time::Duration;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
@@ -20,7 +20,7 @@ mod accessor;
pub mod nym_api_provider;
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
pub use nym_topology::provider_trait::TopologyProvider;
pub use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
// TODO: move it to config later
const MAX_FAILURE_COUNT: usize = 10;
@@ -145,36 +145,39 @@ impl TopologyRefresher {
}
pub fn start(mut self) {
spawn_future(async move {
debug!("Started TopologyRefresher with graceful shutdown support");
spawn_future!(
async move {
debug!("Started TopologyRefresher with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
let mut interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
self.refresh_rate,
));
#[cfg(not(target_arch = "wasm32"))]
let mut interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(self.refresh_rate),
);
#[cfg(target_arch = "wasm32")]
let mut interval =
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
#[cfg(target_arch = "wasm32")]
let mut interval =
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
// We already have an initial topology, so no need to refresh it immediately.
// My understanding is that js setInterval does not fire immediately, so it's not
// needed there.
#[cfg(not(target_arch = "wasm32"))]
interval.next().await;
// We already have an initial topology, so no need to refresh it immediately.
// My understanding is that js setInterval does not fire immediately, so it's not
// needed there.
#[cfg(not(target_arch = "wasm32"))]
interval.next().await;
while !self.task_client.is_shutdown() {
tokio::select! {
_ = interval.next() => {
self.try_refresh().await;
},
_ = self.task_client.recv() => {
log::trace!("TopologyRefresher: Received shutdown");
},
while !self.task_client.is_shutdown() {
tokio::select! {
_ = interval.next() => {
self.try_refresh().await;
},
_ = self.task_client.recv() => {
tracing::trace!("TopologyRefresher: Received shutdown");
},
}
}
}
self.task_client.recv_timeout().await;
log::debug!("TopologyRefresher: Exiting");
})
self.task_client.recv_timeout().await;
tracing::debug!("TopologyRefresher: Exiting");
},
"TopologyRefresher"
)
}
}
@@ -2,13 +2,14 @@
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use log::{debug, error, warn};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::{NymTopology, NymTopologyMetadata};
use nym_validator_client::UserAgent;
use nym_mixnet_contract_common::EpochRewardedSet;
use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
use nym_topology::NymTopology;
use nym_validator_client::nym_api::NymApiClientExt;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::cmp::min;
use tracing::{debug, error, warn};
use url::Url;
#[derive(Debug)]
@@ -40,38 +41,43 @@ impl Config {
pub struct NymApiTopologyProvider {
config: Config,
validator_client: nym_validator_client::client::NymApiClient,
validator_client: nym_http_api_client::Client,
nym_api_urls: Vec<Url>,
currently_used_api: usize,
use_bincode: bool,
}
impl NymApiTopologyProvider {
pub fn new(
config: impl Into<Config>,
mut nym_api_urls: Vec<Url>,
user_agent: Option<UserAgent>,
validator_client: nym_http_api_client::Client,
) -> Self {
nym_api_urls.shuffle(&mut thread_rng());
let validator_client = if let Some(user_agent) = user_agent {
nym_validator_client::client::NymApiClient::new_with_user_agent(
nym_api_urls[0].clone(),
user_agent,
)
} else {
nym_validator_client::client::NymApiClient::new(nym_api_urls[0].clone())
};
NymApiTopologyProvider {
let mut provider = NymApiTopologyProvider {
config: config.into(),
validator_client,
nym_api_urls,
currently_used_api: 0,
}
use_bincode: true,
};
// Set all API URLs - the client will try them in order with automatic failover
provider.validator_client.change_base_urls(
provider
.nym_api_urls
.iter()
.map(|u| u.clone().into())
.collect(),
);
provider
}
pub fn disable_bincode(&mut self) {
self.validator_client.use_bincode = false;
self.use_bincode = false;
// Note: The unified client doesn't support toggling bincode after creation.
// This would require recreating the client without bincode.
// For now, we'll track the preference but it won't take effect.
warn!("Disabling bincode on existing client is not currently supported");
}
fn use_next_nym_api(&mut self) {
@@ -81,8 +87,19 @@ impl NymApiTopologyProvider {
}
self.currently_used_api = (self.currently_used_api + 1) % self.nym_api_urls.len();
self.validator_client
.change_nym_api(self.nym_api_urls[self.currently_used_api].clone())
// Provide all URLs starting from the next one in rotation order
// This enables automatic failover to other endpoints
let rotated_urls: Vec<_> = self
.nym_api_urls
.iter()
.cycle()
.skip(self.currently_used_api)
.take(self.nym_api_urls.len())
.map(|u| u.clone().into())
.collect();
self.validator_client.change_base_urls(rotated_urls)
}
async fn get_current_compatible_topology(&mut self) -> Option<NymTopology> {
@@ -108,9 +125,10 @@ impl NymApiTopologyProvider {
.filter(|n| n.performance.round_to_integer() >= self.config.min_node_performance())
.collect::<Vec<_>>();
let epoch_rewarded_set: EpochRewardedSet = rewarded_set.into();
NymTopology::new(
NymTopologyMetadata::new(metadata.rotation_id, metadata.absolute_epoch_id),
rewarded_set,
metadata.to_topology_metadata(),
epoch_rewarded_set,
Vec::new(),
)
.with_skimmed_nodes(&nodes_filtered)
@@ -124,7 +142,7 @@ impl NymApiTopologyProvider {
// TODO: we really should be getting ACTIVE gateways only
let gateways_fut = self
.validator_client
.get_all_basic_entry_assigned_nodes_v2();
.get_all_basic_entry_assigned_nodes_with_metadata();
let (rewarded_set, mixnodes_res, gateways_res) =
futures::try_join!(rewarded_set_fut, mixnodes_fut, gateways_fut)
@@ -136,7 +154,7 @@ impl NymApiTopologyProvider {
let metadata = mixnodes_res.metadata;
let mixnodes = mixnodes_res.nodes;
if gateways_res.metadata != metadata {
if !gateways_res.metadata.consistency_check(&metadata) {
warn!("inconsistent nodes metadata between mixnodes and gateways calls! {metadata:?} and {:?}", gateways_res.metadata);
return None;
}
@@ -161,9 +179,10 @@ impl NymApiTopologyProvider {
}
}
let epoch_rewarded_set: EpochRewardedSet = rewarded_set.into();
NymTopology::new(
NymTopologyMetadata::new(metadata.rotation_id, metadata.absolute_epoch_id),
rewarded_set,
metadata.to_topology_metadata(),
epoch_rewarded_set,
Vec::new(),
)
.with_skimmed_nodes(&nodes)
@@ -36,11 +36,18 @@ impl SizedData for Fragment {
}
}
#[derive(Default)]
pub(crate) struct TransmissionBuffer<T> {
buffer: HashMap<TransmissionLane, LaneBufferEntry<T>>,
}
impl<T> Default for TransmissionBuffer<T> {
fn default() -> Self {
TransmissionBuffer {
buffer: HashMap::new(),
}
}
}
impl<T> TransmissionBuffer<T> {
pub(crate) fn new() -> Self {
TransmissionBuffer {
@@ -211,7 +218,7 @@ impl<T> TransmissionBuffer<T> {
};
let msg = self.pop_front_from_lane(&lane)?;
log::trace!("picking to send from lane: {:?}", lane);
tracing::trace!("picking to send from lane: {lane:?}");
Some((lane, msg))
}
+25 -2
View File
@@ -6,7 +6,10 @@ use nym_crypto::asymmetric::ed25519::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_validator_client::nym_api::error::NymAPIError;
use nym_validator_client::nyxd::error::NyxdError;
use nym_validator_client::ValidatorClientError;
use rand::distributions::WeightedError;
use std::error::Error;
use std::path::PathBuf;
@@ -52,7 +55,15 @@ pub enum ClientCoreError {
#[error("list of nym apis is empty")]
ListOfNymApisIsEmpty,
#[error("the current network topology seem to be insufficient to route any packets through")]
#[error("failed to resolve a query to nym API: {source}")]
NymApiQueryFailure {
#[from]
source: NymAPIError,
},
#[error(
"the current network topology seem to be insufficient to route any packets through:\n\t{0}"
)]
InsufficientNetworkTopology(#[from] NymTopologyError),
#[error("experienced a failure with our reply surb persistent storage: {source}")]
@@ -221,7 +232,19 @@ pub enum ClientCoreError {
UnexpectedKeyUpgrade { gateway_id: String },
#[error("failed to derive keys from master key")]
HkdfDerivationError {},
HkdfDerivationError,
#[error("missing url for constructing RPC client")]
RpcClientMissingUrl,
#[error("provided nym network details were malformed: {source}")]
InvalidNetworkDetails { source: NyxdError },
#[error("failed to construct RPC client: {source}")]
RpcClientCreationFailure { source: NyxdError },
#[error("failed to select valid gateway due to incomputable latency")]
GatewaySelectionFailure { source: WeightedError },
}
impl From<tungstenite::Error> for ClientCoreError {
+78 -18
View File
@@ -4,16 +4,17 @@
use crate::error::ClientCoreError;
use crate::init::types::RegistrationResult;
use futures::{SinkExt, StreamExt};
use log::{debug, info, trace, warn};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::GatewayClient;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::IdentityKeyRef;
use nym_validator_client::client::{IdentityKeyRef, NymApiClientExt};
use nym_validator_client::nym_nodes::SkimmedNodesWithMetadata;
use nym_validator_client::UserAgent;
use rand::{seq::SliceRandom, Rng};
#[cfg(unix)]
use std::os::fd::RawFd;
use std::{sync::Arc, time::Duration};
use tracing::{debug, info, trace, warn};
use tungstenite::Message;
use url::Url;
@@ -83,6 +84,48 @@ struct GatewayWithLatency<'a, G: ConnectableGateway> {
latency: Duration,
}
// Helper to collect all pages of entry nodes - replicates NymApiClient's convenience method
async fn get_all_basic_entry_nodes_with_metadata(
client: &nym_http_api_client::Client,
use_bincode: bool,
) -> Result<SkimmedNodesWithMetadata, ClientCoreError> {
// Get first page to obtain metadata
let mut page = 0;
let res = client
.get_basic_entry_assigned_nodes_v2(false, Some(page), None, use_bincode)
.await?;
let mut nodes = res.nodes.data;
let metadata = res.metadata;
if res.nodes.pagination.total == nodes.len() {
return Ok(SkimmedNodesWithMetadata::new(nodes, metadata));
}
page += 1;
// Collect remaining pages
loop {
let mut res = client
.get_basic_entry_assigned_nodes_v2(false, Some(page), None, use_bincode)
.await?;
if !metadata.consistency_check(&res.metadata) {
return Err(ClientCoreError::ValidatorClientError(
nym_validator_client::ValidatorClientError::InconsistentPagedMetadata,
));
}
nodes.append(&mut res.nodes.data);
if nodes.len() < res.nodes.pagination.total {
page += 1
} else {
break;
}
}
Ok(SkimmedNodesWithMetadata::new(nodes, metadata))
}
impl<'a, G: ConnectableGateway> GatewayWithLatency<'a, G> {
fn new(gateway: &'a G, latency: Duration) -> Self {
GatewayWithLatency { gateway, latency }
@@ -99,18 +142,35 @@ pub async fn gateways_for_init<R: Rng>(
let nym_api = nym_apis
.choose(rng)
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
let client = if let Some(user_agent) = user_agent {
nym_validator_client::client::NymApiClient::new_with_user_agent(nym_api.clone(), user_agent)
} else {
nym_validator_client::client::NymApiClient::new(nym_api.clone())
};
log::debug!("Fetching list of gateways from: {nym_api}");
// Use the unified HTTP client directly with optional user agent
let mut builder = nym_http_api_client::Client::builder(nym_api.clone())
.map_err(|e| {
ClientCoreError::ValidatorClientError(
nym_validator_client::ValidatorClientError::NymAPIError { source: e },
)
})?
.with_bincode(); // Use bincode for better performance
let gateways = client.get_all_basic_entry_assigned_nodes_v2().await?.nodes;
if let Some(user_agent) = user_agent {
builder = builder.with_user_agent(user_agent);
}
let client = builder.build().map_err(|e| {
ClientCoreError::ValidatorClientError(
nym_validator_client::ValidatorClientError::NymAPIError { source: e },
)
})?;
tracing::debug!("Fetching list of gateways from: {nym_api}");
// Use our helper to handle pagination
let gateways = get_all_basic_entry_nodes_with_metadata(&client, true)
.await?
.nodes;
info!("nym api reports {} gateways", gateways.len());
log::trace!("Gateways: {:#?}", gateways);
tracing::trace!("Gateways: {gateways:#?}");
// filter out gateways below minimum performance and ones that could operate as a mixnode
// (we don't want instability)
@@ -120,10 +180,10 @@ pub async fn gateways_for_init<R: Rng>(
.filter(|g| g.performance.round_to_integer() >= minimum_performance)
.filter_map(|gateway| gateway.try_into().ok())
.collect::<Vec<_>>();
log::debug!("After checking validity: {}", valid_gateways.len());
log::trace!("Valid gateways: {:#?}", valid_gateways);
tracing::debug!("After checking validity: {}", valid_gateways.len());
tracing::trace!("Valid gateways: {valid_gateways:#?}");
log::info!(
tracing::info!(
"and {} after validity and performance filtering",
valid_gateways.len()
);
@@ -145,7 +205,7 @@ async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
JSWebsocket::new(endpoint).map_err(|_| ClientCoreError::GatewayJsConnectionFailure)
}
async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<G>, ClientCoreError>
async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<'_, G>, ClientCoreError>
where
G: ConnectableGateway,
{
@@ -242,7 +302,7 @@ pub async fn choose_gateway_by_latency<R: Rng, G: ConnectableGateway + Clone>(
let gateways_with_latency = gateways_with_latency.lock().await;
let chosen = gateways_with_latency
.choose_weighted(rng, |item| 1. / item.latency.as_secs_f32())
.expect("invalid selection weight!");
.map_err(|source| ClientCoreError::GatewaySelectionFailure { source })?;
info!(
"chose gateway {} with average latency of {:?}",
@@ -286,7 +346,7 @@ pub(super) fn get_specified_gateway(
gateways: &[RoutingNode],
must_use_tls: bool,
) -> Result<RoutingNode, ClientCoreError> {
log::debug!("Requesting specified gateway: {}", gateway_identity);
tracing::debug!("Requesting specified gateway: {gateway_identity}");
let user_gateway = ed25519::PublicKey::from_base58_string(gateway_identity)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
@@ -326,7 +386,7 @@ pub(super) async fn register_with_gateway(
);
gateway_client.establish_connection().await.map_err(|err| {
log::warn!("Failed to establish connection with gateway!");
tracing::warn!("Failed to establish connection with gateway!");
ClientCoreError::GatewayClientError {
gateway_id: gateway_id.to_base58_string(),
source: Box::new(err),
@@ -336,7 +396,7 @@ pub(super) async fn register_with_gateway(
.perform_initial_authentication()
.await
.map_err(|err| {
log::warn!("Failed to register with the gateway {gateway_id}: {err}");
tracing::warn!("Failed to register with the gateway {gateway_id}: {err}");
ClientCoreError::GatewayClientError {
gateway_id: gateway_id.to_base58_string(),
source: Box::new(err),
+5 -5
View File
@@ -63,7 +63,7 @@ where
K::StorageError: Send + Sync + 'static,
D::StorageError: Send + Sync + 'static,
{
log::trace!("Setting up new gateway");
tracing::trace!("Setting up new gateway");
// if we're setting up new gateway, we must have had generated long-term client keys before
let client_keys = load_client_keys(key_store).await?;
@@ -202,10 +202,10 @@ where
K::StorageError: Send + Sync + 'static,
D::StorageError: Send + Sync + 'static,
{
log::debug!("Setting up gateway");
tracing::debug!("Setting up gateway");
match setup {
GatewaySetup::MustLoad { gateway_id } => {
log::debug!("GatewaySetup::MustLoad with id: {gateway_id:?}");
tracing::debug!("GatewaySetup::MustLoad with id: {gateway_id:?}");
use_loaded_gateway_details(key_store, details_store, gateway_id).await
}
GatewaySetup::New {
@@ -214,7 +214,7 @@ where
#[cfg(unix)]
connection_fd_callback,
} => {
log::debug!("GatewaySetup::New with spec: {specification:?}");
tracing::debug!("GatewaySetup::New with spec: {specification:?}");
setup_new_gateway(
key_store,
details_store,
@@ -230,7 +230,7 @@ where
gateway_details,
client_keys: managed_keys,
} => {
log::debug!("GatewaySetup::ReuseConnection");
tracing::debug!("GatewaySetup::ReuseConnection");
Ok(reuse_gateway_connection(
*authenticated_ephemeral_client,
*gateway_details,
+38 -2
View File
@@ -18,18 +18,54 @@ pub use nym_topology::{
};
#[cfg(target_arch = "wasm32")]
pub(crate) fn spawn_future<F>(future: F)
pub fn spawn_future<F>(future: F)
where
F: Future<Output = ()> + 'static,
{
wasm_bindgen_futures::spawn_local(future);
}
// TODO: expose similar API to the rest of the codebase,
// perhaps with some simple trait for a task to define its name
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn spawn_future<F>(future: F)
#[track_caller]
pub fn spawn_future<F>(future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
tokio::spawn(future);
}
#[cfg(not(target_arch = "wasm32"))]
#[track_caller]
pub fn spawn_named_future<F>(future: F, name: &str)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
cfg_if::cfg_if! {if #[cfg(tokio_unstable)] {
#[allow(clippy::expect_used)]
tokio::task::Builder::new().name(name).spawn(future).expect("failed to spawn future");
} else {
let _ = name;
tracing::debug!(r#"the underlying binary hasn't been built with `RUSTFLAGS="--cfg tokio_unstable"` - the future naming won't do anything"#);
spawn_future(future);
}}
}
#[macro_export]
macro_rules! spawn_future {
($future:expr) => {{
$crate::spawn_future($future)
}};
($future:expr, $name:expr) => {{
cfg_if::cfg_if! {if #[cfg(not(target_arch = "wasm32"))] {
$crate::spawn_named_future($future, $name)
} else {
let _ = $name;
$crate::spawn_future($future)
}}
}};
}
+3 -2
View File
@@ -9,7 +9,7 @@ license.workspace = true
[dependencies]
async-trait.workspace = true
dashmap.workspace = true
log.workspace = true
tracing.workspace = true
thiserror.workspace = true
time.workspace = true
@@ -23,13 +23,14 @@ features = ["fs"]
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
workspace = true
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"]
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"]
optional = true
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx-pool-guard]
path = "../../../sqlx-pool-guard"
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
+7 -4
View File
@@ -2,23 +2,24 @@
// SPDX-License-Identifier: Apache-2.0
#[tokio::main]
async fn main() {
async fn main() -> anyhow::Result<()> {
#[cfg(feature = "fs-surb-storage")]
{
use anyhow::Context;
use sqlx::{Connection, SqliteConnection};
use std::env;
let out_dir = env::var("OUT_DIR").unwrap();
let out_dir = env::var("OUT_DIR")?;
let database_path = format!("{out_dir}/fs-surbs-example.sqlite");
let mut conn = SqliteConnection::connect(&format!("sqlite://{database_path}?mode=rwc"))
.await
.expect("Failed to create SQLx database connection");
.context("Failed to create SQLx database connection")?;
sqlx::migrate!("./fs_surbs_migrations")
.run(&mut conn)
.await
.expect("Failed to perform SQLx migrations");
.context("Failed to perform SQLx migrations")?;
#[cfg(target_family = "unix")]
println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path);
@@ -28,4 +29,6 @@ async fn main() {
// not a valid windows path... but hey, it works...
println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
}
Ok(())
}
@@ -0,0 +1,81 @@
/*
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
-- change `previous_flush_timestamp` unix timestamp to `previous_flush` timestamp
CREATE TABLE status_new
(
flush_in_progress INTEGER NOT NULL,
previous_flush TIMESTAMP WITHOUT TIME ZONE NOT NULL,
client_in_use INTEGER NOT NULL
);
INSERT INTO status_new (flush_in_progress, previous_flush, client_in_use)
SELECT flush_in_progress,
datetime(previous_flush_timestamp, 'unixepoch') AS previous_flush,
client_in_use
FROM status;
DROP TABLE status;
ALTER TABLE status_new
RENAME TO status;
-- change `sent_at_timestamp` unix timestamp to `sent_at` timestamp
CREATE TABLE reply_key_new
(
key_digest BLOB NOT NULL UNIQUE,
reply_key BLOB NOT NULL UNIQUE,
sent_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);
INSERT INTO reply_key_new (key_digest, reply_key, sent_at)
SELECT key_digest,
reply_key,
datetime(sent_at_timestamp, 'unixepoch') AS sent_at
FROM reply_key;
DROP TABLE reply_key;
ALTER TABLE reply_key_new
RENAME TO reply_key;
-- change `last_sent_timestamp` unix timestamp to `sent_at` last_sent
CREATE TABLE reply_surb_sender_new
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
last_sent TIMESTAMP WITHOUT TIME ZONE NOT NULL,
tag BLOB NOT NULL UNIQUE
);
INSERT INTO reply_surb_sender_new (id, last_sent, tag)
SELECT id,
datetime(last_sent_timestamp, 'unixepoch') AS last_sent,
tag
FROM reply_surb_sender;
-- recreate `reply_surb` table due to foreign key constraint
CREATE TABLE reply_surb_new
(
reply_surb_sender_id INTEGER NOT NULL,
reply_surb BLOB NOT NULL,
encoded_key_rotation TINYINT NOT NULL,
FOREIGN KEY (reply_surb_sender_id) REFERENCES reply_surb_sender_new (id)
);
INSERT INTO reply_surb_new
SELECT *
FROM reply_surb;
DROP TABLE reply_surb;
ALTER TABLE reply_surb_new
RENAME TO reply_surb;
DROP TABLE reply_surb_sender;
ALTER TABLE reply_surb_sender_new
RENAME TO reply_surb_sender;
@@ -0,0 +1,12 @@
/*
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
-- don't persist sender_tag in the DB. instead generate fresh one on each restart
-- this will:
-- A) further help against correlation attacks
-- B) realistically after client restarts, we might be in new key rotation anyway meaning receiver would have to start
-- "from scratch" with surbs
DROP TABLE sender_tag;
@@ -4,6 +4,7 @@
use crate::backend::Empty;
use crate::{CombinedReplyStorage, ReplyStorageBackend};
use async_trait::async_trait;
use time::OffsetDateTime;
// well, right now we don't have the browser storage : (
// so we keep everything in memory
@@ -38,7 +39,10 @@ impl ReplyStorageBackend for Backend {
self.empty.init_fresh(fresh).await
}
async fn load_surb_storage(&self) -> Result<CombinedReplyStorage, Self::StorageError> {
self.empty.load_surb_storage().await
async fn load_surb_storage(
&self,
surb_freshness_cutoff: OffsetDateTime,
) -> Result<CombinedReplyStorage, Self::StorageError> {
self.empty.load_surb_storage(surb_freshness_cutoff).await
}
}
@@ -3,17 +3,15 @@
use crate::backend::fs_backend::{
error::StorageError,
models::{
ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSenderTag,
StoredSurbSender,
},
models::{ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSurbSender},
};
use log::{error, info};
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use time::OffsetDateTime;
use tracing::{error, info};
use sqlx_pool_guard::SqlitePoolGuard;
@@ -39,7 +37,7 @@ impl StorageManager {
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(&database_path)
.filename(database_path)
.create_if_missing(fresh)
.disable_statement_logging();
@@ -51,8 +49,7 @@ impl StorageManager {
}
};
let connection_pool =
SqlitePoolGuard::new(database_path.as_ref().to_path_buf(), connection_pool);
let connection_pool = SqlitePoolGuard::new(connection_pool);
if let Err(err) = sqlx::migrate!("./fs_surbs_migrations")
.run(&*connection_pool)
@@ -81,9 +78,11 @@ impl StorageManager {
}
pub async fn create_status_table(&self) -> Result<(), sqlx::Error> {
sqlx::query!("INSERT INTO status(flush_in_progress, previous_flush_timestamp, client_in_use) VALUES (0, 0, 1)")
.execute(&*self.connection_pool)
.await?;
sqlx::query!(
"INSERT INTO status(flush_in_progress, previous_flush, client_in_use) VALUES (0, 0, 1)"
)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
@@ -94,18 +93,18 @@ impl StorageManager {
.map(|r| r.flush_in_progress > 0)
}
pub async fn set_previous_flush_timestamp(&self, timestamp: i64) -> Result<(), sqlx::Error> {
sqlx::query!("UPDATE status SET previous_flush_timestamp = ?", timestamp)
pub async fn set_previous_flush(&self, timestamp: OffsetDateTime) -> Result<(), sqlx::Error> {
sqlx::query!("UPDATE status SET previous_flush = ?", timestamp)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn get_previous_flush_timestamp(&self) -> Result<i64, sqlx::Error> {
sqlx::query!("SELECT previous_flush_timestamp FROM status;")
pub async fn get_previous_flush_time(&self) -> Result<OffsetDateTime, sqlx::Error> {
sqlx::query!(r#"SELECT previous_flush AS "previous_flush: OffsetDateTime" FROM status"#)
.fetch_one(&*self.connection_pool)
.await
.map(|r| r.previous_flush_timestamp)
.map(|r| r.previous_flush)
}
pub async fn set_flush_status(&self, in_progress: bool) -> Result<(), sqlx::Error> {
@@ -131,32 +130,6 @@ impl StorageManager {
Ok(())
}
pub async fn delete_all_tags(&self) -> Result<(), sqlx::Error> {
sqlx::query!("DELETE FROM sender_tag;")
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn get_tags(&self) -> Result<Vec<StoredSenderTag>, sqlx::Error> {
sqlx::query_as!(StoredSenderTag, "SELECT * FROM sender_tag;",)
.fetch_all(&*self.connection_pool)
.await
}
pub async fn insert_tag(&self, stored_tag: StoredSenderTag) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO sender_tag(recipient, tag) VALUES (?, ?);
"#,
stored_tag.recipient,
stored_tag.tag
)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn delete_all_reply_keys(&self) -> Result<(), sqlx::Error> {
sqlx::query!("DELETE FROM reply_key;")
.execute(&*self.connection_pool)
@@ -165,7 +138,7 @@ impl StorageManager {
}
pub async fn get_reply_keys(&self) -> Result<Vec<StoredReplyKey>, sqlx::Error> {
sqlx::query_as!(StoredReplyKey, "SELECT * FROM reply_key;",)
sqlx::query_as("SELECT * FROM reply_key;")
.fetch_all(&*self.connection_pool)
.await
}
@@ -176,11 +149,11 @@ impl StorageManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO reply_key(key_digest, reply_key, sent_at_timestamp) VALUES (?, ?, ?);
INSERT INTO reply_key(key_digest, reply_key, sent_at) VALUES (?, ?, ?);
"#,
stored_reply_key.key_digest,
stored_reply_key.reply_key,
stored_reply_key.sent_at_timestamp
stored_reply_key.sent_at
)
.execute(&*self.connection_pool)
.await?;
@@ -188,7 +161,7 @@ impl StorageManager {
}
pub async fn get_surb_senders(&self) -> Result<Vec<StoredSurbSender>, sqlx::Error> {
sqlx::query_as!(StoredSurbSender, "SELECT * FROM reply_surb_sender;",)
sqlx::query_as("SELECT * FROM reply_surb_sender;")
.fetch_all(&*self.connection_pool)
.await
}
@@ -199,10 +172,10 @@ impl StorageManager {
) -> Result<i64, sqlx::Error> {
let id = sqlx::query!(
r#"
INSERT INTO reply_surb_sender(tag, last_sent_timestamp) VALUES (?, ?);
INSERT INTO reply_surb_sender(tag, last_sent) VALUES (?, ?);
"#,
stored_surb_sender.tag,
stored_surb_sender.last_sent_timestamp
stored_surb_sender.last_sent
)
.execute(&*self.connection_pool)
.await?
@@ -217,7 +190,7 @@ impl StorageManager {
sqlx::query_as!(
StoredReplySurb,
r#"
SELECT reply_surb_sender_id, reply_surb, encoded_key_rotation as "encoded_key_rotation: u8" FROM reply_surb
SELECT reply_surb_sender_id, reply_surb, encoded_key_rotation as "encoded_key_rotation: u8" FROM reply_surb
WHERE reply_surb_sender_id = ?
"#,
sender_id
@@ -4,20 +4,16 @@
use crate::{
backend::fs_backend::{
manager::StorageManager,
models::{
ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSenderTag,
StoredSurbSender,
},
models::{ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSurbSender},
},
surb_storage::ReceivedReplySurbs,
CombinedReplyStorage, ReceivedReplySurbsMap, ReplyStorageBackend, SentReplyKeys,
UsedSenderTags,
};
use async_trait::async_trait;
use log::{debug, error, info, warn};
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use std::path::{Path, PathBuf};
use time::OffsetDateTime;
use tracing::{error, info, warn};
pub use self::error::StorageError;
@@ -57,10 +53,7 @@ impl Backend {
}
}
pub async fn try_load<P: AsRef<Path>>(
database_path: P,
fresh_sender_tags: bool,
) -> Result<Self, StorageError> {
pub async fn try_load<P: AsRef<Path>>(database_path: P) -> Result<Self, StorageError> {
let owned_path: PathBuf = database_path.as_ref().into();
if owned_path.file_name().is_none() {
return Err(StorageError::DatabasePathWithoutFilename {
@@ -69,7 +62,7 @@ impl Backend {
}
let manager = StorageManager::init(database_path, false).await?;
match Self::try_load_inner(&manager, fresh_sender_tags).await {
match Self::try_load_inner(&manager).await {
Ok(()) => Ok(Backend {
temporary_old_path: None,
database_path: owned_path,
@@ -87,18 +80,15 @@ impl Backend {
self.manager.close_pool().await
}
async fn try_load_inner(
manager: &StorageManager,
fresh_sender_tags: bool,
) -> Result<(), StorageError> {
async fn try_load_inner(manager: &StorageManager) -> Result<(), StorageError> {
// the database flush wasn't fully finished and thus the data is in inconsistent state
// (we don't really know what's properly saved or what's not)
if manager.get_flush_status().await? {
return Err(StorageError::IncompleteDataFlush);
}
let last_flush_timestamp = manager.get_previous_flush_timestamp().await?;
if last_flush_timestamp == 0 {
let last_flush = manager.get_previous_flush_time().await?;
if last_flush == OffsetDateTime::UNIX_EPOCH {
// either this client has been running since 1970 or the flush failed
return Err(StorageError::IncompleteDataFlush);
}
@@ -118,15 +108,6 @@ impl Backend {
return Err(err.into());
}
let last_flush = match OffsetDateTime::from_unix_timestamp(last_flush_timestamp) {
Ok(last_flush) => last_flush,
Err(err) => {
return Err(StorageError::CorruptedData {
details: format!("failed to parse stored timestamp - {err}"),
});
}
};
// in theory clients can use our reply surbs whenever they want, even a year in the future
// (assuming no key rotation has happened)
// but the way it's currently coded, everyone will purge old data
@@ -144,14 +125,6 @@ impl Backend {
manager.delete_all_reply_keys().await?;
}
if days > 2 {
info!("it's been over {days} days and {hours} hours since we last used our data store. our used sender tags are already outdated - we're going to purge them now.");
manager.delete_all_tags().await?;
} else if fresh_sender_tags {
debug!("starting with fresh sender tags");
manager.delete_all_tags().await?;
}
Ok(())
}
@@ -196,7 +169,7 @@ impl Backend {
async fn end_storage_flush(&self) -> Result<(), StorageError> {
self.manager
.set_previous_flush_timestamp(OffsetDateTime::now_utc().unix_timestamp())
.set_previous_flush(OffsetDateTime::now_utc())
.await?;
Ok(self.manager.set_flush_status(false).await?)
}
@@ -209,29 +182,6 @@ impl Backend {
Ok(self.manager.set_client_in_use_status(false).await?)
}
async fn get_stored_tags(&self) -> Result<UsedSenderTags, StorageError> {
let stored = self.manager.get_tags().await?;
// stop at the first instance of corruption. if even a single entry is malformed,
// something weird has happened and we can't trust the rest of the data
let raw = stored
.into_iter()
.map(TryInto::try_into)
.collect::<Result<_, _>>()?;
Ok(UsedSenderTags::from_raw(raw))
}
async fn dump_sender_tags(&self, tags: &UsedSenderTags) -> Result<(), StorageError> {
for map_ref in tags.as_raw_iter() {
let (recipient, tag) = map_ref.pair();
self.manager
.insert_tag(StoredSenderTag::new(*recipient, *tag))
.await?;
}
Ok(())
}
async fn get_stored_reply_keys(&self) -> Result<SentReplyKeys, StorageError> {
let stored = self.manager.get_reply_keys().await?;
@@ -255,14 +205,17 @@ impl Backend {
Ok(())
}
async fn get_stored_reply_surbs(&self) -> Result<ReceivedReplySurbsMap, StorageError> {
async fn get_stored_reply_surbs(
&self,
surb_freshness_cutoff: OffsetDateTime,
) -> Result<ReceivedReplySurbsMap, StorageError> {
let surb_senders = self.manager.get_surb_senders().await?;
let metadata = self.get_reply_surb_storage_metadata().await?;
let mut received_surbs = Vec::with_capacity(surb_senders.len());
for sender in surb_senders {
let sender_id = sender.id;
let (sender_tag, surbs_last_received_at_timestamp): (AnonymousSenderTag, i64) =
let (sender_tag, surbs_last_received_at): (AnonymousSenderTag, OffsetDateTime) =
sender.try_into()?;
let stored_surbs = self
.manager
@@ -274,15 +227,17 @@ impl Backend {
received_surbs.push((
sender_tag,
ReceivedReplySurbs::new_retrieved(stored_surbs, surbs_last_received_at_timestamp),
ReceivedReplySurbs::new_retrieved(stored_surbs, surbs_last_received_at),
))
}
Ok(ReceivedReplySurbsMap::from_raw(
let received_surbs = ReceivedReplySurbsMap::from_raw(
metadata.min_reply_surb_threshold as usize,
metadata.max_reply_surb_threshold as usize,
received_surbs,
))
);
received_surbs.drop_stale_loaded_surbs(surb_freshness_cutoff);
Ok(received_surbs)
}
async fn dump_reply_surbs(
@@ -304,6 +259,14 @@ impl Backend {
.insert_reply_surb(StoredReplySurb::new(sender_id, reply_surb))
.await?
}
// TODO: should we also retain the stale ones?
if received_surbs.possibly_stale_left() != 0 {
warn!(
"dropping {} possibly stale surbs for {tag}",
received_surbs.possibly_stale_left()
);
}
}
Ok(())
}
@@ -347,7 +310,6 @@ impl ReplyStorageBackend for Backend {
self.rotate().await?;
self.start_storage_flush().await?;
self.dump_sender_tags(storage.tags_storage_ref()).await?;
self.dump_sender_reply_keys(storage.key_storage_ref())
.await?;
let surbs_ref = storage.surbs_storage_ref();
@@ -364,12 +326,14 @@ impl ReplyStorageBackend for Backend {
.await
}
async fn load_surb_storage(&self) -> Result<CombinedReplyStorage, Self::StorageError> {
async fn load_surb_storage(
&self,
surb_freshness_cutoff: OffsetDateTime,
) -> Result<CombinedReplyStorage, Self::StorageError> {
let reply_keys = self.get_stored_reply_keys().await?;
let tags = self.get_stored_tags().await?;
let reply_surbs = self.get_stored_reply_surbs().await?;
let reply_surbs = self.get_stored_reply_surbs(surb_freshness_cutoff).await?;
Ok(CombinedReplyStorage::load(reply_keys, reply_surbs, tags))
Ok(CombinedReplyStorage::load(reply_keys, reply_surbs))
}
async fn stop_storage_session(self) -> Result<(), Self::StorageError> {
@@ -3,6 +3,7 @@
use crate::backend::fs_backend::error::StorageError;
use crate::key_storage::UsedReplyKey;
use crate::ReceivedReplySurb;
use nym_crypto::generic_array::typenum::Unsigned;
use nym_crypto::Digest;
use nym_sphinx::addressing::clients::{Recipient, RecipientBytes};
@@ -12,6 +13,8 @@ use nym_sphinx::anonymous_replies::{
ReplySurb, ReplySurbWithKeyRotation, SurbEncryptionKey, SurbEncryptionKeySize,
};
use nym_sphinx::params::{ReplySurbKeyDigestAlgorithm, SphinxKeyRotation};
use sqlx::FromRow;
use time::OffsetDateTime;
#[derive(Debug, Clone)]
pub struct StoredSenderTag {
@@ -58,11 +61,11 @@ impl TryFrom<StoredSenderTag> for (RecipientBytes, AnonymousSenderTag) {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, FromRow)]
pub struct StoredReplyKey {
pub key_digest: Vec<u8>,
pub reply_key: Vec<u8>,
pub sent_at_timestamp: i64,
pub sent_at: OffsetDateTime,
}
impl StoredReplyKey {
@@ -70,7 +73,7 @@ impl StoredReplyKey {
StoredReplyKey {
key_digest: key_digest.to_vec(),
reply_key: (*reply_key).to_bytes(),
sent_at_timestamp: reply_key.sent_at_timestamp,
sent_at: reply_key.sent_at,
}
}
}
@@ -100,32 +103,30 @@ impl TryFrom<StoredReplyKey> for (EncryptionKeyDigest, UsedReplyKey) {
});
};
Ok((
digest,
UsedReplyKey::new(reply_key, value.sent_at_timestamp),
))
Ok((digest, UsedReplyKey::new(reply_key, value.sent_at)))
}
}
#[derive(FromRow)]
pub struct StoredSurbSender {
pub id: i64,
pub tag: Vec<u8>,
pub last_sent_timestamp: i64,
pub last_sent: OffsetDateTime,
}
impl StoredSurbSender {
pub fn new(tag: AnonymousSenderTag, last_sent_timestamp: i64) -> Self {
pub fn new(tag: AnonymousSenderTag, last_sent: OffsetDateTime) -> Self {
StoredSurbSender {
// for the purposes of STORING data,
// we ignore that field anyway
id: 0,
tag: tag.to_bytes().to_vec(),
last_sent_timestamp,
last_sent,
}
}
}
impl TryFrom<StoredSurbSender> for (AnonymousSenderTag, i64) {
impl TryFrom<StoredSurbSender> for (AnonymousSenderTag, OffsetDateTime) {
type Error = StorageError;
fn try_from(value: StoredSurbSender) -> Result<Self, Self::Error> {
@@ -140,7 +141,7 @@ impl TryFrom<StoredSurbSender> for (AnonymousSenderTag, i64) {
Ok((
AnonymousSenderTag::from_bytes(sender_tag_bytes),
value.last_sent_timestamp,
value.last_sent,
))
}
}
@@ -155,10 +156,10 @@ pub struct StoredReplySurb {
}
impl StoredReplySurb {
pub fn new(reply_surb_sender_id: i64, reply_surb: &ReplySurbWithKeyRotation) -> Self {
pub fn new(reply_surb_sender_id: i64, reply_surb: &ReceivedReplySurb) -> Self {
StoredReplySurb {
reply_surb_sender_id,
reply_surb: reply_surb.inner_reply_surb().to_bytes(),
reply_surb: reply_surb.surb.inner_reply_surb().to_bytes(),
encoded_key_rotation: reply_surb.key_rotation() as u8,
}
}
@@ -5,6 +5,7 @@ use crate::CombinedReplyStorage;
use async_trait::async_trait;
use std::error::Error;
use thiserror::Error;
use time::OffsetDateTime;
// TODO: this should now live inside our wasm/client-core
pub mod browser_backend;
@@ -53,7 +54,10 @@ impl ReplyStorageBackend for Empty {
Ok(())
}
async fn load_surb_storage(&self) -> Result<CombinedReplyStorage, Self::StorageError> {
async fn load_surb_storage(
&self,
_: OffsetDateTime,
) -> Result<CombinedReplyStorage, Self::StorageError> {
Ok(CombinedReplyStorage::new(
self.min_surb_threshold,
self.max_surb_threshold,
@@ -80,7 +84,10 @@ pub trait ReplyStorageBackend: Sized {
/// (such as surb thresholds)
async fn init_fresh(&mut self, fresh: &CombinedReplyStorage) -> Result<(), Self::StorageError>;
async fn load_surb_storage(&self) -> Result<CombinedReplyStorage, Self::StorageError>;
async fn load_surb_storage(
&self,
surb_freshness_cutoff: OffsetDateTime,
) -> Result<CombinedReplyStorage, Self::StorageError>;
async fn stop_storage_session(self) -> Result<(), Self::StorageError> {
Ok(())
@@ -25,12 +25,11 @@ impl CombinedReplyStorage {
pub fn load(
sent_reply_keys: SentReplyKeys,
received_reply_surbs: ReceivedReplySurbsMap,
used_tags: UsedSenderTags,
) -> Self {
CombinedReplyStorage {
sent_reply_keys,
received_reply_surbs,
used_tags,
used_tags: UsedSenderTags::new(),
}
}
@@ -47,8 +47,12 @@ impl SentReplyKeys {
self.inner.data.iter()
}
pub fn retain(&self, f: impl FnMut(&EncryptionKeyDigest, &mut UsedReplyKey) -> bool) {
self.inner.data.retain(f);
}
pub fn insert_multiple(&self, keys: Vec<SurbEncryptionKey>) {
let now = OffsetDateTime::now_utc().unix_timestamp();
let now = OffsetDateTime::now_utc();
for key in keys {
self.insert(UsedReplyKey::new(key, now))
}
@@ -71,15 +75,12 @@ impl SentReplyKeys {
pub struct UsedReplyKey {
key: SurbEncryptionKey,
// the purpose of this field is to perform invalidation at relatively very long intervals
pub sent_at_timestamp: i64,
pub sent_at: OffsetDateTime,
}
impl UsedReplyKey {
pub(crate) fn new(key: SurbEncryptionKey, sent_at_timestamp: i64) -> Self {
UsedReplyKey {
key,
sent_at_timestamp,
}
pub(crate) fn new(key: SurbEncryptionKey, sent_at: OffsetDateTime) -> Self {
UsedReplyKey { key, sent_at }
}
}
+8 -4
View File
@@ -4,8 +4,9 @@
pub use backend::*;
pub use combined::CombinedReplyStorage;
pub use key_storage::SentReplyKeys;
pub use surb_storage::ReceivedReplySurbsMap;
pub use surb_storage::{ReceivedReplySurb, ReceivedReplySurbsMap, RetrievedReplySurb};
pub use tag_storage::UsedSenderTags;
use time::OffsetDateTime;
mod backend;
mod combined;
@@ -29,8 +30,11 @@ where
PersistentReplyStorage { backend }
}
pub async fn load_state_from_backend(&self) -> Result<CombinedReplyStorage, T::StorageError> {
self.backend.load_surb_storage().await
pub async fn load_state_from_backend(
&self,
surb_freshness_cutoff: OffsetDateTime,
) -> Result<CombinedReplyStorage, T::StorageError> {
self.backend.load_surb_storage(surb_freshness_cutoff).await
}
pub async fn flush_on_shutdown(
@@ -38,7 +42,7 @@ where
mem_state: CombinedReplyStorage,
mut shutdown: nym_task::TaskClient,
) {
use log::{debug, error, info};
use tracing::{debug, error, info};
debug!("Started PersistentReplyStorage");
if let Err(err) = self.backend.start_storage_session().await {
@@ -1,15 +1,45 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use dashmap::iter::Iter;
use dashmap::iter::{Iter, IterMut};
use dashmap::DashMap;
use log::trace;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::params::SphinxKeyRotation;
use std::cmp::min;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use time::OffsetDateTime;
use tracing::{error, info, trace};
#[derive(Debug)]
pub struct RetrievedReplySurb {
pub(crate) reply_surb: ReceivedReplySurb,
pub(crate) stale_pile: bool,
}
impl RetrievedReplySurb {
pub(crate) fn new_fresh(reply_surb: ReceivedReplySurb) -> Self {
RetrievedReplySurb {
reply_surb,
stale_pile: false,
}
}
pub(crate) fn new_stale(reply_surb: ReceivedReplySurb) -> Self {
RetrievedReplySurb {
reply_surb,
stale_pile: true,
}
}
}
impl From<RetrievedReplySurb> for ReplySurbWithKeyRotation {
fn from(retrieved: RetrievedReplySurb) -> Self {
retrieved.reply_surb.into()
}
}
#[derive(Debug, Clone)]
pub struct ReceivedReplySurbsMap {
@@ -57,17 +87,40 @@ impl ReceivedReplySurbsMap {
self.inner.data.iter()
}
pub fn remove(&self, target: &AnonymousSenderTag) {
self.inner.data.remove(target);
pub fn as_raw_iter_mut(&self) -> IterMut<'_, AnonymousSenderTag, ReceivedReplySurbs> {
self.inner.data.iter_mut()
}
pub fn reset_surbs_last_received_at(&self, target: &AnonymousSenderTag) {
if let Some(mut entry) = self.inner.data.get_mut(target) {
entry.surbs_last_received_at_timestamp = OffsetDateTime::now_utc().unix_timestamp();
fn total_surbs(&self) -> usize {
self.inner
.data
.iter()
.map(|entry| entry.value().data.len())
.sum()
}
pub fn drop_stale_loaded_surbs(&self, cutoff: OffsetDateTime) {
let before = self.total_surbs();
self.inner.data.retain(|_, v| {
if v.surbs_last_received_at() < cutoff {
return false;
}
v.data.retain(|s| s.received_at > cutoff);
!v.data.is_empty()
});
let after = self.total_surbs();
let diff = before - after;
if diff != 0 {
info!("removed {diff} stale reply SURBs")
}
}
pub fn surbs_last_received_at(&self, target: &AnonymousSenderTag) -> Option<i64> {
pub fn retain(&self, f: impl FnMut(&AnonymousSenderTag, &mut ReceivedReplySurbs) -> bool) {
self.inner.data.retain(f);
}
pub fn surbs_last_received_at(&self, target: &AnonymousSenderTag) -> Option<OffsetDateTime> {
self.inner
.data
.get(target)
@@ -126,15 +179,25 @@ impl ReceivedReplySurbsMap {
.unwrap_or_default()
}
pub fn available_fresh_surbs(&self, target: &AnonymousSenderTag) -> usize {
self.inner
.data
.get(target)
.map(|entry| entry.fresh_left())
.unwrap_or_default()
}
pub fn contains_surbs_for(&self, target: &AnonymousSenderTag) -> bool {
self.inner.data.contains_key(target)
}
/// Attempt to retrieve the specified number of reply SURBs for the target sender
/// and return the number of SURBs remaining in the storage after the call.
pub fn get_reply_surbs(
&self,
target: &AnonymousSenderTag,
amount: usize,
) -> (Option<Vec<ReplySurbWithKeyRotation>>, usize) {
) -> (Option<Vec<RetrievedReplySurb>>, usize) {
if let Some(mut entry) = self.inner.data.get_mut(target) {
let surbs_left = entry.items_left();
if surbs_left < self.min_surb_threshold() + amount {
@@ -150,34 +213,72 @@ impl ReceivedReplySurbsMap {
pub fn get_reply_surb_ignoring_threshold(
&self,
target: &AnonymousSenderTag,
) -> Option<(Option<ReplySurbWithKeyRotation>, usize)> {
self.inner
.data
.get_mut(target)
.map(|mut s| s.get_reply_surb())
) -> (Option<RetrievedReplySurb>, usize) {
let Some(mut entry) = self.inner.data.get_mut(target) else {
return (None, 0);
};
entry.get_reply_surb()
}
pub fn get_reply_surb(
&self,
target: &AnonymousSenderTag,
) -> Option<(Option<ReplySurbWithKeyRotation>, usize)> {
self.inner.data.get_mut(target).map(|mut entry| {
let surbs_left = entry.items_left();
if surbs_left < self.min_surb_threshold() {
(None, surbs_left)
} else {
entry.get_reply_surb()
}
})
) -> (Option<RetrievedReplySurb>, usize) {
let Some(mut entry) = self.inner.data.get_mut(target) else {
return (None, 0);
};
let surbs_left = entry.items_left();
if surbs_left < self.min_surb_threshold() {
(None, surbs_left)
} else {
entry.get_reply_surb()
}
}
pub fn insert_surbs<I: IntoIterator<Item = ReplySurbWithKeyRotation>>(
pub fn re_insert_reply_surbs(
&self,
target: &AnonymousSenderTag,
surbs: Vec<RetrievedReplySurb>,
) {
error!("re-inserting {} unused surbs", surbs.len());
let mut entry = self.inner.data.entry(*target).or_insert_with(|| {
// this branch should realistically NEVER happen, but software be software, so let's not crash
error!("attempting to return surbs to no longer existing entry {target}");
ReceivedReplySurbs::new(VecDeque::new())
});
let entry = entry.value_mut();
for returned_surb in surbs.into_iter().rev() {
if returned_surb.stale_pile {
entry.possibly_stale.push_front(returned_surb.reply_surb)
} else {
entry.data.push_front(returned_surb.reply_surb)
}
}
}
pub fn insert_fresh_surbs<I: IntoIterator<Item = ReplySurbWithKeyRotation>>(
&self,
target: &AnonymousSenderTag,
surbs: I,
) {
if let Some(mut existing_data) = self.inner.data.get_mut(target) {
existing_data.insert_reply_surbs(surbs)
existing_data.insert_fresh_reply_surbs(surbs);
if existing_data.possibly_stale.is_empty() {
return;
}
// if we're above the minimum threshold, remove stale surbs
let threshold = self.min_surb_threshold();
let diff = existing_data.data.len().saturating_sub(threshold);
trace!("will attempt to remove up to {diff} stale surbs");
if diff > 0 {
existing_data.remove_stale_surbs(diff);
}
} else {
let new_entry = ReceivedReplySurbs::new(surbs.into_iter().collect());
self.inner.data.insert(*target, new_entry);
@@ -185,44 +286,102 @@ impl ReceivedReplySurbsMap {
}
}
#[derive(Debug)]
pub struct ReceivedReplySurb {
pub(crate) surb: ReplySurbWithKeyRotation,
pub(crate) received_at: OffsetDateTime,
}
impl From<ReceivedReplySurb> for ReplySurbWithKeyRotation {
fn from(surb: ReceivedReplySurb) -> Self {
surb.surb
}
}
impl ReceivedReplySurb {
pub fn received_at(&self) -> OffsetDateTime {
self.received_at
}
pub fn key_rotation(&self) -> SphinxKeyRotation {
self.surb.key_rotation()
}
}
#[derive(Debug)]
pub struct ReceivedReplySurbs {
// in the future we'd probably want to put extra data here to indicate when the SURBs got received
// so we could invalidate entries from the previous key rotations
data: VecDeque<ReplySurbWithKeyRotation>,
data: VecDeque<ReceivedReplySurb>,
possibly_stale: VecDeque<ReceivedReplySurb>,
pending_reception: u32,
surbs_last_received_at_timestamp: i64,
surbs_last_received_at: OffsetDateTime,
}
impl ReceivedReplySurbs {
fn new(initial_surbs: VecDeque<ReplySurbWithKeyRotation>) -> Self {
ReceivedReplySurbs {
data: initial_surbs,
let mut this = ReceivedReplySurbs {
data: Default::default(),
possibly_stale: Default::default(),
pending_reception: 0,
surbs_last_received_at_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
}
surbs_last_received_at: OffsetDateTime::now_utc(),
};
this.insert_fresh_reply_surbs(initial_surbs);
this
}
#[cfg(all(not(target_arch = "wasm32"), feature = "fs-surb-storage"))]
pub fn new_retrieved(
surbs: Vec<ReplySurbWithKeyRotation>,
surbs_last_received_at_timestamp: i64,
surbs_last_received_at: OffsetDateTime,
) -> ReceivedReplySurbs {
ReceivedReplySurbs {
data: surbs.into(),
let mut this = ReceivedReplySurbs {
data: Default::default(),
possibly_stale: Default::default(),
pending_reception: 0,
surbs_last_received_at_timestamp,
}
surbs_last_received_at,
};
this.insert_fresh_reply_surbs(surbs);
this.surbs_last_received_at = surbs_last_received_at;
this
}
pub fn downgrade_freshness(&mut self) -> usize {
debug_assert!(self.possibly_stale.is_empty());
std::mem::swap(&mut self.data, &mut self.possibly_stale);
self.possibly_stale.len()
}
pub fn is_empty(&self) -> bool {
self.data.is_empty() && self.possibly_stale.is_empty()
}
#[cfg(all(not(target_arch = "wasm32"), feature = "fs-surb-storage"))]
pub fn surbs_ref(&self) -> &VecDeque<ReplySurbWithKeyRotation> {
pub fn surbs_ref(&self) -> &VecDeque<ReceivedReplySurb> {
&self.data
}
pub fn surbs_last_received_at(&self) -> i64 {
self.surbs_last_received_at_timestamp
pub fn retain_fresh_surbs(&mut self, f: impl FnMut(&ReceivedReplySurb) -> bool) {
self.data.retain(f);
}
pub fn retain_possibly_stale_surbs(&mut self, f: impl FnMut(&ReceivedReplySurb) -> bool) {
self.possibly_stale.retain(f);
}
pub fn fresh_left(&self) -> usize {
self.data.len()
}
pub fn possibly_stale_left(&self) -> usize {
self.possibly_stale.len()
}
pub fn drop_possibly_stale_surbs(&mut self) {
self.possibly_stale = VecDeque::new();
}
pub fn surbs_last_received_at(&self) -> OffsetDateTime {
self.surbs_last_received_at
}
pub fn pending_reception(&self) -> u32 {
@@ -243,39 +402,78 @@ impl ReceivedReplySurbs {
self.pending_reception = 0;
}
pub fn get_reply_surbs(
&mut self,
amount: usize,
) -> (Option<Vec<ReplySurbWithKeyRotation>>, usize) {
/// Attempt to retrieve the specified number of reply SURBs (if at least that many are present)
/// and return the number of SURBs remaining in the storage after the call.
pub fn get_reply_surbs(&mut self, amount: usize) -> (Option<Vec<RetrievedReplySurb>>, usize) {
if self.items_left() < amount {
(None, self.items_left())
} else {
let surbs = self.data.drain(..amount).collect();
(Some(surbs), self.items_left())
let available_fresh = self.fresh_left();
// prefer the 'fresh' data if available. otherwise fallback to the possibly stale entries
let mut reply_surbs = Vec::with_capacity(amount);
let fresh_to_retrieve = min(available_fresh, amount);
for surb in self.data.drain(..fresh_to_retrieve) {
reply_surbs.push(RetrievedReplySurb::new_fresh(surb))
}
if available_fresh < amount {
let stale_to_retrieve = amount - fresh_to_retrieve;
for surb in self.possibly_stale.drain(..stale_to_retrieve) {
reply_surbs.push(RetrievedReplySurb::new_stale(surb))
}
}
(Some(reply_surbs), self.items_left())
}
}
pub fn get_reply_surb(&mut self) -> (Option<ReplySurbWithKeyRotation>, usize) {
pub fn get_reply_surb(&mut self) -> (Option<RetrievedReplySurb>, usize) {
(self.pop_surb(), self.items_left())
}
fn pop_surb(&mut self) -> Option<ReplySurbWithKeyRotation> {
self.data.pop_front()
fn pop_surb(&mut self) -> Option<RetrievedReplySurb> {
// prefer the 'fresh' data if available. otherwise fallback to the possibly stale entries
if let Some(fresh) = self.data.pop_front() {
return Some(RetrievedReplySurb::new_fresh(fresh));
}
if let Some(stale) = self.possibly_stale.pop_front() {
return Some(RetrievedReplySurb::new_stale(stale));
}
None
}
fn items_left(&self) -> usize {
self.data.len()
self.data.len() + self.possibly_stale.len()
}
pub fn remove_stale_surbs(&mut self, amount: usize) {
// remove up to amount number of possibly stale surbs
let amount = min(amount, self.possibly_stale.len());
self.possibly_stale.drain(..amount);
}
// realistically we're always going to be getting multiple surbs at once
pub fn insert_reply_surbs<I: IntoIterator<Item = ReplySurbWithKeyRotation>>(
pub(crate) fn insert_fresh_reply_surbs<I: IntoIterator<Item = ReplySurbWithKeyRotation>>(
&mut self,
surbs: I,
) {
let mut v = surbs.into_iter().collect::<VecDeque<_>>();
let received_at = OffsetDateTime::now_utc();
let mut v = surbs
.into_iter()
.map(|surb| ReceivedReplySurb { surb, received_at })
.collect::<VecDeque<_>>();
if v.is_empty() {
return;
}
trace!("storing {} surbs in the storage", v.len());
self.data.append(&mut v);
self.surbs_last_received_at_timestamp = OffsetDateTime::now_utc().unix_timestamp();
self.surbs_last_received_at = received_at;
trace!("we now have {} surbs!", self.data.len());
}
}
@@ -201,7 +201,7 @@ impl<C, St> GatewayClient<C, St> {
#[cfg(not(target_arch = "wasm32"))]
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
debug!(
"Attemting to establish connection to gateway at: {}",
"Attempting to establish connection to gateway at: {}",
self.gateway_address
);
let (ws_stream, _) = connect_async(
@@ -272,7 +272,7 @@ impl<C, St> GatewayClient<C, St> {
) -> Result<(), GatewayClientError> {
if let Some(shared_key) = self.shared_key() {
let encrypted = message.encrypt(&*shared_key)?;
Box::pin(self.send_websocket_message(encrypted)).await?;
Box::pin(self.send_websocket_message_without_response(encrypted)).await?;
Ok(())
} else {
Err(GatewayClientError::ConnectionInInvalidState)
@@ -330,9 +330,80 @@ impl<C, St> GatewayClient<C, St> {
}
}
/// Attempt to send a websocket message to the gateway without waiting for any response
async fn send_websocket_message_without_response(
&mut self,
msg: impl Into<Message>,
) -> Result<(), GatewayClientError> {
match self.connection {
SocketState::Available(ref mut conn) => Ok(conn.send(msg.into()).await?),
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
if let Err(err) = partially_delegated.send_without_response(msg.into()).await {
error!("failed to send message without response - {err}...");
// we must ensure we do not leave the task still active
if let Err(err) = self.recover_socket_connection().await {
error!("... and the delegated stream has also errored out - {err}")
}
Err(err)
} else {
Ok(())
}
}
SocketState::NotConnected => Err(GatewayClientError::ConnectionNotEstablished),
_ => Err(GatewayClientError::ConnectionInInvalidState),
}
}
// A very nasty hack due to lack of id tags on messages - send a non-sphinx packet websocket
// message and wait until first non 'Send' response within timeout
pub async fn send_websocket_message_with_non_send_response(
&mut self,
msg: impl Into<Message>,
) -> Result<ServerResponse, GatewayClientError> {
let should_restart_mixnet_listener = if self.connection.is_partially_delegated() {
self.recover_socket_connection().await?;
true
} else {
false
};
let conn = match self.connection {
SocketState::Available(ref mut conn) => conn,
SocketState::NotConnected => return Err(GatewayClientError::ConnectionNotEstablished),
_ => return Err(GatewayClientError::ConnectionInInvalidState),
};
conn.send(msg.into()).await?;
let timeout = sleep(self.cfg.connection.response_timeout_duration);
tokio::pin!(timeout);
let response = loop {
tokio::select! {
_ = &mut timeout => {
break Err(GatewayClientError::Timeout);
}
// note: the below will also listen for shutdown signals
msg = self.read_control_response() => {
match msg {
Ok(res) => if !res.is_send() {
break Ok(res);
},
Err(err) => break Err(err),
}
}
}
};
if should_restart_mixnet_listener {
self.start_listening_for_mixnet_messages()?;
}
response
}
/// Attempt to send a websocket message to the gateway and wait until we receive a response.
// If we want to send a message (with response), we need to have a full control over the socket,
// as we need to be able to write the request and read the subsequent response
pub async fn send_websocket_message(
pub async fn send_websocket_message_with_response(
&mut self,
msg: impl Into<Message>,
) -> Result<ServerResponse, GatewayClientError> {
@@ -387,29 +458,6 @@ impl<C, St> GatewayClient<C, St> {
}
}
async fn send_websocket_message_without_response(
&mut self,
msg: Message,
) -> Result<(), GatewayClientError> {
match self.connection {
SocketState::Available(ref mut conn) => Ok(conn.send(msg).await?),
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
if let Err(err) = partially_delegated.send_without_response(msg).await {
error!("failed to send message without response - {err}...");
// we must ensure we do not leave the task still active
if let Err(err) = self.recover_socket_connection().await {
error!("... and the delegated stream has also errored out - {err}")
}
Err(err)
} else {
Ok(())
}
}
SocketState::NotConnected => Err(GatewayClientError::ConnectionNotEstablished),
_ => Err(GatewayClientError::ConnectionInInvalidState),
}
}
fn check_gateway_protocol(
&self,
gateway_protocol: Option<u8>,
@@ -535,7 +583,10 @@ impl<C, St> GatewayClient<C, St> {
.encrypt(legacy_key)?;
info!("sending upgrade request and awaiting the acknowledgement back");
let (ciphertext, nonce) = match self.send_websocket_message(upgrade_request).await? {
let (ciphertext, nonce) = match self
.send_websocket_message_with_response(upgrade_request)
.await?
{
ServerResponse::EncryptedResponse { ciphertext, nonce } => (ciphertext, nonce),
ServerResponse::Error { message } => {
return Err(GatewayClientError::GatewayError(message))
@@ -567,7 +618,7 @@ impl<C, St> GatewayClient<C, St> {
&mut self,
msg: ClientControlRequest,
) -> Result<(), GatewayClientError> {
match self.send_websocket_message(msg).await? {
match self.send_websocket_message_with_response(msg).await? {
ServerResponse::Authenticate {
protocol_version,
status,
@@ -717,13 +768,16 @@ impl<C, St> GatewayClient<C, St> {
}
}
/// Attempt to retrieve the currently supported gateway protocol version of the remote.
pub async fn get_gateway_protocol(&mut self) -> Result<u8, GatewayClientError> {
if !self.connection.is_established() {
return Err(GatewayClientError::ConnectionNotEstablished);
}
match self
.send_websocket_message(ClientControlRequest::SupportedProtocol {})
.send_websocket_message_with_non_send_response(
ClientControlRequest::SupportedProtocol {},
)
.await?
{
ServerResponse::SupportedProtocol { version } => Ok(version),
@@ -740,7 +794,10 @@ impl<C, St> GatewayClient<C, St> {
credential,
self.shared_key.as_ref().unwrap(),
)?;
let bandwidth_remaining = match self.send_websocket_message(msg).await? {
let bandwidth_remaining = match self
.send_websocket_message_with_non_send_response(msg)
.await?
{
ServerResponse::Bandwidth { available_total } => Ok(available_total),
ServerResponse::Error { message } => Err(GatewayClientError::GatewayError(message)),
ServerResponse::TypedError { error } => {
@@ -758,7 +815,10 @@ impl<C, St> GatewayClient<C, St> {
async fn try_claim_testnet_bandwidth(&mut self) -> Result<(), GatewayClientError> {
let msg = ClientControlRequest::ClaimFreeTestnetBandwidth;
let bandwidth_remaining = match self.send_websocket_message(msg).await? {
let bandwidth_remaining = match self
.send_websocket_message_with_non_send_response(msg)
.await?
{
ServerResponse::Bandwidth { available_total } => Ok(available_total),
ServerResponse::Error { message } => Err(GatewayClientError::GatewayError(message)),
other => Err(GatewayClientError::UnexpectedResponse { name: other.name() }),
@@ -337,7 +337,7 @@ impl PartiallyDelegatedHandle {
// check if the split stream didn't error out
let receive_res = stream_receiver
.try_recv()
.expect("stream sender was somehow dropped without sending anything!");
.map_err(|_| GatewayClientError::ConnectionAbruptlyClosed)?;
if let Some(res) = receive_res {
let _res = res?;
+10 -1
View File
@@ -28,6 +28,7 @@ pub struct Config {
pub maximum_reconnection_backoff: Duration,
pub initial_connection_timeout: Duration,
pub maximum_connection_buffer_size: usize,
pub use_legacy_packet_encoding: bool,
}
impl Config {
@@ -36,12 +37,14 @@ impl Config {
maximum_reconnection_backoff: Duration,
initial_connection_timeout: Duration,
maximum_connection_buffer_size: usize,
use_legacy_packet_encoding: bool,
) -> Self {
Config {
initial_reconnection_backoff,
maximum_reconnection_backoff,
initial_connection_timeout,
maximum_connection_buffer_size,
use_legacy_packet_encoding,
}
}
}
@@ -267,7 +270,12 @@ impl SendWithoutResponse for Client {
fn send_without_response(&self, packet: MixPacket) -> io::Result<()> {
let address = packet.next_hop_address();
trace!("Sending packet to {address}");
let framed_packet = FramedNymPacket::from(packet);
// TODO: optimisation for the future: rather than constantly using legacy encoding,
// once we're addressing by node_id (and thus have full node info here),
// we could simply infer supported encoding based on their version
let framed_packet =
FramedNymPacket::from_mix_packet(packet, self.config.use_legacy_packet_encoding);
let Some(sender) = self.active_connections.get_mut(&address) else {
// there was never a connection to begin with
@@ -328,6 +336,7 @@ mod tests {
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
use_legacy_packet_encoding: false,
},
NoiseConfig::new(
Arc::new(x25519::KeyPair::new(&mut rng)),
@@ -5,8 +5,8 @@ use crate::nyxd::{self, NyxdClient};
use crate::signing::direct_wallet::DirectSecp256k1HdWallet;
use crate::signing::signer::{NoSigner, OfflineSigner};
use crate::{
nym_api, DirectSigningReqwestRpcValidatorClient, QueryReqwestRpcValidatorClient,
ReqwestRpcClient, ValidatorClientError,
DirectSigningReqwestRpcValidatorClient, QueryReqwestRpcValidatorClient, ReqwestRpcClient,
ValidatorClientError,
};
use nym_api_requests::ecash::models::{
AggregatedCoinIndicesSignatureResponse, AggregatedExpirationDateSignatureResponse,
@@ -72,7 +72,7 @@ macro_rules! collect_paged_skimmed_v2 {
.$f(false, Some(page), None, $self.use_bincode)
.await?;
if metadata != res.metadata {
if !metadata.consistency_check(&res.metadata) {
return Err(ValidatorClientError::InconsistentPagedMetadata);
}
@@ -153,7 +153,7 @@ impl Config {
pub struct Client<C, S = NoSigner> {
// ideally they would have been read-only, but unfortunately rust doesn't have such features
// #[deprecated(note = "please use `nym_api_client` instead")]
pub nym_api: nym_api::Client,
pub nym_api: nym_http_api_client::Client,
// pub nym_api_client: NymApiClient,
pub nyxd: NyxdClient<C, S>,
}
@@ -214,7 +214,7 @@ impl Client<ReqwestRpcClient> {
impl<C> Client<C> {
pub fn new_with_rpc_client(config: Config, rpc_client: C) -> Self {
let nym_api_client = nym_api::Client::new(config.api_url.clone(), None);
let nym_api_client = nym_http_api_client::Client::new(config.api_url.clone(), None);
Client {
nym_api: nym_api_client,
@@ -228,7 +228,7 @@ impl<C, S> Client<C, S> {
where
S: OfflineSigner,
{
let nym_api_client = nym_api::Client::new(config.api_url.clone(), None);
let nym_api_client = nym_http_api_client::Client::new(config.api_url.clone(), None);
Client {
nym_api: nym_api_client,
@@ -385,38 +385,25 @@ impl<C, S> Client<C, S> {
}
}
/// DEPRECATED: Use nym_http_api_client::Client with from_network() or with_bincode() instead
#[deprecated(
since = "1.2.0",
note = "Use nym_http_api_client::Client::from_network() or ClientBuilder::with_bincode() instead"
)]
#[derive(Clone)]
pub struct NymApiClient {
pub use_bincode: bool,
pub nym_api: nym_api::Client,
pub nym_api: nym_http_api_client::Client,
// TODO: perhaps if we really need it at some (currently I don't see any reasons for it)
// we could re-implement the communication with the REST API on port 1317
}
impl From<nym_api::Client> for NymApiClient {
fn from(nym_api: nym_api::Client) -> Self {
NymApiClient {
use_bincode: false,
nym_api,
}
}
}
// we have to allow the use of deprecated method here as they're calling the deprecated trait methods
#[allow(deprecated)]
impl NymApiClient {
pub fn new(api_url: Url) -> Self {
let nym_api = nym_api::Client::new(api_url, None);
NymApiClient {
use_bincode: true,
nym_api,
}
}
#[cfg(not(target_arch = "wasm32"))]
pub fn new_with_timeout(api_url: Url, timeout: std::time::Duration) -> Self {
let nym_api = nym_api::Client::new(api_url, Some(timeout));
let nym_api = nym_http_api_client::Client::new(api_url, Some(timeout));
NymApiClient {
use_bincode: true,
@@ -431,10 +418,10 @@ impl NymApiClient {
}
pub fn new_with_user_agent(api_url: Url, user_agent: impl Into<UserAgent>) -> Self {
let nym_api = nym_api::Client::builder::<_, ValidatorClientError>(api_url)
let nym_api = nym_http_api_client::Client::builder(api_url)
.expect("invalid api url")
.with_user_agent(user_agent.into())
.build::<ValidatorClientError>()
.build()
.expect("failed to build nym api client");
NymApiClient {
@@ -471,12 +458,12 @@ impl NymApiClient {
pub async fn get_all_basic_entry_assigned_nodes(
&self,
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
self.get_all_basic_entry_assigned_nodes_v2()
self.get_all_basic_entry_assigned_nodes_with_metadata()
.await
.map(|res| res.nodes)
}
pub async fn get_all_basic_entry_assigned_nodes_v2(
pub async fn get_all_basic_entry_assigned_nodes_with_metadata(
&self,
) -> Result<SkimmedNodesWithMetadata, ValidatorClientError> {
collect_paged_skimmed_v2!(self, get_basic_entry_assigned_nodes_v2)
@@ -719,10 +706,11 @@ impl NymApiClient {
pub async fn partial_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
epoch_id: Option<EpochId>,
) -> Result<PartialExpirationDateSignatureResponse, ValidatorClientError> {
Ok(self
.nym_api
.partial_expiration_date_signatures(expiration_date)
.partial_expiration_date_signatures(expiration_date, epoch_id)
.await?)
}
@@ -739,10 +727,11 @@ impl NymApiClient {
pub async fn global_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
epoch_id: Option<EpochId>,
) -> Result<AggregatedExpirationDateSignatureResponse, ValidatorClientError> {
Ok(self
.nym_api
.global_expiration_date_signatures(expiration_date)
.global_expiration_date_signatures(expiration_date, epoch_id)
.await?)
}
@@ -3,7 +3,6 @@
use crate::nyxd::contract_traits::{DkgQueryClient, PagedDkgQueryClient};
use crate::nyxd::error::NyxdError;
use crate::NymApiClient;
use nym_coconut_dkg_common::types::{EpochId, NodeIndex};
use nym_coconut_dkg_common::verification_key::ContractVKShare;
use nym_compact_ecash::error::CompactEcashError;
@@ -15,7 +14,7 @@ use url::Url;
// TODO: it really doesn't feel like this should live in this crate.
#[derive(Clone)]
pub struct EcashApiClient {
pub api_client: NymApiClient,
pub api_client: nym_http_api_client::Client,
pub verification_key: VerificationKeyAuth,
pub node_id: NodeIndex,
pub cosmos_address: cosmrs::AccountId,
@@ -25,10 +24,15 @@ impl Display for EcashApiClient {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"[id: {}] {} @ {}",
"[id: {}] {} @ ({})",
self.node_id,
self.cosmos_address,
self.api_client.api_url()
self.api_client
.base_urls()
.iter()
.map(|url| url.to_string())
.collect::<Vec<String>>()
.join(", ")
)
}
}
@@ -60,6 +64,9 @@ pub enum EcashApiError {
source: CompactEcashError,
},
#[error("failed to create API client: {0}")]
ClientError(String),
#[error("the provided account address is malformed: {source}")]
MalformedAccountAddress {
#[from]
@@ -89,8 +96,13 @@ impl TryFrom<ContractVKShare> for EcashApiClient {
// In non-client applications this resolver can cause warning logs about H2 connection
// failure. This indicates that the long lived https connection was closed by the remote
// peer and the resolver will have to reconnect. It should not impact actual functionality
let api_client = nym_http_api_client::Client::builder(url_address)
.map_err(|e| EcashApiError::ClientError(e.to_string()))?
.build()
.map_err(|e| EcashApiError::ClientError(e.to_string()))?;
Ok(EcashApiClient {
api_client: NymApiClient::new(url_address),
api_client,
verification_key: VerificationKeyAuth::try_from_bs58(&share.share)?,
node_id: share.node_index,
cosmos_address: share.owner.as_str().parse()?,
@@ -1,7 +1,8 @@
use crate::nym_api::NymApiClientExt;
use crate::nyxd::contract_traits::MixnetQueryClient;
use crate::nyxd::error::NyxdError;
use crate::nyxd::Config as ClientConfig;
use crate::{NymApiClient, QueryHttpRpcNyxdClient, ValidatorClientError};
use crate::{QueryHttpRpcNyxdClient, ValidatorClientError};
use colored::Colorize;
use core::fmt;
use itertools::Itertools;
@@ -87,8 +88,17 @@ fn setup_connection_tests<H: BuildHasher + 'static>(
}
});
let api_connection_test_clients = api_urls.map(|(network, url)| {
ClientForConnectionTest::Api(network, url.clone(), NymApiClient::new(url))
let api_connection_test_clients = api_urls.filter_map(|(network, url)| {
match nym_http_api_client::Client::builder(url.clone()).and_then(|b| b.build()) {
Ok(client) => Some(ClientForConnectionTest::Api(network, url, client)),
Err(err) => {
eprintln!(
"Failed to create API client for {}: {err}",
network.network_name
);
None
}
}
});
nyxd_connection_test_clients.chain(api_connection_test_clients)
@@ -160,7 +170,7 @@ async fn test_nyxd_connection(
async fn test_nym_api_connection(
network: NymNetworkDetails,
url: &Url,
client: &NymApiClient,
client: &nym_http_api_client::Client,
) -> ConnectionResult {
let result = match timeout(
Duration::from_secs(CONNECTION_TEST_TIMEOUT_SEC),
@@ -186,7 +196,7 @@ async fn test_nym_api_connection(
enum ClientForConnectionTest {
Nyxd(NymNetworkDetails, Url, Box<QueryHttpRpcNyxdClient>),
Api(NymNetworkDetails, Url, NymApiClient),
Api(NymNetworkDetails, Url, nym_http_api_client::Client),
}
impl ClientForConnectionTest {
@@ -14,7 +14,6 @@ pub mod signing;
pub use crate::error::ValidatorClientError;
pub use crate::rpc::reqwest::ReqwestRpcClient;
pub use crate::signing::direct_wallet::DirectSecp256k1HdWallet;
pub use client::NymApiClient;
pub use client::{Client, Config, EcashApiClient};
pub use nym_api_requests::*;
pub use nym_http_api_client::UserAgent;

Some files were not shown because too many files have changed in this diff Show More