Compare commits

..

15 Commits

Author SHA1 Message Date
mfahampshire 7f1baa1db7 sandbox nyx rest api 2024-12-18 14:34:06 +01:00
mfahampshire 6d4eaea1cc spacing + working openapi local for nymapi 2024-12-18 14:29:28 +01:00
dynco-nym 865b668e12 Move ecash schema out of ecash crate 2024-12-18 13:23:17 +01:00
dynco-nym c4409e3c1a Improvements 2024-12-16 19:08:53 +01:00
dynco-nym 3934c556ee generate Sqlx schema files 2024-12-16 13:47:04 +01:00
dynco-nym f0da36df7c Gitattributes to ignore .sqlx diffs 2024-12-16 13:46:43 +01:00
dynco-nym 95989dbb67 Post rebase fixes 2024-12-16 13:32:06 +01:00
dynco-nym 33f2e2ca7d WIP 2024-12-16 13:24:34 +01:00
dynco-nym fce494af97 Compiles with utoipa 5.2 2024-12-16 13:24:33 +01:00
dynco-nym 671ce9a399 A bunch of annotations 2024-12-16 12:05:54 +01:00
dynco-nym 1303d404f7 Add cfg_attr 2024-12-16 12:05:54 +01:00
dynco-nym 7618ebf694 rustfmt 2024-12-16 12:05:54 +01:00
dynco-nym 13e64da2ad ContractBuildInformation on /nym_contracts_detailed 2024-12-16 12:05:54 +01:00
dynco-nym 901b88f98b Derive ToSchema for more types 2024-12-16 12:05:54 +01:00
dynco-nym b60f07730b WIP adding derive(ToSchema) 2024-12-16 12:05:54 +01:00
988 changed files with 25554 additions and 98902 deletions
-2
View File
@@ -31,5 +31,3 @@ updates:
update-types:
- "patch"
open-pull-requests-limit: 10
assignees:
- "octol"
@@ -79,6 +79,7 @@ jobs:
target/release/nym-socks5-client
target/release/nym-api
target/release/nym-network-requester
target/release/nym-data-observatory
target/release/nym-cli
target/release/nymvisor
target/release/nym-node
@@ -96,6 +97,7 @@ jobs:
cp target/release/nym-socks5-client $OUTPUT_DIR
cp target/release/nym-api $OUTPUT_DIR
cp target/release/nym-network-requester $OUTPUT_DIR
cp target/release/nym-data-observatory $OUTPUT_DIR
cp target/release/nymvisor $OUTPUT_DIR
cp target/release/nym-node $OUTPUT_DIR
cp target/release/nym-cli $OUTPUT_DIR
+9 -25
View File
@@ -8,18 +8,16 @@ on:
- 'explorer-api/**'
- 'gateway/**'
- 'integrations/**'
- 'nym-api/**'
- 'nym-credential-proxy/**'
- 'nym-network-monitor/**'
- 'nym-node/**'
- 'nym-node-status-api/**'
- 'nym-outfox/**'
- 'nym-validator-rewarder/**'
- 'nyx-chain-watcher/**'
- 'sdk/ffi/**'
- 'mixnode/**'
- 'sdk/rust/**'
- 'sdk/lib/**'
- 'service-providers/**'
- 'nym-browser-extension/storage/**'
- 'nym-network-monitor/**'
- 'nym-api/**'
- 'nym-node/**'
- 'nym-outfox/**'
- 'nym-data-observatory/**'
- 'nym-validator-rewarder/**'
- 'tools/**'
- 'wasm/**'
- 'Cargo.toml'
@@ -32,7 +30,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ arc-ubuntu-20.04, custom-windows-11, custom-runner-mac-m1 ]
os: [ arc-ubuntu-20.04, custom-runner-mac-m1 ]
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
@@ -54,20 +52,6 @@ jobs:
override: true
components: rustfmt, clippy
# To avoid running out of disk space, skip generating debug symbols
- name: Set debug to false (unix)
if: contains(matrix.os, 'ubuntu') || contains(matrix.os, 'mac')
run: |
sed -i.bak 's/\[profile.dev\]/\[profile.dev\]\ndebug = false/' Cargo.toml
git diff
- name: Set debug to false (win)
if: contains(matrix.os, 'windows')
shell: pwsh
run: |
(Get-Content Cargo.toml) -replace '\[profile.dev\]', "`$&`ndebug = false" | Set-Content Cargo.toml
git diff
- name: Check formatting
uses: actions-rs/cargo@v1
with:
-2
View File
@@ -9,8 +9,6 @@ on:
paths:
- 'contracts/**'
- 'common/**'
- 'Cargo.lock'
- 'Cargo.toml'
- '.github/workflows/ci-contracts.yml'
jobs:
-6
View File
@@ -30,12 +30,6 @@ jobs:
override: true
components: rustfmt, clippy
- name: Set debug to false
working-directory: nym-wallet
run: |
sed -i.bak '1s/^/\[profile.dev\]\ndebug = false\n\n/' Cargo.toml
git diff
- name: Build all binaries
uses: actions-rs/cargo@v1
with:
-6
View File
@@ -1,7 +1,6 @@
name: ci-sdk-wasm
on:
workflow_dispatch:
pull_request:
paths:
- 'wasm/**'
@@ -45,11 +44,6 @@ jobs:
- name: Install wasm-bindgen-cli
run: cargo install wasm-bindgen-cli
- name: Set debug to false
run: |
sed -i.bak 's/\[profile.dev\]/\[profile.dev\]\ndebug = false/' Cargo.toml
git diff
- name: "Build"
run: make sdk-wasm-build
-1
View File
@@ -15,7 +15,6 @@ jobs:
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
IPINFO_API_TOKEN: ${{ secrets.IPINFO_API_TOKEN }}
continue-on-error: true
steps:
- name: Check out repository code
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-credential-proxy/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-network-monitor/Cargo.toml
@@ -31,7 +31,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.1
uses: mikefarah/yq@v4.44.6
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
+1 -8
View File
@@ -51,11 +51,4 @@ ppa-private-key.b64
ppa-private-key.asc
nym-network-monitor/topology.json
nym-network-monitor/__pycache__
nym-network-monitor/*.key
nym-network-monitor/.envrc
nym-network-monitor/.envrc
nym-api/redocly/formatted-openapi.json
*.sqlite
.build
nym-network-monitor/*.key
@@ -2,18 +2,22 @@
# See https://redocly.com/docs/cli/ for more information.
formatted-openapi.json:
path-parameters-defined:
- >-
#/paths/~1v1~1status~1mixnode~1{mix_id}~1compute-reward-estimation/post/parameters/0/name
- >-
#/paths/~1v1~1status~1mixnode~1{mix_id}~1compute-reward-estimation/post/parameters/1/name
- >-
#/paths/~1v1~1status~1mixnode~1{mix_id}~1compute-reward-estimation/post/parameters/2/name
- >-
#/paths/~1v1~1status~1mixnode~1{mix_id}~1compute-reward-estimation/post/parameters/3/name
- >-
#/paths/~1v1~1status~1mixnode~1{mix_id}~1compute-reward-estimation/post/parameters/4/name
- >-
#/paths/~1v1~1status~1mixnode~1{mix_id}~1compute-reward-estimation/post/parameters/5/name
# - >-
# #/paths/~1v1~1status~1mixnode~1{mix_id}~1compute-reward-estimation/post/parameters/0/name
# - >-
# #/paths/~1v1~1status~1mixnode~1{mix_id}~1compute-reward-estimation/post/parameters/1/name
# - >-
# #/paths/~1v1~1status~1mixnode~1{mix_id}~1compute-reward-estimation/post/parameters/2/name
# - >-
# #/paths/~1v1~1status~1mixnode~1{mix_id}~1compute-reward-estimation/post/parameters/3/name
# - >-
# #/paths/~1v1~1status~1mixnode~1{mix_id}~1compute-reward-estimation/post/parameters/4/name
# - >-
# #/paths/~1v1~1status~1mixnode~1{mix_id}~1compute-reward-estimation/post/parameters/5/name
# - '#/paths/~1v1~1unstable~1nym-nodes~1skimmed~1active/get/parameters/0/name'
# - '#/paths/~1v1~1unstable~1nym-nodes~1skimmed~1active/get/parameters/1/name'
# - '#/paths/~1v1~1unstable~1nym-nodes~1skimmed~1active/get/parameters/2/name'
# - '#/paths/~1v1~1unstable~1nym-nodes~1skimmed~1active/get/parameters/3/name'
operation-operationId-unique:
- >-
#/paths/~1v1~1status~1mixnodes~1active~1detailed/get/get_active_set_detailed
@@ -26,7 +30,6 @@ formatted-openapi.json:
- '#/components/schemas/EcashTicketVerificationRejection'
- '#/components/schemas/ExpirationDatePathParam'
- '#/components/schemas/FullFatNode'
- '#/components/schemas/G2ProjectiveSchema'
- '#/components/schemas/HistoricalPerformanceResponse'
- '#/components/schemas/HistoricalUptimeResponse'
- '#/components/schemas/MasterVerificationKeyResponse'
@@ -38,7 +41,6 @@ formatted-openapi.json:
- '#/components/schemas/NymNodeDetails'
- '#/components/schemas/PaginationRequest'
- '#/components/schemas/PartialCoinIndicesSignatureResponse'
- '#/components/schemas/PayInfo'
- '#/components/schemas/SpentCredentialsResponse'
- '#/components/schemas/UptimeHistoryResponse'
- '#/components/schemas/VerifyEcashCredentialBody'
+83
View File
@@ -0,0 +1,83 @@
extends:
- minimal
apis:
nym-api:
root: ./formatted-openapi.json
rules:
# https://redocly.com/docs/cli/rules/oas/operation-summary
operation-summary: off
# https://redocly.com/docs/cli/rules/oas/security-defined
security-defined: off
struct: off
# https://redocly.com/docs/cli/rules/oas/operation-2xx-response
operation-2xx-response: off
# rules:
# skip-warnings: true
# ignore:
# - path: /v1/gateways
# method: get
# - path: /v1/gateways/blacklisted
# method: get
# - path: /v1/mixnodes
# method: get
# - path: /v1/mixnodes/active
# method: get
# - path: /v1/mixnodes/active/detailed
# method: get
# - path: /v1/mixnodes/blacklisted
# method: get
# - path: /v1/mixnodes/detailed
# method: get
# - path: /v1/mixnodes/rewarded
# method: get
# - path: /v1/mixnodes/rewarded/detailed
# method: get
# - path: /v1/gateways/described
# method: get
# # network-monitor-status (deprecated)
# - path: /v1/status/gateway/{identity}/avg_uptime
# method: GET
# - path: /v1/status/gateway/{identity}/core-status-count
# method: GET
# - path: /v1/status/gateway/{identity}/history
# method: GET
# - path: /v1/status/gateway/{identity}/report
# method: GET
# - path: /v1/status/gateways/detailed
# method: GET
# - path: /v1/status/gateways/detailed-unfiltered
# method: GET
# - path: /v1/status/mixnode/{mix_id}/avg_uptime
# method: GET
# - path: /v1/status/mixnode/{mix_id}/compute-reward-estimation
# method: POST
# - path: /v1/status/mixnode/{mix_id}/core-status-count
# method: GET
# - path: /v1/status/mixnode/{mix_id}/history
# method: GET
# - path: /v1/status/mixnode/{mix_id}/report
# method: GET
# - path: /v1/status/mixnode/{mix_id}/reward-estimation
# method: GET
# - path: /v1/status/mixnodes/detailed-unfiltered
# method: GET
# # status
# - path: /v1/status/mixnode/{mix_id}/inclusion-probability
# method: GET
# - path: /v1/status/mixnode/{mix_id}/stake-saturation
# method: GET
# - path: /v1/status/mixnode/{mix_id}/status
# method: GET
# - path: /v1/status/mixnodes/active/detailed
# method: GET
# - path: /v1/status/mixnodes/detailed
# method: GET
# - path: /v1/status/mixnodes/inclusion-probability
# method: GET
# - path: /v1/status/mixnodes/rewarded/detailed
# method: GET
# # unstable nym nodes
# - path: /v1/unstable/nym-nodes/gateways/skimmed
# method: get
# - path: /v1/unstable/nym-nodes/mixnodes/skimmed
# method: get
-201
View File
@@ -4,207 +4,6 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2025.3-ruta] (2025-02-10)
- Push down forget me to client configs ([#5431])
- Fix statistics shutdown ([#5426])
- Make wait_for_graceful_shutdown to be pub ([#5424])
- Upgrade to thiserror 2.0 ([#5414])
- build(deps): bump the patch-updates group across 1 directory with 9 updates ([#5406])
- Relocate a validator api function ([#5401])
- Send shutdown instead of panic when reaching max fail ([#5398])
- Change Explorer URL to new smooshed nodes ([#5396])
- reduce log severity for checking topology validity ([#5395])
- MixnetClient can send ClientRequests ([#5381])
- Fix missing path triggers for CI ([#5380])
- Uncouple storage reference for bandwidth client ([#5372])
- build(deps): bump tokio from 1.40.0 to 1.43.0 ([#5370])
- DNS resolver configuration for internal HTTP client lookups ([#5355])
- Update README.md ([#5328])
- Update README.md ([#5327])
[#5431]: https://github.com/nymtech/nym/pull/5431
[#5426]: https://github.com/nymtech/nym/pull/5426
[#5424]: https://github.com/nymtech/nym/pull/5424
[#5414]: https://github.com/nymtech/nym/pull/5414
[#5406]: https://github.com/nymtech/nym/pull/5406
[#5401]: https://github.com/nymtech/nym/pull/5401
[#5398]: https://github.com/nymtech/nym/pull/5398
[#5396]: https://github.com/nymtech/nym/pull/5396
[#5395]: https://github.com/nymtech/nym/pull/5395
[#5381]: https://github.com/nymtech/nym/pull/5381
[#5380]: https://github.com/nymtech/nym/pull/5380
[#5372]: https://github.com/nymtech/nym/pull/5372
[#5370]: https://github.com/nymtech/nym/pull/5370
[#5355]: https://github.com/nymtech/nym/pull/5355
[#5328]: https://github.com/nymtech/nym/pull/5328
[#5327]: https://github.com/nymtech/nym/pull/5327
## [2025.2-hu] (2025-02-04)
- Feature/remove double spending bloomfilter ([#5417])
- HU - Downgrade harmless log message from info to debug ([#5405])
- lower default ticket verification quorum to 0.7 ([#5404])
- Downgrade harmless log message from info to debug ([#5403])
- Redirect from mixnode page to nodes page ([#5397])
- chore :update version of chain watcher and validator rewarder ([#5394])
- bugfix: correctly handle ingore epoch roles flag ([#5390])
- bugfix: terminate mixnet socket listener on shutdown ([#5389])
- feat: make client ignore dual mode nodes by default ([#5388])
- Handle ecash network errors differently ([#5378])
- Remove empty ephemeral keys ([#5376])
- fixed sql migration for adding default message timestamp ([#5374])
- Bind to [::] on nym-node for both IP versions ([#5361])
- exposed NymApiClient method for obtaining node performance history ([#5360])
- Client gateway selection ([#5358])
- chore: refresh wasm sdk ([#5353])
- chore: update indexed_db_futures ([#5347])
- build(deps): bump mikefarah/yq from 4.44.6 to 4.45.1 ([#5342])
- updated cosmrs and tendermint-rpc to their most recent versions ([#5339])
- build(deps): bump ts-rs from 10.0.0 to 10.1.0 ([#5338])
- build(deps): bump tempfile from 3.14.0 to 3.15.0 ([#5337])
- build(deps): bump the patch-updates group with 8 updates ([#5336])
- feature: introduce /load endpoint for self-reported quantised NymNode load ([#5326])
- feature: `CancellationToken`-based shutdowns ([#5325])
- Use expect in geodata test to give error message on failure ([#5314])
- feature: periodically remove stale gateway messages ([#5312])
- build(deps): bump the patch-updates group across 1 directory with 35 updates ([#5310])
- Add dependabot assignes for the root cargo ecosystem ([#5297])
- Move tun constants to network defaults ([#5286])
- Include IPINFO_API_TOKEN in nightly CI ([#5285])
- Nyx Chain Watcher ([#5274])
- bugfix: remove unnecessary arguments for nym-api swagger endpoints ([#5272])
- feature: nym topology revamp ([#5271])
- Add windows to CI builds ([#5269])
- http-api-client: deduplicate code ([#5267])
- build(deps): bump http from 1.1.0 to 1.2.0 ([#5228])
- NS API: add mixnet scraper ([#5200])
- build(deps): bump criterion from 0.4.0 to 0.5.1 ([#4911])
[#5417]: https://github.com/nymtech/nym/pull/5417
[#5405]: https://github.com/nymtech/nym/pull/5405
[#5404]: https://github.com/nymtech/nym/pull/5404
[#5403]: https://github.com/nymtech/nym/pull/5403
[#5397]: https://github.com/nymtech/nym/pull/5397
[#5394]: https://github.com/nymtech/nym/pull/5394
[#5390]: https://github.com/nymtech/nym/pull/5390
[#5389]: https://github.com/nymtech/nym/pull/5389
[#5388]: https://github.com/nymtech/nym/pull/5388
[#5378]: https://github.com/nymtech/nym/pull/5378
[#5376]: https://github.com/nymtech/nym/pull/5376
[#5374]: https://github.com/nymtech/nym/pull/5374
[#5361]: https://github.com/nymtech/nym/pull/5361
[#5360]: https://github.com/nymtech/nym/pull/5360
[#5358]: https://github.com/nymtech/nym/pull/5358
[#5353]: https://github.com/nymtech/nym/pull/5353
[#5347]: https://github.com/nymtech/nym/pull/5347
[#5342]: https://github.com/nymtech/nym/pull/5342
[#5339]: https://github.com/nymtech/nym/pull/5339
[#5338]: https://github.com/nymtech/nym/pull/5338
[#5337]: https://github.com/nymtech/nym/pull/5337
[#5336]: https://github.com/nymtech/nym/pull/5336
[#5326]: https://github.com/nymtech/nym/pull/5326
[#5325]: https://github.com/nymtech/nym/pull/5325
[#5314]: https://github.com/nymtech/nym/pull/5314
[#5312]: https://github.com/nymtech/nym/pull/5312
[#5310]: https://github.com/nymtech/nym/pull/5310
[#5297]: https://github.com/nymtech/nym/pull/5297
[#5286]: https://github.com/nymtech/nym/pull/5286
[#5285]: https://github.com/nymtech/nym/pull/5285
[#5274]: https://github.com/nymtech/nym/pull/5274
[#5272]: https://github.com/nymtech/nym/pull/5272
[#5271]: https://github.com/nymtech/nym/pull/5271
[#5269]: https://github.com/nymtech/nym/pull/5269
[#5267]: https://github.com/nymtech/nym/pull/5267
[#5228]: https://github.com/nymtech/nym/pull/5228
[#5200]: https://github.com/nymtech/nym/pull/5200
[#4911]: https://github.com/nymtech/nym/pull/4911
## [2025.1-reeses] (2025-01-15)
- Feture/legacy alert ([#5346])
- chore: readjusted --mode behaviour to fix the regression ([#5331])
- chore: apply 1.84 linter suggestions ([#5330])
- bugfix: make sure refresh data key matches bond info ([#5329])
- reduce log severity for number of packets being delayed ([#5321])
- feat: warn users if node is run in exit mode only ([#5320])
- Bugfix/contract version assignment ([#5318])
- fixed client session histogram buckets ([#5316])
- amend 250gb limit ([#5313])
- feature: expand nym-node prometheus metrics ([#5298])
- Cherry picked #5286 ([#5287])
- Add close to credential storage ([#5283])
- feature: wireguard metrics ([#5278])
- Add PATCH support to nym-http-api-client ([#5260])
- chore: removed legacy socks5 listener ([#5259])
- bugfix: make sure to apply gateway score filtering when choosing initial node ([#5256])
- Update TS bindings ([#5255])
- Add conversion unit tests for auth msg ([#5251])
- Add control messages to GatewayTransciver ([#5247])
- Remove unneeded async function annotation ([#5246])
- bugfix: make sure to update timestamp of last batch verification to prevent double redemption ([#5239])
- Add FromStr impl for UserAgent ([#5236])
- Extend swagger docs ([#5235])
- TicketType derive Hash and Eq ([#5233])
- Add fd callback to client core ([#5230])
- Extend raw ws fd for gateway client ([#5218])
- Shipping raw metrics to PG ([#5216])
- Change sqlite journal mode to WAL ([#5213])
- Derive serialize for UserAgent ([#5210])
- Restore Location fields ([#5208])
- better date serialization ([#5207])
- Fix overflow ([#5204])
- feature: hopefully final steps of the smoosh™️ ([#5201])
- Fix overflow ([#5184])
- NS API - Gateway stats scraping ([#5180])
- introduced initial internal commands for nym-cli: ecash key and request generation ([#5174])
- Move NS client to separate package under NS API ([#5171])
- build(deps): bump micromatch from 4.0.4 to 4.0.8 in /testnet-faucet ([#4813])
[#5346]: https://github.com/nymtech/nym/pull/5346
[#5331]: https://github.com/nymtech/nym/pull/5331
[#5330]: https://github.com/nymtech/nym/pull/5330
[#5329]: https://github.com/nymtech/nym/pull/5329
[#5321]: https://github.com/nymtech/nym/pull/5321
[#5320]: https://github.com/nymtech/nym/pull/5320
[#5318]: https://github.com/nymtech/nym/pull/5318
[#5316]: https://github.com/nymtech/nym/pull/5316
[#5313]: https://github.com/nymtech/nym/pull/5313
[#5298]: https://github.com/nymtech/nym/pull/5298
[#5287]: https://github.com/nymtech/nym/pull/5287
[#5283]: https://github.com/nymtech/nym/pull/5283
[#5278]: https://github.com/nymtech/nym/pull/5278
[#5260]: https://github.com/nymtech/nym/pull/5260
[#5259]: https://github.com/nymtech/nym/pull/5259
[#5256]: https://github.com/nymtech/nym/pull/5256
[#5255]: https://github.com/nymtech/nym/pull/5255
[#5251]: https://github.com/nymtech/nym/pull/5251
[#5247]: https://github.com/nymtech/nym/pull/5247
[#5246]: https://github.com/nymtech/nym/pull/5246
[#5239]: https://github.com/nymtech/nym/pull/5239
[#5236]: https://github.com/nymtech/nym/pull/5236
[#5235]: https://github.com/nymtech/nym/pull/5235
[#5233]: https://github.com/nymtech/nym/pull/5233
[#5230]: https://github.com/nymtech/nym/pull/5230
[#5218]: https://github.com/nymtech/nym/pull/5218
[#5216]: https://github.com/nymtech/nym/pull/5216
[#5213]: https://github.com/nymtech/nym/pull/5213
[#5210]: https://github.com/nymtech/nym/pull/5210
[#5208]: https://github.com/nymtech/nym/pull/5208
[#5207]: https://github.com/nymtech/nym/pull/5207
[#5204]: https://github.com/nymtech/nym/pull/5204
[#5201]: https://github.com/nymtech/nym/pull/5201
[#5184]: https://github.com/nymtech/nym/pull/5184
[#5180]: https://github.com/nymtech/nym/pull/5180
[#5174]: https://github.com/nymtech/nym/pull/5174
[#5171]: https://github.com/nymtech/nym/pull/5171
[#4813]: https://github.com/nymtech/nym/pull/4813
## [2024.14-crunch-patched] (2024-12-17)
- Fixes an issue to allow previously registred clients to connect to latest nym-nodes
- Fixes compatibility issues between nym-nodes and older clients
## [2024.14-crunch] (2024-12-11)
- Merge/release/2024.14-crunch ([#5242])
Generated
+1366 -2367
View File
File diff suppressed because it is too large Load Diff
+90 -94
View File
@@ -48,12 +48,13 @@ members = [
"common/credentials-interface",
"common/crypto",
"common/dkg",
"common/ecash-double-spending",
"common/ecash-time",
"common/execute",
"common/exit-policy",
"common/gateway-requests",
"common/gateway-stats-storage",
"common/gateway-storage",
"common/gateway-stats-storage",
"common/http-api-client",
"common/http-api-common",
"common/inclusion-probability",
@@ -92,7 +93,6 @@ members = [
"common/topology",
"common/tun",
"common/types",
"common/verloc",
"common/wasm/client-core",
"common/wasm/storage",
"common/wasm/utils",
@@ -104,22 +104,6 @@ members = [
"explorer-api/explorer-client",
"gateway",
"integrations/bity",
"nym-api",
"nym-api/nym-api-requests",
"nym-browser-extension/storage",
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-network-monitor",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-node-status-api/nym-node-status-client",
"nym-node/nym-node-metrics",
"nym-node/nym-node-requests",
"nym-outfox",
"nym-validator-rewarder",
"nyx-chain-watcher",
"sdk/ffi/cpp",
"sdk/ffi/go",
"sdk/ffi/shared",
@@ -128,16 +112,26 @@ members = [
"service-providers/common",
"service-providers/ip-packet-router",
"service-providers/network-requester",
"nym-api",
"nym-api/nym-api-requests",
"nym-browser-extension/storage",
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-data-observatory",
"nym-network-monitor",
"nym-node",
"nym-node/nym-node-requests",
"nym-node/nym-node-metrics",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-node-status-api/nym-node-status-client",
"nym-outfox",
"nym-validator-rewarder",
"tools/echo-server",
"tools/echo-server",
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/mixnet-connectivity-check",
# "tools/internal/sdk-version-bump",
"tools/internal/ssl-inject",
# "tools/internal/sdk-version-bump",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/nym-cli",
"tools/nym-id-cli",
@@ -149,6 +143,11 @@ members = [
"wasm/mix-fetch",
"wasm/node-tester",
"wasm/zknym-lib",
"tools/echo-server",
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract", "common/verloc", "tools/internal/mixnet-connectivity-check",
]
default-members = [
@@ -157,11 +156,11 @@ default-members = [
"explorer-api",
"nym-api",
"nym-credential-proxy/nym-credential-proxy",
"nym-data-observatory",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
@@ -172,6 +171,7 @@ exclude = [
"explorer",
"contracts",
"nym-wallet",
"nym-vpn/ui/src-tauri",
"cpu-cycles",
]
@@ -187,49 +187,45 @@ readme = "README.md"
[workspace.dependencies]
addr = "0.15.6"
aead = "0.5.2"
aes = "0.8.1"
aes-gcm = "0.10.1"
aes-gcm-siv = "0.11.1"
ammonia = "4"
anyhow = "1.0.95"
arc-swap = "1.7.1"
aead = "0.5.2"
anyhow = "1.0.90"
argon2 = "0.5.0"
async-trait = "0.1.86"
axum = "0.7.5"
async-trait = "0.1.83"
axum-client-ip = "0.6.1"
axum = "0.7.5"
axum-extra = "0.9.4"
axum-test = "16.2.0"
base64 = "0.22.1"
base85rs = "0.1.3"
bincode = "1.3.3"
bip39 = { version = "2.0.0", features = ["zeroize"] }
bit-vec = "0.7.0" # can we unify those?
bitvec = "1.0.0"
blake3 = "1.5.5"
blake3 = "1.5.4"
bloomfilter = "1.0.14"
bs58 = "0.5.1"
bytecodec = "0.4.15"
bytes = "1.7.2"
cargo_metadata = "0.18.1"
celes = "2.5.0"
celes = "2.4.0"
cfg-if = "1.0.0"
chacha20 = "0.9.0"
chacha20poly1305 = "0.10.1"
chrono = "0.4.39"
chrono = "0.4.31"
cipher = "0.4.3"
clap = "4.5.30"
clap = "4.5.20"
clap_complete = "4.5"
clap_complete_fig = "4.5"
colored = "2.2"
comfy-table = "7.1.4"
console = "0.15.10"
colored = "2.0"
comfy-table = "7.1.1"
console = "0.15.8"
console-subscriber = "0.1.1"
console_error_panic_hook = "0.1"
const-str = "0.5.6"
const_format = "0.2.34"
criterion = "0.5"
csv = "1.3.1"
const_format = "0.2.33"
criterion = "0.4"
csv = "1.3.0"
ctr = "0.9.1"
cupid = "0.6.1"
curve25519-dalek = "4.1"
@@ -242,33 +238,31 @@ doc-comment = "0.3"
dotenvy = "0.15.6"
ecdsa = "0.16"
ed25519-dalek = "2.1"
env_logger = "0.11.6"
envy = "0.4"
etherparse = "0.13.0"
envy = "0.4"
eyre = "0.6.9"
fastrand = "2.1.1"
flate2 = "1.0.35"
futures = "0.3.31"
flate2 = "1.0.34"
futures = "0.3.28"
futures-util = "0.3"
generic-array = "0.14.7"
getrandom = "0.2.10"
getset = "0.1.4"
getset = "0.1.3"
handlebars = "3.5.5"
headers = "0.4.0"
hex = "0.4.3"
hex-literal = "0.3.3"
hickory-resolver = "0.24.3"
hkdf = "0.12.3"
hmac = "0.12.1"
http = "1"
http-body-util = "0.1"
httpcodec = "0.2.3"
human-repr = "1.1.0"
humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = "1.6.0"
human-repr = "1.1.0"
hyper = "1.4.1"
hyper-util = "0.1"
indicatif = "0.17.11"
indicatif = "0.17.8"
inquire = "0.6.2"
ip_network = "0.4.1"
ipnetwork = "0.20"
@@ -280,21 +274,22 @@ ledger-transport = "0.10.0"
ledger-transport-hid = "0.10.0"
log = "0.4"
maxminddb = "0.23.0"
rs_merkle = "1.4.2"
mime = "0.3.17"
moka = { version = "0.12", features = ["future"] }
nix = "0.27.1"
notify = "5.1.0"
okapi = "0.7.0"
once_cell = "1.20.3"
once_cell = "1.20.2"
opentelemetry = "0.19.0"
opentelemetry-jaeger = "0.18.0"
parking_lot = "0.12.3"
pem = "0.8"
petgraph = "0.6.5"
pin-project = "1.1"
pin-project-lite = "0.2.16"
pin-project-lite = "0.2.14"
pretty_env_logger = "0.4.0"
publicsuffix = "2.3.0"
publicsuffix = "2.2.3"
quote = "1"
rand = "0.8.5"
rand_chacha = "0.3"
@@ -308,59 +303,55 @@ reqwest = { version = "0.12.4", default-features = false }
rocket = "0.5.0"
rocket_cors = "0.6.0"
rocket_okapi = "0.8.0"
rs_merkle = "1.4.2"
safer-ffi = "0.1.13"
schemars = "0.8.21"
semver = "1.0.25"
serde = "1.0.217"
semver = "1.0.23"
serde = "1.0.211"
serde_bytes = "0.11.15"
serde_derive = "1.0"
serde_json = "1.0.138"
serde_json_path = "0.7.2"
serde_json = "1.0.132"
serde_json_path = "0.7.1"
serde_repr = "0.1"
serde_with = "3.9.0"
serde_yaml = "0.9.25"
sha2 = "0.10.8"
si-scale = "0.2.3"
sphinx-packet = "0.3.1"
sphinx-packet = "0.1.1"
sqlx = "0.7.4"
strum = "0.26"
strum_macros = "0.26"
subtle-encoding = "0.5"
syn = "1"
sysinfo = "0.33.0"
sysinfo = "0.30.13"
tap = "1.0.1"
tar = "0.4.44"
tempfile = "3.15"
thiserror = "2.0"
time = "0.3.37"
tokio = "1.43"
tokio-postgres = "0.7"
tokio-stream = "0.1.17"
tar = "0.4.42"
tempfile = "3.14"
thiserror = "1.0.64"
time = "0.3.30"
tokio = "1.39"
tokio-stream = "0.1.16"
tokio-test = "0.4.4"
tokio-tun = "0.11.5"
tokio-tungstenite = { version = "0.20.1" }
tokio-util = "0.7.13"
toml = "0.8.20"
tower = "0.5.2"
tokio-util = "0.7.12"
toml = "0.8.14"
tower = "0.4.13"
tower-http = "0.5.2"
tracing = "0.1.41"
tracing-log = "0.2"
tracing = "0.1.37"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = "0.3.19"
tracing-subscriber = "0.3.16"
tracing-tree = "0.2.2"
ts-rs = "10.1.0"
tracing-log = "0.2"
ts-rs = "10.0.0"
tungstenite = { version = "0.20.1", default-features = false }
uniffi = "0.29.0"
uniffi_build = "0.29.0"
url = "2.5"
utoipa = "5.2"
utoipa-swagger-ui = "8.1"
utoipa-swagger-ui = "8.0"
utoipauto = "0.2"
uuid = "*"
vergen = { version = "=8.3.1", default-features = false }
walkdir = "2"
wasm-bindgen-test = "0.3.49"
wasm-bindgen-test = "0.3.43"
x25519-dalek = "2.0.0"
zeroize = "1.6.0"
@@ -391,26 +382,31 @@ cw4 = { version = "=1.1.2" }
cw-controllers = { version = "=1.1.0" }
# cosmrs-related
bip32 = { version = "0.5.3", default-features = false }
bip32 = { version = "0.5.2", default-features = false }
cosmrs = { version = "0.21.1" }
tendermint = "0.40.0"
tendermint-rpc = "0.40.0"
prost = { version = "0.13", default-features = false }
# temporarily using a fork again (yay.) because we need staking and slashing support (which are already on main but not released)
# plus response message parsing (which is, as of the time of writing this message, waiting to get merged)
#cosmrs = { path = "../cosmos-rust-fork/cosmos-rust/cosmrs" }
cosmrs = { git = "https://github.com/cosmos/cosmos-rust", rev = "4b1332e6d8258ac845cef71589c8d362a669675a" } # unfortuntely we need a fork by yours truly to get the staking support
tendermint = "0.37.0" # same version as used by cosmrs
tendermint-rpc = "0.37.0" # same version as used by cosmrs
prost = { version = "0.12", default-features = false }
# wasm-related dependencies
gloo-utils = "0.2.0"
gloo-net = "0.6.0"
gloo-net = "0.5.0"
indexed_db_futures = "0.6.0"
js-sys = "0.3.76"
# use a separate branch due to feature unification failures
# this is blocked until the upstream removes outdates `wasm_bindgen` feature usage
# indexed_db_futures = "0.4.1"
indexed_db_futures = { git = "https://github.com/TiemenSch/rust-indexed-db", branch = "update-uuid" }
js-sys = "0.3.70"
serde-wasm-bindgen = "0.6.5"
tsify = "0.4.5"
wasm-bindgen = "0.2.99"
wasm-bindgen-futures = "0.4.49"
wasmtimer = "0.4.1"
web-sys = "0.3.76"
wasm-bindgen = "0.2.95"
wasm-bindgen-futures = "0.4.45"
wasmtimer = "0.2.0"
web-sys = "0.3.72"
# Profile settings for individual crates
-23
View File
@@ -1,23 +0,0 @@
Boost Software License - Version 1.0 - August 17th, 2003
Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:
The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
+7 -7
View File
@@ -13,8 +13,8 @@ The platform is composed of multiple Rust crates. Top-level executable binary cr
* `nym-client` - an executable which you can build into your own applications. Use it for interacting with Nym nodes.
* `nym-socks5-client` - a Socks5 proxy you can run on your machine and use with existing applications.
* `nym-explorer` - a (projected) block explorer and (existing) mixnet viewer.
* `nym-wallet` - a desktop wallet implemented using the [Tauri](https://tauri.app)) framework.
* `nym-cli` - a tool for interacting with the network from the CLI.
* `nym-wallet` - a desktop wallet implemented using the [Tauri](https://tauri.studio/en/docs/about/intro) framework.
* `nym-cli` - a tool for interacting with the network from the CLI.
<!-- coming soon
* `nym-network-monitor` - sends packets through the full system to check that they are working as expected, and stores node uptime histories as the basis of a rewards system ("mixmining" or "proof-of-mixing").
-->
@@ -42,10 +42,10 @@ client ───► Gateway ──┘ mix │ mix ┌─►mix ───►
References for developers:
* [Dev Docs](https://nym.com/docs/developers)
* [SDKs](https://nym.com/docs/developers/rust)
* [Network Docs](https://nym.com/docs/network)
* [Release Cycle - git flow](https://nym.com/docs/operators/release-cycle)
* [Dev Docs](https://nymtech.net/docs/developers)
* [SDKs](https://nymtech.net/docs/developers/rust)
* [Network Docs](https://nymtech.net/docs/network)
* [Release Cycle - git flow](https://nymtech.net/docs/operators/release-cycle)
### Developer chat
@@ -66,4 +66,4 @@ As a general approach, licensing is as follows this pattern:
- libraries and components are Apache 2.0 or MIT
- documentation is Apache 2.0 or CC0-1.0
Nym Node Operators and Validators Terms and Conditions can be found [here](https://nym.com/operators-validators-terms).
Nym Node Operators and Validators Temrs and Conditions can be found [here](https://nymtech.net/terms-and-conditions/operators/v1.0.0).
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-client"
version = "1.1.48"
version = "1.1.45"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
+2 -6
View File
@@ -56,7 +56,7 @@ pub fn default_data_directory<P: AsRef<Path>>(id: P) -> PathBuf {
.join(DEFAULT_DATA_DIR)
}
#[derive(Debug, Deserialize, PartialEq, Serialize, Clone)]
#[derive(Debug, Deserialize, PartialEq, Serialize)]
pub struct Config {
#[serde(flatten)]
pub base: BaseClientConfig,
@@ -94,10 +94,6 @@ impl CliClientConfig for Config {
}
impl Config {
pub fn base(&self) -> BaseClientConfig {
self.base.clone()
}
pub fn new<S: AsRef<str>>(id: S) -> Self {
Config {
base: BaseClientConfig::new(id.as_ref(), env!("CARGO_PKG_VERSION")),
@@ -213,7 +209,7 @@ impl SocketType {
}
}
#[derive(Debug, Deserialize, PartialEq, Eq, Serialize, Clone)]
#[derive(Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Socket {
pub socket_type: SocketType,
@@ -107,8 +107,5 @@ enabled = {{ debug.stats_reporting.enabled }}
provider_address = '{{ debug.stats_reporting.provider_address }}'
reporting_interval = '{{ debug.stats_reporting.reporting_interval }}'
[debug.forget_me]
client = {{ debug.forget_me.client }}
stats = {{ debug.forget_me.stats }}
"#;
+6 -16
View File
@@ -20,7 +20,7 @@ pub use nym_sphinx::addressing::clients::Recipient;
pub mod config;
type NativeClientBuilder = BaseClientBuilder<QueryHttpRpcNyxdClient, OnDiskPersistent>;
type NativeClientBuilder<'a> = BaseClientBuilder<'a, QueryHttpRpcNyxdClient, OnDiskPersistent>;
pub struct SocketClient {
/// Client configuration options, including, among other things, packet sending rates,
@@ -32,10 +32,6 @@ pub struct SocketClient {
}
impl SocketClient {
pub fn config(&self) -> Config {
self.config.clone()
}
pub fn new(config: Config, custom_mixnet: Option<PathBuf>) -> Self {
SocketClient {
config,
@@ -49,7 +45,7 @@ impl SocketClient {
client_output: ClientOutput,
client_state: ClientState,
self_address: &Recipient,
task_client: nym_task::TaskClient,
shutdown: nym_task::TaskClient,
packet_type: PacketType,
) {
info!("Starting websocket listener...");
@@ -77,15 +73,10 @@ impl SocketClient {
shared_lane_queue_lengths,
reply_controller_sender,
Some(packet_type),
task_client.fork("websocket_handler"),
);
websocket::Listener::new(
config.socket.host,
config.socket.listening_port,
task_client.with_suffix("websocket_listener"),
)
.start(websocket_handler);
websocket::Listener::new(config.socket.host, config.socket.listening_port)
.start(websocket_handler, shutdown);
}
/// blocking version of `start_socket` method. Will run forever (or until SIGINT is sent)
@@ -117,9 +108,8 @@ impl SocketClient {
let storage = self.initialise_storage().await?;
let user_agent = nym_bin_common::bin_info!().into();
let mut base_client =
BaseClientBuilder::new(self.config().base(), storage, dkg_query_client)
.with_user_agent(user_agent);
let mut base_client = BaseClientBuilder::new(&self.config.base, storage, dkg_query_client)
.with_user_agent(user_agent);
if let Some(custom_mixnet) = &self.custom_mixnet {
base_client = base_client.with_stored_topology(custom_mixnet)?;
-1
View File
@@ -82,7 +82,6 @@ impl From<Init> for OverrideConfig {
nyxd_urls: init_config.common_args.nyxd_urls,
enabled_credentials_mode: init_config.common_args.enabled_credentials_mode,
stats_reporting_address: init_config.common_args.stats_reporting_address,
forget_me: init_config.common_args.forget_me.into(),
}
}
}
-3
View File
@@ -16,7 +16,6 @@ use nym_bin_common::completions::{fig_generate, ArgShell};
use nym_client::client::Recipient;
use nym_client_core::cli_helpers::CliClient;
use nym_client_core::client::base_client::storage::migration_helpers::v1_1_33;
use nym_client_core::config::ForgetMe;
use nym_config::OptionalSet;
use std::error::Error;
use std::net::IpAddr;
@@ -107,7 +106,6 @@ pub(crate) struct OverrideConfig {
nyxd_urls: Option<Vec<url::Url>>,
enabled_credentials_mode: Option<bool>,
stats_reporting_address: Option<Recipient>,
forget_me: ForgetMe,
}
pub(crate) async fn execute(args: Cli) -> Result<(), Box<dyn Error + Send + Sync>> {
@@ -135,7 +133,6 @@ pub(crate) fn override_config(config: Config, args: OverrideConfig) -> Config {
args.fastmode,
)
.with_base(BaseClientConfig::with_disabled_cover_traffic, args.no_cover)
.with_base(BaseClientConfig::with_forget_me, args.forget_me)
.with_optional(Config::with_port, args.port)
.with_optional(Config::with_host, args.host)
.with_optional_custom_env_ext(
-1
View File
@@ -41,7 +41,6 @@ impl From<Run> for OverrideConfig {
nyxd_urls: run_config.common_args.nyxd_urls,
enabled_credentials_mode: run_config.common_args.enabled_credentials_mode,
stats_reporting_address: run_config.common_args.stats_reporting_address,
forget_me: run_config.common_args.forget_me.into(),
}
}
}
+40 -66
View File
@@ -19,7 +19,6 @@ use nym_sphinx::receiver::ReconstructedMessage;
use nym_task::connections::{
ConnectionCommand, ConnectionCommandSender, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::TaskClient;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::Instant;
@@ -44,11 +43,9 @@ pub(crate) struct HandlerBuilder {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
task_client: TaskClient,
}
impl HandlerBuilder {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
msg_input: InputMessageSender,
client_connection_tx: ConnectionCommandSender,
@@ -57,7 +54,6 @@ impl HandlerBuilder {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
task_client: TaskClient,
) -> Self {
Self {
msg_input,
@@ -67,14 +63,11 @@ impl HandlerBuilder {
lane_queue_lengths,
reply_controller_sender,
packet_type,
task_client,
}
}
// TODO: make sure we only ever have one active handler
pub fn create_active_handler(&self) -> Handler {
let mut task_client = self.task_client.fork("active_handler");
task_client.disarm();
Handler {
msg_input: self.msg_input.clone(),
client_connection_tx: self.client_connection_tx.clone(),
@@ -85,7 +78,6 @@ impl HandlerBuilder {
lane_queue_lengths: self.lane_queue_lengths.clone(),
reply_controller_sender: self.reply_controller_sender.clone(),
packet_type: self.packet_type,
task_client,
}
}
}
@@ -100,18 +92,16 @@ pub(crate) struct Handler {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
task_client: TaskClient,
}
impl Drop for Handler {
fn drop(&mut self) {
if let Err(err) = self
if self
.buffer_requester
.unbounded_send(ReceivedBufferMessage::ReceiverDisconnect)
.is_err()
{
if !self.task_client.is_shutdown_poll() {
error!("failed to disconnect the receiver from the buffer: {err}");
}
error!("we failed to disconnect the receiver from the buffer! presumably the shutdown procedure has been initiated!")
}
}
}
@@ -135,23 +125,10 @@ impl Handler {
};
// get the number of pending replies waiting for reply surbs
let reply_queue_length = match self
let reply_queue_length = self
.reply_controller_sender
.get_lane_queue_length(connection_id)
.await
{
Ok(length) => length,
Err(err) => {
if !self.task_client.is_shutdown_poll() {
error!(
"Failed to get reply queue length for connection {connection_id}: {err}"
);
}
// We're just going to assume that the queue is empty, and I think that's okay
// during shutdown.
0
}
};
.await;
let queue_length = base_length + reply_queue_length;
@@ -191,11 +168,10 @@ impl Handler {
// the ack control is now responsible for chunking, etc.
let input_msg = InputMessage::new_regular(recipient, message, lane, self.packet_type);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send message to the input buffer: {err}");
}
}
self.msg_input
.send(input_msg)
.await
.expect("InputMessageReceiver has stopped receiving!");
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
let TransmissionLane::ConnectionId(connection_id) = lane else {
@@ -224,11 +200,10 @@ impl Handler {
let input_msg =
InputMessage::new_anonymous(recipient, message, reply_surbs, lane, self.packet_type);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send anonymous message to the input buffer: {err}");
}
}
self.msg_input
.send(input_msg)
.await
.expect("InputMessageReceiver has stopped receiving!");
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
let TransmissionLane::ConnectionId(connection_id) = lane else {
@@ -252,11 +227,10 @@ impl Handler {
});
let input_msg = InputMessage::new_reply(recipient_tag, message, lane, self.packet_type);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send reply message to the input buffer: {err}");
}
}
self.msg_input
.send(input_msg)
.await
.expect("InputMessageReceiver has stopped receiving!");
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
let TransmissionLane::ConnectionId(connection_id) = lane else {
@@ -271,14 +245,9 @@ impl Handler {
}
fn handle_closed_connection(&self, connection_id: u64) -> Option<ServerResponse> {
if let Err(err) = self
.client_connection_tx
self.client_connection_tx
.unbounded_send(ConnectionCommand::Close(connection_id))
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send close connection command: {err}");
}
}
.unwrap();
None
}
@@ -393,10 +362,11 @@ impl Handler {
}
}
async fn listen_for_requests(&mut self, mut msg_receiver: ReconstructedMessagesReceiver) {
let mut task_client = self.task_client.fork("select");
task_client.disarm();
async fn listen_for_requests(
&mut self,
mut msg_receiver: ReconstructedMessagesReceiver,
mut task_client: nym_task::TaskClient,
) {
while !task_client.is_shutdown() {
tokio::select! {
// we can either get a client request from the websocket
@@ -445,7 +415,15 @@ impl Handler {
}
// consume self to make sure `drop` is called after this is done
pub(crate) async fn handle_connection(mut self, socket: TcpStream) {
pub(crate) async fn handle_connection(
mut self,
socket: TcpStream,
mut task_client: nym_task::TaskClient,
) {
// We don't want a crash in the connection handler to trigger a shutdown of the whole
// process.
task_client.disarm();
let ws_stream = match accept_async(socket).await {
Ok(ws_stream) => ws_stream,
Err(err) => {
@@ -458,18 +436,14 @@ impl Handler {
let (reconstructed_sender, reconstructed_receiver) = mpsc::unbounded();
// tell the buffer to start sending stuff to us
if let Err(err) =
self.buffer_requester
.unbounded_send(ReceivedBufferMessage::ReceiverAnnounce(
reconstructed_sender,
))
{
if !self.task_client.is_shutdown_poll() {
error!("failed to announce the receiver to the buffer: {err}");
}
}
self.buffer_requester
.unbounded_send(ReceivedBufferMessage::ReceiverAnnounce(
reconstructed_sender,
))
.expect("the buffer request failed!");
self.listen_for_requests(reconstructed_receiver).await;
self.listen_for_requests(reconstructed_receiver, task_client)
.await;
}
}
+17 -11
View File
@@ -3,7 +3,6 @@
use super::handler::HandlerBuilder;
use log::*;
use nym_task::TaskClient;
use std::net::IpAddr;
use std::{net::SocketAddr, process, sync::Arc};
use tokio::io::AsyncWriteExt;
@@ -23,19 +22,21 @@ impl State {
pub(crate) struct Listener {
address: SocketAddr,
state: State,
task_client: TaskClient,
}
impl Listener {
pub(crate) fn new(host: IpAddr, port: u16, task_client: TaskClient) -> Self {
pub(crate) fn new(host: IpAddr, port: u16) -> Self {
Listener {
address: SocketAddr::new(host, port),
state: State::AwaitingConnection,
task_client,
}
}
pub(crate) async fn run(&mut self, handler: HandlerBuilder) {
pub(crate) async fn run(
&mut self,
handler: HandlerBuilder,
mut task_client: nym_task::TaskClient,
) {
let tcp_listener = match tokio::net::TcpListener::bind(self.address).await {
Ok(listener) => listener,
Err(err) => {
@@ -46,11 +47,11 @@ impl Listener {
let notify = Arc::new(Notify::new());
while !self.task_client.is_shutdown() {
loop {
tokio::select! {
// When the handler finishes we check if shutdown is signalled
_ = notify.notified() => {
if self.task_client.is_shutdown() {
if task_client.is_shutdown() {
log::trace!("Websocket listener: detected shutdown after connection closed");
break;
}
@@ -59,7 +60,7 @@ impl Listener {
}
// ... but when there is no connected client at the time of shutdown being
// signalled, we handle it here.
_ = self.task_client.recv() => {
_ = task_client.recv() => {
if !self.state.is_connected() {
log::trace!("Not connected: shutting down");
break;
@@ -87,8 +88,9 @@ impl Listener {
// hanging because the executor doesn't come back here
let notify_clone = Arc::clone(&notify);
let fresh_handler = handler.create_active_handler();
let task_client_handler = task_client.clone();
tokio::spawn(async move {
fresh_handler.handle_connection(socket).await;
fresh_handler.handle_connection(socket, task_client_handler).await;
notify_clone.notify_one();
});
self.state = State::Connected;
@@ -102,9 +104,13 @@ impl Listener {
log::debug!("Websocket listener: Exiting");
}
pub(crate) fn start(mut self, handler: HandlerBuilder) -> JoinHandle<()> {
pub(crate) fn start(
mut self,
handler: HandlerBuilder,
shutdown: nym_task::TaskClient,
) -> JoinHandle<()> {
info!("Running websocket on {:?}", self.address.to_string());
tokio::spawn(async move { self.run(handler).await })
tokio::spawn(async move { self.run(handler, shutdown).await })
}
}
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-socks5-client"
version = "1.1.48"
version = "1.1.45"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
-1
View File
@@ -93,7 +93,6 @@ impl From<Init> for OverrideConfig {
enabled_credentials_mode: init_config.common_args.enabled_credentials_mode,
outfox: false,
stats_reporting_address: init_config.common_args.stats_reporting_address,
forget_me: init_config.common_args.forget_me.into(),
}
}
}
+1 -3
View File
@@ -17,7 +17,7 @@ use nym_bin_common::completions::{fig_generate, ArgShell};
use nym_client_core::cli_helpers::CliClient;
use nym_client_core::client::base_client::storage::migration_helpers::v1_1_33;
use nym_client_core::client::topology_control::geo_aware_provider::CountryGroup;
use nym_client_core::config::{ForgetMe, GroupBy, TopologyStructure};
use nym_client_core::config::{GroupBy, TopologyStructure};
use nym_config::OptionalSet;
use nym_sphinx::addressing::Recipient;
use nym_sphinx::params::{PacketSize, PacketType};
@@ -113,7 +113,6 @@ pub(crate) struct OverrideConfig {
enabled_credentials_mode: Option<bool>,
outfox: bool,
stats_reporting_address: Option<Recipient>,
forget_me: ForgetMe,
}
pub(crate) async fn execute(args: Cli) -> Result<(), Box<dyn Error + Send + Sync>> {
@@ -180,7 +179,6 @@ pub(crate) fn override_config(config: Config, args: OverrideConfig) -> Config {
BaseClientConfig::with_topology_structure,
topology_structure,
)
.with_base(BaseClientConfig::with_forget_me, args.forget_me)
.with_optional(Config::with_anonymous_replies, args.use_anonymous_replies)
.with_optional(Config::with_port, args.port)
.with_optional(Config::with_ip, args.ip)
-1
View File
@@ -65,7 +65,6 @@ impl From<Run> for OverrideConfig {
enabled_credentials_mode: run_config.common_args.enabled_credentials_mode,
outfox: run_config.outfox,
stats_reporting_address: run_config.common_args.stats_reporting_address,
forget_me: run_config.common_args.forget_me.into(),
}
}
}
-4
View File
@@ -113,8 +113,4 @@ enabled = {{ core.debug.stats_reporting.enabled }}
provider_address = '{{ core.debug.stats_reporting.provider_address }}'
reporting_interval = '{{ core.debug.stats_reporting.reporting_interval }}'
[core.debug.forget_me]
client = {{ core.debug.forget_me.client }}
stats = {{ core.debug.forget_me.stats }}
"#;
@@ -28,7 +28,7 @@ pub type HmacSha256 = Hmac<Sha256>;
pub type Nonce = u64;
pub type Taken = Option<SystemTime>;
pub const BANDWIDTH_CAP_PER_DAY: u64 = 250 * 1024 * 1024 * 1024; // 250 GB
pub const BANDWIDTH_CAP_PER_DAY: u64 = 1024 * 1024 * 1024; // 1 GB
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct IpPair {
@@ -60,7 +60,7 @@ impl From<IpAddr> for IpPair {
std::net::IpAddr::V4(ipv4_addr) => (ipv4_addr.octets()[2], ipv4_addr.octets()[3]),
std::net::IpAddr::V6(ipv6_addr) => (ipv6_addr.octets()[14], ipv6_addr.octets()[15]),
};
let last_bytes = ((before_last_byte as u16) << 8) | last_byte as u16;
let last_bytes = (before_last_byte as u16) << 8 | last_byte as u16;
let ipv4 = Ipv4Addr::new(
WG_TUN_DEVICE_IP_ADDRESS_V4.octets()[0],
WG_TUN_DEVICE_IP_ADDRESS_V4.octets()[1],
+2 -3
View File
@@ -3,7 +3,7 @@ name = "nym-client-core"
version = "1.1.15"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
edition = "2021"
rust-version = "1.76"
rust-version = "1.70"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -40,13 +40,12 @@ nym-crypto = { path = "../crypto" }
nym-explorer-client = { path = "../../explorer-api/explorer-client" }
nym-gateway-client = { path = "../client-libs/gateway-client" }
nym-gateway-requests = { path = "../gateway-requests" }
nym-http-api-client = { path = "../http-api-client" }
nym-metrics = { path = "../nym-metrics" }
nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
nym-statistics-common = { path = "../statistics" }
nym-pemstore = { path = "../pemstore" }
nym-topology = { path = "../topology", features = ["persistence"] }
nym-topology = { path = "../topology", features = ["serializable"] }
nym-mixnet-client = { path = "../client-libs/mixnet-client", default-features = false }
nym-validator-client = { path = "../client-libs/validator-client", default-features = false }
nym-task = { path = "../task" }
+1 -88
View File
@@ -145,11 +145,6 @@ impl Config {
self
}
pub fn with_forget_me(mut self, forget_me: ForgetMe) -> Self {
self.debug.forget_me = forget_me;
self
}
// TODO: this should be refactored properly
// as of 12.09.23 the below is true (not sure how this comment will rot in the future)
// medium_toggle:
@@ -522,7 +517,7 @@ impl Default for Acknowledgements {
}
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct Topology {
/// The uniform delay every which clients are querying the directory server
/// to try to obtain a compatible network topology to send sphinx packets through.
@@ -555,18 +550,6 @@ pub struct Topology {
/// Specifies a minimum performance of a gateway that is used on route construction.
/// This setting is only applicable when `NymApi` topology is used.
pub minimum_gateway_performance: u8,
/// Specifies whether this client should attempt to retrieve all available network nodes
/// as opposed to just active mixnodes/gateways.
pub use_extended_topology: bool,
/// Specifies whether this client should ignore the current epoch role of the target egress node
/// when constructing the final hop packets.
pub ignore_egress_epoch_role: bool,
/// Specifies whether this client should ignore the current epoch role of the ingress node
/// when attempting to establish new connection
pub ignore_ingress_epoch_role: bool,
}
#[allow(clippy::large_enum_variant)]
@@ -603,10 +586,6 @@ impl Default for Topology {
topology_structure: TopologyStructure::default(),
minimum_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
minimum_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
use_extended_topology: false,
ignore_egress_epoch_role: true,
ignore_ingress_epoch_role: true,
}
}
}
@@ -724,9 +703,6 @@ pub struct DebugConfig {
/// Defines all configuration options related to stats reporting.
pub stats_reporting: StatsReporting,
/// Defines all configuration options related to the forget me flag.
pub forget_me: ForgetMe,
}
impl DebugConfig {
@@ -749,69 +725,6 @@ impl Default for DebugConfig {
topology: Default::default(),
reply_surbs: Default::default(),
stats_reporting: Default::default(),
forget_me: Default::default(),
}
}
}
#[derive(Clone, Default, Debug, Deserialize, PartialEq, Serialize, Copy)]
pub struct ForgetMe {
client: bool,
stats: bool,
}
impl From<bool> for ForgetMe {
fn from(value: bool) -> Self {
if value {
Self::new_all()
} else {
Self::new_none()
}
}
}
impl ForgetMe {
pub fn new_all() -> Self {
Self {
client: true,
stats: true,
}
}
pub fn new_client() -> Self {
Self {
client: true,
stats: false,
}
}
pub fn new_stats() -> Self {
Self {
client: false,
stats: true,
}
}
pub fn new(client: bool, stats: bool) -> Self {
Self { client, stats }
}
pub fn any(&self) -> bool {
self.client || self.stats
}
pub fn client(&self) -> bool {
self.client
}
pub fn stats(&self) -> bool {
self.stats
}
pub fn new_none() -> Self {
Self {
client: false,
stats: false,
}
}
}
@@ -182,7 +182,7 @@ impl From<ConfigV5> for Config {
maximum_reply_key_age: value.debug.reply_surbs.maximum_reply_key_age,
surb_mix_hops: value.debug.reply_surbs.surb_mix_hops,
},
..Default::default()
stats_reporting: Default::default(),
},
}
}
@@ -8,10 +8,7 @@ use crate::{
},
};
use log::{debug, error};
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use sqlx::ConnectOptions;
use std::path::Path;
#[derive(Debug, Clone)]
@@ -33,9 +30,6 @@ impl StorageManager {
}
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -116,7 +110,7 @@ impl StorageManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO registered_gateway(gateway_id_bs58, registration_timestamp, gateway_type)
INSERT INTO registered_gateway(gateway_id_bs58, registration_timestamp, gateway_type)
VALUES (?, ?, ?)
"#,
registered_gateway.gateway_id_bs58,
@@ -230,7 +224,7 @@ impl StorageManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO custom_gateway_details(gateway_id_bs58, data)
INSERT INTO custom_gateway_details(gateway_id_bs58, data)
VALUES (?, ?)
"#,
custom.gateway_id_bs58,
@@ -15,7 +15,6 @@ pub mod error;
mod manager;
mod models;
#[derive(Clone)]
pub struct OnDiskGatewaysDetails {
manager: StorageManager,
}
@@ -20,12 +20,12 @@ pub enum InMemStorageError {
MalformedGateway(#[from] BadGateway),
}
#[derive(Clone, Debug, Default)]
#[derive(Debug, Default)]
pub struct InMemGatewaysDetails {
inner: Arc<RwLock<InMemStorageInner>>,
}
#[derive(Clone, Debug, Default)]
#[derive(Debug, Default)]
struct InMemStorageInner {
active_gateway: Option<String>,
gateways: HashMap<String, GatewayRegistration>,
@@ -112,15 +112,14 @@ where
source,
}
})?;
hardcoded_topology.entry_capable_nodes().cloned().collect()
hardcoded_topology.get_gateways()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::gateways_for_init(
crate::init::helpers::current_gateways(
&mut rng,
&core.client.nym_api_urls,
user_agent,
core.debug.topology.minimum_gateway_performance,
core.debug.topology.ignore_ingress_epoch_role,
)
.await?
};
@@ -129,7 +128,7 @@ where
// make sure the list of available gateways doesn't overlap the list of known gateways
let available_gateways = available_gateways
.into_iter()
.filter(|g| !registered_gateways.contains(&g.identity()))
.filter(|g| !registered_gateways.contains(g.identity()))
.collect::<Vec<_>>();
if available_gateways.is_empty() {
@@ -93,10 +93,6 @@ pub struct CommonClientInitArgs {
/// Sets the address to report statistics
#[cfg_attr(feature = "cli", clap(long, hide = true))]
pub stats_reporting_address: Option<Recipient>,
/// Sets the forget me flag
#[cfg_attr(feature = "cli", clap(long, hide = true, default_value_t = false))]
pub forget_me: bool,
}
pub struct InitResultsWithConfig<T> {
@@ -171,15 +167,14 @@ where
source,
}
})?;
hardcoded_topology.entry_capable_nodes().cloned().collect()
hardcoded_topology.get_gateways()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::gateways_for_init(
crate::init::helpers::current_gateways(
&mut rng,
&core.client.nym_api_urls,
user_agent,
core.debug.topology.minimum_gateway_performance,
core.debug.topology.ignore_ingress_epoch_role,
)
.await?
};
@@ -61,8 +61,4 @@ pub struct CommonClientRunArgs {
/// Sets the address to report statistics
#[cfg_attr(feature = "cli", clap(long, hide = true))]
pub stats_reporting_address: Option<Recipient>,
/// Sets the forget me flag
#[cfg_attr(feature = "cli", clap(long, hide = true, default_value_t = false))]
pub forget_me: bool,
}
+41 -104
View File
@@ -1,9 +1,9 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::mix_traffic::ClientRequestSender;
use super::received_buffer::ReceivedBufferMessage;
use super::statistics_control::StatisticsControl;
use super::topology_control::geo_aware_provider::GeoAwareTopologyProvider;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
@@ -24,7 +24,7 @@ use crate::client::replies::reply_storage::{
};
use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use crate::client::topology_control::{
TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
nym_api_provider, TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
};
use crate::config::{Config, DebugConfig};
use crate::error::ClientCoreError;
@@ -36,11 +36,9 @@ use crate::{config, spawn_future};
use futures::channel::mpsc;
use log::*;
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_config_types::ForgetMe;
use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore};
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::{encryption, identity};
use nym_crypto::hkdf::DerivationMaterial;
use nym_gateway_client::client::config::GatewayClientConfig;
use nym_gateway_client::{
AcknowledgementReceiver, GatewayClient, GatewayConfig, MixnetMessageReceiver, PacketRouter,
@@ -178,8 +176,8 @@ impl From<bool> for CredentialsToggle {
}
}
pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
config: Config,
pub struct BaseClientBuilder<'a, C, S: MixnetClientStorage> {
config: &'a Config,
client_store: S,
dkg_query_client: Option<C>,
@@ -190,23 +188,18 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
user_agent: Option<UserAgent>,
setup_method: GatewaySetup,
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
derivation_material: Option<DerivationMaterial>,
}
impl<C, S> BaseClientBuilder<C, S>
impl<'a, C, S> BaseClientBuilder<'a, C, S>
where
S: MixnetClientStorage + 'static,
C: DkgQueryClient + Send + Sync + 'static,
{
pub fn new(
base_config: Config,
base_config: &'a Config,
client_store: S,
dkg_query_client: Option<C>,
) -> BaseClientBuilder<C, S> {
) -> BaseClientBuilder<'a, C, S> {
BaseClientBuilder {
config: base_config,
client_store,
@@ -217,27 +210,9 @@ where
shutdown: None,
user_agent: None,
setup_method: GatewaySetup::MustLoad { gateway_id: None },
#[cfg(unix)]
connection_fd_callback: None,
derivation_material: None,
}
}
#[must_use]
pub fn with_derivation_material(
mut self,
derivation_material: Option<DerivationMaterial>,
) -> Self {
self.derivation_material = derivation_material;
self
}
#[must_use]
pub fn with_forget_me(mut self, forget_me: &ForgetMe) -> Self {
self.config.debug.forget_me = *forget_me;
self
}
#[must_use]
pub fn with_gateway_setup(mut self, setup: GatewaySetup) -> Self {
self.setup_method = setup;
@@ -286,15 +261,6 @@ where
Ok(self)
}
#[cfg(unix)]
pub fn with_connection_fd_callback(
mut self,
callback: Arc<dyn Fn(RawFd) + Send + Sync>,
) -> Self {
self.connection_fd_callback = Some(callback);
self
}
// note: do **NOT** make this method public as its only valid usage is from within `start_base`
// because it relies on the crypto keys being already loaded
fn mix_address(details: &InitialisationResult) -> Recipient {
@@ -310,7 +276,7 @@ where
topology_accessor: TopologyAccessor,
mix_tx: BatchMixMessageSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
shutdown: TaskClient,
) {
info!("Starting loop cover traffic stream...");
@@ -323,10 +289,9 @@ where
debug_config.traffic,
debug_config.cover_traffic,
stats_tx,
task_client,
);
stream.start();
stream.start_with_shutdown(shutdown);
}
#[allow(clippy::too_many_arguments)]
@@ -341,7 +306,7 @@ where
reply_controller_receiver: ReplyControllerReceiver,
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
task_client: TaskClient,
shutdown: TaskClient,
packet_type: PacketType,
stats_tx: ClientStatsSender,
) {
@@ -359,9 +324,8 @@ where
lane_queue_lengths,
client_connection_rx,
stats_tx,
task_client,
)
.start(packet_type);
.start_with_shutdown(shutdown, packet_type);
}
// buffer controlling all messages fetched from provider
@@ -384,12 +348,10 @@ where
reply_key_storage,
reply_controller_sender,
metrics_reporter,
shutdown,
);
controller.start()
controller.start_with_shutdown(shutdown)
}
#[allow(clippy::too_many_arguments)]
async fn start_gateway_client(
config: &Config,
initialisation_result: InitialisationResult,
@@ -397,7 +359,6 @@ where
details_store: &S::GatewaysDetailsStore,
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
shutdown: TaskClient,
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
where
@@ -440,8 +401,6 @@ where
packet_router,
bandwidth_controller,
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown,
)
};
@@ -478,8 +437,8 @@ where
details_store
.upgrade_stored_remote_gateway_key(gateway_client.gateway_identity(), &updated_key)
.await.map_err(|err| {
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
})?
}
@@ -487,7 +446,6 @@ where
.claim_initial_bandwidth()
.await
.map_err(gateway_failure)?;
gateway_client
.start_listening_for_mixnet_messages()
.map_err(gateway_failure)?;
@@ -504,7 +462,6 @@ where
details_store: &S::GatewaysDetailsStore,
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
mut shutdown: TaskClient,
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
where
@@ -536,8 +493,6 @@ where
details_store,
packet_router,
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown,
)
.await?;
@@ -554,15 +509,15 @@ where
// if no custom provider was ... provided ..., create one using nym-api
custom_provider.unwrap_or_else(|| match config_topology.topology_structure {
config::TopologyStructure::NymApi => Box::new(NymApiTopologyProvider::new(
config_topology,
nym_api_provider::Config {
min_mixnode_performance: config_topology.minimum_mixnode_performance,
min_gateway_performance: config_topology.minimum_gateway_performance,
},
nym_api_urls,
user_agent,
)),
config::TopologyStructure::GeoAware(group_by) => {
warn!("using deprecated 'GeoAware' topology provider - this option will be removed very soon");
#[allow(deprecated)]
Box::new(crate::client::topology_control::GeoAwareTopologyProvider::new(nym_api_urls, group_by))
Box::new(GeoAwareTopologyProvider::new(nym_api_urls, group_by))
}
})
}
@@ -573,24 +528,17 @@ where
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
topology_config: config::Topology,
topology_accessor: TopologyAccessor,
local_gateway: NodeIdentity,
local_gateway: &NodeIdentity,
wait_for_gateway: bool,
mut task_client: TaskClient,
mut shutdown: TaskClient,
) -> Result<(), ClientCoreError> {
let topology_refresher_config =
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The background topology refesher is not going to be started");
task_client.disarm();
}
let mut topology_refresher = TopologyRefresher::new(
topology_refresher_config,
topology_accessor,
topology_provider,
task_client,
);
// before returning, block entire runtime to refresh the current network view so that any
// components depending on topology would see a non-empty view
@@ -612,7 +560,7 @@ where
};
if let Err(err) = topology_refresher
.ensure_contains_routable_egress(local_gateway)
.ensure_contains_gateway(local_gateway)
.await
{
if let Some(waiting_timeout) = gateway_wait_timeout {
@@ -631,11 +579,15 @@ where
}
}
if !topology_config.disable_refreshing {
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
info!("The topology refesher is not going to be started");
shutdown.disarm();
} else {
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
info!("Starting topology refresher...");
topology_refresher.start();
topology_refresher.start_with_shutdown(shutdown);
}
Ok(())
@@ -646,29 +598,28 @@ where
user_agent: Option<UserAgent>,
client_stats_id: String,
input_sender: Sender<InputMessage>,
task_client: TaskClient,
shutdown: TaskClient,
) -> ClientStatsSender {
info!("Starting statistics control...");
StatisticsControl::create_and_start(
StatisticsControl::create_and_start_with_shutdown(
config.debug.stats_reporting,
user_agent
.map(|u| u.application)
.unwrap_or("unknown".to_string()),
client_stats_id,
input_sender.clone(),
task_client,
shutdown.with_suffix("controller"),
)
}
fn start_mix_traffic_controller(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown: TaskClient,
) -> (BatchMixMessageSender, ClientRequestSender) {
) -> BatchMixMessageSender {
info!("Starting mix traffic controller...");
let (mix_traffic_controller, mix_tx, client_tx) =
MixTrafficController::new(gateway_transceiver, shutdown);
mix_traffic_controller.start();
(mix_tx, client_tx)
let (mix_traffic_controller, mix_tx) = MixTrafficController::new(gateway_transceiver);
mix_traffic_controller.start_with_shutdown(shutdown);
mix_tx
}
// TODO: rename it as it implies the data is persistent whilst one can use InMemBackend
@@ -703,7 +654,6 @@ where
setup_method: GatewaySetup,
key_store: &S::KeyStore,
details_store: &S::GatewaysDetailsStore,
derivation_material: Option<DerivationMaterial>,
) -> Result<InitialisationResult, ClientCoreError>
where
<S::KeyStore as KeyStore>::StorageError: Sync + Send,
@@ -713,12 +663,7 @@ where
if key_store.load_keys().await.is_err() {
info!("could not find valid client keys - a new set will be generated");
let mut rng = OsRng;
let keys = if let Some(derivation_material) = derivation_material {
ClientKeys::from_master_key(&mut rng, &derivation_material)
.map_err(|_| ClientCoreError::HkdfDerivationError {})?
} else {
ClientKeys::generate_new(&mut rng)
};
let keys = ClientKeys::generate_new(&mut rng);
store_client_keys(keys, key_store).await?;
}
@@ -740,7 +685,6 @@ where
self.setup_method,
self.client_store.key_store(),
self.client_store.gateway_details_store(),
self.derivation_material,
)
.await?;
@@ -764,8 +708,7 @@ where
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
let shared_topology_accessor =
TopologyAccessor::new(self.config.debug.topology.ignore_egress_epoch_role);
let shared_topology_accessor = TopologyAccessor::new();
// Shutdown notifier for signalling tasks to stop
let shutdown = self
@@ -797,7 +740,7 @@ where
);
let stats_reporter = Self::start_statistics_control(
&self.config,
self.config,
self.user_agent.clone(),
generate_client_stats_id(*self_address.identity()),
input_sender.clone(),
@@ -823,14 +766,12 @@ where
let gateway_transceiver = Self::setup_gateway_transceiver(
self.custom_gateway_transceiver,
&self.config,
self.config,
init_res,
bandwidth_controller,
&details_store,
gateway_packet_router,
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
shutdown.fork("gateway_transceiver"),
)
.await?;
@@ -856,8 +797,7 @@ where
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
// The MixTrafficController then sends the actual traffic
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
let message_sender = Self::start_mix_traffic_controller(
gateway_transceiver,
shutdown.fork("mix_traffic_controller"),
);
@@ -934,8 +874,6 @@ where
},
stats_reporter,
task_handle: shutdown,
client_request_sender,
forget_me: self.config.debug.forget_me,
})
}
}
@@ -947,7 +885,6 @@ pub struct BaseClient {
pub client_output: ClientOutputStatus,
pub client_state: ClientState,
pub stats_reporter: ClientStatsSender,
pub client_request_sender: ClientRequestSender,
pub task_handle: TaskHandle,
pub forget_me: ForgetMe,
}
@@ -4,8 +4,6 @@
// TODO: combine those more closely. Perhaps into a single underlying store.
// Like for persistent, on-disk, storage, what's the point of having 3 different databases?
use rand::rngs::OsRng;
use crate::client::key_manager::persistence::{InMemEphemeralKeys, KeyStore};
use crate::client::replies::reply_storage;
use crate::client::replies::reply_storage::ReplyStorageBackend;
@@ -65,7 +63,7 @@ pub trait MixnetClientStorage {
fn gateway_details_store(&self) -> &Self::GatewaysDetailsStore;
}
#[derive(Clone)]
#[derive(Default)]
pub struct Ephemeral {
key_store: InMemEphemeralKeys,
reply_store: reply_storage::Empty,
@@ -73,14 +71,9 @@ pub struct Ephemeral {
gateway_details_store: InMemGatewaysDetails,
}
impl Default for Ephemeral {
fn default() -> Self {
Ephemeral {
key_store: InMemEphemeralKeys::new(&mut OsRng),
reply_store: Default::default(),
credential_store: Default::default(),
gateway_details_store: Default::default(),
}
impl Ephemeral {
pub fn new() -> Self {
Default::default()
}
}
@@ -121,7 +114,6 @@ impl MixnetClientStorage for Ephemeral {
}
}
#[derive(Clone)]
#[cfg(all(
not(target_arch = "wasm32"),
feature = "fs-surb-storage",
@@ -13,7 +13,6 @@ use nym_sphinx::cover::generate_loop_cover_packet;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_task::TaskClient;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
@@ -65,8 +64,6 @@ where
packet_type: PacketType,
stats_tx: ClientStatsSender,
task_client: TaskClient,
}
impl<R> Stream for LoopCoverTrafficStream<R>
@@ -113,7 +110,6 @@ impl LoopCoverTrafficStream<OsRng> {
traffic_config: config::Traffic,
cover_config: config::CoverTraffic,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
let rng = OsRng;
@@ -132,7 +128,6 @@ impl LoopCoverTrafficStream<OsRng> {
secondary_packet_size: traffic_config.secondary_packet_size,
packet_type: traffic_config.packet_type,
stats_tx,
task_client,
}
}
@@ -168,7 +163,6 @@ impl LoopCoverTrafficStream<OsRng> {
// poisson delay, but is it really a problem?
let topology_permit = self.topology_access.get_read_permit().await;
// the ack is sent back to ourselves (and then ignored)
let topology_ref = match topology_permit.try_get_valid_topology_ref(
&self.our_full_destination,
Some(&self.our_full_destination),
@@ -180,7 +174,7 @@ impl LoopCoverTrafficStream<OsRng> {
}
};
let cover_message = match generate_loop_cover_packet(
let cover_message = generate_loop_cover_packet(
&mut self.rng,
topology_ref,
&self.ack_key,
@@ -189,15 +183,8 @@ impl LoopCoverTrafficStream<OsRng> {
self.cover_traffic.loop_cover_traffic_average_delay,
cover_traffic_packet_size,
self.packet_type,
) {
Ok(cover_message) => cover_message,
Err(err) => {
warn!(
"Somehow failed to generate a loop cover message with a valid topology: {err}"
);
return;
}
};
)
.expect("Somehow failed to generate a loop cover message with a valid topology");
if let Err(err) = self.mix_tx.try_send(vec![cover_message]) {
match err {
@@ -229,7 +216,7 @@ impl LoopCoverTrafficStream<OsRng> {
tokio::task::yield_now().await;
}
pub fn start(mut self) {
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
if self.cover_traffic.disable_loop_cover_traffic_stream {
// we should have never got here in the first place - the task should have never been created to begin with
// so panic and review the code that lead to this branch
@@ -243,8 +230,6 @@ impl LoopCoverTrafficStream<OsRng> {
);
self.set_next_delay(sampled);
let mut shutdown = self.task_client.fork("select");
spawn_future(async move {
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
@@ -28,6 +28,7 @@ pub enum InputMessage {
recipient: Recipient,
data: Vec<u8>,
lane: TransmissionLane,
mix_hops: Option<u8>,
},
/// Creates a message used for a duplex anonymous communication where the recipient
@@ -43,6 +44,7 @@ pub enum InputMessage {
data: Vec<u8>,
reply_surbs: u32,
lane: TransmissionLane,
mix_hops: Option<u8>,
},
/// Attempt to use our internally received and stored `ReplySurb` to send the message back
@@ -92,6 +94,29 @@ impl InputMessage {
recipient,
data,
lane,
mix_hops: None,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
} else {
message
}
}
// IMHO `new_regular` should take `mix_hops: Option<u8>` as an argument instead of creating
// this function, but that would potentially break backwards compatibility with the current API
pub fn new_regular_with_custom_hops(
recipient: Recipient,
data: Vec<u8>,
lane: TransmissionLane,
packet_type: Option<PacketType>,
mix_hops: Option<u8>,
) -> Self {
let message = InputMessage::Regular {
recipient,
data,
lane,
mix_hops,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -112,6 +137,7 @@ impl InputMessage {
data,
reply_surbs,
lane,
mix_hops: None,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -128,12 +154,14 @@ impl InputMessage {
reply_surbs: u32,
lane: TransmissionLane,
packet_type: Option<PacketType>,
mix_hops: Option<u8>,
) -> Self {
let message = InputMessage::Anonymous {
recipient,
data,
reply_surbs,
lane,
mix_hops,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -2,10 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::client::key_manager::persistence::KeyStore;
use nym_crypto::{
asymmetric::{encryption, identity},
hkdf::{DerivationMaterial, InvalidLength},
};
use nym_crypto::asymmetric::{encryption, identity};
use nym_gateway_requests::shared_key::{LegacySharedKeys, SharedGatewayKey, SharedSymmetricKey};
use nym_sphinx::acknowledgements::AckKey;
use rand::{CryptoRng, RngCore};
@@ -13,7 +10,6 @@ use std::sync::Arc;
use zeroize::ZeroizeOnDrop;
pub mod persistence;
mod test;
// Note: to support key rotation in the future, all keys will require adding an extra smart pointer,
// most likely an AtomicCell, or if it doesn't work as I think it does, a Mutex. Although I think
@@ -47,24 +43,6 @@ impl ClientKeys {
}
}
pub fn from_master_key<R>(
rng: &mut R,
derivation_material: &DerivationMaterial,
) -> Result<Self, InvalidLength>
where
R: RngCore + CryptoRng,
{
let secret = derivation_material.derive_secret()?;
Ok(ClientKeys {
identity_keypair: Arc::new(identity::KeyPair::from_secret(
secret,
derivation_material.index(),
)),
encryption_keypair: Arc::new(encryption::KeyPair::new(rng)),
ack_key: Arc::new(AckKey::new(rng)),
})
}
pub fn from_keys(
id_keypair: identity::KeyPair,
enc_keypair: encryption::KeyPair,
@@ -3,9 +3,7 @@
use crate::client::key_manager::ClientKeys;
use async_trait::async_trait;
use rand::{CryptoRng, RngCore};
use std::error::Error;
use std::sync::Arc;
use tokio::sync::Mutex;
#[cfg(not(target_arch = "wasm32"))]
@@ -66,7 +64,6 @@ pub enum OnDiskKeysError {
},
}
#[derive(Clone)]
#[cfg(not(target_arch = "wasm32"))]
pub struct OnDiskKeys {
paths: ClientKeysPaths,
@@ -196,20 +193,9 @@ impl KeyStore for OnDiskKeys {
}
}
#[derive(Clone)]
#[derive(Default)]
pub struct InMemEphemeralKeys {
keys: Arc<Mutex<ClientKeys>>,
}
impl InMemEphemeralKeys {
pub fn new<R>(rng: &mut R) -> Self
where
R: RngCore + CryptoRng,
{
InMemEphemeralKeys {
keys: Arc::new(Mutex::new(ClientKeys::generate_new(rng))),
}
}
keys: Mutex<Option<ClientKeys>>,
}
#[derive(Debug, thiserror::Error)]
@@ -222,11 +208,11 @@ impl KeyStore for InMemEphemeralKeys {
type StorageError = EphemeralKeysError;
async fn load_keys(&self) -> Result<ClientKeys, Self::StorageError> {
Ok(self.keys.lock().await.clone())
self.keys.lock().await.clone().ok_or(EphemeralKeysError)
}
async fn store_keys(&self, keys: &ClientKeys) -> Result<(), Self::StorageError> {
*self.keys.lock().await = keys.clone();
*self.keys.lock().await = Some(keys.clone());
Ok(())
}
}
@@ -1,89 +0,0 @@
#[cfg(test)]
mod tests {
use crate::client::key_manager::ClientKeys;
use nym_crypto::hkdf::DerivationMaterial;
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
#[test]
fn test_from_master_key_success() {
// Set up a deterministic RNG.
let seed = [33u8; 32];
let mut rng = ChaCha20Rng::from_seed(seed);
// Set up the derivation material.
let master_key = b"this is a secret master key";
let salt = b"unique-salt";
let derivation_material = DerivationMaterial::new(master_key, 0, salt);
// Generate ClientKeys from the master key.
let client_keys = ClientKeys::from_master_key(&mut rng, &derivation_material)
.expect("Failed to create client keys");
assert_eq!(
client_keys.identity_keypair().public_key().to_string(),
String::from("FX4Undr5LPPBA7zThWWpAKXKQTXSbW1C28PnxbCqUkU4")
);
assert_eq!(
client_keys.identity_keypair().private_key().to_string(),
String::from("6S3uMi2rU5SwyUUYCiMrF5qqdcYnEDMYLggBSvavVzEt")
);
}
#[test]
fn test_from_master_key_deterministic_identity() {
// Using identical derivation material should result in the exactly same identity keypair.
let seed = [1u8; 32];
let mut rng1 = ChaCha20Rng::from_seed(seed);
let mut rng2 = ChaCha20Rng::from_seed(seed);
let master_key = b"another secret master key";
let salt = b"deterministic-salt";
let index = 7u32;
let derivation_material = DerivationMaterial::new(master_key, index, salt);
let client_keys1 = ClientKeys::from_master_key(&mut rng1, &derivation_material)
.expect("Failed to create client keys (first instance)");
let client_keys2 = ClientKeys::from_master_key(&mut rng2, &derivation_material)
.expect("Failed to create client keys (second instance)");
assert_eq!(
client_keys1.identity_keypair().public_key().to_string(),
client_keys2.identity_keypair().public_key().to_string()
);
assert_eq!(
client_keys1.identity_keypair().private_key().to_string(),
client_keys2.identity_keypair().private_key().to_string()
);
}
#[test]
fn test_from_master_key_different_indices() {
// Changing the index should yield a different identity key.
let seed = [5u8; 32];
let mut rng = ChaCha20Rng::from_seed(seed);
let master_key = b"same secret key";
let salt = b"same-salt";
let derivation_material1 = DerivationMaterial::new(master_key, 1, salt);
let derivation_material2 = DerivationMaterial::new(master_key, 2, salt);
let client_keys1 = ClientKeys::from_master_key(&mut rng, &derivation_material1)
.expect("Failed to create client keys for index 1");
let client_keys2 = ClientKeys::from_master_key(&mut rng, &derivation_material2)
.expect("Failed to create client keys for index 2");
assert_ne!(
client_keys1.identity_keypair().public_key().to_string(),
client_keys2.identity_keypair().public_key().to_string()
);
assert_ne!(
client_keys1.identity_keypair().private_key().to_string(),
client_keys2.identity_keypair().private_key().to_string()
);
}
}
@@ -2,18 +2,12 @@
// SPDX-License-Identifier: Apache-2.0
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::error::ClientCoreError;
use crate::spawn_future;
use log::*;
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::TaskClient;
use transceiver::ErasedGatewayError;
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
pub type ClientRequestReceiver = tokio::sync::mpsc::Receiver<ClientRequest>;
pub type ClientRequestSender = tokio::sync::mpsc::Sender<ClientRequest>;
pub mod transceiver;
@@ -28,73 +22,45 @@ pub struct MixTrafficController {
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
mix_rx: BatchMixMessageReceiver,
client_rx: ClientRequestReceiver,
// TODO: this is temporary work-around.
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
consecutive_gateway_failure_count: usize,
task_client: TaskClient,
}
impl MixTrafficController {
pub fn new<T>(
gateway_transceiver: T,
task_client: TaskClient,
) -> (
MixTrafficController,
BatchMixMessageSender,
ClientRequestSender,
)
pub fn new<T>(gateway_transceiver: T) -> (MixTrafficController, BatchMixMessageSender)
where
T: GatewayTransceiver + Send + 'static,
{
let (message_sender, message_receiver) =
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
let (client_sender, client_receiver) = tokio::sync::mpsc::channel(1);
(
MixTrafficController {
gateway_transceiver: Box::new(gateway_transceiver),
mix_rx: message_receiver,
client_rx: client_receiver,
consecutive_gateway_failure_count: 0,
task_client,
},
message_sender,
client_sender,
)
}
pub fn new_dynamic(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
task_client: TaskClient,
) -> (
MixTrafficController,
BatchMixMessageSender,
ClientRequestSender,
) {
) -> (MixTrafficController, BatchMixMessageSender) {
let (message_sender, message_receiver) =
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
let (client_sender, client_receiver) = tokio::sync::mpsc::channel(1);
(
MixTrafficController {
gateway_transceiver,
mix_rx: message_receiver,
client_rx: client_receiver,
consecutive_gateway_failure_count: 0,
task_client,
},
message_sender,
client_sender,
)
}
async fn on_messages(
&mut self,
mut mix_packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
debug_assert!(!mix_packets.is_empty());
let result = if mix_packets.len() == 1 {
@@ -106,62 +72,46 @@ impl MixTrafficController {
.await
};
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
match result {
Err(err) => {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
self.consecutive_gateway_failure_count += 1;
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// todo: in the future this should initiate a 'graceful' shutdown or try
// to reconnect?
panic!("failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead. Can't do anything about it yet :(")
}
}
Ok(_) => {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
}
result
}
pub fn start(mut self) {
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
spawn_future(async move {
debug!("Started MixTrafficController with graceful shutdown support");
while !self.task_client.is_shutdown() {
loop {
tokio::select! {
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.task_client.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
break;
}
}
self.on_messages(mix_packets).await;
},
None => {
log::trace!("MixTrafficController: Stopping since channel closed");
break;
}
},
client_request = self.client_rx.recv() => match client_request {
Some(client_request) => {
match self.gateway_transceiver.send_client_request(client_request).await {
Ok(_) => (),
Err(e) => error!("Failed to send client request: {}", e),
};
},
None => {
log::trace!("MixTrafficController, client request channel closed");
}
},
_ = self.task_client.recv() => {
_ = shutdown.recv_with_delay() => {
log::trace!("MixTrafficController: Received shutdown");
break;
}
}
}
self.task_client.recv_timeout().await;
shutdown.recv_timeout().await;
log::debug!("MixTrafficController: Exiting");
});
})
}
}
@@ -5,10 +5,8 @@ use async_trait::async_trait;
use log::{debug, error};
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::identity;
use nym_gateway_client::error::GatewayClientError;
use nym_gateway_client::GatewayClient;
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use std::fmt::Debug;
@@ -28,14 +26,9 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
}
/// This combines combines the functionalities of being able to send and receive mix packets.
#[async_trait]
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
fn gateway_identity(&self) -> identity::PublicKey;
fn ws_fd(&self) -> Option<RawFd>;
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError>;
}
/// This trait defines the functionality of sending `MixPacket` into the mixnet,
@@ -72,7 +65,6 @@ pub trait GatewayReceiver {
}
// to allow for dynamic dispatch
#[async_trait]
impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
#[inline]
fn gateway_identity(&self) -> identity::PublicKey {
@@ -81,15 +73,6 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
fn ws_fd(&self) -> Option<RawFd> {
(**self).ws_fd()
}
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
let _ = (**self).send_client_request(message.clone()).await?;
log::debug!("Sent client request: {:?}", message);
Ok(())
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -108,6 +91,7 @@ impl<G: GatewaySender + ?Sized + Send> GatewaySender for Box<G> {
(**self).batch_send_mix_packets(packets).await
}
}
impl<G: GatewayReceiver + ?Sized> GatewayReceiver for Box<G> {
#[inline]
fn set_packet_router(&mut self, packet_router: PacketRouter) -> Result<(), ErasedGatewayError> {
@@ -127,7 +111,6 @@ impl<C, St> RemoteGateway<C, St> {
}
}
#[async_trait]
impl<C, St> GatewayTransceiver for RemoteGateway<C, St>
where
C: DkgQueryClient + Send + Sync,
@@ -140,13 +123,6 @@ where
fn ws_fd(&self) -> Option<RawFd> {
self.gateway_client.ws_fd()
}
async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
self.gateway_client.send_client_request(message).await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -219,7 +195,6 @@ impl LocalGateway {
mod nonwasm_sealed {
use super::*;
#[async_trait]
impl GatewayTransceiver for LocalGateway {
fn gateway_identity(&self) -> identity::PublicKey {
self.local_identity
@@ -227,13 +202,6 @@ mod nonwasm_sealed {
fn ws_fd(&self) -> Option<RawFd> {
None
}
async fn send_client_request(
&mut self,
_message: ClientRequest,
) -> Result<(), GatewayClientError> {
Ok(())
}
}
#[async_trait]
@@ -301,7 +269,6 @@ impl GatewaySender for MockGateway {
}
}
#[async_trait]
impl GatewayTransceiver for MockGateway {
fn gateway_identity(&self) -> identity::PublicKey {
self.dummy_identity
@@ -309,11 +276,4 @@ impl GatewayTransceiver for MockGateway {
fn ws_fd(&self) -> Option<RawFd> {
None
}
async fn send_client_request(
&mut self,
_message: ClientRequest,
) -> Result<(), GatewayClientError> {
Ok(())
}
}
@@ -11,7 +11,6 @@ use nym_sphinx::{
acknowledgements::{identifier::recover_identifier, AckKey},
chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID},
};
use nym_task::TaskClient;
use std::sync::Arc;
/// Module responsible for listening for any data resembling acknowledgements from the network
@@ -21,7 +20,6 @@ pub(super) struct AcknowledgementListener {
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
}
impl AcknowledgementListener {
@@ -30,14 +28,12 @@ impl AcknowledgementListener {
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
AcknowledgementListener {
ack_key,
ack_receiver,
action_sender,
stats_tx,
task_client,
}
}
@@ -68,14 +64,9 @@ impl AcknowledgementListener {
trace!("Received {} from the mix network", frag_id);
self.stats_tx
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
if let Err(err) = self
.action_sender
self.action_sender
.unbounded_send(Action::new_remove(frag_id))
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send remove action to action controller: {err}");
}
}
.unwrap();
}
async fn handle_ack_receiver_item(&mut self, item: Vec<Vec<u8>>) {
@@ -85,10 +76,10 @@ impl AcknowledgementListener {
}
}
pub(super) async fn run(&mut self) {
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
debug!("Started AcknowledgementListener with graceful shutdown support");
while !self.task_client.is_shutdown() {
while !shutdown.is_shutdown() {
tokio::select! {
acks = self.ack_receiver.next() => match acks {
Some(acks) => self.handle_ack_receiver_item(acks).await,
@@ -97,12 +88,12 @@ impl AcknowledgementListener {
break;
}
},
_ = self.task_client.recv() => {
_ = shutdown.recv_with_delay() => {
log::trace!("AcknowledgementListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
shutdown.recv_timeout().await;
log::debug!("AcknowledgementListener: Exiting");
}
}
@@ -9,7 +9,6 @@ use log::*;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_sphinx::Delay as SphinxDelay;
use nym_task::TaskClient;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
@@ -102,8 +101,6 @@ pub(super) struct ActionController {
/// Channel for notifying `RetransmissionRequestListener` about expired acknowledgements.
retransmission_sender: RetransmissionRequestSender,
task_client: TaskClient,
}
impl ActionController {
@@ -111,7 +108,6 @@ impl ActionController {
config: Config,
retransmission_sender: RetransmissionRequestSender,
incoming_actions: AckActionReceiver,
task_client: TaskClient,
) -> Self {
ActionController {
config,
@@ -119,7 +115,6 @@ impl ActionController {
pending_acks_timers: NonExhaustiveDelayQueue::new(),
incoming_actions,
retransmission_sender,
task_client,
}
}
@@ -221,7 +216,11 @@ impl ActionController {
}
// note: when the entry expires it's automatically removed from pending_acks_timers
fn handle_expired_ack_timer(&mut self, expired_ack: Expired<FragmentIdentifier>) {
fn handle_expired_ack_timer(
&mut self,
expired_ack: Expired<FragmentIdentifier>,
task_client: &mut nym_task::TaskClient,
) {
// I'm honestly not sure how to handle it, because getting it means other things in our
// system are already misbehaving. If we ever see this panic, then I guess we should worry
// about it. Perhaps just reschedule it at later point?
@@ -239,13 +238,15 @@ impl ActionController {
// downgrading an arc and then upgrading vs cloning is difference of 30ns vs 15ns
// so it's literally a NO difference while it might prevent us from unnecessarily
// resending data (in maybe 1 in 1 million cases, but it's something)
if let Err(err) = self
if self
.retransmission_sender
.unbounded_send(Arc::downgrade(pending_ack_data))
.is_err()
{
if !self.task_client.is_shutdown_poll() {
log::error!("Failed to send pending ack for retransmission: {err}");
}
assert!(
task_client.is_shutdown_poll(),
"Failed to send pending ack for retransmission"
);
}
} else {
// this shouldn't cause any issues but shouldn't have happened to begin with!
@@ -264,10 +265,10 @@ impl ActionController {
}
}
pub(super) async fn run(&mut self) {
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
debug!("Started ActionController with graceful shutdown support");
while !self.task_client.is_shutdown() {
loop {
tokio::select! {
action = self.incoming_actions.next() => match action {
Some(action) => self.process_action(action),
@@ -279,19 +280,19 @@ impl ActionController {
}
},
expired_ack = self.pending_acks_timers.next() => match expired_ack {
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack),
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack, &mut shutdown),
None => {
log::trace!("ActionController: Stopping since ack channel closed");
break;
}
},
_ = self.task_client.recv() => {
_ = shutdown.recv_with_delay() => {
log::trace!("ActionController: Received shutdown");
break;
}
}
}
self.task_client.recv_timeout().await;
shutdown.recv_timeout().await;
log::debug!("ActionController: Exiting");
}
}
@@ -11,7 +11,6 @@ use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
@@ -24,7 +23,6 @@ where
input_receiver: InputMessageReceiver,
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
}
impl<R> InputMessageListener<R>
@@ -38,13 +36,11 @@ where
input_receiver: InputMessageReceiver,
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
) -> Self {
InputMessageListener {
input_receiver,
message_handler,
reply_controller_sender,
task_client,
}
}
@@ -67,14 +63,8 @@ where
lane: TransmissionLane,
) {
// offload reply handling to the dedicated task
if let Err(err) = self
.reply_controller_sender
self.reply_controller_sender
.send_reply(recipient_tag, data, lane)
{
if !self.task_client.is_shutdown_poll() {
error!("failed to send a reply - {err}");
}
}
}
async fn handle_plain_message(
@@ -83,10 +73,11 @@ where
content: Vec<u8>,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) {
if let Err(err) = self
.message_handler
.try_send_plain_message(recipient, content, lane, packet_type)
.try_send_plain_message(recipient, content, lane, packet_type, mix_hops)
.await
{
warn!("failed to send a plain message - {err}")
@@ -100,10 +91,18 @@ where
reply_surbs: u32,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) {
if let Err(err) = self
.message_handler
.try_send_message_with_reply_surbs(recipient, content, reply_surbs, lane, packet_type)
.try_send_message_with_reply_surbs(
recipient,
content,
reply_surbs,
lane,
packet_type,
mix_hops,
)
.await
{
warn!("failed to send a repliable message - {err}")
@@ -116,8 +115,9 @@ where
recipient,
data,
lane,
mix_hops,
} => {
self.handle_plain_message(recipient, data, lane, PacketType::Mix)
self.handle_plain_message(recipient, data, lane, PacketType::Mix, mix_hops)
.await
}
InputMessage::Anonymous {
@@ -125,9 +125,17 @@ where
data,
reply_surbs,
lane,
mix_hops,
} => {
self.handle_repliable_message(recipient, data, reply_surbs, lane, PacketType::Mix)
.await
self.handle_repliable_message(
recipient,
data,
reply_surbs,
lane,
PacketType::Mix,
mix_hops,
)
.await
}
InputMessage::Reply {
recipient_tag,
@@ -145,8 +153,9 @@ where
recipient,
data,
lane,
mix_hops,
} => {
self.handle_plain_message(recipient, data, lane, packet_type)
self.handle_plain_message(recipient, data, lane, packet_type, mix_hops)
.await
}
InputMessage::Anonymous {
@@ -154,9 +163,17 @@ where
data,
reply_surbs,
lane,
mix_hops,
} => {
self.handle_repliable_message(recipient, data, reply_surbs, lane, packet_type)
.await
self.handle_repliable_message(
recipient,
data,
reply_surbs,
lane,
packet_type,
mix_hops,
)
.await
}
InputMessage::Reply {
recipient_tag,
@@ -174,10 +191,10 @@ where
};
}
pub(super) async fn run(&mut self) {
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
debug!("Started InputMessageListener with graceful shutdown support");
while !self.task_client.is_shutdown() {
while !shutdown.is_shutdown() {
tokio::select! {
input_msg = self.input_receiver.recv() => match input_msg {
Some(input_msg) => {
@@ -188,12 +205,12 @@ where
break;
}
},
_ = self.task_client.recv() => {
_ = shutdown.recv_with_delay() => {
log::trace!("InputMessageListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
shutdown.recv_timeout().await;
log::debug!("InputMessageListener: Exiting");
}
}
@@ -24,7 +24,6 @@ use nym_sphinx::{
Delay as SphinxDelay,
};
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
use std::{
sync::{Arc, Weak},
@@ -67,10 +66,11 @@ pub(crate) enum PacketDestination {
/// Structure representing a data `Fragment` that is on-route to the specified `Recipient`
#[derive(Debug)]
pub struct PendingAcknowledgement {
pub(crate) struct PendingAcknowledgement {
message_chunk: Fragment,
delay: SphinxDelay,
destination: PacketDestination,
mix_hops: Option<u8>,
retransmissions: u32,
}
@@ -80,11 +80,13 @@ impl PendingAcknowledgement {
message_chunk: Fragment,
delay: SphinxDelay,
recipient: Recipient,
mix_hops: Option<u8>,
) -> Self {
PendingAcknowledgement {
message_chunk,
delay,
destination: PacketDestination::KnownRecipient(recipient.into()),
mix_hops,
retransmissions: 0,
}
}
@@ -102,6 +104,9 @@ impl PendingAcknowledgement {
recipient_tag,
extra_surb_request,
},
// Messages sent using SURBs are using the number of mix hops set by the recipient when
// they provided the SURBs, so it doesn't make sense to include it here.
mix_hops: None,
retransmissions: 0,
}
}
@@ -217,7 +222,6 @@ where
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
let (retransmission_tx, retransmission_rx) = mpsc::unbounded();
@@ -227,7 +231,6 @@ where
action_config,
retransmission_tx,
connectors.ack_action_receiver,
task_client.fork("action_controller"),
);
// will listen for any acks coming from the network
@@ -236,7 +239,6 @@ where
connectors.ack_receiver,
connectors.ack_action_sender.clone(),
stats_tx,
task_client.fork("acknowledgement_listener"),
);
// will listen for any new messages from the client
@@ -244,7 +246,6 @@ where
connectors.input_receiver,
message_handler.clone(),
reply_controller_sender.clone(),
task_client.fork("input_message_listener"),
);
// will listen for any ack timeouts and trigger retransmission
@@ -254,16 +255,12 @@ where
message_handler,
retransmission_rx,
reply_controller_sender,
task_client.fork("retransmission_request_listener"),
);
// will listen for events indicating the packet was sent through the network so that
// the retransmission timer should be started.
let sent_notification_listener = SentNotificationListener::new(
connectors.sent_notifier,
connectors.ack_action_sender,
task_client.with_suffix("sent_notification_listener"),
);
let sent_notification_listener =
SentNotificationListener::new(connectors.sent_notifier, connectors.ack_action_sender);
AcknowledgementController {
acknowledgement_listener,
@@ -274,35 +271,53 @@ where
}
}
pub(super) fn start(self, packet_type: PacketType) {
pub(super) fn start_with_shutdown(
self,
shutdown: nym_task::TaskClient,
packet_type: PacketType,
) {
let mut acknowledgement_listener = self.acknowledgement_listener;
let mut input_message_listener = self.input_message_listener;
let mut retransmission_request_listener = self.retransmission_request_listener;
let mut sent_notification_listener = self.sent_notification_listener;
let mut action_controller = self.action_controller;
let shutdown_handle = shutdown.fork("acknowledgement_listener");
spawn_future(async move {
acknowledgement_listener.run().await;
acknowledgement_listener
.run_with_shutdown(shutdown_handle)
.await;
debug!("The acknowledgement listener has finished execution!");
});
let shutdown_handle = shutdown.fork("input_message_listener");
spawn_future(async move {
input_message_listener.run().await;
input_message_listener
.run_with_shutdown(shutdown_handle)
.await;
debug!("The input listener has finished execution!");
});
let shutdown_handle = shutdown.fork("retransmission_request_listener");
spawn_future(async move {
retransmission_request_listener.run(packet_type).await;
retransmission_request_listener
.run_with_shutdown(shutdown_handle, packet_type)
.await;
debug!("The retransmission request listener has finished execution!");
});
let shutdown_handle = shutdown.fork("sent_notification_listener");
spawn_future(async move {
sent_notification_listener.run().await;
sent_notification_listener
.run_with_shutdown(shutdown_handle)
.await;
debug!("The sent notification listener has finished execution!");
});
spawn_future(async move {
action_controller.run().await;
action_controller
.run_with_shutdown(shutdown.with_suffix("action_controller"))
.await;
debug!("The controller has finished execution!");
});
}
@@ -14,7 +14,7 @@ use log::*;
use nym_sphinx::chunking::fragment::Fragment;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::{addressing::clients::Recipient, params::PacketType};
use nym_task::{connections::TransmissionLane, TaskClient};
use nym_task::connections::TransmissionLane;
use rand::{CryptoRng, Rng};
use std::sync::{Arc, Weak};
@@ -25,7 +25,6 @@ pub(super) struct RetransmissionRequestListener<R> {
message_handler: MessageHandler<R>,
request_receiver: RetransmissionRequestReceiver,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
}
impl<R> RetransmissionRequestListener<R>
@@ -38,7 +37,6 @@ where
message_handler: MessageHandler<R>,
request_receiver: RetransmissionRequestReceiver,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
) -> Self {
RetransmissionRequestListener {
maximum_retransmissions,
@@ -46,7 +44,6 @@ where
message_handler,
request_receiver,
reply_controller_sender,
task_client,
}
}
@@ -55,12 +52,18 @@ where
packet_recipient: Recipient,
chunk_data: Fragment,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, PreparationError> {
debug!("retransmitting normal packet...");
// TODO: Figure out retransmission packet type signaling
self.message_handler
.try_prepare_single_chunk_for_sending(packet_recipient, chunk_data, packet_type)
.try_prepare_single_chunk_for_sending(
packet_recipient,
chunk_data,
packet_type,
mix_hops,
)
.await
}
@@ -82,12 +85,9 @@ where
if let Some(limit) = self.maximum_retransmissions {
if timed_out_ack.retransmissions >= limit {
warn!("reached maximum number of allowed retransmissions for the packet");
if let Err(err) = self
.action_sender
self.action_sender
.unbounded_send(Action::new_remove(frag_id))
{
error!("Failed to send remove action to the controller: {err}");
}
.unwrap();
return;
}
}
@@ -99,22 +99,18 @@ where
} => {
// if this is retransmission for reply, offload it to the dedicated task
// that deals with all the surbs
if let Err(err) = self.reply_controller_sender.send_retransmission_data(
return self.reply_controller_sender.send_retransmission_data(
*recipient_tag,
weak_timed_out_ack,
*extra_surb_request,
) {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send retransmission data to the reply controller: {err}");
}
}
return;
);
}
PacketDestination::KnownRecipient(recipient) => {
self.prepare_normal_retransmission_chunk(
**recipient,
timed_out_ack.message_chunk.clone(),
packet_type,
timed_out_ack.mix_hops,
)
.await
}
@@ -125,12 +121,9 @@ where
Err(err) => {
warn!("Could not retransmit the packet - {err}");
// we NEED to start timer here otherwise we will have this guy permanently stuck in memory
if let Err(err) = self
.action_sender
self.action_sender
.unbounded_send(Action::new_start_timer(frag_id))
{
error!("Failed to send start timer action to the controller: {err}");
}
.unwrap();
return;
}
};
@@ -155,14 +148,9 @@ where
// is sent to the `OutQueueControl` and has gone through its internal queue
// with the additional poisson delay.
// And since Actions are executed in order `UpdateTimer` will HAVE TO be executed before `StartTimer`
if let Err(err) = self
.action_sender
self.action_sender
.unbounded_send(Action::new_update_pending_ack(frag_id, new_delay))
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send update pending ack action to the controller: {err}");
}
}
.unwrap();
// send to `OutQueueControl` to eventually send to the mix network
self.message_handler
@@ -176,10 +164,14 @@ where
.await
}
pub(super) async fn run(&mut self, packet_type: PacketType) {
pub(super) async fn run_with_shutdown(
&mut self,
mut shutdown: nym_task::TaskClient,
packet_type: PacketType,
) {
debug!("Started RetransmissionRequestListener with graceful shutdown support");
while !self.task_client.is_shutdown() {
while !shutdown.is_shutdown() {
tokio::select! {
timed_out_ack = self.request_receiver.next() => match timed_out_ack {
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await,
@@ -188,12 +180,12 @@ where
break;
}
},
_ = self.task_client.recv() => {
_ = shutdown.recv() => {
log::trace!("RetransmissionRequestListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
shutdown.recv_timeout().await;
log::debug!("RetransmissionRequestListener: Exiting");
}
}
@@ -6,7 +6,6 @@ use super::SentPacketNotificationReceiver;
use futures::StreamExt;
use log::*;
use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
use nym_task::TaskClient;
/// Module responsible for starting up retransmission timers.
/// It is required because when we send our packet to the `real traffic stream` controlled
@@ -15,19 +14,16 @@ use nym_task::TaskClient;
pub(super) struct SentNotificationListener {
sent_notifier: SentPacketNotificationReceiver,
action_sender: AckActionSender,
task_client: TaskClient,
}
impl SentNotificationListener {
pub(super) fn new(
sent_notifier: SentPacketNotificationReceiver,
action_sender: AckActionSender,
task_client: TaskClient,
) -> Self {
SentNotificationListener {
sent_notifier,
action_sender,
task_client,
}
}
@@ -36,20 +32,15 @@ impl SentNotificationListener {
trace!("sent off a cover message - no need to start retransmission timer!");
return;
}
if let Err(err) = self
.action_sender
self.action_sender
.unbounded_send(Action::new_start_timer(frag_id))
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send start timer action to action controller: {err}");
}
}
.unwrap();
}
pub(super) async fn run(&mut self) {
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
debug!("Started SentNotificationListener with graceful shutdown support");
while !self.task_client.is_shutdown() {
loop {
tokio::select! {
frag_id = self.sent_notifier.next() => match frag_id {
Some(frag_id) => {
@@ -60,13 +51,13 @@ impl SentNotificationListener {
break;
}
},
_ = self.task_client.recv() => {
_ = shutdown.recv_with_delay() => {
log::trace!("SentNotificationListener: Received shutdown");
break;
}
}
}
assert!(self.task_client.is_shutdown_poll());
assert!(shutdown.is_shutdown_poll());
log::debug!("SentNotificationListener: Exiting");
}
}
@@ -15,12 +15,11 @@ use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessa
use nym_sphinx::anonymous_replies::{ReplySurb, SurbEncryptionKey};
use nym_sphinx::chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx::message::NymMessage;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::params::{PacketSize, PacketType, DEFAULT_NUM_MIX_HOPS};
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
use nym_sphinx::Delay;
use nym_task::connections::TransmissionLane;
use nym_task::TaskClient;
use nym_topology::{NymRouteProvider, NymTopologyError};
use nym_topology::{NymTopology, NymTopologyError};
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
use std::sync::Arc;
@@ -101,6 +100,10 @@ pub(crate) struct Config {
/// Average delay an acknowledgement packet is going to get delay at a single mixnode.
average_ack_delay: Duration,
/// Number of mix hops each packet ('real' message, ack, reply) is expected to take.
/// Note that it does not include gateway hops.
num_mix_hops: u8,
/// Primary predefined packet size used for the encapsulated messages.
primary_packet_size: PacketSize,
@@ -122,11 +125,19 @@ impl Config {
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
primary_packet_size: PacketSize::default(),
secondary_packet_size: None,
}
}
/// Allows setting non-default number of expected mix hops in the network.
#[allow(dead_code)]
pub fn with_mix_hops(mut self, hops: u8) -> Self {
self.num_mix_hops = hops;
self
}
/// Allows setting non-default size of the sphinx packets sent out.
pub fn with_custom_primary_packet_size(mut self, packet_size: PacketSize) -> Self {
self.primary_packet_size = packet_size;
@@ -150,14 +161,12 @@ pub(crate) struct MessageHandler<R> {
topology_access: TopologyAccessor,
reply_key_storage: SentReplyKeys,
tag_storage: UsedSenderTags,
task_client: TaskClient,
}
impl<R> MessageHandler<R>
where
R: CryptoRng + Rng,
{
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
config: Config,
rng: R,
@@ -166,7 +175,6 @@ where
topology_access: TopologyAccessor,
reply_key_storage: SentReplyKeys,
tag_storage: UsedSenderTags,
task_client: TaskClient,
) -> Self
where
R: Copy,
@@ -177,7 +185,9 @@ where
config.sender_address,
config.average_packet_delay,
config.average_ack_delay,
);
)
.with_mix_hops(config.num_mix_hops);
MessageHandler {
config,
rng,
@@ -187,7 +197,6 @@ where
topology_access,
reply_key_storage,
tag_storage,
task_client,
}
}
@@ -207,7 +216,7 @@ where
fn get_topology<'a>(
&self,
permit: &'a TopologyReadPermit<'a>,
) -> Result<&'a NymRouteProvider, PreparationError> {
) -> Result<&'a NymTopology, PreparationError> {
match permit.try_get_valid_topology_ref(&self.config.sender_address, None) {
Ok(topology_ref) => Ok(topology_ref),
Err(err) => {
@@ -224,8 +233,9 @@ where
return self.config.primary_packet_size;
};
let primary_count = msg.required_packets(self.config.primary_packet_size);
let secondary_count = msg.required_packets(secondary_packet);
let primary_count =
msg.required_packets(self.config.primary_packet_size, self.config.num_mix_hops);
let secondary_count = msg.required_packets(secondary_packet, self.config.num_mix_hops);
trace!("This message would require: {primary_count} primary packets or {secondary_count} secondary packets...");
// if there would be no benefit in using the secondary packet - use the primary (duh)
@@ -414,9 +424,10 @@ where
message: Vec<u8>,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
let message = NymMessage::new_plain(message);
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type)
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type, mix_hops)
.await
}
@@ -426,6 +437,7 @@ where
recipient: Recipient,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
debug!("Sending non-reply message with packet type {packet_type}");
// TODO: I really dislike existence of this assertion, it implies code has to be re-organised
@@ -458,6 +470,7 @@ where
&self.config.ack_key,
&recipient,
packet_type,
mix_hops,
)?;
let real_message = RealMessage::new(
@@ -465,7 +478,8 @@ where
Some(fragment.fragment_identifier()),
);
let delay = prepared_fragment.total_delay;
let pending_ack = PendingAcknowledgement::new_known(fragment, delay, recipient);
let pending_ack =
PendingAcknowledgement::new_known(fragment, delay, recipient, mix_hops);
real_messages.push(real_message);
pending_acks.push(pending_ack);
@@ -482,6 +496,7 @@ where
recipient: Recipient,
amount: u32,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), PreparationError> {
debug!("Sending additional reply SURBs with packet type {packet_type}");
let sender_tag = self.get_or_create_sender_tag(&recipient);
@@ -498,6 +513,7 @@ where
recipient,
TransmissionLane::AdditionalReplySurbs,
packet_type,
mix_hops,
)
.await?;
@@ -514,6 +530,7 @@ where
num_reply_surbs: u32,
lane: TransmissionLane,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<(), SurbWrappedPreparationError> {
debug!("Sending message with reply SURBs with packet type {packet_type}");
let sender_tag = self.get_or_create_sender_tag(&recipient);
@@ -524,7 +541,7 @@ where
let message =
NymMessage::new_repliable(RepliableMessage::new_data(message, sender_tag, reply_surbs));
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type)
self.try_split_and_send_non_reply_message(message, recipient, lane, packet_type, mix_hops)
.await?;
log::trace!("storing {} reply keys", reply_keys.len());
@@ -538,18 +555,23 @@ where
recipient: Recipient,
chunk: Fragment,
packet_type: PacketType,
mix_hops: Option<u8>,
) -> Result<PreparedFragment, PreparationError> {
debug!("Sending single chunk with packet type {packet_type}");
let topology_permit = self.topology_access.get_read_permit().await;
let topology = self.get_topology(&topology_permit)?;
let prepared_fragment = self.message_preparer.prepare_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
&recipient,
packet_type,
)?;
let prepared_fragment = self
.message_preparer
.prepare_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
&recipient,
packet_type,
mix_hops,
)
.unwrap();
Ok(prepared_fragment)
}
@@ -602,37 +624,30 @@ where
Err(err) => return Err(err.return_surbs(vec![reply_surb])),
};
let prepared_fragment = self.message_preparer.prepare_reply_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
reply_surb,
PacketType::Mix,
)?;
let prepared_fragment = self
.message_preparer
.prepare_reply_chunk_for_sending(
chunk,
topology,
&self.config.ack_key,
reply_surb,
PacketType::Mix,
)
.unwrap();
Ok(prepared_fragment)
}
pub(crate) fn update_ack_delay(&self, id: FragmentIdentifier, new_delay: Delay) {
if let Err(err) = self
.action_sender
self.action_sender
.unbounded_send(Action::UpdatePendingAck(id, new_delay))
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send update action to the controller: {err}");
}
}
.expect("action control task has died")
}
pub(crate) fn insert_pending_acks(&self, pending_acks: Vec<PendingAcknowledgement>) {
if let Err(err) = self
.action_sender
self.action_sender
.unbounded_send(Action::new_insert(pending_acks))
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send insert action to the controller: {err}");
}
}
.expect("action control task has died")
}
// tells real message sender (with the poisson timer) to send this to the mix network
@@ -641,14 +656,9 @@ where
messages: Vec<RealMessage>,
transmission_lane: TransmissionLane,
) {
if let Err(err) = self
.real_message_sender
self.real_message_sender
.send((messages, transmission_lane))
.await
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to forward messages to the real message sender: {err}");
}
}
.expect("real message receiver task (OutQueueControl) has died");
}
}
@@ -9,12 +9,10 @@ use self::{
acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl,
};
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::replies::reply_controller;
use crate::client::replies::reply_controller::{
ReplyController, ReplyControllerReceiver, ReplyControllerSender,
};
use crate::client::replies::reply_storage::CombinedReplyStorage;
use crate::config;
use crate::{
client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
@@ -29,14 +27,16 @@ use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::params::PacketType;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use nym_task::TaskClient;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::sync::Arc;
use crate::client::replies::reply_controller;
use crate::config;
pub(crate) use acknowledgement_control::{AckActionSender, Action};
use nym_statistics_common::clients::ClientStatsSender;
pub(crate) mod acknowledgement_control;
pub(crate) mod message_handler;
pub(crate) mod real_traffic_stream;
@@ -148,7 +148,6 @@ impl RealMessagesController<OsRng> {
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
let rng = OsRng;
@@ -179,7 +178,6 @@ impl RealMessagesController<OsRng> {
topology_access.clone(),
reply_storage.key_storage(),
reply_storage.tags_storage(),
task_client.fork("message_handler"),
);
let ack_control = AcknowledgementController::new(
@@ -189,7 +187,6 @@ impl RealMessagesController<OsRng> {
message_handler.clone(),
reply_controller_sender,
stats_tx.clone(),
task_client.fork("ack_control"),
);
let reply_control = ReplyController::new(
@@ -197,7 +194,6 @@ impl RealMessagesController<OsRng> {
message_handler,
reply_storage,
reply_controller_receiver,
task_client.fork("reply_controller"),
);
let out_queue_control = OutQueueControl::new(
@@ -210,7 +206,6 @@ impl RealMessagesController<OsRng> {
lane_queue_lengths,
client_connection_rx,
stats_tx,
task_client.with_suffix("out_queue_control"),
);
RealMessagesController {
@@ -220,20 +215,22 @@ impl RealMessagesController<OsRng> {
}
}
pub fn start(self, packet_type: PacketType) {
pub fn start_with_shutdown(self, shutdown: nym_task::TaskClient, packet_type: PacketType) {
let mut out_queue_control = self.out_queue_control;
let ack_control = self.ack_control;
let mut reply_control = self.reply_control;
let shutdown_handle = shutdown.fork("out_queue_control");
spawn_future(async move {
out_queue_control.run().await;
out_queue_control.run_with_shutdown(shutdown_handle).await;
debug!("The out queue controller has finished execution!");
});
let shutdown_handle = shutdown.fork("reply_control");
spawn_future(async move {
reply_control.run().await;
reply_control.run_with_shutdown(shutdown_handle).await;
debug!("The reply controller has finished execution!");
});
ack_control.start(packet_type);
ack_control.start_with_shutdown(shutdown.with_suffix("ack_control"), packet_type);
}
}
@@ -22,7 +22,6 @@ use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, C
use nym_task::connections::{
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
@@ -118,8 +117,6 @@ where
/// Channel used for sending metrics events (specifically `PacketStatistics` events) to the metrics tracker.
stats_tx: ClientStatsSender,
task_client: TaskClient,
}
#[derive(Debug)]
@@ -179,7 +176,6 @@ where
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
OutQueueControl {
config,
@@ -194,7 +190,6 @@ where
client_connection_rx,
lane_queue_lengths,
stats_tx,
task_client,
}
}
@@ -203,9 +198,7 @@ where
// queues and client load rather than the required delay. So realistically we can treat
// whatever is about to happen as negligible additional delay.
trace!("{} is about to get sent to the mixnet", frag_id);
if let Err(err) = self.sent_notifier.unbounded_send(frag_id) {
error!("Failed to notify about sent message: {err}");
}
self.sent_notifier.unbounded_send(frag_id).unwrap();
}
fn loop_cover_message_size(&mut self) -> PacketSize {
@@ -237,7 +230,6 @@ where
// poisson delay, but is it really a problem?
let topology_permit = self.topology_access.get_read_permit().await;
// the ack is sent back to ourselves (and then ignored)
let topology_ref = match topology_permit.try_get_valid_topology_ref(
&self.config.our_full_destination,
Some(&self.config.our_full_destination),
@@ -278,9 +270,7 @@ where
};
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
if !self.task_client.is_shutdown_poll() {
log::error!("Failed to send: {err}");
}
log::error!("Failed to send: {err}");
} else {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
@@ -513,7 +503,7 @@ where
}
#[cfg(not(target_arch = "wasm32"))]
fn log_status(&self, shutdown: &mut TaskClient) {
fn log_status(&self, shutdown: &mut nym_task::TaskClient) {
use crate::error::ClientCoreStatusMessage;
let packets = self.transmission_buffer.total_size();
@@ -544,19 +534,17 @@ where
}
}
pub(super) async fn run(&mut self) {
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
debug!("Started OutQueueControl with graceful shutdown support");
let mut shutdown = self.task_client.fork("select");
#[cfg(not(target_arch = "wasm32"))]
{
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
while !shutdown.is_shutdown() {
loop {
tokio::select! {
biased;
_ = shutdown.recv() => {
_ = shutdown.recv_with_delay() => {
log::trace!("OutQueueControl: Received shutdown");
break;
}
@@ -70,10 +70,7 @@ impl SendingDelayController {
lower_bound,
multiplier_elevated_counter: 0,
time_when_logged_about_elevated_multiplier: now
.checked_sub(Duration::from_secs(
INTERVAL_BETWEEN_WARNING_ABOUT_ELEVATED_MULTIPLIER_SECS,
))
.unwrap_or(now),
- Duration::from_secs(INTERVAL_BETWEEN_WARNING_ABOUT_ELEVATED_MULTIPLIER_SECS),
time_when_changed: now,
time_when_backpressure_detected: now,
}
@@ -20,7 +20,6 @@ use nym_sphinx::message::{NymMessage, PlainMessage};
use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_task::TaskClient;
use std::collections::HashSet;
use std::sync::Arc;
@@ -153,7 +152,6 @@ struct ReceivedMessagesBuffer<R: MessageReceiver> {
inner: Arc<Mutex<ReceivedMessagesBufferInner<R>>>,
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
}
impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
@@ -162,7 +160,6 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
ReceivedMessagesBuffer {
inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
@@ -175,7 +172,6 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
})),
reply_key_storage,
reply_controller_sender,
task_client,
}
}
@@ -261,15 +257,11 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
};
if let Err(err) = self.reply_controller_sender.send_additional_surbs(
self.reply_controller_sender.send_additional_surbs(
msg.sender_tag,
reply_surbs,
from_surb_request,
) {
if !self.task_client.is_shutdown_poll() {
error!("{err}");
}
}
)
}
reconstructed
}
@@ -284,14 +276,8 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
ReplyMessageContent::Data { message } => reconstructed.push(message.into()),
ReplyMessageContent::SurbRequest { recipient, amount } => {
debug!("received request for {amount} additional reply SURBs from {recipient}");
if let Err(err) = self
.reply_controller_sender
.send_additional_surbs_request(*recipient, amount)
{
if !self.task_client.is_shutdown_poll() {
error!("{err}");
}
}
self.reply_controller_sender
.send_additional_surbs_request(*recipient, amount);
}
}
}
@@ -413,19 +399,16 @@ pub enum ReceivedBufferMessage {
struct RequestReceiver<R: MessageReceiver> {
received_buffer: ReceivedMessagesBuffer<R>,
query_receiver: ReceivedBufferRequestReceiver,
task_client: TaskClient,
}
impl<R: MessageReceiver> RequestReceiver<R> {
fn new(
received_buffer: ReceivedMessagesBuffer<R>,
query_receiver: ReceivedBufferRequestReceiver,
task_client: TaskClient,
) -> Self {
RequestReceiver {
received_buffer,
query_receiver,
task_client,
}
}
@@ -440,12 +423,12 @@ impl<R: MessageReceiver> RequestReceiver<R> {
}
}
async fn run(&mut self) {
async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
debug!("Started RequestReceiver with graceful shutdown support");
while !self.task_client.is_shutdown() {
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
_ = shutdown.recv_with_delay() => {
log::trace!("RequestReceiver: Received shutdown");
}
request = self.query_receiver.next() => {
@@ -458,7 +441,7 @@ impl<R: MessageReceiver> RequestReceiver<R> {
},
}
}
self.task_client.recv().await;
shutdown.recv_timeout().await;
log::debug!("RequestReceiver: Exiting");
}
}
@@ -466,25 +449,25 @@ impl<R: MessageReceiver> RequestReceiver<R> {
struct FragmentedMessageReceiver<R: MessageReceiver> {
received_buffer: ReceivedMessagesBuffer<R>,
mixnet_packet_receiver: MixnetMessageReceiver,
task_client: TaskClient,
}
impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
fn new(
received_buffer: ReceivedMessagesBuffer<R>,
mixnet_packet_receiver: MixnetMessageReceiver,
task_client: TaskClient,
) -> Self {
FragmentedMessageReceiver {
received_buffer,
mixnet_packet_receiver,
task_client,
}
}
async fn run(&mut self) -> Result<(), MessageRecoveryError> {
async fn run_with_shutdown(
&mut self,
mut shutdown: nym_task::TaskClient,
) -> Result<(), MessageRecoveryError> {
debug!("Started FragmentedMessageReceiver with graceful shutdown support");
while !self.task_client.is_shutdown() {
while !shutdown.is_shutdown() {
tokio::select! {
new_messages = self.mixnet_packet_receiver.next() => {
if let Some(new_messages) = new_messages {
@@ -494,12 +477,12 @@ impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
break;
}
},
_ = self.task_client.recv_with_delay() => {
_ = shutdown.recv_with_delay() => {
log::trace!("FragmentedMessageReceiver: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
shutdown.recv_timeout().await;
log::debug!("FragmentedMessageReceiver: Exiting");
Ok(())
}
@@ -518,42 +501,41 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
metrics_reporter: ClientStatsSender,
task_client: TaskClient,
) -> Self {
let received_buffer = ReceivedMessagesBuffer::new(
local_encryption_keypair,
reply_key_storage,
reply_controller_sender,
metrics_reporter,
task_client.fork("received_messages_buffer"),
);
ReceivedMessagesBufferController {
fragmented_message_receiver: FragmentedMessageReceiver::new(
received_buffer.clone(),
mixnet_packet_receiver,
task_client.fork("fragmented_message_receiver"),
),
request_receiver: RequestReceiver::new(
received_buffer,
query_receiver,
task_client.with_suffix("request_receiver"),
),
request_receiver: RequestReceiver::new(received_buffer, query_receiver),
}
}
pub fn start(self) {
pub fn start_with_shutdown(self, shutdown: nym_task::TaskClient) {
let mut fragmented_message_receiver = self.fragmented_message_receiver;
let mut request_receiver = self.request_receiver;
let shutdown_handle = shutdown.fork("fragmented_message_receiver");
spawn_future(async move {
match fragmented_message_receiver.run().await {
match fragmented_message_receiver
.run_with_shutdown(shutdown_handle)
.await
{
Ok(_) => {}
Err(e) => error!("{e}"),
}
});
spawn_future(async move {
request_receiver.run().await;
request_receiver
.run_with_shutdown(shutdown.with_suffix("request_receiver"))
.await;
});
}
}
@@ -12,7 +12,6 @@ use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurb;
use nym_sphinx::chunking::fragment::{Fragment, FragmentIdentifier};
use nym_task::connections::{ConnectionId, TransmissionLane};
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
use std::cmp::{max, min};
use std::collections::btree_map::Entry;
@@ -69,9 +68,6 @@ pub struct ReplyController<R> {
message_handler: MessageHandler<R>,
full_reply_storage: CombinedReplyStorage,
// Listen for shutdown signals
task_client: TaskClient,
}
impl<R> ReplyController<R>
@@ -83,7 +79,6 @@ where
message_handler: MessageHandler<R>,
full_reply_storage: CombinedReplyStorage,
request_receiver: ReplyControllerReceiver,
task_client: TaskClient,
) -> Self {
ReplyController {
config,
@@ -92,7 +87,6 @@ where
pending_retransmissions: HashMap::new(),
message_handler,
full_reply_storage,
task_client,
}
}
@@ -522,6 +516,7 @@ where
recipient,
to_send,
nym_sphinx::params::PacketType::Mix,
self.config.reply_surbs.surb_mix_hops,
)
.await
{
@@ -852,11 +847,9 @@ where
// todo!()
// }
pub(crate) async fn run(&mut self) {
pub(crate) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
debug!("Started ReplyController with graceful shutdown support");
let mut shutdown = self.task_client.fork("select");
let polling_rate = Duration::from_secs(5);
let mut stale_inspection = new_interval_stream(polling_rate);
@@ -868,7 +861,7 @@ where
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
_ = shutdown.recv_with_delay() => {
log::trace!("ReplyController: Received shutdown");
},
req = self.request_receiver.next() => match req {
@@ -15,27 +15,6 @@ pub(crate) fn new_control_channels() -> (ReplyControllerSender, ReplyControllerR
(tx.into(), rx)
}
#[derive(Debug, thiserror::Error)]
pub enum ReplyControllerSenderError {
#[error("failed to send retransmission data to reply controller")]
SendRetransmissionData(#[source] mpsc::TrySendError<ReplyControllerMessage>),
#[error("failed to send reply to reply controller")]
SendReply(#[source] mpsc::TrySendError<ReplyControllerMessage>),
#[error("failed to send additional surbs to reply controller")]
AdditionalSurbs(#[source] mpsc::TrySendError<ReplyControllerMessage>),
#[error("failed to send additional surbs request to reply controller")]
AdditionalSurbsRequest(#[source] mpsc::TrySendError<ReplyControllerMessage>),
#[error("failed to request lane queue length from reply controller")]
LaneQueueLength(#[source] mpsc::TrySendError<ReplyControllerMessage>),
#[error("response channel was dropped before we could receive the response")]
ResponseChannelDropped(#[source] oneshot::Canceled),
}
#[derive(Debug, Clone)]
pub struct ReplyControllerSender(mpsc::UnboundedSender<ReplyControllerMessage>);
@@ -51,14 +30,14 @@ impl ReplyControllerSender {
recipient: AnonymousSenderTag,
timed_out_ack: Weak<PendingAcknowledgement>,
extra_surb_request: bool,
) -> Result<(), ReplyControllerSenderError> {
) {
self.0
.unbounded_send(ReplyControllerMessage::RetransmitReply {
recipient,
timed_out_ack,
extra_surb_request,
})
.map_err(ReplyControllerSenderError::SendRetransmissionData)
.expect("ReplyControllerReceiver has died!")
}
pub(crate) fn send_reply(
@@ -66,14 +45,14 @@ impl ReplyControllerSender {
recipient: AnonymousSenderTag,
message: Vec<u8>,
lane: TransmissionLane,
) -> Result<(), ReplyControllerSenderError> {
) {
self.0
.unbounded_send(ReplyControllerMessage::SendReply {
recipient,
message,
lane,
})
.map_err(ReplyControllerSenderError::SendReply)
.expect("ReplyControllerReceiver has died!")
}
pub(crate) fn send_additional_surbs(
@@ -81,47 +60,42 @@ impl ReplyControllerSender {
sender_tag: AnonymousSenderTag,
reply_surbs: Vec<ReplySurb>,
from_surb_request: bool,
) -> Result<(), ReplyControllerSenderError> {
) {
self.0
.unbounded_send(ReplyControllerMessage::AdditionalSurbs {
sender_tag,
reply_surbs,
from_surb_request,
})
.map_err(ReplyControllerSenderError::AdditionalSurbs)
.expect("ReplyControllerReceiver has died!")
}
pub(crate) fn send_additional_surbs_request(
&self,
recipient: Recipient,
amount: u32,
) -> Result<(), ReplyControllerSenderError> {
pub(crate) fn send_additional_surbs_request(&self, recipient: Recipient, amount: u32) {
self.0
.unbounded_send(ReplyControllerMessage::AdditionalSurbsRequest {
recipient: Box::new(recipient),
amount,
})
.map_err(ReplyControllerSenderError::AdditionalSurbsRequest)
.expect("ReplyControllerReceiver has died!")
}
pub async fn get_lane_queue_length(
&self,
connection_id: ConnectionId,
) -> Result<usize, ReplyControllerSenderError> {
pub async fn get_lane_queue_length(&self, connection_id: ConnectionId) -> usize {
let (response_tx, response_rx) = oneshot::channel();
if let Err(err) = self
.0
self.0
.unbounded_send(ReplyControllerMessage::LaneQueueLength {
connection_id,
response_channel: response_tx,
})
{
return Err(ReplyControllerSenderError::LaneQueueLength(err));
}
.expect("ReplyControllerReceiver has died!");
response_rx
.await
.map_err(ReplyControllerSenderError::ResponseChannelDropped)
match response_rx.await {
Ok(length) => length,
Err(_) => {
error!("The reply controller has dropped our response channel!");
// TODO: should we panic here instead? this message implies something weird and unrecoverable has happened
0
}
}
}
}
@@ -136,10 +110,7 @@ impl ReplyQueueLengths {
}
}
pub async fn get_lane_queue_length(
&self,
connection_id: ConnectionId,
) -> Result<usize, ReplyControllerSenderError> {
pub async fn get_lane_queue_length(&self, connection_id: ConnectionId) -> usize {
self.reply_controller_sender
.get_lane_queue_length(connection_id)
.await
@@ -149,7 +120,7 @@ impl ReplyQueueLengths {
pub(crate) type ReplyControllerReceiver = mpsc::UnboundedReceiver<ReplyControllerMessage>;
#[derive(Debug)]
pub enum ReplyControllerMessage {
pub(crate) enum ReplyControllerMessage {
RetransmitReply {
recipient: AnonymousSenderTag,
timed_out_ack: Weak<PendingAcknowledgement>,
@@ -16,14 +16,14 @@
#![warn(clippy::todo)]
#![warn(clippy::dbg_macro)]
use futures::StreamExt;
use std::time::Duration;
use nym_client_core_config_types::StatsReporting;
use nym_sphinx::addressing::Recipient;
use nym_statistics_common::clients::{
ClientStatsController, ClientStatsReceiver, ClientStatsSender,
};
use nym_task::{connections::TransmissionLane, TaskClient};
use std::time::Duration;
use nym_task::connections::TransmissionLane;
use crate::{
client::inbound_messages::{InputMessage, InputMessageSender},
@@ -51,9 +51,6 @@ pub(crate) struct StatisticsControl {
/// Config for stats reporting (enabled, address, interval)
reporting_config: StatsReporting,
/// Task client for listening for shutdown
task_client: TaskClient,
}
impl StatisticsControl {
@@ -62,24 +59,19 @@ impl StatisticsControl {
client_type: String,
client_stats_id: String,
report_tx: InputMessageSender,
task_client: TaskClient,
) -> (Self, ClientStatsSender) {
let (stats_tx, stats_rx) = tokio::sync::mpsc::unbounded_channel();
let stats = ClientStatsController::new(client_stats_id, client_type);
let mut task_client_stats_sender = task_client.fork("stats_sender");
task_client_stats_sender.disarm();
(
StatisticsControl {
stats,
stats_rx,
report_tx,
reporting_config,
task_client,
},
ClientStatsSender::new(Some(stats_tx), task_client_stats_sender),
ClientStatsSender::new(Some(stats_tx)),
)
}
@@ -99,43 +91,16 @@ impl StatisticsControl {
}
}
async fn run(&mut self) {
async fn run_with_shutdown(&mut self, mut task_client: nym_task::TaskClient) {
log::debug!("Started StatisticsControl with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
let mut stats_report_interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(self.reporting_config.reporting_interval),
);
let mut stats_report_interval =
tokio::time::interval(self.reporting_config.reporting_interval);
let mut local_report_interval = tokio::time::interval(LOCAL_REPORT_INTERVAL);
let mut snapshot_interval = tokio::time::interval(SNAPSHOT_INTERVAL);
#[cfg(not(target_arch = "wasm32"))]
let mut local_report_interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(LOCAL_REPORT_INTERVAL),
);
#[cfg(not(target_arch = "wasm32"))]
let mut snapshot_interval =
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(SNAPSHOT_INTERVAL));
#[cfg(target_arch = "wasm32")]
let mut stats_report_interval = gloo_timers::future::IntervalStream::new(
self.reporting_config.reporting_interval.as_millis() as u32,
);
#[cfg(target_arch = "wasm32")]
let mut local_report_interval =
gloo_timers::future::IntervalStream::new(LOCAL_REPORT_INTERVAL.as_millis() as u32);
#[cfg(target_arch = "wasm32")]
let mut snapshot_interval =
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);
while !self.task_client.is_shutdown() {
loop {
tokio::select! {
biased;
_ = self.task_client.recv() => {
log::trace!("StatisticsControl: Received shutdown");
break;
},
stats_event = self.stats_rx.recv() => match stats_event {
Some(stats_event) => self.stats.handle_event(stats_event),
None => {
@@ -143,48 +108,44 @@ impl StatisticsControl {
break;
}
},
_ = snapshot_interval.next() => {
_ = snapshot_interval.tick() => {
self.stats.snapshot();
}
_ = stats_report_interval.next() => {
let Some(recipient) = self.reporting_config.provider_address else {
continue
};
if self.reporting_config.enabled {
self.report_stats(recipient).await;
}
_ = stats_report_interval.tick(), if self.reporting_config.enabled && self.reporting_config.provider_address.is_some() => {
// SAFTEY : this branch executes only if reporting is not none, so unwrapp is fine
#[allow(clippy::unwrap_used)]
self.report_stats(self.reporting_config.provider_address.unwrap()).await;
}
_ = local_report_interval.next() => {
self.stats.local_report(&mut self.task_client);
_ = local_report_interval.tick() => {
self.stats.local_report(&mut task_client);
}
_ = task_client.recv_with_delay() => {
log::trace!("StatisticsControl: Received shutdown");
break;
},
}
}
task_client.recv_timeout().await;
log::debug!("StatisticsControl: Exiting");
}
pub(crate) fn start(mut self) {
pub(crate) fn start_with_shutdown(mut self, task_client: nym_task::TaskClient) {
spawn_future(async move {
self.run().await;
self.run_with_shutdown(task_client).await;
})
}
pub(crate) fn create_and_start(
pub(crate) fn create_and_start_with_shutdown(
reporting_config: StatsReporting,
client_type: String,
client_stats_id: String,
report_tx: InputMessageSender,
task_client: TaskClient,
task_client: nym_task::TaskClient,
) -> ClientStatsSender {
let (controller, sender) = Self::create(
reporting_config,
client_type,
client_stats_id,
report_tx,
task_client,
);
controller.start();
let (controller, sender) =
Self::create(reporting_config, client_type, client_stats_id, report_tx);
controller.start_with_shutdown(task_client);
sender
}
}
@@ -2,7 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError};
use nym_sphinx::params::DEFAULT_NUM_MIX_HOPS;
use nym_topology::{NymTopology, NymTopologyError};
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -16,36 +17,29 @@ pub struct TopologyAccessorInner {
// few seconds, while reads are needed every single packet generated.
// However, proper benchmarks will be needed to determine if `RwLock` is indeed a better
// approach than a `Mutex`
topology: RwLock<NymRouteProvider>,
topology: RwLock<Option<NymTopology>>,
}
impl TopologyAccessorInner {
fn new(initial: NymRouteProvider) -> Self {
fn new() -> Self {
TopologyAccessorInner {
controlled_manually: AtomicBool::new(false),
released_manual_control: Notify::new(),
topology: RwLock::new(initial),
topology: RwLock::new(None),
}
}
async fn update(&self, new: Option<NymTopology>) {
let mut guard = self.topology.write().await;
match new {
Some(updated) => {
guard.update(updated);
}
None => guard.clear_topology(),
}
*self.topology.write().await = new;
}
}
pub struct TopologyReadPermit<'a> {
permit: RwLockReadGuard<'a, NymRouteProvider>,
permit: RwLockReadGuard<'a, Option<NymTopology>>,
}
impl Deref for TopologyReadPermit<'_> {
type Target = NymRouteProvider;
type Target = Option<NymTopology>;
fn deref(&self) -> &Self::Target {
&self.permit
@@ -59,31 +53,43 @@ impl<'a> TopologyReadPermit<'a> {
&'a self,
ack_recipient: &Recipient,
packet_recipient: Option<&Recipient>,
) -> Result<&'a NymRouteProvider, NymTopologyError> {
let route_provider = self.permit.deref();
let topology = &route_provider.topology;
) -> Result<&'a NymTopology, NymTopologyError> {
// 1. Have we managed to get anything from the refresher, i.e. have the nym-api queries gone through?
topology.ensure_not_empty()?;
let topology = self
.permit
.as_ref()
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
// 2. does the topology have a node on each mixing layer?
topology.ensure_minimally_routable()?;
// 2. does it have any mixnode at all?
// 3. does it have any gateways at all?
// 4. does it have a mixnode on each layer?
topology.ensure_can_construct_path_through(DEFAULT_NUM_MIX_HOPS)?;
// 3. does it contain OUR gateway (so that we could create an ack packet)?
let _ = route_provider.egress_by_identity(ack_recipient.gateway())?;
// 4. for our target recipient, does it contain THEIR gateway (so that we send anything over?)
if let Some(recipient) = packet_recipient {
let _ = route_provider.egress_by_identity(recipient.gateway())?;
// 5. does it contain OUR gateway (so that we could create an ack packet)?
if !topology.gateway_exists(ack_recipient.gateway()) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: ack_recipient.gateway().to_base58_string(),
});
}
Ok(route_provider)
// 6. for our target recipient, does it contain THEIR gateway (so that we could create
if let Some(recipient) = packet_recipient {
if !topology.gateway_exists(recipient.gateway()) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: recipient.gateway().to_base58_string(),
});
}
}
Ok(topology)
}
}
impl<'a> From<RwLockReadGuard<'a, NymRouteProvider>> for TopologyReadPermit<'a> {
fn from(permit: RwLockReadGuard<'a, NymRouteProvider>) -> Self {
TopologyReadPermit { permit }
impl<'a> From<RwLockReadGuard<'a, Option<NymTopology>>> for TopologyReadPermit<'a> {
fn from(read_permit: RwLockReadGuard<'a, Option<NymTopology>>) -> Self {
TopologyReadPermit {
permit: read_permit,
}
}
}
@@ -93,11 +99,9 @@ pub struct TopologyAccessor {
}
impl TopologyAccessor {
pub fn new(ignore_egress_epoch_roles: bool) -> Self {
pub fn new() -> Self {
TopologyAccessor {
inner: Arc::new(TopologyAccessorInner::new(NymRouteProvider::new_empty(
ignore_egress_epoch_roles,
))),
inner: Arc::new(TopologyAccessorInner::new()),
}
}
@@ -117,21 +121,8 @@ impl TopologyAccessor {
self.inner.released_manual_control.notified().await
}
#[deprecated(note = "use .current_route_provider instead")]
pub async fn current_topology(&self) -> Option<NymTopology> {
self.current_route_provider()
.await
.as_ref()
.map(|p| p.topology.clone())
}
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<NymRouteProvider>> {
let provider = self.inner.topology.read().await;
if provider.topology.is_empty() {
None
} else {
Some(provider)
}
self.inner.topology.read().await.clone()
}
pub async fn manually_change_topology(&self, new_topology: NymTopology) {
@@ -149,11 +140,15 @@ impl TopologyAccessor {
// only used by the client at startup to get a slightly more reasonable error message
// (currently displays as unused because health checker is disabled due to required changes)
pub async fn ensure_is_routable(&self) -> Result<(), NymTopologyError> {
self.inner
.topology
.read()
.await
.topology
.ensure_minimally_routable()
match self.inner.topology.read().await.deref() {
None => Err(NymTopologyError::EmptyNetworkTopology),
Some(ref topology) => topology.ensure_can_construct_path_through(DEFAULT_NUM_MIX_HOPS),
}
}
}
impl Default for TopologyAccessor {
fn default() -> Self {
TopologyAccessor::new()
}
}
@@ -3,6 +3,7 @@ use log::{debug, error};
use nym_explorer_client::{ExplorerClient, PrettyDetailedMixNodeBond};
use nym_network_defaults::var_names::EXPLORER_API;
use nym_topology::{
nym_topology_from_basic_info,
provider_trait::{async_trait, TopologyProvider},
NymTopology,
};
@@ -14,6 +15,8 @@ use url::Url;
pub use nym_country_group::CountryGroup;
const MIN_NODES_PER_LAYER: usize = 1;
fn create_explorer_client() -> Option<ExplorerClient> {
let Ok(explorer_api_url) = std::env::var(EXPLORER_API) else {
error!("Missing EXPLORER_API");
@@ -60,20 +63,30 @@ fn log_mixnode_distribution(mixnodes: &HashMap<CountryGroup, Vec<NodeId>>) {
}
fn check_layer_integrity(topology: NymTopology) -> Result<(), ()> {
if topology.ensure_minimally_routable().is_err() {
let mixes = topology.mixes();
if mixes.keys().len() < 3 {
error!("Layer is missing in topology!");
return Err(());
}
for (layer, mixnodes) in mixes {
debug!("Layer {:?} has {} mixnodes", layer, mixnodes.len());
if mixnodes.len() < MIN_NODES_PER_LAYER {
error!(
"There are only {} mixnodes in layer {:?}",
mixnodes.len(),
layer
);
return Err(());
}
}
Ok(())
}
#[deprecated(note = "use NymApiTopologyProvider instead as explorer API will soon be removed")]
pub struct GeoAwareTopologyProvider {
validator_client: nym_validator_client::client::NymApiClient,
filter_on: GroupBy,
}
#[allow(deprecated)]
impl GeoAwareTopologyProvider {
pub fn new(mut nym_api_urls: Vec<Url>, filter_on: GroupBy) -> GeoAwareTopologyProvider {
log::info!(
@@ -91,15 +104,6 @@ impl GeoAwareTopologyProvider {
}
async fn get_topology(&self) -> Option<NymTopology> {
let rewarded_set = self
.validator_client
.get_current_rewarded_set()
.await
.inspect_err(|err| error!("failed to get current rewarded set: {err}"))
.ok()?;
let mut topology = NymTopology::new_empty(rewarded_set);
let mixnodes = match self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
@@ -183,8 +187,7 @@ impl GeoAwareTopologyProvider {
.filter(|m| filtered_mixnode_ids.contains(&m.node_id))
.collect::<Vec<_>>();
topology.add_skimmed_nodes(&mixnodes);
topology.add_skimmed_nodes(&gateways);
let topology = nym_topology_from_basic_info(&mixnodes, &gateways);
// TODO: return real error type
check_layer_integrity(topology.clone()).ok()?;
@@ -193,7 +196,6 @@ impl GeoAwareTopologyProvider {
}
}
#[allow(deprecated)]
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl TopologyProvider for GeoAwareTopologyProvider {
@@ -203,7 +205,6 @@ impl TopologyProvider for GeoAwareTopologyProvider {
}
}
#[allow(deprecated)]
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
impl TopologyProvider for GeoAwareTopologyProvider {
@@ -6,7 +6,6 @@ pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit};
use futures::StreamExt;
use log::*;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_task::TaskClient;
use nym_topology::NymTopologyError;
use std::time::Duration;
@@ -20,7 +19,6 @@ mod accessor;
pub mod geo_aware_provider;
pub mod nym_api_provider;
#[allow(deprecated)]
pub use geo_aware_provider::GeoAwareTopologyProvider;
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
pub use nym_topology::provider_trait::TopologyProvider;
@@ -29,7 +27,7 @@ pub use nym_topology::provider_trait::TopologyProvider;
const MAX_FAILURE_COUNT: usize = 10;
pub struct TopologyRefresherConfig {
pub refresh_rate: Duration,
refresh_rate: Duration,
}
impl TopologyRefresherConfig {
@@ -44,8 +42,6 @@ pub struct TopologyRefresher {
refresh_rate: Duration,
consecutive_failure_count: usize,
task_client: TaskClient,
}
impl TopologyRefresher {
@@ -53,14 +49,12 @@ impl TopologyRefresher {
cfg: TopologyRefresherConfig,
topology_accessor: TopologyAccessor,
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
task_client: TaskClient,
) -> Self {
TopologyRefresher {
topology_provider,
topology_accessor,
refresh_rate: cfg.refresh_rate,
consecutive_failure_count: 0,
task_client,
}
}
@@ -102,24 +96,28 @@ impl TopologyRefresher {
self.topology_accessor.ensure_is_routable().await
}
pub async fn ensure_contains_routable_egress(
pub async fn ensure_contains_gateway(
&self,
egress: NodeIdentity,
gateway: &NodeIdentity,
) -> Result<(), NymTopologyError> {
let topology = self
.topology_accessor
.current_route_provider()
.current_topology()
.await
.ok_or(NymTopologyError::EmptyNetworkTopology)?;
let _ = topology.egress_by_identity(egress)?;
if !topology.gateway_exists(gateway) {
return Err(NymTopologyError::NonExistentGatewayError {
identity_key: gateway.to_base58_string(),
});
}
Ok(())
}
pub async fn wait_for_gateway(
&mut self,
gateway: NodeIdentity,
gateway: &NodeIdentity,
timeout_duration: Duration,
) -> Result<(), NymTopologyError> {
info!(
@@ -137,7 +135,7 @@ impl TopologyRefresher {
})
}
_ = self.try_refresh() => {
if self.ensure_contains_routable_egress(gateway).await.is_ok() {
if self.ensure_contains_gateway(gateway).await.is_ok() {
return Ok(())
}
info!("gateway '{gateway}' is still not online...");
@@ -147,7 +145,7 @@ impl TopologyRefresher {
}
}
pub fn start(mut self) {
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
spawn_future(async move {
debug!("Started TopologyRefresher with graceful shutdown support");
@@ -160,17 +158,17 @@ impl TopologyRefresher {
let mut interval =
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
while !self.task_client.is_shutdown() {
while !shutdown.is_shutdown() {
tokio::select! {
_ = interval.next() => {
self.try_refresh().await;
},
_ = self.task_client.recv() => {
_ = shutdown.recv() => {
log::trace!("TopologyRefresher: Received shutdown");
},
}
}
self.task_client.recv_timeout().await;
shutdown.recv_timeout().await;
log::debug!("TopologyRefresher: Exiting");
})
}
@@ -4,39 +4,32 @@
use async_trait::async_trait;
use log::{debug, error, warn};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::NymTopology;
use nym_topology::{NymTopology, NymTopologyError};
use nym_validator_client::UserAgent;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::cmp::min;
use url::Url;
// the same values as our current (10.06.24) blacklist
pub const DEFAULT_MIN_MIXNODE_PERFORMANCE: u8 = 50;
pub const DEFAULT_MIN_GATEWAY_PERFORMANCE: u8 = 50;
#[derive(Debug)]
pub struct Config {
pub min_mixnode_performance: u8,
pub min_gateway_performance: u8,
pub use_extended_topology: bool,
pub ignore_egress_epoch_role: bool,
}
impl From<nym_client_core_config_types::Topology> for Config {
fn from(value: nym_client_core_config_types::Topology) -> Self {
impl Default for Config {
fn default() -> Self {
// old values that decided on blacklist membership
Config {
min_mixnode_performance: value.minimum_mixnode_performance,
min_gateway_performance: value.minimum_gateway_performance,
use_extended_topology: value.use_extended_topology,
ignore_egress_epoch_role: value.ignore_egress_epoch_role,
min_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
min_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
}
}
}
impl Config {
// if we're using 'extended' topology, filter the nodes based on the lowest set performance
fn min_node_performance(&self) -> u8 {
min(self.min_mixnode_performance, self.min_gateway_performance)
}
}
pub struct NymApiTopologyProvider {
config: Config,
@@ -46,11 +39,7 @@ pub struct NymApiTopologyProvider {
}
impl NymApiTopologyProvider {
pub fn new(
config: impl Into<Config>,
mut nym_api_urls: Vec<Url>,
user_agent: Option<UserAgent>,
) -> Self {
pub fn new(config: Config, mut nym_api_urls: Vec<Url>, user_agent: Option<UserAgent>) -> Self {
nym_api_urls.shuffle(&mut thread_rng());
let validator_client = if let Some(user_agent) = user_agent {
@@ -63,7 +52,7 @@ impl NymApiTopologyProvider {
};
NymApiTopologyProvider {
config: config.into(),
config,
validator_client,
nym_api_urls,
currently_used_api: 0,
@@ -81,69 +70,70 @@ impl NymApiTopologyProvider {
.change_nym_api(self.nym_api_urls[self.currently_used_api].clone())
}
/// Verifies whether nodes a reasonably distributed among all mix layers.
///
/// In ideal world we would have 33% nodes on layer 1, 33% on layer 2 and 33% on layer 3.
/// However, this is a rather unrealistic expectation, instead we check whether there exists
/// a layer with more than 66% of nodes or with fewer than 15% and if so, we trigger a failure.
///
/// # Arguments
///
/// * `topology`: active topology constructed from validator api data
fn check_layer_distribution(
&self,
active_topology: &NymTopology,
) -> Result<(), NymTopologyError> {
let lower_threshold = 0.15;
let upper_threshold = 0.66;
active_topology.ensure_even_layer_distribution(lower_threshold, upper_threshold)
}
async fn get_current_compatible_topology(&mut self) -> Option<NymTopology> {
let rewarded_set = self
let mixnodes = match self
.validator_client
.get_current_rewarded_set()
.get_all_basic_active_mixing_assigned_nodes()
.await
.inspect_err(|err| error!("failed to get current rewarded set: {err}"))
.ok()?;
let mut topology = NymTopology::new_empty(rewarded_set);
if self.config.use_extended_topology {
let all_nodes = self
.validator_client
.get_all_basic_nodes()
.await
.inspect_err(|err| error!("failed to get network nodes: {err}"))
.ok()?;
debug!(
"there are {} nodes on the network (before filtering)",
all_nodes.len()
);
topology.add_additional_nodes(all_nodes.iter().filter(|n| {
n.performance.round_to_integer() >= self.config.min_node_performance()
}));
} else {
// if we're not using extended topology, we're only getting active set mixnodes and gateways
let mixnodes = self
.validator_client
.get_all_basic_active_mixing_assigned_nodes()
.await
.inspect_err(|err| error!("failed to get network mixnodes: {err}"))
.ok()?;
// TODO: we really should be getting ACTIVE gateways only
let gateways = self
.validator_client
.get_all_basic_entry_assigned_nodes()
.await
.inspect_err(|err| error!("failed to get network gateways: {err}"))
.ok()?;
debug!(
"there are {} mixnodes and {} gateways in total (before performance filtering)",
mixnodes.len(),
gateways.len()
);
topology.add_additional_nodes(mixnodes.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_mixnode_performance
}));
topology.add_additional_nodes(gateways.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_gateway_performance
}));
{
Err(err) => {
error!("failed to get network mixnodes - {err}");
return None;
}
Ok(mixes) => mixes,
};
if !topology.is_minimally_routable() {
error!("the current filtered active topology can't be used to construct any packets");
return None;
}
let gateways = match self
.validator_client
.get_all_basic_entry_assigned_nodes()
.await
{
Err(err) => {
error!("failed to get network gateways - {err}");
return None;
}
Ok(gateways) => gateways,
};
Some(topology)
debug!(
"there are {} mixnodes and {} gateways in total (before performance filtering)",
mixnodes.len(),
gateways.len()
);
let topology = NymTopology::from_unordered(
mixnodes.iter().filter(|m| {
m.performance.round_to_integer() >= self.config.min_mixnode_performance
}),
gateways.iter().filter(|g| {
g.performance.round_to_integer() >= self.config.min_gateway_performance
}),
);
if let Err(err) = self.check_layer_distribution(&topology) {
warn!("The current filtered active topology has extremely skewed layer distribution. It cannot be used: {err}");
self.use_next_nym_api();
None
} else {
Some(topology)
}
}
}
@@ -152,11 +142,7 @@ impl NymApiTopologyProvider {
#[async_trait]
impl TopologyProvider for NymApiTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
let Some(topology) = self.get_current_compatible_topology().await else {
self.use_next_nym_api();
return None;
};
Some(topology)
self.get_current_compatible_topology().await
}
}
@@ -164,10 +150,6 @@ impl TopologyProvider for NymApiTopologyProvider {
#[async_trait(?Send)]
impl TopologyProvider for NymApiTopologyProvider {
async fn get_new_topology(&mut self) -> Option<NymTopology> {
let Some(topology) = self.get_current_compatible_topology().await else {
self.use_next_nym_api();
return None;
};
Some(topology)
self.get_current_compatible_topology().await
}
}
+4 -20
View File
@@ -4,8 +4,8 @@
use crate::client::mix_traffic::transceiver::ErasedGatewayError;
use nym_crypto::asymmetric::identity::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_topology::gateway::GatewayConversionError;
use nym_topology::NymTopologyError;
use nym_validator_client::ValidatorClientError;
use std::error::Error;
use std::path::PathBuf;
@@ -36,13 +36,6 @@ pub enum ClientCoreError {
#[error("no gateway with id: {0}")]
NoGatewayWithId(String),
#[error("Invalid URL: {0}")]
InvalidUrl(String),
#[cfg(not(target_arch = "wasm32"))]
#[error("resolution failed: {0}")]
ResolutionFailed(#[from] nym_http_api_client::HickoryDnsError),
#[error("no gateways on network")]
NoGatewaysOnNetwork,
@@ -81,10 +74,10 @@ pub enum ClientCoreError {
#[error("the gateway id is invalid - {0}")]
UnableToCreatePublicKeyFromGatewayId(Ed25519RecoveryError),
#[error("the node is malformed: {source}")]
#[error("The gateway is malformed: {source}")]
MalformedGateway {
#[from]
source: Box<RoutingNodeError>,
source: GatewayConversionError,
},
#[error("failed to establish connection to gateway: {source}")]
@@ -103,9 +96,6 @@ pub enum ClientCoreError {
#[error("timed out while trying to establish gateway connection")]
GatewayConnectionTimeout,
#[error("failed to forward mix messages to gateway")]
GatewayFailedToForwardMessages,
#[error("no ping measurements for the gateway ({identity}) performed")]
NoGatewayMeasurements { identity: String },
@@ -169,9 +159,6 @@ pub enum ClientCoreError {
#[error("the specified gateway '{gateway}' does not support the wss protocol")]
UnsupportedWssProtocol { gateway: String },
#[error("node {id} ({identity}) does not support mixnet entry mode")]
UnsupportedEntry { id: NodeId, identity: String },
#[error(
"failed to load custom topology using path '{}'. detailed message: {source}", file_path.display()
)]
@@ -222,9 +209,6 @@ pub enum ClientCoreError {
"fresh registration with gateway {gateway_id} somehow requires an additional key upgrade!"
)]
UnexpectedKeyUpgrade { gateway_id: String },
#[error("failed to derive keys from master key")]
HkdfDerivationError {},
}
/// Set of messages that the client can send to listeners via the task manager
+24 -49
View File
@@ -7,7 +7,7 @@ use futures::{SinkExt, StreamExt};
use log::{debug, info, trace, warn};
use nym_crypto::asymmetric::identity;
use nym_gateway_client::GatewayClient;
use nym_topology::node::RoutingNode;
use nym_topology::gateway;
use nym_validator_client::client::IdentityKeyRef;
use nym_validator_client::UserAgent;
use rand::{seq::SliceRandom, Rng};
@@ -15,10 +15,6 @@ use std::{sync::Arc, time::Duration};
use tungstenite::Message;
use url::Url;
#[cfg(not(target_arch = "wasm32"))]
use crate::init::websockets::connect_async;
use nym_topology::NodeId;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
@@ -26,7 +22,10 @@ use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
#[cfg(target_arch = "wasm32")]
use wasm_utils::websocket::JSWebsocket;
#[cfg(target_arch = "wasm32")]
@@ -49,30 +48,22 @@ const PING_TIMEOUT: Duration = Duration::from_millis(1000);
// The abstraction that some of these helpers use
pub trait ConnectableGateway {
fn node_id(&self) -> NodeId;
fn identity(&self) -> identity::PublicKey;
fn clients_address(&self, prefer_ipv6: bool) -> Option<String>;
fn identity(&self) -> &identity::PublicKey;
fn clients_address(&self) -> String;
fn is_wss(&self) -> bool;
}
impl ConnectableGateway for RoutingNode {
fn node_id(&self) -> NodeId {
self.node_id
impl ConnectableGateway for gateway::LegacyNode {
fn identity(&self) -> &identity::PublicKey {
self.identity()
}
fn identity(&self) -> identity::PublicKey {
self.identity_key
}
fn clients_address(&self, prefer_ipv6: bool) -> Option<String> {
self.ws_entry_address(prefer_ipv6)
fn clients_address(&self) -> String {
self.clients_address()
}
fn is_wss(&self) -> bool {
self.entry
.as_ref()
.map(|e| e.clients_wss_port.is_some())
.unwrap_or_default()
self.clients_wss_port.is_some()
}
}
@@ -87,13 +78,12 @@ impl<'a, G: ConnectableGateway> GatewayWithLatency<'a, G> {
}
}
pub async fn gateways_for_init<R: Rng>(
pub async fn current_gateways<R: Rng>(
rng: &mut R,
nym_apis: &[Url],
user_agent: Option<UserAgent>,
minimum_performance: u8,
ignore_epoch_roles: bool,
) -> Result<Vec<RoutingNode>, ClientCoreError> {
) -> Result<Vec<gateway::LegacyNode>, ClientCoreError> {
let nym_api = nym_apis
.choose(rng)
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
@@ -110,14 +100,11 @@ pub async fn gateways_for_init<R: Rng>(
log::trace!("Gateways: {:#?}", gateways);
// filter out gateways below minimum performance and ones that could operate as a mixnode
// (we don't want instability)
let valid_gateways = gateways
.iter()
.filter(|g| ignore_epoch_roles || !g.supported_roles.mixnode)
.filter(|g| g.performance.round_to_integer() >= minimum_performance)
.filter_map(|gateway| gateway.try_into().ok())
.collect::<Vec<_>>();
.collect::<Vec<gateway::LegacyNode>>();
log::debug!("After checking validity: {}", valid_gateways.len());
log::trace!("Valid gateways: {:#?}", valid_gateways);
@@ -133,7 +120,7 @@ pub async fn gateways_for_init<R: Rng>(
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
match tokio::time::timeout(CONN_TIMEOUT, connect_async(endpoint)).await {
Err(_elapsed) => Err(ClientCoreError::GatewayConnectionTimeout),
Ok(Err(conn_failure)) => Err(conn_failure),
Ok(Err(conn_failure)) => Err(conn_failure.into()),
Ok(Ok((stream, _))) => Ok(stream),
}
}
@@ -147,12 +134,7 @@ async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<G>, Client
where
G: ConnectableGateway,
{
let Some(addr) = gateway.clients_address(false) else {
return Err(ClientCoreError::UnsupportedEntry {
id: gateway.node_id(),
identity: gateway.identity().to_string(),
});
};
let addr = gateway.clients_address();
trace!(
"establishing connection to {} ({addr})...",
gateway.identity(),
@@ -208,7 +190,7 @@ where
Ok(GatewayWithLatency::new(gateway, avg))
}
pub async fn choose_gateway_by_latency<R: Rng, G: ConnectableGateway + Clone>(
pub async fn choose_gateway_by_latency<'a, R: Rng, G: ConnectableGateway + Clone>(
rng: &mut R,
gateways: &[G],
must_use_tls: bool,
@@ -223,7 +205,7 @@ pub async fn choose_gateway_by_latency<R: Rng, G: ConnectableGateway + Clone>(
let gateways_with_latency = Arc::new(tokio::sync::Mutex::new(Vec::new()));
futures::stream::iter(gateways)
.for_each_concurrent(CONCURRENT_GATEWAYS_MEASURED, |gateway| async {
let id = gateway.identity();
let id = *gateway.identity();
trace!("measuring latency to {id}...");
match measure_latency(gateway).await {
Ok(with_latency) => {
@@ -270,9 +252,9 @@ fn filter_by_tls<G: ConnectableGateway>(
pub(super) fn uniformly_random_gateway<R: Rng>(
rng: &mut R,
gateways: &[RoutingNode],
gateways: &[gateway::LegacyNode],
must_use_tls: bool,
) -> Result<RoutingNode, ClientCoreError> {
) -> Result<gateway::LegacyNode, ClientCoreError> {
filter_by_tls(gateways, must_use_tls)?
.choose(rng)
.ok_or(ClientCoreError::NoGatewaysOnNetwork)
@@ -281,9 +263,9 @@ pub(super) fn uniformly_random_gateway<R: Rng>(
pub(super) fn get_specified_gateway(
gateway_identity: IdentityKeyRef,
gateways: &[RoutingNode],
gateways: &[gateway::LegacyNode],
must_use_tls: bool,
) -> Result<RoutingNode, ClientCoreError> {
) -> Result<gateway::LegacyNode, ClientCoreError> {
log::debug!("Requesting specified gateway: {}", gateway_identity);
let user_gateway = identity::PublicKey::from_base58_string(gateway_identity)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
@@ -293,14 +275,7 @@ pub(super) fn get_specified_gateway(
.find(|gateway| gateway.identity_key == user_gateway)
.ok_or_else(|| ClientCoreError::NoGatewayWithId(gateway_identity.to_string()))?;
let Some(entry_details) = gateway.entry.as_ref() else {
return Err(ClientCoreError::UnsupportedEntry {
id: gateway.node_id,
identity: gateway.identity().to_string(),
});
};
if must_use_tls && entry_details.clients_wss_port.is_none() {
if must_use_tls && gateway.clients_wss_port.is_none() {
return Err(ClientCoreError::UnsupportedWssProtocol {
gateway: gateway_identity.to_string(),
});
+2 -4
View File
@@ -19,15 +19,13 @@ use crate::init::types::{
use nym_client_core_gateways_storage::GatewaysDetailsStore;
use nym_client_core_gateways_storage::{GatewayDetails, GatewayRegistration};
use nym_gateway_client::client::InitGatewayClient;
use nym_topology::node::RoutingNode;
use nym_topology::gateway;
use rand::rngs::OsRng;
use rand::{CryptoRng, RngCore};
use serde::Serialize;
pub mod helpers;
pub mod types;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) mod websockets;
// helpers for error wrapping
@@ -52,7 +50,7 @@ async fn setup_new_gateway<K, D>(
key_store: &K,
details_store: &D,
selection_specification: GatewaySelectionSpecification,
available_gateways: Vec<RoutingNode>,
available_gateways: Vec<gateway::LegacyNode>,
) -> Result<InitialisationResult, ClientCoreError>
where
K: KeyStore,
+6 -41
View File
@@ -13,11 +13,11 @@ use nym_crypto::asymmetric::identity;
use nym_gateway_client::client::InitGatewayClient;
use nym_gateway_requests::shared_key::SharedGatewayKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::node::RoutingNode;
use nym_topology::gateway;
use nym_validator_client::client::IdentityKey;
use nym_validator_client::nyxd::AccountId;
use serde::Serialize;
use std::fmt::{Debug, Display};
use std::fmt::Display;
use std::sync::Arc;
use time::OffsetDateTime;
use url::Url;
@@ -38,23 +38,16 @@ pub enum SelectedGateway {
impl SelectedGateway {
pub fn from_topology_node(
node: RoutingNode,
node: gateway::LegacyNode,
must_use_tls: bool,
) -> Result<Self, ClientCoreError> {
// for now, let's use 'old' behaviour, if you want to change it, you can pass it up the enum stack yourself : )
let prefer_ipv6 = false;
let gateway_listener = if must_use_tls {
node.ws_entry_address_tls()
node.clients_address_tls()
.ok_or(ClientCoreError::UnsupportedWssProtocol {
gateway: node.identity_key.to_base58_string(),
})?
} else {
node.ws_entry_address(prefer_ipv6)
.ok_or(ClientCoreError::UnsupportedEntry {
id: node.node_id,
identity: node.identity_key.to_base58_string(),
})?
node.clients_address()
};
let gateway_listener =
@@ -207,7 +200,7 @@ pub enum GatewaySetup {
specification: GatewaySelectionSpecification,
// TODO: seems to be a bit inefficient to pass them by value
available_gateways: Vec<RoutingNode>,
available_gateways: Vec<gateway::LegacyNode>,
},
ReuseConnection {
@@ -221,34 +214,6 @@ pub enum GatewaySetup {
},
}
impl Debug for GatewaySetup {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
GatewaySetup::MustLoad { gateway_id } => f
.debug_struct("GatewaySetup::MustLoad")
.field("gateway_id", gateway_id)
.finish(),
GatewaySetup::New {
specification,
available_gateways,
} => f
.debug_struct("GatewaySetup::New")
.field("specification", specification)
.field("available_gateways", available_gateways)
.field("gateways", specification)
.finish(),
GatewaySetup::ReuseConnection {
gateway_details, ..
} => f
.debug_struct("GatewaySetup::ReuseConnection")
.field("authenticated_ephemeral_client", &"***")
.field("gateway_details", gateway_details)
.field("client_keys", &"***")
.finish(),
}
}
}
impl GatewaySetup {
pub fn try_reuse_connection(init_res: InitialisationResult) -> Result<Self, ClientCoreError> {
if let Some(authenticated_ephemeral_client) = init_res.authenticated_ephemeral_client {
-44
View File
@@ -1,44 +0,0 @@
use crate::error::ClientCoreError;
use nym_http_api_client::HickoryDnsResolver;
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::handshake::client::Response;
use url::{Host, Url};
use std::net::SocketAddr;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn connect_async(
endpoint: &str,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), ClientCoreError> {
let resolver = HickoryDnsResolver::default();
let uri = Url::parse(endpoint).map_err(|_| ClientCoreError::InvalidUrl(endpoint.to_owned()))?;
let port: u16 = uri.port_or_known_default().unwrap_or(443);
let host = uri
.host()
.ok_or(ClientCoreError::InvalidUrl(endpoint.to_owned()))?;
// Get address for tcp connection, if a domain is provided use our preferred resolver rather than
// the default std resolve
let sock_addrs: Vec<SocketAddr> = match host {
Host::Ipv4(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Ipv6(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Domain(domain) => {
// Do a DNS lookup for the domain using our custom DNS resolver
resolver
.resolve_str(domain)
.await?
.into_iter()
.map(|a| SocketAddr::new(a, port))
.collect()
}
};
let stream = TcpStream::connect(&sock_addrs[..]).await?;
tokio_tungstenite::client_async_tls(endpoint, stream)
.await
.map_err(Into::into)
}
+2 -1
View File
@@ -14,7 +14,8 @@ pub mod error;
pub mod init;
pub use nym_topology::{
HardcodedTopologyProvider, NymRouteProvider, NymTopology, NymTopologyError, TopologyProvider,
HardcodedTopologyProvider, NymTopology, NymTopologyError, SerializableNymTopology,
SerializableTopologyError, TopologyProvider,
};
#[cfg(target_arch = "wasm32")]
@@ -9,10 +9,7 @@ use crate::backend::fs_backend::{
},
};
use log::{error, info};
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use sqlx::ConnectOptions;
use std::path::Path;
#[derive(Debug, Clone)]
@@ -34,9 +31,6 @@ impl StorageManager {
}
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(fresh)
.disable_statement_logging();
@@ -22,7 +22,7 @@ mod error;
mod manager;
mod models;
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct Backend {
temporary_old_path: Option<PathBuf>,
database_path: PathBuf,
@@ -19,7 +19,7 @@ pub mod fs_backend;
#[error("no information provided")]
pub struct UndefinedError;
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct Empty {
// we need to keep 'basic' metadata here to "load" the CombinedReplyStorage
pub min_surb_threshold: usize,
@@ -27,7 +27,6 @@ nym-credential-storage = { path = "../../credential-storage" }
nym-credentials-interface = { path = "../../credentials-interface" }
nym-crypto = { path = "../../crypto" }
nym-gateway-requests = { path = "../../gateway-requests" }
nym-http-api-client = { path = "../../http-api-client" }
nym-network-defaults = { path = "../../network-defaults" }
nym-sphinx = { path = "../../nymsphinx" }
nym-statistics-common = { path = "../../statistics" }
@@ -40,6 +40,8 @@ use url::Url;
use std::os::fd::RawFd;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async;
#[cfg(not(unix))]
use std::os::raw::c_int as RawFd;
@@ -51,11 +53,6 @@ use zeroize::Zeroizing;
pub mod config;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) mod websockets;
#[cfg(not(target_arch = "wasm32"))]
use websockets::connect_async;
pub struct GatewayConfig {
pub gateway_identity: identity::PublicKey,
@@ -104,10 +101,6 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
// currently unused (but populated)
negotiated_protocol: Option<u8>,
// Callback on the fd as soon as the connection has been established
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
/// Listen to shutdown messages and send notifications back to the task manager
task_client: TaskClient,
}
@@ -123,7 +116,6 @@ impl<C, St> GatewayClient<C, St> {
packet_router: PacketRouter,
bandwidth_controller: Option<BandwidthController<C, St>>,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
task_client: TaskClient,
) -> Self {
GatewayClient {
@@ -139,8 +131,6 @@ impl<C, St> GatewayClient<C, St> {
bandwidth_controller,
stats_reporter,
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
task_client,
}
}
@@ -204,15 +194,17 @@ impl<C, St> GatewayClient<C, St> {
"Attemting to establish connection to gateway at: {}",
self.gateway_address
);
let (ws_stream, _) = connect_async(
&self.gateway_address,
#[cfg(unix)]
self.connection_fd_callback.clone(),
)
.await?;
let ws_stream = match connect_async(&self.gateway_address).await {
Ok((ws_stream, _)) => ws_stream,
Err(error) => {
return Err(GatewayClientError::NetworkConnectionFailed {
address: self.gateway_address.clone(),
source: error,
})
}
};
self.connection = SocketState::Available(Box::new(ws_stream));
Ok(())
}
@@ -266,19 +258,6 @@ impl<C, St> GatewayClient<C, St> {
}
}
pub async fn send_client_request(
&mut self,
message: ClientRequest,
) -> Result<(), GatewayClientError> {
if let Some(shared_key) = self.shared_key() {
let encrypted = message.encrypt(&*shared_key)?;
Box::pin(self.send_websocket_message(encrypted)).await?;
Ok(())
} else {
Err(GatewayClientError::ConnectionInInvalidState)
}
}
async fn read_control_response(&mut self) -> Result<ServerResponse, GatewayClientError> {
// we use the fact that all request responses are Message::Text and only pushed
// sphinx packets are Message::Binary
@@ -332,7 +311,7 @@ impl<C, St> GatewayClient<C, St> {
// If we want to send a message (with response), we need to have a full control over the socket,
// as we need to be able to write the request and read the subsequent response
pub async fn send_websocket_message(
async fn send_websocket_message(
&mut self,
msg: impl Into<Message>,
) -> Result<ServerResponse, GatewayClientError> {
@@ -1053,10 +1032,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
connection: SocketState::NotConnected,
packet_router,
bandwidth_controller: None,
stats_reporter: ClientStatsSender::new(None, task_client.clone()),
stats_reporter: ClientStatsSender::new(None),
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback: None,
task_client,
}
}
@@ -1087,8 +1064,6 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
bandwidth_controller,
stats_reporter,
negotiated_protocol: self.negotiated_protocol,
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback,
task_client,
}
}
@@ -1,88 +0,0 @@
use crate::error::GatewayClientError;
use nym_http_api_client::HickoryDnsResolver;
#[cfg(unix)]
use std::{
os::fd::{AsRawFd, RawFd},
sync::Arc,
};
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::handshake::client::Response;
use url::{Host, Url};
use std::net::SocketAddr;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn connect_async(
endpoint: &str,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
use tokio::net::TcpSocket;
let resolver = HickoryDnsResolver::default();
let uri =
Url::parse(endpoint).map_err(|_| GatewayClientError::InvalidUrl(endpoint.to_owned()))?;
let port: u16 = uri.port_or_known_default().unwrap_or(443);
let host = uri
.host()
.ok_or(GatewayClientError::InvalidUrl(endpoint.to_owned()))?;
// Get address for tcp connection, if a domain is provided use our preferred resolver rather than
// the default std resolve
let sock_addrs: Vec<SocketAddr> = match host {
Host::Ipv4(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Ipv6(addr) => vec![SocketAddr::new(addr.into(), port)],
Host::Domain(domain) => {
// Do a DNS lookup for the domain using our custom DNS resolver
resolver
.resolve_str(domain)
.await?
.into_iter()
.map(|a| SocketAddr::new(a, port))
.collect()
}
};
let mut stream = Err(GatewayClientError::NoEndpointForConnection {
address: endpoint.to_owned(),
});
for sock_addr in sock_addrs {
let socket = if sock_addr.is_ipv4() {
TcpSocket::new_v4()
} else {
TcpSocket::new_v6()
}
.map_err(|err| GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: err.into(),
})?;
#[cfg(unix)]
if let Some(callback) = connection_fd_callback.as_ref() {
callback.as_ref()(socket.as_raw_fd());
}
match socket.connect(sock_addr).await {
Ok(s) => {
stream = Ok(s);
break;
}
Err(err) => {
stream = Err(GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: err.into(),
});
continue;
}
}
}
tokio_tungstenite::client_async_tls(endpoint, stream?)
.await
.map_err(|error| GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: error,
})
}
@@ -43,15 +43,8 @@ pub enum GatewayClientError {
#[error("connection failed: {address}: {source}")]
NetworkConnectionFailed { address: String, source: WsError },
#[error("no socket address for endpoint: {address}")]
NoEndpointForConnection { address: String },
#[error("Invalid URL: {0}")]
InvalidUrl(String),
#[cfg(not(target_arch = "wasm32"))]
#[error("resolution failed: {0}")]
ResolutionFailed(#[from] nym_http_api_client::HickoryDnsError),
InvalidURL(String),
#[error("No shared key was provided or obtained")]
NoSharedKeyAvailable,
+1 -3
View File
@@ -8,12 +8,10 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dashmap = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true, features = ["time", "sync"] }
tokio = { workspace = true, features = ["time"] }
tokio-util = { workspace = true, features = ["codec"], optional = true }
tokio-stream = { workspace = true }
# internal
nym-sphinx = { path = "../../nymsphinx" }
+86 -141
View File
@@ -1,24 +1,21 @@
// Copyright 2021-2024 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use dashmap::DashMap;
use futures::channel::mpsc;
use futures::StreamExt;
use nym_sphinx::addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::params::PacketType;
use nym_sphinx::NymPacket;
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::ops::Deref;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::sleep;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::codec::Framed;
use tracing::*;
@@ -58,37 +55,11 @@ pub trait SendWithoutResponse {
}
pub struct Client {
active_connections: ActiveConnections,
connections_count: Arc<AtomicUsize>,
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
config: Config,
}
#[derive(Default, Clone)]
pub struct ActiveConnections {
inner: Arc<DashMap<NymNodeRoutingAddress, ConnectionSender>>,
}
impl ActiveConnections {
pub fn pending_packets(&self) -> usize {
self.inner
.iter()
.map(|sender| {
let max_capacity = sender.channel.max_capacity();
let capacity = sender.channel.capacity();
max_capacity - capacity
})
.sum()
}
}
impl Deref for ActiveConnections {
type Target = DashMap<NymNodeRoutingAddress, ConnectionSender>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
pub struct ConnectionSender {
struct ConnectionSender {
channel: mpsc::Sender<FramedNymPacket>,
current_reconnection_attempt: Arc<AtomicU32>,
}
@@ -102,53 +73,46 @@ impl ConnectionSender {
}
}
struct ManagedConnection {
address: SocketAddr,
message_receiver: ReceiverStream<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: Arc<AtomicU32>,
}
impl ManagedConnection {
fn new(
address: SocketAddr,
message_receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: Arc<AtomicU32>,
) -> Self {
ManagedConnection {
address,
message_receiver: ReceiverStream::new(message_receiver),
connection_timeout,
current_reconnection,
impl Client {
pub fn new(config: Config) -> Client {
Client {
conn_new: HashMap::new(),
config,
}
}
async fn run(self) {
let address = self.address;
async fn manage_connection(
address: SocketAddr,
receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: &AtomicU32,
) {
let connection_fut = TcpStream::connect(address);
let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await {
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to {}", self.address);
debug!("Managed to establish connection to {}", address);
// if we managed to connect, reset the reconnection count (whatever it might have been)
self.current_reconnection.store(0, Ordering::Release);
current_reconnection.store(0, Ordering::Release);
Framed::new(stream, NymCodec)
}
Err(err) => {
debug!("failed to establish connection to {address} (err: {err})",);
debug!(
"failed to establish connection to {} (err: {})",
address, err
);
return;
}
},
Err(_) => {
debug!(
"failed to connect to {address} within {:?}",
self.connection_timeout
"failed to connect to {} within {:?}",
address, connection_timeout
);
// we failed to connect - increase reconnection attempt
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
current_reconnection.fetch_add(1, Ordering::SeqCst);
return;
}
};
@@ -156,28 +120,15 @@ impl ManagedConnection {
// Take whatever the receiver channel produces and put it on the connection.
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
// about neither receiver nor the connection, it doesn't matter which one gets consumed
if let Err(err) = self.message_receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {address}: {err}");
if let Err(err) = receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {} - {err}", address);
}
debug!(
"connection manager to {address} is finished. Either the connection failed or mixnet client got dropped",
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
address
);
}
}
impl Client {
pub fn new(config: Config, connections_count: Arc<AtomicUsize>) -> Client {
Client {
active_connections: Default::default(),
connections_count,
config,
}
}
pub fn active_connections(&self) -> ActiveConnections {
self.active_connections.clone()
}
/// If we're trying to reconnect, determine how long we should wait.
fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
@@ -197,7 +148,7 @@ impl Client {
}
fn make_connection(&mut self, address: NymNodeRoutingAddress, pending_packet: FramedNymPacket) {
let (sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
let (mut sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
// this CAN'T fail because we just created the channel which has a non-zero capacity
if self.config.maximum_connection_buffer_size > 0 {
@@ -205,16 +156,15 @@ impl Client {
}
// if we already tried to connect to `address` before, grab the current attempt count
let current_reconnection_attempt =
if let Some(mut existing) = self.active_connections.get_mut(&address) {
existing.channel = sender;
Arc::clone(&existing.current_reconnection_attempt)
} else {
let new_entry = ConnectionSender::new(sender);
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
self.active_connections.insert(address, new_entry);
current_attempt
};
let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
existing.channel = sender;
Arc::clone(&existing.current_reconnection_attempt)
} else {
let new_entry = ConnectionSender::new(sender);
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
self.conn_new.insert(address, new_entry);
current_attempt
};
// load the actual value.
let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
@@ -223,7 +173,6 @@ impl Client {
// copy the value before moving into another task
let initial_connection_timeout = self.config.initial_connection_timeout;
let connections_count = self.connections_count.clone();
tokio::spawn(async move {
// before executing the manager, wait for what was specified, if anything
if let Some(backoff) = backoff {
@@ -231,16 +180,13 @@ impl Client {
sleep(backoff).await;
}
connections_count.fetch_add(1, Ordering::SeqCst);
ManagedConnection::new(
Self::manage_connection(
address.into(),
receiver,
initial_connection_timeout,
current_reconnection_attempt,
&current_reconnection_attempt,
)
.run()
.await;
connections_count.fetch_sub(1, Ordering::SeqCst);
.await
});
}
}
@@ -255,47 +201,49 @@ impl SendWithoutResponse for Client {
trace!("Sending packet to {address:?}");
let framed_packet = FramedNymPacket::new(packet, packet_type);
let Some(sender) = self.active_connections.get_mut(&address) else {
if let Some(sender) = self.conn_new.get_mut(&address) {
if let Err(err) = sender.channel.try_send(framed_packet) {
if err.is_full() {
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
Err(io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
))
} else if err.is_disconnected() {
debug!(
"Connection to {} seems to be dead. attempting to re-establish it...",
address
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, err.into_inner());
Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
"reconnection attempt is in progress",
))
} else {
// this can't really happen, but let's safe-guard against it in case something changes in futures library
Err(io::Error::new(
io::ErrorKind::Other,
"unknown connection buffer error",
))
}
} else {
Ok(())
}
} else {
// there was never a connection to begin with
debug!("establishing initial connection to {}", address);
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
self.make_connection(address, framed_packet);
return Err(io::Error::new(
Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection is in progress",
));
};
let sending_res = sender.channel.try_send(framed_packet);
drop(sender);
sending_res.map_err(|err| {
match err {
TrySendError::Full(_) => {
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
)
}
TrySendError::Closed(dropped) => {
debug!(
"Connection to {address} seems to be dead. attempting to re-establish it...",
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, dropped);
io::Error::new(
io::ErrorKind::ConnectionAborted,
"reconnection attempt is in progress",
)
}
}
} )
))
}
}
}
@@ -304,15 +252,12 @@ mod tests {
use super::*;
fn dummy_client() -> Client {
Client::new(
Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
},
Default::default(),
)
Client::new(Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
})
}
#[test]
@@ -11,16 +11,16 @@ use crate::{
use nym_api_requests::ecash::models::{
AggregatedCoinIndicesSignatureResponse, AggregatedExpirationDateSignatureResponse,
BatchRedeemTicketsBody, EcashBatchTicketRedemptionResponse, EcashTicketVerificationResponse,
IssuedTicketbooksChallengeResponse, IssuedTicketbooksForResponse, VerifyEcashTicketBody,
IssuedTicketbooksChallengeResponse, IssuedTicketbooksForResponse, SpentCredentialsResponse,
VerifyEcashTicketBody,
};
use nym_api_requests::ecash::{
BlindSignRequestBody, BlindedSignatureResponse, PartialCoinIndicesSignatureResponse,
PartialExpirationDateSignatureResponse, VerificationKeyResponse,
};
use nym_api_requests::models::{
ApiHealthResponse, GatewayBondAnnotated, GatewayCoreStatusResponse,
HistoricalPerformanceResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse,
NymNodeDescription, RewardEstimationResponse, StakeSaturationResponse,
ApiHealthResponse, GatewayBondAnnotated, GatewayCoreStatusResponse, MixnodeCoreStatusResponse,
MixnodeStatusResponse, NymNodeDescription, RewardEstimationResponse, StakeSaturationResponse,
};
use nym_api_requests::models::{LegacyDescribedGateway, MixNodeBondAnnotated};
use nym_api_requests::nym_nodes::SkimmedNode;
@@ -32,10 +32,10 @@ use time::Date;
use url::Url;
pub use crate::nym_api::NymApiClientExt;
use nym_mixnet_contract_common::EpochRewardedSet;
pub use nym_mixnet_contract_common::{
mixnode::MixNodeDetails, GatewayBond, IdentityKey, IdentityKeyRef, NodeId, NymNodeDetails,
};
// re-export the type to not break existing imports
pub use crate::coconut::EcashApiClient;
@@ -264,31 +264,6 @@ impl<C, S> Client<C, S> {
Ok(self.nym_api.get_gateways_detailed_unfiltered().await?)
}
pub async fn get_full_node_performance_history(
&self,
node_id: NodeId,
) -> Result<Vec<HistoricalPerformanceResponse>, ValidatorClientError> {
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
let mut page = 0;
let mut history = Vec::new();
loop {
let mut res = self
.nym_api
.get_node_performance_history(node_id, Some(page), None)
.await?;
history.append(&mut res.history.data);
if history.len() < res.history.pagination.total {
page += 1
} else {
break;
}
}
Ok(history)
}
// TODO: combine with NymApiClient...
pub async fn get_all_cached_described_nodes(
&self,
@@ -392,10 +367,6 @@ impl NymApiClient {
Ok(self.nym_api.get_basic_gateways().await?.nodes)
}
pub async fn get_current_rewarded_set(&self) -> Result<EpochRewardedSet, ValidatorClientError> {
Ok(self.nym_api.get_rewarded_set().await?.into())
}
/// retrieve basic information for nodes are capable of operating as an entry gateway
/// this includes legacy gateways and nym-nodes
pub async fn get_all_basic_entry_assigned_nodes(
@@ -646,6 +617,13 @@ impl NymApiClient {
.await?)
}
#[deprecated]
pub async fn spent_credentials_filter(
&self,
) -> Result<SpentCredentialsResponse, ValidatorClientError> {
Ok(self.nym_api.double_spending_filter_v1().await?)
}
pub async fn partial_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
@@ -65,12 +65,6 @@ pub enum EcashApiError {
#[from]
source: cosmrs::ErrorReport,
},
#[error("nym api error")]
NymApi {
#[from]
source: crate::ValidatorClientError,
},
}
impl TryFrom<ContractVKShare> for EcashApiClient {
@@ -13,7 +13,7 @@ use nym_api_requests::ecash::models::{
use nym_api_requests::ecash::VerificationKeyResponse;
use nym_api_requests::models::{
AnnotationResponse, ApiHealthResponse, LegacyDescribedMixNode, NodePerformanceResponse,
NodeRefreshBody, NymNodeDescription, PerformanceHistoryResponse, RewardedSetResponse,
NodeRefreshBody, NymNodeDescription,
};
use nym_api_requests::nym_nodes::PaginatedCachedNodesResponse;
use nym_api_requests::pagination::PaginatedResponse;
@@ -31,7 +31,6 @@ pub use nym_api_requests::{
StakeSaturationResponse, UptimeResponse,
},
nym_nodes::{CachedNodesResponse, SkimmedNode},
NymNetworkDetailsResponse,
};
pub use nym_coconut_dkg_common::types::EpochId;
use nym_contracts_common::IdentityKey;
@@ -164,35 +163,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[tracing::instrument(level = "debug", skip_all)]
async fn get_node_performance_history(
&self,
node_id: NodeId,
page: Option<u32>,
per_page: Option<u32>,
) -> Result<PerformanceHistoryResponse, NymAPIError> {
let mut params = Vec::new();
if let Some(page) = page {
params.push(("page", page.to_string()))
}
if let Some(per_page) = per_page {
params.push(("per_page", per_page.to_string()))
}
self.get_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_PERFORMANCE_HISTORY,
&*node_id.to_string(),
],
&params,
)
.await
}
#[tracing::instrument(level = "debug", skip_all)]
async fn get_nodes_described(
&self,
@@ -209,15 +179,8 @@ pub trait NymApiClientExt: ApiClient {
params.push(("per_page", per_page.to_string()))
}
self.get_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_DESCRIBED,
],
&params,
)
.await
self.get_json(&[routes::API_VERSION, "nym-nodes", "described"], &params)
.await
}
#[tracing::instrument(level = "debug", skip_all)]
@@ -236,15 +199,8 @@ pub trait NymApiClientExt: ApiClient {
params.push(("per_page", per_page.to_string()))
}
self.get_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_BONDED,
],
&params,
)
.await
self.get_json(&[routes::API_VERSION, "nym-nodes", "bonded"], &params)
.await
}
#[deprecated]
@@ -254,7 +210,7 @@ pub trait NymApiClientExt: ApiClient {
&[
routes::API_VERSION,
"unstable",
routes::NYM_NODES_ROUTES,
"nym-nodes",
"mixnodes",
"skimmed",
],
@@ -270,7 +226,7 @@ pub trait NymApiClientExt: ApiClient {
&[
routes::API_VERSION,
"unstable",
routes::NYM_NODES_ROUTES,
"nym-nodes",
"gateways",
"skimmed",
],
@@ -279,19 +235,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_set(&self) -> Result<RewardedSetResponse, NymAPIError> {
self.get_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_REWARDED_SET,
],
NO_PARAMS,
)
.await
}
/// retrieve basic information for nodes are capable of operating as an entry gateway
/// this includes legacy gateways and nym-nodes
#[instrument(level = "debug", skip(self))]
@@ -319,7 +262,7 @@ pub trait NymApiClientExt: ApiClient {
&[
routes::API_VERSION,
"unstable",
routes::NYM_NODES_ROUTES,
"nym-nodes",
"skimmed",
"entry-gateways",
"all",
@@ -356,7 +299,7 @@ pub trait NymApiClientExt: ApiClient {
&[
routes::API_VERSION,
"unstable",
routes::NYM_NODES_ROUTES,
"nym-nodes",
"skimmed",
"mixnodes",
"active",
@@ -393,7 +336,7 @@ pub trait NymApiClientExt: ApiClient {
&[
routes::API_VERSION,
"unstable",
routes::NYM_NODES_ROUTES,
"nym-nodes",
"skimmed",
"mixnodes",
"all",
@@ -425,12 +368,7 @@ pub trait NymApiClientExt: ApiClient {
}
self.get_json(
&[
routes::API_VERSION,
"unstable",
routes::NYM_NODES_ROUTES,
"skimmed",
],
&[routes::API_VERSION, "unstable", "nym-nodes", "skimmed"],
&params,
)
.await
@@ -739,8 +677,8 @@ pub trait NymApiClientExt: ApiClient {
self.get_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_PERFORMANCE,
"nym-nodes",
"performance",
&node_id.to_string(),
],
NO_PARAMS,
@@ -755,8 +693,8 @@ pub trait NymApiClientExt: ApiClient {
self.get_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_ANNOTATION,
"nym-nodes",
"annotation",
&node_id.to_string(),
],
NO_PARAMS,
@@ -850,6 +788,20 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn double_spending_filter_v1(&self) -> Result<SpentCredentialsResponse, NymAPIError> {
self.get_json(
&[
routes::API_VERSION,
routes::ECASH_ROUTES,
routes::DOUBLE_SPENDING_FILTER_V1,
],
NO_PARAMS,
)
.await
}
#[instrument(level = "debug", skip(self))]
async fn partial_expiration_date_signatures(
&self,
@@ -960,24 +912,18 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn force_refresh_describe_cache(
&self,
request: &NodeRefreshBody,
) -> Result<(), NymAPIError> {
self.post_json(
&[
routes::API_VERSION,
routes::NYM_NODES_ROUTES,
routes::NYM_NODES_REFRESH_DESCRIBED,
],
&[routes::API_VERSION, "nym-nodes", "refresh-described"],
NO_PARAMS,
request,
)
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_ticketbooks_for(
&self,
expiration_date: Date,
@@ -994,7 +940,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_ticketbooks_challenge(
&self,
expiration_date: Date,
@@ -1014,15 +959,6 @@ pub trait NymApiClientExt: ApiClient {
)
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_network_details(&self) -> Result<NymNetworkDetailsResponse, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::NETWORK, routes::DETAILS],
NO_PARAMS,
)
.await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -13,6 +13,8 @@ pub const DETAILED: &str = "detailed";
pub const DETAILED_UNFILTERED: &str = "detailed-unfiltered";
pub const ACTIVE: &str = "active";
pub const REWARDED: &str = "rewarded";
pub const DOUBLE_SPENDING_FILTER_V1: &str = "double-spending-filter-v1";
pub const ECASH_ROUTES: &str = "ecash";
pub use ecash::*;
@@ -32,19 +34,6 @@ pub mod ecash {
pub const EPOCH_ID_PARAM: &str = "epoch_id";
}
pub const NYM_NODES_ROUTES: &str = "nym-nodes";
pub use nym_nodes::*;
pub mod nym_nodes {
pub const NYM_NODES_PERFORMANCE_HISTORY: &str = "performance-history";
pub const NYM_NODES_PERFORMANCE: &str = "performance";
pub const NYM_NODES_ANNOTATION: &str = "annotation";
pub const NYM_NODES_DESCRIBED: &str = "described";
pub const NYM_NODES_BONDED: &str = "bonded";
pub const NYM_NODES_REWARDED_SET: &str = "rewarded-set";
pub const NYM_NODES_REFRESH_DESCRIBED: &str = "refresh-described";
}
pub const STATUS_ROUTES: &str = "status";
pub const API_STATUS_ROUTES: &str = "api-status";
pub const HEALTH: &str = "health";
@@ -65,8 +54,6 @@ pub const STAKE_SATURATION: &str = "stake-saturation";
pub const INCLUSION_CHANCE: &str = "inclusion-probability";
pub const SUBMIT_GATEWAY: &str = "submit-gateway-monitoring-results";
pub const SUBMIT_NODE: &str = "submit-node-monitoring-results";
pub const PERFORMANCE: &str = "performance";
pub const SERVICE_PROVIDERS: &str = "services";
pub const DETAILS: &str = "details";
pub const NETWORK: &str = "network";
@@ -26,10 +26,10 @@ use nym_mixnet_contract_common::{
reward_params::{Performance, RewardingParams},
rewarding::{EstimatedCurrentEpochRewardResponse, PendingRewardResponse},
ContractBuildInformation, ContractState, ContractStateParams, CurrentIntervalResponse,
CurrentNymNodeVersionResponse, Delegation, EpochEventId, EpochRewardedSet, EpochStatus,
GatewayBond, GatewayBondResponse, GatewayOwnershipResponse, HistoricalNymNodeVersionEntry,
IdentityKey, IdentityKeyRef, IntervalEventId, MixNodeBond, MixNodeDetails,
MixOwnershipResponse, MixnodeDetailsByIdentityResponse, MixnodeDetailsResponse, NodeId,
CurrentNymNodeVersionResponse, Delegation, EpochEventId, EpochStatus, GatewayBond,
GatewayBondResponse, GatewayOwnershipResponse, HistoricalNymNodeVersionEntry, IdentityKey,
IdentityKeyRef, IntervalEventId, MixNodeBond, MixNodeDetails, MixOwnershipResponse,
MixnodeDetailsByIdentityResponse, MixnodeDetailsResponse, NodeId,
NumberOfPendingEventsResponse, NymNodeBond, NymNodeDetails, NymNodeVersionHistoryResponse,
PagedAllDelegationsResponse, PagedDelegatorDelegationsResponse, PagedGatewayResponse,
PagedMixnodeBondsResponse, PagedNodeDelegationsResponse, PendingEpochEvent,
@@ -670,7 +670,7 @@ impl<T> PagedMixnetQueryClient for T where T: MixnetQueryClient {}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MixnetQueryClientExt: MixnetQueryClient {
async fn get_rewarded_set(&self) -> Result<EpochRewardedSet, NyxdError> {
async fn get_rewarded_set(&self) -> Result<RewardedSet, NyxdError> {
let error_response = |message| Err(NyxdError::extension_query_failure("mixnet", message));
let metadata = self.get_rewarded_set_metadata().await?;
@@ -711,16 +711,13 @@ pub trait MixnetQueryClientExt: MixnetQueryClient {
return error_response("the nodes assigned for 'standby' returned unexpected epoch_id");
}
Ok(EpochRewardedSet {
epoch_id: expected_epoch_id,
assignment: RewardedSet {
entry_gateways: entry.nodes,
exit_gateways: exit.nodes,
layer1: layer1.nodes,
layer2: layer2.nodes,
layer3: layer3.nodes,
standby: standby.nodes,
},
Ok(RewardedSet {
entry_gateways: entry.nodes,
exit_gateways: exit.nodes,
layer1: layer1.nodes,
layer2: layer2.nodes,
layer3: layer3.nodes,
standby: standby.nodes,
})
}
}
@@ -153,20 +153,13 @@ pub trait CosmWasmClient: TendermintRpcClient {
let req = QueryAllBalancesRequest {
address: address.to_string(),
pagination,
resolve_denom: false,
};
let mut res = self
.make_abci_query::<_, QueryAllBalancesResponse>(path.clone(), req)
.await?;
let early_break = res.balances.is_empty();
raw_balances.append(&mut res.balances);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -194,13 +187,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QueryTotalSupplyResponse>(path.clone(), req)
.await?;
let early_break = res.supply.is_empty();
supply.append(&mut res.supply);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -341,13 +328,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QueryCodesResponse>(path.clone(), req)
.await?;
let early_break = res.code_infos.is_empty();
raw_codes.append(&mut res.code_infos);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -392,13 +373,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QueryContractsByCodeResponse>(path.clone(), req)
.await?;
let early_break = res.contracts.is_empty();
raw_contracts.append(&mut res.contracts);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -454,13 +429,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QueryContractHistoryResponse>(path.clone(), req)
.await?;
let early_break = res.entries.is_empty();
raw_entries.append(&mut res.entries);
if early_break {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
@@ -4,11 +4,9 @@
use crate::rpc::TendermintRpcClient;
use async_trait::async_trait;
use base64::Engine;
use cosmrs::tendermint;
use cosmrs::tendermint::{block::Height, evidence::Evidence, Hash};
use reqwest::header::HeaderMap;
use reqwest::{header, RequestBuilder};
use tendermint_rpc::dialect::{v0_34, v0_37, v0_38, LatestDialect};
use tendermint_rpc::{
client::CompatMode,
dialect::{self, Dialect},
@@ -23,21 +21,8 @@ macro_rules! perform_with_compat {
($self:expr, $request:expr) => {{
let request = $request;
match $self.compat {
CompatMode::V0_38 => {
$self
.perform_request_with_dialect(request, dialect::v0_38::Dialect)
.await
}
CompatMode::V0_37 => {
$self
.perform_request_with_dialect(request, dialect::v0_37::Dialect)
.await
}
CompatMode::V0_34 => {
$self
.perform_request_with_dialect(request, dialect::v0_34::Dialect)
.await
}
CompatMode::V0_37 => $self.perform_v0_37(request).await,
CompatMode::V0_34 => $self.perform_v0_34(request).await,
}
}};
}
@@ -85,11 +70,7 @@ impl ReqwestRpcClient {
.headers(headers)
}
async fn perform_request_with_dialect<R, S>(
&self,
request: R,
_dialect: S,
) -> Result<R::Output, Error>
async fn perform_request<R, S>(&self, request: R) -> Result<R::Output, Error>
where
R: SimpleRequest<S>,
S: Dialect,
@@ -100,25 +81,26 @@ impl ReqwestRpcClient {
.send()
.await
.map_err(TendermintRpcErrorMap::into_rpc_err)?;
let response_status = response.status();
let bytes = response
.bytes()
.await
.map_err(TendermintRpcErrorMap::into_rpc_err)?;
// Successful JSON-RPC requests are expected to return a 200 OK HTTP status.
// Otherwise, this means that the HTTP request failed as a whole,
// as opposed to the JSON-RPC request returning an error,
// and we cannot expect the response body to be a valid JSON-RPC response.
if response_status != reqwest::StatusCode::OK {
// hehe, that's so nasty but we have to somehow convert between different versions of the same lib
return Err(Error::http_request_failed(
response_status.as_u16().try_into().unwrap(),
));
}
R::Response::from_string(bytes).map(Into::into)
}
async fn perform_v0_34<R>(&self, request: R) -> Result<R::Output, Error>
where
R: SimpleRequest<dialect::v0_34::Dialect>,
{
self.perform_request(request).await
}
async fn perform_v0_37<R>(&self, request: R) -> Result<R::Output, Error>
where
R: SimpleRequest<dialect::v0_37::Dialect>,
{
self.perform_request(request).await
}
}
trait TendermintRpcErrorMap {
@@ -138,50 +120,18 @@ impl TendermintRpcClient for ReqwestRpcClient {
where
R: SimpleRequest,
{
self.perform_request_with_dialect(request, LatestDialect)
.await
self.perform_request(request).await
}
async fn block<H>(&self, height: H) -> Result<endpoint::block::Response, Error>
async fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
where
H: Into<Height> + Send,
{
perform_with_compat!(self, endpoint::block::Request::new(height.into()))
perform_with_compat!(self, block_results::Request::new(height.into()))
}
async fn block_by_hash(
&self,
hash: tendermint::Hash,
) -> Result<endpoint::block_by_hash::Response, Error> {
perform_with_compat!(self, endpoint::block_by_hash::Request::new(hash))
}
async fn latest_block(&self) -> Result<endpoint::block::Response, Error> {
perform_with_compat!(self, endpoint::block::Request::default())
}
async fn block_results<H>(&self, height: H) -> Result<endpoint::block_results::Response, Error>
where
H: Into<Height> + Send,
{
perform_with_compat!(self, endpoint::block_results::Request::new(height.into()))
}
async fn latest_block_results(&self) -> Result<endpoint::block_results::Response, Error> {
perform_with_compat!(self, endpoint::block_results::Request::default())
}
async fn block_search(
&self,
query: Query,
page: u32,
per_page: u8,
order: Order,
) -> Result<endpoint::block_search::Response, Error> {
perform_with_compat!(
self,
endpoint::block_search::Request::new(query, page, per_page, order)
)
async fn latest_block_results(&self) -> Result<block_results::Response, Error> {
perform_with_compat!(self, block_results::Request::default())
}
async fn header<H>(&self, height: H) -> Result<endpoint::header::Response, Error>
@@ -190,26 +140,11 @@ impl TendermintRpcClient for ReqwestRpcClient {
{
let height = height.into();
match self.compat {
CompatMode::V0_38 => {
self.perform_request_with_dialect(
endpoint::header::Request::new(height),
v0_38::Dialect,
)
.await
}
CompatMode::V0_37 => {
self.perform_request_with_dialect(
endpoint::header::Request::new(height),
v0_37::Dialect,
)
.await
}
CompatMode::V0_37 => self.perform(endpoint::header::Request::new(height)).await,
CompatMode::V0_34 => {
// Back-fill with a request to /block endpoint and
// taking just the header from the response.
let resp = self
.perform_request_with_dialect(block::Request::new(height), v0_34::Dialect)
.await?;
let resp = self.perform_v0_34(block::Request::new(height)).await?;
Ok(resp.into())
}
}
@@ -217,25 +152,12 @@ impl TendermintRpcClient for ReqwestRpcClient {
async fn header_by_hash(&self, hash: Hash) -> Result<header_by_hash::Response, Error> {
match self.compat {
CompatMode::V0_38 => {
self.perform_request_with_dialect(
header_by_hash::Request::new(hash),
v0_38::Dialect,
)
.await
}
CompatMode::V0_37 => {
self.perform_request_with_dialect(
header_by_hash::Request::new(hash),
v0_37::Dialect,
)
.await
}
CompatMode::V0_37 => self.perform(header_by_hash::Request::new(hash)).await,
CompatMode::V0_34 => {
// Back-fill with a request to /block_by_hash endpoint and
// taking just the header from the response.
let resp = self
.perform_request_with_dialect(block_by_hash::Request::new(hash), v0_34::Dialect)
.perform_v0_34(block_by_hash::Request::new(hash))
.await?;
Ok(resp.into())
}
@@ -245,18 +167,8 @@ impl TendermintRpcClient for ReqwestRpcClient {
/// `/broadcast_evidence`: broadcast an evidence.
async fn broadcast_evidence(&self, e: Evidence) -> Result<evidence::Response, Error> {
match self.compat {
CompatMode::V0_38 => {
self.perform_request_with_dialect(evidence::Request::new(e), v0_38::Dialect)
.await
}
CompatMode::V0_37 => {
self.perform_request_with_dialect(evidence::Request::new(e), v0_37::Dialect)
.await
}
CompatMode::V0_34 => {
self.perform_request_with_dialect(evidence::Request::new(e), v0_34::Dialect)
.await
}
CompatMode::V0_37 => self.perform(evidence::Request::new(e)).await,
CompatMode::V0_34 => self.perform_v0_34(evidence::Request::new(e)).await,
}
}
+1 -2
View File
@@ -12,8 +12,7 @@ dirs = { workspace = true, optional = true }
handlebars = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
toml = { workspace = true, features = ["display"] }
toml = { workspace = true }
url = { workspace = true }
nym-network-defaults = { path = "../network-defaults", features = ["utoipa"] }
-10
View File
@@ -1,10 +0,0 @@
use std::io;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum NymConfigTomlError {
#[error(transparent)]
FileIoFailure(#[from] io::Error),
#[error(transparent)]
TomlSerializeFailure(#[from] toml::ser::Error),
}
-37
View File
@@ -13,7 +13,6 @@ pub use helpers::{parse_urls, OptionalSet};
pub use toml::de::Error as TomlDeError;
pub mod defaults;
pub mod error;
pub mod helpers;
pub mod legacy_helpers;
pub mod serde_helpers;
@@ -96,42 +95,6 @@ where
config.format_to_writer(file)
}
pub fn save_unformatted_config_to_file<C, P>(
config: &C,
path: P,
) -> Result<(), error::NymConfigTomlError>
where
C: Serialize + ?Sized,
P: AsRef<Path>,
{
let path = path.as_ref();
log::info!("saving config file to {}", path.display());
if let Some(parent) = path.parent() {
create_dir_all(parent)?;
}
let mut file = File::create(path)?;
// TODO: check for whether any of our configs store anything sensitive
// and change that to 0o644 instead
#[cfg(target_family = "unix")]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(path)?.permissions();
perms.set_mode(0o600);
fs::set_permissions(path, perms)?;
}
// let serde format the TOML in whatever ugly way it chooses
// TODO: in https://docs.rs/toml/latest/toml/fn.to_string_pretty.html it recommends using
// https://docs.rs/toml_edit/latest/toml_edit/struct.DocumentMut.html to preserve formatting
let toml_string = toml::to_string_pretty(config)?;
Ok(file.write_all(toml_string.as_bytes())?)
}
pub fn deserialize_config_from_toml_str<C>(raw: &str) -> Result<C, TomlDeError>
where
C: DeserializeOwned,
@@ -7,7 +7,6 @@
use crate::constants::{TOKEN_SUPPLY, UNIT_DELEGATION_BASE};
use crate::error::MixnetContractError;
use crate::helpers::IntoBaseDecimal;
use crate::nym_node::Role;
use crate::reward_params::{NodeRewardingParameters, RewardingParams};
use crate::rewarding::helpers::truncate_reward;
use crate::rewarding::RewardDistribution;
@@ -625,16 +624,6 @@ pub enum LegacyMixLayer {
Three = 3,
}
impl From<LegacyMixLayer> for Role {
fn from(layer: LegacyMixLayer) -> Self {
match layer {
LegacyMixLayer::One => Role::Layer1,
LegacyMixLayer::Two => Role::Layer2,
LegacyMixLayer::Three => Role::Layer3,
}
}
}
impl From<LegacyMixLayer> for String {
fn from(layer: LegacyMixLayer) -> Self {
(layer as u8).to_string()

Some files were not shown because too many files have changed in this diff Show More