Compare commits

...

59 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu 8d305c29eb Gateway peer fixes (#4985)
* Create bandwidth entry

* Remove mismatch possibilities
2024-10-18 12:49:31 +03:00
Bogdan-Ștefan Neacşu e057f672fa Add topup req constructor (#4983) 2024-10-18 08:55:56 +03:00
Bogdan-Ștefan Neacşu edc2a367c0 Top up bandwidth (#4975)
* Top up wg bandwidth

* Introduce v3 with top up

* Verify and increase cred bw

* Add log

* Fix clippy
2024-10-17 16:35:30 +03:00
Bogdan-Ștefan Neacşu e78040f0ec Use ticket type when retrieving from storage (#4947) 2024-10-15 11:23:34 +03:00
Jędrzej Stuczyński 5e0d1bb14e swapped base85 for base85rs with more compatible license 2024-10-08 15:30:57 +01:00
Jędrzej Stuczyński c16746a47b introduced internal tool for importing contract states 2024-10-08 15:06:41 +01:00
Jędrzej Stuczyński a21052b72e exposed contract state query to nym-cli 2024-10-08 11:19:01 +01:00
Jędrzej Stuczyński 92e9da7be5 extended CosmWasmClient with AllContractState query 2024-10-08 11:18:49 +01:00
Simon Wicky 143b336978 expose autheticator address along other address in node-details (#4953) 2024-10-07 17:19:17 +02:00
Jędrzej Stuczyński d4293c9bae Merge pull request #4936 from nymtech/bugfix/rpc-pagination
bugfix: correctly paginate through 'search_tx' endpoint
2024-10-07 16:17:45 +01:00
Drazen Urch e2d1806e49 Extract packet processing from mixnode-common (#4949)
* Extract packet processing from mixnode-common

* Cleanup
2024-10-07 12:00:36 +02:00
Jon Häggblad 469f85fc49 Switch over the last set of jobs to arc runners (#4938)
* Switch over the last set of jobs to arc runners

* Use dind runners

* Disable matrix notification so we can use non-dind runners

* wip

* Add workflow_dispatch

* Revert "wip"

This reverts commit ae34efd32e.
2024-10-07 11:46:35 +02:00
import this 1202a2f5f4 [DOCs:/operators]: Update FAQ sphinx size (#4946)
* updating sphinx size to develop branch from an outdated commit url

* remove outdated graphs

* add token page url
2024-09-30 14:42:11 +00:00
dependabot[bot] 6030bf6c95 build(deps): bump the patch-updates group across 1 directory with 9 updates (#4944) 2024-09-30 16:40:54 +02:00
Drazen Urch 09a771f58f Add "utoipa" feature to nym-node (#4945) 2024-09-30 15:40:33 +02:00
Drazen Urch 676a909aee V2 performance monitoring feature flag (#4943) 2024-09-30 15:13:22 +02:00
Jędrzej Stuczyński e37145422c Merge pull request #4942 from nymtech/bugfix/rewarder-post-pruning-adjustments
Bugfix/rewarder post pruning adjustments
2024-09-27 18:23:32 +01:00
Jędrzej Stuczyński 4ad52accc0 fixed stabilised clippy issue 2024-09-27 18:10:59 +01:00
Jędrzej Stuczyński 784fae2204 fix logic for determining end height for processing block ranges 2024-09-27 17:53:55 +01:00
Jędrzej Stuczyński 8aa5711bee fixed query for historical validator data 2024-09-27 17:48:56 +01:00
Jędrzej Stuczyński 07022314fc fixed typos and formatting 2024-09-27 16:51:45 +01:00
Jędrzej Stuczyński 76c3081470 introduced rewarding resync alongside recovery instructions 2024-09-27 16:41:07 +01:00
Jędrzej Stuczyński d399161d31 adjust 'process_until' command to allow empty stop height 2024-09-27 16:35:53 +01:00
Jędrzej Stuczyński 27fb4ae0cc log error when rewarding fails due to missing blocks 2024-09-27 16:21:27 +01:00
Jędrzej Stuczyński 74392a2886 don't request useless blocks during startup sync 2024-09-27 15:53:16 +01:00
Jędrzej Stuczyński 457c478a03 introduced cli command to process a block range 2024-09-27 15:15:13 +01:00
Jędrzej Stuczyński 5e95992427 introduced cli command to process an individual block 2024-09-27 14:57:39 +01:00
Jędrzej Stuczyński d7eecd481c decreased SOCKET_FAILURE_RESET
the previous value of 2h was way too big. especially since it was quite likely for multiple failures to occur hourly during increased validator load when mixnet epoch was getting transitioned
2024-09-27 14:13:09 +01:00
import this e08fc4894b [DOCs/operators]: Release notes v2024.11-wedel (#4939)
* finish release notes

* add a note
2024-09-27 13:12:15 +00:00
Bogdan-Ștefan Neacşu fabd48b7ea Fix broken build after merge (#4937) 2024-09-26 18:44:21 +02:00
Bogdan-Ștefan Neacşu 894e0bd1bf Add more conversions for responses of authenticator messages (#4929)
* More conversions for responses

* Expose version
2024-09-26 18:00:13 +02:00
Jędrzej Stuczyński f86e088663 bugfix: correctly paginate through 'search_tx' endpoint 2024-09-26 16:30:57 +01:00
benedetta davico f76300669a Merge pull request #4931 from nymtech/feature/wedel-merge-conflicts
Wedel release to develop
2024-09-26 13:46:24 +02:00
Jędrzej Stuczyński 333ace1f97 Merge branch 'release/2024.11-wedel' into feature/wedel-merge-conflicts 2024-09-26 08:56:11 +01:00
Dinko Zdravac 487bf6732e Assume offline mode in sqlx (#4926)
* Assume offline mode

* PR feedback
2024-09-25 13:28:36 +02:00
Jędrzej Stuczyński 5d4a0fef55 Merge pull request #4871 from nymtech/chore/remove-another-mixnet-migration
chore: remove queued migration for adding explicit admin
2024-09-25 09:37:49 +01:00
Jon Häggblad 1627146c0e Make ip-packet-request VERSION pub (#4925) 2024-09-25 09:56:32 +02:00
Dinko Zdravac ae40a00b8f Data Observatory stub (#4905)
* Data Observatory stub

* Fix sqlx in CI

* Add troubleshooting tips for sqlx

* Update CI paths to trigger for this package

* Add this to CI upload binary build
2024-09-24 16:48:15 +02:00
Jon Häggblad 7f3c0470e0 Fix argument to cargo-deny action (#4922) 2024-09-24 13:17:35 +02:00
Bogdan-Ștefan Neacşu 1bc26ed79f Expose error type (#4924) 2024-09-24 11:48:54 +02:00
mx 60fa5cfeb8 Max/rust sdk stream abstraction (#4743)
* add TcpProxyClient and TcpProxyServer abstractions to SDK 
* add single connection example
* add multi-connection example 
* add simple echo server to `tools/`: used for multi-connection example 
* update FFI toml files: switched to local imports 
* add proxy bindings to `ffi/shared`
* add proxy bindings and example to `ffi/go` 
* add note to `ffi/cpp` about lack of Proxy bindings for the moment
2024-09-24 09:29:46 +00:00
Jon Häggblad 3b7088aeea Fix nymvpn.com url in mainnet defaults (#4920) 2024-09-24 10:25:27 +02:00
Bogdan-Ștefan Neacşu 179d214e21 Check both version and type in message header (#4918)
* Move client type to the client code

* Check both version and type in header
2024-09-23 17:57:03 +02:00
Jon Häggblad 2a94ce6443 Bump http-api-client default timeout to 30 sec (#4917) 2024-09-23 15:45:47 +02:00
Bogdan-Ștefan Neacşu 95ec91daa1 Entry wireguard tickets (#4888)
* Create credential verifier in authenticator

* Add new version of peer storage with client id

* Fix v1 to what it was before

* Compact storage into ecash verifier

* Fix non-linux build

* Less overlapping conditions

* Remove moved code

* Use handler thread for each peer

* Re-spawn stored handles at startup

* Keep new function without async & Result

* Put query peer in function too

* Query bandwidth

* Fix clippy

* Replace tap with inspect_err

* Fix copyright year

* Handle version 2 on the reqeust deser

* Add protocol type in req/resp messages
2024-09-23 14:49:18 +02:00
benedettadavico 803850be74 bump versions & update changelog 2024-09-23 10:00:20 +02:00
Jędrzej Stuczyński 3dc62a9a60 Merge pull request #4892 from nymtech/bugfix/ticketbook-false-double-spending
Bugfix/ticketbook false double spending
2024-09-19 09:44:43 +01:00
Jędrzej Stuczyński 5753b79997 slightly refactored bandwidth tracking 2024-09-18 11:27:35 +01:00
Jędrzej Stuczyński 2a6aa13ecd fixed client bandwidth being not correctly deducted 2024-09-18 11:12:24 +01:00
benedetta davico d5c9e1d8cb Merge pull request #4899 from nymtech/jon/cherry-pick-4894-into-wedel
Backport #4894 to fix ci
2024-09-18 09:28:15 +02:00
Jon Häggblad 87751894d9 Fix apt install in ci-build-upload-binaries.yml (#4894) 2024-09-18 09:07:45 +02:00
Jędrzej Stuczyński c8c3928575 put client bandwidth (gateway-side) behind shared pointer 2024-09-17 18:40:24 +01:00
Jędrzej Stuczyński 2fa8da8117 making sure there can be only a single client task claiming more bandwidth 2024-09-17 18:39:49 +01:00
Jędrzej Stuczyński 4548ef4d05 adding extra logs 2024-09-17 18:39:01 +01:00
Jędrzej Stuczyński 7f147ee2b0 Merge pull request #4891 from nymtech/bugfix/ticketbook-aux-imports
fix: allow updating globally stored signatures
2024-09-17 15:33:34 +01:00
Jędrzej Stuczyński 48bcd7e802 fix: allow updating globally stored signatures 2024-09-17 14:21:42 +01:00
benedettadavico aad028be3f update qa env 2024-09-13 11:48:49 +02:00
Bogdan-Ștefan Neacşu 6db3b34bcb Bump defguard to github latest version (#4872)
* Bump defguard to github latest version

* Fix comment location
2024-09-12 13:49:33 +02:00
Jędrzej Stuczyński f9383578da chore: remove queued migration for adding explicit admin 2024-09-12 11:03:51 +01:00
188 changed files with 8531 additions and 1533 deletions
+1 -22
View File
@@ -5,7 +5,7 @@ on:
jobs:
build:
runs-on: ubuntu-20.04-16-core
runs-on: arc-ubuntu-20.04
steps:
- uses: actions/checkout@v4
- name: Install Dependencies (Linux)
@@ -99,24 +99,3 @@ jobs:
run: vercel deploy --prebuilt --prod --token=${{ secrets.VERCEL_TOKEN }}
working-directory: dist/docs
continue-on-error: false
- name: Matrix - Node Install
run: npm install
working-directory: .github/workflows/support-files
- name: Matrix - Send Notification
env:
NYM_NOTIFICATION_KIND: cd-docs
NYM_PROJECT_NAME: "Docs CD"
NYM_CI_WWW_BASE: "${{ secrets.NYM_CD_WWW_BASE }}"
NYM_CI_WWW_LOCATION: "${{ env.GITHUB_REF_SLUG }}"
GIT_COMMIT_MESSAGE: "${{ github.event.head_commit.message }}"
GIT_BRANCH: "${GITHUB_REF##*/}"
MATRIX_SERVER: "${{ secrets.MATRIX_SERVER }}"
MATRIX_ROOM: "${{ secrets.MATRIX_ROOM_DOCS }}"
MATRIX_USER_ID: "${{ secrets.MATRIX_USER_ID }}"
MATRIX_TOKEN: "${{ secrets.MATRIX_TOKEN }}"
MATRIX_DEVICE_ID: "${{ secrets.MATRIX_DEVICE_ID }}"
IS_SUCCESS: "${{ job.status == 'success' }}"
uses: docker://keybaseio/client:stable-node
with:
args: .github/workflows/support-files/notifications/entry_point.sh
+2 -21
View File
@@ -1,6 +1,7 @@
name: ci-build-ts
on:
workflow_dispatch:
pull_request:
paths:
- "ts-packages/**"
@@ -9,7 +10,7 @@ on:
jobs:
build:
runs-on: ubuntu-20.04-16-core
runs-on: arc-ubuntu-20.04
steps:
- uses: actions/checkout@v4
- name: Install rsync
@@ -45,23 +46,3 @@ jobs:
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/ts-${{ env.GITHUB_REF_SLUG }}-example
EXCLUDE: "/dist/, /node_modules/"
- name: Matrix - Node Install
run: npm install
working-directory: .github/workflows/support-files
- name: Matrix - Send Notification
env:
NYM_NOTIFICATION_KIND: ts-packages
NYM_PROJECT_NAME: "ts-packages"
NYM_CI_WWW_BASE: "${{ secrets.NYM_CI_WWW_BASE }}"
NYM_CI_WWW_LOCATION: "ts-${{ env.GITHUB_REF_SLUG }}"
GIT_COMMIT_MESSAGE: "${{ github.event.head_commit.message }}"
GIT_BRANCH: "${GITHUB_REF##*/}"
IS_SUCCESS: "${{ job.status == 'success' }}"
MATRIX_SERVER: "${{ secrets.MATRIX_SERVER }}"
MATRIX_ROOM: "${{ secrets.MATRIX_ROOM }}"
MATRIX_USER_ID: "${{ secrets.MATRIX_USER_ID }}"
MATRIX_TOKEN: "${{ secrets.MATRIX_TOKEN }}"
MATRIX_DEVICE_ID: "${{ secrets.MATRIX_DEVICE_ID }}"
uses: docker://keybaseio/client:stable-node
with:
args: .github/workflows/support-files/notifications/entry_point.sh
@@ -26,6 +26,7 @@ on:
- "nym-api/**"
- "nym-node/**"
- "nym-outfox/**"
- 'nym-data-observatory/**'
- "nym-validator-rewarder/**"
- "sdk/rust/nym-sdk/**"
- "service-providers/**"
@@ -96,6 +97,7 @@ jobs:
target/release/nym-socks5-client
target/release/nym-api
target/release/nym-network-requester
target/release/nym-data-observatory
target/release/nym-cli
target/release/nymvisor
target/release/nym-node
@@ -113,6 +115,7 @@ jobs:
cp target/release/nym-socks5-client $OUTPUT_DIR
cp target/release/nym-api $OUTPUT_DIR
cp target/release/nym-network-requester $OUTPUT_DIR
cp target/release/nym-data-observatory $OUTPUT_DIR
cp target/release/nymvisor $OUTPUT_DIR
cp target/release/nym-node $OUTPUT_DIR
cp target/release/nym-cli $OUTPUT_DIR
+1
View File
@@ -16,6 +16,7 @@ on:
- 'nym-api/**'
- 'nym-node/**'
- 'nym-outfox/**'
- 'nym-data-observatory/**'
- 'nym-validator-rewarder/**'
- 'tools/**'
- 'wasm/**'
+1 -1
View File
@@ -22,4 +22,4 @@ jobs:
with:
log-level: warn
command: check ${{ matrix.checks }}
argument: --all-features
arguments: --all-features
+1 -22
View File
@@ -10,7 +10,7 @@ on:
jobs:
build:
runs-on: ubuntu-20.04-16-core
runs-on: arc-ubuntu-20.04
steps:
- uses: actions/checkout@v4
- name: Install Dependencies (Linux)
@@ -70,24 +70,3 @@ jobs:
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/docs-${{ env.GITHUB_REF_SLUG }}
EXCLUDE: "/node_modules/"
- name: Matrix - Node Install
run: npm install
working-directory: .github/workflows/support-files
- name: Matrix - Send Notification
env:
NYM_NOTIFICATION_KIND: ci-docs
NYM_PROJECT_NAME: "Docs CI"
NYM_CI_WWW_BASE: "${{ secrets.NYM_CI_WWW_BASE }}"
NYM_CI_WWW_LOCATION: "docs-${{ env.GITHUB_REF_SLUG }}"
GIT_COMMIT_MESSAGE: "${{ github.event.head_commit.message }}"
GIT_BRANCH: "${GITHUB_REF##*/}"
MATRIX_SERVER: "${{ secrets.MATRIX_SERVER }}"
MATRIX_ROOM: "${{ secrets.MATRIX_ROOM_DOCS }}"
MATRIX_USER_ID: "${{ secrets.MATRIX_USER_ID }}"
MATRIX_TOKEN: "${{ secrets.MATRIX_TOKEN }}"
MATRIX_DEVICE_ID: "${{ secrets.MATRIX_DEVICE_ID }}"
IS_SUCCESS: "${{ job.status == 'success' }}"
uses: docker://keybaseio/client:stable-node
with:
args: .github/workflows/support-files/notifications/entry_point.sh
+2 -22
View File
@@ -1,6 +1,7 @@
name: ci-lint-typescript
on:
workflow_dispatch:
pull_request:
paths:
- "ts-packages/**"
@@ -14,7 +15,7 @@ on:
jobs:
build:
runs-on: ubuntu-20.04-16-core
runs-on: arc-ubuntu-20.04
steps:
- uses: actions/checkout@v4
- uses: rlespinasse/github-slug-action@v3.x
@@ -53,24 +54,3 @@ jobs:
run: yarn lint
- name: Typecheck with tsc
run: yarn tsc
- name: Matrix - Node Install
run: npm install
working-directory: .github/workflows/support-files
- name: Matrix - Send Notification
env:
NYM_NOTIFICATION_KIND: ts-packages
NYM_PROJECT_NAME: "ts-packages"
NYM_CI_WWW_BASE: "${{ secrets.NYM_CI_WWW_BASE }}"
NYM_CI_WWW_LOCATION: "ts-${{ env.GITHUB_REF_SLUG }}"
GIT_COMMIT_MESSAGE: "${{ github.event.head_commit.message }}"
GIT_BRANCH: "${GITHUB_REF##*/}"
IS_SUCCESS: "${{ job.status == 'success' }}"
MATRIX_SERVER: "${{ secrets.MATRIX_SERVER }}"
MATRIX_ROOM: "${{ secrets.MATRIX_ROOM }}"
MATRIX_USER_ID: "${{ secrets.MATRIX_USER_ID }}"
MATRIX_TOKEN: "${{ secrets.MATRIX_TOKEN }}"
MATRIX_DEVICE_ID: "${{ secrets.MATRIX_DEVICE_ID }}"
uses: docker://keybaseio/client:stable-node
with:
args: .github/workflows/support-files/notifications/entry_point.sh
+1 -1
View File
@@ -4,7 +4,7 @@ on:
jobs:
publish:
runs-on: ubuntu-20.04-16-core
runs-on: arc-ubuntu-20.04
steps:
- uses: actions/checkout@v4
+76
View File
@@ -4,6 +4,82 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2024.11-wedel] (2024-09-23)
- Backport #4894 to fix ci ([#4899])
- Bugfix/ticketbook false double spending ([#4892])
- fix: allow updating globally stored signatures ([#4891])
- [DOCs/operators]: Document changelog for patch/2024.10-caramello ([#4886])
- [DOCs/operators]: Post release docs updates ([#4874])
- Bump defguard to github latest version ([#4872])
- chore: removed completed queued mixnet migration ([#4865])
- Disable push trigger and add missing paths in ci-build ([#4864])
- Fix linux conditional in ci-build.yml ([#4863])
- Remove golang workaround in ci-sdk-wasm ([#4858])
- Revert runner for ci-docs ([#4855])
- Move credential verification into common crate ([#4853])
- Fix test failure in ipr request size ([#4844])
- Start switching over jobs to arc-ubuntu-20.04 ([#4843])
- Use ecash credential type for bandwidth value ([#4840])
- Create nym-repo-setup debian package and nym-vpn meta package ([#4837])
- Remove serde_crate named import ([#4832])
- Run cargo autoinherit following last weeks dependabot updates ([#4831])
- revamped ticketbook serialisation and exposed additional cli methods ([#4827])
- Expose wireguard details on self described endpoint ([#4825])
- Remove unused wireguard flag from SDK ([#4823])
- Add `axum` server to `nym-api` ([#4803])
- Run cargo-autoinherit for a few new crates ([#4801])
- Update dependabot ([#4796])
- Fix clippy for unwrap_or_default ([#4783])
- Enable dependabot version upgrades for root rust workspace ([#4778])
- Persist used wireguard private IPs ([#4771])
- Avoid race on ip and registration structures ([#4766])
- docs/hotfix ([#4765])
- chore: remove repetitive words ([#4763])
- Make gateway latency check generic ([#4759])
- Remove duplicate stat count for retransmissions ([#4756])
- Update peer refresh value ([#4754])
- Remove deprecated mark_as_success and use new disarm ([#4751])
- Add get_mixnodes_described to validator_client ([#4725])
- New Network Monitor ([#4610])
[#4899]: https://github.com/nymtech/nym/pull/4899
[#4892]: https://github.com/nymtech/nym/pull/4892
[#4891]: https://github.com/nymtech/nym/pull/4891
[#4886]: https://github.com/nymtech/nym/pull/4886
[#4874]: https://github.com/nymtech/nym/pull/4874
[#4872]: https://github.com/nymtech/nym/pull/4872
[#4865]: https://github.com/nymtech/nym/pull/4865
[#4864]: https://github.com/nymtech/nym/pull/4864
[#4863]: https://github.com/nymtech/nym/pull/4863
[#4858]: https://github.com/nymtech/nym/pull/4858
[#4855]: https://github.com/nymtech/nym/pull/4855
[#4853]: https://github.com/nymtech/nym/pull/4853
[#4844]: https://github.com/nymtech/nym/pull/4844
[#4843]: https://github.com/nymtech/nym/pull/4843
[#4840]: https://github.com/nymtech/nym/pull/4840
[#4837]: https://github.com/nymtech/nym/pull/4837
[#4832]: https://github.com/nymtech/nym/pull/4832
[#4831]: https://github.com/nymtech/nym/pull/4831
[#4827]: https://github.com/nymtech/nym/pull/4827
[#4825]: https://github.com/nymtech/nym/pull/4825
[#4823]: https://github.com/nymtech/nym/pull/4823
[#4803]: https://github.com/nymtech/nym/pull/4803
[#4801]: https://github.com/nymtech/nym/pull/4801
[#4796]: https://github.com/nymtech/nym/pull/4796
[#4783]: https://github.com/nymtech/nym/pull/4783
[#4778]: https://github.com/nymtech/nym/pull/4778
[#4771]: https://github.com/nymtech/nym/pull/4771
[#4766]: https://github.com/nymtech/nym/pull/4766
[#4765]: https://github.com/nymtech/nym/pull/4765
[#4763]: https://github.com/nymtech/nym/pull/4763
[#4759]: https://github.com/nymtech/nym/pull/4759
[#4756]: https://github.com/nymtech/nym/pull/4756
[#4754]: https://github.com/nymtech/nym/pull/4754
[#4751]: https://github.com/nymtech/nym/pull/4751
[#4725]: https://github.com/nymtech/nym/pull/4725
[#4610]: https://github.com/nymtech/nym/pull/4610
## [2024.10-caramello] (2024-09-10)
- Backport 4844 and 4845 ([#4857])
Generated
+628 -115
View File
File diff suppressed because it is too large Load Diff
+21 -9
View File
@@ -81,6 +81,7 @@ members = [
"common/nyxd-scraper",
"common/pemstore",
"common/serde-helpers",
"common/service-provider-requests-common",
"common/socks5-client-core",
"common/socks5/proxy-helpers",
"common/socks5/requests",
@@ -102,6 +103,9 @@ members = [
"mixnode",
"sdk/lib/socks5-listener",
"sdk/rust/nym-sdk",
"sdk/ffi/shared",
"sdk/ffi/go",
"sdk/ffi/cpp",
"service-providers/authenticator",
"service-providers/common",
"service-providers/ip-packet-router",
@@ -110,11 +114,13 @@ members = [
"nym-api",
"nym-browser-extension/storage",
"nym-api/nym-api-requests",
"nym-data-observatory",
"nym-node",
"nym-node/nym-node-http-api",
"nym-node/nym-node-requests",
"nym-outfox",
"nym-validator-rewarder",
"tools/echo-server",
"tools/internal/ssl-inject",
# "tools/internal/sdk-version-bump",
"tools/internal/testnet-manager",
@@ -129,6 +135,11 @@ members = [
"wasm/mix-fetch",
"wasm/node-tester",
"wasm/zknym-lib",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/echo-server",
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
]
default-members = [
@@ -138,6 +149,7 @@ default-members = [
"gateway",
"mixnode",
"nym-api",
"nym-data-observatory",
"nym-node",
"nym-validator-rewarder",
"service-providers/authenticator",
@@ -152,7 +164,6 @@ exclude = [
"nym-wallet",
"nym-vpn/ui/src-tauri",
"cpu-cycles",
"sdk/ffi/cpp",
]
[workspace.package]
@@ -173,9 +184,9 @@ aes-gcm-siv = "0.11.1"
aead = "0.5.2"
anyhow = "1.0.89"
argon2 = "0.5.0"
async-trait = "0.1.82"
async-trait = "0.1.83"
axum = "0.7.5"
axum-extra = "0.9.3"
axum-extra = "0.9.4"
base64 = "0.22.1"
bincode = "1.3.3"
bip39 = { version = "2.0.0", features = ["zeroize"] }
@@ -188,7 +199,7 @@ blake3 = "1.5.4"
bloomfilter = "1.0.14"
bs58 = "0.5.1"
bytecodec = "0.4.15"
bytes = "1.7.1"
bytes = "1.7.2"
cargo_metadata = "0.18.1"
celes = "2.4.0"
cfg-if = "1.0.0"
@@ -196,7 +207,7 @@ chacha20 = "0.9.0"
chacha20poly1305 = "0.10.1"
chrono = "0.4.31"
cipher = "0.4.3"
clap = "4.5.17"
clap = "4.5.18"
clap_complete = "4.5"
clap_complete_fig = "4.5"
colored = "2.0"
@@ -212,7 +223,8 @@ ctr = "0.9.1"
cupid = "0.6.1"
curve25519-dalek = "4.1"
dashmap = "5.5.3"
defguard_wireguard_rs = "0.4.2"
# We want https://github.com/DefGuard/wireguard-rs/pull/64 , but there's no crates.io release being pushed out anymore
defguard_wireguard_rs = { git = "https://github.com/DefGuard/wireguard-rs.git", rev = "v0.4.7" }
digest = "0.10.7"
dirs = "5.0"
doc-comment = "0.3"
@@ -222,7 +234,7 @@ ed25519-dalek = "2.1"
etherparse = "0.13.0"
eyre = "0.6.9"
fastrand = "2.1.1"
flate2 = "1.0.33"
flate2 = "1.0.34"
futures = "0.3.28"
generic-array = "0.14.7"
getrandom = "0.2.10"
@@ -297,9 +309,9 @@ subtle-encoding = "0.5"
syn = "1"
sysinfo = "0.30.13"
tap = "1.0.1"
tar = "0.4.41"
tar = "0.4.42"
tempfile = "3.5.0"
thiserror = "1.0.63"
thiserror = "1.0.64"
time = "0.3.30"
tokio = "1.39"
tokio-stream = "0.1.16"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-client"
version = "1.1.40"
version = "1.1.41"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-socks5-client"
version = "1.1.40"
version = "1.1.41"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
+15
View File
@@ -9,9 +9,24 @@ edition.workspace = true
license.workspace = true
[dependencies]
base64 = { workspace = true }
bincode = { workspace = true }
rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
nym-credentials-interface = { path = "../credentials-interface" }
nym-crypto = { path = "../crypto", features = ["asymmetric"] }
nym-service-provider-requests-common = { path = "../service-provider-requests-common" }
nym-sphinx = { path = "../nymsphinx" }
nym-wireguard-types = { path = "../wireguard-types" }
## verify:
hmac = { workspace = true, optional = true }
sha2 = { workspace = true, optional = true }
x25519-dalek = { workspace = true, features = ["static_secrets"] }
[features]
default = ["verify"]
# this is moved to a separate feature as we really need clients to import it (especially, *cough*, wasm)
verify = ["hmac", "sha2"]
@@ -0,0 +1,25 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("the provided base64-encoded client MAC ('{mac}') was malformed: {source}")]
MalformedClientMac {
mac: String,
#[source]
source: base64::DecodeError,
},
#[cfg(feature = "verify")]
#[error("failed to verify mac provided by '{client}': {source}")]
FailedClientMacVerification {
client: String,
#[source]
source: hmac::digest::MacError,
},
#[error("conversion: {0}")]
Conversion(String),
}
+8 -1
View File
@@ -2,8 +2,15 @@
// SPDX-License-Identifier: Apache-2.0
pub mod v1;
pub mod v2;
pub mod v3;
pub const CURRENT_VERSION: u8 = 1;
mod error;
pub use error::Error;
pub use v3 as latest;
pub const CURRENT_VERSION: u8 = 3;
fn make_bincode_serializer() -> impl bincode::Options {
use bincode::Options;
+7 -1
View File
@@ -1,7 +1,13 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod registration;
pub mod request;
pub mod response;
const VERSION: u8 = 1;
pub use registration::{ClientMac, GatewayClient, InitMessage, Nonce};
#[cfg(feature = "verify")]
pub use registration::HmacSha256;
pub const VERSION: u8 = 1;
@@ -0,0 +1,218 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{engine::general_purpose, Engine};
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::IpAddr;
use std::time::SystemTime;
use std::{fmt, ops::Deref, str::FromStr};
#[cfg(feature = "verify")]
use hmac::{Hmac, Mac};
#[cfg(feature = "verify")]
use nym_crypto::asymmetric::encryption::PrivateKey;
#[cfg(feature = "verify")]
use sha2::Sha256;
pub type PendingRegistrations = HashMap<PeerPublicKey, RegistrationData>;
pub type PrivateIPs = HashMap<IpAddr, Taken>;
#[cfg(feature = "verify")]
pub type HmacSha256 = Hmac<Sha256>;
pub type Nonce = u64;
pub type Taken = Option<SystemTime>;
pub const BANDWIDTH_CAP_PER_DAY: i64 = 1024 * 1024 * 1024; // 1 GB
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct InitMessage {
/// Base64 encoded x25519 public key
pub pub_key: PeerPublicKey,
}
impl InitMessage {
pub fn new(pub_key: PeerPublicKey) -> Self {
InitMessage { pub_key }
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RegistrationData {
pub nonce: u64,
pub gateway_data: GatewayClient,
pub wg_port: u16,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RegistredData {
pub pub_key: PeerPublicKey,
pub private_ip: IpAddr,
pub wg_port: u16,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RemainingBandwidthData {
pub available_bandwidth: u64,
pub suspended: bool,
}
/// Client that wants to register sends its PublicKey bytes mac digest encrypted with a DH shared secret.
/// Gateway/Nym node can then verify pub_key payload using the same process
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GatewayClient {
/// Base64 encoded x25519 public key
pub pub_key: PeerPublicKey,
/// Assigned private IP
pub private_ip: IpAddr,
/// Sha256 hmac on the data (alongside the prior nonce)
pub mac: ClientMac,
}
impl GatewayClient {
#[cfg(feature = "verify")]
pub fn new(
local_secret: &PrivateKey,
remote_public: x25519_dalek::PublicKey,
private_ip: IpAddr,
nonce: u64,
) -> Self {
// convert from 1.0 x25519-dalek private key into 2.0 x25519-dalek
#[allow(clippy::expect_used)]
let static_secret = x25519_dalek::StaticSecret::from(local_secret.to_bytes());
let local_public: x25519_dalek::PublicKey = (&static_secret).into();
let dh = static_secret.diffie_hellman(&remote_public);
// TODO: change that to use our nym_crypto::hmac module instead
#[allow(clippy::expect_used)]
let mut mac = HmacSha256::new_from_slice(dh.as_bytes())
.expect("x25519 shared secret is always 32 bytes long");
mac.update(local_public.as_bytes());
mac.update(private_ip.to_string().as_bytes());
mac.update(&nonce.to_le_bytes());
GatewayClient {
pub_key: PeerPublicKey::new(local_public),
private_ip,
mac: ClientMac(mac.finalize().into_bytes().to_vec()),
}
}
// Reusable secret should be gateways Wireguard PK
// Client should perform this step when generating its payload, using its own WG PK
#[cfg(feature = "verify")]
pub fn verify(&self, gateway_key: &PrivateKey, nonce: u64) -> Result<(), Error> {
// convert from 1.0 x25519-dalek private key into 2.0 x25519-dalek
#[allow(clippy::expect_used)]
let static_secret = x25519_dalek::StaticSecret::from(gateway_key.to_bytes());
let dh = static_secret.diffie_hellman(&self.pub_key);
// TODO: change that to use our nym_crypto::hmac module instead
#[allow(clippy::expect_used)]
let mut mac = HmacSha256::new_from_slice(dh.as_bytes())
.expect("x25519 shared secret is always 32 bytes long");
mac.update(self.pub_key.as_bytes());
mac.update(self.private_ip.to_string().as_bytes());
mac.update(&nonce.to_le_bytes());
mac.verify_slice(&self.mac)
.map_err(|source| Error::FailedClientMacVerification {
client: self.pub_key.to_string(),
source,
})
}
pub fn pub_key(&self) -> PeerPublicKey {
self.pub_key
}
}
// TODO: change the inner type into generic array of size HmacSha256::OutputSize
// TODO2: rely on our internal crypto/hmac
#[derive(Debug, Clone)]
pub struct ClientMac(Vec<u8>);
impl fmt::Display for ClientMac {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", general_purpose::STANDARD.encode(&self.0))
}
}
impl ClientMac {
#[allow(dead_code)]
pub fn new(mac: Vec<u8>) -> Self {
ClientMac(mac)
}
}
impl Deref for ClientMac {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl FromStr for ClientMac {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mac_bytes: Vec<u8> =
general_purpose::STANDARD
.decode(s)
.map_err(|source| Error::MalformedClientMac {
mac: s.to_string(),
source,
})?;
Ok(ClientMac(mac_bytes))
}
}
impl Serialize for ClientMac {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let encoded_key = general_purpose::STANDARD.encode(self.0.clone());
serializer.serialize_str(&encoded_key)
}
}
impl<'de> Deserialize<'de> for ClientMac {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let encoded_key = String::deserialize(deserializer)?;
ClientMac::from_str(&encoded_key).map_err(serde::de::Error::custom)
}
}
#[cfg(test)]
mod tests {
use super::*;
use nym_crypto::asymmetric::encryption;
#[test]
#[cfg(feature = "verify")]
fn client_request_roundtrip() {
let mut rng = rand::thread_rng();
let gateway_key_pair = encryption::KeyPair::new(&mut rng);
let client_key_pair = encryption::KeyPair::new(&mut rng);
let nonce = 1234567890;
let client = GatewayClient::new(
client_key_pair.private_key(),
x25519_dalek::PublicKey::from(gateway_key_pair.public_key().to_bytes()),
"10.0.0.42".parse().unwrap(),
nonce,
);
assert!(client.verify(gateway_key_pair.private_key(), nonce).is_ok())
}
}
@@ -1,8 +1,9 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::registration::{GatewayClient, InitMessage};
use nym_sphinx::addressing::Recipient;
use nym_wireguard_types::{GatewayClient, InitMessage, PeerPublicKey};
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use crate::make_bincode_serializer;
@@ -82,3 +83,24 @@ pub enum AuthenticatorRequestData {
Final(GatewayClient),
QueryBandwidth(PeerPublicKey),
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn check_first_byte_version() {
let version = 2;
let data = AuthenticatorRequest {
version,
data: AuthenticatorRequestData::Initial(InitMessage::new(
PeerPublicKey::from_str("yvNUDpT5l7W/xDhiu6HkqTHDQwbs/B3J5UrLmORl1EQ=").unwrap(),
)),
reply_to: Recipient::try_from_base58_string("D1rrpsysCGCYXy9saP8y3kmNpGtJZUXN9SvFoUcqAsM9.9Ssso1ea5NfkbMASdiseDSjTN1fSWda5SgEVjdSN4CvV@GJqd3ZxpXWSNxTfx7B1pPtswpetH4LnJdFeLeuY5KUuN").unwrap(),
request_id: 1,
};
let bytes = data.to_bytes().unwrap();
assert_eq!(*bytes.first().unwrap(), version);
}
}
@@ -1,8 +1,8 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::registration::{RegistrationData, RegistredData, RemainingBandwidthData};
use nym_sphinx::addressing::Recipient;
use nym_wireguard_types::registration::{RegistrationData, RegistredData, RemainingBandwidthData};
use serde::{Deserialize, Serialize};
use crate::make_bincode_serializer;
@@ -0,0 +1,174 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use crate::{v1, v2};
impl From<v1::request::AuthenticatorRequest> for v2::request::AuthenticatorRequest {
fn from(authenticator_request: v1::request::AuthenticatorRequest) -> Self {
Self {
protocol: Protocol {
version: 2,
service_provider_type: ServiceProviderType::Authenticator,
},
data: authenticator_request.data.into(),
reply_to: authenticator_request.reply_to,
request_id: authenticator_request.request_id,
}
}
}
impl From<v1::request::AuthenticatorRequestData> for v2::request::AuthenticatorRequestData {
fn from(authenticator_request_data: v1::request::AuthenticatorRequestData) -> Self {
match authenticator_request_data {
v1::request::AuthenticatorRequestData::Initial(init_msg) => {
v2::request::AuthenticatorRequestData::Initial(init_msg.into())
}
v1::request::AuthenticatorRequestData::Final(gw_client) => {
v2::request::AuthenticatorRequestData::Final(gw_client.into())
}
v1::request::AuthenticatorRequestData::QueryBandwidth(pub_key) => {
v2::request::AuthenticatorRequestData::QueryBandwidth(pub_key)
}
}
}
}
impl From<v1::registration::InitMessage> for v2::registration::InitMessage {
fn from(init_msg: v1::registration::InitMessage) -> Self {
Self {
pub_key: init_msg.pub_key,
}
}
}
impl From<v1::registration::GatewayClient> for Box<v2::registration::FinalMessage> {
fn from(gw_client: v1::registration::GatewayClient) -> Self {
Box::new(v2::registration::FinalMessage {
gateway_client: gw_client.into(),
credential: None,
})
}
}
impl From<v1::registration::GatewayClient> for v2::registration::GatewayClient {
fn from(gw_client: v1::registration::GatewayClient) -> Self {
Self {
pub_key: gw_client.pub_key,
private_ip: gw_client.private_ip,
mac: gw_client.mac.into(),
}
}
}
impl From<v2::registration::GatewayClient> for v1::registration::GatewayClient {
fn from(gw_client: v2::registration::GatewayClient) -> Self {
Self {
pub_key: gw_client.pub_key,
private_ip: gw_client.private_ip,
mac: gw_client.mac.into(),
}
}
}
impl From<v1::registration::ClientMac> for v2::registration::ClientMac {
fn from(mac: v1::registration::ClientMac) -> Self {
Self::new(mac.to_vec())
}
}
impl From<v2::registration::ClientMac> for v1::registration::ClientMac {
fn from(mac: v2::registration::ClientMac) -> Self {
Self::new(mac.to_vec())
}
}
impl From<v2::response::AuthenticatorResponse> for v1::response::AuthenticatorResponse {
fn from(authenticator_response: v2::response::AuthenticatorResponse) -> Self {
Self {
version: authenticator_response.protocol.version,
data: authenticator_response.data.into(),
reply_to: authenticator_response.reply_to,
}
}
}
impl From<v2::response::AuthenticatorResponseData> for v1::response::AuthenticatorResponseData {
fn from(authenticator_response_data: v2::response::AuthenticatorResponseData) -> Self {
match authenticator_response_data {
v2::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response,
) => v1::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response.into(),
),
v2::response::AuthenticatorResponseData::Registered(registered_response) => {
v1::response::AuthenticatorResponseData::Registered(registered_response.into())
}
v2::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response,
) => v1::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response.into(),
),
}
}
}
impl From<v2::response::PendingRegistrationResponse> for v1::response::PendingRegistrationResponse {
fn from(value: v2::response::PendingRegistrationResponse) -> Self {
Self {
request_id: value.request_id,
reply_to: value.reply_to,
reply: value.reply.into(),
}
}
}
impl From<v2::response::RegisteredResponse> for v1::response::RegisteredResponse {
fn from(value: v2::response::RegisteredResponse) -> Self {
Self {
request_id: value.request_id,
reply_to: value.reply_to,
reply: value.reply.into(),
}
}
}
impl From<v2::response::RemainingBandwidthResponse> for v1::response::RemainingBandwidthResponse {
fn from(value: v2::response::RemainingBandwidthResponse) -> Self {
Self {
request_id: value.request_id,
reply_to: value.reply_to,
reply: value.reply.map(Into::into),
}
}
}
impl From<v2::registration::RegistrationData> for v1::registration::RegistrationData {
fn from(value: v2::registration::RegistrationData) -> Self {
Self {
nonce: value.nonce,
gateway_data: value.gateway_data.into(),
wg_port: value.wg_port,
}
}
}
impl From<v2::registration::RegistredData> for v1::registration::RegistredData {
fn from(value: v2::registration::RegistredData) -> Self {
Self {
pub_key: value.pub_key,
private_ip: value.private_ip,
wg_port: value.wg_port,
}
}
}
impl From<v2::registration::RemainingBandwidthData> for v1::registration::RemainingBandwidthData {
fn from(value: v2::registration::RemainingBandwidthData) -> Self {
Self {
available_bandwidth: value.available_bandwidth as u64,
suspended: false,
}
}
}
@@ -0,0 +1,9 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod conversion;
pub mod registration;
pub mod request;
pub mod response;
pub const VERSION: u8 = 2;
@@ -1,9 +1,10 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use crate::PeerPublicKey;
use base64::{engine::general_purpose, Engine};
use nym_credentials_interface::CredentialSpendingData;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::IpAddr;
@@ -29,32 +30,26 @@ pub type Taken = Option<SystemTime>;
pub const BANDWIDTH_CAP_PER_DAY: u64 = 1024 * 1024 * 1024; // 1 GB
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type", rename_all = "camelCase")]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum ClientMessage {
Initial(InitMessage),
Final(GatewayClient),
Query(PeerPublicKey),
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct InitMessage {
/// Base64 encoded x25519 public key
#[cfg_attr(feature = "openapi", schema(value_type = String, format = Byte))]
pub pub_key: PeerPublicKey,
}
impl InitMessage {
pub fn pub_key(&self) -> PeerPublicKey {
self.pub_key
}
pub fn new(pub_key: PeerPublicKey) -> Self {
InitMessage { pub_key }
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FinalMessage {
/// Gateway client data
pub gateway_client: GatewayClient,
/// Ecash credential
pub credential: Option<CredentialSpendingData>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RegistrationData {
pub nonce: u64,
@@ -71,24 +66,20 @@ pub struct RegistredData {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RemainingBandwidthData {
pub available_bandwidth: u64,
pub suspended: bool,
pub available_bandwidth: i64,
}
/// Client that wants to register sends its PublicKey bytes mac digest encrypted with a DH shared secret.
/// Gateway/Nym node can then verify pub_key payload using the same process
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct GatewayClient {
/// Base64 encoded x25519 public key
#[cfg_attr(feature = "openapi", schema(value_type = String, format = Byte))]
pub pub_key: PeerPublicKey,
/// Assigned private IP
pub private_ip: IpAddr,
/// Sha256 hmac on the data (alongside the prior nonce)
#[cfg_attr(feature = "openapi", schema(value_type = String, format = Byte))]
pub mac: ClientMac,
}
@@ -0,0 +1,116 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::registration::{FinalMessage, InitMessage};
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use nym_sphinx::addressing::Recipient;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use crate::make_bincode_serializer;
use super::VERSION;
fn generate_random() -> u64 {
use rand::RngCore;
let mut rng = rand::rngs::OsRng;
rng.next_u64()
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AuthenticatorRequest {
pub protocol: Protocol,
pub data: AuthenticatorRequestData,
pub reply_to: Recipient,
pub request_id: u64,
}
impl AuthenticatorRequest {
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
pub fn new_initial_request(init_message: InitMessage, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::Initial(init_message),
reply_to,
request_id,
},
request_id,
)
}
pub fn new_final_request(final_message: FinalMessage, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::Final(Box::new(final_message)),
reply_to,
request_id,
},
request_id,
)
}
pub fn new_query_request(peer_public_key: PeerPublicKey, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::QueryBandwidth(peer_public_key),
reply_to,
request_id,
},
request_id,
)
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error> {
use bincode::Options;
make_bincode_serializer().serialize(self)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AuthenticatorRequestData {
Initial(InitMessage),
Final(Box<FinalMessage>),
QueryBandwidth(PeerPublicKey),
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn check_first_bytes_protocol() {
let version = 2;
let data = AuthenticatorRequest {
protocol: Protocol { version, service_provider_type: ServiceProviderType::Authenticator },
data: AuthenticatorRequestData::Initial(InitMessage::new(
PeerPublicKey::from_str("yvNUDpT5l7W/xDhiu6HkqTHDQwbs/B3J5UrLmORl1EQ=").unwrap(),
)),
reply_to: Recipient::try_from_base58_string("D1rrpsysCGCYXy9saP8y3kmNpGtJZUXN9SvFoUcqAsM9.9Ssso1ea5NfkbMASdiseDSjTN1fSWda5SgEVjdSN4CvV@GJqd3ZxpXWSNxTfx7B1pPtswpetH4LnJdFeLeuY5KUuN").unwrap(),
request_id: 1,
};
let bytes = *data.to_bytes().unwrap().first_chunk::<2>().unwrap();
assert_eq!(bytes, [version, ServiceProviderType::Authenticator as u8]);
}
}
@@ -0,0 +1,129 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::registration::{RegistrationData, RegistredData, RemainingBandwidthData};
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use nym_sphinx::addressing::Recipient;
use serde::{Deserialize, Serialize};
use crate::make_bincode_serializer;
use super::VERSION;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AuthenticatorResponse {
pub protocol: Protocol,
pub data: AuthenticatorResponseData,
pub reply_to: Recipient,
}
impl AuthenticatorResponse {
pub fn new_pending_registration_success(
registration_data: RegistrationData,
request_id: u64,
reply_to: Recipient,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::PendingRegistration(PendingRegistrationResponse {
reply: registration_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn new_registered(
registred_data: RegistredData,
reply_to: Recipient,
request_id: u64,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::Registered(RegisteredResponse {
reply: registred_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn new_remaining_bandwidth(
remaining_bandwidth_data: Option<RemainingBandwidthData>,
reply_to: Recipient,
request_id: u64,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::RemainingBandwidth(RemainingBandwidthResponse {
reply: remaining_bandwidth_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn recipient(&self) -> Recipient {
self.reply_to
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error> {
use bincode::Options;
make_bincode_serializer().serialize(self)
}
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
pub fn id(&self) -> Option<u64> {
match &self.data {
AuthenticatorResponseData::PendingRegistration(response) => Some(response.request_id),
AuthenticatorResponseData::Registered(response) => Some(response.request_id),
AuthenticatorResponseData::RemainingBandwidth(response) => Some(response.request_id),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AuthenticatorResponseData {
PendingRegistration(PendingRegistrationResponse),
Registered(RegisteredResponse),
RemainingBandwidth(RemainingBandwidthResponse),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingRegistrationResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: RegistrationData,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RegisteredResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: RegistredData,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RemainingBandwidthResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: Option<RemainingBandwidthData>,
}
@@ -0,0 +1,188 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use crate::{v2, v3};
impl From<v2::request::AuthenticatorRequest> for v3::request::AuthenticatorRequest {
fn from(authenticator_request: v2::request::AuthenticatorRequest) -> Self {
Self {
protocol: Protocol {
version: 2,
service_provider_type: ServiceProviderType::Authenticator,
},
data: authenticator_request.data.into(),
reply_to: authenticator_request.reply_to,
request_id: authenticator_request.request_id,
}
}
}
impl From<v2::request::AuthenticatorRequestData> for v3::request::AuthenticatorRequestData {
fn from(authenticator_request_data: v2::request::AuthenticatorRequestData) -> Self {
match authenticator_request_data {
v2::request::AuthenticatorRequestData::Initial(init_msg) => {
v3::request::AuthenticatorRequestData::Initial(init_msg.into())
}
v2::request::AuthenticatorRequestData::Final(gw_client) => {
v3::request::AuthenticatorRequestData::Final(gw_client.into())
}
v2::request::AuthenticatorRequestData::QueryBandwidth(pub_key) => {
v3::request::AuthenticatorRequestData::QueryBandwidth(pub_key)
}
}
}
}
impl From<v2::registration::InitMessage> for v3::registration::InitMessage {
fn from(init_msg: v2::registration::InitMessage) -> Self {
Self {
pub_key: init_msg.pub_key,
}
}
}
impl From<Box<v2::registration::FinalMessage>> for Box<v3::registration::FinalMessage> {
fn from(gw_client: Box<v2::registration::FinalMessage>) -> Self {
Box::new(v3::registration::FinalMessage {
gateway_client: gw_client.gateway_client.into(),
credential: gw_client.credential,
})
}
}
impl From<v2::registration::GatewayClient> for v3::registration::GatewayClient {
fn from(gw_client: v2::registration::GatewayClient) -> Self {
Self {
pub_key: gw_client.pub_key,
private_ip: gw_client.private_ip,
mac: gw_client.mac.into(),
}
}
}
impl From<v3::registration::GatewayClient> for v2::registration::GatewayClient {
fn from(gw_client: v3::registration::GatewayClient) -> Self {
Self {
pub_key: gw_client.pub_key,
private_ip: gw_client.private_ip,
mac: gw_client.mac.into(),
}
}
}
impl From<v2::registration::ClientMac> for v3::registration::ClientMac {
fn from(mac: v2::registration::ClientMac) -> Self {
Self::new(mac.to_vec())
}
}
impl From<v3::registration::ClientMac> for v2::registration::ClientMac {
fn from(mac: v3::registration::ClientMac) -> Self {
Self::new(mac.to_vec())
}
}
impl TryFrom<v3::response::AuthenticatorResponse> for v2::response::AuthenticatorResponse {
type Error = crate::Error;
fn try_from(
authenticator_response: v3::response::AuthenticatorResponse,
) -> Result<Self, Self::Error> {
Ok(Self {
data: authenticator_response.data.try_into()?,
reply_to: authenticator_response.reply_to,
protocol: authenticator_response.protocol,
})
}
}
impl TryFrom<v3::response::AuthenticatorResponseData> for v2::response::AuthenticatorResponseData {
type Error = crate::Error;
fn try_from(
authenticator_response_data: v3::response::AuthenticatorResponseData,
) -> Result<Self, Self::Error> {
match authenticator_response_data {
v3::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response,
) => Ok(
v2::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response.into(),
),
),
v3::response::AuthenticatorResponseData::Registered(registered_response) => Ok(
v2::response::AuthenticatorResponseData::Registered(registered_response.into()),
),
v3::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response,
) => Ok(v2::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response.into(),
)),
v3::response::AuthenticatorResponseData::TopUpBandwidth(_) => {
Err(Self::Error::Conversion(
"a v2 request couldn't produce a v3 only type of response".to_string(),
))
}
}
}
}
impl From<v3::response::PendingRegistrationResponse> for v2::response::PendingRegistrationResponse {
fn from(value: v3::response::PendingRegistrationResponse) -> Self {
Self {
request_id: value.request_id,
reply_to: value.reply_to,
reply: value.reply.into(),
}
}
}
impl From<v3::response::RegisteredResponse> for v2::response::RegisteredResponse {
fn from(value: v3::response::RegisteredResponse) -> Self {
Self {
request_id: value.request_id,
reply_to: value.reply_to,
reply: value.reply.into(),
}
}
}
impl From<v3::response::RemainingBandwidthResponse> for v2::response::RemainingBandwidthResponse {
fn from(value: v3::response::RemainingBandwidthResponse) -> Self {
Self {
request_id: value.request_id,
reply_to: value.reply_to,
reply: value.reply.map(Into::into),
}
}
}
impl From<v3::registration::RegistrationData> for v2::registration::RegistrationData {
fn from(value: v3::registration::RegistrationData) -> Self {
Self {
nonce: value.nonce,
gateway_data: value.gateway_data.into(),
wg_port: value.wg_port,
}
}
}
impl From<v3::registration::RegistredData> for v2::registration::RegistredData {
fn from(value: v3::registration::RegistredData) -> Self {
Self {
pub_key: value.pub_key,
private_ip: value.private_ip,
wg_port: value.wg_port,
}
}
}
impl From<v3::registration::RemainingBandwidthData> for v2::registration::RemainingBandwidthData {
fn from(value: v3::registration::RemainingBandwidthData) -> Self {
Self {
available_bandwidth: value.available_bandwidth,
}
}
}
@@ -0,0 +1,10 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod conversion;
pub mod registration;
pub mod request;
pub mod response;
pub mod topup;
pub const VERSION: u8 = 3;
@@ -0,0 +1,227 @@
// -2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{engine::general_purpose, Engine};
use nym_credentials_interface::CredentialSpendingData;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::IpAddr;
use std::time::SystemTime;
use std::{fmt, ops::Deref, str::FromStr};
#[cfg(feature = "verify")]
use hmac::{Hmac, Mac};
#[cfg(feature = "verify")]
use nym_crypto::asymmetric::encryption::PrivateKey;
#[cfg(feature = "verify")]
use sha2::Sha256;
pub type PendingRegistrations = HashMap<PeerPublicKey, RegistrationData>;
pub type PrivateIPs = HashMap<IpAddr, Taken>;
#[cfg(feature = "verify")]
pub type HmacSha256 = Hmac<Sha256>;
pub type Nonce = u64;
pub type Taken = Option<SystemTime>;
pub const BANDWIDTH_CAP_PER_DAY: u64 = 1024 * 1024 * 1024; // 1 GB
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct InitMessage {
/// Base64 encoded x25519 public key
pub pub_key: PeerPublicKey,
}
impl InitMessage {
pub fn new(pub_key: PeerPublicKey) -> Self {
InitMessage { pub_key }
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FinalMessage {
/// Gateway client data
pub gateway_client: GatewayClient,
/// Ecash credential
pub credential: Option<CredentialSpendingData>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RegistrationData {
pub nonce: u64,
pub gateway_data: GatewayClient,
pub wg_port: u16,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RegistredData {
pub pub_key: PeerPublicKey,
pub private_ip: IpAddr,
pub wg_port: u16,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RemainingBandwidthData {
pub available_bandwidth: i64,
}
/// Client that wants to register sends its PublicKey bytes mac digest encrypted with a DH shared secret.
/// Gateway/Nym node can then verify pub_key payload using the same process
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GatewayClient {
/// Base64 encoded x25519 public key
pub pub_key: PeerPublicKey,
/// Assigned private IP
pub private_ip: IpAddr,
/// Sha256 hmac on the data (alongside the prior nonce)
pub mac: ClientMac,
}
impl GatewayClient {
#[cfg(feature = "verify")]
pub fn new(
local_secret: &PrivateKey,
remote_public: x25519_dalek::PublicKey,
private_ip: IpAddr,
nonce: u64,
) -> Self {
// convert from 1.0 x25519-dalek private key into 2.0 x25519-dalek
#[allow(clippy::expect_used)]
let static_secret = x25519_dalek::StaticSecret::from(local_secret.to_bytes());
let local_public: x25519_dalek::PublicKey = (&static_secret).into();
let dh = static_secret.diffie_hellman(&remote_public);
// TODO: change that to use our nym_crypto::hmac module instead
#[allow(clippy::expect_used)]
let mut mac = HmacSha256::new_from_slice(dh.as_bytes())
.expect("x25519 shared secret is always 32 bytes long");
mac.update(local_public.as_bytes());
mac.update(private_ip.to_string().as_bytes());
mac.update(&nonce.to_le_bytes());
GatewayClient {
pub_key: PeerPublicKey::new(local_public),
private_ip,
mac: ClientMac(mac.finalize().into_bytes().to_vec()),
}
}
// Reusable secret should be gateways Wireguard PK
// Client should perform this step when generating its payload, using its own WG PK
#[cfg(feature = "verify")]
pub fn verify(&self, gateway_key: &PrivateKey, nonce: u64) -> Result<(), Error> {
// convert from 1.0 x25519-dalek private key into 2.0 x25519-dalek
#[allow(clippy::expect_used)]
let static_secret = x25519_dalek::StaticSecret::from(gateway_key.to_bytes());
let dh = static_secret.diffie_hellman(&self.pub_key);
// TODO: change that to use our nym_crypto::hmac module instead
#[allow(clippy::expect_used)]
let mut mac = HmacSha256::new_from_slice(dh.as_bytes())
.expect("x25519 shared secret is always 32 bytes long");
mac.update(self.pub_key.as_bytes());
mac.update(self.private_ip.to_string().as_bytes());
mac.update(&nonce.to_le_bytes());
mac.verify_slice(&self.mac)
.map_err(|source| Error::FailedClientMacVerification {
client: self.pub_key.to_string(),
source,
})
}
pub fn pub_key(&self) -> PeerPublicKey {
self.pub_key
}
}
// TODO: change the inner type into generic array of size HmacSha256::OutputSize
// TODO2: rely on our internal crypto/hmac
#[derive(Debug, Clone)]
pub struct ClientMac(Vec<u8>);
impl fmt::Display for ClientMac {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", general_purpose::STANDARD.encode(&self.0))
}
}
impl ClientMac {
#[allow(dead_code)]
pub fn new(mac: Vec<u8>) -> Self {
ClientMac(mac)
}
}
impl Deref for ClientMac {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl FromStr for ClientMac {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mac_bytes: Vec<u8> =
general_purpose::STANDARD
.decode(s)
.map_err(|source| Error::MalformedClientMac {
mac: s.to_string(),
source,
})?;
Ok(ClientMac(mac_bytes))
}
}
impl Serialize for ClientMac {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let encoded_key = general_purpose::STANDARD.encode(self.0.clone());
serializer.serialize_str(&encoded_key)
}
}
impl<'de> Deserialize<'de> for ClientMac {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let encoded_key = String::deserialize(deserializer)?;
ClientMac::from_str(&encoded_key).map_err(serde::de::Error::custom)
}
}
#[cfg(test)]
mod tests {
use super::*;
use nym_crypto::asymmetric::encryption;
#[test]
#[cfg(feature = "verify")]
fn client_request_roundtrip() {
let mut rng = rand::thread_rng();
let gateway_key_pair = encryption::KeyPair::new(&mut rng);
let client_key_pair = encryption::KeyPair::new(&mut rng);
let nonce = 1234567890;
let client = GatewayClient::new(
client_key_pair.private_key(),
x25519_dalek::PublicKey::from(gateway_key_pair.public_key().to_bytes()),
"10.0.0.42".parse().unwrap(),
nonce,
);
assert!(client.verify(gateway_key_pair.private_key(), nonce).is_ok())
}
}
@@ -0,0 +1,136 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::{
registration::{FinalMessage, InitMessage},
topup::TopUpMessage,
};
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use nym_sphinx::addressing::Recipient;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use crate::make_bincode_serializer;
use super::VERSION;
fn generate_random() -> u64 {
use rand::RngCore;
let mut rng = rand::rngs::OsRng;
rng.next_u64()
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AuthenticatorRequest {
pub protocol: Protocol,
pub data: AuthenticatorRequestData,
pub reply_to: Recipient,
pub request_id: u64,
}
impl AuthenticatorRequest {
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
pub fn new_initial_request(init_message: InitMessage, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::Initial(init_message),
reply_to,
request_id,
},
request_id,
)
}
pub fn new_final_request(final_message: FinalMessage, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::Final(Box::new(final_message)),
reply_to,
request_id,
},
request_id,
)
}
pub fn new_query_request(peer_public_key: PeerPublicKey, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::QueryBandwidth(peer_public_key),
reply_to,
request_id,
},
request_id,
)
}
pub fn new_topup_request(top_up_message: TopUpMessage, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::TopUpBandwidth(Box::new(top_up_message)),
reply_to,
request_id,
},
request_id,
)
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error> {
use bincode::Options;
make_bincode_serializer().serialize(self)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AuthenticatorRequestData {
Initial(InitMessage),
Final(Box<FinalMessage>),
QueryBandwidth(PeerPublicKey),
TopUpBandwidth(Box<TopUpMessage>),
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn check_first_bytes_protocol() {
let version = 2;
let data = AuthenticatorRequest {
protocol: Protocol { version, service_provider_type: ServiceProviderType::Authenticator },
data: AuthenticatorRequestData::Initial(InitMessage::new(
PeerPublicKey::from_str("yvNUDpT5l7W/xDhiu6HkqTHDQwbs/B3J5UrLmORl1EQ=").unwrap(),
)),
reply_to: Recipient::try_from_base58_string("D1rrpsysCGCYXy9saP8y3kmNpGtJZUXN9SvFoUcqAsM9.9Ssso1ea5NfkbMASdiseDSjTN1fSWda5SgEVjdSN4CvV@GJqd3ZxpXWSNxTfx7B1pPtswpetH4LnJdFeLeuY5KUuN").unwrap(),
request_id: 1,
};
let bytes = *data.to_bytes().unwrap().first_chunk::<2>().unwrap();
assert_eq!(bytes, [version, ServiceProviderType::Authenticator as u8]);
}
}
@@ -0,0 +1,157 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::registration::{RegistrationData, RegistredData, RemainingBandwidthData};
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use nym_sphinx::addressing::Recipient;
use serde::{Deserialize, Serialize};
use crate::make_bincode_serializer;
use super::VERSION;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AuthenticatorResponse {
pub protocol: Protocol,
pub data: AuthenticatorResponseData,
pub reply_to: Recipient,
}
impl AuthenticatorResponse {
pub fn new_pending_registration_success(
registration_data: RegistrationData,
request_id: u64,
reply_to: Recipient,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::PendingRegistration(PendingRegistrationResponse {
reply: registration_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn new_registered(
registred_data: RegistredData,
reply_to: Recipient,
request_id: u64,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::Registered(RegisteredResponse {
reply: registred_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn new_remaining_bandwidth(
remaining_bandwidth_data: Option<RemainingBandwidthData>,
reply_to: Recipient,
request_id: u64,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::RemainingBandwidth(RemainingBandwidthResponse {
reply: remaining_bandwidth_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn new_topup_bandwidth(
remaining_bandwidth_data: RemainingBandwidthData,
reply_to: Recipient,
request_id: u64,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::TopUpBandwidth(TopUpBandwidthResponse {
reply: remaining_bandwidth_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn recipient(&self) -> Recipient {
self.reply_to
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error> {
use bincode::Options;
make_bincode_serializer().serialize(self)
}
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
pub fn id(&self) -> Option<u64> {
match &self.data {
AuthenticatorResponseData::PendingRegistration(response) => Some(response.request_id),
AuthenticatorResponseData::Registered(response) => Some(response.request_id),
AuthenticatorResponseData::RemainingBandwidth(response) => Some(response.request_id),
AuthenticatorResponseData::TopUpBandwidth(response) => Some(response.request_id),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AuthenticatorResponseData {
PendingRegistration(PendingRegistrationResponse),
Registered(RegisteredResponse),
RemainingBandwidth(RemainingBandwidthResponse),
TopUpBandwidth(TopUpBandwidthResponse),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingRegistrationResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: RegistrationData,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RegisteredResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: RegistredData,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RemainingBandwidthResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: Option<RemainingBandwidthData>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TopUpBandwidthResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: RemainingBandwidthData,
}
@@ -0,0 +1,15 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_credentials_interface::CredentialSpendingData;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TopUpMessage {
/// Base64 encoded x25519 public key
pub pub_key: PeerPublicKey,
/// Ecash credential
pub credential: CredentialSpendingData,
}
+8 -4
View File
@@ -16,7 +16,7 @@ use nym_credential_storage::models::RetrievedTicketbook;
use nym_credential_storage::storage::Storage;
use nym_credentials::ecash::bandwidth::CredentialSpendingData;
use nym_credentials_interface::{
AnnotatedCoinIndexSignature, AnnotatedExpirationDateSignature, VerificationKeyAuth,
AnnotatedCoinIndexSignature, AnnotatedExpirationDateSignature, TicketType, VerificationKeyAuth,
};
use nym_ecash_time::Date;
use nym_validator_client::nym_api::EpochId;
@@ -64,9 +64,10 @@ impl<C, St: Storage> BandwidthController<C, St> {
BandwidthController { storage, client }
}
/// Tries to retrieve one of the stored, unused credentials that hasn't yet expired.
/// Tries to retrieve one of the stored, unused credentials for the given type that hasn't yet expired.
pub async fn get_next_usable_ticketbook(
&self,
ticketbook_type: TicketType,
tickets: u32,
) -> Result<RetrievedTicketbook, BandwidthControllerError>
where
@@ -74,7 +75,7 @@ impl<C, St: Storage> BandwidthController<C, St> {
{
let Some(ticketbook) = self
.storage
.get_next_unspent_usable_ticketbook(tickets)
.get_next_unspent_usable_ticketbook(ticketbook_type.to_string(), tickets)
.await
.map_err(BandwidthControllerError::credential_storage_error)?
else {
@@ -181,6 +182,7 @@ impl<C, St: Storage> BandwidthController<C, St> {
pub async fn prepare_ecash_ticket(
&self,
ticketbook_type: TicketType,
provider_pk: [u8; 32],
tickets_to_spend: u32,
) -> Result<PreparedCredential, BandwidthControllerError>
@@ -188,7 +190,9 @@ impl<C, St: Storage> BandwidthController<C, St> {
C: DkgQueryClient + Sync + Send,
<St as Storage>::StorageError: Send + Sync + 'static,
{
let retrieved_ticketbook = self.get_next_usable_ticketbook(tickets_to_spend).await?;
let retrieved_ticketbook = self
.get_next_usable_ticketbook(ticketbook_type, tickets_to_spend)
.await?;
let ticketbook_id = retrieved_ticketbook.ticketbook_id;
let epoch_id = retrieved_ticketbook.ticketbook.epoch_id();
@@ -24,6 +24,7 @@ zeroize.workspace = true
nym-bandwidth-controller = { path = "../../bandwidth-controller" }
nym-credentials = { path = "../../credentials" }
nym-credential-storage = { path = "../../credential-storage" }
nym-credentials-interface = { path = "../../credentials-interface" }
nym-crypto = { path = "../../crypto" }
nym-gateway-requests = { path = "../../gateway-requests" }
nym-network-defaults = { path = "../../network-defaults" }
@@ -2,21 +2,37 @@
// SPDX-License-Identifier: Apache-2.0
use si_scale::helpers::bibytes2;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
#[derive(Clone, Default)]
pub(crate) struct BandwidthClaimGuard {
inner: Arc<ClientBandwidthInner>,
}
impl Drop for BandwidthClaimGuard {
fn drop(&mut self) {
let old = self.inner.claiming_more.swap(false, Ordering::SeqCst);
assert!(
old,
"critical failure: there were multiple BandwidthClaimGuard existing"
)
}
}
#[derive(Clone)]
pub struct ClientBandwidth {
inner: Arc<ClientBandwidthInner>,
}
#[derive(Default)]
struct ClientBandwidthInner {
/// the actual bandwidth amount (in bytes) available
available: AtomicI64,
/// flag to indicate whether this client is currently in the process of claiming additional bandwidth
claiming_more: AtomicBool,
/// defines the timestamp when the bandwidth information has been logged to the logs stream
last_logged_ts: AtomicI64,
@@ -29,11 +45,28 @@ impl ClientBandwidth {
ClientBandwidth {
inner: Arc::new(ClientBandwidthInner {
available: AtomicI64::new(0),
claiming_more: AtomicBool::new(false),
last_logged_ts: AtomicI64::new(0),
last_updated_ts: AtomicI64::new(0),
}),
}
}
pub(crate) fn begin_bandwidth_claim(&self) -> Option<BandwidthClaimGuard> {
if self
.inner
.claiming_more
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
Some(BandwidthClaimGuard {
inner: self.inner.clone(),
})
} else {
None
}
}
pub(crate) fn remaining(&self) -> i64 {
self.inner.available.load(Ordering::Acquire)
}
@@ -16,6 +16,7 @@ use nym_bandwidth_controller::{BandwidthController, BandwidthStatusMessage};
use nym_credential_storage::ephemeral_storage::EphemeralStorage as EphemeralCredentialStorage;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_credentials::CredentialSpendingData;
use nym_credentials_interface::TicketType;
use nym_crypto::asymmetric::identity;
use nym_gateway_requests::registration::handshake::client_handshake;
use nym_gateway_requests::{
@@ -724,6 +725,11 @@ impl<C, St> GatewayClient<C, St> {
return Err(GatewayClientError::NoBandwidthControllerAvailable);
}
let Some(_claim_guard) = self.bandwidth.begin_bandwidth_claim() else {
debug!("there's already an existing bandwidth claim ongoing");
return Ok(());
};
warn!("Not enough bandwidth. Trying to get more bandwidth, this might take a while");
if !self.cfg.bandwidth.require_tickets {
info!("The client is running in disabled credentials mode - attempting to claim bandwidth without a credential");
@@ -743,7 +749,11 @@ impl<C, St> GatewayClient<C, St> {
}
let prepared_credential = self
.unchecked_bandwidth_controller()
.prepare_ecash_ticket(self.gateway_identity.to_bytes(), TICKETS_TO_SPEND)
.prepare_ecash_ticket(
TicketType::V1MixnetEntry,
self.gateway_identity.to_bytes(),
TICKETS_TO_SPEND,
)
.await?;
match self.claim_ecash_bandwidth(prepared_credential.data).await {
@@ -20,6 +20,7 @@ nym-coconut-bandwidth-contract-common = { path = "../../cosmwasm-smart-contracts
nym-ecash-contract-common = { path = "../../cosmwasm-smart-contracts/ecash-contract" }
nym-multisig-contract-common = { path = "../../cosmwasm-smart-contracts/multisig-contract" }
nym-group-contract-common = { path = "../../cosmwasm-smart-contracts/group-contract" }
nym-serde-helpers = { path = "../../serde-helpers", features = ["hex", "base64"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
nym-http-api-client = { path = "../../../common/http-api-client" }
@@ -5,7 +5,7 @@ use crate::nyxd;
use crate::nyxd::coin::Coin;
use crate::nyxd::cosmwasm_client::helpers::{create_pagination, next_page_key};
use crate::nyxd::cosmwasm_client::types::{
Account, CodeDetails, Contract, ContractCodeId, SequenceResponse, SimulateResponse,
Account, CodeDetails, Contract, ContractCodeId, Model, SequenceResponse, SimulateResponse,
};
use crate::nyxd::error::NyxdError;
use crate::nyxd::Query;
@@ -21,11 +21,11 @@ use cosmrs::proto::cosmos::tx::v1beta1::{
SimulateRequest, SimulateResponse as ProtoSimulateResponse,
};
use cosmrs::proto::cosmwasm::wasm::v1::{
QueryCodeRequest, QueryCodeResponse, QueryCodesRequest, QueryCodesResponse,
QueryContractHistoryRequest, QueryContractHistoryResponse, QueryContractInfoRequest,
QueryContractInfoResponse, QueryContractsByCodeRequest, QueryContractsByCodeResponse,
QueryRawContractStateRequest, QueryRawContractStateResponse, QuerySmartContractStateRequest,
QuerySmartContractStateResponse,
QueryAllContractStateRequest, QueryAllContractStateResponse, QueryCodeRequest,
QueryCodeResponse, QueryCodesRequest, QueryCodesResponse, QueryContractHistoryRequest,
QueryContractHistoryResponse, QueryContractInfoRequest, QueryContractInfoResponse,
QueryContractsByCodeRequest, QueryContractsByCodeResponse, QueryRawContractStateRequest,
QueryRawContractStateResponse, QuerySmartContractStateRequest, QuerySmartContractStateResponse,
};
use cosmrs::tendermint::{block, chain, Hash};
use cosmrs::{AccountId, Coin as CosmosCoin, Tx};
@@ -218,17 +218,19 @@ pub trait CosmWasmClient: TendermintRpcClient {
loop {
let mut res = self
.tx_search(query.clone(), false, page, 100, Order::Ascending)
.tx_search(query.clone(), false, page, per_page, Order::Ascending)
.await?;
results.append(&mut res.txs);
// sanity check for if tendermint's maximum per_page was modified -
// we don't want to accidentally be stuck in an infinite loop
if res.total_count == 0 || res.txs.is_empty() {
let early_break = res.total_count == 0 || res.txs.is_empty();
results.append(&mut res.txs);
if early_break {
break;
}
if res.total_count >= per_page {
if res.total_count > results.len() as u32 {
page += 1
} else {
break;
@@ -442,6 +444,38 @@ pub trait CosmWasmClient: TendermintRpcClient {
.collect::<Result<_, _>>()?)
}
async fn query_all_contract_state(&self, address: &AccountId) -> Result<Vec<Model>, NyxdError> {
let path = Some("/cosmwasm.wasm.v1.Query/AllContractState".to_owned());
let mut models = Vec::new();
let mut pagination = None;
loop {
let req = QueryAllContractStateRequest {
address: address.to_string(),
pagination,
};
let mut res = self
.make_abci_query::<_, QueryAllContractStateResponse>(path.clone(), req)
.await?;
let empty_response = res.models.is_empty();
models.append(&mut res.models);
if empty_response {
break;
}
if let Some(next_key) = next_page_key(res.pagination) {
pagination = Some(create_pagination(next_key))
} else {
break;
}
}
Ok(models.into_iter().map(Into::into).collect())
}
async fn query_contract_raw(
&self,
address: &AccountId,
@@ -27,13 +27,34 @@ use cosmrs::vesting::{
};
use cosmrs::{AccountId, Any, Coin as CosmosCoin};
use prost::Message;
use serde::Serialize;
use serde::{Deserialize, Serialize};
pub use cosmrs::abci::GasInfo;
pub use cosmrs::abci::MsgResponse;
pub type ContractCodeId = u64;
// yet another thing to put in cosmrs
#[derive(Serialize, Deserialize)]
pub struct Model {
#[serde(with = "nym_serde_helpers::hex")]
pub key: Vec<u8>,
#[serde(with = "nym_serde_helpers::base64")]
pub value: Vec<u8>,
}
// follow the cosmwasm serialisation format, i.e. hex for key and base64 for value
impl From<cosmrs::proto::cosmwasm::wasm::v1::Model> for Model {
fn from(model: cosmrs::proto::cosmwasm::wasm::v1::Model) -> Self {
Model {
key: model.key,
value: model.value,
}
}
}
#[derive(Serialize)]
pub struct EmptyMsg {}
+6 -1
View File
@@ -9,10 +9,15 @@ use comfy_table::Table;
use nym_credential_storage::initialise_persistent_storage;
use nym_credential_storage::storage::Storage;
use nym_credentials::ecash::bandwidth::serialiser::VersionedSerialise;
use nym_credentials_interface::TicketType;
use std::path::PathBuf;
#[derive(Debug, Parser)]
pub struct Args {
/// Specify which type of ticketbook
#[clap(long, default_value_t = TicketType::V1MixnetEntry)]
pub(crate) ticketbook_type: TicketType,
/// Specify the index of the ticket to retrieve from the ticketbook.
/// By default, the current unspent value is used.
#[clap(long, group = "output")]
@@ -62,7 +67,7 @@ pub async fn execute(args: Args) -> anyhow::Result<()> {
let persistent_storage = initialise_persistent_storage(&credentials_store).await;
let Some(mut next_ticketbook) = persistent_storage
.get_next_unspent_usable_ticketbook(0)
.get_next_unspent_usable_ticketbook(args.ticketbook_type.to_string(), 0)
.await?
else {
bail!(
@@ -1,8 +1,6 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::utils::CommonConfigsWrapper;
use anyhow::bail;
use clap::ArgGroup;
use clap::Parser;
use nym_credential_storage::initialise_persistent_storage;
@@ -31,7 +29,7 @@ impl FromStr for CredentialDataWrapper {
pub struct Args {
/// Config file of the client that is supposed to use the credential.
#[clap(long)]
pub(crate) client_config: PathBuf,
pub(crate) credentials_store: PathBuf,
/// Explicitly provide the encoded credential data (as base58)
#[clap(long, group = "cred_data")]
@@ -70,21 +68,7 @@ impl Args {
}
pub async fn execute(args: Args) -> anyhow::Result<()> {
let loaded = CommonConfigsWrapper::try_load(&args.client_config)?;
if let Ok(id) = loaded.try_get_id() {
println!("loaded config file for client '{id}'");
}
let Ok(credentials_store) = loaded.try_get_credentials_store() else {
bail!("the loaded config does not have a credentials store information")
};
println!(
"using credentials store at '{}'",
credentials_store.display()
);
let credentials_store = initialise_persistent_storage(credentials_store).await;
let credentials_store = initialise_persistent_storage(args.credentials_store.clone()).await;
let version = args.version;
let standalone = args.standalone;
@@ -107,7 +107,7 @@ async fn issue_to_file(args: Args, client: SigningClient) -> anyhow::Result<()>
utils::issue_credential(&client, &credentials_store, &secret, args.ticketbook_type).await?;
let ticketbook = credentials_store
.get_next_unspent_usable_ticketbook(0)
.get_next_unspent_usable_ticketbook(args.ticketbook_type.to_string(), 0)
.await?
.ok_or(anyhow!("we just issued a ticketbook, it must be present!"))?
.ticketbook;
@@ -7,13 +7,14 @@ pub mod execute_contract;
pub mod generators;
pub mod init_contract;
pub mod migrate_contract;
pub mod raw_contract_state;
pub mod upload_contract;
#[derive(Debug, Args)]
#[clap(args_conflicts_with_subcommands = true, subcommand_required = true)]
pub struct Cosmwasm {
#[clap(subcommand)]
pub command: Option<CosmwasmCommands>,
pub command: CosmwasmCommands,
}
#[derive(Debug, Subcommand)]
@@ -28,4 +29,6 @@ pub enum CosmwasmCommands {
Migrate(crate::validator::cosmwasm::migrate_contract::Args),
/// Execute a WASM smart contract method
Execute(crate::validator::cosmwasm::execute_contract::Args),
/// Obtain raw contract state of a cosmwasm smart contract
RawContractState(crate::validator::cosmwasm::raw_contract_state::Args),
}
@@ -0,0 +1,39 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::context::QueryClient;
use clap::Parser;
use cosmrs::AccountId;
use log::trace;
use nym_validator_client::nyxd::CosmWasmClient;
use std::fs;
use std::fs::File;
use std::path::PathBuf;
#[derive(Debug, Parser)]
pub struct Args {
#[clap(long, value_parser)]
#[clap(help = "The address of contract to get the state of")]
pub contract: AccountId,
#[clap(short, long)]
#[clap(help = "Output file for the retrieved contract state")]
pub output: PathBuf,
}
pub async fn execute(args: Args, client: QueryClient) -> anyhow::Result<()> {
trace!("args: {args:?}");
let output = File::create(&args.output)?;
let raw = client.query_all_contract_state(&args.contract).await?;
serde_json::to_writer(output, &raw)?;
println!(
"wrote {} key-value from {} pairs into '{}'",
raw.len(),
args.contract,
fs::canonicalize(args.output)?.display()
);
Ok(())
}
@@ -73,6 +73,7 @@ impl MemoryEcachTicketbookManager {
pub async fn get_next_unspent_ticketbook_and_update(
&self,
ticketbook_type: String,
tickets: u32,
) -> Option<RetrievedTicketbook> {
let mut guard = self.inner.write().await;
@@ -81,6 +82,7 @@ impl MemoryEcachTicketbookManager {
if !t.ticketbook.expired()
&& t.ticketbook.spent_tickets() + tickets as u64
<= t.ticketbook.params_total_tickets()
&& t.ticketbook.ticketbook_type().to_string() == ticketbook_type
{
t.ticketbook
.update_spent_tickets(t.ticketbook.spent_tickets() + tickets as u64);
@@ -171,10 +171,20 @@ impl SqliteEcashTicketbookManager {
data: &[u8],
) -> Result<(), sqlx::Error> {
sqlx::query!(
"INSERT INTO master_verification_key(epoch_id, serialised_key, serialization_revision) VALUES (?, ?, ?)",
r#"
INSERT OR IGNORE INTO master_verification_key(epoch_id, serialised_key, serialization_revision) VALUES (?, ?, ?);
UPDATE master_verification_key
SET
serialised_key = ?,
serialization_revision = ?
WHERE epoch_id = ?
"#,
epoch_id,
data,
serialisation_revision
serialisation_revision,
data,
serialisation_revision,
epoch_id
)
.execute(&self.connection_pool)
.await?;
@@ -204,10 +214,20 @@ impl SqliteEcashTicketbookManager {
data: &[u8],
) -> Result<(), sqlx::Error> {
sqlx::query!(
"INSERT INTO coin_indices_signatures(epoch_id, serialised_signatures, serialization_revision) VALUES (?, ?, ?)",
r#"
INSERT OR IGNORE INTO coin_indices_signatures(epoch_id, serialised_signatures, serialization_revision) VALUES (?, ?, ?);
UPDATE coin_indices_signatures
SET
serialised_signatures = ?,
serialization_revision = ?
WHERE epoch_id = ?
"#,
epoch_id,
data,
serialisation_revision
serialisation_revision,
data,
serialisation_revision,
epoch_id,
)
.execute(&self.connection_pool)
.await?;
@@ -240,13 +260,21 @@ impl SqliteEcashTicketbookManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO expiration_date_signatures(expiration_date, epoch_id, serialised_signatures, serialization_revision)
VALUES (?, ?, ?, ?)
INSERT OR IGNORE INTO expiration_date_signatures(expiration_date, epoch_id, serialised_signatures, serialization_revision)
VALUES (?, ?, ?, ?);
UPDATE expiration_date_signatures
SET
serialised_signatures = ?,
serialization_revision = ?
WHERE expiration_date = ?
"#,
expiration_date,
epoch_id,
data,
serialisation_revision
serialisation_revision,
data,
serialisation_revision,
expiration_date
)
.execute(&self.connection_pool)
.await?;
@@ -256,6 +284,7 @@ impl SqliteEcashTicketbookManager {
pub(crate) async fn get_next_unspent_ticketbook<'a, E>(
executor: E,
ticketbook_type: String,
deadline: Date,
tickets: u32,
) -> Result<Option<StoredIssuedTicketbook>, sqlx::Error>
@@ -268,12 +297,14 @@ where
FROM ecash_ticketbook
WHERE used_tickets + ? <= total_tickets
AND expiration_date >= ?
AND ticketbook_type = ?
ORDER BY expiration_date ASC
LIMIT 1
"#,
)
.bind(tickets)
.bind(deadline)
.bind(ticketbook_type)
.fetch_optional(executor)
.await
}
@@ -85,17 +85,18 @@ impl Storage for EphemeralStorage {
Ok(())
}
/// Tries to retrieve one of the stored ticketbook,
/// Tries to retrieve one of the stored ticketbook for the specified type,
/// that has not yet expired and has required number of unspent tickets.
/// it immediately updated the on-disk number of used tickets so that another task
/// could obtain their own tickets at the same time
async fn get_next_unspent_usable_ticketbook(
&self,
ticketbook_type: String,
tickets: u32,
) -> Result<Option<RetrievedTicketbook>, Self::StorageError> {
Ok(self
.storage_manager
.get_next_unspent_ticketbook_and_update(tickets)
.get_next_unspent_ticketbook_and_update(ticketbook_type, tickets)
.await)
}
@@ -171,13 +171,16 @@ impl Storage for PersistentStorage {
/// could obtain their own tickets at the same time
async fn get_next_unspent_usable_ticketbook(
&self,
ticketbook_type: String,
tickets: u32,
) -> Result<Option<RetrievedTicketbook>, Self::StorageError> {
let deadline = ecash_today().ecash_date();
let mut tx = self.storage_manager.begin_storage_tx().await?;
// we don't want ticketbooks with expiration in the past
let Some(raw) = get_next_unspent_ticketbook(&mut tx, deadline, tickets).await? else {
let Some(raw) =
get_next_unspent_ticketbook(&mut tx, ticketbook_type, deadline, tickets).await?
else {
// make sure to finish our tx
tx.commit().await?;
return Ok(None);
+2 -1
View File
@@ -45,12 +45,13 @@ pub trait Storage: Send + Sync {
async fn remove_pending_ticketbook(&self, pending_id: i64) -> Result<(), Self::StorageError>;
/// Tries to retrieve one of the stored ticketbook,
/// Tries to retrieve one of the stored ticketbook for the specified type,
/// that has not yet expired and has required number of unspent tickets.
/// it immediately updated the on-disk number of used tickets so that another task
/// could obtain their own tickets at the same time
async fn get_next_unspent_usable_ticketbook(
&self,
ticketbook_type: String,
tickets: u32,
) -> Result<Option<RetrievedTicketbook>, Self::StorageError>;
@@ -1,6 +1,9 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::*;
use crate::BandwidthFlushingBehaviourConfig;
use crate::ClientBandwidth;
use nym_credentials::ecash::utils::ecash_today;
use nym_credentials_interface::Bandwidth;
use nym_gateway_requests::ServerResponse;
@@ -9,10 +12,6 @@ use si_scale::helpers::bibytes2;
use time::OffsetDateTime;
use tracing::*;
use crate::error::*;
use crate::BandwidthFlushingBehaviourConfig;
use crate::ClientBandwidth;
const FREE_TESTNET_BANDWIDTH_VALUE: Bandwidth = Bandwidth::new_unchecked(64 * 1024 * 1024 * 1024); // 64GB
#[derive(Clone)]
@@ -41,9 +40,13 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
}
}
pub async fn available_bandwidth(&self) -> i64 {
self.client_bandwidth.available().await
}
async fn sync_expiration(&mut self) -> Result<()> {
self.storage
.set_expiration(self.client_id, self.client_bandwidth.bandwidth.expiration)
.set_expiration(self.client_id, self.client_bandwidth.expiration().await)
.await?;
Ok(())
}
@@ -57,17 +60,17 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
self.increase_bandwidth(FREE_TESTNET_BANDWIDTH_VALUE, ecash_today())
.await?;
let available_total = self.client_bandwidth.bandwidth.bytes;
let available_total = self.client_bandwidth.available().await;
Ok(ServerResponse::Bandwidth { available_total })
}
#[instrument(skip_all)]
pub async fn try_use_bandwidth(&mut self, required_bandwidth: i64) -> Result<i64> {
if self.client_bandwidth.bandwidth.expired() {
if self.client_bandwidth.expired().await {
self.expire_bandwidth().await?;
}
let available_bandwidth = self.client_bandwidth.bandwidth.bytes;
let available_bandwidth = self.client_bandwidth.available().await;
if available_bandwidth < required_bandwidth {
return Err(Error::OutOfBandwidth {
@@ -86,8 +89,7 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
async fn expire_bandwidth(&mut self) -> Result<()> {
self.storage.reset_bandwidth(self.client_id).await?;
self.client_bandwidth.bandwidth = Default::default();
self.client_bandwidth.update_sync_data();
self.client_bandwidth.expire_bandwidth().await;
Ok(())
}
@@ -97,31 +99,31 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
///
/// * `amount`: amount to decrease the available bandwidth by.
async fn consume_bandwidth(&mut self, amount: i64) -> Result<()> {
self.client_bandwidth.bandwidth.bytes -= amount;
self.client_bandwidth.bytes_delta_since_sync -= amount;
self.client_bandwidth.decrease_bandwidth(amount).await;
// since we're going to be operating on a fair use policy anyway, even if we crash and let extra few packets
// through, that's completely fine
if self.client_bandwidth.should_sync(self.bandwidth_cfg) {
self.sync_bandwidth().await?;
if self.client_bandwidth.should_sync(self.bandwidth_cfg).await {
self.sync_storage_bandwidth().await?;
}
Ok(())
}
#[instrument(level = "trace", skip_all)]
async fn sync_bandwidth(&mut self) -> Result<()> {
async fn sync_storage_bandwidth(&mut self) -> Result<()> {
trace!("syncing client bandwidth with the underlying storage");
let updated = self
.storage
.increase_bandwidth(self.client_id, self.client_bandwidth.bytes_delta_since_sync)
.increase_bandwidth(
self.client_id,
self.client_bandwidth.delta_since_sync().await,
)
.await?;
trace!(updated);
self.client_bandwidth.bandwidth.bytes = updated;
self.client_bandwidth.update_sync_data();
self.client_bandwidth
.resync_bandwidth_with_storage(updated)
.await;
Ok(())
}
@@ -136,13 +138,14 @@ impl<S: Storage + Clone + 'static> BandwidthStorageManager<S> {
bandwidth: Bandwidth,
expiration: OffsetDateTime,
) -> Result<()> {
self.client_bandwidth.bandwidth.bytes += bandwidth.value() as i64;
self.client_bandwidth.bytes_delta_since_sync += bandwidth.value() as i64;
self.client_bandwidth.bandwidth.expiration = expiration;
self.client_bandwidth
.increase_bandwidth(bandwidth.value() as i64, expiration)
.await;
// any increases to bandwidth should get flushed immediately
// (we don't want to accidentally miss somebody claiming a gigabyte voucher)
self.sync_expiration().await?;
self.sync_bandwidth().await
self.sync_storage_bandwidth().await?;
Ok(())
}
}
@@ -1,10 +1,15 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::time::Duration;
use nym_credentials::ecash::utils::ecash_today;
use nym_credentials_interface::AvailableBandwidth;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::RwLock;
const DEFAULT_CLIENT_BANDWIDTH_MAX_FLUSHING_RATE: Duration = Duration::from_millis(5);
const DEFAULT_CLIENT_BANDWIDTH_MAX_DELTA_FLUSHING_AMOUNT: i64 = 512 * 1024; // 512kB
#[derive(Debug, Clone, Copy)]
pub struct BandwidthFlushingBehaviourConfig {
@@ -15,10 +20,25 @@ pub struct BandwidthFlushingBehaviourConfig {
pub client_bandwidth_max_delta_flushing_amount: i64,
}
#[derive(Debug, Clone, Copy)]
impl Default for BandwidthFlushingBehaviourConfig {
fn default() -> Self {
Self {
client_bandwidth_max_flushing_rate: DEFAULT_CLIENT_BANDWIDTH_MAX_FLUSHING_RATE,
client_bandwidth_max_delta_flushing_amount:
DEFAULT_CLIENT_BANDWIDTH_MAX_DELTA_FLUSHING_AMOUNT,
}
}
}
#[derive(Debug, Clone)]
pub struct ClientBandwidth {
inner: Arc<RwLock<ClientBandwidthInner>>,
}
#[derive(Debug)]
struct ClientBandwidthInner {
pub(crate) bandwidth: AvailableBandwidth,
pub(crate) last_flushed: OffsetDateTime,
pub(crate) last_synced: OffsetDateTime,
/// the number of bytes the client had during the last sync.
/// it is used to determine whether the current value should be synced with the storage
@@ -30,28 +50,74 @@ pub struct ClientBandwidth {
impl ClientBandwidth {
pub fn new(bandwidth: AvailableBandwidth) -> ClientBandwidth {
ClientBandwidth {
bandwidth,
last_flushed: OffsetDateTime::now_utc(),
bytes_at_last_sync: bandwidth.bytes,
bytes_delta_since_sync: 0,
inner: Arc::new(RwLock::new(ClientBandwidthInner {
bandwidth,
last_synced: OffsetDateTime::now_utc(),
bytes_at_last_sync: bandwidth.bytes,
bytes_delta_since_sync: 0,
})),
}
}
pub(crate) fn should_sync(&self, cfg: BandwidthFlushingBehaviourConfig) -> bool {
if self.bytes_delta_since_sync.abs() >= cfg.client_bandwidth_max_delta_flushing_amount {
pub(crate) async fn should_sync(&self, cfg: BandwidthFlushingBehaviourConfig) -> bool {
let guard = self.inner.read().await;
if guard.bytes_delta_since_sync.abs() >= cfg.client_bandwidth_max_delta_flushing_amount {
return true;
}
if self.last_flushed + cfg.client_bandwidth_max_flushing_rate < OffsetDateTime::now_utc() {
if guard.last_synced + cfg.client_bandwidth_max_flushing_rate < OffsetDateTime::now_utc() {
return true;
}
false
}
pub(crate) fn update_sync_data(&mut self) {
self.last_flushed = OffsetDateTime::now_utc();
self.bytes_at_last_sync = self.bandwidth.bytes;
self.bytes_delta_since_sync = 0;
pub(crate) async fn available(&self) -> i64 {
self.inner.read().await.bandwidth.bytes
}
pub(crate) async fn delta_since_sync(&self) -> i64 {
self.inner.read().await.bytes_delta_since_sync
}
pub(crate) async fn expiration(&self) -> OffsetDateTime {
self.inner.read().await.bandwidth.expiration
}
pub(crate) async fn expired(&self) -> bool {
self.expiration().await < ecash_today()
}
pub(crate) async fn decrease_bandwidth(&self, decrease: i64) {
let mut guard = self.inner.write().await;
guard.bandwidth.bytes -= decrease;
guard.bytes_delta_since_sync -= decrease;
}
pub(crate) async fn increase_bandwidth(&self, increase: i64, new_expiration: OffsetDateTime) {
let mut guard = self.inner.write().await;
guard.bandwidth.bytes += increase;
guard.bandwidth.expiration = new_expiration;
guard.bytes_delta_since_sync += increase;
}
pub(crate) async fn expire_bandwidth(&self) {
let mut guard = self.inner.write().await;
guard.bandwidth = AvailableBandwidth::default();
guard.last_synced = OffsetDateTime::now_utc();
guard.bytes_at_last_sync = 0;
guard.bytes_delta_since_sync = 0;
}
pub(crate) async fn resync_bandwidth_with_storage(&self, stored: i64) {
let mut guard = self.inner.write().await;
guard.bandwidth.bytes = stored;
guard.bytes_at_last_sync = stored;
guard.bytes_delta_since_sync = 0;
guard.last_synced = OffsetDateTime::now_utc();
}
}
@@ -73,6 +73,10 @@ where
self.shared_state.verification_key(epoch_id).await
}
pub fn storage(&self) -> &S {
&self.shared_state.storage
}
//Check for duplicate pay_info, then check the payment, then insert pay_info if everything succeeded
pub async fn check_payment(
&self,
+2 -2
View File
@@ -150,7 +150,7 @@ impl<S: Storage + Clone + 'static> CredentialVerifier<S> {
Ok(self
.bandwidth_storage_manager
.client_bandwidth
.bandwidth
.bytes)
.available()
.await)
}
}
@@ -0,0 +1,10 @@
/*
* Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
ALTER TABLE wireguard_peer
ADD COLUMN client_id INTEGER REFERENCES clients(id) DEFAULT NULL;
ALTER TABLE wireguard_peer
DROP COLUMN suspended;
+41 -12
View File
@@ -227,12 +227,14 @@ pub trait Storage: Send + Sync {
/// # Arguments
///
/// * `peer`: wireguard peer data to be stored
/// * `suspended`: if peer exists, but it's currently suspended
/// * `with_client_id`: if the peer should have a corresponding client_id
/// (created with entry wireguard ticket) or live without one (or with an
/// exiting one), for temporary backwards compatibility.
async fn insert_wireguard_peer(
&self,
peer: &defguard_wireguard_rs::host::Peer,
suspended: bool,
) -> Result<(), StorageError>;
with_client_id: bool,
) -> Result<Option<i64>, StorageError>;
/// Tries to retrieve available bandwidth for the particular peer.
///
@@ -334,14 +336,23 @@ impl Storage for PersistentStorage {
client_address: DestinationAddressBytes,
shared_keys: &SharedGatewayKey,
) -> Result<i64, StorageError> {
let client_id = self
.client_manager
.insert_client(ClientType::EntryMixnet)
.await?;
let client_address_bs58 = client_address.as_base58_string();
let client_id = match self
.shared_key_manager
.client_id(&client_address_bs58)
.await
{
Ok(client_id) => client_id,
_ => {
self.client_manager
.insert_client(ClientType::EntryMixnet)
.await?
}
};
self.shared_key_manager
.insert_shared_keys(
client_id,
client_address.as_base58_string(),
client_address_bs58,
shared_keys.aes128_ctr_hmac_bs58().as_deref(),
shared_keys.aes256_gcm_siv().as_deref(),
)
@@ -640,12 +651,30 @@ impl Storage for PersistentStorage {
async fn insert_wireguard_peer(
&self,
peer: &defguard_wireguard_rs::host::Peer,
suspended: bool,
) -> Result<(), StorageError> {
with_client_id: bool,
) -> Result<Option<i64>, StorageError> {
let client_id = match self
.wireguard_peer_manager
.retrieve_peer(&peer.public_key.to_string())
.await?
{
Some(peer) => peer.client_id,
_ => {
if with_client_id {
Some(
self.client_manager
.insert_client(ClientType::EntryWireguard)
.await?,
)
} else {
None
}
}
};
let mut peer = WireguardPeer::from(peer.clone());
peer.suspended = suspended;
peer.client_id = client_id;
self.wireguard_peer_manager.insert_peer(&peer).await?;
Ok(())
Ok(client_id)
}
async fn get_wireguard_peer(
+2 -2
View File
@@ -116,7 +116,7 @@ pub struct WireguardPeer {
pub rx_bytes: i64,
pub persistent_keepalive_interval: Option<i64>,
pub allowed_ips: Vec<u8>,
pub suspended: bool,
pub client_id: Option<i64>,
}
impl From<defguard_wireguard_rs::host::Peer> for WireguardPeer {
@@ -146,7 +146,7 @@ impl From<defguard_wireguard_rs::host::Peer> for WireguardPeer {
&value.allowed_ips,
)
.unwrap_or_default(),
suspended: false,
client_id: None,
}
}
}
@@ -27,16 +27,16 @@ impl WgPeerManager {
pub(crate) async fn insert_peer(&self, peer: &WireguardPeer) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT OR IGNORE INTO wireguard_peer(public_key, preshared_key, protocol_version, endpoint, last_handshake, tx_bytes, rx_bytes, persistent_keepalive_interval, allowed_ips, suspended)
INSERT OR IGNORE INTO wireguard_peer(public_key, preshared_key, protocol_version, endpoint, last_handshake, tx_bytes, rx_bytes, persistent_keepalive_interval, allowed_ips, client_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
UPDATE wireguard_peer
SET preshared_key = ?, protocol_version = ?, endpoint = ?, last_handshake = ?, tx_bytes = ?, rx_bytes = ?, persistent_keepalive_interval = ?, allowed_ips = ?, suspended = ?
SET preshared_key = ?, protocol_version = ?, endpoint = ?, last_handshake = ?, tx_bytes = ?, rx_bytes = ?, persistent_keepalive_interval = ?, allowed_ips = ?, client_id = ?
WHERE public_key = ?
"#,
peer.public_key, peer.preshared_key, peer.protocol_version, peer.endpoint, peer.last_handshake, peer.tx_bytes, peer.rx_bytes, peer.persistent_keepalive_interval, peer.allowed_ips, peer.suspended,
peer.preshared_key, peer.protocol_version, peer.endpoint, peer.last_handshake, peer.tx_bytes, peer.rx_bytes, peer.persistent_keepalive_interval, peer.allowed_ips, peer.suspended,peer.public_key,
peer.public_key, peer.preshared_key, peer.protocol_version, peer.endpoint, peer.last_handshake, peer.tx_bytes, peer.rx_bytes, peer.persistent_keepalive_interval, peer.allowed_ips, peer.client_id,
peer.preshared_key, peer.protocol_version, peer.endpoint, peer.last_handshake, peer.tx_bytes, peer.rx_bytes, peer.persistent_keepalive_interval, peer.allowed_ips, peer.client_id,
peer.public_key,
)
.execute(&self.connection_pool)
.await?;
@@ -78,7 +78,7 @@ impl WgPeerManager {
.await
}
/// Retrieve the wireguard peer with the provided public key from the storage.
/// Remove the wireguard peer with the provided public key from the storage.
///
/// # Arguments
///
+3 -1
View File
@@ -18,7 +18,9 @@ pub use user_agent::UserAgent;
mod user_agent;
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
// The timeout is relatively high as we are often making requests over the mixnet, where latency is
// high and chatty protocols take a while to complete.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
pub type PathSegments<'a> = &'a [&'a str];
pub type Params<'a, K, V> = &'a [(K, V)];
+1 -1
View File
@@ -2,4 +2,4 @@ pub mod conversion;
pub mod request;
pub mod response;
const VERSION: u8 = 6;
pub const VERSION: u8 = 6;
+1 -1
View File
@@ -3,4 +3,4 @@ pub mod request;
pub mod response;
pub mod signature;
const VERSION: u8 = 7;
pub const VERSION: u8 = 7;
@@ -2,30 +2,17 @@
// SPDX-License-Identifier: Apache-2.0
use nym_sphinx_acknowledgements::surb_ack::SurbAckRecoveryError;
use nym_sphinx_addressing::nodes::NymNodeRoutingAddressError;
use nym_sphinx_types::{NymPacketError, OutfoxError, SphinxError};
use nym_sphinx_framing::processing::PacketProcessingError;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum MixProcessingError {
#[error("failed to process received packet: {0}")]
NymPacketProcessingError(#[from] NymPacketError),
#[error("failed to process received sphinx packet: {0}")]
SphinxProcessingError(#[from] SphinxError),
#[error("the forward hop address was malformed: {0}")]
InvalidForwardHopAddress(#[from] NymNodeRoutingAddressError),
#[error("the final hop did not contain a SURB-Ack")]
NoSurbAckInFinalHop,
#[error("failed to recover the expected SURB-Ack packet: {0}")]
MalformedSurbAck(#[from] SurbAckRecoveryError),
#[error("the received packet was set to use the very old and very much deprecated 'VPN' mode")]
ReceivedOldTypeVpnPacket,
#[error("failed to process received outfox packet: {0}")]
OutfoxProcessingError(#[from] OutfoxError),
#[error("failed to process received Nym packet: {0}")]
NymPacketProcessingError(#[from] PacketProcessingError),
}
@@ -1,38 +1,9 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::packet_processor::error::MixProcessingError;
use log::*;
use nym_metrics::nanos;
use nym_sphinx_acknowledgements::surb_ack::SurbAck;
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_framing::packet::FramedNymPacket;
use nym_sphinx_params::{PacketSize, PacketType};
use nym_sphinx_types::{
Delay as SphinxDelay, DestinationAddressBytes, NodeAddressBytes, NymPacket, NymProcessedPacket,
PrivateKey, ProcessedPacket,
};
use std::sync::Arc;
type ForwardAck = MixPacket;
#[derive(Debug)]
pub struct ProcessedFinalHop {
pub destination: DestinationAddressBytes,
pub forward_ack: Option<ForwardAck>,
pub message: Vec<u8>,
}
#[derive(Debug)]
pub enum MixProcessingResult {
/// Contains unwrapped data that should first get delayed before being sent to next hop.
ForwardHop(MixPacket, Option<SphinxDelay>),
/// Contains all data extracted out of the final hop packet that could be forwarded to the destination.
FinalHop(ProcessedFinalHop),
}
use nym_sphinx_types::PrivateKey;
#[derive(Clone)]
pub struct SphinxPacketProcessor {
@@ -48,280 +19,7 @@ impl SphinxPacketProcessor {
}
}
/// Performs a fresh sphinx unwrapping using no cache.
fn perform_initial_packet_processing(
&self,
packet: NymPacket,
) -> Result<NymProcessedPacket, MixProcessingError> {
nanos!("perform_initial_packet_processing", {
packet.process(&self.sphinx_key).map_err(|err| {
debug!("Failed to unwrap NymPacket packet: {err}");
MixProcessingError::NymPacketProcessingError(err)
})
})
}
/// Takes the received framed packet and tries to unwrap it from the sphinx encryption.
fn perform_initial_unwrapping(
&self,
received: FramedNymPacket,
) -> Result<NymProcessedPacket, MixProcessingError> {
nanos!("perform_initial_unwrapping", {
let packet = received.into_inner();
self.perform_initial_packet_processing(packet)
})
}
/// Processed received forward hop packet - tries to extract next hop address, sets delay
/// and packs all the data in a way that can be easily sent to the next hop.
fn process_forward_hop(
&self,
packet: NymPacket,
forward_address: NodeAddressBytes,
delay: SphinxDelay,
packet_type: PacketType,
) -> Result<MixProcessingResult, MixProcessingError> {
let next_hop_address = NymNodeRoutingAddress::try_from(forward_address)?;
let mix_packet = MixPacket::new(next_hop_address, packet, packet_type);
Ok(MixProcessingResult::ForwardHop(mix_packet, Some(delay)))
}
/// Split data extracted from the final hop sphinx packet into a SURBAck and message
/// that should get delivered to a client.
fn split_hop_data_into_ack_and_message(
&self,
mut extracted_data: Vec<u8>,
packet_type: PacketType,
) -> Result<(Vec<u8>, Vec<u8>), MixProcessingError> {
let ack_len = SurbAck::len(Some(packet_type));
// in theory it's impossible for this to fail since it managed to go into correct `match`
// branch at the caller
if extracted_data.len() < ack_len {
return Err(MixProcessingError::NoSurbAckInFinalHop);
}
let message = extracted_data.split_off(ack_len);
let ack_data = extracted_data;
Ok((ack_data, message))
}
/// Tries to extract a SURBAck that could be sent back into the mix network and message
/// that should get delivered to a client from received Sphinx packet.
fn split_into_ack_and_message(
&self,
data: Vec<u8>,
packet_size: PacketSize,
packet_type: PacketType,
) -> Result<(Option<MixPacket>, Vec<u8>), MixProcessingError> {
match packet_size {
PacketSize::AckPacket | PacketSize::OutfoxAckPacket => {
trace!("received an ack packet!");
Ok((None, data))
}
PacketSize::RegularPacket
| PacketSize::ExtendedPacket8
| PacketSize::ExtendedPacket16
| PacketSize::ExtendedPacket32
| PacketSize::OutfoxRegularPacket => {
trace!("received a normal packet!");
let (ack_data, message) =
self.split_hop_data_into_ack_and_message(data, packet_type)?;
let (ack_first_hop, ack_packet) =
match SurbAck::try_recover_first_hop_packet(&ack_data, packet_type) {
Ok((first_hop, packet)) => (first_hop, packet),
Err(err) => {
info!("Failed to recover first hop from ack data: {err}");
return Err(err.into());
}
};
let forward_ack = MixPacket::new(ack_first_hop, ack_packet, packet_type);
Ok((Some(forward_ack), message))
}
}
}
/// Processed received final hop packet - tries to extract SURBAck out of it (assuming the
/// packet itself is not an ACK) and splits it from the message that should get delivered
/// to the destination.
fn process_final_hop(
&self,
destination: DestinationAddressBytes,
payload: Vec<u8>,
packet_size: PacketSize,
packet_type: PacketType,
) -> Result<MixProcessingResult, MixProcessingError> {
let (forward_ack, message) =
self.split_into_ack_and_message(payload, packet_size, packet_type)?;
Ok(MixProcessingResult::FinalHop(ProcessedFinalHop {
destination,
forward_ack,
message,
}))
}
/// Performs final processing for the unwrapped packet based on whether it was a forward hop
/// or a final hop.
fn perform_final_processing(
&self,
packet: NymProcessedPacket,
packet_size: PacketSize,
packet_type: PacketType,
) -> Result<MixProcessingResult, MixProcessingError> {
match packet {
NymProcessedPacket::Sphinx(packet) => {
match packet {
ProcessedPacket::ForwardHop(packet, address, delay) => self
.process_forward_hop(
NymPacket::Sphinx(*packet),
address,
delay,
packet_type,
),
// right now there's no use for the surb_id included in the header - probably it should get removed from the
// sphinx all together?
ProcessedPacket::FinalHop(destination, _, payload) => self.process_final_hop(
destination,
payload.recover_plaintext()?,
packet_size,
packet_type,
),
}
}
NymProcessedPacket::Outfox(packet) => {
let next_address = *packet.next_address();
let packet = packet.into_packet();
if packet.is_final_hop() {
self.process_final_hop(
DestinationAddressBytes::from_bytes(next_address),
packet.recover_plaintext()?.to_vec(),
packet_size,
packet_type,
)
} else {
let mix_packet = MixPacket::new(
NymNodeRoutingAddress::try_from_bytes(&next_address)?,
NymPacket::Outfox(packet),
PacketType::Outfox,
);
Ok(MixProcessingResult::ForwardHop(mix_packet, None))
}
}
}
}
pub fn process_received(
&self,
received: FramedNymPacket,
) -> Result<MixProcessingResult, MixProcessingError> {
// explicit packet size will help to correctly parse final hop
nanos!("process_received", {
let packet_size = received.packet_size();
let packet_type = received.packet_type();
// unwrap the sphinx packet and if possible and appropriate, cache keys
let processed_packet = self.perform_initial_unwrapping(received)?;
// for forward packets, extract next hop and set delay (but do NOT delay here)
// for final packets, extract SURBAck
let final_processing_result =
self.perform_final_processing(processed_packet, packet_size, packet_type);
if final_processing_result.is_err() {
error!("{:?}", final_processing_result)
}
final_processing_result
})
}
}
// TODO: what more could we realistically test here?
#[cfg(test)]
mod tests {
use super::*;
use nym_sphinx_types::crypto::keygen;
fn fixture() -> SphinxPacketProcessor {
let local_keys = keygen();
SphinxPacketProcessor::new(local_keys.0)
}
#[tokio::test]
async fn splitting_hop_data_works_for_sufficiently_long_payload() {
let processor = fixture();
let short_data = vec![42u8];
assert!(processor
.split_hop_data_into_ack_and_message(short_data, PacketType::Mix)
.is_err());
let sufficient_data = vec![42u8; SurbAck::len(Some(PacketType::Mix))];
let (ack, data) = processor
.split_hop_data_into_ack_and_message(sufficient_data.clone(), PacketType::Mix)
.unwrap();
assert_eq!(sufficient_data, ack);
assert!(data.is_empty());
let long_data = vec![42u8; SurbAck::len(Some(PacketType::Mix)) * 5];
let (ack, data) = processor
.split_hop_data_into_ack_and_message(long_data, PacketType::Mix)
.unwrap();
assert_eq!(ack.len(), SurbAck::len(Some(PacketType::Mix)));
assert_eq!(data.len(), SurbAck::len(Some(PacketType::Mix)) * 4)
}
#[tokio::test]
async fn splitting_hop_data_works_for_sufficiently_long_payload_outfox() {
let processor = fixture();
let short_data = vec![42u8];
assert!(processor
.split_hop_data_into_ack_and_message(short_data, PacketType::Outfox)
.is_err());
let sufficient_data = vec![42u8; SurbAck::len(Some(PacketType::Outfox))];
let (ack, data) = processor
.split_hop_data_into_ack_and_message(sufficient_data.clone(), PacketType::Outfox)
.unwrap();
assert_eq!(sufficient_data, ack);
assert!(data.is_empty());
let long_data = vec![42u8; SurbAck::len(Some(PacketType::Outfox)) * 5];
let (ack, data) = processor
.split_hop_data_into_ack_and_message(long_data, PacketType::Outfox)
.unwrap();
assert_eq!(ack.len(), SurbAck::len(Some(PacketType::Outfox)));
assert_eq!(data.len(), SurbAck::len(Some(PacketType::Outfox)) * 4)
}
#[tokio::test]
async fn splitting_into_ack_and_message_returns_whole_data_for_ack() {
let processor = fixture();
let data = vec![42u8; SurbAck::len(Some(PacketType::Mix)) + 10];
let (ack, message) = processor
.split_into_ack_and_message(data.clone(), PacketSize::AckPacket, PacketType::Mix)
.unwrap();
assert!(ack.is_none());
assert_eq!(data, message)
}
#[tokio::test]
async fn splitting_into_ack_and_message_returns_whole_data_for_ack_outfox() {
let processor = fixture();
let data = vec![42u8; SurbAck::len(Some(PacketType::Outfox)) + 10];
let (ack, message) = processor
.split_into_ack_and_message(
data.clone(),
PacketSize::OutfoxAckPacket,
PacketType::Outfox,
)
.unwrap();
assert!(ack.is_none());
assert_eq!(data, message)
pub fn sphinx_key(&self) -> &PrivateKey {
&self.sphinx_key
}
}
+1 -1
View File
@@ -32,7 +32,7 @@ pub const NYXD_URL: &str = "https://rpc.nymtech.net";
pub const NYM_API: &str = "https://validator.nymtech.net/api/";
pub const NYXD_WS: &str = "wss://rpc.nymtech.net/websocket";
pub const EXPLORER_API: &str = "https://explorer.nymtech.net/api/";
pub const NYM_VPN_API: &str = "https://nymvpn.net/api/";
pub const NYM_VPN_API: &str = "https://nymvpn.com/api/";
// I'm making clippy mad on purpose, because that url HAS TO be updated and deployed before merging
pub const EXIT_POLICY_URL: &str =
+7
View File
@@ -11,7 +11,14 @@ repository = { workspace = true }
bytes = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
thiserror = { workspace = true }
log = { workspace = true }
nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] }
nym-sphinx-params = { path = "../params", features = ["sphinx", "outfox"] }
nym-sphinx-forwarding = { path = "../forwarding" }
nym-metrics = { path = "../../nym-metrics" }
nym-sphinx-addressing = { path = "../addressing" }
nym-sphinx-acknowledgements = { path = "../acknowledgements" }
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
+1
View File
@@ -3,3 +3,4 @@
pub mod codec;
pub mod packet;
pub mod processing;
+284
View File
@@ -0,0 +1,284 @@
use log::{debug, error, info, trace};
use nym_sphinx_acknowledgements::surb_ack::{SurbAck, SurbAckRecoveryError};
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressError};
use nym_sphinx_params::{PacketSize, PacketType};
use nym_sphinx_types::{
Delay as SphinxDelay, DestinationAddressBytes, NodeAddressBytes, NymPacket, NymPacketError,
NymProcessedPacket, OutfoxError, PrivateKey, ProcessedPacket, SphinxError,
};
use thiserror::Error;
use crate::packet::FramedNymPacket;
use nym_metrics::nanos;
use nym_sphinx_forwarding::packet::MixPacket;
#[derive(Debug)]
pub enum MixProcessingResult {
/// Contains unwrapped data that should first get delayed before being sent to next hop.
ForwardHop(MixPacket, Option<SphinxDelay>),
/// Contains all data extracted out of the final hop packet that could be forwarded to the destination.
FinalHop(ProcessedFinalHop),
}
type ForwardAck = MixPacket;
#[derive(Debug)]
pub struct ProcessedFinalHop {
pub destination: DestinationAddressBytes,
pub forward_ack: Option<ForwardAck>,
pub message: Vec<u8>,
}
#[derive(Debug, Error)]
pub enum PacketProcessingError {
#[error("failed to process received packet: {0}")]
NymPacketProcessingError(#[from] NymPacketError),
#[error("failed to process received sphinx packet: {0}")]
SphinxProcessingError(#[from] SphinxError),
#[error("the forward hop address was malformed: {0}")]
InvalidForwardHopAddress(#[from] NymNodeRoutingAddressError),
#[error("the final hop did not contain a SURB-Ack")]
NoSurbAckInFinalHop,
#[error("failed to recover the expected SURB-Ack packet: {0}")]
MalformedSurbAck(#[from] SurbAckRecoveryError),
#[error("the received packet was set to use the very old and very much deprecated 'VPN' mode")]
ReceivedOldTypeVpnPacket,
#[error("failed to process received outfox packet: {0}")]
OutfoxProcessingError(#[from] OutfoxError),
}
pub fn process_framed_packet(
received: FramedNymPacket,
sphinx_key: &PrivateKey,
) -> Result<MixProcessingResult, PacketProcessingError> {
nanos!("process_received", {
let packet_size = received.packet_size();
let packet_type = received.packet_type();
// unwrap the sphinx packet and if possible and appropriate, cache keys
let processed_packet = perform_framed_unwrapping(received, sphinx_key)?;
// for forward packets, extract next hop and set delay (but do NOT delay here)
// for final packets, extract SURBAck
let final_processing_result =
perform_final_processing(processed_packet, packet_size, packet_type);
if final_processing_result.is_err() {
error!("{:?}", final_processing_result)
}
final_processing_result
})
}
fn perform_framed_unwrapping(
received: FramedNymPacket,
sphinx_key: &PrivateKey,
) -> Result<NymProcessedPacket, PacketProcessingError> {
nanos!("perform_initial_unwrapping", {
let packet = received.into_inner();
perform_framed_packet_processing(packet, sphinx_key)
})
}
fn perform_framed_packet_processing(
packet: NymPacket,
sphinx_key: &PrivateKey,
) -> Result<NymProcessedPacket, PacketProcessingError> {
nanos!("perform_initial_packet_processing", {
packet.process(sphinx_key).map_err(|err| {
debug!("Failed to unwrap NymPacket packet: {err}");
PacketProcessingError::NymPacketProcessingError(err)
})
})
}
fn perform_final_processing(
packet: NymProcessedPacket,
packet_size: PacketSize,
packet_type: PacketType,
) -> Result<MixProcessingResult, PacketProcessingError> {
match packet {
NymProcessedPacket::Sphinx(packet) => {
match packet {
ProcessedPacket::ForwardHop(packet, address, delay) => {
process_forward_hop(NymPacket::Sphinx(*packet), address, delay, packet_type)
}
// right now there's no use for the surb_id included in the header - probably it should get removed from the
// sphinx all together?
ProcessedPacket::FinalHop(destination, _, payload) => process_final_hop(
destination,
payload.recover_plaintext()?,
packet_size,
packet_type,
),
}
}
NymProcessedPacket::Outfox(packet) => {
let next_address = *packet.next_address();
let packet = packet.into_packet();
if packet.is_final_hop() {
process_final_hop(
DestinationAddressBytes::from_bytes(next_address),
packet.recover_plaintext()?.to_vec(),
packet_size,
packet_type,
)
} else {
let mix_packet = MixPacket::new(
NymNodeRoutingAddress::try_from_bytes(&next_address)?,
NymPacket::Outfox(packet),
PacketType::Outfox,
);
Ok(MixProcessingResult::ForwardHop(mix_packet, None))
}
}
}
}
fn process_final_hop(
destination: DestinationAddressBytes,
payload: Vec<u8>,
packet_size: PacketSize,
packet_type: PacketType,
) -> Result<MixProcessingResult, PacketProcessingError> {
let (forward_ack, message) = split_into_ack_and_message(payload, packet_size, packet_type)?;
Ok(MixProcessingResult::FinalHop(ProcessedFinalHop {
destination,
forward_ack,
message,
}))
}
fn split_into_ack_and_message(
data: Vec<u8>,
packet_size: PacketSize,
packet_type: PacketType,
) -> Result<(Option<MixPacket>, Vec<u8>), PacketProcessingError> {
match packet_size {
PacketSize::AckPacket | PacketSize::OutfoxAckPacket => {
trace!("received an ack packet!");
Ok((None, data))
}
PacketSize::RegularPacket
| PacketSize::ExtendedPacket8
| PacketSize::ExtendedPacket16
| PacketSize::ExtendedPacket32
| PacketSize::OutfoxRegularPacket => {
trace!("received a normal packet!");
let (ack_data, message) = split_hop_data_into_ack_and_message(data, packet_type)?;
let (ack_first_hop, ack_packet) =
match SurbAck::try_recover_first_hop_packet(&ack_data, packet_type) {
Ok((first_hop, packet)) => (first_hop, packet),
Err(err) => {
info!("Failed to recover first hop from ack data: {err}");
return Err(err.into());
}
};
let forward_ack = MixPacket::new(ack_first_hop, ack_packet, packet_type);
Ok((Some(forward_ack), message))
}
}
}
fn split_hop_data_into_ack_and_message(
mut extracted_data: Vec<u8>,
packet_type: PacketType,
) -> Result<(Vec<u8>, Vec<u8>), PacketProcessingError> {
let ack_len = SurbAck::len(Some(packet_type));
// in theory it's impossible for this to fail since it managed to go into correct `match`
// branch at the caller
if extracted_data.len() < ack_len {
return Err(PacketProcessingError::NoSurbAckInFinalHop);
}
let message = extracted_data.split_off(ack_len);
let ack_data = extracted_data;
Ok((ack_data, message))
}
fn process_forward_hop(
packet: NymPacket,
forward_address: NodeAddressBytes,
delay: SphinxDelay,
packet_type: PacketType,
) -> Result<MixProcessingResult, PacketProcessingError> {
let next_hop_address = NymNodeRoutingAddress::try_from(forward_address)?;
let mix_packet = MixPacket::new(next_hop_address, packet, packet_type);
Ok(MixProcessingResult::ForwardHop(mix_packet, Some(delay)))
}
// TODO: what more could we realistically test here?
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn splitting_hop_data_works_for_sufficiently_long_payload() {
let short_data = vec![42u8];
assert!(split_hop_data_into_ack_and_message(short_data, PacketType::Mix).is_err());
let sufficient_data = vec![42u8; SurbAck::len(Some(PacketType::Mix))];
let (ack, data) =
split_hop_data_into_ack_and_message(sufficient_data.clone(), PacketType::Mix).unwrap();
assert_eq!(sufficient_data, ack);
assert!(data.is_empty());
let long_data: Vec<u8> = vec![42u8; SurbAck::len(Some(PacketType::Mix)) * 5];
let (ack, data) = split_hop_data_into_ack_and_message(long_data, PacketType::Mix).unwrap();
assert_eq!(ack.len(), SurbAck::len(Some(PacketType::Mix)));
assert_eq!(data.len(), SurbAck::len(Some(PacketType::Mix)) * 4)
}
#[tokio::test]
async fn splitting_hop_data_works_for_sufficiently_long_payload_outfox() {
let short_data = vec![42u8];
assert!(split_hop_data_into_ack_and_message(short_data, PacketType::Outfox).is_err());
let sufficient_data = vec![42u8; SurbAck::len(Some(PacketType::Outfox))];
let (ack, data) =
split_hop_data_into_ack_and_message(sufficient_data.clone(), PacketType::Outfox)
.unwrap();
assert_eq!(sufficient_data, ack);
assert!(data.is_empty());
let long_data = vec![42u8; SurbAck::len(Some(PacketType::Outfox)) * 5];
let (ack, data) =
split_hop_data_into_ack_and_message(long_data, PacketType::Outfox).unwrap();
assert_eq!(ack.len(), SurbAck::len(Some(PacketType::Outfox)));
assert_eq!(data.len(), SurbAck::len(Some(PacketType::Outfox)) * 4)
}
#[tokio::test]
async fn splitting_into_ack_and_message_returns_whole_data_for_ack() {
let data = vec![42u8; SurbAck::len(Some(PacketType::Mix)) + 10];
let (ack, message) =
split_into_ack_and_message(data.clone(), PacketSize::AckPacket, PacketType::Mix)
.unwrap();
assert!(ack.is_none());
assert_eq!(data, message)
}
#[tokio::test]
async fn splitting_into_ack_and_message_returns_whole_data_for_ack_outfox() {
let data = vec![42u8; SurbAck::len(Some(PacketType::Outfox)) + 10];
let (ack, message) = split_into_ack_and_message(
data.clone(),
PacketSize::OutfoxAckPacket,
PacketType::Outfox,
)
.unwrap();
assert!(ack.is_none());
assert_eq!(data, message)
}
}
+22 -1
View File
@@ -10,6 +10,7 @@ use crate::rpc_client::RpcClient;
use crate::storage::{persist_block, ScraperStorage};
use crate::PruningOptions;
use futures::StreamExt;
use std::cmp::max;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::ops::{Add, Range};
use std::sync::Arc;
@@ -99,7 +100,15 @@ impl BlockProcessor {
})
}
async fn process_block(&mut self, block: BlockToProcess) -> Result<(), ScraperError> {
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.pruning_options = pruning_options;
self
}
pub(super) async fn process_block(
&mut self,
block: BlockToProcess,
) -> Result<(), ScraperError> {
info!("processing block at height {}", block.height);
let full_info = self.rpc_client.try_get_full_details(block).await?;
@@ -169,6 +178,10 @@ impl BlockProcessor {
self.msg_modules = modules;
}
pub(super) fn last_process_height(&self) -> u32 {
self.last_processed_height
}
async fn maybe_request_missing_blocks(&mut self) -> Result<(), ScraperError> {
// we're still processing, so we're good
if self.last_processed_at.elapsed() < MAX_MISSING_BLOCKS_DELAY {
@@ -254,6 +267,7 @@ impl BlockProcessor {
}
if to_prune == 0 {
self.last_pruned_height = self.last_processed_height;
return Ok(());
}
@@ -353,7 +367,14 @@ impl BlockProcessor {
self.maybe_prune_storage().await?;
let latest_block = self.rpc_client.current_block_height().await? as u32;
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
// in case we were offline for a while,
// make sure we don't request blocks we'd have to prune anyway
let keep_recent = self.pruning_options.strategy_keep_recent();
let last_to_keep = latest_block - keep_recent;
self.last_processed_height = max(self.last_processed_height, last_to_keep);
let request_range = self.last_processed_height + 1..latest_block + 1;
info!("we need to request {request_range:?} to resync");
self.request_missing_blocks(request_range).await?;
+1 -1
View File
@@ -16,7 +16,7 @@ pub enum ScraperError {
#[error("failed to perform startup SQL migration: {0}")]
StartupMigrationFailure(#[from] sqlx::migrate::MigrateError),
#[error("can't add any modules to the scraper as it's already running")]
#[error("the block scraper is already running")]
ScraperAlreadyRunning,
#[error("failed to establish websocket connection to {url}: {source}")]
+143 -16
View File
@@ -1,21 +1,25 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::BlockToProcess;
use crate::block_processor::BlockProcessor;
use crate::block_requester::BlockRequester;
use crate::block_requester::{BlockRequest, BlockRequester};
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::scraper::subscriber::ChainSubscriber;
use crate::storage::ScraperStorage;
use crate::PruningOptions;
use futures::future::join_all;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc::{channel, unbounded_channel};
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tracing::info;
use tracing::{error, info};
use url::Url;
mod subscriber;
@@ -115,6 +119,7 @@ pub struct NyxdScraper {
cancel_token: CancellationToken,
startup_sync: Arc<Notify>,
pub storage: ScraperStorage,
rpc_client: RpcClient,
}
impl NyxdScraper {
@@ -125,6 +130,7 @@ impl NyxdScraper {
pub async fn new(config: Config) -> Result<Self, ScraperError> {
config.pruning_options.validate()?;
let storage = ScraperStorage::init(&config.database_path).await?;
let rpc_client = RpcClient::new(&config.rpc_url)?;
Ok(NyxdScraper {
config,
@@ -132,6 +138,7 @@ impl NyxdScraper {
cancel_token: CancellationToken::new(),
startup_sync: Arc::new(Default::default()),
storage,
rpc_client,
})
}
@@ -151,36 +158,156 @@ impl NyxdScraper {
self.task_tracker.close();
}
pub async fn start(&self) -> Result<(), ScraperError> {
let (processing_tx, processing_rx) = unbounded_channel();
let (req_tx, req_rx) = channel(5);
pub async fn process_single_block(&self, height: u32) -> Result<(), ScraperError> {
info!(height = height, "attempting to process a single block");
if !self.task_tracker.is_empty() {
return Err(ScraperError::ScraperAlreadyRunning);
}
let rpc_client = RpcClient::new(&self.config.rpc_url)?;
let (_, processing_rx) = unbounded_channel();
let (req_tx, _) = channel(5);
// create the tasks
let block_requester = BlockRequester::new(
let mut block_processor = self
.new_block_processor(req_tx.clone(), processing_rx)
.await?
.with_pruning(PruningOptions::nothing());
let block = self.rpc_client.get_basic_block_details(height).await?;
block_processor.process_block(block.into()).await
}
pub async fn process_block_range(
&self,
starting_height: Option<u32>,
end_height: Option<u32>,
) -> Result<(), ScraperError> {
if !self.task_tracker.is_empty() {
return Err(ScraperError::ScraperAlreadyRunning);
}
let (_, processing_rx) = unbounded_channel();
let (req_tx, _) = channel(5);
let mut block_processor = self
.new_block_processor(req_tx.clone(), processing_rx)
.await?
.with_pruning(PruningOptions::nothing());
let current_height = self.rpc_client.current_block_height().await? as u32;
let last_processed = block_processor.last_process_height();
let starting_height = match starting_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
// otherwise, attempt to resume where we last stopped
// and if we haven't processed anything, start from the current height
if last_processed != 0 {
last_processed
} else {
current_height
}
}
};
let end_height = match end_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
// otherwise, attempt to either go from the start height to the height right
// before the final processed block held in the storage (in case there are gaps)
// or finally, just go to the current block height
if last_processed > starting_height {
last_processed - 1
} else {
current_height
}
}
};
info!(
starting_height = starting_height,
end_height = end_height,
"attempting to process block range"
);
let range = (starting_height..=end_height).collect::<Vec<_>>();
// the most likely bottleneck here are going to be the chain queries,
// so batch multiple requests
for batch in range.chunks(4) {
let batch_result = join_all(
batch
.iter()
.map(|height| self.rpc_client.get_basic_block_details(*height)),
)
.await;
for result in batch_result {
match result {
Ok(block) => block_processor.process_block(block.into()).await?,
Err(err) => {
error!("failed to retrieve the block: {err}. stopping...");
return Err(err);
}
}
}
}
Ok(())
}
fn new_block_requester(
&self,
req_rx: Receiver<BlockRequest>,
processing_tx: UnboundedSender<BlockToProcess>,
) -> BlockRequester {
BlockRequester::new(
self.cancel_token.clone(),
rpc_client.clone(),
self.rpc_client.clone(),
req_rx,
processing_tx.clone(),
);
let block_processor = BlockProcessor::new(
)
}
async fn new_block_processor(
&self,
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> Result<BlockProcessor, ScraperError> {
BlockProcessor::new(
self.config.pruning_options,
self.cancel_token.clone(),
self.startup_sync.clone(),
processing_rx,
req_tx,
self.storage.clone(),
rpc_client,
self.rpc_client.clone(),
)
.await?;
let chain_subscriber = ChainSubscriber::new(
.await
}
async fn new_chain_subscriber(
&self,
processing_tx: UnboundedSender<BlockToProcess>,
) -> Result<ChainSubscriber, ScraperError> {
ChainSubscriber::new(
&self.config.websocket_url,
self.cancel_token.clone(),
self.task_tracker.clone(),
processing_tx,
)
.await?;
.await
}
pub async fn start(&self) -> Result<(), ScraperError> {
let (processing_tx, processing_rx) = unbounded_channel();
let (req_tx, req_rx) = channel(5);
// create the tasks
let block_requester = self.new_block_requester(req_rx, processing_tx.clone());
let block_processor = self.new_block_processor(req_tx, processing_rx).await?;
let chain_subscriber = self.new_chain_subscriber(processing_tx).await?;
// spawn them
self.start_tasks(block_requester, block_processor, chain_subscriber);
@@ -16,7 +16,7 @@ use url::Url;
const MAX_FAILURES: usize = 10;
const MAX_RECONNECTION_ATTEMPTS: usize = 8;
const SOCKET_FAILURE_RESET: Duration = Duration::hours(2);
const SOCKET_FAILURE_RESET: Duration = Duration::minutes(15);
pub struct ChainSubscriber {
cancel: CancellationToken,
+6 -3
View File
@@ -435,9 +435,12 @@ where
trace!("update_last_processed");
let start = Instant::now();
sqlx::query!("UPDATE metadata SET last_processed_height = ?", height)
.execute(executor)
.await?;
sqlx::query!(
"UPDATE metadata SET last_processed_height = MAX(last_processed_height, ?)",
height
)
.execute(executor)
.await?;
log_db_operation_time("update_last_processed", start);
Ok(())
+2
View File
@@ -13,11 +13,13 @@ license.workspace = true
[dependencies]
serde = { workspace = true }
hex = { workspace = true, optional = true }
bs58 = { workspace = true, optional = true }
base64 = { workspace = true, optional = true }
time = { workspace = true, features = ["formatting", "parsing"], optional = true }
[features]
hex = ["dep:hex"]
bs58 = ["dep:bs58"]
base64 = ["dep:base64"]
date = ["time"]
+14
View File
@@ -32,6 +32,20 @@ pub mod bs58 {
}
}
#[cfg(feature = "hex")]
pub mod hex {
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S: Serializer>(bytes: &[u8], serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_str(&::hex::encode(bytes))
}
pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Vec<u8>, D::Error> {
let s = String::deserialize(deserializer)?;
::hex::decode(&s).map_err(serde::de::Error::custom)
}
}
#[cfg(feature = "date")]
pub mod date {
use serde::ser::Error;
@@ -0,0 +1,14 @@
[package]
name = "nym-service-provider-requests-common"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
serde = { workspace = true, features = ["derive"] }
@@ -0,0 +1,18 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
#[repr(u8)]
pub enum ServiceProviderType {
NetworkRequester = 0,
IpPacketRouter = 1,
Authenticator = 2,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Protocol {
pub version: u8,
pub service_provider_type: ServiceProviderType,
}
-18
View File
@@ -17,28 +17,10 @@ serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
nym-config = { path = "../config" }
nym-crypto = { path = "../crypto", features = ["asymmetric"] }
nym-network-defaults = { path = "../network-defaults" }
# feature-specific dependencies:
## verify:
hmac = { workspace = true, optional = true }
sha2 = { workspace = true, optional = true }
## openapi:
utoipa = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
x25519-dalek = { workspace = true, features = ["static_secrets"] }
[dev-dependencies]
rand = { workspace = true }
nym-crypto = { path = "../crypto", features = ["rand"] }
[features]
default = ["verify"]
openapi = ["utoipa", "serde_json"]
# this is moved to a separate feature as we really need clients to import it (especially, *cough*, wasm)
verify = ["hmac", "sha2"]
-15
View File
@@ -5,13 +5,6 @@ use thiserror::Error;
#[derive(Debug, Error)]
pub enum Error {
#[error("the provided base64-encoded client MAC ('{mac}') was malformed: {source}")]
MalformedClientMac {
mac: String,
#[source]
source: base64::DecodeError,
},
#[error("the provided base64-encoded client x25519 public key ('{pub_key}') was malformed: {source}")]
MalformedPeerPublicKeyEncoding {
pub_key: String,
@@ -24,12 +17,4 @@ pub enum Error {
pub_key: String,
decoded_length: usize,
},
#[cfg(feature = "verify")]
#[error("failed to verify mac provided by '{client}': {source}")]
FailedClientMacVerification {
client: String,
#[source]
source: hmac::digest::MacError,
},
}
-8
View File
@@ -4,19 +4,11 @@
pub mod config;
pub mod error;
pub mod public_key;
pub mod registration;
use std::time::Duration;
pub use config::Config;
pub use error::Error;
pub use public_key::PeerPublicKey;
pub use registration::{ClientMac, ClientMessage, GatewayClient, InitMessage, Nonce};
// To avoid any problems, keep this stale check time bigger (>2x) then the bandwidth cap
// reset time (currently that one is 24h, at UTC midnight)
pub const DEFAULT_PEER_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24 * 3); // 3 days
pub const DEFAULT_PEER_TIMEOUT_CHECK: Duration = Duration::from_secs(5); // 5 seconds
#[cfg(feature = "verify")]
pub use registration::HmacSha256;
+7 -3
View File
@@ -16,17 +16,21 @@ bincode = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true }
defguard_wireguard_rs = { workspace = true }
futures = { workspace = true }
# The latest version on crates.io at the time of writing this (6.0.0) has a
# version mismatch with x25519-dalek/curve25519-dalek that is resolved in the
# latest commit. So pick that for now.
x25519-dalek = { workspace = true }
ip_network = { workspace = true }
log.workspace = true
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
tokio-stream = { workspace = true }
nym-authenticator-requests = { path = "../authenticator-requests" }
nym-credential-verification = { path = "../credential-verification" }
nym-crypto = { path = "../crypto", features = ["asymmetric"] }
nym-gateway-storage = { path = "../gateway-storage" }
nym-network-defaults = { path = "../network-defaults" }
nym-task = { path = "../task" }
nym-wireguard-types = { path = "../wireguard-types" }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
tokio-stream = { workspace = true }
+8 -2
View File
@@ -3,12 +3,18 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("peers in wireguard don't match with in-memory ")]
PeerMismatch,
#[error("traffic byte data needs to be increasing")]
InconsistentConsumedBytes,
#[error("{0}")]
Defguard(#[from] defguard_wireguard_rs::error::WireguardInterfaceError),
#[error("internal {0}")]
Internal(String),
#[error("storage should have the requested bandwidht entry")]
MissingClientBandwidthEntry,
#[error("{0}")]
GatewayStorage(#[from] nym_gateway_storage::error::StorageError),
}
+29 -27
View File
@@ -13,12 +13,13 @@ use nym_crypto::asymmetric::encryption::KeyPair;
use nym_wireguard_types::Config;
use peer_controller::PeerControlRequest;
use std::sync::Arc;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{self, Receiver, Sender};
const WG_TUN_NAME: &str = "nymwg";
pub(crate) mod error;
pub mod peer_controller;
pub mod peer_handle;
pub struct WgApiWrapper {
inner: WGApi,
@@ -43,15 +44,12 @@ impl Drop for WgApiWrapper {
pub struct WireguardGatewayData {
config: Config,
keypair: Arc<KeyPair>,
peer_tx: UnboundedSender<PeerControlRequest>,
peer_tx: Sender<PeerControlRequest>,
}
impl WireguardGatewayData {
pub fn new(
config: Config,
keypair: Arc<KeyPair>,
) -> (Self, UnboundedReceiver<PeerControlRequest>) {
let (peer_tx, peer_rx) = mpsc::unbounded_channel();
pub fn new(config: Config, keypair: Arc<KeyPair>) -> (Self, Receiver<PeerControlRequest>) {
let (peer_tx, peer_rx) = mpsc::channel(1);
(
WireguardGatewayData {
config,
@@ -70,44 +68,45 @@ impl WireguardGatewayData {
&self.keypair
}
pub fn peer_tx(&self) -> &UnboundedSender<PeerControlRequest> {
pub fn peer_tx(&self) -> &Sender<PeerControlRequest> {
&self.peer_tx
}
}
pub struct WireguardData {
pub inner: WireguardGatewayData,
pub peer_rx: UnboundedReceiver<PeerControlRequest>,
pub peer_rx: Receiver<PeerControlRequest>,
}
/// Start wireguard device
#[cfg(target_os = "linux")]
pub async fn start_wireguard<St: nym_gateway_storage::Storage + 'static>(
pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>(
storage: St,
all_peers: Vec<nym_gateway_storage::models::WireguardPeer>,
task_client: nym_task::TaskClient,
wireguard_data: WireguardData,
control_tx: UnboundedSender<peer_controller::PeerControlResponse>,
) -> Result<std::sync::Arc<WgApiWrapper>, Box<dyn std::error::Error + Send + Sync + 'static>> {
use base64::{prelude::BASE64_STANDARD, Engine};
use defguard_wireguard_rs::{InterfaceConfiguration, WireguardInterfaceApi};
use ip_network::IpNetwork;
use peer_controller::PeerController;
let mut peers = vec![];
let mut suspended_peers = vec![];
for storage_peer in all_peers {
let suspended = storage_peer.suspended;
let peer = Peer::try_from(storage_peer)?;
if suspended {
suspended_peers.push(peer);
} else {
peers.push(peer);
}
}
use std::collections::HashMap;
use tokio::sync::RwLock;
let ifname = String::from(WG_TUN_NAME);
let wg_api = defguard_wireguard_rs::WGApi::new(ifname.clone(), false)?;
let mut peer_bandwidth_managers = HashMap::with_capacity(all_peers.len());
let peers = all_peers
.into_iter()
.map(Peer::try_from)
.collect::<Result<Vec<_>, _>>()?;
for peer in peers.iter() {
let bandwidth_manager =
PeerController::generate_bandwidth_manager(storage.clone(), &peer.public_key)
.await?
.map(|bw_m| Arc::new(RwLock::new(bw_m)));
peer_bandwidth_managers.insert(peer.public_key.clone(), bandwidth_manager);
}
wg_api.create_interface()?;
let interface_config = InterfaceConfiguration {
name: ifname.clone(),
@@ -115,6 +114,7 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + 'static>(
address: wireguard_data.inner.config().private_ip.to_string(),
port: wireguard_data.inner.config().announced_port as u32,
peers,
mtu: None,
};
wg_api.configure_interface(&interface_config)?;
@@ -130,16 +130,18 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + 'static>(
)]);
wg_api.configure_peer_routing(&[catch_all_peer])?;
let host = wg_api.read_interface_data()?;
let wg_api = std::sync::Arc::new(WgApiWrapper::new(wg_api));
let mut controller = PeerController::new(
storage,
wg_api.clone(),
interface_config.peers,
suspended_peers,
host,
peer_bandwidth_managers,
wireguard_data.inner.peer_tx.clone(),
wireguard_data.peer_rx,
control_tx,
task_client,
);
tokio::spawn(async move { controller.run(task_client).await });
tokio::spawn(async move { controller.run().await });
Ok(wg_api)
}
+225 -199
View File
@@ -1,259 +1,285 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use chrono::{Timelike, Utc};
use defguard_wireguard_rs::{host::Peer, key::Key, WireguardInterfaceApi};
use defguard_wireguard_rs::{
host::{Host, Peer},
key::Key,
WireguardInterfaceApi,
};
use futures::channel::oneshot;
use nym_authenticator_requests::{
latest::registration::RemainingBandwidthData, v1::registration::BANDWIDTH_CAP_PER_DAY,
};
use nym_credential_verification::{
bandwidth_storage_manager::BandwidthStorageManager, BandwidthFlushingBehaviourConfig,
ClientBandwidth,
};
use nym_gateway_storage::Storage;
use nym_wireguard_types::registration::{RemainingBandwidthData, BANDWIDTH_CAP_PER_DAY};
use nym_wireguard_types::{DEFAULT_PEER_TIMEOUT, DEFAULT_PEER_TIMEOUT_CHECK};
use std::time::SystemTime;
use nym_wireguard_types::DEFAULT_PEER_TIMEOUT_CHECK;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, RwLock};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use crate::error::Error;
use crate::peer_handle::PeerHandle;
use crate::WgApiWrapper;
use crate::{error::Error, peer_handle::SharedBandwidthStorageManager};
pub enum PeerControlRequest {
AddPeer(Peer),
RemovePeer(Key),
QueryPeer(Key),
QueryBandwidth(Key),
}
pub enum PeerControlResponse {
AddPeer {
success: bool,
peer: Peer,
ticket_validation: bool,
response_tx: oneshot::Sender<AddPeerControlResponse>,
},
RemovePeer {
success: bool,
key: Key,
response_tx: oneshot::Sender<RemovePeerControlResponse>,
},
QueryPeer {
success: bool,
peer: Option<Peer>,
key: Key,
response_tx: oneshot::Sender<QueryPeerControlResponse>,
},
QueryBandwidth {
bandwidth_data: Option<RemainingBandwidthData>,
key: Key,
response_tx: oneshot::Sender<QueryBandwidthControlResponse>,
},
}
pub struct PeerController<St: Storage> {
storage: St,
request_rx: mpsc::UnboundedReceiver<PeerControlRequest>,
response_tx: mpsc::UnboundedSender<PeerControlResponse>,
wg_api: Arc<WgApiWrapper>,
timeout_check_interval: IntervalStream,
active_peers: HashMap<Key, Peer>,
suspended_peers: HashMap<Key, Peer>,
last_seen_bandwidth: HashMap<Key, u64>,
timeout_count: u8,
pub struct AddPeerControlResponse {
pub success: bool,
pub client_id: Option<i64>,
}
impl<St: Storage> PeerController<St> {
pub struct RemovePeerControlResponse {
pub success: bool,
}
pub struct QueryPeerControlResponse {
pub success: bool,
pub peer: Option<Peer>,
}
pub struct QueryBandwidthControlResponse {
pub success: bool,
pub bandwidth_data: Option<RemainingBandwidthData>,
}
pub struct PeerController<St: Storage + Clone + 'static> {
storage: St,
// used to receive commands from individual handles too
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
wg_api: Arc<WgApiWrapper>,
host_information: Arc<RwLock<Host>>,
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager<St>>>,
timeout_check_interval: IntervalStream,
task_client: nym_task::TaskClient,
}
impl<St: Storage + Clone + 'static> PeerController<St> {
pub fn new(
storage: St,
wg_api: Arc<WgApiWrapper>,
peers: Vec<Peer>,
suspended_peers: Vec<Peer>,
request_rx: mpsc::UnboundedReceiver<PeerControlRequest>,
response_tx: mpsc::UnboundedSender<PeerControlResponse>,
initial_host_information: Host,
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager<St>>>,
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
task_client: nym_task::TaskClient,
) -> Self {
let timeout_check_interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(DEFAULT_PEER_TIMEOUT_CHECK),
);
let active_peers: HashMap<Key, Peer> = peers
.into_iter()
.map(|peer| (peer.public_key.clone(), peer))
.collect();
let suspended_peers: HashMap<Key, Peer> = suspended_peers
.into_iter()
.map(|peer| (peer.public_key.clone(), peer))
.collect();
let last_seen_bandwidth = active_peers
.iter()
.map(|(k, p)| (k.clone(), p.rx_bytes + p.tx_bytes))
.chain(suspended_peers.keys().map(|k| (k.clone(), 0)))
.collect();
let host_information = Arc::new(RwLock::new(initial_host_information));
for (public_key, bandwidth_storage_manager) in bw_storage_managers.iter() {
let mut handle = PeerHandle::new(
storage.clone(),
public_key.clone(),
host_information.clone(),
bandwidth_storage_manager.clone(),
request_tx.clone(),
&task_client,
);
tokio::spawn(async move {
if let Err(e) = handle.run().await {
log::error!("Peer handle shut down ungracefully - {e}");
}
});
}
PeerController {
storage,
wg_api,
host_information,
bw_storage_managers,
request_tx,
request_rx,
response_tx,
timeout_check_interval,
active_peers,
suspended_peers,
last_seen_bandwidth,
timeout_count: 0,
task_client,
}
}
async fn check_stale_peer(
&self,
peer: &Peer,
current_timestamp: SystemTime,
) -> Result<bool, Error> {
if let Some(timestamp) = peer.last_handshake {
if let Ok(duration_since_handshake) = current_timestamp.duration_since(timestamp) {
if duration_since_handshake > DEFAULT_PEER_TIMEOUT {
self.storage
.remove_wireguard_peer(&peer.public_key.to_string())
.await?;
self.wg_api.inner.remove_peer(&peer.public_key)?;
return Ok(true);
}
}
}
Ok(false)
}
async fn check_suspend_peer(&mut self, peer: &Peer) -> Result<(), Error> {
let prev_peer = self
.active_peers
.get(&peer.public_key)
.ok_or(Error::PeerMismatch)?;
let data_usage =
(peer.rx_bytes + peer.tx_bytes).saturating_sub(prev_peer.rx_bytes + prev_peer.tx_bytes);
if data_usage > BANDWIDTH_CAP_PER_DAY {
self.storage.insert_wireguard_peer(peer, true).await?;
self.wg_api.inner.remove_peer(&peer.public_key)?;
self.active_peers
.remove_entry(&peer.public_key)
.ok_or(Error::PeerMismatch)?;
self.suspended_peers
.insert(peer.public_key.clone(), peer.clone());
} else {
// Update peer stored data
self.storage.insert_wireguard_peer(peer, false).await?;
}
Ok(())
}
async fn check_peers(&mut self) -> Result<(), Error> {
// Add 10 seconds to cover edge cases. At worst, we give ten free seconds worth of bandwidth
// by resetting the bandwidth twice
let reset = Utc::now().num_seconds_from_midnight() as u64
<= DEFAULT_PEER_TIMEOUT_CHECK.as_secs() + 10;
if reset {
for (_, peer) in self.suspended_peers.drain() {
self.wg_api.inner.configure_peer(&peer)?;
}
}
let host = self.wg_api.inner.read_interface_data()?;
self.last_seen_bandwidth = host
.peers
.iter()
.map(|(key, peer)| (key.clone(), peer.rx_bytes + peer.tx_bytes))
.collect();
// Do in-memory updates of bandwidth every DEFAULT_PEER_TIMEOUT_CHECK
// and storage updates every 5 * DEFAULT_PEER_TIMEOUT_CHECK, because in-memory
// is more important for client query preciseness
self.timeout_count = self.timeout_count % 5 + 1;
if !reset && self.timeout_count < 5 {
return Ok(());
}
if reset {
self.active_peers = host.peers;
for peer in self.active_peers.values() {
self.storage.insert_wireguard_peer(peer, false).await?;
}
} else {
let peers = self
// Function that should be used for peer insertion, to handle both storage and kernel interaction
pub async fn add_peer(&self, peer: &Peer, with_client_id: bool) -> Result<Option<i64>, Error> {
let client_id = self
.storage
.insert_wireguard_peer(peer, with_client_id)
.await?;
let ret = self.wg_api.inner.configure_peer(peer);
if ret.is_err() {
// Try to revert the insertion in storage
if self
.storage
.get_all_wireguard_peers()
.await?
.into_iter()
.map(Peer::try_from)
.collect::<Result<Vec<_>, _>>()?;
let current_timestamp = SystemTime::now();
for peer in peers {
if !self.check_stale_peer(&peer, current_timestamp).await? {
self.check_suspend_peer(&peer).await?;
}
.remove_wireguard_peer(&peer.public_key.to_string())
.await
.is_err()
{
log::error!("The storage has been corrupted. Wireguard peer {} will persist in storage indefinitely.", peer.public_key);
}
}
Ok(())
ret?;
Ok(client_id)
}
pub async fn run(&mut self, mut task_client: nym_task::TaskClient) {
// Function that should be used for peer removal, to handle both storage and kernel interaction
pub async fn remove_peer(&mut self, key: &Key) -> Result<(), Error> {
self.storage.remove_wireguard_peer(&key.to_string()).await?;
self.bw_storage_managers.remove(key);
let ret = self.wg_api.inner.remove_peer(key);
if ret.is_err() {
log::error!("Wireguard peer could not be removed from wireguard kernel module. Process should be restarted so that the interface is reset.");
}
Ok(ret?)
}
pub async fn generate_bandwidth_manager(
storage: St,
public_key: &Key,
) -> Result<Option<BandwidthStorageManager<St>>, Error> {
if let Some(client_id) = storage
.get_wireguard_peer(&public_key.to_string())
.await?
.ok_or(Error::MissingClientBandwidthEntry)?
.client_id
{
storage.create_bandwidth_entry(client_id).await?;
Ok(Some(BandwidthStorageManager::new(
storage,
ClientBandwidth::new(Default::default()),
client_id,
BandwidthFlushingBehaviourConfig::default(),
true,
)))
} else {
Ok(None)
}
}
async fn handle_add_request(
&mut self,
peer: &Peer,
with_client_id: bool,
) -> Result<Option<i64>, Error> {
let client_id = self.add_peer(peer, with_client_id).await?;
let bandwidth_storage_manager =
Self::generate_bandwidth_manager(self.storage.clone(), &peer.public_key)
.await?
.map(|bw_m| Arc::new(RwLock::new(bw_m)));
let mut handle = PeerHandle::new(
self.storage.clone(),
peer.public_key.clone(),
self.host_information.clone(),
bandwidth_storage_manager.clone(),
self.request_tx.clone(),
&self.task_client,
);
self.bw_storage_managers
.insert(peer.public_key.clone(), bandwidth_storage_manager);
tokio::spawn(async move {
if let Err(e) = handle.run().await {
log::error!("Peer handle shut down ungracefully - {e}");
}
});
Ok(client_id)
}
async fn handle_query_peer(&self, key: &Key) -> Result<Option<Peer>, Error> {
Ok(self
.storage
.get_wireguard_peer(&key.to_string())
.await?
.map(Peer::try_from)
.transpose()?)
}
async fn handle_query_bandwidth(
&self,
key: &Key,
) -> Result<Option<RemainingBandwidthData>, Error> {
let Some(bandwidth_storage_manager) = self.bw_storage_managers.get(key) else {
return Ok(None);
};
let available_bandwidth = if let Some(bandwidth_storage_manager) = bandwidth_storage_manager
{
bandwidth_storage_manager
.read()
.await
.available_bandwidth()
.await
} else {
let Some(peer) = self.host_information.read().await.peers.get(key).cloned() else {
// host information not updated yet
return Ok(None);
};
BANDWIDTH_CAP_PER_DAY.saturating_sub((peer.rx_bytes + peer.tx_bytes) as i64)
};
Ok(Some(RemainingBandwidthData {
available_bandwidth,
}))
}
pub async fn run(&mut self) {
loop {
tokio::select! {
_ = self.timeout_check_interval.next() => {
if let Err(e) = self.check_peers().await {
log::error!("Error while periodically checking peers: {:?}", e);
}
let Ok(host) = self.wg_api.inner.read_interface_data() else {
log::error!("Can't read wireguard kernel data");
continue;
};
*self.host_information.write().await = host;
}
_ = task_client.recv() => {
_ = self.task_client.recv() => {
log::trace!("PeerController handler: Received shutdown");
break;
}
msg = self.request_rx.recv() => {
match msg {
Some(PeerControlRequest::AddPeer(peer)) => {
if let Err(e) = self.storage.insert_wireguard_peer(&peer, false).await {
log::error!("Could not insert peer into storage: {:?}", e);
self.response_tx.send(PeerControlResponse::AddPeer { success: false }).ok();
continue;
Some(PeerControlRequest::AddPeer { peer, ticket_validation, response_tx }) => {
let ret = self.handle_add_request(&peer, ticket_validation).await;
if let Ok(client_id) = ret {
response_tx.send(AddPeerControlResponse { success: true, client_id }).ok();
} else {
response_tx.send(AddPeerControlResponse { success: false, client_id: None }).ok();
}
let success = if let Err(e) = self.wg_api.inner.configure_peer(&peer) {
log::error!("Could not configure peer: {:?}", e);
false
} else {
self.last_seen_bandwidth.insert(peer.public_key.clone(), peer.rx_bytes + peer.tx_bytes);
self.active_peers.insert(peer.public_key.clone(), peer);
true
};
self.response_tx.send(PeerControlResponse::AddPeer { success }).ok();
}
Some(PeerControlRequest::RemovePeer(peer_pubkey)) => {
if let Err(e) = self.storage.remove_wireguard_peer(&peer_pubkey.to_string()).await {
log::error!("Could not remove peer from storage: {:?}", e);
self.response_tx.send(PeerControlResponse::RemovePeer { success: false }).ok();
continue;
Some(PeerControlRequest::RemovePeer { key, response_tx }) => {
let success = self.remove_peer(&key).await.is_ok();
response_tx.send(RemovePeerControlResponse { success }).ok();
}
Some(PeerControlRequest::QueryPeer { key, response_tx }) => {
let ret = self.handle_query_peer(&key).await;
if let Ok(peer) = ret {
response_tx.send(QueryPeerControlResponse { success: true, peer }).ok();
} else {
response_tx.send(QueryPeerControlResponse { success: false, peer: None }).ok();
}
let success = if let Err(e) = self.wg_api.inner.remove_peer(&peer_pubkey) {
log::error!("Could not remove peer: {:?}", e);
false
} else {
self.active_peers.remove(&peer_pubkey);
self.suspended_peers.remove(&peer_pubkey);
true
};
self.response_tx.send(PeerControlResponse::RemovePeer { success }).ok();
}
Some(PeerControlRequest::QueryPeer(peer_pubkey)) => {
let (success, peer) = match self.storage.get_wireguard_peer(&peer_pubkey.to_string()).await {
Err(e) => {
log::error!("Could not query peer storage {e}");
(false, None)
},
Ok(None) => (true, None),
Ok(Some(storage_peer)) => {
match Peer::try_from(storage_peer) {
Ok(peer) => (true, Some(peer)),
Err(e) => {
log::error!("Could not parse storage peer {e}");
(false, None)
}
}
},
};
self.response_tx.send(PeerControlResponse::QueryPeer { success, peer }).ok();
}
Some(PeerControlRequest::QueryBandwidth(peer_pubkey)) => {
let msg = if self.suspended_peers.contains_key(&peer_pubkey) {
PeerControlResponse::QueryBandwidth { bandwidth_data: Some(RemainingBandwidthData{ available_bandwidth: 0, suspended: true }) }
} else if let Some(&consumed_bandwidth) = self.last_seen_bandwidth.get(&peer_pubkey) {
PeerControlResponse::QueryBandwidth { bandwidth_data: Some(RemainingBandwidthData{ available_bandwidth: BANDWIDTH_CAP_PER_DAY - consumed_bandwidth, suspended: false })}
Some(PeerControlRequest::QueryBandwidth { key, response_tx }) => {
let ret = self.handle_query_bandwidth(&key).await;
if let Ok(bandwidth_data) = ret {
response_tx.send(QueryBandwidthControlResponse { success: true, bandwidth_data }).ok();
} else {
PeerControlResponse::QueryBandwidth { bandwidth_data: None }
};
self.response_tx.send(msg).ok();
response_tx.send(QueryBandwidthControlResponse { success: false, bandwidth_data: None }).ok();
}
}
None => {
log::trace!("PeerController [main loop]: stopping since channel closed");
+138
View File
@@ -0,0 +1,138 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use crate::peer_controller::PeerControlRequest;
use defguard_wireguard_rs::host::Peer;
use defguard_wireguard_rs::{host::Host, key::Key};
use futures::channel::oneshot;
use nym_authenticator_requests::v2::registration::BANDWIDTH_CAP_PER_DAY;
use nym_credential_verification::bandwidth_storage_manager::BandwidthStorageManager;
use nym_gateway_storage::models::WireguardPeer;
use nym_gateway_storage::Storage;
use nym_task::TaskClient;
use nym_wireguard_types::DEFAULT_PEER_TIMEOUT_CHECK;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
pub(crate) type SharedBandwidthStorageManager<St> = Arc<RwLock<BandwidthStorageManager<St>>>;
pub struct PeerHandle<St> {
storage: St,
public_key: Key,
host_information: Arc<RwLock<Host>>,
bandwidth_storage_manager: Option<SharedBandwidthStorageManager<St>>,
request_tx: mpsc::Sender<PeerControlRequest>,
timeout_check_interval: IntervalStream,
task_client: TaskClient,
}
impl<St: Storage + Clone + 'static> PeerHandle<St> {
pub fn new(
storage: St,
public_key: Key,
host_information: Arc<RwLock<Host>>,
bandwidth_storage_manager: Option<SharedBandwidthStorageManager<St>>,
request_tx: mpsc::Sender<PeerControlRequest>,
task_client: &TaskClient,
) -> Self {
let timeout_check_interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(DEFAULT_PEER_TIMEOUT_CHECK),
);
let task_client = task_client.fork(format!("peer{public_key}"));
PeerHandle {
storage,
public_key,
host_information,
bandwidth_storage_manager,
request_tx,
timeout_check_interval,
task_client,
}
}
async fn remove_depleted_peer(&self) -> Result<bool, Error> {
log::debug!(
"Peer {} doesn't have bandwidth anymore, removing it",
self.public_key.to_string()
);
let (response_tx, response_rx) = oneshot::channel();
self.request_tx
.send(PeerControlRequest::RemovePeer {
key: self.public_key.clone(),
response_tx,
})
.await
.map_err(|_| Error::Internal("peer controller shut down".to_string()))?;
let success = response_rx
.await
.map_err(|_| Error::Internal("peer controller didn't respond".to_string()))?
.success;
Ok(success)
}
async fn active_peer(
&mut self,
storage_peer: WireguardPeer,
kernel_peer: Peer,
) -> Result<bool, Error> {
if let Some(bandwidth_manager) = &self.bandwidth_storage_manager {
let spent_bandwidth = (kernel_peer.rx_bytes + kernel_peer.tx_bytes)
.checked_sub(storage_peer.rx_bytes as u64 + storage_peer.tx_bytes as u64)
.ok_or(Error::InconsistentConsumedBytes)?
.try_into()
.map_err(|_| Error::InconsistentConsumedBytes)?;
if bandwidth_manager
.write()
.await
.try_use_bandwidth(spent_bandwidth)
.await
.is_err()
{
let success = self.remove_depleted_peer().await?;
return Ok(!success);
}
} else {
let spent_bandwidth = kernel_peer.rx_bytes + kernel_peer.tx_bytes;
if spent_bandwidth >= BANDWIDTH_CAP_PER_DAY {
let success = self.remove_depleted_peer().await?;
return Ok(!success);
}
}
Ok(true)
}
pub async fn run(&mut self) -> Result<(), Error> {
while !self.task_client.is_shutdown() {
tokio::select! {
_ = self.timeout_check_interval.next() => {
let Some(kernel_peer) = self
.host_information
.read()
.await
.peers
.get(&self.public_key)
.cloned() else {
// the host information hasn't beed updated yet
continue;
};
let Some(storage_peer) = self.storage.get_wireguard_peer(&self.public_key.to_string()).await? else {
log::debug!("Peer {:?} not in storage anymore, shutting down handle", self.public_key);
return Ok(());
};
if !self.active_peer(storage_peer, kernel_peer).await? {
log::debug!("Peer {:?} doesn't have bandwidth anymore, shutting down handle", self.public_key);
return Ok(());
}
}
_ = self.task_client.recv() => {
log::trace!("PeerHandle: Received shutdown");
}
}
}
Ok(())
}
}
+4 -4
View File
@@ -1935,18 +1935,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.63"
version = "1.0.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724"
checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.63"
version = "1.0.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3"
dependencies = [
"proc-macro2",
"quote",
+1 -3
View File
@@ -538,15 +538,13 @@ pub fn query(
#[entry_point]
pub fn migrate(
mut deps: DepsMut<'_>,
deps: DepsMut<'_>,
_env: Env,
msg: MigrateMsg,
) -> Result<Response, MixnetContractError> {
set_build_information!(deps.storage)?;
cw2::ensure_from_older_version(deps.storage, CONTRACT_NAME, CONTRACT_VERSION)?;
crate::queued_migrations::explicit_contract_admin(deps.branch())?;
// due to circular dependency on contract addresses (i.e. mixnet contract requiring vesting contract address
// and vesting contract requiring the mixnet contract address), if we ever want to deploy any new fresh
// environment, one of the contracts will HAVE TO go through a migration
+1 -19
View File
@@ -1,20 +1,2 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2022-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::mixnet_contract_settings::storage as mixnet_params_storage;
use cosmwasm_std::DepsMut;
use mixnet_contract_common::error::MixnetContractError;
pub(crate) fn explicit_contract_admin(deps: DepsMut) -> Result<(), MixnetContractError> {
// we need to read the deprecated field to migrate it over
#[allow(deprecated)]
// SAFETY: this value should ALWAYS exist on the first execution of this migration;
// as a matter of fact, it should ALWAYS continue existing until another migration
#[allow(clippy::expect_used)]
let existing_admin = mixnet_params_storage::CONTRACT_STATE
.load(deps.storage)?
.owner
.expect("the contract state is corrupt - there's no admin set");
mixnet_params_storage::ADMIN.set(deps, Some(existing_admin))?;
Ok(())
}
+236
View File
@@ -2,6 +2,242 @@
This page displays a full list of all the changes during our release cycle from [`v2024.3-eclipse`](https://github.com/nymtech/nym/blob/nym-binaries-v2024.3-eclipse/CHANGELOG.md) onwards. Operators can find here the newest updates together with links to relevant documentation. The list is sorted so that the newest changes appear first.
## `v2024.11-wedel`
- [Release binaries](https://github.com/nymtech/nym/releases/tag/nym-binaries-v2024.11-wedel)
- [Release CHANGELOG.md](https://github.com/nymtech/nym/blob/nym-binaries-v2024.11-wedel/CHANGELOG.md)
- [`nym-node`](nodes/nym-node.md) version `1.1.8`
```sh
Binary Name: nym-node
Build Timestamp: 2024-09-27T11:02:37.073944654Z
Build Version: 1.1.8
Commit SHA: c3ec970a377adb25d57be5428551fada2ec55128
Commit Date: 2024-09-26T08:24:53.000000000+02:00
Commit Branch: master
rustc Version: 1.80.1
rustc Channel: stable
cargo Profile: release
```
### Features
- [New Network Monitor](https://github.com/nymtech/nym/pull/4610): Monitors the Nym network by sending itself packages across the mixnet. Network monitor is running two tokio tasks, one manages mixnet clients and another manages monitoring itself. Monitor is designed to be driven externally, via an `HTTP api`. This means that it does not do any monitoring unless driven by something like [`locust`](https://locust.io/). This allows us to tailor the load externally, potentially distributing it across multiple monitors. Includes a dockerised setup for automatically spinning up monitor and driving it with locust.
- *Note: NNM is not deployed on mainnet yet!*
- [Add get_mixnodes_described to validator_client](https://github.com/nymtech/nym/pull/4725)
- [Remove deprecated mark_as_success and use new disarm](https://github.com/nymtech/nym/pull/4751): Update function name to keep terminology consistent with tokio `CancellationToken DropGuard`.
- [Update peer refresh value](https://github.com/nymtech/nym/pull/4754): `lso` expose the value by moving it to wireguard types, and separate the refresh time to the database sync time, so that more probable and needed actions happen faster (refresh) and more improbable ones don't overload the system (peer suspended or stale)
~~~admonish example collapsible=true title='Testing steps performed'
- **Noted** that the constants `DEFAULT_PEER_TIMEOUT` and `DEFAULT_PEER_TIMEOUT_CHECK` have been moved to `common/wireguard-types/src/lib.rs` and are now being used across modules for consistency
- **Observed** that the `peer_controller.rs` now separates the in-memory updates from the storage sync operations to reduce system load
- **Identified** that in-memory updates of peer bandwidth usage happen every `DEFAULT_PEER_TIMEOUT_CHECK` (every 5 seconds), while storage updates occur every 5 * `DEFAULT_PEER_TIMEOUT_CHECK` (every 25 seconds)
**Checked System Load and Performance:**
- **Monitored** system resource usage (CPU, memory, I/O) during the test to assess the impact of the changes
- **Confirmed** that the separation of in-memory updates and storage syncs resulted in reduced system load, particularly I/O operations, compared to previous versions where storage updates occurred more frequently
- **Ensured** that the system remained responsive and no performance bottlenecks were introduced
- **Efficiency Improvement:** The separation of in-memory updates and storage syncs effectively reduced unnecessary database writes, improving system efficiency without compromising data accuracy
~~~
- [Remove duplicate stat count for retransmissions](https://github.com/nymtech/nym/pull/4756)
- [Make gateway latency check generic](https://github.com/nymtech/nym/pull/4759): Replace concrete gateway type with trait in latency check, so we can make use of it in the vpn client.
~~~admonish example collapsible=true title='Testing steps performed'
- Initialised new `nym-client` with the `--latency-based-selection` flag and ensured it still works as normal.
~~~
- [chore: remove repetitive words](https://github.com/nymtech/nym/pull/4763)
- [Avoid race on ip and registration structures](https://github.com/nymtech/nym/pull/4766): To avoid a state where the ip is being cleared out before the registration is also cleared out, couple the two structures under the same lock, since they are anyway very inter-dependent.
~~~admonish example collapsible=true title='Testing steps performed'
1. - **Checked out** the release/2024.10-wedel branch containing the fix for the race condition on IP and registration structures
- **Deployed** the on a controlled test environment to prevent interference
2. **Monitored Logs:**
- **Enabled** debug logging to capture all events
- **Monitored** logs in real-time to observe the handling of concurrent registration requests
- **Checked** for any error messages, warnings, or indications of race conditions
3. **Verified Client Responses:**
- Ensured that all clients received appropriate responses:
- Successful registration with assigned IP and registration data
- Appropriate error messages if no IPs were available or if other issues occurred
- Confirmed that no clients were left in an inconsistent state (e.g., assigned an IP but not fully registered)
4. **Validated Normal Operation:**
- **Conducted standard registration processes** with individual clients to confirm that regular functionality is unaffected via `nym-vpn-cli`
- Ensured that authenticated clients could communicate over the network as expected
~~~
- [Persist used wireguard private IPs](https://github.com/nymtech/nym/pull/4771)
- [Enable dependabot version upgrades for root rust workspace](https://github.com/nymtech/nym/pull/4778)
- [Fix clippy for `unwrap_or_default`](https://github.com/nymtech/nym/pull/4783): Fix nightly build for [beta toolchain](https://github.com/nymtech/nym/actions/runs/10552082396/job/29230401668)
- [Update dependabot](https://github.com/nymtech/nym/pull/4796): Bump max number of dependabot rust PRs to 10. Add readme entry to workspace package.
- [Run `cargo-autoinherit` for a few new crates](https://github.com/nymtech/nym/pull/4801): Run cargo-autoinherit for a few new crates - Sort crates list.
- [Add `axum` server to `nym-api`](https://github.com/nymtech/nym/pull/4803): Summary PR to add axum functionality behind a feature flag `axum`, alongside rocket.
- [Remove unused wireguard flag from SDK](https://github.com/nymtech/nym/pull/4823)
- [Expose wireguard details on self described endpoint](https://github.com/nymtech/nym/pull/4825)
~~~admonish example collapsible=true title='Testing steps performed'
Wireguard details are now visible at the nym-node endpoint `/api/v1/gateway/client-interfaces` as well as on the nym-api self-described endpoint `/api/v1/gateways/described`, above the existing data displaying mixnet_websocket information.
An example of what will be shown is:
```json
"wireguard": {
"port": 51822,
"public_key": "<some public key here>"
}
```
~~~
- [Revamped ticketbook serialisation and exposed additional cli methods](https://github.com/nymtech/nym/pull/4827): `wip` branch that includes changes needed for `vpn-api` alongside additional `ecash utils`
~~~admonish example collapsible=true title='Testing steps performed'
Checked the following commands:
```sh
show-ticket-books # which displays the information about all ticketbooks associated to the client
import-ticket-book # which imports a normal ticketbook to the client alongside `--full` flag
```
On the cli, the following were added: `import-coin-index-signatures`, `import-expiration-date-signatures` and `import-master-verification-key`.
~~~
- [Run cargo autoinherit following last weeks dependabot updates](https://github.com/nymtech/nym/pull/4831)
- [Remove serde_crate named import](https://github.com/nymtech/nym/pull/4832)
- [Create nym-repo-setup debian package and nym-vpn meta package](https://github.com/nymtech/nym/pull/4837): Create nym-repo-setup debian package that sets up the nymtech debian repo on the system it's installed on. It does 2 things:
1. Copy the keyring to `/usr/share/keyrings/nymtech.gpg`
2. Copy the repo spec to `/etc/apt/sources.list.d/nymtech.list`
- Also create a meta package `nym-vpn` which only purpose is to depend on the daemon and UI.
~~~admonish example collapsible=true title='Usage'
1. Install with
```sh
sudo dpkg -i ./nym-repo-setup.deb
```
2. Once it's installed, it should be possible to install the vpn client with
```sh
sudo apt install nym-vpnc
```
3. To reemove the repo, use
```sh
sudo apt remove nym-repo-setup
```
NOTE: removing the repo will not remove any installed nym-vpn packages
~~~
~~~admonish example collapsible=true title='Testing steps performed'
1. **Downloaded** the `nym-repo-setup.deb` package to a Debian-based test system
2. **Installed** the repository setup package using the command:
```bash
sudo dpkg -i ./nym-repo-setup.deb
```
3. **Verified** that the GPG keyring was copied to `/usr/share/keyrings/nymtech.gpg`:
```bash
ls -l /usr/share/keyrings/nymtech.gpg
```
4. **Checked** that the repository specification was added to `/etc/apt/sources.list.d/nymtech.list`:
```bash
cat /etc/apt/sources.list.d/nymtech.list
```
5. **Updated** the package list:
```bash
sudo apt update
```
6. **Installed** the VPN client meta-package:
```bash
sudo apt install nym-vpnc
```
7. **Confirmed** that the `nym-vpnc` package and its dependencies (daemon and UI) were installed successfully
8. **Tested** the VPN client to ensure it operates as expected
9. **Removed** the repository setup package:
```bash
sudo apt remove nym-repo-setup
```
10. **Verified** that the repository specification file `/etc/apt/sources.list.d/nymtech.list` was removed
11. **Ensured** that the installed `nym-vpnc` packages remained installed and functional after removing the repo setup package
~~~
- [Use ecash credential type for bandwidth value](https://github.com/nymtech/nym/pull/4840)
- [Start switching over jobs to arc-ubuntu-20.04](https://github.com/nymtech/nym/pull/4843)
~~~admonish example collapsible=true title='`ci-binary-config-checker`'
```
- ci-build-upload-binaries
- ci-build
- ci-cargo-deny
- ci-contracts-schema
- ci-contracts-upload-binaries
- ci-contracts
- ci-docs
- ci-nym-wallet-rust
- ci-sdk-wasm
```
~~~
- [Move credential verification into common crate](https://github.com/nymtech/nym/pull/4853)
- [Revert runner for `ci-docs`](https://github.com/nymtech/nym/pull/4855)
- [Remove `golang` workaround in `ci-sdk-wasm`](https://github.com/nymtech/nym/pull/4858)
- [Fix linux conditional in `ci-build.yml`](https://github.com/nymtech/nym/pull/4863)
- [Disable push trigger and add missing paths in `ci-build`](https://github.com/nymtech/nym/pull/4864)
- [chore: removed completed queued mixnet migration](https://github.com/nymtech/nym/pull/4865)
- [Bump defguard to github latest version](https://github.com/nymtech/nym/pull/4872)
- [Backport #4894 to fix ci](https://github.com/nymtech/nym/pull/4899)
### Bugfix
- [Fix test failure in ipr request size](https://github.com/nymtech/nym/pull/4844): Nightly build started failing due to a unit test using `now()`, changing the serialized size. Fixed to use a fixed date.
- [Fix clippy for nym-wallet and latest rustc](https://github.com/nymtech/nym/pull/4845)
- [Allow updating globally stored signatures](https://github.com/nymtech/nym/pull/4891)
- [Bugfix/ticketbook false double spending](https://github.com/nymtech/nym/pull/4892)
~~~admonish example collapsible=true title='Testing steps performed'
Tested running a client in mixnet mode, with a standard ticketbook, as well as a client using an imported ticketbook. The double spending bug is no longer an issue, bandwidth is consumed properly, and upon consumption of one ticket another ticket is properly obtained.
~~~
### Operators Guide, Tooling & Updates
- [WSS setup guide updates](https://github.com/nymtech/nym/commit/05d6652177fb77324f8c38b3d8a547d07e729fec): Operators setting up WSS and reverse proxy on Gateways have now cleaner and simpler guide to configure their VPS.
- [Updat hostname instruction for WSS](https://github.com/nymtech/nym/commit/7146c4c012ba7012dc74edc8510bbf377dc32fba): Adding a hostname instruction for clarity
## `nym-node` patch from `release/2024.10-caramello`
- [Patch release binaries](https://github.com/nymtech/nym/releases/tag/nym-binaries-v2024.10-caramello-patch)
@@ -25,7 +25,7 @@ We don't recommend this setup because it's really difficult to get a static IP a
### What's the Sphinx packet size?
The sizes are shown in the configs [here](https://github.com/nymtech/nym/blob/1ba6444e722e7757f1175a296bed6e31e25b8db8/common/nymsphinx/params/src/packet_sizes.rs#L12) (default is the one clients use, the others are for research purposes, not to be used in production as this would fragment the anonymity set). More info can be found [here](https://github.com/nymtech/nym/blob/4844ac953a12b29fa27688609ec193f1d560c996/common/nymsphinx/anonymous-replies/src/reply_surb.rs#L80).
The sizes are shown in the configs [here](https://github.com/nymtech/nym/blob/develop/common/nymsphinx/params/src/packet_sizes.rs#L32) (default is the one clients use, the others are for research purposes, not to be used in production as this would fragment the anonymity set). More info can be found [here](https://github.com/nymtech/nym/blob/develop/common/nymsphinx/anonymous-replies/src/reply_surb.rs#L80).
### Why a Mix Node and a Gateway cannot be bonded with the same wallet?
@@ -2,6 +2,8 @@
### What determines the rewards when running a `nym-node --mode mixnode`?
> **Visit [nymtech.net/about/token](https://nymtech.net/about/token) to find live information, graphs and dashboards about NYM token.**
The stake required for a Mix Node to achieve maximum rewards is called Mix Node saturation point. This is calculated from the staking supply (all circulating supply + part of unlocked tokens). The target level of staking is to have 40% of the staking supply locked in Mix Nodes.
The node stake saturation point, which we denote by Nsat, is given by the stake supply, target level of staking divided between the rewarded nodes.
@@ -20,13 +22,3 @@ The rewarded nodes are the nodes which will receive some rewards by the end of t
For more detailed calculation, read our blog post [Nym Token Economics update](https://blog.nymtech.net/nym-token-economics-update-fedff0ed5267). More info on staking can be found [here](https://blog.nymtech.net/staking-in-nym-introducing-mainnet-mixmining-f9bb1cbc7c36). And [here](https://blog.nymtech.net/want-to-stake-in-nym-here-is-how-to-choose-a-mix-node-to-delegate-nym-to-c3b862add165) is more info on how to choose a Mix Node for delegation. And finally an [update](https://blog.nymtech.net/quarterly-token-economic-parameter-update-b2862948710f) on token economics from July 2023.
<!--
<iframe src="https://status.notrustverify.ch/d-solo/CW3L7dVVk/nym-mixnet?orgId=1&from=1703074829887&to=1705666829887&panelId=31" width="850" height="400" frameborder="0"></iframe>
-->
<iframe src="https://dashboard.notrustverify.ch/d-solo/l71MWkX7k/ntv-mixnode?orgId=1&from=1710949572440&to=1713537972440&panelId=18" width="850" height="400" frameborder="0"></iframe>
*More graphs and stats at [stats.notrustverify.ch](https://status.notrustverify.ch/d/CW3L7dVVk/nym-mixnet?orgId=1&from=1703074861988&to=1705666862004).*
+4 -7
View File
@@ -11,16 +11,13 @@ STAKE_DENOM_DISPLAY=nyx
DENOMS_EXPONENT=6
MIXNET_CONTRACT_ADDRESS=n1hm4y6fzgxgu688jgf7ek66px6xkrtmn3gyk8fax3eawhp68c2d5qujz296
ECASH_CONTRACT_ADDRESS=n14y2x8a60knc5jjfeztt84kw8x8l5pwdgnqg256v0p9v4p7t2q6eswxyusw
GROUP_CONTRACT_ADDRESS=n1qp35fcj0v9u3trhaps5v9q0lc42t4m6aty2wryss75ee8zuqnsqqdcreyq
MULTISIG_CONTRACT_ADDRESS=n1qa4hswlcjmttulj0q9qa46jf64f93pecl6tydcsjldfe0hy5ju0sdmwzya
COCONUT_DKG_CONTRACT_ADDRESS=n1ayrk6wp6w5lf6njtnfjwljmtcc9vevv5sxwkz7uq24rp2pw67t0qhmmxdd
ECASH_CONTRACT_ADDRESS=n13xspq62y9gq6nueqmywxcdv2yep4p6nzv98w2889k25v3nhdy2dq2rkrk7
GROUP_CONTRACT_ADDRESS=n13l7rwuwktklrwskc7m6lv70zws07en85uma28j7dxwsz9y5hvvhspl7a2t
MULTISIG_CONTRACT_ADDRESS=n138c9pyf7f3hyx0j3t6vmsz7ultnw2wj0lu6hzndep9z5grgq9haqlc25k0
COCONUT_DKG_CONTRACT_ADDRESS=n1pk8jgr6y4c5k93gz7qf3xc0hvygmp7csk88c2tf8l39tkq6834wq2a6dtr
VESTING_CONTRACT_ADDRESS=n1jlzdxnyces4hrhqz68dqk28mrw5jgwtcfq0c2funcwrmw0dx9l9s8nnnvj
REWARDING_VALIDATOR_ADDRESS=n1rfvpsynktze6wvn6ldskj8xgwfzzk5v6pnff39
EXPLORER_API=https://qa-network-explorer.qa.nymte.ch/api
NYXD="https://qa-validator.qa.nymte.ch"
NYM_API="https://qa-nym-api.qa.nymte.ch/api"
DKG_TIME_CONFIGURATION="600,300,300,60,60,1209600"
EXIT_POLICY="https://nymtech.net/.wellknown/network-requester/exit-policy.txt"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "explorer-api"
version = "1.1.39"
version = "1.1.40"
edition = "2021"
license.workspace = true
@@ -27,7 +27,7 @@ use nym_gateway_storage::{error::StorageError, Storage};
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::TaskClient;
use nym_validator_client::coconut::EcashApiError;
use rand::{CryptoRng, Rng};
use rand::{random, CryptoRng, Rng};
use std::{process, time::Duration};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -236,11 +236,7 @@ where
enc_credential: Vec<u8>,
iv: Vec<u8>,
) -> Result<ServerResponse, RequestHandlingError> {
// TODO: change it into a span field instead once we move to tracing
debug!(
"handling e-cash bandwidth request from {}",
self.client.address
);
debug!("handling e-cash bandwidth request");
let credential = ClientControlRequest::try_from_enc_ecash_credential(
enc_credential,
@@ -253,7 +249,11 @@ where
self.bandwidth_storage_manager.clone(),
);
let available_total = verifier.verify().await?;
let available_total = verifier
.verify()
.await
.inspect_err(|verification_failure| debug!("{verification_failure}"))?;
trace!("available total bandwidth: {available_total}");
Ok(ServerResponse::Bandwidth { available_total })
}
@@ -340,20 +340,17 @@ where
&mut self,
ciphertext: Vec<u8>,
nonce: Vec<u8>,
) -> Message {
) -> Result<ServerResponse, RequestHandlingError> {
let Ok(req) = ClientRequest::decrypt(&ciphertext, &nonce, &self.client.shared_keys) else {
return RequestHandlingError::InvalidEncryptedTextRequest.into_error_message();
return Err(RequestHandlingError::InvalidEncryptedTextRequest);
};
match req {
ClientRequest::UpgradeKey {
hkdf_salt,
derived_key_digest,
} => self
.handle_key_upgrade(hkdf_salt, derived_key_digest)
.await
.into_ws_message(),
_ => RequestHandlingError::UnknownEncryptedTextRequest.into_error_message(),
} => self.handle_key_upgrade(hkdf_salt, derived_key_digest).await,
_ => Err(RequestHandlingError::UnknownEncryptedTextRequest),
}
}
@@ -366,59 +363,64 @@ where
/// * `raw_request`: raw message to handle.
async fn handle_text(&mut self, raw_request: String) -> Message {
trace!("text request");
match ClientControlRequest::try_from(raw_request) {
Err(e) => RequestHandlingError::InvalidTextRequest(e).into_error_message(),
Ok(request) => match request {
ClientControlRequest::EncryptedRequest { ciphertext, nonce } => {
self.handle_encrypted_text_request(ciphertext, nonce).await
}
ClientControlRequest::EcashCredential { enc_credential, iv } => self
.handle_ecash_bandwidth(enc_credential, iv)
.await
.into_ws_message(),
ClientControlRequest::BandwidthCredential { .. } => {
RequestHandlingError::IllegalRequest {
additional_context: "coconut credential are not longer supported".into(),
}
.into_error_message()
}
ClientControlRequest::BandwidthCredentialV2 { .. } => {
RequestHandlingError::IllegalRequest {
additional_context: "coconut credential are not longer supported".into(),
}
.into_error_message()
}
ClientControlRequest::ClaimFreeTestnetBandwidth => self
.bandwidth_storage_manager
.handle_claim_testnet_bandwidth()
.await
.map_err(|e| e.into())
.into_ws_message(),
ClientControlRequest::SupportedProtocol { .. } => self
.inner
.handle_supported_protocol_request()
.into_ws_message(),
other @ ClientControlRequest::Authenticate { .. } => {
RequestHandlingError::IllegalRequest {
additional_context: format!(
"received illegal message of type {} in an authenticated client",
other.name()
),
}
.into_error_message()
}
other @ ClientControlRequest::RegisterHandshakeInitRequest { .. } => {
RequestHandlingError::IllegalRequest {
additional_context: format!(
"received illegal message of type {} in an authenticated client",
other.name()
),
}
.into_error_message()
}
_ => RequestHandlingError::UnknownTextRequest.into_error_message(),
},
let request = match ClientControlRequest::try_from(raw_request) {
Ok(req) => {
debug!("received request of type {}", req.name());
req
}
Err(err) => {
debug!("request was malformed: {err}");
return RequestHandlingError::InvalidTextRequest(err).into_error_message();
}
};
match request {
ClientControlRequest::EncryptedRequest { ciphertext, nonce } => {
self.handle_encrypted_text_request(ciphertext, nonce).await
}
ClientControlRequest::EcashCredential { enc_credential, iv } => {
self.handle_ecash_bandwidth(enc_credential, iv).await
}
ClientControlRequest::BandwidthCredential { .. } => {
Err(RequestHandlingError::IllegalRequest {
additional_context: "coconut credential are not longer supported".into(),
})
}
ClientControlRequest::BandwidthCredentialV2 { .. } => {
Err(RequestHandlingError::IllegalRequest {
additional_context: "coconut credential are not longer supported".into(),
})
}
ClientControlRequest::ClaimFreeTestnetBandwidth => self
.bandwidth_storage_manager
.handle_claim_testnet_bandwidth()
.await
.map_err(|e| e.into()),
ClientControlRequest::SupportedProtocol { .. } => {
Ok(self.inner.handle_supported_protocol_request())
}
other @ ClientControlRequest::Authenticate { .. } => {
Err(RequestHandlingError::IllegalRequest {
additional_context: format!(
"received illegal message of type {} in an authenticated client",
other.name()
),
})
}
other @ ClientControlRequest::RegisterHandshakeInitRequest { .. } => {
Err(RequestHandlingError::IllegalRequest {
additional_context: format!(
"received illegal message of type {} in an authenticated client",
other.name()
),
})
}
_ => Err(RequestHandlingError::UnknownTextRequest),
}
.inspect(|res| debug!(response = ?res, "success"))
.inspect_err(|err| debug!(error = %err, "failure"))
.into_ws_message()
}
/// Handles pong message received from the client.
@@ -452,12 +454,13 @@ where
/// # Arguments
///
/// * `raw_request`: raw received websocket message.
#[instrument(level = "debug", skip_all,
fields(
client = %self.client.address.as_base58_string()
)
)]
async fn handle_request(&mut self, raw_request: Message) -> Option<Message> {
// TODO: this should be added via tracing
debug!(
"handling request from {}",
self.client.address.as_base58_string()
);
trace!("new request");
// apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore
// them and let's test that claim. If that's not the case, just copy code from
@@ -478,8 +481,8 @@ where
where
S: AsyncRead + AsyncWrite + Unpin,
{
let tag: u64 = rand::thread_rng().gen();
debug!("Got request to ping our connection: {}", tag);
let tag: u64 = random();
debug!("got request to ping our connection: {tag}");
self.inner
.send_websocket_message(Message::Ping(tag.to_be_bytes().to_vec()))
.await?;
@@ -420,7 +420,7 @@ where
// we can't handle clients with higher protocol than ours
// (perhaps we could try to negotiate downgrade on our end? sounds like a nice future improvement)
if client_protocol_version <= CURRENT_PROTOCOL_VERSION {
info!("the client is using exactly the same (or older) protocol version as we are. We're good to continue!");
debug!("the client is using exactly the same (or older) protocol version as we are. We're good to continue!");
Ok(CURRENT_PROTOCOL_VERSION)
} else {
let err = InitialAuthenticationError::IncompatibleProtocol {
@@ -8,10 +8,10 @@ use futures::channel::mpsc::SendError;
use futures::StreamExt;
use nym_gateway_storage::{error::StorageError, Storage};
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_mixnode_common::packet_processor::processor::ProcessedFinalHop;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::framing::processing::ProcessedFinalHop;
use nym_sphinx::DestinationAddressBytes;
use nym_task::TaskClient;
use std::collections::HashMap;
@@ -21,6 +21,8 @@ use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use tracing::*;
use super::packet_processing::process_packet;
// defines errors that warrant a panic if not thrown in the context of a shutdown
#[derive(Debug, Error)]
enum CriticalPacketProcessingError {
@@ -184,14 +186,14 @@ impl<St: Storage> ConnectionHandler<St> {
// question: can it also be per connection vs global?
//
let processed_final_hop = match self.packet_processor.process_received(framed_sphinx_packet)
{
Err(err) => {
debug!("We failed to process received sphinx packet - {err}");
return Ok(());
}
Ok(processed_final_hop) => processed_final_hop,
};
let processed_final_hop =
match process_packet(framed_sphinx_packet, self.packet_processor.sphinx_key()) {
Err(err) => {
debug!("We failed to process received sphinx packet - {err}");
return Ok(());
}
Ok(processed_final_hop) => processed_final_hop,
};
self.handle_processed_packet(processed_final_hop).await
}
@@ -3,18 +3,24 @@
use nym_crypto::asymmetric::encryption;
use nym_mixnode_common::packet_processor::error::MixProcessingError;
pub use nym_mixnode_common::packet_processor::processor::MixProcessingResult;
use nym_mixnode_common::packet_processor::processor::{ProcessedFinalHop, SphinxPacketProcessor};
use nym_mixnode_common::packet_processor::processor::SphinxPacketProcessor;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::framing::processing::{
process_framed_packet, MixProcessingResult, PacketProcessingError, ProcessedFinalHop,
};
use nym_sphinx::PrivateKey;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum GatewayProcessingError {
#[error("failed to process received mix packet - {0}")]
PacketProcessingError(#[from] MixProcessingError),
PacketProcessing(#[from] MixProcessingError),
#[error("received a forward hop mix packet")]
ForwardHopReceivedError,
ForwardHopReceived,
#[error("failed to process received sphinx packet: {0}")]
NymPacketProcessing(#[from] PacketProcessingError),
}
// PacketProcessor contains all data required to correctly unwrap and store sphinx packets
@@ -24,21 +30,23 @@ pub struct PacketProcessor {
}
impl PacketProcessor {
pub fn sphinx_key(&self) -> &PrivateKey {
self.inner_processor.sphinx_key()
}
pub(crate) fn new(encryption_key: &encryption::PrivateKey) -> Self {
PacketProcessor {
inner_processor: SphinxPacketProcessor::new(encryption_key.into()),
}
}
}
pub(crate) fn process_received(
&self,
received: FramedNymPacket,
) -> Result<ProcessedFinalHop, GatewayProcessingError> {
match self.inner_processor.process_received(received)? {
MixProcessingResult::ForwardHop(..) => {
Err(GatewayProcessingError::ForwardHopReceivedError)
}
MixProcessingResult::FinalHop(processed_final) => Ok(processed_final),
}
pub(crate) fn process_packet(
received: FramedNymPacket,
sphinx_key: &nym_sphinx::PrivateKey,
) -> Result<ProcessedFinalHop, GatewayProcessingError> {
match process_framed_packet(received, sphinx_key)? {
MixProcessingResult::ForwardHop(..) => Err(GatewayProcessingError::ForwardHopReceived),
MixProcessingResult::FinalHop(processed_final) => Ok(processed_final),
}
}
+19 -13
View File
@@ -246,6 +246,7 @@ impl<St> Gateway<St> {
&mut self,
forwarding_channel: MixForwardingSender,
shutdown: TaskClient,
ecash_verifier: Arc<EcashManager<St>>,
) -> Result<StartedAuthenticator, Box<dyn std::error::Error + Send + Sync>>
where
St: Storage + Clone + 'static,
@@ -256,7 +257,6 @@ impl<St> Gateway<St> {
.ok_or(GatewayError::UnspecifiedAuthenticatorConfig)?;
let (router_tx, mut router_rx) = oneshot::channel();
let (auth_mix_sender, auth_mix_receiver) = mpsc::unbounded();
let (peer_response_tx, peer_response_rx) = tokio::sync::mpsc::unbounded_channel();
let router_shutdown = shutdown.fork("message_router");
let transceiver = LocalGateway::new(
*self.identity_keypair.public_key(),
@@ -286,8 +286,8 @@ impl<St> Gateway<St> {
opts.config.clone(),
wireguard_data.inner.clone(),
used_private_network_ips,
peer_response_rx,
)
.with_ecash_verifier(ecash_verifier)
.with_custom_gateway_transceiver(Box::new(transceiver))
.with_shutdown(shutdown.fork("authenticator"))
.with_wait_for_gateway(true)
@@ -322,7 +322,6 @@ impl<St> Gateway<St> {
all_peers,
shutdown,
wireguard_data,
peer_response_tx,
)
.await?;
@@ -342,6 +341,7 @@ impl<St> Gateway<St> {
&self,
_forwarding_channel: MixForwardingSender,
_shutdown: TaskClient,
_ecash_verifier: Arc<EcashManager<St>>,
) -> Result<StartedAuthenticator, Box<dyn std::error::Error + Send + Sync>> {
todo!("Authenticator is currently only supported on Linux");
}
@@ -616,14 +616,16 @@ impl<St> Gateway<St> {
.maximum_time_between_redemption,
};
let ecash_manager = EcashManager::new(
handler_config,
nyxd_client,
self.identity_keypair.public_key().to_bytes(),
shutdown.fork("EcashVerifier"),
self.storage.clone(),
)
.await?;
let ecash_verifier = Arc::new(
EcashManager::new(
handler_config,
nyxd_client,
self.identity_keypair.public_key().to_bytes(),
shutdown.fork("EcashVerifier"),
self.storage.clone(),
)
.await?,
);
let mix_forwarding_channel = self.start_packet_forwarder(shutdown.fork("PacketForwarder"));
@@ -638,7 +640,7 @@ impl<St> Gateway<St> {
mix_forwarding_channel.clone(),
active_clients_store.clone(),
shutdown.fork("websocket::Listener"),
Arc::new(ecash_manager),
ecash_verifier.clone(),
);
let nr_request_filter = if self.config.network_requester.enabled {
@@ -670,7 +672,11 @@ impl<St> Gateway<St> {
let _wg_api = if self.wireguard_data.is_some() {
let embedded_auth = self
.start_authenticator(mix_forwarding_channel, shutdown.fork("authenticator"))
.start_authenticator(
mix_forwarding_channel,
shutdown.fork("authenticator"),
ecash_verifier,
)
.await
.map_err(|source| GatewayError::AuthenticatorStartError { source })?;
active_clients_store.insert_embedded(embedded_auth.handle);
@@ -1,9 +1,7 @@
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::listener::connection_handler::packet_processing::{
MixProcessingResult, PacketProcessor,
};
use crate::node::listener::connection_handler::packet_processing::PacketProcessor;
use crate::node::packet_delayforwarder::PacketDelayForwardSender;
use crate::node::TaskClient;
use futures::StreamExt;
@@ -13,7 +11,9 @@ use nym_metrics::nanos;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::framing::processing::MixProcessingResult;
use nym_sphinx::Delay as SphinxDelay;
use packet_processing::process_received_packet;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::time::Instant;
@@ -38,6 +38,10 @@ impl ConnectionHandler {
}
}
pub fn packet_processor(&self) -> &PacketProcessor {
&self.packet_processor
}
fn delay_and_forward_packet(&self, mix_packet: MixPacket, delay: Option<SphinxDelay>) {
// determine instant at which packet should get forwarded. this way we minimise effect of
// being stuck in the queue [of the channel] to get inserted into the delay queue
@@ -60,7 +64,10 @@ impl ConnectionHandler {
// all processing such, key caching, etc. was done.
// however, if it was a forward hop, we still need to delay it
nanos!("handle_received_packet", {
match self.packet_processor.process_received(framed_sphinx_packet) {
self.packet_processor
.node_stats_update_sender()
.report_received();
match process_received_packet(framed_sphinx_packet, self.packet_processor().inner()) {
Err(err) => debug!("We failed to process received sphinx packet - {err}"),
Ok(res) => match res {
MixProcessingResult::ForwardHop(forward_packet, delay) => {
@@ -4,13 +4,13 @@
use crate::node::node_statistics;
use nym_crypto::asymmetric::encryption;
use nym_mixnode_common::packet_processor::error::MixProcessingError;
pub use nym_mixnode_common::packet_processor::processor::MixProcessingResult;
use nym_mixnode_common::packet_processor::processor::SphinxPacketProcessor;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::framing::processing::{process_framed_packet, MixProcessingResult};
// PacketProcessor contains all data required to correctly unwrap and forward sphinx packets
#[derive(Clone)]
pub struct PacketProcessor {
pub(crate) struct PacketProcessor {
/// Responsible for performing unwrapping
inner_processor: SphinxPacketProcessor,
@@ -29,11 +29,18 @@ impl PacketProcessor {
}
}
pub(crate) fn process_received(
&self,
received: FramedNymPacket,
) -> Result<MixProcessingResult, MixProcessingError> {
self.node_stats_update_sender.report_received();
self.inner_processor.process_received(received)
pub fn inner(&self) -> &SphinxPacketProcessor {
&self.inner_processor
}
pub fn node_stats_update_sender(&self) -> &node_statistics::UpdateSender {
&self.node_stats_update_sender
}
}
pub fn process_received_packet(
packet: FramedNymPacket,
inner_processor: &SphinxPacketProcessor,
) -> Result<MixProcessingResult, MixProcessingError> {
Ok(process_framed_packet(packet, inner_processor.sphinx_key())?)
}
+2 -1
View File
@@ -4,7 +4,7 @@
[package]
name = "nym-api"
license = "GPL-3.0"
version = "1.1.43"
version = "1.1.44"
authors = [
"Dave Hrycyszyn <futurechimp@users.noreply.github.com>",
"Jędrzej Stuczyński <andrew@nymtech.net>",
@@ -131,6 +131,7 @@ nym-http-api-common = { path = "../common/http-api-common", features = ["utoipa"
[features]
no-reward = []
v2-performance = []
generate-ts = ["ts-rs"]
axum = ["dep:axum",
"axum-extra",
+27 -12
View File
@@ -395,18 +395,33 @@ impl StorageManager {
start: i64,
end: i64,
) -> Result<Option<f32>, sqlx::Error> {
let result = sqlx::query!(
r#"
SELECT AVG(reliability) as "reliability: f32" FROM mixnode_status
WHERE mixnode_details_id= ? AND timestamp >= ? AND timestamp <= ?
"#,
id,
start,
end
)
.fetch_one(&self.connection_pool)
.await?;
Ok(result.reliability)
if cfg!(feature = "v2-performance") {
let result = sqlx::query!(
r#"
SELECT AVG(reliability) as "reliability: f32" FROM mixnode_status_v2
WHERE mixnode_details_id= ? AND timestamp >= ? AND timestamp <= ?
"#,
id,
start,
end
)
.fetch_one(&self.connection_pool)
.await?;
Ok(result.reliability)
} else {
let result = sqlx::query!(
r#"
SELECT AVG(reliability) as "reliability: f32" FROM mixnode_status
WHERE mixnode_details_id= ? AND timestamp >= ? AND timestamp <= ?
"#,
id,
start,
end
)
.fetch_one(&self.connection_pool)
.await?;
Ok(result.reliability)
}
}
pub(super) async fn get_gateway_average_reliability_in_interval(
+40
View File
@@ -0,0 +1,40 @@
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
# SPDX-License-Identifier: Apache-2.0
[package]
name = "nym-data-observatory"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
anyhow = { workspace = true }
axum = { workspace = true, features = ["tokio"] }
chrono = { workspace = true }
nym-bin-common = { path = "../common/bin-common" }
nym-network-defaults = { path = "../common/network-defaults" }
nym-task = { path = "../common/task" }
nym-node-requests = { path = "../nym-node/nym-node-requests", features = ["openapi"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres", "offline"] }
tokio = { workspace = true, features = ["process"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tower-http = { workspace = true, features = ["cors", "trace"] }
utoipa = { workspace = true, features = ["axum_extras", "time"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
utoipauto = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["macros" ] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }

Some files were not shown because too many files have changed in this diff Show More