Compare commits
140 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 02af95c65e | |||
| 85be73401e | |||
| 8385f719e2 | |||
| d1fb926a2a | |||
| dea69acd49 | |||
| ada2d2247a | |||
| 0159d7c27a | |||
| 28997c7f97 | |||
| a6c586a33b | |||
| 7c85c1a271 | |||
| 92c8d1b73f | |||
| 554e9ca490 | |||
| 6e6675f7bf | |||
| a7f7ebfbae | |||
| 1aec8be85e | |||
| 4b474dd8ff | |||
| 9b627dd70f | |||
| 9a0b769425 | |||
| 8e14f5f884 | |||
| 1b64cb42b0 | |||
| 03c4895f2b | |||
| dcfb092758 | |||
| 9305ad5364 | |||
| ea5aef6c2f | |||
| 61a4433cd9 | |||
| 5c89d36140 | |||
| 5ab164d229 | |||
| 26538c5884 | |||
| adb248dbcc | |||
| fffec65cab | |||
| bb24004d46 | |||
| c487eff7ca | |||
| 5fa21c9aae | |||
| fd18aae0d6 | |||
| c202e2d598 | |||
| 62d23cff9f | |||
| e454d71b78 | |||
| a7874add88 | |||
| 0a47d5dcf8 | |||
| 3d84be22e2 | |||
| 6ccbb30491 | |||
| 91c205f83a | |||
| 4a704e992a | |||
| 6c88c7df42 | |||
| 2a748fc968 | |||
| 25766dc0ec | |||
| 07544d939e | |||
| 102cd1033c | |||
| 676e93a372 | |||
| 5a6770e5e2 | |||
| 529e8d49ee | |||
| 11d6ee2fdb | |||
| d704c428fc | |||
| bca070c1bd | |||
| a94c035c0a | |||
| 24480418f0 | |||
| 226c040a13 | |||
| a46245ffe3 | |||
| 7c1c13e139 | |||
| 836a93cd96 | |||
| 3d2914b3e5 | |||
| 9b02de3e75 | |||
| b47a742dd0 | |||
| 6e14882246 | |||
| f3d8aba82c | |||
| a7466a0e02 | |||
| 78f45012db | |||
| f6a2f62ea9 | |||
| 3efeededc5 | |||
| c482350ec6 | |||
| 72a4a26c40 | |||
| 5d9b5a0d70 | |||
| c070e4bfee | |||
| 4d3d60b78e | |||
| 41fb17a31b | |||
| 7d5e3ef7d3 | |||
| 4f283f565c | |||
| 2fab3f11b6 | |||
| d0722e5f63 | |||
| 64373548e4 | |||
| bad85abff3 | |||
| 6e66cc2467 | |||
| c805aa79a4 | |||
| f5ca1ee20a | |||
| 4f07343efd | |||
| 94ab78606a | |||
| 7b92e471c8 | |||
| a507ffe371 | |||
| c02e93004f | |||
| 1113e0c599 | |||
| 06c7394861 | |||
| e20bea9d32 | |||
| eeea32fdca | |||
| b06349efd0 | |||
| 98a4cb4ae8 | |||
| be185824b4 | |||
| 60e8e53f3b | |||
| 1890367bfc | |||
| 2b26a88d6c | |||
| a6f4f017c7 | |||
| d8a6ca48c1 | |||
| 541d46e899 | |||
| 39f525e88e | |||
| 156e892baa | |||
| 5b6ae39dab | |||
| df004f834f | |||
| 235165171b | |||
| 572875058d | |||
| cf6f437187 | |||
| 6010de978d | |||
| d951ea9548 | |||
| 868d7439ec | |||
| a884aee1e9 | |||
| 80f965a104 | |||
| c99a240ed4 | |||
| 67976b1b30 | |||
| a2322d6cdf | |||
| ae346bb75b | |||
| 53c28af847 | |||
| 3521f36374 | |||
| f7a7a8072f | |||
| 3695332036 | |||
| acd068e5ab | |||
| d03302c391 | |||
| cd86110b2c | |||
| 8d5a41a790 | |||
| caa17d933c | |||
| ad0c135d4c | |||
| 039b05cf7e | |||
| 37b10b59aa | |||
| a9ede22bbd | |||
| b656003306 | |||
| 61e872f033 | |||
| b4f51baf94 | |||
| a3f3d83c1b | |||
| 84d7004cb2 | |||
| be063a36eb | |||
| 0a712b9fce | |||
| 88d6fb4e22 | |||
| 04c2045d94 |
@@ -0,0 +1 @@
|
||||
nym-validator-rewarder/.sqlx/** diff=nodiff
|
||||
@@ -31,3 +31,5 @@ updates:
|
||||
update-types:
|
||||
- "patch"
|
||||
open-pull-requests-limit: 10
|
||||
assignees:
|
||||
- "octol"
|
||||
|
||||
@@ -79,7 +79,6 @@ jobs:
|
||||
target/release/nym-socks5-client
|
||||
target/release/nym-api
|
||||
target/release/nym-network-requester
|
||||
target/release/nym-data-observatory
|
||||
target/release/nym-cli
|
||||
target/release/nymvisor
|
||||
target/release/nym-node
|
||||
@@ -97,7 +96,6 @@ jobs:
|
||||
cp target/release/nym-socks5-client $OUTPUT_DIR
|
||||
cp target/release/nym-api $OUTPUT_DIR
|
||||
cp target/release/nym-network-requester $OUTPUT_DIR
|
||||
cp target/release/nym-data-observatory $OUTPUT_DIR
|
||||
cp target/release/nymvisor $OUTPUT_DIR
|
||||
cp target/release/nym-node $OUTPUT_DIR
|
||||
cp target/release/nym-cli $OUTPUT_DIR
|
||||
|
||||
@@ -9,15 +9,16 @@ on:
|
||||
- 'gateway/**'
|
||||
- 'integrations/**'
|
||||
- 'mixnode/**'
|
||||
- 'sdk/rust/**'
|
||||
- 'sdk/lib/**'
|
||||
- 'service-providers/**'
|
||||
- 'nym-network-monitor/**'
|
||||
- 'nym-api/**'
|
||||
- 'nym-node/**'
|
||||
- 'nym-outfox/**'
|
||||
- 'nym-data-observatory/**'
|
||||
- 'nym-network-monitor/**'
|
||||
- 'nym-node/**'
|
||||
- 'nym-node-status-api/**'
|
||||
- 'nym-outfox/**'
|
||||
- 'nym-validator-rewarder/**'
|
||||
- 'sdk/lib/**'
|
||||
- 'sdk/rust/**'
|
||||
- 'service-providers/**'
|
||||
- 'tools/**'
|
||||
- 'wasm/**'
|
||||
- 'Cargo.toml'
|
||||
@@ -30,7 +31,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
os: [ arc-ubuntu-20.04, custom-runner-mac-m1 ]
|
||||
os: [ arc-ubuntu-20.04, custom-windows-11, custom-runner-mac-m1 ]
|
||||
runs-on: ${{ matrix.os }}
|
||||
env:
|
||||
CARGO_TERM_COLOR: always
|
||||
|
||||
@@ -15,6 +15,7 @@ jobs:
|
||||
runs-on: ${{ matrix.os }}
|
||||
env:
|
||||
CARGO_TERM_COLOR: always
|
||||
IPINFO_API_TOKEN: ${{ secrets.IPINFO_API_TOKEN }}
|
||||
continue-on-error: true
|
||||
steps:
|
||||
- name: Check out repository code
|
||||
|
||||
@@ -26,7 +26,7 @@ jobs:
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.6
|
||||
uses: mikefarah/yq@v4.45.1
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-credential-proxy/Cargo.toml
|
||||
|
||||
@@ -26,7 +26,7 @@ jobs:
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.6
|
||||
uses: mikefarah/yq@v4.45.1
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
@@ -26,7 +26,7 @@ jobs:
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.6
|
||||
uses: mikefarah/yq@v4.45.1
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-network-monitor/Cargo.toml
|
||||
|
||||
@@ -31,7 +31,7 @@ jobs:
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.6
|
||||
uses: mikefarah/yq@v4.45.1
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
@@ -26,7 +26,7 @@ jobs:
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.6
|
||||
uses: mikefarah/yq@v4.45.1
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
@@ -26,7 +26,7 @@ jobs:
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.6
|
||||
uses: mikefarah/yq@v4.45.1
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
@@ -26,7 +26,7 @@ jobs:
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.6
|
||||
uses: mikefarah/yq@v4.45.1
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
@@ -26,7 +26,7 @@ jobs:
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.6
|
||||
uses: mikefarah/yq@v4.45.1
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
+8
-1
@@ -51,4 +51,11 @@ ppa-private-key.b64
|
||||
ppa-private-key.asc
|
||||
nym-network-monitor/topology.json
|
||||
nym-network-monitor/__pycache__
|
||||
nym-network-monitor/*.key
|
||||
nym-network-monitor/*.key
|
||||
nym-network-monitor/.envrc
|
||||
nym-network-monitor/.envrc
|
||||
nym-api/redocly/formatted-openapi.json
|
||||
|
||||
|
||||
*.sqlite
|
||||
.build
|
||||
|
||||
+155
@@ -4,6 +4,161 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [2025.2-hu] (2025-01-28)
|
||||
|
||||
- chore :update version of chain watcher and validator rewarder ([#5394])
|
||||
- bugfix: correctly handle ingore epoch roles flag ([#5390])
|
||||
- bugfix: terminate mixnet socket listener on shutdown ([#5389])
|
||||
- feat: make client ignore dual mode nodes by default ([#5388])
|
||||
- Handle ecash network errors differently ([#5378])
|
||||
- Remove empty ephemeral keys ([#5376])
|
||||
- fixed sql migration for adding default message timestamp ([#5374])
|
||||
- Bind to [::] on nym-node for both IP versions ([#5361])
|
||||
- exposed NymApiClient method for obtaining node performance history ([#5360])
|
||||
- Client gateway selection ([#5358])
|
||||
- chore: refresh wasm sdk ([#5353])
|
||||
- chore: update indexed_db_futures ([#5347])
|
||||
- build(deps): bump mikefarah/yq from 4.44.6 to 4.45.1 ([#5342])
|
||||
- updated cosmrs and tendermint-rpc to their most recent versions ([#5339])
|
||||
- build(deps): bump ts-rs from 10.0.0 to 10.1.0 ([#5338])
|
||||
- build(deps): bump tempfile from 3.14.0 to 3.15.0 ([#5337])
|
||||
- build(deps): bump the patch-updates group with 8 updates ([#5336])
|
||||
- feature: introduce /load endpoint for self-reported quantised NymNode load ([#5326])
|
||||
- feature: `CancellationToken`-based shutdowns ([#5325])
|
||||
- Use expect in geodata test to give error message on failure ([#5314])
|
||||
- feature: periodically remove stale gateway messages ([#5312])
|
||||
- build(deps): bump the patch-updates group across 1 directory with 35 updates ([#5310])
|
||||
- Add dependabot assignes for the root cargo ecosystem ([#5297])
|
||||
- Move tun constants to network defaults ([#5286])
|
||||
- Include IPINFO_API_TOKEN in nightly CI ([#5285])
|
||||
- Nyx Chain Watcher ([#5274])
|
||||
- bugfix: remove unnecessary arguments for nym-api swagger endpoints ([#5272])
|
||||
- feature: nym topology revamp ([#5271])
|
||||
- Add windows to CI builds ([#5269])
|
||||
- http-api-client: deduplicate code ([#5267])
|
||||
- build(deps): bump http from 1.1.0 to 1.2.0 ([#5228])
|
||||
- NS API: add mixnet scraper ([#5200])
|
||||
- build(deps): bump criterion from 0.4.0 to 0.5.1 ([#4911])
|
||||
|
||||
[#5394]: https://github.com/nymtech/nym/pull/5394
|
||||
[#5390]: https://github.com/nymtech/nym/pull/5390
|
||||
[#5389]: https://github.com/nymtech/nym/pull/5389
|
||||
[#5388]: https://github.com/nymtech/nym/pull/5388
|
||||
[#5378]: https://github.com/nymtech/nym/pull/5378
|
||||
[#5376]: https://github.com/nymtech/nym/pull/5376
|
||||
[#5374]: https://github.com/nymtech/nym/pull/5374
|
||||
[#5361]: https://github.com/nymtech/nym/pull/5361
|
||||
[#5360]: https://github.com/nymtech/nym/pull/5360
|
||||
[#5358]: https://github.com/nymtech/nym/pull/5358
|
||||
[#5353]: https://github.com/nymtech/nym/pull/5353
|
||||
[#5347]: https://github.com/nymtech/nym/pull/5347
|
||||
[#5342]: https://github.com/nymtech/nym/pull/5342
|
||||
[#5339]: https://github.com/nymtech/nym/pull/5339
|
||||
[#5338]: https://github.com/nymtech/nym/pull/5338
|
||||
[#5337]: https://github.com/nymtech/nym/pull/5337
|
||||
[#5336]: https://github.com/nymtech/nym/pull/5336
|
||||
[#5326]: https://github.com/nymtech/nym/pull/5326
|
||||
[#5325]: https://github.com/nymtech/nym/pull/5325
|
||||
[#5314]: https://github.com/nymtech/nym/pull/5314
|
||||
[#5312]: https://github.com/nymtech/nym/pull/5312
|
||||
[#5310]: https://github.com/nymtech/nym/pull/5310
|
||||
[#5297]: https://github.com/nymtech/nym/pull/5297
|
||||
[#5286]: https://github.com/nymtech/nym/pull/5286
|
||||
[#5285]: https://github.com/nymtech/nym/pull/5285
|
||||
[#5274]: https://github.com/nymtech/nym/pull/5274
|
||||
[#5272]: https://github.com/nymtech/nym/pull/5272
|
||||
[#5271]: https://github.com/nymtech/nym/pull/5271
|
||||
[#5269]: https://github.com/nymtech/nym/pull/5269
|
||||
[#5267]: https://github.com/nymtech/nym/pull/5267
|
||||
[#5228]: https://github.com/nymtech/nym/pull/5228
|
||||
[#5200]: https://github.com/nymtech/nym/pull/5200
|
||||
[#4911]: https://github.com/nymtech/nym/pull/4911
|
||||
|
||||
## [2025.1-reeses] (2025-01-15)
|
||||
|
||||
- Feture/legacy alert ([#5346])
|
||||
- chore: readjusted --mode behaviour to fix the regression ([#5331])
|
||||
- chore: apply 1.84 linter suggestions ([#5330])
|
||||
- bugfix: make sure refresh data key matches bond info ([#5329])
|
||||
- reduce log severity for number of packets being delayed ([#5321])
|
||||
- feat: warn users if node is run in exit mode only ([#5320])
|
||||
- Bugfix/contract version assignment ([#5318])
|
||||
- fixed client session histogram buckets ([#5316])
|
||||
- amend 250gb limit ([#5313])
|
||||
- feature: expand nym-node prometheus metrics ([#5298])
|
||||
- Cherry picked #5286 ([#5287])
|
||||
- Add close to credential storage ([#5283])
|
||||
- feature: wireguard metrics ([#5278])
|
||||
- Add PATCH support to nym-http-api-client ([#5260])
|
||||
- chore: removed legacy socks5 listener ([#5259])
|
||||
- bugfix: make sure to apply gateway score filtering when choosing initial node ([#5256])
|
||||
- Update TS bindings ([#5255])
|
||||
- Add conversion unit tests for auth msg ([#5251])
|
||||
- Add control messages to GatewayTransciver ([#5247])
|
||||
- Remove unneeded async function annotation ([#5246])
|
||||
- bugfix: make sure to update timestamp of last batch verification to prevent double redemption ([#5239])
|
||||
- Add FromStr impl for UserAgent ([#5236])
|
||||
- Extend swagger docs ([#5235])
|
||||
- TicketType derive Hash and Eq ([#5233])
|
||||
- Add fd callback to client core ([#5230])
|
||||
- Extend raw ws fd for gateway client ([#5218])
|
||||
- Shipping raw metrics to PG ([#5216])
|
||||
- Change sqlite journal mode to WAL ([#5213])
|
||||
- Derive serialize for UserAgent ([#5210])
|
||||
- Restore Location fields ([#5208])
|
||||
- better date serialization ([#5207])
|
||||
- Fix overflow ([#5204])
|
||||
- feature: hopefully final steps of the smoosh™️ ([#5201])
|
||||
- Fix overflow ([#5184])
|
||||
- NS API - Gateway stats scraping ([#5180])
|
||||
- introduced initial internal commands for nym-cli: ecash key and request generation ([#5174])
|
||||
- Move NS client to separate package under NS API ([#5171])
|
||||
- build(deps): bump micromatch from 4.0.4 to 4.0.8 in /testnet-faucet ([#4813])
|
||||
|
||||
[#5346]: https://github.com/nymtech/nym/pull/5346
|
||||
[#5331]: https://github.com/nymtech/nym/pull/5331
|
||||
[#5330]: https://github.com/nymtech/nym/pull/5330
|
||||
[#5329]: https://github.com/nymtech/nym/pull/5329
|
||||
[#5321]: https://github.com/nymtech/nym/pull/5321
|
||||
[#5320]: https://github.com/nymtech/nym/pull/5320
|
||||
[#5318]: https://github.com/nymtech/nym/pull/5318
|
||||
[#5316]: https://github.com/nymtech/nym/pull/5316
|
||||
[#5313]: https://github.com/nymtech/nym/pull/5313
|
||||
[#5298]: https://github.com/nymtech/nym/pull/5298
|
||||
[#5287]: https://github.com/nymtech/nym/pull/5287
|
||||
[#5283]: https://github.com/nymtech/nym/pull/5283
|
||||
[#5278]: https://github.com/nymtech/nym/pull/5278
|
||||
[#5260]: https://github.com/nymtech/nym/pull/5260
|
||||
[#5259]: https://github.com/nymtech/nym/pull/5259
|
||||
[#5256]: https://github.com/nymtech/nym/pull/5256
|
||||
[#5255]: https://github.com/nymtech/nym/pull/5255
|
||||
[#5251]: https://github.com/nymtech/nym/pull/5251
|
||||
[#5247]: https://github.com/nymtech/nym/pull/5247
|
||||
[#5246]: https://github.com/nymtech/nym/pull/5246
|
||||
[#5239]: https://github.com/nymtech/nym/pull/5239
|
||||
[#5236]: https://github.com/nymtech/nym/pull/5236
|
||||
[#5235]: https://github.com/nymtech/nym/pull/5235
|
||||
[#5233]: https://github.com/nymtech/nym/pull/5233
|
||||
[#5230]: https://github.com/nymtech/nym/pull/5230
|
||||
[#5218]: https://github.com/nymtech/nym/pull/5218
|
||||
[#5216]: https://github.com/nymtech/nym/pull/5216
|
||||
[#5213]: https://github.com/nymtech/nym/pull/5213
|
||||
[#5210]: https://github.com/nymtech/nym/pull/5210
|
||||
[#5208]: https://github.com/nymtech/nym/pull/5208
|
||||
[#5207]: https://github.com/nymtech/nym/pull/5207
|
||||
[#5204]: https://github.com/nymtech/nym/pull/5204
|
||||
[#5201]: https://github.com/nymtech/nym/pull/5201
|
||||
[#5184]: https://github.com/nymtech/nym/pull/5184
|
||||
[#5180]: https://github.com/nymtech/nym/pull/5180
|
||||
[#5174]: https://github.com/nymtech/nym/pull/5174
|
||||
[#5171]: https://github.com/nymtech/nym/pull/5171
|
||||
[#4813]: https://github.com/nymtech/nym/pull/4813
|
||||
|
||||
## [2024.14-crunch-patched] (2024-12-17)
|
||||
|
||||
- Fixes an issue to allow previously registred clients to connect to latest nym-nodes
|
||||
- Fixes compatibility issues between nym-nodes and older clients
|
||||
|
||||
## [2024.14-crunch] (2024-12-11)
|
||||
|
||||
- Merge/release/2024.14-crunch ([#5242])
|
||||
|
||||
Generated
+1325
-686
File diff suppressed because it is too large
Load Diff
+49
-51
@@ -118,8 +118,8 @@ members = [
|
||||
"nym-credential-proxy/nym-credential-proxy",
|
||||
"nym-credential-proxy/nym-credential-proxy-requests",
|
||||
"nym-credential-proxy/vpn-api-lib-wasm",
|
||||
"nym-data-observatory",
|
||||
"nym-network-monitor",
|
||||
"nyx-chain-watcher",
|
||||
"nym-node",
|
||||
"nym-node/nym-node-requests",
|
||||
"nym-node/nym-node-metrics",
|
||||
@@ -147,7 +147,9 @@ members = [
|
||||
"tools/internal/contract-state-importer/importer-cli",
|
||||
"tools/internal/contract-state-importer/importer-contract",
|
||||
"tools/internal/testnet-manager",
|
||||
"tools/internal/testnet-manager/dkg-bypass-contract", "common/verloc", "tools/internal/mixnet-connectivity-check",
|
||||
"tools/internal/testnet-manager/dkg-bypass-contract",
|
||||
"common/verloc",
|
||||
"tools/internal/mixnet-connectivity-check",
|
||||
]
|
||||
|
||||
default-members = [
|
||||
@@ -156,11 +158,11 @@ default-members = [
|
||||
"explorer-api",
|
||||
"nym-api",
|
||||
"nym-credential-proxy/nym-credential-proxy",
|
||||
"nym-data-observatory",
|
||||
"nym-node",
|
||||
"nym-node-status-api/nym-node-status-agent",
|
||||
"nym-node-status-api/nym-node-status-api",
|
||||
"nym-validator-rewarder",
|
||||
"nyx-chain-watcher",
|
||||
"service-providers/authenticator",
|
||||
"service-providers/ip-packet-router",
|
||||
"service-providers/network-requester",
|
||||
@@ -191,9 +193,10 @@ aes = "0.8.1"
|
||||
aes-gcm = "0.10.1"
|
||||
aes-gcm-siv = "0.11.1"
|
||||
aead = "0.5.2"
|
||||
anyhow = "1.0.90"
|
||||
anyhow = "1.0.95"
|
||||
arc-swap = "1.7.1"
|
||||
argon2 = "0.5.0"
|
||||
async-trait = "0.1.83"
|
||||
async-trait = "0.1.85"
|
||||
axum-client-ip = "0.6.1"
|
||||
axum = "0.7.5"
|
||||
axum-extra = "0.9.4"
|
||||
@@ -202,7 +205,7 @@ bincode = "1.3.3"
|
||||
bip39 = { version = "2.0.0", features = ["zeroize"] }
|
||||
bit-vec = "0.7.0" # can we unify those?
|
||||
bitvec = "1.0.0"
|
||||
blake3 = "1.5.4"
|
||||
blake3 = "1.5.5"
|
||||
bloomfilter = "1.0.14"
|
||||
bs58 = "0.5.1"
|
||||
bytecodec = "0.4.15"
|
||||
@@ -212,20 +215,20 @@ celes = "2.4.0"
|
||||
cfg-if = "1.0.0"
|
||||
chacha20 = "0.9.0"
|
||||
chacha20poly1305 = "0.10.1"
|
||||
chrono = "0.4.31"
|
||||
chrono = "0.4.39"
|
||||
cipher = "0.4.3"
|
||||
clap = "4.5.20"
|
||||
clap = "4.5.26"
|
||||
clap_complete = "4.5"
|
||||
clap_complete_fig = "4.5"
|
||||
colored = "2.0"
|
||||
comfy-table = "7.1.1"
|
||||
console = "0.15.8"
|
||||
comfy-table = "7.1.3"
|
||||
console = "0.15.10"
|
||||
console-subscriber = "0.1.1"
|
||||
console_error_panic_hook = "0.1"
|
||||
const-str = "0.5.6"
|
||||
const_format = "0.2.33"
|
||||
criterion = "0.4"
|
||||
csv = "1.3.0"
|
||||
const_format = "0.2.34"
|
||||
criterion = "0.5"
|
||||
csv = "1.3.1"
|
||||
ctr = "0.9.1"
|
||||
cupid = "0.6.1"
|
||||
curve25519-dalek = "4.1"
|
||||
@@ -242,8 +245,8 @@ etherparse = "0.13.0"
|
||||
envy = "0.4"
|
||||
eyre = "0.6.9"
|
||||
fastrand = "2.1.1"
|
||||
flate2 = "1.0.34"
|
||||
futures = "0.3.28"
|
||||
flate2 = "1.0.35"
|
||||
futures = "0.3.31"
|
||||
futures-util = "0.3"
|
||||
generic-array = "0.14.7"
|
||||
getrandom = "0.2.10"
|
||||
@@ -262,7 +265,7 @@ humantime-serde = "1.1.1"
|
||||
human-repr = "1.1.0"
|
||||
hyper = "1.4.1"
|
||||
hyper-util = "0.1"
|
||||
indicatif = "0.17.8"
|
||||
indicatif = "0.17.9"
|
||||
inquire = "0.6.2"
|
||||
ip_network = "0.4.1"
|
||||
ipnetwork = "0.20"
|
||||
@@ -287,7 +290,7 @@ parking_lot = "0.12.3"
|
||||
pem = "0.8"
|
||||
petgraph = "0.6.5"
|
||||
pin-project = "1.1"
|
||||
pin-project-lite = "0.2.14"
|
||||
pin-project-lite = "0.2.16"
|
||||
pretty_env_logger = "0.4.0"
|
||||
publicsuffix = "2.2.3"
|
||||
quote = "1"
|
||||
@@ -305,11 +308,11 @@ rocket_cors = "0.6.0"
|
||||
rocket_okapi = "0.8.0"
|
||||
safer-ffi = "0.1.13"
|
||||
schemars = "0.8.21"
|
||||
semver = "1.0.23"
|
||||
serde = "1.0.211"
|
||||
semver = "1.0.24"
|
||||
serde = "1.0.217"
|
||||
serde_bytes = "0.11.15"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0.132"
|
||||
serde_json = "1.0.135"
|
||||
serde_json_path = "0.7.1"
|
||||
serde_repr = "0.1"
|
||||
serde_with = "3.9.0"
|
||||
@@ -322,36 +325,36 @@ strum = "0.26"
|
||||
strum_macros = "0.26"
|
||||
subtle-encoding = "0.5"
|
||||
syn = "1"
|
||||
sysinfo = "0.30.13"
|
||||
sysinfo = "0.33.0"
|
||||
tap = "1.0.1"
|
||||
tar = "0.4.42"
|
||||
tempfile = "3.14"
|
||||
tar = "0.4.43"
|
||||
tempfile = "3.15"
|
||||
thiserror = "1.0.64"
|
||||
time = "0.3.30"
|
||||
time = "0.3.37"
|
||||
tokio = "1.39"
|
||||
tokio-stream = "0.1.16"
|
||||
tokio-stream = "0.1.17"
|
||||
tokio-test = "0.4.4"
|
||||
tokio-tun = "0.11.5"
|
||||
tokio-tungstenite = { version = "0.20.1" }
|
||||
tokio-util = "0.7.12"
|
||||
toml = "0.8.14"
|
||||
tokio-util = "0.7.13"
|
||||
toml = "0.8.19"
|
||||
tower = "0.4.13"
|
||||
tower-http = "0.5.2"
|
||||
tracing = "0.1.37"
|
||||
tracing = "0.1.41"
|
||||
tracing-opentelemetry = "0.19.0"
|
||||
tracing-subscriber = "0.3.16"
|
||||
tracing-subscriber = "0.3.19"
|
||||
tracing-tree = "0.2.2"
|
||||
tracing-log = "0.2"
|
||||
ts-rs = "10.0.0"
|
||||
ts-rs = "10.1.0"
|
||||
tungstenite = { version = "0.20.1", default-features = false }
|
||||
url = "2.5"
|
||||
utoipa = "4.2"
|
||||
utoipa-swagger-ui = "7.1"
|
||||
utoipauto = "0.1"
|
||||
utoipa = "5.2"
|
||||
utoipa-swagger-ui = "8.0"
|
||||
utoipauto = "0.2"
|
||||
uuid = "*"
|
||||
vergen = { version = "=8.3.1", default-features = false }
|
||||
walkdir = "2"
|
||||
wasm-bindgen-test = "0.3.43"
|
||||
wasm-bindgen-test = "0.3.49"
|
||||
x25519-dalek = "2.0.0"
|
||||
zeroize = "1.6.0"
|
||||
|
||||
@@ -384,29 +387,24 @@ cw-controllers = { version = "=1.1.0" }
|
||||
# cosmrs-related
|
||||
bip32 = { version = "0.5.2", default-features = false }
|
||||
|
||||
# temporarily using a fork again (yay.) because we need staking and slashing support (which are already on main but not released)
|
||||
# plus response message parsing (which is, as of the time of writing this message, waiting to get merged)
|
||||
#cosmrs = { path = "../cosmos-rust-fork/cosmos-rust/cosmrs" }
|
||||
cosmrs = { git = "https://github.com/cosmos/cosmos-rust", rev = "4b1332e6d8258ac845cef71589c8d362a669675a" } # unfortuntely we need a fork by yours truly to get the staking support
|
||||
tendermint = "0.37.0" # same version as used by cosmrs
|
||||
tendermint-rpc = "0.37.0" # same version as used by cosmrs
|
||||
prost = { version = "0.12", default-features = false }
|
||||
|
||||
cosmrs = { version = "0.21.0" }
|
||||
tendermint = "0.40.0"
|
||||
tendermint-rpc = "0.40.0"
|
||||
prost = { version = "0.13", default-features = false }
|
||||
|
||||
# wasm-related dependencies
|
||||
gloo-utils = "0.2.0"
|
||||
gloo-net = "0.5.0"
|
||||
gloo-net = "0.6.0"
|
||||
|
||||
# use a separate branch due to feature unification failures
|
||||
# this is blocked until the upstream removes outdates `wasm_bindgen` feature usage
|
||||
# indexed_db_futures = "0.4.1"
|
||||
indexed_db_futures = { git = "https://github.com/TiemenSch/rust-indexed-db", branch = "update-uuid" }
|
||||
js-sys = "0.3.70"
|
||||
indexed_db_futures = "0.6.0"
|
||||
js-sys = "0.3.76"
|
||||
serde-wasm-bindgen = "0.6.5"
|
||||
tsify = "0.4.5"
|
||||
wasm-bindgen = "0.2.95"
|
||||
wasm-bindgen-futures = "0.4.45"
|
||||
wasmtimer = "0.2.0"
|
||||
web-sys = "0.3.72"
|
||||
wasm-bindgen = "0.2.99"
|
||||
wasm-bindgen-futures = "0.4.49"
|
||||
wasmtimer = "0.4.1"
|
||||
web-sys = "0.3.76"
|
||||
|
||||
# Profile settings for individual crates
|
||||
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
Boost Software License - Version 1.0 - August 17th, 2003
|
||||
|
||||
Permission is hereby granted, free of charge, to any person or organization
|
||||
obtaining a copy of the software and accompanying documentation covered by
|
||||
this license (the "Software") to use, reproduce, display, distribute,
|
||||
execute, and transmit the Software, and to prepare derivative works of the
|
||||
Software, and to permit third-parties to whom the Software is furnished to
|
||||
do so, all subject to the following:
|
||||
|
||||
The copyright notices in the Software and this entire statement, including
|
||||
the above license grant, this restriction and the following disclaimer,
|
||||
must be included in all copies of the Software, in whole or in part, and
|
||||
all derivative works of the Software, unless such copies or derivative
|
||||
works are solely in the form of machine-executable object code generated by
|
||||
a source language processor.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
|
||||
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
|
||||
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
|
||||
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
DEALINGS IN THE SOFTWARE.
|
||||
@@ -14,7 +14,7 @@ The platform is composed of multiple Rust crates. Top-level executable binary cr
|
||||
* `nym-socks5-client` - a Socks5 proxy you can run on your machine and use with existing applications.
|
||||
* `nym-explorer` - a (projected) block explorer and (existing) mixnet viewer.
|
||||
* `nym-wallet` - a desktop wallet implemented using the [Tauri](https://tauri.studio/en/docs/about/intro) framework.
|
||||
* `nym-cli` - a tool for interacting with the network from the CLI.
|
||||
* `nym-cli` - a tool for interacting with the network from the CLI.
|
||||
<!-- coming soon
|
||||
* `nym-network-monitor` - sends packets through the full system to check that they are working as expected, and stores node uptime histories as the basis of a rewards system ("mixmining" or "proof-of-mixing").
|
||||
-->
|
||||
@@ -42,10 +42,10 @@ client ───► Gateway ──┘ mix │ mix ┌─►mix ───►
|
||||
|
||||
References for developers:
|
||||
|
||||
* [Dev Docs](https://nymtech.net/docs/developers)
|
||||
* [SDKs](https://nymtech.net/docs/developers/rust)
|
||||
* [Network Docs](https://nymtech.net/docs/network)
|
||||
* [Release Cycle - git flow](https://nymtech.net/docs/operators/release-cycle)
|
||||
* [Dev Docs](https://nym.com/docs/developers)
|
||||
* [SDKs](https://nym.com/docs/developers/rust)
|
||||
* [Network Docs](https://nym.com/docs/network)
|
||||
* [Release Cycle - git flow](https://nym.com/docs/operators/release-cycle)
|
||||
|
||||
### Developer chat
|
||||
|
||||
@@ -66,4 +66,4 @@ As a general approach, licensing is as follows this pattern:
|
||||
- libraries and components are Apache 2.0 or MIT
|
||||
- documentation is Apache 2.0 or CC0-1.0
|
||||
|
||||
Nym Node Operators and Validators Temrs and Conditions can be found [here](https://nymtech.net/terms-and-conditions/operators/v1.0.0).
|
||||
Nym Node Operators and Validators Temrs and Conditions can be found [here](https://nym.com/terms-and-conditions/operators/v1.0.0).
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nym-client"
|
||||
version = "1.1.45"
|
||||
version = "1.1.47"
|
||||
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
|
||||
description = "Implementation of the Nym Client"
|
||||
edition = "2021"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nym-socks5-client"
|
||||
version = "1.1.45"
|
||||
version = "1.1.47"
|
||||
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
|
||||
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
|
||||
edition = "2021"
|
||||
|
||||
@@ -28,7 +28,7 @@ pub type HmacSha256 = Hmac<Sha256>;
|
||||
pub type Nonce = u64;
|
||||
pub type Taken = Option<SystemTime>;
|
||||
|
||||
pub const BANDWIDTH_CAP_PER_DAY: u64 = 1024 * 1024 * 1024; // 1 GB
|
||||
pub const BANDWIDTH_CAP_PER_DAY: u64 = 250 * 1024 * 1024 * 1024; // 250 GB
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct IpPair {
|
||||
|
||||
@@ -3,7 +3,7 @@ name = "nym-client-core"
|
||||
version = "1.1.15"
|
||||
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
|
||||
edition = "2021"
|
||||
rust-version = "1.70"
|
||||
rust-version = "1.76"
|
||||
license.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
@@ -45,7 +45,7 @@ nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
|
||||
nym-sphinx = { path = "../nymsphinx" }
|
||||
nym-statistics-common = { path = "../statistics" }
|
||||
nym-pemstore = { path = "../pemstore" }
|
||||
nym-topology = { path = "../topology", features = ["serializable"] }
|
||||
nym-topology = { path = "../topology", features = ["persistence"] }
|
||||
nym-mixnet-client = { path = "../client-libs/mixnet-client", default-features = false }
|
||||
nym-validator-client = { path = "../client-libs/validator-client", default-features = false }
|
||||
nym-task = { path = "../task" }
|
||||
|
||||
@@ -517,7 +517,7 @@ impl Default for Acknowledgements {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
|
||||
#[serde(default, deny_unknown_fields)]
|
||||
#[serde(default)]
|
||||
pub struct Topology {
|
||||
/// The uniform delay every which clients are querying the directory server
|
||||
/// to try to obtain a compatible network topology to send sphinx packets through.
|
||||
@@ -550,6 +550,18 @@ pub struct Topology {
|
||||
/// Specifies a minimum performance of a gateway that is used on route construction.
|
||||
/// This setting is only applicable when `NymApi` topology is used.
|
||||
pub minimum_gateway_performance: u8,
|
||||
|
||||
/// Specifies whether this client should attempt to retrieve all available network nodes
|
||||
/// as opposed to just active mixnodes/gateways.
|
||||
pub use_extended_topology: bool,
|
||||
|
||||
/// Specifies whether this client should ignore the current epoch role of the target egress node
|
||||
/// when constructing the final hop packets.
|
||||
pub ignore_egress_epoch_role: bool,
|
||||
|
||||
/// Specifies whether this client should ignore the current epoch role of the ingress node
|
||||
/// when attempting to establish new connection
|
||||
pub ignore_ingress_epoch_role: bool,
|
||||
}
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
@@ -586,6 +598,10 @@ impl Default for Topology {
|
||||
topology_structure: TopologyStructure::default(),
|
||||
minimum_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
|
||||
minimum_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
|
||||
use_extended_topology: false,
|
||||
|
||||
ignore_egress_epoch_role: true,
|
||||
ignore_ingress_epoch_role: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,10 @@ use crate::{
|
||||
},
|
||||
};
|
||||
use log::{debug, error};
|
||||
use sqlx::ConnectOptions;
|
||||
use sqlx::{
|
||||
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
|
||||
ConnectOptions,
|
||||
};
|
||||
use std::path::Path;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -30,6 +33,9 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
let opts = sqlx::sqlite::SqliteConnectOptions::new()
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.auto_vacuum(SqliteAutoVacuum::Incremental)
|
||||
.filename(database_path)
|
||||
.create_if_missing(true)
|
||||
.disable_statement_logging();
|
||||
@@ -110,7 +116,7 @@ impl StorageManager {
|
||||
) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO registered_gateway(gateway_id_bs58, registration_timestamp, gateway_type)
|
||||
INSERT INTO registered_gateway(gateway_id_bs58, registration_timestamp, gateway_type)
|
||||
VALUES (?, ?, ?)
|
||||
"#,
|
||||
registered_gateway.gateway_id_bs58,
|
||||
@@ -224,7 +230,7 @@ impl StorageManager {
|
||||
) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO custom_gateway_details(gateway_id_bs58, data)
|
||||
INSERT INTO custom_gateway_details(gateway_id_bs58, data)
|
||||
VALUES (?, ?)
|
||||
"#,
|
||||
custom.gateway_id_bs58,
|
||||
|
||||
@@ -112,14 +112,15 @@ where
|
||||
source,
|
||||
}
|
||||
})?;
|
||||
hardcoded_topology.get_gateways()
|
||||
hardcoded_topology.entry_capable_nodes().cloned().collect()
|
||||
} else {
|
||||
let mut rng = rand::thread_rng();
|
||||
crate::init::helpers::current_gateways(
|
||||
crate::init::helpers::gateways_for_init(
|
||||
&mut rng,
|
||||
&core.client.nym_api_urls,
|
||||
user_agent,
|
||||
core.debug.topology.minimum_gateway_performance,
|
||||
core.debug.topology.ignore_ingress_epoch_role,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
@@ -128,7 +129,7 @@ where
|
||||
// make sure the list of available gateways doesn't overlap the list of known gateways
|
||||
let available_gateways = available_gateways
|
||||
.into_iter()
|
||||
.filter(|g| !registered_gateways.contains(g.identity()))
|
||||
.filter(|g| !registered_gateways.contains(&g.identity()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if available_gateways.is_empty() {
|
||||
|
||||
@@ -167,14 +167,15 @@ where
|
||||
source,
|
||||
}
|
||||
})?;
|
||||
hardcoded_topology.get_gateways()
|
||||
hardcoded_topology.entry_capable_nodes().cloned().collect()
|
||||
} else {
|
||||
let mut rng = rand::thread_rng();
|
||||
crate::init::helpers::current_gateways(
|
||||
crate::init::helpers::gateways_for_init(
|
||||
&mut rng,
|
||||
&core.client.nym_api_urls,
|
||||
user_agent,
|
||||
core.debug.topology.minimum_gateway_performance,
|
||||
core.debug.topology.ignore_ingress_epoch_role,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
|
||||
use super::received_buffer::ReceivedBufferMessage;
|
||||
use super::statistics_control::StatisticsControl;
|
||||
use super::topology_control::geo_aware_provider::GeoAwareTopologyProvider;
|
||||
use crate::client::base_client::storage::helpers::store_client_keys;
|
||||
use crate::client::base_client::storage::MixnetClientStorage;
|
||||
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
|
||||
@@ -24,7 +23,7 @@ use crate::client::replies::reply_storage::{
|
||||
};
|
||||
use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
|
||||
use crate::client::topology_control::{
|
||||
nym_api_provider, TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
|
||||
TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
|
||||
};
|
||||
use crate::config::{Config, DebugConfig};
|
||||
use crate::error::ClientCoreError;
|
||||
@@ -32,7 +31,7 @@ use crate::init::{
|
||||
setup_gateway,
|
||||
types::{GatewaySetup, InitialisationResult},
|
||||
};
|
||||
use crate::{config, spawn_future};
|
||||
use crate::{config, spawn_future, ForgetMe};
|
||||
use futures::channel::mpsc;
|
||||
use log::*;
|
||||
use nym_bandwidth_controller::BandwidthController;
|
||||
@@ -188,6 +187,11 @@ pub struct BaseClientBuilder<'a, C, S: MixnetClientStorage> {
|
||||
user_agent: Option<UserAgent>,
|
||||
|
||||
setup_method: GatewaySetup,
|
||||
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
|
||||
forget_me: ForgetMe,
|
||||
}
|
||||
|
||||
impl<'a, C, S> BaseClientBuilder<'a, C, S>
|
||||
@@ -210,9 +214,18 @@ where
|
||||
shutdown: None,
|
||||
user_agent: None,
|
||||
setup_method: GatewaySetup::MustLoad { gateway_id: None },
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
forget_me: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_forget_me(mut self, forget_me: &ForgetMe) -> Self {
|
||||
self.forget_me = forget_me.clone();
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_gateway_setup(mut self, setup: GatewaySetup) -> Self {
|
||||
self.setup_method = setup;
|
||||
@@ -261,6 +274,15 @@ where
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
pub fn with_connection_fd_callback(
|
||||
mut self,
|
||||
callback: Arc<dyn Fn(RawFd) + Send + Sync>,
|
||||
) -> Self {
|
||||
self.connection_fd_callback = Some(callback);
|
||||
self
|
||||
}
|
||||
|
||||
// note: do **NOT** make this method public as its only valid usage is from within `start_base`
|
||||
// because it relies on the crypto keys being already loaded
|
||||
fn mix_address(details: &InitialisationResult) -> Recipient {
|
||||
@@ -352,6 +374,7 @@ where
|
||||
controller.start_with_shutdown(shutdown)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn start_gateway_client(
|
||||
config: &Config,
|
||||
initialisation_result: InitialisationResult,
|
||||
@@ -359,6 +382,7 @@ where
|
||||
details_store: &S::GatewaysDetailsStore,
|
||||
packet_router: PacketRouter,
|
||||
stats_reporter: ClientStatsSender,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
shutdown: TaskClient,
|
||||
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
|
||||
where
|
||||
@@ -401,6 +425,8 @@ where
|
||||
packet_router,
|
||||
bandwidth_controller,
|
||||
stats_reporter,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
shutdown,
|
||||
)
|
||||
};
|
||||
@@ -437,8 +463,8 @@ where
|
||||
details_store
|
||||
.upgrade_stored_remote_gateway_key(gateway_client.gateway_identity(), &updated_key)
|
||||
.await.map_err(|err| {
|
||||
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
|
||||
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
|
||||
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
|
||||
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
|
||||
})?
|
||||
}
|
||||
|
||||
@@ -446,6 +472,7 @@ where
|
||||
.claim_initial_bandwidth()
|
||||
.await
|
||||
.map_err(gateway_failure)?;
|
||||
|
||||
gateway_client
|
||||
.start_listening_for_mixnet_messages()
|
||||
.map_err(gateway_failure)?;
|
||||
@@ -462,6 +489,7 @@ where
|
||||
details_store: &S::GatewaysDetailsStore,
|
||||
packet_router: PacketRouter,
|
||||
stats_reporter: ClientStatsSender,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
mut shutdown: TaskClient,
|
||||
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
|
||||
where
|
||||
@@ -493,6 +521,8 @@ where
|
||||
details_store,
|
||||
packet_router,
|
||||
stats_reporter,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
shutdown,
|
||||
)
|
||||
.await?;
|
||||
@@ -509,15 +539,15 @@ where
|
||||
// if no custom provider was ... provided ..., create one using nym-api
|
||||
custom_provider.unwrap_or_else(|| match config_topology.topology_structure {
|
||||
config::TopologyStructure::NymApi => Box::new(NymApiTopologyProvider::new(
|
||||
nym_api_provider::Config {
|
||||
min_mixnode_performance: config_topology.minimum_mixnode_performance,
|
||||
min_gateway_performance: config_topology.minimum_gateway_performance,
|
||||
},
|
||||
config_topology,
|
||||
nym_api_urls,
|
||||
user_agent,
|
||||
)),
|
||||
config::TopologyStructure::GeoAware(group_by) => {
|
||||
Box::new(GeoAwareTopologyProvider::new(nym_api_urls, group_by))
|
||||
warn!("using deprecated 'GeoAware' topology provider - this option will be removed very soon");
|
||||
|
||||
#[allow(deprecated)]
|
||||
Box::new(crate::client::topology_control::GeoAwareTopologyProvider::new(nym_api_urls, group_by))
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -528,7 +558,7 @@ where
|
||||
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
|
||||
topology_config: config::Topology,
|
||||
topology_accessor: TopologyAccessor,
|
||||
local_gateway: &NodeIdentity,
|
||||
local_gateway: NodeIdentity,
|
||||
wait_for_gateway: bool,
|
||||
mut shutdown: TaskClient,
|
||||
) -> Result<(), ClientCoreError> {
|
||||
@@ -560,7 +590,7 @@ where
|
||||
};
|
||||
|
||||
if let Err(err) = topology_refresher
|
||||
.ensure_contains_gateway(local_gateway)
|
||||
.ensure_contains_routable_egress(local_gateway)
|
||||
.await
|
||||
{
|
||||
if let Some(waiting_timeout) = gateway_wait_timeout {
|
||||
@@ -615,9 +645,11 @@ where
|
||||
fn start_mix_traffic_controller(
|
||||
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
|
||||
shutdown: TaskClient,
|
||||
forget_me: ForgetMe,
|
||||
) -> BatchMixMessageSender {
|
||||
info!("Starting mix traffic controller...");
|
||||
let (mix_traffic_controller, mix_tx) = MixTrafficController::new(gateway_transceiver);
|
||||
let (mix_traffic_controller, mix_tx) =
|
||||
MixTrafficController::new(gateway_transceiver, forget_me);
|
||||
mix_traffic_controller.start_with_shutdown(shutdown);
|
||||
mix_tx
|
||||
}
|
||||
@@ -708,7 +740,8 @@ where
|
||||
|
||||
// channels responsible for controlling ack messages
|
||||
let (ack_sender, ack_receiver) = mpsc::unbounded();
|
||||
let shared_topology_accessor = TopologyAccessor::new();
|
||||
let shared_topology_accessor =
|
||||
TopologyAccessor::new(self.config.debug.topology.ignore_egress_epoch_role);
|
||||
|
||||
// Shutdown notifier for signalling tasks to stop
|
||||
let shutdown = self
|
||||
@@ -772,6 +805,8 @@ where
|
||||
&details_store,
|
||||
gateway_packet_router,
|
||||
stats_reporter.clone(),
|
||||
#[cfg(unix)]
|
||||
self.connection_fd_callback,
|
||||
shutdown.fork("gateway_transceiver"),
|
||||
)
|
||||
.await?;
|
||||
@@ -797,9 +832,11 @@ where
|
||||
// that are to be sent to the mixnet. They are used by cover traffic stream and real
|
||||
// traffic stream.
|
||||
// The MixTrafficController then sends the actual traffic
|
||||
|
||||
let message_sender = Self::start_mix_traffic_controller(
|
||||
gateway_transceiver,
|
||||
shutdown.fork("mix_traffic_controller"),
|
||||
self.forget_me,
|
||||
);
|
||||
|
||||
// Channels that the websocket listener can use to signal downstream to the real traffic
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
// TODO: combine those more closely. Perhaps into a single underlying store.
|
||||
// Like for persistent, on-disk, storage, what's the point of having 3 different databases?
|
||||
|
||||
use rand::rngs::OsRng;
|
||||
|
||||
use crate::client::key_manager::persistence::{InMemEphemeralKeys, KeyStore};
|
||||
use crate::client::replies::reply_storage;
|
||||
use crate::client::replies::reply_storage::ReplyStorageBackend;
|
||||
@@ -63,7 +65,6 @@ pub trait MixnetClientStorage {
|
||||
fn gateway_details_store(&self) -> &Self::GatewaysDetailsStore;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Ephemeral {
|
||||
key_store: InMemEphemeralKeys,
|
||||
reply_store: reply_storage::Empty,
|
||||
@@ -71,9 +72,14 @@ pub struct Ephemeral {
|
||||
gateway_details_store: InMemGatewaysDetails,
|
||||
}
|
||||
|
||||
impl Ephemeral {
|
||||
pub fn new() -> Self {
|
||||
Default::default()
|
||||
impl Default for Ephemeral {
|
||||
fn default() -> Self {
|
||||
Ephemeral {
|
||||
key_store: InMemEphemeralKeys::new(&mut OsRng),
|
||||
reply_store: Default::default(),
|
||||
credential_store: Default::default(),
|
||||
gateway_details_store: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -163,6 +163,7 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
// poisson delay, but is it really a problem?
|
||||
let topology_permit = self.topology_access.get_read_permit().await;
|
||||
// the ack is sent back to ourselves (and then ignored)
|
||||
|
||||
let topology_ref = match topology_permit.try_get_valid_topology_ref(
|
||||
&self.our_full_destination,
|
||||
Some(&self.our_full_destination),
|
||||
|
||||
@@ -28,7 +28,6 @@ pub enum InputMessage {
|
||||
recipient: Recipient,
|
||||
data: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
mix_hops: Option<u8>,
|
||||
},
|
||||
|
||||
/// Creates a message used for a duplex anonymous communication where the recipient
|
||||
@@ -44,7 +43,6 @@ pub enum InputMessage {
|
||||
data: Vec<u8>,
|
||||
reply_surbs: u32,
|
||||
lane: TransmissionLane,
|
||||
mix_hops: Option<u8>,
|
||||
},
|
||||
|
||||
/// Attempt to use our internally received and stored `ReplySurb` to send the message back
|
||||
@@ -94,29 +92,6 @@ impl InputMessage {
|
||||
recipient,
|
||||
data,
|
||||
lane,
|
||||
mix_hops: None,
|
||||
};
|
||||
if let Some(packet_type) = packet_type {
|
||||
InputMessage::new_wrapper(message, packet_type)
|
||||
} else {
|
||||
message
|
||||
}
|
||||
}
|
||||
|
||||
// IMHO `new_regular` should take `mix_hops: Option<u8>` as an argument instead of creating
|
||||
// this function, but that would potentially break backwards compatibility with the current API
|
||||
pub fn new_regular_with_custom_hops(
|
||||
recipient: Recipient,
|
||||
data: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
packet_type: Option<PacketType>,
|
||||
mix_hops: Option<u8>,
|
||||
) -> Self {
|
||||
let message = InputMessage::Regular {
|
||||
recipient,
|
||||
data,
|
||||
lane,
|
||||
mix_hops,
|
||||
};
|
||||
if let Some(packet_type) = packet_type {
|
||||
InputMessage::new_wrapper(message, packet_type)
|
||||
@@ -137,7 +112,6 @@ impl InputMessage {
|
||||
data,
|
||||
reply_surbs,
|
||||
lane,
|
||||
mix_hops: None,
|
||||
};
|
||||
if let Some(packet_type) = packet_type {
|
||||
InputMessage::new_wrapper(message, packet_type)
|
||||
@@ -154,14 +128,12 @@ impl InputMessage {
|
||||
reply_surbs: u32,
|
||||
lane: TransmissionLane,
|
||||
packet_type: Option<PacketType>,
|
||||
mix_hops: Option<u8>,
|
||||
) -> Self {
|
||||
let message = InputMessage::Anonymous {
|
||||
recipient,
|
||||
data,
|
||||
reply_surbs,
|
||||
lane,
|
||||
mix_hops,
|
||||
};
|
||||
if let Some(packet_type) = packet_type {
|
||||
InputMessage::new_wrapper(message, packet_type)
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use crate::client::key_manager::ClientKeys;
|
||||
use async_trait::async_trait;
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use std::error::Error;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
@@ -193,9 +194,19 @@ impl KeyStore for OnDiskKeys {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct InMemEphemeralKeys {
|
||||
keys: Mutex<Option<ClientKeys>>,
|
||||
keys: Mutex<ClientKeys>,
|
||||
}
|
||||
|
||||
impl InMemEphemeralKeys {
|
||||
pub fn new<R>(rng: &mut R) -> Self
|
||||
where
|
||||
R: RngCore + CryptoRng,
|
||||
{
|
||||
InMemEphemeralKeys {
|
||||
keys: Mutex::new(ClientKeys::generate_new(rng)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -208,11 +219,11 @@ impl KeyStore for InMemEphemeralKeys {
|
||||
type StorageError = EphemeralKeysError;
|
||||
|
||||
async fn load_keys(&self) -> Result<ClientKeys, Self::StorageError> {
|
||||
self.keys.lock().await.clone().ok_or(EphemeralKeysError)
|
||||
Ok(self.keys.lock().await.clone())
|
||||
}
|
||||
|
||||
async fn store_keys(&self, keys: &ClientKeys) -> Result<(), Self::StorageError> {
|
||||
*self.keys.lock().await = Some(keys.clone());
|
||||
*self.keys.lock().await = keys.clone();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,9 +2,12 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
|
||||
use crate::spawn_future;
|
||||
use crate::error::ClientCoreError;
|
||||
use crate::{spawn_future, ForgetMe};
|
||||
use log::*;
|
||||
use nym_gateway_requests::ClientRequest;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use transceiver::ErasedGatewayError;
|
||||
|
||||
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
|
||||
pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
|
||||
@@ -26,10 +29,14 @@ pub struct MixTrafficController {
|
||||
// TODO: this is temporary work-around.
|
||||
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
|
||||
consecutive_gateway_failure_count: usize,
|
||||
forget_me: ForgetMe,
|
||||
}
|
||||
|
||||
impl MixTrafficController {
|
||||
pub fn new<T>(gateway_transceiver: T) -> (MixTrafficController, BatchMixMessageSender)
|
||||
pub fn new<T>(
|
||||
gateway_transceiver: T,
|
||||
forget_me: ForgetMe,
|
||||
) -> (MixTrafficController, BatchMixMessageSender)
|
||||
where
|
||||
T: GatewayTransceiver + Send + 'static,
|
||||
{
|
||||
@@ -40,6 +47,7 @@ impl MixTrafficController {
|
||||
gateway_transceiver: Box::new(gateway_transceiver),
|
||||
mix_rx: message_receiver,
|
||||
consecutive_gateway_failure_count: 0,
|
||||
forget_me,
|
||||
},
|
||||
message_sender,
|
||||
)
|
||||
@@ -47,6 +55,7 @@ impl MixTrafficController {
|
||||
|
||||
pub fn new_dynamic(
|
||||
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
|
||||
forget_me: ForgetMe,
|
||||
) -> (MixTrafficController, BatchMixMessageSender) {
|
||||
let (message_sender, message_receiver) =
|
||||
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
|
||||
@@ -55,12 +64,16 @@ impl MixTrafficController {
|
||||
gateway_transceiver,
|
||||
mix_rx: message_receiver,
|
||||
consecutive_gateway_failure_count: 0,
|
||||
forget_me,
|
||||
},
|
||||
message_sender,
|
||||
)
|
||||
}
|
||||
|
||||
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
|
||||
async fn on_messages(
|
||||
&mut self,
|
||||
mut mix_packets: Vec<MixPacket>,
|
||||
) -> Result<(), ErasedGatewayError> {
|
||||
debug_assert!(!mix_packets.is_empty());
|
||||
|
||||
let result = if mix_packets.len() == 1 {
|
||||
@@ -72,21 +85,14 @@ impl MixTrafficController {
|
||||
.await
|
||||
};
|
||||
|
||||
match result {
|
||||
Err(err) => {
|
||||
error!("Failed to send sphinx packet(s) to the gateway: {err}");
|
||||
self.consecutive_gateway_failure_count += 1;
|
||||
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
|
||||
// todo: in the future this should initiate a 'graceful' shutdown or try
|
||||
// to reconnect?
|
||||
panic!("failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead. Can't do anything about it yet :(")
|
||||
}
|
||||
}
|
||||
Ok(_) => {
|
||||
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
|
||||
self.consecutive_gateway_failure_count = 0;
|
||||
}
|
||||
if result.is_err() {
|
||||
self.consecutive_gateway_failure_count += 1;
|
||||
} else {
|
||||
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
|
||||
self.consecutive_gateway_failure_count = 0;
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
|
||||
@@ -97,7 +103,26 @@ impl MixTrafficController {
|
||||
tokio::select! {
|
||||
mix_packets = self.mix_rx.recv() => match mix_packets {
|
||||
Some(mix_packets) => {
|
||||
self.on_messages(mix_packets).await;
|
||||
tokio::select! {
|
||||
ret = self.on_messages(mix_packets) => {
|
||||
if let Err(err) = ret {
|
||||
error!("Failed to send sphinx packet(s) to the gateway: {err}");
|
||||
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
|
||||
// Disconnect from the gateway. If we should try to re-connect
|
||||
// is handled at a higher layer.
|
||||
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
|
||||
// Do we need to handle the embedded mixnet client case
|
||||
// separately?
|
||||
shutdown.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
log::trace!("MixTrafficController: Received shutdown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
None => {
|
||||
log::trace!("MixTrafficController: Stopping since channel closed");
|
||||
@@ -111,7 +136,27 @@ impl MixTrafficController {
|
||||
}
|
||||
}
|
||||
shutdown.recv_timeout().await;
|
||||
|
||||
if self.forget_me.any() {
|
||||
log::info!("Sending forget me request to the gateway");
|
||||
match self
|
||||
.gateway_transceiver
|
||||
.send_client_request(ClientRequest::ForgetMe {
|
||||
client: self.forget_me.client(),
|
||||
stats: self.forget_me.stats(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
log::info!("Successfully sent forget me request to the gateway");
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("Failed to send forget me request to the gateway: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log::debug!("MixTrafficController: Exiting");
|
||||
})
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,10 @@ use async_trait::async_trait;
|
||||
use log::{debug, error};
|
||||
use nym_credential_storage::storage::Storage as CredentialStorage;
|
||||
use nym_crypto::asymmetric::identity;
|
||||
use nym_gateway_client::error::GatewayClientError;
|
||||
use nym_gateway_client::GatewayClient;
|
||||
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
|
||||
use nym_gateway_requests::ClientRequest;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
|
||||
use std::fmt::Debug;
|
||||
@@ -26,9 +28,14 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
|
||||
}
|
||||
|
||||
/// This combines combines the functionalities of being able to send and receive mix packets.
|
||||
#[async_trait]
|
||||
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
|
||||
fn gateway_identity(&self) -> identity::PublicKey;
|
||||
fn ws_fd(&self) -> Option<RawFd>;
|
||||
async fn send_client_request(
|
||||
&mut self,
|
||||
message: ClientRequest,
|
||||
) -> Result<(), GatewayClientError>;
|
||||
}
|
||||
|
||||
/// This trait defines the functionality of sending `MixPacket` into the mixnet,
|
||||
@@ -65,6 +72,7 @@ pub trait GatewayReceiver {
|
||||
}
|
||||
|
||||
// to allow for dynamic dispatch
|
||||
#[async_trait]
|
||||
impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
|
||||
#[inline]
|
||||
fn gateway_identity(&self) -> identity::PublicKey {
|
||||
@@ -73,6 +81,13 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
|
||||
fn ws_fd(&self) -> Option<RawFd> {
|
||||
(**self).ws_fd()
|
||||
}
|
||||
|
||||
async fn send_client_request(
|
||||
&mut self,
|
||||
message: ClientRequest,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
(**self).send_client_request(message).await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
@@ -91,7 +106,6 @@ impl<G: GatewaySender + ?Sized + Send> GatewaySender for Box<G> {
|
||||
(**self).batch_send_mix_packets(packets).await
|
||||
}
|
||||
}
|
||||
|
||||
impl<G: GatewayReceiver + ?Sized> GatewayReceiver for Box<G> {
|
||||
#[inline]
|
||||
fn set_packet_router(&mut self, packet_router: PacketRouter) -> Result<(), ErasedGatewayError> {
|
||||
@@ -111,6 +125,7 @@ impl<C, St> RemoteGateway<C, St> {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<C, St> GatewayTransceiver for RemoteGateway<C, St>
|
||||
where
|
||||
C: DkgQueryClient + Send + Sync,
|
||||
@@ -123,6 +138,20 @@ where
|
||||
fn ws_fd(&self) -> Option<RawFd> {
|
||||
self.gateway_client.ws_fd()
|
||||
}
|
||||
|
||||
async fn send_client_request(
|
||||
&mut self,
|
||||
message: ClientRequest,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
if let Some(shared_key) = self.gateway_client.shared_key() {
|
||||
self.gateway_client
|
||||
.send_websocket_message(message.encrypt(&*shared_key)?)
|
||||
.await?;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(GatewayClientError::ConnectionInInvalidState)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
@@ -195,6 +224,7 @@ impl LocalGateway {
|
||||
mod nonwasm_sealed {
|
||||
use super::*;
|
||||
|
||||
#[async_trait]
|
||||
impl GatewayTransceiver for LocalGateway {
|
||||
fn gateway_identity(&self) -> identity::PublicKey {
|
||||
self.local_identity
|
||||
@@ -202,6 +232,13 @@ mod nonwasm_sealed {
|
||||
fn ws_fd(&self) -> Option<RawFd> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn send_client_request(
|
||||
&mut self,
|
||||
_message: ClientRequest,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -269,6 +306,7 @@ impl GatewaySender for MockGateway {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl GatewayTransceiver for MockGateway {
|
||||
fn gateway_identity(&self) -> identity::PublicKey {
|
||||
self.dummy_identity
|
||||
@@ -276,4 +314,11 @@ impl GatewayTransceiver for MockGateway {
|
||||
fn ws_fd(&self) -> Option<RawFd> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn send_client_request(
|
||||
&mut self,
|
||||
_message: ClientRequest,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
+8
-35
@@ -73,11 +73,10 @@ where
|
||||
content: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
mix_hops: Option<u8>,
|
||||
) {
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_send_plain_message(recipient, content, lane, packet_type, mix_hops)
|
||||
.try_send_plain_message(recipient, content, lane, packet_type)
|
||||
.await
|
||||
{
|
||||
warn!("failed to send a plain message - {err}")
|
||||
@@ -91,18 +90,10 @@ where
|
||||
reply_surbs: u32,
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
mix_hops: Option<u8>,
|
||||
) {
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_send_message_with_reply_surbs(
|
||||
recipient,
|
||||
content,
|
||||
reply_surbs,
|
||||
lane,
|
||||
packet_type,
|
||||
mix_hops,
|
||||
)
|
||||
.try_send_message_with_reply_surbs(recipient, content, reply_surbs, lane, packet_type)
|
||||
.await
|
||||
{
|
||||
warn!("failed to send a repliable message - {err}")
|
||||
@@ -115,9 +106,8 @@ where
|
||||
recipient,
|
||||
data,
|
||||
lane,
|
||||
mix_hops,
|
||||
} => {
|
||||
self.handle_plain_message(recipient, data, lane, PacketType::Mix, mix_hops)
|
||||
self.handle_plain_message(recipient, data, lane, PacketType::Mix)
|
||||
.await
|
||||
}
|
||||
InputMessage::Anonymous {
|
||||
@@ -125,17 +115,9 @@ where
|
||||
data,
|
||||
reply_surbs,
|
||||
lane,
|
||||
mix_hops,
|
||||
} => {
|
||||
self.handle_repliable_message(
|
||||
recipient,
|
||||
data,
|
||||
reply_surbs,
|
||||
lane,
|
||||
PacketType::Mix,
|
||||
mix_hops,
|
||||
)
|
||||
.await
|
||||
self.handle_repliable_message(recipient, data, reply_surbs, lane, PacketType::Mix)
|
||||
.await
|
||||
}
|
||||
InputMessage::Reply {
|
||||
recipient_tag,
|
||||
@@ -153,9 +135,8 @@ where
|
||||
recipient,
|
||||
data,
|
||||
lane,
|
||||
mix_hops,
|
||||
} => {
|
||||
self.handle_plain_message(recipient, data, lane, packet_type, mix_hops)
|
||||
self.handle_plain_message(recipient, data, lane, packet_type)
|
||||
.await
|
||||
}
|
||||
InputMessage::Anonymous {
|
||||
@@ -163,17 +144,9 @@ where
|
||||
data,
|
||||
reply_surbs,
|
||||
lane,
|
||||
mix_hops,
|
||||
} => {
|
||||
self.handle_repliable_message(
|
||||
recipient,
|
||||
data,
|
||||
reply_surbs,
|
||||
lane,
|
||||
packet_type,
|
||||
mix_hops,
|
||||
)
|
||||
.await
|
||||
self.handle_repliable_message(recipient, data, reply_surbs, lane, packet_type)
|
||||
.await
|
||||
}
|
||||
InputMessage::Reply {
|
||||
recipient_tag,
|
||||
|
||||
@@ -70,7 +70,6 @@ pub(crate) struct PendingAcknowledgement {
|
||||
message_chunk: Fragment,
|
||||
delay: SphinxDelay,
|
||||
destination: PacketDestination,
|
||||
mix_hops: Option<u8>,
|
||||
retransmissions: u32,
|
||||
}
|
||||
|
||||
@@ -80,13 +79,11 @@ impl PendingAcknowledgement {
|
||||
message_chunk: Fragment,
|
||||
delay: SphinxDelay,
|
||||
recipient: Recipient,
|
||||
mix_hops: Option<u8>,
|
||||
) -> Self {
|
||||
PendingAcknowledgement {
|
||||
message_chunk,
|
||||
delay,
|
||||
destination: PacketDestination::KnownRecipient(recipient.into()),
|
||||
mix_hops,
|
||||
retransmissions: 0,
|
||||
}
|
||||
}
|
||||
@@ -104,9 +101,6 @@ impl PendingAcknowledgement {
|
||||
recipient_tag,
|
||||
extra_surb_request,
|
||||
},
|
||||
// Messages sent using SURBs are using the number of mix hops set by the recipient when
|
||||
// they provided the SURBs, so it doesn't make sense to include it here.
|
||||
mix_hops: None,
|
||||
retransmissions: 0,
|
||||
}
|
||||
}
|
||||
|
||||
+1
-8
@@ -52,18 +52,12 @@ where
|
||||
packet_recipient: Recipient,
|
||||
chunk_data: Fragment,
|
||||
packet_type: PacketType,
|
||||
mix_hops: Option<u8>,
|
||||
) -> Result<PreparedFragment, PreparationError> {
|
||||
debug!("retransmitting normal packet...");
|
||||
|
||||
// TODO: Figure out retransmission packet type signaling
|
||||
self.message_handler
|
||||
.try_prepare_single_chunk_for_sending(
|
||||
packet_recipient,
|
||||
chunk_data,
|
||||
packet_type,
|
||||
mix_hops,
|
||||
)
|
||||
.try_prepare_single_chunk_for_sending(packet_recipient, chunk_data, packet_type)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -110,7 +104,6 @@ where
|
||||
**recipient,
|
||||
timed_out_ack.message_chunk.clone(),
|
||||
packet_type,
|
||||
timed_out_ack.mix_hops,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -15,11 +15,11 @@ use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessa
|
||||
use nym_sphinx::anonymous_replies::{ReplySurb, SurbEncryptionKey};
|
||||
use nym_sphinx::chunking::fragment::{Fragment, FragmentIdentifier};
|
||||
use nym_sphinx::message::NymMessage;
|
||||
use nym_sphinx::params::{PacketSize, PacketType, DEFAULT_NUM_MIX_HOPS};
|
||||
use nym_sphinx::params::{PacketSize, PacketType};
|
||||
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
|
||||
use nym_sphinx::Delay;
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use nym_topology::{NymTopology, NymTopologyError};
|
||||
use nym_topology::{NymRouteProvider, NymTopologyError};
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
@@ -100,10 +100,6 @@ pub(crate) struct Config {
|
||||
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
|
||||
average_ack_delay: Duration,
|
||||
|
||||
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
|
||||
/// Note that it does not include gateway hops.
|
||||
num_mix_hops: u8,
|
||||
|
||||
/// Primary predefined packet size used for the encapsulated messages.
|
||||
primary_packet_size: PacketSize,
|
||||
|
||||
@@ -125,19 +121,11 @@ impl Config {
|
||||
deterministic_route_selection,
|
||||
average_packet_delay,
|
||||
average_ack_delay,
|
||||
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
|
||||
primary_packet_size: PacketSize::default(),
|
||||
secondary_packet_size: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Allows setting non-default number of expected mix hops in the network.
|
||||
#[allow(dead_code)]
|
||||
pub fn with_mix_hops(mut self, hops: u8) -> Self {
|
||||
self.num_mix_hops = hops;
|
||||
self
|
||||
}
|
||||
|
||||
/// Allows setting non-default size of the sphinx packets sent out.
|
||||
pub fn with_custom_primary_packet_size(mut self, packet_size: PacketSize) -> Self {
|
||||
self.primary_packet_size = packet_size;
|
||||
@@ -185,9 +173,7 @@ where
|
||||
config.sender_address,
|
||||
config.average_packet_delay,
|
||||
config.average_ack_delay,
|
||||
)
|
||||
.with_mix_hops(config.num_mix_hops);
|
||||
|
||||
);
|
||||
MessageHandler {
|
||||
config,
|
||||
rng,
|
||||
@@ -216,7 +202,7 @@ where
|
||||
fn get_topology<'a>(
|
||||
&self,
|
||||
permit: &'a TopologyReadPermit<'a>,
|
||||
) -> Result<&'a NymTopology, PreparationError> {
|
||||
) -> Result<&'a NymRouteProvider, PreparationError> {
|
||||
match permit.try_get_valid_topology_ref(&self.config.sender_address, None) {
|
||||
Ok(topology_ref) => Ok(topology_ref),
|
||||
Err(err) => {
|
||||
@@ -233,9 +219,8 @@ where
|
||||
return self.config.primary_packet_size;
|
||||
};
|
||||
|
||||
let primary_count =
|
||||
msg.required_packets(self.config.primary_packet_size, self.config.num_mix_hops);
|
||||
let secondary_count = msg.required_packets(secondary_packet, self.config.num_mix_hops);
|
||||
let primary_count = msg.required_packets(self.config.primary_packet_size);
|
||||
let secondary_count = msg.required_packets(secondary_packet);
|
||||
|
||||
trace!("This message would require: {primary_count} primary packets or {secondary_count} secondary packets...");
|
||||
// if there would be no benefit in using the secondary packet - use the primary (duh)
|
||||
@@ -424,10 +409,9 @@ where
|
||||
message: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
mix_hops: Option<u8>,
|
||||
) -> Result<(), PreparationError> {
|
||||
let message = NymMessage::new_plain(message);
|
||||
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type, mix_hops)
|
||||
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -437,7 +421,6 @@ where
|
||||
recipient: Recipient,
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
mix_hops: Option<u8>,
|
||||
) -> Result<(), PreparationError> {
|
||||
debug!("Sending non-reply message with packet type {packet_type}");
|
||||
// TODO: I really dislike existence of this assertion, it implies code has to be re-organised
|
||||
@@ -470,7 +453,6 @@ where
|
||||
&self.config.ack_key,
|
||||
&recipient,
|
||||
packet_type,
|
||||
mix_hops,
|
||||
)?;
|
||||
|
||||
let real_message = RealMessage::new(
|
||||
@@ -478,8 +460,7 @@ where
|
||||
Some(fragment.fragment_identifier()),
|
||||
);
|
||||
let delay = prepared_fragment.total_delay;
|
||||
let pending_ack =
|
||||
PendingAcknowledgement::new_known(fragment, delay, recipient, mix_hops);
|
||||
let pending_ack = PendingAcknowledgement::new_known(fragment, delay, recipient);
|
||||
|
||||
real_messages.push(real_message);
|
||||
pending_acks.push(pending_ack);
|
||||
@@ -496,7 +477,6 @@ where
|
||||
recipient: Recipient,
|
||||
amount: u32,
|
||||
packet_type: PacketType,
|
||||
mix_hops: Option<u8>,
|
||||
) -> Result<(), PreparationError> {
|
||||
debug!("Sending additional reply SURBs with packet type {packet_type}");
|
||||
let sender_tag = self.get_or_create_sender_tag(&recipient);
|
||||
@@ -513,7 +493,6 @@ where
|
||||
recipient,
|
||||
TransmissionLane::AdditionalReplySurbs,
|
||||
packet_type,
|
||||
mix_hops,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -530,7 +509,6 @@ where
|
||||
num_reply_surbs: u32,
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
mix_hops: Option<u8>,
|
||||
) -> Result<(), SurbWrappedPreparationError> {
|
||||
debug!("Sending message with reply SURBs with packet type {packet_type}");
|
||||
let sender_tag = self.get_or_create_sender_tag(&recipient);
|
||||
@@ -541,7 +519,7 @@ where
|
||||
let message =
|
||||
NymMessage::new_repliable(RepliableMessage::new_data(message, sender_tag, reply_surbs));
|
||||
|
||||
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type, mix_hops)
|
||||
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type)
|
||||
.await?;
|
||||
|
||||
log::trace!("storing {} reply keys", reply_keys.len());
|
||||
@@ -555,23 +533,18 @@ where
|
||||
recipient: Recipient,
|
||||
chunk: Fragment,
|
||||
packet_type: PacketType,
|
||||
mix_hops: Option<u8>,
|
||||
) -> Result<PreparedFragment, PreparationError> {
|
||||
debug!("Sending single chunk with packet type {packet_type}");
|
||||
let topology_permit = self.topology_access.get_read_permit().await;
|
||||
let topology = self.get_topology(&topology_permit)?;
|
||||
|
||||
let prepared_fragment = self
|
||||
.message_preparer
|
||||
.prepare_chunk_for_sending(
|
||||
chunk,
|
||||
topology,
|
||||
&self.config.ack_key,
|
||||
&recipient,
|
||||
packet_type,
|
||||
mix_hops,
|
||||
)
|
||||
.unwrap();
|
||||
let prepared_fragment = self.message_preparer.prepare_chunk_for_sending(
|
||||
chunk,
|
||||
topology,
|
||||
&self.config.ack_key,
|
||||
&recipient,
|
||||
packet_type,
|
||||
)?;
|
||||
|
||||
Ok(prepared_fragment)
|
||||
}
|
||||
@@ -624,16 +597,13 @@ where
|
||||
Err(err) => return Err(err.return_surbs(vec![reply_surb])),
|
||||
};
|
||||
|
||||
let prepared_fragment = self
|
||||
.message_preparer
|
||||
.prepare_reply_chunk_for_sending(
|
||||
chunk,
|
||||
topology,
|
||||
&self.config.ack_key,
|
||||
reply_surb,
|
||||
PacketType::Mix,
|
||||
)
|
||||
.unwrap();
|
||||
let prepared_fragment = self.message_preparer.prepare_reply_chunk_for_sending(
|
||||
chunk,
|
||||
topology,
|
||||
&self.config.ack_key,
|
||||
reply_surb,
|
||||
PacketType::Mix,
|
||||
)?;
|
||||
|
||||
Ok(prepared_fragment)
|
||||
}
|
||||
@@ -656,9 +626,14 @@ where
|
||||
messages: Vec<RealMessage>,
|
||||
transmission_lane: TransmissionLane,
|
||||
) {
|
||||
self.real_message_sender
|
||||
if let Err(err) = self
|
||||
.real_message_sender
|
||||
.send((messages, transmission_lane))
|
||||
.await
|
||||
.expect("real message receiver task (OutQueueControl) has died");
|
||||
{
|
||||
error!(
|
||||
"Failed to forward messages to the real message sender (OutQueueControl): {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,10 +9,12 @@ use self::{
|
||||
acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl,
|
||||
};
|
||||
use crate::client::real_messages_control::message_handler::MessageHandler;
|
||||
use crate::client::replies::reply_controller;
|
||||
use crate::client::replies::reply_controller::{
|
||||
ReplyController, ReplyControllerReceiver, ReplyControllerSender,
|
||||
};
|
||||
use crate::client::replies::reply_storage::CombinedReplyStorage;
|
||||
use crate::config;
|
||||
use crate::{
|
||||
client::{
|
||||
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
|
||||
@@ -27,16 +29,13 @@ use nym_gateway_client::AcknowledgementReceiver;
|
||||
use nym_sphinx::acknowledgements::AckKey;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::params::PacketType;
|
||||
use nym_statistics_common::clients::ClientStatsSender;
|
||||
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
|
||||
use rand::{rngs::OsRng, CryptoRng, Rng};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::client::replies::reply_controller;
|
||||
use crate::config;
|
||||
pub(crate) use acknowledgement_control::{AckActionSender, Action};
|
||||
|
||||
use nym_statistics_common::clients::ClientStatsSender;
|
||||
|
||||
pub(crate) mod acknowledgement_control;
|
||||
pub(crate) mod message_handler;
|
||||
pub(crate) mod real_traffic_stream;
|
||||
|
||||
@@ -230,6 +230,7 @@ where
|
||||
// poisson delay, but is it really a problem?
|
||||
let topology_permit = self.topology_access.get_read_permit().await;
|
||||
// the ack is sent back to ourselves (and then ignored)
|
||||
|
||||
let topology_ref = match topology_permit.try_get_valid_topology_ref(
|
||||
&self.config.our_full_destination,
|
||||
Some(&self.config.our_full_destination),
|
||||
@@ -544,7 +545,7 @@ where
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("OutQueueControl: Received shutdown");
|
||||
break;
|
||||
}
|
||||
|
||||
+4
-1
@@ -70,7 +70,10 @@ impl SendingDelayController {
|
||||
lower_bound,
|
||||
multiplier_elevated_counter: 0,
|
||||
time_when_logged_about_elevated_multiplier: now
|
||||
- Duration::from_secs(INTERVAL_BETWEEN_WARNING_ABOUT_ELEVATED_MULTIPLIER_SECS),
|
||||
.checked_sub(Duration::from_secs(
|
||||
INTERVAL_BETWEEN_WARNING_ABOUT_ELEVATED_MULTIPLIER_SECS,
|
||||
))
|
||||
.unwrap_or(now),
|
||||
time_when_changed: now,
|
||||
time_when_backpressure_detected: now,
|
||||
}
|
||||
|
||||
@@ -516,7 +516,6 @@ where
|
||||
recipient,
|
||||
to_send,
|
||||
nym_sphinx::params::PacketType::Mix,
|
||||
self.config.reply_surbs.surb_mix_hops,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -16,14 +16,14 @@
|
||||
#![warn(clippy::todo)]
|
||||
#![warn(clippy::dbg_macro)]
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::StreamExt;
|
||||
use nym_client_core_config_types::StatsReporting;
|
||||
use nym_sphinx::addressing::Recipient;
|
||||
use nym_statistics_common::clients::{
|
||||
ClientStatsController, ClientStatsReceiver, ClientStatsSender,
|
||||
};
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::{
|
||||
client::inbound_messages::{InputMessage, InputMessageSender},
|
||||
@@ -94,10 +94,32 @@ impl StatisticsControl {
|
||||
async fn run_with_shutdown(&mut self, mut task_client: nym_task::TaskClient) {
|
||||
log::debug!("Started StatisticsControl with graceful shutdown support");
|
||||
|
||||
let mut stats_report_interval =
|
||||
tokio::time::interval(self.reporting_config.reporting_interval);
|
||||
let mut local_report_interval = tokio::time::interval(LOCAL_REPORT_INTERVAL);
|
||||
let mut snapshot_interval = tokio::time::interval(SNAPSHOT_INTERVAL);
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let mut stats_report_interval = tokio_stream::wrappers::IntervalStream::new(
|
||||
tokio::time::interval(self.reporting_config.reporting_interval),
|
||||
);
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let mut local_report_interval = tokio_stream::wrappers::IntervalStream::new(
|
||||
tokio::time::interval(LOCAL_REPORT_INTERVAL),
|
||||
);
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let mut snapshot_interval =
|
||||
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(SNAPSHOT_INTERVAL));
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let mut stats_report_interval = gloo_timers::future::IntervalStream::new(
|
||||
self.reporting_config.reporting_interval.as_millis() as u32,
|
||||
);
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let mut local_report_interval =
|
||||
gloo_timers::future::IntervalStream::new(LOCAL_REPORT_INTERVAL.as_millis() as u32);
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let mut snapshot_interval =
|
||||
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -108,16 +130,20 @@ impl StatisticsControl {
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = snapshot_interval.tick() => {
|
||||
_ = snapshot_interval.next() => {
|
||||
self.stats.snapshot();
|
||||
}
|
||||
_ = stats_report_interval.tick(), if self.reporting_config.enabled && self.reporting_config.provider_address.is_some() => {
|
||||
// SAFTEY : this branch executes only if reporting is not none, so unwrapp is fine
|
||||
#[allow(clippy::unwrap_used)]
|
||||
self.report_stats(self.reporting_config.provider_address.unwrap()).await;
|
||||
_ = stats_report_interval.next() => {
|
||||
let Some(recipient) = self.reporting_config.provider_address else {
|
||||
continue
|
||||
};
|
||||
|
||||
if self.reporting_config.enabled {
|
||||
self.report_stats(recipient).await;
|
||||
}
|
||||
}
|
||||
|
||||
_ = local_report_interval.tick() => {
|
||||
_ = local_report_interval.next() => {
|
||||
self.stats.local_report(&mut task_client);
|
||||
}
|
||||
_ = task_client.recv_with_delay() => {
|
||||
|
||||
@@ -2,8 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::params::DEFAULT_NUM_MIX_HOPS;
|
||||
use nym_topology::{NymTopology, NymTopologyError};
|
||||
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError};
|
||||
use std::ops::Deref;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
@@ -17,29 +16,36 @@ pub struct TopologyAccessorInner {
|
||||
// few seconds, while reads are needed every single packet generated.
|
||||
// However, proper benchmarks will be needed to determine if `RwLock` is indeed a better
|
||||
// approach than a `Mutex`
|
||||
topology: RwLock<Option<NymTopology>>,
|
||||
topology: RwLock<NymRouteProvider>,
|
||||
}
|
||||
|
||||
impl TopologyAccessorInner {
|
||||
fn new() -> Self {
|
||||
fn new(initial: NymRouteProvider) -> Self {
|
||||
TopologyAccessorInner {
|
||||
controlled_manually: AtomicBool::new(false),
|
||||
released_manual_control: Notify::new(),
|
||||
topology: RwLock::new(None),
|
||||
topology: RwLock::new(initial),
|
||||
}
|
||||
}
|
||||
|
||||
async fn update(&self, new: Option<NymTopology>) {
|
||||
*self.topology.write().await = new;
|
||||
let mut guard = self.topology.write().await;
|
||||
|
||||
match new {
|
||||
Some(updated) => {
|
||||
guard.update(updated);
|
||||
}
|
||||
None => guard.clear_topology(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TopologyReadPermit<'a> {
|
||||
permit: RwLockReadGuard<'a, Option<NymTopology>>,
|
||||
permit: RwLockReadGuard<'a, NymRouteProvider>,
|
||||
}
|
||||
|
||||
impl Deref for TopologyReadPermit<'_> {
|
||||
type Target = Option<NymTopology>;
|
||||
type Target = NymRouteProvider;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.permit
|
||||
@@ -53,43 +59,31 @@ impl<'a> TopologyReadPermit<'a> {
|
||||
&'a self,
|
||||
ack_recipient: &Recipient,
|
||||
packet_recipient: Option<&Recipient>,
|
||||
) -> Result<&'a NymTopology, NymTopologyError> {
|
||||
) -> Result<&'a NymRouteProvider, NymTopologyError> {
|
||||
let route_provider = self.permit.deref();
|
||||
let topology = &route_provider.topology;
|
||||
|
||||
// 1. Have we managed to get anything from the refresher, i.e. have the nym-api queries gone through?
|
||||
let topology = self
|
||||
.permit
|
||||
.as_ref()
|
||||
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
|
||||
topology.ensure_not_empty()?;
|
||||
|
||||
// 2. does it have any mixnode at all?
|
||||
// 3. does it have any gateways at all?
|
||||
// 4. does it have a mixnode on each layer?
|
||||
topology.ensure_can_construct_path_through(DEFAULT_NUM_MIX_HOPS)?;
|
||||
// 2. does the topology have a node on each mixing layer?
|
||||
topology.ensure_minimally_routable()?;
|
||||
|
||||
// 5. does it contain OUR gateway (so that we could create an ack packet)?
|
||||
if !topology.gateway_exists(ack_recipient.gateway()) {
|
||||
return Err(NymTopologyError::NonExistentGatewayError {
|
||||
identity_key: ack_recipient.gateway().to_base58_string(),
|
||||
});
|
||||
}
|
||||
// 3. does it contain OUR gateway (so that we could create an ack packet)?
|
||||
let _ = route_provider.egress_by_identity(ack_recipient.gateway())?;
|
||||
|
||||
// 6. for our target recipient, does it contain THEIR gateway (so that we could create
|
||||
// 4. for our target recipient, does it contain THEIR gateway (so that we send anything over?)
|
||||
if let Some(recipient) = packet_recipient {
|
||||
if !topology.gateway_exists(recipient.gateway()) {
|
||||
return Err(NymTopologyError::NonExistentGatewayError {
|
||||
identity_key: recipient.gateway().to_base58_string(),
|
||||
});
|
||||
}
|
||||
let _ = route_provider.egress_by_identity(recipient.gateway())?;
|
||||
}
|
||||
|
||||
Ok(topology)
|
||||
Ok(route_provider)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<RwLockReadGuard<'a, Option<NymTopology>>> for TopologyReadPermit<'a> {
|
||||
fn from(read_permit: RwLockReadGuard<'a, Option<NymTopology>>) -> Self {
|
||||
TopologyReadPermit {
|
||||
permit: read_permit,
|
||||
}
|
||||
impl<'a> From<RwLockReadGuard<'a, NymRouteProvider>> for TopologyReadPermit<'a> {
|
||||
fn from(permit: RwLockReadGuard<'a, NymRouteProvider>) -> Self {
|
||||
TopologyReadPermit { permit }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,9 +93,11 @@ pub struct TopologyAccessor {
|
||||
}
|
||||
|
||||
impl TopologyAccessor {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(ignore_egress_epoch_roles: bool) -> Self {
|
||||
TopologyAccessor {
|
||||
inner: Arc::new(TopologyAccessorInner::new()),
|
||||
inner: Arc::new(TopologyAccessorInner::new(NymRouteProvider::new_empty(
|
||||
ignore_egress_epoch_roles,
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,8 +117,21 @@ impl TopologyAccessor {
|
||||
self.inner.released_manual_control.notified().await
|
||||
}
|
||||
|
||||
#[deprecated(note = "use .current_route_provider instead")]
|
||||
pub async fn current_topology(&self) -> Option<NymTopology> {
|
||||
self.inner.topology.read().await.clone()
|
||||
self.current_route_provider()
|
||||
.await
|
||||
.as_ref()
|
||||
.map(|p| p.topology.clone())
|
||||
}
|
||||
|
||||
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<NymRouteProvider>> {
|
||||
let provider = self.inner.topology.read().await;
|
||||
if provider.topology.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(provider)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn manually_change_topology(&self, new_topology: NymTopology) {
|
||||
@@ -140,15 +149,11 @@ impl TopologyAccessor {
|
||||
// only used by the client at startup to get a slightly more reasonable error message
|
||||
// (currently displays as unused because health checker is disabled due to required changes)
|
||||
pub async fn ensure_is_routable(&self) -> Result<(), NymTopologyError> {
|
||||
match self.inner.topology.read().await.deref() {
|
||||
None => Err(NymTopologyError::EmptyNetworkTopology),
|
||||
Some(ref topology) => topology.ensure_can_construct_path_through(DEFAULT_NUM_MIX_HOPS),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TopologyAccessor {
|
||||
fn default() -> Self {
|
||||
TopologyAccessor::new()
|
||||
self.inner
|
||||
.topology
|
||||
.read()
|
||||
.await
|
||||
.topology
|
||||
.ensure_minimally_routable()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ use log::{debug, error};
|
||||
use nym_explorer_client::{ExplorerClient, PrettyDetailedMixNodeBond};
|
||||
use nym_network_defaults::var_names::EXPLORER_API;
|
||||
use nym_topology::{
|
||||
nym_topology_from_basic_info,
|
||||
provider_trait::{async_trait, TopologyProvider},
|
||||
NymTopology,
|
||||
};
|
||||
@@ -15,8 +14,6 @@ use url::Url;
|
||||
|
||||
pub use nym_country_group::CountryGroup;
|
||||
|
||||
const MIN_NODES_PER_LAYER: usize = 1;
|
||||
|
||||
fn create_explorer_client() -> Option<ExplorerClient> {
|
||||
let Ok(explorer_api_url) = std::env::var(EXPLORER_API) else {
|
||||
error!("Missing EXPLORER_API");
|
||||
@@ -63,30 +60,20 @@ fn log_mixnode_distribution(mixnodes: &HashMap<CountryGroup, Vec<NodeId>>) {
|
||||
}
|
||||
|
||||
fn check_layer_integrity(topology: NymTopology) -> Result<(), ()> {
|
||||
let mixes = topology.mixes();
|
||||
if mixes.keys().len() < 3 {
|
||||
if topology.ensure_minimally_routable().is_err() {
|
||||
error!("Layer is missing in topology!");
|
||||
return Err(());
|
||||
}
|
||||
for (layer, mixnodes) in mixes {
|
||||
debug!("Layer {:?} has {} mixnodes", layer, mixnodes.len());
|
||||
if mixnodes.len() < MIN_NODES_PER_LAYER {
|
||||
error!(
|
||||
"There are only {} mixnodes in layer {:?}",
|
||||
mixnodes.len(),
|
||||
layer
|
||||
);
|
||||
return Err(());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[deprecated(note = "use NymApiTopologyProvider instead as explorer API will soon be removed")]
|
||||
pub struct GeoAwareTopologyProvider {
|
||||
validator_client: nym_validator_client::client::NymApiClient,
|
||||
filter_on: GroupBy,
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
impl GeoAwareTopologyProvider {
|
||||
pub fn new(mut nym_api_urls: Vec<Url>, filter_on: GroupBy) -> GeoAwareTopologyProvider {
|
||||
log::info!(
|
||||
@@ -104,6 +91,15 @@ impl GeoAwareTopologyProvider {
|
||||
}
|
||||
|
||||
async fn get_topology(&self) -> Option<NymTopology> {
|
||||
let rewarded_set = self
|
||||
.validator_client
|
||||
.get_current_rewarded_set()
|
||||
.await
|
||||
.inspect_err(|err| error!("failed to get current rewarded set: {err}"))
|
||||
.ok()?;
|
||||
|
||||
let mut topology = NymTopology::new_empty(rewarded_set);
|
||||
|
||||
let mixnodes = match self
|
||||
.validator_client
|
||||
.get_all_basic_active_mixing_assigned_nodes()
|
||||
@@ -187,7 +183,8 @@ impl GeoAwareTopologyProvider {
|
||||
.filter(|m| filtered_mixnode_ids.contains(&m.node_id))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let topology = nym_topology_from_basic_info(&mixnodes, &gateways);
|
||||
topology.add_skimmed_nodes(&mixnodes);
|
||||
topology.add_skimmed_nodes(&gateways);
|
||||
|
||||
// TODO: return real error type
|
||||
check_layer_integrity(topology.clone()).ok()?;
|
||||
@@ -196,6 +193,7 @@ impl GeoAwareTopologyProvider {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[async_trait]
|
||||
impl TopologyProvider for GeoAwareTopologyProvider {
|
||||
@@ -205,6 +203,7 @@ impl TopologyProvider for GeoAwareTopologyProvider {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
#[async_trait(?Send)]
|
||||
impl TopologyProvider for GeoAwareTopologyProvider {
|
||||
|
||||
@@ -19,6 +19,7 @@ mod accessor;
|
||||
pub mod geo_aware_provider;
|
||||
pub mod nym_api_provider;
|
||||
|
||||
#[allow(deprecated)]
|
||||
pub use geo_aware_provider::GeoAwareTopologyProvider;
|
||||
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
|
||||
pub use nym_topology::provider_trait::TopologyProvider;
|
||||
@@ -27,7 +28,7 @@ pub use nym_topology::provider_trait::TopologyProvider;
|
||||
const MAX_FAILURE_COUNT: usize = 10;
|
||||
|
||||
pub struct TopologyRefresherConfig {
|
||||
refresh_rate: Duration,
|
||||
pub refresh_rate: Duration,
|
||||
}
|
||||
|
||||
impl TopologyRefresherConfig {
|
||||
@@ -96,28 +97,24 @@ impl TopologyRefresher {
|
||||
self.topology_accessor.ensure_is_routable().await
|
||||
}
|
||||
|
||||
pub async fn ensure_contains_gateway(
|
||||
pub async fn ensure_contains_routable_egress(
|
||||
&self,
|
||||
gateway: &NodeIdentity,
|
||||
egress: NodeIdentity,
|
||||
) -> Result<(), NymTopologyError> {
|
||||
let topology = self
|
||||
.topology_accessor
|
||||
.current_topology()
|
||||
.current_route_provider()
|
||||
.await
|
||||
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
|
||||
|
||||
if !topology.gateway_exists(gateway) {
|
||||
return Err(NymTopologyError::NonExistentGatewayError {
|
||||
identity_key: gateway.to_base58_string(),
|
||||
});
|
||||
}
|
||||
let _ = topology.egress_by_identity(egress)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn wait_for_gateway(
|
||||
&mut self,
|
||||
gateway: &NodeIdentity,
|
||||
gateway: NodeIdentity,
|
||||
timeout_duration: Duration,
|
||||
) -> Result<(), NymTopologyError> {
|
||||
info!(
|
||||
@@ -135,7 +132,7 @@ impl TopologyRefresher {
|
||||
})
|
||||
}
|
||||
_ = self.try_refresh() => {
|
||||
if self.ensure_contains_gateway(gateway).await.is_ok() {
|
||||
if self.ensure_contains_routable_egress(gateway).await.is_ok() {
|
||||
return Ok(())
|
||||
}
|
||||
info!("gateway '{gateway}' is still not online...");
|
||||
|
||||
@@ -4,32 +4,39 @@
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, error, warn};
|
||||
use nym_topology::provider_trait::TopologyProvider;
|
||||
use nym_topology::{NymTopology, NymTopologyError};
|
||||
use nym_topology::NymTopology;
|
||||
use nym_validator_client::UserAgent;
|
||||
use rand::prelude::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use std::cmp::min;
|
||||
use url::Url;
|
||||
|
||||
// the same values as our current (10.06.24) blacklist
|
||||
pub const DEFAULT_MIN_MIXNODE_PERFORMANCE: u8 = 50;
|
||||
pub const DEFAULT_MIN_GATEWAY_PERFORMANCE: u8 = 50;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Config {
|
||||
pub min_mixnode_performance: u8,
|
||||
pub min_gateway_performance: u8,
|
||||
pub use_extended_topology: bool,
|
||||
pub ignore_egress_epoch_role: bool,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
// old values that decided on blacklist membership
|
||||
impl From<nym_client_core_config_types::Topology> for Config {
|
||||
fn from(value: nym_client_core_config_types::Topology) -> Self {
|
||||
Config {
|
||||
min_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
|
||||
min_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
|
||||
min_mixnode_performance: value.minimum_mixnode_performance,
|
||||
min_gateway_performance: value.minimum_gateway_performance,
|
||||
use_extended_topology: value.use_extended_topology,
|
||||
ignore_egress_epoch_role: value.ignore_egress_epoch_role,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
// if we're using 'extended' topology, filter the nodes based on the lowest set performance
|
||||
fn min_node_performance(&self) -> u8 {
|
||||
min(self.min_mixnode_performance, self.min_gateway_performance)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NymApiTopologyProvider {
|
||||
config: Config,
|
||||
|
||||
@@ -39,7 +46,11 @@ pub struct NymApiTopologyProvider {
|
||||
}
|
||||
|
||||
impl NymApiTopologyProvider {
|
||||
pub fn new(config: Config, mut nym_api_urls: Vec<Url>, user_agent: Option<UserAgent>) -> Self {
|
||||
pub fn new(
|
||||
config: impl Into<Config>,
|
||||
mut nym_api_urls: Vec<Url>,
|
||||
user_agent: Option<UserAgent>,
|
||||
) -> Self {
|
||||
nym_api_urls.shuffle(&mut thread_rng());
|
||||
|
||||
let validator_client = if let Some(user_agent) = user_agent {
|
||||
@@ -52,7 +63,7 @@ impl NymApiTopologyProvider {
|
||||
};
|
||||
|
||||
NymApiTopologyProvider {
|
||||
config,
|
||||
config: config.into(),
|
||||
validator_client,
|
||||
nym_api_urls,
|
||||
currently_used_api: 0,
|
||||
@@ -70,70 +81,69 @@ impl NymApiTopologyProvider {
|
||||
.change_nym_api(self.nym_api_urls[self.currently_used_api].clone())
|
||||
}
|
||||
|
||||
/// Verifies whether nodes a reasonably distributed among all mix layers.
|
||||
///
|
||||
/// In ideal world we would have 33% nodes on layer 1, 33% on layer 2 and 33% on layer 3.
|
||||
/// However, this is a rather unrealistic expectation, instead we check whether there exists
|
||||
/// a layer with more than 66% of nodes or with fewer than 15% and if so, we trigger a failure.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `topology`: active topology constructed from validator api data
|
||||
fn check_layer_distribution(
|
||||
&self,
|
||||
active_topology: &NymTopology,
|
||||
) -> Result<(), NymTopologyError> {
|
||||
let lower_threshold = 0.15;
|
||||
let upper_threshold = 0.66;
|
||||
active_topology.ensure_even_layer_distribution(lower_threshold, upper_threshold)
|
||||
}
|
||||
|
||||
async fn get_current_compatible_topology(&mut self) -> Option<NymTopology> {
|
||||
let mixnodes = match self
|
||||
let rewarded_set = self
|
||||
.validator_client
|
||||
.get_all_basic_active_mixing_assigned_nodes()
|
||||
.get_current_rewarded_set()
|
||||
.await
|
||||
{
|
||||
Err(err) => {
|
||||
error!("failed to get network mixnodes - {err}");
|
||||
return None;
|
||||
}
|
||||
Ok(mixes) => mixes,
|
||||
};
|
||||
.inspect_err(|err| error!("failed to get current rewarded set: {err}"))
|
||||
.ok()?;
|
||||
|
||||
let gateways = match self
|
||||
.validator_client
|
||||
.get_all_basic_entry_assigned_nodes()
|
||||
.await
|
||||
{
|
||||
Err(err) => {
|
||||
error!("failed to get network gateways - {err}");
|
||||
return None;
|
||||
}
|
||||
Ok(gateways) => gateways,
|
||||
};
|
||||
let mut topology = NymTopology::new_empty(rewarded_set);
|
||||
|
||||
debug!(
|
||||
"there are {} mixnodes and {} gateways in total (before performance filtering)",
|
||||
mixnodes.len(),
|
||||
gateways.len()
|
||||
);
|
||||
if self.config.use_extended_topology {
|
||||
let all_nodes = self
|
||||
.validator_client
|
||||
.get_all_basic_nodes()
|
||||
.await
|
||||
.inspect_err(|err| error!("failed to get network nodes: {err}"))
|
||||
.ok()?;
|
||||
|
||||
let topology = NymTopology::from_unordered(
|
||||
mixnodes.iter().filter(|m| {
|
||||
m.performance.round_to_integer() >= self.config.min_mixnode_performance
|
||||
}),
|
||||
gateways.iter().filter(|g| {
|
||||
g.performance.round_to_integer() >= self.config.min_gateway_performance
|
||||
}),
|
||||
);
|
||||
if let Err(err) = self.check_layer_distribution(&topology) {
|
||||
warn!("The current filtered active topology has extremely skewed layer distribution. It cannot be used: {err}");
|
||||
self.use_next_nym_api();
|
||||
None
|
||||
debug!(
|
||||
"there are {} nodes on the network (before filtering)",
|
||||
all_nodes.len()
|
||||
);
|
||||
topology.add_additional_nodes(all_nodes.iter().filter(|n| {
|
||||
n.performance.round_to_integer() >= self.config.min_node_performance()
|
||||
}));
|
||||
} else {
|
||||
Some(topology)
|
||||
// if we're not using extended topology, we're only getting active set mixnodes and gateways
|
||||
|
||||
let mixnodes = self
|
||||
.validator_client
|
||||
.get_all_basic_active_mixing_assigned_nodes()
|
||||
.await
|
||||
.inspect_err(|err| error!("failed to get network mixnodes: {err}"))
|
||||
.ok()?;
|
||||
|
||||
// TODO: we really should be getting ACTIVE gateways only
|
||||
let gateways = self
|
||||
.validator_client
|
||||
.get_all_basic_entry_assigned_nodes()
|
||||
.await
|
||||
.inspect_err(|err| error!("failed to get network gateways: {err}"))
|
||||
.ok()?;
|
||||
|
||||
debug!(
|
||||
"there are {} mixnodes and {} gateways in total (before performance filtering)",
|
||||
mixnodes.len(),
|
||||
gateways.len()
|
||||
);
|
||||
|
||||
topology.add_additional_nodes(mixnodes.iter().filter(|m| {
|
||||
m.performance.round_to_integer() >= self.config.min_mixnode_performance
|
||||
}));
|
||||
topology.add_additional_nodes(gateways.iter().filter(|m| {
|
||||
m.performance.round_to_integer() >= self.config.min_gateway_performance
|
||||
}));
|
||||
};
|
||||
|
||||
if !topology.is_minimally_routable() {
|
||||
error!("the current filtered active topology can't be used to construct any packets");
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(topology)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,7 +152,11 @@ impl NymApiTopologyProvider {
|
||||
#[async_trait]
|
||||
impl TopologyProvider for NymApiTopologyProvider {
|
||||
async fn get_new_topology(&mut self) -> Option<NymTopology> {
|
||||
self.get_current_compatible_topology().await
|
||||
let Some(topology) = self.get_current_compatible_topology().await else {
|
||||
self.use_next_nym_api();
|
||||
return None;
|
||||
};
|
||||
Some(topology)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -150,6 +164,10 @@ impl TopologyProvider for NymApiTopologyProvider {
|
||||
#[async_trait(?Send)]
|
||||
impl TopologyProvider for NymApiTopologyProvider {
|
||||
async fn get_new_topology(&mut self) -> Option<NymTopology> {
|
||||
self.get_current_compatible_topology().await
|
||||
let Some(topology) = self.get_current_compatible_topology().await else {
|
||||
self.use_next_nym_api();
|
||||
return None;
|
||||
};
|
||||
Some(topology)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,8 +4,8 @@
|
||||
use crate::client::mix_traffic::transceiver::ErasedGatewayError;
|
||||
use nym_crypto::asymmetric::identity::Ed25519RecoveryError;
|
||||
use nym_gateway_client::error::GatewayClientError;
|
||||
use nym_topology::gateway::GatewayConversionError;
|
||||
use nym_topology::NymTopologyError;
|
||||
use nym_topology::node::RoutingNodeError;
|
||||
use nym_topology::{NodeId, NymTopologyError};
|
||||
use nym_validator_client::ValidatorClientError;
|
||||
use std::error::Error;
|
||||
use std::path::PathBuf;
|
||||
@@ -74,10 +74,10 @@ pub enum ClientCoreError {
|
||||
#[error("the gateway id is invalid - {0}")]
|
||||
UnableToCreatePublicKeyFromGatewayId(Ed25519RecoveryError),
|
||||
|
||||
#[error("The gateway is malformed: {source}")]
|
||||
#[error("the node is malformed: {source}")]
|
||||
MalformedGateway {
|
||||
#[from]
|
||||
source: GatewayConversionError,
|
||||
source: Box<RoutingNodeError>,
|
||||
},
|
||||
|
||||
#[error("failed to establish connection to gateway: {source}")]
|
||||
@@ -96,6 +96,9 @@ pub enum ClientCoreError {
|
||||
#[error("timed out while trying to establish gateway connection")]
|
||||
GatewayConnectionTimeout,
|
||||
|
||||
#[error("failed to forward mix messages to gateway")]
|
||||
GatewayFailedToForwardMessages,
|
||||
|
||||
#[error("no ping measurements for the gateway ({identity}) performed")]
|
||||
NoGatewayMeasurements { identity: String },
|
||||
|
||||
@@ -159,6 +162,9 @@ pub enum ClientCoreError {
|
||||
#[error("the specified gateway '{gateway}' does not support the wss protocol")]
|
||||
UnsupportedWssProtocol { gateway: String },
|
||||
|
||||
#[error("node {id} ({identity}) does not support mixnet entry mode")]
|
||||
UnsupportedEntry { id: NodeId, identity: String },
|
||||
|
||||
#[error(
|
||||
"failed to load custom topology using path '{}'. detailed message: {source}", file_path.display()
|
||||
)]
|
||||
|
||||
@@ -7,7 +7,7 @@ use futures::{SinkExt, StreamExt};
|
||||
use log::{debug, info, trace, warn};
|
||||
use nym_crypto::asymmetric::identity;
|
||||
use nym_gateway_client::GatewayClient;
|
||||
use nym_topology::gateway;
|
||||
use nym_topology::node::RoutingNode;
|
||||
use nym_validator_client::client::IdentityKeyRef;
|
||||
use nym_validator_client::UserAgent;
|
||||
use rand::{seq::SliceRandom, Rng};
|
||||
@@ -15,6 +15,7 @@ use std::{sync::Arc, time::Duration};
|
||||
use tungstenite::Message;
|
||||
use url::Url;
|
||||
|
||||
use nym_topology::NodeId;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::net::TcpStream;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
@@ -25,7 +26,6 @@ use tokio::time::Instant;
|
||||
use tokio_tungstenite::connect_async;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_utils::websocket::JSWebsocket;
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
@@ -48,22 +48,30 @@ const PING_TIMEOUT: Duration = Duration::from_millis(1000);
|
||||
|
||||
// The abstraction that some of these helpers use
|
||||
pub trait ConnectableGateway {
|
||||
fn identity(&self) -> &identity::PublicKey;
|
||||
fn clients_address(&self) -> String;
|
||||
fn node_id(&self) -> NodeId;
|
||||
fn identity(&self) -> identity::PublicKey;
|
||||
fn clients_address(&self, prefer_ipv6: bool) -> Option<String>;
|
||||
fn is_wss(&self) -> bool;
|
||||
}
|
||||
|
||||
impl ConnectableGateway for gateway::LegacyNode {
|
||||
fn identity(&self) -> &identity::PublicKey {
|
||||
self.identity()
|
||||
impl ConnectableGateway for RoutingNode {
|
||||
fn node_id(&self) -> NodeId {
|
||||
self.node_id
|
||||
}
|
||||
|
||||
fn clients_address(&self) -> String {
|
||||
self.clients_address()
|
||||
fn identity(&self) -> identity::PublicKey {
|
||||
self.identity_key
|
||||
}
|
||||
|
||||
fn clients_address(&self, prefer_ipv6: bool) -> Option<String> {
|
||||
self.ws_entry_address(prefer_ipv6)
|
||||
}
|
||||
|
||||
fn is_wss(&self) -> bool {
|
||||
self.clients_wss_port.is_some()
|
||||
self.entry
|
||||
.as_ref()
|
||||
.map(|e| e.clients_wss_port.is_some())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,12 +86,13 @@ impl<'a, G: ConnectableGateway> GatewayWithLatency<'a, G> {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn current_gateways<R: Rng>(
|
||||
pub async fn gateways_for_init<R: Rng>(
|
||||
rng: &mut R,
|
||||
nym_apis: &[Url],
|
||||
user_agent: Option<UserAgent>,
|
||||
minimum_performance: u8,
|
||||
) -> Result<Vec<gateway::LegacyNode>, ClientCoreError> {
|
||||
ignore_epoch_roles: bool,
|
||||
) -> Result<Vec<RoutingNode>, ClientCoreError> {
|
||||
let nym_api = nym_apis
|
||||
.choose(rng)
|
||||
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
|
||||
@@ -100,11 +109,14 @@ pub async fn current_gateways<R: Rng>(
|
||||
|
||||
log::trace!("Gateways: {:#?}", gateways);
|
||||
|
||||
// filter out gateways below minimum performance and ones that could operate as a mixnode
|
||||
// (we don't want instability)
|
||||
let valid_gateways = gateways
|
||||
.iter()
|
||||
.filter(|g| ignore_epoch_roles || !g.supported_roles.mixnode)
|
||||
.filter(|g| g.performance.round_to_integer() >= minimum_performance)
|
||||
.filter_map(|gateway| gateway.try_into().ok())
|
||||
.collect::<Vec<gateway::LegacyNode>>();
|
||||
.collect::<Vec<_>>();
|
||||
log::debug!("After checking validity: {}", valid_gateways.len());
|
||||
log::trace!("Valid gateways: {:#?}", valid_gateways);
|
||||
|
||||
@@ -134,7 +146,12 @@ async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<G>, Client
|
||||
where
|
||||
G: ConnectableGateway,
|
||||
{
|
||||
let addr = gateway.clients_address();
|
||||
let Some(addr) = gateway.clients_address(false) else {
|
||||
return Err(ClientCoreError::UnsupportedEntry {
|
||||
id: gateway.node_id(),
|
||||
identity: gateway.identity().to_string(),
|
||||
});
|
||||
};
|
||||
trace!(
|
||||
"establishing connection to {} ({addr})...",
|
||||
gateway.identity(),
|
||||
@@ -190,7 +207,7 @@ where
|
||||
Ok(GatewayWithLatency::new(gateway, avg))
|
||||
}
|
||||
|
||||
pub async fn choose_gateway_by_latency<'a, R: Rng, G: ConnectableGateway + Clone>(
|
||||
pub async fn choose_gateway_by_latency<R: Rng, G: ConnectableGateway + Clone>(
|
||||
rng: &mut R,
|
||||
gateways: &[G],
|
||||
must_use_tls: bool,
|
||||
@@ -205,7 +222,7 @@ pub async fn choose_gateway_by_latency<'a, R: Rng, G: ConnectableGateway + Clone
|
||||
let gateways_with_latency = Arc::new(tokio::sync::Mutex::new(Vec::new()));
|
||||
futures::stream::iter(gateways)
|
||||
.for_each_concurrent(CONCURRENT_GATEWAYS_MEASURED, |gateway| async {
|
||||
let id = *gateway.identity();
|
||||
let id = gateway.identity();
|
||||
trace!("measuring latency to {id}...");
|
||||
match measure_latency(gateway).await {
|
||||
Ok(with_latency) => {
|
||||
@@ -252,9 +269,9 @@ fn filter_by_tls<G: ConnectableGateway>(
|
||||
|
||||
pub(super) fn uniformly_random_gateway<R: Rng>(
|
||||
rng: &mut R,
|
||||
gateways: &[gateway::LegacyNode],
|
||||
gateways: &[RoutingNode],
|
||||
must_use_tls: bool,
|
||||
) -> Result<gateway::LegacyNode, ClientCoreError> {
|
||||
) -> Result<RoutingNode, ClientCoreError> {
|
||||
filter_by_tls(gateways, must_use_tls)?
|
||||
.choose(rng)
|
||||
.ok_or(ClientCoreError::NoGatewaysOnNetwork)
|
||||
@@ -263,9 +280,9 @@ pub(super) fn uniformly_random_gateway<R: Rng>(
|
||||
|
||||
pub(super) fn get_specified_gateway(
|
||||
gateway_identity: IdentityKeyRef,
|
||||
gateways: &[gateway::LegacyNode],
|
||||
gateways: &[RoutingNode],
|
||||
must_use_tls: bool,
|
||||
) -> Result<gateway::LegacyNode, ClientCoreError> {
|
||||
) -> Result<RoutingNode, ClientCoreError> {
|
||||
log::debug!("Requesting specified gateway: {}", gateway_identity);
|
||||
let user_gateway = identity::PublicKey::from_base58_string(gateway_identity)
|
||||
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
|
||||
@@ -275,7 +292,14 @@ pub(super) fn get_specified_gateway(
|
||||
.find(|gateway| gateway.identity_key == user_gateway)
|
||||
.ok_or_else(|| ClientCoreError::NoGatewayWithId(gateway_identity.to_string()))?;
|
||||
|
||||
if must_use_tls && gateway.clients_wss_port.is_none() {
|
||||
let Some(entry_details) = gateway.entry.as_ref() else {
|
||||
return Err(ClientCoreError::UnsupportedEntry {
|
||||
id: gateway.node_id,
|
||||
identity: gateway.identity().to_string(),
|
||||
});
|
||||
};
|
||||
|
||||
if must_use_tls && entry_details.clients_wss_port.is_none() {
|
||||
return Err(ClientCoreError::UnsupportedWssProtocol {
|
||||
gateway: gateway_identity.to_string(),
|
||||
});
|
||||
|
||||
@@ -19,7 +19,7 @@ use crate::init::types::{
|
||||
use nym_client_core_gateways_storage::GatewaysDetailsStore;
|
||||
use nym_client_core_gateways_storage::{GatewayDetails, GatewayRegistration};
|
||||
use nym_gateway_client::client::InitGatewayClient;
|
||||
use nym_topology::gateway;
|
||||
use nym_topology::node::RoutingNode;
|
||||
use rand::rngs::OsRng;
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use serde::Serialize;
|
||||
@@ -50,7 +50,7 @@ async fn setup_new_gateway<K, D>(
|
||||
key_store: &K,
|
||||
details_store: &D,
|
||||
selection_specification: GatewaySelectionSpecification,
|
||||
available_gateways: Vec<gateway::LegacyNode>,
|
||||
available_gateways: Vec<RoutingNode>,
|
||||
) -> Result<InitialisationResult, ClientCoreError>
|
||||
where
|
||||
K: KeyStore,
|
||||
|
||||
@@ -13,11 +13,11 @@ use nym_crypto::asymmetric::identity;
|
||||
use nym_gateway_client::client::InitGatewayClient;
|
||||
use nym_gateway_requests::shared_key::SharedGatewayKey;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_topology::gateway;
|
||||
use nym_topology::node::RoutingNode;
|
||||
use nym_validator_client::client::IdentityKey;
|
||||
use nym_validator_client::nyxd::AccountId;
|
||||
use serde::Serialize;
|
||||
use std::fmt::Display;
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use url::Url;
|
||||
@@ -38,16 +38,23 @@ pub enum SelectedGateway {
|
||||
|
||||
impl SelectedGateway {
|
||||
pub fn from_topology_node(
|
||||
node: gateway::LegacyNode,
|
||||
node: RoutingNode,
|
||||
must_use_tls: bool,
|
||||
) -> Result<Self, ClientCoreError> {
|
||||
// for now, let's use 'old' behaviour, if you want to change it, you can pass it up the enum stack yourself : )
|
||||
let prefer_ipv6 = false;
|
||||
|
||||
let gateway_listener = if must_use_tls {
|
||||
node.clients_address_tls()
|
||||
node.ws_entry_address_tls()
|
||||
.ok_or(ClientCoreError::UnsupportedWssProtocol {
|
||||
gateway: node.identity_key.to_base58_string(),
|
||||
})?
|
||||
} else {
|
||||
node.clients_address()
|
||||
node.ws_entry_address(prefer_ipv6)
|
||||
.ok_or(ClientCoreError::UnsupportedEntry {
|
||||
id: node.node_id,
|
||||
identity: node.identity_key.to_base58_string(),
|
||||
})?
|
||||
};
|
||||
|
||||
let gateway_listener =
|
||||
@@ -200,7 +207,7 @@ pub enum GatewaySetup {
|
||||
specification: GatewaySelectionSpecification,
|
||||
|
||||
// TODO: seems to be a bit inefficient to pass them by value
|
||||
available_gateways: Vec<gateway::LegacyNode>,
|
||||
available_gateways: Vec<RoutingNode>,
|
||||
},
|
||||
|
||||
ReuseConnection {
|
||||
@@ -214,6 +221,34 @@ pub enum GatewaySetup {
|
||||
},
|
||||
}
|
||||
|
||||
impl Debug for GatewaySetup {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
GatewaySetup::MustLoad { gateway_id } => f
|
||||
.debug_struct("GatewaySetup::MustLoad")
|
||||
.field("gateway_id", gateway_id)
|
||||
.finish(),
|
||||
GatewaySetup::New {
|
||||
specification,
|
||||
available_gateways,
|
||||
} => f
|
||||
.debug_struct("GatewaySetup::New")
|
||||
.field("specification", specification)
|
||||
.field("available_gateways", available_gateways)
|
||||
.field("gateways", specification)
|
||||
.finish(),
|
||||
GatewaySetup::ReuseConnection {
|
||||
gateway_details, ..
|
||||
} => f
|
||||
.debug_struct("GatewaySetup::ReuseConnection")
|
||||
.field("authenticated_ephemeral_client", &"***")
|
||||
.field("gateway_details", gateway_details)
|
||||
.field("client_keys", &"***")
|
||||
.finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GatewaySetup {
|
||||
pub fn try_reuse_connection(init_res: InitialisationResult) -> Result<Self, ClientCoreError> {
|
||||
if let Some(authenticated_ephemeral_client) = init_res.authenticated_ephemeral_client {
|
||||
|
||||
@@ -14,8 +14,7 @@ pub mod error;
|
||||
pub mod init;
|
||||
|
||||
pub use nym_topology::{
|
||||
HardcodedTopologyProvider, NymTopology, NymTopologyError, SerializableNymTopology,
|
||||
SerializableTopologyError, TopologyProvider,
|
||||
HardcodedTopologyProvider, NymRouteProvider, NymTopology, NymTopologyError, TopologyProvider,
|
||||
};
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
@@ -34,3 +33,48 @@ where
|
||||
{
|
||||
tokio::spawn(future);
|
||||
}
|
||||
|
||||
#[derive(Clone, Default, Debug)]
|
||||
pub struct ForgetMe {
|
||||
client: bool,
|
||||
stats: bool,
|
||||
}
|
||||
|
||||
impl ForgetMe {
|
||||
pub fn new_all() -> Self {
|
||||
Self {
|
||||
client: true,
|
||||
stats: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_client() -> Self {
|
||||
Self {
|
||||
client: true,
|
||||
stats: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_stats() -> Self {
|
||||
Self {
|
||||
client: false,
|
||||
stats: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(client: bool, stats: bool) -> Self {
|
||||
Self { client, stats }
|
||||
}
|
||||
|
||||
pub fn any(&self) -> bool {
|
||||
self.client || self.stats
|
||||
}
|
||||
|
||||
pub fn client(&self) -> bool {
|
||||
self.client
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> bool {
|
||||
self.stats
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,10 @@ use crate::backend::fs_backend::{
|
||||
},
|
||||
};
|
||||
use log::{error, info};
|
||||
use sqlx::ConnectOptions;
|
||||
use sqlx::{
|
||||
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
|
||||
ConnectOptions,
|
||||
};
|
||||
use std::path::Path;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -31,6 +34,9 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
let opts = sqlx::sqlite::SqliteConnectOptions::new()
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.auto_vacuum(SqliteAutoVacuum::Incremental)
|
||||
.filename(database_path)
|
||||
.create_if_missing(fresh)
|
||||
.disable_statement_logging();
|
||||
|
||||
@@ -101,6 +101,10 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
|
||||
// currently unused (but populated)
|
||||
negotiated_protocol: Option<u8>,
|
||||
|
||||
// Callback on the fd as soon as the connection has been established
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
|
||||
/// Listen to shutdown messages and send notifications back to the task manager
|
||||
task_client: TaskClient,
|
||||
}
|
||||
@@ -116,6 +120,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
packet_router: PacketRouter,
|
||||
bandwidth_controller: Option<BandwidthController<C, St>>,
|
||||
stats_reporter: ClientStatsSender,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
GatewayClient {
|
||||
@@ -131,6 +136,8 @@ impl<C, St> GatewayClient<C, St> {
|
||||
bandwidth_controller,
|
||||
stats_reporter,
|
||||
negotiated_protocol: None,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
@@ -205,6 +212,12 @@ impl<C, St> GatewayClient<C, St> {
|
||||
};
|
||||
|
||||
self.connection = SocketState::Available(Box::new(ws_stream));
|
||||
|
||||
#[cfg(unix)]
|
||||
if let (Some(callback), Some(fd)) = (self.connection_fd_callback.as_ref(), self.ws_fd()) {
|
||||
callback.as_ref()(fd);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -311,7 +324,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
|
||||
// If we want to send a message (with response), we need to have a full control over the socket,
|
||||
// as we need to be able to write the request and read the subsequent response
|
||||
async fn send_websocket_message(
|
||||
pub async fn send_websocket_message(
|
||||
&mut self,
|
||||
msg: impl Into<Message>,
|
||||
) -> Result<ServerResponse, GatewayClientError> {
|
||||
@@ -1034,6 +1047,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
bandwidth_controller: None,
|
||||
stats_reporter: ClientStatsSender::new(None),
|
||||
negotiated_protocol: None,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
@@ -1064,6 +1079,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
bandwidth_controller,
|
||||
stats_reporter,
|
||||
negotiated_protocol: self.negotiated_protocol,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: self.connection_fd_callback,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,6 +118,9 @@ pub enum GatewayClientError {
|
||||
"this operation couldn't be completed as the program is in the process of shutting down"
|
||||
)]
|
||||
ShutdownInProgress,
|
||||
|
||||
#[error("Timed out on sending message(s)")]
|
||||
TimeoutOnSink,
|
||||
}
|
||||
|
||||
impl GatewayClientError {
|
||||
|
||||
@@ -291,7 +291,10 @@ impl PartiallyDelegatedHandle {
|
||||
&mut self,
|
||||
msg: Message,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
Ok(self.sink_half.send(msg).await?)
|
||||
tokio::time::timeout(std::time::Duration::from_secs(1), self.sink_half.send(msg))
|
||||
.await
|
||||
.map_err(|_| GatewayClientError::TimeoutOnSink)??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn batch_send_without_response(
|
||||
@@ -300,7 +303,13 @@ impl PartiallyDelegatedHandle {
|
||||
) -> Result<(), GatewayClientError> {
|
||||
let stream_messages: Vec<_> = messages.into_iter().map(Ok).collect();
|
||||
let mut send_stream = futures::stream::iter(stream_messages);
|
||||
Ok(self.sink_half.send_all(&mut send_stream).await?)
|
||||
tokio::time::timeout(
|
||||
std::time::Duration::from_secs(1),
|
||||
self.sink_half.send_all(&mut send_stream),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| GatewayClientError::TimeoutOnSink)??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn merge(self) -> Result<WsConn, GatewayClientError> {
|
||||
|
||||
@@ -8,10 +8,12 @@ license.workspace = true
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
dashmap = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tokio = { workspace = true, features = ["time"] }
|
||||
tokio = { workspace = true, features = ["time", "sync"] }
|
||||
tokio-util = { workspace = true, features = ["codec"], optional = true }
|
||||
tokio-stream = { workspace = true }
|
||||
|
||||
# internal
|
||||
nym-sphinx = { path = "../../nymsphinx" }
|
||||
|
||||
@@ -1,21 +1,24 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// Copyright 2021-2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use dashmap::DashMap;
|
||||
use futures::StreamExt;
|
||||
use nym_sphinx::addressing::nodes::NymNodeRoutingAddress;
|
||||
use nym_sphinx::framing::codec::NymCodec;
|
||||
use nym_sphinx::framing::packet::FramedNymPacket;
|
||||
use nym_sphinx::params::PacketType;
|
||||
use nym_sphinx::NymPacket;
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::ops::Deref;
|
||||
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio::time::sleep;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::codec::Framed;
|
||||
use tracing::*;
|
||||
|
||||
@@ -55,11 +58,37 @@ pub trait SendWithoutResponse {
|
||||
}
|
||||
|
||||
pub struct Client {
|
||||
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
|
||||
active_connections: ActiveConnections,
|
||||
connections_count: Arc<AtomicUsize>,
|
||||
config: Config,
|
||||
}
|
||||
|
||||
struct ConnectionSender {
|
||||
#[derive(Default, Clone)]
|
||||
pub struct ActiveConnections {
|
||||
inner: Arc<DashMap<NymNodeRoutingAddress, ConnectionSender>>,
|
||||
}
|
||||
|
||||
impl ActiveConnections {
|
||||
pub fn pending_packets(&self) -> usize {
|
||||
self.inner
|
||||
.iter()
|
||||
.map(|sender| {
|
||||
let max_capacity = sender.channel.max_capacity();
|
||||
let capacity = sender.channel.capacity();
|
||||
max_capacity - capacity
|
||||
})
|
||||
.sum()
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for ActiveConnections {
|
||||
type Target = DashMap<NymNodeRoutingAddress, ConnectionSender>;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnectionSender {
|
||||
channel: mpsc::Sender<FramedNymPacket>,
|
||||
current_reconnection_attempt: Arc<AtomicU32>,
|
||||
}
|
||||
@@ -73,46 +102,53 @@ impl ConnectionSender {
|
||||
}
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(config: Config) -> Client {
|
||||
Client {
|
||||
conn_new: HashMap::new(),
|
||||
config,
|
||||
struct ManagedConnection {
|
||||
address: SocketAddr,
|
||||
message_receiver: ReceiverStream<FramedNymPacket>,
|
||||
connection_timeout: Duration,
|
||||
current_reconnection: Arc<AtomicU32>,
|
||||
}
|
||||
|
||||
impl ManagedConnection {
|
||||
fn new(
|
||||
address: SocketAddr,
|
||||
message_receiver: mpsc::Receiver<FramedNymPacket>,
|
||||
connection_timeout: Duration,
|
||||
current_reconnection: Arc<AtomicU32>,
|
||||
) -> Self {
|
||||
ManagedConnection {
|
||||
address,
|
||||
message_receiver: ReceiverStream::new(message_receiver),
|
||||
connection_timeout,
|
||||
current_reconnection,
|
||||
}
|
||||
}
|
||||
|
||||
async fn manage_connection(
|
||||
address: SocketAddr,
|
||||
receiver: mpsc::Receiver<FramedNymPacket>,
|
||||
connection_timeout: Duration,
|
||||
current_reconnection: &AtomicU32,
|
||||
) {
|
||||
async fn run(self) {
|
||||
let address = self.address;
|
||||
let connection_fut = TcpStream::connect(address);
|
||||
|
||||
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
|
||||
let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await {
|
||||
Ok(stream_res) => match stream_res {
|
||||
Ok(stream) => {
|
||||
debug!("Managed to establish connection to {}", address);
|
||||
debug!("Managed to establish connection to {}", self.address);
|
||||
// if we managed to connect, reset the reconnection count (whatever it might have been)
|
||||
current_reconnection.store(0, Ordering::Release);
|
||||
self.current_reconnection.store(0, Ordering::Release);
|
||||
Framed::new(stream, NymCodec)
|
||||
}
|
||||
Err(err) => {
|
||||
debug!(
|
||||
"failed to establish connection to {} (err: {})",
|
||||
address, err
|
||||
);
|
||||
debug!("failed to establish connection to {address} (err: {err})",);
|
||||
return;
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
debug!(
|
||||
"failed to connect to {} within {:?}",
|
||||
address, connection_timeout
|
||||
"failed to connect to {address} within {:?}",
|
||||
self.connection_timeout
|
||||
);
|
||||
|
||||
// we failed to connect - increase reconnection attempt
|
||||
current_reconnection.fetch_add(1, Ordering::SeqCst);
|
||||
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -120,15 +156,28 @@ impl Client {
|
||||
// Take whatever the receiver channel produces and put it on the connection.
|
||||
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
|
||||
// about neither receiver nor the connection, it doesn't matter which one gets consumed
|
||||
if let Err(err) = receiver.map(Ok).forward(conn).await {
|
||||
warn!("Failed to forward packets to {} - {err}", address);
|
||||
if let Err(err) = self.message_receiver.map(Ok).forward(conn).await {
|
||||
warn!("Failed to forward packets to {address}: {err}");
|
||||
}
|
||||
|
||||
debug!(
|
||||
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
|
||||
address
|
||||
"connection manager to {address} is finished. Either the connection failed or mixnet client got dropped",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(config: Config, connections_count: Arc<AtomicUsize>) -> Client {
|
||||
Client {
|
||||
active_connections: Default::default(),
|
||||
connections_count,
|
||||
config,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn active_connections(&self) -> ActiveConnections {
|
||||
self.active_connections.clone()
|
||||
}
|
||||
|
||||
/// If we're trying to reconnect, determine how long we should wait.
|
||||
fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
|
||||
@@ -148,7 +197,7 @@ impl Client {
|
||||
}
|
||||
|
||||
fn make_connection(&mut self, address: NymNodeRoutingAddress, pending_packet: FramedNymPacket) {
|
||||
let (mut sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
|
||||
let (sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
|
||||
|
||||
// this CAN'T fail because we just created the channel which has a non-zero capacity
|
||||
if self.config.maximum_connection_buffer_size > 0 {
|
||||
@@ -156,15 +205,16 @@ impl Client {
|
||||
}
|
||||
|
||||
// if we already tried to connect to `address` before, grab the current attempt count
|
||||
let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
|
||||
existing.channel = sender;
|
||||
Arc::clone(&existing.current_reconnection_attempt)
|
||||
} else {
|
||||
let new_entry = ConnectionSender::new(sender);
|
||||
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
|
||||
self.conn_new.insert(address, new_entry);
|
||||
current_attempt
|
||||
};
|
||||
let current_reconnection_attempt =
|
||||
if let Some(mut existing) = self.active_connections.get_mut(&address) {
|
||||
existing.channel = sender;
|
||||
Arc::clone(&existing.current_reconnection_attempt)
|
||||
} else {
|
||||
let new_entry = ConnectionSender::new(sender);
|
||||
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
|
||||
self.active_connections.insert(address, new_entry);
|
||||
current_attempt
|
||||
};
|
||||
|
||||
// load the actual value.
|
||||
let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
|
||||
@@ -173,6 +223,7 @@ impl Client {
|
||||
// copy the value before moving into another task
|
||||
let initial_connection_timeout = self.config.initial_connection_timeout;
|
||||
|
||||
let connections_count = self.connections_count.clone();
|
||||
tokio::spawn(async move {
|
||||
// before executing the manager, wait for what was specified, if anything
|
||||
if let Some(backoff) = backoff {
|
||||
@@ -180,13 +231,16 @@ impl Client {
|
||||
sleep(backoff).await;
|
||||
}
|
||||
|
||||
Self::manage_connection(
|
||||
connections_count.fetch_add(1, Ordering::SeqCst);
|
||||
ManagedConnection::new(
|
||||
address.into(),
|
||||
receiver,
|
||||
initial_connection_timeout,
|
||||
¤t_reconnection_attempt,
|
||||
current_reconnection_attempt,
|
||||
)
|
||||
.await
|
||||
.run()
|
||||
.await;
|
||||
connections_count.fetch_sub(1, Ordering::SeqCst);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -201,49 +255,47 @@ impl SendWithoutResponse for Client {
|
||||
trace!("Sending packet to {address:?}");
|
||||
let framed_packet = FramedNymPacket::new(packet, packet_type);
|
||||
|
||||
if let Some(sender) = self.conn_new.get_mut(&address) {
|
||||
if let Err(err) = sender.channel.try_send(framed_packet) {
|
||||
if err.is_full() {
|
||||
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
|
||||
// it's not a 'big' error, but we did not manage to send the packet
|
||||
// if the queue is full, we can't really do anything but to drop the packet
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::WouldBlock,
|
||||
"connection queue is full",
|
||||
))
|
||||
} else if err.is_disconnected() {
|
||||
debug!(
|
||||
"Connection to {} seems to be dead. attempting to re-establish it...",
|
||||
address
|
||||
);
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue
|
||||
// it up to send it as soon as the connection is re-established
|
||||
self.make_connection(address, err.into_inner());
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::ConnectionAborted,
|
||||
"reconnection attempt is in progress",
|
||||
))
|
||||
} else {
|
||||
// this can't really happen, but let's safe-guard against it in case something changes in futures library
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"unknown connection buffer error",
|
||||
))
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
let Some(sender) = self.active_connections.get_mut(&address) else {
|
||||
// there was never a connection to begin with
|
||||
debug!("establishing initial connection to {}", address);
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
|
||||
// for sending for as soon as the connection is created
|
||||
self.make_connection(address, framed_packet);
|
||||
Err(io::Error::new(
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::NotConnected,
|
||||
"connection is in progress",
|
||||
))
|
||||
}
|
||||
));
|
||||
};
|
||||
|
||||
let sending_res = sender.channel.try_send(framed_packet);
|
||||
drop(sender);
|
||||
|
||||
sending_res.map_err(|err| {
|
||||
match err {
|
||||
TrySendError::Full(_) => {
|
||||
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
|
||||
// it's not a 'big' error, but we did not manage to send the packet
|
||||
// if the queue is full, we can't really do anything but to drop the packet
|
||||
io::Error::new(
|
||||
io::ErrorKind::WouldBlock,
|
||||
"connection queue is full",
|
||||
)
|
||||
}
|
||||
TrySendError::Closed(dropped) => {
|
||||
debug!(
|
||||
"Connection to {address} seems to be dead. attempting to re-establish it...",
|
||||
);
|
||||
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue
|
||||
// it up to send it as soon as the connection is re-established
|
||||
self.make_connection(address, dropped);
|
||||
io::Error::new(
|
||||
io::ErrorKind::ConnectionAborted,
|
||||
"reconnection attempt is in progress",
|
||||
)
|
||||
}
|
||||
}
|
||||
} )
|
||||
}
|
||||
}
|
||||
|
||||
@@ -252,12 +304,15 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
fn dummy_client() -> Client {
|
||||
Client::new(Config {
|
||||
initial_reconnection_backoff: Duration::from_millis(10_000),
|
||||
maximum_reconnection_backoff: Duration::from_millis(300_000),
|
||||
initial_connection_timeout: Duration::from_millis(1_500),
|
||||
maximum_connection_buffer_size: 128,
|
||||
})
|
||||
Client::new(
|
||||
Config {
|
||||
initial_reconnection_backoff: Duration::from_millis(10_000),
|
||||
maximum_reconnection_backoff: Duration::from_millis(300_000),
|
||||
initial_connection_timeout: Duration::from_millis(1_500),
|
||||
maximum_connection_buffer_size: 128,
|
||||
},
|
||||
Default::default(),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -19,8 +19,9 @@ use nym_api_requests::ecash::{
|
||||
PartialExpirationDateSignatureResponse, VerificationKeyResponse,
|
||||
};
|
||||
use nym_api_requests::models::{
|
||||
ApiHealthResponse, GatewayBondAnnotated, GatewayCoreStatusResponse, MixnodeCoreStatusResponse,
|
||||
MixnodeStatusResponse, NymNodeDescription, RewardEstimationResponse, StakeSaturationResponse,
|
||||
ApiHealthResponse, GatewayBondAnnotated, GatewayCoreStatusResponse,
|
||||
HistoricalPerformanceResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse,
|
||||
NymNodeDescription, RewardEstimationResponse, StakeSaturationResponse,
|
||||
};
|
||||
use nym_api_requests::models::{LegacyDescribedGateway, MixNodeBondAnnotated};
|
||||
use nym_api_requests::nym_nodes::SkimmedNode;
|
||||
@@ -32,10 +33,10 @@ use time::Date;
|
||||
use url::Url;
|
||||
|
||||
pub use crate::nym_api::NymApiClientExt;
|
||||
use nym_mixnet_contract_common::EpochRewardedSet;
|
||||
pub use nym_mixnet_contract_common::{
|
||||
mixnode::MixNodeDetails, GatewayBond, IdentityKey, IdentityKeyRef, NodeId, NymNodeDetails,
|
||||
};
|
||||
|
||||
// re-export the type to not break existing imports
|
||||
pub use crate::coconut::EcashApiClient;
|
||||
|
||||
@@ -264,6 +265,31 @@ impl<C, S> Client<C, S> {
|
||||
Ok(self.nym_api.get_gateways_detailed_unfiltered().await?)
|
||||
}
|
||||
|
||||
pub async fn get_full_node_performance_history(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
) -> Result<Vec<HistoricalPerformanceResponse>, ValidatorClientError> {
|
||||
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
|
||||
let mut page = 0;
|
||||
let mut history = Vec::new();
|
||||
|
||||
loop {
|
||||
let mut res = self
|
||||
.nym_api
|
||||
.get_node_performance_history(node_id, Some(page), None)
|
||||
.await?;
|
||||
|
||||
history.append(&mut res.history.data);
|
||||
if history.len() < res.history.pagination.total {
|
||||
page += 1
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(history)
|
||||
}
|
||||
|
||||
// TODO: combine with NymApiClient...
|
||||
pub async fn get_all_cached_described_nodes(
|
||||
&self,
|
||||
@@ -367,6 +393,10 @@ impl NymApiClient {
|
||||
Ok(self.nym_api.get_basic_gateways().await?.nodes)
|
||||
}
|
||||
|
||||
pub async fn get_current_rewarded_set(&self) -> Result<EpochRewardedSet, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_rewarded_set().await?.into())
|
||||
}
|
||||
|
||||
/// retrieve basic information for nodes are capable of operating as an entry gateway
|
||||
/// this includes legacy gateways and nym-nodes
|
||||
pub async fn get_all_basic_entry_assigned_nodes(
|
||||
|
||||
@@ -65,6 +65,12 @@ pub enum EcashApiError {
|
||||
#[from]
|
||||
source: cosmrs::ErrorReport,
|
||||
},
|
||||
|
||||
#[error("nym api error")]
|
||||
NymApi {
|
||||
#[from]
|
||||
source: crate::ValidatorClientError,
|
||||
},
|
||||
}
|
||||
|
||||
impl TryFrom<ContractVKShare> for EcashApiClient {
|
||||
|
||||
@@ -13,7 +13,7 @@ use nym_api_requests::ecash::models::{
|
||||
use nym_api_requests::ecash::VerificationKeyResponse;
|
||||
use nym_api_requests::models::{
|
||||
AnnotationResponse, ApiHealthResponse, LegacyDescribedMixNode, NodePerformanceResponse,
|
||||
NodeRefreshBody, NymNodeDescription,
|
||||
NodeRefreshBody, NymNodeDescription, PerformanceHistoryResponse, RewardedSetResponse,
|
||||
};
|
||||
use nym_api_requests::nym_nodes::PaginatedCachedNodesResponse;
|
||||
use nym_api_requests::pagination::PaginatedResponse;
|
||||
@@ -163,6 +163,35 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn get_node_performance_history(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
page: Option<u32>,
|
||||
per_page: Option<u32>,
|
||||
) -> Result<PerformanceHistoryResponse, NymAPIError> {
|
||||
let mut params = Vec::new();
|
||||
|
||||
if let Some(page) = page {
|
||||
params.push(("page", page.to_string()))
|
||||
}
|
||||
|
||||
if let Some(per_page) = per_page {
|
||||
params.push(("per_page", per_page.to_string()))
|
||||
}
|
||||
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
routes::NYM_NODES_PERFORMANCE_HISTORY,
|
||||
&*node_id.to_string(),
|
||||
],
|
||||
¶ms,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn get_nodes_described(
|
||||
&self,
|
||||
@@ -179,8 +208,15 @@ pub trait NymApiClientExt: ApiClient {
|
||||
params.push(("per_page", per_page.to_string()))
|
||||
}
|
||||
|
||||
self.get_json(&[routes::API_VERSION, "nym-nodes", "described"], ¶ms)
|
||||
.await
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
routes::NYM_NODES_DESCRIBED,
|
||||
],
|
||||
¶ms,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
@@ -199,8 +235,15 @@ pub trait NymApiClientExt: ApiClient {
|
||||
params.push(("per_page", per_page.to_string()))
|
||||
}
|
||||
|
||||
self.get_json(&[routes::API_VERSION, "nym-nodes", "bonded"], ¶ms)
|
||||
.await
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
routes::NYM_NODES_BONDED,
|
||||
],
|
||||
¶ms,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
@@ -210,7 +253,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
"nym-nodes",
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"mixnodes",
|
||||
"skimmed",
|
||||
],
|
||||
@@ -226,7 +269,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
"nym-nodes",
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"gateways",
|
||||
"skimmed",
|
||||
],
|
||||
@@ -235,6 +278,19 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_rewarded_set(&self) -> Result<RewardedSetResponse, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
routes::NYM_NODES_REWARDED_SET,
|
||||
],
|
||||
NO_PARAMS,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// retrieve basic information for nodes are capable of operating as an entry gateway
|
||||
/// this includes legacy gateways and nym-nodes
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
@@ -262,7 +318,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
"nym-nodes",
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
"entry-gateways",
|
||||
"all",
|
||||
@@ -299,7 +355,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
"nym-nodes",
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
"mixnodes",
|
||||
"active",
|
||||
@@ -336,7 +392,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
"nym-nodes",
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
"mixnodes",
|
||||
"all",
|
||||
@@ -368,7 +424,12 @@ pub trait NymApiClientExt: ApiClient {
|
||||
}
|
||||
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, "unstable", "nym-nodes", "skimmed"],
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
],
|
||||
¶ms,
|
||||
)
|
||||
.await
|
||||
@@ -677,8 +738,8 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"nym-nodes",
|
||||
"performance",
|
||||
routes::NYM_NODES_ROUTES,
|
||||
routes::NYM_NODES_PERFORMANCE,
|
||||
&node_id.to_string(),
|
||||
],
|
||||
NO_PARAMS,
|
||||
@@ -693,8 +754,8 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"nym-nodes",
|
||||
"annotation",
|
||||
routes::NYM_NODES_ROUTES,
|
||||
routes::NYM_NODES_ANNOTATION,
|
||||
&node_id.to_string(),
|
||||
],
|
||||
NO_PARAMS,
|
||||
@@ -912,18 +973,24 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn force_refresh_describe_cache(
|
||||
&self,
|
||||
request: &NodeRefreshBody,
|
||||
) -> Result<(), NymAPIError> {
|
||||
self.post_json(
|
||||
&[routes::API_VERSION, "nym-nodes", "refresh-described"],
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
routes::NYM_NODES_REFRESH_DESCRIBED,
|
||||
],
|
||||
NO_PARAMS,
|
||||
request,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn issued_ticketbooks_for(
|
||||
&self,
|
||||
expiration_date: Date,
|
||||
@@ -940,6 +1007,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn issued_ticketbooks_challenge(
|
||||
&self,
|
||||
expiration_date: Date,
|
||||
|
||||
@@ -34,6 +34,19 @@ pub mod ecash {
|
||||
pub const EPOCH_ID_PARAM: &str = "epoch_id";
|
||||
}
|
||||
|
||||
pub const NYM_NODES_ROUTES: &str = "nym-nodes";
|
||||
|
||||
pub use nym_nodes::*;
|
||||
pub mod nym_nodes {
|
||||
pub const NYM_NODES_PERFORMANCE_HISTORY: &str = "performance-history";
|
||||
pub const NYM_NODES_PERFORMANCE: &str = "performance";
|
||||
pub const NYM_NODES_ANNOTATION: &str = "annotation";
|
||||
pub const NYM_NODES_DESCRIBED: &str = "described";
|
||||
pub const NYM_NODES_BONDED: &str = "bonded";
|
||||
pub const NYM_NODES_REWARDED_SET: &str = "rewarded-set";
|
||||
pub const NYM_NODES_REFRESH_DESCRIBED: &str = "refresh-described";
|
||||
}
|
||||
|
||||
pub const STATUS_ROUTES: &str = "status";
|
||||
pub const API_STATUS_ROUTES: &str = "api-status";
|
||||
pub const HEALTH: &str = "health";
|
||||
|
||||
+15
-12
@@ -26,10 +26,10 @@ use nym_mixnet_contract_common::{
|
||||
reward_params::{Performance, RewardingParams},
|
||||
rewarding::{EstimatedCurrentEpochRewardResponse, PendingRewardResponse},
|
||||
ContractBuildInformation, ContractState, ContractStateParams, CurrentIntervalResponse,
|
||||
CurrentNymNodeVersionResponse, Delegation, EpochEventId, EpochStatus, GatewayBond,
|
||||
GatewayBondResponse, GatewayOwnershipResponse, HistoricalNymNodeVersionEntry, IdentityKey,
|
||||
IdentityKeyRef, IntervalEventId, MixNodeBond, MixNodeDetails, MixOwnershipResponse,
|
||||
MixnodeDetailsByIdentityResponse, MixnodeDetailsResponse, NodeId,
|
||||
CurrentNymNodeVersionResponse, Delegation, EpochEventId, EpochRewardedSet, EpochStatus,
|
||||
GatewayBond, GatewayBondResponse, GatewayOwnershipResponse, HistoricalNymNodeVersionEntry,
|
||||
IdentityKey, IdentityKeyRef, IntervalEventId, MixNodeBond, MixNodeDetails,
|
||||
MixOwnershipResponse, MixnodeDetailsByIdentityResponse, MixnodeDetailsResponse, NodeId,
|
||||
NumberOfPendingEventsResponse, NymNodeBond, NymNodeDetails, NymNodeVersionHistoryResponse,
|
||||
PagedAllDelegationsResponse, PagedDelegatorDelegationsResponse, PagedGatewayResponse,
|
||||
PagedMixnodeBondsResponse, PagedNodeDelegationsResponse, PendingEpochEvent,
|
||||
@@ -670,7 +670,7 @@ impl<T> PagedMixnetQueryClient for T where T: MixnetQueryClient {}
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
|
||||
pub trait MixnetQueryClientExt: MixnetQueryClient {
|
||||
async fn get_rewarded_set(&self) -> Result<RewardedSet, NyxdError> {
|
||||
async fn get_rewarded_set(&self) -> Result<EpochRewardedSet, NyxdError> {
|
||||
let error_response = |message| Err(NyxdError::extension_query_failure("mixnet", message));
|
||||
|
||||
let metadata = self.get_rewarded_set_metadata().await?;
|
||||
@@ -711,13 +711,16 @@ pub trait MixnetQueryClientExt: MixnetQueryClient {
|
||||
return error_response("the nodes assigned for 'standby' returned unexpected epoch_id");
|
||||
}
|
||||
|
||||
Ok(RewardedSet {
|
||||
entry_gateways: entry.nodes,
|
||||
exit_gateways: exit.nodes,
|
||||
layer1: layer1.nodes,
|
||||
layer2: layer2.nodes,
|
||||
layer3: layer3.nodes,
|
||||
standby: standby.nodes,
|
||||
Ok(EpochRewardedSet {
|
||||
epoch_id: expected_epoch_id,
|
||||
assignment: RewardedSet {
|
||||
entry_gateways: entry.nodes,
|
||||
exit_gateways: exit.nodes,
|
||||
layer1: layer1.nodes,
|
||||
layer2: layer2.nodes,
|
||||
layer3: layer3.nodes,
|
||||
standby: standby.nodes,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
+31
@@ -153,13 +153,20 @@ pub trait CosmWasmClient: TendermintRpcClient {
|
||||
let req = QueryAllBalancesRequest {
|
||||
address: address.to_string(),
|
||||
pagination,
|
||||
resolve_denom: false,
|
||||
};
|
||||
|
||||
let mut res = self
|
||||
.make_abci_query::<_, QueryAllBalancesResponse>(path.clone(), req)
|
||||
.await?;
|
||||
|
||||
let early_break = res.balances.is_empty();
|
||||
raw_balances.append(&mut res.balances);
|
||||
|
||||
if early_break {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(next_key) = next_page_key(res.pagination) {
|
||||
pagination = Some(create_pagination(next_key))
|
||||
} else {
|
||||
@@ -187,7 +194,13 @@ pub trait CosmWasmClient: TendermintRpcClient {
|
||||
.make_abci_query::<_, QueryTotalSupplyResponse>(path.clone(), req)
|
||||
.await?;
|
||||
|
||||
let early_break = res.supply.is_empty();
|
||||
supply.append(&mut res.supply);
|
||||
|
||||
if early_break {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(next_key) = next_page_key(res.pagination) {
|
||||
pagination = Some(create_pagination(next_key))
|
||||
} else {
|
||||
@@ -328,7 +341,13 @@ pub trait CosmWasmClient: TendermintRpcClient {
|
||||
.make_abci_query::<_, QueryCodesResponse>(path.clone(), req)
|
||||
.await?;
|
||||
|
||||
let early_break = res.code_infos.is_empty();
|
||||
raw_codes.append(&mut res.code_infos);
|
||||
|
||||
if early_break {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(next_key) = next_page_key(res.pagination) {
|
||||
pagination = Some(create_pagination(next_key))
|
||||
} else {
|
||||
@@ -373,7 +392,13 @@ pub trait CosmWasmClient: TendermintRpcClient {
|
||||
.make_abci_query::<_, QueryContractsByCodeResponse>(path.clone(), req)
|
||||
.await?;
|
||||
|
||||
let early_break = res.contracts.is_empty();
|
||||
raw_contracts.append(&mut res.contracts);
|
||||
|
||||
if early_break {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(next_key) = next_page_key(res.pagination) {
|
||||
pagination = Some(create_pagination(next_key))
|
||||
} else {
|
||||
@@ -429,7 +454,13 @@ pub trait CosmWasmClient: TendermintRpcClient {
|
||||
.make_abci_query::<_, QueryContractHistoryResponse>(path.clone(), req)
|
||||
.await?;
|
||||
|
||||
let early_break = res.entries.is_empty();
|
||||
raw_entries.append(&mut res.entries);
|
||||
|
||||
if early_break {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(next_key) = next_page_key(res.pagination) {
|
||||
pagination = Some(create_pagination(next_key))
|
||||
} else {
|
||||
|
||||
@@ -4,9 +4,11 @@
|
||||
use crate::rpc::TendermintRpcClient;
|
||||
use async_trait::async_trait;
|
||||
use base64::Engine;
|
||||
use cosmrs::tendermint;
|
||||
use cosmrs::tendermint::{block::Height, evidence::Evidence, Hash};
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::{header, RequestBuilder};
|
||||
use tendermint_rpc::dialect::{v0_34, v0_37, v0_38, LatestDialect};
|
||||
use tendermint_rpc::{
|
||||
client::CompatMode,
|
||||
dialect::{self, Dialect},
|
||||
@@ -21,8 +23,21 @@ macro_rules! perform_with_compat {
|
||||
($self:expr, $request:expr) => {{
|
||||
let request = $request;
|
||||
match $self.compat {
|
||||
CompatMode::V0_37 => $self.perform_v0_37(request).await,
|
||||
CompatMode::V0_34 => $self.perform_v0_34(request).await,
|
||||
CompatMode::V0_38 => {
|
||||
$self
|
||||
.perform_request_with_dialect(request, dialect::v0_38::Dialect)
|
||||
.await
|
||||
}
|
||||
CompatMode::V0_37 => {
|
||||
$self
|
||||
.perform_request_with_dialect(request, dialect::v0_37::Dialect)
|
||||
.await
|
||||
}
|
||||
CompatMode::V0_34 => {
|
||||
$self
|
||||
.perform_request_with_dialect(request, dialect::v0_34::Dialect)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}};
|
||||
}
|
||||
@@ -70,7 +85,11 @@ impl ReqwestRpcClient {
|
||||
.headers(headers)
|
||||
}
|
||||
|
||||
async fn perform_request<R, S>(&self, request: R) -> Result<R::Output, Error>
|
||||
async fn perform_request_with_dialect<R, S>(
|
||||
&self,
|
||||
request: R,
|
||||
_dialect: S,
|
||||
) -> Result<R::Output, Error>
|
||||
where
|
||||
R: SimpleRequest<S>,
|
||||
S: Dialect,
|
||||
@@ -81,26 +100,25 @@ impl ReqwestRpcClient {
|
||||
.send()
|
||||
.await
|
||||
.map_err(TendermintRpcErrorMap::into_rpc_err)?;
|
||||
let response_status = response.status();
|
||||
let bytes = response
|
||||
.bytes()
|
||||
.await
|
||||
.map_err(TendermintRpcErrorMap::into_rpc_err)?;
|
||||
|
||||
// Successful JSON-RPC requests are expected to return a 200 OK HTTP status.
|
||||
// Otherwise, this means that the HTTP request failed as a whole,
|
||||
// as opposed to the JSON-RPC request returning an error,
|
||||
// and we cannot expect the response body to be a valid JSON-RPC response.
|
||||
if response_status != reqwest::StatusCode::OK {
|
||||
// hehe, that's so nasty but we have to somehow convert between different versions of the same lib
|
||||
return Err(Error::http_request_failed(
|
||||
response_status.as_u16().try_into().unwrap(),
|
||||
));
|
||||
}
|
||||
|
||||
R::Response::from_string(bytes).map(Into::into)
|
||||
}
|
||||
|
||||
async fn perform_v0_34<R>(&self, request: R) -> Result<R::Output, Error>
|
||||
where
|
||||
R: SimpleRequest<dialect::v0_34::Dialect>,
|
||||
{
|
||||
self.perform_request(request).await
|
||||
}
|
||||
|
||||
async fn perform_v0_37<R>(&self, request: R) -> Result<R::Output, Error>
|
||||
where
|
||||
R: SimpleRequest<dialect::v0_37::Dialect>,
|
||||
{
|
||||
self.perform_request(request).await
|
||||
}
|
||||
}
|
||||
|
||||
trait TendermintRpcErrorMap {
|
||||
@@ -120,18 +138,50 @@ impl TendermintRpcClient for ReqwestRpcClient {
|
||||
where
|
||||
R: SimpleRequest,
|
||||
{
|
||||
self.perform_request(request).await
|
||||
self.perform_request_with_dialect(request, LatestDialect)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
|
||||
async fn block<H>(&self, height: H) -> Result<endpoint::block::Response, Error>
|
||||
where
|
||||
H: Into<Height> + Send,
|
||||
{
|
||||
perform_with_compat!(self, block_results::Request::new(height.into()))
|
||||
perform_with_compat!(self, endpoint::block::Request::new(height.into()))
|
||||
}
|
||||
|
||||
async fn latest_block_results(&self) -> Result<block_results::Response, Error> {
|
||||
perform_with_compat!(self, block_results::Request::default())
|
||||
async fn block_by_hash(
|
||||
&self,
|
||||
hash: tendermint::Hash,
|
||||
) -> Result<endpoint::block_by_hash::Response, Error> {
|
||||
perform_with_compat!(self, endpoint::block_by_hash::Request::new(hash))
|
||||
}
|
||||
|
||||
async fn latest_block(&self) -> Result<endpoint::block::Response, Error> {
|
||||
perform_with_compat!(self, endpoint::block::Request::default())
|
||||
}
|
||||
|
||||
async fn block_results<H>(&self, height: H) -> Result<endpoint::block_results::Response, Error>
|
||||
where
|
||||
H: Into<Height> + Send,
|
||||
{
|
||||
perform_with_compat!(self, endpoint::block_results::Request::new(height.into()))
|
||||
}
|
||||
|
||||
async fn latest_block_results(&self) -> Result<endpoint::block_results::Response, Error> {
|
||||
perform_with_compat!(self, endpoint::block_results::Request::default())
|
||||
}
|
||||
|
||||
async fn block_search(
|
||||
&self,
|
||||
query: Query,
|
||||
page: u32,
|
||||
per_page: u8,
|
||||
order: Order,
|
||||
) -> Result<endpoint::block_search::Response, Error> {
|
||||
perform_with_compat!(
|
||||
self,
|
||||
endpoint::block_search::Request::new(query, page, per_page, order)
|
||||
)
|
||||
}
|
||||
|
||||
async fn header<H>(&self, height: H) -> Result<endpoint::header::Response, Error>
|
||||
@@ -140,11 +190,26 @@ impl TendermintRpcClient for ReqwestRpcClient {
|
||||
{
|
||||
let height = height.into();
|
||||
match self.compat {
|
||||
CompatMode::V0_37 => self.perform(endpoint::header::Request::new(height)).await,
|
||||
CompatMode::V0_38 => {
|
||||
self.perform_request_with_dialect(
|
||||
endpoint::header::Request::new(height),
|
||||
v0_38::Dialect,
|
||||
)
|
||||
.await
|
||||
}
|
||||
CompatMode::V0_37 => {
|
||||
self.perform_request_with_dialect(
|
||||
endpoint::header::Request::new(height),
|
||||
v0_37::Dialect,
|
||||
)
|
||||
.await
|
||||
}
|
||||
CompatMode::V0_34 => {
|
||||
// Back-fill with a request to /block endpoint and
|
||||
// taking just the header from the response.
|
||||
let resp = self.perform_v0_34(block::Request::new(height)).await?;
|
||||
let resp = self
|
||||
.perform_request_with_dialect(block::Request::new(height), v0_34::Dialect)
|
||||
.await?;
|
||||
Ok(resp.into())
|
||||
}
|
||||
}
|
||||
@@ -152,12 +217,25 @@ impl TendermintRpcClient for ReqwestRpcClient {
|
||||
|
||||
async fn header_by_hash(&self, hash: Hash) -> Result<header_by_hash::Response, Error> {
|
||||
match self.compat {
|
||||
CompatMode::V0_37 => self.perform(header_by_hash::Request::new(hash)).await,
|
||||
CompatMode::V0_38 => {
|
||||
self.perform_request_with_dialect(
|
||||
header_by_hash::Request::new(hash),
|
||||
v0_38::Dialect,
|
||||
)
|
||||
.await
|
||||
}
|
||||
CompatMode::V0_37 => {
|
||||
self.perform_request_with_dialect(
|
||||
header_by_hash::Request::new(hash),
|
||||
v0_37::Dialect,
|
||||
)
|
||||
.await
|
||||
}
|
||||
CompatMode::V0_34 => {
|
||||
// Back-fill with a request to /block_by_hash endpoint and
|
||||
// taking just the header from the response.
|
||||
let resp = self
|
||||
.perform_v0_34(block_by_hash::Request::new(hash))
|
||||
.perform_request_with_dialect(block_by_hash::Request::new(hash), v0_34::Dialect)
|
||||
.await?;
|
||||
Ok(resp.into())
|
||||
}
|
||||
@@ -167,8 +245,18 @@ impl TendermintRpcClient for ReqwestRpcClient {
|
||||
/// `/broadcast_evidence`: broadcast an evidence.
|
||||
async fn broadcast_evidence(&self, e: Evidence) -> Result<evidence::Response, Error> {
|
||||
match self.compat {
|
||||
CompatMode::V0_37 => self.perform(evidence::Request::new(e)).await,
|
||||
CompatMode::V0_34 => self.perform_v0_34(evidence::Request::new(e)).await,
|
||||
CompatMode::V0_38 => {
|
||||
self.perform_request_with_dialect(evidence::Request::new(e), v0_38::Dialect)
|
||||
.await
|
||||
}
|
||||
CompatMode::V0_37 => {
|
||||
self.perform_request_with_dialect(evidence::Request::new(e), v0_37::Dialect)
|
||||
.await
|
||||
}
|
||||
CompatMode::V0_34 => {
|
||||
self.perform_request_with_dialect(evidence::Request::new(e), v0_34::Dialect)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,8 @@ dirs = { workspace = true, optional = true }
|
||||
handlebars = { workspace = true }
|
||||
log = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
toml = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
toml = { workspace = true, features = ["display"] }
|
||||
url = { workspace = true }
|
||||
|
||||
nym-network-defaults = { path = "../network-defaults", features = ["utoipa"] }
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
use std::io;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum NymConfigTomlError {
|
||||
#[error(transparent)]
|
||||
FileIoFailure(#[from] io::Error),
|
||||
#[error(transparent)]
|
||||
TomlSerializeFailure(#[from] toml::ser::Error),
|
||||
}
|
||||
@@ -13,6 +13,7 @@ pub use helpers::{parse_urls, OptionalSet};
|
||||
pub use toml::de::Error as TomlDeError;
|
||||
|
||||
pub mod defaults;
|
||||
pub mod error;
|
||||
pub mod helpers;
|
||||
pub mod legacy_helpers;
|
||||
pub mod serde_helpers;
|
||||
@@ -95,6 +96,42 @@ where
|
||||
config.format_to_writer(file)
|
||||
}
|
||||
|
||||
pub fn save_unformatted_config_to_file<C, P>(
|
||||
config: &C,
|
||||
path: P,
|
||||
) -> Result<(), error::NymConfigTomlError>
|
||||
where
|
||||
C: Serialize + ?Sized,
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let path = path.as_ref();
|
||||
log::info!("saving config file to {}", path.display());
|
||||
|
||||
if let Some(parent) = path.parent() {
|
||||
create_dir_all(parent)?;
|
||||
}
|
||||
|
||||
let mut file = File::create(path)?;
|
||||
|
||||
// TODO: check for whether any of our configs store anything sensitive
|
||||
// and change that to 0o644 instead
|
||||
#[cfg(target_family = "unix")]
|
||||
{
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
let mut perms = fs::metadata(path)?.permissions();
|
||||
perms.set_mode(0o600);
|
||||
fs::set_permissions(path, perms)?;
|
||||
}
|
||||
|
||||
// let serde format the TOML in whatever ugly way it chooses
|
||||
// TODO: in https://docs.rs/toml/latest/toml/fn.to_string_pretty.html it recommends using
|
||||
// https://docs.rs/toml_edit/latest/toml_edit/struct.DocumentMut.html to preserve formatting
|
||||
let toml_string = toml::to_string_pretty(config)?;
|
||||
|
||||
Ok(file.write_all(toml_string.as_bytes())?)
|
||||
}
|
||||
|
||||
pub fn deserialize_config_from_toml_str<C>(raw: &str) -> Result<C, TomlDeError>
|
||||
where
|
||||
C: DeserializeOwned,
|
||||
|
||||
@@ -13,6 +13,7 @@ cosmwasm-std = { workspace = true }
|
||||
cosmwasm-schema = { workspace = true }
|
||||
cw-storage-plus = { workspace = true }
|
||||
schemars = { workspace = true }
|
||||
utoipa = { workspace = true, optional = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
@@ -23,4 +24,5 @@ serde_json = { workspace = true }
|
||||
vergen = { workspace = true, features = ["build", "git", "gitcl", "rustc", "cargo"] }
|
||||
|
||||
[features]
|
||||
naive_float = []
|
||||
naive_float = []
|
||||
utoipa = ["dep:utoipa"]
|
||||
|
||||
@@ -221,6 +221,7 @@ fn default_unknown() -> String {
|
||||
// TODO: there's no reason this couldn't be used for proper binaries, but in that case
|
||||
// perhaps the struct should get renamed and moved to a "more" common crate
|
||||
#[cw_serde]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct ContractBuildInformation {
|
||||
/// Provides the name of the binary, i.e. the content of `CARGO_PKG_NAME` environmental variable.
|
||||
#[serde(default = "default_unknown")]
|
||||
|
||||
@@ -42,9 +42,11 @@ pub struct Gateway {
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct GatewayBond {
|
||||
/// Original amount pledged by the operator of this node.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = crate::CoinSchema))]
|
||||
pub pledge_amount: Coin,
|
||||
|
||||
/// Address of the owner of this gateway.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub owner: Addr,
|
||||
|
||||
/// Block height at which this gateway has been bonded.
|
||||
@@ -55,6 +57,7 @@ pub struct GatewayBond {
|
||||
|
||||
/// Entity who bonded this gateway on behalf of the owner.
|
||||
/// If exists, it's most likely the address of the vesting contract.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub proxy: Option<Addr>,
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
use crate::constants::{TOKEN_SUPPLY, UNIT_DELEGATION_BASE};
|
||||
use crate::error::MixnetContractError;
|
||||
use crate::helpers::IntoBaseDecimal;
|
||||
use crate::nym_node::Role;
|
||||
use crate::reward_params::{NodeRewardingParameters, RewardingParams};
|
||||
use crate::rewarding::helpers::truncate_reward;
|
||||
use crate::rewarding::RewardDistribution;
|
||||
@@ -81,20 +82,25 @@ impl MixNodeDetails {
|
||||
|
||||
// currently this struct is shared between mixnodes and nymnodes
|
||||
#[cw_serde]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct NodeRewarding {
|
||||
/// Information provided by the operator that influence the cost function.
|
||||
pub cost_params: NodeCostParams,
|
||||
|
||||
/// Total pledge and compounded reward earned by the node operator.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub operator: Decimal,
|
||||
|
||||
/// Total delegation and compounded reward earned by all node delegators.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub delegates: Decimal,
|
||||
|
||||
/// Cumulative reward earned by the "unit delegation" since the block 0.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub total_unit_reward: Decimal,
|
||||
|
||||
/// Value of the theoretical "unit delegation" that has delegated to this node at block 0.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub unit_delegation: Decimal,
|
||||
|
||||
/// Marks the epoch when this node was last rewarded so that we wouldn't accidentally attempt
|
||||
@@ -491,14 +497,17 @@ impl NodeRewarding {
|
||||
::cosmwasm_schema::schemars::JsonSchema,
|
||||
)]
|
||||
#[schemars(crate = "::cosmwasm_schema::schemars")]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct MixNodeBond {
|
||||
/// Unique id assigned to the bonded mixnode.
|
||||
pub mix_id: NodeId,
|
||||
|
||||
/// Address of the owner of this mixnode.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub owner: Addr,
|
||||
|
||||
/// Original amount pledged by the operator of this node.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = crate::CoinSchema))]
|
||||
pub original_pledge: Coin,
|
||||
|
||||
// REMOVED (but might be needed due to legacy things, idk yet)
|
||||
@@ -509,6 +518,7 @@ pub struct MixNodeBond {
|
||||
|
||||
/// Entity who bonded this mixnode on behalf of the owner.
|
||||
/// If exists, it's most likely the address of the vesting contract.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = Option<String>))]
|
||||
pub proxy: Option<Addr>,
|
||||
|
||||
/// Block height at which this mixnode has been bonded.
|
||||
@@ -544,6 +554,7 @@ impl MixNodeBond {
|
||||
feature = "generate-ts",
|
||||
ts(export, export_to = "ts-packages/types/src/types/rust/Mixnode.ts")
|
||||
)]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct MixNode {
|
||||
/// Network address of this mixnode, for example 1.1.1.1 or foo.mixnode.com
|
||||
pub host: String,
|
||||
@@ -570,11 +581,14 @@ pub struct MixNode {
|
||||
/// The cost parameters, or the cost function, defined for the particular mixnode that influences
|
||||
/// how the rewards should be split between the node operator and its delegators.
|
||||
#[cw_serde]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct NodeCostParams {
|
||||
/// The profit margin of the associated node, i.e. the desired percent of the reward to be distributed to the operator.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub profit_margin_percent: Percent,
|
||||
|
||||
/// Operating cost of the associated node per the entire interval.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = crate::CoinSchema))]
|
||||
pub interval_operating_cost: Coin,
|
||||
}
|
||||
|
||||
@@ -611,6 +625,16 @@ pub enum LegacyMixLayer {
|
||||
Three = 3,
|
||||
}
|
||||
|
||||
impl From<LegacyMixLayer> for Role {
|
||||
fn from(layer: LegacyMixLayer) -> Self {
|
||||
match layer {
|
||||
LegacyMixLayer::One => Role::Layer1,
|
||||
LegacyMixLayer::Two => Role::Layer2,
|
||||
LegacyMixLayer::Three => Role::Layer3,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LegacyMixLayer> for String {
|
||||
fn from(layer: LegacyMixLayer) -> Self {
|
||||
(layer as u8).to_string()
|
||||
@@ -669,7 +693,9 @@ pub struct PendingMixNodeChanges {
|
||||
}
|
||||
|
||||
#[derive(Default, Copy, Clone, Debug, Serialize, Deserialize, JsonSchema)]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct LegacyPendingMixNodeChanges {
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = Option<u32>))]
|
||||
pub pledge_change: Option<EpochEventId>,
|
||||
}
|
||||
|
||||
|
||||
@@ -863,11 +863,4 @@ pub enum QueryMsg {
|
||||
pub struct MigrateMsg {
|
||||
pub unsafe_skip_state_updates: Option<bool>,
|
||||
pub vesting_contract_address: Option<String>,
|
||||
pub current_nym_node_semver: String,
|
||||
|
||||
#[serde(default)]
|
||||
pub version_score_weights: OutdatedVersionWeights,
|
||||
|
||||
#[serde(default)]
|
||||
pub version_score_params: VersionScoreFormulaParams,
|
||||
}
|
||||
|
||||
@@ -113,6 +113,10 @@ impl Role {
|
||||
pub fn is_standby(&self) -> bool {
|
||||
matches!(self, Role::Standby)
|
||||
}
|
||||
|
||||
pub fn is_mixnode(&self) -> bool {
|
||||
matches!(self, Role::Layer1 | Role::Layer2 | Role::Layer3)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Role {
|
||||
@@ -231,6 +235,7 @@ pub struct RoleMetadata {
|
||||
|
||||
/// Full details associated with given node.
|
||||
#[cw_serde]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct NymNodeDetails {
|
||||
/// Basic bond information of this node, such as owner address, original pledge, etc.
|
||||
pub bond_information: NymNodeBond,
|
||||
@@ -288,14 +293,19 @@ impl NymNodeDetails {
|
||||
}
|
||||
|
||||
#[cw_serde]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct NymNodeBond {
|
||||
/// Unique id assigned to the bonded node.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = u32))]
|
||||
pub node_id: NodeId,
|
||||
|
||||
/// Address of the owner of this nym-node.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub owner: Addr,
|
||||
|
||||
/// Original amount pledged by the operator of this node.
|
||||
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = crate::CoinSchema))]
|
||||
pub original_pledge: Coin,
|
||||
|
||||
/// Block height at which this nym-node has been bonded.
|
||||
@@ -348,6 +358,7 @@ impl NymNodeBond {
|
||||
feature = "generate-ts",
|
||||
ts(export, export_to = "ts-packages/types/src/types/rust/NymNode.ts")
|
||||
)]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct NymNode {
|
||||
/// Network address of this nym-node, for example 1.1.1.1 or foo.mixnode.com
|
||||
/// that is used to discover other capabilities of this node.
|
||||
@@ -358,6 +369,7 @@ pub struct NymNode {
|
||||
pub custom_http_port: Option<u16>,
|
||||
|
||||
/// Base58-encoded ed25519 EdDSA public key.
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub identity_key: IdentityKey,
|
||||
// TODO: I don't think we want to include sphinx keys here,
|
||||
// given we want to rotate them and keeping that in sync with contract will be a PITA
|
||||
@@ -435,8 +447,11 @@ pub struct NodeConfigUpdate {
|
||||
export_to = "ts-packages/types/src/types/rust/PendingNodeChanges.ts"
|
||||
)
|
||||
)]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct PendingNodeChanges {
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = Option<u32>))]
|
||||
pub pledge_change: Option<EpochEventId>,
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = Option<u32>))]
|
||||
pub cost_params_change: Option<IntervalEventId>,
|
||||
}
|
||||
|
||||
|
||||
@@ -21,31 +21,37 @@ pub type WorkFactor = Decimal;
|
||||
)]
|
||||
#[cw_serde]
|
||||
#[derive(Copy)]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct IntervalRewardParams {
|
||||
/// Current value of the rewarding pool.
|
||||
/// It is expected to be constant throughout the interval.
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub reward_pool: Decimal,
|
||||
|
||||
/// Current value of the staking supply.
|
||||
/// It is expected to be constant throughout the interval.
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub staking_supply: Decimal,
|
||||
|
||||
/// Defines the percentage of stake needed to reach saturation for all of the nodes in the rewarded set.
|
||||
/// Also known as `beta`.
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub staking_supply_scale_factor: Percent,
|
||||
|
||||
// computed values
|
||||
/// Current value of the computed reward budget per epoch, per node.
|
||||
/// It is expected to be constant throughout the interval.
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub epoch_reward_budget: Decimal,
|
||||
|
||||
/// Current value of the stake saturation point.
|
||||
/// It is expected to be constant throughout the interval.
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub stake_saturation_point: Decimal,
|
||||
|
||||
// constants(-ish)
|
||||
@@ -54,6 +60,7 @@ pub struct IntervalRewardParams {
|
||||
/// It is not really expected to be changing very often.
|
||||
/// As a matter of fact, unless there's a very specific reason, it should remain constant.
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub sybil_resistance: Percent,
|
||||
|
||||
// default: 10
|
||||
@@ -61,6 +68,7 @@ pub struct IntervalRewardParams {
|
||||
/// It is not really expected to be changing very often.
|
||||
/// As a matter of fact, unless there's a very specific reason, it should remain constant.
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub active_set_work_factor: Decimal,
|
||||
|
||||
// default: 2%
|
||||
@@ -70,6 +78,7 @@ pub struct IntervalRewardParams {
|
||||
/// It is not really expected to be changing very often.
|
||||
/// As a matter of fact, unless there's a very specific reason, it should remain constant.
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub interval_pool_emission: Percent,
|
||||
}
|
||||
|
||||
@@ -90,6 +99,7 @@ impl IntervalRewardParams {
|
||||
)]
|
||||
#[cw_serde]
|
||||
#[derive(Copy)]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct RewardingParams {
|
||||
/// Parameters that should remain unchanged throughout an interval.
|
||||
pub interval: IntervalRewardParams,
|
||||
@@ -254,6 +264,7 @@ impl RewardingParams {
|
||||
)]
|
||||
#[cw_serde]
|
||||
#[derive(Copy)]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct RewardedSetParams {
|
||||
/// The expected number of nodes assigned entry gateway role (i.e. [`Role::EntryGateway`])
|
||||
pub entry_gateways: u32,
|
||||
|
||||
@@ -17,10 +17,12 @@ pub mod simulator;
|
||||
)]
|
||||
#[cw_serde]
|
||||
#[derive(Copy, Default)]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub struct RewardEstimate {
|
||||
/// The amount of **decimal** coins that are going to get distributed to the node,
|
||||
/// i.e. the operator and all its delegators.
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub total_node_reward: Decimal,
|
||||
|
||||
// note that operator reward includes the operating_cost,
|
||||
@@ -28,14 +30,17 @@ pub struct RewardEstimate {
|
||||
// in that case the operator reward would still be `1nym` as opposed to 0
|
||||
/// The share of the reward that is going to get distributed to the node operator.
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub operator: Decimal,
|
||||
|
||||
/// The share of the reward that is going to get distributed among the node delegators.
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub delegates: Decimal,
|
||||
|
||||
/// The operating cost of this node. Note: it's already included in the operator reward.
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
#[cfg_attr(feature = "utoipa", schema(value_type = String))]
|
||||
pub operating_cost: Decimal,
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use crate::config_score::{ConfigScoreParams, OutdatedVersionWeights, VersionScoreFormulaParams};
|
||||
use crate::nym_node::Role;
|
||||
use crate::EpochId;
|
||||
use contracts_common::Percent;
|
||||
use cosmwasm_schema::cw_serde;
|
||||
use cosmwasm_std::Coin;
|
||||
@@ -32,6 +33,23 @@ impl RoleAssignment {
|
||||
}
|
||||
}
|
||||
|
||||
#[cw_serde]
|
||||
#[derive(Default)]
|
||||
pub struct EpochRewardedSet {
|
||||
pub epoch_id: EpochId,
|
||||
|
||||
pub assignment: RewardedSet,
|
||||
}
|
||||
|
||||
impl From<(EpochId, RewardedSet)> for EpochRewardedSet {
|
||||
fn from((epoch_id, assignment): (EpochId, RewardedSet)) -> Self {
|
||||
EpochRewardedSet {
|
||||
epoch_id,
|
||||
assignment,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cw_serde]
|
||||
#[derive(Default)]
|
||||
pub struct RewardedSet {
|
||||
@@ -69,6 +87,29 @@ impl RewardedSet {
|
||||
pub fn rewarded_set_size(&self) -> usize {
|
||||
self.active_set_size() + self.standby.len()
|
||||
}
|
||||
|
||||
pub fn get_role(&self, node_id: NodeId) -> Option<Role> {
|
||||
// given each role has ~100 entries in them, doing linear lookup with vec should be fine
|
||||
if self.entry_gateways.contains(&node_id) {
|
||||
return Some(Role::EntryGateway);
|
||||
}
|
||||
if self.exit_gateways.contains(&node_id) {
|
||||
return Some(Role::ExitGateway);
|
||||
}
|
||||
if self.layer1.contains(&node_id) {
|
||||
return Some(Role::Layer1);
|
||||
}
|
||||
if self.layer2.contains(&node_id) {
|
||||
return Some(Role::Layer2);
|
||||
}
|
||||
if self.layer3.contains(&node_id) {
|
||||
return Some(Role::Layer3);
|
||||
}
|
||||
if self.standby.contains(&node_id) {
|
||||
return Some(Role::Standby);
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cw_serde]
|
||||
@@ -134,6 +175,14 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "utoipa")]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
#[cfg_attr(feature = "utoipa", schema(title = "Coin"))]
|
||||
pub struct CoinSchema {
|
||||
pub denom: String,
|
||||
pub amount: String,
|
||||
}
|
||||
|
||||
/// The current state of the mixnet contract.
|
||||
#[cw_serde]
|
||||
pub struct ContractState {
|
||||
|
||||
@@ -23,6 +23,11 @@ impl SqliteEcashTicketbookManager {
|
||||
SqliteEcashTicketbookManager { connection_pool }
|
||||
}
|
||||
|
||||
/// Closes the connection pool.
|
||||
pub async fn close(&self) {
|
||||
self.connection_pool.close().await
|
||||
}
|
||||
|
||||
pub(crate) async fn cleanup_expired(&self, deadline: Date) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!(
|
||||
"DELETE FROM ecash_ticketbook WHERE expiration_date <= ?",
|
||||
|
||||
@@ -43,6 +43,10 @@ impl Debug for EphemeralStorage {
|
||||
impl Storage for EphemeralStorage {
|
||||
type StorageError = StorageError;
|
||||
|
||||
async fn close(&self) {
|
||||
// nothing to do here
|
||||
}
|
||||
|
||||
async fn cleanup_expired(&self) -> Result<(), Self::StorageError> {
|
||||
self.storage_manager.cleanup_expired().await;
|
||||
Ok(())
|
||||
|
||||
@@ -33,7 +33,10 @@ use nym_credentials::{
|
||||
IssuanceTicketBook, IssuedTicketBook,
|
||||
};
|
||||
use nym_ecash_time::{ecash_today, Date, EcashTime};
|
||||
use sqlx::ConnectOptions;
|
||||
use sqlx::{
|
||||
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
|
||||
ConnectOptions,
|
||||
};
|
||||
use std::path::Path;
|
||||
use zeroize::Zeroizing;
|
||||
|
||||
@@ -56,6 +59,9 @@ impl PersistentStorage {
|
||||
);
|
||||
|
||||
let opts = sqlx::sqlite::SqliteConnectOptions::new()
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.auto_vacuum(SqliteAutoVacuum::Incremental)
|
||||
.filename(database_path)
|
||||
.create_if_missing(true)
|
||||
.disable_statement_logging();
|
||||
@@ -83,6 +89,10 @@ impl PersistentStorage {
|
||||
impl Storage for PersistentStorage {
|
||||
type StorageError = StorageError;
|
||||
|
||||
async fn close(&self) {
|
||||
self.storage_manager.close().await
|
||||
}
|
||||
|
||||
/// remove all expired ticketbooks and expiration date signatures
|
||||
async fn cleanup_expired(&self) -> Result<(), Self::StorageError> {
|
||||
let ecash_yesterday = ecash_today().date().previous_day().unwrap();
|
||||
|
||||
@@ -22,6 +22,8 @@ use std::error::Error;
|
||||
pub trait Storage: Send + Sync {
|
||||
type StorageError: Error;
|
||||
|
||||
async fn close(&self);
|
||||
|
||||
/// remove all expired ticketbooks and expiration date signatures
|
||||
async fn cleanup_expired(&self) -> Result<(), Self::StorageError>;
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ use nym_api_requests::constants::MIN_BATCH_REDEMPTION_DELAY;
|
||||
use nym_api_requests::ecash::models::{BatchRedeemTicketsBody, VerifyEcashTicketBody};
|
||||
use nym_credentials_interface::Bandwidth;
|
||||
use nym_credentials_interface::{ClientTicket, TicketType};
|
||||
use nym_validator_client::coconut::EcashApiError;
|
||||
use nym_validator_client::nym_api::EpochId;
|
||||
use nym_validator_client::nyxd::contract_traits::{
|
||||
EcashSigningClient, MultisigQueryClient, MultisigSigningClient, PagedMultisigQueryClient,
|
||||
@@ -352,7 +353,9 @@ impl CredentialHandler {
|
||||
}
|
||||
Err(err) => {
|
||||
error!("failed to send ticket {ticket_id} for verification to ecash signer '{client}': {err}. if we don't reach quorum, we'll retry later");
|
||||
Ok(false)
|
||||
Err(EcashTicketError::ApiFailure(EcashApiError::NymApi {
|
||||
source: err,
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ serde = { workspace = true, features = ["derive"] }
|
||||
thiserror = { workspace = true }
|
||||
strum = { workspace = true, features = ["derive"] }
|
||||
time = { workspace = true, features = ["serde"] }
|
||||
utoipa = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
|
||||
nym-compact-ecash = { path = "../nym_offline_compact_ecash" }
|
||||
|
||||
@@ -86,7 +86,6 @@ impl Display for AddressPolicyAction {
|
||||
/// ```
|
||||
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
#[cfg_attr(feature = "openapi", aliases(ExitPolicy))]
|
||||
pub struct AddressPolicy {
|
||||
/// A list of rules to apply to find out whether an address is
|
||||
/// contained by this policy.
|
||||
@@ -727,10 +726,10 @@ mod test {
|
||||
let policy = AddressPolicy::parse_from_torrc(
|
||||
r#"
|
||||
ExitPolicy reject 1.2.3.4/32:*
|
||||
ExitPolicy reject 1.2.3.5:*
|
||||
ExitPolicy reject 1.2.3.5:*
|
||||
ExitPolicy reject 1.2.3.6/16:*
|
||||
ExitPolicy reject 1.2.3.6/16:123-456
|
||||
ExitPolicy accept *:53
|
||||
ExitPolicy reject 1.2.3.6/16:123-456
|
||||
ExitPolicy accept *:53
|
||||
ExitPolicy accept6 *6:119
|
||||
ExitPolicy accept *4:120
|
||||
ExitPolicy reject6 [FC00::]/7:*
|
||||
|
||||
@@ -9,7 +9,7 @@ use futures::{Sink, Stream};
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use tungstenite::Message as WsMessage;
|
||||
|
||||
impl<'a, S, R> State<'a, S, R> {
|
||||
impl<S, R> State<'_, S, R> {
|
||||
async fn client_handshake_inner(&mut self) -> Result<(), HandshakeError>
|
||||
where
|
||||
S: Stream<Item = WsItem> + Sink<WsMessage> + Unpin,
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::registration::handshake::{error::HandshakeError, WsItem};
|
||||
use futures::{Sink, Stream};
|
||||
use tungstenite::Message as WsMessage;
|
||||
|
||||
impl<'a, S, R> State<'a, S, R> {
|
||||
impl<S, R> State<'_, S, R> {
|
||||
async fn gateway_handshake_inner(
|
||||
&mut self,
|
||||
raw_init_message: Vec<u8>,
|
||||
|
||||
@@ -20,6 +20,10 @@ pub enum ClientRequest {
|
||||
hkdf_salt: Vec<u8>,
|
||||
derived_key_digest: Vec<u8>,
|
||||
},
|
||||
ForgetMe {
|
||||
client: bool,
|
||||
stats: bool,
|
||||
},
|
||||
}
|
||||
|
||||
impl ClientRequest {
|
||||
|
||||
@@ -11,6 +11,7 @@ use tungstenite::Message;
|
||||
#[non_exhaustive]
|
||||
pub enum SensitiveServerResponse {
|
||||
KeyUpgradeAck {},
|
||||
ForgetMeAck {},
|
||||
}
|
||||
|
||||
impl SensitiveServerResponse {
|
||||
|
||||
@@ -6,7 +6,10 @@ use models::StoredFinishedSession;
|
||||
use nym_node_metrics::entry::{ActiveSession, FinishedSession, SessionType};
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use sessions::SessionManager;
|
||||
use sqlx::ConnectOptions;
|
||||
use sqlx::{
|
||||
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
|
||||
ConnectOptions,
|
||||
};
|
||||
use std::path::Path;
|
||||
use time::Date;
|
||||
use tracing::{debug, error};
|
||||
@@ -36,6 +39,9 @@ impl PersistentStatsStorage {
|
||||
// TODO: we can inject here more stuff based on our gateway global config
|
||||
// struct. Maybe different pool size or timeout intervals?
|
||||
let opts = sqlx::sqlite::SqliteConnectOptions::new()
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.auto_vacuum(SqliteAutoVacuum::Incremental)
|
||||
.filename(database_path)
|
||||
.create_if_missing(true)
|
||||
.disable_statement_logging();
|
||||
@@ -116,6 +122,16 @@ impl PersistentStatsStorage {
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn delete_unique_user(
|
||||
&self,
|
||||
client_address: DestinationAddressBytes,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.delete_unique_user(client_address.as_base58_string())
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn insert_active_session(
|
||||
&self,
|
||||
client_address: DestinationAddressBytes,
|
||||
|
||||
@@ -71,6 +71,16 @@ impl SessionManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_unique_user(&self, client_address_b58: String) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"DELETE FROM sessions_unique_users WHERE client_address = ?",
|
||||
client_address_b58
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_unique_users(&self, date: Date) -> Result<Vec<String>> {
|
||||
sqlx::query_scalar!(
|
||||
"SELECT client_address as count FROM sessions_unique_users WHERE day = ?",
|
||||
|
||||
+12
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"db_name": "SQLite",
|
||||
"query": "DELETE FROM message_store WHERE client_address_bs58 = ?",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Right": 1
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "3ea5542b21a41b14276a8fd6b870c61aa0ddd30fee2565803b88c6086bd2a734"
|
||||
}
|
||||
+12
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"db_name": "SQLite",
|
||||
"query": "DELETE FROM available_bandwidth WHERE client_id = ?",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Right": 1
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "a3cc707995b8215fa77738cd1a55f9e8d251a3e764104d2a54153895dee1a118"
|
||||
}
|
||||
@@ -11,7 +11,6 @@ license.workspace = true
|
||||
[dependencies]
|
||||
bincode = { workspace = true }
|
||||
defguard_wireguard_rs = { workspace = true }
|
||||
log = { workspace = true }
|
||||
sqlx = { workspace = true, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"sqlite",
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
/*
|
||||
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
* SPDX-License-Identifier: GPL-3.0-only
|
||||
*/
|
||||
|
||||
|
||||
ALTER TABLE message_store
|
||||
RENAME TO message_store_old;
|
||||
|
||||
-- add new column with message timestamp.
|
||||
-- note: we can't simply alter existing table to add it since the default value is non-constant
|
||||
CREATE TABLE message_store
|
||||
(
|
||||
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
client_address_bs58 TEXT NOT NULL,
|
||||
content BLOB NOT NULL,
|
||||
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
INSERT INTO message_store(id, client_address_bs58, content)
|
||||
SELECT id, client_address_bs58, content
|
||||
FROM message_store_old;
|
||||
DROP TABLE message_store_old;
|
||||
|
||||
@@ -49,6 +49,16 @@ impl BandwidthManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn remove_client(&self, client_id: i64) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!(
|
||||
"DELETE FROM available_bandwidth WHERE client_id = ?",
|
||||
client_id
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set the expiration date of the particular client to the provided date.
|
||||
pub(crate) async fn set_expiration(
|
||||
&self,
|
||||
|
||||
@@ -2,9 +2,11 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::models::StoredMessage;
|
||||
use time::OffsetDateTime;
|
||||
use tracing::debug;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct InboxManager {
|
||||
pub struct InboxManager {
|
||||
connection_pool: sqlx::SqlitePool,
|
||||
/// Maximum number of messages that can be obtained from the database per operation.
|
||||
/// It is used to prevent out of memory errors in the case of client receiving a lot of data while
|
||||
@@ -71,44 +73,22 @@ impl InboxManager {
|
||||
// get 1 additional message to check whether there will be more to grab
|
||||
// next time
|
||||
let limit = self.retrieval_limit + 1;
|
||||
let mut res = if let Some(start_after) = start_after {
|
||||
sqlx::query_as!(
|
||||
StoredMessage,
|
||||
r#"
|
||||
SELECT
|
||||
id as "id!",
|
||||
client_address_bs58 as "client_address_bs58!",
|
||||
content as "content!"
|
||||
FROM message_store
|
||||
WHERE client_address_bs58 = ? AND id > ?
|
||||
ORDER BY id ASC
|
||||
LIMIT ?;
|
||||
"#,
|
||||
client_address_bs58,
|
||||
start_after,
|
||||
limit
|
||||
)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await?
|
||||
} else {
|
||||
sqlx::query_as!(
|
||||
StoredMessage,
|
||||
r#"
|
||||
SELECT
|
||||
id as "id!",
|
||||
client_address_bs58 as "client_address_bs58!",
|
||||
content as "content!"
|
||||
FROM message_store
|
||||
WHERE client_address_bs58 = ?
|
||||
ORDER BY id ASC
|
||||
LIMIT ?;
|
||||
"#,
|
||||
client_address_bs58,
|
||||
limit
|
||||
)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await?
|
||||
};
|
||||
let start_after = start_after.unwrap_or(-1);
|
||||
|
||||
let mut res: Vec<StoredMessage> = sqlx::query_as(
|
||||
r#"
|
||||
SELECT id, client_address_bs58, content, timestamp
|
||||
FROM message_store
|
||||
WHERE client_address_bs58 = ? AND id > ?
|
||||
ORDER BY id ASC
|
||||
LIMIT ?;
|
||||
"#,
|
||||
)
|
||||
.bind(client_address_bs58)
|
||||
.bind(start_after)
|
||||
.bind(limit)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await?;
|
||||
|
||||
if res.len() > self.retrieval_limit as usize {
|
||||
res.truncate(self.retrieval_limit as usize);
|
||||
@@ -133,4 +113,26 @@ impl InboxManager {
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn remove_messages_for_client(
|
||||
&self,
|
||||
client_address_bs58: &str,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!(
|
||||
"DELETE FROM message_store WHERE client_address_bs58 = ?",
|
||||
client_address_bs58
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn remove_stale(&self, cutoff: OffsetDateTime) -> Result<(), sqlx::Error> {
|
||||
let affected = sqlx::query!("DELETE FROM message_store WHERE timestamp < ?", cutoff)
|
||||
.execute(&self.connection_pool)
|
||||
.await?
|
||||
.rows_affected();
|
||||
debug!("Removed {affected} stale messages");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
|
||||
use bandwidth::BandwidthManager;
|
||||
use clients::{ClientManager, ClientType};
|
||||
use inboxes::InboxManager;
|
||||
use models::{
|
||||
Client, PersistedBandwidth, PersistedSharedKeys, RedemptionProposal, StoredMessage,
|
||||
VerifiedTicket, WireguardPeer,
|
||||
@@ -12,7 +11,10 @@ use nym_credentials_interface::ClientTicket;
|
||||
use nym_gateway_requests::shared_key::SharedGatewayKey;
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use shared_keys::SharedKeysManager;
|
||||
use sqlx::ConnectOptions;
|
||||
use sqlx::{
|
||||
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
|
||||
ConnectOptions,
|
||||
};
|
||||
use std::path::Path;
|
||||
use tickets::TicketStorageManager;
|
||||
use time::OffsetDateTime;
|
||||
@@ -28,6 +30,7 @@ mod tickets;
|
||||
mod wireguard_peers;
|
||||
|
||||
pub use error::GatewayStorageError;
|
||||
pub use inboxes::InboxManager;
|
||||
|
||||
// note that clone here is fine as upon cloning the same underlying pool will be used
|
||||
#[derive(Clone)]
|
||||
@@ -41,6 +44,33 @@ pub struct GatewayStorage {
|
||||
}
|
||||
|
||||
impl GatewayStorage {
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn client_manager(&self) -> &ClientManager {
|
||||
&self.client_manager
|
||||
}
|
||||
|
||||
pub(crate) fn shared_key_manager(&self) -> &SharedKeysManager {
|
||||
&self.shared_key_manager
|
||||
}
|
||||
|
||||
pub fn inbox_manager(&self) -> &InboxManager {
|
||||
&self.inbox_manager
|
||||
}
|
||||
|
||||
pub(crate) fn bandwidth_manager(&self) -> &BandwidthManager {
|
||||
&self.bandwidth_manager
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn ticket_manager(&self) -> &TicketStorageManager {
|
||||
&self.ticket_manager
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn wireguard_peer_manager(&self) -> &wireguard_peers::WgPeerManager {
|
||||
&self.wireguard_peer_manager
|
||||
}
|
||||
|
||||
/// Initialises `PersistentStorage` using the provided path.
|
||||
///
|
||||
/// # Arguments
|
||||
@@ -59,6 +89,9 @@ impl GatewayStorage {
|
||||
// TODO: we can inject here more stuff based on our gateway global config
|
||||
// struct. Maybe different pool size or timeout intervals?
|
||||
let opts = sqlx::sqlite::SqliteConnectOptions::new()
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.auto_vacuum(SqliteAutoVacuum::Incremental)
|
||||
.filename(database_path)
|
||||
.create_if_missing(true)
|
||||
.disable_statement_logging();
|
||||
@@ -101,6 +134,21 @@ impl GatewayStorage {
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn handle_forget_me(
|
||||
&self,
|
||||
client_address: DestinationAddressBytes,
|
||||
) -> Result<(), GatewayStorageError> {
|
||||
let client_id = self.get_mixnet_client_id(client_address).await?;
|
||||
self.inbox_manager()
|
||||
.remove_messages_for_client(&client_address.as_base58_string())
|
||||
.await?;
|
||||
self.bandwidth_manager().remove_client(client_id).await?;
|
||||
self.shared_key_manager()
|
||||
.remove_shared_keys(&client_address.as_base58_string())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn insert_shared_keys(
|
||||
&self,
|
||||
client_address: DestinationAddressBytes,
|
||||
|
||||
@@ -48,11 +48,13 @@ impl TryFrom<PersistedSharedKeys> for SharedGatewayKey {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromRow)]
|
||||
pub struct StoredMessage {
|
||||
pub id: i64,
|
||||
#[allow(dead_code)]
|
||||
pub client_address_bs58: String,
|
||||
pub content: Vec<u8>,
|
||||
pub timestamp: OffsetDateTime,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, FromRow)]
|
||||
|
||||
+241
-138
@@ -192,6 +192,28 @@ impl Client {
|
||||
&self.base_url
|
||||
}
|
||||
|
||||
pub fn create_request<B, K, V>(
|
||||
&self,
|
||||
method: reqwest::Method,
|
||||
path: PathSegments<'_>,
|
||||
params: Params<'_, K, V>,
|
||||
json_body: Option<&B>,
|
||||
) -> RequestBuilder
|
||||
where
|
||||
B: Serialize + ?Sized,
|
||||
K: AsRef<str>,
|
||||
V: AsRef<str>,
|
||||
{
|
||||
let url = sanitize_url(&self.base_url, path, params);
|
||||
let mut request = self.reqwest_client.request(method.clone(), url);
|
||||
|
||||
if let Some(body) = json_body {
|
||||
request = request.json(body);
|
||||
}
|
||||
|
||||
request
|
||||
}
|
||||
|
||||
pub fn create_get_request<K, V>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
@@ -205,38 +227,6 @@ impl Client {
|
||||
self.reqwest_client.get(url)
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all, fields(path=?path))]
|
||||
async fn send_get_request<K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
params: Params<'_, K, V>,
|
||||
) -> Result<Response, HttpClientError<E>>
|
||||
where
|
||||
K: AsRef<str>,
|
||||
V: AsRef<str>,
|
||||
E: Display,
|
||||
{
|
||||
tracing::trace!("Sending GET request");
|
||||
let url = sanitize_url(&self.base_url, path, params);
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
{
|
||||
Ok(
|
||||
wasmtimer::tokio::timeout(
|
||||
self.request_timeout,
|
||||
self.reqwest_client.get(url).send(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_timeout| HttpClientError::RequestTimeout)??,
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
Ok(self.reqwest_client.get(url).send().await?)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_post_request<B, K, V>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
@@ -252,36 +242,6 @@ impl Client {
|
||||
self.reqwest_client.post(url).json(json_body)
|
||||
}
|
||||
|
||||
async fn send_post_request<B, K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
params: Params<'_, K, V>,
|
||||
json_body: &B,
|
||||
) -> Result<Response, HttpClientError<E>>
|
||||
where
|
||||
B: Serialize + ?Sized,
|
||||
K: AsRef<str>,
|
||||
V: AsRef<str>,
|
||||
E: Display,
|
||||
{
|
||||
let url = sanitize_url(&self.base_url, path, params);
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
{
|
||||
Ok(wasmtimer::tokio::timeout(
|
||||
self.request_timeout,
|
||||
self.reqwest_client.post(url).json(json_body).send(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_timeout| HttpClientError::RequestTimeout)??)
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
Ok(self.reqwest_client.post(url).json(json_body).send().await?)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_delete_request<K, V>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
@@ -295,6 +255,88 @@ impl Client {
|
||||
self.reqwest_client.delete(url)
|
||||
}
|
||||
|
||||
pub fn create_patch_request<B, K, V>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
params: Params<'_, K, V>,
|
||||
json_body: &B,
|
||||
) -> RequestBuilder
|
||||
where
|
||||
B: Serialize + ?Sized,
|
||||
K: AsRef<str>,
|
||||
V: AsRef<str>,
|
||||
{
|
||||
let url = sanitize_url(&self.base_url, path, params);
|
||||
self.reqwest_client.patch(url).json(json_body)
|
||||
}
|
||||
|
||||
async fn send_request<B, K, V, E>(
|
||||
&self,
|
||||
method: reqwest::Method,
|
||||
path: PathSegments<'_>,
|
||||
params: Params<'_, K, V>,
|
||||
json_body: Option<&B>,
|
||||
) -> Result<Response, HttpClientError<E>>
|
||||
where
|
||||
B: Serialize + ?Sized,
|
||||
K: AsRef<str>,
|
||||
V: AsRef<str>,
|
||||
E: Display,
|
||||
{
|
||||
let url = sanitize_url(&self.base_url, path, params);
|
||||
|
||||
let mut request = self.reqwest_client.request(method.clone(), url);
|
||||
|
||||
if let Some(body) = json_body {
|
||||
request = request.json(body);
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
{
|
||||
Ok(
|
||||
wasmtimer::tokio::timeout(self.request_timeout, request.send())
|
||||
.await
|
||||
.map_err(|_timeout| HttpClientError::RequestTimeout)??,
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
Ok(request.send().await?)
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all, fields(path=?path))]
|
||||
async fn send_get_request<K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
params: Params<'_, K, V>,
|
||||
) -> Result<Response, HttpClientError<E>>
|
||||
where
|
||||
K: AsRef<str>,
|
||||
V: AsRef<str>,
|
||||
E: Display,
|
||||
{
|
||||
self.send_request(reqwest::Method::GET, path, params, None::<&()>)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn send_post_request<B, K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
params: Params<'_, K, V>,
|
||||
json_body: &B,
|
||||
) -> Result<Response, HttpClientError<E>>
|
||||
where
|
||||
B: Serialize + ?Sized,
|
||||
K: AsRef<str>,
|
||||
V: AsRef<str>,
|
||||
E: Display,
|
||||
{
|
||||
self.send_request(reqwest::Method::POST, path, params, Some(json_body))
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn send_delete_request<K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
@@ -305,23 +347,24 @@ impl Client {
|
||||
V: AsRef<str>,
|
||||
E: Display,
|
||||
{
|
||||
tracing::trace!("Sending DELETE request");
|
||||
let url = sanitize_url(&self.base_url, path, params);
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
{
|
||||
Ok(wasmtimer::tokio::timeout(
|
||||
self.request_timeout,
|
||||
self.reqwest_client.delete(url).send(),
|
||||
)
|
||||
self.send_request(reqwest::Method::DELETE, path, params, None::<&()>)
|
||||
.await
|
||||
.map_err(|_timeout| HttpClientError::RequestTimeout)??)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
Ok(self.reqwest_client.delete(url).send().await?)
|
||||
}
|
||||
pub async fn send_patch_request<B, K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
params: Params<'_, K, V>,
|
||||
json_body: &B,
|
||||
) -> Result<Response, HttpClientError<E>>
|
||||
where
|
||||
B: Serialize + ?Sized,
|
||||
K: AsRef<str>,
|
||||
V: AsRef<str>,
|
||||
E: Display,
|
||||
{
|
||||
self.send_request(reqwest::Method::PATCH, path, params, Some(json_body))
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
@@ -372,6 +415,56 @@ impl Client {
|
||||
parse_response(res, false).await
|
||||
}
|
||||
|
||||
pub async fn patch_json<B, T, K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
params: Params<'_, K, V>,
|
||||
json_body: &B,
|
||||
) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
B: Serialize + ?Sized,
|
||||
for<'a> T: Deserialize<'a>,
|
||||
K: AsRef<str>,
|
||||
V: AsRef<str>,
|
||||
E: Display + DeserializeOwned,
|
||||
{
|
||||
let res = self.send_patch_request(path, params, json_body).await?;
|
||||
parse_response(res, true).await
|
||||
}
|
||||
|
||||
async fn call_json_endpoint<B, T, S, E>(
|
||||
&self,
|
||||
method: reqwest::Method,
|
||||
endpoint: S,
|
||||
json_body: Option<&B>,
|
||||
) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
B: Serialize + ?Sized,
|
||||
for<'a> T: Deserialize<'a>,
|
||||
E: Display + DeserializeOwned,
|
||||
S: AsRef<str>,
|
||||
{
|
||||
let mut request = self
|
||||
.reqwest_client
|
||||
.request(method.clone(), self.base_url.join(endpoint.as_ref())?);
|
||||
|
||||
if let Some(body) = json_body {
|
||||
request = request.json(body);
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let res = {
|
||||
wasmtimer::tokio::timeout(self.request_timeout, request.send())
|
||||
.await
|
||||
.map_err(|_timeout| HttpClientError::RequestTimeout)??
|
||||
};
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let res = { request.send().await? };
|
||||
|
||||
parse_response(res, false).await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
pub async fn get_json_endpoint<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
@@ -379,27 +472,8 @@ impl Client {
|
||||
E: Display + DeserializeOwned,
|
||||
S: AsRef<str>,
|
||||
{
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let res = {
|
||||
wasmtimer::tokio::timeout(
|
||||
self.request_timeout,
|
||||
self.reqwest_client
|
||||
.get(self.base_url.join(endpoint.as_ref())?)
|
||||
.send(),
|
||||
)
|
||||
self.call_json_endpoint(reqwest::Method::GET, endpoint, None::<&()>)
|
||||
.await
|
||||
.map_err(|_timeout| HttpClientError::RequestTimeout)??
|
||||
};
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let res = {
|
||||
self.reqwest_client
|
||||
.get(self.base_url.join(endpoint.as_ref())?)
|
||||
.send()
|
||||
.await?
|
||||
};
|
||||
|
||||
parse_response(res, false).await
|
||||
}
|
||||
|
||||
pub async fn post_json_endpoint<B, T, S, E>(
|
||||
@@ -413,29 +487,8 @@ impl Client {
|
||||
E: Display + DeserializeOwned,
|
||||
S: AsRef<str>,
|
||||
{
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let res = {
|
||||
wasmtimer::tokio::timeout(
|
||||
self.request_timeout,
|
||||
self.reqwest_client
|
||||
.post(self.base_url.join(endpoint.as_ref())?)
|
||||
.json(json_body)
|
||||
.send(),
|
||||
)
|
||||
self.call_json_endpoint(reqwest::Method::POST, endpoint, Some(json_body))
|
||||
.await
|
||||
.map_err(|_timeout| HttpClientError::RequestTimeout)??
|
||||
};
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let res = {
|
||||
self.reqwest_client
|
||||
.post(self.base_url.join(endpoint.as_ref())?)
|
||||
.json(json_body)
|
||||
.send()
|
||||
.await?
|
||||
};
|
||||
|
||||
parse_response(res, true).await
|
||||
}
|
||||
|
||||
pub async fn delete_json_endpoint<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
|
||||
@@ -444,27 +497,23 @@ impl Client {
|
||||
E: Display + DeserializeOwned,
|
||||
S: AsRef<str>,
|
||||
{
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let res = {
|
||||
wasmtimer::tokio::timeout(
|
||||
self.request_timeout,
|
||||
self.reqwest_client
|
||||
.delete(self.base_url.join(endpoint.as_ref())?)
|
||||
.send(),
|
||||
)
|
||||
self.call_json_endpoint(reqwest::Method::DELETE, endpoint, None::<&()>)
|
||||
.await
|
||||
.map_err(|_timeout| HttpClientError::RequestTimeout)??
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let res = {
|
||||
self.reqwest_client
|
||||
.delete(self.base_url.join(endpoint.as_ref())?)
|
||||
.send()
|
||||
.await?
|
||||
};
|
||||
|
||||
parse_response(res, false).await
|
||||
pub async fn patch_json_endpoint<B, T, S, E>(
|
||||
&self,
|
||||
endpoint: S,
|
||||
json_body: &B,
|
||||
) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
B: Serialize + ?Sized,
|
||||
for<'a> T: Deserialize<'a>,
|
||||
E: Display + DeserializeOwned,
|
||||
S: AsRef<str>,
|
||||
{
|
||||
self.call_json_endpoint(reqwest::Method::PATCH, endpoint, Some(json_body))
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -509,6 +558,19 @@ pub trait ApiClient {
|
||||
V: AsRef<str> + Sync,
|
||||
E: Display + DeserializeOwned;
|
||||
|
||||
async fn patch_json<B, T, K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
params: Params<'_, K, V>,
|
||||
json_body: &B,
|
||||
) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
B: Serialize + ?Sized + Sync,
|
||||
for<'a> T: Deserialize<'a>,
|
||||
K: AsRef<str> + Sync,
|
||||
V: AsRef<str> + Sync,
|
||||
E: Display + DeserializeOwned;
|
||||
|
||||
/// `get` json data from the provided absolute endpoint, i.e. for example `"/api/v1/mixnodes?since=12345"`
|
||||
async fn get_json_from<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
@@ -532,6 +594,17 @@ pub trait ApiClient {
|
||||
for<'a> T: Deserialize<'a>,
|
||||
E: Display + DeserializeOwned,
|
||||
S: AsRef<str> + Sync + Send;
|
||||
|
||||
async fn patch_json_data_at<B, T, S, E>(
|
||||
&self,
|
||||
endpoint: S,
|
||||
json_body: &B,
|
||||
) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
B: Serialize + ?Sized + Sync,
|
||||
for<'a> T: Deserialize<'a>,
|
||||
E: Display + DeserializeOwned,
|
||||
S: AsRef<str> + Sync + Send;
|
||||
}
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
@@ -581,6 +654,22 @@ impl ApiClient for Client {
|
||||
self.delete_json(path, params).await
|
||||
}
|
||||
|
||||
async fn patch_json<B, T, K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
params: Params<'_, K, V>,
|
||||
json_body: &B,
|
||||
) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
B: Serialize + ?Sized + Sync,
|
||||
for<'a> T: Deserialize<'a>,
|
||||
K: AsRef<str> + Sync,
|
||||
V: AsRef<str> + Sync,
|
||||
E: Display + DeserializeOwned,
|
||||
{
|
||||
self.patch_json(path, params, json_body).await
|
||||
}
|
||||
|
||||
async fn get_json_from<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
for<'a> T: Deserialize<'a>,
|
||||
@@ -612,6 +701,20 @@ impl ApiClient for Client {
|
||||
{
|
||||
self.delete_json_endpoint(endpoint).await
|
||||
}
|
||||
|
||||
async fn patch_json_data_at<B, T, S, E>(
|
||||
&self,
|
||||
endpoint: S,
|
||||
json_body: &B,
|
||||
) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
B: Serialize + ?Sized + Sync,
|
||||
for<'a> T: Deserialize<'a>,
|
||||
E: Display + DeserializeOwned,
|
||||
S: AsRef<str> + Sync + Send,
|
||||
{
|
||||
self.patch_json_endpoint(endpoint, json_body).await
|
||||
}
|
||||
}
|
||||
|
||||
// utility function that should solve the double slash problem in API urls forever.
|
||||
|
||||
@@ -10,7 +10,6 @@ use serde::{Deserialize, Serialize};
|
||||
pub mod middleware;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
|
||||
pub enum FormattedResponse<T> {
|
||||
Json(Json<T>),
|
||||
Yaml(Yaml<T>),
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user