Compare commits

...

37 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu 8e992c3c58 Handle ecash network errors differently (#5378) 2025-01-23 10:48:03 +01:00
benedetta davico 5ab164d229 Update Cargo.toml 2025-01-16 12:51:53 +01:00
Jędrzej Stuczyński 26538c5884 bugfix: only consider pre-existing peers for wg bytes metric (#5357) 2025-01-16 11:50:26 +00:00
benedettadavico c202e2d598 adding changelog for reeses 2025-01-15 10:27:39 +01:00
huximaxi a7874add88 Merge pull request #5346 from nymtech/feture/legacy_alert
Feture/legacy alert
2025-01-14 15:00:49 +01:00
RadekSabacky 3d84be22e2 + add releaseAlert component 2025-01-14 13:41:30 +01:00
RadekSabacky 25766dc0ec + add alert message into nav components 2025-01-14 13:22:31 +01:00
Jędrzej Stuczyński 676e93a372 bugfix: make sure refresh data key matches bond info (#5329) 2025-01-10 14:52:52 +00:00
Jędrzej Stuczyński 5a6770e5e2 chore: readjusted --mode behaviour to fix the regression (#5331) 2025-01-10 13:17:03 +00:00
Jędrzej Stuczyński 529e8d49ee chore: apply 1.84 linter suggestions (#5330)
* chore: apply 1.84 linter suggestions

* updated wasm dependencies to fix the macro issue

* second batch of clippy fixes
2025-01-10 13:00:18 +00:00
benedettadavico a94c035c0a correct the nym-node bumped version 2025-01-09 12:36:05 +01:00
Jędrzej Stuczyński 24480418f0 Bugfix/contract version assignment (#5318)
* fixed contract version being overwritten

* introduced migration to fix existing [mainnet] state

* updated contract schema

* updated testnet manager migrate msg code
2025-01-09 10:00:37 +00:00
Jędrzej Stuczyński a46245ffe3 feat: warn users if node is run in exit mode only (#5320)
* added 'full-gateway' nymnode mode to enable both entry and exit at the same time

* warning for running node in exit mode only
2025-01-09 09:02:52 +00:00
Jędrzej Stuczyński 7c1c13e139 reduce log severity for number of packets being delayed (#5321) 2025-01-09 09:02:37 +00:00
Jędrzej Stuczyński 836a93cd96 fixed client session histogram buckets (#5316) 2025-01-08 10:26:40 +00:00
benedettadavico b47a742dd0 update nym-node binary version 2025-01-08 10:37:48 +01:00
benedetta davico 6e14882246 Merge pull request #5315 from nymtech/release/2024.14-crunch-patched
Merge crunch patched to reeses
2025-01-08 10:35:54 +01:00
Tommy Verrall a7466a0e02 Merge pull request #5313 from nymtech/bugfix/append-gb-cap
amend 250gb limit
2025-01-08 09:50:04 +01:00
Tommy Verrall 78f45012db amend 250gb limit 2025-01-08 09:44:14 +01:00
benedettadavico f6a2f62ea9 bump versions of binaries 2025-01-08 09:28:48 +01:00
Jędrzej Stuczyński 3efeededc5 feature: expand nym-node prometheus metrics (#5298)
* fixed bearer auth for prometheus route

* basic prometheus metrics

* added rates on global values

* improved structure on the prometheus metrics

* added additional metrics for ingress websockets and egress mixnet connections

* some channel business metrics

* fixed metrics registration and added additional variants

* added counter for number of disk persisted packets

* counter for pending egress packets

* counter for pending egress forward packets

* clippy
2025-01-07 13:34:18 +00:00
Jędrzej Stuczyński c482350ec6 feature: wireguard metrics (#5278)
* experimental log

* introduce wireguard metrics updates

* add wireguard traffic rates to console logger

* missing import

* changed order of displayed values

* expose bytes information via rest endpoint

* clippy
2025-01-07 13:32:07 +00:00
Bogdan-Ștefan Neacşu f7a7a8072f Move tun constants to network defaults (#5286) (#5287) 2024-12-18 16:23:18 +02:00
Jon Häggblad acd068e5ab Add close to credential storage (#5283)
* Add close method to credential storage

* wip
2024-12-18 12:37:16 +01:00
benedetta davico 8d5a41a790 Merge pull request #5277 from nymtech/feature/modify_changelog
Modify CHANGELOG
2024-12-18 11:07:49 +01:00
Bogdan-Ștefan Neacşu caa17d933c Add windows to CI builds (#5269)
* Add windows to CI builds

* Fix win build for node status api

* Fix win build for sdk

* Fix win build for cred proxy
2024-12-17 22:26:38 +01:00
Bogdan-Ștefan Neacşu 039b05cf7e Modify CHANGELOG 2024-12-17 18:59:49 +02:00
benedetta davico 37b10b59aa update changelog for nym-node v1.2.1 2024-12-17 17:54:18 +01:00
benedetta davico a9ede22bbd update nym-node version 2024-12-17 17:41:12 +01:00
Bogdan-Ștefan Neacşu b656003306 Expect that previously regitrated clients don't have v6 addr 2024-12-17 16:59:01 +02:00
dynco-nym b4f51baf94 Change sqlite journal mode to WAL (#5213)
* Change sqlite journal mode to WAL

* Synchronous mode & auto vacuum

* Bump probe git ref to 1.1.0
2024-12-16 16:40:02 +01:00
Drazen Urch a3f3d83c1b Shipping raw metrics to PG (#5216)
* Shipping raw metrics to PG

* Put cancel token back in its place

* fmt
2024-12-16 16:19:37 +01:00
Drazen Urch 84d7004cb2 Add control messages to GatewayTransciver (#5247)
* Add control messages to GatewayTransciver

* Add forget me flag to clients

* CI gate IPIINFO test

* Handle ForgetMe for client and stats db

* fmt
2024-12-16 15:18:04 +01:00
import this be063a36eb syntax hotfix (#5266) 2024-12-16 13:17:38 +00:00
windy-ux 0a712b9fce Fix/web 615 seo setup (#5265)
* + add header into Packet Mixing docs

* + add head changes for testing

* / updated version of metatags in theme.config

* + add env file

* / theme.config to use NEXT_PUBLIC_SITE_URL from env file

* @ Fix broken link in theme.config

* - remove favicon code

* + add desription for intro pages

* + add default book's desriptions

* Revert "+ add desription for intro pages"

This reverts commit 98c78242d4.
2024-12-16 13:17:25 +00:00
Bogdan-Ștefan Neacşu 88d6fb4e22 Add fd callback to client core (#5230)
* Add fd callback to client core

* Include in sdk

* Fix clippy many args

* Method in builder

* Replace Box with Arc
2024-12-16 13:57:34 +02:00
Jon Häggblad 04c2045d94 Add PATCH support to nym-http-api-client (#5260) 2024-12-16 12:28:44 +01:00
144 changed files with 4100 additions and 908 deletions
+1 -1
View File
@@ -30,7 +30,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ arc-ubuntu-20.04, custom-runner-mac-m1 ]
os: [ arc-ubuntu-20.04, custom-windows-11, custom-runner-mac-m1 ]
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
+3 -1
View File
@@ -51,4 +51,6 @@ ppa-private-key.b64
ppa-private-key.asc
nym-network-monitor/topology.json
nym-network-monitor/__pycache__
nym-network-monitor/*.key
nym-network-monitor/*.key
nym-network-monitor/.envrc
nym-network-monitor/.envrc
+85
View File
@@ -4,6 +4,91 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2025.1-reeses] (2025-01-15)
- Feture/legacy alert ([#5346])
- chore: readjusted --mode behaviour to fix the regression ([#5331])
- chore: apply 1.84 linter suggestions ([#5330])
- bugfix: make sure refresh data key matches bond info ([#5329])
- reduce log severity for number of packets being delayed ([#5321])
- feat: warn users if node is run in exit mode only ([#5320])
- Bugfix/contract version assignment ([#5318])
- fixed client session histogram buckets ([#5316])
- amend 250gb limit ([#5313])
- feature: expand nym-node prometheus metrics ([#5298])
- Cherry picked #5286 ([#5287])
- Add close to credential storage ([#5283])
- feature: wireguard metrics ([#5278])
- Add PATCH support to nym-http-api-client ([#5260])
- chore: removed legacy socks5 listener ([#5259])
- bugfix: make sure to apply gateway score filtering when choosing initial node ([#5256])
- Update TS bindings ([#5255])
- Add conversion unit tests for auth msg ([#5251])
- Add control messages to GatewayTransciver ([#5247])
- Remove unneeded async function annotation ([#5246])
- bugfix: make sure to update timestamp of last batch verification to prevent double redemption ([#5239])
- Add FromStr impl for UserAgent ([#5236])
- Extend swagger docs ([#5235])
- TicketType derive Hash and Eq ([#5233])
- Add fd callback to client core ([#5230])
- Extend raw ws fd for gateway client ([#5218])
- Shipping raw metrics to PG ([#5216])
- Change sqlite journal mode to WAL ([#5213])
- Derive serialize for UserAgent ([#5210])
- Restore Location fields ([#5208])
- better date serialization ([#5207])
- Fix overflow ([#5204])
- feature: hopefully final steps of the smoosh™️ ([#5201])
- Fix overflow ([#5184])
- NS API - Gateway stats scraping ([#5180])
- introduced initial internal commands for nym-cli: ecash key and request generation ([#5174])
- Move NS client to separate package under NS API ([#5171])
- build(deps): bump micromatch from 4.0.4 to 4.0.8 in /testnet-faucet ([#4813])
[#5346]: https://github.com/nymtech/nym/pull/5346
[#5331]: https://github.com/nymtech/nym/pull/5331
[#5330]: https://github.com/nymtech/nym/pull/5330
[#5329]: https://github.com/nymtech/nym/pull/5329
[#5321]: https://github.com/nymtech/nym/pull/5321
[#5320]: https://github.com/nymtech/nym/pull/5320
[#5318]: https://github.com/nymtech/nym/pull/5318
[#5316]: https://github.com/nymtech/nym/pull/5316
[#5313]: https://github.com/nymtech/nym/pull/5313
[#5298]: https://github.com/nymtech/nym/pull/5298
[#5287]: https://github.com/nymtech/nym/pull/5287
[#5283]: https://github.com/nymtech/nym/pull/5283
[#5278]: https://github.com/nymtech/nym/pull/5278
[#5260]: https://github.com/nymtech/nym/pull/5260
[#5259]: https://github.com/nymtech/nym/pull/5259
[#5256]: https://github.com/nymtech/nym/pull/5256
[#5255]: https://github.com/nymtech/nym/pull/5255
[#5251]: https://github.com/nymtech/nym/pull/5251
[#5247]: https://github.com/nymtech/nym/pull/5247
[#5246]: https://github.com/nymtech/nym/pull/5246
[#5239]: https://github.com/nymtech/nym/pull/5239
[#5236]: https://github.com/nymtech/nym/pull/5236
[#5235]: https://github.com/nymtech/nym/pull/5235
[#5233]: https://github.com/nymtech/nym/pull/5233
[#5230]: https://github.com/nymtech/nym/pull/5230
[#5218]: https://github.com/nymtech/nym/pull/5218
[#5216]: https://github.com/nymtech/nym/pull/5216
[#5213]: https://github.com/nymtech/nym/pull/5213
[#5210]: https://github.com/nymtech/nym/pull/5210
[#5208]: https://github.com/nymtech/nym/pull/5208
[#5207]: https://github.com/nymtech/nym/pull/5207
[#5204]: https://github.com/nymtech/nym/pull/5204
[#5201]: https://github.com/nymtech/nym/pull/5201
[#5184]: https://github.com/nymtech/nym/pull/5184
[#5180]: https://github.com/nymtech/nym/pull/5180
[#5174]: https://github.com/nymtech/nym/pull/5174
[#5171]: https://github.com/nymtech/nym/pull/5171
[#4813]: https://github.com/nymtech/nym/pull/4813
## [2024.14-crunch-patched] (2024-12-17)
- Fixes an issue to allow previously registred clients to connect to latest nym-nodes
- Fixes compatibility issues between nym-nodes and older clients
## [2024.14-crunch] (2024-12-11)
- Merge/release/2024.14-crunch ([#5242])
Generated
+122 -35
View File
@@ -2416,7 +2416,7 @@ dependencies = [
[[package]]
name = "explorer-api"
version = "1.1.43"
version = "1.1.44"
dependencies = [
"chrono",
"clap 4.5.20",
@@ -2476,10 +2476,16 @@ dependencies = [
]
[[package]]
name = "fancy_constructor"
version = "1.2.2"
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f71f317e4af73b2f8f608fac190c52eac4b1879d2145df1db2fe48881ca69435"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fancy_constructor"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07b19d0e43eae2bfbafe4931b5e79c73fb1a849ca15cd41a761a7b8587f9a1a2"
dependencies = [
"macroific",
"proc-macro2",
@@ -2840,15 +2846,15 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "gloo-net"
version = "0.5.0"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43aaa242d1239a8822c15c645f02166398da4f8b5c4bae795c1f5b44e9eee173"
checksum = "c06f627b1a58ca3d42b45d6104bf1e1a03799df472df00988b6ba21accc10580"
dependencies = [
"futures-channel",
"futures-core",
"futures-sink",
"gloo-utils 0.2.0",
"http 0.2.12",
"http 1.1.0",
"js-sys",
"pin-project",
"serde",
@@ -3426,7 +3432,8 @@ checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683"
[[package]]
name = "indexed_db_futures"
version = "0.4.2"
source = "git+https://github.com/TiemenSch/rust-indexed-db?branch=update-uuid#9745d015707008b0c410115d787014a6d1af2efb"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0704b71f13f81b5933d791abf2de26b33c40935143985220299a357721166706"
dependencies = [
"accessory",
"cfg-if",
@@ -3664,10 +3671,11 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "js-sys"
version = "0.3.72"
version = "0.3.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9"
checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7"
dependencies = [
"once_cell",
"wasm-bindgen",
]
@@ -4387,7 +4395,7 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "nym-api"
version = "1.1.47"
version = "1.1.48"
dependencies = [
"anyhow",
"async-trait",
@@ -4636,7 +4644,7 @@ dependencies = [
[[package]]
name = "nym-cli"
version = "1.1.45"
version = "1.1.46"
dependencies = [
"anyhow",
"base64 0.22.1",
@@ -4719,7 +4727,7 @@ dependencies = [
[[package]]
name = "nym-client"
version = "1.1.45"
version = "1.1.46"
dependencies = [
"bs58",
"clap 4.5.20",
@@ -5673,18 +5681,20 @@ version = "0.1.0"
dependencies = [
"dashmap",
"lazy_static",
"log",
"prometheus",
"tracing",
]
[[package]]
name = "nym-mixnet-client"
version = "0.1.0"
dependencies = [
"dashmap",
"futures",
"nym-sphinx",
"nym-task",
"tokio",
"tokio-stream",
"tokio-util",
"tracing",
]
@@ -5782,6 +5792,7 @@ dependencies = [
"nym-bin-common",
"nym-client-core",
"nym-crypto",
"nym-gateway-requests",
"nym-network-defaults",
"nym-sdk",
"nym-sphinx",
@@ -5795,6 +5806,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tokio-postgres",
"tokio-util",
"utoipa",
"utoipa-swagger-ui",
@@ -5802,7 +5814,7 @@ dependencies = [
[[package]]
name = "nym-network-requester"
version = "1.1.46"
version = "1.1.47"
dependencies = [
"addr",
"anyhow",
@@ -5853,7 +5865,7 @@ dependencies = [
[[package]]
name = "nym-node"
version = "1.2.0"
version = "1.3.0"
dependencies = [
"anyhow",
"async-trait",
@@ -6225,7 +6237,7 @@ dependencies = [
[[package]]
name = "nym-socks5-client"
version = "1.1.45"
version = "1.1.46"
dependencies = [
"bs58",
"clap 4.5.20",
@@ -6799,6 +6811,7 @@ dependencies = [
"nym-crypto",
"nym-gateway-storage",
"nym-network-defaults",
"nym-node-metrics",
"nym-task",
"nym-wireguard-types",
"thiserror",
@@ -6826,7 +6839,7 @@ dependencies = [
[[package]]
name = "nymvisor"
version = "0.1.10"
version = "0.1.11"
dependencies = [
"anyhow",
"bytes",
@@ -7250,6 +7263,24 @@ dependencies = [
"indexmap 2.2.6",
]
[[package]]
name = "phf"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b"
dependencies = [
"siphasher 0.3.11",
]
[[package]]
name = "pin-project"
version = "1.1.6"
@@ -7388,6 +7419,35 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0"
[[package]]
name = "postgres-protocol"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23"
dependencies = [
"base64 0.22.1",
"byteorder",
"bytes",
"fallible-iterator",
"hmac",
"md-5",
"memchr",
"rand",
"sha2 0.10.8",
"stringprep",
]
[[package]]
name = "postgres-types"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f66ea23a2d0e5734297357705193335e0a957696f34bed2f2faefacb2fec336f"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@@ -9722,6 +9782,32 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "tokio-postgres"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b5d3742945bc7d7f210693b0c58ae542c6fd47b17adbbda0885f3dcb34a6bdb"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"log",
"parking_lot",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"rand",
"socket2",
"tokio",
"tokio-util",
"whoami",
]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
@@ -10678,9 +10764,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
[[package]]
name = "wasm-bindgen"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e"
checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396"
dependencies = [
"cfg-if",
"once_cell",
@@ -10689,13 +10775,12 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358"
checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.90",
@@ -10704,21 +10789,22 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.45"
version = "0.4.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b"
checksum = "38176d9b44ea84e9184eff0bc34cc167ed044f816accfe5922e54d84cf48eca2"
dependencies = [
"cfg-if",
"js-sys",
"once_cell",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56"
checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -10726,9 +10812,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68"
checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2"
dependencies = [
"proc-macro2",
"quote",
@@ -10739,9 +10825,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.95"
version = "0.2.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d"
checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6"
[[package]]
name = "wasm-bindgen-test"
@@ -10849,9 +10935,9 @@ dependencies = [
[[package]]
name = "wasmtimer"
version = "0.2.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f656cd8858a5164932d8a90f936700860976ec21eb00e0fe2aa8cab13f6b4cf"
checksum = "0048ad49a55b9deb3953841fa1fc5858f0efbcb7a18868c899a360269fac1b23"
dependencies = [
"futures",
"js-sys",
@@ -10863,9 +10949,9 @@ dependencies = [
[[package]]
name = "web-sys"
version = "0.3.72"
version = "0.3.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112"
checksum = "04dd7223427d52553d3702c004d3b2fe07c148165faa56313cb00211e31c12bc"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -10912,6 +10998,7 @@ checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d"
dependencies = [
"redox_syscall 0.5.1",
"wasite",
"web-sys",
]
[[package]]
+8 -10
View File
@@ -394,19 +394,17 @@ prost = { version = "0.12", default-features = false }
# wasm-related dependencies
gloo-utils = "0.2.0"
gloo-net = "0.5.0"
gloo-net = "0.6.0"
# use a separate branch due to feature unification failures
# this is blocked until the upstream removes outdates `wasm_bindgen` feature usage
# indexed_db_futures = "0.4.1"
indexed_db_futures = { git = "https://github.com/TiemenSch/rust-indexed-db", branch = "update-uuid" }
js-sys = "0.3.70"
# TODO: migrate to 0.6+
indexed_db_futures = "0.4.2"
js-sys = "0.3.76"
serde-wasm-bindgen = "0.6.5"
tsify = "0.4.5"
wasm-bindgen = "0.2.95"
wasm-bindgen-futures = "0.4.45"
wasmtimer = "0.2.0"
web-sys = "0.3.72"
wasm-bindgen = "0.2.99"
wasm-bindgen-futures = "0.4.49"
wasmtimer = "0.4.1"
web-sys = "0.3.76"
# Profile settings for individual crates
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-client"
version = "1.1.45"
version = "1.1.46"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-socks5-client"
version = "1.1.45"
version = "1.1.46"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
@@ -28,7 +28,7 @@ pub type HmacSha256 = Hmac<Sha256>;
pub type Nonce = u64;
pub type Taken = Option<SystemTime>;
pub const BANDWIDTH_CAP_PER_DAY: u64 = 1024 * 1024 * 1024; // 1 GB
pub const BANDWIDTH_CAP_PER_DAY: u64 = 250 * 1024 * 1024 * 1024; // 250 GB
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct IpPair {
@@ -8,7 +8,10 @@ use crate::{
},
};
use log::{debug, error};
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
#[derive(Debug, Clone)]
@@ -30,6 +33,9 @@ impl StorageManager {
}
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -110,7 +116,7 @@ impl StorageManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO registered_gateway(gateway_id_bs58, registration_timestamp, gateway_type)
INSERT INTO registered_gateway(gateway_id_bs58, registration_timestamp, gateway_type)
VALUES (?, ?, ?)
"#,
registered_gateway.gateway_id_bs58,
@@ -224,7 +230,7 @@ impl StorageManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO custom_gateway_details(gateway_id_bs58, data)
INSERT INTO custom_gateway_details(gateway_id_bs58, data)
VALUES (?, ?)
"#,
custom.gateway_id_bs58,
@@ -32,7 +32,7 @@ use crate::init::{
setup_gateway,
types::{GatewaySetup, InitialisationResult},
};
use crate::{config, spawn_future};
use crate::{config, spawn_future, ForgetMe};
use futures::channel::mpsc;
use log::*;
use nym_bandwidth_controller::BandwidthController;
@@ -188,6 +188,11 @@ pub struct BaseClientBuilder<'a, C, S: MixnetClientStorage> {
user_agent: Option<UserAgent>,
setup_method: GatewaySetup,
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
forget_me: ForgetMe,
}
impl<'a, C, S> BaseClientBuilder<'a, C, S>
@@ -210,9 +215,18 @@ where
shutdown: None,
user_agent: None,
setup_method: GatewaySetup::MustLoad { gateway_id: None },
#[cfg(unix)]
connection_fd_callback: None,
forget_me: Default::default(),
}
}
#[must_use]
pub fn with_forget_me(mut self, forget_me: &ForgetMe) -> Self {
self.forget_me = forget_me.clone();
self
}
#[must_use]
pub fn with_gateway_setup(mut self, setup: GatewaySetup) -> Self {
self.setup_method = setup;
@@ -261,6 +275,15 @@ where
Ok(self)
}
#[cfg(unix)]
pub fn with_connection_fd_callback(
mut self,
callback: Arc<dyn Fn(RawFd) + Send + Sync>,
) -> Self {
self.connection_fd_callback = Some(callback);
self
}
// note: do **NOT** make this method public as its only valid usage is from within `start_base`
// because it relies on the crypto keys being already loaded
fn mix_address(details: &InitialisationResult) -> Recipient {
@@ -352,6 +375,7 @@ where
controller.start_with_shutdown(shutdown)
}
#[allow(clippy::too_many_arguments)]
async fn start_gateway_client(
config: &Config,
initialisation_result: InitialisationResult,
@@ -359,6 +383,7 @@ where
details_store: &S::GatewaysDetailsStore,
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
shutdown: TaskClient,
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
where
@@ -401,6 +426,8 @@ where
packet_router,
bandwidth_controller,
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown,
)
};
@@ -462,6 +489,7 @@ where
details_store: &S::GatewaysDetailsStore,
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
mut shutdown: TaskClient,
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
where
@@ -493,6 +521,8 @@ where
details_store,
packet_router,
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown,
)
.await?;
@@ -615,9 +645,11 @@ where
fn start_mix_traffic_controller(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown: TaskClient,
forget_me: ForgetMe,
) -> BatchMixMessageSender {
info!("Starting mix traffic controller...");
let (mix_traffic_controller, mix_tx) = MixTrafficController::new(gateway_transceiver);
let (mix_traffic_controller, mix_tx) =
MixTrafficController::new(gateway_transceiver, forget_me);
mix_traffic_controller.start_with_shutdown(shutdown);
mix_tx
}
@@ -772,6 +804,8 @@ where
&details_store,
gateway_packet_router,
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
shutdown.fork("gateway_transceiver"),
)
.await?;
@@ -797,9 +831,11 @@ where
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
// The MixTrafficController then sends the actual traffic
let message_sender = Self::start_mix_traffic_controller(
gateway_transceiver,
shutdown.fork("mix_traffic_controller"),
self.forget_me,
);
// Channels that the websocket listener can use to signal downstream to the real traffic
@@ -2,8 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::spawn_future;
use crate::{spawn_future, ForgetMe};
use log::*;
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
@@ -26,10 +27,14 @@ pub struct MixTrafficController {
// TODO: this is temporary work-around.
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
consecutive_gateway_failure_count: usize,
forget_me: ForgetMe,
}
impl MixTrafficController {
pub fn new<T>(gateway_transceiver: T) -> (MixTrafficController, BatchMixMessageSender)
pub fn new<T>(
gateway_transceiver: T,
forget_me: ForgetMe,
) -> (MixTrafficController, BatchMixMessageSender)
where
T: GatewayTransceiver + Send + 'static,
{
@@ -40,6 +45,7 @@ impl MixTrafficController {
gateway_transceiver: Box::new(gateway_transceiver),
mix_rx: message_receiver,
consecutive_gateway_failure_count: 0,
forget_me,
},
message_sender,
)
@@ -47,6 +53,7 @@ impl MixTrafficController {
pub fn new_dynamic(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
forget_me: ForgetMe,
) -> (MixTrafficController, BatchMixMessageSender) {
let (message_sender, message_receiver) =
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
@@ -55,6 +62,7 @@ impl MixTrafficController {
gateway_transceiver,
mix_rx: message_receiver,
consecutive_gateway_failure_count: 0,
forget_me,
},
message_sender,
)
@@ -111,7 +119,27 @@ impl MixTrafficController {
}
}
shutdown.recv_timeout().await;
if self.forget_me.any() {
log::info!("Sending forget me request to the gateway");
match self
.gateway_transceiver
.send_client_request(ClientRequest::ForgetMe {
client: self.forget_me.client(),
stats: self.forget_me.stats(),
})
.await
{
Ok(_) => {
log::info!("Successfully sent forget me request to the gateway");
}
Err(err) => {
log::error!("Failed to send forget me request to the gateway: {err}");
}
}
}
log::debug!("MixTrafficController: Exiting");
})
});
}
}
@@ -5,8 +5,10 @@ use async_trait::async_trait;
use log::{debug, error};
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::identity;
use nym_gateway_client::error::GatewayClientError;
use nym_gateway_client::GatewayClient;
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use std::fmt::Debug;
@@ -26,9 +28,14 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
}
/// This combines combines the functionalities of being able to send and receive mix packets.
#[async_trait]
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
fn gateway_identity(&self) -> identity::PublicKey;
fn ws_fd(&self) -> Option<RawFd>;
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError>;
}
/// This trait defines the functionality of sending `MixPacket` into the mixnet,
@@ -65,6 +72,7 @@ pub trait GatewayReceiver {
}
// to allow for dynamic dispatch
#[async_trait]
impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
#[inline]
fn gateway_identity(&self) -> identity::PublicKey {
@@ -73,6 +81,13 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
fn ws_fd(&self) -> Option<RawFd> {
(**self).ws_fd()
}
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
(**self).send_client_request(message).await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -91,7 +106,6 @@ impl<G: GatewaySender + ?Sized + Send> GatewaySender for Box<G> {
(**self).batch_send_mix_packets(packets).await
}
}
impl<G: GatewayReceiver + ?Sized> GatewayReceiver for Box<G> {
#[inline]
fn set_packet_router(&mut self, packet_router: PacketRouter) -> Result<(), ErasedGatewayError> {
@@ -111,6 +125,7 @@ impl<C, St> RemoteGateway<C, St> {
}
}
#[async_trait]
impl<C, St> GatewayTransceiver for RemoteGateway<C, St>
where
C: DkgQueryClient + Send + Sync,
@@ -123,6 +138,20 @@ where
fn ws_fd(&self) -> Option<RawFd> {
self.gateway_client.ws_fd()
}
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
if let Some(shared_key) = self.gateway_client.shared_key() {
self.gateway_client
.send_websocket_message(message.encrypt(&*shared_key)?)
.await?;
Ok(())
} else {
Err(GatewayClientError::ConnectionInInvalidState)
}
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -195,6 +224,7 @@ impl LocalGateway {
mod nonwasm_sealed {
use super::*;
#[async_trait]
impl GatewayTransceiver for LocalGateway {
fn gateway_identity(&self) -> identity::PublicKey {
self.local_identity
@@ -202,6 +232,13 @@ mod nonwasm_sealed {
fn ws_fd(&self) -> Option<RawFd> {
None
}
async fn send_client_request(
&mut self,
_message: ClientRequest,
) -> Result<(), GatewayClientError> {
Ok(())
}
}
#[async_trait]
@@ -269,6 +306,7 @@ impl GatewaySender for MockGateway {
}
}
#[async_trait]
impl GatewayTransceiver for MockGateway {
fn gateway_identity(&self) -> identity::PublicKey {
self.dummy_identity
@@ -276,4 +314,11 @@ impl GatewayTransceiver for MockGateway {
fn ws_fd(&self) -> Option<RawFd> {
None
}
async fn send_client_request(
&mut self,
_message: ClientRequest,
) -> Result<(), GatewayClientError> {
Ok(())
}
}
+1 -1
View File
@@ -190,7 +190,7 @@ where
Ok(GatewayWithLatency::new(gateway, avg))
}
pub async fn choose_gateway_by_latency<'a, R: Rng, G: ConnectableGateway + Clone>(
pub async fn choose_gateway_by_latency<R: Rng, G: ConnectableGateway + Clone>(
rng: &mut R,
gateways: &[G],
must_use_tls: bool,
+45
View File
@@ -34,3 +34,48 @@ where
{
tokio::spawn(future);
}
#[derive(Clone, Default, Debug)]
pub struct ForgetMe {
client: bool,
stats: bool,
}
impl ForgetMe {
pub fn new_all() -> Self {
Self {
client: true,
stats: true,
}
}
pub fn new_client() -> Self {
Self {
client: true,
stats: false,
}
}
pub fn new_stats() -> Self {
Self {
client: false,
stats: true,
}
}
pub fn new(client: bool, stats: bool) -> Self {
Self { client, stats }
}
pub fn any(&self) -> bool {
self.client || self.stats
}
pub fn client(&self) -> bool {
self.client
}
pub fn stats(&self) -> bool {
self.stats
}
}
@@ -9,7 +9,10 @@ use crate::backend::fs_backend::{
},
};
use log::{error, info};
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
#[derive(Debug, Clone)]
@@ -31,6 +34,9 @@ impl StorageManager {
}
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(fresh)
.disable_statement_logging();
@@ -101,6 +101,10 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
// currently unused (but populated)
negotiated_protocol: Option<u8>,
// Callback on the fd as soon as the connection has been established
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
/// Listen to shutdown messages and send notifications back to the task manager
task_client: TaskClient,
}
@@ -116,6 +120,7 @@ impl<C, St> GatewayClient<C, St> {
packet_router: PacketRouter,
bandwidth_controller: Option<BandwidthController<C, St>>,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
task_client: TaskClient,
) -> Self {
GatewayClient {
@@ -131,6 +136,8 @@ impl<C, St> GatewayClient<C, St> {
bandwidth_controller,
stats_reporter,
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
task_client,
}
}
@@ -205,6 +212,12 @@ impl<C, St> GatewayClient<C, St> {
};
self.connection = SocketState::Available(Box::new(ws_stream));
#[cfg(unix)]
if let (Some(callback), Some(fd)) = (self.connection_fd_callback.as_ref(), self.ws_fd()) {
callback.as_ref()(fd);
}
Ok(())
}
@@ -311,7 +324,7 @@ impl<C, St> GatewayClient<C, St> {
// If we want to send a message (with response), we need to have a full control over the socket,
// as we need to be able to write the request and read the subsequent response
async fn send_websocket_message(
pub async fn send_websocket_message(
&mut self,
msg: impl Into<Message>,
) -> Result<ServerResponse, GatewayClientError> {
@@ -1034,6 +1047,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
bandwidth_controller: None,
stats_reporter: ClientStatsSender::new(None),
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback: None,
task_client,
}
}
@@ -1064,6 +1079,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
bandwidth_controller,
stats_reporter,
negotiated_protocol: self.negotiated_protocol,
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback,
task_client,
}
}
+3 -1
View File
@@ -8,10 +8,12 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dashmap = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true, features = ["time"] }
tokio = { workspace = true, features = ["time", "sync"] }
tokio-util = { workspace = true, features = ["codec"], optional = true }
tokio-stream = { workspace = true }
# internal
nym-sphinx = { path = "../../nymsphinx" }
+141 -86
View File
@@ -1,21 +1,24 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2021-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::channel::mpsc;
use dashmap::DashMap;
use futures::StreamExt;
use nym_sphinx::addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::params::PacketType;
use nym_sphinx::NymPacket;
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::ops::Deref;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::sleep;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::codec::Framed;
use tracing::*;
@@ -55,11 +58,37 @@ pub trait SendWithoutResponse {
}
pub struct Client {
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
active_connections: ActiveConnections,
connections_count: Arc<AtomicUsize>,
config: Config,
}
struct ConnectionSender {
#[derive(Default, Clone)]
pub struct ActiveConnections {
inner: Arc<DashMap<NymNodeRoutingAddress, ConnectionSender>>,
}
impl ActiveConnections {
pub fn pending_packets(&self) -> usize {
self.inner
.iter()
.map(|sender| {
let max_capacity = sender.channel.max_capacity();
let capacity = sender.channel.capacity();
max_capacity - capacity
})
.sum()
}
}
impl Deref for ActiveConnections {
type Target = DashMap<NymNodeRoutingAddress, ConnectionSender>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub struct ConnectionSender {
channel: mpsc::Sender<FramedNymPacket>,
current_reconnection_attempt: Arc<AtomicU32>,
}
@@ -73,46 +102,53 @@ impl ConnectionSender {
}
}
impl Client {
pub fn new(config: Config) -> Client {
Client {
conn_new: HashMap::new(),
config,
struct ManagedConnection {
address: SocketAddr,
message_receiver: ReceiverStream<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: Arc<AtomicU32>,
}
impl ManagedConnection {
fn new(
address: SocketAddr,
message_receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: Arc<AtomicU32>,
) -> Self {
ManagedConnection {
address,
message_receiver: ReceiverStream::new(message_receiver),
connection_timeout,
current_reconnection,
}
}
async fn manage_connection(
address: SocketAddr,
receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: &AtomicU32,
) {
async fn run(self) {
let address = self.address;
let connection_fut = TcpStream::connect(address);
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to {}", address);
debug!("Managed to establish connection to {}", self.address);
// if we managed to connect, reset the reconnection count (whatever it might have been)
current_reconnection.store(0, Ordering::Release);
self.current_reconnection.store(0, Ordering::Release);
Framed::new(stream, NymCodec)
}
Err(err) => {
debug!(
"failed to establish connection to {} (err: {})",
address, err
);
debug!("failed to establish connection to {address} (err: {err})",);
return;
}
},
Err(_) => {
debug!(
"failed to connect to {} within {:?}",
address, connection_timeout
"failed to connect to {address} within {:?}",
self.connection_timeout
);
// we failed to connect - increase reconnection attempt
current_reconnection.fetch_add(1, Ordering::SeqCst);
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
return;
}
};
@@ -120,15 +156,28 @@ impl Client {
// Take whatever the receiver channel produces and put it on the connection.
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
// about neither receiver nor the connection, it doesn't matter which one gets consumed
if let Err(err) = receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {} - {err}", address);
if let Err(err) = self.message_receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {address}: {err}");
}
debug!(
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
address
"connection manager to {address} is finished. Either the connection failed or mixnet client got dropped",
);
}
}
impl Client {
pub fn new(config: Config, connections_count: Arc<AtomicUsize>) -> Client {
Client {
active_connections: Default::default(),
connections_count,
config,
}
}
pub fn active_connections(&self) -> ActiveConnections {
self.active_connections.clone()
}
/// If we're trying to reconnect, determine how long we should wait.
fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
@@ -148,7 +197,7 @@ impl Client {
}
fn make_connection(&mut self, address: NymNodeRoutingAddress, pending_packet: FramedNymPacket) {
let (mut sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
let (sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
// this CAN'T fail because we just created the channel which has a non-zero capacity
if self.config.maximum_connection_buffer_size > 0 {
@@ -156,15 +205,16 @@ impl Client {
}
// if we already tried to connect to `address` before, grab the current attempt count
let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
existing.channel = sender;
Arc::clone(&existing.current_reconnection_attempt)
} else {
let new_entry = ConnectionSender::new(sender);
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
self.conn_new.insert(address, new_entry);
current_attempt
};
let current_reconnection_attempt =
if let Some(mut existing) = self.active_connections.get_mut(&address) {
existing.channel = sender;
Arc::clone(&existing.current_reconnection_attempt)
} else {
let new_entry = ConnectionSender::new(sender);
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
self.active_connections.insert(address, new_entry);
current_attempt
};
// load the actual value.
let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
@@ -173,6 +223,7 @@ impl Client {
// copy the value before moving into another task
let initial_connection_timeout = self.config.initial_connection_timeout;
let connections_count = self.connections_count.clone();
tokio::spawn(async move {
// before executing the manager, wait for what was specified, if anything
if let Some(backoff) = backoff {
@@ -180,13 +231,16 @@ impl Client {
sleep(backoff).await;
}
Self::manage_connection(
connections_count.fetch_add(1, Ordering::SeqCst);
ManagedConnection::new(
address.into(),
receiver,
initial_connection_timeout,
&current_reconnection_attempt,
current_reconnection_attempt,
)
.await
.run()
.await;
connections_count.fetch_sub(1, Ordering::SeqCst);
});
}
}
@@ -201,49 +255,47 @@ impl SendWithoutResponse for Client {
trace!("Sending packet to {address:?}");
let framed_packet = FramedNymPacket::new(packet, packet_type);
if let Some(sender) = self.conn_new.get_mut(&address) {
if let Err(err) = sender.channel.try_send(framed_packet) {
if err.is_full() {
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
Err(io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
))
} else if err.is_disconnected() {
debug!(
"Connection to {} seems to be dead. attempting to re-establish it...",
address
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, err.into_inner());
Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
"reconnection attempt is in progress",
))
} else {
// this can't really happen, but let's safe-guard against it in case something changes in futures library
Err(io::Error::new(
io::ErrorKind::Other,
"unknown connection buffer error",
))
}
} else {
Ok(())
}
} else {
let Some(sender) = self.active_connections.get_mut(&address) else {
// there was never a connection to begin with
debug!("establishing initial connection to {}", address);
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
self.make_connection(address, framed_packet);
Err(io::Error::new(
return Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection is in progress",
))
}
));
};
let sending_res = sender.channel.try_send(framed_packet);
drop(sender);
sending_res.map_err(|err| {
match err {
TrySendError::Full(_) => {
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
)
}
TrySendError::Closed(dropped) => {
debug!(
"Connection to {address} seems to be dead. attempting to re-establish it...",
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, dropped);
io::Error::new(
io::ErrorKind::ConnectionAborted,
"reconnection attempt is in progress",
)
}
}
} )
}
}
@@ -252,12 +304,15 @@ mod tests {
use super::*;
fn dummy_client() -> Client {
Client::new(Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
})
Client::new(
Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
},
Default::default(),
)
}
#[test]
@@ -65,6 +65,12 @@ pub enum EcashApiError {
#[from]
source: cosmrs::ErrorReport,
},
#[error("nym api error")]
NymApi {
#[from]
source: crate::ValidatorClientError,
},
}
impl TryFrom<ContractVKShare> for EcashApiClient {
@@ -863,11 +863,4 @@ pub enum QueryMsg {
pub struct MigrateMsg {
pub unsafe_skip_state_updates: Option<bool>,
pub vesting_contract_address: Option<String>,
pub current_nym_node_semver: String,
#[serde(default)]
pub version_score_weights: OutdatedVersionWeights,
#[serde(default)]
pub version_score_params: VersionScoreFormulaParams,
}
@@ -23,6 +23,11 @@ impl SqliteEcashTicketbookManager {
SqliteEcashTicketbookManager { connection_pool }
}
/// Closes the connection pool.
pub async fn close(&self) {
self.connection_pool.close().await
}
pub(crate) async fn cleanup_expired(&self, deadline: Date) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM ecash_ticketbook WHERE expiration_date <= ?",
@@ -43,6 +43,10 @@ impl Debug for EphemeralStorage {
impl Storage for EphemeralStorage {
type StorageError = StorageError;
async fn close(&self) {
// nothing to do here
}
async fn cleanup_expired(&self) -> Result<(), Self::StorageError> {
self.storage_manager.cleanup_expired().await;
Ok(())
@@ -33,7 +33,10 @@ use nym_credentials::{
IssuanceTicketBook, IssuedTicketBook,
};
use nym_ecash_time::{ecash_today, Date, EcashTime};
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use zeroize::Zeroizing;
@@ -56,6 +59,9 @@ impl PersistentStorage {
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -83,6 +89,10 @@ impl PersistentStorage {
impl Storage for PersistentStorage {
type StorageError = StorageError;
async fn close(&self) {
self.storage_manager.close().await
}
/// remove all expired ticketbooks and expiration date signatures
async fn cleanup_expired(&self) -> Result<(), Self::StorageError> {
let ecash_yesterday = ecash_today().date().previous_day().unwrap();
+2
View File
@@ -22,6 +22,8 @@ use std::error::Error;
pub trait Storage: Send + Sync {
type StorageError: Error;
async fn close(&self);
/// remove all expired ticketbooks and expiration date signatures
async fn cleanup_expired(&self) -> Result<(), Self::StorageError>;
@@ -13,6 +13,7 @@ use nym_api_requests::constants::MIN_BATCH_REDEMPTION_DELAY;
use nym_api_requests::ecash::models::{BatchRedeemTicketsBody, VerifyEcashTicketBody};
use nym_credentials_interface::Bandwidth;
use nym_credentials_interface::{ClientTicket, TicketType};
use nym_validator_client::coconut::EcashApiError;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::{
EcashSigningClient, MultisigQueryClient, MultisigSigningClient, PagedMultisigQueryClient,
@@ -352,7 +353,9 @@ impl CredentialHandler {
}
Err(err) => {
error!("failed to send ticket {ticket_id} for verification to ecash signer '{client}': {err}. if we don't reach quorum, we'll retry later");
Ok(false)
Err(EcashTicketError::ApiFailure(EcashApiError::NymApi {
source: err,
}))
}
}
}
@@ -9,7 +9,7 @@ use futures::{Sink, Stream};
use rand::{CryptoRng, RngCore};
use tungstenite::Message as WsMessage;
impl<'a, S, R> State<'a, S, R> {
impl<S, R> State<'_, S, R> {
async fn client_handshake_inner(&mut self) -> Result<(), HandshakeError>
where
S: Stream<Item = WsItem> + Sink<WsMessage> + Unpin,
@@ -10,7 +10,7 @@ use crate::registration::handshake::{error::HandshakeError, WsItem};
use futures::{Sink, Stream};
use tungstenite::Message as WsMessage;
impl<'a, S, R> State<'a, S, R> {
impl<S, R> State<'_, S, R> {
async fn gateway_handshake_inner(
&mut self,
raw_init_message: Vec<u8>,
@@ -20,6 +20,10 @@ pub enum ClientRequest {
hkdf_salt: Vec<u8>,
derived_key_digest: Vec<u8>,
},
ForgetMe {
client: bool,
stats: bool,
},
}
impl ClientRequest {
@@ -11,6 +11,7 @@ use tungstenite::Message;
#[non_exhaustive]
pub enum SensitiveServerResponse {
KeyUpgradeAck {},
ForgetMeAck {},
}
impl SensitiveServerResponse {
+17 -1
View File
@@ -6,7 +6,10 @@ use models::StoredFinishedSession;
use nym_node_metrics::entry::{ActiveSession, FinishedSession, SessionType};
use nym_sphinx::DestinationAddressBytes;
use sessions::SessionManager;
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use time::Date;
use tracing::{debug, error};
@@ -36,6 +39,9 @@ impl PersistentStatsStorage {
// TODO: we can inject here more stuff based on our gateway global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -116,6 +122,16 @@ impl PersistentStatsStorage {
.await?)
}
pub async fn delete_unique_user(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_unique_user(client_address.as_base58_string())
.await?)
}
pub async fn insert_active_session(
&self,
client_address: DestinationAddressBytes,
@@ -71,6 +71,16 @@ impl SessionManager {
Ok(())
}
pub(crate) async fn delete_unique_user(&self, client_address_b58: String) -> Result<()> {
sqlx::query!(
"DELETE FROM sessions_unique_users WHERE client_address = ?",
client_address_b58
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn get_unique_users(&self, date: Date) -> Result<Vec<String>> {
sqlx::query_scalar!(
"SELECT client_address as count FROM sessions_unique_users WHERE day = ?",
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "DELETE FROM message_store WHERE client_address_bs58 = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "3ea5542b21a41b14276a8fd6b870c61aa0ddd30fee2565803b88c6086bd2a734"
}
@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "DELETE FROM available_bandwidth WHERE client_id = ?",
"describe": {
"columns": [],
"parameters": {
"Right": 1
},
"nullable": []
},
"hash": "a3cc707995b8215fa77738cd1a55f9e8d251a3e764104d2a54153895dee1a118"
}
+10
View File
@@ -49,6 +49,16 @@ impl BandwidthManager {
Ok(())
}
pub(crate) async fn remove_client(&self, client_id: i64) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM available_bandwidth WHERE client_id = ?",
client_id
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
/// Set the expiration date of the particular client to the provided date.
pub(crate) async fn set_expiration(
&self,
+13
View File
@@ -133,4 +133,17 @@ impl InboxManager {
.await?;
Ok(())
}
pub(crate) async fn remove_messages_for_client(
&self,
client_address_bs58: &str,
) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM message_store WHERE client_address_bs58 = ?",
client_address_bs58
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
}
+49 -1
View File
@@ -12,7 +12,10 @@ use nym_credentials_interface::ClientTicket;
use nym_gateway_requests::shared_key::SharedGatewayKey;
use nym_sphinx::DestinationAddressBytes;
use shared_keys::SharedKeysManager;
use sqlx::ConnectOptions;
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use tickets::TicketStorageManager;
use time::OffsetDateTime;
@@ -41,6 +44,33 @@ pub struct GatewayStorage {
}
impl GatewayStorage {
#[allow(dead_code)]
pub(crate) fn client_manager(&self) -> &ClientManager {
&self.client_manager
}
pub(crate) fn shared_key_manager(&self) -> &SharedKeysManager {
&self.shared_key_manager
}
pub(crate) fn inbox_manager(&self) -> &InboxManager {
&self.inbox_manager
}
pub(crate) fn bandwidth_manager(&self) -> &BandwidthManager {
&self.bandwidth_manager
}
#[allow(dead_code)]
pub(crate) fn ticket_manager(&self) -> &TicketStorageManager {
&self.ticket_manager
}
#[allow(dead_code)]
pub(crate) fn wireguard_peer_manager(&self) -> &wireguard_peers::WgPeerManager {
&self.wireguard_peer_manager
}
/// Initialises `PersistentStorage` using the provided path.
///
/// # Arguments
@@ -59,6 +89,9 @@ impl GatewayStorage {
// TODO: we can inject here more stuff based on our gateway global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -101,6 +134,21 @@ impl GatewayStorage {
.await?)
}
pub async fn handle_forget_me(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), GatewayStorageError> {
let client_id = self.get_mixnet_client_id(client_address).await?;
self.inbox_manager()
.remove_messages_for_client(&client_address.as_base58_string())
.await?;
self.bandwidth_manager().remove_client(client_id).await?;
self.shared_key_manager()
.remove_shared_keys(&client_address.as_base58_string())
.await?;
Ok(())
}
pub async fn insert_shared_keys(
&self,
client_address: DestinationAddressBytes,
+103
View File
@@ -324,6 +324,56 @@ impl Client {
}
}
pub fn create_patch_request<B, K, V>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> RequestBuilder
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
{
let url = sanitize_url(&self.base_url, path, params);
self.reqwest_client.patch(url).json(json_body)
}
pub async fn send_patch_request<B, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<Response, HttpClientError<E>>
where
B: Serialize + ?Sized,
K: AsRef<str>,
V: AsRef<str>,
E: Display,
{
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
{
Ok(wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client.patch(url).json(json_body).send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??)
}
#[cfg(not(target_arch = "wasm32"))]
{
Ok(self
.reqwest_client
.patch(url)
.json(json_body)
.send()
.await?)
}
}
#[instrument(level = "debug", skip_all)]
pub async fn get_json<T, K, V, E>(
&self,
@@ -372,6 +422,23 @@ impl Client {
parse_response(res, false).await
}
pub async fn patch_json<B, T, K, V, E>(
&self,
path: PathSegments<'_>,
params: Params<'_, K, V>,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized,
for<'a> T: Deserialize<'a>,
K: AsRef<str>,
V: AsRef<str>,
E: Display + DeserializeOwned,
{
let res = self.send_patch_request(path, params, json_body).await?;
parse_response(res, true).await
}
#[instrument(level = "debug", skip_all)]
pub async fn get_json_endpoint<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
where
@@ -466,6 +533,42 @@ impl Client {
parse_response(res, false).await
}
pub async fn patch_json_endpoint<B, T, S, E>(
&self,
endpoint: S,
json_body: &B,
) -> Result<T, HttpClientError<E>>
where
B: Serialize + ?Sized,
for<'a> T: Deserialize<'a>,
E: Display + DeserializeOwned,
S: AsRef<str>,
{
#[cfg(target_arch = "wasm32")]
let res = {
wasmtimer::tokio::timeout(
self.request_timeout,
self.reqwest_client
.patch(self.base_url.join(endpoint.as_ref())?)
.json(json_body)
.send(),
)
.await
.map_err(|_timeout| HttpClientError::RequestTimeout)??
};
#[cfg(not(target_arch = "wasm32"))]
let res = {
self.reqwest_client
.patch(self.base_url.join(endpoint.as_ref())?)
.json(json_body)
.send()
.await?
};
parse_response(res, true).await
}
}
// define those methods on the trait for nicer extensions (and not having to type the thing twice)
+11
View File
@@ -57,3 +57,14 @@ pub mod wireguard {
pub const WG_TUN_DEVICE_IP_ADDRESS_V6: Ipv6Addr = Ipv6Addr::new(0xfc01, 0, 0, 0, 0, 0, 0, 0x1); // fc01::1
pub const WG_TUN_DEVICE_NETMASK_V6: u8 = 112;
}
pub mod mixnet_vpn {
use std::net::{Ipv4Addr, Ipv6Addr};
// The interface used to route traffic
pub const NYM_TUN_BASE_NAME: &str = "nymtun";
pub const NYM_TUN_DEVICE_ADDRESS_V4: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 1);
pub const NYM_TUN_DEVICE_NETMASK_V4: Ipv4Addr = Ipv4Addr::new(255, 255, 0, 0);
pub const NYM_TUN_DEVICE_ADDRESS_V6: Ipv6Addr = Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 0x1); // fc00::1
pub const NYM_TUN_DEVICE_NETMASK_V6: &str = "112";
}
@@ -65,6 +65,14 @@ impl<T> NonExhaustiveDelayQueue<T> {
pub fn remove(&mut self, key: &QueueKey) -> Expired<T> {
self.inner.remove(key)
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
impl<T> Default for NonExhaustiveDelayQueue<T> {
+1 -1
View File
@@ -12,6 +12,6 @@ license.workspace = true
[dependencies]
prometheus = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
dashmap = { workspace = true }
lazy_static = { workspace = true }
+390 -81
View File
@@ -1,14 +1,18 @@
use dashmap::DashMap;
pub use log::error;
use log::{debug, warn};
use std::fmt;
pub use std::time::Instant;
use tracing::{debug, error, warn};
use prometheus::{core::Collector, Encoder as _, IntCounter, IntGauge, Registry, TextEncoder};
use prometheus::{
core::Collector, Encoder as _, Gauge, Histogram, HistogramOpts, IntCounter, IntGauge, Registry,
TextEncoder,
};
pub use prometheus::HistogramTimer;
pub use std::time::Instant;
#[macro_export]
macro_rules! prepend_package_name {
($name: literal) => {
($name: tt) => {
&format!(
"{}_{}",
std::module_path!()
@@ -23,15 +27,29 @@ macro_rules! prepend_package_name {
#[macro_export]
macro_rules! inc_by {
($name:literal, $x:expr, $help: expr) => {
$crate::REGISTRY.maybe_register_and_inc_by(
$crate::prepend_package_name!($name),
$x as i64,
$help,
);
};
($name:literal, $x:expr) => {
$crate::REGISTRY.inc_by($crate::prepend_package_name!($name), $x as i64);
$crate::REGISTRY.maybe_register_and_inc_by(
$crate::prepend_package_name!($name),
$x as i64,
None,
);
};
}
#[macro_export]
macro_rules! inc {
($name:literal, $help: expr) => {
$crate::REGISTRY.maybe_register_and_inc($crate::prepend_package_name!($name), $help);
};
($name:literal) => {
$crate::REGISTRY.inc($crate::prepend_package_name!($name));
$crate::REGISTRY.maybe_register_and_inc($crate::prepend_package_name!($name), None);
};
}
@@ -42,6 +60,71 @@ macro_rules! metrics {
};
}
#[macro_export]
macro_rules! set_metric {
($name:literal, $x:expr, $help: expr) => {
$crate::REGISTRY.maybe_register_and_set(
$crate::prepend_package_name!($name),
$x as i64,
$help,
);
};
($name:literal, $x:expr) => {
$crate::REGISTRY.maybe_register_and_set(
$crate::prepend_package_name!($name),
$x as i64,
None,
);
};
}
#[macro_export]
macro_rules! set_metric_float {
($name:literal, $x:expr, $help: expr) => {
$crate::REGISTRY.maybe_register_and_set_float(
$crate::prepend_package_name!($name),
$x as f64,
$help,
);
};
($name:literal, $x:expr) => {
$crate::REGISTRY.maybe_register_and_set_float(
$crate::prepend_package_name!($name),
$x as f64,
None,
);
};
}
#[macro_export]
macro_rules! add_histogram_obs {
($name:expr, $x:expr, $b:expr, $help:expr) => {
$crate::REGISTRY.maybe_register_and_add_to_histogram(
$crate::prepend_package_name!($name),
$x as f64,
Some($b),
$help,
);
};
($name:expr, $x:expr, $b:expr) => {
$crate::REGISTRY.maybe_register_and_add_to_histogram(
$crate::prepend_package_name!($name),
$x as f64,
Some($b),
None,
);
};
($name:expr, $x:expr) => {
$crate::REGISTRY.maybe_register_and_add_to_histogram(
$crate::prepend_package_name!($name),
$x as f64,
None,
None,
);
};
}
#[macro_export]
macro_rules! nanos {
( $name:literal, $x:expr ) => {{
@@ -50,7 +133,7 @@ macro_rules! nanos {
let r = $x;
let duration = start.elapsed().as_nanos() as i64;
let name = $crate::prepend_package_name!($name);
$crate::REGISTRY.inc_by(&format!("{}_nanos", $name), duration);
$crate::REGISTRY.maybe_register_and_inc_by(&format!("{}_nanos", $name), duration, None);
r
}};
}
@@ -59,15 +142,100 @@ lazy_static::lazy_static! {
pub static ref REGISTRY: MetricsController = MetricsController::default();
}
pub fn metrics_registry() -> &'static MetricsController {
&REGISTRY
}
#[derive(Default)]
pub struct MetricsController {
registry: Registry,
registry_index: DashMap<String, Metric>,
}
enum Metric {
C(Box<IntCounter>),
G(Box<IntGauge>),
pub enum Metric {
IntCounter(Box<IntCounter>),
IntGauge(Box<IntGauge>),
FloatGauge(Box<Gauge>),
Histogram(Box<Histogram>),
}
impl Metric {
pub fn new_int_counter(name: &str, help: &str) -> Option<Self> {
match IntCounter::new(sanitize_metric_name(name), help) {
Ok(c) => Some(c.into()),
Err(err) => {
error!("Failed to create counter {name:?}: {err}");
None
}
}
}
pub fn new_int_gauge(name: &str, help: &str) -> Option<Self> {
match IntGauge::new(sanitize_metric_name(name), help) {
Ok(g) => Some(g.into()),
Err(err) => {
error!("Failed to create gauge {name:?}: {err}");
None
}
}
}
pub fn new_float_gauge(name: &str, help: &str) -> Option<Self> {
match Gauge::new(sanitize_metric_name(name), help) {
Ok(g) => Some(g.into()),
Err(err) => {
error!("Failed to create gauge {name:?}: {err}");
None
}
}
}
pub fn new_histogram(name: &str, help: &str, buckets: Option<&[f64]>) -> Option<Self> {
let mut opts = HistogramOpts::new(sanitize_metric_name(name), help);
if let Some(buckets) = buckets {
opts = opts.buckets(buckets.to_vec())
}
match Histogram::with_opts(opts) {
Ok(h) => Some(Metric::Histogram(Box::new(h))),
Err(err) => {
error!("failed to create histogram {name:?}: {err}");
None
}
}
}
fn as_collector(&self) -> Box<dyn Collector> {
match self {
Metric::IntCounter(c) => c.clone(),
Metric::IntGauge(g) => g.clone(),
Metric::FloatGauge(g) => g.clone(),
Metric::Histogram(h) => h.clone(),
}
}
}
impl From<IntCounter> for Metric {
fn from(v: IntCounter) -> Self {
Metric::IntCounter(Box::new(v))
}
}
impl From<IntGauge> for Metric {
fn from(v: IntGauge) -> Self {
Metric::IntGauge(Box::new(v))
}
}
impl From<Gauge> for Metric {
fn from(v: Gauge) -> Self {
Metric::FloatGauge(Box::new(v))
}
}
impl From<Histogram> for Metric {
fn from(v: Histogram) -> Self {
Metric::Histogram(Box::new(v))
}
}
fn fq_name(c: &dyn Collector) -> String {
@@ -81,34 +249,92 @@ impl Metric {
#[inline(always)]
fn fq_name(&self) -> String {
match self {
Metric::C(c) => fq_name(c.as_ref()),
Metric::G(g) => fq_name(g.as_ref()),
Metric::IntCounter(c) => fq_name(c.as_ref()),
Metric::IntGauge(g) => fq_name(g.as_ref()),
Metric::FloatGauge(g) => fq_name(g.as_ref()),
Metric::Histogram(h) => fq_name(h.as_ref()),
}
}
#[inline(always)]
fn inc(&self) {
match self {
Metric::C(c) => c.inc(),
Metric::G(g) => g.inc(),
Metric::IntCounter(c) => c.inc(),
Metric::IntGauge(g) => g.inc(),
Metric::FloatGauge(g) => g.inc(),
Metric::Histogram(_) => {
warn!("invalid operation: attempted to call increment on a histogram")
}
}
}
#[inline(always)]
fn inc_by(&self, value: i64) {
match self {
Metric::C(c) => c.inc_by(value as u64),
Metric::G(g) => g.add(value),
Metric::IntCounter(c) => c.inc_by(value as u64),
Metric::IntGauge(g) => g.add(value),
Metric::FloatGauge(g) => {
warn!("attempted to increment a float gauge ('{}') by an integer - this is most likely a bug", self.fq_name());
g.add(value as f64)
}
Metric::Histogram(_) => {
warn!("invalid operation: attempted to call increment on a histogram")
}
}
}
#[inline(always)]
fn set(&self, value: i64) {
match self {
Metric::C(_c) => {
Metric::IntCounter(_c) => {
warn!("Cannot set value for counter {:?}", self.fq_name());
}
Metric::G(g) => g.set(value),
Metric::IntGauge(g) => g.set(value),
Metric::FloatGauge(g) => {
warn!("attempted to set a float gauge ('{}') to an integer value - this is most likely a bug", self.fq_name());
g.set(value as f64)
}
Metric::Histogram(_) => {
warn!("invalid operation: attempted to call set on a histogram")
}
}
}
#[inline(always)]
fn set_float(&self, value: f64) {
match self {
Metric::IntCounter(_c) => {
warn!("Cannot set value for counter {:?}", self.fq_name());
}
Metric::IntGauge(g) => {
warn!("attempted to set a integer gauge ('{}') to a float value - this is most likely a bug", self.fq_name());
g.set(value as i64)
}
Metric::FloatGauge(g) => g.set(value),
Metric::Histogram(_) => {
warn!("invalid operation: attempted to call increment on a histogram")
}
}
}
#[inline(always)]
fn add_histogram_observation(&self, value: f64) {
match self {
Metric::Histogram(h) => {
h.observe(value);
}
_ => warn!("attempted to add histogram observation on a non-histogram metric"),
}
}
#[inline(always)]
fn start_timer(&self) -> Option<HistogramTimer> {
match self {
Metric::Histogram(h) => Some(h.start_timer()),
_ => {
warn!("attempted to start histogram observation on a non-histogram metric");
None
}
}
}
}
@@ -145,93 +371,165 @@ impl MetricsController {
}
}
pub fn set(&self, name: &str, value: i64) {
pub fn register_int_gauge<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
let Some(metric) = Metric::new_int_gauge(name, help.into().unwrap_or(name)) else {
return;
};
self.register_metric(metric);
}
pub fn register_float_gauge<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
let Some(metric) = Metric::new_float_gauge(name, help.into().unwrap_or(name)) else {
return;
};
self.register_metric(metric);
}
pub fn register_int_counter<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
let Some(metric) = Metric::new_int_counter(name, help.into().unwrap_or(name)) else {
return;
};
self.register_metric(metric);
}
pub fn register_histogram<'a>(
&self,
name: &str,
help: impl Into<Option<&'a str>>,
buckets: Option<&[f64]>,
) {
let Some(metric) = Metric::new_histogram(name, help.into().unwrap_or(name), buckets) else {
return;
};
self.register_metric(metric);
}
pub fn set(&self, name: &str, value: i64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.set(value);
true
} else {
let gauge = match IntGauge::new(sanitize_metric_name(name), name) {
Ok(g) => g,
Err(e) => {
debug!("Failed to create gauge {:?}:\n{}", name, e);
return;
}
};
self.register_gauge(Box::new(gauge));
self.set(name, value)
false
}
}
pub fn inc(&self, name: &str) {
pub fn set_float(&self, name: &str, value: f64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.set_float(value);
true
} else {
false
}
}
pub fn add_to_histogram(&self, name: &str, value: f64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.add_histogram_observation(value);
true
} else {
false
}
}
pub fn start_timer(&self, name: &str) -> Option<HistogramTimer> {
self.registry_index
.get(name)
.and_then(|metric| metric.start_timer())
}
pub fn inc(&self, name: &str) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.inc();
true
} else {
let counter = match IntCounter::new(sanitize_metric_name(name), name) {
Ok(c) => c,
Err(e) => {
debug!("Failed to create counter {:?}:\n{}", name, e);
return;
}
};
self.register_counter(Box::new(counter));
self.inc(name)
false
}
}
pub fn inc_by(&self, name: &str, value: i64) {
pub fn inc_by(&self, name: &str, value: i64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.inc_by(value);
true
} else {
let counter = match IntCounter::new(sanitize_metric_name(name), name) {
Ok(c) => c,
Err(e) => {
debug!("Failed to create counter {:?}:\n{}", name, e);
return;
}
};
self.register_counter(Box::new(counter));
self.inc_by(name, value)
false
}
}
fn register_gauge(&self, metric: Box<IntGauge>) {
let fq_name = metric
.desc()
.first()
.map(|d| d.fq_name.clone())
.unwrap_or_default();
pub fn maybe_register_and_set<'a>(
&self,
name: &str,
value: i64,
help: impl Into<Option<&'a str>>,
) {
if !self.set(name, value) {
let help = help.into();
self.register_int_gauge(name, help);
self.set(name, value);
}
}
pub fn maybe_register_and_set_float<'a>(
&self,
name: &str,
value: f64,
help: impl Into<Option<&'a str>>,
) {
if !self.set_float(name, value) {
let help = help.into();
self.register_float_gauge(name, help);
self.set_float(name, value);
}
}
pub fn maybe_register_and_add_to_histogram<'a>(
&self,
name: &str,
value: f64,
buckets: Option<&[f64]>,
help: impl Into<Option<&'a str>>,
) {
if !self.add_to_histogram(name, value) {
let help = help.into();
self.register_histogram(name, help, buckets);
self.add_to_histogram(name, value);
}
}
pub fn maybe_register_and_inc<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
if !self.inc(name) {
let help = help.into();
self.register_int_counter(name, help);
self.inc(name);
}
}
pub fn maybe_register_and_inc_by<'a>(
&self,
name: &str,
value: i64,
help: impl Into<Option<&'a str>>,
) {
if !self.inc_by(name, value) {
let help = help.into();
self.register_int_counter(name, help);
self.inc_by(name, value);
}
}
pub fn register_metric(&self, metric: impl Into<Metric>) {
let m = metric.into();
let fq_name = m.fq_name();
if self.registry_index.contains_key(&fq_name) {
return;
}
match self.registry.register(metric.clone()) {
match self.registry.register(m.as_collector()) {
Ok(_) => {
self.registry_index
.insert(fq_name, Metric::G(metric.clone()));
self.registry_index.insert(fq_name, m);
}
Err(e) => {
debug!("Failed to register {:?}:\n{}", fq_name, e)
}
}
}
fn register_counter(&self, metric: Box<IntCounter>) {
let fq_name = metric
.desc()
.first()
.map(|d| d.fq_name.clone())
.unwrap_or_default();
if self.registry_index.contains_key(&fq_name) {
return;
}
match self.registry.register(metric.clone()) {
Ok(_) => {
self.registry_index
.insert(fq_name, Metric::C(metric.clone()));
}
Err(e) => {
debug!("Failed to register {:?}:\n{}", fq_name, e)
Err(err) => {
debug!("Failed to register '{fq_name}': {err}")
}
}
}
@@ -275,4 +573,15 @@ mod tests {
"packets_sent_34_242_65_133:1789"
)
}
#[test]
fn prepend_package_name() {
let literal = prepend_package_name!("foo");
assert_eq!(literal, "nym_metrics_foo");
let bar = "bar";
let format = format!("foomp_{}", bar);
let formatted = prepend_package_name!(format);
assert_eq!(formatted, "nym_metrics_foomp_bar");
}
}
@@ -115,10 +115,7 @@ pub fn aggregate_signatures(
let params = ecash_group_parameters();
// aggregate the signature
let signature = match Aggregatable::aggregate(signatures, indices) {
Ok(res) => res,
Err(err) => return Err(err),
};
let signature = Aggregatable::aggregate(signatures, indices)?;
// Ensure the aggregated signature is not an infinity point
if bool::from(signature.is_at_infinity()) {
+8 -1
View File
@@ -13,7 +13,11 @@ use crate::{
models::{CommitSignature, Validator},
},
};
use sqlx::{types::time::OffsetDateTime, ConnectOptions, Sqlite, Transaction};
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
types::time::OffsetDateTime,
ConnectOptions, Sqlite, Transaction,
};
use std::{fmt::Debug, path::Path};
use tendermint::{
block::{Commit, CommitSig},
@@ -51,6 +55,9 @@ impl ScraperStorage {
#[instrument]
pub async fn init<P: AsRef<Path> + Debug>(database_path: P) -> Result<Self, ScraperError> {
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -428,7 +428,7 @@ impl PacketStatisticsControl {
while self
.history
.front()
.map_or(false, |&(t, _)| t < recording_window)
.is_some_and(|&(t, _)| t < recording_window)
{
self.history.pop_front();
}
@@ -462,7 +462,7 @@ impl PacketStatisticsControl {
while self
.rates
.front()
.map_or(false, |&(t, _)| t < recording_window)
.is_some_and(|&(t, _)| t < recording_window)
{
self.rates.pop_front();
}
+8
View File
@@ -58,6 +58,10 @@ pub enum GatewaySessionEvent {
/// Address of the remote client opening the connection
client: DestinationAddressBytes,
},
SessionDelete {
/// Address of the remote client opening the connection
client: DestinationAddressBytes,
},
}
impl GatewaySessionEvent {
@@ -87,4 +91,8 @@ impl GatewaySessionEvent {
client,
}
}
pub fn new_session_delete(client: DestinationAddressBytes) -> GatewaySessionEvent {
GatewaySessionEvent::SessionDelete { client }
}
}
+1
View File
@@ -36,3 +36,4 @@ nym-gateway-storage = { path = "../gateway-storage" }
nym-network-defaults = { path = "../network-defaults" }
nym-task = { path = "../task" }
nym-wireguard-types = { path = "../wireguard-types" }
nym-node-metrics = { path = "../../nym-node/nym-node-metrics" }
+2
View File
@@ -85,6 +85,7 @@ pub struct WireguardData {
#[cfg(target_os = "linux")]
pub async fn start_wireguard(
storage: nym_gateway_storage::GatewayStorage,
metrics: nym_node_metrics::NymNodeMetrics,
all_peers: Vec<nym_gateway_storage::models::WireguardPeer>,
task_client: nym_task::TaskClient,
wireguard_data: WireguardData,
@@ -175,6 +176,7 @@ pub async fn start_wireguard(
let wg_api = std::sync::Arc::new(WgApiWrapper::new(wg_api));
let mut controller = PeerController::new(
storage,
metrics,
wg_api.clone(),
host,
peer_bandwidth_managers,
+61
View File
@@ -16,7 +16,9 @@ use nym_credential_verification::{
ClientBandwidth,
};
use nym_gateway_storage::GatewayStorage;
use nym_node_metrics::NymNodeMetrics;
use nym_wireguard_types::DEFAULT_PEER_TIMEOUT_CHECK;
use std::time::{Duration, SystemTime};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
@@ -65,6 +67,11 @@ pub struct QueryBandwidthControlResponse {
pub struct PeerController {
storage: GatewayStorage,
// we have "all" metrics of a node, but they're behind a single Arc pointer,
// so the overhead is minimal
metrics: NymNodeMetrics,
// used to receive commands from individual handles too
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
@@ -76,8 +83,10 @@ pub struct PeerController {
}
impl PeerController {
#[allow(clippy::too_many_arguments)]
pub fn new(
storage: GatewayStorage,
metrics: NymNodeMetrics,
wg_api: Arc<WgApiWrapper>,
initial_host_information: Host,
bw_storage_managers: HashMap<Key, (Option<SharedBandwidthStorageManager>, Peer)>,
@@ -123,6 +132,7 @@ impl PeerController {
request_rx,
timeout_check_interval,
task_client,
metrics,
}
}
@@ -257,6 +267,55 @@ impl PeerController {
}))
}
async fn update_metrics(&self, new_host: &Host) {
let now = SystemTime::now();
const ACTIVITY_THRESHOLD: Duration = Duration::from_secs(60);
let old_host = self.host_information.read().await;
let total_peers = new_host.peers.len();
let mut active_peers = 0;
let mut new_rx = 0;
let mut new_tx = 0;
for (peer_key, peer) in new_host.peers.iter() {
// only consider pre-existing peers,
// so that the value would always be increasing
if let Some(prior) = old_host.peers.get(peer_key) {
let delta_rx = peer.rx_bytes.saturating_sub(prior.rx_bytes);
let delta_tx = prior.tx_bytes.saturating_sub(prior.tx_bytes);
new_rx += delta_rx;
new_tx += delta_tx;
}
// if a peer hasn't performed a handshake in last minute,
// I think it's reasonable to assume it's no longer active
let Some(last_handshake) = peer.last_handshake else {
continue;
};
let Ok(elapsed) = now.duration_since(last_handshake) else {
continue;
};
if elapsed < ACTIVITY_THRESHOLD {
active_peers += 1;
}
}
self.metrics.wireguard.update(
// if the conversion fails it means we're running not running on a 64bit system
// and that's a reason enough for this failure.
new_rx.try_into().expect(
"failed to convert bytes from u64 to usize - are you running on non 64bit system?",
),
new_tx.try_into().expect(
"failed to convert bytes from u64 to usize - are you running on non 64bit system?",
),
total_peers,
active_peers,
);
}
pub async fn run(&mut self) {
info!("started wireguard peer controller");
loop {
@@ -266,6 +325,8 @@ impl PeerController {
log::error!("Can't read wireguard kernel data");
continue;
};
self.update_metrics(&host).await;
*self.host_information.write().await = host;
}
_ = self.task_client.recv() => {
@@ -464,10 +464,7 @@ pub fn instantiate_contracts(
mixnet_contract_address.clone(),
&nym_mixnet_contract_common::MigrateMsg {
vesting_contract_address: Some(vesting_contract_address.to_string()),
current_nym_node_semver: "1.1.10".to_string(),
version_score_weights: Default::default(),
unsafe_skip_state_updates: Some(true),
version_score_params: Default::default(),
},
mixnet_code_id,
)
@@ -3470,43 +3470,13 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MigrateMsg",
"type": "object",
"required": [
"current_nym_node_semver"
],
"properties": {
"current_nym_node_semver": {
"type": "string"
},
"unsafe_skip_state_updates": {
"type": [
"boolean",
"null"
]
},
"version_score_params": {
"default": {
"penalty": "0.995",
"penalty_scaling": "1.65"
},
"allOf": [
{
"$ref": "#/definitions/VersionScoreFormulaParams"
}
]
},
"version_score_weights": {
"default": {
"major": 100,
"minor": 10,
"patch": 1,
"prerelease": 1
},
"allOf": [
{
"$ref": "#/definitions/OutdatedVersionWeights"
}
]
},
"vesting_contract_address": {
"type": [
"string",
@@ -3514,63 +3484,7 @@
]
}
},
"additionalProperties": false,
"definitions": {
"Decimal": {
"description": "A fixed-point decimal value with 18 fractional digits, i.e. Decimal(1_000_000_000_000_000_000) == 1.0\n\nThe greatest possible value that can be represented is 340282366920938463463.374607431768211455 (which is (2^128 - 1) / 10^18)",
"type": "string"
},
"OutdatedVersionWeights": {
"description": "Defines weights for calculating numbers of versions behind the current release.",
"type": "object",
"required": [
"major",
"minor",
"patch",
"prerelease"
],
"properties": {
"major": {
"type": "integer",
"format": "uint32",
"minimum": 0.0
},
"minor": {
"type": "integer",
"format": "uint32",
"minimum": 0.0
},
"patch": {
"type": "integer",
"format": "uint32",
"minimum": 0.0
},
"prerelease": {
"type": "integer",
"format": "uint32",
"minimum": 0.0
}
},
"additionalProperties": false
},
"VersionScoreFormulaParams": {
"description": "Given the formula of version_score = penalty ^ (versions_behind_factor ^ penalty_scaling) define the relevant parameters",
"type": "object",
"required": [
"penalty",
"penalty_scaling"
],
"properties": {
"penalty": {
"$ref": "#/definitions/Decimal"
},
"penalty_scaling": {
"$ref": "#/definitions/Decimal"
}
},
"additionalProperties": false
}
}
"additionalProperties": false
},
"sudo": null,
"responses": {
+1 -87
View File
@@ -2,43 +2,13 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MigrateMsg",
"type": "object",
"required": [
"current_nym_node_semver"
],
"properties": {
"current_nym_node_semver": {
"type": "string"
},
"unsafe_skip_state_updates": {
"type": [
"boolean",
"null"
]
},
"version_score_params": {
"default": {
"penalty": "0.995",
"penalty_scaling": "1.65"
},
"allOf": [
{
"$ref": "#/definitions/VersionScoreFormulaParams"
}
]
},
"version_score_weights": {
"default": {
"major": 100,
"minor": 10,
"patch": 1,
"prerelease": 1
},
"allOf": [
{
"$ref": "#/definitions/OutdatedVersionWeights"
}
]
},
"vesting_contract_address": {
"type": [
"string",
@@ -46,61 +16,5 @@
]
}
},
"additionalProperties": false,
"definitions": {
"Decimal": {
"description": "A fixed-point decimal value with 18 fractional digits, i.e. Decimal(1_000_000_000_000_000_000) == 1.0\n\nThe greatest possible value that can be represented is 340282366920938463463.374607431768211455 (which is (2^128 - 1) / 10^18)",
"type": "string"
},
"OutdatedVersionWeights": {
"description": "Defines weights for calculating numbers of versions behind the current release.",
"type": "object",
"required": [
"major",
"minor",
"patch",
"prerelease"
],
"properties": {
"major": {
"type": "integer",
"format": "uint32",
"minimum": 0.0
},
"minor": {
"type": "integer",
"format": "uint32",
"minimum": 0.0
},
"patch": {
"type": "integer",
"format": "uint32",
"minimum": 0.0
},
"prerelease": {
"type": "integer",
"format": "uint32",
"minimum": 0.0
}
},
"additionalProperties": false
},
"VersionScoreFormulaParams": {
"description": "Given the formula of version_score = penalty ^ (versions_behind_factor ^ penalty_scaling) define the relevant parameters",
"type": "object",
"required": [
"penalty",
"penalty_scaling"
],
"properties": {
"penalty": {
"$ref": "#/definitions/Decimal"
},
"penalty_scaling": {
"$ref": "#/definitions/Decimal"
}
},
"additionalProperties": false
}
}
"additionalProperties": false
}
+2 -2
View File
@@ -603,7 +603,7 @@ pub fn query(
#[entry_point]
pub fn migrate(
mut deps: DepsMut<'_>,
env: Env,
_env: Env,
msg: MigrateMsg,
) -> Result<Response, MixnetContractError> {
set_build_information!(deps.storage)?;
@@ -612,7 +612,7 @@ pub fn migrate(
let skip_state_updates = msg.unsafe_skip_state_updates.unwrap_or(false);
if !skip_state_updates {
crate::queued_migrations::add_config_score_params(deps.branch(), env, &msg)?;
crate::queued_migrations::restore_node_version_history(deps.branch())?;
}
// due to circular dependency on contract addresses (i.e. mixnet contract requiring vesting contract address
@@ -34,9 +34,13 @@ impl NymNodeVersionHistory<'_> {
}
fn next_id(&self, storage: &mut dyn Storage) -> Result<u32, MixnetContractError> {
let next = self.id_counter.may_load(storage)?.unwrap_or_default();
self.id_counter.save(storage, &next)?;
Ok(next)
let id = self
.id_counter
.may_load(storage)?
.map(|current| current + 1)
.unwrap_or_default();
self.id_counter.save(storage, &id)?;
Ok(id)
}
pub fn current_version(
@@ -56,10 +60,10 @@ impl NymNodeVersionHistory<'_> {
pub fn insert_new(
&self,
storage: &mut dyn Storage,
entry: HistoricalNymNodeVersion,
entry: &HistoricalNymNodeVersion,
) -> Result<u32, MixnetContractError> {
let next_id = self.next_id(storage)?;
self.version_history.save(storage, next_id, &entry)?;
self.version_history.save(storage, next_id, entry)?;
Ok(next_id)
}
@@ -79,7 +83,7 @@ impl NymNodeVersionHistory<'_> {
// treat this as genesis
let genesis =
HistoricalNymNodeVersion::genesis(raw_semver.to_string(), env.block.height);
return self.insert_new(storage, genesis);
return self.insert_new(storage, &genesis);
};
let current_semver = current.version_information.semver_unchecked();
@@ -99,7 +103,7 @@ impl NymNodeVersionHistory<'_> {
introduced_at_height: env.block.height,
difference_since_genesis: diff,
};
self.insert_new(storage, entry)
self.insert_new(storage, &entry)
}
}
@@ -170,3 +174,218 @@ pub(crate) fn initialise_storage(
ADMIN.set(deps, Some(initial_admin))?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(test)]
mod nym_node_version_history {
use super::*;
use crate::support::tests::test_helpers::TestSetup;
use cosmwasm_std::testing::{mock_dependencies, mock_env};
#[test]
fn getting_current() -> anyhow::Result<()> {
// empty storage
let deps = mock_dependencies();
let storage = NymNodeVersionHistory::new();
assert!(storage.current_version(&deps.storage)?.is_none());
let mut test = TestSetup::new();
let zeroth = storage.current_version(test.storage())?.unwrap();
let manual_zeroth = storage.version_history.load(test.storage(), 0)?;
assert_eq!(zeroth.version_information, manual_zeroth);
// manually update the counter to make sure data is still read correctly
let dummy = HistoricalNymNodeVersion {
semver: "1.2.3".to_string(),
introduced_at_height: 1234,
difference_since_genesis: Default::default(),
};
storage.id_counter.save(test.storage_mut(), &123)?;
storage
.version_history
.save(test.storage_mut(), 123, &dummy)?;
let updated = storage.current_version(test.storage())?.unwrap();
assert_eq!(updated.version_information, dummy);
Ok(())
}
#[test]
fn inserting_new_entry() -> anyhow::Result<()> {
let mut test = TestSetup::new();
let storage = NymNodeVersionHistory::new();
let first = HistoricalNymNodeVersion {
semver: "1.1.1".to_string(),
introduced_at_height: 12,
difference_since_genesis: Default::default(),
};
let second = HistoricalNymNodeVersion {
semver: "1.1.2".to_string(),
introduced_at_height: 123,
difference_since_genesis: Default::default(),
};
let third = HistoricalNymNodeVersion {
semver: "1.1.3".to_string(),
introduced_at_height: 1234,
difference_since_genesis: Default::default(),
};
assert_eq!(storage.id_counter.load(test.storage())?, 0);
// id is correctly incremented for each case and no entry is overwritten
storage.insert_new(test.storage_mut(), &first)?;
assert_eq!(storage.id_counter.load(test.storage())?, 1);
storage.insert_new(test.storage_mut(), &second)?;
assert_eq!(storage.id_counter.load(test.storage())?, 2);
storage.insert_new(test.storage_mut(), &third)?;
assert_eq!(storage.id_counter.load(test.storage())?, 3);
assert_eq!(storage.version_history.load(test.storage(), 1)?, first);
assert_eq!(storage.version_history.load(test.storage(), 2)?, second);
assert_eq!(storage.version_history.load(test.storage(), 3)?, third);
Ok(())
}
#[test]
fn inserting_initial_semver() -> anyhow::Result<()> {
// empty storage
let mut deps = mock_dependencies();
let env = mock_env();
let storage = NymNodeVersionHistory::new();
assert!(storage
.id_counter
.may_load(deps.as_mut().storage)?
.is_none());
storage.try_insert_new(deps.as_mut().storage, &env, "1.1.1")?;
assert_eq!(storage.id_counter.load(deps.as_mut().storage)?, 0);
assert_eq!(
storage
.version_history
.load(deps.as_ref().storage, 0)?
.semver,
"1.1.1"
);
assert_eq!(
storage
.current_version(deps.as_ref().storage)?
.unwrap()
.version_information
.semver,
"1.1.1"
);
Ok(())
}
#[test]
fn inserting_second_semver() -> anyhow::Result<()> {
let mut test = TestSetup::new();
let env = test.env();
let storage = NymNodeVersionHistory::new();
// lower version
assert!(storage
.try_insert_new(test.storage_mut(), &env, "1.1.9")
.is_err());
assert!(storage
.try_insert_new(test.storage_mut(), &env, "1.0.1")
.is_err());
// malformed
assert!(storage
.try_insert_new(test.storage_mut(), &env, "1.0")
.is_err());
assert!(storage
.try_insert_new(test.storage_mut(), &env, "1.0bad")
.is_err());
assert!(storage
.try_insert_new(test.storage_mut(), &env, "foomp")
.is_err());
// patch
let mut test = TestSetup::new();
storage.try_insert_new(test.storage_mut(), &env, "1.1.11")?;
let current = storage
.current_version(test.storage_mut())?
.unwrap()
.version_information;
assert_eq!(current.semver, "1.1.11");
assert_eq!(current.difference_since_genesis.major, 0);
assert_eq!(current.difference_since_genesis.minor, 0);
assert_eq!(current.difference_since_genesis.patch, 1);
assert_eq!(current.difference_since_genesis.prerelease, 0);
// minor
let mut test = TestSetup::new();
storage.try_insert_new(test.storage_mut(), &env, "1.2.0")?;
let current = storage
.current_version(test.storage_mut())?
.unwrap()
.version_information;
assert_eq!(current.semver, "1.2.0");
assert_eq!(current.difference_since_genesis.major, 0);
assert_eq!(current.difference_since_genesis.minor, 1);
assert_eq!(current.difference_since_genesis.patch, 0);
assert_eq!(current.difference_since_genesis.prerelease, 0);
// minor alt.
let mut test = TestSetup::new();
storage.try_insert_new(test.storage_mut(), &env, "1.2.3")?;
let current = storage
.current_version(test.storage_mut())?
.unwrap()
.version_information;
assert_eq!(current.semver, "1.2.3");
assert_eq!(current.difference_since_genesis.major, 0);
assert_eq!(current.difference_since_genesis.minor, 1);
assert_eq!(current.difference_since_genesis.patch, 0);
assert_eq!(current.difference_since_genesis.prerelease, 0);
Ok(())
}
#[test]
fn inserting_subsequent_semver() -> anyhow::Result<()> {
let mut test = TestSetup::new();
let env = test.env();
let storage = NymNodeVersionHistory::new();
storage.try_insert_new(test.storage_mut(), &env, "1.2.0")?;
storage.try_insert_new(test.storage_mut(), &env, "1.2.1")?;
storage.try_insert_new(test.storage_mut(), &env, "1.2.3")?;
let current = storage
.current_version(test.storage_mut())?
.unwrap()
.version_information;
assert_eq!(current.semver, "1.2.3");
assert_eq!(current.difference_since_genesis.major, 0);
assert_eq!(current.difference_since_genesis.minor, 1);
assert_eq!(current.difference_since_genesis.patch, 3);
assert_eq!(current.difference_since_genesis.prerelease, 0);
storage.try_insert_new(test.storage_mut(), &env, "1.3.0")?;
let current = storage
.current_version(test.storage_mut())?
.unwrap()
.version_information;
assert_eq!(current.semver, "1.3.0");
assert_eq!(current.difference_since_genesis.major, 0);
assert_eq!(current.difference_since_genesis.minor, 2);
assert_eq!(current.difference_since_genesis.patch, 3);
assert_eq!(current.difference_since_genesis.prerelease, 0);
Ok(())
}
}
}
@@ -238,4 +238,446 @@ pub mod tests {
// let res = try_update_contract_settings(deps.as_mut(), info, new_params);
// assert_eq!(Err(MixnetContractError::ZeroActiveSet), res);
}
#[cfg(test)]
mod updating_current_nym_node_semver {
use super::*;
use crate::mixnet_contract_settings::queries::query_current_nym_node_version;
use crate::support::tests::test_helpers::TestSetup;
#[test]
fn is_restricted_to_the_admin() -> anyhow::Result<()> {
let mut test = TestSetup::new();
let not_admin = mock_info("not-admin", &[]);
let admin = mock_info(test.admin().as_ref(), &[]);
let env = test.env();
let res = try_update_current_nym_node_semver(
test.deps_mut(),
env,
not_admin,
"1.2.1".to_string(),
);
assert!(res.is_err());
let env = test.env();
let res = try_update_current_nym_node_semver(
test.deps_mut(),
env,
admin,
"1.2.1".to_string(),
);
assert!(res.is_ok());
Ok(())
}
#[test]
fn updates_current_semver_value() -> anyhow::Result<()> {
let mut test = TestSetup::new();
let res = query_current_nym_node_version(test.deps())?;
let initial = res.version.unwrap().version_information.semver;
// sanity check to make sure our contract init hasn't changed
assert_eq!(initial, "1.1.10");
let update = "1.2.0".to_string();
let env = test.env();
let sender = test.admin_sender();
try_update_current_nym_node_semver(test.deps_mut(), env, sender, update.clone())?;
let updated = query_current_nym_node_version(test.deps())?;
let version = updated.version.unwrap().version_information.semver;
assert_eq!(version, update);
Ok(())
}
#[cfg(test)]
mod semver_chain_updates {
use super::*;
use crate::mixnet_contract_settings::queries::query_nym_node_version_history_paged;
use mixnet_contract_common::{
HistoricalNymNodeVersion, HistoricalNymNodeVersionEntry, TotalVersionDifference,
};
fn test_setup_with_initial_checks() -> anyhow::Result<TestSetup> {
let test = TestSetup::new();
let res = query_current_nym_node_version(test.deps())?;
let initial = res.version.unwrap().version_information.semver;
// sanity check to make sure our contract init hasn't changed
assert_eq!(initial, "1.1.10");
let history = query_nym_node_version_history_paged(test.deps(), None, None)?;
assert_eq!(history.history.len(), 1);
Ok(test)
}
#[test]
fn single_patch() -> anyhow::Result<()> {
let mut test = test_setup_with_initial_checks()?;
let initial = query_current_nym_node_version(test.deps())?
.version
.unwrap();
let env = test.env();
let sender = test.admin_sender();
try_update_current_nym_node_semver(
test.deps_mut(),
env.clone(),
sender,
"1.1.11".to_string(),
)?;
let history =
query_nym_node_version_history_paged(test.deps(), None, None)?.history;
assert_eq!(history.len(), 2);
assert_eq!(history[0], initial);
assert_eq!(
history[1],
HistoricalNymNodeVersionEntry {
id: 1,
version_information: HistoricalNymNodeVersion {
semver: "1.1.11".to_string(),
introduced_at_height: env.block.height,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 0,
patch: 1,
prerelease: 0,
},
},
}
);
Ok(())
}
#[test]
fn single_minor() -> anyhow::Result<()> {
let mut test = test_setup_with_initial_checks()?;
let initial = query_current_nym_node_version(test.deps())?
.version
.unwrap();
let env = test.env();
let sender = test.admin_sender();
try_update_current_nym_node_semver(
test.deps_mut(),
env.clone(),
sender,
"1.2.0".to_string(),
)?;
let history =
query_nym_node_version_history_paged(test.deps(), None, None)?.history;
assert_eq!(history.len(), 2);
assert_eq!(history[0], initial);
assert_eq!(
history[1],
HistoricalNymNodeVersionEntry {
id: 1,
version_information: HistoricalNymNodeVersion {
semver: "1.2.0".to_string(),
introduced_at_height: env.block.height,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 1,
patch: 0,
prerelease: 0,
},
},
}
);
Ok(())
}
#[test]
fn multiple_patches() -> anyhow::Result<()> {
let mut test = test_setup_with_initial_checks()?;
let initial = query_current_nym_node_version(test.deps())?
.version
.unwrap();
let mut env = test.env();
let sender = test.admin_sender();
try_update_current_nym_node_semver(
test.deps_mut(),
env.clone(),
sender.clone(),
"1.1.11".to_string(),
)?;
env.block.height += 1;
try_update_current_nym_node_semver(
test.deps_mut(),
env.clone(),
sender.clone(),
"1.1.12".to_string(),
)?;
env.block.height += 1;
try_update_current_nym_node_semver(
test.deps_mut(),
env.clone(),
sender,
"1.1.13".to_string(),
)?;
let history =
query_nym_node_version_history_paged(test.deps(), None, None)?.history;
assert_eq!(history.len(), 4);
assert_eq!(history[0], initial);
assert_eq!(
history[1],
HistoricalNymNodeVersionEntry {
id: 1,
version_information: HistoricalNymNodeVersion {
semver: "1.1.11".to_string(),
introduced_at_height: env.block.height - 2,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 0,
patch: 1,
prerelease: 0,
},
},
}
);
assert_eq!(
history[2],
HistoricalNymNodeVersionEntry {
id: 2,
version_information: HistoricalNymNodeVersion {
semver: "1.1.12".to_string(),
introduced_at_height: env.block.height - 1,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 0,
patch: 2,
prerelease: 0,
},
},
}
);
assert_eq!(
history[3],
HistoricalNymNodeVersionEntry {
id: 3,
version_information: HistoricalNymNodeVersion {
semver: "1.1.13".to_string(),
introduced_at_height: env.block.height,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 0,
patch: 3,
prerelease: 0,
},
},
}
);
Ok(())
}
#[test]
fn multiple_minors() -> anyhow::Result<()> {
let mut test = test_setup_with_initial_checks()?;
let initial = query_current_nym_node_version(test.deps())?
.version
.unwrap();
let mut env = test.env();
let sender = test.admin_sender();
try_update_current_nym_node_semver(
test.deps_mut(),
env.clone(),
sender.clone(),
"1.2.0".to_string(),
)?;
env.block.height += 1;
try_update_current_nym_node_semver(
test.deps_mut(),
env.clone(),
sender.clone(),
"1.3.0".to_string(),
)?;
env.block.height += 1;
try_update_current_nym_node_semver(
test.deps_mut(),
env.clone(),
sender,
"1.4.0".to_string(),
)?;
let history =
query_nym_node_version_history_paged(test.deps(), None, None)?.history;
assert_eq!(history.len(), 4);
assert_eq!(history[0], initial);
assert_eq!(
history[1],
HistoricalNymNodeVersionEntry {
id: 1,
version_information: HistoricalNymNodeVersion {
semver: "1.2.0".to_string(),
introduced_at_height: env.block.height - 2,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 1,
patch: 0,
prerelease: 0,
},
},
}
);
assert_eq!(
history[2],
HistoricalNymNodeVersionEntry {
id: 2,
version_information: HistoricalNymNodeVersion {
semver: "1.3.0".to_string(),
introduced_at_height: env.block.height - 1,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 2,
patch: 0,
prerelease: 0,
},
},
}
);
assert_eq!(
history[3],
HistoricalNymNodeVersionEntry {
id: 3,
version_information: HistoricalNymNodeVersion {
semver: "1.4.0".to_string(),
introduced_at_height: env.block.height,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 3,
patch: 0,
prerelease: 0,
},
},
}
);
Ok(())
}
#[test]
fn mixed_multiple_updates() -> anyhow::Result<()> {
let mut test = test_setup_with_initial_checks()?;
let initial = query_current_nym_node_version(test.deps())?
.version
.unwrap();
let mut env = test.env();
let sender = test.admin_sender();
try_update_current_nym_node_semver(
test.deps_mut(),
env.clone(),
sender.clone(),
"1.2.0".to_string(),
)?;
env.block.height += 1;
try_update_current_nym_node_semver(
test.deps_mut(),
env.clone(),
sender.clone(),
"1.2.1".to_string(),
)?;
env.block.height += 1;
try_update_current_nym_node_semver(
test.deps_mut(),
env.clone(),
sender.clone(),
"1.2.3".to_string(),
)?;
env.block.height += 1;
try_update_current_nym_node_semver(
test.deps_mut(),
env.clone(),
sender,
"1.3.0".to_string(),
)?;
let history =
query_nym_node_version_history_paged(test.deps(), None, None)?.history;
assert_eq!(history.len(), 5);
assert_eq!(history[0], initial);
assert_eq!(
history[1],
HistoricalNymNodeVersionEntry {
id: 1,
version_information: HistoricalNymNodeVersion {
semver: "1.2.0".to_string(),
introduced_at_height: env.block.height - 3,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 1,
patch: 0,
prerelease: 0,
},
},
}
);
assert_eq!(
history[2],
HistoricalNymNodeVersionEntry {
id: 2,
version_information: HistoricalNymNodeVersion {
semver: "1.2.1".to_string(),
introduced_at_height: env.block.height - 2,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 1,
patch: 1,
prerelease: 0,
},
},
}
);
assert_eq!(
history[3],
HistoricalNymNodeVersionEntry {
id: 3,
version_information: HistoricalNymNodeVersion {
semver: "1.2.3".to_string(),
introduced_at_height: env.block.height - 1,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 1,
patch: 3,
prerelease: 0,
},
},
}
);
assert_eq!(
history[4],
HistoricalNymNodeVersionEntry {
id: 4,
version_information: HistoricalNymNodeVersion {
semver: "1.3.0".to_string(),
introduced_at_height: env.block.height,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 2,
patch: 3,
prerelease: 0,
},
},
}
);
Ok(())
}
}
}
}
+135 -68
View File
@@ -1,86 +1,153 @@
// Copyright 2022-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
mod config_score_params {
use crate::constants::CONTRACT_STATE_KEY;
use crate::mixnet_contract_settings::storage as mixnet_params_storage;
mod node_version_history {
use crate::mixnet_contract_settings::storage::NymNodeVersionHistory;
use cosmwasm_std::{Addr, Coin, DepsMut, Env};
use cw_storage_plus::Item;
use cosmwasm_std::DepsMut;
use mixnet_contract_common::error::MixnetContractError;
use mixnet_contract_common::{
ConfigScoreParams, ContractState, ContractStateParams, DelegationsParams, MigrateMsg,
OperatingCostRange, OperatorsParams, ProfitMarginRange,
};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use mixnet_contract_common::{HistoricalNymNodeVersion, TotalVersionDifference};
pub(crate) fn add_config_score_params(
pub(crate) fn restore_node_version_history(
deps: DepsMut<'_>,
env: Env,
msg: &MigrateMsg,
) -> Result<(), MixnetContractError> {
if semver::Version::from_str(&msg.current_nym_node_semver).is_err() {
return Err(MixnetContractError::InvalidNymNodeSemver {
provided: msg.current_nym_node_semver.to_string(),
// sanity check:
let storage = NymNodeVersionHistory::new();
let Some(current) = storage.current_version(deps.storage)? else {
return Err(MixnetContractError::FailedMigration {
comment: "no node version history set".to_string(),
});
};
if current.version_information.semver != "1.2.1"
|| current.version_information.introduced_at_height != 15902170
{
return Err(MixnetContractError::FailedMigration {
comment: format!("unexpected current node version history. got: {current:?}"),
});
}
#[derive(Serialize, Deserialize)]
pub struct OldContractState {
pub owner: Option<Addr>,
pub rewarding_validator_address: Addr,
pub vesting_contract_address: Addr,
pub rewarding_denom: String,
pub params: OldContractStateParams,
}
#[derive(Serialize, Deserialize)]
pub struct OldContractStateParams {
pub minimum_delegation: Option<Coin>,
pub minimum_pledge: Coin,
#[serde(default)]
pub profit_margin: ProfitMarginRange,
#[serde(default)]
pub interval_operating_cost: OperatingCostRange,
}
const OLD_CONTRACT_STATE: Item<'_, OldContractState> = Item::new(CONTRACT_STATE_KEY);
let old_state = OLD_CONTRACT_STATE.load(deps.storage)?;
#[allow(deprecated)]
let new_state = ContractState {
owner: old_state.owner,
rewarding_validator_address: old_state.rewarding_validator_address,
vesting_contract_address: old_state.vesting_contract_address,
rewarding_denom: old_state.rewarding_denom,
params: ContractStateParams {
delegations_params: DelegationsParams {
minimum_delegation: old_state.params.minimum_delegation,
},
operators_params: OperatorsParams {
minimum_pledge: old_state.params.minimum_pledge,
profit_margin: old_state.params.profit_margin,
interval_operating_cost: old_state.params.interval_operating_cost,
},
config_score_params: ConfigScoreParams {
version_weights: msg.version_score_weights,
version_score_formula_params: msg.version_score_params,
},
},
let lost = HistoricalNymNodeVersion {
semver: "1.1.12".to_string(),
introduced_at_height: 15779133,
difference_since_genesis: TotalVersionDifference::default(),
};
mixnet_params_storage::CONTRACT_STATE.save(deps.storage, &new_state)?;
#[allow(clippy::unwrap_used)]
// SAFETY: this information was already stored in the contract, so it must be a valid semver
let difference_since_genesis = lost.cumulative_difference_since_genesis(
&current.version_information.semver.parse().unwrap(),
);
let updated_current = HistoricalNymNodeVersion {
semver: current.version_information.semver,
introduced_at_height: current.version_information.introduced_at_height,
difference_since_genesis,
};
// initialise the version chain
NymNodeVersionHistory::new().try_insert_new(
deps.storage,
&env,
&msg.current_nym_node_semver,
)?;
// restore overwritten entry for 1.1.12
storage.version_history.save(deps.storage, 0, &lost)?;
// re-insert 1.2.1 as the current
storage
.version_history
.save(deps.storage, 1, &updated_current)?;
storage.id_counter.save(deps.storage, &1)?;
Ok(())
}
}
pub(crate) use config_score_params::add_config_score_params;
pub(crate) use node_version_history::restore_node_version_history;
#[cfg(test)]
mod tests {
use super::*;
use crate::mixnet_contract_settings::queries::query_nym_node_version_history_paged;
use crate::mixnet_contract_settings::storage::NymNodeVersionHistory;
use cosmwasm_std::testing::{mock_dependencies, mock_env};
use mixnet_contract_common::{
HistoricalNymNodeVersion, HistoricalNymNodeVersionEntry, TotalVersionDifference,
};
#[test]
fn fixing_history_storage() -> anyhow::Result<()> {
// current state on mainnet:
let mut deps = mock_dependencies();
let storage = NymNodeVersionHistory::new();
storage.id_counter.save(deps.as_mut().storage, &0)?;
storage.version_history.save(
deps.as_mut().storage,
0,
&HistoricalNymNodeVersion {
semver: "1.2.1".to_string(),
introduced_at_height: 15902170,
difference_since_genesis: Default::default(),
},
)?;
// run migration
restore_node_version_history(deps.as_mut())?;
let current = storage.current_version(deps.as_ref().storage)?.unwrap();
assert_eq!(current.version_information.semver, "1.2.1");
assert_eq!(current.version_information.introduced_at_height, 15902170);
assert_eq!(
current.version_information.difference_since_genesis,
TotalVersionDifference {
major: 0,
minor: 1,
patch: 0,
prerelease: 0,
}
);
let history = query_nym_node_version_history_paged(deps.as_ref(), None, None)?.history;
assert_eq!(history.len(), 2);
assert_eq!(
history,
vec![
HistoricalNymNodeVersionEntry {
id: 0,
version_information: HistoricalNymNodeVersion {
semver: "1.1.12".to_string(),
introduced_at_height: 15779133,
difference_since_genesis: Default::default(),
},
},
HistoricalNymNodeVersionEntry {
id: 1,
version_information: HistoricalNymNodeVersion {
semver: "1.2.1".to_string(),
introduced_at_height: 15902170,
difference_since_genesis: TotalVersionDifference {
major: 0,
minor: 1,
patch: 0,
prerelease: 0,
},
},
}
]
);
let counter = storage.id_counter.load(deps.as_ref().storage)?;
assert_eq!(counter, 1);
// make sure adding another version doesn't mess anything up
storage.try_insert_new(deps.as_mut().storage, &mock_env(), "1.3.0")?;
let current = storage.current_version(deps.as_ref().storage)?.unwrap();
assert_eq!(current.version_information.semver, "1.3.0");
assert_eq!(
current.version_information.difference_since_genesis,
TotalVersionDifference {
major: 0,
minor: 2,
patch: 0,
prerelease: 0,
}
);
let counter = storage.id_counter.load(deps.as_ref().storage)?;
assert_eq!(counter, 2);
Ok(())
}
}
+17 -1
View File
@@ -18,7 +18,7 @@ pub mod test_helpers {
};
use crate::interval::{pending_events, storage as interval_storage};
use crate::mixnet_contract_settings::storage::{
self as mixnet_params_storage, minimum_node_pledge,
self as mixnet_params_storage, minimum_node_pledge, ADMIN,
};
use crate::mixnet_contract_settings::storage::{rewarding_denom, rewarding_validator_address};
use crate::mixnodes::helpers::get_mixnode_details_by_id;
@@ -318,6 +318,10 @@ pub mod test_helpers {
compare_decimals(mix_info.delegates, subtotal, Some(epsilon))
}
pub fn admin(&self) -> Addr {
ADMIN.get(self.deps()).unwrap().unwrap()
}
pub fn random_address(&mut self) -> String {
format!("n1foomp{}", self.rng.next_u64())
}
@@ -330,6 +334,14 @@ pub mod test_helpers {
self.deps.as_mut()
}
pub fn storage(&self) -> &dyn Storage {
self.deps().storage
}
pub fn storage_mut(&mut self) -> &mut dyn Storage {
self.deps_mut().storage
}
pub fn env(&self) -> Env {
self.env.clone()
}
@@ -470,6 +482,10 @@ pub mod test_helpers {
.unwrap()
}
pub fn admin_sender(&self) -> MessageInfo {
mock_info(self.admin().as_ref(), &[])
}
pub fn owner(&self) -> MessageInfo {
self.owner.clone()
}
@@ -1,7 +1,3 @@
---
description: Interactive APIs generated from the OpenAPI specs of various API endpoints offered by bits of Nym infrastructure run both by Nym and community operators for both Mainnet and the Sandbox testnet.
---
# Introduction
This site contains interactive APIs generated from the OpenAPI specs of various API endpoints offered by bits of Nym infrastructure run both by Nym and community operators for both Mainnet and the Sandbox testnet.
@@ -1,7 +1,3 @@
---
description: Nym's developer documentation covering core concepts of integrating with the Mixnet, interacting with the Nyx blockchain, an overview of the avaliable tools, and our SDK docs.
---
# Introduction
Nym's developer documentation covering core concepts of integrating with the Mixnet, interacting with the Nyx blockchain, an overview of the avaliable tools, and our SDK docs.
@@ -1,7 +1,3 @@
---
description: Nym's network documentation covering network architecture, node types, tokenomics, and cryptography.
---
# Introduction
Nym's network documentation covering network architecture, node types, tokenomics, and crypto systems.
@@ -1,7 +1,3 @@
---
description: Nym's Operators guide containing information and setup guides for the various components of Nym network and Nyx blockchain validators.
---
# Introduction
This is **Nym's Operators guide**, containing information and setup guides for the various components of Nym network and Nyx blockchain validators.
@@ -222,7 +222,7 @@ Good performance is much more essential than [total stake](../tokenomics.mdx#sta
For a comparison we made an example with 5 nodes, where first number is node performance and second stake saturation (assuming all of them `config_score` = 1 for simplification):
<br />
<AccordionTemplate name="✏️ Example: Performance ^ 20 \* total_stake calculation">
<AccordionTemplate name="✏️ Example: Performance ^ 20 * total_stake calculation">
> node_1 = 1.00 ^ 20 \* 1.0 = 1 <br />
> node_2 = 1.00 ^ 20 \* 0.5 = 0.5 <br />
> node_3 = 0.99 ^ 20 \* 1.0 = 0.818 <br />
+15 -2
View File
@@ -11,10 +11,23 @@ const config: DocsThemeConfig = {
const url = process.env.NEXT_PUBLIC_SITE_URL
const image = url + '/nym_logo.jpg'
// Define descriptions for different "books"
const bookDescriptions: Record<string, string> = {
'/developers': "Nym's developer documentation covering core concepts of integrating with the Mixnet, interacting with the Nyx blockchain, an overview of the avaliable tools, and our SDK docs.",
'/network': "Nym's network documentation covering network architecture, node types, tokenomics, and cryptography.",
'/operators': "Nym's Operators guide containing information and setup guides for the various components of Nym network and Nyx blockchain validators.",
'/apis': "Interactive APIs generated from the OpenAPI specs of various API endpoints offered by bits of Nym infrastructure run both by Nym and community operators for both Mainnet and the Sandbox testnet."
}
const defaultDescription = 'Nym is a privacy platform. It provides strong network-level privacy against sophisticated end-to-end attackers, and anonymous access control using blinded, re-randomizable, decentralized credentials.'
const topLevel = '/' + route.split('/')[1]
const description =
config.frontMatter.description ||
'Nym is a privacy platform. It provides strong network-level privacy against sophisticated end-to-end attackers, and anonymous access control using blinded, re-randomizable, decentralized credentials.'
const title = config.title + (route === '/' ? '' : ' - Nym docs')
bookDescriptions[topLevel] ||
defaultDescription
const title = config.title + (route === '/' ? '' : ' - Nym docs')
return (
<>
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "explorer-api"
version = "1.1.43"
version = "1.1.44"
edition = "2021"
license.workspace = true
+1 -3
View File
@@ -120,9 +120,7 @@ impl ThreadsafeGatewayCache {
.read()
.await
.get(&identity_key)
.map_or(false, |cache_item| {
cache_item.valid_until > SystemTime::now()
})
.is_some_and(|cache_item| cache_item.valid_until > SystemTime::now())
}
pub(crate) async fn get_locations(&self) -> GatewayLocationCache {
+1 -3
View File
@@ -106,9 +106,7 @@ impl ThreadsafeMixNodesCache {
.read()
.await
.get(&mix_id)
.map_or(false, |cache_item| {
cache_item.valid_until > SystemTime::now()
})
.is_some_and(|cache_item| cache_item.valid_until > SystemTime::now())
}
pub(crate) async fn get_locations(&self) -> MixnodeLocationCache {
+1 -3
View File
@@ -60,9 +60,7 @@ impl ThreadSafeNymNodesCache {
.read()
.await
.get(&node_id)
.map_or(false, |cache_item| {
cache_item.valid_until > SystemTime::now()
})
.is_some_and(|cache_item| cache_item.valid_until > SystemTime::now())
}
pub(crate) async fn get_bonded_nymnodes(
@@ -20,6 +20,7 @@ import { NYM_WEBSITE } from '@/app/api/constants'
import { useMainContext } from '@/app/context/main'
import { MobileDrawerClose } from '@/app/icons/MobileDrawerClose'
import { NavOptionType, originalNavOptions } from '@/app/context/nav'
import { ReleaseAlert } from '@/app/components/ReleaseAlert'
import { DarkLightSwitchDesktop } from '@/app/components/Switch'
import { Footer } from '@/app/components/Footer'
import { ConnectKeplrWallet } from '@/app/components/Wallet/ConnectKeplrWallet'
@@ -369,6 +370,7 @@ export const Nav: FCWithChildren = ({ children }) => {
style={{ width: `calc(100% - ${drawerWidth}px` }}
sx={{ py: 5, px: 6, mt: 7 }}
>
<ReleaseAlert />
{children}
<Footer />
</Box>
@@ -22,6 +22,7 @@ import { ExpandableButton } from './DesktopNav'
import { ConnectKeplrWallet } from '../Wallet/ConnectKeplrWallet'
import { NetworkTitle } from '../NetworkTitle'
import { originalNavOptions } from '@/app/context/nav'
import { ReleaseAlert } from '@/app/components/ReleaseAlert'
import {SearchToolbar} from "@/app/components/Nav/Search";
export const MobileNav: FCWithChildren = ({ children }) => {
@@ -137,6 +138,7 @@ export const MobileNav: FCWithChildren = ({ children }) => {
</Drawer>
<Box sx={{ width: '100%', p: 4, mt: 7 }}>
<ReleaseAlert />
{children}
<Footer />
</Box>
@@ -0,0 +1,9 @@
import { Alert, Box, Typography } from '@mui/material';
export const ReleaseAlert = () => (
<Alert severity="warning" sx={{ mb: 3, fontSize: 'medium', width: '100%' }}>
<Box>
<Typography>You are now viewing the legacy Nym mixnet explorer. Explorer 2.0 is coming soon.</Typography>
</Box>
</Alert>
);
@@ -163,4 +163,11 @@ impl ActiveClientsStore {
pub(crate) fn size(&self) -> usize {
self.inner.len()
}
pub fn pending_packets(&self) -> usize {
self.inner
.iter()
.map(|client| client.get_sender_ref().len())
.sum()
}
}
@@ -7,6 +7,7 @@ use nym_crypto::asymmetric::identity;
use nym_gateway_storage::GatewayStorage;
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_node_metrics::events::MetricEventsSender;
use nym_node_metrics::NymNodeMetrics;
use std::sync::Arc;
// I can see this being possible expanded with say storage or client store
@@ -17,7 +18,14 @@ pub(crate) struct CommonHandlerState {
pub(crate) local_identity: Arc<identity::KeyPair>,
pub(crate) only_coconut_credentials: bool,
pub(crate) bandwidth_cfg: BandwidthFlushingBehaviourConfig,
pub(crate) metrics: NymNodeMetrics,
pub(crate) metrics_sender: MetricEventsSender,
pub(crate) outbound_mix_sender: MixForwardingSender,
pub(crate) active_clients_store: ActiveClientsStore,
}
impl CommonHandlerState {
pub(crate) fn storage(&self) -> &GatewayStorage {
&self.storage
}
}
@@ -157,6 +157,10 @@ impl<R, S> Drop for AuthenticatedHandler<R, S> {
}
impl<R, S> AuthenticatedHandler<R, S> {
pub(crate) fn inner(&self) -> &FreshHandler<R, S> {
&self.inner
}
/// Upgrades `FreshHandler` into the Authenticated variant implying the client is now authenticated
/// and thus allowed to perform more actions with the gateway, such as redeeming bandwidth or
/// sending sphinx packets.
@@ -327,6 +331,24 @@ impl<R, S> AuthenticatedHandler<R, S> {
}
}
async fn handle_forget_me(
&mut self,
client: bool,
stats: bool,
) -> Result<ServerResponse, RequestHandlingError> {
if client {
self.inner()
.shared_state()
.storage()
.handle_forget_me(self.client.address)
.await?;
}
if stats {
self.send_metrics(GatewaySessionEvent::new_session_delete(self.client.address));
}
Ok(SensitiveServerResponse::ForgetMeAck {}.encrypt(&self.client.shared_keys)?)
}
async fn handle_key_upgrade(
&mut self,
hkdf_salt: Vec<u8>,
@@ -370,6 +392,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
hkdf_salt,
derived_key_digest,
} => self.handle_key_upgrade(hkdf_salt, derived_key_digest).await,
ClientRequest::ForgetMe { client, stats } => self.handle_forget_me(client, stats).await,
_ => Err(RequestHandlingError::UnknownEncryptedTextRequest),
}
}
@@ -114,6 +114,10 @@ pub(crate) struct FreshHandler<R, S> {
}
impl<R, S> FreshHandler<R, S> {
pub(crate) fn shared_state(&self) -> &CommonHandlerState {
&self.shared_state
}
// for time being we assume handle is always constructed from raw socket.
// if we decide we want to change it, that's not too difficult
// also at this point I'm not entirely sure how to deal with this warning without
@@ -61,7 +61,13 @@ impl Listener {
remote_addr,
shutdown,
);
tokio::spawn(handle.start_handling());
tokio::spawn(async move {
// TODO: refactor it similarly to the mixnet listener on the nym-node
let metrics_ref = handle.shared_state.metrics.clone();
metrics_ref.network.new_ingress_websocket_client();
handle.start_handling().await;
metrics_ref.network.disconnected_ingress_websocket_client();
});
}
Err(err) => warn!("failed to get client: {err}"),
}
+9
View File
@@ -37,6 +37,7 @@ mod internal_service_providers;
pub use client_handling::active_clients::ActiveClientsStore;
pub use nym_gateway_stats_storage::PersistentStatsStorage;
pub use nym_gateway_storage::{error::GatewayStorageError, GatewayStorage};
use nym_node_metrics::NymNodeMetrics;
pub use nym_sdk::{NymApiTopologyProvider, NymApiTopologyProviderConfig, UserAgent};
#[derive(Debug, Clone)]
@@ -81,6 +82,8 @@ pub struct GatewayTasksBuilder {
metrics_sender: MetricEventsSender,
metrics: NymNodeMetrics,
mnemonic: Arc<Zeroizing<bip39::Mnemonic>>,
shutdown: TaskClient,
@@ -102,12 +105,14 @@ impl Drop for GatewayTasksBuilder {
}
impl GatewayTasksBuilder {
#[allow(clippy::too_many_arguments)]
pub fn new(
config: Config,
identity: Arc<ed25519::KeyPair>,
storage: GatewayStorage,
mix_packet_sender: MixForwardingSender,
metrics_sender: MetricEventsSender,
metrics: NymNodeMetrics,
mnemonic: Arc<Zeroizing<bip39::Mnemonic>>,
shutdown: TaskClient,
) -> GatewayTasksBuilder {
@@ -121,6 +126,7 @@ impl GatewayTasksBuilder {
storage,
mix_packet_sender,
metrics_sender,
metrics,
mnemonic,
shutdown,
ecash_manager: None,
@@ -246,6 +252,7 @@ impl GatewayTasksBuilder {
local_identity: Arc::clone(&self.identity_keypair),
only_coconut_credentials: self.config.gateway.enforce_zk_nyms,
bandwidth_cfg: (&self.config).into(),
metrics: self.metrics.clone(),
metrics_sender: self.metrics_sender.clone(),
outbound_mix_sender: self.mix_packet_sender.clone(),
active_clients_store: active_clients_store.clone(),
@@ -443,6 +450,7 @@ impl GatewayTasksBuilder {
pub async fn try_start_wireguard(
&mut self,
) -> Result<Arc<nym_wireguard::WgApiWrapper>, Box<dyn std::error::Error + Send + Sync>> {
let _ = self.metrics.clone();
unimplemented!("wireguard is not supported on this platform")
}
@@ -460,6 +468,7 @@ impl GatewayTasksBuilder {
let wg_handle = nym_wireguard::start_wireguard(
self.storage.clone(),
self.metrics.clone(),
all_peers,
self.shutdown.fork("wireguard"),
wireguard_data,
+1 -1
View File
@@ -4,7 +4,7 @@
[package]
name = "nym-api"
license = "GPL-3.0"
version = "1.1.47"
version = "1.1.48"
authors.workspace = true
edition = "2021"
rust-version.workspace = true
@@ -183,6 +183,8 @@ impl PacketSender {
gateway_packet_router,
Some(fresh_gateway_client_data.bandwidth_controller.clone()),
nym_statistics_common::clients::ClientStatsSender::new(None),
#[cfg(unix)]
None,
task_client,
);
+50 -15
View File
@@ -12,6 +12,7 @@ use futures::{stream, StreamExt};
use nym_api_requests::legacy::{LegacyGatewayBondWithId, LegacyMixNodeDetailsWithLayer};
use nym_api_requests::models::{DescribedNodeType, NymNodeData, NymNodeDescription};
use nym_config::defaults::DEFAULT_NYM_NODE_HTTP_PORT;
use nym_crypto::asymmetric::ed25519;
use nym_mixnet_contract_common::{LegacyMixLayer, NodeId, NymNodeDetails};
use nym_node_requests::api::client::{NymNodeApiClientError, NymNodeApiClientExt};
use nym_topology::gateway::GatewayConversionError;
@@ -58,6 +59,13 @@ pub enum NodeDescribeCacheError {
#[error("could not verify signed host information for node {node_id}")]
MissignedHostInformation { node_id: NodeId },
#[error("identity of node {node_id} does not match. expected {expected} but got {got}")]
MismatchedIdentity {
node_id: NodeId,
expected: String,
got: String,
},
#[error("node {node_id} is announcing an illegal ip address")]
IllegalIpAddress { node_id: NodeId },
}
@@ -289,6 +297,15 @@ async fn try_get_description(
let host_info = client.get_host_information().await.map_err(map_query_err)?;
// check if the identity key matches the information provided during bonding
if data.expected_identity != host_info.keys.ed25519_identity {
return Err(NodeDescribeCacheError::MismatchedIdentity {
node_id: data.node_id,
expected: data.expected_identity.to_base58_string(),
got: host_info.keys.ed25519_identity.to_base58_string(),
});
}
if !host_info.verify_host_information() {
return Err(NodeDescribeCacheError::MissignedHostInformation {
node_id: data.node_id,
@@ -315,47 +332,58 @@ async fn try_get_description(
pub(crate) struct RefreshData {
host: String,
node_id: NodeId,
expected_identity: ed25519::PublicKey,
node_type: DescribedNodeType,
port: Option<u16>,
}
impl<'a> From<&'a LegacyMixNodeDetailsWithLayer> for RefreshData {
fn from(node: &'a LegacyMixNodeDetailsWithLayer) -> Self {
RefreshData::new(
impl<'a> TryFrom<&'a LegacyMixNodeDetailsWithLayer> for RefreshData {
type Error = ed25519::Ed25519RecoveryError;
fn try_from(node: &'a LegacyMixNodeDetailsWithLayer) -> Result<Self, Self::Error> {
Ok(RefreshData::new(
&node.bond_information.mix_node.host,
node.bond_information.identity().parse()?,
DescribedNodeType::LegacyMixnode,
node.mix_id(),
Some(node.bond_information.mix_node.http_api_port),
)
))
}
}
impl<'a> From<&'a LegacyGatewayBondWithId> for RefreshData {
fn from(node: &'a LegacyGatewayBondWithId) -> Self {
RefreshData::new(
impl<'a> TryFrom<&'a LegacyGatewayBondWithId> for RefreshData {
type Error = ed25519::Ed25519RecoveryError;
fn try_from(node: &'a LegacyGatewayBondWithId) -> Result<Self, Self::Error> {
Ok(RefreshData::new(
&node.bond.gateway.host,
node.bond.identity().parse()?,
DescribedNodeType::LegacyGateway,
node.node_id,
None,
)
))
}
}
impl<'a> From<&'a NymNodeDetails> for RefreshData {
fn from(node: &'a NymNodeDetails) -> Self {
RefreshData::new(
impl<'a> TryFrom<&'a NymNodeDetails> for RefreshData {
type Error = ed25519::Ed25519RecoveryError;
fn try_from(node: &'a NymNodeDetails) -> Result<Self, Self::Error> {
Ok(RefreshData::new(
&node.bond_information.node.host,
node.bond_information.identity().parse()?,
DescribedNodeType::NymNode,
node.node_id(),
node.bond_information.node.custom_http_port,
)
))
}
}
impl RefreshData {
pub fn new(
host: impl Into<String>,
expected_identity: ed25519::PublicKey,
node_type: DescribedNodeType,
node_id: NodeId,
port: Option<u16>,
@@ -363,6 +391,7 @@ impl RefreshData {
RefreshData {
host: host.into(),
node_id,
expected_identity,
node_type,
port,
}
@@ -404,7 +433,9 @@ impl CacheItemProvider for NodeDescriptionProvider {
None => error!("failed to obtain mixnodes information from the cache"),
Some(legacy_mixnodes) => {
for node in &**legacy_mixnodes {
nodes_to_query.push(node.into())
if let Ok(data) = node.try_into() {
nodes_to_query.push(data);
}
}
}
}
@@ -413,7 +444,9 @@ impl CacheItemProvider for NodeDescriptionProvider {
None => error!("failed to obtain gateways information from the cache"),
Some(legacy_gateways) => {
for node in &**legacy_gateways {
nodes_to_query.push(node.into())
if let Ok(data) = node.try_into() {
nodes_to_query.push(data);
}
}
}
}
@@ -422,7 +455,9 @@ impl CacheItemProvider for NodeDescriptionProvider {
None => error!("failed to obtain nym-nodes information from the cache"),
Some(nym_nodes) => {
for node in &**nym_nodes {
nodes_to_query.push(node.into())
if let Ok(data) = node.try_into() {
nodes_to_query.push(data);
}
}
}
}
+3 -3
View File
@@ -385,7 +385,7 @@ impl NymContractCache {
.iter()
.find(|n| n.bond_information.identity() == encoded_identity)
{
return Some(nym_node.into());
return nym_node.try_into().ok();
}
// 2. check legacy mixnodes
@@ -394,7 +394,7 @@ impl NymContractCache {
.iter()
.find(|n| n.bond_information.identity() == encoded_identity)
{
return Some(mixnode.into());
return mixnode.try_into().ok();
}
// 3. check legacy gateways
@@ -403,7 +403,7 @@ impl NymContractCache {
.iter()
.find(|n| n.identity() == &encoded_identity)
{
return Some(gateway.into());
return gateway.try_into().ok();
}
None
+4
View File
@@ -18,6 +18,7 @@ use crate::support::storage::models::{
use dashmap::DashMap;
use nym_mixnet_contract_common::NodeId;
use nym_types::monitoring::NodeResult;
use sqlx::sqlite::{SqliteAutoVacuum, SqliteSynchronous};
use sqlx::ConnectOptions;
use std::path::Path;
use std::sync::Arc;
@@ -67,6 +68,9 @@ impl NymApiStorage {
// TODO: we can inject here more stuff based on our nym-api global config
// struct. Maybe different pool size or timeout intervals?
let connect_opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.log_statements(LevelFilter::Trace)
@@ -1,8 +1,22 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_bin_common::bin_info;
use time::OffsetDateTime;
use tracing::{debug, info, warn};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::{
cli::Cli,
deposit_maker::DepositMaker,
error::VpnApiError,
http::{
state::{ApiState, ChainClient},
HttpServer,
},
storage::VpnApiStorage,
tasks::StoragePruner,
};
pub struct LockTimer {
created: OffsetDateTime,
@@ -40,3 +54,88 @@ impl Default for LockTimer {
}
}
}
pub async fn wait_for_signal() {
use tokio::signal::unix::{signal, SignalKind};
// if we fail to setup the signals, we should just blow up
#[allow(clippy::expect_used)]
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM channel");
#[allow(clippy::expect_used)]
let mut sigquit = signal(SignalKind::quit()).expect("Failed to setup SIGQUIT channel");
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("Received SIGINT");
},
_ = sigterm.recv() => {
info!("Received SIGTERM");
}
_ = sigquit.recv() => {
info!("Received SIGQUIT");
}
}
}
fn build_sha_short() -> &'static str {
let bin_info = bin_info!();
if bin_info.commit_sha.len() < 7 {
panic!("unavailable build commit sha")
}
if bin_info.commit_sha == "VERGEN_IDEMPOTENT_OUTPUT" {
error!("the binary hasn't been built correctly. it doesn't have a commit sha information");
return "unknown";
}
&bin_info.commit_sha[..7]
}
pub(crate) async fn run_api(cli: Cli) -> Result<(), VpnApiError> {
// create the tasks
let bind_address = cli.bind_address();
let storage = VpnApiStorage::init(cli.persistent_storage_path()).await?;
let mnemonic = cli.mnemonic;
let auth_token = cli.http_auth_token;
let webhook_cfg = cli.webhook;
let chain_client = ChainClient::new(mnemonic)?;
let cancellation_token = CancellationToken::new();
let deposit_maker = DepositMaker::new(
build_sha_short(),
chain_client.clone(),
cli.max_concurrent_deposits,
cancellation_token.clone(),
);
let deposit_request_sender = deposit_maker.deposit_request_sender();
let api_state = ApiState::new(
storage.clone(),
webhook_cfg,
chain_client,
deposit_request_sender,
cancellation_token.clone(),
)
.await?;
let http_server = HttpServer::new(
bind_address,
api_state.clone(),
auth_token,
cancellation_token.clone(),
);
let storage_pruner = StoragePruner::new(cancellation_token, storage);
// spawn all the tasks
api_state.try_spawn(http_server.run_forever());
api_state.try_spawn(storage_pruner.run_forever());
api_state.try_spawn(deposit_maker.run_forever());
// wait for cancel signal (SIGINT, SIGTERM or SIGQUIT)
wait_for_signal().await;
// cancel all the tasks and wait for all task to terminate
api_state.cancel_and_wait().await;
Ok(())
}
@@ -710,7 +710,7 @@ pub(crate) struct ChainWritePermit<'a> {
inner: RwLockWriteGuard<'a, DirectSigningHttpRpcNyxdClient>,
}
impl<'a> ChainWritePermit<'a> {
impl ChainWritePermit<'_> {
pub(crate) async fn make_deposits(
self,
short_sha: &'static str,
@@ -6,117 +6,30 @@
#![warn(clippy::todo)]
#![warn(clippy::dbg_macro)]
use crate::cli::Cli;
use crate::deposit_maker::DepositMaker;
use crate::error::VpnApiError;
use crate::http::state::{ApiState, ChainClient};
use crate::http::HttpServer;
use crate::storage::VpnApiStorage;
use crate::tasks::StoragePruner;
use clap::Parser;
use nym_bin_common::logging::setup_tracing_logger;
use nym_bin_common::{bin_info, bin_info_owned};
use nym_network_defaults::setup_env;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, trace};
cfg_if::cfg_if! {
if #[cfg(unix)] {
use crate::cli::Cli;
use clap::Parser;
use nym_bin_common::bin_info_owned;
use nym_bin_common::logging::setup_tracing_logger;
use nym_network_defaults::setup_env;
use tracing::{info, trace};
pub mod cli;
pub mod config;
pub mod credentials;
mod deposit_maker;
pub mod error;
pub mod helpers;
pub mod http;
pub mod nym_api_helpers;
pub mod storage;
pub mod tasks;
mod webhook;
pub async fn wait_for_signal() {
use tokio::signal::unix::{signal, SignalKind};
// if we fail to setup the signals, we should just blow up
#[allow(clippy::expect_used)]
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM channel");
#[allow(clippy::expect_used)]
let mut sigquit = signal(SignalKind::quit()).expect("Failed to setup SIGQUIT channel");
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("Received SIGINT");
},
_ = sigterm.recv() => {
info!("Received SIGTERM");
}
_ = sigquit.recv() => {
info!("Received SIGQUIT");
}
pub mod cli;
pub mod config;
pub mod credentials;
mod deposit_maker;
pub mod error;
pub mod helpers;
pub mod http;
pub mod nym_api_helpers;
pub mod storage;
pub mod tasks;
mod webhook;
}
}
fn build_sha_short() -> &'static str {
let bin_info = bin_info!();
if bin_info.commit_sha.len() < 7 {
panic!("unavailable build commit sha")
}
if bin_info.commit_sha == "VERGEN_IDEMPOTENT_OUTPUT" {
error!("the binary hasn't been built correctly. it doesn't have a commit sha information");
return "unknown";
}
&bin_info.commit_sha[..7]
}
async fn run_api(cli: Cli) -> Result<(), VpnApiError> {
// create the tasks
let bind_address = cli.bind_address();
let storage = VpnApiStorage::init(cli.persistent_storage_path()).await?;
let mnemonic = cli.mnemonic;
let auth_token = cli.http_auth_token;
let webhook_cfg = cli.webhook;
let chain_client = ChainClient::new(mnemonic)?;
let cancellation_token = CancellationToken::new();
let deposit_maker = DepositMaker::new(
build_sha_short(),
chain_client.clone(),
cli.max_concurrent_deposits,
cancellation_token.clone(),
);
let deposit_request_sender = deposit_maker.deposit_request_sender();
let api_state = ApiState::new(
storage.clone(),
webhook_cfg,
chain_client,
deposit_request_sender,
cancellation_token.clone(),
)
.await?;
let http_server = HttpServer::new(
bind_address,
api_state.clone(),
auth_token,
cancellation_token.clone(),
);
let storage_pruner = StoragePruner::new(cancellation_token, storage);
// spawn all the tasks
api_state.try_spawn(http_server.run_forever());
api_state.try_spawn(storage_pruner.run_forever());
api_state.try_spawn(deposit_maker.run_forever());
// wait for cancel signal (SIGINT, SIGTERM or SIGQUIT)
wait_for_signal().await;
// cancel all the tasks and wait for all task to terminate
api_state.cancel_and_wait().await;
Ok(())
}
#[cfg(unix)]
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// std::env::set_var(
@@ -134,6 +47,13 @@ async fn main() -> anyhow::Result<()> {
let bin_info = bin_info_owned!();
info!("using the following version: {bin_info}");
run_api(cli).await?;
helpers::run_api(cli).await?;
Ok(())
}
#[cfg(not(unix))]
#[tokio::main]
async fn main() -> anyhow::Result<()> {
eprintln!("This tool is only supported on Unix systems");
std::process::exit(1)
}
@@ -16,6 +16,7 @@ use nym_validator_client::ecash::BlindedSignatureResponse;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::ecash_query_client::DepositId;
use nym_validator_client::nyxd::Coin;
use sqlx::sqlite::{SqliteAutoVacuum, SqliteSynchronous};
use sqlx::ConnectOptions;
use std::fmt::Debug;
use std::path::Path;
@@ -40,6 +41,9 @@ impl VpnApiStorage {
debug!("Attempting to connect to database");
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.log_statements(LevelFilter::Trace)
+2
View File
@@ -27,12 +27,14 @@ tokio = { workspace = true, features = ["macros", "time"] }
tokio-util = { workspace = true }
utoipa = { workspace = true, features = ["axum_extras"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
tokio-postgres = "0.7"
# internal
nym-bin-common = { path = "../common/bin-common" }
nym-client-core = { path = "../common/client-core" }
nym-crypto = { path = "../common/crypto" }
nym-network-defaults = { path = "../common/network-defaults" }
nym-gateway-requests = { path = "../common/gateway-requests" }
nym-sdk = { path = "../sdk/rust/nym-sdk" }
nym-sphinx = { path = "../common/nymsphinx" }
nym-topology = { path = "../common/topology" }
+3 -2
View File
@@ -9,13 +9,14 @@ network=${NYM_NETWORK:-mainnet}
timeout=${LOCUST_TIMEOUT:-600}
users=${LOCUST_USERS:-10}
processes=${LOCUST_PROCESSES:-4}
_database_url=${DATABASE_URL}
RUST_LOG=info nym-network-monitor --env envs/"${network}".env --private-key "${_private_key}" &
RUST_LOG=info nym-network-monitor --env envs/"${network}".env --private-key "${_private_key}" --database-url "${_database_url}" &
nnm_pid=$!
sleep 10
python -m locust -H http://${NYM_NETWORK_MONITOR_HOST}:${NYM_NETWORK_MONITOR_PORT} --processes "${processes}" --autostart --autoquit 60 -u "${users}" -t "${timeout}"s &
python -m locust -H http://"${NYM_NETWORK_MONITOR_HOST}":"${NYM_NETWORK_MONITOR_PORT}" --processes "${processes}" --autostart --autoquit 60 -u "${users}" -t "${timeout}"s &
locust_pid=$!
wait $locust_pid
+123 -42
View File
@@ -1,7 +1,10 @@
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use anyhow::Result;
use futures::{stream::FuturesUnordered, StreamExt};
use futures::{pin_mut, stream::FuturesUnordered, StreamExt};
use log::{debug, info};
use nym_sphinx::chunking::{monitoring, SentFragment};
use nym_topology::{gateway, mix, NymTopology};
@@ -10,6 +13,7 @@ use nym_validator_client::nym_api::routes::{API_VERSION, STATUS, SUBMIT_GATEWAY,
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use serde::{Deserialize, Serialize};
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type, Client};
use utoipa::ToSchema;
use crate::{NYM_API_URL, PRIVATE_KEY, TOPOLOGY};
@@ -23,20 +27,20 @@ struct HydratedRoute {
struct GatewayStats(u32, u32);
impl GatewayStats {
fn new(sent: u32, recv: u32) -> Self {
GatewayStats(sent, recv)
fn new(success: u32, failure: u32) -> Self {
GatewayStats(success, failure)
}
fn success(&self) -> u32 {
self.0
}
fn failed(&self) -> u32 {
fn failure(&self) -> u32 {
self.1
}
fn reliability(&self) -> f64 {
self.success() as f64 / (self.success() + self.failed()) as f64
self.success() as f64 / (self.success() + self.failure()) as f64
}
fn incr_success(&mut self) {
@@ -321,48 +325,125 @@ pub async fn monitor_mixnode_results() -> anyhow::Result<Vec<NodeResult>> {
.collect())
}
pub async fn submit_metrics() -> anyhow::Result<()> {
let node_stats = monitor_mixnode_results().await?;
let gateway_stats = monitor_gateway_results().await?;
async fn submit_node_stats_to_db(client: Arc<Client>) -> anyhow::Result<()> {
let client = Arc::clone(&client);
let node_stats = all_node_stats().await?;
info!("Submitting metrics to {}", *NYM_API_URL);
let client = reqwest::Client::new();
let sink = client
.copy_in("COPY node_stats (node_id, identity, reliability, complete_routes, incomplete_routes) FROM STDIN BINARY")
.await?;
let node_submit_url = format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_NODE}", &*NYM_API_URL);
let gateway_submit_url = format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_GATEWAY}", &*NYM_API_URL);
let writer = BinaryCopyInWriter::new(
sink,
&[Type::INT4, Type::TEXT, Type::FLOAT8, Type::INT8, Type::INT8],
);
pin_mut!(writer);
info!("Submitting {} mixnode measurements", node_stats.len());
for stat in node_stats {
writer
.as_mut()
.write(&[
&(stat.mix_id as i32),
&stat.identity,
&stat.reliability,
&(stat.complete_routes as i64),
&(stat.incomplete_routes as i64),
])
.await?;
}
node_stats
.chunks(10)
.map(|chunk| {
let monitor_message =
MonitorMessage::new(chunk.to_vec(), PRIVATE_KEY.get().expect("We've set this!"));
client.post(&node_submit_url).json(&monitor_message).send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
writer.finish().await?;
info!("Submitting {} gateway measurements", gateway_stats.len());
Ok(())
}
gateway_stats
.chunks(10)
.map(|chunk| {
let monitor_message =
MonitorMessage::new(chunk.to_vec(), PRIVATE_KEY.get().expect("We've set this!"));
client
.post(&gateway_submit_url)
.json(&monitor_message)
.send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
async fn submit_gateway_stats_to_db(client: Arc<Client>) -> anyhow::Result<()> {
let client = Arc::clone(&client);
let network_account = NetworkAccount::finalize()?;
let gateway_stats = network_account.gateway_stats;
let sink = client
.copy_in("COPY gateway_stats (identity, reliability, success, failure) FROM STDIN BINARY")
.await?;
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT, Type::FLOAT8, Type::INT8, Type::INT8]);
pin_mut!(writer);
for (key, stats) in gateway_stats {
writer
.as_mut()
.write(&[
&key,
&stats.reliability(),
&(stats.success() as i64),
&(stats.failure() as i64),
])
.await?;
}
writer.finish().await?;
Ok(())
}
pub async fn submit_metrics_to_db(client: Arc<Client>) -> anyhow::Result<()> {
let client = Arc::clone(&client);
let client2 = Arc::clone(&client);
submit_node_stats_to_db(client).await?;
submit_gateway_stats_to_db(client2).await?;
Ok(())
}
pub async fn submit_metrics(client: Option<Arc<Client>>) -> anyhow::Result<()> {
if let Some(client) = client {
submit_metrics_to_db(client).await?;
}
if let Some(private_key) = PRIVATE_KEY.get() {
let node_stats = monitor_mixnode_results().await?;
let gateway_stats = monitor_gateway_results().await?;
info!("Submitting metrics to {}", *NYM_API_URL);
let client = reqwest::Client::new();
let node_submit_url = format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_NODE}", &*NYM_API_URL);
let gateway_submit_url =
format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_GATEWAY}", &*NYM_API_URL);
info!("Submitting {} mixnode measurements", node_stats.len());
node_stats
.chunks(10)
.map(|chunk| {
let monitor_message = MonitorMessage::new(chunk.to_vec(), private_key);
client.post(&node_submit_url).json(&monitor_message).send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
info!("Submitting {} gateway measurements", gateway_stats.len());
gateway_stats
.chunks(10)
.map(|chunk| {
let monitor_message = MonitorMessage::new(
chunk.to_vec(),
PRIVATE_KEY.get().expect("We've set this!"),
);
client
.post(&gateway_submit_url)
.json(&monitor_message)
.send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
}
NetworkAccount::empty_buffers();
+34 -7
View File
@@ -2,7 +2,8 @@ use crate::http::HttpServer;
use accounting::submit_metrics;
use anyhow::Result;
use clap::Parser;
use log::{info, warn};
use log::{error, info, warn};
use nym_client_core::ForgetMe;
use nym_crypto::asymmetric::ed25519::PrivateKey;
use nym_network_defaults::setup_env;
use nym_network_defaults::var_names::NYM_API;
@@ -21,6 +22,7 @@ use std::{
};
use tokio::sync::OnceCell;
use tokio::{signal::ctrl_c, sync::RwLock};
use tokio_postgres::NoTls;
use tokio_util::sync::CancellationToken;
static NYM_API_URL: LazyLock<String> = LazyLock::new(|| {
@@ -56,7 +58,11 @@ async fn make_clients(
loop {
if Arc::strong_count(&dropped_client) == 1 {
if let Some(client) = Arc::into_inner(dropped_client) {
client.into_inner().disconnect().await;
// let forget_me = ClientRequest::ForgetMe {
// also_from_stats: true,
// };
let client_handle = client.into_inner();
client_handle.disconnect().await;
} else {
warn!("Failed to drop client, client had more then one strong ref")
}
@@ -89,6 +95,7 @@ async fn make_client(topology: NymTopology) -> Result<MixnetClient> {
.network_details(net)
.custom_topology_provider(topology_provider)
.debug_config(mixnet_debug_config(0))
.with_forget_me(ForgetMe::new_all())
// .enable_credentials_mode()
.build()?;
@@ -130,7 +137,10 @@ struct Args {
generate_key_pair: bool,
#[arg(long)]
private_key: String,
private_key: Option<String>,
#[arg(long, env = "DATABASE_URL")]
database_url: Option<String>,
}
fn generate_key_pair() -> Result<()> {
@@ -168,8 +178,10 @@ async fn main() -> Result<()> {
std::process::exit(0);
}
let pk = PrivateKey::from_base58_string(&args.private_key)?;
PRIVATE_KEY.set(pk).ok();
if let Some(private_key) = args.private_key {
let pk = PrivateKey::from_base58_string(&private_key)?;
PRIVATE_KEY.set(pk).ok();
}
TOPOLOGY
.set(if let Some(topology_file) = args.topology {
@@ -197,16 +209,31 @@ async fn main() -> Result<()> {
info!("Waiting for message (ctrl-c to exit)");
let client = if let Some(database_url) = args.database_url {
let (client, connection) = tokio_postgres::connect(&database_url, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("Postgres connection error: {}", e);
}
});
Some(Arc::new(client))
} else {
None
};
loop {
let client = client.as_ref().map(Arc::clone);
match tokio::time::timeout(Duration::from_secs(600), ctrl_c()).await {
Ok(_) => {
info!("Received kill signal, shutting down, submitting final batch of metrics");
submit_metrics().await?;
submit_metrics(client).await?;
break;
}
Err(_) => {
info!("Submitting metrics, cleaning metric buffers");
submit_metrics().await?;
submit_metrics(client).await?;
}
};
}
@@ -3,7 +3,7 @@
set -eu
export ENVIRONMENT=${ENVIRONMENT:-"sandbox"}
probe_git_ref="nym-vpn-core-v1.0.0-rc.14"
probe_git_ref="nym-vpn-core-v1.1.0"
crate_root=$(dirname $(realpath "$0"))
monorepo_root=$(realpath "${crate_root}/../..")
@@ -54,7 +54,7 @@ function swarm() {
echo "All agents completed"
}
copy_gw_probe
# copy_gw_probe
build_agent
swarm $workers
@@ -1,6 +1,8 @@
use anyhow::{anyhow, Result};
use sqlx::{Connection, SqliteConnection};
#[cfg(target_family = "unix")]
use std::fs::Permissions;
#[cfg(target_family = "unix")]
use std::os::unix::fs::PermissionsExt;
use tokio::{fs::File, io::AsyncWriteExt};
@@ -39,7 +41,10 @@ async fn write_db_path_to_file(out_dir: &str, db_filename: &str) -> anyhow::Resu
file.write_all(format!("sqlite3 {}/{}", out_dir, db_filename).as_bytes())
.await?;
#[cfg(target_family = "unix")]
file.set_permissions(Permissions::from_mode(0o755))
.await
.map_err(From::from)
.map_err(anyhow::Error::from)?;
Ok(())
}
@@ -2,6 +2,7 @@
set -e
user_rust_log_preference=$RUST_LOG
export NYM_API_CLIENT_TIMEOUT=60
export EXPLORER_CLIENT_TIMEOUT=60
export NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL=120
@@ -20,7 +21,8 @@ function run_bare() {
set -a
source "${monorepo_root}/envs/${ENVIRONMENT}.env"
set +a
export RUST_LOG=debug
export RUST_LOG=${user_rust_log_preference:-debug}
echo "RUST_LOG=${RUST_LOG}"
# --conection-url is provided in build.rs
cargo run --package nym-node-status-api
@@ -154,31 +154,39 @@ mod api_regression {
use super::*;
use std::{env::var, sync::LazyLock};
static IPINFO_TOKEN: LazyLock<String> = LazyLock::new(|| var("IPINFO_API_TOKEN").unwrap());
static IPINFO_TOKEN: LazyLock<Option<String>> = LazyLock::new(|| var("IPINFO_API_TOKEN").ok());
static CI: LazyLock<Option<String>> = LazyLock::new(|| var("CI").ok());
#[tokio::test]
async fn should_parse_response() {
let client = IpInfoClient::new(&(*IPINFO_TOKEN));
let my_ip = reqwest::get("https://api.ipify.org")
.await
.expect("Couldn't get own IP")
.text()
.await
.unwrap();
if CI.is_none() {
return;
}
if let Some(token) = &*IPINFO_TOKEN {
let client = IpInfoClient::new(token);
let my_ip = reqwest::get("https://api.ipify.org")
.await
.expect("Couldn't get own IP")
.text()
.await
.unwrap();
let location_result = client.locate_ip(my_ip).await;
assert!(location_result.is_ok(), "Did ipinfo response change?");
let location_result = client.locate_ip(my_ip).await;
assert!(location_result.is_ok(), "Did ipinfo response change?");
assert!(
client.check_remaining_bandwidth().await.is_ok(),
"Failed to check remaining bandwidth?"
);
assert!(
client.check_remaining_bandwidth().await.is_ok(),
"Failed to check remaining bandwidth?"
);
// when serialized, these fields should be present because they're exposed over API
let location_result = location_result.unwrap();
let json = serde_json::to_value(&location_result).unwrap();
assert!(json.get("two_letter_iso_country_code").is_some());
assert!(json.get("latitude").is_some());
assert!(json.get("longitude").is_some());
// when serialized, these fields should be present because they're exposed over API
let location_result = location_result.unwrap();
let json = serde_json::to_value(&location_result).unwrap();
assert!(json.get("two_letter_iso_country_code").is_some());
assert!(json.get("latitude").is_some());
assert!(json.get("longitude").is_some());
} else {
panic!("IPINFO_API_TOKEN not set");
}
}
}
+1 -1
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-node"
version = "1.2.0"
version = "1.3.1"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
+12
View File
@@ -1,9 +1,16 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![warn(clippy::expect_used)]
#![warn(clippy::unwrap_used)]
#![warn(clippy::todo)]
#![warn(clippy::dbg_macro)]
use crate::entry::EntryStats;
use crate::mixnet::MixingStats;
use crate::network::NetworkStats;
use crate::process::NodeStats;
use crate::wireguard::WireguardStats;
use std::ops::Deref;
use std::sync::Arc;
@@ -11,6 +18,9 @@ pub mod entry;
pub mod events;
pub mod mixnet;
pub mod network;
pub mod process;
pub mod prometheus_wrapper;
pub mod wireguard;
#[derive(Clone, Default)]
pub struct NymNodeMetrics {
@@ -34,6 +44,8 @@ impl Deref for NymNodeMetrics {
pub struct NymNodeMetricsInner {
pub mixnet: MixingStats,
pub entry: EntryStats,
pub wireguard: WireguardStats,
pub network: NetworkStats,
pub process: NodeStats,
}
+10 -7
View File
@@ -131,13 +131,6 @@ impl MixingStats {
.or_default()
.dropped += 1;
}
pub fn egress_dropped_final_hop_packet(&self) {
todo!()
// self.egress
// .final_hop_packets_dropped
// .fetch_add(1, Ordering::Relaxed);
}
}
#[derive(Clone, Copy, Default, PartialEq)]
@@ -148,6 +141,8 @@ pub struct EgressRecipientStats {
#[derive(Default)]
pub struct EgressMixingStats {
disk_persisted_packets: AtomicUsize,
// this includes ACKS!
forward_hop_packets_sent: AtomicUsize,
@@ -159,6 +154,14 @@ pub struct EgressMixingStats {
}
impl EgressMixingStats {
pub fn add_disk_persisted_packet(&self) {
self.disk_persisted_packets.fetch_add(1, Ordering::Relaxed);
}
pub fn disk_persisted_packets(&self) -> usize {
self.disk_persisted_packets.load(Ordering::Relaxed)
}
pub fn forward_hop_packets_sent(&self) -> usize {
self.forward_hop_packets_sent.load(Ordering::Relaxed)
}
+32
View File
@@ -2,11 +2,19 @@
// SPDX-License-Identifier: Apache-2.0
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[derive(Default)]
pub struct NetworkStats {
// for now just experiment with basic data, we could always extend it
active_ingress_mixnet_connections: AtomicUsize,
active_ingress_websocket_connections: AtomicUsize,
// the reason for additional `Arc` on this one is that the handler wasn't
// designed with metrics in mind and this single counter has been woven through
// the call stack
active_egress_mixnet_connections: Arc<AtomicUsize>,
}
impl NetworkStats {
@@ -20,8 +28,32 @@ impl NetworkStats {
.fetch_sub(1, Ordering::Relaxed);
}
pub fn new_ingress_websocket_client(&self) {
self.active_ingress_websocket_connections
.fetch_add(1, Ordering::Relaxed);
}
pub fn disconnected_ingress_websocket_client(&self) {
self.active_ingress_websocket_connections
.fetch_sub(1, Ordering::Relaxed);
}
pub fn active_ingress_mixnet_connections_count(&self) -> usize {
self.active_ingress_mixnet_connections
.load(Ordering::Relaxed)
}
pub fn active_ingress_websocket_connections_count(&self) -> usize {
self.active_ingress_websocket_connections
.load(Ordering::Relaxed)
}
pub fn active_egress_mixnet_connections_counter(&self) -> Arc<AtomicUsize> {
self.active_egress_mixnet_connections.clone()
}
pub fn active_egress_mixnet_connections_count(&self) -> usize {
self.active_egress_mixnet_connections
.load(Ordering::Relaxed)
}
}
+57
View File
@@ -0,0 +1,57 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Default)]
pub struct NodeStats {
pub final_hop_packets_pending_delivery: AtomicUsize,
pub forward_hop_packets_pending_delivery: AtomicUsize,
pub forward_hop_packets_being_delayed: AtomicUsize,
// packets that haven't yet been delayed and are waiting for their chance
pub packet_forwarder_queue_size: AtomicUsize,
}
impl NodeStats {
pub fn update_final_hop_packets_pending_delivery(&self, current: usize) {
self.final_hop_packets_pending_delivery
.store(current, Ordering::Relaxed);
}
pub fn final_hop_packets_pending_delivery_count(&self) -> usize {
self.final_hop_packets_pending_delivery
.load(Ordering::Relaxed)
}
pub fn update_forward_hop_packets_pending_delivery(&self, current: usize) {
self.forward_hop_packets_pending_delivery
.store(current, Ordering::Relaxed);
}
pub fn forward_hop_packets_pending_delivery_count(&self) -> usize {
self.forward_hop_packets_pending_delivery
.load(Ordering::Relaxed)
}
pub fn update_forward_hop_packets_being_delayed(&self, current: usize) {
self.forward_hop_packets_being_delayed
.store(current, Ordering::Relaxed);
}
pub fn forward_hop_packets_being_delayed_count(&self) -> usize {
self.forward_hop_packets_being_delayed
.load(Ordering::Relaxed)
}
pub fn update_packet_forwarder_queue_size(&self, current: usize) {
self.packet_forwarder_queue_size
.store(current, Ordering::Relaxed);
}
pub fn packet_forwarder_queue_size(&self) -> usize {
self.packet_forwarder_queue_size.load(Ordering::Relaxed)
}
}
@@ -0,0 +1,390 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_metrics::{metrics_registry, HistogramTimer, Metric};
use std::sync::LazyLock;
use strum::{Display, EnumCount, EnumIter, EnumProperty, IntoEnumIterator};
pub static PROMETHEUS_METRICS: LazyLock<NymNodePrometheusMetrics> =
LazyLock::new(NymNodePrometheusMetrics::initialise);
const CLIENT_SESSION_DURATION_BUCKETS: &[f64] = &[
// sub 3s (implicitly)
3., // 3s - 15s
15., // 15s - 70s
70., // 70s - 2min
120., // 2 min - 5 min
300., // 5min - 15min
900., // 15min - 1h
3600., // 1h - 12h
43200., // 12h - 23.5h
84600., // 23.5h - 24.5h
88200., // 24.5h - 72h
259200., // 72h+ (implicitly)
];
#[derive(Clone, Debug, EnumIter, Display, EnumProperty, EnumCount, Eq, Hash, PartialEq)]
#[strum(serialize_all = "snake_case", prefix = "nym_node_")]
pub enum PrometheusMetric {
// # MIXNET
// ## INGRESS
#[strum(props(help = "The number of ingress forward hop sphinx packets received"))]
MixnetIngressForwardPacketsReceived,
#[strum(props(help = "The number of ingress final hop sphinx packets received"))]
MixnetIngressFinalHopPacketsReceived,
#[strum(props(help = "The number of ingress malformed sphinx packets received"))]
MixnetIngressMalformedPacketsReceived,
#[strum(props(
help = "The number of ingress forward sphinx packets that specified excessive delay received"
))]
MixnetIngressExcessiveDelayPacketsReceived,
#[strum(props(help = "The number of ingress forward hop sphinx packets dropped"))]
MixnetIngressForwardPacketsDropped,
#[strum(props(help = "The number of ingress final hop sphinx packets dropped"))]
MixnetIngressFinalHopPacketsDropped,
#[strum(props(help = "The current rate of receiving ingress forward hop sphinx packets"))]
MixnetIngressForwardPacketsReceivedRate,
#[strum(props(help = "The current rate of receiving ingress final hop sphinx packets"))]
MixnetIngressFinalHopPacketsReceivedRate,
#[strum(props(help = "The current rate of receiving ingress malformed sphinx packets"))]
MixnetIngressMalformedPacketsReceivedRate,
#[strum(props(
help = "The current rate of receiving ingress sphinx packets that specified excessive delay"
))]
MixnetIngressExcessiveDelayPacketsReceivedRate,
#[strum(props(help = "The current rate of dropping ingress forward hop sphinx packets"))]
MixnetIngressForwardPacketsDroppedRate,
#[strum(props(help = "The current rate of dropping ingress final hop sphinx packets"))]
MixnetIngressFinalHopPacketsDroppedRate,
// ## EGRESS
#[strum(props(
help = "The number of unwrapped final hop packets stored on disk for offline clients"
))]
MixnetEgressStoredOnDiskFinalHopPackets,
#[strum(props(help = "The number of egress forward hop sphinx packets sent/forwarded"))]
MixnetEgressForwardPacketsSent,
#[strum(props(
help = "The number of egress forward hop sphinx packets sent/forwarded (acks only)"
))]
MixnetEgressAckSent,
#[strum(props(help = "The number of egress forward hop sphinx packets dropped"))]
MixnetEgressForwardPacketsDropped,
#[strum(props(
help = "The current rate of sending/forwarding egress forward hop sphinx packets"
))]
MixnetEgressForwardPacketsSendRate,
#[strum(props(
help = "The current rate of sending/forwarding egress forward hop sphinx packets (acks only)"
))]
MixnetEgressAckSendRate,
#[strum(props(help = "The current rate of dropping egress forward hop sphinx packets"))]
MixnetEgressForwardPacketsDroppedRate,
// # ENTRY
#[strum(props(help = "The number of unique users"))]
EntryClientUniqueUsers,
#[strum(props(help = "The number of client sessions started"))]
EntryClientSessionsStarted,
#[strum(props(help = "The number of client sessions finished"))]
EntryClientSessionsFinished,
#[strum(to_string = "entry_client_sessions_durations_{typ}")]
#[strum(props(help = "The distribution of client sessions duration of the specified type"))]
EntryClientSessionsDurations { typ: String },
// # WIREGUARD
#[strum(props(help = "The amount of bytes transmitted via wireguard"))]
WireguardBytesTx,
#[strum(props(help = "The amount of bytes received via wireguard"))]
WireguardBytesRx,
#[strum(props(help = "The current number of all registered wireguard peers"))]
WireguardTotalPeers,
#[strum(props(help = "The current number of active wireguard peers"))]
WireguardActivePeers,
#[strum(props(help = "The current sending rate of wireguard"))]
WireguardBytesTxRate,
#[strum(props(help = "The current receiving rate of wireguard"))]
WireguardBytesRxRate,
// # NETWORK
#[strum(props(help = "The number of active ingress mixnet connections"))]
NetworkActiveIngressMixnetConnections,
#[strum(props(help = "The number of active ingress websocket connections"))]
NetworkActiveIngressWebSocketConnections,
#[strum(props(help = "The number of active egress mixnet connections"))]
NetworkActiveEgressMixnetConnections,
// # PROCESS
#[strum(props(help = "The current number of packets being delayed"))]
ProcessForwardHopPacketsBeingDelayed,
#[strum(props(
help = "The current number of packets waiting in the queue to get delayed and sent into the mixnet"
))]
ProcessPacketForwarderQueueSize,
#[strum(props(
help = "The latency distribution of attempting to retrieve network topology (from nym-api)"
))]
ProcessTopologyQueryResolutionLatency,
#[strum(props(
help = "The current number of final hop packets stuck in channels waiting to get delivered to appropriate websocket connections"
))]
ProcessFinalHopPacketsPendingDelivery,
#[strum(props(
help = "The current number of forward hop packets stuck in channels waiting to get delivered to appropriate TCP connections"
))]
ProcessForwardHopPacketsPendingDelivery,
}
impl PrometheusMetric {
fn name(&self) -> String {
self.to_string()
}
fn help(&self) -> &'static str {
// SAFETY: every variant has a `help` prop defined (and there's a unit test is checking for that)
#[allow(clippy::unwrap_used)]
self.get_str("help").unwrap()
}
fn is_complex(&self) -> bool {
matches!(self, PrometheusMetric::EntryClientSessionsDurations { .. })
// match self {
// PrometheusMetric::EntryClientSessionsDurations { .. } => true,
// _ => false,
// }
}
fn to_registrable_metric(&self) -> Option<Metric> {
let name = self.name();
let help = self.help();
match self {
PrometheusMetric::MixnetIngressForwardPacketsReceived => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::MixnetIngressFinalHopPacketsReceived => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::MixnetIngressMalformedPacketsReceived => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::MixnetIngressExcessiveDelayPacketsReceived => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::MixnetIngressForwardPacketsDropped => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::MixnetIngressFinalHopPacketsDropped => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::MixnetIngressForwardPacketsReceivedRate => {
Metric::new_float_gauge(&name, help)
}
PrometheusMetric::MixnetIngressFinalHopPacketsReceivedRate => {
Metric::new_float_gauge(&name, help)
}
PrometheusMetric::MixnetIngressMalformedPacketsReceivedRate => {
Metric::new_float_gauge(&name, help)
}
PrometheusMetric::MixnetIngressExcessiveDelayPacketsReceivedRate => {
Metric::new_float_gauge(&name, help)
}
PrometheusMetric::MixnetIngressForwardPacketsDroppedRate => {
Metric::new_float_gauge(&name, help)
}
PrometheusMetric::MixnetIngressFinalHopPacketsDroppedRate => {
Metric::new_float_gauge(&name, help)
}
PrometheusMetric::MixnetEgressStoredOnDiskFinalHopPackets => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::MixnetEgressForwardPacketsSent => Metric::new_int_gauge(&name, help),
PrometheusMetric::MixnetEgressAckSent => Metric::new_int_gauge(&name, help),
PrometheusMetric::MixnetEgressForwardPacketsDropped => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::MixnetEgressForwardPacketsSendRate => {
Metric::new_float_gauge(&name, help)
}
PrometheusMetric::MixnetEgressAckSendRate => Metric::new_float_gauge(&name, help),
PrometheusMetric::MixnetEgressForwardPacketsDroppedRate => {
Metric::new_float_gauge(&name, help)
}
PrometheusMetric::EntryClientUniqueUsers => Metric::new_int_gauge(&name, help),
PrometheusMetric::EntryClientSessionsStarted => Metric::new_int_gauge(&name, help),
PrometheusMetric::EntryClientSessionsFinished => Metric::new_int_gauge(&name, help),
PrometheusMetric::EntryClientSessionsDurations { .. } => {
Metric::new_histogram(&name, help, Some(CLIENT_SESSION_DURATION_BUCKETS))
}
PrometheusMetric::WireguardBytesTx => Metric::new_int_gauge(&name, help),
PrometheusMetric::WireguardBytesRx => Metric::new_int_gauge(&name, help),
PrometheusMetric::WireguardTotalPeers => Metric::new_int_gauge(&name, help),
PrometheusMetric::WireguardActivePeers => Metric::new_int_gauge(&name, help),
PrometheusMetric::WireguardBytesTxRate => Metric::new_float_gauge(&name, help),
PrometheusMetric::WireguardBytesRxRate => Metric::new_float_gauge(&name, help),
PrometheusMetric::NetworkActiveIngressMixnetConnections => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::NetworkActiveIngressWebSocketConnections => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::NetworkActiveEgressMixnetConnections => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::ProcessForwardHopPacketsBeingDelayed => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::ProcessPacketForwarderQueueSize => Metric::new_int_gauge(&name, help),
PrometheusMetric::ProcessTopologyQueryResolutionLatency => {
Metric::new_histogram(&name, help, None)
}
PrometheusMetric::ProcessFinalHopPacketsPendingDelivery => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::ProcessForwardHopPacketsPendingDelivery => {
Metric::new_int_gauge(&name, help)
}
}
}
fn set(&self, value: i64) {
metrics_registry().set(&self.name(), value);
}
fn set_float(&self, value: f64) {
metrics_registry().set_float(&self.name(), value);
}
fn inc(&self) {
metrics_registry().inc(&self.name());
}
fn inc_by(&self, value: i64) {
metrics_registry().inc_by(&self.name(), value);
}
fn observe_histogram(&self, value: f64) {
let reg = metrics_registry();
if !reg.add_to_histogram(&self.name(), value) {
if let Some(registrable) = self.to_registrable_metric() {
reg.register_metric(registrable);
reg.add_to_histogram(&self.name(), value);
}
}
}
fn start_timer(&self) -> Option<HistogramTimer> {
metrics_registry().start_timer(&self.name())
}
}
#[non_exhaustive]
pub struct NymNodePrometheusMetrics {}
impl NymNodePrometheusMetrics {
// initialise all fields on startup with default values so that they'd be immediately available for query
pub(crate) fn initialise() -> Self {
let registry = metrics_registry();
// we can't initialise complex metrics as their names will only be fully known at runtime
for kind in PrometheusMetric::iter() {
if !kind.is_complex() {
if let Some(metric) = kind.to_registrable_metric() {
registry.register_metric(metric);
}
}
}
NymNodePrometheusMetrics {}
}
pub fn set(&self, metric: PrometheusMetric, value: i64) {
metric.set(value)
}
pub fn set_float(&self, metric: PrometheusMetric, value: f64) {
metric.set_float(value)
}
pub fn inc(&self, metric: PrometheusMetric) {
metric.inc()
}
pub fn inc_by(&self, metric: PrometheusMetric, value: i64) {
metric.inc_by(value)
}
pub fn observe_histogram(&self, metric: PrometheusMetric, value: f64) {
metric.observe_histogram(value)
}
pub fn start_timer(&self, metric: PrometheusMetric) -> Option<HistogramTimer> {
metric.start_timer()
}
}
#[cfg(test)]
mod tests {
use super::*;
use strum::IntoEnumIterator;
#[test]
fn prometheus_metrics() {
// a sanity check for anyone adding new metrics. if this test fails,
// make sure any methods on `PrometheusMetric` enum don't need updating
// or require custom Display impl
assert_eq!(37, PrometheusMetric::COUNT)
}
#[test]
fn every_variant_has_help_property() {
for variant in PrometheusMetric::iter() {
assert!(variant.get_str("help").is_some())
}
}
#[test]
fn prometheus_metrics_names() {
// make sure nothing changed in our serialisation
let simple = PrometheusMetric::MixnetIngressForwardPacketsReceived.to_string();
assert_eq!("nym_node_mixnet_ingress_forward_packets_received", simple);
let parameterised =
PrometheusMetric::EntryClientSessionsDurations { typ: "vpn".into() }.to_string();
assert_eq!(
"nym_node_entry_client_sessions_durations_vpn",
parameterised
)
}
}
@@ -0,0 +1,44 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Default)]
pub struct WireguardStats {
bytes_rx: AtomicUsize,
bytes_tx: AtomicUsize,
total_peers: AtomicUsize,
active_peers: AtomicUsize,
}
impl WireguardStats {
pub fn bytes_rx(&self) -> usize {
self.bytes_rx.load(Ordering::Relaxed)
}
pub fn bytes_tx(&self) -> usize {
self.bytes_tx.load(Ordering::Relaxed)
}
pub fn total_peers(&self) -> usize {
self.total_peers.load(Ordering::Relaxed)
}
pub fn active_peers(&self) -> usize {
self.active_peers.load(Ordering::Relaxed)
}
pub fn update(
&self,
new_bytes_rx: usize,
new_bytes_tx: usize,
total_peers: usize,
active_peers: usize,
) {
self.bytes_rx.fetch_add(new_bytes_rx, Ordering::Relaxed);
self.bytes_tx.fetch_add(new_bytes_tx, Ordering::Relaxed);
self.total_peers.store(total_peers, Ordering::Relaxed);
self.active_peers.store(active_peers, Ordering::Relaxed);
}
}
@@ -4,6 +4,19 @@
pub use mixing::*;
pub use session::*;
pub use verloc::*;
pub use wireguard::*;
pub mod wireguard {
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct WireguardStats {
pub bytes_tx: usize,
pub bytes_rx: usize,
}
}
pub mod packets {
use serde::{Deserialize, Serialize};

Some files were not shown because too many files have changed in this diff Show More