Compare commits
508 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6efaa7a493 | |||
| 7a407e592b | |||
| 6d0827a1da | |||
| 3fbd3f9c33 | |||
| 795934df83 | |||
| cbbcdea480 | |||
| 31fe1585d2 | |||
| e70cb4e241 | |||
| 321025f39d | |||
| 43c4905c1c | |||
| ec09d32864 | |||
| 5eaa6a442d | |||
| d2a96f5a88 | |||
| 6c3207c25c | |||
| 29ce0192ee | |||
| b4242d86eb | |||
| 0579eb27bf | |||
| 3985352615 | |||
| 13595cde26 | |||
| d50f2d1bc3 | |||
| 4f597387bb | |||
| ce169320f5 | |||
| 59a58e7bbb | |||
| 19a97e1398 | |||
| e79efb3708 | |||
| fb74b0df0a | |||
| 516f85a2c7 | |||
| d3c9592d7d | |||
| 64c34ac762 | |||
| f5809e1423 | |||
| 231dd71cc7 | |||
| 048b491803 | |||
| 8a2307d6e3 | |||
| e26fcced39 | |||
| 338774895a | |||
| 40e6ef40c2 | |||
| 5f2d1c7e11 | |||
| 34a5292c4a | |||
| 514df81b2c | |||
| 4d83f5a34e | |||
| ea93efd00d | |||
| eb106587df | |||
| e98ef7d12a | |||
| 978b4c53cd | |||
| 345df90d2e | |||
| 7f8830f894 | |||
| 1918bcf5ba | |||
| 9e4e8a9b2b | |||
| 077a64b076 | |||
| d3fad05a09 | |||
| 5c937d2a67 | |||
| b403c68a15 | |||
| 65309170b4 | |||
| d41d26f5ac | |||
| 56ebf001aa | |||
| 44687f31ef | |||
| 5c4987cd75 | |||
| 4d30f522b7 | |||
| 26984b619c | |||
| 8cfa4ddc12 | |||
| 387c0698aa | |||
| 4b1a78f6ec | |||
| 31dd960d47 | |||
| 8ec4159532 | |||
| 491d424fd6 | |||
| 20b70bf0f9 | |||
| 97469ebe5f | |||
| d2943d0099 | |||
| 6717afc4d9 | |||
| 3e9d214679 | |||
| 63216fc1d0 | |||
| 84846db9a6 | |||
| 8e442ebed9 | |||
| 50a4b0a755 | |||
| 94a1bc9515 | |||
| bd227d3f51 | |||
| 6318708239 | |||
| 3984fcc804 | |||
| bd008c24ad | |||
| d8c59f4d38 | |||
| 88a3cf2094 | |||
| b74d2447a8 | |||
| ee5eb6042d | |||
| 3aa6462ead | |||
| fd8d226d68 | |||
| 5edb1dd3b0 | |||
| 7c236b2f86 | |||
| 5a082b4170 | |||
| 8cff6e5a73 | |||
| f875b38c87 | |||
| 86621d0b94 | |||
| ea5ec0733c | |||
| 98ccf9e37f | |||
| 3fd74fb590 | |||
| 4d72d72eb0 | |||
| 81b891a21c | |||
| d2c42ab7c6 | |||
| 2e71a442cd | |||
| 9fae4f26a1 | |||
| 1b757a806c | |||
| ce6e40a148 | |||
| 9244413539 | |||
| a554125187 | |||
| 035080cd3b | |||
| 9fc04bd8bc | |||
| f406e1d266 | |||
| 334e99538b | |||
| 9f77ff4d82 | |||
| afa64decc2 | |||
| 04bc42e333 | |||
| 02c5fe2b87 | |||
| defc43a50b | |||
| 9235777426 | |||
| c936e24e70 | |||
| aa4fcc90ec | |||
| 1d289c7c25 | |||
| acee708a66 | |||
| af819ab8e5 | |||
| a811c06295 | |||
| 13616ed992 | |||
| 44d6529adf | |||
| c2c775bd5b | |||
| 014e21f3ef | |||
| 2e90cb1ffc | |||
| 168da8afc4 | |||
| bb38ad62bf | |||
| 387ea3b52e | |||
| 796ae70e50 | |||
| 7beca33673 | |||
| c76bc49abf | |||
| ef32b0e543 | |||
| 0f6f0f3ca7 | |||
| 1f134a9fac | |||
| de4f813de8 | |||
| 390c14a547 | |||
| 4d592ac026 | |||
| 75c7c587dd | |||
| 29ae23bc93 | |||
| 7b78644527 | |||
| c30fdcbe23 | |||
| c3b465725a | |||
| cc2b787cd3 | |||
| 406293d781 | |||
| 5334ff7fc4 | |||
| 1288d4bbcd | |||
| 7c359e29f1 | |||
| 2a5bb6e1a1 | |||
| 91c1c2d43e | |||
| 93c189fcb0 | |||
| b7c64f290f | |||
| fddf1aee28 | |||
| 22c0d8a104 | |||
| 057e9cdcf8 | |||
| bc4d2f70ab | |||
| 0300c6d2d3 | |||
| 19e6554c22 | |||
| ee4f5da834 | |||
| 09b9866bf0 | |||
| b088699dfe | |||
| 909c681555 | |||
| f89c69ae33 | |||
| e84c9b2c8f | |||
| 99a0f159d7 | |||
| 7c0a45ed52 | |||
| 33c30624d2 | |||
| 0fecc56879 | |||
| 7ea009173b | |||
| 571b83d74d | |||
| 74538ac652 | |||
| a5844a461b | |||
| 03ec6b87a8 | |||
| 590acef2af | |||
| c1ad831426 | |||
| 900ea92724 | |||
| f753de780a | |||
| 139a9fdce2 | |||
| fde4492b0b | |||
| 4ec4893552 | |||
| 8506d07428 | |||
| b644afaa67 | |||
| 87260a422b | |||
| 6475282ebc | |||
| 7fdba349ab | |||
| fb77d06ea5 | |||
| 6d3b184ca3 | |||
| d02b0229fc | |||
| 7eb5f7c2b0 | |||
| a18d270d03 | |||
| b2f6836756 | |||
| 07f03f5239 | |||
| aac1b4361e | |||
| 87e429d78a | |||
| da48afc092 | |||
| 9e421024f1 | |||
| 9691b03c87 | |||
| 7ea7b9e482 | |||
| d0fe0a6468 | |||
| a5169ab1b4 | |||
| 7180292c0a | |||
| 68ffc797a7 | |||
| 6a70193bdf | |||
| 4178809555 | |||
| be082f5ed2 | |||
| 6d5755ca1f | |||
| 3efb7531ac | |||
| 9c949988e2 | |||
| 9de5d7213a | |||
| 94eb362a71 | |||
| 327ad7982e | |||
| 0f615f48f2 | |||
| d511611641 | |||
| 17d3ff2d77 | |||
| 43855209e8 | |||
| bf8f109602 | |||
| dd3dcfa7fe | |||
| 653a7fd900 | |||
| 86ea2d23cb | |||
| 42a37442e8 | |||
| 69eec1d3a1 | |||
| 6b24f081e1 | |||
| 6e5d0dac1b | |||
| f0b6a2d7e1 | |||
| 792931d77c | |||
| 9b64fc7755 | |||
| 231d94b6d4 | |||
| a29b0eecea | |||
| a4f3cf9c7e | |||
| 645f9f976b | |||
| 5f2740bf66 | |||
| ecb15034d3 | |||
| bd49c222a3 | |||
| 50b044a100 | |||
| ba645694d4 | |||
| be44811a65 | |||
| df5ceec522 | |||
| 62e1d32e4f | |||
| 9a4bbe1d67 | |||
| 98090d18b4 | |||
| 79f8066c13 | |||
| d0209766a3 | |||
| 844030091f | |||
| a7a421b006 | |||
| b2d8e45e7e | |||
| 6680fbd61a | |||
| fe2d21cf88 | |||
| eeaca9fc10 | |||
| 7255f79b9c | |||
| 589069504a | |||
| 4da7bc7442 | |||
| 35be8de9f1 | |||
| 2b14a9e6f8 | |||
| e9269da897 | |||
| 7bceeadf16 | |||
| e72ce8fa92 | |||
| 1ccdd5d660 | |||
| c6d38d3c4f | |||
| e8e2bf107f | |||
| efe4e5c1c1 | |||
| 2230609a72 | |||
| 6d80c37b21 | |||
| cb8b4c56af | |||
| 4d486abfef | |||
| b694845e4c | |||
| 5cb2800d15 | |||
| fd14394958 | |||
| 134883522d | |||
| fb3878389d | |||
| 0d397ab5cc | |||
| f637debdcc | |||
| 221e01e9b8 | |||
| bc00bdcbc3 | |||
| b943fd4768 | |||
| ff2279358a | |||
| dcc48db301 | |||
| 7528109693 | |||
| 203d682f2c | |||
| 589575eed8 | |||
| 35bf1cc717 | |||
| f5e02d5652 | |||
| 2fc641a7ff | |||
| 0ccca19cc2 | |||
| a07e567eb2 | |||
| e8f34bcca8 | |||
| 2e11971780 | |||
| 3c6a879061 | |||
| 0d3d939d55 | |||
| f3400a0aa5 | |||
| 6ba80c0c06 | |||
| bc7993f440 | |||
| 31d23b88f2 | |||
| bf8614a545 | |||
| 9aed938c79 | |||
| b7e3687757 | |||
| b9b969b7d3 | |||
| 47303e5b3b | |||
| 6b38ffd4f3 | |||
| 169c313404 | |||
| a3e19b4563 | |||
| ccf430ea62 | |||
| cf13b79e93 | |||
| 134a0196f8 | |||
| 54aef7c242 | |||
| 6c45c9f0b0 | |||
| b5afae0916 | |||
| 988eca857f | |||
| 3c05db2874 | |||
| a8e268f84a | |||
| ac22533ecd | |||
| bdc0b875a4 | |||
| d7b67c1408 | |||
| 606e29ebb0 | |||
| 21e3c1538d | |||
| 0fc7cc657d | |||
| 23a7f01c05 | |||
| 3a21cfa1ab | |||
| 1d2e6d916c | |||
| 4c2bf3642e | |||
| 70e2e32385 | |||
| 68a192daa3 | |||
| d6aacae14e | |||
| 6f00023d09 | |||
| 982ec56874 | |||
| 5dcc1ed6dc | |||
| d62bc0a10b | |||
| d1fb926a2a | |||
| dea69acd49 | |||
| ada2d2247a | |||
| 0159d7c27a | |||
| 882003c08c | |||
| b71a491872 | |||
| 8f48ae08c4 | |||
| 31b9623407 | |||
| 6d90ffdd2c | |||
| 28997c7f97 | |||
| 9550934d1f | |||
| a6c586a33b | |||
| ea8610623f | |||
| 9b9920a27c | |||
| f9bb522885 | |||
| 884d9440f2 | |||
| f6a547e5c2 | |||
| 0956bbee90 | |||
| 595bfd8073 | |||
| d4e9441de9 | |||
| 7c85c1a271 | |||
| 92c8d1b73f | |||
| 554e9ca490 | |||
| d44a40c186 | |||
| 0d47fe01fb | |||
| aa86bf0ee3 | |||
| bd79fb1200 | |||
| 11a3bee257 | |||
| 50ac19a4f5 | |||
| 961a81312d | |||
| cb56dd1fe2 | |||
| e9871f6bd5 | |||
| 6e6675f7bf | |||
| 06c3215211 | |||
| a7f7ebfbae | |||
| 0221d45aff | |||
| adb7e4fb60 | |||
| d0d6e38b02 | |||
| 1aec8be85e | |||
| e736025d6d | |||
| dc0f46c11c | |||
| beab341f55 | |||
| 3d6803928b | |||
| 306cfaf0c5 | |||
| f67c184458 | |||
| 79ddcb3b35 | |||
| fe49d38aea | |||
| feda01f1cc | |||
| c212412b90 | |||
| 1e4b360c6d | |||
| d66f253e8b | |||
| 80fc25a3c0 | |||
| 4b474dd8ff | |||
| b163dba2d4 | |||
| ba4c65c558 | |||
| aba10e6a59 | |||
| f1ac0530a3 | |||
| 2f5aed0354 | |||
| bde87587f2 | |||
| f37519942e | |||
| 93ed710fba | |||
| f06fe4d36c | |||
| 626f074229 | |||
| 78ea2eb9c4 | |||
| a0daabab03 | |||
| b0a5b60945 | |||
| 6a0725130b | |||
| 74e949a4cb | |||
| 325b7ef93a | |||
| 2502e2f8c3 | |||
| 732ba41179 | |||
| dcfd830407 | |||
| 4626c999f8 | |||
| 5aac04ac01 | |||
| 0d7fcf34d9 | |||
| dc804c1235 | |||
| f4a416c478 | |||
| 9639f08db6 | |||
| 6ec3e3931b | |||
| bfeec4980d | |||
| 11c1a7c515 | |||
| 5b52e775d8 | |||
| 88c7009e88 | |||
| 50494ea1e5 | |||
| ecf3c4c3ac | |||
| 39b4d6a9a2 | |||
| 2ca034be1c | |||
| c116b75f33 | |||
| aa83501ed0 | |||
| e904c58e6e | |||
| ce02c858a5 | |||
| 7116c5a85d | |||
| 9b87a06238 | |||
| 02fe3f1c5f | |||
| e2d7df8bf0 | |||
| 5c980b03b6 | |||
| 7dc7250499 | |||
| 86f91eff29 | |||
| f45e87a848 | |||
| 46dc3284ee | |||
| ea11eb5a1f | |||
| ecd3ace897 | |||
| 990f5a81f4 | |||
| d87607787f | |||
| d6fc3fe51f | |||
| ec89954d3e | |||
| 5b6108ec80 | |||
| ccb44ee6ff | |||
| e155d293fc | |||
| c3f48debdf | |||
| a7a11ac3a5 | |||
| 1ed824ab55 | |||
| 7f9da13e51 | |||
| c26fe1977b | |||
| a039f2e73d | |||
| c45492cdc8 | |||
| fb3a7aed6b | |||
| 920c1f8855 | |||
| 5f06414a12 | |||
| 656838811a | |||
| 7b8458630a | |||
| cf2ab08b4d | |||
| 2466112829 | |||
| e5306908e4 | |||
| 3f2b57d4a6 | |||
| dee15b2f24 | |||
| 1da0b34c45 | |||
| 67adcce9a4 | |||
| 6fc68c92d0 | |||
| e3885b9fe6 | |||
| fb8b4f82db | |||
| 40a44028d4 | |||
| 8a26830ea2 | |||
| 997c75cfff | |||
| 627ed95aa4 | |||
| deee4ecce7 | |||
| 5f43bb7671 | |||
| b41f0e9b8a | |||
| f8a224787d | |||
| 47c3287226 | |||
| a4914061e0 | |||
| 3feaf98397 | |||
| af24d776c0 | |||
| 2b303d2bdd | |||
| 5409fe0ec2 | |||
| 601863419b | |||
| 89848ec91b | |||
| b181f96d1b | |||
| 80e58678a0 | |||
| b628a5f814 | |||
| 396248be8e | |||
| 665bb60d91 | |||
| deb0ef381e | |||
| 9f945e9d10 | |||
| ae7f908b8a | |||
| 6e3331cb13 | |||
| 103523d20a | |||
| d40115fe0f | |||
| 3b2e753f91 | |||
| 134f0909cd | |||
| 5e3259c787 | |||
| 7cbff7ce8d | |||
| f5a9b43c4c | |||
| 2eb53ef55a | |||
| 280283e41a | |||
| 07d10c35b6 | |||
| 0ab2a589a6 | |||
| bc849c7c46 | |||
| f94f3b1a1d | |||
| c3bcb96bad | |||
| 9b6816e265 | |||
| 68223000ea | |||
| 9f5e153ddb | |||
| cf87ced120 | |||
| 7560837f01 | |||
| 16b9bcb91d | |||
| 07e946a195 | |||
| 648223a235 | |||
| 62045d76b3 | |||
| f8317f5a03 | |||
| c3ec970a37 | |||
| 5a573bc278 | |||
| 3d200db722 | |||
| e4139713cb |
@@ -19,7 +19,7 @@ jobs:
|
||||
- uses: rlespinasse/github-slug-action@v3.x
|
||||
- uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: 18
|
||||
node-version: 20
|
||||
- name: Setup yarn
|
||||
run: npm install -g yarn
|
||||
- name: Build
|
||||
|
||||
@@ -54,6 +54,20 @@ jobs:
|
||||
override: true
|
||||
components: rustfmt, clippy
|
||||
|
||||
# To avoid running out of disk space, skip generating debug symbols
|
||||
- name: Set debug to false (unix)
|
||||
if: contains(matrix.os, 'ubuntu') || contains(matrix.os, 'mac')
|
||||
run: |
|
||||
sed -i.bak 's/\[profile.dev\]/\[profile.dev\]\ndebug = false/' Cargo.toml
|
||||
git diff
|
||||
|
||||
- name: Set debug to false (win)
|
||||
if: contains(matrix.os, 'windows')
|
||||
shell: pwsh
|
||||
run: |
|
||||
(Get-Content Cargo.toml) -replace '\[profile.dev\]', "`$&`ndebug = false" | Set-Content Cargo.toml
|
||||
git diff
|
||||
|
||||
- name: Check formatting
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
|
||||
@@ -9,6 +9,8 @@ on:
|
||||
paths:
|
||||
- 'contracts/**'
|
||||
- 'common/**'
|
||||
- 'Cargo.lock'
|
||||
- 'Cargo.toml'
|
||||
- '.github/workflows/ci-contracts.yml'
|
||||
|
||||
jobs:
|
||||
|
||||
@@ -10,7 +10,7 @@ on:
|
||||
- "nym-connect/desktop/package.json"
|
||||
- "nym-wallet/src/**"
|
||||
- "nym-wallet/package.json"
|
||||
- "explorer/**"
|
||||
- "explorer-nextjs/src/**"
|
||||
- ".github/workflows/ci-lint-typescript.yml"
|
||||
|
||||
jobs:
|
||||
@@ -22,7 +22,7 @@ jobs:
|
||||
|
||||
- uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: 18
|
||||
node-version: 20
|
||||
- name: Setup yarn
|
||||
run: npm install -g yarn
|
||||
|
||||
@@ -37,12 +37,12 @@ jobs:
|
||||
- name: Install wasm-opt
|
||||
uses: ./.github/actions/install-wasm-opt
|
||||
with:
|
||||
version: '116'
|
||||
version: "116"
|
||||
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '1.20'
|
||||
go-version: "1.20"
|
||||
|
||||
- name: Install
|
||||
run: yarn
|
||||
|
||||
@@ -4,8 +4,8 @@ on:
|
||||
workflow_dispatch:
|
||||
push:
|
||||
paths:
|
||||
- 'explorer/**'
|
||||
- '.github/workflows/ci-nym-network-explorer.yml'
|
||||
- "explorer/**"
|
||||
- ".github/workflows/ci-nym-network-explorer.yml"
|
||||
|
||||
defaults:
|
||||
run:
|
||||
@@ -15,78 +15,78 @@ jobs:
|
||||
build:
|
||||
runs-on: custom-linux
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install rsync
|
||||
run: sudo apt-get install rsync
|
||||
continue-on-error: true
|
||||
- uses: rlespinasse/github-slug-action@v3.x
|
||||
- uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: 18
|
||||
- name: Setup yarn
|
||||
run: npm install -g yarn
|
||||
continue-on-error: true
|
||||
- name: Build shared packages
|
||||
run: cd .. && yarn && yarn build
|
||||
- name: Set environment from the example
|
||||
run: cp .env.prod .env
|
||||
# - run: yarn test
|
||||
# continue-on-error: true
|
||||
- run: yarn && yarn build
|
||||
continue-on-error: true
|
||||
- run: yarn storybook:build
|
||||
name: Build storybook
|
||||
- name: Deploy branch to CI www
|
||||
continue-on-error: true
|
||||
uses: easingthemes/ssh-deploy@main
|
||||
env:
|
||||
SSH_PRIVATE_KEY: ${{ secrets.CI_WWW_SSH_PRIVATE_KEY }}
|
||||
ARGS: "-rltgoDzvO --delete"
|
||||
SOURCE: "explorer/dist/"
|
||||
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
|
||||
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
|
||||
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/network-explorer-${{ env.GITHUB_REF_SLUG }}
|
||||
EXCLUDE: "/dist/, /node_modules/"
|
||||
- name: Deploy storybook to CI www
|
||||
continue-on-error: true
|
||||
uses: easingthemes/ssh-deploy@main
|
||||
env:
|
||||
SSH_PRIVATE_KEY: ${{ secrets.CI_WWW_SSH_PRIVATE_KEY }}
|
||||
ARGS: "-rltgoDzvO --delete"
|
||||
SOURCE: "explorer/storybook-static/"
|
||||
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
|
||||
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
|
||||
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/ne-sb-${{ env.GITHUB_REF_SLUG }}
|
||||
EXCLUDE: "/dist/, /node_modules/"
|
||||
- name: Matrix - Node Install
|
||||
run: npm install
|
||||
working-directory: .github/workflows/support-files
|
||||
- name: Matrix - Send Notification
|
||||
env:
|
||||
NYM_NOTIFICATION_KIND: network-explorer
|
||||
NYM_PROJECT_NAME: "Network Explorer"
|
||||
NYM_CI_WWW_BASE: "${{ secrets.NYM_CI_WWW_BASE }}"
|
||||
NYM_CI_WWW_LOCATION: "network-explorer-${{ env.GITHUB_REF_SLUG }}"
|
||||
NYM_CI_WWW_LOCATION_STORYBOOK: "ne-sb-${{ env.GITHUB_REF_SLUG }}"
|
||||
GIT_COMMIT_MESSAGE: "${{ github.event.head_commit.message }}"
|
||||
GIT_BRANCH: "${GITHUB_REF##*/}"
|
||||
IS_SUCCESS: "${{ job.status == 'success' }}"
|
||||
MATRIX_SERVER: "${{ secrets.MATRIX_SERVER }}"
|
||||
MATRIX_ROOM: "${{ secrets.MATRIX_ROOM }}"
|
||||
MATRIX_USER_ID: "${{ secrets.MATRIX_USER_ID }}"
|
||||
MATRIX_TOKEN: "${{ secrets.MATRIX_TOKEN }}"
|
||||
MATRIX_DEVICE_ID: "${{ secrets.MATRIX_DEVICE_ID }}"
|
||||
uses: docker://keybaseio/client:stable-node
|
||||
with:
|
||||
args: .github/workflows/support-files/notifications/entry_point.sh
|
||||
- name: Deploy
|
||||
if: github.event_name == 'workflow_dispatch'
|
||||
uses: easingthemes/ssh-deploy@main
|
||||
env:
|
||||
SSH_PRIVATE_KEY: ${{ secrets.CD_PROD_NE_SSH_PRIVATE_KEY }}
|
||||
ARGS: "-rltgoDzvO --delete"
|
||||
SOURCE: "explorer/dist/"
|
||||
REMOTE_HOST: ${{ secrets.CD_PROD_NE_REMOTE_HOST }}
|
||||
REMOTE_USER: ${{ secrets.CD_PROD_NE_REMOTE_USER }}
|
||||
TARGET: ${{ secrets.CD_PROD_NE_REMOTE_TARGET }}
|
||||
EXCLUDE: "/dist/, /node_modules/"
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install rsync
|
||||
run: sudo apt-get install rsync
|
||||
continue-on-error: true
|
||||
- uses: rlespinasse/github-slug-action@v3.x
|
||||
- uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: 20
|
||||
- name: Setup yarn
|
||||
run: npm install -g yarn
|
||||
continue-on-error: true
|
||||
- name: Build shared packages
|
||||
run: cd .. && yarn && yarn build
|
||||
- name: Set environment from the example
|
||||
run: cp .env.prod .env
|
||||
# - run: yarn test
|
||||
# continue-on-error: true
|
||||
- run: yarn && yarn build
|
||||
continue-on-error: true
|
||||
- run: yarn storybook:build
|
||||
name: Build storybook
|
||||
- name: Deploy branch to CI www
|
||||
continue-on-error: true
|
||||
uses: easingthemes/ssh-deploy@main
|
||||
env:
|
||||
SSH_PRIVATE_KEY: ${{ secrets.CI_WWW_SSH_PRIVATE_KEY }}
|
||||
ARGS: "-rltgoDzvO --delete"
|
||||
SOURCE: "explorer/dist/"
|
||||
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
|
||||
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
|
||||
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/network-explorer-${{ env.GITHUB_REF_SLUG }}
|
||||
EXCLUDE: "/dist/, /node_modules/"
|
||||
- name: Deploy storybook to CI www
|
||||
continue-on-error: true
|
||||
uses: easingthemes/ssh-deploy@main
|
||||
env:
|
||||
SSH_PRIVATE_KEY: ${{ secrets.CI_WWW_SSH_PRIVATE_KEY }}
|
||||
ARGS: "-rltgoDzvO --delete"
|
||||
SOURCE: "explorer/storybook-static/"
|
||||
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
|
||||
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
|
||||
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/ne-sb-${{ env.GITHUB_REF_SLUG }}
|
||||
EXCLUDE: "/dist/, /node_modules/"
|
||||
- name: Matrix - Node Install
|
||||
run: npm install
|
||||
working-directory: .github/workflows/support-files
|
||||
- name: Matrix - Send Notification
|
||||
env:
|
||||
NYM_NOTIFICATION_KIND: network-explorer
|
||||
NYM_PROJECT_NAME: "Network Explorer"
|
||||
NYM_CI_WWW_BASE: "${{ secrets.NYM_CI_WWW_BASE }}"
|
||||
NYM_CI_WWW_LOCATION: "network-explorer-${{ env.GITHUB_REF_SLUG }}"
|
||||
NYM_CI_WWW_LOCATION_STORYBOOK: "ne-sb-${{ env.GITHUB_REF_SLUG }}"
|
||||
GIT_COMMIT_MESSAGE: "${{ github.event.head_commit.message }}"
|
||||
GIT_BRANCH: "${GITHUB_REF##*/}"
|
||||
IS_SUCCESS: "${{ job.status == 'success' }}"
|
||||
MATRIX_SERVER: "${{ secrets.MATRIX_SERVER }}"
|
||||
MATRIX_ROOM: "${{ secrets.MATRIX_ROOM }}"
|
||||
MATRIX_USER_ID: "${{ secrets.MATRIX_USER_ID }}"
|
||||
MATRIX_TOKEN: "${{ secrets.MATRIX_TOKEN }}"
|
||||
MATRIX_DEVICE_ID: "${{ secrets.MATRIX_DEVICE_ID }}"
|
||||
uses: docker://keybaseio/client:stable-node
|
||||
with:
|
||||
args: .github/workflows/support-files/notifications/entry_point.sh
|
||||
- name: Deploy
|
||||
if: github.event_name == 'workflow_dispatch'
|
||||
uses: easingthemes/ssh-deploy@main
|
||||
env:
|
||||
SSH_PRIVATE_KEY: ${{ secrets.CD_PROD_NE_SSH_PRIVATE_KEY }}
|
||||
ARGS: "-rltgoDzvO --delete"
|
||||
SOURCE: "explorer/dist/"
|
||||
REMOTE_HOST: ${{ secrets.CD_PROD_NE_REMOTE_HOST }}
|
||||
REMOTE_USER: ${{ secrets.CD_PROD_NE_REMOTE_USER }}
|
||||
TARGET: ${{ secrets.CD_PROD_NE_REMOTE_TARGET }}
|
||||
EXCLUDE: "/dist/, /node_modules/"
|
||||
|
||||
@@ -30,6 +30,12 @@ jobs:
|
||||
override: true
|
||||
components: rustfmt, clippy
|
||||
|
||||
- name: Set debug to false
|
||||
working-directory: nym-wallet
|
||||
run: |
|
||||
sed -i.bak '1s/^/\[profile.dev\]\ndebug = false\n\n/' Cargo.toml
|
||||
git diff
|
||||
|
||||
- name: Build all binaries
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
|
||||
@@ -20,7 +20,7 @@ jobs:
|
||||
|
||||
- uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: 18
|
||||
node-version: 20
|
||||
|
||||
- name: Setup yarn
|
||||
run: npm install -g yarn
|
||||
|
||||
@@ -45,6 +45,11 @@ jobs:
|
||||
- name: Install wasm-bindgen-cli
|
||||
run: cargo install wasm-bindgen-cli
|
||||
|
||||
- name: Set debug to false
|
||||
run: |
|
||||
sed -i.bak 's/\[profile.dev\]/\[profile.dev\]\ndebug = false/' Cargo.toml
|
||||
git diff
|
||||
|
||||
- name: "Build"
|
||||
run: make sdk-wasm-build
|
||||
|
||||
|
||||
+116
@@ -4,6 +4,122 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [2025.3-ruta] (2025-02-10)
|
||||
|
||||
- Push down forget me to client configs ([#5431])
|
||||
- Fix statistics shutdown ([#5426])
|
||||
- Make wait_for_graceful_shutdown to be pub ([#5424])
|
||||
- Upgrade to thiserror 2.0 ([#5414])
|
||||
- build(deps): bump the patch-updates group across 1 directory with 9 updates ([#5406])
|
||||
- Relocate a validator api function ([#5401])
|
||||
- Send shutdown instead of panic when reaching max fail ([#5398])
|
||||
- Change Explorer URL to new smooshed nodes ([#5396])
|
||||
- reduce log severity for checking topology validity ([#5395])
|
||||
- MixnetClient can send ClientRequests ([#5381])
|
||||
- Fix missing path triggers for CI ([#5380])
|
||||
- Uncouple storage reference for bandwidth client ([#5372])
|
||||
- build(deps): bump tokio from 1.40.0 to 1.43.0 ([#5370])
|
||||
- DNS resolver configuration for internal HTTP client lookups ([#5355])
|
||||
- Update README.md ([#5328])
|
||||
- Update README.md ([#5327])
|
||||
|
||||
[#5431]: https://github.com/nymtech/nym/pull/5431
|
||||
[#5426]: https://github.com/nymtech/nym/pull/5426
|
||||
[#5424]: https://github.com/nymtech/nym/pull/5424
|
||||
[#5414]: https://github.com/nymtech/nym/pull/5414
|
||||
[#5406]: https://github.com/nymtech/nym/pull/5406
|
||||
[#5401]: https://github.com/nymtech/nym/pull/5401
|
||||
[#5398]: https://github.com/nymtech/nym/pull/5398
|
||||
[#5396]: https://github.com/nymtech/nym/pull/5396
|
||||
[#5395]: https://github.com/nymtech/nym/pull/5395
|
||||
[#5381]: https://github.com/nymtech/nym/pull/5381
|
||||
[#5380]: https://github.com/nymtech/nym/pull/5380
|
||||
[#5372]: https://github.com/nymtech/nym/pull/5372
|
||||
[#5370]: https://github.com/nymtech/nym/pull/5370
|
||||
[#5355]: https://github.com/nymtech/nym/pull/5355
|
||||
[#5328]: https://github.com/nymtech/nym/pull/5328
|
||||
[#5327]: https://github.com/nymtech/nym/pull/5327
|
||||
|
||||
## [2025.2-hu] (2025-02-04)
|
||||
|
||||
- Feature/remove double spending bloomfilter ([#5417])
|
||||
- HU - Downgrade harmless log message from info to debug ([#5405])
|
||||
- lower default ticket verification quorum to 0.7 ([#5404])
|
||||
- Downgrade harmless log message from info to debug ([#5403])
|
||||
- Redirect from mixnode page to nodes page ([#5397])
|
||||
- chore :update version of chain watcher and validator rewarder ([#5394])
|
||||
- bugfix: correctly handle ingore epoch roles flag ([#5390])
|
||||
- bugfix: terminate mixnet socket listener on shutdown ([#5389])
|
||||
- feat: make client ignore dual mode nodes by default ([#5388])
|
||||
- Handle ecash network errors differently ([#5378])
|
||||
- Remove empty ephemeral keys ([#5376])
|
||||
- fixed sql migration for adding default message timestamp ([#5374])
|
||||
- Bind to [::] on nym-node for both IP versions ([#5361])
|
||||
- exposed NymApiClient method for obtaining node performance history ([#5360])
|
||||
- Client gateway selection ([#5358])
|
||||
- chore: refresh wasm sdk ([#5353])
|
||||
- chore: update indexed_db_futures ([#5347])
|
||||
- build(deps): bump mikefarah/yq from 4.44.6 to 4.45.1 ([#5342])
|
||||
- updated cosmrs and tendermint-rpc to their most recent versions ([#5339])
|
||||
- build(deps): bump ts-rs from 10.0.0 to 10.1.0 ([#5338])
|
||||
- build(deps): bump tempfile from 3.14.0 to 3.15.0 ([#5337])
|
||||
- build(deps): bump the patch-updates group with 8 updates ([#5336])
|
||||
- feature: introduce /load endpoint for self-reported quantised NymNode load ([#5326])
|
||||
- feature: `CancellationToken`-based shutdowns ([#5325])
|
||||
- Use expect in geodata test to give error message on failure ([#5314])
|
||||
- feature: periodically remove stale gateway messages ([#5312])
|
||||
- build(deps): bump the patch-updates group across 1 directory with 35 updates ([#5310])
|
||||
- Add dependabot assignes for the root cargo ecosystem ([#5297])
|
||||
- Move tun constants to network defaults ([#5286])
|
||||
- Include IPINFO_API_TOKEN in nightly CI ([#5285])
|
||||
- Nyx Chain Watcher ([#5274])
|
||||
- bugfix: remove unnecessary arguments for nym-api swagger endpoints ([#5272])
|
||||
- feature: nym topology revamp ([#5271])
|
||||
- Add windows to CI builds ([#5269])
|
||||
- http-api-client: deduplicate code ([#5267])
|
||||
- build(deps): bump http from 1.1.0 to 1.2.0 ([#5228])
|
||||
- NS API: add mixnet scraper ([#5200])
|
||||
- build(deps): bump criterion from 0.4.0 to 0.5.1 ([#4911])
|
||||
|
||||
[#5417]: https://github.com/nymtech/nym/pull/5417
|
||||
[#5405]: https://github.com/nymtech/nym/pull/5405
|
||||
[#5404]: https://github.com/nymtech/nym/pull/5404
|
||||
[#5403]: https://github.com/nymtech/nym/pull/5403
|
||||
[#5397]: https://github.com/nymtech/nym/pull/5397
|
||||
[#5394]: https://github.com/nymtech/nym/pull/5394
|
||||
[#5390]: https://github.com/nymtech/nym/pull/5390
|
||||
[#5389]: https://github.com/nymtech/nym/pull/5389
|
||||
[#5388]: https://github.com/nymtech/nym/pull/5388
|
||||
[#5378]: https://github.com/nymtech/nym/pull/5378
|
||||
[#5376]: https://github.com/nymtech/nym/pull/5376
|
||||
[#5374]: https://github.com/nymtech/nym/pull/5374
|
||||
[#5361]: https://github.com/nymtech/nym/pull/5361
|
||||
[#5360]: https://github.com/nymtech/nym/pull/5360
|
||||
[#5358]: https://github.com/nymtech/nym/pull/5358
|
||||
[#5353]: https://github.com/nymtech/nym/pull/5353
|
||||
[#5347]: https://github.com/nymtech/nym/pull/5347
|
||||
[#5342]: https://github.com/nymtech/nym/pull/5342
|
||||
[#5339]: https://github.com/nymtech/nym/pull/5339
|
||||
[#5338]: https://github.com/nymtech/nym/pull/5338
|
||||
[#5337]: https://github.com/nymtech/nym/pull/5337
|
||||
[#5336]: https://github.com/nymtech/nym/pull/5336
|
||||
[#5326]: https://github.com/nymtech/nym/pull/5326
|
||||
[#5325]: https://github.com/nymtech/nym/pull/5325
|
||||
[#5314]: https://github.com/nymtech/nym/pull/5314
|
||||
[#5312]: https://github.com/nymtech/nym/pull/5312
|
||||
[#5310]: https://github.com/nymtech/nym/pull/5310
|
||||
[#5297]: https://github.com/nymtech/nym/pull/5297
|
||||
[#5286]: https://github.com/nymtech/nym/pull/5286
|
||||
[#5285]: https://github.com/nymtech/nym/pull/5285
|
||||
[#5274]: https://github.com/nymtech/nym/pull/5274
|
||||
[#5272]: https://github.com/nymtech/nym/pull/5272
|
||||
[#5271]: https://github.com/nymtech/nym/pull/5271
|
||||
[#5269]: https://github.com/nymtech/nym/pull/5269
|
||||
[#5267]: https://github.com/nymtech/nym/pull/5267
|
||||
[#5228]: https://github.com/nymtech/nym/pull/5228
|
||||
[#5200]: https://github.com/nymtech/nym/pull/5200
|
||||
[#4911]: https://github.com/nymtech/nym/pull/4911
|
||||
|
||||
## [2025.1-reeses] (2025-01-15)
|
||||
|
||||
- Feture/legacy alert ([#5346])
|
||||
|
||||
Generated
+1273
-1086
File diff suppressed because it is too large
Load Diff
+36
-30
@@ -48,7 +48,6 @@ members = [
|
||||
"common/credentials-interface",
|
||||
"common/crypto",
|
||||
"common/dkg",
|
||||
"common/ecash-double-spending",
|
||||
"common/ecash-time",
|
||||
"common/execute",
|
||||
"common/exit-policy",
|
||||
@@ -188,18 +187,21 @@ readme = "README.md"
|
||||
|
||||
[workspace.dependencies]
|
||||
addr = "0.15.6"
|
||||
aead = "0.5.2"
|
||||
aes = "0.8.1"
|
||||
aes-gcm = "0.10.1"
|
||||
aes-gcm-siv = "0.11.1"
|
||||
aead = "0.5.2"
|
||||
ammonia = "4"
|
||||
anyhow = "1.0.95"
|
||||
arc-swap = "1.7.1"
|
||||
argon2 = "0.5.0"
|
||||
async-trait = "0.1.85"
|
||||
axum-client-ip = "0.6.1"
|
||||
async-trait = "0.1.86"
|
||||
axum = "0.7.5"
|
||||
axum-client-ip = "0.6.1"
|
||||
axum-extra = "0.9.4"
|
||||
axum-test = "16.2.0"
|
||||
base64 = "0.22.1"
|
||||
base85rs = "0.1.3"
|
||||
bincode = "1.3.3"
|
||||
bip39 = { version = "2.0.0", features = ["zeroize"] }
|
||||
bit-vec = "0.7.0" # can we unify those?
|
||||
@@ -210,17 +212,17 @@ bs58 = "0.5.1"
|
||||
bytecodec = "0.4.15"
|
||||
bytes = "1.7.2"
|
||||
cargo_metadata = "0.18.1"
|
||||
celes = "2.4.0"
|
||||
celes = "2.5.0"
|
||||
cfg-if = "1.0.0"
|
||||
chacha20 = "0.9.0"
|
||||
chacha20poly1305 = "0.10.1"
|
||||
chrono = "0.4.39"
|
||||
cipher = "0.4.3"
|
||||
clap = "4.5.26"
|
||||
clap = "4.5.30"
|
||||
clap_complete = "4.5"
|
||||
clap_complete_fig = "4.5"
|
||||
colored = "2.0"
|
||||
comfy-table = "7.1.3"
|
||||
colored = "2.2"
|
||||
comfy-table = "7.1.4"
|
||||
console = "0.15.10"
|
||||
console-subscriber = "0.1.1"
|
||||
console_error_panic_hook = "0.1"
|
||||
@@ -240,8 +242,9 @@ doc-comment = "0.3"
|
||||
dotenvy = "0.15.6"
|
||||
ecdsa = "0.16"
|
||||
ed25519-dalek = "2.1"
|
||||
etherparse = "0.13.0"
|
||||
env_logger = "0.11.6"
|
||||
envy = "0.4"
|
||||
etherparse = "0.13.0"
|
||||
eyre = "0.6.9"
|
||||
fastrand = "2.1.1"
|
||||
flate2 = "1.0.35"
|
||||
@@ -249,23 +252,23 @@ futures = "0.3.31"
|
||||
futures-util = "0.3"
|
||||
generic-array = "0.14.7"
|
||||
getrandom = "0.2.10"
|
||||
getset = "0.1.3"
|
||||
getset = "0.1.4"
|
||||
handlebars = "3.5.5"
|
||||
headers = "0.4.0"
|
||||
hex = "0.4.3"
|
||||
hex-literal = "0.3.3"
|
||||
hickory-resolver = "0.24.2"
|
||||
hickory-resolver = "0.24.3"
|
||||
hkdf = "0.12.3"
|
||||
hmac = "0.12.1"
|
||||
http = "1"
|
||||
http-body-util = "0.1"
|
||||
httpcodec = "0.2.3"
|
||||
human-repr = "1.1.0"
|
||||
humantime = "2.1.0"
|
||||
humantime-serde = "1.1.1"
|
||||
human-repr = "1.1.0"
|
||||
hyper = "1.4.1"
|
||||
hyper = "1.6.0"
|
||||
hyper-util = "0.1"
|
||||
indicatif = "0.17.9"
|
||||
indicatif = "0.17.11"
|
||||
inquire = "0.6.2"
|
||||
ip_network = "0.4.1"
|
||||
ipnetwork = "0.20"
|
||||
@@ -277,13 +280,12 @@ ledger-transport = "0.10.0"
|
||||
ledger-transport-hid = "0.10.0"
|
||||
log = "0.4"
|
||||
maxminddb = "0.23.0"
|
||||
rs_merkle = "1.4.2"
|
||||
mime = "0.3.17"
|
||||
moka = { version = "0.12", features = ["future"] }
|
||||
nix = "0.27.1"
|
||||
notify = "5.1.0"
|
||||
okapi = "0.7.0"
|
||||
once_cell = "1.20.2"
|
||||
once_cell = "1.20.3"
|
||||
opentelemetry = "0.19.0"
|
||||
opentelemetry-jaeger = "0.18.0"
|
||||
parking_lot = "0.12.3"
|
||||
@@ -292,7 +294,7 @@ petgraph = "0.6.5"
|
||||
pin-project = "1.1"
|
||||
pin-project-lite = "0.2.16"
|
||||
pretty_env_logger = "0.4.0"
|
||||
publicsuffix = "2.2.3"
|
||||
publicsuffix = "2.3.0"
|
||||
quote = "1"
|
||||
rand = "0.8.5"
|
||||
rand_chacha = "0.3"
|
||||
@@ -306,20 +308,21 @@ reqwest = { version = "0.12.4", default-features = false }
|
||||
rocket = "0.5.0"
|
||||
rocket_cors = "0.6.0"
|
||||
rocket_okapi = "0.8.0"
|
||||
rs_merkle = "1.4.2"
|
||||
safer-ffi = "0.1.13"
|
||||
schemars = "0.8.21"
|
||||
semver = "1.0.24"
|
||||
semver = "1.0.25"
|
||||
serde = "1.0.217"
|
||||
serde_bytes = "0.11.15"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0.135"
|
||||
serde_json_path = "0.7.1"
|
||||
serde_json = "1.0.138"
|
||||
serde_json_path = "0.7.2"
|
||||
serde_repr = "0.1"
|
||||
serde_with = "3.9.0"
|
||||
serde_yaml = "0.9.25"
|
||||
sha2 = "0.10.8"
|
||||
si-scale = "0.2.3"
|
||||
sphinx-packet = "0.1.1"
|
||||
sphinx-packet = "0.3.1"
|
||||
sqlx = "0.7.4"
|
||||
strum = "0.26"
|
||||
strum_macros = "0.26"
|
||||
@@ -327,29 +330,32 @@ subtle-encoding = "0.5"
|
||||
syn = "1"
|
||||
sysinfo = "0.33.0"
|
||||
tap = "1.0.1"
|
||||
tar = "0.4.43"
|
||||
tar = "0.4.44"
|
||||
tempfile = "3.15"
|
||||
thiserror = "1.0.64"
|
||||
thiserror = "2.0"
|
||||
time = "0.3.37"
|
||||
tokio = "1.39"
|
||||
tokio = "1.43"
|
||||
tokio-postgres = "0.7"
|
||||
tokio-stream = "0.1.17"
|
||||
tokio-test = "0.4.4"
|
||||
tokio-tun = "0.11.5"
|
||||
tokio-tungstenite = { version = "0.20.1" }
|
||||
tokio-util = "0.7.13"
|
||||
toml = "0.8.19"
|
||||
tower = "0.4.13"
|
||||
toml = "0.8.20"
|
||||
tower = "0.5.2"
|
||||
tower-http = "0.5.2"
|
||||
tracing = "0.1.41"
|
||||
tracing-log = "0.2"
|
||||
tracing-opentelemetry = "0.19.0"
|
||||
tracing-subscriber = "0.3.19"
|
||||
tracing-tree = "0.2.2"
|
||||
tracing-log = "0.2"
|
||||
ts-rs = "10.1.0"
|
||||
tungstenite = { version = "0.20.1", default-features = false }
|
||||
uniffi = "0.29.0"
|
||||
uniffi_build = "0.29.0"
|
||||
url = "2.5"
|
||||
utoipa = "5.2"
|
||||
utoipa-swagger-ui = "8.0"
|
||||
utoipa-swagger-ui = "8.1"
|
||||
utoipauto = "0.2"
|
||||
uuid = "*"
|
||||
vergen = { version = "=8.3.1", default-features = false }
|
||||
@@ -385,10 +391,10 @@ cw4 = { version = "=1.1.2" }
|
||||
cw-controllers = { version = "=1.1.0" }
|
||||
|
||||
# cosmrs-related
|
||||
bip32 = { version = "0.5.2", default-features = false }
|
||||
bip32 = { version = "0.5.3", default-features = false }
|
||||
|
||||
|
||||
cosmrs = { version = "0.21.0" }
|
||||
cosmrs = { version = "0.21.1" }
|
||||
tendermint = "0.40.0"
|
||||
tendermint-rpc = "0.40.0"
|
||||
prost = { version = "0.13", default-features = false }
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nym-client"
|
||||
version = "1.1.46"
|
||||
version = "1.1.48"
|
||||
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
|
||||
description = "Implementation of the Nym Client"
|
||||
edition = "2021"
|
||||
|
||||
@@ -56,7 +56,7 @@ pub fn default_data_directory<P: AsRef<Path>>(id: P) -> PathBuf {
|
||||
.join(DEFAULT_DATA_DIR)
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, PartialEq, Serialize)]
|
||||
#[derive(Debug, Deserialize, PartialEq, Serialize, Clone)]
|
||||
pub struct Config {
|
||||
#[serde(flatten)]
|
||||
pub base: BaseClientConfig,
|
||||
@@ -94,6 +94,10 @@ impl CliClientConfig for Config {
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn base(&self) -> BaseClientConfig {
|
||||
self.base.clone()
|
||||
}
|
||||
|
||||
pub fn new<S: AsRef<str>>(id: S) -> Self {
|
||||
Config {
|
||||
base: BaseClientConfig::new(id.as_ref(), env!("CARGO_PKG_VERSION")),
|
||||
@@ -209,7 +213,7 @@ impl SocketType {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, PartialEq, Eq, Serialize)]
|
||||
#[derive(Debug, Deserialize, PartialEq, Eq, Serialize, Clone)]
|
||||
#[serde(default, deny_unknown_fields)]
|
||||
pub struct Socket {
|
||||
pub socket_type: SocketType,
|
||||
|
||||
@@ -107,5 +107,8 @@ enabled = {{ debug.stats_reporting.enabled }}
|
||||
provider_address = '{{ debug.stats_reporting.provider_address }}'
|
||||
reporting_interval = '{{ debug.stats_reporting.reporting_interval }}'
|
||||
|
||||
[debug.forget_me]
|
||||
client = {{ debug.forget_me.client }}
|
||||
stats = {{ debug.forget_me.stats }}
|
||||
|
||||
"#;
|
||||
|
||||
@@ -20,7 +20,7 @@ pub use nym_sphinx::addressing::clients::Recipient;
|
||||
|
||||
pub mod config;
|
||||
|
||||
type NativeClientBuilder<'a> = BaseClientBuilder<'a, QueryHttpRpcNyxdClient, OnDiskPersistent>;
|
||||
type NativeClientBuilder = BaseClientBuilder<QueryHttpRpcNyxdClient, OnDiskPersistent>;
|
||||
|
||||
pub struct SocketClient {
|
||||
/// Client configuration options, including, among other things, packet sending rates,
|
||||
@@ -32,6 +32,10 @@ pub struct SocketClient {
|
||||
}
|
||||
|
||||
impl SocketClient {
|
||||
pub fn config(&self) -> Config {
|
||||
self.config.clone()
|
||||
}
|
||||
|
||||
pub fn new(config: Config, custom_mixnet: Option<PathBuf>) -> Self {
|
||||
SocketClient {
|
||||
config,
|
||||
@@ -45,7 +49,7 @@ impl SocketClient {
|
||||
client_output: ClientOutput,
|
||||
client_state: ClientState,
|
||||
self_address: &Recipient,
|
||||
shutdown: nym_task::TaskClient,
|
||||
task_client: nym_task::TaskClient,
|
||||
packet_type: PacketType,
|
||||
) {
|
||||
info!("Starting websocket listener...");
|
||||
@@ -73,10 +77,15 @@ impl SocketClient {
|
||||
shared_lane_queue_lengths,
|
||||
reply_controller_sender,
|
||||
Some(packet_type),
|
||||
task_client.fork("websocket_handler"),
|
||||
);
|
||||
|
||||
websocket::Listener::new(config.socket.host, config.socket.listening_port)
|
||||
.start(websocket_handler, shutdown);
|
||||
websocket::Listener::new(
|
||||
config.socket.host,
|
||||
config.socket.listening_port,
|
||||
task_client.with_suffix("websocket_listener"),
|
||||
)
|
||||
.start(websocket_handler);
|
||||
}
|
||||
|
||||
/// blocking version of `start_socket` method. Will run forever (or until SIGINT is sent)
|
||||
@@ -108,8 +117,9 @@ impl SocketClient {
|
||||
let storage = self.initialise_storage().await?;
|
||||
let user_agent = nym_bin_common::bin_info!().into();
|
||||
|
||||
let mut base_client = BaseClientBuilder::new(&self.config.base, storage, dkg_query_client)
|
||||
.with_user_agent(user_agent);
|
||||
let mut base_client =
|
||||
BaseClientBuilder::new(self.config().base(), storage, dkg_query_client)
|
||||
.with_user_agent(user_agent);
|
||||
|
||||
if let Some(custom_mixnet) = &self.custom_mixnet {
|
||||
base_client = base_client.with_stored_topology(custom_mixnet)?;
|
||||
|
||||
@@ -82,6 +82,7 @@ impl From<Init> for OverrideConfig {
|
||||
nyxd_urls: init_config.common_args.nyxd_urls,
|
||||
enabled_credentials_mode: init_config.common_args.enabled_credentials_mode,
|
||||
stats_reporting_address: init_config.common_args.stats_reporting_address,
|
||||
forget_me: init_config.common_args.forget_me.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ use nym_bin_common::completions::{fig_generate, ArgShell};
|
||||
use nym_client::client::Recipient;
|
||||
use nym_client_core::cli_helpers::CliClient;
|
||||
use nym_client_core::client::base_client::storage::migration_helpers::v1_1_33;
|
||||
use nym_client_core::config::ForgetMe;
|
||||
use nym_config::OptionalSet;
|
||||
use std::error::Error;
|
||||
use std::net::IpAddr;
|
||||
@@ -106,6 +107,7 @@ pub(crate) struct OverrideConfig {
|
||||
nyxd_urls: Option<Vec<url::Url>>,
|
||||
enabled_credentials_mode: Option<bool>,
|
||||
stats_reporting_address: Option<Recipient>,
|
||||
forget_me: ForgetMe,
|
||||
}
|
||||
|
||||
pub(crate) async fn execute(args: Cli) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
@@ -133,6 +135,7 @@ pub(crate) fn override_config(config: Config, args: OverrideConfig) -> Config {
|
||||
args.fastmode,
|
||||
)
|
||||
.with_base(BaseClientConfig::with_disabled_cover_traffic, args.no_cover)
|
||||
.with_base(BaseClientConfig::with_forget_me, args.forget_me)
|
||||
.with_optional(Config::with_port, args.port)
|
||||
.with_optional(Config::with_host, args.host)
|
||||
.with_optional_custom_env_ext(
|
||||
|
||||
@@ -41,6 +41,7 @@ impl From<Run> for OverrideConfig {
|
||||
nyxd_urls: run_config.common_args.nyxd_urls,
|
||||
enabled_credentials_mode: run_config.common_args.enabled_credentials_mode,
|
||||
stats_reporting_address: run_config.common_args.stats_reporting_address,
|
||||
forget_me: run_config.common_args.forget_me.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use nym_sphinx::receiver::ReconstructedMessage;
|
||||
use nym_task::connections::{
|
||||
ConnectionCommand, ConnectionCommandSender, ConnectionId, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
use nym_task::TaskClient;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
@@ -43,9 +44,11 @@ pub(crate) struct HandlerBuilder {
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
packet_type: Option<PacketType>,
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl HandlerBuilder {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
msg_input: InputMessageSender,
|
||||
client_connection_tx: ConnectionCommandSender,
|
||||
@@ -54,6 +57,7 @@ impl HandlerBuilder {
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
packet_type: Option<PacketType>,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
Self {
|
||||
msg_input,
|
||||
@@ -63,11 +67,14 @@ impl HandlerBuilder {
|
||||
lane_queue_lengths,
|
||||
reply_controller_sender,
|
||||
packet_type,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: make sure we only ever have one active handler
|
||||
pub fn create_active_handler(&self) -> Handler {
|
||||
let mut task_client = self.task_client.fork("active_handler");
|
||||
task_client.disarm();
|
||||
Handler {
|
||||
msg_input: self.msg_input.clone(),
|
||||
client_connection_tx: self.client_connection_tx.clone(),
|
||||
@@ -78,6 +85,7 @@ impl HandlerBuilder {
|
||||
lane_queue_lengths: self.lane_queue_lengths.clone(),
|
||||
reply_controller_sender: self.reply_controller_sender.clone(),
|
||||
packet_type: self.packet_type,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -92,16 +100,18 @@ pub(crate) struct Handler {
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
packet_type: Option<PacketType>,
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl Drop for Handler {
|
||||
fn drop(&mut self) {
|
||||
if self
|
||||
if let Err(err) = self
|
||||
.buffer_requester
|
||||
.unbounded_send(ReceivedBufferMessage::ReceiverDisconnect)
|
||||
.is_err()
|
||||
{
|
||||
error!("we failed to disconnect the receiver from the buffer! presumably the shutdown procedure has been initiated!")
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("failed to disconnect the receiver from the buffer: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -125,10 +135,23 @@ impl Handler {
|
||||
};
|
||||
|
||||
// get the number of pending replies waiting for reply surbs
|
||||
let reply_queue_length = self
|
||||
let reply_queue_length = match self
|
||||
.reply_controller_sender
|
||||
.get_lane_queue_length(connection_id)
|
||||
.await;
|
||||
.await
|
||||
{
|
||||
Ok(length) => length,
|
||||
Err(err) => {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!(
|
||||
"Failed to get reply queue length for connection {connection_id}: {err}"
|
||||
);
|
||||
}
|
||||
// We're just going to assume that the queue is empty, and I think that's okay
|
||||
// during shutdown.
|
||||
0
|
||||
}
|
||||
};
|
||||
|
||||
let queue_length = base_length + reply_queue_length;
|
||||
|
||||
@@ -168,10 +191,11 @@ impl Handler {
|
||||
|
||||
// the ack control is now responsible for chunking, etc.
|
||||
let input_msg = InputMessage::new_regular(recipient, message, lane, self.packet_type);
|
||||
self.msg_input
|
||||
.send(input_msg)
|
||||
.await
|
||||
.expect("InputMessageReceiver has stopped receiving!");
|
||||
if let Err(err) = self.msg_input.send(input_msg).await {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to send message to the input buffer: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
|
||||
let TransmissionLane::ConnectionId(connection_id) = lane else {
|
||||
@@ -200,10 +224,11 @@ impl Handler {
|
||||
|
||||
let input_msg =
|
||||
InputMessage::new_anonymous(recipient, message, reply_surbs, lane, self.packet_type);
|
||||
self.msg_input
|
||||
.send(input_msg)
|
||||
.await
|
||||
.expect("InputMessageReceiver has stopped receiving!");
|
||||
if let Err(err) = self.msg_input.send(input_msg).await {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to send anonymous message to the input buffer: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
|
||||
let TransmissionLane::ConnectionId(connection_id) = lane else {
|
||||
@@ -227,10 +252,11 @@ impl Handler {
|
||||
});
|
||||
|
||||
let input_msg = InputMessage::new_reply(recipient_tag, message, lane, self.packet_type);
|
||||
self.msg_input
|
||||
.send(input_msg)
|
||||
.await
|
||||
.expect("InputMessageReceiver has stopped receiving!");
|
||||
if let Err(err) = self.msg_input.send(input_msg).await {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to send reply message to the input buffer: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
|
||||
let TransmissionLane::ConnectionId(connection_id) = lane else {
|
||||
@@ -245,9 +271,14 @@ impl Handler {
|
||||
}
|
||||
|
||||
fn handle_closed_connection(&self, connection_id: u64) -> Option<ServerResponse> {
|
||||
self.client_connection_tx
|
||||
if let Err(err) = self
|
||||
.client_connection_tx
|
||||
.unbounded_send(ConnectionCommand::Close(connection_id))
|
||||
.unwrap();
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to send close connection command: {err}");
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
@@ -362,11 +393,10 @@ impl Handler {
|
||||
}
|
||||
}
|
||||
|
||||
async fn listen_for_requests(
|
||||
&mut self,
|
||||
mut msg_receiver: ReconstructedMessagesReceiver,
|
||||
mut task_client: nym_task::TaskClient,
|
||||
) {
|
||||
async fn listen_for_requests(&mut self, mut msg_receiver: ReconstructedMessagesReceiver) {
|
||||
let mut task_client = self.task_client.fork("select");
|
||||
task_client.disarm();
|
||||
|
||||
while !task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
// we can either get a client request from the websocket
|
||||
@@ -415,15 +445,7 @@ impl Handler {
|
||||
}
|
||||
|
||||
// consume self to make sure `drop` is called after this is done
|
||||
pub(crate) async fn handle_connection(
|
||||
mut self,
|
||||
socket: TcpStream,
|
||||
mut task_client: nym_task::TaskClient,
|
||||
) {
|
||||
// We don't want a crash in the connection handler to trigger a shutdown of the whole
|
||||
// process.
|
||||
task_client.disarm();
|
||||
|
||||
pub(crate) async fn handle_connection(mut self, socket: TcpStream) {
|
||||
let ws_stream = match accept_async(socket).await {
|
||||
Ok(ws_stream) => ws_stream,
|
||||
Err(err) => {
|
||||
@@ -436,14 +458,18 @@ impl Handler {
|
||||
let (reconstructed_sender, reconstructed_receiver) = mpsc::unbounded();
|
||||
|
||||
// tell the buffer to start sending stuff to us
|
||||
self.buffer_requester
|
||||
.unbounded_send(ReceivedBufferMessage::ReceiverAnnounce(
|
||||
reconstructed_sender,
|
||||
))
|
||||
.expect("the buffer request failed!");
|
||||
if let Err(err) =
|
||||
self.buffer_requester
|
||||
.unbounded_send(ReceivedBufferMessage::ReceiverAnnounce(
|
||||
reconstructed_sender,
|
||||
))
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("failed to announce the receiver to the buffer: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
self.listen_for_requests(reconstructed_receiver, task_client)
|
||||
.await;
|
||||
self.listen_for_requests(reconstructed_receiver).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use super::handler::HandlerBuilder;
|
||||
use log::*;
|
||||
use nym_task::TaskClient;
|
||||
use std::net::IpAddr;
|
||||
use std::{net::SocketAddr, process, sync::Arc};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
@@ -22,21 +23,19 @@ impl State {
|
||||
pub(crate) struct Listener {
|
||||
address: SocketAddr,
|
||||
state: State,
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl Listener {
|
||||
pub(crate) fn new(host: IpAddr, port: u16) -> Self {
|
||||
pub(crate) fn new(host: IpAddr, port: u16, task_client: TaskClient) -> Self {
|
||||
Listener {
|
||||
address: SocketAddr::new(host, port),
|
||||
state: State::AwaitingConnection,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run(
|
||||
&mut self,
|
||||
handler: HandlerBuilder,
|
||||
mut task_client: nym_task::TaskClient,
|
||||
) {
|
||||
pub(crate) async fn run(&mut self, handler: HandlerBuilder) {
|
||||
let tcp_listener = match tokio::net::TcpListener::bind(self.address).await {
|
||||
Ok(listener) => listener,
|
||||
Err(err) => {
|
||||
@@ -47,11 +46,11 @@ impl Listener {
|
||||
|
||||
let notify = Arc::new(Notify::new());
|
||||
|
||||
loop {
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
// When the handler finishes we check if shutdown is signalled
|
||||
_ = notify.notified() => {
|
||||
if task_client.is_shutdown() {
|
||||
if self.task_client.is_shutdown() {
|
||||
log::trace!("Websocket listener: detected shutdown after connection closed");
|
||||
break;
|
||||
}
|
||||
@@ -60,7 +59,7 @@ impl Listener {
|
||||
}
|
||||
// ... but when there is no connected client at the time of shutdown being
|
||||
// signalled, we handle it here.
|
||||
_ = task_client.recv() => {
|
||||
_ = self.task_client.recv() => {
|
||||
if !self.state.is_connected() {
|
||||
log::trace!("Not connected: shutting down");
|
||||
break;
|
||||
@@ -88,9 +87,8 @@ impl Listener {
|
||||
// hanging because the executor doesn't come back here
|
||||
let notify_clone = Arc::clone(¬ify);
|
||||
let fresh_handler = handler.create_active_handler();
|
||||
let task_client_handler = task_client.clone();
|
||||
tokio::spawn(async move {
|
||||
fresh_handler.handle_connection(socket, task_client_handler).await;
|
||||
fresh_handler.handle_connection(socket).await;
|
||||
notify_clone.notify_one();
|
||||
});
|
||||
self.state = State::Connected;
|
||||
@@ -104,13 +102,9 @@ impl Listener {
|
||||
log::debug!("Websocket listener: Exiting");
|
||||
}
|
||||
|
||||
pub(crate) fn start(
|
||||
mut self,
|
||||
handler: HandlerBuilder,
|
||||
shutdown: nym_task::TaskClient,
|
||||
) -> JoinHandle<()> {
|
||||
pub(crate) fn start(mut self, handler: HandlerBuilder) -> JoinHandle<()> {
|
||||
info!("Running websocket on {:?}", self.address.to_string());
|
||||
|
||||
tokio::spawn(async move { self.run(handler, shutdown).await })
|
||||
tokio::spawn(async move { self.run(handler).await })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nym-socks5-client"
|
||||
version = "1.1.46"
|
||||
version = "1.1.48"
|
||||
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
|
||||
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
|
||||
edition = "2021"
|
||||
|
||||
@@ -93,6 +93,7 @@ impl From<Init> for OverrideConfig {
|
||||
enabled_credentials_mode: init_config.common_args.enabled_credentials_mode,
|
||||
outfox: false,
|
||||
stats_reporting_address: init_config.common_args.stats_reporting_address,
|
||||
forget_me: init_config.common_args.forget_me.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ use nym_bin_common::completions::{fig_generate, ArgShell};
|
||||
use nym_client_core::cli_helpers::CliClient;
|
||||
use nym_client_core::client::base_client::storage::migration_helpers::v1_1_33;
|
||||
use nym_client_core::client::topology_control::geo_aware_provider::CountryGroup;
|
||||
use nym_client_core::config::{GroupBy, TopologyStructure};
|
||||
use nym_client_core::config::{ForgetMe, GroupBy, TopologyStructure};
|
||||
use nym_config::OptionalSet;
|
||||
use nym_sphinx::addressing::Recipient;
|
||||
use nym_sphinx::params::{PacketSize, PacketType};
|
||||
@@ -113,6 +113,7 @@ pub(crate) struct OverrideConfig {
|
||||
enabled_credentials_mode: Option<bool>,
|
||||
outfox: bool,
|
||||
stats_reporting_address: Option<Recipient>,
|
||||
forget_me: ForgetMe,
|
||||
}
|
||||
|
||||
pub(crate) async fn execute(args: Cli) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
@@ -179,6 +180,7 @@ pub(crate) fn override_config(config: Config, args: OverrideConfig) -> Config {
|
||||
BaseClientConfig::with_topology_structure,
|
||||
topology_structure,
|
||||
)
|
||||
.with_base(BaseClientConfig::with_forget_me, args.forget_me)
|
||||
.with_optional(Config::with_anonymous_replies, args.use_anonymous_replies)
|
||||
.with_optional(Config::with_port, args.port)
|
||||
.with_optional(Config::with_ip, args.ip)
|
||||
|
||||
@@ -65,6 +65,7 @@ impl From<Run> for OverrideConfig {
|
||||
enabled_credentials_mode: run_config.common_args.enabled_credentials_mode,
|
||||
outfox: run_config.outfox,
|
||||
stats_reporting_address: run_config.common_args.stats_reporting_address,
|
||||
forget_me: run_config.common_args.forget_me.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -113,4 +113,8 @@ enabled = {{ core.debug.stats_reporting.enabled }}
|
||||
provider_address = '{{ core.debug.stats_reporting.provider_address }}'
|
||||
reporting_interval = '{{ core.debug.stats_reporting.reporting_interval }}'
|
||||
|
||||
[core.debug.forget_me]
|
||||
client = {{ core.debug.forget_me.client }}
|
||||
stats = {{ core.debug.forget_me.stats }}
|
||||
|
||||
"#;
|
||||
|
||||
@@ -60,7 +60,7 @@ impl From<IpAddr> for IpPair {
|
||||
std::net::IpAddr::V4(ipv4_addr) => (ipv4_addr.octets()[2], ipv4_addr.octets()[3]),
|
||||
std::net::IpAddr::V6(ipv6_addr) => (ipv6_addr.octets()[14], ipv6_addr.octets()[15]),
|
||||
};
|
||||
let last_bytes = (before_last_byte as u16) << 8 | last_byte as u16;
|
||||
let last_bytes = ((before_last_byte as u16) << 8) | last_byte as u16;
|
||||
let ipv4 = Ipv4Addr::new(
|
||||
WG_TUN_DEVICE_IP_ADDRESS_V4.octets()[0],
|
||||
WG_TUN_DEVICE_IP_ADDRESS_V4.octets()[1],
|
||||
|
||||
@@ -40,6 +40,7 @@ nym-crypto = { path = "../crypto" }
|
||||
nym-explorer-client = { path = "../../explorer-api/explorer-client" }
|
||||
nym-gateway-client = { path = "../client-libs/gateway-client" }
|
||||
nym-gateway-requests = { path = "../gateway-requests" }
|
||||
nym-http-api-client = { path = "../http-api-client" }
|
||||
nym-metrics = { path = "../nym-metrics" }
|
||||
nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
|
||||
nym-sphinx = { path = "../nymsphinx" }
|
||||
|
||||
@@ -145,6 +145,11 @@ impl Config {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_forget_me(mut self, forget_me: ForgetMe) -> Self {
|
||||
self.debug.forget_me = forget_me;
|
||||
self
|
||||
}
|
||||
|
||||
// TODO: this should be refactored properly
|
||||
// as of 12.09.23 the below is true (not sure how this comment will rot in the future)
|
||||
// medium_toggle:
|
||||
@@ -517,7 +522,7 @@ impl Default for Acknowledgements {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
|
||||
#[serde(default, deny_unknown_fields)]
|
||||
#[serde(default)]
|
||||
pub struct Topology {
|
||||
/// The uniform delay every which clients are querying the directory server
|
||||
/// to try to obtain a compatible network topology to send sphinx packets through.
|
||||
@@ -558,6 +563,10 @@ pub struct Topology {
|
||||
/// Specifies whether this client should ignore the current epoch role of the target egress node
|
||||
/// when constructing the final hop packets.
|
||||
pub ignore_egress_epoch_role: bool,
|
||||
|
||||
/// Specifies whether this client should ignore the current epoch role of the ingress node
|
||||
/// when attempting to establish new connection
|
||||
pub ignore_ingress_epoch_role: bool,
|
||||
}
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
@@ -595,7 +604,9 @@ impl Default for Topology {
|
||||
minimum_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
|
||||
minimum_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
|
||||
use_extended_topology: false,
|
||||
ignore_egress_epoch_role: false,
|
||||
|
||||
ignore_egress_epoch_role: true,
|
||||
ignore_ingress_epoch_role: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -713,6 +724,9 @@ pub struct DebugConfig {
|
||||
|
||||
/// Defines all configuration options related to stats reporting.
|
||||
pub stats_reporting: StatsReporting,
|
||||
|
||||
/// Defines all configuration options related to the forget me flag.
|
||||
pub forget_me: ForgetMe,
|
||||
}
|
||||
|
||||
impl DebugConfig {
|
||||
@@ -735,6 +749,69 @@ impl Default for DebugConfig {
|
||||
topology: Default::default(),
|
||||
reply_surbs: Default::default(),
|
||||
stats_reporting: Default::default(),
|
||||
forget_me: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default, Debug, Deserialize, PartialEq, Serialize, Copy)]
|
||||
pub struct ForgetMe {
|
||||
client: bool,
|
||||
stats: bool,
|
||||
}
|
||||
|
||||
impl From<bool> for ForgetMe {
|
||||
fn from(value: bool) -> Self {
|
||||
if value {
|
||||
Self::new_all()
|
||||
} else {
|
||||
Self::new_none()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ForgetMe {
|
||||
pub fn new_all() -> Self {
|
||||
Self {
|
||||
client: true,
|
||||
stats: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_client() -> Self {
|
||||
Self {
|
||||
client: true,
|
||||
stats: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_stats() -> Self {
|
||||
Self {
|
||||
client: false,
|
||||
stats: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(client: bool, stats: bool) -> Self {
|
||||
Self { client, stats }
|
||||
}
|
||||
|
||||
pub fn any(&self) -> bool {
|
||||
self.client || self.stats
|
||||
}
|
||||
|
||||
pub fn client(&self) -> bool {
|
||||
self.client
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> bool {
|
||||
self.stats
|
||||
}
|
||||
|
||||
pub fn new_none() -> Self {
|
||||
Self {
|
||||
client: false,
|
||||
stats: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,7 +182,7 @@ impl From<ConfigV5> for Config {
|
||||
maximum_reply_key_age: value.debug.reply_surbs.maximum_reply_key_age,
|
||||
surb_mix_hops: value.debug.reply_surbs.surb_mix_hops,
|
||||
},
|
||||
stats_reporting: Default::default(),
|
||||
..Default::default()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,6 +120,7 @@ where
|
||||
&core.client.nym_api_urls,
|
||||
user_agent,
|
||||
core.debug.topology.minimum_gateway_performance,
|
||||
core.debug.topology.ignore_ingress_epoch_role,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
@@ -93,6 +93,10 @@ pub struct CommonClientInitArgs {
|
||||
/// Sets the address to report statistics
|
||||
#[cfg_attr(feature = "cli", clap(long, hide = true))]
|
||||
pub stats_reporting_address: Option<Recipient>,
|
||||
|
||||
/// Sets the forget me flag
|
||||
#[cfg_attr(feature = "cli", clap(long, hide = true, default_value_t = false))]
|
||||
pub forget_me: bool,
|
||||
}
|
||||
|
||||
pub struct InitResultsWithConfig<T> {
|
||||
@@ -175,6 +179,7 @@ where
|
||||
&core.client.nym_api_urls,
|
||||
user_agent,
|
||||
core.debug.topology.minimum_gateway_performance,
|
||||
core.debug.topology.ignore_ingress_epoch_role,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
@@ -61,4 +61,8 @@ pub struct CommonClientRunArgs {
|
||||
/// Sets the address to report statistics
|
||||
#[cfg_attr(feature = "cli", clap(long, hide = true))]
|
||||
pub stats_reporting_address: Option<Recipient>,
|
||||
|
||||
/// Sets the forget me flag
|
||||
#[cfg_attr(feature = "cli", clap(long, hide = true, default_value_t = false))]
|
||||
pub forget_me: bool,
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use super::mix_traffic::ClientRequestSender;
|
||||
use super::received_buffer::ReceivedBufferMessage;
|
||||
use super::statistics_control::StatisticsControl;
|
||||
use crate::client::base_client::storage::helpers::store_client_keys;
|
||||
@@ -31,13 +32,15 @@ use crate::init::{
|
||||
setup_gateway,
|
||||
types::{GatewaySetup, InitialisationResult},
|
||||
};
|
||||
use crate::{config, spawn_future, ForgetMe};
|
||||
use crate::{config, spawn_future};
|
||||
use futures::channel::mpsc;
|
||||
use log::*;
|
||||
use nym_bandwidth_controller::BandwidthController;
|
||||
use nym_client_core_config_types::ForgetMe;
|
||||
use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore};
|
||||
use nym_credential_storage::storage::Storage as CredentialStorage;
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_crypto::hkdf::DerivationMaterial;
|
||||
use nym_gateway_client::client::config::GatewayClientConfig;
|
||||
use nym_gateway_client::{
|
||||
AcknowledgementReceiver, GatewayClient, GatewayConfig, MixnetMessageReceiver, PacketRouter,
|
||||
@@ -175,8 +178,8 @@ impl From<bool> for CredentialsToggle {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BaseClientBuilder<'a, C, S: MixnetClientStorage> {
|
||||
config: &'a Config,
|
||||
pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
|
||||
config: Config,
|
||||
client_store: S,
|
||||
dkg_query_client: Option<C>,
|
||||
|
||||
@@ -191,19 +194,19 @@ pub struct BaseClientBuilder<'a, C, S: MixnetClientStorage> {
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
|
||||
forget_me: ForgetMe,
|
||||
derivation_material: Option<DerivationMaterial>,
|
||||
}
|
||||
|
||||
impl<'a, C, S> BaseClientBuilder<'a, C, S>
|
||||
impl<C, S> BaseClientBuilder<C, S>
|
||||
where
|
||||
S: MixnetClientStorage + 'static,
|
||||
C: DkgQueryClient + Send + Sync + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
base_config: &'a Config,
|
||||
base_config: Config,
|
||||
client_store: S,
|
||||
dkg_query_client: Option<C>,
|
||||
) -> BaseClientBuilder<'a, C, S> {
|
||||
) -> BaseClientBuilder<C, S> {
|
||||
BaseClientBuilder {
|
||||
config: base_config,
|
||||
client_store,
|
||||
@@ -216,13 +219,22 @@ where
|
||||
setup_method: GatewaySetup::MustLoad { gateway_id: None },
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
forget_me: Default::default(),
|
||||
derivation_material: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_derivation_material(
|
||||
mut self,
|
||||
derivation_material: Option<DerivationMaterial>,
|
||||
) -> Self {
|
||||
self.derivation_material = derivation_material;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_forget_me(mut self, forget_me: &ForgetMe) -> Self {
|
||||
self.forget_me = forget_me.clone();
|
||||
self.config.debug.forget_me = *forget_me;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -298,7 +310,7 @@ where
|
||||
topology_accessor: TopologyAccessor,
|
||||
mix_tx: BatchMixMessageSender,
|
||||
stats_tx: ClientStatsSender,
|
||||
shutdown: TaskClient,
|
||||
task_client: TaskClient,
|
||||
) {
|
||||
info!("Starting loop cover traffic stream...");
|
||||
|
||||
@@ -311,9 +323,10 @@ where
|
||||
debug_config.traffic,
|
||||
debug_config.cover_traffic,
|
||||
stats_tx,
|
||||
task_client,
|
||||
);
|
||||
|
||||
stream.start_with_shutdown(shutdown);
|
||||
stream.start();
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -328,7 +341,7 @@ where
|
||||
reply_controller_receiver: ReplyControllerReceiver,
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
client_connection_rx: ConnectionCommandReceiver,
|
||||
shutdown: TaskClient,
|
||||
task_client: TaskClient,
|
||||
packet_type: PacketType,
|
||||
stats_tx: ClientStatsSender,
|
||||
) {
|
||||
@@ -346,8 +359,9 @@ where
|
||||
lane_queue_lengths,
|
||||
client_connection_rx,
|
||||
stats_tx,
|
||||
task_client,
|
||||
)
|
||||
.start_with_shutdown(shutdown, packet_type);
|
||||
.start(packet_type);
|
||||
}
|
||||
|
||||
// buffer controlling all messages fetched from provider
|
||||
@@ -370,8 +384,9 @@ where
|
||||
reply_key_storage,
|
||||
reply_controller_sender,
|
||||
metrics_reporter,
|
||||
shutdown,
|
||||
);
|
||||
controller.start_with_shutdown(shutdown)
|
||||
controller.start()
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -560,15 +575,22 @@ where
|
||||
topology_accessor: TopologyAccessor,
|
||||
local_gateway: NodeIdentity,
|
||||
wait_for_gateway: bool,
|
||||
mut shutdown: TaskClient,
|
||||
mut task_client: TaskClient,
|
||||
) -> Result<(), ClientCoreError> {
|
||||
let topology_refresher_config =
|
||||
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
|
||||
|
||||
if topology_config.disable_refreshing {
|
||||
// if we're not spawning the refresher, don't cause shutdown immediately
|
||||
info!("The background topology refesher is not going to be started");
|
||||
task_client.disarm();
|
||||
}
|
||||
|
||||
let mut topology_refresher = TopologyRefresher::new(
|
||||
topology_refresher_config,
|
||||
topology_accessor,
|
||||
topology_provider,
|
||||
task_client,
|
||||
);
|
||||
// before returning, block entire runtime to refresh the current network view so that any
|
||||
// components depending on topology would see a non-empty view
|
||||
@@ -609,15 +631,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
if topology_config.disable_refreshing {
|
||||
// if we're not spawning the refresher, don't cause shutdown immediately
|
||||
info!("The topology refesher is not going to be started");
|
||||
shutdown.disarm();
|
||||
} else {
|
||||
if !topology_config.disable_refreshing {
|
||||
// don't spawn the refresher if we don't want to be refreshing the topology.
|
||||
// only use the initial values obtained
|
||||
info!("Starting topology refresher...");
|
||||
topology_refresher.start_with_shutdown(shutdown);
|
||||
topology_refresher.start();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -628,30 +646,29 @@ where
|
||||
user_agent: Option<UserAgent>,
|
||||
client_stats_id: String,
|
||||
input_sender: Sender<InputMessage>,
|
||||
shutdown: TaskClient,
|
||||
task_client: TaskClient,
|
||||
) -> ClientStatsSender {
|
||||
info!("Starting statistics control...");
|
||||
StatisticsControl::create_and_start_with_shutdown(
|
||||
StatisticsControl::create_and_start(
|
||||
config.debug.stats_reporting,
|
||||
user_agent
|
||||
.map(|u| u.application)
|
||||
.unwrap_or("unknown".to_string()),
|
||||
client_stats_id,
|
||||
input_sender.clone(),
|
||||
shutdown.with_suffix("controller"),
|
||||
task_client,
|
||||
)
|
||||
}
|
||||
|
||||
fn start_mix_traffic_controller(
|
||||
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
|
||||
shutdown: TaskClient,
|
||||
forget_me: ForgetMe,
|
||||
) -> BatchMixMessageSender {
|
||||
) -> (BatchMixMessageSender, ClientRequestSender) {
|
||||
info!("Starting mix traffic controller...");
|
||||
let (mix_traffic_controller, mix_tx) =
|
||||
MixTrafficController::new(gateway_transceiver, forget_me);
|
||||
mix_traffic_controller.start_with_shutdown(shutdown);
|
||||
mix_tx
|
||||
let (mix_traffic_controller, mix_tx, client_tx) =
|
||||
MixTrafficController::new(gateway_transceiver, shutdown);
|
||||
mix_traffic_controller.start();
|
||||
(mix_tx, client_tx)
|
||||
}
|
||||
|
||||
// TODO: rename it as it implies the data is persistent whilst one can use InMemBackend
|
||||
@@ -686,6 +703,7 @@ where
|
||||
setup_method: GatewaySetup,
|
||||
key_store: &S::KeyStore,
|
||||
details_store: &S::GatewaysDetailsStore,
|
||||
derivation_material: Option<DerivationMaterial>,
|
||||
) -> Result<InitialisationResult, ClientCoreError>
|
||||
where
|
||||
<S::KeyStore as KeyStore>::StorageError: Sync + Send,
|
||||
@@ -695,7 +713,12 @@ where
|
||||
if key_store.load_keys().await.is_err() {
|
||||
info!("could not find valid client keys - a new set will be generated");
|
||||
let mut rng = OsRng;
|
||||
let keys = ClientKeys::generate_new(&mut rng);
|
||||
let keys = if let Some(derivation_material) = derivation_material {
|
||||
ClientKeys::from_master_key(&mut rng, &derivation_material)
|
||||
.map_err(|_| ClientCoreError::HkdfDerivationError {})?
|
||||
} else {
|
||||
ClientKeys::generate_new(&mut rng)
|
||||
};
|
||||
store_client_keys(keys, key_store).await?;
|
||||
}
|
||||
|
||||
@@ -717,6 +740,7 @@ where
|
||||
self.setup_method,
|
||||
self.client_store.key_store(),
|
||||
self.client_store.gateway_details_store(),
|
||||
self.derivation_material,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -773,7 +797,7 @@ where
|
||||
);
|
||||
|
||||
let stats_reporter = Self::start_statistics_control(
|
||||
self.config,
|
||||
&self.config,
|
||||
self.user_agent.clone(),
|
||||
generate_client_stats_id(*self_address.identity()),
|
||||
input_sender.clone(),
|
||||
@@ -799,7 +823,7 @@ where
|
||||
|
||||
let gateway_transceiver = Self::setup_gateway_transceiver(
|
||||
self.custom_gateway_transceiver,
|
||||
self.config,
|
||||
&self.config,
|
||||
init_res,
|
||||
bandwidth_controller,
|
||||
&details_store,
|
||||
@@ -833,10 +857,9 @@ where
|
||||
// traffic stream.
|
||||
// The MixTrafficController then sends the actual traffic
|
||||
|
||||
let message_sender = Self::start_mix_traffic_controller(
|
||||
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
|
||||
gateway_transceiver,
|
||||
shutdown.fork("mix_traffic_controller"),
|
||||
self.forget_me,
|
||||
);
|
||||
|
||||
// Channels that the websocket listener can use to signal downstream to the real traffic
|
||||
@@ -911,6 +934,8 @@ where
|
||||
},
|
||||
stats_reporter,
|
||||
task_handle: shutdown,
|
||||
client_request_sender,
|
||||
forget_me: self.config.debug.forget_me,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -922,6 +947,7 @@ pub struct BaseClient {
|
||||
pub client_output: ClientOutputStatus,
|
||||
pub client_state: ClientState,
|
||||
pub stats_reporter: ClientStatsSender,
|
||||
|
||||
pub client_request_sender: ClientRequestSender,
|
||||
pub task_handle: TaskHandle,
|
||||
pub forget_me: ForgetMe,
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
// TODO: combine those more closely. Perhaps into a single underlying store.
|
||||
// Like for persistent, on-disk, storage, what's the point of having 3 different databases?
|
||||
|
||||
use rand::rngs::OsRng;
|
||||
|
||||
use crate::client::key_manager::persistence::{InMemEphemeralKeys, KeyStore};
|
||||
use crate::client::replies::reply_storage;
|
||||
use crate::client::replies::reply_storage::ReplyStorageBackend;
|
||||
@@ -63,7 +65,7 @@ pub trait MixnetClientStorage {
|
||||
fn gateway_details_store(&self) -> &Self::GatewaysDetailsStore;
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
#[derive(Clone)]
|
||||
pub struct Ephemeral {
|
||||
key_store: InMemEphemeralKeys,
|
||||
reply_store: reply_storage::Empty,
|
||||
@@ -71,9 +73,14 @@ pub struct Ephemeral {
|
||||
gateway_details_store: InMemGatewaysDetails,
|
||||
}
|
||||
|
||||
impl Ephemeral {
|
||||
pub fn new() -> Self {
|
||||
Default::default()
|
||||
impl Default for Ephemeral {
|
||||
fn default() -> Self {
|
||||
Ephemeral {
|
||||
key_store: InMemEphemeralKeys::new(&mut OsRng),
|
||||
reply_store: Default::default(),
|
||||
credential_store: Default::default(),
|
||||
gateway_details_store: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ use nym_sphinx::cover::generate_loop_cover_packet;
|
||||
use nym_sphinx::params::{PacketSize, PacketType};
|
||||
use nym_sphinx::utils::sample_poisson_duration;
|
||||
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
|
||||
use nym_task::TaskClient;
|
||||
use rand::{rngs::OsRng, CryptoRng, Rng};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
@@ -64,6 +65,8 @@ where
|
||||
packet_type: PacketType,
|
||||
|
||||
stats_tx: ClientStatsSender,
|
||||
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl<R> Stream for LoopCoverTrafficStream<R>
|
||||
@@ -110,6 +113,7 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
traffic_config: config::Traffic,
|
||||
cover_config: config::CoverTraffic,
|
||||
stats_tx: ClientStatsSender,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
let rng = OsRng;
|
||||
|
||||
@@ -128,6 +132,7 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
secondary_packet_size: traffic_config.secondary_packet_size,
|
||||
packet_type: traffic_config.packet_type,
|
||||
stats_tx,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -175,7 +180,7 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
}
|
||||
};
|
||||
|
||||
let cover_message = generate_loop_cover_packet(
|
||||
let cover_message = match generate_loop_cover_packet(
|
||||
&mut self.rng,
|
||||
topology_ref,
|
||||
&self.ack_key,
|
||||
@@ -184,8 +189,15 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
self.cover_traffic.loop_cover_traffic_average_delay,
|
||||
cover_traffic_packet_size,
|
||||
self.packet_type,
|
||||
)
|
||||
.expect("Somehow failed to generate a loop cover message with a valid topology");
|
||||
) {
|
||||
Ok(cover_message) => cover_message,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"Somehow failed to generate a loop cover message with a valid topology: {err}"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = self.mix_tx.try_send(vec![cover_message]) {
|
||||
match err {
|
||||
@@ -217,7 +229,7 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
|
||||
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
|
||||
pub fn start(mut self) {
|
||||
if self.cover_traffic.disable_loop_cover_traffic_stream {
|
||||
// we should have never got here in the first place - the task should have never been created to begin with
|
||||
// so panic and review the code that lead to this branch
|
||||
@@ -231,6 +243,8 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
);
|
||||
self.set_next_delay(sampled);
|
||||
|
||||
let mut shutdown = self.task_client.fork("select");
|
||||
|
||||
spawn_future(async move {
|
||||
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
|
||||
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::key_manager::persistence::KeyStore;
|
||||
use nym_crypto::asymmetric::{encryption, identity};
|
||||
use nym_crypto::{
|
||||
asymmetric::{encryption, identity},
|
||||
hkdf::{DerivationMaterial, InvalidLength},
|
||||
};
|
||||
use nym_gateway_requests::shared_key::{LegacySharedKeys, SharedGatewayKey, SharedSymmetricKey};
|
||||
use nym_sphinx::acknowledgements::AckKey;
|
||||
use rand::{CryptoRng, RngCore};
|
||||
@@ -10,6 +13,7 @@ use std::sync::Arc;
|
||||
use zeroize::ZeroizeOnDrop;
|
||||
|
||||
pub mod persistence;
|
||||
mod test;
|
||||
|
||||
// Note: to support key rotation in the future, all keys will require adding an extra smart pointer,
|
||||
// most likely an AtomicCell, or if it doesn't work as I think it does, a Mutex. Although I think
|
||||
@@ -43,6 +47,24 @@ impl ClientKeys {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_master_key<R>(
|
||||
rng: &mut R,
|
||||
derivation_material: &DerivationMaterial,
|
||||
) -> Result<Self, InvalidLength>
|
||||
where
|
||||
R: RngCore + CryptoRng,
|
||||
{
|
||||
let secret = derivation_material.derive_secret()?;
|
||||
Ok(ClientKeys {
|
||||
identity_keypair: Arc::new(identity::KeyPair::from_secret(
|
||||
secret,
|
||||
derivation_material.index(),
|
||||
)),
|
||||
encryption_keypair: Arc::new(encryption::KeyPair::new(rng)),
|
||||
ack_key: Arc::new(AckKey::new(rng)),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn from_keys(
|
||||
id_keypair: identity::KeyPair,
|
||||
enc_keypair: encryption::KeyPair,
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use crate::client::key_manager::ClientKeys;
|
||||
use async_trait::async_trait;
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use std::error::Error;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -195,9 +196,20 @@ impl KeyStore for OnDiskKeys {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
#[derive(Clone)]
|
||||
pub struct InMemEphemeralKeys {
|
||||
keys: Arc<Mutex<Option<ClientKeys>>>,
|
||||
keys: Arc<Mutex<ClientKeys>>,
|
||||
}
|
||||
|
||||
impl InMemEphemeralKeys {
|
||||
pub fn new<R>(rng: &mut R) -> Self
|
||||
where
|
||||
R: RngCore + CryptoRng,
|
||||
{
|
||||
InMemEphemeralKeys {
|
||||
keys: Arc::new(Mutex::new(ClientKeys::generate_new(rng))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -210,11 +222,11 @@ impl KeyStore for InMemEphemeralKeys {
|
||||
type StorageError = EphemeralKeysError;
|
||||
|
||||
async fn load_keys(&self) -> Result<ClientKeys, Self::StorageError> {
|
||||
self.keys.lock().await.clone().ok_or(EphemeralKeysError)
|
||||
Ok(self.keys.lock().await.clone())
|
||||
}
|
||||
|
||||
async fn store_keys(&self, keys: &ClientKeys) -> Result<(), Self::StorageError> {
|
||||
*self.keys.lock().await = Some(keys.clone());
|
||||
*self.keys.lock().await = keys.clone();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,89 @@
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::client::key_manager::ClientKeys;
|
||||
use nym_crypto::hkdf::DerivationMaterial;
|
||||
use rand::SeedableRng;
|
||||
use rand_chacha::ChaCha20Rng;
|
||||
|
||||
#[test]
|
||||
fn test_from_master_key_success() {
|
||||
// Set up a deterministic RNG.
|
||||
let seed = [33u8; 32];
|
||||
let mut rng = ChaCha20Rng::from_seed(seed);
|
||||
|
||||
// Set up the derivation material.
|
||||
let master_key = b"this is a secret master key";
|
||||
let salt = b"unique-salt";
|
||||
let derivation_material = DerivationMaterial::new(master_key, 0, salt);
|
||||
|
||||
// Generate ClientKeys from the master key.
|
||||
let client_keys = ClientKeys::from_master_key(&mut rng, &derivation_material)
|
||||
.expect("Failed to create client keys");
|
||||
|
||||
assert_eq!(
|
||||
client_keys.identity_keypair().public_key().to_string(),
|
||||
String::from("FX4Undr5LPPBA7zThWWpAKXKQTXSbW1C28PnxbCqUkU4")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
client_keys.identity_keypair().private_key().to_string(),
|
||||
String::from("6S3uMi2rU5SwyUUYCiMrF5qqdcYnEDMYLggBSvavVzEt")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_from_master_key_deterministic_identity() {
|
||||
// Using identical derivation material should result in the exactly same identity keypair.
|
||||
let seed = [1u8; 32];
|
||||
let mut rng1 = ChaCha20Rng::from_seed(seed);
|
||||
let mut rng2 = ChaCha20Rng::from_seed(seed);
|
||||
|
||||
let master_key = b"another secret master key";
|
||||
let salt = b"deterministic-salt";
|
||||
let index = 7u32;
|
||||
let derivation_material = DerivationMaterial::new(master_key, index, salt);
|
||||
|
||||
let client_keys1 = ClientKeys::from_master_key(&mut rng1, &derivation_material)
|
||||
.expect("Failed to create client keys (first instance)");
|
||||
let client_keys2 = ClientKeys::from_master_key(&mut rng2, &derivation_material)
|
||||
.expect("Failed to create client keys (second instance)");
|
||||
|
||||
assert_eq!(
|
||||
client_keys1.identity_keypair().public_key().to_string(),
|
||||
client_keys2.identity_keypair().public_key().to_string()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
client_keys1.identity_keypair().private_key().to_string(),
|
||||
client_keys2.identity_keypair().private_key().to_string()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_from_master_key_different_indices() {
|
||||
// Changing the index should yield a different identity key.
|
||||
let seed = [5u8; 32];
|
||||
let mut rng = ChaCha20Rng::from_seed(seed);
|
||||
|
||||
let master_key = b"same secret key";
|
||||
let salt = b"same-salt";
|
||||
|
||||
let derivation_material1 = DerivationMaterial::new(master_key, 1, salt);
|
||||
let derivation_material2 = DerivationMaterial::new(master_key, 2, salt);
|
||||
|
||||
let client_keys1 = ClientKeys::from_master_key(&mut rng, &derivation_material1)
|
||||
.expect("Failed to create client keys for index 1");
|
||||
let client_keys2 = ClientKeys::from_master_key(&mut rng, &derivation_material2)
|
||||
.expect("Failed to create client keys for index 2");
|
||||
|
||||
assert_ne!(
|
||||
client_keys1.identity_keypair().public_key().to_string(),
|
||||
client_keys2.identity_keypair().public_key().to_string()
|
||||
);
|
||||
|
||||
assert_ne!(
|
||||
client_keys1.identity_keypair().private_key().to_string(),
|
||||
client_keys2.identity_keypair().private_key().to_string()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -2,13 +2,18 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
|
||||
use crate::{spawn_future, ForgetMe};
|
||||
use crate::error::ClientCoreError;
|
||||
use crate::spawn_future;
|
||||
use log::*;
|
||||
use nym_gateway_requests::ClientRequest;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_task::TaskClient;
|
||||
use transceiver::ErasedGatewayError;
|
||||
|
||||
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
|
||||
pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
|
||||
pub type ClientRequestReceiver = tokio::sync::mpsc::Receiver<ClientRequest>;
|
||||
pub type ClientRequestSender = tokio::sync::mpsc::Sender<ClientRequest>;
|
||||
|
||||
pub mod transceiver;
|
||||
|
||||
@@ -23,52 +28,73 @@ pub struct MixTrafficController {
|
||||
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
|
||||
|
||||
mix_rx: BatchMixMessageReceiver,
|
||||
client_rx: ClientRequestReceiver,
|
||||
|
||||
// TODO: this is temporary work-around.
|
||||
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
|
||||
consecutive_gateway_failure_count: usize,
|
||||
forget_me: ForgetMe,
|
||||
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl MixTrafficController {
|
||||
pub fn new<T>(
|
||||
gateway_transceiver: T,
|
||||
forget_me: ForgetMe,
|
||||
) -> (MixTrafficController, BatchMixMessageSender)
|
||||
task_client: TaskClient,
|
||||
) -> (
|
||||
MixTrafficController,
|
||||
BatchMixMessageSender,
|
||||
ClientRequestSender,
|
||||
)
|
||||
where
|
||||
T: GatewayTransceiver + Send + 'static,
|
||||
{
|
||||
let (message_sender, message_receiver) =
|
||||
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
|
||||
|
||||
let (client_sender, client_receiver) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
(
|
||||
MixTrafficController {
|
||||
gateway_transceiver: Box::new(gateway_transceiver),
|
||||
mix_rx: message_receiver,
|
||||
client_rx: client_receiver,
|
||||
consecutive_gateway_failure_count: 0,
|
||||
forget_me,
|
||||
task_client,
|
||||
},
|
||||
message_sender,
|
||||
client_sender,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn new_dynamic(
|
||||
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
|
||||
forget_me: ForgetMe,
|
||||
) -> (MixTrafficController, BatchMixMessageSender) {
|
||||
task_client: TaskClient,
|
||||
) -> (
|
||||
MixTrafficController,
|
||||
BatchMixMessageSender,
|
||||
ClientRequestSender,
|
||||
) {
|
||||
let (message_sender, message_receiver) =
|
||||
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
|
||||
let (client_sender, client_receiver) = tokio::sync::mpsc::channel(1);
|
||||
(
|
||||
MixTrafficController {
|
||||
gateway_transceiver,
|
||||
mix_rx: message_receiver,
|
||||
client_rx: client_receiver,
|
||||
consecutive_gateway_failure_count: 0,
|
||||
forget_me,
|
||||
task_client,
|
||||
},
|
||||
message_sender,
|
||||
client_sender,
|
||||
)
|
||||
}
|
||||
|
||||
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
|
||||
async fn on_messages(
|
||||
&mut self,
|
||||
mut mix_packets: Vec<MixPacket>,
|
||||
) -> Result<(), ErasedGatewayError> {
|
||||
debug_assert!(!mix_packets.is_empty());
|
||||
|
||||
let result = if mix_packets.len() == 1 {
|
||||
@@ -80,64 +106,60 @@ impl MixTrafficController {
|
||||
.await
|
||||
};
|
||||
|
||||
match result {
|
||||
Err(err) => {
|
||||
error!("Failed to send sphinx packet(s) to the gateway: {err}");
|
||||
self.consecutive_gateway_failure_count += 1;
|
||||
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
|
||||
// todo: in the future this should initiate a 'graceful' shutdown or try
|
||||
// to reconnect?
|
||||
panic!("failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead. Can't do anything about it yet :(")
|
||||
}
|
||||
}
|
||||
Ok(_) => {
|
||||
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
|
||||
self.consecutive_gateway_failure_count = 0;
|
||||
}
|
||||
if result.is_err() {
|
||||
self.consecutive_gateway_failure_count += 1;
|
||||
} else {
|
||||
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
|
||||
self.consecutive_gateway_failure_count = 0;
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
|
||||
pub fn start(mut self) {
|
||||
spawn_future(async move {
|
||||
debug!("Started MixTrafficController with graceful shutdown support");
|
||||
|
||||
loop {
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
mix_packets = self.mix_rx.recv() => match mix_packets {
|
||||
Some(mix_packets) => {
|
||||
self.on_messages(mix_packets).await;
|
||||
if let Err(err) = self.on_messages(mix_packets).await {
|
||||
error!("Failed to send sphinx packet(s) to the gateway: {err}");
|
||||
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
|
||||
// Disconnect from the gateway. If we should try to re-connect
|
||||
// is handled at a higher layer.
|
||||
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
|
||||
// Do we need to handle the embedded mixnet client case
|
||||
// separately?
|
||||
self.task_client.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
None => {
|
||||
log::trace!("MixTrafficController: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
client_request = self.client_rx.recv() => match client_request {
|
||||
Some(client_request) => {
|
||||
match self.gateway_transceiver.send_client_request(client_request).await {
|
||||
Ok(_) => (),
|
||||
Err(e) => error!("Failed to send client request: {}", e),
|
||||
};
|
||||
},
|
||||
None => {
|
||||
log::trace!("MixTrafficController, client request channel closed");
|
||||
}
|
||||
},
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("MixTrafficController: Received shutdown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
shutdown.recv_timeout().await;
|
||||
|
||||
if self.forget_me.any() {
|
||||
log::info!("Sending forget me request to the gateway");
|
||||
match self
|
||||
.gateway_transceiver
|
||||
.send_client_request(ClientRequest::ForgetMe {
|
||||
client: self.forget_me.client(),
|
||||
stats: self.forget_me.stats(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
log::info!("Successfully sent forget me request to the gateway");
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("Failed to send forget me request to the gateway: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
self.task_client.recv_timeout().await;
|
||||
|
||||
log::debug!("MixTrafficController: Exiting");
|
||||
});
|
||||
|
||||
@@ -86,7 +86,9 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
|
||||
&mut self,
|
||||
message: ClientRequest,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
(**self).send_client_request(message).await
|
||||
let _ = (**self).send_client_request(message.clone()).await?;
|
||||
log::debug!("Sent client request: {:?}", message);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,14 +145,7 @@ where
|
||||
&mut self,
|
||||
message: ClientRequest,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
if let Some(shared_key) = self.gateway_client.shared_key() {
|
||||
self.gateway_client
|
||||
.send_websocket_message(message.encrypt(&*shared_key)?)
|
||||
.await?;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(GatewayClientError::ConnectionInInvalidState)
|
||||
}
|
||||
self.gateway_client.send_client_request(message).await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+15
-6
@@ -11,6 +11,7 @@ use nym_sphinx::{
|
||||
acknowledgements::{identifier::recover_identifier, AckKey},
|
||||
chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID},
|
||||
};
|
||||
use nym_task::TaskClient;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Module responsible for listening for any data resembling acknowledgements from the network
|
||||
@@ -20,6 +21,7 @@ pub(super) struct AcknowledgementListener {
|
||||
ack_receiver: AcknowledgementReceiver,
|
||||
action_sender: AckActionSender,
|
||||
stats_tx: ClientStatsSender,
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl AcknowledgementListener {
|
||||
@@ -28,12 +30,14 @@ impl AcknowledgementListener {
|
||||
ack_receiver: AcknowledgementReceiver,
|
||||
action_sender: AckActionSender,
|
||||
stats_tx: ClientStatsSender,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
AcknowledgementListener {
|
||||
ack_key,
|
||||
ack_receiver,
|
||||
action_sender,
|
||||
stats_tx,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,9 +68,14 @@ impl AcknowledgementListener {
|
||||
trace!("Received {} from the mix network", frag_id);
|
||||
self.stats_tx
|
||||
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
|
||||
self.action_sender
|
||||
if let Err(err) = self
|
||||
.action_sender
|
||||
.unbounded_send(Action::new_remove(frag_id))
|
||||
.unwrap();
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to send remove action to action controller: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_ack_receiver_item(&mut self, item: Vec<Vec<u8>>) {
|
||||
@@ -76,10 +85,10 @@ impl AcknowledgementListener {
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
|
||||
pub(super) async fn run(&mut self) {
|
||||
debug!("Started AcknowledgementListener with graceful shutdown support");
|
||||
|
||||
while !shutdown.is_shutdown() {
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
acks = self.ack_receiver.next() => match acks {
|
||||
Some(acks) => self.handle_ack_receiver_item(acks).await,
|
||||
@@ -88,12 +97,12 @@ impl AcknowledgementListener {
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("AcknowledgementListener: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
shutdown.recv_timeout().await;
|
||||
self.task_client.recv_timeout().await;
|
||||
log::debug!("AcknowledgementListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
+15
-16
@@ -9,6 +9,7 @@ use log::*;
|
||||
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
|
||||
use nym_sphinx::chunking::fragment::FragmentIdentifier;
|
||||
use nym_sphinx::Delay as SphinxDelay;
|
||||
use nym_task::TaskClient;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -101,6 +102,8 @@ pub(super) struct ActionController {
|
||||
|
||||
/// Channel for notifying `RetransmissionRequestListener` about expired acknowledgements.
|
||||
retransmission_sender: RetransmissionRequestSender,
|
||||
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl ActionController {
|
||||
@@ -108,6 +111,7 @@ impl ActionController {
|
||||
config: Config,
|
||||
retransmission_sender: RetransmissionRequestSender,
|
||||
incoming_actions: AckActionReceiver,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
ActionController {
|
||||
config,
|
||||
@@ -115,6 +119,7 @@ impl ActionController {
|
||||
pending_acks_timers: NonExhaustiveDelayQueue::new(),
|
||||
incoming_actions,
|
||||
retransmission_sender,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,11 +221,7 @@ impl ActionController {
|
||||
}
|
||||
|
||||
// note: when the entry expires it's automatically removed from pending_acks_timers
|
||||
fn handle_expired_ack_timer(
|
||||
&mut self,
|
||||
expired_ack: Expired<FragmentIdentifier>,
|
||||
task_client: &mut nym_task::TaskClient,
|
||||
) {
|
||||
fn handle_expired_ack_timer(&mut self, expired_ack: Expired<FragmentIdentifier>) {
|
||||
// I'm honestly not sure how to handle it, because getting it means other things in our
|
||||
// system are already misbehaving. If we ever see this panic, then I guess we should worry
|
||||
// about it. Perhaps just reschedule it at later point?
|
||||
@@ -238,15 +239,13 @@ impl ActionController {
|
||||
// downgrading an arc and then upgrading vs cloning is difference of 30ns vs 15ns
|
||||
// so it's literally a NO difference while it might prevent us from unnecessarily
|
||||
// resending data (in maybe 1 in 1 million cases, but it's something)
|
||||
if self
|
||||
if let Err(err) = self
|
||||
.retransmission_sender
|
||||
.unbounded_send(Arc::downgrade(pending_ack_data))
|
||||
.is_err()
|
||||
{
|
||||
assert!(
|
||||
task_client.is_shutdown_poll(),
|
||||
"Failed to send pending ack for retransmission"
|
||||
);
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
log::error!("Failed to send pending ack for retransmission: {err}");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// this shouldn't cause any issues but shouldn't have happened to begin with!
|
||||
@@ -265,10 +264,10 @@ impl ActionController {
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
|
||||
pub(super) async fn run(&mut self) {
|
||||
debug!("Started ActionController with graceful shutdown support");
|
||||
|
||||
loop {
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
action = self.incoming_actions.next() => match action {
|
||||
Some(action) => self.process_action(action),
|
||||
@@ -280,19 +279,19 @@ impl ActionController {
|
||||
}
|
||||
},
|
||||
expired_ack = self.pending_acks_timers.next() => match expired_ack {
|
||||
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack, &mut shutdown),
|
||||
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack),
|
||||
None => {
|
||||
log::trace!("ActionController: Stopping since ack channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("ActionController: Received shutdown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
shutdown.recv_timeout().await;
|
||||
self.task_client.recv_timeout().await;
|
||||
log::debug!("ActionController: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
+15
-5
@@ -11,6 +11,7 @@ use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_sphinx::params::PacketType;
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use nym_task::TaskClient;
|
||||
use rand::{CryptoRng, Rng};
|
||||
|
||||
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
|
||||
@@ -23,6 +24,7 @@ where
|
||||
input_receiver: InputMessageReceiver,
|
||||
message_handler: MessageHandler<R>,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl<R> InputMessageListener<R>
|
||||
@@ -36,11 +38,13 @@ where
|
||||
input_receiver: InputMessageReceiver,
|
||||
message_handler: MessageHandler<R>,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
InputMessageListener {
|
||||
input_receiver,
|
||||
message_handler,
|
||||
reply_controller_sender,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,8 +67,14 @@ where
|
||||
lane: TransmissionLane,
|
||||
) {
|
||||
// offload reply handling to the dedicated task
|
||||
self.reply_controller_sender
|
||||
if let Err(err) = self
|
||||
.reply_controller_sender
|
||||
.send_reply(recipient_tag, data, lane)
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("failed to send a reply - {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_plain_message(
|
||||
@@ -164,10 +174,10 @@ where
|
||||
};
|
||||
}
|
||||
|
||||
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
|
||||
pub(super) async fn run(&mut self) {
|
||||
debug!("Started InputMessageListener with graceful shutdown support");
|
||||
|
||||
while !shutdown.is_shutdown() {
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
input_msg = self.input_receiver.recv() => match input_msg {
|
||||
Some(input_msg) => {
|
||||
@@ -178,12 +188,12 @@ where
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("InputMessageListener: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
shutdown.recv_timeout().await;
|
||||
self.task_client.recv_timeout().await;
|
||||
log::debug!("InputMessageListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
+18
-27
@@ -24,6 +24,7 @@ use nym_sphinx::{
|
||||
Delay as SphinxDelay,
|
||||
};
|
||||
use nym_statistics_common::clients::ClientStatsSender;
|
||||
use nym_task::TaskClient;
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::{
|
||||
sync::{Arc, Weak},
|
||||
@@ -66,7 +67,7 @@ pub(crate) enum PacketDestination {
|
||||
|
||||
/// Structure representing a data `Fragment` that is on-route to the specified `Recipient`
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct PendingAcknowledgement {
|
||||
pub struct PendingAcknowledgement {
|
||||
message_chunk: Fragment,
|
||||
delay: SphinxDelay,
|
||||
destination: PacketDestination,
|
||||
@@ -216,6 +217,7 @@ where
|
||||
message_handler: MessageHandler<R>,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
stats_tx: ClientStatsSender,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
let (retransmission_tx, retransmission_rx) = mpsc::unbounded();
|
||||
|
||||
@@ -225,6 +227,7 @@ where
|
||||
action_config,
|
||||
retransmission_tx,
|
||||
connectors.ack_action_receiver,
|
||||
task_client.fork("action_controller"),
|
||||
);
|
||||
|
||||
// will listen for any acks coming from the network
|
||||
@@ -233,6 +236,7 @@ where
|
||||
connectors.ack_receiver,
|
||||
connectors.ack_action_sender.clone(),
|
||||
stats_tx,
|
||||
task_client.fork("acknowledgement_listener"),
|
||||
);
|
||||
|
||||
// will listen for any new messages from the client
|
||||
@@ -240,6 +244,7 @@ where
|
||||
connectors.input_receiver,
|
||||
message_handler.clone(),
|
||||
reply_controller_sender.clone(),
|
||||
task_client.fork("input_message_listener"),
|
||||
);
|
||||
|
||||
// will listen for any ack timeouts and trigger retransmission
|
||||
@@ -249,12 +254,16 @@ where
|
||||
message_handler,
|
||||
retransmission_rx,
|
||||
reply_controller_sender,
|
||||
task_client.fork("retransmission_request_listener"),
|
||||
);
|
||||
|
||||
// will listen for events indicating the packet was sent through the network so that
|
||||
// the retransmission timer should be started.
|
||||
let sent_notification_listener =
|
||||
SentNotificationListener::new(connectors.sent_notifier, connectors.ack_action_sender);
|
||||
let sent_notification_listener = SentNotificationListener::new(
|
||||
connectors.sent_notifier,
|
||||
connectors.ack_action_sender,
|
||||
task_client.with_suffix("sent_notification_listener"),
|
||||
);
|
||||
|
||||
AcknowledgementController {
|
||||
acknowledgement_listener,
|
||||
@@ -265,53 +274,35 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn start_with_shutdown(
|
||||
self,
|
||||
shutdown: nym_task::TaskClient,
|
||||
packet_type: PacketType,
|
||||
) {
|
||||
pub(super) fn start(self, packet_type: PacketType) {
|
||||
let mut acknowledgement_listener = self.acknowledgement_listener;
|
||||
let mut input_message_listener = self.input_message_listener;
|
||||
let mut retransmission_request_listener = self.retransmission_request_listener;
|
||||
let mut sent_notification_listener = self.sent_notification_listener;
|
||||
let mut action_controller = self.action_controller;
|
||||
|
||||
let shutdown_handle = shutdown.fork("acknowledgement_listener");
|
||||
spawn_future(async move {
|
||||
acknowledgement_listener
|
||||
.run_with_shutdown(shutdown_handle)
|
||||
.await;
|
||||
acknowledgement_listener.run().await;
|
||||
debug!("The acknowledgement listener has finished execution!");
|
||||
});
|
||||
|
||||
let shutdown_handle = shutdown.fork("input_message_listener");
|
||||
spawn_future(async move {
|
||||
input_message_listener
|
||||
.run_with_shutdown(shutdown_handle)
|
||||
.await;
|
||||
input_message_listener.run().await;
|
||||
debug!("The input listener has finished execution!");
|
||||
});
|
||||
|
||||
let shutdown_handle = shutdown.fork("retransmission_request_listener");
|
||||
spawn_future(async move {
|
||||
retransmission_request_listener
|
||||
.run_with_shutdown(shutdown_handle, packet_type)
|
||||
.await;
|
||||
retransmission_request_listener.run(packet_type).await;
|
||||
debug!("The retransmission request listener has finished execution!");
|
||||
});
|
||||
|
||||
let shutdown_handle = shutdown.fork("sent_notification_listener");
|
||||
spawn_future(async move {
|
||||
sent_notification_listener
|
||||
.run_with_shutdown(shutdown_handle)
|
||||
.await;
|
||||
sent_notification_listener.run().await;
|
||||
debug!("The sent notification listener has finished execution!");
|
||||
});
|
||||
|
||||
spawn_future(async move {
|
||||
action_controller
|
||||
.run_with_shutdown(shutdown.with_suffix("action_controller"))
|
||||
.await;
|
||||
action_controller.run().await;
|
||||
debug!("The controller has finished execution!");
|
||||
});
|
||||
}
|
||||
|
||||
+32
-17
@@ -14,7 +14,7 @@ use log::*;
|
||||
use nym_sphinx::chunking::fragment::Fragment;
|
||||
use nym_sphinx::preparer::PreparedFragment;
|
||||
use nym_sphinx::{addressing::clients::Recipient, params::PacketType};
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use nym_task::{connections::TransmissionLane, TaskClient};
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
@@ -25,6 +25,7 @@ pub(super) struct RetransmissionRequestListener<R> {
|
||||
message_handler: MessageHandler<R>,
|
||||
request_receiver: RetransmissionRequestReceiver,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl<R> RetransmissionRequestListener<R>
|
||||
@@ -37,6 +38,7 @@ where
|
||||
message_handler: MessageHandler<R>,
|
||||
request_receiver: RetransmissionRequestReceiver,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
RetransmissionRequestListener {
|
||||
maximum_retransmissions,
|
||||
@@ -44,6 +46,7 @@ where
|
||||
message_handler,
|
||||
request_receiver,
|
||||
reply_controller_sender,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,9 +82,12 @@ where
|
||||
if let Some(limit) = self.maximum_retransmissions {
|
||||
if timed_out_ack.retransmissions >= limit {
|
||||
warn!("reached maximum number of allowed retransmissions for the packet");
|
||||
self.action_sender
|
||||
if let Err(err) = self
|
||||
.action_sender
|
||||
.unbounded_send(Action::new_remove(frag_id))
|
||||
.unwrap();
|
||||
{
|
||||
error!("Failed to send remove action to the controller: {err}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -93,11 +99,16 @@ where
|
||||
} => {
|
||||
// if this is retransmission for reply, offload it to the dedicated task
|
||||
// that deals with all the surbs
|
||||
return self.reply_controller_sender.send_retransmission_data(
|
||||
if let Err(err) = self.reply_controller_sender.send_retransmission_data(
|
||||
*recipient_tag,
|
||||
weak_timed_out_ack,
|
||||
*extra_surb_request,
|
||||
);
|
||||
) {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to send retransmission data to the reply controller: {err}");
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
PacketDestination::KnownRecipient(recipient) => {
|
||||
self.prepare_normal_retransmission_chunk(
|
||||
@@ -114,9 +125,12 @@ where
|
||||
Err(err) => {
|
||||
warn!("Could not retransmit the packet - {err}");
|
||||
// we NEED to start timer here otherwise we will have this guy permanently stuck in memory
|
||||
self.action_sender
|
||||
if let Err(err) = self
|
||||
.action_sender
|
||||
.unbounded_send(Action::new_start_timer(frag_id))
|
||||
.unwrap();
|
||||
{
|
||||
error!("Failed to send start timer action to the controller: {err}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -141,9 +155,14 @@ where
|
||||
// is sent to the `OutQueueControl` and has gone through its internal queue
|
||||
// with the additional poisson delay.
|
||||
// And since Actions are executed in order `UpdateTimer` will HAVE TO be executed before `StartTimer`
|
||||
self.action_sender
|
||||
if let Err(err) = self
|
||||
.action_sender
|
||||
.unbounded_send(Action::new_update_pending_ack(frag_id, new_delay))
|
||||
.unwrap();
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to send update pending ack action to the controller: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
// send to `OutQueueControl` to eventually send to the mix network
|
||||
self.message_handler
|
||||
@@ -157,14 +176,10 @@ where
|
||||
.await
|
||||
}
|
||||
|
||||
pub(super) async fn run_with_shutdown(
|
||||
&mut self,
|
||||
mut shutdown: nym_task::TaskClient,
|
||||
packet_type: PacketType,
|
||||
) {
|
||||
pub(super) async fn run(&mut self, packet_type: PacketType) {
|
||||
debug!("Started RetransmissionRequestListener with graceful shutdown support");
|
||||
|
||||
while !shutdown.is_shutdown() {
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
timed_out_ack = self.request_receiver.next() => match timed_out_ack {
|
||||
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await,
|
||||
@@ -173,12 +188,12 @@ where
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = shutdown.recv() => {
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("RetransmissionRequestListener: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
shutdown.recv_timeout().await;
|
||||
self.task_client.recv_timeout().await;
|
||||
log::debug!("RetransmissionRequestListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
+15
-6
@@ -6,6 +6,7 @@ use super::SentPacketNotificationReceiver;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
|
||||
use nym_task::TaskClient;
|
||||
|
||||
/// Module responsible for starting up retransmission timers.
|
||||
/// It is required because when we send our packet to the `real traffic stream` controlled
|
||||
@@ -14,16 +15,19 @@ use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
|
||||
pub(super) struct SentNotificationListener {
|
||||
sent_notifier: SentPacketNotificationReceiver,
|
||||
action_sender: AckActionSender,
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl SentNotificationListener {
|
||||
pub(super) fn new(
|
||||
sent_notifier: SentPacketNotificationReceiver,
|
||||
action_sender: AckActionSender,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
SentNotificationListener {
|
||||
sent_notifier,
|
||||
action_sender,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,15 +36,20 @@ impl SentNotificationListener {
|
||||
trace!("sent off a cover message - no need to start retransmission timer!");
|
||||
return;
|
||||
}
|
||||
self.action_sender
|
||||
if let Err(err) = self
|
||||
.action_sender
|
||||
.unbounded_send(Action::new_start_timer(frag_id))
|
||||
.unwrap();
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to send start timer action to action controller: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
|
||||
pub(super) async fn run(&mut self) {
|
||||
debug!("Started SentNotificationListener with graceful shutdown support");
|
||||
|
||||
loop {
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
frag_id = self.sent_notifier.next() => match frag_id {
|
||||
Some(frag_id) => {
|
||||
@@ -51,13 +60,13 @@ impl SentNotificationListener {
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("SentNotificationListener: Received shutdown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
assert!(shutdown.is_shutdown_poll());
|
||||
assert!(self.task_client.is_shutdown_poll());
|
||||
log::debug!("SentNotificationListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use nym_sphinx::params::{PacketSize, PacketType};
|
||||
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
|
||||
use nym_sphinx::Delay;
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use nym_task::TaskClient;
|
||||
use nym_topology::{NymRouteProvider, NymTopologyError};
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::collections::HashMap;
|
||||
@@ -149,12 +150,14 @@ pub(crate) struct MessageHandler<R> {
|
||||
topology_access: TopologyAccessor,
|
||||
reply_key_storage: SentReplyKeys,
|
||||
tag_storage: UsedSenderTags,
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl<R> MessageHandler<R>
|
||||
where
|
||||
R: CryptoRng + Rng,
|
||||
{
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
config: Config,
|
||||
rng: R,
|
||||
@@ -163,6 +166,7 @@ where
|
||||
topology_access: TopologyAccessor,
|
||||
reply_key_storage: SentReplyKeys,
|
||||
tag_storage: UsedSenderTags,
|
||||
task_client: TaskClient,
|
||||
) -> Self
|
||||
where
|
||||
R: Copy,
|
||||
@@ -183,6 +187,7 @@ where
|
||||
topology_access,
|
||||
reply_key_storage,
|
||||
tag_storage,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -609,15 +614,25 @@ where
|
||||
}
|
||||
|
||||
pub(crate) fn update_ack_delay(&self, id: FragmentIdentifier, new_delay: Delay) {
|
||||
self.action_sender
|
||||
if let Err(err) = self
|
||||
.action_sender
|
||||
.unbounded_send(Action::UpdatePendingAck(id, new_delay))
|
||||
.expect("action control task has died")
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to send update action to the controller: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn insert_pending_acks(&self, pending_acks: Vec<PendingAcknowledgement>) {
|
||||
self.action_sender
|
||||
if let Err(err) = self
|
||||
.action_sender
|
||||
.unbounded_send(Action::new_insert(pending_acks))
|
||||
.expect("action control task has died")
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to send insert action to the controller: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// tells real message sender (with the poisson timer) to send this to the mix network
|
||||
@@ -626,9 +641,14 @@ where
|
||||
messages: Vec<RealMessage>,
|
||||
transmission_lane: TransmissionLane,
|
||||
) {
|
||||
self.real_message_sender
|
||||
if let Err(err) = self
|
||||
.real_message_sender
|
||||
.send((messages, transmission_lane))
|
||||
.await
|
||||
.expect("real message receiver task (OutQueueControl) has died");
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("Failed to forward messages to the real message sender: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::params::PacketType;
|
||||
use nym_statistics_common::clients::ClientStatsSender;
|
||||
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
|
||||
use nym_task::TaskClient;
|
||||
use rand::{rngs::OsRng, CryptoRng, Rng};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -147,6 +148,7 @@ impl RealMessagesController<OsRng> {
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
client_connection_rx: ConnectionCommandReceiver,
|
||||
stats_tx: ClientStatsSender,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
let rng = OsRng;
|
||||
|
||||
@@ -177,6 +179,7 @@ impl RealMessagesController<OsRng> {
|
||||
topology_access.clone(),
|
||||
reply_storage.key_storage(),
|
||||
reply_storage.tags_storage(),
|
||||
task_client.fork("message_handler"),
|
||||
);
|
||||
|
||||
let ack_control = AcknowledgementController::new(
|
||||
@@ -186,6 +189,7 @@ impl RealMessagesController<OsRng> {
|
||||
message_handler.clone(),
|
||||
reply_controller_sender,
|
||||
stats_tx.clone(),
|
||||
task_client.fork("ack_control"),
|
||||
);
|
||||
|
||||
let reply_control = ReplyController::new(
|
||||
@@ -193,6 +197,7 @@ impl RealMessagesController<OsRng> {
|
||||
message_handler,
|
||||
reply_storage,
|
||||
reply_controller_receiver,
|
||||
task_client.fork("reply_controller"),
|
||||
);
|
||||
|
||||
let out_queue_control = OutQueueControl::new(
|
||||
@@ -205,6 +210,7 @@ impl RealMessagesController<OsRng> {
|
||||
lane_queue_lengths,
|
||||
client_connection_rx,
|
||||
stats_tx,
|
||||
task_client.with_suffix("out_queue_control"),
|
||||
);
|
||||
|
||||
RealMessagesController {
|
||||
@@ -214,22 +220,20 @@ impl RealMessagesController<OsRng> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_with_shutdown(self, shutdown: nym_task::TaskClient, packet_type: PacketType) {
|
||||
pub fn start(self, packet_type: PacketType) {
|
||||
let mut out_queue_control = self.out_queue_control;
|
||||
let ack_control = self.ack_control;
|
||||
let mut reply_control = self.reply_control;
|
||||
|
||||
let shutdown_handle = shutdown.fork("out_queue_control");
|
||||
spawn_future(async move {
|
||||
out_queue_control.run_with_shutdown(shutdown_handle).await;
|
||||
out_queue_control.run().await;
|
||||
debug!("The out queue controller has finished execution!");
|
||||
});
|
||||
let shutdown_handle = shutdown.fork("reply_control");
|
||||
spawn_future(async move {
|
||||
reply_control.run_with_shutdown(shutdown_handle).await;
|
||||
reply_control.run().await;
|
||||
debug!("The reply controller has finished execution!");
|
||||
});
|
||||
|
||||
ack_control.start_with_shutdown(shutdown.with_suffix("ack_control"), packet_type);
|
||||
ack_control.start(packet_type);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, C
|
||||
use nym_task::connections::{
|
||||
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
|
||||
};
|
||||
use nym_task::TaskClient;
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
@@ -117,6 +118,8 @@ where
|
||||
|
||||
/// Channel used for sending metrics events (specifically `PacketStatistics` events) to the metrics tracker.
|
||||
stats_tx: ClientStatsSender,
|
||||
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -176,6 +179,7 @@ where
|
||||
lane_queue_lengths: LaneQueueLengths,
|
||||
client_connection_rx: ConnectionCommandReceiver,
|
||||
stats_tx: ClientStatsSender,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
OutQueueControl {
|
||||
config,
|
||||
@@ -190,6 +194,7 @@ where
|
||||
client_connection_rx,
|
||||
lane_queue_lengths,
|
||||
stats_tx,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,7 +203,9 @@ where
|
||||
// queues and client load rather than the required delay. So realistically we can treat
|
||||
// whatever is about to happen as negligible additional delay.
|
||||
trace!("{} is about to get sent to the mixnet", frag_id);
|
||||
self.sent_notifier.unbounded_send(frag_id).unwrap();
|
||||
if let Err(err) = self.sent_notifier.unbounded_send(frag_id) {
|
||||
error!("Failed to notify about sent message: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
fn loop_cover_message_size(&mut self) -> PacketSize {
|
||||
@@ -271,7 +278,9 @@ where
|
||||
};
|
||||
|
||||
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
|
||||
log::error!("Failed to send: {err}");
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
log::error!("Failed to send: {err}");
|
||||
}
|
||||
} else {
|
||||
let event = if fragment_id.is_some() {
|
||||
PacketStatisticsEvent::RealPacketSent(packet_size)
|
||||
@@ -504,7 +513,7 @@ where
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
fn log_status(&self, shutdown: &mut nym_task::TaskClient) {
|
||||
fn log_status(&self, shutdown: &mut TaskClient) {
|
||||
use crate::error::ClientCoreStatusMessage;
|
||||
|
||||
let packets = self.transmission_buffer.total_size();
|
||||
@@ -535,17 +544,19 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
|
||||
pub(super) async fn run(&mut self) {
|
||||
debug!("Started OutQueueControl with graceful shutdown support");
|
||||
|
||||
let mut shutdown = self.task_client.fork("select");
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
|
||||
|
||||
loop {
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("OutQueueControl: Received shutdown");
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ use nym_sphinx::message::{NymMessage, PlainMessage};
|
||||
use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
|
||||
use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
|
||||
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
|
||||
use nym_task::TaskClient;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -152,6 +153,7 @@ struct ReceivedMessagesBuffer<R: MessageReceiver> {
|
||||
inner: Arc<Mutex<ReceivedMessagesBufferInner<R>>>,
|
||||
reply_key_storage: SentReplyKeys,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
@@ -160,6 +162,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
reply_key_storage: SentReplyKeys,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
stats_tx: ClientStatsSender,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
ReceivedMessagesBuffer {
|
||||
inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
|
||||
@@ -172,6 +175,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
})),
|
||||
reply_key_storage,
|
||||
reply_controller_sender,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -257,11 +261,15 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
}
|
||||
};
|
||||
|
||||
self.reply_controller_sender.send_additional_surbs(
|
||||
if let Err(err) = self.reply_controller_sender.send_additional_surbs(
|
||||
msg.sender_tag,
|
||||
reply_surbs,
|
||||
from_surb_request,
|
||||
)
|
||||
) {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("{err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
reconstructed
|
||||
}
|
||||
@@ -276,8 +284,14 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
ReplyMessageContent::Data { message } => reconstructed.push(message.into()),
|
||||
ReplyMessageContent::SurbRequest { recipient, amount } => {
|
||||
debug!("received request for {amount} additional reply SURBs from {recipient}");
|
||||
self.reply_controller_sender
|
||||
.send_additional_surbs_request(*recipient, amount);
|
||||
if let Err(err) = self
|
||||
.reply_controller_sender
|
||||
.send_additional_surbs_request(*recipient, amount)
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("{err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -399,16 +413,19 @@ pub enum ReceivedBufferMessage {
|
||||
struct RequestReceiver<R: MessageReceiver> {
|
||||
received_buffer: ReceivedMessagesBuffer<R>,
|
||||
query_receiver: ReceivedBufferRequestReceiver,
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl<R: MessageReceiver> RequestReceiver<R> {
|
||||
fn new(
|
||||
received_buffer: ReceivedMessagesBuffer<R>,
|
||||
query_receiver: ReceivedBufferRequestReceiver,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
RequestReceiver {
|
||||
received_buffer,
|
||||
query_receiver,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -423,12 +440,12 @@ impl<R: MessageReceiver> RequestReceiver<R> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
|
||||
async fn run(&mut self) {
|
||||
debug!("Started RequestReceiver with graceful shutdown support");
|
||||
while !shutdown.is_shutdown() {
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("RequestReceiver: Received shutdown");
|
||||
}
|
||||
request = self.query_receiver.next() => {
|
||||
@@ -441,7 +458,7 @@ impl<R: MessageReceiver> RequestReceiver<R> {
|
||||
},
|
||||
}
|
||||
}
|
||||
shutdown.recv_timeout().await;
|
||||
self.task_client.recv().await;
|
||||
log::debug!("RequestReceiver: Exiting");
|
||||
}
|
||||
}
|
||||
@@ -449,25 +466,25 @@ impl<R: MessageReceiver> RequestReceiver<R> {
|
||||
struct FragmentedMessageReceiver<R: MessageReceiver> {
|
||||
received_buffer: ReceivedMessagesBuffer<R>,
|
||||
mixnet_packet_receiver: MixnetMessageReceiver,
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
|
||||
fn new(
|
||||
received_buffer: ReceivedMessagesBuffer<R>,
|
||||
mixnet_packet_receiver: MixnetMessageReceiver,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
FragmentedMessageReceiver {
|
||||
received_buffer,
|
||||
mixnet_packet_receiver,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_with_shutdown(
|
||||
&mut self,
|
||||
mut shutdown: nym_task::TaskClient,
|
||||
) -> Result<(), MessageRecoveryError> {
|
||||
async fn run(&mut self) -> Result<(), MessageRecoveryError> {
|
||||
debug!("Started FragmentedMessageReceiver with graceful shutdown support");
|
||||
while !shutdown.is_shutdown() {
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
new_messages = self.mixnet_packet_receiver.next() => {
|
||||
if let Some(new_messages) = new_messages {
|
||||
@@ -477,12 +494,12 @@ impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
_ = self.task_client.recv_with_delay() => {
|
||||
log::trace!("FragmentedMessageReceiver: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
shutdown.recv_timeout().await;
|
||||
self.task_client.recv_timeout().await;
|
||||
log::debug!("FragmentedMessageReceiver: Exiting");
|
||||
Ok(())
|
||||
}
|
||||
@@ -501,41 +518,42 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
|
||||
reply_key_storage: SentReplyKeys,
|
||||
reply_controller_sender: ReplyControllerSender,
|
||||
metrics_reporter: ClientStatsSender,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
let received_buffer = ReceivedMessagesBuffer::new(
|
||||
local_encryption_keypair,
|
||||
reply_key_storage,
|
||||
reply_controller_sender,
|
||||
metrics_reporter,
|
||||
task_client.fork("received_messages_buffer"),
|
||||
);
|
||||
|
||||
ReceivedMessagesBufferController {
|
||||
fragmented_message_receiver: FragmentedMessageReceiver::new(
|
||||
received_buffer.clone(),
|
||||
mixnet_packet_receiver,
|
||||
task_client.fork("fragmented_message_receiver"),
|
||||
),
|
||||
request_receiver: RequestReceiver::new(
|
||||
received_buffer,
|
||||
query_receiver,
|
||||
task_client.with_suffix("request_receiver"),
|
||||
),
|
||||
request_receiver: RequestReceiver::new(received_buffer, query_receiver),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_with_shutdown(self, shutdown: nym_task::TaskClient) {
|
||||
pub fn start(self) {
|
||||
let mut fragmented_message_receiver = self.fragmented_message_receiver;
|
||||
let mut request_receiver = self.request_receiver;
|
||||
|
||||
let shutdown_handle = shutdown.fork("fragmented_message_receiver");
|
||||
spawn_future(async move {
|
||||
match fragmented_message_receiver
|
||||
.run_with_shutdown(shutdown_handle)
|
||||
.await
|
||||
{
|
||||
match fragmented_message_receiver.run().await {
|
||||
Ok(_) => {}
|
||||
Err(e) => error!("{e}"),
|
||||
}
|
||||
});
|
||||
spawn_future(async move {
|
||||
request_receiver
|
||||
.run_with_shutdown(shutdown.with_suffix("request_receiver"))
|
||||
.await;
|
||||
request_receiver.run().await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_sphinx::anonymous_replies::ReplySurb;
|
||||
use nym_sphinx::chunking::fragment::{Fragment, FragmentIdentifier};
|
||||
use nym_task::connections::{ConnectionId, TransmissionLane};
|
||||
use nym_task::TaskClient;
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::btree_map::Entry;
|
||||
@@ -68,6 +69,9 @@ pub struct ReplyController<R> {
|
||||
|
||||
message_handler: MessageHandler<R>,
|
||||
full_reply_storage: CombinedReplyStorage,
|
||||
|
||||
// Listen for shutdown signals
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl<R> ReplyController<R>
|
||||
@@ -79,6 +83,7 @@ where
|
||||
message_handler: MessageHandler<R>,
|
||||
full_reply_storage: CombinedReplyStorage,
|
||||
request_receiver: ReplyControllerReceiver,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
ReplyController {
|
||||
config,
|
||||
@@ -87,6 +92,7 @@ where
|
||||
pending_retransmissions: HashMap::new(),
|
||||
message_handler,
|
||||
full_reply_storage,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -846,9 +852,11 @@ where
|
||||
// todo!()
|
||||
// }
|
||||
|
||||
pub(crate) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
|
||||
pub(crate) async fn run(&mut self) {
|
||||
debug!("Started ReplyController with graceful shutdown support");
|
||||
|
||||
let mut shutdown = self.task_client.fork("select");
|
||||
|
||||
let polling_rate = Duration::from_secs(5);
|
||||
let mut stale_inspection = new_interval_stream(polling_rate);
|
||||
|
||||
@@ -860,7 +868,7 @@ where
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv_with_delay() => {
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("ReplyController: Received shutdown");
|
||||
},
|
||||
req = self.request_receiver.next() => match req {
|
||||
|
||||
@@ -15,6 +15,27 @@ pub(crate) fn new_control_channels() -> (ReplyControllerSender, ReplyControllerR
|
||||
(tx.into(), rx)
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ReplyControllerSenderError {
|
||||
#[error("failed to send retransmission data to reply controller")]
|
||||
SendRetransmissionData(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
|
||||
#[error("failed to send reply to reply controller")]
|
||||
SendReply(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
|
||||
#[error("failed to send additional surbs to reply controller")]
|
||||
AdditionalSurbs(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
|
||||
#[error("failed to send additional surbs request to reply controller")]
|
||||
AdditionalSurbsRequest(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
|
||||
#[error("failed to request lane queue length from reply controller")]
|
||||
LaneQueueLength(#[source] mpsc::TrySendError<ReplyControllerMessage>),
|
||||
|
||||
#[error("response channel was dropped before we could receive the response")]
|
||||
ResponseChannelDropped(#[source] oneshot::Canceled),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ReplyControllerSender(mpsc::UnboundedSender<ReplyControllerMessage>);
|
||||
|
||||
@@ -30,14 +51,14 @@ impl ReplyControllerSender {
|
||||
recipient: AnonymousSenderTag,
|
||||
timed_out_ack: Weak<PendingAcknowledgement>,
|
||||
extra_surb_request: bool,
|
||||
) {
|
||||
) -> Result<(), ReplyControllerSenderError> {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::RetransmitReply {
|
||||
recipient,
|
||||
timed_out_ack,
|
||||
extra_surb_request,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!")
|
||||
.map_err(ReplyControllerSenderError::SendRetransmissionData)
|
||||
}
|
||||
|
||||
pub(crate) fn send_reply(
|
||||
@@ -45,14 +66,14 @@ impl ReplyControllerSender {
|
||||
recipient: AnonymousSenderTag,
|
||||
message: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
) {
|
||||
) -> Result<(), ReplyControllerSenderError> {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::SendReply {
|
||||
recipient,
|
||||
message,
|
||||
lane,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!")
|
||||
.map_err(ReplyControllerSenderError::SendReply)
|
||||
}
|
||||
|
||||
pub(crate) fn send_additional_surbs(
|
||||
@@ -60,42 +81,47 @@ impl ReplyControllerSender {
|
||||
sender_tag: AnonymousSenderTag,
|
||||
reply_surbs: Vec<ReplySurb>,
|
||||
from_surb_request: bool,
|
||||
) {
|
||||
) -> Result<(), ReplyControllerSenderError> {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::AdditionalSurbs {
|
||||
sender_tag,
|
||||
reply_surbs,
|
||||
from_surb_request,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!")
|
||||
.map_err(ReplyControllerSenderError::AdditionalSurbs)
|
||||
}
|
||||
|
||||
pub(crate) fn send_additional_surbs_request(&self, recipient: Recipient, amount: u32) {
|
||||
pub(crate) fn send_additional_surbs_request(
|
||||
&self,
|
||||
recipient: Recipient,
|
||||
amount: u32,
|
||||
) -> Result<(), ReplyControllerSenderError> {
|
||||
self.0
|
||||
.unbounded_send(ReplyControllerMessage::AdditionalSurbsRequest {
|
||||
recipient: Box::new(recipient),
|
||||
amount,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!")
|
||||
.map_err(ReplyControllerSenderError::AdditionalSurbsRequest)
|
||||
}
|
||||
|
||||
pub async fn get_lane_queue_length(&self, connection_id: ConnectionId) -> usize {
|
||||
pub async fn get_lane_queue_length(
|
||||
&self,
|
||||
connection_id: ConnectionId,
|
||||
) -> Result<usize, ReplyControllerSenderError> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.0
|
||||
if let Err(err) = self
|
||||
.0
|
||||
.unbounded_send(ReplyControllerMessage::LaneQueueLength {
|
||||
connection_id,
|
||||
response_channel: response_tx,
|
||||
})
|
||||
.expect("ReplyControllerReceiver has died!");
|
||||
|
||||
match response_rx.await {
|
||||
Ok(length) => length,
|
||||
Err(_) => {
|
||||
error!("The reply controller has dropped our response channel!");
|
||||
// TODO: should we panic here instead? this message implies something weird and unrecoverable has happened
|
||||
0
|
||||
}
|
||||
{
|
||||
return Err(ReplyControllerSenderError::LaneQueueLength(err));
|
||||
}
|
||||
|
||||
response_rx
|
||||
.await
|
||||
.map_err(ReplyControllerSenderError::ResponseChannelDropped)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,7 +136,10 @@ impl ReplyQueueLengths {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_lane_queue_length(&self, connection_id: ConnectionId) -> usize {
|
||||
pub async fn get_lane_queue_length(
|
||||
&self,
|
||||
connection_id: ConnectionId,
|
||||
) -> Result<usize, ReplyControllerSenderError> {
|
||||
self.reply_controller_sender
|
||||
.get_lane_queue_length(connection_id)
|
||||
.await
|
||||
@@ -120,7 +149,7 @@ impl ReplyQueueLengths {
|
||||
pub(crate) type ReplyControllerReceiver = mpsc::UnboundedReceiver<ReplyControllerMessage>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum ReplyControllerMessage {
|
||||
pub enum ReplyControllerMessage {
|
||||
RetransmitReply {
|
||||
recipient: AnonymousSenderTag,
|
||||
timed_out_ack: Weak<PendingAcknowledgement>,
|
||||
|
||||
@@ -22,7 +22,7 @@ use nym_sphinx::addressing::Recipient;
|
||||
use nym_statistics_common::clients::{
|
||||
ClientStatsController, ClientStatsReceiver, ClientStatsSender,
|
||||
};
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use nym_task::{connections::TransmissionLane, TaskClient};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::{
|
||||
@@ -51,6 +51,9 @@ pub(crate) struct StatisticsControl {
|
||||
|
||||
/// Config for stats reporting (enabled, address, interval)
|
||||
reporting_config: StatsReporting,
|
||||
|
||||
/// Task client for listening for shutdown
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl StatisticsControl {
|
||||
@@ -59,19 +62,24 @@ impl StatisticsControl {
|
||||
client_type: String,
|
||||
client_stats_id: String,
|
||||
report_tx: InputMessageSender,
|
||||
task_client: TaskClient,
|
||||
) -> (Self, ClientStatsSender) {
|
||||
let (stats_tx, stats_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
let stats = ClientStatsController::new(client_stats_id, client_type);
|
||||
|
||||
let mut task_client_stats_sender = task_client.fork("stats_sender");
|
||||
task_client_stats_sender.disarm();
|
||||
|
||||
(
|
||||
StatisticsControl {
|
||||
stats,
|
||||
stats_rx,
|
||||
report_tx,
|
||||
reporting_config,
|
||||
task_client,
|
||||
},
|
||||
ClientStatsSender::new(Some(stats_tx)),
|
||||
ClientStatsSender::new(Some(stats_tx), task_client_stats_sender),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -91,7 +99,7 @@ impl StatisticsControl {
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_with_shutdown(&mut self, mut task_client: nym_task::TaskClient) {
|
||||
async fn run(&mut self) {
|
||||
log::debug!("Started StatisticsControl with graceful shutdown support");
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
@@ -121,8 +129,13 @@ impl StatisticsControl {
|
||||
let mut snapshot_interval =
|
||||
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);
|
||||
|
||||
loop {
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("StatisticsControl: Received shutdown");
|
||||
break;
|
||||
},
|
||||
stats_event = self.stats_rx.recv() => match stats_event {
|
||||
Some(stats_event) => self.stats.handle_event(stats_event),
|
||||
None => {
|
||||
@@ -144,34 +157,34 @@ impl StatisticsControl {
|
||||
}
|
||||
|
||||
_ = local_report_interval.next() => {
|
||||
self.stats.local_report(&mut task_client);
|
||||
self.stats.local_report(&mut self.task_client);
|
||||
}
|
||||
_ = task_client.recv_with_delay() => {
|
||||
log::trace!("StatisticsControl: Received shutdown");
|
||||
break;
|
||||
},
|
||||
}
|
||||
}
|
||||
task_client.recv_timeout().await;
|
||||
log::debug!("StatisticsControl: Exiting");
|
||||
}
|
||||
|
||||
pub(crate) fn start_with_shutdown(mut self, task_client: nym_task::TaskClient) {
|
||||
pub(crate) fn start(mut self) {
|
||||
spawn_future(async move {
|
||||
self.run_with_shutdown(task_client).await;
|
||||
self.run().await;
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn create_and_start_with_shutdown(
|
||||
pub(crate) fn create_and_start(
|
||||
reporting_config: StatsReporting,
|
||||
client_type: String,
|
||||
client_stats_id: String,
|
||||
report_tx: InputMessageSender,
|
||||
task_client: nym_task::TaskClient,
|
||||
task_client: TaskClient,
|
||||
) -> ClientStatsSender {
|
||||
let (controller, sender) =
|
||||
Self::create(reporting_config, client_type, client_stats_id, report_tx);
|
||||
controller.start_with_shutdown(task_client);
|
||||
let (controller, sender) = Self::create(
|
||||
reporting_config,
|
||||
client_type,
|
||||
client_stats_id,
|
||||
report_tx,
|
||||
task_client,
|
||||
);
|
||||
controller.start();
|
||||
sender
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit};
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_sphinx::addressing::nodes::NodeIdentity;
|
||||
use nym_task::TaskClient;
|
||||
use nym_topology::NymTopologyError;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -43,6 +44,8 @@ pub struct TopologyRefresher {
|
||||
|
||||
refresh_rate: Duration,
|
||||
consecutive_failure_count: usize,
|
||||
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl TopologyRefresher {
|
||||
@@ -50,12 +53,14 @@ impl TopologyRefresher {
|
||||
cfg: TopologyRefresherConfig,
|
||||
topology_accessor: TopologyAccessor,
|
||||
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
TopologyRefresher {
|
||||
topology_provider,
|
||||
topology_accessor,
|
||||
refresh_rate: cfg.refresh_rate,
|
||||
consecutive_failure_count: 0,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,7 +147,7 @@ impl TopologyRefresher {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_with_shutdown(mut self, mut shutdown: nym_task::TaskClient) {
|
||||
pub fn start(mut self) {
|
||||
spawn_future(async move {
|
||||
debug!("Started TopologyRefresher with graceful shutdown support");
|
||||
|
||||
@@ -155,17 +160,17 @@ impl TopologyRefresher {
|
||||
let mut interval =
|
||||
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
|
||||
|
||||
while !shutdown.is_shutdown() {
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
_ = interval.next() => {
|
||||
self.try_refresh().await;
|
||||
},
|
||||
_ = shutdown.recv() => {
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("TopologyRefresher: Received shutdown");
|
||||
},
|
||||
}
|
||||
}
|
||||
shutdown.recv_timeout().await;
|
||||
self.task_client.recv_timeout().await;
|
||||
log::debug!("TopologyRefresher: Exiting");
|
||||
})
|
||||
}
|
||||
|
||||
@@ -36,6 +36,13 @@ pub enum ClientCoreError {
|
||||
#[error("no gateway with id: {0}")]
|
||||
NoGatewayWithId(String),
|
||||
|
||||
#[error("Invalid URL: {0}")]
|
||||
InvalidUrl(String),
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[error("resolution failed: {0}")]
|
||||
ResolutionFailed(#[from] nym_http_api_client::HickoryDnsError),
|
||||
|
||||
#[error("no gateways on network")]
|
||||
NoGatewaysOnNetwork,
|
||||
|
||||
@@ -96,6 +103,9 @@ pub enum ClientCoreError {
|
||||
#[error("timed out while trying to establish gateway connection")]
|
||||
GatewayConnectionTimeout,
|
||||
|
||||
#[error("failed to forward mix messages to gateway")]
|
||||
GatewayFailedToForwardMessages,
|
||||
|
||||
#[error("no ping measurements for the gateway ({identity}) performed")]
|
||||
NoGatewayMeasurements { identity: String },
|
||||
|
||||
@@ -212,6 +222,9 @@ pub enum ClientCoreError {
|
||||
"fresh registration with gateway {gateway_id} somehow requires an additional key upgrade!"
|
||||
)]
|
||||
UnexpectedKeyUpgrade { gateway_id: String },
|
||||
|
||||
#[error("failed to derive keys from master key")]
|
||||
HkdfDerivationError {},
|
||||
}
|
||||
|
||||
/// Set of messages that the client can send to listeners via the task manager
|
||||
|
||||
@@ -15,6 +15,9 @@ use std::{sync::Arc, time::Duration};
|
||||
use tungstenite::Message;
|
||||
use url::Url;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use crate::init::websockets::connect_async;
|
||||
|
||||
use nym_topology::NodeId;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::net::TcpStream;
|
||||
@@ -23,8 +26,6 @@ use tokio::time::sleep;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::time::Instant;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio_tungstenite::connect_async;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_utils::websocket::JSWebsocket;
|
||||
@@ -91,6 +92,7 @@ pub async fn gateways_for_init<R: Rng>(
|
||||
nym_apis: &[Url],
|
||||
user_agent: Option<UserAgent>,
|
||||
minimum_performance: u8,
|
||||
ignore_epoch_roles: bool,
|
||||
) -> Result<Vec<RoutingNode>, ClientCoreError> {
|
||||
let nym_api = nym_apis
|
||||
.choose(rng)
|
||||
@@ -112,7 +114,7 @@ pub async fn gateways_for_init<R: Rng>(
|
||||
// (we don't want instability)
|
||||
let valid_gateways = gateways
|
||||
.iter()
|
||||
.filter(|g| !g.supported_roles.mixnode)
|
||||
.filter(|g| ignore_epoch_roles || !g.supported_roles.mixnode)
|
||||
.filter(|g| g.performance.round_to_integer() >= minimum_performance)
|
||||
.filter_map(|gateway| gateway.try_into().ok())
|
||||
.collect::<Vec<_>>();
|
||||
@@ -131,7 +133,7 @@ pub async fn gateways_for_init<R: Rng>(
|
||||
async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
|
||||
match tokio::time::timeout(CONN_TIMEOUT, connect_async(endpoint)).await {
|
||||
Err(_elapsed) => Err(ClientCoreError::GatewayConnectionTimeout),
|
||||
Ok(Err(conn_failure)) => Err(conn_failure.into()),
|
||||
Ok(Err(conn_failure)) => Err(conn_failure),
|
||||
Ok(Ok((stream, _))) => Ok(stream),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,6 +26,8 @@ use serde::Serialize;
|
||||
|
||||
pub mod helpers;
|
||||
pub mod types;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub(crate) mod websockets;
|
||||
|
||||
// helpers for error wrapping
|
||||
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
use crate::error::ClientCoreError;
|
||||
|
||||
use nym_http_api_client::HickoryDnsResolver;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
|
||||
use tungstenite::handshake::client::Response;
|
||||
use url::{Host, Url};
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub(crate) async fn connect_async(
|
||||
endpoint: &str,
|
||||
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), ClientCoreError> {
|
||||
let resolver = HickoryDnsResolver::default();
|
||||
let uri = Url::parse(endpoint).map_err(|_| ClientCoreError::InvalidUrl(endpoint.to_owned()))?;
|
||||
let port: u16 = uri.port_or_known_default().unwrap_or(443);
|
||||
|
||||
let host = uri
|
||||
.host()
|
||||
.ok_or(ClientCoreError::InvalidUrl(endpoint.to_owned()))?;
|
||||
|
||||
// Get address for tcp connection, if a domain is provided use our preferred resolver rather than
|
||||
// the default std resolve
|
||||
let sock_addrs: Vec<SocketAddr> = match host {
|
||||
Host::Ipv4(addr) => vec![SocketAddr::new(addr.into(), port)],
|
||||
Host::Ipv6(addr) => vec![SocketAddr::new(addr.into(), port)],
|
||||
Host::Domain(domain) => {
|
||||
// Do a DNS lookup for the domain using our custom DNS resolver
|
||||
resolver
|
||||
.resolve_str(domain)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|a| SocketAddr::new(a, port))
|
||||
.collect()
|
||||
}
|
||||
};
|
||||
|
||||
let stream = TcpStream::connect(&sock_addrs[..]).await?;
|
||||
|
||||
tokio_tungstenite::client_async_tls(endpoint, stream)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}
|
||||
@@ -33,48 +33,3 @@ where
|
||||
{
|
||||
tokio::spawn(future);
|
||||
}
|
||||
|
||||
#[derive(Clone, Default, Debug)]
|
||||
pub struct ForgetMe {
|
||||
client: bool,
|
||||
stats: bool,
|
||||
}
|
||||
|
||||
impl ForgetMe {
|
||||
pub fn new_all() -> Self {
|
||||
Self {
|
||||
client: true,
|
||||
stats: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_client() -> Self {
|
||||
Self {
|
||||
client: true,
|
||||
stats: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_stats() -> Self {
|
||||
Self {
|
||||
client: false,
|
||||
stats: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(client: bool, stats: bool) -> Self {
|
||||
Self { client, stats }
|
||||
}
|
||||
|
||||
pub fn any(&self) -> bool {
|
||||
self.client || self.stats
|
||||
}
|
||||
|
||||
pub fn client(&self) -> bool {
|
||||
self.client
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> bool {
|
||||
self.stats
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ nym-credential-storage = { path = "../../credential-storage" }
|
||||
nym-credentials-interface = { path = "../../credentials-interface" }
|
||||
nym-crypto = { path = "../../crypto" }
|
||||
nym-gateway-requests = { path = "../../gateway-requests" }
|
||||
nym-http-api-client = { path = "../../http-api-client" }
|
||||
nym-network-defaults = { path = "../../network-defaults" }
|
||||
nym-sphinx = { path = "../../nymsphinx" }
|
||||
nym-statistics-common = { path = "../../statistics" }
|
||||
|
||||
@@ -40,8 +40,6 @@ use url::Url;
|
||||
use std::os::fd::RawFd;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::time::sleep;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio_tungstenite::connect_async;
|
||||
|
||||
#[cfg(not(unix))]
|
||||
use std::os::raw::c_int as RawFd;
|
||||
@@ -53,6 +51,11 @@ use zeroize::Zeroizing;
|
||||
|
||||
pub mod config;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub(crate) mod websockets;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use websockets::connect_async;
|
||||
|
||||
pub struct GatewayConfig {
|
||||
pub gateway_identity: identity::PublicKey,
|
||||
|
||||
@@ -201,23 +204,15 @@ impl<C, St> GatewayClient<C, St> {
|
||||
"Attemting to establish connection to gateway at: {}",
|
||||
self.gateway_address
|
||||
);
|
||||
let ws_stream = match connect_async(&self.gateway_address).await {
|
||||
Ok((ws_stream, _)) => ws_stream,
|
||||
Err(error) => {
|
||||
return Err(GatewayClientError::NetworkConnectionFailed {
|
||||
address: self.gateway_address.clone(),
|
||||
source: error,
|
||||
})
|
||||
}
|
||||
};
|
||||
let (ws_stream, _) = connect_async(
|
||||
&self.gateway_address,
|
||||
#[cfg(unix)]
|
||||
self.connection_fd_callback.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
self.connection = SocketState::Available(Box::new(ws_stream));
|
||||
|
||||
#[cfg(unix)]
|
||||
if let (Some(callback), Some(fd)) = (self.connection_fd_callback.as_ref(), self.ws_fd()) {
|
||||
callback.as_ref()(fd);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -271,6 +266,19 @@ impl<C, St> GatewayClient<C, St> {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_client_request(
|
||||
&mut self,
|
||||
message: ClientRequest,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
if let Some(shared_key) = self.shared_key() {
|
||||
let encrypted = message.encrypt(&*shared_key)?;
|
||||
Box::pin(self.send_websocket_message(encrypted)).await?;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(GatewayClientError::ConnectionInInvalidState)
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_control_response(&mut self) -> Result<ServerResponse, GatewayClientError> {
|
||||
// we use the fact that all request responses are Message::Text and only pushed
|
||||
// sphinx packets are Message::Binary
|
||||
@@ -1045,7 +1053,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
|
||||
connection: SocketState::NotConnected,
|
||||
packet_router,
|
||||
bandwidth_controller: None,
|
||||
stats_reporter: ClientStatsSender::new(None),
|
||||
stats_reporter: ClientStatsSender::new(None, task_client.clone()),
|
||||
negotiated_protocol: None,
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback: None,
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
use crate::error::GatewayClientError;
|
||||
|
||||
use nym_http_api_client::HickoryDnsResolver;
|
||||
#[cfg(unix)]
|
||||
use std::{
|
||||
os::fd::{AsRawFd, RawFd},
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
|
||||
use tungstenite::handshake::client::Response;
|
||||
use url::{Host, Url};
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub(crate) async fn connect_async(
|
||||
endpoint: &str,
|
||||
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
|
||||
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
|
||||
use tokio::net::TcpSocket;
|
||||
|
||||
let resolver = HickoryDnsResolver::default();
|
||||
let uri =
|
||||
Url::parse(endpoint).map_err(|_| GatewayClientError::InvalidUrl(endpoint.to_owned()))?;
|
||||
let port: u16 = uri.port_or_known_default().unwrap_or(443);
|
||||
|
||||
let host = uri
|
||||
.host()
|
||||
.ok_or(GatewayClientError::InvalidUrl(endpoint.to_owned()))?;
|
||||
|
||||
// Get address for tcp connection, if a domain is provided use our preferred resolver rather than
|
||||
// the default std resolve
|
||||
let sock_addrs: Vec<SocketAddr> = match host {
|
||||
Host::Ipv4(addr) => vec![SocketAddr::new(addr.into(), port)],
|
||||
Host::Ipv6(addr) => vec![SocketAddr::new(addr.into(), port)],
|
||||
Host::Domain(domain) => {
|
||||
// Do a DNS lookup for the domain using our custom DNS resolver
|
||||
resolver
|
||||
.resolve_str(domain)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|a| SocketAddr::new(a, port))
|
||||
.collect()
|
||||
}
|
||||
};
|
||||
|
||||
let mut stream = Err(GatewayClientError::NoEndpointForConnection {
|
||||
address: endpoint.to_owned(),
|
||||
});
|
||||
for sock_addr in sock_addrs {
|
||||
let socket = if sock_addr.is_ipv4() {
|
||||
TcpSocket::new_v4()
|
||||
} else {
|
||||
TcpSocket::new_v6()
|
||||
}
|
||||
.map_err(|err| GatewayClientError::NetworkConnectionFailed {
|
||||
address: endpoint.to_owned(),
|
||||
source: err.into(),
|
||||
})?;
|
||||
|
||||
#[cfg(unix)]
|
||||
if let Some(callback) = connection_fd_callback.as_ref() {
|
||||
callback.as_ref()(socket.as_raw_fd());
|
||||
}
|
||||
|
||||
match socket.connect(sock_addr).await {
|
||||
Ok(s) => {
|
||||
stream = Ok(s);
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
stream = Err(GatewayClientError::NetworkConnectionFailed {
|
||||
address: endpoint.to_owned(),
|
||||
source: err.into(),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tokio_tungstenite::client_async_tls(endpoint, stream?)
|
||||
.await
|
||||
.map_err(|error| GatewayClientError::NetworkConnectionFailed {
|
||||
address: endpoint.to_owned(),
|
||||
source: error,
|
||||
})
|
||||
}
|
||||
@@ -43,8 +43,15 @@ pub enum GatewayClientError {
|
||||
#[error("connection failed: {address}: {source}")]
|
||||
NetworkConnectionFailed { address: String, source: WsError },
|
||||
|
||||
#[error("no socket address for endpoint: {address}")]
|
||||
NoEndpointForConnection { address: String },
|
||||
|
||||
#[error("Invalid URL: {0}")]
|
||||
InvalidURL(String),
|
||||
InvalidUrl(String),
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[error("resolution failed: {0}")]
|
||||
ResolutionFailed(#[from] nym_http_api_client::HickoryDnsError),
|
||||
|
||||
#[error("No shared key was provided or obtained")]
|
||||
NoSharedKeyAvailable,
|
||||
|
||||
@@ -11,8 +11,7 @@ use crate::{
|
||||
use nym_api_requests::ecash::models::{
|
||||
AggregatedCoinIndicesSignatureResponse, AggregatedExpirationDateSignatureResponse,
|
||||
BatchRedeemTicketsBody, EcashBatchTicketRedemptionResponse, EcashTicketVerificationResponse,
|
||||
IssuedTicketbooksChallengeResponse, IssuedTicketbooksForResponse, SpentCredentialsResponse,
|
||||
VerifyEcashTicketBody,
|
||||
IssuedTicketbooksChallengeResponse, IssuedTicketbooksForResponse, VerifyEcashTicketBody,
|
||||
};
|
||||
use nym_api_requests::ecash::{
|
||||
BlindSignRequestBody, BlindedSignatureResponse, PartialCoinIndicesSignatureResponse,
|
||||
@@ -647,13 +646,6 @@ impl NymApiClient {
|
||||
.await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn spent_credentials_filter(
|
||||
&self,
|
||||
) -> Result<SpentCredentialsResponse, ValidatorClientError> {
|
||||
Ok(self.nym_api.double_spending_filter_v1().await?)
|
||||
}
|
||||
|
||||
pub async fn partial_expiration_date_signatures(
|
||||
&self,
|
||||
expiration_date: Option<Date>,
|
||||
|
||||
@@ -65,6 +65,12 @@ pub enum EcashApiError {
|
||||
#[from]
|
||||
source: cosmrs::ErrorReport,
|
||||
},
|
||||
|
||||
#[error("nym api error")]
|
||||
NymApi {
|
||||
#[from]
|
||||
source: crate::ValidatorClientError,
|
||||
},
|
||||
}
|
||||
|
||||
impl TryFrom<ContractVKShare> for EcashApiClient {
|
||||
|
||||
@@ -31,6 +31,7 @@ pub use nym_api_requests::{
|
||||
StakeSaturationResponse, UptimeResponse,
|
||||
},
|
||||
nym_nodes::{CachedNodesResponse, SkimmedNode},
|
||||
NymNetworkDetailsResponse,
|
||||
};
|
||||
pub use nym_coconut_dkg_common::types::EpochId;
|
||||
use nym_contracts_common::IdentityKey;
|
||||
@@ -849,20 +850,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn double_spending_filter_v1(&self) -> Result<SpentCredentialsResponse, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
routes::ECASH_ROUTES,
|
||||
routes::DOUBLE_SPENDING_FILTER_V1,
|
||||
],
|
||||
NO_PARAMS,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn partial_expiration_date_signatures(
|
||||
&self,
|
||||
@@ -1027,6 +1014,15 @@ pub trait NymApiClientExt: ApiClient {
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_network_details(&self) -> Result<NymNetworkDetailsResponse, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::NETWORK, routes::DETAILS],
|
||||
NO_PARAMS,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
|
||||
@@ -13,8 +13,6 @@ pub const DETAILED: &str = "detailed";
|
||||
pub const DETAILED_UNFILTERED: &str = "detailed-unfiltered";
|
||||
pub const ACTIVE: &str = "active";
|
||||
pub const REWARDED: &str = "rewarded";
|
||||
pub const DOUBLE_SPENDING_FILTER_V1: &str = "double-spending-filter-v1";
|
||||
|
||||
pub const ECASH_ROUTES: &str = "ecash";
|
||||
|
||||
pub use ecash::*;
|
||||
@@ -67,6 +65,8 @@ pub const STAKE_SATURATION: &str = "stake-saturation";
|
||||
pub const INCLUSION_CHANCE: &str = "inclusion-probability";
|
||||
pub const SUBMIT_GATEWAY: &str = "submit-gateway-monitoring-results";
|
||||
pub const SUBMIT_NODE: &str = "submit-node-monitoring-results";
|
||||
pub const PERFORMANCE: &str = "performance";
|
||||
|
||||
pub const SERVICE_PROVIDERS: &str = "services";
|
||||
|
||||
pub const DETAILS: &str = "details";
|
||||
pub const NETWORK: &str = "network";
|
||||
|
||||
@@ -8,81 +8,81 @@ use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug, PartialEq)]
|
||||
pub enum VestingContractError {
|
||||
#[error("VESTING ({}): {0}", line!())]
|
||||
#[error("VESTING ({l}): {0}", l = line!())]
|
||||
Std(#[from] StdError),
|
||||
|
||||
#[error("VESTING: {0}")]
|
||||
OverflowError(#[from] OverflowError),
|
||||
|
||||
#[error("VESTING ({}): Account does not exist - {0}", line!())]
|
||||
#[error("VESTING ({l}): Account does not exist - {0}", l = line!())]
|
||||
NoAccountForAddress(String),
|
||||
|
||||
#[error("VESTING ({}): Only admin can perform this action, {0} is not admin", line!())]
|
||||
#[error("VESTING ({l}): Only admin can perform this action, {0} is not admin", l = line!())]
|
||||
NotAdmin(String),
|
||||
|
||||
#[error("VESTING ({}): Balance not found for existing account ({0}), this is a bug", line!())]
|
||||
#[error("VESTING ({l}): Balance not found for existing account ({0}), this is a bug", l = line!())]
|
||||
NoBalanceForAddress(String),
|
||||
|
||||
#[error("VESTING ({}): Insufficient balance for address {0} -> {1}", line!())]
|
||||
#[error("VESTING ({l}): Insufficient balance for address {0} -> {1}", l = line!())]
|
||||
InsufficientBalance(String, u128),
|
||||
|
||||
#[error("VESTING ({}): Insufficient spendable balance for address {0} -> {1}", line!())]
|
||||
#[error("VESTING ({l}): Insufficient spendable balance for address {0} -> {1}", l = line!())]
|
||||
InsufficientSpendable(String, u128),
|
||||
|
||||
#[error(
|
||||
"VESTING ({}):Only delegation owner can perform delegation actions, {0} is not the delegation owner"
|
||||
, line!())]
|
||||
"VESTING ({l}):Only delegation owner can perform delegation actions, {0} is not the delegation owner"
|
||||
, l = line!())]
|
||||
NotDelegate(String),
|
||||
|
||||
#[error("VESTING ({}): Total vesting amount is inprobably low -> {0}, this is likely an error", line!())]
|
||||
#[error("VESTING ({l}): Total vesting amount is inprobably low -> {0}, this is likely an error", l = line!())]
|
||||
ImprobableVestingAmount(u128),
|
||||
|
||||
#[error("VESTING ({}): Address {0} has already bonded a node", line!())]
|
||||
#[error("VESTING ({l}): Address {0} has already bonded a node", l = line!())]
|
||||
AlreadyBonded(String),
|
||||
|
||||
#[error("VESTING ({}): Received empty funds vector", line!())]
|
||||
#[error("VESTING ({l}): Received empty funds vector", l = line!())]
|
||||
EmptyFunds,
|
||||
|
||||
#[error("VESTING ({}): Received wrong denom: {0}, expected {1}", line!())]
|
||||
#[error("VESTING ({l}): Received wrong denom: {0}, expected {1}", l = line!())]
|
||||
WrongDenom(String, String),
|
||||
|
||||
#[error("VESTING ({}): Received multiple denoms, expected 1", line!())]
|
||||
#[error("VESTING ({l}): Received multiple denoms, expected 1", l = line!())]
|
||||
MultipleDenoms,
|
||||
|
||||
#[error("VESTING ({}): No delegations found for account {0}, mix_identity {1}", line!())]
|
||||
#[error("VESTING ({l}): No delegations found for account {0}, mix_identity {1}", l = line!())]
|
||||
NoSuchDelegation(Addr, NodeId),
|
||||
|
||||
#[error("VESTING ({}): Only mixnet contract can perform this operation, got {0}", line!())]
|
||||
#[error("VESTING ({l}): Only mixnet contract can perform this operation, got {0}", l = line!())]
|
||||
NotMixnetContract(Addr),
|
||||
|
||||
#[error("VESTING ({}): Calculation underflowed", line!())]
|
||||
#[error("VESTING ({l}): Calculation underflowed", l = line!())]
|
||||
Underflow,
|
||||
|
||||
#[error("VESTING ({}): No bond found for account {0}", line!())]
|
||||
#[error("VESTING ({l}): No bond found for account {0}", l = line!())]
|
||||
NoBondFound(String),
|
||||
|
||||
#[error("VESTING: Attempted to reduce mixnode bond pledge below zero! The current pledge is {current} and we attempted to reduce it by {decrease_by}.")]
|
||||
InvalidBondPledgeReduction { current: Coin, decrease_by: Coin },
|
||||
|
||||
#[error("VESTING ({}): Action can only be executed by account owner -> {0}", line!())]
|
||||
#[error("VESTING ({l}): Action can only be executed by account owner -> {0}", l = line!())]
|
||||
NotOwner(String),
|
||||
|
||||
#[error("VESTING ({}): Invalid address: {0}", line!())]
|
||||
#[error("VESTING ({l}): Invalid address: {0}", l = line!())]
|
||||
InvalidAddress(String),
|
||||
|
||||
#[error("VESTING ({}): Account already exists: {0}", line!())]
|
||||
#[error("VESTING ({l}): Account already exists: {0}", l = line!())]
|
||||
AccountAlreadyExists(String),
|
||||
|
||||
#[error("VESTING ({}): Staking account already exists: {0}", line!())]
|
||||
#[error("VESTING ({l}): Staking account already exists: {0}", l = line!())]
|
||||
StakingAccountAlreadyExists(String),
|
||||
|
||||
#[error("VESTING ({}): Too few coins sent for vesting account creation, sent {sent}, need at least {need}", line!())]
|
||||
#[error("VESTING ({l}): Too few coins sent for vesting account creation, sent {sent}, need at least {need}", l = line!())]
|
||||
MinVestingFunds { sent: u128, need: u128 },
|
||||
|
||||
#[error("VESTING ({}): Maximum amount of locked coins has already been pledged: {current}, cap is {cap}", line!())]
|
||||
#[error("VESTING ({l}): Maximum amount of locked coins has already been pledged: {current}, cap is {cap}", l = line!())]
|
||||
LockedPledgeCapReached { current: Uint128, cap: Uint128 },
|
||||
|
||||
#[error("VESTING: ({}: Account owned by {owner} has unpopulated vesting periods!", line!())]
|
||||
#[error("VESTING: ({l}: Account owned by {owner} has unpopulated vesting periods!", l = line!())]
|
||||
UnpopulatedVestingPeriods { owner: Addr },
|
||||
|
||||
#[error("VESTING: Vesting account associated with {0} already exists, only addresses with not existing vesting accounts can be added as staking addresses")]
|
||||
|
||||
@@ -26,7 +26,6 @@ nym-api-requests = { path = "../../nym-api/nym-api-requests" }
|
||||
nym-credentials = { path = "../credentials" }
|
||||
nym-credentials-interface = { path = "../credentials-interface" }
|
||||
nym-ecash-contract-common = { path = "../cosmwasm-smart-contracts/ecash-contract" }
|
||||
nym-ecash-double-spending = { path = "../ecash-double-spending" }
|
||||
nym-gateway-requests = { path = "../gateway-requests" }
|
||||
nym-gateway-storage = { path = "../gateway-storage" }
|
||||
nym-task = { path = "../task" }
|
||||
|
||||
@@ -13,6 +13,7 @@ use nym_api_requests::constants::MIN_BATCH_REDEMPTION_DELAY;
|
||||
use nym_api_requests::ecash::models::{BatchRedeemTicketsBody, VerifyEcashTicketBody};
|
||||
use nym_credentials_interface::Bandwidth;
|
||||
use nym_credentials_interface::{ClientTicket, TicketType};
|
||||
use nym_validator_client::coconut::EcashApiError;
|
||||
use nym_validator_client::nym_api::EpochId;
|
||||
use nym_validator_client::nyxd::contract_traits::{
|
||||
EcashSigningClient, MultisigQueryClient, MultisigSigningClient, PagedMultisigQueryClient,
|
||||
@@ -352,7 +353,9 @@ impl CredentialHandler {
|
||||
}
|
||||
Err(err) => {
|
||||
error!("failed to send ticket {ticket_id} for verification to ecash signer '{client}': {err}. if we don't reach quorum, we'll retry later");
|
||||
Ok(false)
|
||||
Err(EcashTicketError::ApiFailure(EcashApiError::NymApi {
|
||||
source: err,
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ ed25519-dalek = { workspace = true, features = ["rand_core"], optional = true }
|
||||
rand = { workspace = true, optional = true }
|
||||
serde_bytes = { workspace = true, optional = true }
|
||||
serde = { workspace = true, features = ["derive"], optional = true }
|
||||
sha2 = { workspace = true, optional = true }
|
||||
subtle-encoding = { workspace = true, features = ["bech32-preview"] }
|
||||
thiserror = { workspace = true }
|
||||
zeroize = { workspace = true, optional = true, features = ["zeroize_derive"] }
|
||||
@@ -36,11 +37,10 @@ nym-pemstore = { path = "../../common/pemstore", version = "0.3.0" }
|
||||
rand_chacha = { workspace = true }
|
||||
|
||||
[features]
|
||||
default = ["sphinx"]
|
||||
default = []
|
||||
aead = ["dep:aead", "aead/std", "aes-gcm-siv", "generic-array"]
|
||||
serde = ["dep:serde", "serde_bytes", "ed25519-dalek/serde", "x25519-dalek/serde"]
|
||||
asymmetric = ["x25519-dalek", "ed25519-dalek", "zeroize"]
|
||||
hashing = ["blake3", "digest", "hkdf", "hmac", "generic-array"]
|
||||
hashing = ["blake3", "digest", "hkdf", "hmac", "generic-array", "sha2"]
|
||||
stream_cipher = ["aes", "ctr", "cipher", "generic-array"]
|
||||
sphinx = ["nym-sphinx-types/sphinx"]
|
||||
outfox = ["nym-sphinx-types/outfox"]
|
||||
sphinx = ["nym-sphinx-types/sphinx"]
|
||||
@@ -202,6 +202,18 @@ impl PemStorableKey for PublicKey {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<x25519_dalek::PublicKey> for PublicKey {
|
||||
fn from(public_key: x25519_dalek::PublicKey) -> Self {
|
||||
PublicKey(public_key)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PublicKey> for x25519_dalek::PublicKey {
|
||||
fn from(public_key: PublicKey) -> Self {
|
||||
public_key.0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Zeroize, ZeroizeOnDrop)]
|
||||
pub struct PrivateKey(x25519_dalek::StaticSecret);
|
||||
|
||||
@@ -308,109 +320,15 @@ impl PemStorableKey for PrivateKey {
|
||||
}
|
||||
}
|
||||
|
||||
// compatibility with sphinx keys:
|
||||
#[cfg(feature = "sphinx")]
|
||||
impl From<PublicKey> for nym_sphinx_types::PublicKey {
|
||||
fn from(key: PublicKey) -> Self {
|
||||
nym_sphinx_types::PublicKey::from(key.to_bytes())
|
||||
impl From<x25519_dalek::StaticSecret> for PrivateKey {
|
||||
fn from(secret: x25519_dalek::StaticSecret) -> Self {
|
||||
PrivateKey(secret)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sphinx")]
|
||||
impl<'a> From<&'a PublicKey> for nym_sphinx_types::PublicKey {
|
||||
fn from(key: &'a PublicKey) -> Self {
|
||||
nym_sphinx_types::PublicKey::from((*key).to_bytes())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sphinx")]
|
||||
impl From<nym_sphinx_types::PublicKey> for PublicKey {
|
||||
fn from(pub_key: nym_sphinx_types::PublicKey) -> Self {
|
||||
Self(x25519_dalek::PublicKey::from(*pub_key.as_bytes()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sphinx")]
|
||||
impl From<PrivateKey> for nym_sphinx_types::PrivateKey {
|
||||
fn from(key: PrivateKey) -> Self {
|
||||
nym_sphinx_types::PrivateKey::from(key.to_bytes())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sphinx")]
|
||||
impl<'a> From<&'a PrivateKey> for nym_sphinx_types::PrivateKey {
|
||||
fn from(key: &'a PrivateKey) -> Self {
|
||||
nym_sphinx_types::PrivateKey::from(key.to_bytes())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sphinx")]
|
||||
impl From<nym_sphinx_types::PrivateKey> for PrivateKey {
|
||||
fn from(private_key: nym_sphinx_types::PrivateKey) -> Self {
|
||||
let private_key_bytes = private_key.to_bytes();
|
||||
assert_eq!(private_key_bytes.len(), PRIVATE_KEY_SIZE);
|
||||
Self::from_bytes(&private_key_bytes).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod sphinx_key_conversion {
|
||||
use super::*;
|
||||
use rand_chacha::rand_core::SeedableRng;
|
||||
use rand_chacha::ChaCha20Rng;
|
||||
|
||||
pub(super) fn test_rng() -> ChaCha20Rng {
|
||||
let dummy_seed = [42u8; 32];
|
||||
ChaCha20Rng::from_seed(dummy_seed)
|
||||
}
|
||||
|
||||
const NUM_ITERATIONS: usize = 100;
|
||||
|
||||
#[test]
|
||||
fn works_for_forward_conversion() {
|
||||
let mut rng = test_rng();
|
||||
|
||||
for _ in 0..NUM_ITERATIONS {
|
||||
let keys = KeyPair::new(&mut rng);
|
||||
let private = &keys.private_key;
|
||||
let public = &keys.public_key;
|
||||
|
||||
let dummy_remote = KeyPair::new(&mut rng);
|
||||
let dh1 = private.diffie_hellman(&dummy_remote.public_key);
|
||||
|
||||
let public_bytes = public.to_bytes();
|
||||
|
||||
let sphinx_private: nym_sphinx_types::PrivateKey = private.into();
|
||||
let recovered_private = PrivateKey::from(sphinx_private);
|
||||
|
||||
let dh2 = recovered_private.diffie_hellman(&dummy_remote.public_key);
|
||||
|
||||
let sphinx_public: nym_sphinx_types::PublicKey = public.into();
|
||||
let recovered_public = PublicKey::from(sphinx_public);
|
||||
assert_eq!(public_bytes, recovered_public.to_bytes());
|
||||
|
||||
// even though the byte representation of the private key changed, the resultant DH is the same
|
||||
// which is what matters
|
||||
assert_eq!(dh1, dh2);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn works_for_backward_conversion() {
|
||||
for _ in 0..NUM_ITERATIONS {
|
||||
let (sphinx_private, sphinx_public) = nym_sphinx_types::crypto::keygen();
|
||||
|
||||
let private_bytes = sphinx_private.to_bytes();
|
||||
let public_bytes = sphinx_public.as_bytes();
|
||||
|
||||
let private: PrivateKey = sphinx_private.into();
|
||||
let recovered_sphinx_private: nym_sphinx_types::PrivateKey = private.into();
|
||||
|
||||
let public: PublicKey = sphinx_public.into();
|
||||
let recovered_sphinx_public: nym_sphinx_types::PublicKey = public.into();
|
||||
assert_eq!(private_bytes, recovered_sphinx_private.to_bytes());
|
||||
assert_eq!(public_bytes, recovered_sphinx_public.as_bytes());
|
||||
}
|
||||
impl AsRef<x25519_dalek::StaticSecret> for PrivateKey {
|
||||
fn as_ref(&self) -> &x25519_dalek::StaticSecret {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
pub use ed25519_dalek::SignatureError;
|
||||
use ed25519_dalek::{Signer, SigningKey};
|
||||
use ed25519_dalek::{SecretKey, Signer, SigningKey};
|
||||
pub use ed25519_dalek::{Verifier, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH, SIGNATURE_LENGTH};
|
||||
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
|
||||
use std::fmt::{self, Debug, Display, Formatter};
|
||||
@@ -18,7 +18,7 @@ pub mod serde_helpers;
|
||||
use nym_sphinx_types::{DestinationAddressBytes, DESTINATION_ADDRESS_LENGTH};
|
||||
|
||||
#[cfg(feature = "rand")]
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use rand::{CryptoRng, Rng, RngCore};
|
||||
#[cfg(feature = "serde")]
|
||||
use serde::de::Error as SerdeError;
|
||||
#[cfg(feature = "serde")]
|
||||
@@ -62,16 +62,33 @@ pub struct KeyPair {
|
||||
// nothing secret about public key
|
||||
#[zeroize(skip)]
|
||||
public_key: PublicKey,
|
||||
|
||||
#[zeroize(skip)]
|
||||
index: u32,
|
||||
}
|
||||
|
||||
/// All keys will always have an index field populated this is to prevent anyone from figuring out if
|
||||
/// the keys are derived or random, and alter their behaviour based on that.
|
||||
impl KeyPair {
|
||||
#[cfg(feature = "rand")]
|
||||
pub fn new<R: RngCore + CryptoRng>(rng: &mut R) -> Self {
|
||||
let index = rng.gen();
|
||||
let ed25519_signing_key = ed25519_dalek::SigningKey::generate(rng);
|
||||
|
||||
KeyPair {
|
||||
private_key: PrivateKey(ed25519_signing_key.to_bytes()),
|
||||
public_key: PublicKey(ed25519_signing_key.verifying_key()),
|
||||
index,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_secret(secret: SecretKey, index: u32) -> Self {
|
||||
let ed25519_signing_key = SigningKey::from(secret);
|
||||
|
||||
KeyPair {
|
||||
private_key: PrivateKey(ed25519_signing_key.to_bytes()),
|
||||
public_key: PublicKey(ed25519_signing_key.verifying_key()),
|
||||
index,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,15 +104,31 @@ impl KeyPair {
|
||||
Ok(KeyPair {
|
||||
private_key: PrivateKey::from_bytes(priv_bytes)?,
|
||||
public_key: PublicKey::from_bytes(pub_bytes)?,
|
||||
index: fake_index(pub_bytes),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Reduces a byte slice into a u32 value by XOR-ing all its bytes into a 4-byte accumulator.
|
||||
/// The process iterates over every byte in the input slice, XOR-ing each one into a slot based on its index modulo 4.
|
||||
/// If the input slice contains fewer than 4 bytes, the remaining positions in the accumulator remain zero.
|
||||
/// Finally, the accumulator is interpreted in big-endian order to produce the resulting u32.
|
||||
/// Index is used to verify deterministic identity key, master key and salt are also requried for verification.
|
||||
fn fake_index(input: &[u8]) -> u32 {
|
||||
let mut accumulator = [0u8; 4];
|
||||
for (i, &byte) in input.iter().enumerate() {
|
||||
accumulator[i % 4] ^= byte;
|
||||
}
|
||||
u32::from_be_bytes(accumulator)
|
||||
}
|
||||
|
||||
impl From<PrivateKey> for KeyPair {
|
||||
fn from(private_key: PrivateKey) -> Self {
|
||||
let public_key = (&private_key).into();
|
||||
KeyPair {
|
||||
public_key: (&private_key).into(),
|
||||
public_key,
|
||||
private_key,
|
||||
index: fake_index(public_key.to_bytes().as_ref()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -115,6 +148,7 @@ impl PemStorableKeyPair for KeyPair {
|
||||
KeyPair {
|
||||
private_key,
|
||||
public_key,
|
||||
index: fake_index(public_key.to_bytes().as_ref()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,10 @@ use hkdf::{
|
||||
},
|
||||
Hkdf,
|
||||
};
|
||||
use sha2::{Sha256, Sha512};
|
||||
|
||||
pub use hkdf::InvalidLength;
|
||||
use zeroize::ZeroizeOnDrop;
|
||||
|
||||
/// Perform HKDF `extract` then `expand` as a single step.
|
||||
pub fn extract_then_expand<D>(
|
||||
@@ -28,3 +32,80 @@ where
|
||||
|
||||
Ok(okm)
|
||||
}
|
||||
|
||||
/// `DerivationMaterial` encapsulates parameters for deterministic key derivation using
|
||||
/// HKDF (SHA-512).
|
||||
///
|
||||
/// It consists of:
|
||||
/// - A master key (`master_key`): the base secret.
|
||||
/// - An index (`index`): ensures unique derivations.
|
||||
/// - A salt (`salt`): adds additional uniqueness, should be application specific.
|
||||
///
|
||||
/// Use the `derive_secret()` method to generate a 32-byte secret. To prepare for a new derivation,
|
||||
/// call the `next()` method, which increments the index. **It is the caller's responsibility to
|
||||
/// track and persist the derivation index if keys need to be rederived.**
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use nym_crypto::hkdf::DerivationMaterial;
|
||||
///
|
||||
/// let master_key = [0u8; 32]; // your secret master key
|
||||
/// let salt = b"unique-salt-value";
|
||||
/// let material = DerivationMaterial::new(master_key, 0, salt);
|
||||
///
|
||||
/// // Derive a secret
|
||||
/// let secret = material.derive_secret().expect("Failed to derive secret");
|
||||
///
|
||||
/// // Prepare for the next derivation
|
||||
/// let next_material = material.next();
|
||||
/// ```
|
||||
#[derive(ZeroizeOnDrop)]
|
||||
pub struct DerivationMaterial {
|
||||
master_key: [u8; 32],
|
||||
index: u32,
|
||||
salt: Vec<u8>,
|
||||
}
|
||||
|
||||
impl DerivationMaterial {
|
||||
pub fn index(&self) -> u32 {
|
||||
self.index
|
||||
}
|
||||
|
||||
pub fn salt(&self) -> &[u8] {
|
||||
&self.salt
|
||||
}
|
||||
|
||||
/// Derives a 32-byte seed from a master seed and an index using HKDF (with SHA-512).
|
||||
///
|
||||
/// The `salt` and the use of the index (as info) bind this derivation to an application/client.
|
||||
pub fn derive_secret(&self) -> Result<[u8; 32], hkdf::InvalidLength> {
|
||||
let salt = &self.salt;
|
||||
let info = self.index.to_be_bytes(); // Use the index as info
|
||||
let hk = Hkdf::<Sha512>::new(Some(salt), &self.master_key);
|
||||
let mut okm = [0u8; 32];
|
||||
hk.expand(&info, &mut okm)?;
|
||||
Ok(okm)
|
||||
}
|
||||
|
||||
pub fn new<T: AsRef<[u8]>>(master_key: T, index: u32, salt: &[u8]) -> Self {
|
||||
// Coerce master_key to [u8; 32]
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(master_key.as_ref());
|
||||
let master_key = hasher.finalize().into();
|
||||
|
||||
Self {
|
||||
master_key,
|
||||
index,
|
||||
salt: salt.to_vec(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn next(&self) -> Self {
|
||||
Self {
|
||||
master_key: self.master_key,
|
||||
index: self.index + 1,
|
||||
salt: self.salt.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
[package]
|
||||
name = "nym-ecash-double-spending"
|
||||
version = "0.1.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
documentation.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
bit-vec = { workspace = true }
|
||||
bloomfilter = { workspace = true }
|
||||
nym-network-defaults = { path = "../network-defaults" }
|
||||
@@ -1,136 +0,0 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use bit_vec::BitVec;
|
||||
use bloomfilter::Bloom;
|
||||
use nym_network_defaults::{BloomfilterParameters, ECASH_DS_BLOOMFILTER_PARAMS};
|
||||
|
||||
pub struct DoubleSpendingFilter {
|
||||
params: BloomfilterParameters,
|
||||
inner: Bloom<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl Default for DoubleSpendingFilter {
|
||||
fn default() -> Self {
|
||||
DoubleSpendingFilter::new_empty_ecash()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn bloom_from_params<T>(params: &BloomfilterParameters, bitvec: BitVec) -> Bloom<Vec<T>> {
|
||||
assert_eq!(params.bitmap_size, bitvec.len() as u64);
|
||||
|
||||
Bloom::from_bit_vec(
|
||||
bitvec,
|
||||
params.bitmap_size,
|
||||
params.num_hashes,
|
||||
params.sip_keys,
|
||||
)
|
||||
}
|
||||
|
||||
impl DoubleSpendingFilter {
|
||||
pub fn new_empty(params: BloomfilterParameters) -> Self {
|
||||
let bitvec = BitVec::from_elem(params.bitmap_size as usize, false);
|
||||
DoubleSpendingFilter {
|
||||
inner: bloom_from_params(¶ms, bitvec),
|
||||
params,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn params(&self) -> BloomfilterParameters {
|
||||
self.params
|
||||
}
|
||||
|
||||
pub fn rebuild(&self) -> DoubleSpendingFilterBuilder {
|
||||
DoubleSpendingFilterBuilder::new(self.params)
|
||||
}
|
||||
|
||||
pub fn reset(&mut self) {
|
||||
self.inner.clear()
|
||||
}
|
||||
|
||||
pub fn new_empty_ecash() -> Self {
|
||||
DoubleSpendingFilter::new_empty(ECASH_DS_BLOOMFILTER_PARAMS)
|
||||
}
|
||||
|
||||
pub fn builder(params: BloomfilterParameters) -> DoubleSpendingFilterBuilder {
|
||||
DoubleSpendingFilterBuilder::new(params)
|
||||
}
|
||||
|
||||
pub fn from_bytes(params: BloomfilterParameters, bitmap: &[u8]) -> Self {
|
||||
DoubleSpendingFilter {
|
||||
inner: bloom_from_params(¶ms, BitVec::from_bytes(bitmap)),
|
||||
params,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn replace_bitvec(&mut self, new: BitVec) {
|
||||
self.inner = bloom_from_params(&self.params, new)
|
||||
}
|
||||
|
||||
pub fn dump_bitmap(&self) -> Vec<u8> {
|
||||
self.inner.bitmap()
|
||||
}
|
||||
|
||||
pub fn set(&mut self, b: &Vec<u8>) {
|
||||
self.inner.set(b);
|
||||
}
|
||||
|
||||
pub fn check(&self, b: &Vec<u8>) -> bool {
|
||||
self.inner.check(b)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DoubleSpendingFilterBuilder {
|
||||
params: BloomfilterParameters,
|
||||
bit_vec_builder: Option<BitVecBuilder>,
|
||||
}
|
||||
|
||||
impl DoubleSpendingFilterBuilder {
|
||||
pub fn new(params: BloomfilterParameters) -> Self {
|
||||
DoubleSpendingFilterBuilder {
|
||||
params,
|
||||
bit_vec_builder: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_bytes(&mut self, b: &[u8]) -> bool {
|
||||
match &mut self.bit_vec_builder {
|
||||
None => {
|
||||
self.bit_vec_builder = Some(BitVecBuilder::new(b));
|
||||
true
|
||||
}
|
||||
Some(builder) => builder.add_bytes(b),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build(self) -> DoubleSpendingFilter {
|
||||
match self.bit_vec_builder {
|
||||
None => DoubleSpendingFilter::new_empty(self.params),
|
||||
Some(builder) => DoubleSpendingFilter {
|
||||
inner: bloom_from_params(&self.params, builder.finish()),
|
||||
params: self.params,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BitVecBuilder(BitVec);
|
||||
|
||||
impl BitVecBuilder {
|
||||
pub fn new(initial_bitmap: &[u8]) -> Self {
|
||||
BitVecBuilder(BitVec::from_bytes(initial_bitmap))
|
||||
}
|
||||
|
||||
pub fn add_bytes(&mut self, b: &[u8]) -> bool {
|
||||
let add = BitVec::from_bytes(b);
|
||||
if self.0.len() != add.len() {
|
||||
return false;
|
||||
}
|
||||
self.0.or(&add);
|
||||
true
|
||||
}
|
||||
|
||||
pub fn finish(self) -> BitVec {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
@@ -59,12 +59,12 @@ pub enum GatewayRequestsError {
|
||||
source: NymNodeRoutingAddressError,
|
||||
},
|
||||
|
||||
#[error("received request had invalid size. (actual: {0}, but expected one of: {} (ACK), {} (REGULAR), {}, {}, {} (EXTENDED))",
|
||||
PacketSize::AckPacket.size(),
|
||||
PacketSize::RegularPacket.size(),
|
||||
PacketSize::ExtendedPacket8.size(),
|
||||
PacketSize::ExtendedPacket16.size(),
|
||||
PacketSize::ExtendedPacket32.size())
|
||||
#[error("received request had invalid size. (actual: {0}, but expected one of: {a} (ACK), {r} (REGULAR), {e8}, {e16}, {e32} (EXTENDED))",
|
||||
a = PacketSize::AckPacket.size(),
|
||||
r = PacketSize::RegularPacket.size(),
|
||||
e8 = PacketSize::ExtendedPacket8.size(),
|
||||
e16 = PacketSize::ExtendedPacket16.size(),
|
||||
e32 = PacketSize::ExtendedPacket32.size())
|
||||
]
|
||||
RequestOfInvalidSize(usize),
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ use std::str::FromStr;
|
||||
use tungstenite::Message;
|
||||
|
||||
// wrapper for all encrypted requests for ease of use
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
#[non_exhaustive]
|
||||
pub enum ClientRequest {
|
||||
UpgradeKey {
|
||||
|
||||
@@ -3,5 +3,22 @@
|
||||
* SPDX-License-Identifier: GPL-3.0-only
|
||||
*/
|
||||
|
||||
|
||||
ALTER TABLE message_store
|
||||
ADD COLUMN timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP;
|
||||
RENAME TO message_store_old;
|
||||
|
||||
-- add new column with message timestamp.
|
||||
-- note: we can't simply alter existing table to add it since the default value is non-constant
|
||||
CREATE TABLE message_store
|
||||
(
|
||||
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
client_address_bs58 TEXT NOT NULL,
|
||||
content BLOB NOT NULL,
|
||||
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
INSERT INTO message_store(id, client_address_bs58, content)
|
||||
SELECT id, client_address_bs58, content
|
||||
FROM message_store_old;
|
||||
DROP TABLE message_store_old;
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
reqwest = { workspace = true, features = ["json", "gzip"] }
|
||||
http.workspace = true
|
||||
url = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
|
||||
@@ -40,6 +40,7 @@ struct SocketAddrs {
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error("hickory-dns resolver error: {hickory_error}")]
|
||||
/// Error occurring while resolving a hostname into an IP address.
|
||||
pub struct HickoryDnsError {
|
||||
#[from]
|
||||
hickory_error: ResolveError,
|
||||
|
||||
+425
-336
File diff suppressed because it is too large
Load Diff
@@ -7,11 +7,16 @@ use http::HeaderValue;
|
||||
use nym_bin_common::build_information::{BinaryBuildInformation, BinaryBuildInformationOwned};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Characteristic elements sent to the API providing basic context information of the requesting client.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct UserAgent {
|
||||
/// The internal crate / application / subsystem making use of API client
|
||||
pub application: String,
|
||||
/// version of the calling crate / application / subsystem
|
||||
pub version: String,
|
||||
/// client platform
|
||||
pub platform: String,
|
||||
/// source commit version for the calling calling crate / subsystem
|
||||
pub git_commit: String,
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ mime = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json.workspace = true
|
||||
serde_yaml = { workspace = true }
|
||||
subtle.workspace = true
|
||||
tower = { workspace = true }
|
||||
tracing.workspace = true
|
||||
utoipa = { workspace = true, optional = true }
|
||||
|
||||
@@ -7,6 +7,7 @@ use axum::{extract::Request, response::Response};
|
||||
use futures::future::BoxFuture;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use subtle::ConstantTimeEq;
|
||||
use tower::{Layer, Service};
|
||||
use tracing::{debug, instrument, trace};
|
||||
use zeroize::Zeroizing;
|
||||
@@ -76,7 +77,7 @@ impl<S> RequireAuth<S> {
|
||||
return Err("`Authorization` header must contain non-empty `Bearer` token");
|
||||
}
|
||||
|
||||
if self.bearer_token.as_str() != bearer_token {
|
||||
if bool::from(self.bearer_token.as_bytes().ct_ne(bearer_token.as_bytes())) {
|
||||
return Err("`Authorization` header does not contain the correct `Bearer` token");
|
||||
}
|
||||
|
||||
|
||||
@@ -26,4 +26,4 @@ utoipa = [ "dep:utoipa" ]
|
||||
|
||||
[build-dependencies]
|
||||
regex = { workspace = true }
|
||||
cargo_metadata = { version = "0.18" }
|
||||
cargo_metadata = { workspace = true }
|
||||
|
||||
@@ -31,7 +31,7 @@ nym-pemstore = { path = "../pemstore" }
|
||||
nym-network-defaults = { path = "../network-defaults", default-features = false }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = { version = "0.5.1", features = ["html_reports"] }
|
||||
criterion = { workspace = true, features = ["html_reports"] }
|
||||
|
||||
|
||||
[[bench]]
|
||||
|
||||
@@ -48,12 +48,10 @@ features = ["sync"]
|
||||
[features]
|
||||
default = ["sphinx"]
|
||||
sphinx = [
|
||||
"nym-crypto/sphinx",
|
||||
"nym-sphinx-params/sphinx",
|
||||
"nym-sphinx-types/sphinx",
|
||||
]
|
||||
outfox = [
|
||||
"nym-crypto/outfox",
|
||||
"nym-sphinx-params/outfox",
|
||||
"nym-sphinx-types/outfox",
|
||||
]
|
||||
|
||||
@@ -8,7 +8,7 @@ license = { workspace = true }
|
||||
repository = { workspace = true }
|
||||
|
||||
[dependencies]
|
||||
nym-crypto = { path = "../../crypto", features = ["asymmetric"] } # all addresses are expressed in terms on their crypto keys
|
||||
nym-crypto = { path = "../../crypto", features = ["asymmetric", "sphinx"] } # all addresses are expressed in terms on their crypto keys
|
||||
nym-sphinx-types = { path = "../types", features = ["sphinx"] } # we need to be able to refer to some types defined inside sphinx crate
|
||||
serde = { workspace = true } # implementing serialization/deserialization for some types, like `Recipient`
|
||||
thiserror = { workspace = true }
|
||||
|
||||
@@ -559,7 +559,7 @@ mod tests {
|
||||
let mut address_bytes = [0; NODE_ADDRESS_LENGTH];
|
||||
rng.fill_bytes(&mut address_bytes);
|
||||
|
||||
let dummy_private = PrivateKey::new_with_rng(rng);
|
||||
let dummy_private = PrivateKey::random_from_rng(rng);
|
||||
let pub_key = (&dummy_private).into();
|
||||
Node {
|
||||
address: NodeAddressBytes::from_bytes(address_bytes),
|
||||
|
||||
@@ -130,28 +130,33 @@ impl Decoder for NymCodec {
|
||||
mod packet_encoding {
|
||||
use super::*;
|
||||
use nym_sphinx_types::{
|
||||
crypto, Delay as SphinxDelay, Destination, DestinationAddressBytes, Node, NodeAddressBytes,
|
||||
DESTINATION_ADDRESS_LENGTH, IDENTIFIER_LENGTH, NODE_ADDRESS_LENGTH,
|
||||
Delay as SphinxDelay, Destination, DestinationAddressBytes, Node, NodeAddressBytes,
|
||||
PrivateKey, DESTINATION_ADDRESS_LENGTH, IDENTIFIER_LENGTH, NODE_ADDRESS_LENGTH,
|
||||
};
|
||||
|
||||
fn random_pubkey() -> nym_sphinx_types::PublicKey {
|
||||
let private_key = PrivateKey::random();
|
||||
(&private_key).into()
|
||||
}
|
||||
|
||||
fn make_valid_outfox_packet(size: PacketSize) -> NymPacket {
|
||||
let (_, node1_pk) = crypto::keygen();
|
||||
let node1_pk = random_pubkey();
|
||||
let node1 = Node::new(
|
||||
NodeAddressBytes::from_bytes([5u8; NODE_ADDRESS_LENGTH]),
|
||||
node1_pk,
|
||||
);
|
||||
let (_, node2_pk) = crypto::keygen();
|
||||
let node2_pk = random_pubkey();
|
||||
let node2 = Node::new(
|
||||
NodeAddressBytes::from_bytes([4u8; NODE_ADDRESS_LENGTH]),
|
||||
node2_pk,
|
||||
);
|
||||
let (_, node3_pk) = crypto::keygen();
|
||||
let node3_pk = random_pubkey();
|
||||
let node3 = Node::new(
|
||||
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
|
||||
node3_pk,
|
||||
);
|
||||
|
||||
let (_, node4_pk) = crypto::keygen();
|
||||
let node4_pk = random_pubkey();
|
||||
let node4 = Node::new(
|
||||
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
|
||||
node4_pk,
|
||||
@@ -170,17 +175,17 @@ mod packet_encoding {
|
||||
}
|
||||
|
||||
fn make_valid_sphinx_packet(size: PacketSize) -> NymPacket {
|
||||
let (_, node1_pk) = crypto::keygen();
|
||||
let node1_pk = random_pubkey();
|
||||
let node1 = Node::new(
|
||||
NodeAddressBytes::from_bytes([5u8; NODE_ADDRESS_LENGTH]),
|
||||
node1_pk,
|
||||
);
|
||||
let (_, node2_pk) = crypto::keygen();
|
||||
let node2_pk = random_pubkey();
|
||||
let node2 = Node::new(
|
||||
NodeAddressBytes::from_bytes([4u8; NODE_ADDRESS_LENGTH]),
|
||||
node2_pk,
|
||||
);
|
||||
let (_, node3_pk) = crypto::keygen();
|
||||
let node3_pk = random_pubkey();
|
||||
let node3 = Node::new(
|
||||
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
|
||||
node3_pk,
|
||||
|
||||
@@ -4,8 +4,10 @@ use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressE
|
||||
use nym_sphinx_params::{PacketSize, PacketType};
|
||||
use nym_sphinx_types::{
|
||||
Delay as SphinxDelay, DestinationAddressBytes, NodeAddressBytes, NymPacket, NymPacketError,
|
||||
NymProcessedPacket, OutfoxError, PrivateKey, ProcessedPacket, SphinxError,
|
||||
NymProcessedPacket, OutfoxError, PrivateKey, ProcessedPacketData, SphinxError,
|
||||
Version as SphinxPacketVersion,
|
||||
};
|
||||
use std::fmt::Display;
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::packet::FramedNymPacket;
|
||||
@@ -13,12 +15,38 @@ use nym_metrics::nanos;
|
||||
use nym_sphinx_forwarding::packet::MixPacket;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MixProcessingResult {
|
||||
pub enum MixProcessingResultData {
|
||||
/// Contains unwrapped data that should first get delayed before being sent to next hop.
|
||||
ForwardHop(MixPacket, Option<SphinxDelay>),
|
||||
ForwardHop {
|
||||
packet: MixPacket,
|
||||
delay: Option<SphinxDelay>,
|
||||
},
|
||||
|
||||
/// Contains all data extracted out of the final hop packet that could be forwarded to the destination.
|
||||
FinalHop(ProcessedFinalHop),
|
||||
FinalHop { final_hop_data: ProcessedFinalHop },
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum MixPacketVersion {
|
||||
Outfox,
|
||||
Sphinx(SphinxPacketVersion),
|
||||
}
|
||||
|
||||
impl Display for MixPacketVersion {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
|
||||
match self {
|
||||
MixPacketVersion::Outfox => "outfox".fmt(f),
|
||||
MixPacketVersion::Sphinx(sphinx_version) => {
|
||||
write!(f, "sphinx-{}", sphinx_version.value())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MixProcessingResult {
|
||||
pub packet_version: MixPacketVersion,
|
||||
pub processing_data: MixProcessingResultData,
|
||||
}
|
||||
|
||||
type ForwardAck = MixPacket;
|
||||
@@ -107,37 +135,63 @@ fn perform_final_processing(
|
||||
) -> Result<MixProcessingResult, PacketProcessingError> {
|
||||
match packet {
|
||||
NymProcessedPacket::Sphinx(packet) => {
|
||||
match packet {
|
||||
ProcessedPacket::ForwardHop(packet, address, delay) => {
|
||||
process_forward_hop(NymPacket::Sphinx(*packet), address, delay, packet_type)
|
||||
}
|
||||
let processing_data = match packet.data {
|
||||
ProcessedPacketData::ForwardHop {
|
||||
next_hop_packet,
|
||||
next_hop_address,
|
||||
delay,
|
||||
} => process_forward_hop(
|
||||
NymPacket::Sphinx(next_hop_packet),
|
||||
next_hop_address,
|
||||
delay,
|
||||
packet_type,
|
||||
),
|
||||
// right now there's no use for the surb_id included in the header - probably it should get removed from the
|
||||
// sphinx all together?
|
||||
ProcessedPacket::FinalHop(destination, _, payload) => process_final_hop(
|
||||
ProcessedPacketData::FinalHop {
|
||||
destination,
|
||||
identifier: _,
|
||||
payload,
|
||||
} => process_final_hop(
|
||||
destination,
|
||||
payload.recover_plaintext()?,
|
||||
packet_size,
|
||||
packet_type,
|
||||
),
|
||||
}
|
||||
}?;
|
||||
|
||||
Ok(MixProcessingResult {
|
||||
packet_version: MixPacketVersion::Sphinx(packet.version),
|
||||
processing_data,
|
||||
})
|
||||
}
|
||||
NymProcessedPacket::Outfox(packet) => {
|
||||
let next_address = *packet.next_address();
|
||||
let packet = packet.into_packet();
|
||||
if packet.is_final_hop() {
|
||||
process_final_hop(
|
||||
let processing_data = process_final_hop(
|
||||
DestinationAddressBytes::from_bytes(next_address),
|
||||
packet.recover_plaintext()?.to_vec(),
|
||||
packet_size,
|
||||
packet_type,
|
||||
)
|
||||
)?;
|
||||
Ok(MixProcessingResult {
|
||||
packet_version: MixPacketVersion::Outfox,
|
||||
processing_data,
|
||||
})
|
||||
} else {
|
||||
let mix_packet = MixPacket::new(
|
||||
let packet = MixPacket::new(
|
||||
NymNodeRoutingAddress::try_from_bytes(&next_address)?,
|
||||
NymPacket::Outfox(packet),
|
||||
PacketType::Outfox,
|
||||
);
|
||||
Ok(MixProcessingResult::ForwardHop(mix_packet, None))
|
||||
Ok(MixProcessingResult {
|
||||
packet_version: MixPacketVersion::Outfox,
|
||||
processing_data: MixProcessingResultData::ForwardHop {
|
||||
packet,
|
||||
delay: None,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -148,14 +202,16 @@ fn process_final_hop(
|
||||
payload: Vec<u8>,
|
||||
packet_size: PacketSize,
|
||||
packet_type: PacketType,
|
||||
) -> Result<MixProcessingResult, PacketProcessingError> {
|
||||
) -> Result<MixProcessingResultData, PacketProcessingError> {
|
||||
let (forward_ack, message) = split_into_ack_and_message(payload, packet_size, packet_type)?;
|
||||
|
||||
Ok(MixProcessingResult::FinalHop(ProcessedFinalHop {
|
||||
destination,
|
||||
forward_ack,
|
||||
message,
|
||||
}))
|
||||
Ok(MixProcessingResultData::FinalHop {
|
||||
final_hop_data: ProcessedFinalHop {
|
||||
destination,
|
||||
forward_ack,
|
||||
message,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn split_into_ack_and_message(
|
||||
@@ -211,11 +267,14 @@ fn process_forward_hop(
|
||||
forward_address: NodeAddressBytes,
|
||||
delay: SphinxDelay,
|
||||
packet_type: PacketType,
|
||||
) -> Result<MixProcessingResult, PacketProcessingError> {
|
||||
) -> Result<MixProcessingResultData, PacketProcessingError> {
|
||||
let next_hop_address = NymNodeRoutingAddress::try_from(forward_address)?;
|
||||
|
||||
let mix_packet = MixPacket::new(next_hop_address, packet, packet_type);
|
||||
Ok(MixProcessingResult::ForwardHop(mix_packet, Some(delay)))
|
||||
let packet = MixPacket::new(next_hop_address, packet, packet_type);
|
||||
Ok(MixProcessingResultData::ForwardHop {
|
||||
packet,
|
||||
delay: Some(delay),
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: what more could we realistically test here?
|
||||
|
||||
@@ -16,5 +16,5 @@ nym-sphinx-types = { path = "../types" }
|
||||
|
||||
[features]
|
||||
default = ["sphinx"]
|
||||
sphinx = ["nym-crypto/sphinx", "nym-sphinx-types/outfox"]
|
||||
outfox = ["nym-crypto/outfox", "nym-sphinx-types/outfox"]
|
||||
sphinx = ["nym-sphinx-types/outfox"]
|
||||
outfox = ["nym-sphinx-types/outfox"]
|
||||
|
||||
@@ -1,14 +1,22 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::{array::TryFromSliceError, fmt};
|
||||
use thiserror::Error;
|
||||
|
||||
#[cfg(feature = "outfox")]
|
||||
use nym_outfox::packet::{OutfoxPacket, OutfoxProcessedPacket};
|
||||
|
||||
#[cfg(feature = "sphinx")]
|
||||
use sphinx_packet::{SphinxPacket, SphinxPacketBuilder};
|
||||
|
||||
#[cfg(feature = "outfox")]
|
||||
pub use nym_outfox::{
|
||||
constants::MIN_PACKET_SIZE, constants::MIX_PARAMS_LEN, constants::OUTFOX_PACKET_OVERHEAD,
|
||||
error::OutfoxError,
|
||||
};
|
||||
// re-exporting types and constants available in sphinx
|
||||
#[cfg(feature = "outfox")]
|
||||
use nym_outfox::packet::{OutfoxPacket, OutfoxProcessedPacket};
|
||||
|
||||
#[cfg(feature = "sphinx")]
|
||||
pub use sphinx_packet::{
|
||||
constants::{
|
||||
@@ -21,12 +29,10 @@ pub use sphinx_packet::{
|
||||
payload::{Payload, PAYLOAD_OVERHEAD_SIZE},
|
||||
route::{Destination, DestinationAddressBytes, Node, NodeAddressBytes, SURBIdentifier},
|
||||
surb::{SURBMaterial, SURB},
|
||||
Error as SphinxError, ProcessedPacket,
|
||||
version::Version,
|
||||
version::UPDATED_LEGACY_VERSION,
|
||||
Error as SphinxError, ProcessedPacket, ProcessedPacketData,
|
||||
};
|
||||
#[cfg(feature = "sphinx")]
|
||||
use sphinx_packet::{SphinxPacket, SphinxPacketBuilder};
|
||||
use std::{array::TryFromSliceError, fmt};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum NymPacketError {
|
||||
@@ -85,8 +91,12 @@ impl NymPacket {
|
||||
destination: &Destination,
|
||||
delays: &[Delay],
|
||||
) -> Result<NymPacket, NymPacketError> {
|
||||
// FIXME:
|
||||
// for now explicitly use the legacy version until sufficient number of nodes
|
||||
// understand both variants
|
||||
Ok(NymPacket::Sphinx(
|
||||
SphinxPacketBuilder::new()
|
||||
.with_version(UPDATED_LEGACY_VERSION)
|
||||
.with_payload_size(size)
|
||||
.build_packet(message, route, destination, delays)?,
|
||||
))
|
||||
|
||||
@@ -30,6 +30,10 @@ pub struct Config {
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn base(&self) -> BaseClientConfig {
|
||||
self.base.clone()
|
||||
}
|
||||
|
||||
pub fn new<S: Into<String>>(id: S, version: S, provider_mix_address: S) -> Self {
|
||||
Config {
|
||||
base: BaseClientConfig::new(id, version),
|
||||
|
||||
@@ -76,6 +76,10 @@ where
|
||||
<S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
|
||||
<S::KeyStore as KeyStore>::StorageError: Send + Sync,
|
||||
{
|
||||
pub fn config(&self) -> Config {
|
||||
self.config.clone()
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
config: Config,
|
||||
storage: S,
|
||||
@@ -233,7 +237,7 @@ where
|
||||
};
|
||||
|
||||
let mut base_builder =
|
||||
BaseClientBuilder::new(&self.config.base, self.storage, dkg_query_client)
|
||||
BaseClientBuilder::new(self.config.base(), self.storage, dkg_query_client)
|
||||
.with_gateway_setup(self.setup_method)
|
||||
.with_user_agent(self.user_agent);
|
||||
|
||||
@@ -241,7 +245,7 @@ where
|
||||
base_builder = base_builder.with_stored_topology(custom_mixnet)?;
|
||||
}
|
||||
|
||||
let packet_type = self.config.base.debug.traffic.packet_type;
|
||||
let packet_type = self.config.base().debug.traffic.packet_type;
|
||||
let mut started_client = base_builder.start_base().await?;
|
||||
let self_address = started_client.address;
|
||||
let client_input = started_client.client_input.register_producer();
|
||||
@@ -252,7 +256,7 @@ where
|
||||
|
||||
Self::start_socks5_listener(
|
||||
&self.config.socks5,
|
||||
self.config.base.debug,
|
||||
self.config.base().debug,
|
||||
client_input,
|
||||
client_output,
|
||||
client_state,
|
||||
|
||||
@@ -25,19 +25,28 @@ pub type ClientStatsReceiver = tokio::sync::mpsc::UnboundedReceiver<ClientStatsE
|
||||
#[derive(Clone)]
|
||||
pub struct ClientStatsSender {
|
||||
stats_tx: Option<UnboundedSender<ClientStatsEvents>>,
|
||||
task_client: TaskClient,
|
||||
}
|
||||
|
||||
impl ClientStatsSender {
|
||||
/// Create a new statistics Sender
|
||||
pub fn new(stats_tx: Option<UnboundedSender<ClientStatsEvents>>) -> Self {
|
||||
ClientStatsSender { stats_tx }
|
||||
pub fn new(
|
||||
stats_tx: Option<UnboundedSender<ClientStatsEvents>>,
|
||||
task_client: TaskClient,
|
||||
) -> Self {
|
||||
ClientStatsSender {
|
||||
stats_tx,
|
||||
task_client,
|
||||
}
|
||||
}
|
||||
|
||||
/// Report a statistics event using the sender.
|
||||
pub fn report(&self, event: ClientStatsEvents) {
|
||||
if let Some(tx) = &self.stats_tx {
|
||||
if let Err(err) = tx.send(event) {
|
||||
log::error!("Failed to send stats event: {:?}", err);
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
log::error!("Failed to send stats event: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,7 +221,7 @@ impl TaskManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_graceful_shutdown(&mut self) {
|
||||
pub async fn wait_for_graceful_shutdown(&mut self) {
|
||||
if let Some(notify_rx) = self.notify_rx.take() {
|
||||
drop(notify_rx);
|
||||
}
|
||||
@@ -299,6 +299,8 @@ impl Clone for TaskClient {
|
||||
None
|
||||
};
|
||||
|
||||
log::debug!("Cloned task client: {name:?}");
|
||||
|
||||
TaskClient {
|
||||
name,
|
||||
shutdown: AtomicBool::new(self.shutdown.load(Ordering::Relaxed)),
|
||||
@@ -315,7 +317,7 @@ impl TaskClient {
|
||||
const MAX_NAME_LENGTH: usize = 128;
|
||||
const OVERFLOW_NAME: &'static str = "reached maximum TaskClient children name depth";
|
||||
|
||||
const SHUTDOWN_TIMEOUT_WAITING_FOR_SIGNAL_ON_EXIT: Duration = Duration::from_secs(5);
|
||||
const SHUTDOWN_TIMEOUT_WAITING_FOR_SIGNAL_ON_EXIT: Duration = Duration::from_secs(10);
|
||||
|
||||
fn new(
|
||||
notify: watch::Receiver<()>,
|
||||
@@ -344,6 +346,7 @@ impl TaskClient {
|
||||
format!("unknown-{suffix}")
|
||||
};
|
||||
|
||||
log::debug!("Forked task client: {child_name}");
|
||||
child.name = Some(child_name);
|
||||
child
|
||||
}
|
||||
@@ -377,6 +380,7 @@ impl TaskClient {
|
||||
} else {
|
||||
format!("unknown-{suffix}")
|
||||
};
|
||||
log::debug!("Renamed task client: {name}");
|
||||
self.named(name)
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ wasm-bindgen = { workspace = true, optional = true }
|
||||
|
||||
## internal
|
||||
nym-config = { path = "../config" }
|
||||
nym-crypto = { path = "../crypto", features = ["sphinx", "outfox"] }
|
||||
nym-crypto = { path = "../crypto" }
|
||||
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
|
||||
nym-sphinx-addressing = { path = "../nymsphinx/addressing" }
|
||||
nym-sphinx-types = { path = "../nymsphinx/types", features = [
|
||||
|
||||
@@ -11,7 +11,7 @@ use std::borrow::Borrow;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt::Display;
|
||||
use std::net::IpAddr;
|
||||
use tracing::{debug, warn};
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
pub use crate::node::{EntryDetails, RoutingNode, SupportedRoles};
|
||||
pub use error::NymTopologyError;
|
||||
@@ -293,7 +293,7 @@ impl NymTopology {
|
||||
let has_exit_gateways = !self.rewarded_set.exit_gateways.is_empty();
|
||||
let has_entry_gateways = !self.rewarded_set.entry_gateways.is_empty();
|
||||
|
||||
debug!(
|
||||
trace!(
|
||||
has_layer1 = %has_layer1,
|
||||
has_layer2 = %has_layer2,
|
||||
has_layer3 = %has_layer3,
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user