Compare commits

..

161 Commits

Author SHA1 Message Date
mfahampshire 499b1c7400 license 2025-01-08 16:20:54 +01:00
mfahampshire 5b536eeaf6 proper disconnect signal management 2025-01-08 16:20:46 +01:00
mfahampshire 45617cd218 proper disconnect signals 2025-01-08 16:20:29 +01:00
mfahampshire 3f90de1790 update lock 2025-01-08 16:20:14 +01:00
mfahampshire e638f87729 first pass done 2025-01-08 16:20:03 +01:00
mfahampshire d86286ef54 trying to make run_with_shutdown killable 2025-01-07 14:42:51 +01:00
mfahampshire b73d66e073 rearrange code, comment for rewrite of run() 2025-01-06 19:42:25 +01:00
mfahampshire 378ed88714 comments 2025-01-06 19:41:41 +01:00
mfahampshire 3e1ea8e855 remove comments 2025-01-06 19:41:01 +01:00
mfahampshire 50191cf80b re-add commented out code fr sharing 2025-01-06 13:06:37 +01:00
mfahampshire 54d7b37f10 debugging failing lock quiring 2025-01-03 18:28:28 +01:00
mfahampshire 8dda5fba5f comment 2025-01-03 13:35:40 +01:00
mfahampshire 93f063bb01 added test start fn to client_pool: uses specified gw for all clients 2025-01-03 13:35:40 +01:00
mfahampshire 376eca688a cancel token fixes ongoing 2025-01-03 13:35:40 +01:00
mfahampshire 54cec265e6 add dir creation + time dir to results 2025-01-03 13:35:40 +01:00
mfahampshire 20697fd5fc remove runs 2025-01-03 13:35:40 +01:00
mfahampshire a87753f1ee remove scratch 2025-01-03 13:35:40 +01:00
mfahampshire 5e9d25364f cancel token 2025-01-03 13:35:40 +01:00
mfahampshire 49e1134722 cargo + todo list 2025-01-03 13:35:40 +01:00
mfahampshire ffcc4e8fa5 first pass tester 2025-01-03 13:35:40 +01:00
mfahampshire 0337c04844 fixed test: needed proxiedMessage format 2025-01-03 13:35:40 +01:00
mfahampshire ec77f1e7b4 minor log tweak 2025-01-03 13:35:40 +01:00
mfahampshire 1cd42bde0b working on script fr testing all 2025-01-03 13:35:40 +01:00
mfahampshire 6606fc8cc4 work on test 2025-01-03 13:35:40 +01:00
mfahampshire eef3405777 logging tweak 2025-01-03 13:35:40 +01:00
mfahampshire 2d7957a83b remove active conn 2025-01-03 13:35:40 +01:00
mfahampshire 60e8f8254e fix test 2025-01-03 13:35:39 +01:00
mfahampshire 7066c115e5 deps 2025-01-03 13:35:39 +01:00
mfahampshire fb63555ab0 add metric getter + debug print 2025-01-03 13:35:39 +01:00
mfahampshire 814aeb5e7c clap standalone echo_server 2025-01-03 13:35:39 +01:00
mfahampshire 45bef0da68 temp commit 2025-01-03 13:35:39 +01:00
mfahampshire 060f785ff0 temp commit 2025-01-03 13:35:39 +01:00
mfahampshire a9ef20914e temp commit 2025-01-03 13:35:39 +01:00
mfahampshire 9761644d08 tweaks to lib 2025-01-03 13:35:39 +01:00
mfahampshire b6b8ec41eb first pass all fns 2025-01-03 13:35:39 +01:00
mfahampshire d7d2aaff68 comment 2025-01-03 13:35:39 +01:00
mfahampshire e2114dd9e5 add disconnect 2025-01-03 13:35:39 +01:00
mfahampshire d247c88a74 cargo fix 2025-01-03 13:35:39 +01:00
mfahampshire 2fc359f58f first pass echo server lib 2025-01-03 13:35:39 +01:00
mfahampshire 0f35f5c909 add configurable gw to server 2025-01-03 13:35:39 +01:00
mfahampshire 3fdaa7bac3 future is now 2025-01-03 13:35:39 +01:00
mfahampshire 16c3e5ff9a updated comment in example 2025-01-03 13:35:39 +01:00
mfahampshire 4f5d86a50c tweaked text grammar 2025-01-03 13:35:39 +01:00
mfahampshire 9d8f14ddec fix test 2025-01-03 13:35:39 +01:00
mfahampshire 72482180b9 version bump 2025-01-03 13:35:39 +01:00
mfahampshire d99cb96994 version bump 2025-01-03 13:35:39 +01:00
mfahampshire 99d2a807dd fix clippy again 2025-01-03 13:35:39 +01:00
mfahampshire 9400140f69 fix clippy again 2025-01-03 13:35:39 +01:00
mfahampshire f78bb83eab fix clippy 2025-01-03 13:35:39 +01:00
mfahampshire 349e95aa07 remove clone 2025-01-03 13:35:39 +01:00
mfahampshire 5cc5484297 clone impl 2025-01-03 13:35:39 +01:00
mfahampshire 8864921260 update command help 2025-01-03 13:35:39 +01:00
mfahampshire 7608ca768f comment on ffi 2025-01-03 13:35:39 +01:00
mfahampshire 7fa1cdba93 remove comments for future ffi push + lower default pool size from 4 to 2 2025-01-03 13:35:39 +01:00
mfahampshire d59f05f9d4 client pool docs 2025-01-03 13:35:39 +01:00
mfahampshire f819b33628 update example files tcpproxy 2025-01-03 13:35:39 +01:00
mfahampshire 7d2b1ca9a8 added disconnect thread 2025-01-03 13:35:39 +01:00
mfahampshire 08f5749887 add clone 2025-01-03 13:35:39 +01:00
mfahampshire 3573d1e6a6 comments 2025-01-03 13:35:39 +01:00
mfahampshire 1015251bd9 comments 2025-01-03 13:35:39 +01:00
mfahampshire 0492d84ea8 add cancel token disconnect fn 2025-01-03 13:35:39 +01:00
mfahampshire 70c1e4fd85 update mod file 2025-01-03 13:35:39 +01:00
mfahampshire 5047ed2722 added disconnect to client pool 2025-01-03 13:35:39 +01:00
mfahampshire 8630cefddb client pool example done 2025-01-03 13:35:39 +01:00
mfahampshire 68a1d43cbf comments 2025-01-03 13:35:39 +01:00
mfahampshire fedaf00375 clippy 2025-01-03 13:35:39 +01:00
mfahampshire 7660f007c8 client pool example 2025-01-03 13:35:39 +01:00
mfahampshire 747675f52f client_pool.rs mod 2025-01-03 13:35:39 +01:00
mfahampshire c7b3bd1608 minor comments 2025-01-03 13:35:39 +01:00
mfahampshire 6fc231821b some inline comments 2025-01-03 13:35:39 +01:00
mfahampshire fa6efbaa2a comments to examples 2025-01-03 13:35:39 +01:00
mfahampshire e9ade22b10 slightly dropped default decay time 2025-01-03 13:35:39 +01:00
mfahampshire 731f69acc3 add duplicate packets received to troubleshooting 2025-01-03 13:35:39 +01:00
mfahampshire a0e55f49d4 temp commit 2025-01-03 13:29:55 +01:00
mfahampshire 7563cccf49 bump default decay time (again) 2025-01-03 13:29:55 +01:00
mfahampshire 95274e3883 bump default decay time 2025-01-03 13:29:55 +01:00
mfahampshire c8ca174b87 updated code commnet 2025-01-03 13:29:55 +01:00
mfahampshire 4eccc59f6f bump default decay time 2025-01-03 13:29:55 +01:00
mfahampshire 56becc3a34 minor tweaks 2025-01-03 13:29:55 +01:00
mfahampshire 13a6f0b5ef logging change 2025-01-03 13:29:55 +01:00
mfahampshire 4f1bac204c cancel token 2025-01-03 13:29:55 +01:00
mfahampshire f87f57bddd first pass spin out client_pool 2025-01-03 13:29:55 +01:00
mfahampshire c49f7f91ed first version working 2025-01-03 13:29:55 +01:00
mfahampshire 8c8e085796 added notes for next features 2025-01-03 13:29:55 +01:00
mfahampshire 4bb05f3698 err handling conpool start 2025-01-03 13:29:55 +01:00
mfahampshire 8fcea5f350 removed a bunch of commenting from example 2025-01-03 13:29:55 +01:00
mfahampshire 42739597ed removed a bunch of commenting from example 2025-01-03 13:29:55 +01:00
mfahampshire 699aa6116a comments 2025-01-03 13:29:55 +01:00
mfahampshire 4519bd6571 remove comments and reduce logging verbosity 2025-01-03 13:29:55 +01:00
mfahampshire 5083b420ec remove double accounting for moment 2025-01-03 13:29:55 +01:00
mfahampshire 6f4421dc4a trying to get disconnect to remove from pool 2025-01-03 13:29:55 +01:00
mfahampshire 415c30fcc9 first pass disconnect 2025-01-03 13:29:55 +01:00
mfahampshire 8e8eceb894 fix flipped lowerthan waiting for pool 2025-01-03 13:29:55 +01:00
mfahampshire 546482916d first pass connpool 2025-01-03 13:29:55 +01:00
mfahampshire 530d744d6d make default decay const 2025-01-03 13:29:55 +01:00
mfahampshire 2e8e9a4c0a tweak existing conn tracker logic 2025-01-03 13:29:55 +01:00
mfahampshire 109de83128 tweak 2025-01-03 13:29:55 +01:00
mfahampshire 0c36e64526 tweak 2025-01-03 13:29:55 +01:00
mfahampshire d423cde71c changed logging 2025-01-03 13:29:55 +01:00
mfahampshire 1130d0fe3e name change + specific inc and dec logging 2025-01-03 13:29:55 +01:00
mfahampshire f53d2aa4cd small tweak to tcp tracker 2025-01-03 13:29:55 +01:00
mfahampshire c406814886 tcp conn tracker 2025-01-03 13:29:55 +01:00
mfahampshire 4d3d60b78e tweak format (#5295)
* tweak format

* auto commit generated command files

* auto commit generated command files

* push components

* edit migration page (#5303)

---------

Co-authored-by: import this <97586125+serinko@users.noreply.github.com>
2024-12-23 11:51:24 +00:00
dynco-nym 41fb17a31b Extend swagger docs (#5235)
* WIP adding derive(ToSchema)

* Derive ToSchema for more types

* ContractBuildInformation on /nym_contracts_detailed

* rustfmt

* Add cfg_attr

* A bunch of annotations

* Compiles with utoipa 5.2

* WIP

* Post rebase fixes

* Gitattributes to ignore .sqlx diffs

* generate Sqlx schema files

* Improvements

* Move ecash schema out of ecash crate

* Move redocly config to nym-api/

* Move redocly config to nym-api/

* Remove ErrorResponse

* Move generated openapi spec to .gitignore

* Include BSL licence

* Remove utoipa from ecash toml file

* Remove placeholder annotations

* Chain-watcher rebase changes

* Update licence info

* Treat Scalar as String in OpenAPI
2024-12-20 12:18:45 +01:00
Jędrzej Stuczyński 7d5e3ef7d3 feature: expand nym-node prometheus metrics (#5298)
* fixed bearer auth for prometheus route

* basic prometheus metrics

* added rates on global values

* improved structure on the prometheus metrics

* added additional metrics for ingress websockets and egress mixnet connections

* some channel business metrics

* fixed metrics registration and added additional variants

* added counter for number of disk persisted packets

* counter for pending egress packets

* counter for pending egress forward packets

* clippy
2024-12-20 10:32:56 +00:00
Jon Häggblad 4f283f565c Add assignes for the root cargo ecosystem (#5297) 2024-12-20 01:16:39 +01:00
Tommy Verrall 2fab3f11b6 Merge pull request #5274 from nymtech/feature/nyx-chain-watcher
Nyx Chain Watcher
2024-12-19 17:34:36 +00:00
Sachin Kamath d0722e5f63 chain-watcher: try fix windows path 2024-12-19 21:07:50 +05:30
Sachin Kamath 64373548e4 chain-watcher: windows workaround for db path, add sqlx 2024-12-19 20:30:11 +05:30
Sachin Kamath bad85abff3 chain-watcher: bump version 2024-12-19 14:10:28 +00:00
Sachin Kamath 6e66cc2467 validator-rewarder: fix errors 2024-12-19 14:10:28 +00:00
Sachin Kamath c805aa79a4 nyx-chain-watcher: fallback to env variable when reading config 2024-12-19 14:10:28 +00:00
Mark Sinclair f5ca1ee20a Bump version 2024-12-19 14:10:28 +00:00
Sachin Kamath 4f07343efd api: fetch addresses from config. 2024-12-19 14:10:28 +00:00
Mark Sinclair 94ab78606a Bump version 2024-12-19 14:10:28 +00:00
Sachin Kamath 7b92e471c8 bugfix: dont manually set last_processed_height for pruning=nothing strat. 2024-12-19 14:10:28 +00:00
Sachin Kamath a507ffe371 chain-scraper : use tx module for parsing transactions 2024-12-19 14:10:28 +00:00
Mark Sinclair c02e93004f nyx-chain-watcher: return average price over 24 hours 2024-12-19 14:10:28 +00:00
Mark Sinclair 1113e0c599 formatting 2024-12-19 14:10:28 +00:00
Mark Sinclair 06c7394861 change webhook payload to have a structured coin for funds 2024-12-19 14:10:28 +00:00
Mark Sinclair e20bea9d32 bump version 2024-12-19 14:10:28 +00:00
Mark Sinclair eeea32fdca add websocket rpcs to env files 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński b06349efd0 added env variable to nuke the db 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński 98a4cb4ae8 even more logs 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński be185824b4 extra logs 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński 60e8e53f3b explicitly build websocket client in 0.37 compat mode 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński 1890367bfc allow conversion from CometBFT block subscription 2024-12-19 14:10:28 +00:00
Mark Sinclair 2b26a88d6c Bump version 2024-12-19 14:10:28 +00:00
Mark Sinclair a6f4f017c7 Bump version 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński d8a6ca48c1 implemented starting block logic inside the chain scraper itself 2024-12-19 14:10:28 +00:00
Mark Sinclair 541d46e899 Fix docker entry point and bump version 2024-12-19 14:10:28 +00:00
Mark Sinclair 39f525e88e Add Dockerfile and workflow to build 2024-12-19 14:10:28 +00:00
Mark Sinclair 156e892baa parse message index and process all log entries 2024-12-19 14:10:28 +00:00
Mark Sinclair 5b6ae39dab init saves example config 2024-12-19 14:10:28 +00:00
Mark Sinclair df004f834f Add example to README 2024-12-19 14:10:28 +00:00
Mark Sinclair 235165171b Remove migration from seed app 2024-12-19 14:10:28 +00:00
Mark Sinclair 572875058d Add config, overrides and CLI 2024-12-19 14:10:28 +00:00
Mark Sinclair cf6f437187 Move nym-data-observatory (v0) to nyx-chain-watcher 2024-12-19 14:10:28 +00:00
Mark Sinclair 6010de978d data-observatory: renamed transactions to payments because there is already transaction in the base scraper schema 2024-12-19 14:10:28 +00:00
Mark Sinclair d951ea9548 nyxd-scraper: add optional starting height parameter to scrape before listening for new blocks 2024-12-19 14:10:28 +00:00
Sachin Kamath 868d7439ec observatory 0.1 2024-12-19 14:10:28 +00:00
Sachin Kamath a884aee1e9 fix review comments 2024-12-19 14:10:28 +00:00
Sachin Kamath 80f965a104 clippy 2024-12-19 14:10:28 +00:00
Sachin Kamath c99a240ed4 nyxd-scraper: add config to make pre-commit storage optional 2024-12-19 14:10:28 +00:00
Jędrzej Stuczyński 67976b1b30 feature: wireguard metrics (#5278)
* experimental log

* introduce wireguard metrics updates

* add wireguard traffic rates to console logger

* missing import

* changed order of displayed values

* expose bytes information via rest endpoint

* clippy
2024-12-19 10:49:56 +00:00
Jędrzej Stuczyński a2322d6cdf feature: nym topology revamp (#5271)
* revamped NymTopology

* wip

* working e2e client

* updated nym-api

* updated nym-node

* updated rest of non-test code

* updated the rest of the codebase

* additional tweaks

* linux clippy fixes + adding additional dummy ipr types for better linting on non-linux targets
2024-12-19 10:44:34 +00:00
Jędrzej Stuczyński ae346bb75b bugfix: remove unnecessary arguments for nym-api swagger endpoints (#5272)
* removed incorrect body argument for '/rewarded-set' endpoint

* removed incorrect pagination parameters for monitor run results
2024-12-19 10:42:52 +00:00
Jon Häggblad 53c28af847 Add close to credential storage (#5283) (#5293)
* Add close method to credential storage

* wip
2024-12-18 21:51:00 +01:00
Bogdan-Ștefan Neacşu 3521f36374 Include IPINFO_API_TOKEN in nightly CI (#5285)
* Include IPINFO_API_TOKEN in nightly CI

* Fix beta clippy
2024-12-18 16:46:28 +02:00
Bogdan-Ștefan Neacşu 3695332036 Move tun constants to network defaults (#5286) 2024-12-18 15:03:21 +02:00
Jon Häggblad d03302c391 http-api-client: deduplicate code (#5267)
* Deduplicate code

* Remove unneeded async
2024-12-18 12:36:10 +01:00
mfahampshire cd86110b2c Max/crunch patch docs (#5284)
* patch changelog done

* auto commit generated command files
2024-12-18 10:37:45 +00:00
Mark Sinclair ad0c135d4c Bump credential proxy version 2024-12-17 20:35:42 +00:00
Bogdan-Ștefan Neacşu 61e872f033 Add windows to CI builds (#5269)
* Add windows to CI builds

* Fix win build for node status api

* Fix win build for sdk

* Fix win build for cred proxy
2024-12-17 15:18:11 +02:00
dynco-nym b4f51baf94 Change sqlite journal mode to WAL (#5213)
* Change sqlite journal mode to WAL

* Synchronous mode & auto vacuum

* Bump probe git ref to 1.1.0
2024-12-16 16:40:02 +01:00
Drazen Urch a3f3d83c1b Shipping raw metrics to PG (#5216)
* Shipping raw metrics to PG

* Put cancel token back in its place

* fmt
2024-12-16 16:19:37 +01:00
Drazen Urch 84d7004cb2 Add control messages to GatewayTransciver (#5247)
* Add control messages to GatewayTransciver

* Add forget me flag to clients

* CI gate IPIINFO test

* Handle ForgetMe for client and stats db

* fmt
2024-12-16 15:18:04 +01:00
import this be063a36eb syntax hotfix (#5266) 2024-12-16 13:17:38 +00:00
windy-ux 0a712b9fce Fix/web 615 seo setup (#5265)
* + add header into Packet Mixing docs

* + add head changes for testing

* / updated version of metatags in theme.config

* + add env file

* / theme.config to use NEXT_PUBLIC_SITE_URL from env file

* @ Fix broken link in theme.config

* - remove favicon code

* + add desription for intro pages

* + add default book's desriptions

* Revert "+ add desription for intro pages"

This reverts commit 98c78242d4.
2024-12-16 13:17:25 +00:00
Bogdan-Ștefan Neacşu 88d6fb4e22 Add fd callback to client core (#5230)
* Add fd callback to client core

* Include in sdk

* Fix clippy many args

* Method in builder

* Replace Box with Arc
2024-12-16 13:57:34 +02:00
Jon Häggblad 04c2045d94 Add PATCH support to nym-http-api-client (#5260) 2024-12-16 12:28:44 +01:00
382 changed files with 9703 additions and 12450 deletions
+2
View File
@@ -31,3 +31,5 @@ updates:
update-types:
- "patch"
open-pull-requests-limit: 10
assignees:
- "octol"
+1 -1
View File
@@ -30,7 +30,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ arc-ubuntu-20.04, custom-runner-mac-m1 ]
os: [ arc-ubuntu-20.04, custom-windows-11, custom-runner-mac-m1 ]
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
+1
View File
@@ -15,6 +15,7 @@ jobs:
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
IPINFO_API_TOKEN: ${{ secrets.IPINFO_API_TOKEN }}
continue-on-error: true
steps:
- name: Check out repository code
+8 -1
View File
@@ -51,4 +51,11 @@ ppa-private-key.b64
ppa-private-key.asc
nym-network-monitor/topology.json
nym-network-monitor/__pycache__
nym-network-monitor/*.key
nym-network-monitor/*.key
nym-network-monitor/.envrc
nym-network-monitor/.envrc
nym-api/redocly/formatted-openapi.json
*.sqlite
.build
-83
View File
@@ -1,83 +0,0 @@
extends:
- minimal
apis:
nym-api:
root: ./formatted-openapi.json
rules:
# https://redocly.com/docs/cli/rules/oas/operation-summary
operation-summary: off
# https://redocly.com/docs/cli/rules/oas/security-defined
security-defined: off
struct: off
# https://redocly.com/docs/cli/rules/oas/operation-2xx-response
operation-2xx-response: off
# rules:
# skip-warnings: true
# ignore:
# - path: /v1/gateways
# method: get
# - path: /v1/gateways/blacklisted
# method: get
# - path: /v1/mixnodes
# method: get
# - path: /v1/mixnodes/active
# method: get
# - path: /v1/mixnodes/active/detailed
# method: get
# - path: /v1/mixnodes/blacklisted
# method: get
# - path: /v1/mixnodes/detailed
# method: get
# - path: /v1/mixnodes/rewarded
# method: get
# - path: /v1/mixnodes/rewarded/detailed
# method: get
# - path: /v1/gateways/described
# method: get
# # network-monitor-status (deprecated)
# - path: /v1/status/gateway/{identity}/avg_uptime
# method: GET
# - path: /v1/status/gateway/{identity}/core-status-count
# method: GET
# - path: /v1/status/gateway/{identity}/history
# method: GET
# - path: /v1/status/gateway/{identity}/report
# method: GET
# - path: /v1/status/gateways/detailed
# method: GET
# - path: /v1/status/gateways/detailed-unfiltered
# method: GET
# - path: /v1/status/mixnode/{mix_id}/avg_uptime
# method: GET
# - path: /v1/status/mixnode/{mix_id}/compute-reward-estimation
# method: POST
# - path: /v1/status/mixnode/{mix_id}/core-status-count
# method: GET
# - path: /v1/status/mixnode/{mix_id}/history
# method: GET
# - path: /v1/status/mixnode/{mix_id}/report
# method: GET
# - path: /v1/status/mixnode/{mix_id}/reward-estimation
# method: GET
# - path: /v1/status/mixnodes/detailed-unfiltered
# method: GET
# # status
# - path: /v1/status/mixnode/{mix_id}/inclusion-probability
# method: GET
# - path: /v1/status/mixnode/{mix_id}/stake-saturation
# method: GET
# - path: /v1/status/mixnode/{mix_id}/status
# method: GET
# - path: /v1/status/mixnodes/active/detailed
# method: GET
# - path: /v1/status/mixnodes/detailed
# method: GET
# - path: /v1/status/mixnodes/inclusion-probability
# method: GET
# - path: /v1/status/mixnodes/rewarded/detailed
# method: GET
# # unstable nym nodes
# - path: /v1/unstable/nym-nodes/gateways/skimmed
# method: get
# - path: /v1/unstable/nym-nodes/mixnodes/skimmed
# method: get
Generated
+165 -45
View File
@@ -2196,10 +2196,15 @@ dependencies = [
"bincode",
"bytecodec",
"bytes",
"clap 4.5.20",
"dashmap",
"dirs",
"futures",
"nym-bin-common",
"nym-crypto",
"nym-sdk",
"serde",
"tempfile",
"tokio",
"tokio-stream",
"tokio-util",
@@ -2475,6 +2480,12 @@ dependencies = [
"once_cell",
]
[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fancy_constructor"
version = "1.2.2"
@@ -4055,6 +4066,30 @@ dependencies = [
"wasm-utils",
]
[[package]]
name = "mixnet-check-all-gateways"
version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"clap 4.5.20",
"dirs",
"echo-server",
"futures",
"nym-bin-common",
"nym-crypto",
"nym-sdk",
"reqwest 0.12.4",
"serde",
"serde_json",
"tempfile",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
]
[[package]]
name = "mixnet-connectivity-check"
version = "0.1.0"
@@ -4951,7 +4986,6 @@ dependencies = [
"sha2 0.9.9",
"subtle 2.5.0",
"thiserror",
"utoipa",
"zeroize",
]
@@ -4964,6 +4998,7 @@ dependencies = [
"log",
"nym-network-defaults",
"serde",
"thiserror",
"toml 0.8.14",
"url",
]
@@ -5008,7 +5043,7 @@ dependencies = [
[[package]]
name = "nym-credential-proxy"
version = "0.1.6"
version = "0.1.7"
dependencies = [
"anyhow",
"async-trait",
@@ -5203,31 +5238,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "nym-data-observatory"
version = "0.1.0"
dependencies = [
"anyhow",
"axum 0.7.7",
"chrono",
"clap 4.5.20",
"nym-bin-common",
"nym-network-defaults",
"nym-node-requests",
"nym-task",
"serde",
"serde_json",
"sqlx",
"tokio",
"tokio-util",
"tower-http",
"tracing",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
"utoipauto",
]
[[package]]
name = "nym-dkg"
version = "0.1.0"
@@ -5329,7 +5339,7 @@ dependencies = [
[[package]]
name = "nym-ffi-shared"
version = "0.2.0"
version = "0.2.1"
dependencies = [
"anyhow",
"bs58",
@@ -5490,7 +5500,7 @@ dependencies = [
[[package]]
name = "nym-go-ffi"
version = "0.2.0"
version = "0.2.1"
dependencies = [
"anyhow",
"lazy_static",
@@ -5661,18 +5671,20 @@ version = "0.1.0"
dependencies = [
"dashmap",
"lazy_static",
"log",
"prometheus",
"tracing",
]
[[package]]
name = "nym-mixnet-client"
version = "0.1.0"
dependencies = [
"dashmap",
"futures",
"nym-sphinx",
"nym-task",
"tokio",
"tokio-stream",
"tokio-util",
"tracing",
]
@@ -5770,6 +5782,7 @@ dependencies = [
"nym-bin-common",
"nym-client-core",
"nym-crypto",
"nym-gateway-requests",
"nym-network-defaults",
"nym-sdk",
"nym-sphinx",
@@ -5783,6 +5796,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tokio-postgres",
"tokio-util",
"utoipa",
"utoipa-swagger-ui",
@@ -6164,6 +6178,7 @@ dependencies = [
"reqwest 0.12.4",
"serde",
"tap",
"tempfile",
"thiserror",
"tokio",
"tokio-stream",
@@ -6540,10 +6555,7 @@ name = "nym-topology"
version = "0.1.0"
dependencies = [
"async-trait",
"bs58",
"log",
"nym-api-requests",
"nym-bin-common",
"nym-config",
"nym-crypto",
"nym-mixnet-contract-common",
@@ -6552,10 +6564,10 @@ dependencies = [
"nym-sphinx-types",
"rand",
"reqwest 0.12.4",
"semver 1.0.23",
"serde",
"serde_json",
"thiserror",
"tracing",
"tsify",
"wasm-bindgen",
"wasm-utils",
@@ -6789,6 +6801,7 @@ dependencies = [
"nym-crypto",
"nym-gateway-storage",
"nym-network-defaults",
"nym-node-metrics",
"nym-task",
"nym-wireguard-types",
"thiserror",
@@ -6844,6 +6857,40 @@ dependencies = [
"url",
]
[[package]]
name = "nyx-chain-watcher"
version = "0.1.8"
dependencies = [
"anyhow",
"async-trait",
"axum 0.7.7",
"chrono",
"clap 4.5.20",
"nym-bin-common",
"nym-config",
"nym-network-defaults",
"nym-node-requests",
"nym-task",
"nym-validator-client",
"nyxd-scraper",
"reqwest 0.12.4",
"rocket",
"schemars",
"serde",
"serde_json",
"sqlx",
"thiserror",
"time",
"tokio",
"tokio-util",
"tower-http",
"tracing",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
"utoipauto",
]
[[package]]
name = "nyxd-scraper"
version = "0.1.0"
@@ -7240,6 +7287,24 @@ dependencies = [
"indexmap 2.2.6",
]
[[package]]
name = "phf"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b"
dependencies = [
"siphasher 0.3.11",
]
[[package]]
name = "pin-project"
version = "1.1.6"
@@ -7378,6 +7443,35 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0"
[[package]]
name = "postgres-protocol"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23"
dependencies = [
"base64 0.22.1",
"byteorder",
"bytes",
"fallible-iterator",
"hmac",
"md-5",
"memchr",
"rand",
"sha2 0.10.8",
"stringprep",
]
[[package]]
name = "postgres-types"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f66ea23a2d0e5734297357705193335e0a957696f34bed2f2faefacb2fec336f"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@@ -9684,6 +9778,32 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "tokio-postgres"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b5d3742945bc7d7f210693b0c58ae542c6fd47b17adbbda0885f3dcb34a6bdb"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"log",
"parking_lot",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"rand",
"socket2",
"tokio",
"tokio-util",
"whoami",
]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
@@ -10627,9 +10747,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
[[package]]
name = "wasm-bindgen"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e"
checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396"
dependencies = [
"cfg-if",
"once_cell",
@@ -10638,13 +10758,12 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358"
checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.90",
@@ -10665,9 +10784,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56"
checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -10675,9 +10794,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68"
checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2"
dependencies = [
"proc-macro2",
"quote",
@@ -10688,9 +10807,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d"
checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6"
[[package]]
name = "wasm-bindgen-test"
@@ -10861,6 +10980,7 @@ checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d"
dependencies = [
"redox_syscall 0.5.1",
"wasite",
"web-sys",
]
[[package]]
+7 -5
View File
@@ -118,8 +118,8 @@ members = [
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-data-observatory",
"nym-network-monitor",
"nyx-chain-watcher",
"nym-node",
"nym-node/nym-node-requests",
"nym-node/nym-node-metrics",
@@ -128,7 +128,7 @@ members = [
"nym-node-status-api/nym-node-status-client",
"nym-outfox",
"nym-validator-rewarder",
"tools/echo-server",
# "tools/echo-server",
"tools/internal/ssl-inject",
# "tools/internal/sdk-version-bump",
"tools/internal/testnet-manager",
@@ -147,7 +147,9 @@ members = [
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract", "common/verloc", "tools/internal/mixnet-connectivity-check",
"tools/internal/testnet-manager/dkg-bypass-contract",
"common/verloc",
"tools/internal/mixnet-connectivity-check", "tools/internal/mixnet-check-all-gateways",
]
default-members = [
@@ -156,11 +158,11 @@ default-members = [
"explorer-api",
"nym-api",
"nym-credential-proxy/nym-credential-proxy",
"nym-data-observatory",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
@@ -403,7 +405,7 @@ indexed_db_futures = { git = "https://github.com/TiemenSch/rust-indexed-db", bra
js-sys = "0.3.70"
serde-wasm-bindgen = "0.6.5"
tsify = "0.4.5"
wasm-bindgen = "0.2.95"
wasm-bindgen = "0.2.99"
wasm-bindgen-futures = "0.4.45"
wasmtimer = "0.2.0"
web-sys = "0.3.72"
+23
View File
@@ -0,0 +1,23 @@
Boost Software License - Version 1.0 - August 17th, 2003
Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:
The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
+2 -2
View File
@@ -3,7 +3,7 @@ name = "nym-client-core"
version = "1.1.15"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
edition = "2021"
rust-version = "1.70"
rust-version = "1.76"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -45,7 +45,7 @@ nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
nym-statistics-common = { path = "../statistics" }
nym-pemstore = { path = "../pemstore" }
nym-topology = { path = "../topology", features = ["serializable"] }
nym-topology = { path = "../topology", features = ["persistence"] }
nym-mixnet-client = { path = "../client-libs/mixnet-client", default-features = false }
nym-validator-client = { path = "../client-libs/validator-client", default-features = false }
nym-task = { path = "../task" }
@@ -550,6 +550,15 @@ pub struct Topology {
/// Specifies a minimum performance of a gateway that is used on route construction.
/// This setting is only applicable when `NymApi` topology is used.
pub minimum_gateway_performance: u8,
/// Specifies whether this client should attempt to retrieve all available network nodes
/// as opposed to just active mixnodes/gateways.
/// Useless without `ignore_epoch_roles = true`
pub use_extended_topology: bool,
/// Specifies whether this client should ignore the current epoch role of the target egress node
/// when constructing the final hop packets.
pub ignore_egress_epoch_role: bool,
}
#[allow(clippy::large_enum_variant)]
@@ -586,6 +595,8 @@ impl Default for Topology {
topology_structure: TopologyStructure::default(),
minimum_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
minimum_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
use_extended_topology: false,
ignore_egress_epoch_role: false,
}
}
}
@@ -8,7 +8,10 @@ use crate::{
},
};
use log::{debug, error};
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
#[derive(Debug, Clone)]
@@ -30,6 +33,9 @@ impl StorageManager {
}
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -110,7 +116,7 @@ impl StorageManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO registered_gateway(gateway_id_bs58, registration_timestamp, gateway_type)
INSERT INTO registered_gateway(gateway_id_bs58, registration_timestamp, gateway_type)
VALUES (?, ?, ?)
"#,
registered_gateway.gateway_id_bs58,
@@ -224,7 +230,7 @@ impl StorageManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO custom_gateway_details(gateway_id_bs58, data)
INSERT INTO custom_gateway_details(gateway_id_bs58, data)
VALUES (?, ?)
"#,
custom.gateway_id_bs58,
@@ -112,7 +112,7 @@ where
source,
}
})?;
hardcoded_topology.get_gateways()
hardcoded_topology.entry_capable_nodes().cloned().collect()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::current_gateways(
@@ -128,7 +128,7 @@ where
// make sure the list of available gateways doesn't overlap the list of known gateways
let available_gateways = available_gateways
.into_iter()
.filter(|g| !registered_gateways.contains(g.identity()))
.filter(|g| !registered_gateways.contains(&g.identity()))
.collect::<Vec<_>>();
if available_gateways.is_empty() {
@@ -167,7 +167,7 @@ where
source,
}
})?;
hardcoded_topology.get_gateways()
hardcoded_topology.entry_capable_nodes().cloned().collect()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::current_gateways(
@@ -3,7 +3,6 @@
use super::received_buffer::ReceivedBufferMessage;
use super::statistics_control::StatisticsControl;
use super::topology_control::geo_aware_provider::GeoAwareTopologyProvider;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
@@ -24,7 +23,7 @@ use crate::client::replies::reply_storage::{
};
use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use crate::client::topology_control::{
nym_api_provider, TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
};
use crate::config::{Config, DebugConfig};
use crate::error::ClientCoreError;
@@ -32,7 +31,7 @@ use crate::init::{
setup_gateway,
types::{GatewaySetup, InitialisationResult},
};
use crate::{config, spawn_future};
use crate::{config, spawn_future, ForgetMe};
use futures::channel::mpsc;
use log::*;
use nym_bandwidth_controller::BandwidthController;
@@ -188,6 +187,11 @@ pub struct BaseClientBuilder<'a, C, S: MixnetClientStorage> {
user_agent: Option<UserAgent>,
setup_method: GatewaySetup,
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
forget_me: ForgetMe,
}
impl<'a, C, S> BaseClientBuilder<'a, C, S>
@@ -210,9 +214,18 @@ where
shutdown: None,
user_agent: None,
setup_method: GatewaySetup::MustLoad { gateway_id: None },
#[cfg(unix)]
connection_fd_callback: None,
forget_me: Default::default(),
}
}
#[must_use]
pub fn with_forget_me(mut self, forget_me: &ForgetMe) -> Self {
self.forget_me = forget_me.clone();
self
}
#[must_use]
pub fn with_gateway_setup(mut self, setup: GatewaySetup) -> Self {
self.setup_method = setup;
@@ -261,6 +274,15 @@ where
Ok(self)
}
#[cfg(unix)]
pub fn with_connection_fd_callback(
mut self,
callback: Arc<dyn Fn(RawFd) + Send + Sync>,
) -> Self {
self.connection_fd_callback = Some(callback);
self
}
// note: do **NOT** make this method public as its only valid usage is from within `start_base`
// because it relies on the crypto keys being already loaded
fn mix_address(details: &InitialisationResult) -> Recipient {
@@ -352,6 +374,7 @@ where
controller.start_with_shutdown(shutdown)
}
#[allow(clippy::too_many_arguments)]
async fn start_gateway_client(
config: &Config,
initialisation_result: InitialisationResult,
@@ -359,6 +382,7 @@ where
details_store: &S::GatewaysDetailsStore,
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
shutdown: TaskClient,
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
where
@@ -401,6 +425,8 @@ where
packet_router,
bandwidth_controller,
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown,
)
};
@@ -437,8 +463,8 @@ where
details_store
.upgrade_stored_remote_gateway_key(gateway_client.gateway_identity(), &updated_key)
.await.map_err(|err| {
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
})?
}
@@ -462,6 +488,7 @@ where
details_store: &S::GatewaysDetailsStore,
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
mut shutdown: TaskClient,
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
where
@@ -493,6 +520,8 @@ where
details_store,
packet_router,
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown,
)
.await?;
@@ -509,15 +538,15 @@ where
// if no custom provider was ... provided ..., create one using nym-api
custom_provider.unwrap_or_else(|| match config_topology.topology_structure {
config::TopologyStructure::NymApi => Box::new(NymApiTopologyProvider::new(
nym_api_provider::Config {
min_mixnode_performance: config_topology.minimum_mixnode_performance,
min_gateway_performance: config_topology.minimum_gateway_performance,
},
config_topology,
nym_api_urls,
user_agent,
)),
config::TopologyStructure::GeoAware(group_by) => {
Box::new(GeoAwareTopologyProvider::new(nym_api_urls, group_by))
warn!("using deprecated 'GeoAware' topology provider - this option will be removed very soon");
#[allow(deprecated)]
Box::new(crate::client::topology_control::GeoAwareTopologyProvider::new(nym_api_urls, group_by))
}
})
}
@@ -528,7 +557,7 @@ where
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
topology_config: config::Topology,
topology_accessor: TopologyAccessor,
local_gateway: &NodeIdentity,
local_gateway: NodeIdentity,
wait_for_gateway: bool,
mut shutdown: TaskClient,
) -> Result<(), ClientCoreError> {
@@ -560,7 +589,7 @@ where
};
if let Err(err) = topology_refresher
.ensure_contains_gateway(local_gateway)
.ensure_contains_routable_egress(local_gateway)
.await
{
if let Some(waiting_timeout) = gateway_wait_timeout {
@@ -615,9 +644,11 @@ where
fn start_mix_traffic_controller(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown: TaskClient,
forget_me: ForgetMe,
) -> BatchMixMessageSender {
info!("Starting mix traffic controller...");
let (mix_traffic_controller, mix_tx) = MixTrafficController::new(gateway_transceiver);
let (mix_traffic_controller, mix_tx) =
MixTrafficController::new(gateway_transceiver, forget_me);
mix_traffic_controller.start_with_shutdown(shutdown);
mix_tx
}
@@ -708,7 +739,8 @@ where
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
let shared_topology_accessor = TopologyAccessor::new();
let shared_topology_accessor =
TopologyAccessor::new(self.config.debug.topology.ignore_egress_epoch_role);
// Shutdown notifier for signalling tasks to stop
let shutdown = self
@@ -772,6 +804,8 @@ where
&details_store,
gateway_packet_router,
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
shutdown.fork("gateway_transceiver"),
)
.await?;
@@ -797,9 +831,11 @@ where
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
// The MixTrafficController then sends the actual traffic
let message_sender = Self::start_mix_traffic_controller(
gateway_transceiver,
shutdown.fork("mix_traffic_controller"),
self.forget_me,
);
// Channels that the websocket listener can use to signal downstream to the real traffic
@@ -163,6 +163,7 @@ impl LoopCoverTrafficStream<OsRng> {
// poisson delay, but is it really a problem?
let topology_permit = self.topology_access.get_read_permit().await;
// the ack is sent back to ourselves (and then ignored)
let topology_ref = match topology_permit.try_get_valid_topology_ref(
&self.our_full_destination,
Some(&self.our_full_destination),
@@ -28,7 +28,6 @@ pub enum InputMessage {
recipient: Recipient,
data: Vec<u8>,
lane: TransmissionLane,
mix_hops: Option<u8>,
},
/// Creates a message used for a duplex anonymous communication where the recipient
@@ -44,7 +43,6 @@ pub enum InputMessage {
data: Vec<u8>,
reply_surbs: u32,
lane: TransmissionLane,
mix_hops: Option<u8>,
},
/// Attempt to use our internally received and stored `ReplySurb` to send the message back
@@ -94,29 +92,6 @@ impl InputMessage {
recipient,
data,
lane,
mix_hops: None,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
} else {
message
}
}
// IMHO `new_regular` should take `mix_hops: Option<u8>` as an argument instead of creating
// this function, but that would potentially break backwards compatibility with the current API
pub fn new_regular_with_custom_hops(
recipient: Recipient,
data: Vec<u8>,
lane: TransmissionLane,
packet_type: Option<PacketType>,
mix_hops: Option<u8>,
) -> Self {
let message = InputMessage::Regular {
recipient,
data,
lane,
mix_hops,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -137,7 +112,6 @@ impl InputMessage {
data,
reply_surbs,
lane,
mix_hops: None,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -154,14 +128,12 @@ impl InputMessage {
reply_surbs: u32,
lane: TransmissionLane,
packet_type: Option<PacketType>,
mix_hops: Option<u8>,
) -> Self {
let message = InputMessage::Anonymous {
recipient,
data,
reply_surbs,
lane,
mix_hops,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -2,8 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::spawn_future;
use crate::{spawn_future, ForgetMe};
use log::*;
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
@@ -26,10 +27,14 @@ pub struct MixTrafficController {
// TODO: this is temporary work-around.
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
consecutive_gateway_failure_count: usize,
forget_me: ForgetMe,
}
impl MixTrafficController {
pub fn new<T>(gateway_transceiver: T) -> (MixTrafficController, BatchMixMessageSender)
pub fn new<T>(
gateway_transceiver: T,
forget_me: ForgetMe,
) -> (MixTrafficController, BatchMixMessageSender)
where
T: GatewayTransceiver + Send + 'static,
{
@@ -40,6 +45,7 @@ impl MixTrafficController {
gateway_transceiver: Box::new(gateway_transceiver),
mix_rx: message_receiver,
consecutive_gateway_failure_count: 0,
forget_me,
},
message_sender,
)
@@ -47,6 +53,7 @@ impl MixTrafficController {
pub fn new_dynamic(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
forget_me: ForgetMe,
) -> (MixTrafficController, BatchMixMessageSender) {
let (message_sender, message_receiver) =
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
@@ -55,6 +62,7 @@ impl MixTrafficController {
gateway_transceiver,
mix_rx: message_receiver,
consecutive_gateway_failure_count: 0,
forget_me,
},
message_sender,
)
@@ -111,7 +119,27 @@ impl MixTrafficController {
}
}
shutdown.recv_timeout().await;
if self.forget_me.any() {
log::info!("Sending forget me request to the gateway");
match self
.gateway_transceiver
.send_client_request(ClientRequest::ForgetMe {
client: self.forget_me.client(),
stats: self.forget_me.stats(),
})
.await
{
Ok(_) => {
log::info!("Successfully sent forget me request to the gateway");
}
Err(err) => {
log::error!("Failed to send forget me request to the gateway: {err}");
}
}
}
log::debug!("MixTrafficController: Exiting");
})
});
}
}
@@ -5,8 +5,10 @@ use async_trait::async_trait;
use log::{debug, error};
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::identity;
use nym_gateway_client::error::GatewayClientError;
use nym_gateway_client::GatewayClient;
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use std::fmt::Debug;
@@ -26,9 +28,14 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
}
/// This combines combines the functionalities of being able to send and receive mix packets.
#[async_trait]
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
fn gateway_identity(&self) -> identity::PublicKey;
fn ws_fd(&self) -> Option<RawFd>;
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError>;
}
/// This trait defines the functionality of sending `MixPacket` into the mixnet,
@@ -65,6 +72,7 @@ pub trait GatewayReceiver {
}
// to allow for dynamic dispatch
#[async_trait]
impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
#[inline]
fn gateway_identity(&self) -> identity::PublicKey {
@@ -73,6 +81,13 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
fn ws_fd(&self) -> Option<RawFd> {
(**self).ws_fd()
}
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
(**self).send_client_request(message).await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -91,7 +106,6 @@ impl<G: GatewaySender + ?Sized + Send> GatewaySender for Box<G> {
(**self).batch_send_mix_packets(packets).await
}
}
impl<G: GatewayReceiver + ?Sized> GatewayReceiver for Box<G> {
#[inline]
fn set_packet_router(&mut self, packet_router: PacketRouter) -> Result<(), ErasedGatewayError> {
@@ -111,6 +125,7 @@ impl<C, St> RemoteGateway<C, St> {
}
}
#[async_trait]
impl<C, St> GatewayTransceiver for RemoteGateway<C, St>
where
C: DkgQueryClient + Send + Sync,
@@ -123,6 +138,20 @@ where
fn ws_fd(&self) -> Option<RawFd> {
self.gateway_client.ws_fd()
}
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
if let Some(shared_key) = self.gateway_client.shared_key() {
self.gateway_client
.send_websocket_message(message.encrypt(&*shared_key)?)
.await?;
Ok(())
} else {
Err(GatewayClientError::ConnectionInInvalidState)
}
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -195,6 +224,7 @@ impl LocalGateway {
mod nonwasm_sealed {
use super::*;
#[async_trait]
impl GatewayTransceiver for LocalGateway {
fn gateway_identity(&self) -> identity::PublicKey {
self.local_identity
@@ -202,6 +232,13 @@ mod nonwasm_sealed {
fn ws_fd(&self) -> Option<RawFd> {
None
}
async fn send_client_request(
&mut self,
_message: ClientRequest,
) -> Result<(), GatewayClientError> {
Ok(())
}
}
#[async_trait]
@@ -269,6 +306,7 @@ impl GatewaySender for MockGateway {
}
}
#[async_trait]
impl GatewayTransceiver for MockGateway {
fn gateway_identity(&self) -> identity::PublicKey {
self.dummy_identity
@@ -276,4 +314,11 @@ impl GatewayTransceiver for MockGateway {
fn ws_fd(&self) -> Option<RawFd> {
None
}
async fn send_client_request(
&mut self,
_message: ClientRequest,
) -> Result<(), GatewayClientError> {
Ok(())
}
}
@@ -73,11 +73,10 @@ where
content: Vec<u8>,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) {
if let Err(err) = self
.message_handler
.try_send_plain_message(recipient, content, lane, packet_type, mix_hops)
.try_send_plain_message(recipient, content, lane, packet_type)
.await
{
warn!("failed to send a plain message - {err}")
@@ -91,18 +90,10 @@ where
reply_surbs: u32,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) {
if let Err(err) = self
.message_handler
.try_send_message_with_reply_surbs(
recipient,
content,
reply_surbs,
lane,
packet_type,
mix_hops,
)
.try_send_message_with_reply_surbs(recipient, content, reply_surbs, lane, packet_type)
.await
{
warn!("failed to send a repliable message - {err}")
@@ -115,9 +106,8 @@ where
recipient,
data,
lane,
mix_hops,
} => {
self.handle_plain_message(recipient, data, lane, PacketType::Mix, mix_hops)
self.handle_plain_message(recipient, data, lane, PacketType::Mix)
.await
}
InputMessage::Anonymous {
@@ -125,17 +115,9 @@ where
data,
reply_surbs,
lane,
mix_hops,
} => {
self.handle_repliable_message(
recipient,
data,
reply_surbs,
lane,
PacketType::Mix,
mix_hops,
)
.await
self.handle_repliable_message(recipient, data, reply_surbs, lane, PacketType::Mix)
.await
}
InputMessage::Reply {
recipient_tag,
@@ -153,9 +135,8 @@ where
recipient,
data,
lane,
mix_hops,
} => {
self.handle_plain_message(recipient, data, lane, packet_type, mix_hops)
self.handle_plain_message(recipient, data, lane, packet_type)
.await
}
InputMessage::Anonymous {
@@ -163,17 +144,9 @@ where
data,
reply_surbs,
lane,
mix_hops,
} => {
self.handle_repliable_message(
recipient,
data,
reply_surbs,
lane,
packet_type,
mix_hops,
)
.await
self.handle_repliable_message(recipient, data, reply_surbs, lane, packet_type)
.await
}
InputMessage::Reply {
recipient_tag,
@@ -70,7 +70,6 @@ pub(crate) struct PendingAcknowledgement {
message_chunk: Fragment,
delay: SphinxDelay,
destination: PacketDestination,
mix_hops: Option<u8>,
retransmissions: u32,
}
@@ -80,13 +79,11 @@ impl PendingAcknowledgement {
message_chunk: Fragment,
delay: SphinxDelay,
recipient: Recipient,
mix_hops: Option<u8>,
) -> Self {
PendingAcknowledgement {
message_chunk,
delay,
destination: PacketDestination::KnownRecipient(recipient.into()),
mix_hops,
retransmissions: 0,
}
}
@@ -104,9 +101,6 @@ impl PendingAcknowledgement {
recipient_tag,
extra_surb_request,
},
// Messages sent using SURBs are using the number of mix hops set by the recipient when
// they provided the SURBs, so it doesn't make sense to include it here.
mix_hops: None,
retransmissions: 0,
}
}
@@ -52,18 +52,12 @@ where
packet_recipient: Recipient,
chunk_data: Fragment,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, PreparationError> {
debug!("retransmitting normal packet...");
// TODO: Figure out retransmission packet type signaling
self.message_handler
.try_prepare_single_chunk_for_sending(
packet_recipient,
chunk_data,
packet_type,
mix_hops,
)
.try_prepare_single_chunk_for_sending(packet_recipient, chunk_data, packet_type)
.await
}
@@ -110,7 +104,6 @@ where
**recipient,
timed_out_ack.message_chunk.clone(),
packet_type,
timed_out_ack.mix_hops,
)
.await
}
@@ -15,11 +15,11 @@ use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessa
use nym_sphinx::anonymous_replies::{ReplySurb, SurbEncryptionKey};
use nym_sphinx::chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx::message::NymMessage;
use nym_sphinx::params::{PacketSize, PacketType, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
use nym_sphinx::Delay;
use nym_task::connections::TransmissionLane;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
use std::sync::Arc;
@@ -100,10 +100,6 @@ pub(crate) struct Config {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
average_ack_delay: Duration,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
/// Primary predefined packet size used for the encapsulated messages.
primary_packet_size: PacketSize,
@@ -125,19 +121,11 @@ impl Config {
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
primary_packet_size: PacketSize::default(),
secondary_packet_size: None,
}
}
/// Allows setting non-default number of expected mix hops in the network.
#[allow(dead_code)]
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
/// Allows setting non-default size of the sphinx packets sent out.
pub fn with_custom_primary_packet_size(mut self, packet_size: PacketSize) -> Self {
self.primary_packet_size = packet_size;
@@ -185,9 +173,7 @@ where
config.sender_address,
config.average_packet_delay,
config.average_ack_delay,
)
.with_mix_hops(config.num_mix_hops);
);
MessageHandler {
config,
rng,
@@ -216,7 +202,7 @@ where
fn get_topology<'a>(
&self,
permit: &'a TopologyReadPermit<'a>,
) -> Result<&'a NymTopology, PreparationError> {
) -> Result<&'a NymRouteProvider, PreparationError> {
match permit.try_get_valid_topology_ref(&self.config.sender_address, None) {
Ok(topology_ref) => Ok(topology_ref),
Err(err) => {
@@ -233,9 +219,8 @@ where
return self.config.primary_packet_size;
};
let primary_count =
msg.required_packets(self.config.primary_packet_size, self.config.num_mix_hops);
let secondary_count = msg.required_packets(secondary_packet, self.config.num_mix_hops);
let primary_count = msg.required_packets(self.config.primary_packet_size);
let secondary_count = msg.required_packets(secondary_packet);
trace!("This message would require: {primary_count} primary packets or {secondary_count} secondary packets...");
// if there would be no benefit in using the secondary packet - use the primary (duh)
@@ -424,10 +409,9 @@ where
message: Vec<u8>,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
let message = NymMessage::new_plain(message);
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type, mix_hops)
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type)
.await
}
@@ -437,7 +421,6 @@ where
recipient: Recipient,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
debug!("Sending non-reply message with packet type {packet_type}");
// TODO: I really dislike existence of this assertion, it implies code has to be re-organised
@@ -470,7 +453,6 @@ where
&self.config.ack_key,
&recipient,
packet_type,
mix_hops,
)?;
let real_message = RealMessage::new(
@@ -478,8 +460,7 @@ where
Some(fragment.fragment_identifier()),
);
let delay = prepared_fragment.total_delay;
let pending_ack =
PendingAcknowledgement::new_known(fragment, delay, recipient, mix_hops);
let pending_ack = PendingAcknowledgement::new_known(fragment, delay, recipient);
real_messages.push(real_message);
pending_acks.push(pending_ack);
@@ -496,7 +477,6 @@ where
recipient: Recipient,
amount: u32,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
debug!("Sending additional reply SURBs with packet type {packet_type}");
let sender_tag = self.get_or_create_sender_tag(&recipient);
@@ -513,7 +493,6 @@ where
recipient,
TransmissionLane::AdditionalReplySurbs,
packet_type,
mix_hops,
)
.await?;
@@ -530,7 +509,6 @@ where
num_reply_surbs: u32,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), SurbWrappedPreparationError> {
debug!("Sending message with reply SURBs with packet type {packet_type}");
let sender_tag = self.get_or_create_sender_tag(&recipient);
@@ -541,7 +519,7 @@ where
let message =
NymMessage::new_repliable(RepliableMessage::new_data(message, sender_tag, reply_surbs));
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type, mix_hops)
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type)
.await?;
log::trace!("storing {} reply keys", reply_keys.len());
@@ -555,23 +533,18 @@ where
recipient: Recipient,
chunk: Fragment,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, PreparationError> {
debug!("Sending single chunk with packet type {packet_type}");
let topology_permit = self.topology_access.get_read_permit().await;
let topology = self.get_topology(&topology_permit)?;
let prepared_fragment = self
.message_preparer
.prepare_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
&recipient,
packet_type,
mix_hops,
)
.unwrap();
let prepared_fragment = self.message_preparer.prepare_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
&recipient,
packet_type,
)?;
Ok(prepared_fragment)
}
@@ -624,16 +597,13 @@ where
Err(err) => return Err(err.return_surbs(vec![reply_surb])),
};
let prepared_fragment = self
.message_preparer
.prepare_reply_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
reply_surb,
PacketType::Mix,
)
.unwrap();
let prepared_fragment = self.message_preparer.prepare_reply_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
reply_surb,
PacketType::Mix,
)?;
Ok(prepared_fragment)
}
@@ -230,6 +230,7 @@ where
// poisson delay, but is it really a problem?
let topology_permit = self.topology_access.get_read_permit().await;
// the ack is sent back to ourselves (and then ignored)
let topology_ref = match topology_permit.try_get_valid_topology_ref(
&self.config.our_full_destination,
Some(&self.config.our_full_destination),
@@ -516,7 +516,6 @@ where
recipient,
to_send,
nym_sphinx::params::PacketType::Mix,
self.config.reply_surbs.surb_mix_hops,
)
.await
{
@@ -2,8 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::params::DEFAULT_NUM_MIX_HOPS;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError};
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -17,29 +16,36 @@ pub struct TopologyAccessorInner {
// few seconds, while reads are needed every single packet generated.
// However, proper benchmarks will be needed to determine if `RwLock` is indeed a better
// approach than a `Mutex`
topology: RwLock<Option<NymTopology>>,
topology: RwLock<NymRouteProvider>,
}
impl TopologyAccessorInner {
fn new() -> Self {
fn new(initial: NymRouteProvider) -> Self {
TopologyAccessorInner {
controlled_manually: AtomicBool::new(false),
released_manual_control: Notify::new(),
topology: RwLock::new(None),
topology: RwLock::new(initial),
}
}
async fn update(&self, new: Option<NymTopology>) {
*self.topology.write().await = new;
let mut guard = self.topology.write().await;
match new {
Some(updated) => {
guard.update(updated);
}
None => guard.clear_topology(),
}
}
}
pub struct TopologyReadPermit<'a> {
permit: RwLockReadGuard<'a, Option<NymTopology>>,
permit: RwLockReadGuard<'a, NymRouteProvider>,
}
impl Deref for TopologyReadPermit<'_> {
type Target = Option<NymTopology>;
type Target = NymRouteProvider;
fn deref(&self) -> &Self::Target {
&self.permit
@@ -53,43 +59,31 @@ impl<'a> TopologyReadPermit<'a> {
&'a self,
ack_recipient: &Recipient,
packet_recipient: Option<&Recipient>,
) -> Result<&'a NymTopology, NymTopologyError> {
) -> Result<&'a NymRouteProvider, NymTopologyError> {
let route_provider = self.permit.deref();
let topology = &route_provider.topology;
// 1. Have we managed to get anything from the refresher, i.e. have the nym-api queries gone through?
let topology = self
.permit
.as_ref()
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
topology.ensure_not_empty()?;
// 2. does it have any mixnode at all?
// 3. does it have any gateways at all?
// 4. does it have a mixnode on each layer?
topology.ensure_can_construct_path_through(DEFAULT_NUM_MIX_HOPS)?;
// 2. does the topology have a node on each mixing layer?
topology.ensure_minimally_routable()?;
// 5. does it contain OUR gateway (so that we could create an ack packet)?
if !topology.gateway_exists(ack_recipient.gateway()) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: ack_recipient.gateway().to_base58_string(),
});
}
// 3. does it contain OUR gateway (so that we could create an ack packet)?
let _ = route_provider.egress_by_identity(ack_recipient.gateway())?;
// 6. for our target recipient, does it contain THEIR gateway (so that we could create
// 4. for our target recipient, does it contain THEIR gateway (so that we send anything over?)
if let Some(recipient) = packet_recipient {
if !topology.gateway_exists(recipient.gateway()) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: recipient.gateway().to_base58_string(),
});
}
let _ = route_provider.egress_by_identity(recipient.gateway())?;
}
Ok(topology)
Ok(route_provider)
}
}
impl<'a> From<RwLockReadGuard<'a, Option<NymTopology>>> for TopologyReadPermit<'a> {
fn from(read_permit: RwLockReadGuard<'a, Option<NymTopology>>) -> Self {
TopologyReadPermit {
permit: read_permit,
}
impl<'a> From<RwLockReadGuard<'a, NymRouteProvider>> for TopologyReadPermit<'a> {
fn from(permit: RwLockReadGuard<'a, NymRouteProvider>) -> Self {
TopologyReadPermit { permit }
}
}
@@ -99,9 +93,11 @@ pub struct TopologyAccessor {
}
impl TopologyAccessor {
pub fn new() -> Self {
pub fn new(ignore_egress_epoch_roles: bool) -> Self {
TopologyAccessor {
inner: Arc::new(TopologyAccessorInner::new()),
inner: Arc::new(TopologyAccessorInner::new(NymRouteProvider::new_empty(
ignore_egress_epoch_roles,
))),
}
}
@@ -121,8 +117,21 @@ impl TopologyAccessor {
self.inner.released_manual_control.notified().await
}
#[deprecated(note = "use .current_route_provider instead")]
pub async fn current_topology(&self) -> Option<NymTopology> {
self.inner.topology.read().await.clone()
self.current_route_provider()
.await
.as_ref()
.map(|p| p.topology.clone())
}
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<NymRouteProvider>> {
let provider = self.inner.topology.read().await;
if provider.topology.is_empty() {
None
} else {
Some(provider)
}
}
pub async fn manually_change_topology(&self, new_topology: NymTopology) {
@@ -140,15 +149,11 @@ impl TopologyAccessor {
// only used by the client at startup to get a slightly more reasonable error message
// (currently displays as unused because health checker is disabled due to required changes)
pub async fn ensure_is_routable(&self) -> Result<(), NymTopologyError> {
match self.inner.topology.read().await.deref() {
None => Err(NymTopologyError::EmptyNetworkTopology),
Some(ref topology) => topology.ensure_can_construct_path_through(DEFAULT_NUM_MIX_HOPS),
}
}
}
impl Default for TopologyAccessor {
fn default() -> Self {
TopologyAccessor::new()
self.inner
.topology
.read()
.await
.topology
.ensure_minimally_routable()
}
}
@@ -3,7 +3,6 @@ use log::{debug, error};
use nym_explorer_client::{ExplorerClient, PrettyDetailedMixNodeBond};
use nym_network_defaults::var_names::EXPLORER_API;
use nym_topology::{
nym_topology_from_basic_info,
provider_trait::{async_trait, TopologyProvider},
NymTopology,
};
@@ -15,8 +14,6 @@ use url::Url;
pub use nym_country_group::CountryGroup;
const MIN_NODES_PER_LAYER: usize = 1;
fn create_explorer_client() -> Option<ExplorerClient> {
let Ok(explorer_api_url) = std::env::var(EXPLORER_API) else {
error!("Missing EXPLORER_API");
@@ -63,30 +60,20 @@ fn log_mixnode_distribution(mixnodes: &HashMap<CountryGroup, Vec<NodeId>>) {
}
fn check_layer_integrity(topology: NymTopology) -> Result<(), ()> {
let mixes = topology.mixes();
if mixes.keys().len() < 3 {
if topology.ensure_minimally_routable().is_err() {
error!("Layer is missing in topology!");
return Err(());
}
for (layer, mixnodes) in mixes {
debug!("Layer {:?} has {} mixnodes", layer, mixnodes.len());
if mixnodes.len() < MIN_NODES_PER_LAYER {
error!(
"There are only {} mixnodes in layer {:?}",
mixnodes.len(),
layer
);
return Err(());
}
}
Ok(())
}
#[deprecated(note = "use NymApiTopologyProvider instead as explorer API will soon be removed")]
pub struct GeoAwareTopologyProvider {
validator_client: nym_validator_client::client::NymApiClient,
filter_on: GroupBy,
}
#[allow(deprecated)]
impl GeoAwareTopologyProvider {
pub fn new(mut nym_api_urls: Vec<Url>, filter_on: GroupBy) -> GeoAwareTopologyProvider {
log::info!(
@@ -104,6 +91,15 @@ impl GeoAwareTopologyProvider {
}
async fn get_topology(&self) -> Option<NymTopology> {
let rewarded_set = self
.validator_client
.get_current_rewarded_set()
.await
.inspect_err(|err| error!("failed to get current rewarded set: {err}"))
.ok()?;
let mut topology = NymTopology::new_empty(rewarded_set);
let mixnodes = match self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
@@ -187,7 +183,8 @@ impl GeoAwareTopologyProvider {
.filter(|m| filtered_mixnode_ids.contains(&m.node_id))
.collect::<Vec<_>>();
let topology = nym_topology_from_basic_info(&mixnodes, &gateways);
topology.add_skimmed_nodes(&mixnodes);
topology.add_skimmed_nodes(&gateways);
// TODO: return real error type
check_layer_integrity(topology.clone()).ok()?;
@@ -196,6 +193,7 @@ impl GeoAwareTopologyProvider {
}
}
#[allow(deprecated)]
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl TopologyProvider for GeoAwareTopologyProvider {
@@ -205,6 +203,7 @@ impl TopologyProvider for GeoAwareTopologyProvider {
}
}
#[allow(deprecated)]
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl TopologyProvider for GeoAwareTopologyProvider {
@@ -19,6 +19,7 @@ mod accessor;
pub mod geo_aware_provider;
pub mod nym_api_provider;
#[allow(deprecated)]
pub use geo_aware_provider::GeoAwareTopologyProvider;
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
pub use nym_topology::provider_trait::TopologyProvider;
@@ -27,7 +28,7 @@ pub use nym_topology::provider_trait::TopologyProvider;
const MAX_FAILURE_COUNT: usize = 10;
pub struct TopologyRefresherConfig {
refresh_rate: Duration,
pub refresh_rate: Duration,
}
impl TopologyRefresherConfig {
@@ -96,28 +97,24 @@ impl TopologyRefresher {
self.topology_accessor.ensure_is_routable().await
}
pub async fn ensure_contains_gateway(
pub async fn ensure_contains_routable_egress(
&self,
gateway: &NodeIdentity,
egress: NodeIdentity,
) -> Result<(), NymTopologyError> {
let topology = self
.topology_accessor
.current_topology()
.current_route_provider()
.await
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
if !topology.gateway_exists(gateway) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: gateway.to_base58_string(),
});
}
let _ = topology.egress_by_identity(egress)?;
Ok(())
}
pub async fn wait_for_gateway(
&mut self,
gateway: &NodeIdentity,
gateway: NodeIdentity,
timeout_duration: Duration,
) -> Result<(), NymTopologyError> {
info!(
@@ -135,7 +132,7 @@ impl TopologyRefresher {
})
}
_ = self.try_refresh() => {
if self.ensure_contains_gateway(gateway).await.is_ok() {
if self.ensure_contains_routable_egress(gateway).await.is_ok() {
return Ok(())
}
info!("gateway '{gateway}' is still not online...");
@@ -4,32 +4,39 @@
use async_trait::async_trait;
use log::{debug, error, warn};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::NymTopology;
use nym_validator_client::UserAgent;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::cmp::min;
use url::Url;
// the same values as our current (10.06.24) blacklist
pub const DEFAULT_MIN_MIXNODE_PERFORMANCE: u8 = 50;
pub const DEFAULT_MIN_GATEWAY_PERFORMANCE: u8 = 50;
#[derive(Debug)]
pub struct Config {
pub min_mixnode_performance: u8,
pub min_gateway_performance: u8,
pub use_extended_topology: bool,
pub ignore_egress_epoch_role: bool,
}
impl Default for Config {
fn default() -> Self {
// old values that decided on blacklist membership
impl From<nym_client_core_config_types::Topology> for Config {
fn from(value: nym_client_core_config_types::Topology) -> Self {
Config {
min_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
min_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
min_mixnode_performance: value.minimum_mixnode_performance,
min_gateway_performance: value.minimum_gateway_performance,
use_extended_topology: value.use_extended_topology,
ignore_egress_epoch_role: value.ignore_egress_epoch_role,
}
}
}
impl Config {
// if we're using 'extended' topology, filter the nodes based on the lowest set performance
fn min_node_performance(&self) -> u8 {
min(self.min_mixnode_performance, self.min_gateway_performance)
}
}
pub struct NymApiTopologyProvider {
config: Config,
@@ -39,7 +46,11 @@ pub struct NymApiTopologyProvider {
}
impl NymApiTopologyProvider {
pub fn new(config: Config, mut nym_api_urls: Vec<Url>, user_agent: Option<UserAgent>) -> Self {
pub fn new(
config: impl Into<Config>,
mut nym_api_urls: Vec<Url>,
user_agent: Option<UserAgent>,
) -> Self {
nym_api_urls.shuffle(&mut thread_rng());
let validator_client = if let Some(user_agent) = user_agent {
@@ -52,7 +63,7 @@ impl NymApiTopologyProvider {
};
NymApiTopologyProvider {
config,
config: config.into(),
validator_client,
nym_api_urls,
currently_used_api: 0,
@@ -70,70 +81,69 @@ impl NymApiTopologyProvider {
.change_nym_api(self.nym_api_urls[self.currently_used_api].clone())
}
/// Verifies whether nodes a reasonably distributed among all mix layers.
///
/// In ideal world we would have 33% nodes on layer 1, 33% on layer 2 and 33% on layer 3.
/// However, this is a rather unrealistic expectation, instead we check whether there exists
/// a layer with more than 66% of nodes or with fewer than 15% and if so, we trigger a failure.
///
/// # Arguments
///
/// * `topology`: active topology constructed from validator api data
fn check_layer_distribution(
&self,
active_topology: &NymTopology,
) -> Result<(), NymTopologyError> {
let lower_threshold = 0.15;
let upper_threshold = 0.66;
active_topology.ensure_even_layer_distribution(lower_threshold, upper_threshold)
}
async fn get_current_compatible_topology(&mut self) -> Option<NymTopology> {
let mixnodes = match self
let rewarded_set = self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
.get_current_rewarded_set()
.await
{
Err(err) => {
error!("failed to get network mixnodes - {err}");
return None;
}
Ok(mixes) => mixes,
};
.inspect_err(|err| error!("failed to get current rewarded set: {err}"))
.ok()?;
let gateways = match self
.validator_client
.get_all_basic_entry_assigned_nodes()
.await
{
Err(err) => {
error!("failed to get network gateways - {err}");
return None;
}
Ok(gateways) => gateways,
};
let mut topology = NymTopology::new_empty(rewarded_set);
debug!(
"there are {} mixnodes and {} gateways in total (before performance filtering)",
mixnodes.len(),
gateways.len()
);
if self.config.use_extended_topology {
let all_nodes = self
.validator_client
.get_all_basic_nodes()
.await
.inspect_err(|err| error!("failed to get network nodes: {err}"))
.ok()?;
let topology = NymTopology::from_unordered(
mixnodes.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_mixnode_performance
}),
gateways.iter().filter(|g| {
g.performance.round_to_integer() >= self.config.min_gateway_performance
}),
);
if let Err(err) = self.check_layer_distribution(&topology) {
warn!("The current filtered active topology has extremely skewed layer distribution. It cannot be used: {err}");
self.use_next_nym_api();
None
debug!(
"there are {} nodes on the network (before filtering)",
all_nodes.len()
);
topology.add_additional_nodes(all_nodes.iter().filter(|n| {
n.performance.round_to_integer() >= self.config.min_node_performance()
}));
} else {
Some(topology)
// if we're not using extended topology, we're only getting active set mixnodes and gateways
let mixnodes = self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
.await
.inspect_err(|err| error!("failed to get network mixnodes: {err}"))
.ok()?;
// TODO: we really should be getting ACTIVE gateways only
let gateways = self
.validator_client
.get_all_basic_entry_assigned_nodes()
.await
.inspect_err(|err| error!("failed to get network gateways: {err}"))
.ok()?;
debug!(
"there are {} mixnodes and {} gateways in total (before performance filtering)",
mixnodes.len(),
gateways.len()
);
topology.add_additional_nodes(mixnodes.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_mixnode_performance
}));
topology.add_additional_nodes(gateways.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_gateway_performance
}));
};
if !topology.is_minimally_routable() {
error!("the current filtered active topology can't be used to construct any packets");
return None;
}
Some(topology)
}
}
@@ -142,7 +152,11 @@ impl NymApiTopologyProvider {
#[async_trait]
impl TopologyProvider for NymApiTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
self.get_current_compatible_topology().await
let Some(topology) = self.get_current_compatible_topology().await else {
self.use_next_nym_api();
return None;
};
Some(topology)
}
}
@@ -150,6 +164,10 @@ impl TopologyProvider for NymApiTopologyProvider {
#[async_trait(?Send)]
impl TopologyProvider for NymApiTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
self.get_current_compatible_topology().await
let Some(topology) = self.get_current_compatible_topology().await else {
self.use_next_nym_api();
return None;
};
Some(topology)
}
}
+7 -4
View File
@@ -4,8 +4,8 @@
use crate::client::mix_traffic::transceiver::ErasedGatewayError;
use nym_crypto::asymmetric::identity::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
use nym_topology::gateway::GatewayConversionError;
use nym_topology::NymTopologyError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_validator_client::ValidatorClientError;
use std::error::Error;
use std::path::PathBuf;
@@ -74,10 +74,10 @@ pub enum ClientCoreError {
#[error("the gateway id is invalid - {0}")]
UnableToCreatePublicKeyFromGatewayId(Ed25519RecoveryError),
#[error("The gateway is malformed: {source}")]
#[error("the node is malformed: {source}")]
MalformedGateway {
#[from]
source: GatewayConversionError,
source: Box<RoutingNodeError>,
},
#[error("failed to establish connection to gateway: {source}")]
@@ -159,6 +159,9 @@ pub enum ClientCoreError {
#[error("the specified gateway '{gateway}' does not support the wss protocol")]
UnsupportedWssProtocol { gateway: String },
#[error("node {id} ({identity}) does not support mixnet entry mode")]
UnsupportedEntry { id: NodeId, identity: String },
#[error(
"failed to load custom topology using path '{}'. detailed message: {source}", file_path.display()
)]
+40 -20
View File
@@ -7,7 +7,7 @@ use futures::{SinkExt, StreamExt};
use log::{debug, info, trace, warn};
use nym_crypto::asymmetric::identity;
use nym_gateway_client::GatewayClient;
use nym_topology::gateway;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::IdentityKeyRef;
use nym_validator_client::UserAgent;
use rand::{seq::SliceRandom, Rng};
@@ -15,6 +15,7 @@ use std::{sync::Arc, time::Duration};
use tungstenite::Message;
use url::Url;
use nym_topology::NodeId;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
@@ -25,7 +26,6 @@ use tokio::time::Instant;
use tokio_tungstenite::connect_async;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
#[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket;
#[cfg(target_arch = "wasm32")]
@@ -48,22 +48,30 @@ const PING_TIMEOUT: Duration = Duration::from_millis(1000);
// The abstraction that some of these helpers use
pub trait ConnectableGateway {
fn identity(&self) -> &identity::PublicKey;
fn clients_address(&self) -> String;
fn node_id(&self) -> NodeId;
fn identity(&self) -> identity::PublicKey;
fn clients_address(&self, prefer_ipv6: bool) -> Option<String>;
fn is_wss(&self) -> bool;
}
impl ConnectableGateway for gateway::LegacyNode {
fn identity(&self) -> &identity::PublicKey {
self.identity()
impl ConnectableGateway for RoutingNode {
fn node_id(&self) -> NodeId {
self.node_id
}
fn clients_address(&self) -> String {
self.clients_address()
fn identity(&self) -> identity::PublicKey {
self.identity_key
}
fn clients_address(&self, prefer_ipv6: bool) -> Option<String> {
self.ws_entry_address(prefer_ipv6)
}
fn is_wss(&self) -> bool {
self.clients_wss_port.is_some()
self.entry
.as_ref()
.map(|e| e.clients_wss_port.is_some())
.unwrap_or_default()
}
}
@@ -83,7 +91,7 @@ pub async fn current_gateways<R: Rng>(
nym_apis: &[Url],
user_agent: Option<UserAgent>,
minimum_performance: u8,
) -> Result<Vec<gateway::LegacyNode>, ClientCoreError> {
) -> Result<Vec<RoutingNode>, ClientCoreError> {
let nym_api = nym_apis
.choose(rng)
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
@@ -104,7 +112,7 @@ pub async fn current_gateways<R: Rng>(
.iter()
.filter(|g| g.performance.round_to_integer() >= minimum_performance)
.filter_map(|gateway| gateway.try_into().ok())
.collect::<Vec<gateway::LegacyNode>>();
.collect::<Vec<_>>();
log::debug!("After checking validity: {}", valid_gateways.len());
log::trace!("Valid gateways: {:#?}", valid_gateways);
@@ -134,7 +142,12 @@ async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<G>, Client
where
G: ConnectableGateway,
{
let addr = gateway.clients_address();
let Some(addr) = gateway.clients_address(false) else {
return Err(ClientCoreError::UnsupportedEntry {
id: gateway.node_id(),
identity: gateway.identity().to_string(),
});
};
trace!(
"establishing connection to {} ({addr})...",
gateway.identity(),
@@ -190,7 +203,7 @@ where
Ok(GatewayWithLatency::new(gateway, avg))
}
pub async fn choose_gateway_by_latency<'a, R: Rng, G: ConnectableGateway + Clone>(
pub async fn choose_gateway_by_latency<R: Rng, G: ConnectableGateway + Clone>(
rng: &mut R,
gateways: &[G],
must_use_tls: bool,
@@ -205,7 +218,7 @@ pub async fn choose_gateway_by_latency<'a, R: Rng, G: ConnectableGateway + Clone
let gateways_with_latency = Arc::new(tokio::sync::Mutex::new(Vec::new()));
futures::stream::iter(gateways)
.for_each_concurrent(CONCURRENT_GATEWAYS_MEASURED, |gateway| async {
let id = *gateway.identity();
let id = gateway.identity();
trace!("measuring latency to {id}...");
match measure_latency(gateway).await {
Ok(with_latency) => {
@@ -252,9 +265,9 @@ fn filter_by_tls<G: ConnectableGateway>(
pub(super) fn uniformly_random_gateway<R: Rng>(
rng: &mut R,
gateways: &[gateway::LegacyNode],
gateways: &[RoutingNode],
must_use_tls: bool,
) -> Result<gateway::LegacyNode, ClientCoreError> {
) -> Result<RoutingNode, ClientCoreError> {
filter_by_tls(gateways, must_use_tls)?
.choose(rng)
.ok_or(ClientCoreError::NoGatewaysOnNetwork)
@@ -263,9 +276,9 @@ pub(super) fn uniformly_random_gateway<R: Rng>(
pub(super) fn get_specified_gateway(
gateway_identity: IdentityKeyRef,
gateways: &[gateway::LegacyNode],
gateways: &[RoutingNode],
must_use_tls: bool,
) -> Result<gateway::LegacyNode, ClientCoreError> {
) -> Result<RoutingNode, ClientCoreError> {
log::debug!("Requesting specified gateway: {}", gateway_identity);
let user_gateway = identity::PublicKey::from_base58_string(gateway_identity)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
@@ -275,7 +288,14 @@ pub(super) fn get_specified_gateway(
.find(|gateway| gateway.identity_key == user_gateway)
.ok_or_else(|| ClientCoreError::NoGatewayWithId(gateway_identity.to_string()))?;
if must_use_tls && gateway.clients_wss_port.is_none() {
let Some(entry_details) = gateway.entry.as_ref() else {
return Err(ClientCoreError::UnsupportedEntry {
id: gateway.node_id,
identity: gateway.identity().to_string(),
});
};
if must_use_tls && entry_details.clients_wss_port.is_none() {
return Err(ClientCoreError::UnsupportedWssProtocol {
gateway: gateway_identity.to_string(),
});
+2 -2
View File
@@ -19,7 +19,7 @@ use crate::init::types::{
use nym_client_core_gateways_storage::GatewaysDetailsStore;
use nym_client_core_gateways_storage::{GatewayDetails, GatewayRegistration};
use nym_gateway_client::client::InitGatewayClient;
use nym_topology::gateway;
use nym_topology::node::RoutingNode;
use rand::rngs::OsRng;
use rand::{CryptoRng, RngCore};
use serde::Serialize;
@@ -50,7 +50,7 @@ async fn setup_new_gateway<K, D>(
key_store: &K,
details_store: &D,
selection_specification: GatewaySelectionSpecification,
available_gateways: Vec<gateway::LegacyNode>,
available_gateways: Vec<RoutingNode>,
) -> Result<InitialisationResult, ClientCoreError>
where
K: KeyStore,
+12 -5
View File
@@ -13,7 +13,7 @@ use nym_crypto::asymmetric::identity;
use nym_gateway_client::client::InitGatewayClient;
use nym_gateway_requests::shared_key::SharedGatewayKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::gateway;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::IdentityKey;
use nym_validator_client::nyxd::AccountId;
use serde::Serialize;
@@ -38,16 +38,23 @@ pub enum SelectedGateway {
impl SelectedGateway {
pub fn from_topology_node(
node: gateway::LegacyNode,
node: RoutingNode,
must_use_tls: bool,
) -> Result<Self, ClientCoreError> {
// for now, let's use 'old' behaviour, if you want to change it, you can pass it up the enum stack yourself : )
let prefer_ipv6 = false;
let gateway_listener = if must_use_tls {
node.clients_address_tls()
node.ws_entry_address_tls()
.ok_or(ClientCoreError::UnsupportedWssProtocol {
gateway: node.identity_key.to_base58_string(),
})?
} else {
node.clients_address()
node.ws_entry_address(prefer_ipv6)
.ok_or(ClientCoreError::UnsupportedEntry {
id: node.node_id,
identity: node.identity_key.to_base58_string(),
})?
};
let gateway_listener =
@@ -200,7 +207,7 @@ pub enum GatewaySetup {
specification: GatewaySelectionSpecification,
// TODO: seems to be a bit inefficient to pass them by value
available_gateways: Vec<gateway::LegacyNode>,
available_gateways: Vec<RoutingNode>,
},
ReuseConnection {
+46 -2
View File
@@ -14,8 +14,7 @@ pub mod error;
pub mod init;
pub use nym_topology::{
HardcodedTopologyProvider, NymTopology, NymTopologyError, SerializableNymTopology,
SerializableTopologyError, TopologyProvider,
HardcodedTopologyProvider, NymRouteProvider, NymTopology, NymTopologyError, TopologyProvider,
};
#[cfg(target_arch = "wasm32")]
@@ -34,3 +33,48 @@ where
{
tokio::spawn(future);
}
#[derive(Clone, Default, Debug)]
pub struct ForgetMe {
client: bool,
stats: bool,
}
impl ForgetMe {
pub fn new_all() -> Self {
Self {
client: true,
stats: true,
}
}
pub fn new_client() -> Self {
Self {
client: true,
stats: false,
}
}
pub fn new_stats() -> Self {
Self {
client: false,
stats: true,
}
}
pub fn new(client: bool, stats: bool) -> Self {
Self { client, stats }
}
pub fn any(&self) -> bool {
self.client || self.stats
}
pub fn client(&self) -> bool {
self.client
}
pub fn stats(&self) -> bool {
self.stats
}
}
@@ -9,7 +9,10 @@ use crate::backend::fs_backend::{
},
};
use log::{error, info};
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
#[derive(Debug, Clone)]
@@ -31,6 +34,9 @@ impl StorageManager {
}
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(fresh)
.disable_statement_logging();
@@ -101,6 +101,10 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
// currently unused (but populated)
negotiated_protocol: Option<u8>,
// Callback on the fd as soon as the connection has been established
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
/// Listen to shutdown messages and send notifications back to the task manager
task_client: TaskClient,
}
@@ -116,6 +120,7 @@ impl<C, St> GatewayClient<C, St> {
packet_router: PacketRouter,
bandwidth_controller: Option<BandwidthController<C, St>>,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
task_client: TaskClient,
) -> Self {
GatewayClient {
@@ -131,6 +136,8 @@ impl<C, St> GatewayClient<C, St> {
bandwidth_controller,
stats_reporter,
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
task_client,
}
}
@@ -205,6 +212,12 @@ impl<C, St> GatewayClient<C, St> {
};
self.connection = SocketState::Available(Box::new(ws_stream));
#[cfg(unix)]
if let (Some(callback), Some(fd)) = (self.connection_fd_callback.as_ref(), self.ws_fd()) {
callback.as_ref()(fd);
}
Ok(())
}
@@ -311,7 +324,7 @@ impl<C, St> GatewayClient<C, St> {
// If we want to send a message (with response), we need to have a full control over the socket,
// as we need to be able to write the request and read the subsequent response
async fn send_websocket_message(
pub async fn send_websocket_message(
&mut self,
msg: impl Into<Message>,
) -> Result<ServerResponse, GatewayClientError> {
@@ -1034,6 +1047,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
bandwidth_controller: None,
stats_reporter: ClientStatsSender::new(None),
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback: None,
task_client,
}
}
@@ -1064,6 +1079,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
bandwidth_controller,
stats_reporter,
negotiated_protocol: self.negotiated_protocol,
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback,
task_client,
}
}
+3 -1
View File
@@ -8,10 +8,12 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dashmap = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true, features = ["time"] }
tokio = { workspace = true, features = ["time", "sync"] }
tokio-util = { workspace = true, features = ["codec"], optional = true }
tokio-stream = { workspace = true }
# internal
nym-sphinx = { path = "../../nymsphinx" }
+141 -86
View File
@@ -1,21 +1,24 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2021-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::channel::mpsc;
use dashmap::DashMap;
use futures::StreamExt;
use nym_sphinx::addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::params::PacketType;
use nym_sphinx::NymPacket;
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::ops::Deref;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::sleep;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::codec::Framed;
use tracing::*;
@@ -55,11 +58,37 @@ pub trait SendWithoutResponse {
}
pub struct Client {
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
active_connections: ActiveConnections,
connections_count: Arc<AtomicUsize>,
config: Config,
}
struct ConnectionSender {
#[derive(Default, Clone)]
pub struct ActiveConnections {
inner: Arc<DashMap<NymNodeRoutingAddress, ConnectionSender>>,
}
impl ActiveConnections {
pub fn pending_packets(&self) -> usize {
self.inner
.iter()
.map(|sender| {
let max_capacity = sender.channel.max_capacity();
let capacity = sender.channel.capacity();
max_capacity - capacity
})
.sum()
}
}
impl Deref for ActiveConnections {
type Target = DashMap<NymNodeRoutingAddress, ConnectionSender>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub struct ConnectionSender {
channel: mpsc::Sender<FramedNymPacket>,
current_reconnection_attempt: Arc<AtomicU32>,
}
@@ -73,46 +102,53 @@ impl ConnectionSender {
}
}
impl Client {
pub fn new(config: Config) -> Client {
Client {
conn_new: HashMap::new(),
config,
struct ManagedConnection {
address: SocketAddr,
message_receiver: ReceiverStream<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: Arc<AtomicU32>,
}
impl ManagedConnection {
fn new(
address: SocketAddr,
message_receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: Arc<AtomicU32>,
) -> Self {
ManagedConnection {
address,
message_receiver: ReceiverStream::new(message_receiver),
connection_timeout,
current_reconnection,
}
}
async fn manage_connection(
address: SocketAddr,
receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: &AtomicU32,
) {
async fn run(self) {
let address = self.address;
let connection_fut = TcpStream::connect(address);
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to {}", address);
debug!("Managed to establish connection to {}", self.address);
// if we managed to connect, reset the reconnection count (whatever it might have been)
current_reconnection.store(0, Ordering::Release);
self.current_reconnection.store(0, Ordering::Release);
Framed::new(stream, NymCodec)
}
Err(err) => {
debug!(
"failed to establish connection to {} (err: {})",
address, err
);
debug!("failed to establish connection to {address} (err: {err})",);
return;
}
},
Err(_) => {
debug!(
"failed to connect to {} within {:?}",
address, connection_timeout
"failed to connect to {address} within {:?}",
self.connection_timeout
);
// we failed to connect - increase reconnection attempt
current_reconnection.fetch_add(1, Ordering::SeqCst);
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
return;
}
};
@@ -120,15 +156,28 @@ impl Client {
// Take whatever the receiver channel produces and put it on the connection.
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
// about neither receiver nor the connection, it doesn't matter which one gets consumed
if let Err(err) = receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {} - {err}", address);
if let Err(err) = self.message_receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {address}: {err}");
}
debug!(
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
address
"connection manager to {address} is finished. Either the connection failed or mixnet client got dropped",
);
}
}
impl Client {
pub fn new(config: Config, connections_count: Arc<AtomicUsize>) -> Client {
Client {
active_connections: Default::default(),
connections_count,
config,
}
}
pub fn active_connections(&self) -> ActiveConnections {
self.active_connections.clone()
}
/// If we're trying to reconnect, determine how long we should wait.
fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
@@ -148,7 +197,7 @@ impl Client {
}
fn make_connection(&mut self, address: NymNodeRoutingAddress, pending_packet: FramedNymPacket) {
let (mut sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
let (sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
// this CAN'T fail because we just created the channel which has a non-zero capacity
if self.config.maximum_connection_buffer_size > 0 {
@@ -156,15 +205,16 @@ impl Client {
}
// if we already tried to connect to `address` before, grab the current attempt count
let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
existing.channel = sender;
Arc::clone(&existing.current_reconnection_attempt)
} else {
let new_entry = ConnectionSender::new(sender);
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
self.conn_new.insert(address, new_entry);
current_attempt
};
let current_reconnection_attempt =
if let Some(mut existing) = self.active_connections.get_mut(&address) {
existing.channel = sender;
Arc::clone(&existing.current_reconnection_attempt)
} else {
let new_entry = ConnectionSender::new(sender);
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
self.active_connections.insert(address, new_entry);
current_attempt
};
// load the actual value.
let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
@@ -173,6 +223,7 @@ impl Client {
// copy the value before moving into another task
let initial_connection_timeout = self.config.initial_connection_timeout;
let connections_count = self.connections_count.clone();
tokio::spawn(async move {
// before executing the manager, wait for what was specified, if anything
if let Some(backoff) = backoff {
@@ -180,13 +231,16 @@ impl Client {
sleep(backoff).await;
}
Self::manage_connection(
connections_count.fetch_add(1, Ordering::SeqCst);
ManagedConnection::new(
address.into(),
receiver,
initial_connection_timeout,
&current_reconnection_attempt,
current_reconnection_attempt,
)
.await
.run()
.await;
connections_count.fetch_sub(1, Ordering::SeqCst);
});
}
}
@@ -201,49 +255,47 @@ impl SendWithoutResponse for Client {
trace!("Sending packet to {address:?}");
let framed_packet = FramedNymPacket::new(packet, packet_type);
if let Some(sender) = self.conn_new.get_mut(&address) {
if let Err(err) = sender.channel.try_send(framed_packet) {
if err.is_full() {
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
Err(io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
))
} else if err.is_disconnected() {
debug!(
"Connection to {} seems to be dead. attempting to re-establish it...",
address
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, err.into_inner());
Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
"reconnection attempt is in progress",
))
} else {
// this can't really happen, but let's safe-guard against it in case something changes in futures library
Err(io::Error::new(
io::ErrorKind::Other,
"unknown connection buffer error",
))
}
} else {
Ok(())
}
} else {
let Some(sender) = self.active_connections.get_mut(&address) else {
// there was never a connection to begin with
debug!("establishing initial connection to {}", address);
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
self.make_connection(address, framed_packet);
Err(io::Error::new(
return Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection is in progress",
))
}
));
};
let sending_res = sender.channel.try_send(framed_packet);
drop(sender);
sending_res.map_err(|err| {
match err {
TrySendError::Full(_) => {
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
)
}
TrySendError::Closed(dropped) => {
debug!(
"Connection to {address} seems to be dead. attempting to re-establish it...",
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, dropped);
io::Error::new(
io::ErrorKind::ConnectionAborted,
"reconnection attempt is in progress",
)
}
}
} )
}
}
@@ -252,12 +304,15 @@ mod tests {
use super::*;
fn dummy_client() -> Client {
Client::new(Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
})
Client::new(
Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
},
Default::default(),
)
}
#[test]
@@ -32,10 +32,10 @@ use time::Date;
use url::Url;
pub use crate::nym_api::NymApiClientExt;
use nym_mixnet_contract_common::EpochRewardedSet;
pub use nym_mixnet_contract_common::{
mixnode::MixNodeDetails, GatewayBond, IdentityKey, IdentityKeyRef, NodeId, NymNodeDetails,
};
// re-export the type to not break existing imports
pub use crate::coconut::EcashApiClient;
@@ -367,6 +367,10 @@ impl NymApiClient {
Ok(self.nym_api.get_basic_gateways().await?.nodes)
}
pub async fn get_current_rewarded_set(&self) -> Result<EpochRewardedSet, ValidatorClientError> {
Ok(self.nym_api.get_rewarded_set().await?.into())
}
/// retrieve basic information for nodes are capable of operating as an entry gateway
/// this includes legacy gateways and nym-nodes
pub async fn get_all_basic_entry_assigned_nodes(
@@ -13,7 +13,7 @@ use nym_api_requests::ecash::models::{
use nym_api_requests::ecash::VerificationKeyResponse;
use nym_api_requests::models::{
AnnotationResponse, ApiHealthResponse, LegacyDescribedMixNode, NodePerformanceResponse,
NodeRefreshBody, NymNodeDescription,
NodeRefreshBody, NymNodeDescription, RewardedSetResponse,
};
use nym_api_requests::nym_nodes::PaginatedCachedNodesResponse;
use nym_api_requests::pagination::PaginatedResponse;
@@ -235,6 +235,15 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_set(&self) -> Result<RewardedSetResponse, NymAPIError> {
self.get_json(
&[routes::API_VERSION, "nym-nodes", "rewarded-set"],
NO_PARAMS,
)
.await
}
/// retrieve basic information for nodes are capable of operating as an entry gateway
/// this includes legacy gateways and nym-nodes
#[instrument(level = "debug", skip(self))]
@@ -912,6 +921,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn force_refresh_describe_cache(
&self,
request: &NodeRefreshBody,
@@ -924,6 +934,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_ticketbooks_for(
&self,
expiration_date: Date,
@@ -940,6 +951,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_ticketbooks_challenge(
&self,
expiration_date: Date,
@@ -26,10 +26,10 @@ use nym_mixnet_contract_common::{
reward_params::{Performance, RewardingParams},
rewarding::{EstimatedCurrentEpochRewardResponse, PendingRewardResponse},
ContractBuildInformation, ContractState, ContractStateParams, CurrentIntervalResponse,
CurrentNymNodeVersionResponse, Delegation, EpochEventId, EpochStatus, GatewayBond,
GatewayBondResponse, GatewayOwnershipResponse, HistoricalNymNodeVersionEntry, IdentityKey,
IdentityKeyRef, IntervalEventId, MixNodeBond, MixNodeDetails, MixOwnershipResponse,
MixnodeDetailsByIdentityResponse, MixnodeDetailsResponse, NodeId,
CurrentNymNodeVersionResponse, Delegation, EpochEventId, EpochRewardedSet, EpochStatus,
GatewayBond, GatewayBondResponse, GatewayOwnershipResponse, HistoricalNymNodeVersionEntry,
IdentityKey, IdentityKeyRef, IntervalEventId, MixNodeBond, MixNodeDetails,
MixOwnershipResponse, MixnodeDetailsByIdentityResponse, MixnodeDetailsResponse, NodeId,
NumberOfPendingEventsResponse, NymNodeBond, NymNodeDetails, NymNodeVersionHistoryResponse,
PagedAllDelegationsResponse, PagedDelegatorDelegationsResponse, PagedGatewayResponse,
PagedMixnodeBondsResponse, PagedNodeDelegationsResponse, PendingEpochEvent,
@@ -670,7 +670,7 @@ impl<T> PagedMixnetQueryClient for T where T: MixnetQueryClient {}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MixnetQueryClientExt: MixnetQueryClient {
async fn get_rewarded_set(&self) -> Result<RewardedSet, NyxdError> {
async fn get_rewarded_set(&self) -> Result<EpochRewardedSet, NyxdError> {
let error_response = |message| Err(NyxdError::extension_query_failure("mixnet", message));
let metadata = self.get_rewarded_set_metadata().await?;
@@ -711,13 +711,16 @@ pub trait MixnetQueryClientExt: MixnetQueryClient {
return error_response("the nodes assigned for 'standby' returned unexpected epoch_id");
}
Ok(RewardedSet {
entry_gateways: entry.nodes,
exit_gateways: exit.nodes,
layer1: layer1.nodes,
layer2: layer2.nodes,
layer3: layer3.nodes,
standby: standby.nodes,
Ok(EpochRewardedSet {
epoch_id: expected_epoch_id,
assignment: RewardedSet {
entry_gateways: entry.nodes,
exit_gateways: exit.nodes,
layer1: layer1.nodes,
layer2: layer2.nodes,
layer3: layer3.nodes,
standby: standby.nodes,
},
})
}
}
+2 -1
View File
@@ -12,7 +12,8 @@ dirs = { workspace = true, optional = true }
handlebars = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["derive"] }
toml = { workspace = true }
thiserror = { workspace = true }
toml = { workspace = true, features = ["display"] }
url = { workspace = true }
nym-network-defaults = { path = "../network-defaults", features = ["utoipa"] }
+10
View File
@@ -0,0 +1,10 @@
use std::io;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum NymConfigTomlError {
#[error(transparent)]
FileIoFailure(#[from] io::Error),
#[error(transparent)]
TomlSerializeFailure(#[from] toml::ser::Error),
}
+37
View File
@@ -13,6 +13,7 @@ pub use helpers::{parse_urls, OptionalSet};
pub use toml::de::Error as TomlDeError;
pub mod defaults;
pub mod error;
pub mod helpers;
pub mod legacy_helpers;
pub mod serde_helpers;
@@ -95,6 +96,42 @@ where
config.format_to_writer(file)
}
pub fn save_unformatted_config_to_file<C, P>(
config: &C,
path: P,
) -> Result<(), error::NymConfigTomlError>
where
C: Serialize + ?Sized,
P: AsRef<Path>,
{
let path = path.as_ref();
log::info!("saving config file to {}", path.display());
if let Some(parent) = path.parent() {
create_dir_all(parent)?;
}
let mut file = File::create(path)?;
// TODO: check for whether any of our configs store anything sensitive
// and change that to 0o644 instead
#[cfg(target_family = "unix")]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(path)?.permissions();
perms.set_mode(0o600);
fs::set_permissions(path, perms)?;
}
// let serde format the TOML in whatever ugly way it chooses
// TODO: in https://docs.rs/toml/latest/toml/fn.to_string_pretty.html it recommends using
// https://docs.rs/toml_edit/latest/toml_edit/struct.DocumentMut.html to preserve formatting
let toml_string = toml::to_string_pretty(config)?;
Ok(file.write_all(toml_string.as_bytes())?)
}
pub fn deserialize_config_from_toml_str<C>(raw: &str) -> Result<C, TomlDeError>
where
C: DeserializeOwned,
@@ -7,6 +7,7 @@
use crate::constants::{TOKEN_SUPPLY, UNIT_DELEGATION_BASE};
use crate::error::MixnetContractError;
use crate::helpers::IntoBaseDecimal;
use crate::nym_node::Role;
use crate::reward_params::{NodeRewardingParameters, RewardingParams};
use crate::rewarding::helpers::truncate_reward;
use crate::rewarding::RewardDistribution;
@@ -624,6 +625,16 @@ pub enum LegacyMixLayer {
Three = 3,
}
impl From<LegacyMixLayer> for Role {
fn from(layer: LegacyMixLayer) -> Self {
match layer {
LegacyMixLayer::One => Role::Layer1,
LegacyMixLayer::Two => Role::Layer2,
LegacyMixLayer::Three => Role::Layer3,
}
}
}
impl From<LegacyMixLayer> for String {
fn from(layer: LegacyMixLayer) -> Self {
(layer as u8).to_string()
@@ -3,6 +3,7 @@
use crate::config_score::{ConfigScoreParams, OutdatedVersionWeights, VersionScoreFormulaParams};
use crate::nym_node::Role;
use crate::EpochId;
use contracts_common::Percent;
use cosmwasm_schema::cw_serde;
use cosmwasm_std::Coin;
@@ -32,6 +33,23 @@ impl RoleAssignment {
}
}
#[cw_serde]
#[derive(Default)]
pub struct EpochRewardedSet {
pub epoch_id: EpochId,
pub assignment: RewardedSet,
}
impl From<(EpochId, RewardedSet)> for EpochRewardedSet {
fn from((epoch_id, assignment): (EpochId, RewardedSet)) -> Self {
EpochRewardedSet {
epoch_id,
assignment,
}
}
}
#[cw_serde]
#[derive(Default)]
pub struct RewardedSet {
@@ -69,6 +87,29 @@ impl RewardedSet {
pub fn rewarded_set_size(&self) -> usize {
self.active_set_size() + self.standby.len()
}
pub fn get_role(&self, node_id: NodeId) -> Option<Role> {
// given each role has ~100 entries in them, doing linear lookup with vec should be fine
if self.entry_gateways.contains(&node_id) {
return Some(Role::EntryGateway);
}
if self.exit_gateways.contains(&node_id) {
return Some(Role::ExitGateway);
}
if self.layer1.contains(&node_id) {
return Some(Role::Layer1);
}
if self.layer2.contains(&node_id) {
return Some(Role::Layer2);
}
if self.layer3.contains(&node_id) {
return Some(Role::Layer3);
}
if self.standby.contains(&node_id) {
return Some(Role::Standby);
}
None
}
}
#[cw_serde]
@@ -23,6 +23,11 @@ impl SqliteEcashTicketbookManager {
SqliteEcashTicketbookManager { connection_pool }
}
/// Closes the connection pool.
pub async fn close(&self) {
self.connection_pool.close().await
}
pub(crate) async fn cleanup_expired(&self, deadline: Date) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM ecash_ticketbook WHERE expiration_date <= ?",
@@ -43,6 +43,10 @@ impl Debug for EphemeralStorage {
impl Storage for EphemeralStorage {
type StorageError = StorageError;
async fn close(&self) {
// nothing to do here
}
async fn cleanup_expired(&self) -> Result<(), Self::StorageError> {
self.storage_manager.cleanup_expired().await;
Ok(())
@@ -33,7 +33,10 @@ use nym_credentials::{
IssuanceTicketBook, IssuedTicketBook,
};
use nym_ecash_time::{ecash_today, Date, EcashTime};
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use zeroize::Zeroizing;
@@ -56,6 +59,9 @@ impl PersistentStorage {
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -83,6 +89,10 @@ impl PersistentStorage {
impl Storage for PersistentStorage {
type StorageError = StorageError;
async fn close(&self) {
self.storage_manager.close().await
}
/// remove all expired ticketbooks and expiration date signatures
async fn cleanup_expired(&self) -> Result<(), Self::StorageError> {
let ecash_yesterday = ecash_today().date().previous_day().unwrap();
+2
View File
@@ -22,6 +22,8 @@ use std::error::Error;
pub trait Storage: Send + Sync {
type StorageError: Error;
async fn close(&self);
/// remove all expired ticketbooks and expiration date signatures
async fn cleanup_expired(&self) -> Result<(), Self::StorageError>;
@@ -9,7 +9,7 @@ use futures::{Sink, Stream};
use rand::{CryptoRng, RngCore};
use tungstenite::Message as WsMessage;
impl<'a, S, R> State<'a, S, R> {
impl<S, R> State<'_, S, R> {
async fn client_handshake_inner(&mut self) -> Result<(), HandshakeError>
where
S: Stream<Item = WsItem> + Sink<WsMessage> + Unpin,
@@ -10,7 +10,7 @@ use crate::registration::handshake::{error::HandshakeError, WsItem};
use futures::{Sink, Stream};
use tungstenite::Message as WsMessage;
impl<'a, S, R> State<'a, S, R> {
impl<S, R> State<'_, S, R> {
async fn gateway_handshake_inner(
&mut self,
raw_init_message: Vec<u8>,
@@ -20,6 +20,10 @@ pub enum ClientRequest {
hkdf_salt: Vec<u8>,
derived_key_digest: Vec<u8>,
},
ForgetMe {
client: bool,
stats: bool,
},
}
impl ClientRequest {
@@ -11,6 +11,7 @@ use tungstenite::Message;
#[non_exhaustive]
pub enum SensitiveServerResponse {
KeyUpgradeAck {},
ForgetMeAck {},
}
impl SensitiveServerResponse {
+17 -1
View File
@@ -6,7 +6,10 @@ use models::StoredFinishedSession;
use nym_node_metrics::entry::{ActiveSession, FinishedSession, SessionType};
use nym_sphinx::DestinationAddressBytes;
use sessions::SessionManager;
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use time::Date;
use tracing::{debug, error};
@@ -36,6 +39,9 @@ impl PersistentStatsStorage {
// TODO: we can inject here more stuff based on our gateway global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -116,6 +122,16 @@ impl PersistentStatsStorage {
.await?)
}
pub async fn delete_unique_user(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_unique_user(client_address.as_base58_string())
.await?)
}
pub async fn insert_active_session(
&self,
client_address: DestinationAddressBytes,
@@ -71,6 +71,16 @@ impl SessionManager {
Ok(())
}
pub(crate) async fn delete_unique_user(&self, client_address_b58: String) -> Result<()> {
sqlx::query!(
"DELETE FROM sessions_unique_users WHERE client_address = ?",
client_address_b58
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn get_unique_users(&self, date: Date) -> Result<Vec<String>> {
sqlx::query_scalar!(
"SELECT client_address as count FROM sessions_unique_users WHERE day = ?",
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "DELETE FROM message_store WHERE client_address_bs58 = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "3ea5542b21a41b14276a8fd6b870c61aa0ddd30fee2565803b88c6086bd2a734"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "DELETE FROM available_bandwidth WHERE client_id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "a3cc707995b8215fa77738cd1a55f9e8d251a3e764104d2a54153895dee1a118"
}
+10
View File
@@ -49,6 +49,16 @@ impl BandwidthManager {
Ok(())
}
pub(crate) async fn remove_client(&self, client_id: i64) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM available_bandwidth WHERE client_id = ?",
client_id
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
/// Set the expiration date of the particular client to the provided date.
pub(crate) async fn set_expiration(
&self,
+13
View File
@@ -133,4 +133,17 @@ impl InboxManager {
.await?;
Ok(())
}
pub(crate) async fn remove_messages_for_client(
&self,
client_address_bs58: &str,
) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM message_store WHERE client_address_bs58 = ?",
client_address_bs58
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
}
+49 -1
View File
@@ -12,7 +12,10 @@ use nym_credentials_interface::ClientTicket;
use nym_gateway_requests::shared_key::SharedGatewayKey;
use nym_sphinx::DestinationAddressBytes;
use shared_keys::SharedKeysManager;
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use tickets::TicketStorageManager;
use time::OffsetDateTime;
@@ -41,6 +44,33 @@ pub struct GatewayStorage {
}
impl GatewayStorage {
#[allow(dead_code)]
pub(crate) fn client_manager(&self) -> &ClientManager {
&self.client_manager
}
pub(crate) fn shared_key_manager(&self) -> &SharedKeysManager {
&self.shared_key_manager
}
pub(crate) fn inbox_manager(&self) -> &InboxManager {
&self.inbox_manager
}
pub(crate) fn bandwidth_manager(&self) -> &BandwidthManager {
&self.bandwidth_manager
}
#[allow(dead_code)]
pub(crate) fn ticket_manager(&self) -> &TicketStorageManager {
&self.ticket_manager
}
#[allow(dead_code)]
pub(crate) fn wireguard_peer_manager(&self) -> &wireguard_peers::WgPeerManager {
&self.wireguard_peer_manager
}
/// Initialises `PersistentStorage` using the provided path.
///
/// # Arguments
@@ -59,6 +89,9 @@ impl GatewayStorage {
// TODO: we can inject here more stuff based on our gateway global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -101,6 +134,21 @@ impl GatewayStorage {
.await?)
}
pub async fn handle_forget_me(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), GatewayStorageError> {
let client_id = self.get_mixnet_client_id(client_address).await?;
self.inbox_manager()
.remove_messages_for_client(&client_address.as_base58_string())
.await?;
self.bandwidth_manager().remove_client(client_id).await?;
self.shared_key_manager()
.remove_shared_keys(&client_address.as_base58_string())
.await?;
Ok(())
}
pub async fn insert_shared_keys(
&self,
client_address: DestinationAddressBytes,
+241 -138
View File
@@ -192,6 +192,28 @@ impl Client {
&self.base_url
}
pub fn create_request<B, K, V>(
&self,
method: reqwest::Method,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: Option<&B>,
) -> RequestBuilder
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
{
let url = sanitize_url(&self.base_url, path, params);
let mut request = self.reqwest_client.request(method.clone(), url);
if let Some(body) = json_body {
request = request.json(body);
}
request
}
pub fn create_get_request<K, V>(
&self,
path: PathSegments<'_>,
@@ -205,38 +227,6 @@ impl Client {
self.reqwest_client.get(url)
}
#[instrument(level = "debug", skip_all, fields(path=?path))]
async fn send_get_request<K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
) -> Result<Response, HttpClientError<E>>
where
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
tracing::trace!("Sending GET request");
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.get(url).send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??,
)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self.reqwest_client.get(url).send().await?)
}
}
pub fn create_post_request<B, K, V>(
&self,
path: PathSegments<'_>,
@@ -252,36 +242,6 @@ impl Client {
self.reqwest_client.post(url).json(json_body)
}
async fn send_post_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.post(url).json(json_body).send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self.reqwest_client.post(url).json(json_body).send().await?)
}
}
pub fn create_delete_request<K, V>(
&self,
path: PathSegments<'_>,
@@ -295,6 +255,88 @@ impl Client {
self.reqwest_client.delete(url)
}
pub fn create_patch_request<B, K, V>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> RequestBuilder
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
{
let url = sanitize_url(&self.base_url, path, params);
self.reqwest_client.patch(url).json(json_body)
}
async fn send_request<B, K, V, E>(
&self,
method: reqwest::Method,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: Option<&B>,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
let url = sanitize_url(&self.base_url, path, params);
let mut request = self.reqwest_client.request(method.clone(), url);
if let Some(body) = json_body {
request = request.json(body);
}
#[cfg(target_arch = "wasm32")]
{
Ok(
wasmtimer::tokio::timeout(self.request_timeout, request.send())
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??,
)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(request.send().await?)
}
}
#[instrument(level = "debug", skip_all, fields(path=?path))]
async fn send_get_request<K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
) -> Result<Response, HttpClientError<E>>
where
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::GET, path, params, None::<&()>)
.await
}
async fn send_post_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::POST, path, params, Some(json_body))
.await
}
pub async fn send_delete_request<K, V, E>(
&self,
path: PathSegments<'_>,
@@ -305,23 +347,24 @@ impl Client {
V: AsRef<str>,
E: Display,
{
tracing::trace!("Sending DELETE request");
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.delete(url).send(),
)
self.send_request(reqwest::Method::DELETE, path, params, None::<&()>)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??)
}
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self.reqwest_client.delete(url).send().await?)
}
pub async fn send_patch_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
self.send_request(reqwest::Method::PATCH, path, params, Some(json_body))
.await
}
#[instrument(level = "debug", skip_all)]
@@ -372,6 +415,56 @@ impl Client {
parse_response(res, false).await
}
pub async fn patch_json<B, T, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized,
for<'a> T: Deserialize<'a>,
K: AsRef<str>,
V: AsRef<str>,
E: Display + DeserializeOwned,
{
let res = self.send_patch_request(path, params, json_body).await?;
parse_response(res, true).await
}
async fn call_json_endpoint<B, T, S, E>(
&self,
method: reqwest::Method,
endpoint: S,
json_body: Option<&B>,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str>,
{
let mut request = self
.reqwest_client
.request(method.clone(), self.base_url.join(endpoint.as_ref())?);
if let Some(body) = json_body {
request = request.json(body);
}
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(self.request_timeout, request.send())
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = { request.send().await? };
parse_response(res, false).await
}
#[instrument(level = "debug", skip_all)]
pub async fn get_json_endpoint<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
@@ -379,27 +472,8 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.get(self.base_url.join(endpoint.as_ref())?)
.send(),
)
self.call_json_endpoint(reqwest::Method::GET, endpoint, None::<&()>)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.get(self.base_url.join(endpoint.as_ref())?)
.send()
.await?
};
parse_response(res, false).await
}
pub async fn post_json_endpoint<B, T, S, E>(
@@ -413,29 +487,8 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.post(self.base_url.join(endpoint.as_ref())?)
.json(json_body)
.send(),
)
self.call_json_endpoint(reqwest::Method::POST, endpoint, Some(json_body))
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.post(self.base_url.join(endpoint.as_ref())?)
.json(json_body)
.send()
.await?
};
parse_response(res, true).await
}
pub async fn delete_json_endpoint<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
@@ -444,27 +497,23 @@ impl Client {
E: Display + DeserializeOwned,
S: AsRef<str>,
{
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.delete(self.base_url.join(endpoint.as_ref())?)
.send(),
)
self.call_json_endpoint(reqwest::Method::DELETE, endpoint, None::<&()>)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
}
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.delete(self.base_url.join(endpoint.as_ref())?)
.send()
.await?
};
parse_response(res, false).await
pub async fn patch_json_endpoint<B, T, S, E>(
&self,
endpoint: S,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str>,
{
self.call_json_endpoint(reqwest::Method::PATCH, endpoint, Some(json_body))
.await
}
}
@@ -509,6 +558,19 @@ pub trait ApiClient {
V: AsRef<str> + Sync,
E: Display + DeserializeOwned;
async fn patch_json<B, T, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
K: AsRef<str> + Sync,
V: AsRef<str> + Sync,
E: Display + DeserializeOwned;
/// `get` json data from the provided absolute endpoint, i.e. for example `"/api/v1/mixnodes?since=12345"`
async fn get_json_from<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
@@ -532,6 +594,17 @@ pub trait ApiClient {
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str> + Sync + Send;
async fn patch_json_data_at<B, T, S, E>(
&self,
endpoint: S,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str> + Sync + Send;
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -581,6 +654,22 @@ impl ApiClient for Client {
self.delete_json(path, params).await
}
async fn patch_json<B, T, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
K: AsRef<str> + Sync,
V: AsRef<str> + Sync,
E: Display + DeserializeOwned,
{
self.patch_json(path, params, json_body).await
}
async fn get_json_from<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
for<'a> T: Deserialize<'a>,
@@ -612,6 +701,20 @@ impl ApiClient for Client {
{
self.delete_json_endpoint(endpoint).await
}
async fn patch_json_data_at<B, T, S, E>(
&self,
endpoint: S,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized + Sync,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str> + Sync + Send,
{
self.patch_json_endpoint(endpoint, json_body).await
}
}
// utility function that should solve the double slash problem in API urls forever.
@@ -63,6 +63,7 @@ impl From<v6::request::StaticConnectRequest> for v7::request::StaticConnectReque
}
}
#[allow(deprecated)]
impl From<v6::request::DynamicConnectRequest> for v7::request::DynamicConnectRequest {
fn from(dynamic_connect_request: v6::request::DynamicConnectRequest) -> Self {
Self {
@@ -51,6 +51,7 @@ impl IpPacketRequest {
)
}
#[allow(deprecated)]
pub fn new_dynamic_connect_request(
reply_to: Recipient,
reply_to_hops: Option<u8>,
@@ -285,6 +286,9 @@ pub struct DynamicConnectRequest {
// The number of mix node hops that responses should take, in addition to the entry and exit
// node. Zero means only client -> entry -> exit -> client.
#[deprecated(
note = "clients can no longer control number of hops to use. this field is scheduled for removal in V8"
)]
pub reply_to_hops: Option<u8>,
// The average delay at each mix node, in milliseconds. Currently this is not supported by the
+11
View File
@@ -57,3 +57,14 @@ pub mod wireguard {
pub const WG_TUN_DEVICE_IP_ADDRESS_V6: Ipv6Addr = Ipv6Addr::new(0xfc01, 0, 0, 0, 0, 0, 0, 0x1); // fc01::1
pub const WG_TUN_DEVICE_NETMASK_V6: u8 = 112;
}
pub mod mixnet_vpn {
use std::net::{Ipv4Addr, Ipv6Addr};
// The interface used to route traffic
pub const NYM_TUN_BASE_NAME: &str = "nymtun";
pub const NYM_TUN_DEVICE_ADDRESS_V4: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 1);
pub const NYM_TUN_DEVICE_NETMASK_V4: Ipv4Addr = Ipv4Addr::new(255, 255, 0, 0);
pub const NYM_TUN_DEVICE_ADDRESS_V6: Ipv6Addr = Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 0x1); // fc00::1
pub const NYM_TUN_DEVICE_NETMASK_V6: &str = "112";
}
+33 -31
View File
@@ -2,10 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::NetworkTestingError;
use crate::node::TestableNode;
use crate::NodeId;
use crate::node::{NodeType, TestableNode};
use nym_sphinx::message::NymMessage;
use nym_topology::{gateway, mix};
use nym_topology::node::RoutingNode;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
@@ -26,73 +25,76 @@ pub struct TestMessage<T = Empty> {
}
impl<T> TestMessage<T> {
pub fn new<N: Into<TestableNode>>(node: N, msg_id: u32, total_msgs: u32, ext: T) -> Self {
pub fn new(tested_node: TestableNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
TestMessage {
tested_node: node.into(),
tested_node,
msg_id,
total_msgs,
ext,
}
}
pub fn new_mix(node: &mix::LegacyNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
Self::new(node, msg_id, total_msgs, ext)
pub fn new_mix(node: &RoutingNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
Self::new(
TestableNode::new_routing(node, NodeType::Mixnode),
msg_id,
total_msgs,
ext,
)
}
// pub fn new_gateway(node: &gateway::Node, msg_id: u32, total_msgs: u32, ext: T) -> Self {
// Self::new(node, msg_id, total_msgs, ext)
// }
pub fn new_serialized<N>(
node: N,
msg_id: u32,
total_msgs: u32,
ext: T,
) -> Result<Vec<u8>, NetworkTestingError>
where
N: Into<TestableNode>,
T: Serialize,
{
Self::new(node, msg_id, total_msgs, ext).as_bytes()
pub fn new_gateway(node: &RoutingNode, msg_id: u32, total_msgs: u32, ext: T) -> Self {
Self::new(
TestableNode::new_routing(node, NodeType::Gateway),
msg_id,
total_msgs,
ext,
)
}
pub fn new_plaintexts<N>(
node: &N,
pub fn new_plaintexts(
node: TestableNode,
total_msgs: u32,
ext: T,
) -> Result<Vec<Vec<u8>>, NetworkTestingError>
where
for<'a> &'a N: Into<TestableNode>,
T: Serialize + Clone,
{
let mut msgs = Vec::with_capacity(total_msgs as usize);
for msg_id in 1..=total_msgs {
msgs.push(Self::new(node, msg_id, total_msgs, ext.clone()).as_bytes()?)
msgs.push(Self::new(node.clone(), msg_id, total_msgs, ext.clone()).as_bytes()?)
}
Ok(msgs)
}
pub fn mix_plaintexts(
node: &mix::LegacyNode,
node: &RoutingNode,
total_msgs: u32,
ext: T,
) -> Result<Vec<Vec<u8>>, NetworkTestingError>
where
T: Serialize + Clone,
{
Self::new_plaintexts(node, total_msgs, ext)
Self::new_plaintexts(
TestableNode::new_routing(node, NodeType::Mixnode),
total_msgs,
ext,
)
}
pub fn legacy_gateway_plaintexts(
node: &gateway::LegacyNode,
node_id: NodeId,
node: &RoutingNode,
total_msgs: u32,
ext: T,
) -> Result<Vec<Vec<u8>>, NetworkTestingError>
where
T: Serialize + Clone,
{
Self::new_plaintexts(&(node, node_id), total_msgs, ext)
Self::new_plaintexts(
TestableNode::new_routing(node, NodeType::Gateway),
total_msgs,
ext,
)
}
pub fn as_json_string(&self) -> Result<String, NetworkTestingError>
+9 -33
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::NodeId;
use nym_topology::{gateway, mix};
use nym_topology::node::RoutingNode;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
@@ -24,6 +24,14 @@ impl TestableNode {
}
}
pub fn new_routing(routing_node: &RoutingNode, typ: NodeType) -> Self {
TestableNode::new(
routing_node.identity_key.to_base58_string(),
typ,
routing_node.node_id,
)
}
pub fn new_mixnode(encoded_identity: String, node_id: NodeId) -> Self {
TestableNode::new(encoded_identity, NodeType::Mixnode, node_id)
}
@@ -37,38 +45,6 @@ impl TestableNode {
}
}
impl<'a> From<&'a mix::LegacyNode> for TestableNode {
fn from(value: &'a mix::LegacyNode) -> Self {
TestableNode {
encoded_identity: value.identity_key.to_base58_string(),
typ: NodeType::Mixnode,
node_id: value.mix_id,
}
}
}
impl<'a> From<(&'a gateway::LegacyNode, NodeId)> for TestableNode {
fn from((gateway, node_id): (&'a gateway::LegacyNode, NodeId)) -> Self {
(&(gateway, node_id)).into()
}
}
impl<'a> From<&'a (gateway::LegacyNode, NodeId)> for TestableNode {
fn from((gateway, node_id): &'a (gateway::LegacyNode, NodeId)) -> Self {
(gateway, *node_id).into()
}
}
impl<'a, 'b> From<&'a (&'b gateway::LegacyNode, NodeId)> for TestableNode {
fn from((gateway, node_id): &'a (&'b gateway::LegacyNode, NodeId)) -> Self {
TestableNode {
encoded_identity: gateway.identity_key.to_base58_string(),
typ: NodeType::Gateway,
node_id: *node_id,
}
}
}
impl Display for TestableNode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
+39 -93
View File
@@ -2,21 +2,22 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::NetworkTestingError;
use crate::Empty;
use crate::NodeId;
use crate::TestMessage;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::message::NymMessage;
use nym_sphinx::params::{PacketSize, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx::params::PacketSize;
use nym_sphinx::preparer::{FragmentPreparer, PreparedFragment};
use nym_sphinx_params::PacketType;
use nym_topology::{gateway, mix, NymTopology};
use nym_topology::node::RoutingNode;
use nym_topology::{NymRouteProvider, NymTopology, Role};
use rand::{CryptoRng, Rng};
use serde::Serialize;
use std::sync::Arc;
use std::time::Duration;
pub use nym_topology::node::LegacyMixLayer;
pub struct NodeTester<R> {
rng: R,
@@ -38,10 +39,6 @@ pub struct NodeTester<R> {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
average_ack_delay: Duration,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
// while acks are going to be ignored they still need to be constructed
// so that the gateway would be able to correctly process and forward the message
ack_key: Arc<AckKey>,
@@ -70,41 +67,27 @@ where
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
ack_key,
}
}
/// Allows setting non-default number of expected mix hops in the network.
#[allow(dead_code)]
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
pub fn testable_mix_topology(&self, node: &mix::LegacyNode) -> NymTopology {
pub fn testable_mix_topology(&self, layer: LegacyMixLayer, node: &RoutingNode) -> NymTopology {
let mut topology = self.base_topology.clone();
topology.set_mixes_in_layer(node.layer as u8, vec![node.clone()]);
topology.set_testable_node(layer.into(), node.clone());
topology
}
pub fn testable_gateway_topology(&self, gateway: &gateway::LegacyNode) -> NymTopology {
pub fn testable_gateway_topology(&self, node: &RoutingNode) -> NymTopology {
let mut topology = self.base_topology.clone();
topology.set_gateways(vec![gateway.clone()]);
topology.set_testable_node(Role::EntryGateway, node.clone());
topology.set_testable_node(Role::ExitGateway, node.clone());
topology
}
pub fn simple_mixnode_test_packets(
&mut self,
mix: &mix::LegacyNode,
test_packets: u32,
) -> Result<Vec<PreparedFragment>, NetworkTestingError> {
self.mixnode_test_packets(mix, Empty, test_packets, None)
}
pub fn mixnode_test_packets<T>(
&mut self,
mix: &mix::LegacyNode,
mix: &RoutingNode,
legacy_mix_layer: LegacyMixLayer,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -112,7 +95,9 @@ where
where
T: Serialize + Clone,
{
let ephemeral_topology = self.testable_mix_topology(mix);
let ephemeral_topology =
NymRouteProvider::from(self.testable_mix_topology(legacy_mix_layer, mix))
.with_ignore_egress_epoch_roles(true);
let mut packets = Vec::with_capacity(test_packets as usize);
for plaintext in TestMessage::mix_plaintexts(mix, test_packets, msg_ext)? {
@@ -128,7 +113,7 @@ where
pub fn mixnodes_test_packets<T>(
&mut self,
nodes: &[mix::LegacyNode],
nodes: &[(LegacyMixLayer, RoutingNode)],
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -137,9 +122,10 @@ where
T: Serialize + Clone,
{
let mut packets = Vec::new();
for node in nodes {
for (layer, node) in nodes {
packets.append(&mut self.mixnode_test_packets(
node,
*layer,
msg_ext.clone(),
test_packets,
custom_recipient,
@@ -149,26 +135,10 @@ where
Ok(packets)
}
pub fn existing_mixnode_test_packets<T>(
&mut self,
mix_id: NodeId,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
) -> Result<Vec<PreparedFragment>, NetworkTestingError>
where
T: Serialize + Clone,
{
let Some(node) = self.base_topology.find_mix(mix_id) else {
return Err(NetworkTestingError::NonExistentMixnode { mix_id });
};
self.mixnode_test_packets(&node.clone(), msg_ext, test_packets, custom_recipient)
}
pub fn existing_identity_mixnode_test_packets<T>(
&mut self,
encoded_mix_identity: String,
layer: LegacyMixLayer,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -176,22 +146,30 @@ where
where
T: Serialize + Clone,
{
let Some(node) = self
.base_topology
.find_mix_by_identity(&encoded_mix_identity)
else {
let Ok(identity) = encoded_mix_identity.parse() else {
return Err(NetworkTestingError::NonExistentMixnodeIdentity {
mix_identity: encoded_mix_identity,
});
};
self.mixnode_test_packets(&node.clone(), msg_ext, test_packets, custom_recipient)
let Some(node) = self.base_topology.find_node_by_identity(identity) else {
return Err(NetworkTestingError::NonExistentMixnodeIdentity {
mix_identity: encoded_mix_identity,
});
};
self.mixnode_test_packets(
&node.clone(),
layer,
msg_ext,
test_packets,
custom_recipient,
)
}
pub fn legacy_gateway_test_packets<T>(
&mut self,
gateway: &gateway::LegacyNode,
node_id: NodeId,
gateway: &RoutingNode,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
@@ -199,12 +177,11 @@ where
where
T: Serialize + Clone,
{
let ephemeral_topology = self.testable_gateway_topology(gateway);
let ephemeral_topology = NymRouteProvider::from(self.testable_gateway_topology(gateway))
.with_ignore_egress_epoch_roles(true);
let mut packets = Vec::with_capacity(test_packets as usize);
for plaintext in
TestMessage::legacy_gateway_plaintexts(gateway, node_id, test_packets, msg_ext)?
{
for plaintext in TestMessage::legacy_gateway_plaintexts(gateway, test_packets, msg_ext)? {
packets.push(self.wrap_plaintext_data(
plaintext,
&ephemeral_topology,
@@ -215,36 +192,10 @@ where
Ok(packets)
}
pub fn existing_gateway_test_packets<T>(
&mut self,
node_id: NodeId,
encoded_gateway_identity: String,
msg_ext: T,
test_packets: u32,
custom_recipient: Option<Recipient>,
) -> Result<Vec<PreparedFragment>, NetworkTestingError>
where
T: Serialize + Clone,
{
let Some(node) = self.base_topology.find_gateway(&encoded_gateway_identity) else {
return Err(NetworkTestingError::NonExistentGateway {
gateway_identity: encoded_gateway_identity,
});
};
self.legacy_gateway_test_packets(
&node.clone(),
node_id,
msg_ext,
test_packets,
custom_recipient,
)
}
pub fn wrap_plaintext_data(
&mut self,
plaintext: Vec<u8>,
topology: &NymTopology,
topology: &NymRouteProvider,
custom_recipient: Option<Recipient>,
) -> Result<PreparedFragment, NetworkTestingError> {
let message = NymMessage::new_plain(plaintext);
@@ -274,14 +225,13 @@ where
&address,
&address,
PacketType::Mix,
None,
)?)
}
pub fn create_test_packet<T>(
&mut self,
message: &TestMessage<T>,
topology: &NymTopology,
topology: &NymRouteProvider,
custom_recipient: Option<Recipient>,
) -> Result<PreparedFragment, NetworkTestingError>
where
@@ -307,10 +257,6 @@ impl<R: CryptoRng + Rng> FragmentPreparer for NodeTester<R> {
1
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
fn average_packet_delay(&self) -> Duration {
self.average_packet_delay
}
@@ -65,6 +65,14 @@ impl<T> NonExhaustiveDelayQueue<T> {
pub fn remove(&mut self, key: &QueueKey) -> Expired<T> {
self.inner.remove(key)
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
impl<T> Default for NonExhaustiveDelayQueue<T> {
+1 -1
View File
@@ -12,6 +12,6 @@ license.workspace = true
[dependencies]
prometheus = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
dashmap = { workspace = true }
lazy_static = { workspace = true }
+390 -81
View File
@@ -1,14 +1,18 @@
use dashmap::DashMap;
pub use log::error;
use log::{debug, warn};
use std::fmt;
pub use std::time::Instant;
use tracing::{debug, error, warn};
use prometheus::{core::Collector, Encoder as _, IntCounter, IntGauge, Registry, TextEncoder};
use prometheus::{
core::Collector, Encoder as _, Gauge, Histogram, HistogramOpts, IntCounter, IntGauge, Registry,
TextEncoder,
};
pub use prometheus::HistogramTimer;
pub use std::time::Instant;
#[macro_export]
macro_rules! prepend_package_name {
($name: literal) => {
($name: tt) => {
&format!(
"{}_{}",
std::module_path!()
@@ -23,15 +27,29 @@ macro_rules! prepend_package_name {
#[macro_export]
macro_rules! inc_by {
($name:literal, $x:expr, $help: expr) => {
$crate::REGISTRY.maybe_register_and_inc_by(
$crate::prepend_package_name!($name),
$x as i64,
$help,
);
};
($name:literal, $x:expr) => {
$crate::REGISTRY.inc_by($crate::prepend_package_name!($name), $x as i64);
$crate::REGISTRY.maybe_register_and_inc_by(
$crate::prepend_package_name!($name),
$x as i64,
None,
);
};
}
#[macro_export]
macro_rules! inc {
($name:literal, $help: expr) => {
$crate::REGISTRY.maybe_register_and_inc($crate::prepend_package_name!($name), $help);
};
($name:literal) => {
$crate::REGISTRY.inc($crate::prepend_package_name!($name));
$crate::REGISTRY.maybe_register_and_inc($crate::prepend_package_name!($name), None);
};
}
@@ -42,6 +60,71 @@ macro_rules! metrics {
};
}
#[macro_export]
macro_rules! set_metric {
($name:literal, $x:expr, $help: expr) => {
$crate::REGISTRY.maybe_register_and_set(
$crate::prepend_package_name!($name),
$x as i64,
$help,
);
};
($name:literal, $x:expr) => {
$crate::REGISTRY.maybe_register_and_set(
$crate::prepend_package_name!($name),
$x as i64,
None,
);
};
}
#[macro_export]
macro_rules! set_metric_float {
($name:literal, $x:expr, $help: expr) => {
$crate::REGISTRY.maybe_register_and_set_float(
$crate::prepend_package_name!($name),
$x as f64,
$help,
);
};
($name:literal, $x:expr) => {
$crate::REGISTRY.maybe_register_and_set_float(
$crate::prepend_package_name!($name),
$x as f64,
None,
);
};
}
#[macro_export]
macro_rules! add_histogram_obs {
($name:expr, $x:expr, $b:expr, $help:expr) => {
$crate::REGISTRY.maybe_register_and_add_to_histogram(
$crate::prepend_package_name!($name),
$x as f64,
Some($b),
$help,
);
};
($name:expr, $x:expr, $b:expr) => {
$crate::REGISTRY.maybe_register_and_add_to_histogram(
$crate::prepend_package_name!($name),
$x as f64,
Some($b),
None,
);
};
($name:expr, $x:expr) => {
$crate::REGISTRY.maybe_register_and_add_to_histogram(
$crate::prepend_package_name!($name),
$x as f64,
None,
None,
);
};
}
#[macro_export]
macro_rules! nanos {
( $name:literal, $x:expr ) => {{
@@ -50,7 +133,7 @@ macro_rules! nanos {
let r = $x;
let duration = start.elapsed().as_nanos() as i64;
let name = $crate::prepend_package_name!($name);
$crate::REGISTRY.inc_by(&format!("{}_nanos", $name), duration);
$crate::REGISTRY.maybe_register_and_inc_by(&format!("{}_nanos", $name), duration, None);
r
}};
}
@@ -59,15 +142,100 @@ lazy_static::lazy_static! {
pub static ref REGISTRY: MetricsController = MetricsController::default();
}
pub fn metrics_registry() -> &'static MetricsController {
&REGISTRY
}
#[derive(Default)]
pub struct MetricsController {
registry: Registry,
registry_index: DashMap<String, Metric>,
}
enum Metric {
C(Box<IntCounter>),
G(Box<IntGauge>),
pub enum Metric {
IntCounter(Box<IntCounter>),
IntGauge(Box<IntGauge>),
FloatGauge(Box<Gauge>),
Histogram(Box<Histogram>),
}
impl Metric {
pub fn new_int_counter(name: &str, help: &str) -> Option<Self> {
match IntCounter::new(sanitize_metric_name(name), help) {
Ok(c) => Some(c.into()),
Err(err) => {
error!("Failed to create counter {name:?}: {err}");
None
}
}
}
pub fn new_int_gauge(name: &str, help: &str) -> Option<Self> {
match IntGauge::new(sanitize_metric_name(name), help) {
Ok(g) => Some(g.into()),
Err(err) => {
error!("Failed to create gauge {name:?}: {err}");
None
}
}
}
pub fn new_float_gauge(name: &str, help: &str) -> Option<Self> {
match Gauge::new(sanitize_metric_name(name), help) {
Ok(g) => Some(g.into()),
Err(err) => {
error!("Failed to create gauge {name:?}: {err}");
None
}
}
}
pub fn new_histogram(name: &str, help: &str, buckets: Option<&[f64]>) -> Option<Self> {
let mut opts = HistogramOpts::new(sanitize_metric_name(name), help);
if let Some(buckets) = buckets {
opts = opts.buckets(buckets.to_vec())
}
match Histogram::with_opts(opts) {
Ok(h) => Some(Metric::Histogram(Box::new(h))),
Err(err) => {
error!("failed to create histogram {name:?}: {err}");
None
}
}
}
fn as_collector(&self) -> Box<dyn Collector> {
match self {
Metric::IntCounter(c) => c.clone(),
Metric::IntGauge(g) => g.clone(),
Metric::FloatGauge(g) => g.clone(),
Metric::Histogram(h) => h.clone(),
}
}
}
impl From<IntCounter> for Metric {
fn from(v: IntCounter) -> Self {
Metric::IntCounter(Box::new(v))
}
}
impl From<IntGauge> for Metric {
fn from(v: IntGauge) -> Self {
Metric::IntGauge(Box::new(v))
}
}
impl From<Gauge> for Metric {
fn from(v: Gauge) -> Self {
Metric::FloatGauge(Box::new(v))
}
}
impl From<Histogram> for Metric {
fn from(v: Histogram) -> Self {
Metric::Histogram(Box::new(v))
}
}
fn fq_name(c: &dyn Collector) -> String {
@@ -81,34 +249,92 @@ impl Metric {
#[inline(always)]
fn fq_name(&self) -> String {
match self {
Metric::C(c) => fq_name(c.as_ref()),
Metric::G(g) => fq_name(g.as_ref()),
Metric::IntCounter(c) => fq_name(c.as_ref()),
Metric::IntGauge(g) => fq_name(g.as_ref()),
Metric::FloatGauge(g) => fq_name(g.as_ref()),
Metric::Histogram(h) => fq_name(h.as_ref()),
}
}
#[inline(always)]
fn inc(&self) {
match self {
Metric::C(c) => c.inc(),
Metric::G(g) => g.inc(),
Metric::IntCounter(c) => c.inc(),
Metric::IntGauge(g) => g.inc(),
Metric::FloatGauge(g) => g.inc(),
Metric::Histogram(_) => {
warn!("invalid operation: attempted to call increment on a histogram")
}
}
}
#[inline(always)]
fn inc_by(&self, value: i64) {
match self {
Metric::C(c) => c.inc_by(value as u64),
Metric::G(g) => g.add(value),
Metric::IntCounter(c) => c.inc_by(value as u64),
Metric::IntGauge(g) => g.add(value),
Metric::FloatGauge(g) => {
warn!("attempted to increment a float gauge ('{}') by an integer - this is most likely a bug", self.fq_name());
g.add(value as f64)
}
Metric::Histogram(_) => {
warn!("invalid operation: attempted to call increment on a histogram")
}
}
}
#[inline(always)]
fn set(&self, value: i64) {
match self {
Metric::C(_c) => {
Metric::IntCounter(_c) => {
warn!("Cannot set value for counter {:?}", self.fq_name());
}
Metric::G(g) => g.set(value),
Metric::IntGauge(g) => g.set(value),
Metric::FloatGauge(g) => {
warn!("attempted to set a float gauge ('{}') to an integer value - this is most likely a bug", self.fq_name());
g.set(value as f64)
}
Metric::Histogram(_) => {
warn!("invalid operation: attempted to call set on a histogram")
}
}
}
#[inline(always)]
fn set_float(&self, value: f64) {
match self {
Metric::IntCounter(_c) => {
warn!("Cannot set value for counter {:?}", self.fq_name());
}
Metric::IntGauge(g) => {
warn!("attempted to set a integer gauge ('{}') to a float value - this is most likely a bug", self.fq_name());
g.set(value as i64)
}
Metric::FloatGauge(g) => g.set(value),
Metric::Histogram(_) => {
warn!("invalid operation: attempted to call increment on a histogram")
}
}
}
#[inline(always)]
fn add_histogram_observation(&self, value: f64) {
match self {
Metric::Histogram(h) => {
h.observe(value);
}
_ => warn!("attempted to add histogram observation on a non-histogram metric"),
}
}
#[inline(always)]
fn start_timer(&self) -> Option<HistogramTimer> {
match self {
Metric::Histogram(h) => Some(h.start_timer()),
_ => {
warn!("attempted to start histogram observation on a non-histogram metric");
None
}
}
}
}
@@ -145,93 +371,165 @@ impl MetricsController {
}
}
pub fn set(&self, name: &str, value: i64) {
pub fn register_int_gauge<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
let Some(metric) = Metric::new_int_gauge(name, help.into().unwrap_or(name)) else {
return;
};
self.register_metric(metric);
}
pub fn register_float_gauge<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
let Some(metric) = Metric::new_float_gauge(name, help.into().unwrap_or(name)) else {
return;
};
self.register_metric(metric);
}
pub fn register_int_counter<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
let Some(metric) = Metric::new_int_counter(name, help.into().unwrap_or(name)) else {
return;
};
self.register_metric(metric);
}
pub fn register_histogram<'a>(
&self,
name: &str,
help: impl Into<Option<&'a str>>,
buckets: Option<&[f64]>,
) {
let Some(metric) = Metric::new_histogram(name, help.into().unwrap_or(name), buckets) else {
return;
};
self.register_metric(metric);
}
pub fn set(&self, name: &str, value: i64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.set(value);
true
} else {
let gauge = match IntGauge::new(sanitize_metric_name(name), name) {
Ok(g) => g,
Err(e) => {
debug!("Failed to create gauge {:?}:\n{}", name, e);
return;
}
};
self.register_gauge(Box::new(gauge));
self.set(name, value)
false
}
}
pub fn inc(&self, name: &str) {
pub fn set_float(&self, name: &str, value: f64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.set_float(value);
true
} else {
false
}
}
pub fn add_to_histogram(&self, name: &str, value: f64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.add_histogram_observation(value);
true
} else {
false
}
}
pub fn start_timer(&self, name: &str) -> Option<HistogramTimer> {
self.registry_index
.get(name)
.and_then(|metric| metric.start_timer())
}
pub fn inc(&self, name: &str) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.inc();
true
} else {
let counter = match IntCounter::new(sanitize_metric_name(name), name) {
Ok(c) => c,
Err(e) => {
debug!("Failed to create counter {:?}:\n{}", name, e);
return;
}
};
self.register_counter(Box::new(counter));
self.inc(name)
false
}
}
pub fn inc_by(&self, name: &str, value: i64) {
pub fn inc_by(&self, name: &str, value: i64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.inc_by(value);
true
} else {
let counter = match IntCounter::new(sanitize_metric_name(name), name) {
Ok(c) => c,
Err(e) => {
debug!("Failed to create counter {:?}:\n{}", name, e);
return;
}
};
self.register_counter(Box::new(counter));
self.inc_by(name, value)
false
}
}
fn register_gauge(&self, metric: Box<IntGauge>) {
let fq_name = metric
.desc()
.first()
.map(|d| d.fq_name.clone())
.unwrap_or_default();
pub fn maybe_register_and_set<'a>(
&self,
name: &str,
value: i64,
help: impl Into<Option<&'a str>>,
) {
if !self.set(name, value) {
let help = help.into();
self.register_int_gauge(name, help);
self.set(name, value);
}
}
pub fn maybe_register_and_set_float<'a>(
&self,
name: &str,
value: f64,
help: impl Into<Option<&'a str>>,
) {
if !self.set_float(name, value) {
let help = help.into();
self.register_float_gauge(name, help);
self.set_float(name, value);
}
}
pub fn maybe_register_and_add_to_histogram<'a>(
&self,
name: &str,
value: f64,
buckets: Option<&[f64]>,
help: impl Into<Option<&'a str>>,
) {
if !self.add_to_histogram(name, value) {
let help = help.into();
self.register_histogram(name, help, buckets);
self.add_to_histogram(name, value);
}
}
pub fn maybe_register_and_inc<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
if !self.inc(name) {
let help = help.into();
self.register_int_counter(name, help);
self.inc(name);
}
}
pub fn maybe_register_and_inc_by<'a>(
&self,
name: &str,
value: i64,
help: impl Into<Option<&'a str>>,
) {
if !self.inc_by(name, value) {
let help = help.into();
self.register_int_counter(name, help);
self.inc_by(name, value);
}
}
pub fn register_metric(&self, metric: impl Into<Metric>) {
let m = metric.into();
let fq_name = m.fq_name();
if self.registry_index.contains_key(&fq_name) {
return;
}
match self.registry.register(metric.clone()) {
match self.registry.register(m.as_collector()) {
Ok(_) => {
self.registry_index
.insert(fq_name, Metric::G(metric.clone()));
self.registry_index.insert(fq_name, m);
}
Err(e) => {
debug!("Failed to register {:?}:\n{}", fq_name, e)
}
}
}
fn register_counter(&self, metric: Box<IntCounter>) {
let fq_name = metric
.desc()
.first()
.map(|d| d.fq_name.clone())
.unwrap_or_default();
if self.registry_index.contains_key(&fq_name) {
return;
}
match self.registry.register(metric.clone()) {
Ok(_) => {
self.registry_index
.insert(fq_name, Metric::C(metric.clone()));
}
Err(e) => {
debug!("Failed to register {:?}:\n{}", fq_name, e)
Err(err) => {
debug!("Failed to register '{fq_name}': {err}")
}
}
}
@@ -275,4 +573,15 @@ mod tests {
"packets_sent_34_242_65_133:1789"
)
}
#[test]
fn prepend_package_name() {
let literal = prepend_package_name!("foo");
assert_eq!(literal, "nym_metrics_foo");
let bar = "bar";
let format = format!("foomp_{}", bar);
let formatted = prepend_package_name!(format);
assert_eq!(formatted, "nym_metrics_foomp_bar");
}
}
@@ -20,7 +20,6 @@ rand = { workspace = true }
thiserror = { workspace = true }
sha2 = "0.9"
bs58 = { workspace = true }
utoipa = { workspace = true }
serde = { workspace = true, features = ["derive"] }
rayon = { workspace = true, optional = true }
zeroize = { workspace = true, features = ["zeroize_derive"] }
@@ -115,10 +115,7 @@ pub fn aggregate_signatures(
let params = ecash_group_parameters();
// aggregate the signature
let signature = match Aggregatable::aggregate(signatures, indices) {
Ok(res) => res,
Err(err) => return Err(err),
};
let signature = Aggregatable::aggregate(signatures, indices)?;
// Ensure the aggregated signature is not an infinity point
if bool::from(signature.is_at_infinity()) {
@@ -16,7 +16,6 @@ use group::{Curve, Group};
use itertools::Itertools;
use std::borrow::Borrow;
use std::ops::Neg;
use utoipa::ToSchema;
pub struct Polynomial {
coefficients: Vec<Scalar>,
@@ -52,17 +51,6 @@ impl Polynomial {
}
}
#[derive(ToSchema)]
#[schema(title = "G2Projective")]
pub struct G2ProjectiveSchema {
#[schema(content_encoding = "base16")]
pub x: [u64; 6],
#[schema(content_encoding = "base16")]
pub y: [u64; 6],
#[schema(content_encoding = "base16")]
pub z: [u64; 6],
}
#[inline]
pub fn generate_lagrangian_coefficients_at_origin(points: &[u64]) -> Vec<Scalar> {
let x = Scalar::zero();
@@ -8,10 +8,10 @@ use nym_sphinx_addressing::nodes::{
NymNodeRoutingAddress, NymNodeRoutingAddressError, MAX_NODE_ADDRESS_UNPADDED_LEN,
};
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx_params::PacketType;
use nym_sphinx_types::delays::Delay;
use nym_sphinx_types::{NymPacket, NymPacketError, MIN_PACKET_SIZE};
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, RngCore};
use std::time;
@@ -43,14 +43,13 @@ impl SurbAck {
ack_key: &AckKey,
marshaled_fragment_id: [u8; 5],
average_delay: time::Duration,
topology: &NymTopology,
topology: &NymRouteProvider,
packet_type: PacketType,
) -> Result<Self, NymTopologyError>
where
R: RngCore + CryptoRng,
{
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, recipient.gateway())?;
let route = topology.random_route_to_egress(rng, recipient.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
let destination = recipient.as_sphinx_destination();
+2 -2
View File
@@ -131,8 +131,8 @@ impl Recipient {
&self.client_encryption_key
}
pub fn gateway(&self) -> &NodeIdentity {
&self.gateway
pub fn gateway(&self) -> NodeIdentity {
self.gateway
}
pub fn to_bytes(self) -> RecipientBytes {
@@ -6,9 +6,9 @@ use nym_crypto::{generic_array::typenum::Unsigned, Digest};
use nym_sphinx_addressing::clients::Recipient;
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, MAX_NODE_ADDRESS_UNPADDED_LEN};
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm};
use nym_sphinx_types::{NymPacket, SURBMaterial, SphinxError, SURB};
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, RngCore};
use serde::de::{Error as SerdeError, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -89,13 +89,12 @@ impl ReplySurb {
rng: &mut R,
recipient: &Recipient,
average_delay: time::Duration,
topology: &NymTopology,
topology: &NymRouteProvider,
) -> Result<Self, NymTopologyError>
where
R: RngCore + CryptoRng,
{
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, recipient.gateway())?;
let route = topology.random_route_to_egress(rng, recipient.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
let destination = recipient.as_sphinx_destination();
@@ -110,15 +109,12 @@ impl ReplySurb {
/// Returns the expected number of bytes the [`ReplySURB`] will take after serialization.
/// Useful for deserialization from a bytes stream.
pub fn serialized_len(mix_hops: u8) -> usize {
pub fn serialized_len() -> usize {
use nym_sphinx_types::{HEADER_SIZE, NODE_ADDRESS_LENGTH, PAYLOAD_KEY_SIZE};
// the SURB itself consists of SURB_header, first hop address and set of payload keys
// (note extra 1 for the gateway)
SurbEncryptionKeySize::USIZE
+ HEADER_SIZE
+ NODE_ADDRESS_LENGTH
+ (1 + mix_hops as usize) * PAYLOAD_KEY_SIZE
// for each hop (3x mix + egress)
SurbEncryptionKeySize::USIZE + HEADER_SIZE + NODE_ADDRESS_LENGTH + 4 * PAYLOAD_KEY_SIZE
}
pub fn encryption_key(&self) -> &SurbEncryptionKey {
@@ -169,10 +169,7 @@ impl RepliableMessage {
.collect()
}
pub fn try_from_bytes(
bytes: &[u8],
num_mix_hops: u8,
) -> Result<Self, InvalidReplyRequestError> {
pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, InvalidReplyRequestError> {
if bytes.len() < SENDER_TAG_SIZE + 1 {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
@@ -180,11 +177,8 @@ impl RepliableMessage {
AnonymousSenderTag::from_bytes(bytes[..SENDER_TAG_SIZE].try_into().unwrap());
let content_tag = RepliableMessageContentTag::try_from(bytes[SENDER_TAG_SIZE])?;
let content = RepliableMessageContent::try_from_bytes(
&bytes[SENDER_TAG_SIZE + 1..],
num_mix_hops,
content_tag,
)?;
let content =
RepliableMessageContent::try_from_bytes(&bytes[SENDER_TAG_SIZE + 1..], content_tag)?;
Ok(RepliableMessage {
sender_tag,
@@ -192,23 +186,20 @@ impl RepliableMessage {
})
}
pub fn serialized_size(&self, num_mix_hops: u8) -> usize {
pub fn serialized_size(&self) -> usize {
let content_type_size = 1;
SENDER_TAG_SIZE + content_type_size + self.content.serialized_size(num_mix_hops)
SENDER_TAG_SIZE + content_type_size + self.content.serialized_size()
}
}
// this recovery code is shared between all variants containing reply surbs
fn recover_reply_surbs(
bytes: &[u8],
num_mix_hops: u8,
) -> Result<(Vec<ReplySurb>, usize), InvalidReplyRequestError> {
fn recover_reply_surbs(bytes: &[u8]) -> Result<(Vec<ReplySurb>, usize), InvalidReplyRequestError> {
let mut consumed = mem::size_of::<u32>();
if bytes.len() < consumed {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
let num_surbs = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
let surb_size = ReplySurb::serialized_len(num_mix_hops);
let surb_size = ReplySurb::serialized_len();
if bytes[consumed..].len() < num_surbs as usize * surb_size {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
@@ -307,14 +298,13 @@ impl RepliableMessageContent {
fn try_from_bytes(
bytes: &[u8],
num_mix_hops: u8,
tag: RepliableMessageContentTag,
) -> Result<Self, InvalidReplyRequestError> {
if bytes.is_empty() {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
let (reply_surbs, n) = recover_reply_surbs(bytes, num_mix_hops)?;
let (reply_surbs, n) = recover_reply_surbs(bytes)?;
match tag {
RepliableMessageContentTag::Data => Ok(RepliableMessageContent::Data {
@@ -340,7 +330,7 @@ impl RepliableMessageContent {
}
}
fn serialized_size(&self, num_mix_hops: u8) -> usize {
fn serialized_size(&self) -> usize {
match self {
RepliableMessageContent::Data {
message,
@@ -348,19 +338,18 @@ impl RepliableMessageContent {
} => {
let num_reply_surbs_tag = mem::size_of::<u32>();
num_reply_surbs_tag
+ reply_surbs.len() * ReplySurb::serialized_len(num_mix_hops)
+ reply_surbs.len() * ReplySurb::serialized_len()
+ message.len()
}
RepliableMessageContent::AdditionalSurbs { reply_surbs } => {
let num_reply_surbs_tag = mem::size_of::<u32>();
num_reply_surbs_tag + reply_surbs.len() * ReplySurb::serialized_len(num_mix_hops)
num_reply_surbs_tag + reply_surbs.len() * ReplySurb::serialized_len()
}
RepliableMessageContent::Heartbeat {
additional_reply_surbs,
} => {
let num_reply_surbs_tag = mem::size_of::<u32>();
num_reply_surbs_tag
+ additional_reply_surbs.len() * ReplySurb::serialized_len(num_mix_hops)
num_reply_surbs_tag + additional_reply_surbs.len() * ReplySurb::serialized_len()
}
}
}
@@ -578,11 +567,11 @@ mod tests {
}
}
pub(super) fn reply_surb(rng: &mut ChaCha20Rng, num_mix_hops: u8) -> ReplySurb {
pub(super) fn reply_surb(rng: &mut ChaCha20Rng) -> ReplySurb {
// due to gateway
let num_hops = num_mix_hops + 1;
let route = (0..num_hops).map(|_| node(rng)).collect();
let delays = (0..num_hops)
const HOPS: u8 = 4;
let route = (0..HOPS).map(|_| node(rng)).collect();
let delays = (0..HOPS)
.map(|_| Delay::new_from_nanos(rng.next_u64()))
.collect();
let mut destination_bytes = [0u8; 32];
@@ -605,47 +594,40 @@ mod tests {
}
}
pub(super) fn reply_surbs(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
n: usize,
) -> Vec<ReplySurb> {
pub(super) fn reply_surbs(rng: &mut ChaCha20Rng, n: usize) -> Vec<ReplySurb> {
let mut surbs = Vec::with_capacity(n);
for _ in 0..n {
surbs.push(reply_surb(rng, num_mix_hops))
surbs.push(reply_surb(rng))
}
surbs
}
pub(super) fn repliable_content_data(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
msg_len: usize,
surbs: usize,
) -> RepliableMessageContent {
RepliableMessageContent::Data {
message: random_vec_u8(rng, msg_len),
reply_surbs: reply_surbs(rng, num_mix_hops, surbs),
reply_surbs: reply_surbs(rng, surbs),
}
}
pub(super) fn repliable_content_surbs(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
surbs: usize,
) -> RepliableMessageContent {
RepliableMessageContent::AdditionalSurbs {
reply_surbs: reply_surbs(rng, num_mix_hops, surbs),
reply_surbs: reply_surbs(rng, surbs),
}
}
pub(super) fn repliable_content_heartbeat(
rng: &mut ChaCha20Rng,
num_mix_hops: u8,
surbs: usize,
) -> RepliableMessageContent {
RepliableMessageContent::Heartbeat {
additional_reply_surbs: reply_surbs(rng, num_mix_hops, surbs),
additional_reply_surbs: reply_surbs(rng, surbs),
}
}
@@ -676,70 +658,54 @@ mod tests {
#[test]
fn serialized_size_matches_actual_serialization() {
let mut rng = fixtures::test_rng();
let num_mix_hops = 3;
let data1 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_data(&mut rng, num_mix_hops, 10000, 0),
content: fixtures::repliable_content_data(&mut rng, 10000, 0),
};
assert_eq!(
data1.serialized_size(num_mix_hops),
data1.into_bytes().len()
);
assert_eq!(data1.serialized_size(), data1.into_bytes().len());
let data2 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_data(&mut rng, num_mix_hops, 10, 100),
content: fixtures::repliable_content_data(&mut rng, 10, 100),
};
assert_eq!(
data2.serialized_size(num_mix_hops),
data2.into_bytes().len()
);
assert_eq!(data2.serialized_size(), data2.into_bytes().len());
let data3 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_data(&mut rng, num_mix_hops, 100000, 1000),
content: fixtures::repliable_content_data(&mut rng, 100000, 1000),
};
assert_eq!(
data3.serialized_size(num_mix_hops),
data3.into_bytes().len()
);
assert_eq!(data3.serialized_size(), data3.into_bytes().len());
let additional_surbs1 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1),
content: fixtures::repliable_content_surbs(&mut rng, 1),
};
assert_eq!(
additional_surbs1.serialized_size(num_mix_hops),
additional_surbs1.serialized_size(),
additional_surbs1.into_bytes().len()
);
let additional_surbs2 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1000),
content: fixtures::repliable_content_surbs(&mut rng, 1000),
};
assert_eq!(
additional_surbs2.serialized_size(num_mix_hops),
additional_surbs2.serialized_size(),
additional_surbs2.into_bytes().len()
);
let heartbeat1 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1),
content: fixtures::repliable_content_heartbeat(&mut rng, 1),
};
assert_eq!(
heartbeat1.serialized_size(num_mix_hops),
heartbeat1.into_bytes().len()
);
assert_eq!(heartbeat1.serialized_size(), heartbeat1.into_bytes().len());
let heartbeat2 = RepliableMessage {
sender_tag: fixtures::sender_tag(&mut rng),
content: fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1000),
content: fixtures::repliable_content_heartbeat(&mut rng, 1000),
};
assert_eq!(
heartbeat2.serialized_size(num_mix_hops),
heartbeat2.into_bytes().len()
);
assert_eq!(heartbeat2.serialized_size(), heartbeat2.into_bytes().len());
}
}
@@ -750,49 +716,33 @@ mod tests {
#[test]
fn serialized_size_matches_actual_serialization() {
let mut rng = fixtures::test_rng();
let num_mix_hops = 3;
let data1 = fixtures::repliable_content_data(&mut rng, num_mix_hops, 10000, 0);
assert_eq!(
data1.serialized_size(num_mix_hops),
data1.into_bytes().len()
);
let data1 = fixtures::repliable_content_data(&mut rng, 10000, 0);
assert_eq!(data1.serialized_size(), data1.into_bytes().len());
let data2 = fixtures::repliable_content_data(&mut rng, num_mix_hops, 10, 100);
assert_eq!(
data2.serialized_size(num_mix_hops),
data2.into_bytes().len()
);
let data2 = fixtures::repliable_content_data(&mut rng, 10, 100);
assert_eq!(data2.serialized_size(), data2.into_bytes().len());
let data3 = fixtures::repliable_content_data(&mut rng, num_mix_hops, 100000, 1000);
assert_eq!(
data3.serialized_size(num_mix_hops),
data3.into_bytes().len()
);
let data3 = fixtures::repliable_content_data(&mut rng, 100000, 1000);
assert_eq!(data3.serialized_size(), data3.into_bytes().len());
let additional_surbs1 = fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1);
let additional_surbs1 = fixtures::repliable_content_surbs(&mut rng, 1);
assert_eq!(
additional_surbs1.serialized_size(num_mix_hops),
additional_surbs1.serialized_size(),
additional_surbs1.into_bytes().len()
);
let additional_surbs2 = fixtures::repliable_content_surbs(&mut rng, num_mix_hops, 1000);
let additional_surbs2 = fixtures::repliable_content_surbs(&mut rng, 1000);
assert_eq!(
additional_surbs2.serialized_size(num_mix_hops),
additional_surbs2.serialized_size(),
additional_surbs2.into_bytes().len()
);
let heartbeat1 = fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1);
assert_eq!(
heartbeat1.serialized_size(num_mix_hops),
heartbeat1.into_bytes().len()
);
let heartbeat1 = fixtures::repliable_content_heartbeat(&mut rng, 1);
assert_eq!(heartbeat1.serialized_size(), heartbeat1.into_bytes().len());
let heartbeat2 = fixtures::repliable_content_heartbeat(&mut rng, num_mix_hops, 1000);
assert_eq!(
heartbeat2.serialized_size(num_mix_hops),
heartbeat2.into_bytes().len()
);
let heartbeat2 = fixtures::repliable_content_heartbeat(&mut rng, 1000);
assert_eq!(heartbeat2.serialized_size(), heartbeat2.into_bytes().len());
}
}
+6 -17
View File
@@ -69,11 +69,11 @@ pub mod monitoring {
}
}
pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey, hops: u8) {
pub fn fragment_sent(fragment: &Fragment, client_nonce: i32, destination: PublicKey) {
if enabled() {
let id = fragment.fragment_identifier().set_id();
let mut entry = FRAGMENTS_SENT.entry(id).or_default();
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination, hops);
let s = SentFragment::new(fragment.header(), now!(), client_nonce, destination);
entry.push(s);
}
}
@@ -82,16 +82,11 @@ pub mod monitoring {
#[derive(Debug, Clone)]
pub struct FragmentMixParams {
destination: PublicKey,
hops: u8,
}
impl FragmentMixParams {
pub fn destination(&self) -> &PublicKey {
&self.destination
}
pub fn hops(&self) -> u8 {
self.hops
pub fn destination(&self) -> PublicKey {
self.destination
}
}
@@ -105,14 +100,8 @@ pub struct SentFragment {
}
impl SentFragment {
fn new(
header: FragmentHeader,
at: u64,
client_nonce: i32,
destination: PublicKey,
hops: u8,
) -> Self {
let mixnet_params = FragmentMixParams { destination, hops };
fn new(header: FragmentHeader, at: u64, client_nonce: i32, destination: PublicKey) -> Self {
let mixnet_params = FragmentMixParams { destination };
SentFragment {
header,
at,
+5 -8
View File
@@ -10,11 +10,9 @@ use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_chunking::fragment::COVER_FRAG_ID;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{
PacketEncryptionAlgorithm, PacketHkdfAlgorithm, PacketType, DEFAULT_NUM_MIX_HOPS,
};
use nym_sphinx_params::{PacketEncryptionAlgorithm, PacketHkdfAlgorithm, PacketType};
use nym_sphinx_types::NymPacket;
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, RngCore};
use std::time;
@@ -36,7 +34,7 @@ pub enum CoverMessageError {
pub fn generate_loop_cover_surb_ack<R>(
rng: &mut R,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
full_address: &Recipient,
average_ack_delay: time::Duration,
@@ -59,7 +57,7 @@ where
#[allow(clippy::too_many_arguments)]
pub fn generate_loop_cover_packet<R>(
rng: &mut R,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
full_address: &Recipient,
average_ack_delay: time::Duration,
@@ -118,8 +116,7 @@ where
.chain(cover_content)
.collect();
let route =
topology.random_route_to_gateway(rng, DEFAULT_NUM_MIX_HOPS, full_address.gateway())?;
let route = topology.random_route_to_egress(rng, full_address.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_packet_delay, route.len());
let destination = full_address.as_sphinx_destination();
-4
View File
@@ -16,10 +16,6 @@ pub mod packet_sizes;
pub mod packet_types;
pub mod packet_version;
// If somebody can provide an argument why it might be reasonable to have more than 255 mix hops,
// I will change this to [`usize`]
pub const DEFAULT_NUM_MIX_HOPS: u8 = 3;
// TODO: not entirely sure how to feel about those being defined here, ideally it'd be where [`Fragment`]
// is defined, but that'd introduce circular dependencies as the acknowledgements crate also needs
// access to that
+1 -33
View File
@@ -1,19 +1,10 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_sphinx_types::{delays, Delay};
use std::time::Duration;
use nym_sphinx_addressing::clients::Recipient;
use nym_sphinx_types::{delays, Delay, Node};
use thiserror::Error;
pub trait SphinxRouteMaker {
type Error;
fn sphinx_route(&mut self, hops: u8, destination: &Recipient)
-> Result<Vec<Node>, Self::Error>;
}
#[derive(Debug, Error, Clone, Copy)]
#[error("the route vector contains {available} nodes while {requested} hops are required")]
pub struct InvalidNumberOfHops {
@@ -21,29 +12,6 @@ pub struct InvalidNumberOfHops {
requested: u8,
}
// if one wants to provide a hardcoded route, they can
impl SphinxRouteMaker for Vec<Node> {
type Error = InvalidNumberOfHops;
fn sphinx_route(
&mut self,
hops: u8,
_destination: &Recipient,
) -> Result<Vec<Node>, InvalidNumberOfHops> {
// it's the responsibility of the caller to ensure the hardcoded route has correct number of hops
// and that it's final hop include the recipient's gateway.
if self.len() != hops as usize {
Err(InvalidNumberOfHops {
available: self.len(),
requested: hops,
})
} else {
Ok(self.clone())
}
}
}
pub fn generate_hop_delays(average_packet_delay: Duration, num_hops: usize) -> Vec<Delay> {
if average_packet_delay.is_zero() {
vec![nym_sphinx_types::Delay::new_from_millis(0); num_hops]
+11 -11
View File
@@ -149,7 +149,7 @@ impl NymMessage {
.collect()
}
fn try_from_bytes(bytes: &[u8], num_mix_hops: u8) -> Result<Self, NymMessageError> {
fn try_from_bytes(bytes: &[u8]) -> Result<Self, NymMessageError> {
if bytes.is_empty() {
return Err(NymMessageError::EmptyMessage);
}
@@ -158,7 +158,7 @@ impl NymMessage {
match typ_tag {
NymMessageType::Plain => Ok(NymMessage::Plain(bytes[1..].to_vec())),
NymMessageType::Repliable => Ok(NymMessage::Repliable(
RepliableMessage::try_from_bytes(&bytes[1..], num_mix_hops)?,
RepliableMessage::try_from_bytes(&bytes[1..])?,
)),
NymMessageType::Reply => Ok(NymMessage::Reply(ReplyMessage::try_from_bytes(
&bytes[1..],
@@ -166,10 +166,10 @@ impl NymMessage {
}
}
fn serialized_size(&self, num_mix_hops: u8) -> usize {
fn serialized_size(&self) -> usize {
let inner_size = match self {
NymMessage::Plain(msg) => msg.len(),
NymMessage::Repliable(msg) => msg.serialized_size(num_mix_hops),
NymMessage::Repliable(msg) => msg.serialized_size(),
NymMessage::Reply(msg) => msg.serialized_size(),
};
let message_type_size = 1;
@@ -207,9 +207,9 @@ impl NymMessage {
}
/// Determines the number of required packets of the provided size for the split message.
pub fn required_packets(&self, packet_size: PacketSize, num_mix_hops: u8) -> usize {
pub fn required_packets(&self, packet_size: PacketSize) -> usize {
let plaintext_per_packet = self.true_available_plaintext_per_packet(packet_size);
let serialized_len = self.serialized_size(num_mix_hops);
let serialized_len = self.serialized_size();
let (num_fragments, _) =
chunking::number_of_required_fragments(serialized_len, plaintext_per_packet);
@@ -279,11 +279,11 @@ impl PaddedMessage {
}
// reverse of NymMessage::pad_to_full_packet_lengths
pub fn remove_padding(self, num_mix_hops: u8) -> Result<NymMessage, NymMessageError> {
pub fn remove_padding(self) -> Result<NymMessage, NymMessageError> {
// we are looking for first occurrence of 1 in the tail and we get its index
if let Some(padding_end) = self.0.iter().rposition(|b| *b == 1) {
// and now we only take bytes until that point (but not including it)
NymMessage::try_from_bytes(&self.0[..padding_end], num_mix_hops)
NymMessage::try_from_bytes(&self.0[..padding_end])
} else {
Err(NymMessageError::InvalidMessagePadding)
}
@@ -304,7 +304,7 @@ mod tests {
fn serialized_size_matches_actual_serialization() {
// plain
let plain = NymMessage::new_plain(vec![1, 2, 3, 4, 5]);
assert_eq!(plain.serialized_size(3), plain.into_bytes().len());
assert_eq!(plain.serialized_size(), plain.into_bytes().len());
// a single variant for each repliable and reply is enough as they are more thoroughly tested
// internally
@@ -313,9 +313,9 @@ mod tests {
[42u8; 16].into(),
vec![],
));
assert_eq!(repliable.serialized_size(3), repliable.into_bytes().len());
assert_eq!(repliable.serialized_size(), repliable.into_bytes().len());
let reply = NymMessage::new_reply(ReplyMessage::new_data_message(vec![1, 2, 3, 4, 5]));
assert_eq!(reply.serialized_size(3), reply.into_bytes().len());
assert_eq!(reply.serialized_size(), reply.into_bytes().len());
}
}
+16 -37
View File
@@ -14,9 +14,9 @@ use nym_sphinx_anonymous_replies::reply_surb::ReplySurb;
use nym_sphinx_chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm};
use nym_sphinx_types::{Delay, NymPacket};
use nym_topology::{NymTopology, NymTopologyError};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
@@ -54,14 +54,13 @@ pub trait FragmentPreparer {
fn deterministic_route_selection(&self) -> bool;
fn rng(&mut self) -> &mut Self::Rng;
fn nonce(&self) -> i32;
fn num_mix_hops(&self) -> u8;
fn average_packet_delay(&self) -> Duration;
fn average_ack_delay(&self) -> Duration;
fn generate_reply_surbs(
&mut self,
amount: usize,
topology: &NymTopology,
topology: &NymRouteProvider,
reply_recipient: &Recipient,
) -> Result<Vec<ReplySurb>, NymTopologyError> {
let mut reply_surbs = Vec::with_capacity(amount);
@@ -79,7 +78,7 @@ pub trait FragmentPreparer {
&mut self,
recipient: &Recipient,
fragment_id: FragmentIdentifier,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
packet_type: PacketType,
) -> Result<SurbAck, NymTopologyError> {
@@ -109,7 +108,7 @@ pub trait FragmentPreparer {
fn prepare_reply_chunk_for_sending(
&mut self,
fragment: Fragment,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
reply_surb: ReplySurb,
packet_sender: &Recipient,
@@ -130,9 +129,8 @@ pub trait FragmentPreparer {
.expect("the message has been incorrectly fragmented");
// this is not going to be accurate by any means. but that's the best estimation we can do
let expected_forward_delay = Delay::new_from_millis(
(self.average_packet_delay().as_millis() * self.num_mix_hops() as u128) as u64,
);
let expected_forward_delay =
Delay::new_from_millis((self.average_packet_delay().as_millis() * 3) as u64);
let fragment_identifier = fragment.fragment_identifier();
@@ -190,12 +188,11 @@ pub trait FragmentPreparer {
fn prepare_chunk_for_sending(
&mut self,
fragment: Fragment,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
packet_sender: &Recipient,
packet_recipient: &Recipient,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, NymTopologyError> {
debug!("Preparing chunk for sending");
// each plain or repliable packet (i.e. not a reply) attaches an ephemeral public key so that the recipient
@@ -204,8 +201,7 @@ pub trait FragmentPreparer {
let fragment_header = fragment.header();
let destination = packet_recipient.gateway();
let hops = mix_hops.unwrap_or(self.num_mix_hops());
monitoring::fragment_sent(&fragment, self.nonce(), *destination, hops);
monitoring::fragment_sent(&fragment, self.nonce(), destination);
let non_reply_overhead = encryption::PUBLIC_KEY_SIZE;
let expected_plaintext = match packet_type {
@@ -240,16 +236,16 @@ pub trait FragmentPreparer {
};
// generate pseudorandom route for the packet
log::trace!("Preparing chunk for sending with {hops} mix hops");
log::trace!("Preparing chunk for sending");
let route = if self.deterministic_route_selection() {
log::trace!("using deterministic route selection");
let seed = fragment_header.seed().wrapping_mul(self.nonce());
let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);
topology.random_route_to_gateway(&mut rng, hops, destination)?
topology.random_route_to_egress(&mut rng, destination)?
} else {
log::trace!("using pseudorandom route selection");
let mut rng = self.rng();
topology.random_route_to_gateway(&mut rng, hops, destination)?
topology.random_route_to_egress(&mut rng, destination)?
};
let destination = packet_recipient.as_sphinx_destination();
@@ -335,10 +331,6 @@ pub struct MessagePreparer<R> {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
average_ack_delay: Duration,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
nonce: i32,
}
@@ -361,17 +353,10 @@ where
sender_address,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
nonce,
}
}
/// Allows setting non-default number of expected mix hops in the network.
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
/// Overwrites existing sender address with the provided value.
pub fn set_sender_address(&mut self, sender_address: Recipient) {
self.sender_address = sender_address;
@@ -380,7 +365,7 @@ where
pub fn generate_reply_surbs(
&mut self,
amount: usize,
topology: &NymTopology,
topology: &NymRouteProvider,
) -> Result<Vec<ReplySurb>, NymTopologyError> {
let mut reply_surbs = Vec::with_capacity(amount);
for _ in 0..amount {
@@ -399,7 +384,7 @@ where
pub fn prepare_reply_chunk_for_sending(
&mut self,
fragment: Fragment,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
reply_surb: ReplySurb,
packet_type: PacketType,
@@ -420,11 +405,10 @@ where
pub fn prepare_chunk_for_sending(
&mut self,
fragment: Fragment,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
packet_recipient: &Recipient,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, NymTopologyError> {
let sender = self.sender_address;
@@ -436,7 +420,6 @@ where
&sender,
packet_recipient,
packet_type,
mix_hops,
)
}
@@ -444,7 +427,7 @@ where
pub fn generate_surb_ack(
&mut self,
fragment_id: FragmentIdentifier,
topology: &NymTopology,
topology: &NymRouteProvider,
ack_key: &AckKey,
packet_type: PacketType,
) -> Result<SurbAck, NymTopologyError> {
@@ -483,10 +466,6 @@ impl<R: CryptoRng + Rng> FragmentPreparer for MessagePreparer<R> {
self.nonce
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
fn average_packet_delay(&self) -> Duration {
self.average_packet_delay
}
+2 -129
View File
@@ -14,7 +14,6 @@ use nym_sphinx_chunking::reconstruction::MessageReconstructor;
use nym_sphinx_chunking::ChunkingError;
use nym_sphinx_params::{
PacketEncryptionAlgorithm, PacketHkdfAlgorithm, ReplySurbEncryptionAlgorithm,
DEFAULT_NUM_MIX_HOPS,
};
use thiserror::Error;
@@ -79,7 +78,6 @@ pub enum MessageRecoveryError {
pub trait MessageReceiver {
fn new() -> Self;
fn reconstructor(&mut self) -> &mut MessageReconstructor;
fn num_mix_hops(&self) -> u8;
fn decrypt_raw_message<C>(
&self,
@@ -143,7 +141,7 @@ pub trait MessageReceiver {
fragment: Fragment,
) -> Result<Option<(NymMessage, Vec<i32>)>, MessageRecoveryError> {
if let Some((message, used_sets)) = self.reconstructor().insert_new_fragment(fragment) {
match PaddedMessage::new_reconstructed(message).remove_padding(self.num_mix_hops()) {
match PaddedMessage::new_reconstructed(message).remove_padding() {
Ok(message) => Ok(Some((message, used_sets))),
Err(err) => Err(MessageRecoveryError::MalformedReconstructedMessage {
source: err,
@@ -156,28 +154,11 @@ pub trait MessageReceiver {
}
}
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct SphinxMessageReceiver {
/// High level public structure used to buffer all received data [`Fragment`]s and eventually
/// returning original messages that they encapsulate.
reconstructor: MessageReconstructor,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
}
impl SphinxMessageReceiver {
/// Allows setting non-default number of expected mix hops in the network.
// IMPORTANT NOTE: this is among others used to deserialize SURBs. Meaning that this is a
// global setting and currently always set to the default value. The implication is that it is
// not currently possible to have different number of hops for different SURB messages. So,
// don't try to use <3 mix hops for SURBs until this is refactored.
#[must_use]
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
}
impl MessageReceiver for SphinxMessageReceiver {
@@ -201,112 +182,4 @@ impl MessageReceiver for SphinxMessageReceiver {
fn reconstructor(&mut self) -> &mut MessageReconstructor {
&mut self.reconstructor
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
}
impl Default for SphinxMessageReceiver {
fn default() -> Self {
SphinxMessageReceiver {
reconstructor: Default::default(),
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
}
}
}
#[cfg(test)]
mod message_receiver {
use super::*;
use nym_crypto::asymmetric::identity;
use nym_mixnet_contract_common::LegacyMixLayer;
use nym_topology::{gateway, mix, NymTopology};
use std::collections::BTreeMap;
// TODO: is it somehow maybe possible to move it to `topology` and have if conditionally
// available to other modules?
/// Returns a hardcoded, valid instance of [`NymTopology`] that is to be used in
/// tests requiring instance of topology.
#[allow(dead_code)]
fn topology_fixture() -> NymTopology {
let mut mixes = BTreeMap::new();
mixes.insert(
1,
vec![mix::LegacyNode {
mix_id: 123,
host: "10.20.30.40".parse().unwrap(),
mix_host: "10.20.30.40:1789".parse().unwrap(),
identity_key: identity::PublicKey::from_base58_string(
"3ebjp1Fb9hdcS1AR6AZihgeJiMHkB5jjJUsvqNnfQwU7",
)
.unwrap(),
sphinx_key: encryption::PublicKey::from_base58_string(
"B3GzG62aXAZNg14RoMCp3BhELNBrySLr2JqrwyfYFzRc",
)
.unwrap(),
layer: LegacyMixLayer::One,
version: "0.8.0-dev".into(),
}],
);
mixes.insert(
2,
vec![mix::LegacyNode {
mix_id: 234,
host: "11.21.31.41".parse().unwrap(),
mix_host: "11.21.31.41:1789".parse().unwrap(),
identity_key: identity::PublicKey::from_base58_string(
"D6YaMzLSY7mANtSQRKXsmMZpqgqiVkeiagKM4V4oFPFr",
)
.unwrap(),
sphinx_key: encryption::PublicKey::from_base58_string(
"5Z1VqYwM2xeKxd8H7fJpGWasNiDFijYBAee7MErkZ5QT",
)
.unwrap(),
layer: LegacyMixLayer::Two,
version: "0.8.0-dev".into(),
}],
);
mixes.insert(
3,
vec![mix::LegacyNode {
mix_id: 456,
host: "12.22.32.42".parse().unwrap(),
mix_host: "12.22.32.42:1789".parse().unwrap(),
identity_key: identity::PublicKey::from_base58_string(
"GkWDysw4AjESv1KiAiVn7JzzCMJeksxNSXVfr1PpX8wD",
)
.unwrap(),
sphinx_key: encryption::PublicKey::from_base58_string(
"9EyjhCggr2QEA2nakR88YHmXgpy92DWxoe2draDRkYof",
)
.unwrap(),
layer: LegacyMixLayer::Three,
version: "0.8.0-dev".into(),
}],
);
NymTopology::new(
// currently coco_nodes don't really exist so this is still to be determined
mixes,
vec![gateway::LegacyNode {
node_id: 789,
host: "1.2.3.4".parse().unwrap(),
mix_host: "1.2.3.4:1789".parse().unwrap(),
clients_ws_port: 9000,
clients_wss_port: None,
identity_key: identity::PublicKey::from_base58_string(
"FioFa8nMmPpQnYi7JyojoTuwGLeyNS8BF4ChPr29zUML",
)
.unwrap(),
sphinx_key: encryption::PublicKey::from_base58_string(
"EB42xvMFMD5rUCstE2CDazgQQJ22zLv8SPm1Luxni44c",
)
.unwrap(),
version: "0.8.0-dev".into(),
}],
)
}
}
+99 -12
View File
@@ -42,8 +42,43 @@ impl PendingSync {
}
}
#[derive(Debug, Clone)]
pub struct BlockProcessorConfig {
pub pruning_options: PruningOptions,
pub store_precommits: bool,
pub explicit_starting_block_height: Option<u32>,
pub use_best_effort_start_height: bool,
}
impl Default for BlockProcessorConfig {
fn default() -> Self {
Self {
pruning_options: PruningOptions::nothing(),
store_precommits: true,
explicit_starting_block_height: None,
use_best_effort_start_height: false,
}
}
}
impl BlockProcessorConfig {
pub fn new(
pruning_options: PruningOptions,
store_precommits: bool,
explicit_starting_block_height: Option<u32>,
use_best_effort_start_height: bool,
) -> Self {
Self {
pruning_options,
store_precommits,
explicit_starting_block_height,
use_best_effort_start_height,
}
}
}
pub struct BlockProcessor {
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
last_processed_height: u32,
@@ -65,9 +100,10 @@ pub struct BlockProcessor {
msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
#[allow(clippy::too_many_arguments)]
impl BlockProcessor {
pub async fn new(
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
@@ -81,8 +117,10 @@ impl BlockProcessor {
let last_pruned = storage.get_pruned_height().await?;
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
debug!(last_processed_height = %last_processed_height, pruned_height = %last_pruned_height, "setting up block processor...");
Ok(BlockProcessor {
pruning_options,
config,
cancel,
synced,
last_processed_height,
@@ -101,7 +139,7 @@ impl BlockProcessor {
}
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.pruning_options = pruning_options;
self.config.pruning_options = pruning_options;
self
}
@@ -128,7 +166,7 @@ impl BlockProcessor {
// we won't end up with a corrupted storage.
let mut tx = self.storage.begin_processing_tx().await?;
persist_block(&full_info, &mut tx).await?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
// let the modules do whatever they want
// the ones wanting the full block:
@@ -241,7 +279,7 @@ impl BlockProcessor {
#[instrument(skip(self))]
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = self.last_processed_height - keep_recent;
info!(
@@ -282,12 +320,12 @@ impl BlockProcessor {
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
debug!("checking for storage pruning");
if self.pruning_options.strategy.is_nothing() {
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}
let interval = self.pruning_options.strategy_interval();
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= self.last_processed_height {
self.prune_storage().await?;
}
@@ -363,20 +401,69 @@ impl BlockProcessor {
// but we need it to help the compiler figure out the future is `Send`
async fn startup_resync(&mut self) -> Result<(), ScraperError> {
assert!(self.pending_sync.is_empty());
info!("attempting to run startup resync...");
self.maybe_prune_storage().await?;
let latest_block = self.rpc_client.current_block_height().await? as u32;
info!("obtained latest block height: {latest_block}");
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
info!("we have already processed some blocks in the past - attempting to resume...");
// in case we were offline for a while,
// make sure we don't request blocks we'd have to prune anyway
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = latest_block - keep_recent;
self.last_processed_height = max(self.last_processed_height, last_to_keep);
if !self.config.pruning_options.strategy.is_nothing() {
self.last_processed_height = max(self.last_processed_height, last_to_keep);
}
let request_range = self.last_processed_height + 1..latest_block + 1;
info!("we need to request {request_range:?} to resync");
info!(
keep_recent = %keep_recent,
last_to_keep = %last_to_keep,
last_processed_height = %self.last_processed_height,
"we need to request {request_range:?} to resync"
);
self.request_missing_blocks(request_range).await?;
return Ok(());
}
// this is the first time starting up
if self.last_processed_height == 0 {
info!("this is the first time starting up");
let Some(starting_height) = self.config.explicit_starting_block_height else {
info!("no starting block height set - will use the default behaviour");
// nothing to do
return Ok(());
};
info!("attempting to start the scraper from block {starting_height}");
let earliest_available =
self.rpc_client.earliest_available_block_height().await? as u32;
info!("earliest available block height: {earliest_available}");
if earliest_available > starting_height && self.config.use_best_effort_start_height {
error!("the earliest available block is higher than the desired starting height");
return Err(ScraperError::BlocksUnavailable {
height: starting_height,
});
}
let starting_height = if earliest_available > starting_height {
// add few additional blocks to account for all the startup waiting
// because the node might have pruned few blocks since
earliest_available + 10
} else {
starting_height
};
let request_range = starting_height..latest_block + 1;
info!("going to start the scraper from block {starting_height}");
info!("we need to request {request_range:?} before properly starting up");
self.request_missing_blocks(request_range).await?;
}
@@ -384,7 +471,7 @@ impl BlockProcessor {
}
pub(crate) async fn run(&mut self) {
info!("starting processing loop");
info!("starting block processor processing loop");
// sure, we could be more efficient and reset it on every processed block,
// but the overhead is so minimal that it doesn't matter
@@ -84,13 +84,7 @@ impl TryFrom<Event> for BlockToProcess {
// TODO: we're losing `result_begin_block` and `result_end_block` here but maybe that's fine?
let maybe_block = match event.data {
// we don't care about `NewBlock` until CometBFT 0.38, i.e. until we upgrade to wasmd 0.50
EventData::NewBlock { .. } => {
return Err(ScraperError::InvalidSubscriptionEvent {
query,
kind: "NewBlock".to_string(),
})
}
EventData::NewBlock { block, .. } => block,
EventData::LegacyNewBlock { block, .. } => block,
EventData::Tx { .. } => {
return Err(ScraperError::InvalidSubscriptionEvent {
+3
View File
@@ -19,6 +19,9 @@ pub enum ScraperError {
#[error("the block scraper is already running")]
ScraperAlreadyRunning,
#[error("block information for height {height} is not available on the provided rpc endpoint")]
BlocksUnavailable { height: u32 },
#[error("failed to establish websocket connection to {url}: {source}")]
WebSocketConnectionFailure {
url: String,
+2 -1
View File
@@ -15,6 +15,7 @@ pub(crate) mod scraper;
pub mod storage;
pub use block_processor::pruning::{PruningOptions, PruningStrategy};
pub use block_processor::types::ParsedTransactionResponse;
pub use modules::{BlockModule, MsgModule, TxModule};
pub use scraper::{Config, NyxdScraper};
pub use scraper::{Config, NyxdScraper, StartingBlockOpts};
pub use storage::models;
+11
View File
@@ -117,6 +117,17 @@ impl RpcClient {
Ok(info.last_block_height.value())
}
pub(crate) async fn earliest_available_block_height(&self) -> Result<u64, ScraperError> {
debug!("getting earliest available block height");
let status = self
.inner
.status()
.await
.map_err(|source| ScraperError::AbciInfoQueryFailure { source })?;
Ok(status.sync_info.earliest_block_height.value())
}
async fn get_transaction_results(
&self,
raw: &[Vec<u8>],
+99 -30
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::BlockToProcess;
use crate::block_processor::BlockProcessor;
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
use crate::block_requester::{BlockRequest, BlockRequester};
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
@@ -24,6 +24,15 @@ use url::Url;
mod subscriber;
#[derive(Default, Clone, Copy)]
pub struct StartingBlockOpts {
pub start_block_height: Option<u32>,
/// If the scraper fails to start from the desired height, rather than failing,
/// attempt to use the next available height
pub use_best_effort_start_height: bool,
}
pub struct Config {
/// Url to the websocket endpoint of a validator, for example `wss://rpc.nymtech.net/websocket`
pub websocket_url: Url,
@@ -34,6 +43,10 @@ pub struct Config {
pub database_path: PathBuf,
pub pruning_options: PruningOptions,
pub store_precommits: bool,
pub start_block: StartingBlockOpts,
}
pub struct NyxdScraperBuilder {
@@ -60,8 +73,16 @@ impl NyxdScraperBuilder {
req_rx,
processing_tx.clone(),
);
let mut block_processor = BlockProcessor::new(
let block_processor_config = BlockProcessorConfig::new(
scraper.config.pruning_options,
scraper.config.store_precommits,
scraper.config.start_block.start_block_height,
scraper.config.start_block.use_best_effort_start_height,
);
let mut block_processor = BlockProcessor::new(
block_processor_config,
scraper.cancel_token.clone(),
scraper.startup_sync.clone(),
processing_rx,
@@ -118,7 +139,7 @@ pub struct NyxdScraper {
task_tracker: TaskTracker,
cancel_token: CancellationToken,
startup_sync: Arc<Notify>,
pub storage: ScraperStorage,
storage: ScraperStorage,
rpc_client: RpcClient,
}
@@ -142,6 +163,10 @@ impl NyxdScraper {
})
}
pub fn storage(&self) -> ScraperStorage {
self.storage.clone()
}
fn start_tasks(
&self,
mut block_requester: BlockRequester,
@@ -158,7 +183,10 @@ impl NyxdScraper {
self.task_tracker.close();
}
pub async fn process_single_block(&self, height: u32) -> Result<(), ScraperError> {
// DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
// AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
// YOU WILL BE FIRED IF YOU USE IT : )
pub async fn unsafe_process_single_block(&self, height: u32) -> Result<(), ScraperError> {
info!(height = height, "attempting to process a single block");
if !self.task_tracker.is_empty() {
return Err(ScraperError::ScraperAlreadyRunning);
@@ -177,7 +205,10 @@ impl NyxdScraper {
block_processor.process_block(block.into()).await
}
pub async fn process_block_range(
// DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
// AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
// YOU WILL BE FIRED IF YOU USE IT : )
pub async fn unsafe_process_block_range(
&self,
starting_height: Option<u32>,
end_height: Option<u32>,
@@ -194,10 +225,10 @@ impl NyxdScraper {
.await?
.with_pruning(PruningOptions::nothing());
let current_height = self.rpc_client.current_block_height().await? as u32;
let mut current_height = self.rpc_client.current_block_height().await? as u32;
let last_processed = block_processor.last_process_height();
let starting_height = match starting_height {
let mut starting_height = match starting_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
@@ -211,7 +242,8 @@ impl NyxdScraper {
}
};
let end_height = match end_height {
let must_catch_up = end_height.is_none();
let mut end_height = match end_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
@@ -226,32 +258,62 @@ impl NyxdScraper {
}
};
info!(
starting_height = starting_height,
end_height = end_height,
"attempting to process block range"
);
let mut last_processed = starting_height;
let range = (starting_height..=end_height).collect::<Vec<_>>();
while last_processed < current_height {
info!(
starting_height = starting_height,
end_height = end_height,
"attempting to process block range"
);
// the most likely bottleneck here are going to be the chain queries,
// so batch multiple requests
for batch in range.chunks(4) {
let batch_result = join_all(
batch
.iter()
.map(|height| self.rpc_client.get_basic_block_details(*height)),
)
.await;
for result in batch_result {
match result {
Ok(block) => block_processor.process_block(block.into()).await?,
Err(err) => {
error!("failed to retrieve the block: {err}. stopping...");
return Err(err);
let range = (starting_height..=end_height).collect::<Vec<_>>();
// the most likely bottleneck here are going to be the chain queries,
// so batch multiple requests
for batch in range.chunks(4) {
let batch_result = join_all(
batch
.iter()
.map(|height| self.rpc_client.get_basic_block_details(*height)),
)
.await;
for result in batch_result {
match result {
Ok(block) => block_processor.process_block(block.into()).await?,
Err(err) => {
error!("failed to retrieve the block: {err}. stopping...");
return Err(err);
}
}
}
}
// if we don't need to catch up, return early
if !must_catch_up {
return Ok(());
}
// check if we have caught up to the current block height
last_processed = end_height;
current_height = self.rpc_client.current_block_height().await? as u32;
info!(
last_processed = last_processed,
current_height = current_height,
"🏃 still need to catch up..."
);
starting_height = last_processed + 1;
end_height = current_height;
}
if must_catch_up {
info!(
last_processed = last_processed,
current_height = current_height,
"✅ block processing has caught up!"
);
}
Ok(())
@@ -275,8 +337,15 @@ impl NyxdScraper {
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> Result<BlockProcessor, ScraperError> {
BlockProcessor::new(
let block_processor_config = BlockProcessorConfig::new(
self.config.pruning_options,
self.config.store_precommits,
self.config.start_block.start_block_height,
self.config.start_block.use_best_effort_start_height,
);
BlockProcessor::new(
block_processor_config,
self.cancel_token.clone(),
self.startup_sync.clone(),
processing_rx,
+11 -1
View File
@@ -3,6 +3,7 @@
use crate::block_processor::types::BlockToProcess;
use crate::error::ScraperError;
use tendermint_rpc::client::CompatMode;
use tendermint_rpc::event::Event;
use tendermint_rpc::query::EventType;
use tendermint_rpc::{SubscriptionClient, WebSocketClient, WebSocketClientDriver};
@@ -38,7 +39,16 @@ impl ChainSubscriber {
) -> Result<Self, ScraperError> {
// sure, we could have just used websocket client entirely, but let's keep the logic for
// getting current blocks and historical blocks completely separate with the dual connection
let (client, driver) = WebSocketClient::new(websocket_endpoint.as_str())
let websocket_url = websocket_endpoint.as_str().try_into().map_err(|source| {
ScraperError::WebSocketConnectionFailure {
url: websocket_endpoint.to_string(),
source,
}
})?;
let (client, driver) = WebSocketClient::builder(websocket_url)
.compat_mode(CompatMode::V0_37)
.build()
.await
.map_err(|source| ScraperError::WebSocketConnectionFailure {
url: websocket_endpoint.to_string(),
+21 -6
View File
@@ -13,7 +13,11 @@ use crate::{
models::{CommitSignature, Validator},
},
};
use sqlx::{types::time::OffsetDateTime, ConnectOptions, Sqlite, Transaction};
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
types::time::OffsetDateTime,
ConnectOptions, Sqlite, Transaction,
};
use std::{fmt::Debug, path::Path};
use tendermint::{
block::{Commit, CommitSig},
@@ -50,7 +54,16 @@ pub(crate) fn log_db_operation_time(op_name: &str, start_time: Instant) {
impl ScraperStorage {
#[instrument]
pub async fn init<P: AsRef<Path> + Debug>(database_path: P) -> Result<Self, ScraperError> {
let database_path = database_path.as_ref();
debug!(
"initialising scraper database path to '{}'",
database_path.display()
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -212,6 +225,7 @@ impl ScraperStorage {
pub async fn persist_block(
block: &FullBlockInformation,
tx: &mut StorageTransaction,
store_precommits: bool,
) -> Result<(), ScraperError> {
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
@@ -224,11 +238,12 @@ pub async fn persist_block(
// persist block data
persist_block_data(&block.block, total_gas, tx).await?;
// persist commits
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
if store_precommits {
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
// persist txs
@@ -428,7 +428,7 @@ impl PacketStatisticsControl {
while self
.history
.front()
.map_or(false, |&(t, _)| t < recording_window)
.is_some_and(|&(t, _)| t < recording_window)
{
self.history.pop_front();
}
@@ -462,7 +462,7 @@ impl PacketStatisticsControl {
while self
.rates
.front()
.map_or(false, |&(t, _)| t < recording_window)
.is_some_and(|&(t, _)| t < recording_window)
{
self.rates.pop_front();
}
+8
View File
@@ -58,6 +58,10 @@ pub enum GatewaySessionEvent {
/// Address of the remote client opening the connection
client: DestinationAddressBytes,
},
SessionDelete {
/// Address of the remote client opening the connection
client: DestinationAddressBytes,
},
}
impl GatewaySessionEvent {
@@ -87,4 +91,8 @@ impl GatewaySessionEvent {
client,
}
}
pub fn new_session_delete(client: DestinationAddressBytes) -> GatewaySessionEvent {
GatewaySessionEvent::SessionDelete { client }
}
}
+4 -7
View File
@@ -12,15 +12,13 @@ documentation = { workspace = true }
[dependencies]
async-trait = { workspace = true, optional = true }
bs58 = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
semver = { workspace = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
# 'serializable' feature
serde = { workspace = true, features = ["derive"], optional = true }
# 'serde' feature
serde_json = { workspace = true, optional = true }
# 'wasm-serde-types' feature
@@ -28,7 +26,6 @@ tsify = { workspace = true, features = ["js"], optional = true }
wasm-bindgen = { workspace = true, optional = true }
## internal
nym-bin-common = { path = "../bin-common" }
nym-config = { path = "../config" }
nym-crypto = { path = "../crypto", features = ["sphinx", "outfox"] }
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
@@ -51,5 +48,5 @@ wasm-utils = { path = "../wasm/utils", default-features = false, optional = true
default = ["provider-trait"]
provider-trait = ["async-trait"]
wasm-serde-types = ["tsify", "wasm-bindgen", "wasm-utils"]
serializable = ["serde", "serde_json"]
persistence = ["serde_json"]
outfox = []
+12 -8
View File
@@ -4,18 +4,28 @@
use std::array::TryFromSliceError;
use crate::MixLayer;
use nym_sphinx_addressing::NodeIdentity;
use nym_sphinx_types::NymPacketError;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum NymTopologyError {
#[error("The provided network topology is empty - there are no mixnodes and no gateways on it - the network request(s) probably failed")]
#[error("the provided network topology is empty - there are no valid nodes on it - the network request(s) probably failed")]
EmptyNetworkTopology,
#[error("no node with identity {node_identity} is known")]
NonExistentNode { node_identity: Box<NodeIdentity> },
#[error("could not use node with identity {node_identity} as egress since it didn't get assigned valid role in the current epoch")]
InvalidEgressRole { node_identity: Box<NodeIdentity> },
#[error("one (or more) of mixing layers does not have any valid nodes available")]
InsufficientMixingNodes,
#[error("The provided network topology has no gateways available")]
NoGatewaysAvailable,
#[error("The provided network topology has no mixnodes available")]
#[error("The provided network topology has no valid mixnodes available")]
NoMixnodesAvailable,
#[error("Gateway with identity key {identity_key} doesn't exist")]
@@ -55,12 +65,6 @@ pub enum NymTopologyError {
#[error("{0}")]
ReqwestError(#[from] reqwest::Error),
#[error("{0}")]
MixnodeConversionError(#[from] crate::mix::MixnodeConversionError),
#[error("{0}")]
GatewayConversionError(#[from] crate::gateway::GatewayConversionError),
#[error("{0}")]
VarError(#[from] std::env::VarError),
}
-174
View File
@@ -1,174 +0,0 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{NetworkAddress, NodeVersion};
use nym_api_requests::nym_nodes::SkimmedNode;
use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnet_contract_common::NodeId;
use nym_sphinx_addressing::nodes::{NodeIdentity, NymNodeRoutingAddress};
use nym_sphinx_types::Node as SphinxNode;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::fmt;
use std::fmt::Formatter;
use std::io;
use std::net::AddrParseError;
use std::net::SocketAddr;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum GatewayConversionError {
#[error("gateway identity key was malformed - {0}")]
InvalidIdentityKey(#[from] identity::Ed25519RecoveryError),
#[error("gateway sphinx key was malformed - {0}")]
InvalidSphinxKey(#[from] encryption::KeyRecoveryError),
#[error("'{value}' is not a valid gateway address - {source}")]
InvalidAddress {
value: String,
#[source]
source: io::Error,
},
#[error("'{gateway}' has not provided any valid ip addresses")]
NoIpAddressesProvided { gateway: String },
#[error("'{gateway}' has provided a malformed ip address: {err}")]
MalformedIpAddress {
gateway: String,
#[source]
err: AddrParseError,
},
#[error("provided node is not an entry gateway in this epoch!")]
NotGateway,
}
#[derive(Clone)]
pub struct LegacyNode {
pub node_id: NodeId,
pub host: NetworkAddress,
// we're keeping this as separate resolved field since we do not want to be resolving the potential
// hostname every time we want to construct a path via this node
pub mix_host: SocketAddr,
// #[serde(alias = "clients_port")]
pub clients_ws_port: u16,
// #[serde(default)]
pub clients_wss_port: Option<u16>,
pub identity_key: identity::PublicKey,
pub sphinx_key: encryption::PublicKey, // TODO: or nymsphinx::PublicKey? both are x25519
// to be removed:
pub version: NodeVersion,
}
impl std::fmt::Debug for LegacyNode {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("gateway::Node")
.field("host", &self.host)
.field("mix_host", &self.mix_host)
.field("clients_ws_port", &self.clients_ws_port)
.field("clients_wss_port", &self.clients_wss_port)
.field("identity_key", &self.identity_key.to_base58_string())
.field("sphinx_key", &self.sphinx_key.to_base58_string())
.field("version", &self.version)
.finish()
}
}
impl LegacyNode {
pub fn parse_host(raw: &str) -> Result<NetworkAddress, GatewayConversionError> {
// safety: this conversion is infallible
// (but we retain result return type for legacy reasons)
Ok(raw.parse().unwrap())
}
pub fn extract_mix_host(
host: &NetworkAddress,
mix_port: u16,
) -> Result<SocketAddr, GatewayConversionError> {
Ok(host.to_socket_addrs(mix_port).map_err(|err| {
GatewayConversionError::InvalidAddress {
value: host.to_string(),
source: err,
}
})?[0])
}
pub fn identity(&self) -> &NodeIdentity {
&self.identity_key
}
pub fn clients_address(&self) -> String {
self.clients_address_tls()
.unwrap_or_else(|| self.clients_address_no_tls())
}
pub fn clients_address_no_tls(&self) -> String {
format!("ws://{}:{}", self.host, self.clients_ws_port)
}
pub fn clients_address_tls(&self) -> Option<String> {
self.clients_wss_port
.map(|p| format!("wss://{}:{p}", self.host))
}
}
impl fmt::Display for LegacyNode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "legacy gateway {} @ {}", self.node_id, self.host)
}
}
impl<'a> From<&'a LegacyNode> for SphinxNode {
fn from(node: &'a LegacyNode) -> Self {
let node_address_bytes = NymNodeRoutingAddress::from(node.mix_host)
.try_into()
.unwrap();
SphinxNode::new(node_address_bytes, (&node.sphinx_key).into())
}
}
impl<'a> TryFrom<&'a SkimmedNode> for LegacyNode {
type Error = GatewayConversionError;
fn try_from(value: &'a SkimmedNode) -> Result<Self, Self::Error> {
let Some(entry_details) = &value.entry else {
return Err(GatewayConversionError::NotGateway);
};
if value.ip_addresses.is_empty() {
return Err(GatewayConversionError::NoIpAddressesProvided {
gateway: value.ed25519_identity_pubkey.to_base58_string(),
});
}
// safety: we just checked the slice is not empty
#[allow(clippy::unwrap_used)]
let ip = value.ip_addresses.choose(&mut thread_rng()).unwrap();
let host = if let Some(hostname) = &entry_details.hostname {
NetworkAddress::Hostname(hostname.to_string())
} else {
NetworkAddress::IpAddr(*ip)
};
Ok(LegacyNode {
node_id: value.node_id,
host,
mix_host: SocketAddr::new(*ip, value.mix_port),
clients_ws_port: entry_details.ws_port,
clients_wss_port: entry_details.wss_port,
identity_key: value.ed25519_identity_pubkey,
sphinx_key: value.x25519_sphinx_pubkey,
version: NodeVersion::Unknown,
})
}
}
+440 -478
View File
File diff suppressed because it is too large Load Diff
-135
View File
@@ -1,135 +0,0 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{NetworkAddress, NodeVersion};
use nym_api_requests::nym_nodes::{NodeRole, SkimmedNode};
use nym_crypto::asymmetric::{encryption, identity};
pub use nym_mixnet_contract_common::LegacyMixLayer;
use nym_mixnet_contract_common::NodeId;
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_types::Node as SphinxNode;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::fmt::Formatter;
use std::io;
use std::net::SocketAddr;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum MixnodeConversionError {
#[error("mixnode identity key was malformed - {0}")]
InvalidIdentityKey(#[from] identity::Ed25519RecoveryError),
#[error("mixnode sphinx key was malformed - {0}")]
InvalidSphinxKey(#[from] encryption::KeyRecoveryError),
#[error("'{value}' is not a valid mixnode address - {source}")]
InvalidAddress {
value: String,
#[source]
source: io::Error,
},
#[error("invalid mix layer")]
InvalidLayer,
#[error("'{mixnode}' has not provided any valid ip addresses")]
NoIpAddressesProvided { mixnode: String },
#[error("provided node is not a mixnode in this epoch!")]
NotMixnode,
}
#[derive(Clone)]
pub struct LegacyNode {
pub mix_id: NodeId,
pub host: NetworkAddress,
// we're keeping this as separate resolved field since we do not want to be resolving the potential
// hostname every time we want to construct a path via this node
pub mix_host: SocketAddr,
pub identity_key: identity::PublicKey,
pub sphinx_key: encryption::PublicKey, // TODO: or nymsphinx::PublicKey? both are x25519
pub layer: LegacyMixLayer,
// to be removed:
pub version: NodeVersion,
}
impl std::fmt::Debug for LegacyNode {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("mix::Node")
.field("mix_id", &self.mix_id)
.field("host", &self.host)
.field("mix_host", &self.mix_host)
.field("identity_key", &self.identity_key.to_base58_string())
.field("sphinx_key", &self.sphinx_key.to_base58_string())
.field("layer", &self.layer)
.field("version", &self.version)
.finish()
}
}
impl LegacyNode {
pub fn parse_host(raw: &str) -> Result<NetworkAddress, MixnodeConversionError> {
// safety: this conversion is infallible
// (but we retain result return type for legacy reasons)
Ok(raw.parse().unwrap())
}
pub fn extract_mix_host(
host: &NetworkAddress,
mix_port: u16,
) -> Result<SocketAddr, MixnodeConversionError> {
Ok(host.to_socket_addrs(mix_port).map_err(|err| {
MixnodeConversionError::InvalidAddress {
value: host.to_string(),
source: err,
}
})?[0])
}
}
impl<'a> From<&'a LegacyNode> for SphinxNode {
fn from(node: &'a LegacyNode) -> Self {
let node_address_bytes = NymNodeRoutingAddress::from(node.mix_host)
.try_into()
.unwrap();
SphinxNode::new(node_address_bytes, (&node.sphinx_key).into())
}
}
impl<'a> TryFrom<&'a SkimmedNode> for LegacyNode {
type Error = MixnodeConversionError;
fn try_from(value: &'a SkimmedNode) -> Result<Self, Self::Error> {
if value.ip_addresses.is_empty() {
return Err(MixnodeConversionError::NoIpAddressesProvided {
mixnode: value.ed25519_identity_pubkey.to_base58_string(),
});
}
let layer = match value.role {
NodeRole::Mixnode { layer } => layer
.try_into()
.map_err(|_| MixnodeConversionError::InvalidLayer)?,
_ => return Err(MixnodeConversionError::NotMixnode),
};
// safety: we just checked the slice is not empty
#[allow(clippy::unwrap_used)]
let ip = value.ip_addresses.choose(&mut thread_rng()).unwrap();
let host = NetworkAddress::IpAddr(*ip);
Ok(LegacyNode {
mix_id: value.node_id,
host,
mix_host: SocketAddr::new(*ip, value.mix_port),
identity_key: value.ed25519_identity_pubkey,
sphinx_key: value.x25519_sphinx_pubkey,
layer,
version: NodeVersion::Unknown,
})
}
}
+143
View File
@@ -0,0 +1,143 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_api_requests::models::DeclaredRoles;
use nym_api_requests::nym_nodes::SkimmedNode;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_mixnet_contract_common::NodeId;
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_types::Node as SphinxNode;
use serde::{Deserialize, Serialize};
use std::net::{IpAddr, SocketAddr};
use thiserror::Error;
pub use nym_mixnet_contract_common::LegacyMixLayer;
#[derive(Error, Debug)]
pub enum RoutingNodeError {
#[error("node {node_id} ('{identity}') has not provided any valid ip addresses")]
NoIpAddressesProvided { node_id: NodeId, identity: String },
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EntryDetails {
// to allow client to choose ipv6 preference, if available
pub ip_addresses: Vec<IpAddr>,
pub clients_ws_port: u16,
pub hostname: Option<String>,
pub clients_wss_port: Option<u16>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct SupportedRoles {
pub mixnode: bool,
pub mixnet_entry: bool,
pub mixnet_exit: bool,
}
impl From<DeclaredRoles> for SupportedRoles {
fn from(value: DeclaredRoles) -> Self {
SupportedRoles {
mixnode: value.mixnode,
mixnet_entry: value.entry,
mixnet_exit: value.exit_nr && value.exit_ipr,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RoutingNode {
pub node_id: NodeId,
pub mix_host: SocketAddr,
pub entry: Option<EntryDetails>,
pub identity_key: ed25519::PublicKey,
pub sphinx_key: x25519::PublicKey,
pub supported_roles: SupportedRoles,
}
impl RoutingNode {
pub fn ws_entry_address_tls(&self) -> Option<String> {
let entry = self.entry.as_ref()?;
let hostname = entry.hostname.as_ref()?;
let wss_port = entry.clients_wss_port?;
Some(format!("wss://{hostname}:{wss_port}"))
}
pub fn ws_entry_address_no_tls(&self, prefer_ipv6: bool) -> Option<String> {
let entry = self.entry.as_ref()?;
if let Some(hostname) = entry.hostname.as_ref() {
return Some(format!("ws://{hostname}:{}", entry.clients_ws_port));
}
if prefer_ipv6 {
if let Some(ipv6) = entry.ip_addresses.iter().find(|ip| ip.is_ipv6()) {
return Some(format!("ws://{ipv6}:{}", entry.clients_ws_port));
}
}
let any_ip = entry.ip_addresses.first()?;
Some(format!("ws://{any_ip}:{}", entry.clients_ws_port))
}
pub fn ws_entry_address(&self, prefer_ipv6: bool) -> Option<String> {
if let Some(tls) = self.ws_entry_address_tls() {
return Some(tls);
}
self.ws_entry_address_no_tls(prefer_ipv6)
}
pub fn identity(&self) -> ed25519::PublicKey {
self.identity_key
}
}
impl<'a> From<&'a RoutingNode> for SphinxNode {
fn from(node: &'a RoutingNode) -> Self {
// SAFETY: this conversion is infallible as all versions of socket addresses have
// sufficiently small bytes representation to fit inside `NodeAddressBytes`
#[allow(clippy::unwrap_used)]
let node_address_bytes = NymNodeRoutingAddress::from(node.mix_host)
.try_into()
.unwrap();
SphinxNode::new(node_address_bytes, (&node.sphinx_key).into())
}
}
impl<'a> TryFrom<&'a SkimmedNode> for RoutingNode {
type Error = RoutingNodeError;
fn try_from(value: &'a SkimmedNode) -> Result<Self, Self::Error> {
// IF YOU EVER ADD "performance" TO RoutingNode,
// MAKE SURE TO UPDATE THE LAZY IMPLEMENTATION OF
// `impl NodeDescriptionTopologyExt for NymNodeDescription`!!!
let Some(first_ip) = value.ip_addresses.first() else {
return Err(RoutingNodeError::NoIpAddressesProvided {
node_id: value.node_id,
identity: value.ed25519_identity_pubkey.to_string(),
});
};
let entry = value.entry.as_ref().map(|entry| EntryDetails {
ip_addresses: value.ip_addresses.clone(),
clients_ws_port: entry.ws_port,
hostname: entry.hostname.clone(),
clients_wss_port: entry.wss_port,
});
Ok(RoutingNode {
node_id: value.node_id,
mix_host: SocketAddr::new(*first_ip, value.mix_port),
entry,
identity_key: value.ed25519_identity_pubkey,
sphinx_key: value.x25519_sphinx_pubkey,
supported_roles: value.supported_roles.into(),
})
}
}

Some files were not shown because too many files have changed in this diff Show More