Compare commits

...

98 Commits

Author SHA1 Message Date
Jon Häggblad c44ddf3504 Merge remote-tracking branch 'origin/release/v1.1.1' into release/v1.1.2 2022-11-28 12:57:54 +01:00
Jon Häggblad 018bf8c241 client-core: less frequent status logging (#1806) 2022-11-28 12:14:27 +01:00
Jon Häggblad 65a69b2cba socks5: if any task panics, signal all other tasks to shutdown (#1805)
* socks5: signal shutdown on error

* Mark as success

* Tidy

* Reduce wait to 5 sec

* Replace unwrap with expect

* Two more unwraps

* Update changelog
2022-11-28 10:49:00 +01:00
Jon Häggblad e2ba85c9bf websocket-requests: fix length check before deserialize (#1799) 2022-11-25 10:54:12 +01:00
Jon Häggblad cb7e57b5f8 changelog: add missing entry for fixing message decrypt in gateway-client 2022-11-25 10:54:12 +01:00
Jon Häggblad 17f89aecd5 Make connection_id optional in ClientRequest::Send (#1798) 2022-11-25 10:54:12 +01:00
Jędrzej Stuczyński 0be6fe5079 Feature/use expect instead of panicking (#1797)
* Implementation of 'Debug' on 'RealMessage'

* expect with failed channel name instead of throwing empty panics

* Introduced Debug trait constraint in ProxyRunner

* Derive Debug for socks5_requests::Message
2022-11-25 10:54:12 +01:00
Jon Häggblad 358687f43a Fix decrypting stored received msg (#1786)
* Fix decrypting stored received msg

* rustfmt

* Moving binary message recovery to separate function

Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>
2022-11-25 10:54:12 +01:00
Jon Häggblad fb31dbee16 client-core: add warning when delay multiplier is larger than 1 2022-11-25 10:54:12 +01:00
Jon Häggblad bb98d796a8 Update wallet and connect lock files (#1793) 2022-11-25 10:54:12 +01:00
Jon Häggblad 30dc929e40 real_traffic_stream: reduce frequency of status print (#1794) 2022-11-25 10:54:12 +01:00
Mark Sinclair f1378c3488 Update contracts-build.yml 2022-11-25 10:54:12 +01:00
cgi-bin/ 39ca9c22af Possibilty to change gateway ws listener (#1779)
* add: set gatewayListener

* Update types.ts

* Update worker.ts
2022-11-25 10:54:12 +01:00
Fran Arbanas 4ff741ed9a Add step to release GH actions (#1792)
* feat: add a release step to nym contracts GH action

* feat: add shrinking the size of wasm
2022-11-25 10:54:12 +01:00
Jon Häggblad c9779df2a4 rust: bump required version to 1.65 in some crates that need it 2022-11-25 10:54:12 +01:00
Jon Häggblad c6d624a3b3 Network-requester: throttle inbound connections (#1789)
* Return and handle ClientRequest::LaneQueueLenghts

* Pass lane queue lengths to inbound future

* Remove unused self reference

* Request lane queue lengths periodically for all open connections

* Add timeouts

* Rename to ConnectionCommandSender and Receiver

* Rename to client_connection_tx/rx

* Fix wasm build

* Replace bool with enum
2022-11-25 10:54:12 +01:00
Bogdan-Ștefan Neacşu f4fb0d6d6c Remove required deposit from signers (#1791) 2022-11-23 12:10:24 +02:00
Jon Häggblad 236594f0c6 Fix clippy::derive-partial-eq-without-eq (#1790) 2022-11-23 10:09:50 +01:00
Jon Häggblad e873845178 Fix some client send unwraps encountered during use (#1787) 2022-11-23 10:09:22 +01:00
Raphaël Walther 2e2f2bb702 atty is unmaintained 2022-11-22 15:28:08 +01:00
Bogdan-Ștefan Neacşu 1cec2ddff0 Remove debugging transaction (#1788) 2022-11-22 14:33:35 +02:00
Bogdan-Ștefan Neacşu 2db1bc8efa Feature/dkg publish vk (#1747)
* Save to disk coconut keypair

* Check verification keys of the other signers

* Post verification key to chain

* Add multisig propose/vote for vks

* Execute the proposal

* Parse announce address argument

* Gateway uses chain data

* Network requester uses chain data

* Native&socks5 clients use chain data

* Credential client signature uses chain data

* Remove redundant api endpoints

* Undo debugging logging

* Fix some tests

* Fix clippy

* Fix wasm client and contract test

* More contract clippy

* Update CHANGELOG

* Use a bigger expiry period then the testing one
2022-11-22 11:16:02 +02:00
Jon Häggblad f1deebc0f1 ci: check formatting first, and add all targets to coconut clippy step 2022-11-22 09:29:32 +01:00
Jon Häggblad 9063a86d26 client: log and handle error when cant load reply key storage (#1785)
* client: log and handle error when cant load reply key storage

* clippy
2022-11-22 01:15:50 +01:00
Jon Häggblad d82fd620ad Update blake2 and ed25519 deps (#1762) 2022-11-22 01:09:12 +01:00
Jon Häggblad fa95d15eac socks5-client: throttle connection inbound from application until data is sent (#1783)
* socks5: throttle send

* client-connections: add additional methods

* WIP

* Update

* Input message sender bounded

* WIP

* Remove the delay that is no longer needed

* rustfmt

* clippy

* Fix wasm build

* clippy

* Try to use MixProxySender/Reader type alias

* Extract out wait function

* Wait on every msg

* changelog: add note

* rustfmt
2022-11-21 23:52:30 +01:00
Bogdan-Ștefan Neacşu b71a8708db Feature/dkg dealing (#1708)
* Reintroduce epoch states

Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>

* Use admin address for sensible txs

* Validator-api watch contract and handle events

* Handle dealing exchange

* Dealing exchange

* Recover raw verification keys for 5 dkgs

* Test coconut with dkg keys

* Split dealing storage

* Finish dkg task when it achieved its purpose

* Temporary fix for clippy

* Fix clippy

Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>
2022-11-21 18:00:47 +02:00
Bogdan-Ștefan Neacşu fea6f44a57 Feature/dkg (#1678)
* Port code without epoch and blacklisting

Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>

* Add dkg contract to validator client

Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>

* Introduce publisher

Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>

* Fix mock testing client

* Apply fmt to contract

* Get data from attributes

* Minor fixes

* Fix wasm client

* Add pem files for dkg keys

* Save/load dkg keys in/from pem files

* Get dealer old or fresh dealer index

Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>
2022-11-21 17:28:26 +02:00
Raphaël Walther 23e97e9643 Set schedule for nightly build on release 2022-11-21 16:23:36 +01:00
Raphaël Walther 3f5bfcc696 Cleaned 2022-11-21 16:20:44 +01:00
Raphaël Walther f568673fbc Debugging workflow 2022-11-21 16:13:46 +01:00
Jon Häggblad f6576939d9 Merge remote-tracking branch 'origin/release/v1.1.1' into develop 2022-11-21 16:09:29 +01:00
Raphaël Walther ce17196d48 Debugging workflow 2022-11-21 16:04:12 +01:00
Raphaël Walther 6dde8ecd0a Debugging 2022-11-21 15:49:55 +01:00
Bogdan-Ștefan Neacşu c4ee964557 Setup with 1 epoch and full test that skips key update (#1647)
* Setup with 1 epoch and full test that skips key update

* Remove a bunch of epoch code

* Remove unnecessary map from one element vector

* Remove tau, epoch and lambda_t

* Removed lambda_t completely

Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>
2022-11-21 16:34:50 +02:00
Raphaël Walther 9337821712 Removed extra spaces in sed expression 2022-11-21 15:07:50 +01:00
Raphaël Walther 279ba7034c Typo 2022-11-21 14:53:30 +01:00
Jon Häggblad 1859ca0a30 client-core: expose lane queue length state to other components (#1777)
* client-core: publish lane queue lengths

* client-connections: rename to LaneQueueLenghts plural

* Fix clippy

* Fix wasm build

* rustfmt

* clippy
2022-11-21 11:21:28 +01:00
Mark Sinclair e30fd270a1 Publish @nymproject/sdk v1.1.4 2022-11-18 15:32:57 +00:00
Mark Sinclair e3cda93919 WASM client: update crate version so that the client works with mainnet 2022-11-18 13:06:03 +00:00
benedetta davico 87fb4daeda Release/v1.1.1 nym wallet (#1775)
* fix undelegating with vesting tokens

* update version number

* update tauri conf version

* fix(wallet): explorer links

* refactor(explorer): rename mixnodeidentitykey to mixid

* fix(wallet): broken explorer links

Co-authored-by: fmtabbara <fmtabbara@hotmail.co.uk>
Co-authored-by: pierre <dommerc.pierre@gmail.com>
2022-11-18 12:38:16 +00:00
Dave Hrycyszyn 9f56796bf6 Removing unused nym-chitchat experiment 2022-11-18 12:28:21 +00:00
Jon Häggblad 09b51226c2 client: add LaneQueueLength to ServerResponse (#1772)
* client: add LaneQueueLength to ServerResponse

* fix test compilation

* changlog: add note
2022-11-18 10:24:28 +01:00
Jędrzej Stuczyński 6946151b25 Made vesting_contract_address argument peroperly optional (#1769)
* Made vesting_contract_address argument peroperly optional

* Updated changelog
2022-11-17 17:13:58 +00:00
Jon Häggblad a4aee465fa Merge pull request #1758 from nymtech/jon/fix/deserialize-length-check
Fix deserialize length check
2022-11-17 16:09:54 +01:00
Jon Häggblad 0d71ac5e75 websocket-requests: fix lints in file 2022-11-17 15:32:12 +01:00
Jon Häggblad ce14e40968 websocket-requests: fix deserialize length check 2022-11-17 15:32:12 +01:00
Pierre Dommerc d108edb424 fix(nym-cli): typo in sign command json output (#1768) 2022-11-17 15:15:45 +01:00
Tommy Verrall 18f5623b05 Merge pull request #1759 from nymtech/feature/mixnet-contract-migratemsg
Added migration code for updating vesting contract address
2022-11-17 12:48:57 +01:00
Mark Sinclair 8e92801929 Reduce logging in explorer api 2022-11-16 16:45:59 +00:00
Gala 3fc1bc4e7c Merge pull request #1764 from nymtech/no-display-maintenance
Update banner
2022-11-16 17:25:40 +01:00
Gala 72de726762 another text update 2022-11-16 16:08:47 +00:00
Gala cb9dfa8188 update banner text 2022-11-16 16:03:27 +00:00
Mark Sinclair f102ed53a7 Merge remote-tracking branch 'origin/release/v1.1.0' into develop
# Conflicts:
#	.github/workflows/network-explorer-api.yml
2022-11-16 15:54:08 +00:00
Raphaël Walther a803c7f25e Set output 2022-11-16 16:06:03 +01:00
Gala f20b620cbb turn maintenance banner display false 2022-11-16 15:04:24 +00:00
Raphaël Walther d771d15959 Corrected reference 2022-11-16 15:50:53 +01:00
Raphaël Walther 49e6f387ff Hardcoded branch name 2022-11-16 15:12:50 +01:00
Raphaël Walther 9568c0ba1d Upgraded checkout action 2022-11-16 14:50:46 +01:00
Gala 0b7b705e56 Merge pull request #1602 from nymtech/327-nym-connect-colours
Nym Connect: Various ui updates
2022-11-16 14:30:41 +01:00
Raphaël Walther 5daea675e7 Sent errors through the pipe 2022-11-16 14:28:45 +01:00
Gala ebd18586a8 Merge branch '327-nym-connect-colours' of github.com:nymtech/nym into 327-nym-connect-colours 2022-11-16 13:14:40 +00:00
Gala 585610295f Merge branch 'develop' into 327-nym-connect-colours 2022-11-16 13:14:17 +00:00
Raphaël Walther 91653d13c6 Added echo 2022-11-16 14:03:34 +01:00
Mark Sinclair b6a765481a GitHub Actions: add workflow to build network explorer api 2022-11-16 11:43:53 +00:00
Raphaël Walther 39798de1e8 Fixed dependency 2022-11-16 11:40:28 +01:00
Raphaël Walther c650587e4c Fixed typo 2022-11-16 11:35:06 +01:00
Raphaël Walther 660d5d8b05 Added missing property 2022-11-16 11:18:12 +01:00
Raphaël Walther 79f9db91ae Fixed typo 2022-11-16 11:16:07 +01:00
Raphaël Walther 43822f27a8 Switched to job outputs 2022-11-16 11:13:36 +01:00
Raphaël Walther e500d154dd Fixed issue with environment variable 2022-11-16 10:37:28 +01:00
Raphaël Walther 3ceb00fae1 Added matrix 2022-11-16 10:24:49 +01:00
Raphaël Walther d019343fd9 Added nightly build on latest release 2022-11-16 10:10:59 +01:00
Raphaël Walther f55a55b784 Cleaned workflows 2022-11-16 10:10:14 +01:00
Raphaël Walther 0ea8da79c8 Enabled yanked crates warning 2022-11-16 09:24:58 +01:00
Jon Häggblad 0e12251773 Update some deps suggested by cargo deny (#1761)
* Update yanked cpufeatures dependency

* Update yanked textwrap version

* Updated yanked crossbeam-channel version

* Update client-core dep to 1.1.0
2022-11-16 09:10:40 +01:00
Jon Häggblad f886326014 wallet_storage: fix clippy (#1757) 2022-11-16 07:43:49 +01:00
Jess c73c2beb33 Update CHANGELOG.md 2022-11-15 19:04:38 +00:00
Raphaël Walther b6b40163c6 Added security audit for whole tree 2022-11-15 17:43:51 +01:00
Jędrzej Stuczyński f46c0142e7 Updated changelog 2022-11-15 16:28:15 +00:00
Jędrzej Stuczyński 8774b22d84 Added migration code for updating vesting contract address 2022-11-15 16:06:03 +00:00
Mark Sinclair c74a880838 Update README for SDK 2022-11-14 15:58:00 +00:00
Mark Sinclair ccbb254b1a Add wildcard glob for all files to SDK npm package.json 2022-11-14 15:36:30 +00:00
Dave Hrycyszyn 2bd0cfc870 Moving towards a publishable npm sdk package 2022-11-14 15:08:58 +00:00
Gala aeaf31ed59 Merge pull request #1756 from nymtech/feature-457-banner
Feature 457 banner
2022-11-14 14:04:07 +01:00
Gala 05820cfca7 mantenance banner text update 2022-11-14 12:58:49 +00:00
Gala 0469d5b602 adding a comment and different alignment 2022-11-11 11:53:49 +01:00
Jon Häggblad d912844543 Client: multiplex connection data streams (#1720)
* WIP: QA network details

* Initial implementation to multiplex socks5-client sends

* Introduce TransmissionLane enum

* WIP

* WIP: client requests connection id

* WIP

* mulitplex somewhat done

* Remove closed lanes

* WIP: connection handling over ws

* Remove unused published active connections shared data

* Start on status timer

* Max number of connections, and prune

* Some tidy

* Remove commented out code and tweak log

* Tidy

* Tweak log output

* Rename to TransmissionBuffer

* Use number of msg sent instead of time to rank age of lanes

* Create client-connections crate

* Remove waker call that probably are not needed

* Extract out some types from real traffic stream module

* Revert to develop qa.env

* Tweak comments, tidy for getting ready to merge

* Update changelog

* wasm client compile fixes

* rustfmt
2022-11-11 11:04:49 +01:00
Gala 64757ebc83 adding missing spaces 2022-11-11 07:16:13 +01:00
Gala ba55affe0a reducing ne banner height 2022-11-10 20:43:09 +01:00
Gala e9f826e705 adding a mintenance banner 2022-11-10 20:11:59 +01:00
Mark Sinclair b68fb4f5dd Merge branch 'release/v1.1.0' into develop 2022-11-10 15:06:36 +00:00
Gala 6eb482fc4b CR: use a parameter instead of cardcoded value 2022-09-22 16:54:07 +02:00
Gala f9be735d4f various ui updates 2022-09-22 16:54:06 +02:00
Gala 0fd178a304 Merge branch 'develop' into 327-nym-connect-colours 2022-09-22 10:37:06 +02:00
Gala 16ccbd9e48 CR: use a parameter instead of cardcoded value 2022-09-20 13:30:09 +02:00
Gala bd0ea45f35 Merge branch 'develop' into 327-nym-connect-colours 2022-09-20 13:17:15 +02:00
Gala 28cc772d7b various ui updates 2022-09-08 11:43:59 +02:00
223 changed files with 6684 additions and 4883 deletions
+4 -1
View File
@@ -16,7 +16,10 @@ jobs:
- name: Install cargo deny
run: cargo install --locked cargo-deny
- name: Run cargo deny
run: cargo deny check advisories --hide-inclusion-graph &> .github/workflows/support-files/notifications/deny.message
run: |
find . -name Cargo.toml -exec cargo deny --manifest-path {} check \
advisories -A advisory-not-detected --hide-inclusion-graph \; &> \
>(uniq &> .github/workflows/support-files/notifications/deny.message )
- uses: actions/upload-artifact@v3
with:
name: report
+9 -7
View File
@@ -29,6 +29,12 @@ jobs:
override: true
components: rustfmt, clippy
- name: Check formatting
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- name: Build all binaries
uses: actions-rs/cargo@v1
with:
@@ -48,12 +54,6 @@ jobs:
command: test
args: --workspace --all-features -- --ignored
- name: Check formatting
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- uses: actions-rs/clippy-check@v1
name: Clippy checks
with:
@@ -66,6 +66,8 @@ jobs:
command: clippy
args: --workspace -- -D warnings
# COCONUT stuff
- name: Build all binaries with coconut enabled
uses: actions-rs/cargo@v1
with:
@@ -82,4 +84,4 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: clippy
args: --features=coconut -- -D warnings
args: --all-targets --features=coconut -- -D warnings
-72
View File
@@ -1,72 +0,0 @@
name: Continuous integration on dispatch
on: workflow_dispatch
jobs:
build:
runs-on: [ self-hosted, custom-linux ]
# Enable sccache via environment variable
env:
RUSTC_WRAPPER: /home/ubuntu/.cargo/bin/sccache
steps:
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get -y install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev squashfs-tools
- name: Check out repository code
uses: actions/checkout@v2
- name: Install rust toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
components: rustfmt, clippy
- name: Build all binaries
uses: actions-rs/cargo@v1
with:
command: build
args: --workspace
- name: Run all tests
uses: actions-rs/cargo@v1
with:
command: test
args: --workspace --all-features
- name: Check formatting
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- uses: actions-rs/clippy-check@v1
name: Clippy checks
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features
- name: Run clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: --workspace -- -D warnings
- name: Build all binaries with coconut enabled
uses: actions-rs/cargo@v1
with:
command: build
args: --workspace --features=coconut
- name: Run all tests with coconut enabled
uses: actions-rs/cargo@v1
with:
command: test
args: --workspace --features=coconut
- name: Run clippy with coconut enabled
uses: actions-rs/cargo@v1
with:
command: clippy
args: --features=coconut -- -D warnings
+18 -5
View File
@@ -1,16 +1,21 @@
name: Build release of Nym smart contracts
on:
workflow_dispatch:
defaults:
run:
working-directory: contracts
release:
types: [created]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Check the release tag starts with `nym-contracts-`
if: startsWith(github.ref, 'refs/tags/nym-contracts-') == false && github.event_name != 'workflow_dispatch'
uses: actions/github-script@v3
with:
script: |
core.setFailed('Release tag did not start with nym-contracts-...')
- name: Install Rust stable
uses: actions-rs/toolchain@v1
@@ -21,7 +26,7 @@ jobs:
components: rustfmt, clippy
- name: Build release contracts
run: RUSTFLAGS='-C link-arg=-s' cargo build --release --target wasm32-unknown-unknown
run: make wasm
- name: Upload Mixnet Contract Artifact
uses: actions/upload-artifact@v3
@@ -36,3 +41,11 @@ jobs:
name: vesting_contract.wasm
path: contracts/target/wasm32-unknown-unknown/release/vesting_contract.wasm
retention-days: 5
- name: Upload to release based on tag name
uses: softprops/action-gh-release@v1
if: github.event_name == 'release'
with:
files: |
contracts/target/wasm32-unknown-unknown/release/vesting_contract.wasm
contracts/target/wasm32-unknown-unknown/release/mixnet_contract.wasm
@@ -1,32 +1,49 @@
name: Nightly builds on dispatch
name: Nightly builds on latest release
on: workflow_dispatch
on:
schedule:
- cron: '14 2 * * *'
jobs:
matrix_prep:
runs-on: ubuntu-latest
outputs:
matrix: ${{ steps.set-matrix.outputs.matrix }}
steps:
# creates the matrix strategy from nightly_build_matrix_includes.json
- uses: actions/checkout@v2
# creates the matrix strategy from nightly_build_release_matrix.json
- uses: actions/checkout@v3
- id: set-matrix
uses: JoshuaTheMiller/conditional-build-matrix@main
with:
inputFile: '.github/workflows/nightly_build_matrix_on_dispatch.json'
inputFile: '.github/workflows/nightly_build_release_matrix.json'
filter: '[?runOnEvent==`${{ github.event_name }}` || runOnEvent==`always`]'
build:
get_release:
runs-on: ubuntu-latest
needs: matrix_prep
outputs:
output1: ${{ steps.step2.outputs.latest_release }}
steps:
- name: Check out repository code
uses: actions/checkout@v3
- name: Fetch all branches
run: git fetch --all
- name: Set output variable to latest release branch
id: step2
run: echo "latest_release=$(git branch -r | grep -E 'release/v[0-9]+\.[0-9]+\.[0-9]+' | tail -n 1 | sed 's/ origin\///')" >> $GITHUB_OUTPUT
build:
needs: [get_release,matrix_prep]
strategy:
matrix: ${{fromJson(needs.matrix_prep.outputs.matrix)}}
runs-on: ${{ matrix.os }}
continue-on-error: ${{ matrix.rust == 'nightly' || matrix.rust == 'beta' || matrix.rust == 'stable' }}
steps:
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev squashfs-tools
run: sudo apt-get update && sudo apt-get install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev libudev-dev squashfs-tools
if: matrix.os == 'ubuntu-latest'
- name: Check out repository code
uses: actions/checkout@v2
- name: Check out latest release branch
uses: actions/checkout@v3
with:
ref: ${{needs.get_release.outputs.output1}}
- name: Install rust toolchain
uses: actions-rs/toolchain@v1
@@ -42,6 +59,12 @@ jobs:
command: build
args: --workspace
- name: Reclaim some disk space (because Windows is being annoying)
uses: actions-rs/cargo@v1
if: ${{ matrix.os == 'windows-latest' }}
with:
command: clean
- name: Run all tests
uses: actions-rs/cargo@v1
with:
@@ -99,6 +122,12 @@ jobs:
command: build
args: --workspace --features=coconut
- name: Reclaim some disk space (because Windows is being annoying)
uses: actions-rs/cargo@v1
if: ${{ matrix.os == 'windows-latest' }}
with:
command: clean
- name: Run all tests with coconut enabled
uses: actions-rs/cargo@v1
with:
@@ -151,7 +180,7 @@ jobs:
- name: Collect jobs status
uses: technote-space/workflow-conclusion-action@v2
- name: Check out repository code
uses: actions/checkout@v2
uses: actions/checkout@v3
- name: Keybase - Node Install
if: env.WORKFLOW_CONCLUSION == 'failure'
run: npm install
@@ -160,14 +189,14 @@ jobs:
if: env.WORKFLOW_CONCLUSION == 'failure'
env:
NYM_NOTIFICATION_KIND: nightly
NYM_PROJECT_NAME: "Nym nightly build"
NYM_PROJECT_NAME: "Nym nightly build on latest release"
GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
GIT_COMMIT_MESSAGE: "${{ github.event.head_commit.message }}"
GIT_BRANCH: "${GITHUB_REF##*/}"
KEYBASE_NYMBOT_USERNAME: "${{ secrets.KEYBASE_NYMBOT_USERNAME }}"
KEYBASE_NYMBOT_PAPERKEY: "${{ secrets.KEYBASE_NYMBOT_PAPERKEY }}"
KEYBASE_NYMBOT_TEAM: "${{ secrets.KEYBASE_NYMTECH_TEAM }}"
KEYBASE_NYM_CHANNEL: "${{ secrets.KEYBASE_CHANNEL_DEV_CORE_ID }}"
KEYBASE_NYMBOT_TEAM: "${{ secrets.KEYBASE_NYMBOT_TEAM }}"
KEYBASE_NYM_CHANNEL: "ci-nightly-release"
IS_SUCCESS: "${{ env.WORKFLOW_CONCLUSION == 'success' }}"
uses: docker://keybaseio/client:stable-node
with:
+25
View File
@@ -4,7 +4,32 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## Unreleased
### Added
- binaries: add `-c` shortform for `--config-env-file`
- websocket-requests: add server response signalling current packet queue length in the client
- contracts: DKG contract that handles coconut key generation ([#1678][#1708][#1747])
- validator-api: generate coconut keys interactively, using DKG and multisig contracts ([#1678][#1708][#1747])
### Changed
- clients: add concept of transmission lanes to better handle multiple data streams ([#1720])
- clients,validator-api: take coconut signers from the chain instead of specifying them via CLI ([#1747])
- multisig contract: add DKG contract to the list of addresses that can create proposals ([#1747])
- socks5-client: wait closing inbound connection until data is sent, and throttle incoming data in general ([#1783])
### Fixed
- gateway-client: fix decrypting stored messages on reconnect ([#1786])
- socks5-client: fix shutting down all tasks if anyone of them panics or errors out ([#1805])
[#1678]: https://github.com/nymtech/nym/pull/1678
[#1708]: https://github.com/nymtech/nym/pull/1708
[#1720]: https://github.com/nymtech/nym/pull/1720
[#1747]: https://github.com/nymtech/nym/pull/1747
[#1783]: https://github.com/nymtech/nym/pull/1783
[#1786]: https://github.com/nymtech/nym/pull/1786
[#1805]: https://github.com/nymtech/nym/pull/1805
## [v1.1.0](https://github.com/nymtech/nym/tree/v1.1.0) (2022-11-09)
Generated
+88 -20
View File
@@ -576,10 +576,19 @@ dependencies = [
"os_str_bytes",
]
[[package]]
name = "client-connections"
version = "0.1.0"
dependencies = [
"futures",
"log",
]
[[package]]
name = "client-core"
version = "1.1.0"
dependencies = [
"client-connections",
"config",
"crypto",
"dirs",
@@ -627,6 +636,18 @@ dependencies = [
"serde",
]
[[package]]
name = "coconut-dkg-common"
version = "0.1.0"
dependencies = [
"contracts-common",
"cosmwasm-std",
"cw-utils",
"multisig-contract-common",
"schemars",
"serde",
]
[[package]]
name = "coconut-interface"
version = "0.1.0"
@@ -736,7 +757,9 @@ checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
name = "contracts-common"
version = "0.1.0"
dependencies = [
"bs58",
"cosmwasm-std",
"dkg",
"schemars",
"serde",
"serde_json",
@@ -850,9 +873,9 @@ dependencies = [
[[package]]
name = "cpufeatures"
version = "0.2.2"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"
checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320"
dependencies = [
"libc",
]
@@ -942,7 +965,6 @@ dependencies = [
"crypto",
"rand 0.7.3",
"thiserror",
"url",
"validator-api-requests",
"validator-client",
]
@@ -1221,9 +1243,9 @@ dependencies = [
[[package]]
name = "cw-utils"
version = "0.13.2"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "babd2c090f39d07ce5bf2556962305e795daa048ce20a93709eb591476e4a29e"
checksum = "9dbaecb78c8e8abfd6b4258c7f4fbeb5c49a5e45ee4d910d3240ee8e1d714e1b"
dependencies = [
"cosmwasm-std",
"schemars",
@@ -1232,10 +1254,22 @@ dependencies = [
]
[[package]]
name = "cw3"
version = "0.13.2"
name = "cw2"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f871854338a54c7bb094d16ffe17212b93b146d9659dbce4c9402a9b77e240ef"
checksum = "04cf4639517490dd36b333bbd6c4fbd92e325fd0acf4683b41753bc5eb63bfc1"
dependencies = [
"cosmwasm-std",
"cw-storage-plus",
"schemars",
"serde",
]
[[package]]
name = "cw3"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe19462a7f644ba60c19d3443cb90d00c50d9b6b3b0a3a7fca93df8261af979b"
dependencies = [
"cosmwasm-std",
"cw-utils",
@@ -1244,10 +1278,26 @@ dependencies = [
]
[[package]]
name = "cw4"
version = "0.13.2"
name = "cw3-fixed-multisig"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4476d6a7c13c46ed9ff260bd0e1cf648dc37b13f483822e1ff2a431f0f6ee52"
checksum = "df54aa54c13f405ec4ab36b6217538bc957d439eee58f89312db05a79caf6706"
dependencies = [
"cosmwasm-std",
"cw-storage-plus",
"cw-utils",
"cw2",
"cw3",
"schemars",
"serde",
"thiserror",
]
[[package]]
name = "cw4"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0acc3549d5ce11c6901b3a676f2e2628684722197054d97cd0101ea174ed5cbd"
dependencies = [
"cosmwasm-std",
"cw-storage-plus",
@@ -1402,6 +1452,7 @@ dependencies = [
"ff 0.11.0",
"group 0.11.0",
"lazy_static",
"pemstore",
"rand 0.8.5",
"rand_chacha 0.3.1",
"rand_core 0.6.3",
@@ -1453,9 +1504,9 @@ dependencies = [
[[package]]
name = "ed25519"
version = "1.4.1"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d5c4b5e5959dc2c2b89918d8e2cc40fcdd623cef026ed09d2f0ee05199dc8e4"
checksum = "1e9c280362032ea4203659fc489832d0204ef09f247a0506f170dafcac08c369"
dependencies = [
"serde",
"signature",
@@ -2930,6 +2981,7 @@ dependencies = [
"cosmwasm-std",
"cw-utils",
"cw3",
"cw3-fixed-multisig",
"cw4",
"schemars",
"serde",
@@ -3125,6 +3177,7 @@ name = "nym-client"
version = "1.1.0"
dependencies = [
"clap 3.2.8",
"client-connections",
"client-core",
"coconut-interface",
"completions",
@@ -3146,6 +3199,7 @@ dependencies = [
"serde",
"serde_json",
"sled",
"tap",
"task",
"thiserror",
"tokio",
@@ -3253,6 +3307,7 @@ version = "1.1.0"
dependencies = [
"async-trait",
"clap 3.2.8",
"client-connections",
"completions",
"dirs",
"futures",
@@ -3299,6 +3354,7 @@ name = "nym-socks5-client"
version = "1.1.0"
dependencies = [
"clap 3.2.8",
"client-connections",
"client-core",
"coconut-interface",
"completions",
@@ -3323,6 +3379,7 @@ dependencies = [
"serde",
"snafu 0.6.10",
"socks5-requests",
"tap",
"task",
"thiserror",
"tokio",
@@ -3365,9 +3422,11 @@ version = "1.1.0"
dependencies = [
"anyhow",
"async-trait",
"bs58",
"cfg-if 1.0.0",
"clap 3.2.8",
"coconut-bandwidth-contract-common",
"coconut-dkg-common",
"coconut-interface",
"config",
"console-subscriber",
@@ -3379,6 +3438,7 @@ dependencies = [
"cw-utils",
"cw3",
"dirs",
"dkg",
"dotenv",
"futures",
"gateway-client",
@@ -3392,6 +3452,7 @@ dependencies = [
"nymcoconut",
"nymsphinx",
"okapi",
"pemstore",
"pin-project",
"pretty_env_logger",
"rand 0.7.3",
@@ -3444,16 +3505,19 @@ name = "nymcoconut"
version = "0.5.0"
dependencies = [
"bincode",
"bls12_381 0.5.0",
"bls12_381 0.6.0",
"bs58",
"criterion",
"digest 0.9.0",
"dkg",
"doc-comment",
"ff 0.10.1",
"ff 0.11.0",
"getrandom 0.2.6",
"group 0.10.0",
"group 0.11.0",
"itertools",
"pemstore",
"rand 0.8.5",
"rand_chacha 0.3.1",
"serde",
"serde_derive",
"sha2 0.9.9",
@@ -4123,6 +4187,7 @@ name = "proxy-helpers"
version = "0.1.0"
dependencies = [
"bytes",
"client-connections",
"futures",
"log",
"ordered-buffer",
@@ -5628,7 +5693,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
name = "task"
version = "0.1.0"
dependencies = [
"futures",
"log",
"thiserror",
"tokio",
]
@@ -5768,18 +5835,18 @@ checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
[[package]]
name = "thiserror"
version = "1.0.35"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c53f98874615aea268107765aa1ed8f6116782501d18e53d08b471733bea6c85"
checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.35"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8b463991b4eab2d801e724172285ec4195c650e8ec79b149e6c2a8e6dd3f783"
checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
dependencies = [
"proc-macro2",
"quote",
@@ -6407,6 +6474,7 @@ dependencies = [
"base64",
"bip39",
"coconut-bandwidth-contract-common",
"coconut-dkg-common",
"coconut-interface",
"colored",
"config",
+2
View File
@@ -26,10 +26,12 @@ members = [
"common/client-libs/gateway-client",
"common/client-libs/mixnet-client",
"common/client-libs/validator-client",
"common/client-connections",
"common/coconut-interface",
"common/commands",
"common/config",
"common/cosmwasm-smart-contracts/coconut-bandwidth-contract",
"common/cosmwasm-smart-contracts/coconut-dkg",
"common/cosmwasm-smart-contracts/contracts-common",
"common/cosmwasm-smart-contracts/mixnet-contract",
"common/cosmwasm-smart-contracts/multisig-contract",
+4 -2
View File
@@ -14,11 +14,13 @@ log = "0.4"
rand = { version = "0.7.3", features = ["wasm-bindgen"] }
serde = { version = "1.0", features = ["derive"] }
sled = { version = "0.34", optional = true }
tap = "1.0.1"
thiserror = "1.0.34"
url = { version ="2.2", features = ["serde"] }
# internal
config = { path = "../../common/config" }
client-connections = { path = "../../common/client-connections" }
crypto = { path = "../../common/crypto" }
gateway-client = { path = "../../common/client-libs/gateway-client" }
#gateway-client = { path = "../../common/client-libs/gateway-client", default-features = false, features = ["wasm", "coconut"] }
@@ -28,7 +30,6 @@ nymsphinx = { path = "../../common/nymsphinx" }
pemstore = { path = "../../common/pemstore" }
topology = { path = "../../common/topology" }
validator-client = { path = "../../common/client-libs/validator-client", default-features = false }
tap = "1.0.1"
tokio = { version = "1.21.2", features = ["time", "macros"]}
@@ -56,4 +57,5 @@ tempfile = "3.1.0"
default = ["reply-surb"]
wasm = ["gateway-client/wasm"]
coconut = ["gateway-client/coconut", "gateway-requests/coconut"]
reply-surb = ["sled"]
reply-surb = ["sled"]
@@ -178,6 +178,10 @@ impl LoopCoverTrafficStream<OsRng> {
// This isn't a problem, if the channel is full means we're already sending the
// max amount of messages downstream can handle.
log::debug!("Failed to send cover message - channel full");
// However it's still useful to alert the user that the gateway or the link to
// the gateway can't keep up. Either due to insufficient bandwidth on the
// client side, or that the gateway is overloaded.
log::warn!("Failed to send: gateway appears to not keep up");
}
TrySendError::Closed(_) => {
log::warn!("Failed to send cover message - channel closed");
@@ -224,7 +228,9 @@ impl LoopCoverTrafficStream<OsRng> {
}
}
}
assert!(shutdown.is_shutdown_poll());
tokio::time::timeout(Duration::from_secs(5), shutdown.recv())
.await
.expect("Task stopped without shutdown called");
log::debug!("LoopCoverTrafficStream: Exiting");
})
}
@@ -1,9 +1,9 @@
use futures::channel::mpsc;
use client_connections::TransmissionLane;
use nymsphinx::addressing::clients::Recipient;
use nymsphinx::anonymous_replies::ReplySurb;
pub type InputMessageSender = mpsc::UnboundedSender<InputMessage>;
pub type InputMessageReceiver = mpsc::UnboundedReceiver<InputMessage>;
pub type InputMessageSender = tokio::sync::mpsc::Sender<InputMessage>;
pub type InputMessageReceiver = tokio::sync::mpsc::Receiver<InputMessage>;
#[derive(Debug)]
pub enum InputMessage {
@@ -11,6 +11,7 @@ pub enum InputMessage {
recipient: Recipient,
data: Vec<u8>,
with_reply_surb: bool,
lane: TransmissionLane,
},
Reply {
reply_surb: ReplySurb,
@@ -19,11 +20,17 @@ pub enum InputMessage {
}
impl InputMessage {
pub fn new_fresh(recipient: Recipient, data: Vec<u8>, with_reply_surb: bool) -> Self {
pub fn new_fresh(
recipient: Recipient,
data: Vec<u8>,
with_reply_surb: bool,
lane: TransmissionLane,
) -> Self {
InputMessage::Fresh {
recipient,
data,
with_reply_surb,
lane,
}
}
@@ -69,6 +69,8 @@ impl MixTrafficController {
#[cfg(not(target_arch = "wasm32"))]
pub fn start_with_shutdown(mut self, mut shutdown: task::ShutdownListener) {
use std::time::Duration;
spawn_future(async move {
debug!("Started MixTrafficController with graceful shutdown support");
@@ -88,7 +90,9 @@ impl MixTrafficController {
}
}
}
assert!(shutdown.is_shutdown_poll());
tokio::time::timeout(Duration::from_secs(5), shutdown.recv())
.await
.expect("Task stopped without shutdown called");
log::debug!("MixTrafficController: Exiting");
})
}
-9
View File
@@ -1,5 +1,3 @@
use std::sync::atomic::AtomicBool;
pub mod cover_traffic_stream;
pub mod inbound_messages;
pub mod key_manager;
@@ -9,10 +7,3 @@ pub mod received_buffer;
#[cfg(feature = "reply-surb")]
pub mod reply_key_storage;
pub mod topology_control;
// This is *NOT* used to signal shutdown.
// It's critical that we don't have any tasks finishing early, this is an additional safety check
// that tasks exiting are doing so because shutdown has been signalled, and no other reason.
// In particular for tasks that rely on their associated channel being closed to signal shutdown,
// and don't have access to a shutdown listener channel.
pub static SHUTDOWN_HAS_BEEN_SIGNALLED: AtomicBool = AtomicBool::new(false);
@@ -33,7 +33,7 @@ impl AcknowledgementListener {
}
async fn on_ack(&mut self, ack_content: Vec<u8>) {
debug!("Received an ack");
trace!("Received an ack");
let frag_id = match recover_identifier(&self.ack_key, &ack_content)
.map(FragmentIdentifier::try_from_bytes)
{
@@ -72,6 +72,8 @@ impl AcknowledgementListener {
#[cfg(not(target_arch = "wasm32"))]
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: task::ShutdownListener) {
use std::time::Duration;
debug!("Started AcknowledgementListener with graceful shutdown support");
while !shutdown.is_shutdown() {
@@ -88,7 +90,9 @@ impl AcknowledgementListener {
}
}
}
assert!(shutdown.is_shutdown_poll());
tokio::time::timeout(Duration::from_secs(5), shutdown.recv())
.await
.expect("Task stopped without shutdown called");
log::debug!("AcknowledgementListener: Exiting");
}
@@ -272,7 +272,9 @@ impl ActionController {
}
}
}
assert!(shutdown.is_shutdown_poll());
tokio::time::timeout(Duration::from_secs(5), shutdown.recv())
.await
.expect("Task stopped without shutdown called");
log::debug!("ActionController: Exiting");
}
@@ -8,7 +8,7 @@ use crate::client::{
real_messages_control::real_traffic_stream::{BatchRealMessageSender, RealMessage},
topology_control::TopologyAccessor,
};
use futures::StreamExt;
use client_connections::TransmissionLane;
use log::*;
use nymsphinx::anonymous_replies::ReplySurb;
use nymsphinx::preparer::MessagePreparer;
@@ -104,6 +104,7 @@ where
content: Vec<u8>,
with_reply_surb: bool,
) -> Option<Vec<RealMessage>> {
log::trace!("handling msg size: {}", content.len());
let topology_permit = self.topology_access.get_read_permit().await;
let topology = match topology_permit
.try_get_valid_topology_ref(&self.ack_recipient, Some(&recipient))
@@ -164,37 +165,44 @@ where
}
async fn on_input_message(&mut self, msg: InputMessage) {
let real_messages = match msg {
let (real_messages, lane) = match msg {
InputMessage::Fresh {
recipient,
data,
with_reply_surb,
} => {
lane,
} => (
self.handle_fresh_message(recipient, data, with_reply_surb)
.await,
lane,
),
InputMessage::Reply { reply_surb, data } => (
self.handle_reply(reply_surb, data)
.await
}
InputMessage::Reply { reply_surb, data } => self
.handle_reply(reply_surb, data)
.await
.map(|message| vec![message]),
.map(|message| vec![message]),
TransmissionLane::Reply,
),
};
// there's no point in trying to send nothing
if let Some(real_messages) = real_messages {
// tells real message sender (with the poisson timer) to send this to the mix network
self.real_message_sender
.unbounded_send(real_messages)
.unwrap();
.send((real_messages, lane))
.await
.expect("BatchRealMessageReceiver has stopped receiving!");
}
}
#[cfg(not(target_arch = "wasm32"))]
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: task::ShutdownListener) {
use std::time::Duration;
debug!("Started InputMessageListener with graceful shutdown support");
while !shutdown.is_shutdown() {
tokio::select! {
input_msg = self.input_receiver.next() => match input_msg {
input_msg = self.input_receiver.recv() => match input_msg {
Some(input_msg) => {
self.on_input_message(input_msg).await;
},
@@ -208,14 +216,16 @@ where
}
}
}
assert!(shutdown.is_shutdown_poll());
tokio::time::timeout(Duration::from_secs(5), shutdown.recv())
.await
.expect("Task stopped without shutdown called");
log::debug!("InputMessageListener: Exiting");
}
#[cfg(target_arch = "wasm32")]
pub(super) async fn run(&mut self) {
debug!("Started InputMessageListener without graceful shutdown support");
while let Some(input_msg) = self.input_receiver.next().await {
while let Some(input_msg) = self.input_receiver.recv().await {
self.on_input_message(input_msg).await;
}
}
@@ -1,17 +1,21 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::action_controller::{Action, ActionSender};
use super::PendingAcknowledgement;
use super::RetransmissionRequestReceiver;
use super::{
action_controller::{Action, ActionSender},
PendingAcknowledgement, RetransmissionRequestReceiver,
};
use crate::client::{
real_messages_control::real_traffic_stream::{BatchRealMessageSender, RealMessage},
topology_control::TopologyAccessor,
};
use client_connections::TransmissionLane;
use futures::StreamExt;
use log::*;
use nymsphinx::preparer::MessagePreparer;
use nymsphinx::{acknowledgements::AckKey, addressing::clients::Recipient};
use nymsphinx::{
acknowledgements::AckKey, addressing::clients::Recipient, preparer::MessagePreparer,
};
use rand::{CryptoRng, Rng};
use std::sync::{Arc, Weak};
@@ -113,15 +117,18 @@ where
// send to `OutQueueControl` to eventually send to the mix network
self.real_message_sender
.unbounded_send(vec![RealMessage::new(
prepared_fragment.mix_packet,
frag_id,
)])
.unwrap();
.send((
vec![RealMessage::new(prepared_fragment.mix_packet, frag_id)],
TransmissionLane::Retransmission,
))
.await
.expect("BatchRealMessageReceiver has stopped receiving!");
}
#[cfg(not(target_arch = "wasm32"))]
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: task::ShutdownListener) {
use std::time::Duration;
debug!("Started RetransmissionRequestListener with graceful shutdown support");
while !shutdown.is_shutdown() {
@@ -138,7 +145,9 @@ where
}
}
}
assert!(shutdown.is_shutdown_poll());
tokio::time::timeout(Duration::from_secs(5), shutdown.recv())
.await
.expect("Task stopped without shutdown called");
log::debug!("RetransmissionRequestListener: Exiting");
}
@@ -8,12 +8,15 @@
use self::{
acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl,
};
use crate::client::real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors;
use crate::client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
topology_control::TopologyAccessor,
use crate::{
client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors,
topology_control::TopologyAccessor,
},
spawn_future,
};
use crate::spawn_future;
use client_connections::{ConnectionCommandReceiver, LaneQueueLengths};
use futures::channel::mpsc;
use gateway_client::AcknowledgementReceiver;
use log::*;
@@ -103,6 +106,7 @@ where
// obviously when we finally make shared rng that is on 'higher' level, this should become
// generic `R`
impl RealMessagesController<OsRng> {
#[allow(clippy::too_many_arguments)]
pub fn new(
config: Config,
ack_receiver: AcknowledgementReceiver,
@@ -110,10 +114,12 @@ impl RealMessagesController<OsRng> {
mix_sender: BatchMixMessageSender,
topology_access: TopologyAccessor,
#[cfg(feature = "reply-surb")] reply_key_storage: ReplyKeyStorage,
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
) -> Self {
let rng = OsRng;
let (real_message_sender, real_message_receiver) = mpsc::unbounded();
let (real_message_sender, real_message_receiver) = tokio::sync::mpsc::channel(1);
let (sent_notifier_tx, sent_notifier_rx) = mpsc::unbounded();
let ack_controller_connectors = AcknowledgementControllerConnectors::new(
@@ -159,6 +165,8 @@ impl RealMessagesController<OsRng> {
rng,
config.self_recipient,
topology_access,
lane_queue_lengths,
client_connection_rx,
);
RealMessagesController {
@@ -4,7 +4,9 @@
use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
use crate::client::topology_control::TopologyAccessor;
use futures::channel::mpsc;
use client_connections::{
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use log::*;
@@ -16,7 +18,6 @@ use nymsphinx::forwarding::packet::MixPacket;
use nymsphinx::params::PacketSize;
use nymsphinx::utils::sample_poisson_duration;
use rand::{CryptoRng, Rng};
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -27,22 +28,22 @@ use tokio::time;
#[cfg(target_arch = "wasm32")]
use wasm_timer;
// The minimum time between increasing the average delay between packets. If we hit the ceiling in
// the available buffer space we want to take somewhat swift action, but we still need to give a
// short time to give the channel a chance reduce pressure.
const INCREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 1;
// The minimum time between decreasing the average delay between packets. We don't want to change
// to quickly to keep things somewhat stable. Also there are buffers downstreams meaning we need to
// wait a little to see the effect before we decrease further.
const DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 30;
// If we enough time passes without any sign of backpressure in the channel, we can consider
// lowering the average delay. The goal is to keep somewhat stable, rather than maxing out
// bandwidth at all times.
const ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS: u64 = 30;
// The maximum multiplier we apply to the base average Poisson delay.
const MAX_DELAY_MULTIPLIER: u32 = 6;
// The minium multiplier we apply to the base average Poisson delay.
const MIN_DELAY_MULTIPLIER: u32 = 1;
use self::{
sending_delay_controller::SendingDelayController, transmission_buffer::TransmissionBuffer,
};
mod sending_delay_controller;
mod transmission_buffer;
#[cfg(not(target_arch = "wasm32"))]
fn get_time_now() -> time::Instant {
time::Instant::now()
}
#[cfg(target_arch = "wasm32")]
fn get_time_now() -> wasm_timer::Instant {
wasm_timer::Instant::now()
}
/// Configurable parameters of the `OutQueueControl`
pub(crate) struct Config {
@@ -85,101 +86,6 @@ impl Config {
}
}
struct SendingDelayController {
/// Multiply the average sending delay.
/// This is normally set to unity, but if we detect backpressure we increase this
/// multiplier. We use discrete steps.
current_multiplier: u32,
/// Maximum delay multiplier
upper_bound: u32,
/// Minimum delay multiplier
lower_bound: u32,
/// To make sure we don't change the multiplier to fast, we limit a change to some duration
#[cfg(not(target_arch = "wasm32"))]
time_when_changed: time::Instant,
#[cfg(target_arch = "wasm32")]
time_when_changed: wasm_timer::Instant,
/// If we have a long enough time without any backpressure detected we try reducing the sending
/// delay multiplier
#[cfg(not(target_arch = "wasm32"))]
time_when_backpressure_detected: time::Instant,
#[cfg(target_arch = "wasm32")]
time_when_backpressure_detected: wasm_timer::Instant,
}
#[cfg(not(target_arch = "wasm32"))]
fn get_time_now() -> time::Instant {
time::Instant::now()
}
#[cfg(target_arch = "wasm32")]
fn get_time_now() -> wasm_timer::Instant {
wasm_timer::Instant::now()
}
impl SendingDelayController {
fn new(lower_bound: u32, upper_bound: u32) -> Self {
assert!(lower_bound <= upper_bound);
let now = get_time_now();
SendingDelayController {
current_multiplier: MIN_DELAY_MULTIPLIER,
upper_bound,
lower_bound,
time_when_changed: now,
time_when_backpressure_detected: now,
}
}
fn current_multiplier(&self) -> u32 {
self.current_multiplier
}
fn increase_delay_multiplier(&mut self) {
self.current_multiplier =
(self.current_multiplier + 1).clamp(self.lower_bound, self.upper_bound);
self.time_when_changed = get_time_now();
log::debug!(
"Increasing sending delay multiplier to: {}",
self.current_multiplier
);
}
fn decrease_delay_multiplier(&mut self) {
self.current_multiplier =
(self.current_multiplier - 1).clamp(self.lower_bound, self.upper_bound);
self.time_when_changed = get_time_now();
log::debug!(
"Decreasing sending delay multiplier to: {}",
self.current_multiplier
);
}
fn record_backpressure_detected(&mut self) {
self.time_when_backpressure_detected = get_time_now();
}
fn not_increased_delay_recently(&self) -> bool {
get_time_now()
> self.time_when_changed + Duration::from_secs(INCREASE_DELAY_MIN_CHANGE_INTERVAL_SECS)
}
fn is_sending_reliable(&self) -> bool {
let now = get_time_now();
let delay_change_interval = Duration::from_secs(DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS);
let acceptable_time_without_backpressure =
Duration::from_secs(ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS);
now > self.time_when_backpressure_detected + acceptable_time_without_backpressure
&& now > self.time_when_changed + delay_change_interval
}
}
pub(crate) struct OutQueueControl<R>
where
R: CryptoRng + Rng,
@@ -203,7 +109,7 @@ where
// To make sure we don't overload the mix_tx channel, we limit the rate we are pushing
// messages.
sending_rate_controller: SendingDelayController,
sending_delay_controller: SendingDelayController,
/// Channel used for sending prepared sphinx packets to `MixTrafficController` that sends them
/// out to the network without any further delays.
@@ -222,10 +128,19 @@ where
/// Accessor to the common instance of network topology.
topology_access: TopologyAccessor,
/// Buffer containing all real messages received. It is first exhausted before more are pulled.
received_buffer: VecDeque<RealMessage>,
/// Buffer containing all incoming real messages keyed by transmission lane, that we will send
/// out to the mixnet.
transmission_buffer: TransmissionBuffer,
/// Incoming channel for being notified of closed connections, so that we can close lanes
/// corresponding to connections. To avoid sending traffic unnecessary
client_connection_rx: ConnectionCommandReceiver,
/// Report queue lengths so that upstream can backoff sending data, and keep connections open.
lane_queue_lengths: LaneQueueLengths,
}
#[derive(Debug)]
pub(crate) struct RealMessage {
mix_packet: MixPacket,
fragment_id: FragmentIdentifier,
@@ -242,8 +157,9 @@ impl RealMessage {
// messages are already prepared, etc. the real point of it is to forward it to mix_traffic
// after sufficient delay
pub(crate) type BatchRealMessageSender = mpsc::UnboundedSender<Vec<RealMessage>>;
type BatchRealMessageReceiver = mpsc::UnboundedReceiver<Vec<RealMessage>>;
pub(crate) type BatchRealMessageSender =
tokio::sync::mpsc::Sender<(Vec<RealMessage>, TransmissionLane)>;
type BatchRealMessageReceiver = tokio::sync::mpsc::Receiver<(Vec<RealMessage>, TransmissionLane)>;
pub(crate) enum StreamMessage {
Cover,
@@ -266,22 +182,23 @@ where
rng: R,
our_full_destination: Recipient,
topology_access: TopologyAccessor,
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
) -> Self {
OutQueueControl {
config,
ack_key,
sent_notifier,
next_delay: None,
sending_rate_controller: SendingDelayController::new(
MIN_DELAY_MULTIPLIER,
MAX_DELAY_MULTIPLIER,
),
sending_delay_controller: Default::default(),
mix_tx,
real_receiver,
our_full_destination,
rng,
topology_access,
received_buffer: VecDeque::with_capacity(0), // we won't be putting any data into this guy directly
transmission_buffer: Default::default(),
client_connection_rx,
lane_queue_lengths,
}
}
@@ -337,7 +254,7 @@ where
};
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
log::error!("Failed to send - channel closed: {}", err);
log::error!("Failed to send: {}", err);
}
// notify ack controller about sending our message only after we actually managed to push it
@@ -346,6 +263,10 @@ where
self.sent_notify(fragment_id);
}
// In addition to closing connections on receiving messages throught client_connection_rx,
// also close connections when sufficiently stale.
self.transmission_buffer.prune_stale_connections();
// JS: Not entirely sure why or how it fixes stuff, but without the yield call,
// the UnboundedReceiver [of mix_rx] will not get a chance to read anything
// JS2: Basically it was the case that with high enough rate, the stream had already a next value
@@ -357,44 +278,71 @@ where
tokio::task::yield_now().await;
}
fn on_close_connection(&mut self, connection_id: ConnectionId) {
log::debug!("Removing lane for connection: {connection_id}");
self.transmission_buffer
.remove(&TransmissionLane::ConnectionId(connection_id));
}
fn current_average_message_sending_delay(&self) -> Duration {
self.config.average_message_sending_delay
* self.sending_rate_controller.current_multiplier()
* self.sending_delay_controller.current_multiplier()
}
fn adjust_current_average_message_sending_delay(&mut self) {
let used_slots = self.mix_tx.max_capacity() - self.mix_tx.capacity();
log::trace!(
"used_slots: {used_slots}, current_multiplier: {}",
self.sending_rate_controller.current_multiplier()
self.sending_delay_controller.current_multiplier()
);
// Even just a single used slot is enough to signal backpressure
if used_slots > 0 {
log::trace!("Backpressure detected");
self.sending_rate_controller.record_backpressure_detected();
self.sending_delay_controller.record_backpressure_detected();
}
// If the buffer is running out, slow down the sending rate
if self.mix_tx.capacity() == 0
&& self.sending_rate_controller.not_increased_delay_recently()
&& self.sending_delay_controller.not_increased_delay_recently()
{
self.sending_rate_controller.increase_delay_multiplier();
self.sending_delay_controller.increase_delay_multiplier();
}
// Very carefully step up the sending rate in case it seems like we can solidly handle the
// current rate.
if self.sending_rate_controller.is_sending_reliable() {
self.sending_rate_controller.decrease_delay_multiplier();
if self.sending_delay_controller.is_sending_reliable() {
self.sending_delay_controller.decrease_delay_multiplier();
}
}
fn pop_next_message(&mut self) -> Option<RealMessage> {
// Pop the next message from the transmission buffer
let (lane, real_next) = self.transmission_buffer.pop_next_message_at_random()?;
// Update the published queue length
let lane_length = self.transmission_buffer.lane_length(&lane);
self.lane_queue_lengths.set(&lane, lane_length);
Some(real_next)
}
fn poll_poisson(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
// The average delay could change depending on if backpressure in the downstream channel
// (mix_tx) was detected.
self.adjust_current_average_message_sending_delay();
let avg_delay = self.current_average_message_sending_delay();
// Start by checking if we have any incoming messages about closed connections
// NOTE: this feels a bit iffy, the `OutQueueControl` is getting ripe for a rewrite to
// something simpler.
if let Poll::Ready(Some(id)) = Pin::new(&mut self.client_connection_rx).poll_next(cx) {
match id {
ConnectionCommand::Close(id) => self.on_close_connection(id),
ConnectionCommand::ActiveConnections(_) => panic!(),
}
}
if let Some(ref mut next_delay) = &mut self.next_delay {
// it is not yet time to return a message
if next_delay.as_mut().poll(cx).is_pending() {
@@ -419,28 +367,32 @@ where
next_delay.as_mut().reset(next_poisson_delay);
}
// check if we have anything immediately available
if let Some(real_available) = self.received_buffer.pop_front() {
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
}
// decide what kind of message to send
match Pin::new(&mut self.real_receiver).poll_next(cx) {
// On every iteration we get new messages from upstream. Given that these come bunched
// in `Vec`, this ensures that on average we will fetch messages faster than we can
// send, which is a condition for being able to multiplex sphinx packets from multiple
// data streams.
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
// in the case our real message channel stream was closed, we should also indicate we are closed
// (and whoever is using the stream should panic)
Poll::Ready(None) => Poll::Ready(None),
// if there are more messages available, return first one and store the rest
Poll::Ready(Some(real_messages)) => {
self.received_buffer = real_messages.into();
// we MUST HAVE received at least ONE message
Poll::Ready(Some(StreamMessage::Real(Box::new(
self.received_buffer.pop_front().unwrap(),
))))
Poll::Ready(Some((real_messages, conn_id))) => {
log::trace!("handling real_messages: size: {}", real_messages.len());
self.transmission_buffer.store(&conn_id, real_messages);
let real_next = self.pop_next_message().expect("Just stored one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
}
// otherwise construct a dummy one
Poll::Pending => Poll::Ready(Some(StreamMessage::Cover)),
Poll::Pending => {
if let Some(real_next) = self.pop_next_message() {
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
} else {
// otherwise construct a dummy one
Poll::Ready(Some(StreamMessage::Cover))
}
}
}
} else {
// we never set an initial delay - let's do it now
@@ -462,32 +414,36 @@ where
}
fn poll_immediate(&mut self, cx: &mut Context<'_>) -> Poll<Option<StreamMessage>> {
// check if we have anything immediately available
if let Some(real_available) = self.received_buffer.pop_front() {
// if there are more messages immediately available, notify the runtime
// because we should be polled again
if !self.received_buffer.is_empty() {
cx.waker().wake_by_ref()
// Start by checking if we have any incoming messages about closed connections
if let Poll::Ready(Some(id)) = Pin::new(&mut self.client_connection_rx).poll_next(cx) {
match id {
ConnectionCommand::Close(id) => self.on_close_connection(id),
ConnectionCommand::ActiveConnections(_) => panic!(),
}
return Poll::Ready(Some(StreamMessage::Real(Box::new(real_available))));
}
match Pin::new(&mut self.real_receiver).poll_next(cx) {
match Pin::new(&mut self.real_receiver).poll_recv(cx) {
// in the case our real message channel stream was closed, we should also indicate we are closed
// (and whoever is using the stream should panic)
Poll::Ready(None) => Poll::Ready(None),
// if there are more messages available, return first one and store the rest
Poll::Ready(Some(real_messages)) => {
self.received_buffer = real_messages.into();
// we MUST HAVE received at least ONE message
Poll::Ready(Some(StreamMessage::Real(Box::new(
self.received_buffer.pop_front().unwrap(),
))))
Poll::Ready(Some((real_messages, conn_id))) => {
log::trace!("handling real_messages: size: {}", real_messages.len());
// First store what we got for the given connection id
self.transmission_buffer.store(&conn_id, real_messages);
let real_next = self.pop_next_message().expect("we just added one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
}
// if there's nothing, then there's nothing
Poll::Pending => Poll::Pending,
Poll::Pending => {
if let Some(real_next) = self.pop_next_message() {
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
} else {
Poll::Pending
}
}
}
}
@@ -502,28 +458,73 @@ where
}
}
#[cfg(not(target_arch = "wasm32"))]
fn log_status(&self) {
let packets = self.transmission_buffer.total_size();
let backlog = self.transmission_buffer.total_size_in_bytes() as f64 / 1024.0;
let lanes = self.transmission_buffer.num_lanes();
let mult = self.sending_delay_controller.current_multiplier();
let delay = self.current_average_message_sending_delay().as_millis();
let status_str = if self.config.disable_poisson_packet_distribution {
format!(
"Status: {lanes} lanes, backlog: {:.2} kiB ({packets}), no delay",
backlog
)
} else {
format!(
"Status: {lanes} lanes, backlog: {:.2} kiB ({packets}), avg delay: {}ms ({mult})",
backlog, delay
)
};
if packets > 1000 {
log::warn!("{status_str}");
} else if packets > 0 {
log::info!("{status_str}");
} else {
log::debug!("{status_str}");
}
}
#[cfg(not(target_arch = "wasm32"))]
fn log_status_infrequent(&self) {
if self.sending_delay_controller.current_multiplier() > 1 {
log::warn!(
"Unable to send packets fast enough - sending delay multiplier set to: {}",
self.sending_delay_controller.current_multiplier()
);
}
}
#[cfg(not(target_arch = "wasm32"))]
pub(super) async fn run_with_shutdown(&mut self, mut shutdown: task::ShutdownListener) {
debug!("Started OutQueueControl with graceful shutdown support");
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
let mut infrequent_status_timer = tokio::time::interval(Duration::from_secs(60));
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
log::trace!("OutQueueControl: Received shutdown");
}
next_message = self.next() => match next_message {
Some(next_message) => {
self.on_message(next_message).await;
},
None => {
log::trace!("OutQueueControl: Stopping since channel closed");
break;
}
_ = status_timer.tick() => {
self.log_status();
}
_ = infrequent_status_timer.tick() => {
self.log_status_infrequent();
}
next_message = self.next() => if let Some(next_message) = next_message {
self.on_message(next_message).await;
} else {
log::trace!("OutQueueControl: Stopping since channel closed");
break;
}
}
}
assert!(shutdown.is_shutdown_poll());
tokio::time::timeout(Duration::from_secs(5), shutdown.recv())
.await
.expect("Task stopped without shutdown called");
log::debug!("OutQueueControl: Exiting");
}
@@ -0,0 +1,124 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::get_time_now;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time;
#[cfg(target_arch = "wasm32")]
use wasm_timer;
// The minimum time between increasing the average delay between packets. If we hit the ceiling in
// the available buffer space we want to take somewhat swift action, but we still need to give a
// short time to give the channel a chance reduce pressure.
const INCREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 1;
// The minimum time between decreasing the average delay between packets. We don't want to change
// to quickly to keep things somewhat stable. Also there are buffers downstreams meaning we need to
// wait a little to see the effect before we decrease further.
const DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS: u64 = 30;
// If we enough time passes without any sign of backpressure in the channel, we can consider
// lowering the average delay. The goal is to keep somewhat stable, rather than maxing out
// bandwidth at all times.
const ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS: u64 = 30;
// The maximum multiplier we apply to the base average Poisson delay.
const MAX_DELAY_MULTIPLIER: u32 = 6;
// The minium multiplier we apply to the base average Poisson delay.
const MIN_DELAY_MULTIPLIER: u32 = 1;
pub(crate) struct SendingDelayController {
/// Multiply the average sending delay.
/// This is normally set to unity, but if we detect backpressure we increase this
/// multiplier. We use discrete steps.
current_multiplier: u32,
/// Maximum delay multiplier
upper_bound: u32,
/// Minimum delay multiplier
lower_bound: u32,
/// To make sure we don't change the multiplier to fast, we limit a change to some duration
#[cfg(not(target_arch = "wasm32"))]
time_when_changed: time::Instant,
#[cfg(target_arch = "wasm32")]
time_when_changed: wasm_timer::Instant,
/// If we have a long enough time without any backpressure detected we try reducing the sending
/// delay multiplier
#[cfg(not(target_arch = "wasm32"))]
time_when_backpressure_detected: time::Instant,
#[cfg(target_arch = "wasm32")]
time_when_backpressure_detected: wasm_timer::Instant,
}
impl Default for SendingDelayController {
fn default() -> Self {
SendingDelayController::new(MIN_DELAY_MULTIPLIER, MAX_DELAY_MULTIPLIER)
}
}
impl SendingDelayController {
pub(crate) fn new(lower_bound: u32, upper_bound: u32) -> Self {
assert!(lower_bound <= upper_bound);
let now = get_time_now();
SendingDelayController {
current_multiplier: MIN_DELAY_MULTIPLIER,
upper_bound,
lower_bound,
time_when_changed: now,
time_when_backpressure_detected: now,
}
}
pub(crate) fn current_multiplier(&self) -> u32 {
self.current_multiplier
}
pub(crate) fn increase_delay_multiplier(&mut self) {
if self.current_multiplier < self.upper_bound {
self.current_multiplier =
(self.current_multiplier + 1).clamp(self.lower_bound, self.upper_bound);
self.time_when_changed = get_time_now();
log::warn!(
"Increasing sending delay multiplier to: {}",
self.current_multiplier
);
} else {
log::warn!("Trying to increase delay multipler higher than allowed");
}
}
pub(crate) fn decrease_delay_multiplier(&mut self) {
if self.current_multiplier > self.lower_bound {
self.current_multiplier =
(self.current_multiplier - 1).clamp(self.lower_bound, self.upper_bound);
self.time_when_changed = get_time_now();
log::debug!(
"Decreasing sending delay multiplier to: {}",
self.current_multiplier
);
}
}
pub(crate) fn record_backpressure_detected(&mut self) {
self.time_when_backpressure_detected = get_time_now();
}
pub(crate) fn not_increased_delay_recently(&self) -> bool {
get_time_now()
> self.time_when_changed + Duration::from_secs(INCREASE_DELAY_MIN_CHANGE_INTERVAL_SECS)
}
pub(crate) fn is_sending_reliable(&self) -> bool {
let now = get_time_now();
let delay_change_interval = Duration::from_secs(DECREASE_DELAY_MIN_CHANGE_INTERVAL_SECS);
let acceptable_time_without_backpressure =
Duration::from_secs(ACCEPTABLE_TIME_WITHOUT_BACKPRESSURE_SECS);
now > self.time_when_backpressure_detected + acceptable_time_without_backpressure
&& now > self.time_when_changed + delay_change_interval
}
}
@@ -0,0 +1,211 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use client_connections::TransmissionLane;
use rand::seq::SliceRandom;
use std::{
collections::{HashMap, HashSet, VecDeque},
time::Duration,
};
#[cfg(not(target_arch = "wasm32"))]
use tokio::time;
#[cfg(target_arch = "wasm32")]
use wasm_timer;
use super::{get_time_now, RealMessage};
// The number of lanes included in the oldest set. Used when we need to prioritize traffic.
const OLDEST_LANE_SET_SIZE: usize = 5;
// As a way of prune connections we also check for timeouts.
const MSG_CONSIDERED_STALE_AFTER_SECS: u64 = 10 * 60;
#[derive(Default)]
pub(crate) struct TransmissionBuffer {
buffer: HashMap<TransmissionLane, LaneBufferEntry>,
}
impl TransmissionBuffer {
#[allow(unused)]
pub(crate) fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
pub(crate) fn remove(&mut self, lane: &TransmissionLane) -> Option<LaneBufferEntry> {
self.buffer.remove(lane)
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn num_lanes(&self) -> usize {
self.buffer.keys().count()
}
pub(crate) fn lane_length(&self, lane: &TransmissionLane) -> Option<usize> {
self.buffer.get(lane).map(LaneBufferEntry::len)
}
#[allow(unused)]
pub(crate) fn connections(&self) -> HashSet<u64> {
self.buffer
.keys()
.filter_map(|lane| match lane {
TransmissionLane::ConnectionId(id) => Some(id),
_ => None,
})
.copied()
.collect()
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn total_size(&self) -> usize {
self.buffer.values().map(LaneBufferEntry::len).sum()
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn total_size_in_bytes(&self) -> usize {
self.buffer
.values()
.map(|lane_buffer_entry| {
lane_buffer_entry
.real_messages
.iter()
.map(|real_message| real_message.mix_packet.sphinx_packet().len())
.sum::<usize>()
})
.sum()
}
fn get_oldest_set(&self) -> Vec<TransmissionLane> {
let mut buffer: Vec<_> = self
.buffer
.iter()
.map(|(k, v)| (k, v.messages_transmitted))
.collect();
buffer.sort_by_key(|v| v.1);
buffer
.iter()
.rev()
.map(|(k, _)| *k)
.take(OLDEST_LANE_SET_SIZE)
.copied()
.collect()
}
pub(crate) fn store(&mut self, lane: &TransmissionLane, real_messages: Vec<RealMessage>) {
if let Some(lane_buffer_entry) = self.buffer.get_mut(lane) {
lane_buffer_entry.append(real_messages);
} else {
self.buffer
.insert(*lane, LaneBufferEntry::new(real_messages));
}
}
fn pick_random_lane(&self) -> Option<&TransmissionLane> {
let lanes: Vec<&TransmissionLane> = self.buffer.keys().collect();
lanes.choose(&mut rand::thread_rng()).copied()
}
fn pick_random_small_lane(&self) -> Option<&TransmissionLane> {
let lanes: Vec<&TransmissionLane> = self
.buffer
.iter()
.filter(|(_, v)| v.is_small())
.map(|(k, _)| k)
.collect();
lanes.choose(&mut rand::thread_rng()).copied()
}
fn pick_random_old_lane(&self) -> Option<TransmissionLane> {
let lanes = self.get_oldest_set();
lanes.choose(&mut rand::thread_rng()).copied()
}
fn pop_front_from_lane(&mut self, lane: &TransmissionLane) -> Option<RealMessage> {
let real_msgs_queued = self.buffer.get_mut(lane)?;
let real_next = real_msgs_queued.pop_front()?;
real_msgs_queued.messages_transmitted += 1;
if real_msgs_queued.is_empty() {
self.buffer.remove(lane);
}
Some(real_next)
}
pub(crate) fn pop_next_message_at_random(&mut self) -> Option<(TransmissionLane, RealMessage)> {
if self.buffer.is_empty() {
return None;
}
// Very basic heuristic where we prioritize according to small lanes first, the older lanes
// to try to finish lanes when possible, then the rest.
let lane = if let Some(small_lane) = self.pick_random_small_lane() {
*small_lane
} else if let Some(old_lane) = self.pick_random_old_lane() {
old_lane
} else {
*self.pick_random_lane()?
};
let msg = self.pop_front_from_lane(&lane)?;
log::trace!("picking to send from lane: {:?}", lane);
Some((lane, msg))
}
pub(crate) fn prune_stale_connections(&mut self) {
let stale_entries: Vec<_> = self
.buffer
.iter()
.filter_map(|(lane, entry)| if entry.is_stale() { Some(lane) } else { None })
.copied()
.collect();
for lane in stale_entries {
self.remove(&lane);
}
}
}
pub(crate) struct LaneBufferEntry {
pub real_messages: VecDeque<RealMessage>,
pub messages_transmitted: usize,
#[cfg(not(target_arch = "wasm32"))]
pub time_for_last_activity: time::Instant,
#[cfg(target_arch = "wasm32")]
pub time_for_last_activity: wasm_timer::Instant,
}
impl LaneBufferEntry {
fn new(real_messages: Vec<RealMessage>) -> Self {
LaneBufferEntry {
real_messages: real_messages.into(),
messages_transmitted: 0,
time_for_last_activity: get_time_now(),
}
}
fn append(&mut self, real_messages: Vec<RealMessage>) {
self.real_messages.append(&mut real_messages.into());
self.time_for_last_activity = get_time_now();
}
fn pop_front(&mut self) -> Option<RealMessage> {
self.real_messages.pop_front()
}
fn is_small(&self) -> bool {
self.real_messages.len() < 100
}
fn is_stale(&self) -> bool {
get_time_now() - self.time_for_last_activity
> Duration::from_secs(MSG_CONSIDERED_STALE_AFTER_SECS)
}
fn len(&self) -> usize {
self.real_messages.len()
}
fn is_empty(&self) -> bool {
self.real_messages.is_empty()
}
}
@@ -208,7 +208,7 @@ impl ReceivedMessagesBuffer {
}
async fn handle_new_received(&mut self, msgs: Vec<Vec<u8>>) {
debug!(
trace!(
"Processing {:?} new message that might get added to the buffer!",
msgs.len()
);
@@ -322,6 +322,8 @@ impl RequestReceiver {
#[cfg(not(target_arch = "wasm32"))]
async fn run_with_shutdown(&mut self, mut shutdown: task::ShutdownListener) {
use std::time::Duration;
debug!("Started RequestReceiver with graceful shutdown support");
while !shutdown.is_shutdown() {
tokio::select! {
@@ -340,7 +342,9 @@ impl RequestReceiver {
},
}
}
assert!(shutdown.is_shutdown_poll());
tokio::time::timeout(Duration::from_secs(5), shutdown.recv())
.await
.expect("Task stopped without shutdown called");
log::debug!("RequestReceiver: Exiting");
}
@@ -372,6 +376,8 @@ impl FragmentedMessageReceiver {
#[cfg(not(target_arch = "wasm32"))]
async fn run_with_shutdown(&mut self, mut shutdown: task::ShutdownListener) {
use std::time::Duration;
debug!("Started FragmentedMessageReceiver with graceful shutdown support");
while !shutdown.is_shutdown() {
tokio::select! {
@@ -389,7 +395,9 @@ impl FragmentedMessageReceiver {
}
}
}
assert!(shutdown.is_shutdown_poll());
tokio::time::timeout(Duration::from_secs(5), shutdown.recv())
.await
.expect("Task stopped without shutdown called");
log::debug!("FragmentedMessageReceiver: Exiting");
}
@@ -8,10 +8,13 @@ use nymsphinx::anonymous_replies::{
};
use std::path::Path;
#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub enum ReplyKeyStorageError {
#[error("DB Read Error: {0}")]
DbReadError(sled::Error),
#[error("DB Write Error: {0}")]
DbWriteError(sled::Error),
#[error("DB Open Error: {0}")]
DbOpenError(sled::Error),
}
@@ -138,7 +138,7 @@ impl TopologyRefresherConfig {
}
pub struct TopologyRefresher {
validator_client: validator_client::ApiClient,
validator_client: validator_client::client::ApiClient,
client_version: String,
validator_api_urls: Vec<Url>,
@@ -154,7 +154,9 @@ impl TopologyRefresher {
cfg.validator_api_urls.shuffle(&mut thread_rng());
TopologyRefresher {
validator_client: validator_client::ApiClient::new(cfg.validator_api_urls[0].clone()),
validator_client: validator_client::client::ApiClient::new(
cfg.validator_api_urls[0].clone(),
),
client_version: cfg.client_version,
validator_api_urls: cfg.validator_api_urls,
topology_accessor,
@@ -309,7 +311,9 @@ impl TopologyRefresher {
},
}
}
assert!(shutdown.is_shutdown_poll());
tokio::time::timeout(Duration::from_secs(5), shutdown.recv())
.await
.expect("Task stopped without shutdown called");
log::debug!("TopologyRefresher: Exiting");
})
}
+12
View File
@@ -125,6 +125,10 @@ impl<T: NymConfig> Config<T> {
self.client.gateway_endpoint.gateway_id = id.into();
}
pub fn set_custom_validators(&mut self, validator_urls: Vec<Url>) {
self.client.validator_urls = validator_urls;
}
pub fn set_custom_validator_apis(&mut self, validator_api_urls: Vec<Url>) {
self.client.validator_api_urls = validator_api_urls;
}
@@ -179,6 +183,10 @@ impl<T: NymConfig> Config<T> {
self.client.ack_key_file.clone()
}
pub fn get_validator_endpoints(&self) -> Vec<Url> {
self.client.validator_urls.clone()
}
pub fn get_validator_api_endpoints(&self) -> Vec<Url> {
self.client.validator_api_urls.clone()
}
@@ -306,6 +314,9 @@ pub struct Client<T> {
#[serde(default)]
disabled_credentials_mode: bool,
/// Addresses to nymd validators via which the client can communicate with the chain.
validator_urls: Vec<Url>,
/// Addresses to APIs running on validator from which the client gets the view of the network.
validator_api_urls: Vec<Url>,
@@ -354,6 +365,7 @@ impl<T: NymConfig> Default for Client<T> {
version: env!("CARGO_PKG_VERSION").to_string(),
id: "".to_string(),
disabled_credentials_mode: true,
validator_urls: vec![],
validator_api_urls: vec![],
private_identity_key_file: Default::default(),
public_identity_key_file: Default::default(),
+3
View File
@@ -26,4 +26,7 @@ pub enum ClientCoreError {
CouldNotLoadExistingGatewayConfiguration(std::io::Error),
#[error("The current network topology seem to be insufficient to route any packets through")]
InsufficientNetworkTopology,
#[error("Unexpected exit")]
UnexpectedExit,
}
+1 -1
View File
@@ -31,7 +31,7 @@ pub async fn query_gateway_details(
let validator_api = validator_servers
.choose(&mut thread_rng())
.ok_or(ClientCoreError::ListOfValidatorApisIsEmpty)?;
let validator_client = validator_client::ApiClient::new(validator_api.clone());
let validator_client = validator_client::client::ApiClient::new(validator_api.clone());
log::trace!("Fetching list of gateways from: {}", validator_api);
let gateways = validator_client.get_cached_gateways().await?;
+14 -7
View File
@@ -14,8 +14,9 @@ use credential_storage::PersistentStorage;
use credentials::coconut::bandwidth::{BandwidthVoucher, TOTAL_ATTRIBUTES};
use credentials::coconut::utils::obtain_aggregate_signature;
use crypto::asymmetric::{encryption, identity};
use network_defaults::VOUCHER_INFO;
use network_defaults::{NymNetworkDetails, VOUCHER_INFO};
use validator_client::nymd::tx::Hash;
use validator_client::{CoconutApiClient, Config};
use crate::client::Client;
use crate::error::{CredentialClientError, Result};
@@ -107,10 +108,9 @@ pub(crate) struct GetCredential {
/// The hash of a successful deposit transaction
#[clap(long)]
tx_hash: String,
/// The URLs to the validator-api endpoints the are run as coconut signer authorities, separated
/// by comma (,)
/// The nymd URL that should be used
#[clap(long)]
signer_authorities: String,
nymd_url: String,
/// If we want to get the signature without attaching a blind sign request; it is expected that
/// there is already a signature stored on the signer
#[clap(long, parse(from_flag))]
@@ -124,7 +124,10 @@ impl Execute for GetCredential {
.get::<State>(&self.tx_hash)
.ok_or(CredentialClientError::NoDeposit)?;
let urls = config::parse_validators(&self.signer_authorities);
let network_details = NymNetworkDetails::new_from_env();
let config = Config::try_from_nym_network_details(&network_details)?;
let client = validator_client::Client::new_query(config)?;
let coconut_api_clients = CoconutApiClient::all_coconut_api_clients(&client).await?;
let params = Parameters::new(TOTAL_ATTRIBUTES).unwrap();
let bandwidth_credential_attributes = if self.__no_request {
@@ -178,8 +181,12 @@ impl Execute for GetCredential {
)?);
db.set(&self.tx_hash, &state).unwrap();
let signature =
obtain_aggregate_signature(&params, &bandwidth_credential_attributes, &urls).await?;
let signature = obtain_aggregate_signature(
&params,
&bandwidth_credential_attributes,
&coconut_api_clients,
)
.await?;
shared_storage
.insert_coconut_credential(
state.amount.to_string(),
+4
View File
@@ -8,6 +8,7 @@ use credentials::error::Error as CredentialError;
use crypto::asymmetric::encryption::KeyRecoveryError;
use crypto::asymmetric::identity::Ed25519RecoveryError;
use validator_client::nymd::error::NymdError;
use validator_client::ValidatorClientError;
pub type Result<T> = std::result::Result<T, CredentialClientError>;
@@ -16,6 +17,9 @@ pub enum CredentialClientError {
#[error("Nymd error: {0}")]
Nymd(#[from] NymdError),
#[error("Validator client error: {0}")]
ValidatorClientError(#[from] ValidatorClientError),
#[error("Credential error: {0}")]
Credential(#[from] CredentialError),
+3 -1
View File
@@ -4,7 +4,7 @@ version = "1.1.0"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
rust-version = "1.56"
rust-version = "1.65"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -33,6 +33,7 @@ tokio-tungstenite = "0.14" # websocket
## internal
client-core = { path = "../client-core" }
client-connections = { path = "../../common/client-connections" }
coconut-interface = { path = "../../common/coconut-interface", optional = true }
config = { path = "../../common/config" }
completions = { path = "../../common/completions" }
@@ -50,6 +51,7 @@ topology = { path = "../../common/topology" }
validator-client = { path = "../../common/client-libs/validator-client", features = ["nymd-client"] }
version-checker = { path = "../../common/version-checker" }
websocket-requests = { path = "websocket-requests" }
tap = "1.0.1"
[features]
coconut = ["coconut-interface", "credentials", "credentials/coconut", "gateway-requests/coconut", "gateway-client/coconut", "client-core/coconut"]
@@ -43,6 +43,7 @@ async fn send_file_with_reply() {
recipient,
message: read_data,
with_reply_surb: true,
connection_id: Some(0),
};
println!("sending content of 'dummy_file' over the mix network...");
@@ -91,6 +92,7 @@ async fn send_file_without_reply() {
recipient,
message: read_data,
with_reply_surb: false,
connection_id: Some(0),
};
println!("sending content of 'dummy_file' over the mix network...");
@@ -23,6 +23,13 @@ id = '{{ client.id }}'
# to claim bandwidth without presenting bandwidth credentials.
disabled_credentials_mode = {{ client.disabled_credentials_mode }}
# Addresses to nymd validators via which the client can communicate with the chain.
validator_urls = [
{{#each client.validator_urls }}
'{{this}}',
{{/each}}
]
# Addresses to APIs running on validator from which the client gets the view of the network.
validator_api_urls = [
{{#each client.validator_api_urls }}
+70 -19
View File
@@ -1,6 +1,9 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use client_connections::{
ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths, TransmissionLane,
};
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
use client_core::client::inbound_messages::{
InputMessage, InputMessageReceiver, InputMessageSender,
@@ -31,6 +34,7 @@ use nymsphinx::addressing::clients::Recipient;
use nymsphinx::addressing::nodes::NodeIdentity;
use nymsphinx::anonymous_replies::ReplySurb;
use nymsphinx::receiver::ReconstructedMessage;
use tap::TapFallible;
use task::{wait_for_signal, ShutdownListener, ShutdownNotifier};
use crate::client::config::{Config, SocketType};
@@ -110,6 +114,7 @@ impl NymClient {
stream.start_with_shutdown(shutdown);
}
#[allow(clippy::too_many_arguments)]
fn start_real_traffic_controller(
&self,
topology_accessor: TopologyAccessor,
@@ -117,6 +122,8 @@ impl NymClient {
ack_receiver: AcknowledgementReceiver,
input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender,
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
shutdown: ShutdownListener,
) {
let mut controller_config = real_messages_control::Config::new(
@@ -146,6 +153,8 @@ impl NymClient {
mix_sender,
topology_accessor,
reply_key_storage,
lane_queue_lengths,
client_connection_rx,
)
.start_with_shutdown(shutdown);
}
@@ -192,11 +201,22 @@ impl NymClient {
.expect("provided gateway id is invalid!");
#[cfg(feature = "coconut")]
let bandwidth_controller = BandwidthController::new(
credential_storage::initialise_storage(self.config.get_base().get_database_path())
.await,
self.config.get_base().get_validator_api_endpoints(),
);
let bandwidth_controller = {
let details = network_defaults::NymNetworkDetails::new_from_env();
let client_config = validator_client::Config::try_from_nym_network_details(&details)
.expect("failed to construct validator client config");
let client = validator_client::Client::new_query(client_config)
.expect("Could not construct query client");
let coconut_api_clients =
validator_client::CoconutApiClient::all_coconut_api_clients(&client)
.await
.expect("Could not query api clients");
BandwidthController::new(
credential_storage::initialise_storage(self.config.get_base().get_database_path())
.await,
coconut_api_clients,
)
};
#[cfg(not(feature = "coconut"))]
let bandwidth_controller = BandwidthController::new(
credential_storage::initialise_storage(self.config.get_base().get_database_path())
@@ -279,11 +299,18 @@ impl NymClient {
&self,
buffer_requester: ReceivedBufferRequestSender,
msg_input: InputMessageSender,
shared_lane_queue_lengths: LaneQueueLengths,
client_connection_tx: ConnectionCommandSender,
) {
info!("Starting websocket listener...");
let websocket_handler =
websocket::Handler::new(msg_input, buffer_requester, self.as_mix_recipient());
let websocket_handler = websocket::Handler::new(
msg_input,
client_connection_tx,
buffer_requester,
&self.as_mix_recipient(),
shared_lane_queue_lengths,
);
websocket::Listener::new(self.config.get_listening_port()).start(websocket_handler);
}
@@ -291,27 +318,35 @@ impl NymClient {
/// EXPERIMENTAL DIRECT RUST API
/// It's untested and there are absolutely no guarantees about it (but seems to have worked
/// well enough in local tests)
pub fn send_message(&mut self, recipient: Recipient, message: Vec<u8>, with_reply_surb: bool) {
let input_msg = InputMessage::new_fresh(recipient, message, with_reply_surb);
pub async fn send_message(
&mut self,
recipient: Recipient,
message: Vec<u8>,
with_reply_surb: bool,
) {
let lane = TransmissionLane::General;
let input_msg = InputMessage::new_fresh(recipient, message, with_reply_surb, lane);
self.input_tx
.as_ref()
.expect("start method was not called before!")
.unbounded_send(input_msg)
.unwrap();
.send(input_msg)
.await
.expect("InputMessageReceiver has stopped receiving!");
}
/// EXPERIMENTAL DIRECT RUST API
/// It's untested and there are absolutely no guarantees about it (but seems to have worked
/// well enough in local tests)
pub fn send_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) {
pub async fn send_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) {
let input_msg = InputMessage::new_reply(reply_surb, message);
self.input_tx
.as_ref()
.expect("start method was not called before!")
.unbounded_send(input_msg)
.unwrap();
.send(input_msg)
.await
.expect("InputMessageReceiver has stopped receiving!");
}
/// EXPERIMENTAL DIRECT RUST API
@@ -368,7 +403,7 @@ impl NymClient {
let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();
// channels responsible for controlling real messages
let (input_sender, input_receiver) = mpsc::unbounded::<InputMessage>();
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
@@ -376,7 +411,10 @@ impl NymClient {
let reply_key_storage =
ReplyKeyStorage::load(self.config.get_base().get_reply_encryption_key_store_path())
.expect("Failed to load reply key storage!");
.tap_err(|err| {
log::error!("Failed to load reply key storage - is it perhaps already in use?");
log::error!("{}", err);
})?;
// Shutdown notifier for signalling tasks to stop
let shutdown = ShutdownNotifier::default();
@@ -403,12 +441,22 @@ impl NymClient {
let sphinx_message_sender =
Self::start_mix_traffic_controller(gateway_client, shutdown.subscribe());
// Channels that the websocket listener can use to signal downstream to the real traffic
// controller that connections are closed.
let (client_connection_tx, client_connection_rx) = mpsc::unbounded();
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections (e.g socks5 for attached network-requesters)
let shared_lane_queue_lengths = LaneQueueLengths::new();
self.start_real_traffic_controller(
shared_topology_accessor.clone(),
reply_key_storage,
ack_receiver,
input_receiver,
sphinx_message_sender.clone(),
shared_lane_queue_lengths.clone(),
client_connection_rx,
shutdown.subscribe(),
);
@@ -425,9 +473,12 @@ impl NymClient {
}
match self.config.get_socket_type() {
SocketType::WebSocket => {
self.start_websocket_listener(received_buffer_request_sender, input_sender)
}
SocketType::WebSocket => self.start_websocket_listener(
received_buffer_request_sender,
input_sender,
shared_lane_queue_lengths,
client_connection_tx,
),
SocketType::None => {
// if we did not start the socket, it means we're running (supposedly) in the native mode
// and hence we should announce 'ourselves' to the buffer
+8 -3
View File
@@ -25,9 +25,13 @@ pub(crate) struct Init {
#[clap(long)]
force_register_gateway: bool,
/// Comma separated list of rest endpoints of the validators
/// Comma separated list of rest endpoints of the nymd validators
#[clap(long)]
validators: Option<String>,
nymd_validators: Option<String>,
/// Comma separated list of rest endpoints of the API validators
#[clap(long)]
api_validators: Option<String>,
/// Whether to not start the websocket
#[clap(long)]
@@ -52,7 +56,8 @@ pub(crate) struct Init {
impl From<Init> for OverrideConfig {
fn from(init_config: Init) -> Self {
OverrideConfig {
validators: init_config.validators,
nymd_validators: init_config.nymd_validators,
api_validators: init_config.api_validators,
disable_socket: init_config.disable_socket,
port: init_config.port,
fastmode: init_config.fastmode,
+14 -2
View File
@@ -75,7 +75,8 @@ pub(crate) enum Commands {
// Configuration that can be overridden.
pub(crate) struct OverrideConfig {
validators: Option<String>,
nymd_validators: Option<String>,
api_validators: Option<String>,
disable_socket: bool,
port: Option<u16>,
fastmode: bool,
@@ -98,7 +99,18 @@ pub(crate) async fn execute(args: &Cli) -> Result<(), ClientError> {
}
pub(crate) fn override_config(mut config: Config, args: OverrideConfig) -> Config {
if let Some(raw_validators) = args.validators {
if let Some(raw_validators) = args.nymd_validators {
config
.get_base_mut()
.set_custom_validators(config::parse_validators(&raw_validators));
} else if std::env::var(network_defaults::var_names::CONFIGURED).is_ok() {
let raw_validators = std::env::var(network_defaults::var_names::NYMD_VALIDATOR)
.expect("nymd validator not set");
config
.get_base_mut()
.set_custom_validators(config::parse_validators(&raw_validators));
}
if let Some(raw_validators) = args.api_validators {
config
.get_base_mut()
.set_custom_validator_apis(config::parse_validators(&raw_validators));
+8 -3
View File
@@ -18,9 +18,13 @@ pub(crate) struct Run {
#[clap(long)]
id: String,
/// Comma separated list of rest endpoints of the validators
/// Comma separated list of rest endpoints of the nymd validators
#[clap(long)]
validators: Option<String>,
nymd_validators: Option<String>,
/// Comma separated list of rest endpoints of the API validators
#[clap(long)]
api_validators: Option<String>,
/// Id of the gateway we want to connect to. If overridden, it is user's responsibility to
/// ensure prior registration happened
@@ -45,7 +49,8 @@ pub(crate) struct Run {
impl From<Run> for OverrideConfig {
fn from(run_config: Run) -> Self {
OverrideConfig {
validators: run_config.validators,
nymd_validators: run_config.nymd_validators,
api_validators: run_config.api_validators,
disable_socket: run_config.disable_socket,
port: run_config.port,
fastmode: false,
+3 -1
View File
@@ -1,4 +1,4 @@
use client_core::error::ClientCoreError;
use client_core::{client::reply_key_storage::ReplyKeyStorageError, error::ClientCoreError};
use crypto::asymmetric::identity::Ed25519RecoveryError;
use gateway_client::error::GatewayClientError;
use validator_client::ValidatorClientError;
@@ -15,6 +15,8 @@ pub enum ClientError {
ValidatorClientError(#[from] ValidatorClientError),
#[error("client-core error: {0}")]
ClientCoreError(#[from] ClientCoreError),
#[error("Reply key storage error: {0}")]
ReplyKeyStorageError(#[from] ReplyKeyStorageError),
#[error("Failed to load config for: {0}")]
FailedToLoadConfig(String),
+128 -52
View File
@@ -1,6 +1,9 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use client_connections::{
ConnectionCommand, ConnectionCommandSender, LaneQueueLengths, TransmissionLane,
};
use client_core::client::{
inbound_messages::{InputMessage, InputMessageSender},
received_buffer::{
@@ -34,10 +37,12 @@ impl Default for ReceivedResponseType {
pub(crate) struct Handler {
msg_input: InputMessageSender,
client_connection_tx: ConnectionCommandSender,
buffer_requester: ReceivedBufferRequestSender,
self_full_address: Recipient,
socket: Option<WebSocketStream<TcpStream>>,
received_response_type: ReceivedResponseType,
lane_queue_lengths: LaneQueueLengths,
}
// clone is used to use handler on a new connection, which initially is `None`
@@ -45,10 +50,12 @@ impl Clone for Handler {
fn clone(&self) -> Self {
Handler {
msg_input: self.msg_input.clone(),
client_connection_tx: self.client_connection_tx.clone(),
buffer_requester: self.buffer_requester.clone(),
self_full_address: self.self_full_address,
socket: None,
received_response_type: Default::default(),
lane_queue_lengths: self.lane_queue_lengths.clone(),
}
}
}
@@ -64,38 +71,85 @@ impl Drop for Handler {
impl Handler {
pub(crate) fn new(
msg_input: InputMessageSender,
client_connection_tx: ConnectionCommandSender,
buffer_requester: ReceivedBufferRequestSender,
self_full_address: Recipient,
self_full_address: &Recipient,
lane_queue_lengths: LaneQueueLengths,
) -> Self {
Handler {
msg_input,
client_connection_tx,
buffer_requester,
self_full_address,
self_full_address: *self_full_address,
socket: None,
received_response_type: Default::default(),
lane_queue_lengths,
}
}
fn handle_send(
async fn handle_send(
&mut self,
recipient: Recipient,
recipient: &Recipient,
message: Vec<u8>,
with_reply_surb: bool,
connection_id: Option<u64>,
) -> Option<ServerResponse> {
// the ack control is now responsible for chunking, etc.
let input_msg = InputMessage::new_fresh(recipient, message, with_reply_surb);
self.msg_input.unbounded_send(input_msg).unwrap();
// We map the absence of a connection id as going into the general lane.
let lane = connection_id.map_or(TransmissionLane::General, |id| {
TransmissionLane::ConnectionId(id)
});
None
// the ack control is now responsible for chunking, etc.
let input_msg = InputMessage::new_fresh(*recipient, message, with_reply_surb, lane);
self.msg_input
.send(input_msg)
.await
.expect("InputMessageReceiver has stopped receiving!");
// Only reply back with a `LaneQueueLength` if the sender providided a connection id
let connection_id = match lane {
TransmissionLane::General
| TransmissionLane::Reply
| TransmissionLane::Retransmission
| TransmissionLane::Control => return None,
TransmissionLane::ConnectionId(id) => id,
};
// on receiving a send, we reply back the current lane queue length for that connection id.
// Note that this does _NOT_ take into account the packets that have been received but not
// yet reach `OutQueueControl`, so it might be a tad low.
let Ok(lane_queue_lengths) = self.lane_queue_lengths.lock() else {
log::warn!(
"Failed to get the lane queue length lock, \
not responding back with the current queue length"
);
return None;
};
let queue_length = lane_queue_lengths.get(&lane).unwrap_or(0);
Some(ServerResponse::LaneQueueLength(connection_id, queue_length))
}
fn handle_reply(&mut self, reply_surb: ReplySurb, message: Vec<u8>) -> Option<ServerResponse> {
async fn handle_reply(
&mut self,
reply_surb: ReplySurb,
message: Vec<u8>,
) -> Option<ServerResponse> {
if message.len() > ReplySurb::max_msg_len(Default::default()) {
return Some(ServerResponse::new_error(format!("too long message to put inside a reply SURB. Received: {} bytes and maximum is {} bytes", message.len(), ReplySurb::max_msg_len(Default::default()))));
return Some(
ServerResponse::new_error(
format!(
"too long message to put inside a reply SURB. Received: {} bytes and maximum is {} bytes",
message.len(), ReplySurb::max_msg_len(Default::default()))
)
);
}
let input_msg = InputMessage::new_reply(reply_surb, message);
self.msg_input.unbounded_send(input_msg).unwrap();
self.msg_input
.send(input_msg)
.await
.expect("InputMessageReceiver has stopped receiving!");
None
}
@@ -104,22 +158,48 @@ impl Handler {
ServerResponse::SelfAddress(self.self_full_address)
}
fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
fn handle_closed_connection(&self, connection_id: u64) -> Option<ServerResponse> {
self.client_connection_tx
.unbounded_send(ConnectionCommand::Close(connection_id))
.unwrap();
None
}
fn handle_get_lane_queue_length(&self, connection_id: u64) -> Option<ServerResponse> {
let Ok(lane_queue_lengths) = self.lane_queue_lengths.lock() else {
log::warn!(
"Failed to get the lane queue length lock, not responding back with the current queue length"
);
return None;
};
let lane = TransmissionLane::ConnectionId(connection_id);
let queue_length = lane_queue_lengths.get(&lane).unwrap_or(0);
Some(ServerResponse::LaneQueueLength(connection_id, queue_length))
}
async fn handle_request(&mut self, request: ClientRequest) -> Option<ServerResponse> {
match request {
ClientRequest::Send {
recipient,
message,
with_reply_surb,
} => self.handle_send(recipient, message, with_reply_surb),
connection_id,
} => {
self.handle_send(&recipient, message, with_reply_surb, connection_id)
.await
}
ClientRequest::Reply {
message,
reply_surb,
} => self.handle_reply(reply_surb, message),
} => self.handle_reply(reply_surb, message).await,
ClientRequest::SelfAddress => Some(self.handle_self_address()),
ClientRequest::ClosedConnection(id) => self.handle_closed_connection(id),
ClientRequest::GetLaneQueueLength(id) => self.handle_get_lane_queue_length(id),
}
}
fn handle_text_message(&mut self, msg: String) -> Option<WsMessage> {
async fn handle_text_message(&mut self, msg: String) -> Option<WsMessage> {
debug!("Handling text message request");
trace!("Content: {:?}", msg);
@@ -128,13 +208,13 @@ impl Handler {
let response = match client_request {
Err(err) => Some(ServerResponse::Error(err)),
Ok(req) => self.handle_request(req),
Ok(req) => self.handle_request(req).await,
};
response.map(|resp| WsMessage::text(resp.into_text()))
}
fn handle_binary_message(&mut self, msg: Vec<u8>) -> Option<WsMessage> {
async fn handle_binary_message(&mut self, msg: &[u8]) -> Option<WsMessage> {
debug!("Handling binary message request");
self.received_response_type = ReceivedResponseType::Binary;
@@ -142,49 +222,23 @@ impl Handler {
let response = match client_request {
Err(err) => Some(ServerResponse::Error(err)),
Ok(req) => self.handle_request(req),
Ok(req) => self.handle_request(req).await,
};
response.map(|resp| WsMessage::Binary(resp.into_binary()))
}
fn handle_ws_request(&mut self, raw_request: WsMessage) -> Option<WsMessage> {
async fn handle_ws_request(&mut self, raw_request: WsMessage) -> Option<WsMessage> {
// apparently tungstenite auto-handles ping/pong/close messages so for now let's ignore
// them and let's test that claim. If that's not the case, just copy code from
// old version of this file.
match raw_request {
WsMessage::Text(text_message) => self.handle_text_message(text_message),
WsMessage::Binary(binary_message) => self.handle_binary_message(binary_message),
WsMessage::Text(text_message) => self.handle_text_message(text_message).await,
WsMessage::Binary(binary_message) => self.handle_binary_message(&binary_message).await,
_ => None,
}
}
// I'm still not entirely sure why `send_all` requires `TryStream` rather than `Stream`, but
// let's just play along for now
fn prepare_reconstructed_binary(
&self,
reconstructed_messages: Vec<ReconstructedMessage>,
) -> Vec<Result<WsMessage, WsError>> {
reconstructed_messages
.into_iter()
.map(ServerResponse::Received)
.map(|resp| Ok(WsMessage::Binary(resp.into_binary())))
.collect()
}
// I'm still not entirely sure why `send_all` requires `TryStream` rather than `Stream`, but
// let's just play along for now
fn prepare_reconstructed_text(
&self,
reconstructed_messages: Vec<ReconstructedMessage>,
) -> Vec<Result<WsMessage, WsError>> {
reconstructed_messages
.into_iter()
.map(ServerResponse::Received)
.map(|resp| Ok(WsMessage::Text(resp.into_text())))
.collect()
}
async fn push_websocket_received_plaintexts(
&mut self,
reconstructed_messages: Vec<ReconstructedMessage>,
@@ -193,10 +247,8 @@ impl Handler {
// if it's text or binary, but for time being we use the naive assumption that if
// client is sending Message::Text it expects text back. Same for Message::Binary
let response_messages = match self.received_response_type {
ReceivedResponseType::Binary => {
self.prepare_reconstructed_binary(reconstructed_messages)
}
ReceivedResponseType::Text => self.prepare_reconstructed_text(reconstructed_messages),
ReceivedResponseType::Binary => prepare_reconstructed_binary(reconstructed_messages),
ReceivedResponseType::Text => prepare_reconstructed_text(reconstructed_messages),
};
let mut send_stream = futures::stream::iter(response_messages);
@@ -244,7 +296,7 @@ impl Handler {
break;
}
if let Some(response) = self.handle_ws_request(socket_msg) {
if let Some(response) = self.handle_ws_request(socket_msg).await {
if let Err(err) = self.send_websocket_response(response).await {
warn!(
"Failed to send message over websocket: {}. Assuming the connection is dead.",
@@ -291,3 +343,27 @@ impl Handler {
self.listen_for_requests(reconstructed_receiver).await;
}
}
// I'm still not entirely sure why `send_all` requires `TryStream` rather than `Stream`, but
// let's just play along for now
fn prepare_reconstructed_binary(
reconstructed_messages: Vec<ReconstructedMessage>,
) -> Vec<Result<WsMessage, WsError>> {
reconstructed_messages
.into_iter()
.map(ServerResponse::Received)
.map(|resp| Ok(WsMessage::Binary(resp.into_binary())))
.collect()
}
// I'm still not entirely sure why `send_all` requires `TryStream` rather than `Stream`, but
// let's just play along for now
fn prepare_reconstructed_text(
reconstructed_messages: Vec<ReconstructedMessage>,
) -> Vec<Result<WsMessage, WsError>> {
reconstructed_messages
.into_iter()
.map(ServerResponse::Received)
.map(|resp| Ok(WsMessage::Text(resp.into_text())))
.collect()
}
+121 -15
View File
@@ -20,6 +20,12 @@ pub const REPLY_REQUEST_TAG: u8 = 0x01;
/// Value tag representing [`SelfAddress`] variant of the [`ClientRequest`]
pub const SELF_ADDRESS_REQUEST_TAG: u8 = 0x02;
/// Value tag representing [`ClosedConnection`] variant of the [`ClientRequest`]
pub const CLOSED_CONNECTION_REQUEST_TAG: u8 = 0x03;
/// Value tag representing [`GetLaneQueueLength`] variant of the [`ClientRequest`]
pub const GET_LANE_QUEUE_LENGHT_TAG: u8 = 0x04;
#[allow(non_snake_case)]
#[derive(Debug)]
pub enum ClientRequest {
@@ -28,32 +34,42 @@ pub enum ClientRequest {
message: Vec<u8>,
// Perhaps we could change it to a number to indicate how many reply_SURBs we want to include?
with_reply_surb: bool,
connection_id: Option<u64>,
},
Reply {
message: Vec<u8>,
reply_surb: ReplySurb,
},
SelfAddress,
ClosedConnection(u64),
GetLaneQueueLength(u64),
}
// we could have been parsing it directly TryFrom<WsMessage>, but we want to retain
// information about whether it came from binary or text to send appropriate response back
impl ClientRequest {
// SEND_REQUEST_TAG || with_surb || recipient || data_len || data
fn serialize_send(recipient: Recipient, data: Vec<u8>, with_reply_surb: bool) -> Vec<u8> {
// SEND_REQUEST_TAG || with_surb || recipient || conn_id || data_len || data
fn serialize_send(
recipient: Recipient,
data: Vec<u8>,
with_reply_surb: bool,
connection_id: Option<u64>,
) -> Vec<u8> {
let data_len_bytes = (data.len() as u64).to_be_bytes();
let conn_id_bytes = connection_id.unwrap_or(0).to_be_bytes();
std::iter::once(SEND_REQUEST_TAG)
.chain(std::iter::once(with_reply_surb as u8))
.chain(recipient.to_bytes().iter().cloned()) // will not be length prefixed because the length is constant
.chain(conn_id_bytes.iter().cloned())
.chain(data_len_bytes.iter().cloned())
.chain(data.into_iter())
.collect()
}
// SEND_REQUEST_TAG || with_reply || recipient || data_len || data
// SEND_REQUEST_TAG || with_reply || recipient || conn_id || data_len || data
fn deserialize_send(b: &[u8]) -> Result<Self, error::Error> {
// we need to have at least 1 (tag) + 1 (reply flag) + Recipient::LEN + sizeof<u64> bytes
if b.len() < 2 + Recipient::LEN + size_of::<u64>() {
// we need to have at least 1 (tag) + 1 (reply flag) + Recipient::LEN + 2*sizeof<u64> bytes
if b.len() < 2 + Recipient::LEN + 2 * size_of::<u64>() {
return Err(error::Error::new(
ErrorKind::TooShortRequest,
"not enough data provided to recover 'send'".to_string(),
@@ -86,9 +102,20 @@ impl ClientRequest {
}
};
let data_len_bytes = &b[2 + Recipient::LEN..2 + Recipient::LEN + size_of::<u64>()];
let mut connection_id_bytes = [0u8; size_of::<u64>()];
connection_id_bytes
.copy_from_slice(&b[2 + Recipient::LEN..2 + Recipient::LEN + size_of::<u64>()]);
let connection_id = u64::from_be_bytes(connection_id_bytes);
let connection_id = if connection_id == 0 {
None
} else {
Some(connection_id)
};
let data_len_bytes =
&b[2 + Recipient::LEN + size_of::<u64>()..2 + Recipient::LEN + 2 * size_of::<u64>()];
let data_len = u64::from_be_bytes(data_len_bytes.try_into().unwrap());
let data = &b[2 + Recipient::LEN + size_of::<u64>()..];
let data = &b[2 + Recipient::LEN + 2 * size_of::<u64>()..];
if data.len() as u64 != data_len {
return Err(error::Error::new(
ErrorKind::MalformedRequest,
@@ -104,11 +131,12 @@ impl ClientRequest {
with_reply_surb,
recipient,
message: data.to_vec(),
connection_id,
})
}
// REPLY_REQUEST_TAG || surb_len || surb || message_len || message
fn serialize_reply(message: Vec<u8>, reply_surb: ReplySurb) -> Vec<u8> {
fn serialize_reply(message: Vec<u8>, reply_surb: &ReplySurb) -> Vec<u8> {
let reply_surb_bytes = reply_surb.to_bytes();
let surb_len_bytes = (reply_surb_bytes.len() as u64).to_be_bytes();
let message_len_bytes = (message.len() as u64).to_be_bytes();
@@ -202,20 +230,79 @@ impl ClientRequest {
ClientRequest::SelfAddress
}
// CLOSED_CONNECTION_REQUEST_TAG
fn serialize_closed_connection(connection_id: u64) -> Vec<u8> {
let conn_id_bytes = connection_id.to_be_bytes();
std::iter::once(CLOSED_CONNECTION_REQUEST_TAG)
.chain(conn_id_bytes.iter().copied())
.collect()
}
// CLOSED_CONNECTION_REQUEST_TAG
fn deserialize_closed_connection(b: &[u8]) -> Result<Self, error::Error> {
if b.len() != 1 + size_of::<u64>() {
return Err(error::Error::new(
ErrorKind::MalformedRequest,
"the received closed connection has invalid length".to_string(),
));
}
// this MUST match because it was called by 'deserialize'
debug_assert_eq!(b[0], CLOSED_CONNECTION_REQUEST_TAG);
let mut connection_id_bytes = [0u8; size_of::<u64>()];
connection_id_bytes.copy_from_slice(&b[1..=size_of::<u64>()]);
let connection_id = u64::from_be_bytes(connection_id_bytes);
Ok(ClientRequest::ClosedConnection(connection_id))
}
// GET_LANE_QUEUE_LENGHT_TAG
fn serialize_get_lane_queue_lengths(connection_id: u64) -> Vec<u8> {
let conn_id_bytes = connection_id.to_be_bytes();
std::iter::once(GET_LANE_QUEUE_LENGHT_TAG)
.chain(conn_id_bytes.iter().copied())
.collect()
}
// GET_LANE_QUEUE_LENGHT_TAG
fn deserialize_get_lane_queue_length(b: &[u8]) -> Result<Self, error::Error> {
if b.len() != 1 + size_of::<u64>() {
return Err(error::Error::new(
ErrorKind::MalformedRequest,
"the received get lane queue length has invalid length".to_string(),
));
}
// this MUST match because it was called by 'deserialize'
debug_assert_eq!(b[0], GET_LANE_QUEUE_LENGHT_TAG);
let mut connection_id_bytes = [0u8; size_of::<u64>()];
connection_id_bytes.copy_from_slice(&b[1..=size_of::<u64>()]);
let connection_id = u64::from_be_bytes(connection_id_bytes);
Ok(ClientRequest::GetLaneQueueLength(connection_id))
}
pub fn serialize(self) -> Vec<u8> {
match self {
ClientRequest::Send {
recipient,
message,
with_reply_surb,
} => Self::serialize_send(recipient, message, with_reply_surb),
connection_id,
} => Self::serialize_send(recipient, message, with_reply_surb, connection_id),
ClientRequest::Reply {
message,
reply_surb,
} => Self::serialize_reply(message, reply_surb),
} => Self::serialize_reply(message, &reply_surb),
ClientRequest::SelfAddress => Self::serialize_self_address(),
ClientRequest::ClosedConnection(id) => Self::serialize_closed_connection(id),
ClientRequest::GetLaneQueueLength(id) => Self::serialize_get_lane_queue_lengths(id),
}
}
@@ -245,15 +332,17 @@ impl ClientRequest {
SEND_REQUEST_TAG => Self::deserialize_send(b),
REPLY_REQUEST_TAG => Self::deserialize_reply(b),
SELF_ADDRESS_REQUEST_TAG => Ok(Self::deserialize_self_address(b)),
CLOSED_CONNECTION_REQUEST_TAG => Self::deserialize_closed_connection(b),
GET_LANE_QUEUE_LENGHT_TAG => Self::deserialize_get_lane_queue_length(b),
n => Err(error::Error::new(
ErrorKind::UnknownRequest,
format!("type {}", n),
format!("type {n}"),
)),
}
}
pub fn try_from_binary(raw_req: Vec<u8>) -> Result<Self, error::Error> {
Self::deserialize(&raw_req)
pub fn try_from_binary(raw_req: &[u8]) -> Result<Self, error::Error> {
Self::deserialize(raw_req)
}
pub fn try_from_text(raw_req: String) -> Result<Self, error::Error> {
@@ -280,6 +369,7 @@ mod tests {
recipient,
message: b"foomp".to_vec(),
with_reply_surb: false,
connection_id: Some(42),
};
let bytes = send_request_no_surb.serialize();
@@ -289,10 +379,12 @@ mod tests {
recipient,
message,
with_reply_surb,
connection_id,
} => {
assert_eq!(recipient.to_string(), recipient_string);
assert_eq!(message, b"foomp".to_vec());
assert!(!with_reply_surb)
assert!(!with_reply_surb);
assert_eq!(connection_id, Some(42))
}
_ => unreachable!(),
}
@@ -301,6 +393,7 @@ mod tests {
recipient,
message: b"foomp".to_vec(),
with_reply_surb: true,
connection_id: None,
};
let bytes = send_request_surb.serialize();
@@ -310,10 +403,12 @@ mod tests {
recipient,
message,
with_reply_surb,
connection_id,
} => {
assert_eq!(recipient.to_string(), recipient_string);
assert_eq!(message, b"foomp".to_vec());
assert!(with_reply_surb)
assert!(with_reply_surb);
assert_eq!(connection_id, None)
}
_ => unreachable!(),
}
@@ -352,4 +447,15 @@ mod tests {
_ => unreachable!(),
}
}
#[test]
fn close_connection_request_serialization_works() {
let close_connection_request = ClientRequest::ClosedConnection(42);
let bytes = close_connection_request.serialize();
let recovered = ClientRequest::deserialize(&bytes).unwrap();
match recovered {
ClientRequest::ClosedConnection(id) => assert_eq!(id, 42),
_ => unreachable!(),
}
}
}
@@ -23,10 +23,14 @@ pub const RECEIVED_RESPONSE_TAG: u8 = 0x01;
/// Value tag representing [`SelfAddress`] variant of the [`ServerResponse`]
pub const SELF_ADDRESS_RESPONSE_TAG: u8 = 0x02;
/// Value tag representing [`LaneQueueLength`] variant of the [`ServerResponse`]
pub const LANE_QUEUE_LENGTH_RESPONSE_TAG: u8 = 0x03;
#[derive(Debug)]
pub enum ServerResponse {
Received(ReconstructedMessage),
SelfAddress(Recipient),
LaneQueueLength(u64, usize),
Error(error::Error),
}
@@ -193,6 +197,31 @@ impl ServerResponse {
Ok(ServerResponse::SelfAddress(recipient))
}
// LANE_QUEUE_LENGTH_RESPONSE_TAG || lane || queue_length
fn serialize_lane_queue_length(lane: u64, queue_length: usize) -> Vec<u8> {
std::iter::once(LANE_QUEUE_LENGTH_RESPONSE_TAG)
.chain(lane.to_be_bytes().iter().cloned())
.chain(queue_length.to_be_bytes().iter().cloned())
.collect()
}
// LANE_QUEUE_LENGTH_RESPONSE_TAG || lane || queue_length
fn deserialize_lane_queue_length(b: &[u8]) -> Result<Self, error::Error> {
// this MUST match because it was called by 'deserialize'
debug_assert_eq!(b[0], LANE_QUEUE_LENGTH_RESPONSE_TAG);
let mut lane_bytes = [0u8; size_of::<u64>()];
lane_bytes.copy_from_slice(&b[1..=size_of::<u64>()]);
let lane = u64::from_be_bytes(lane_bytes);
let mut queue_length_bytes = [0u8; size_of::<usize>()];
queue_length_bytes
.copy_from_slice(&b[1 + size_of::<u64>()..1 + size_of::<u64>() + size_of::<usize>()]);
let queue_length = usize::from_be_bytes(queue_length_bytes);
Ok(ServerResponse::LaneQueueLength(lane, queue_length))
}
// ERROR_RESPONSE_TAG || err_code || msg_len || msg
fn serialize_error(error: error::Error) -> Vec<u8> {
let message_len_bytes = (error.message.len() as u64).to_be_bytes();
@@ -272,6 +301,9 @@ impl ServerResponse {
Self::serialize_received(reconstructed_message)
}
ServerResponse::SelfAddress(address) => Self::serialize_self_address(address),
ServerResponse::LaneQueueLength(lane, queue_length) => {
Self::serialize_lane_queue_length(lane, queue_length)
}
ServerResponse::Error(err) => Self::serialize_error(err),
}
}
@@ -302,6 +334,7 @@ impl ServerResponse {
match response_tag {
RECEIVED_RESPONSE_TAG => Self::deserialize_received(b),
SELF_ADDRESS_RESPONSE_TAG => Self::deserialize_self_address(b),
LANE_QUEUE_LENGTH_RESPONSE_TAG => Self::deserialize_lane_queue_length(b),
ERROR_RESPONSE_TAG => Self::deserialize_error(b),
n => Err(error::Error::new(
ErrorKind::UnknownResponse,
@@ -378,6 +411,20 @@ mod tests {
}
}
#[test]
fn lane_queue_length_response_serialization_works() {
let lane_queue_length_response = ServerResponse::LaneQueueLength(13, 42);
let bytes = lane_queue_length_response.serialize();
let recovered = ServerResponse::deserialize(&bytes).unwrap();
match recovered {
ServerResponse::LaneQueueLength(lane, queue_length) => {
assert_eq!(lane, 13);
assert_eq!(queue_length, 42)
}
_ => unreachable!(),
}
}
#[test]
fn error_response_serialization_works() {
let dummy_error = error::Error::new(ErrorKind::UnknownRequest, "foomp message".to_string());
@@ -20,6 +20,7 @@ pub(super) enum ClientRequestText {
message: String,
recipient: String,
with_reply_surb: bool,
connection_id: Option<u64>,
},
SelfAddress,
#[serde(rename_all = "camelCase")]
@@ -46,6 +47,7 @@ impl TryInto<ClientRequest> for ClientRequestText {
message,
recipient,
with_reply_surb,
connection_id,
} => {
let message_bytes = message.into_bytes();
let recipient = Recipient::try_from_base58_string(recipient).map_err(|err| {
@@ -56,6 +58,7 @@ impl TryInto<ClientRequest> for ClientRequestText {
message: message_bytes,
recipient,
with_reply_surb,
connection_id,
})
}
ClientRequestText::SelfAddress => Ok(ClientRequest::SelfAddress),
@@ -91,6 +94,10 @@ pub(super) enum ServerResponseText {
SelfAddress {
address: String,
},
LaneQueueLength {
lane: u64,
queue_length: usize,
},
Error {
message: String,
},
@@ -132,6 +139,9 @@ impl From<ServerResponse> for ServerResponseText {
ServerResponse::SelfAddress(recipient) => ServerResponseText::SelfAddress {
address: recipient.to_string(),
},
ServerResponse::LaneQueueLength(lane, queue_length) => {
ServerResponseText::LaneQueueLength { lane, queue_length }
}
ServerResponse::Error(err) => ServerResponseText::Error {
message: err.to_string(),
},
+2
View File
@@ -26,6 +26,7 @@ url = "2.2"
# internal
client-core = { path = "../client-core" }
client-connections = { path = "../../common/client-connections" }
coconut-interface = { path = "../../common/coconut-interface", optional = true }
config = { path = "../../common/config" }
completions = { path = "../../common/completions" }
@@ -45,6 +46,7 @@ task = { path = "../../common/task" }
topology = { path = "../../common/topology" }
validator-client = { path = "../../common/client-libs/validator-client", features = ["nymd-client"] }
version-checker = { path = "../../common/version-checker" }
tap = "1.0.1"
[features]
coconut = ["coconut-interface", "credentials", "gateway-requests/coconut", "gateway-client/coconut", "credentials/coconut", "client-core/coconut"]
@@ -23,6 +23,13 @@ id = '{{ client.id }}'
# to claim bandwidth without presenting bandwidth credentials.
disabled_credentials_mode = {{ client.disabled_credentials_mode }}
# Addresses to nymd validators via which the client can communicate with the chain.
validator_urls = [
{{#each client.validator_urls }}
'{{this}}',
{{/each}}
]
# Addresses to APIs running on validator from which the client gets the view of the network.
validator_api_urls = [
{{#each client.validator_api_urls }}
+88 -23
View File
@@ -1,7 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::sync::atomic::Ordering;
use std::error::Error;
use crate::client::config::Config;
use crate::error::Socks5ClientError;
@@ -9,6 +9,7 @@ use crate::socks::{
authentication::{AuthenticationMethods, Authenticator, User},
server::SphinxSocksServer,
};
use client_connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use client_core::client::cover_traffic_stream::LoopCoverTrafficStream;
use client_core::client::inbound_messages::{
InputMessage, InputMessageReceiver, InputMessageSender,
@@ -36,7 +37,9 @@ use gateway_client::{
use log::*;
use nymsphinx::addressing::clients::Recipient;
use nymsphinx::addressing::nodes::NodeIdentity;
use task::{wait_for_signal, ShutdownListener, ShutdownNotifier};
use tap::TapFallible;
use task::signal::wait_for_signal_and_error;
use task::{ShutdownListener, ShutdownNotifier};
pub mod config;
@@ -110,6 +113,7 @@ impl NymClient {
stream.start_with_shutdown(shutdown);
}
#[allow(clippy::too_many_arguments)]
fn start_real_traffic_controller(
&self,
topology_accessor: TopologyAccessor,
@@ -117,6 +121,8 @@ impl NymClient {
ack_receiver: AcknowledgementReceiver,
input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender,
client_connection_rx: ConnectionCommandReceiver,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
) {
let mut controller_config = client_core::client::real_messages_control::Config::new(
@@ -146,6 +152,8 @@ impl NymClient {
mix_sender,
topology_accessor,
reply_key_storage,
lane_queue_lengths,
client_connection_rx,
)
.start_with_shutdown(shutdown);
}
@@ -192,11 +200,22 @@ impl NymClient {
.expect("provided gateway id is invalid!");
#[cfg(feature = "coconut")]
let bandwidth_controller = BandwidthController::new(
credential_storage::initialise_storage(self.config.get_base().get_database_path())
.await,
self.config.get_base().get_validator_api_endpoints(),
);
let bandwidth_controller = {
let details = network_defaults::NymNetworkDetails::new_from_env();
let client_config = validator_client::Config::try_from_nym_network_details(&details)
.expect("failed to construct validator client config");
let client = validator_client::Client::new_query(client_config)
.expect("Could not construct query client");
let coconut_api_clients =
validator_client::CoconutApiClient::all_coconut_api_clients(&client)
.await
.expect("Could not query api clients");
BandwidthController::new(
credential_storage::initialise_storage(self.config.get_base().get_database_path())
.await,
coconut_api_clients,
)
};
#[cfg(not(feature = "coconut"))]
let bandwidth_controller = BandwidthController::new(
credential_storage::initialise_storage(self.config.get_base().get_database_path())
@@ -279,7 +298,9 @@ impl NymClient {
&self,
buffer_requester: ReceivedBufferRequestSender,
msg_input: InputMessageSender,
shutdown: ShutdownListener,
client_connection_tx: ConnectionCommandSender,
lane_queue_lengths: LaneQueueLengths,
mut shutdown: ShutdownListener,
) {
info!("Starting socks5 listener...");
let auth_methods = vec![AuthenticationMethods::NoAuth as u8];
@@ -291,34 +312,58 @@ impl NymClient {
authenticator,
self.config.get_provider_mix_address(),
self.as_mix_recipient(),
shutdown,
lane_queue_lengths,
shutdown.clone(),
);
tokio::spawn(async move { sphinx_socks.serve(msg_input, buffer_requester).await });
tokio::spawn(async move {
// Ideally we should have a fully fledged task manager to check for errors in all
// tasks.
// However, pragmatically, we start out by at least reporting errors for some of the
// tasks that interact with the outside world and can fail in normal operation, such as
// network issues.
// TODO: replace this by a generic solution, such as a task manager that stores all
// JoinHandles of all spawned tasks.
if let Err(res) = sphinx_socks
.serve(msg_input, buffer_requester, client_connection_tx)
.await
{
shutdown.send_we_stopped(Box::new(res));
}
});
}
/// blocking version of `start` method. Will run forever (or until SIGINT is sent)
pub async fn run_forever(&mut self) -> Result<(), Socks5ClientError> {
let mut shutdown = self.start().await?;
wait_for_signal().await;
pub async fn run_forever(&mut self) -> Result<(), Box<dyn Error + Send>> {
let mut shutdown = self
.start()
.await
.map_err(|err| Box::new(err) as Box<dyn Error + Send>)?;
let res = wait_for_signal_and_error(&mut shutdown).await;
log::info!("Sending shutdown");
client_core::client::SHUTDOWN_HAS_BEEN_SIGNALLED.store(true, Ordering::Relaxed);
shutdown.signal_shutdown().ok();
log::info!("Waiting for tasks to finish... (Press ctrl-c to force)");
shutdown.wait_for_shutdown().await;
log::info!("Stopping nym-socks5-client");
Ok(())
res
}
// Variant of `run_forever` that listends for remote control messages
pub async fn run_and_listen(
&mut self,
mut receiver: Socks5ControlMessageReceiver,
) -> Result<(), Socks5ClientError> {
let mut shutdown = self.start().await?;
tokio::select! {
) -> Result<(), Box<dyn Error + Send>> {
// Start the main task
let mut shutdown = self
.start()
.await
.map_err(|err| Box::new(err) as Box<dyn Error + Send>)?;
let res = tokio::select! {
biased;
message = receiver.next() => {
log::debug!("Received message: {:?}", message);
match message {
@@ -329,21 +374,26 @@ impl NymClient {
log::info!("Channel closed, stopping");
}
}
Ok(())
}
Some(msg) = shutdown.wait_for_error() => {
log::info!("Task error: {:?}", msg);
Err(msg)
}
_ = tokio::signal::ctrl_c() => {
log::info!("Received SIGINT");
Ok(())
},
}
};
log::info!("Sending shutdown");
client_core::client::SHUTDOWN_HAS_BEEN_SIGNALLED.store(true, Ordering::Relaxed);
shutdown.signal_shutdown().ok();
log::info!("Waiting for tasks to finish... (Press ctrl-c to force)");
shutdown.wait_for_shutdown().await;
log::info!("Stopping nym-socks5-client");
Ok(())
res
}
pub async fn start(&mut self) -> Result<ShutdownNotifier, Socks5ClientError> {
@@ -361,7 +411,7 @@ impl NymClient {
let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();
// channels responsible for controlling real messages
let (input_sender, input_receiver) = mpsc::unbounded::<InputMessage>();
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
@@ -369,7 +419,10 @@ impl NymClient {
let reply_key_storage =
ReplyKeyStorage::load(self.config.get_base().get_reply_encryption_key_store_path())
.expect("Failed to load reply key storage!");
.tap_err(|err| {
log::error!("Failed to load reply key storage - is it perhaps already in use?");
log::error!("{}", err);
})?;
// Shutdown notifier for signalling tasks to stop
let shutdown = ShutdownNotifier::default();
@@ -396,12 +449,22 @@ impl NymClient {
let sphinx_message_sender =
Self::start_mix_traffic_controller(gateway_client, shutdown.subscribe());
// Channel for announcing closed (socks5) connections by the controller.
// This will be forwarded to `OutQueueControl`
let (client_connection_tx, client_connection_rx) = mpsc::unbounded();
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections
let shared_lane_queue_lengths = LaneQueueLengths::new();
self.start_real_traffic_controller(
shared_topology_accessor.clone(),
reply_key_storage,
ack_receiver,
input_receiver,
sphinx_message_sender.clone(),
client_connection_rx,
shared_lane_queue_lengths.clone(),
shutdown.subscribe(),
);
@@ -420,6 +483,8 @@ impl NymClient {
self.start_socks5_listener(
received_buffer_request_sender,
input_sender,
client_connection_tx,
shared_lane_queue_lengths,
shutdown.subscribe(),
);
+8 -3
View File
@@ -29,9 +29,13 @@ pub(crate) struct Init {
#[clap(long)]
force_register_gateway: bool,
/// Comma separated list of rest endpoints of the validators
/// Comma separated list of rest endpoints of the nymd validators
#[clap(long)]
validators: Option<String>,
nymd_validators: Option<String>,
/// Comma separated list of rest endpoints of the API validators
#[clap(long)]
api_validators: Option<String>,
/// Port for the socket to listen on in all subsequent runs
#[clap(short, long)]
@@ -52,7 +56,8 @@ pub(crate) struct Init {
impl From<Init> for OverrideConfig {
fn from(init_config: Init) -> Self {
OverrideConfig {
validators: init_config.validators,
nymd_validators: init_config.nymd_validators,
api_validators: init_config.api_validators,
port: init_config.port,
fastmode: init_config.fastmode,
#[cfg(feature = "coconut")]
+15 -4
View File
@@ -1,8 +1,9 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::error::Error;
use crate::client::config::Config;
use crate::error::Socks5ClientError;
use clap::CommandFactory;
use clap::{Parser, Subcommand};
use completions::{fig_generate, ArgShell};
@@ -78,7 +79,8 @@ pub(crate) enum Commands {
// Configuration that can be overridden.
pub(crate) struct OverrideConfig {
validators: Option<String>,
nymd_validators: Option<String>,
api_validators: Option<String>,
port: Option<u16>,
fastmode: bool,
@@ -86,7 +88,7 @@ pub(crate) struct OverrideConfig {
enabled_credentials_mode: bool,
}
pub(crate) async fn execute(args: &Cli) -> Result<(), Socks5ClientError> {
pub(crate) async fn execute(args: &Cli) -> Result<(), Box<dyn Error + Send>> {
let bin_name = "nym-socks5-client";
match &args.command {
@@ -100,7 +102,16 @@ pub(crate) async fn execute(args: &Cli) -> Result<(), Socks5ClientError> {
}
pub(crate) fn override_config(mut config: Config, args: OverrideConfig) -> Config {
if let Some(raw_validators) = args.validators {
if let Some(raw_validators) = args.nymd_validators {
config
.get_base_mut()
.set_custom_validators(parse_validators(&raw_validators));
} else if let Ok(raw_validators) = std::env::var(network_defaults::var_names::NYMD_VALIDATOR) {
config
.get_base_mut()
.set_custom_validators(parse_validators(&raw_validators));
}
if let Some(raw_validators) = args.api_validators {
config
.get_base_mut()
.set_custom_validator_apis(parse_validators(&raw_validators));
+13 -6
View File
@@ -31,9 +31,13 @@ pub(crate) struct Run {
#[clap(long)]
gateway: Option<String>,
/// Comma separated list of rest endpoints of the validators
/// Comma separated list of rest endpoints of the nymd validators
#[clap(long)]
validators: Option<String>,
nymd_validators: Option<String>,
/// Comma separated list of rest endpoints of the API validators
#[clap(long)]
api_validators: Option<String>,
/// Port for the socket to listen on
#[clap(short, long)]
@@ -49,7 +53,8 @@ pub(crate) struct Run {
impl From<Run> for OverrideConfig {
fn from(run_config: Run) -> Self {
OverrideConfig {
validators: run_config.validators,
nymd_validators: run_config.nymd_validators,
api_validators: run_config.api_validators,
port: run_config.port,
fastmode: false,
@@ -81,14 +86,16 @@ fn version_check(cfg: &Config) -> bool {
}
}
pub(crate) async fn execute(args: &Run) -> Result<(), Socks5ClientError> {
pub(crate) async fn execute(args: &Run) -> Result<(), Box<dyn std::error::Error + Send>> {
let id = &args.id;
let mut config = match Config::load_from_file(Some(id)) {
Ok(cfg) => cfg,
Err(err) => {
error!("Failed to load config for {}. Are you sure you have run `init` before? (Error was: {})", id, err);
return Err(Socks5ClientError::FailedToLoadConfig(id.to_string()));
return Err(Box::new(Socks5ClientError::FailedToLoadConfig(
id.to_string(),
)));
}
};
@@ -97,7 +104,7 @@ pub(crate) async fn execute(args: &Run) -> Result<(), Socks5ClientError> {
if !version_check(&config) {
error!("failed the local version check");
return Err(Socks5ClientError::FailedLocalVersionCheck);
return Err(Box::new(Socks5ClientError::FailedLocalVersionCheck));
}
NymClient::new(config).run_forever().await
+10 -1
View File
@@ -1,8 +1,10 @@
use client_core::error::ClientCoreError;
use client_core::{client::reply_key_storage::ReplyKeyStorageError, error::ClientCoreError};
use crypto::asymmetric::identity::Ed25519RecoveryError;
use gateway_client::error::GatewayClientError;
use validator_client::ValidatorClientError;
use crate::socks::types::SocksProxyError;
#[derive(thiserror::Error, Debug)]
pub enum Socks5ClientError {
#[error("I/O error: {0}")]
@@ -15,9 +17,16 @@ pub enum Socks5ClientError {
ValidatorClientError(#[from] ValidatorClientError),
#[error("client-core error: {0}")]
ClientCoreError(#[from] ClientCoreError),
#[error("Reply key storage error: {0}")]
ReplyKeyStorageError(#[from] ReplyKeyStorageError),
#[error("SOCKS proxy error")]
SocksProxyError(SocksProxyError),
#[error("Failed to load config for: {0}")]
FailedToLoadConfig(String),
#[error("Failed local version check, client and config mismatch")]
FailedLocalVersionCheck,
#[error("Fail to bind address")]
FailToBindAddress,
}
+3 -2
View File
@@ -1,8 +1,9 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::error::Error;
use clap::{crate_version, Parser};
use error::Socks5ClientError;
use logging::setup_logging;
use network_defaults::setup_env;
@@ -12,7 +13,7 @@ pub mod error;
pub mod socks;
#[tokio::main]
async fn main() -> Result<(), Socks5ClientError> {
async fn main() -> Result<(), Box<dyn Error + Send>> {
setup_logging();
println!("{}", banner());
+21 -5
View File
@@ -4,8 +4,8 @@ use super::authentication::{AuthenticationMethods, Authenticator, User};
use super::request::{SocksCommand, SocksRequest};
use super::types::{ResponseCode, SocksProxyError};
use super::{RESERVED, SOCKS_VERSION};
use client_core::client::inbound_messages::InputMessage;
use client_core::client::inbound_messages::InputMessageSender;
use client_connections::{LaneQueueLengths, TransmissionLane};
use client_core::client::inbound_messages::{InputMessage, InputMessageSender};
use futures::channel::mpsc;
use futures::task::{Context, Poll};
use log::*;
@@ -141,6 +141,7 @@ pub(crate) struct SocksClient {
service_provider: Recipient,
self_address: Recipient,
started_proxy: bool,
lane_queue_lengths: LaneQueueLengths,
shutdown_listener: ShutdownListener,
}
@@ -158,6 +159,7 @@ impl Drop for SocksClient {
impl SocksClient {
/// Create a new SOCKClient
#[allow(clippy::too_many_arguments)]
pub fn new(
stream: TcpStream,
authenticator: Authenticator,
@@ -165,6 +167,7 @@ impl SocksClient {
service_provider: Recipient,
controller_sender: ControllerSender,
self_address: Recipient,
lane_queue_lengths: LaneQueueLengths,
shutdown_listener: ShutdownListener,
) -> Self {
let connection_id = Self::generate_random();
@@ -179,6 +182,7 @@ impl SocksClient {
service_provider,
self_address,
started_proxy: false,
lane_queue_lengths,
shutdown_listener,
}
}
@@ -198,6 +202,7 @@ impl SocksClient {
pub async fn shutdown(&mut self) -> Result<(), SocksProxyError> {
info!("client is shutting down its TCP stream");
self.stream.shutdown().await?;
self.shutdown_listener.mark_as_success();
Ok(())
}
@@ -230,8 +235,16 @@ impl SocksClient {
let req = Request::new_connect(self.connection_id, remote_address, self.self_address);
let msg = Message::Request(req);
let input_message = InputMessage::new_fresh(self.service_provider, msg.into_bytes(), false);
self.input_sender.unbounded_send(input_message).unwrap();
let input_message = InputMessage::new_fresh(
self.service_provider,
msg.into_bytes(),
false,
TransmissionLane::ConnectionId(self.connection_id),
);
self.input_sender
.send(input_message)
.await
.expect("InputMessageReceiver has stopped receiving!");
}
async fn run_proxy(&mut self, conn_receiver: ConnectionReceiver, remote_proxy_target: String) {
@@ -254,12 +267,14 @@ impl SocksClient {
conn_receiver,
input_sender,
connection_id,
Some(self.lane_queue_lengths.clone()),
self.shutdown_listener.clone(),
)
.run(move |conn_id, read_data, socket_closed| {
let provider_request = Request::new_send(conn_id, read_data, socket_closed);
let provider_message = Message::Request(provider_request);
InputMessage::new_fresh(recipient, provider_message.into_bytes(), false)
let lane = TransmissionLane::ConnectionId(conn_id);
InputMessage::new_fresh(recipient, provider_message.into_bytes(), false, lane)
})
.await
.into_inner();
@@ -304,6 +319,7 @@ impl SocksClient {
SocksCommand::UdpAssociate => unimplemented!(), // not handled
};
self.shutdown_listener.mark_as_success();
Ok(())
}
+14 -3
View File
@@ -1,3 +1,5 @@
use std::time::Duration;
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
@@ -18,9 +20,16 @@ pub(crate) struct MixnetResponseListener {
impl Drop for MixnetResponseListener {
fn drop(&mut self) {
self.buffer_requester
if let Err(err) = self
.buffer_requester
.unbounded_send(ReceivedBufferMessage::ReceiverDisconnect)
.expect("the buffer request failed!")
{
if self.shutdown.is_shutdown_poll() {
log::debug!("The buffer request failed: {}", err);
} else {
log::error!("The buffer request failed: {}", err);
}
}
}
}
@@ -96,7 +105,9 @@ impl MixnetResponseListener {
}
}
}
assert!(self.shutdown.is_shutdown_poll());
tokio::time::timeout(Duration::from_secs(5), self.shutdown.recv())
.await
.expect("Task stopped without shutdown called");
log::debug!("MixnetResponseListener: Exiting");
}
}
+20 -9
View File
@@ -1,16 +1,17 @@
use crate::error::Socks5ClientError;
use super::authentication::Authenticator;
use super::client::SocksClient;
use super::{
mixnet_responses::MixnetResponseListener,
types::{ResponseCode, SocksProxyError},
};
use super::{mixnet_responses::MixnetResponseListener, types::ResponseCode};
use client_connections::{ConnectionCommandSender, LaneQueueLengths};
use client_core::client::{
inbound_messages::InputMessageSender, received_buffer::ReceivedBufferRequestSender,
};
use log::*;
use nymsphinx::addressing::clients::Recipient;
use proxy_helpers::connection_controller::Controller;
use proxy_helpers::connection_controller::{BroadcastActiveConnections, Controller};
use std::net::SocketAddr;
use tap::TapFallible;
use task::ShutdownListener;
use tokio::net::TcpListener;
@@ -20,6 +21,7 @@ pub struct SphinxSocksServer {
listening_address: SocketAddr,
service_provider: Recipient,
self_address: Recipient,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
}
@@ -30,6 +32,7 @@ impl SphinxSocksServer {
authenticator: Authenticator,
service_provider: Recipient,
self_address: Recipient,
lane_queue_lengths: LaneQueueLengths,
shutdown: ShutdownListener,
) -> Self {
// hardcode ip as we (presumably) ONLY want to listen locally. If we change it, we can
@@ -41,6 +44,7 @@ impl SphinxSocksServer {
listening_address: format!("{}:{}", ip, port).parse().unwrap(),
service_provider,
self_address,
lane_queue_lengths,
shutdown,
}
}
@@ -51,13 +55,19 @@ impl SphinxSocksServer {
&mut self,
input_sender: InputMessageSender,
buffer_requester: ReceivedBufferRequestSender,
) -> Result<(), SocksProxyError> {
let listener = TcpListener::bind(self.listening_address).await.unwrap();
client_connection_tx: ConnectionCommandSender,
) -> Result<(), Socks5ClientError> {
let listener = TcpListener::bind(self.listening_address)
.await
.tap_err(|err| log::error!("Failed to bind to address: {err}"))?;
info!("Serving Connections...");
// controller for managing all active connections
let (mut active_streams_controller, controller_sender) =
Controller::new(self.shutdown.clone());
let (mut active_streams_controller, controller_sender) = Controller::new(
client_connection_tx,
BroadcastActiveConnections::Off,
self.shutdown.clone(),
);
tokio::spawn(async move {
active_streams_controller.run().await;
});
@@ -83,6 +93,7 @@ impl SphinxSocksServer {
self.service_provider,
controller_sender.clone(),
self.self_address,
self.lane_queue_lengths.clone(),
self.shutdown.clone(),
);
+8 -6
View File
@@ -1,7 +1,7 @@
[package]
name = "nym-client-wasm"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jedrzej Stuczynski <andrew@nymtech.net>"]
version = "1.0.1"
version = "1.1.0"
edition = "2021"
keywords = ["nym", "sphinx", "wasm", "webassembly", "privacy", "client"]
license = "Apache-2.0"
@@ -19,16 +19,18 @@ coconut = ["coconut-interface", "credentials", "gateway-client/coconut"]
[dependencies]
futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde-wasm-bindgen = "0.4"
wasm-bindgen = { version = "=0.2.83", features = ["serde-serialize"] }
wasm-bindgen-futures = "0.4"
js-sys = "0.3"
rand = { version = "0.7.3", features = ["wasm-bindgen"] }
serde = { version = "1.0", features = ["derive"] }
serde-wasm-bindgen = "0.4"
tokio = { version = "1.21.2", features = ["sync"] }
url = "2.2"
wasm-bindgen = { version = "=0.2.83", features = ["serde-serialize"] }
wasm-bindgen-futures = "0.4"
# internal
client-core = { path = "../client-core", default-features = false, features = ["wasm"] }
client-connections = { path = "../../common/client-connections" }
coconut-interface = { path = "../../common/coconut-interface", optional = true }
credentials = { path = "../../common/credentials", optional = true }
crypto = { path = "../../common/crypto" }
@@ -59,4 +61,4 @@ wasm-opt = true
[profile.release]
lto = true
opt-level = 'z'
opt-level = 'z'
+21 -4
View File
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use self::config::Config;
use client_connections::{ConnectionCommandReceiver, LaneQueueLengths, TransmissionLane};
use client_core::client::{
cover_traffic_stream::LoopCoverTrafficStream,
inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender},
@@ -127,6 +128,8 @@ impl NymClient {
ack_receiver: AcknowledgementReceiver,
input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender,
client_connection_rx: ConnectionCommandReceiver,
lane_queue_lengths: LaneQueueLengths,
) {
let mut controller_config = real_messages_control::Config::new(
self.key_manager.ack_key(),
@@ -151,6 +154,8 @@ impl NymClient {
input_receiver,
mix_sender,
topology_accessor,
lane_queue_lengths,
client_connection_rx,
)
.start();
}
@@ -321,12 +326,16 @@ impl NymClient {
let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();
// channels responsible for controlling real messages
let (input_sender, input_receiver) = mpsc::unbounded::<InputMessage>();
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
let shared_topology_accessor = TopologyAccessor::new();
// Channel that the real traffix controller can listed to for closing connections.
// Currently unused in the wasm client.
let (_client_connection_tx, client_connection_rx) = mpsc::unbounded();
// the components are started in very specific order. Unless you know what you are doing,
// do not change that.
self.start_topology_refresher(shared_topology_accessor.clone())
@@ -346,11 +355,17 @@ impl NymClient {
// The MixTrafficController then sends the actual traffic
let sphinx_message_sender = Self::start_mix_traffic_controller(gateway_client);
// Shared queue length data. Published by the `OutQueueController` in the client, and used
// primarily to throttle incoming connections
let shared_lane_queue_lengths = LaneQueueLengths::new();
self.start_real_traffic_controller(
shared_topology_accessor.clone(),
ack_receiver,
input_receiver,
sphinx_message_sender.clone(),
client_connection_rx,
shared_lane_queue_lengths,
);
if !self.config.debug.disable_loop_cover_traffic_stream {
@@ -376,14 +391,16 @@ impl NymClient {
console_log!("Sending {} bytes to {}", message.len(), recipient);
let recipient = Recipient::try_from_base58_string(recipient).unwrap();
let lane = TransmissionLane::General;
let input_msg = InputMessage::new_fresh(recipient, message, false);
let input_msg = InputMessage::new_fresh(recipient, message, false, lane);
self.input_tx
.as_ref()
.expect("start method was not called before!")
.unbounded_send(input_msg)
.unwrap();
.send(input_msg)
.await
.expect("InputMessageReceiver has stopped receiving!");
self
}
+1 -1
View File
@@ -6,7 +6,7 @@ use wasm_bindgen::prelude::*;
#[wasm_bindgen]
pub async fn get_gateway(api_server: String, preferred: Option<String>) -> GatewayEndpoint {
let validator_client = validator_client::ApiClient::new(api_server.parse().unwrap());
let validator_client = validator_client::client::ApiClient::new(api_server.parse().unwrap());
let gateways = match validator_client.get_cached_gateways().await {
Err(err) => panic!("failed to obtain list of all gateways - {}", err),
+10
View File
@@ -0,0 +1,10 @@
[package]
name = "client-connections"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures = "0.3"
log = "0.4.17"
+111
View File
@@ -0,0 +1,111 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::collections::HashMap;
use futures::channel::mpsc;
pub type ConnectionId = u64;
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
pub enum TransmissionLane {
General,
Reply,
Retransmission,
Control,
ConnectionId(ConnectionId),
}
/// Used by the connection controller to report current state for client connections.
pub type ConnectionCommandSender = mpsc::UnboundedSender<ConnectionCommand>;
pub type ConnectionCommandReceiver = mpsc::UnboundedReceiver<ConnectionCommand>;
pub enum ConnectionCommand {
// Announce that at a connection was closed. E.g the `OutQueueControl` uses this to discard
// transmission lanes.
Close(ConnectionId),
// In the network requester for example, we usually want to broadcast active connections
// regularly, so we know what connections we need to request lane queue lengths for from the
// client.
// In the socks5-client, this is not needed since have direct access to the lane queue lengths.
ActiveConnections(Vec<ConnectionId>),
}
// The `OutQueueControl` publishes the backlog per lane, primarily so that upstream can slow down
// if needed.
#[derive(Clone, Debug)]
pub struct LaneQueueLengths(std::sync::Arc<std::sync::Mutex<LaneQueueLengthsInner>>);
impl LaneQueueLengths {
pub fn new() -> Self {
LaneQueueLengths(std::sync::Arc::new(std::sync::Mutex::new(
LaneQueueLengthsInner {
map: HashMap::new(),
},
)))
}
pub fn set(&mut self, lane: &TransmissionLane, lane_length: Option<usize>) {
match self.0.lock() {
Ok(mut inner) => {
if let Some(length) = lane_length {
inner
.map
.entry(*lane)
.and_modify(|e| *e = length)
.or_insert(length);
} else {
inner.map.remove(lane);
}
}
Err(err) => log::warn!("Failed to set lane queue length: {err}"),
}
}
pub fn get(&self, lane: &TransmissionLane) -> Option<usize> {
match self.0.lock() {
Ok(inner) => inner.get(lane),
Err(err) => {
log::warn!("Failed to get lane queue length: {err}");
None
}
}
}
}
impl Default for LaneQueueLengths {
fn default() -> Self {
Self::new()
}
}
impl std::ops::Deref for LaneQueueLengths {
type Target = std::sync::Arc<std::sync::Mutex<LaneQueueLengthsInner>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Debug)]
pub struct LaneQueueLengthsInner {
pub map: HashMap<TransmissionLane, usize>,
}
impl LaneQueueLengthsInner {
pub fn get(&self, lane: &TransmissionLane) -> Option<usize> {
self.map.get(lane).copied()
}
pub fn values(&self) -> impl Iterator<Item = &usize> {
self.map.values()
}
pub fn modify<F>(&mut self, lane: &TransmissionLane, f: F)
where
F: FnOnce(&mut usize),
{
self.map.entry(*lane).and_modify(f);
}
}
@@ -17,6 +17,8 @@ use credential_storage::error::StorageError;
#[cfg(feature = "coconut")]
use std::str::FromStr;
#[cfg(feature = "coconut")]
use validator_client::client::CoconutApiClient;
#[cfg(feature = "coconut")]
use {
coconut_interface::Base58,
credentials::coconut::{
@@ -29,7 +31,7 @@ pub struct BandwidthController<St: Storage> {
#[allow(dead_code)]
storage: St,
#[cfg(feature = "coconut")]
validator_endpoints: Vec<url::Url>,
coconut_api_clients: Vec<CoconutApiClient>,
}
impl<St> BandwidthController<St>
@@ -37,10 +39,10 @@ where
St: Storage + Clone + 'static,
{
#[cfg(feature = "coconut")]
pub fn new(storage: St, validator_endpoints: Vec<url::Url>) -> Self {
pub fn new(storage: St, coconut_api_clients: Vec<CoconutApiClient>) -> Self {
BandwidthController {
storage,
validator_endpoints,
coconut_api_clients,
}
}
@@ -53,7 +55,7 @@ where
pub async fn prepare_coconut_credential(
&self,
) -> Result<coconut_interface::Credential, GatewayClientError> {
let verification_key = obtain_aggregate_verification_key(&self.validator_endpoints).await?;
let verification_key = obtain_aggregate_verification_key(&self.coconut_api_clients).await?;
let bandwidth_credential = self.storage.get_next_coconut_credential().await?;
let voucher_value = u64::from_str(&bandwidth_credential.voucher_value)
.map_err(|_| StorageError::InconsistentData)?;
+21 -10
View File
@@ -2,19 +2,13 @@
// SPDX-License-Identifier: Apache-2.0
use crate::bandwidth::BandwidthController;
use crate::cleanup_socket_message;
use crate::error::GatewayClientError;
use crate::packet_router::PacketRouter;
pub use crate::packet_router::{
AcknowledgementReceiver, AcknowledgementSender, MixnetMessageReceiver, MixnetMessageSender,
};
use crate::socket_state::{PartiallyDelegated, SocketState};
#[cfg(target_arch = "wasm32")]
use crate::wasm_storage::PersistentStorage;
#[cfg(feature = "coconut")]
use coconut_interface::Credential;
#[cfg(not(target_arch = "wasm32"))]
use credential_storage::PersistentStorage;
use crate::{cleanup_socket_message, try_decrypt_binary_message};
use crypto::asymmetric::identity;
use futures::{FutureExt, SinkExt, StreamExt};
use gateway_requests::authentication::encrypted_address::EncryptedAddressBytes;
@@ -28,13 +22,20 @@ use rand::rngs::OsRng;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use task::ShutdownListener;
use tungstenite::protocol::Message;
#[cfg(feature = "coconut")]
use coconut_interface::Credential;
#[cfg(not(target_arch = "wasm32"))]
use credential_storage::PersistentStorage;
#[cfg(not(target_arch = "wasm32"))]
use task::ShutdownListener;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::connect_async;
#[cfg(target_arch = "wasm32")]
use crate::wasm_storage::PersistentStorage;
#[cfg(target_arch = "wasm32")]
use wasm_timer;
#[cfg(target_arch = "wasm32")]
@@ -306,6 +307,8 @@ impl GatewayClient {
let m_shutdown = self.shutdown.clone();
async {
if let Some(mut s) = m_shutdown {
// TODO: fix this by marking as success _after_ the select
s.mark_as_success();
s.recv().await
} else {
std::future::pending::<()>().await
@@ -336,7 +339,15 @@ impl GatewayClient {
};
match ws_msg {
Message::Binary(bin_msg) => {
if let Err(err) = self.packet_router.route_received(vec![bin_msg]) {
// if we have established the shared key already, attempt to use it for decryption
// otherwise there's not much we can do apart from just routing what we have on hand
if let Some(shared_keys) = &self.shared_key {
if let Some(plaintext) = try_decrypt_binary_message(bin_msg, shared_keys) {
if let Err(err) = self.packet_router.route_received(vec![plaintext]) {
log::warn!("Route received failed: {:?}", err);
}
}
} else if let Err(err) = self.packet_router.route_received(vec![bin_msg]) {
log::warn!("Route received failed: {:?}", err);
}
}
@@ -3,6 +3,9 @@
use crate::error::GatewayClientError;
pub use client::GatewayClient;
use gateway_requests::registration::handshake::SharedKeys;
use gateway_requests::BinaryResponse;
use log::warn;
pub use packet_router::{
AcknowledgementReceiver, AcknowledgementSender, MixnetMessageReceiver, MixnetMessageSender,
};
@@ -37,3 +40,21 @@ pub(crate) fn cleanup_socket_messages(
None => Err(GatewayClientError::ConnectionAbruptlyClosed),
}
}
pub(crate) fn try_decrypt_binary_message(
bin_msg: Vec<u8>,
shared_keys: &SharedKeys,
) -> Option<Vec<u8>> {
match BinaryResponse::try_from_encrypted_tagged_bytes(bin_msg, shared_keys) {
Ok(bin_response) => match bin_response {
BinaryResponse::PushedMixMessage(plaintext) => Some(plaintext),
},
Err(err) => {
warn!(
"message received from the gateway was malformed! - {:?}",
err
);
None
}
}
}
@@ -1,14 +1,13 @@
// Copyright 2021-2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cleanup_socket_messages;
use crate::error::GatewayClientError;
use crate::packet_router::PacketRouter;
use crate::{cleanup_socket_messages, try_decrypt_binary_message};
use futures::channel::oneshot;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use gateway_requests::registration::handshake::SharedKeys;
use gateway_requests::BinaryResponse;
use log::*;
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
@@ -50,21 +49,9 @@ impl PartiallyDelegated {
match ws_msg {
Message::Binary(bin_msg) => {
// this function decrypts the request and checks the MAC
let plaintext = match BinaryResponse::try_from_encrypted_tagged_bytes(
bin_msg, shared_key,
) {
Ok(bin_response) => match bin_response {
BinaryResponse::PushedMixMessage(plaintext) => plaintext,
},
Err(err) => {
warn!(
"message received from the gateway was malformed! - {:?}",
err
);
continue;
}
};
plaintexts.push(plaintext)
if let Some(plaintext) = try_decrypt_binary_message(bin_msg, shared_key) {
plaintexts.push(plaintext)
}
}
// I think that in the future we should perhaps have some sequence number system, i.e.
// so each request/response pair can be easily identified, so that if messages are
@@ -10,10 +10,11 @@ rust-version = "1.56"
[dependencies]
base64 = "0.13"
colored = "2.0"
cw3 = "0.13.1"
coconut-dkg-common = { path = "../../cosmwasm-smart-contracts/coconut-dkg" }
contracts-common = { path = "../../cosmwasm-smart-contracts/contracts-common" }
mixnet-contract-common = { path= "../../cosmwasm-smart-contracts/mixnet-contract" }
vesting-contract-common = { path= "../../cosmwasm-smart-contracts/vesting-contract" }
contracts-common = { path = "../../cosmwasm-smart-contracts/contracts-common" }
coconut-bandwidth-contract-common = { path= "../../cosmwasm-smart-contracts/coconut-bandwidth-contract" }
multisig-contract-common = { path = "../../cosmwasm-smart-contracts/multisig-contract" }
vesting-contract = { path = "../../../contracts/vesting" }
@@ -37,6 +38,7 @@ async-trait = { version = "0.1.51", optional = true }
bip39 = { version = "1", features = ["rand"], optional = true }
config = { path = "../../config", optional = true }
cosmrs = { git = "https://github.com/neacsu/cosmos-rust", branch = "neacsu/feegrant_support", features = ["rpc", "bip32", "cosmwasm"], optional = true}
cw3 = { version = "0.13.4", optional = true }
prost = { version = "0.10", default-features = false, optional = true }
flate2 = { version = "1.0.20", optional = true }
sha2 = { version = "0.9.5", optional = true }
@@ -53,6 +55,7 @@ nymd-client = [
"bip39",
"config",
"cosmrs",
"cw3",
"prost",
"flate2",
"sha2",
+202 -18
View File
@@ -2,13 +2,21 @@
// SPDX-License-Identifier: Apache-2.0
use crate::{validator_api, ValidatorClientError};
use coconut_dkg_common::types::NodeIndex;
#[cfg(feature = "nymd-client")]
use coconut_dkg_common::{
dealer::ContractDealing, types::DealerDetails, verification_key::ContractVKShare,
};
#[cfg(feature = "nymd-client")]
use coconut_interface::Base58;
use coconut_interface::VerificationKey;
use mixnet_contract_common::mixnode::MixNodeDetails;
use mixnet_contract_common::MixId;
use mixnet_contract_common::{GatewayBond, IdentityKeyRef};
use url::Url;
#[cfg(feature = "nymd-client")]
use std::str::FromStr;
use validator_api_requests::coconut::{
BlindSignRequestBody, BlindedSignatureResponse, CosmosAddressResponse, VerificationKeyResponse,
VerifyCredentialBody, VerifyCredentialResponse,
BlindSignRequestBody, BlindedSignatureResponse, VerifyCredentialBody, VerifyCredentialResponse,
};
use validator_api_requests::models::{
GatewayCoreStatusResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse,
@@ -16,10 +24,12 @@ use validator_api_requests::models::{
};
#[cfg(feature = "nymd-client")]
use crate::nymd::traits::MixnetQueryClient;
use crate::nymd::traits::{DkgQueryClient, MixnetQueryClient, MultisigQueryClient};
#[cfg(feature = "nymd-client")]
use crate::nymd::{self, CosmWasmClient, NymdClient, QueryNymdClient, SigningNymdClient};
#[cfg(feature = "nymd-client")]
use cw3::ProposalResponse;
#[cfg(feature = "nymd-client")]
use mixnet_contract_common::{
mixnode::MixNodeBond,
pending_events::{PendingEpochEvent, PendingIntervalEvent},
@@ -27,6 +37,7 @@ use mixnet_contract_common::{
};
#[cfg(feature = "nymd-client")]
use network_defaults::NymNetworkDetails;
use url::Url;
#[cfg(feature = "nymd-client")]
use validator_api_requests::models::MixNodeBondAnnotated;
@@ -43,6 +54,9 @@ pub struct Config {
gateway_page_limit: Option<u32>,
mixnode_delegations_page_limit: Option<u32>,
rewarded_set_page_limit: Option<u32>,
dealers_page_limit: Option<u32>,
verification_key_page_limit: Option<u32>,
proposals_page_limit: Option<u32>,
}
#[cfg(feature = "nymd-client")]
@@ -72,6 +86,9 @@ impl Config {
gateway_page_limit: None,
mixnode_delegations_page_limit: None,
rewarded_set_page_limit: None,
dealers_page_limit: None,
verification_key_page_limit: None,
proposals_page_limit: None,
})
}
@@ -119,6 +136,9 @@ pub struct Client<C> {
gateway_page_limit: Option<u32>,
mixnode_delegations_page_limit: Option<u32>,
rewarded_set_page_limit: Option<u32>,
dealers_page_limit: Option<u32>,
verification_key_page_limit: Option<u32>,
proposals_page_limit: Option<u32>,
// ideally they would have been read-only, but unfortunately rust doesn't have such features
pub validator_api: validator_api::Client,
@@ -145,6 +165,9 @@ impl Client<SigningNymdClient> {
gateway_page_limit: config.gateway_page_limit,
mixnode_delegations_page_limit: config.mixnode_delegations_page_limit,
rewarded_set_page_limit: config.rewarded_set_page_limit,
dealers_page_limit: config.dealers_page_limit,
verification_key_page_limit: config.verification_key_page_limit,
proposals_page_limit: config.proposals_page_limit,
validator_api: validator_api_client,
nymd: nymd_client,
})
@@ -178,6 +201,9 @@ impl Client<QueryNymdClient> {
gateway_page_limit: config.gateway_page_limit,
mixnode_delegations_page_limit: config.mixnode_delegations_page_limit,
rewarded_set_page_limit: config.rewarded_set_page_limit,
dealers_page_limit: config.dealers_page_limit,
verification_key_page_limit: config.verification_key_page_limit,
proposals_page_limit: config.proposals_page_limit,
validator_api: validator_api_client,
nymd: nymd_client,
})
@@ -519,6 +545,135 @@ impl<C> Client<C> {
Ok(events)
}
pub async fn get_all_nymd_current_dealers(
&self,
) -> Result<Vec<DealerDetails>, ValidatorClientError>
where
C: CosmWasmClient + Sync + Send,
{
let mut dealers = Vec::new();
let mut start_after = None;
loop {
let mut paged_response = self
.nymd
.get_current_dealers_paged(start_after.take(), self.dealers_page_limit)
.await?;
dealers.append(&mut paged_response.dealers);
if let Some(start_after_res) = paged_response.start_next_after {
start_after = Some(start_after_res.into_string())
} else {
break;
}
}
Ok(dealers)
}
pub async fn get_all_nymd_past_dealers(
&self,
) -> Result<Vec<DealerDetails>, ValidatorClientError>
where
C: CosmWasmClient + Sync + Send,
{
let mut dealers = Vec::new();
let mut start_after = None;
loop {
let mut paged_response = self
.nymd
.get_past_dealers_paged(start_after.take(), self.dealers_page_limit)
.await?;
dealers.append(&mut paged_response.dealers);
if let Some(start_after_res) = paged_response.start_next_after {
start_after = Some(start_after_res.into_string())
} else {
break;
}
}
Ok(dealers)
}
pub async fn get_all_nymd_epoch_dealings(
&self,
idx: usize,
) -> Result<Vec<ContractDealing>, ValidatorClientError>
where
C: CosmWasmClient + Sync + Send,
{
let mut dealings = Vec::new();
let mut start_after = None;
loop {
let mut paged_response = self
.nymd
.get_dealings_paged(idx, start_after.take(), self.dealers_page_limit)
.await?;
dealings.append(&mut paged_response.dealings);
if let Some(start_after_res) = paged_response.start_next_after {
start_after = Some(start_after_res.into_string())
} else {
break;
}
}
Ok(dealings)
}
pub async fn get_all_nymd_verification_key_shares(
&self,
) -> Result<Vec<ContractVKShare>, ValidatorClientError>
where
C: CosmWasmClient + Sync + Send,
{
let mut shares = Vec::new();
let mut start_after = None;
loop {
let mut paged_response = self
.nymd
.get_vk_shares_paged(start_after.take(), self.verification_key_page_limit)
.await?;
shares.append(&mut paged_response.shares);
if let Some(start_after_res) = paged_response.start_next_after {
start_after = Some(start_after_res.into_string())
} else {
break;
}
}
Ok(shares)
}
pub async fn get_all_nymd_proposals(
&self,
) -> Result<Vec<ProposalResponse>, ValidatorClientError>
where
C: CosmWasmClient + Sync + Send,
{
let mut proposals = Vec::new();
let mut start_after = None;
loop {
let mut paged_response = self
.nymd
.list_proposals(start_after.take(), self.proposals_page_limit)
.await?;
let last_id = paged_response.proposals.last().map(|prop| prop.id);
proposals.append(&mut paged_response.proposals);
if let Some(start_after_res) = last_id {
start_after = Some(start_after_res)
} else {
break;
}
}
Ok(proposals)
}
}
// validator-api wrappers
@@ -572,14 +727,53 @@ impl<C> Client<C> {
) -> Result<BlindedSignatureResponse, ValidatorClientError> {
Ok(self.validator_api.blind_sign(request_body).await?)
}
}
pub async fn get_coconut_verification_key(
&self,
) -> Result<VerificationKeyResponse, ValidatorClientError> {
Ok(self.validator_api.get_coconut_verification_key().await?)
#[derive(Clone)]
pub struct CoconutApiClient {
pub api_client: ApiClient,
pub verification_key: VerificationKey,
pub node_id: NodeIndex,
#[cfg(feature = "nymd-client")]
pub cosmos_address: cosmrs::AccountId,
}
#[cfg(feature = "nymd-client")]
impl CoconutApiClient {
pub async fn all_coconut_api_clients<C>(
nymd_client: &Client<C>,
) -> Result<Vec<Self>, ValidatorClientError>
where
C: CosmWasmClient + Sync + Send,
{
Ok(nymd_client
.get_all_nymd_verification_key_shares()
.await?
.into_iter()
.filter_map(Self::try_from)
.collect())
}
fn try_from(share: ContractVKShare) -> Option<Self> {
if share.verified {
if let Ok(url_address) = Url::parse(&share.announce_address) {
if let Ok(verification_key) = VerificationKey::try_from_bs58(&share.share) {
if let Ok(cosmos_address) = cosmrs::AccountId::from_str(share.owner.as_str()) {
return Some(CoconutApiClient {
api_client: ApiClient::new(url_address),
verification_key,
node_id: share.node_index,
cosmos_address,
});
}
}
}
}
None
}
}
#[derive(Clone)]
pub struct ApiClient {
pub validator_api: validator_api::Client,
// TODO: perhaps if we really need it at some (currently I don't see any reasons for it)
@@ -685,16 +879,6 @@ impl ApiClient {
.await?)
}
pub async fn get_coconut_verification_key(
&self,
) -> Result<VerificationKeyResponse, ValidatorClientError> {
Ok(self.validator_api.get_coconut_verification_key().await?)
}
pub async fn get_cosmos_address(&self) -> Result<CosmosAddressResponse, ValidatorClientError> {
Ok(self.validator_api.get_cosmos_address().await?)
}
pub async fn verify_bandwidth_credential(
&self,
request_body: &VerifyCredentialBody,
@@ -9,7 +9,8 @@ mod error;
pub mod nymd;
pub mod validator_api;
pub use crate::client::ApiClient;
#[cfg(feature = "nymd-client")]
pub use crate::client::{ApiClient, CoconutApiClient};
pub use crate::error::ValidatorClientError;
pub use validator_api_requests::*;
@@ -6,6 +6,9 @@ use cosmrs::tendermint::abci;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
pub use coconut_bandwidth_contract_common::event_attributes::*;
pub use coconut_dkg_common::event_attributes::*;
// it seems that currently validators just emit stringified events (which are also returned as part of deliverTx response)
// as theirs logs
#[derive(Debug, Serialize, Deserialize)]
@@ -65,6 +65,7 @@ pub struct Config {
pub(crate) bandwidth_claim_contract_address: Option<AccountId>,
pub(crate) coconut_bandwidth_contract_address: Option<AccountId>,
pub(crate) multisig_contract_address: Option<AccountId>,
pub(crate) coconut_dkg_contract_address: Option<AccountId>,
// TODO: add this in later commits
// pub(crate) gas_price: GasPrice,
}
@@ -118,6 +119,10 @@ impl Config {
details.contracts.multisig_contract_address.as_ref(),
prefix,
)?,
coconut_dkg_contract_address: Self::parse_optional_account(
details.contracts.coconut_dkg_contract_address.as_ref(),
prefix,
)?,
})
}
}
@@ -275,6 +280,14 @@ impl<C> NymdClient<C> {
self.config.multisig_contract_address.as_ref().unwrap()
}
// TODO: this should get changed into Result<&AccountId, NymdError> (or Option<&AccountId> in future commits
// note: what unwrap is doing here is just moving a failure that would have normally
// occurred in `connect` when attempting to parse an empty address,
// so it's not introducing new source of failure (just moves it)
pub fn coconut_dkg_contract_address(&self) -> &AccountId {
self.config.coconut_dkg_contract_address.as_ref().unwrap()
}
pub fn set_simulated_gas_multiplier(&mut self, multiplier: f32) {
self.simulated_gas_multiplier = multiplier;
}
@@ -0,0 +1,126 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::nymd::error::NymdError;
use crate::nymd::{CosmWasmClient, NymdClient};
use async_trait::async_trait;
use coconut_dkg_common::dealer::{
DealerDetailsResponse, PagedDealerResponse, PagedDealingsResponse,
};
use coconut_dkg_common::msg::QueryMsg as DkgQueryMsg;
use coconut_dkg_common::types::EpochState;
use coconut_dkg_common::verification_key::PagedVKSharesResponse;
use cosmrs::AccountId;
#[async_trait]
pub trait DkgQueryClient {
async fn get_current_epoch_state(&self) -> Result<EpochState, NymdError>;
async fn get_dealer_details(
&self,
address: &AccountId,
) -> Result<DealerDetailsResponse, NymdError>;
async fn get_current_dealers_paged(
&self,
start_after: Option<String>,
page_limit: Option<u32>,
) -> Result<PagedDealerResponse, NymdError>;
async fn get_past_dealers_paged(
&self,
start_after: Option<String>,
page_limit: Option<u32>,
) -> Result<PagedDealerResponse, NymdError>;
async fn get_dealings_paged(
&self,
idx: usize,
start_after: Option<String>,
page_limit: Option<u32>,
) -> Result<PagedDealingsResponse, NymdError>;
async fn get_vk_shares_paged(
&self,
start_after: Option<String>,
page_limit: Option<u32>,
) -> Result<PagedVKSharesResponse, NymdError>;
}
#[async_trait]
impl<C> DkgQueryClient for NymdClient<C>
where
C: CosmWasmClient + Send + Sync,
{
async fn get_current_epoch_state(&self) -> Result<EpochState, NymdError> {
let request = DkgQueryMsg::GetCurrentEpochState {};
self.client
.query_contract_smart(self.coconut_dkg_contract_address(), &request)
.await
}
async fn get_dealer_details(
&self,
address: &AccountId,
) -> Result<DealerDetailsResponse, NymdError> {
let request = DkgQueryMsg::GetDealerDetails {
dealer_address: address.to_string(),
};
self.client
.query_contract_smart(self.coconut_dkg_contract_address(), &request)
.await
}
async fn get_current_dealers_paged(
&self,
start_after: Option<String>,
page_limit: Option<u32>,
) -> Result<PagedDealerResponse, NymdError> {
let request = DkgQueryMsg::GetCurrentDealers {
start_after,
limit: page_limit,
};
self.client
.query_contract_smart(self.coconut_dkg_contract_address(), &request)
.await
}
async fn get_past_dealers_paged(
&self,
start_after: Option<String>,
page_limit: Option<u32>,
) -> Result<PagedDealerResponse, NymdError> {
let request = DkgQueryMsg::GetPastDealers {
start_after,
limit: page_limit,
};
self.client
.query_contract_smart(self.coconut_dkg_contract_address(), &request)
.await
}
async fn get_dealings_paged(
&self,
idx: usize,
start_after: Option<String>,
page_limit: Option<u32>,
) -> Result<PagedDealingsResponse, NymdError> {
let request = DkgQueryMsg::GetDealing {
idx: idx as u64,
limit: page_limit,
start_after,
};
self.client
.query_contract_smart(self.coconut_dkg_contract_address(), &request)
.await
}
async fn get_vk_shares_paged(
&self,
start_after: Option<String>,
page_limit: Option<u32>,
) -> Result<PagedVKSharesResponse, NymdError> {
let request = DkgQueryMsg::GetVerificationKeys {
limit: page_limit,
start_after,
};
self.client
.query_contract_smart(self.coconut_dkg_contract_address(), &request)
.await
}
}
@@ -0,0 +1,100 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::nymd::cosmwasm_client::types::ExecuteResult;
use crate::nymd::error::NymdError;
use crate::nymd::{Fee, NymdClient, SigningCosmWasmClient};
use async_trait::async_trait;
use coconut_dkg_common::msg::ExecuteMsg as DkgExecuteMsg;
use coconut_dkg_common::types::EncodedBTEPublicKeyWithProof;
use coconut_dkg_common::verification_key::VerificationKeyShare;
use contracts_common::dealings::ContractSafeBytes;
#[async_trait]
pub trait DkgSigningClient {
async fn register_dealer(
&self,
bte_key: EncodedBTEPublicKeyWithProof,
announce_address: String,
fee: Option<Fee>,
) -> Result<ExecuteResult, NymdError>;
async fn submit_dealing_bytes(
&self,
commitment: ContractSafeBytes,
fee: Option<Fee>,
) -> Result<ExecuteResult, NymdError>;
async fn submit_verification_key_share(
&self,
share: VerificationKeyShare,
fee: Option<Fee>,
) -> Result<ExecuteResult, NymdError>;
}
#[async_trait]
impl<C> DkgSigningClient for NymdClient<C>
where
C: SigningCosmWasmClient + Send + Sync,
{
async fn register_dealer(
&self,
bte_key: EncodedBTEPublicKeyWithProof,
announce_address: String,
fee: Option<Fee>,
) -> Result<ExecuteResult, NymdError> {
let req = DkgExecuteMsg::RegisterDealer {
bte_key_with_proof: bte_key,
announce_address,
};
self.client
.execute(
self.address(),
self.coconut_dkg_contract_address(),
&req,
fee.unwrap_or_default(),
format!("registering {} as a dealer", self.address()),
vec![],
)
.await
}
async fn submit_dealing_bytes(
&self,
dealing_bytes: ContractSafeBytes,
fee: Option<Fee>,
) -> Result<ExecuteResult, NymdError> {
let req = DkgExecuteMsg::CommitDealing { dealing_bytes };
self.client
.execute(
self.address(),
self.coconut_dkg_contract_address(),
&req,
fee.unwrap_or_default(),
"dealing commitment",
vec![],
)
.await
}
async fn submit_verification_key_share(
&self,
share: VerificationKeyShare,
fee: Option<Fee>,
) -> Result<ExecuteResult, NymdError> {
let req = DkgExecuteMsg::CommitVerificationKeyShare { share };
self.client
.execute(
self.address(),
self.coconut_dkg_contract_address(),
&req,
fee.unwrap_or_default(),
"verification key share commitment",
vec![],
)
.await
}
}
@@ -3,6 +3,8 @@
mod coconut_bandwidth_query_client;
mod coconut_bandwidth_signing_client;
mod dkg_query_client;
mod dkg_signing_client;
mod mixnet_query_client;
mod mixnet_signing_client;
mod multisig_query_client;
@@ -12,6 +14,8 @@ mod vesting_signing_client;
pub use coconut_bandwidth_query_client::CoconutBandwidthQueryClient;
pub use coconut_bandwidth_signing_client::CoconutBandwidthSigningClient;
pub use dkg_query_client::DkgQueryClient;
pub use dkg_signing_client::DkgSigningClient;
pub use mixnet_query_client::MixnetQueryClient;
pub use mixnet_signing_client::MixnetSigningClient;
pub use multisig_query_client::MultisigQueryClient;
@@ -4,13 +4,19 @@
use crate::nymd::error::NymdError;
use crate::nymd::{CosmWasmClient, NymdClient};
use multisig_contract_common::msg::{ProposalResponse, QueryMsg};
use cw3::{ProposalListResponse, ProposalResponse};
use multisig_contract_common::msg::QueryMsg;
use async_trait::async_trait;
#[async_trait]
pub trait MultisigQueryClient {
async fn get_proposal(&self, proposal_id: u64) -> Result<ProposalResponse, NymdError>;
async fn list_proposals(
&self,
start_after: Option<u64>,
limit: Option<u32>,
) -> Result<ProposalListResponse, NymdError>;
}
#[async_trait]
@@ -21,4 +27,15 @@ impl<C: CosmWasmClient + Sync + Send> MultisigQueryClient for NymdClient<C> {
.query_contract_smart(self.multisig_contract_address(), &request)
.await
}
async fn list_proposals(
&self,
start_after: Option<u64>,
limit: Option<u32>,
) -> Result<ProposalListResponse, NymdError> {
let request = QueryMsg::ListProposals { start_after, limit };
self.client
.query_contract_smart(self.multisig_contract_address(), &request)
.await
}
}
@@ -7,11 +7,11 @@ use crate::nymd::error::NymdError;
use crate::nymd::{Fee, NymdClient};
use coconut_bandwidth_contract_common::msg::ExecuteMsg as CoconutBandwidthExecuteMsg;
use cw3::Vote;
use multisig_contract_common::msg::ExecuteMsg;
use async_trait::async_trait;
use cosmwasm_std::{to_binary, Coin, CosmosMsg, WasmMsg};
use cw3::Vote;
#[async_trait]
pub trait MultisigSigningClient {
@@ -9,8 +9,7 @@ use reqwest::Response;
use serde::{Deserialize, Serialize};
use url::Url;
use validator_api_requests::coconut::{
BlindSignRequestBody, BlindedSignatureResponse, CosmosAddressResponse, VerificationKeyResponse,
VerifyCredentialBody, VerifyCredentialResponse,
BlindSignRequestBody, BlindedSignatureResponse, VerifyCredentialBody, VerifyCredentialResponse,
};
use validator_api_requests::models::{
GatewayCoreStatusResponse, GatewayStatusReportResponse, GatewayUptimeHistoryResponse,
@@ -27,6 +26,7 @@ type Params<'a, K, V> = &'a [(K, V)];
const NO_PARAMS: Params<'_, &'_ str, &'_ str> = &[];
#[derive(Clone)]
pub struct Client {
url: Url,
reqwest_client: reqwest::Client,
@@ -443,34 +443,6 @@ impl Client {
.await
}
pub async fn get_coconut_verification_key(
&self,
) -> Result<VerificationKeyResponse, ValidatorAPIError> {
self.query_validator_api(
&[
routes::API_VERSION,
routes::COCONUT_ROUTES,
routes::BANDWIDTH,
routes::COCONUT_VERIFICATION_KEY,
],
NO_PARAMS,
)
.await
}
pub async fn get_cosmos_address(&self) -> Result<CosmosAddressResponse, ValidatorAPIError> {
self.query_validator_api(
&[
routes::API_VERSION,
routes::COCONUT_ROUTES,
routes::BANDWIDTH,
routes::COCONUT_COSMOS_ADDRESS,
],
NO_PARAMS,
)
.await
}
pub async fn verify_bandwidth_credential(
&self,
request_body: &VerifyCredentialBody,
@@ -15,8 +15,6 @@ pub const BANDWIDTH: &str = "bandwidth";
pub const COCONUT_BLIND_SIGN: &str = "blind-sign";
pub const COCONUT_PARTIAL_BANDWIDTH_CREDENTIAL: &str = "partial-bandwidth-credential";
pub const COCONUT_VERIFICATION_KEY: &str = "verification-key";
pub const COCONUT_COSMOS_ADDRESS: &str = "cosmos-address";
pub const COCONUT_VERIFY_BANDWIDTH_CREDENTIAL: &str = "verify-bandwidth-credential";
pub const STATUS_ROUTES: &str = "status";
@@ -13,7 +13,7 @@ use validator_client::nymd::wallet::DirectSecp256k1HdWallet;
pub struct SignatureOutputJson {
pub account_id: String,
pub public_key: PublicKey,
pub signature: String,
pub signature_as_hex: String,
}
#[derive(Debug, Parser)]
@@ -46,7 +46,7 @@ pub fn sign(args: Args, prefix: &str, mnemonic: Option<bip39::Mnemonic>) {
let output = SignatureOutputJson {
account_id: account.address().to_string(),
public_key: account.public_key(),
signature: signature.to_string(),
signature_as_hex: signature.to_string(),
};
println!("{}", json!(output));
}
@@ -0,0 +1,4 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub const BANDWIDTH_PROPOSAL_ID: &str = "proposal_id";
@@ -1,4 +1,5 @@
pub mod deposit;
pub mod event_attributes;
pub mod events;
pub mod msg;
pub mod spend_credential;
@@ -0,0 +1,15 @@
[package]
name = "coconut-dkg-common"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
cosmwasm-std = "1.0.0"
cw-utils = "0.13.4"
schemars = "0.8"
serde = { version = "1.0.103", default-features = false, features = ["derive"] }
contracts-common = { path = "../contracts-common" }
multisig-contract-common = { path = "../multisig-contract" }
@@ -0,0 +1,102 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::types::{ContractSafeBytes, EncodedBTEPublicKeyWithProof, NodeIndex};
use cosmwasm_std::Addr;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub struct DealerDetails {
pub address: Addr,
pub bte_public_key_with_proof: EncodedBTEPublicKeyWithProof,
pub announce_address: String,
pub assigned_index: NodeIndex,
}
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum DealerType {
Current,
Past,
Unknown,
}
impl DealerType {
pub fn is_current(&self) -> bool {
matches!(&self, DealerType::Current)
}
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct DealerDetailsResponse {
pub details: Option<DealerDetails>,
pub dealer_type: DealerType,
}
impl DealerDetailsResponse {
pub fn new(details: Option<DealerDetails>, dealer_type: DealerType) -> Self {
DealerDetailsResponse {
details,
dealer_type,
}
}
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct PagedDealerResponse {
pub dealers: Vec<DealerDetails>,
pub per_page: usize,
pub start_next_after: Option<Addr>,
}
impl PagedDealerResponse {
pub fn new(
dealers: Vec<DealerDetails>,
per_page: usize,
start_next_after: Option<Addr>,
) -> Self {
PagedDealerResponse {
dealers,
per_page,
start_next_after,
}
}
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct ContractDealing {
pub dealing: ContractSafeBytes,
pub dealer: Addr,
}
impl ContractDealing {
pub fn new(dealing: ContractSafeBytes, dealer: Addr) -> Self {
ContractDealing { dealing, dealer }
}
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct PagedDealingsResponse {
pub dealings: Vec<ContractDealing>,
pub per_page: usize,
pub start_next_after: Option<Addr>,
}
impl PagedDealingsResponse {
pub fn new(
dealings: Vec<ContractDealing>,
per_page: usize,
start_next_after: Option<Addr>,
) -> Self {
PagedDealingsResponse {
dealings,
per_page,
start_next_after,
}
}
}
@@ -0,0 +1,5 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub const NODE_INDEX: &str = "node_index";
pub const DKG_PROPOSAL_ID: &str = "proposal_id";
@@ -0,0 +1,5 @@
pub mod dealer;
pub mod event_attributes;
pub mod msg;
pub mod types;
pub mod verification_key;
@@ -0,0 +1,69 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::types::{ContractSafeBytes, EncodedBTEPublicKeyWithProof};
use crate::verification_key::VerificationKeyShare;
use cosmwasm_std::Addr;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema)]
pub struct InstantiateMsg {
pub group_addr: String,
pub multisig_addr: String,
pub admin: String,
pub mix_denom: String,
}
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ExecuteMsg {
RegisterDealer {
bte_key_with_proof: EncodedBTEPublicKeyWithProof,
announce_address: String,
},
CommitDealing {
dealing_bytes: ContractSafeBytes,
},
CommitVerificationKeyShare {
share: VerificationKeyShare,
},
VerifyVerificationKeyShare {
owner: Addr,
},
AdvanceEpochState {},
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum QueryMsg {
GetCurrentEpochState {},
GetDealerDetails {
dealer_address: String,
},
GetCurrentDealers {
limit: Option<u32>,
start_after: Option<String>,
},
GetPastDealers {
limit: Option<u32>,
start_after: Option<String>,
},
GetDealing {
idx: u64,
limit: Option<u32>,
start_after: Option<String>,
},
GetVerificationKeys {
limit: Option<u32>,
start_after: Option<String>,
},
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub struct MigrateMsg {}
@@ -0,0 +1,81 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
pub use crate::dealer::{DealerDetails, PagedDealerResponse};
pub use contracts_common::dealings::ContractSafeBytes;
pub use cosmwasm_std::{Addr, Coin};
pub type EncodedBTEPublicKeyWithProof = String;
pub type EncodedBTEPublicKeyWithProofRef<'a> = &'a str;
pub type NodeIndex = u64;
// 2 public attributes, 2 private attributes, 1 fixed for coconut credential
pub const TOTAL_DEALINGS: usize = 2 + 2 + 1;
// currently (it is still extremely likely to change, we might be able to get rid of verification key-related complaints),
// the epoch can be in the following states (in order):
// 1. PublicKeySubmission -> potential dealers are submitting their BTE and ed25519 public keys to participate in dealing exchange
// 2. DealingExchange -> the actual (off-chain) dealing exchange is happening
// 3. ComplaintSubmission -> receivers submitting evidence of other dealers sending malformed data
// 4. ComplaintVoting -> (if any complaints were submitted) receivers voting on the validity of the evidence provided
// 5. VerificationKeySubmission -> receivers submitting their partial (and master) verification keys
// 6. VerificationKeyMismatchSubmission -> receivers / watchers raising issue that the submitted VK are mismatched with their local derivations
// 7. VerificationKeyMismatchVoting -> (if any complaints were submitted) receivers voting on received mismatches
// 8. InProgress -> all receivers have all their secrets derived and all is good
//
// Note: It's important that the variant ordering is not changed otherwise it would mess up the derived `PartialOrd`
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd)]
#[serde(rename_all = "snake_case")]
pub enum EpochState {
PublicKeySubmission,
DealingExchange,
VerificationKeySubmission,
VerificationKeyValidation,
VerificationKeyFinalization,
InProgress,
}
impl Default for EpochState {
fn default() -> Self {
Self::PublicKeySubmission
}
}
impl Display for EpochState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
EpochState::PublicKeySubmission => write!(f, "PublicKeySubmission"),
EpochState::DealingExchange => write!(f, "DealingExchange"),
EpochState::VerificationKeySubmission => write!(f, "VerificationKeySubmission"),
EpochState::VerificationKeyValidation => write!(f, "VerificationKeyValidation"),
EpochState::VerificationKeyFinalization => write!(f, "VerificationKeyFinalization"),
EpochState::InProgress => write!(f, "InProgress"),
}
}
}
impl EpochState {
pub fn next(self) -> Option<Self> {
match self {
EpochState::PublicKeySubmission => Some(EpochState::DealingExchange),
EpochState::DealingExchange => Some(EpochState::VerificationKeySubmission),
EpochState::VerificationKeySubmission => Some(EpochState::VerificationKeyValidation),
EpochState::VerificationKeyValidation => Some(EpochState::VerificationKeyFinalization),
EpochState::VerificationKeyFinalization => Some(EpochState::InProgress),
EpochState::InProgress => None,
}
}
pub fn all_until(&self, end: Self) -> Vec<Self> {
let mut states = vec![*self];
while states.last().unwrap() != &end {
let next_state = states.last().unwrap().next().expect("somehow reached the end of state diff -> this should be impossible under any circumstances!");
states.push(next_state);
}
states
}
}
@@ -0,0 +1,70 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::msg::ExecuteMsg;
use crate::types::NodeIndex;
use cosmwasm_std::{from_binary, to_binary, Addr, CosmosMsg, StdResult, Timestamp, WasmMsg};
use cw_utils::Expiration;
use multisig_contract_common::msg::ExecuteMsg as MultisigExecuteMsg;
use serde::{Deserialize, Serialize};
pub type VerificationKeyShare = String;
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ContractVKShare {
pub share: VerificationKeyShare,
pub announce_address: String,
pub node_index: NodeIndex,
pub owner: Addr,
pub verified: bool,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct PagedVKSharesResponse {
pub shares: Vec<ContractVKShare>,
pub per_page: usize,
pub start_next_after: Option<Addr>,
}
pub fn to_cosmos_msg(
owner: Addr,
coconut_dkg_addr: String,
multisig_addr: String,
expiration_time: Timestamp,
) -> StdResult<CosmosMsg> {
let verify_vk_share_req = ExecuteMsg::VerifyVerificationKeyShare { owner };
let verify_vk_share_msg = CosmosMsg::Wasm(WasmMsg::Execute {
contract_addr: coconut_dkg_addr,
msg: to_binary(&verify_vk_share_req)?,
funds: vec![],
});
let req = MultisigExecuteMsg::Propose {
title: String::from("Verify VK share, as ordered by Coconut DKG Contract"),
description: String::new(),
msgs: vec![verify_vk_share_msg],
latest: Some(Expiration::AtTime(expiration_time)),
};
let msg = CosmosMsg::Wasm(WasmMsg::Execute {
contract_addr: multisig_addr,
msg: to_binary(&req)?,
funds: vec![],
});
Ok(msg)
}
pub fn owner_from_cosmos_msgs(msgs: &[CosmosMsg]) -> Option<Addr> {
if let Some(CosmosMsg::Wasm(WasmMsg::Execute {
contract_addr: _,
msg,
funds: _,
})) = msgs.get(0)
{
if let Ok(ExecuteMsg::VerifyVerificationKeyShare { owner }) = from_binary::<ExecuteMsg>(msg)
{
return Some(owner);
}
}
None
}
@@ -7,10 +7,15 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bs58 = "0.4.0"
cosmwasm-std = "1.0.0"
serde = { version = "1.0", features = ["derive"] }
dkg = { path = "../../../common/crypto/dkg", optional = true }
schemars = "0.8"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1"
[dev-dependencies]
serde_json = "1.0.0"
[features]
coconut = ["dkg"]
@@ -0,0 +1,87 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#[cfg(feature = "coconut")]
use dkg::{error::DkgError, Dealing};
use schemars::JsonSchema;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt::{Display, Formatter};
use std::ops::Deref;
// some sane upper-bound size on byte sizes
// currently set to 128 bytes
pub const MAX_DISPLAY_SIZE: usize = 128;
// TODO: if we are to use this for different types, it might make sense to introduce something like
// CommitmentTypeId field on the below for distinguishing different ones. it would somehow become part of the trait
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, JsonSchema)]
pub struct ContractSafeBytes(Vec<u8>);
impl Deref for ContractSafeBytes {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Display for ContractSafeBytes {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if !self.0.is_empty() {
write!(f, "0x")?;
}
for byte in self.0.iter().take(MAX_DISPLAY_SIZE) {
write!(f, "{:02X}", byte)?;
}
// just some sanity safeguards
if self.0.len() > MAX_DISPLAY_SIZE {
write!(f, "...")?;
}
Ok(())
}
}
// since cosmwasm stores everything with byte representation of stringified json, it's actually more efficient
// to serialize this as a string as opposed to keeping it as vector of bytes.
// for example vec![255,255] would have string representation of "[255,255]" and will be serialized to
// [91, 50, 53, 53, 44, 50, 53, 53, 93]. the equivalent base58 encoded string `"LUv"` will be serialized to
// [34, 76, 85, 118, 34]
//
// the difference between base58 and base64 is rather minimal and I've gone with base58 for consistency sake
impl Serialize for ContractSafeBytes {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&bs58::encode(&self.0).into_string())
}
}
impl<'de> Deserialize<'de> for ContractSafeBytes {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = <String>::deserialize(deserializer)?;
let bytes = bs58::decode(&s)
.into_vec()
.map_err(serde::de::Error::custom)?;
Ok(ContractSafeBytes(bytes))
}
}
#[cfg(feature = "coconut")]
impl<'a> From<&'a Dealing> for ContractSafeBytes {
fn from(dealing: &'a Dealing) -> Self {
ContractSafeBytes(dealing.to_bytes())
}
}
#[cfg(feature = "coconut")]
impl<'a> TryFrom<&'a ContractSafeBytes> for Dealing {
type Error = DkgError;
fn try_from(value: &'a ContractSafeBytes) -> Result<Self, Self::Error> {
Dealing::try_from_bytes(&value.0)
}
}
@@ -4,6 +4,7 @@
#![warn(clippy::expect_used)]
#![warn(clippy::unwrap_used)]
pub mod dealings;
pub mod events;
pub mod types;
@@ -413,4 +413,6 @@ pub enum QueryMsg {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub struct MigrateMsg {}
pub struct MigrateMsg {
pub vesting_contract_address: Option<String>,
}
@@ -6,9 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
cw-utils = { version = "0.13.1" }
cw3 = { version = "0.13.1" }
cw4 = { version = "0.13.1" }
cw-utils = { version = "0.13.4" }
cw3 = { version = "0.13.4" }
cw3-fixed-multisig = { version = "0.13.4", features = ["library"] }
cw4 = { version = "0.13.4" }
cosmwasm-std = "1.0.0"
schemars = "0.8"
serde = { version = "1.0.103", default-features = false, features = ["derive"] }
@@ -5,7 +5,6 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use cosmwasm_std::{CosmosMsg, Empty};
pub use cw3::ProposalResponse;
use cw3::Vote;
use cw4::MemberChangedHookMsg;
use cw_utils::{Duration, Expiration, Threshold};
@@ -15,6 +14,7 @@ pub struct InstantiateMsg {
// this is the group contract that contains the member list
pub group_addr: String,
pub coconut_bandwidth_contract_address: String,
pub coconut_dkg_contract_address: String,
pub threshold: Threshold,
pub max_voting_period: Duration,
}
@@ -82,4 +82,5 @@ pub enum QueryMsg {
#[serde(rename_all = "snake_case")]
pub struct MigrateMsg {
pub coconut_bandwidth_address: String,
pub coconut_dkg_address: String,
}
-1
View File
@@ -9,7 +9,6 @@ edition = "2021"
bls12_381 = { version = "0.5", default-features = false, features = ["pairings", "alloc", "experimental"] }
cosmrs = { git = "https://github.com/neacsu/cosmos-rust", branch = "neacsu/feegrant_support", optional = true }
thiserror = "1.0"
url = "2.2"
# I guess temporarily until we get serde support in coconut up and running
coconut-interface = { path = "../coconut-interface" }
+32 -54
View File
@@ -8,8 +8,8 @@ use coconut_interface::{
use crypto::asymmetric::encryption::PublicKey;
use crypto::shared_key::recompute_shared_key;
use crypto::symmetric::stream_cipher;
use url::Url;
use validator_api_requests::coconut::BlindSignRequestBody;
use validator_client::client::CoconutApiClient;
use crate::coconut::bandwidth::{BandwidthVoucher, PRIVATE_ATTRIBUTES, PUBLIC_ATTRIBUTES};
use crate::coconut::params::{
@@ -17,46 +17,21 @@ use crate::coconut::params::{
};
use crate::error::Error;
/// Contacts all provided validators and then aggregate their verification keys.
///
/// # Arguments
///
/// * `validators`: list of validators to obtain verification keys from.
///
/// Note: list of validators must be correctly ordered by the polynomial coordinates used
/// during key generation and it is responsibility of the caller to ensure that correct
/// number of them is provided
///
/// # Examples
///
/// ```no_run
/// use url::{Url, ParseError};
/// use credentials::obtain_aggregate_verification_key;
///
/// async fn example() -> Result<(), ParseError> {
/// let validators = vec!["https://sandbox-validator1.nymtech.net/api".parse()?, "https://sandbox-validator2.nymtech.net/api".parse()?];
/// let aggregated_key = obtain_aggregate_verification_key(&validators).await;
/// // deal with the obtained Result
/// Ok(())
/// }
/// ```
pub async fn obtain_aggregate_verification_key(
validators: &[Url],
api_clients: &[CoconutApiClient],
) -> Result<VerificationKey, Error> {
if validators.is_empty() {
if api_clients.is_empty() {
return Err(Error::NoValidatorsAvailable);
}
let mut indices = Vec::with_capacity(validators.len());
let mut shares = Vec::with_capacity(validators.len());
let mut client = validator_client::ApiClient::new(validators[0].clone());
for (id, validator_url) in validators.iter().enumerate() {
client.change_validator_api(validator_url.clone());
let response = client.get_coconut_verification_key().await?;
indices.push((id + 1) as u64);
shares.push(response.key);
}
let indices: Vec<_> = api_clients
.iter()
.map(|api_client| api_client.node_id)
.collect();
let shares: Vec<_> = api_clients
.iter()
.map(|api_client| api_client.verification_key.clone())
.collect();
Ok(aggregate_verification_keys(&shares, Some(&indices))?)
}
@@ -64,7 +39,7 @@ pub async fn obtain_aggregate_verification_key(
async fn obtain_partial_credential(
params: &Parameters,
attributes: &BandwidthVoucher,
client: &validator_client::ApiClient,
client: &validator_client::client::ApiClient,
validator_vk: &VerificationKey,
) -> Result<Signature, Error> {
let public_attributes = attributes.get_public_attributes();
@@ -118,26 +93,33 @@ async fn obtain_partial_credential(
pub async fn obtain_aggregate_signature(
params: &Parameters,
attributes: &BandwidthVoucher,
validators: &[Url],
coconut_api_clients: &[CoconutApiClient],
) -> Result<Signature, Error> {
if validators.is_empty() {
if coconut_api_clients.is_empty() {
return Err(Error::NoValidatorsAvailable);
}
let public_attributes = attributes.get_public_attributes();
let private_attributes = attributes.get_private_attributes();
let mut shares = Vec::with_capacity(validators.len());
let mut validators_partial_vks: Vec<VerificationKey> = Vec::with_capacity(validators.len());
let mut shares = Vec::with_capacity(coconut_api_clients.len());
let validators_partial_vks: Vec<_> = coconut_api_clients
.iter()
.map(|api_client| api_client.verification_key.clone())
.collect();
let indices: Vec<_> = coconut_api_clients
.iter()
.map(|api_client| api_client.node_id)
.collect();
let mut client = validator_client::ApiClient::new(validators[0].clone());
for (id, validator_url) in validators.iter().enumerate() {
client.change_validator_api(validator_url.clone());
let validator_partial_vk = client.get_coconut_verification_key().await?;
validators_partial_vks.push(validator_partial_vk.key.clone());
let signature =
obtain_partial_credential(params, attributes, &client, &validator_partial_vk.key)
.await?;
let share = SignatureShare::new(signature, (id + 1) as u64);
for coconut_api_client in coconut_api_clients.iter() {
let signature = obtain_partial_credential(
params,
attributes,
&coconut_api_client.api_client,
&coconut_api_client.verification_key,
)
.await?;
let share = SignatureShare::new(signature, coconut_api_client.node_id);
shares.push(share)
}
@@ -145,10 +127,6 @@ pub async fn obtain_aggregate_signature(
attributes.extend_from_slice(&private_attributes);
attributes.extend_from_slice(&public_attributes);
let mut indices: Vec<u64> = Vec::with_capacity(validators_partial_vks.len());
for i in 0..validators_partial_vks.len() {
indices.push((i + 1) as u64);
}
let verification_key =
aggregate_verification_keys(&validators_partial_vks, Some(indices.as_ref()))?;
+2
View File
@@ -25,6 +25,8 @@ serde_derive = "1.0"
thiserror = "1.0"
zeroize = { version = "1.4", features = ["zeroize_derive"] }
pemstore = { path = "../../pemstore" }
[dependencies.group]
version = "0.11"
default-features = false
+15 -52
View File
@@ -9,7 +9,7 @@ use dkg::bte::proof_discrete_log::ProofOfDiscreteLog;
use dkg::bte::proof_sharing::ProofOfSecretSharing;
use dkg::bte::{
decrypt_share, encrypt_shares, keygen, proof_chunking, proof_sharing, setup, DecryptionKey,
Epoch, PublicKey,
PublicKey,
};
use dkg::interpolation::polynomial::Polynomial;
use dkg::{Dealing, NodeIndex, Share};
@@ -54,7 +54,6 @@ pub fn creating_dealing_for_3_parties(c: &mut Criterion) {
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let threshold = 2;
let epoch = Epoch::new(2);
let (receivers, _) = prepare_keys(&mut rng, 3);
@@ -66,7 +65,6 @@ pub fn creating_dealing_for_3_parties(c: &mut Criterion) {
&params,
receivers.keys().next().copied().unwrap(),
threshold,
epoch,
&receivers,
None,
)
@@ -80,7 +78,6 @@ pub fn verifying_dealing_made_for_3_parties_and_recovering_share(c: &mut Criteri
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let threshold = 2;
let epoch = Epoch::new(2);
let (receivers, mut dks) = prepare_keys(&mut rng, 3);
let (dealing, _) = Dealing::create(
@@ -88,22 +85,18 @@ pub fn verifying_dealing_made_for_3_parties_and_recovering_share(c: &mut Criteri
&params,
receivers.keys().next().copied().unwrap(),
threshold,
epoch,
&receivers,
None,
);
let first_key = dks.get_mut(0).unwrap();
first_key.try_update_to(epoch, &params, &mut rng).unwrap();
c.bench_function(
"verifying single dealing made for 3 parties (threshold 2) and recovering share",
|b| {
b.iter(|| {
assert!(dealing
.verify(&params, epoch, threshold, &receivers, None)
.is_ok());
black_box(decrypt_share(first_key, 0, &dealing.ciphertexts, epoch, None).unwrap());
assert!(dealing.verify(&params, threshold, &receivers, None).is_ok());
black_box(decrypt_share(first_key, 0, &dealing.ciphertexts, None).unwrap());
})
},
);
@@ -114,7 +107,6 @@ pub fn creating_dealing_for_20_parties(c: &mut Criterion) {
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let threshold = 14;
let epoch = Epoch::new(2);
let (receivers, _) = prepare_keys(&mut rng, 20);
@@ -128,7 +120,6 @@ pub fn creating_dealing_for_20_parties(c: &mut Criterion) {
&params,
receivers.keys().next().copied().unwrap(),
threshold,
epoch,
&receivers,
None,
)
@@ -143,7 +134,6 @@ pub fn verifying_dealing_made_for_20_parties_and_recovering_share(c: &mut Criter
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let threshold = 14;
let epoch = Epoch::new(2);
let (receivers, mut dks) = prepare_keys(&mut rng, 20);
let (dealing, _) = Dealing::create(
@@ -151,22 +141,18 @@ pub fn verifying_dealing_made_for_20_parties_and_recovering_share(c: &mut Criter
&params,
receivers.keys().next().copied().unwrap(),
threshold,
epoch,
&receivers,
None,
);
let first_key = dks.get_mut(0).unwrap();
first_key.try_update_to(epoch, &params, &mut rng).unwrap();
c.bench_function(
"verifying single dealing made for 20 parties (threshold 14) and recovering share",
|b| {
b.iter(|| {
assert!(dealing
.verify(&params, epoch, threshold, &receivers, None)
.is_ok());
black_box(decrypt_share(first_key, 0, &dealing.ciphertexts, epoch, None).unwrap());
assert!(dealing.verify(&params, threshold, &receivers, None).is_ok());
black_box(decrypt_share(first_key, 0, &dealing.ciphertexts, None).unwrap());
})
},
);
@@ -177,7 +163,6 @@ pub fn creating_dealing_for_100_parties(c: &mut Criterion) {
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let threshold = 67;
let epoch = Epoch::new(2);
let (receivers, _) = prepare_keys(&mut rng, 100);
@@ -191,7 +176,6 @@ pub fn creating_dealing_for_100_parties(c: &mut Criterion) {
&params,
receivers.keys().next().copied().unwrap(),
threshold,
epoch,
&receivers,
None,
)
@@ -206,7 +190,6 @@ pub fn verifying_dealing_made_for_100_parties_and_recovering_share(c: &mut Crite
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let threshold = 67;
let epoch = Epoch::new(2);
let (receivers, mut dks) = prepare_keys(&mut rng, 100);
let (dealing, _) = Dealing::create(
@@ -214,22 +197,18 @@ pub fn verifying_dealing_made_for_100_parties_and_recovering_share(c: &mut Crite
&params,
receivers.keys().next().copied().unwrap(),
threshold,
epoch,
&receivers,
None,
);
let first_key = dks.get_mut(0).unwrap();
first_key.try_update_to(epoch, &params, &mut rng).unwrap();
c.bench_function(
"verifying single dealing made for 100 parties (threshold 67) and recovering share",
|b| {
b.iter(|| {
assert!(dealing
.verify(&params, epoch, threshold, &receivers, None)
.is_ok());
black_box(decrypt_share(first_key, 0, &dealing.ciphertexts, epoch, None).unwrap());
assert!(dealing.verify(&params, threshold, &receivers, None).is_ok());
black_box(decrypt_share(first_key, 0, &dealing.ciphertexts, None).unwrap());
})
},
);
@@ -266,7 +245,6 @@ pub fn creating_proof_of_chunking_for_100_parties(c: &mut Criterion) {
let dummy_seed = [42u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let epoch = Epoch::new(2);
let (receivers, _) = prepare_keys(&mut rng, 100);
@@ -283,7 +261,7 @@ pub fn creating_proof_of_chunking_for_100_parties(c: &mut Criterion) {
.collect::<Vec<_>>();
let ordered_public_keys = receivers.values().copied().collect::<Vec<_>>();
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, epoch, &params, &mut rng);
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, &params, &mut rng);
c.bench_function("creating proof of chunking for 100 parties", |b| {
b.iter(|| {
@@ -301,7 +279,6 @@ pub fn verifying_proof_of_chunking_for_100_parties(c: &mut Criterion) {
let dummy_seed = [42u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let epoch = Epoch::new(2);
let (receivers, _) = prepare_keys(&mut rng, 100);
@@ -318,7 +295,7 @@ pub fn verifying_proof_of_chunking_for_100_parties(c: &mut Criterion) {
.collect::<Vec<_>>();
let ordered_public_keys = receivers.values().copied().collect::<Vec<_>>();
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, epoch, &params, &mut rng);
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, &params, &mut rng);
let chunking_instance = proof_chunking::Instance::new(&ordered_public_keys, &ciphertexts);
let proof_of_chunking =
@@ -338,7 +315,6 @@ pub fn creating_proof_of_secret_sharing_for_100_parties(c: &mut Criterion) {
let dummy_seed = [42u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let epoch = Epoch::new(2);
let (receivers, _) = prepare_keys(&mut rng, 100);
@@ -354,7 +330,7 @@ pub fn creating_proof_of_secret_sharing_for_100_parties(c: &mut Criterion) {
.map(|(share, key)| (share, key))
.collect::<Vec<_>>();
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, epoch, &params, &mut rng);
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, &params, &mut rng);
let combined_ciphertexts = ciphertexts.combine_ciphertexts();
let combined_r = hazmat.combine_rs();
@@ -381,7 +357,6 @@ pub fn verifying_proof_of_secret_sharing_for_100_parties(c: &mut Criterion) {
let dummy_seed = [42u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let epoch = Epoch::new(2);
let (receivers, _) = prepare_keys(&mut rng, 100);
@@ -397,7 +372,7 @@ pub fn verifying_proof_of_secret_sharing_for_100_parties(c: &mut Criterion) {
.map(|(share, key)| (share, key))
.collect::<Vec<_>>();
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, epoch, &params, &mut rng);
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, &params, &mut rng);
let combined_ciphertexts = ciphertexts.combine_ciphertexts();
let combined_r = hazmat.combine_rs();
@@ -430,7 +405,6 @@ pub fn single_share_encryption(c: &mut Criterion) {
let dummy_seed = [42u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let epoch = Epoch::new(2);
let (_, pk) = keygen(&params, &mut rng);
let polynomial = Polynomial::new_random(&mut rng, 3);
@@ -440,7 +414,6 @@ pub fn single_share_encryption(c: &mut Criterion) {
b.iter(|| {
black_box(encrypt_shares(
&[(&share, pk.public_key())],
epoch,
&params,
&mut rng,
))
@@ -452,7 +425,6 @@ pub fn share_encryption_100(c: &mut Criterion) {
let dummy_seed = [42u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let epoch = Epoch::new(2);
let (receivers, _) = prepare_keys(&mut rng, 100);
let polynomial = Polynomial::new_random(&mut rng, 3);
@@ -468,14 +440,7 @@ pub fn share_encryption_100(c: &mut Criterion) {
.collect::<Vec<_>>();
c.bench_function("100 shares encryption", |b| {
b.iter(|| {
black_box(encrypt_shares(
&remote_share_key_pairs,
epoch,
&params,
&mut rng,
))
})
b.iter(|| black_box(encrypt_shares(&remote_share_key_pairs, &params, &mut rng)))
});
}
@@ -483,16 +448,14 @@ pub fn share_decryption(c: &mut Criterion) {
let dummy_seed = [42u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let epoch = Epoch::new(2);
let (mut dk, pk) = keygen(&params, &mut rng);
let (dk, pk) = keygen(&params, &mut rng);
let polynomial = Polynomial::new_random(&mut rng, 3);
let share: Share = polynomial.evaluate_at(&Scalar::from(42)).into();
let (ciphertexts, _) = encrypt_shares(&[(&share, pk.public_key())], epoch, &params, &mut rng);
dk.try_update_to(epoch, &params, &mut rng).unwrap();
let (ciphertexts, _) = encrypt_shares(&[(&share, pk.public_key())], &params, &mut rng);
c.bench_function("single share decryption", |b| {
b.iter(|| black_box(decrypt_share(&dk, 0, &ciphertexts, epoch, None)))
b.iter(|| black_box(decrypt_share(&dk, 0, &ciphertexts, None)))
});
}
+26 -179
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::bte::keys::{DecryptionKey, PublicKey};
use crate::bte::{Epoch, Params, CHUNK_SIZE, G2_GENERATOR_PREPARED, NUM_CHUNKS, PAIRING_BASE};
use crate::bte::{evaluate_f, Params, CHUNK_SIZE, G2_GENERATOR_PREPARED, NUM_CHUNKS, PAIRING_BASE};
use crate::error::DkgError;
use crate::utils::{combine_g1_chunks, combine_scalar_chunks, deserialize_g1, deserialize_g2};
use crate::{Chunk, ChunkedShare, Share};
@@ -24,7 +24,7 @@ pub struct Ciphertexts {
}
impl Ciphertexts {
pub fn verify_integrity(&self, params: &Params, epoch: Epoch) -> bool {
pub fn verify_integrity(&self, params: &Params) -> bool {
// if this checks fails it means the ciphertext is undefined as values
// in `r`, `s` and `z` are meaningless since technically this ciphertext
// has been created for 0 parties
@@ -33,9 +33,7 @@ impl Ciphertexts {
}
let g1_neg = G1Affine::generator().neg();
let f = epoch
.as_extended_tau(&self.rr, &self.ss, &self.ciphertext_chunks)
.evaluate_f(params);
let f = evaluate_f(params);
// we have to use `f` in up to `NUM_CHUNKS` pairings (if everything is valid),
// so perform some precomputation on it
@@ -192,7 +190,6 @@ impl HazmatRandomness {
pub fn encrypt_shares(
shares: &[(&Share, &PublicKey)],
epoch: Epoch,
params: &Params,
mut rng: impl RngCore,
) -> (Ciphertexts, HazmatRandomness) {
@@ -242,7 +239,7 @@ pub fn encrypt_shares(
let rr = rr.try_into().unwrap();
let ss = ss.try_into().unwrap();
let f = epoch.as_extended_tau(&rr, &ss, &cc).evaluate_f(params);
let f = evaluate_f(params);
let mut zz = Vec::with_capacity(NUM_CHUNKS);
for i in 0..NUM_CHUNKS {
@@ -269,35 +266,22 @@ pub fn decrypt_share(
// in the case of multiple receivers, specifies which index of ciphertext chunks should be used
i: usize,
ciphertext: &Ciphertexts,
epoch: Epoch,
lookup_table: Option<&BabyStepGiantStepLookup>,
) -> Result<Share, DkgError> {
let mut plaintext = ChunkedShare::default();
let decryption_node = dk.try_get_compatible_node(epoch)?;
let extended_tau = epoch.as_extended_tau(
&ciphertext.rr,
&ciphertext.ss,
&ciphertext.ciphertext_chunks,
);
if i >= ciphertext.ciphertext_chunks.len() {
return Err(DkgError::UnavailableCiphertext(i));
}
let height = decryption_node.tau.height();
let b_neg = decryption_node
.ds
let b_neg = dk
.dh
.iter()
.chain(decryption_node.dh.iter())
.zip(extended_tau.0.iter().by_vals().skip(height))
.filter(|(_, i)| *i)
.map(|(d_i, _)| d_i)
.fold(decryption_node.b, |acc, d_i| acc + d_i)
.fold(dk.b, |acc, d_i| acc + d_i)
.neg()
.to_affine();
let e_neg = decryption_node.e.neg().to_affine();
let e_neg = dk.e.neg().to_affine();
for j in 0..NUM_CHUNKS {
let rr_j = &ciphertext.rr[j];
@@ -308,7 +292,7 @@ pub fn decrypt_share(
let miller = bls12_381::multi_miller_loop(&[
(&cc_ij.to_affine(), &G2_GENERATOR_PREPARED),
(&rr_j.to_affine(), &G2Prepared::from(b_neg)),
(&decryption_node.a.to_affine(), &G2Prepared::from(zz_j)),
(&dk.a.to_affine(), &G2Prepared::from(zz_j)),
(&ss_j.to_affine(), &G2Prepared::from(e_neg)),
]);
let m = miller.final_exponentiation();
@@ -466,7 +450,6 @@ mod tests {
let (decryption_key1, public_key1) = keygen(&params, &mut rng);
let (decryption_key2, public_key2) = keygen(&params, &mut rng);
let epoch = Epoch::new(0);
let lookup_table = &DEFAULT_BSGS_TABLE;
@@ -475,13 +458,13 @@ mod tests {
let m2 = Share::random(&mut rng);
let shares = &[(&m1, &public_key1.key), (&m2, &public_key2.key)];
let (ciphertext, hazmat) = encrypt_shares(shares, epoch, &params, &mut rng);
let (ciphertext, hazmat) = encrypt_shares(shares, &params, &mut rng);
verify_hazmat_rand(&ciphertext, &hazmat);
let recovered1 =
decrypt_share(&decryption_key1, 0, &ciphertext, epoch, Some(lookup_table)).unwrap();
decrypt_share(&decryption_key1, 0, &ciphertext, Some(lookup_table)).unwrap();
let recovered2 =
decrypt_share(&decryption_key2, 1, &ciphertext, epoch, Some(lookup_table)).unwrap();
decrypt_share(&decryption_key2, 1, &ciphertext, Some(lookup_table)).unwrap();
assert_eq!(m1, recovered1);
assert_eq!(m2, recovered2);
}
@@ -494,15 +477,8 @@ mod tests {
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let (mut decryption_key1, public_key1) = keygen(&params, &mut rng);
let (mut decryption_key2, public_key2) = keygen(&params, &mut rng);
let epoch = Epoch::new(12345);
decryption_key1
.try_update_to(epoch, &params, &mut rng)
.unwrap();
decryption_key2
.try_update_to(epoch, &params, &mut rng)
.unwrap();
let (decryption_key1, public_key1) = keygen(&params, &mut rng);
let (decryption_key2, public_key2) = keygen(&params, &mut rng);
let lookup_table = &DEFAULT_BSGS_TABLE;
@@ -511,121 +487,18 @@ mod tests {
let m2 = Share::random(&mut rng);
let shares = &[(&m1, &public_key1.key), (&m2, &public_key2.key)];
let (ciphertext, hazmat) = encrypt_shares(shares, epoch, &params, &mut rng);
let (ciphertext, hazmat) = encrypt_shares(shares, &params, &mut rng);
verify_hazmat_rand(&ciphertext, &hazmat);
let recovered1 =
decrypt_share(&decryption_key1, 0, &ciphertext, epoch, Some(lookup_table)).unwrap();
decrypt_share(&decryption_key1, 0, &ciphertext, Some(lookup_table)).unwrap();
let recovered2 =
decrypt_share(&decryption_key2, 1, &ciphertext, epoch, Some(lookup_table)).unwrap();
decrypt_share(&decryption_key2, 1, &ciphertext, Some(lookup_table)).unwrap();
assert_eq!(m1, recovered1);
assert_eq!(m2, recovered2);
}
}
#[test]
fn decryption_with_root_key() {
let dummy_seed = [42u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let (root_key, public_key) = keygen(&params, &mut rng);
let share = Share::random(&mut rng);
let epoch0 = Epoch::new(0);
let epoch42 = Epoch::new(42);
let epoch_big = Epoch::new(3292547435);
let (ciphertext1, hazmat1) =
encrypt_shares(&[(&share, &public_key.key)], epoch0, &params, &mut rng);
verify_hazmat_rand(&ciphertext1, &hazmat1);
let (ciphertext2, hazmat2) =
encrypt_shares(&[(&share, &public_key.key)], epoch42, &params, &mut rng);
verify_hazmat_rand(&ciphertext2, &hazmat2);
let (ciphertext3, hazmat3) =
encrypt_shares(&[(&share, &public_key.key)], epoch_big, &params, &mut rng);
verify_hazmat_rand(&ciphertext3, &hazmat3);
let recovered1 = decrypt_share(&root_key, 0, &ciphertext1, epoch0, None).unwrap();
let recovered2 = decrypt_share(&root_key, 0, &ciphertext2, epoch42, None).unwrap();
let recovered3 = decrypt_share(&root_key, 0, &ciphertext3, epoch_big, None).unwrap();
assert_eq!(share, recovered1);
assert_eq!(share, recovered2);
assert_eq!(share, recovered3);
}
#[test]
#[ignore] // expensive test
fn update_and_decrypt_10() {
let dummy_seed = [1u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let (mut decryption_key, public_key) = keygen(&params, &mut rng);
for epoch_value in 0..10 {
let epoch = Epoch::new(epoch_value);
let share = Share::random(&mut rng);
decryption_key
.try_update_to(epoch, &params, &mut rng)
.unwrap();
let (ciphertext, hazmat) =
encrypt_shares(&[(&share, &public_key.key)], epoch, &params, &mut rng);
verify_hazmat_rand(&ciphertext, &hazmat);
let recovered = decrypt_share(&decryption_key, 0, &ciphertext, epoch, None).unwrap();
assert_eq!(share, recovered);
}
}
#[test]
#[ignore] // expensive test
fn reblinding_node_doesnt_affect_decryption() {
let dummy_seed = [1u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let params = setup();
let (mut decryption_key, public_key) = keygen(&params, &mut rng);
let epoch = Epoch::new(12345);
decryption_key
.try_update_to(epoch, &params, &mut rng)
.unwrap();
for node in decryption_key.nodes.iter_mut() {
node.reblind(&params, &mut rng);
}
let share = Share::random(&mut rng);
let (ciphertext, hazmat) =
encrypt_shares(&[(&share, &public_key.key)], epoch, &params, &mut rng);
verify_hazmat_rand(&ciphertext, &hazmat);
let recovered = decrypt_share(&decryption_key, 0, &ciphertext, epoch, None).unwrap();
assert_eq!(share, recovered);
// attempt to update the key again so we have to derive fresh nodes using previous reblinded results
let epoch2 = Epoch::new(67890);
decryption_key
.try_update_to(epoch2, &params, &mut rng)
.unwrap();
for node in decryption_key.nodes.iter_mut() {
node.reblind(&params, &mut rng);
}
let share2 = Share::random(&mut rng);
let (ciphertext, hazmat) =
encrypt_shares(&[(&share2, &public_key.key)], epoch2, &params, &mut rng);
verify_hazmat_rand(&ciphertext, &hazmat);
let recovered = decrypt_share(&decryption_key, 0, &ciphertext, epoch2, None).unwrap();
assert_eq!(share2, recovered);
}
#[test]
#[ignore] // expensive test
fn ciphertext_integrity_check_passes_for_valid_data() {
@@ -634,14 +507,11 @@ mod tests {
let dummy_seed = [1u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let (mut dk, public_key) = keygen(&params, &mut rng);
let epoch = Epoch::new(1);
let (_, public_key) = keygen(&params, &mut rng);
dk.try_update_to(epoch, &params, &mut rng).unwrap();
let share = Share::random(&mut rng);
let (ciphertext, _) =
encrypt_shares(&[(&share, &public_key.key)], epoch, &params, &mut rng);
assert!(ciphertext.verify_integrity(&params, epoch))
let (ciphertext, _) = encrypt_shares(&[(&share, &public_key.key)], &params, &mut rng);
assert!(ciphertext.verify_integrity(&params))
}
#[test]
@@ -652,45 +522,22 @@ mod tests {
let dummy_seed = [1u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let (mut dk, public_key) = keygen(&params, &mut rng);
let epoch = Epoch::new(1);
let (_, public_key) = keygen(&params, &mut rng);
dk.try_update_to(epoch, &params, &mut rng).unwrap();
let share = Share::random(&mut rng);
let (ciphertext, _) =
encrypt_shares(&[(&share, &public_key.key)], epoch, &params, &mut rng);
let (ciphertext, _) = encrypt_shares(&[(&share, &public_key.key)], &params, &mut rng);
let mut bad_cipher1 = ciphertext.clone();
bad_cipher1.rr[4] = G1Projective::generator();
assert!(!bad_cipher1.verify_integrity(&params, epoch));
assert!(!bad_cipher1.verify_integrity(&params));
let mut bad_cipher2 = ciphertext.clone();
bad_cipher2.ss[4] = G1Projective::generator();
assert!(!bad_cipher2.verify_integrity(&params, epoch));
assert!(!bad_cipher2.verify_integrity(&params));
let mut bad_cipher3 = ciphertext;
bad_cipher3.zz[4] = G2Projective::generator();
assert!(!bad_cipher3.verify_integrity(&params, epoch));
}
#[test]
#[ignore] // expensive test
fn ciphertext_integrity_check_passes_fails_for_wrong_epoch() {
let params = setup();
let dummy_seed = [1u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let (mut dk, public_key) = keygen(&params, &mut rng);
let epoch = Epoch::new(1);
dk.try_update_to(epoch, &params, &mut rng).unwrap();
let share = Share::random(&mut rng);
let (ciphertext, _) =
encrypt_shares(&[(&share, &public_key.key)], epoch, &params, &mut rng);
let another_epoch = Epoch::new(2);
assert!(!ciphertext.verify_integrity(&params, another_epoch))
assert!(!bad_cipher3.verify_integrity(&params));
}
#[test]
@@ -711,7 +558,7 @@ mod tests {
}
let refs = shares.iter().zip(public_keys.iter()).collect::<Vec<_>>();
let (ciphertext, hazmat) = encrypt_shares(&refs, Epoch::new(42), &params, &mut rng);
let (ciphertext, hazmat) = encrypt_shares(&refs, &params, &mut rng);
let combined_r = combine_scalar_chunks(hazmat.r());
let combined_rr = ciphertext.combine_rs();
+147 -713
View File
@@ -2,306 +2,16 @@
// SPDX-License-Identifier: Apache-2.0
use crate::bte::proof_discrete_log::ProofOfDiscreteLog;
use crate::bte::{Epoch, Params, Tau};
use crate::bte::Params;
use crate::error::DkgError;
use crate::utils::{deserialize_g1, deserialize_g2, deserialize_scalar};
use bls12_381::{G1Projective, G2Projective, Scalar};
use ff::Field;
use group::GroupEncoding;
use pemstore::traits::{PemStorableKey, PemStorableKeyPair};
use rand_core::RngCore;
use zeroize::Zeroize;
#[derive(Debug, Zeroize)]
#[zeroize(drop)]
#[cfg_attr(test, derive(Clone, PartialEq))]
pub(crate) struct Node {
pub(crate) tau: Tau,
// g1^rho
pub(crate) a: G1Projective,
// g2^x
pub(crate) b: G2Projective,
// f_i^rho, up to lambda_t elements
pub(crate) ds: Vec<G2Projective>,
// fh_i^rho, always lambda_h elements
pub(crate) dh: Vec<G2Projective>,
// h^rho
pub(crate) e: G2Projective,
}
impl Node {
fn new_root(
a: G1Projective,
b: G2Projective,
ds: Vec<G2Projective>,
dh: Vec<G2Projective>,
e: G2Projective,
) -> Self {
Node {
tau: Tau::new_root(),
a,
b,
ds,
dh,
e,
}
}
fn is_root(&self) -> bool {
self.tau.0.is_empty()
}
pub(crate) fn reblind(&mut self, params: &Params, mut rng: impl RngCore) {
let delta = Scalar::random(&mut rng);
self.a += G1Projective::generator() * delta;
// TODO: or do we have to do full tau evaluation here?
self.b += self.tau.evaluate_partial_f(params) * delta;
self.ds
.iter_mut()
.zip(params.fs.iter().skip(self.tau.height()))
.for_each(|(d_i, f_i)| *d_i += f_i * delta);
self.dh
.iter_mut()
.zip(params.fh.iter())
.for_each(|(d_i, f_i)| *d_i += f_i * delta);
self.e += params.h * delta;
}
// note: it's unsafe to use this method outside `try_update_to` as
// we have guaranteed there that `self` is parent of the target
// and that `self.tau != target_tau`
/// Given `self` with `Tau1` and `target_tau` with `Tau2`, such that `Tau1` prefixes `Tau2`,
/// i.e. `Tau2 == Tau1 || SUFFIX`, and `Tau2` is a leaf node, derive all required crypto material
/// for its construction.
fn derive_target_child_with_partials(
&self,
params: &Params,
target_tau: Tau,
partial_b: &G2Projective,
partial_f: &G2Projective,
mut rng: impl RngCore,
) -> Self {
debug_assert!(self.tau.is_parent_of(&target_tau));
debug_assert_ne!(self.tau, target_tau);
let delta = Scalar::random(&mut rng);
let a = self.a + G1Projective::generator() * delta;
let b = partial_b + partial_f * delta;
let ds = self
.ds
.iter()
.zip(params.fs.iter())
.skip(target_tau.height())
.map(|(d_i, f_i)| d_i + f_i * delta)
.collect();
let dh = self
.dh
.iter()
.zip(params.fh.iter())
.map(|(dh_i, fh_i)| dh_i + fh_i * delta)
.collect();
let e = self.e + params.h * delta;
Node {
tau: target_tau,
a,
b,
ds,
dh,
e,
}
}
// note: it's unsafe to use this method outside `try_update_to` as
// we have guaranteed there that `self` is parent of the target
// and that `self.tau != target_tau`
/// Given `self` with `Tau1` and `most_direct_parent` with `Tau2`, such that `Tau1` prefixes `Tau2`,
/// i.e. `Tau2 == Tau1 || SUFFIX`, derive node with `Tau3 = Tau2 || 1`
fn derive_right_nonfinal_child_of_with_partials(
&self,
params: &Params,
most_direct_parent: Tau,
partial_b: &G2Projective,
partial_f: &G2Projective,
mut rng: impl RngCore,
) -> Self {
let right_branch = most_direct_parent.right_child();
debug_assert!(self.tau.is_parent_of(&most_direct_parent));
debug_assert!(self.tau.is_parent_of(&right_branch));
debug_assert_ne!(self.tau, right_branch);
// n is height difference between self and the child
let n = right_branch.height() - self.tau.height();
// i is the index of the last bit we just added
let i = right_branch.height() - 1;
let delta = Scalar::random(&mut rng);
let a = self.a + G1Projective::generator() * delta;
let d0 = self.ds[n - 1];
let b = partial_b + d0 + (partial_f + params.fs[i]) * delta;
let ds = self
.ds
.iter()
.skip(n)
.zip(params.fs.iter().skip(right_branch.height()))
.map(|(d_i, f_i)| d_i + f_i * delta)
.collect();
let dh = self
.dh
.iter()
.zip(params.fh.iter())
.map(|(dh_i, fh_i)| dh_i + fh_i * delta)
.collect();
let e = self.e + params.h * delta;
Node {
tau: right_branch,
a,
b,
ds,
dh,
e,
}
}
// tau_bytes_len || tau || a || b || len_ds || ds || len_dh || dh || e
pub(crate) fn to_bytes(&self) -> Vec<u8> {
let g1_elements = 1;
let g2_elements = self.ds.len() + self.dh.len() + 2;
let tau_bytes = self.tau.to_bytes();
// the extra 12 comes from the triple u32 we use for encoding lengths of tau, ds and dh
let mut bytes =
Vec::with_capacity(tau_bytes.len() + g1_elements * 48 + g2_elements * 96 + 12);
bytes.extend_from_slice(&((tau_bytes.len() as u32).to_be_bytes()));
bytes.extend_from_slice(&tau_bytes);
bytes.extend_from_slice(self.a.to_bytes().as_ref());
bytes.extend_from_slice(self.b.to_bytes().as_ref());
bytes.extend_from_slice(&((self.ds.len() as u32).to_be_bytes()));
for d_i in &self.ds {
bytes.extend_from_slice(d_i.to_bytes().as_ref());
}
bytes.extend_from_slice(&((self.dh.len() as u32).to_be_bytes()));
for dh_i in &self.dh {
bytes.extend_from_slice(dh_i.to_bytes().as_ref());
}
bytes.extend_from_slice(self.e.to_bytes().as_ref());
bytes
}
pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, DkgError> {
// at the very least we require bytes for:
// - tau_len ( 4 )
// - tau ( could be 0 for root node )
// - a ( 48 )
// - b ( 96 )
// - length indication of ds ( 4 )
// - length indication of dh ( 4 )
// - e ( 96 )
if bytes.len() < 4 + 48 + 96 + 4 + 4 + 96 {
return Err(DkgError::new_deserialization_failure(
"Node",
"insufficient number of bytes provided",
));
}
let tau_len = u32::from_be_bytes((&bytes[..4]).try_into().unwrap()) as usize;
let mut i = 4;
let tau = Tau::try_from_bytes(&bytes[i..i + tau_len])?;
i += tau_len;
// perform another length check to account for bytes consumed by tau
if bytes[i..].len() < 48 + 96 + 4 + 4 + 96 {
return Err(DkgError::new_deserialization_failure(
"Node",
"insufficient number of bytes provided",
));
}
let a = deserialize_g1(&bytes[i..i + 48]).ok_or_else(|| {
DkgError::new_deserialization_failure("Node.a", "invalid curve point")
})?;
i += 48;
let b = deserialize_g2(&bytes[i..i + 96]).ok_or_else(|| {
DkgError::new_deserialization_failure("Node.b", "invalid curve point")
})?;
i += 96;
let ds_len = u32::from_be_bytes((&bytes[i..i + 4]).try_into().unwrap()) as usize;
i += 4;
if bytes[i..].len() < ds_len * 96 + 4 {
return Err(DkgError::new_deserialization_failure(
"Node",
"insufficient number of bytes provided (ds)",
));
}
let mut ds = Vec::with_capacity(ds_len);
for j in 0..ds_len {
let d_i = deserialize_g2(&bytes[i..i + 96]).ok_or_else(|| {
DkgError::new_deserialization_failure(
format!("Node.ds_{}", j),
"invalid curve point",
)
})?;
ds.push(d_i);
i += 96;
}
let dh_len = u32::from_be_bytes((&bytes[i..i + 4]).try_into().unwrap()) as usize;
i += 4;
if bytes[i..].len() != (dh_len + 1) * 96 {
return Err(DkgError::new_deserialization_failure(
"Node",
"insufficient number of bytes provided (dh)",
));
}
let mut dh = Vec::with_capacity(dh_len);
for j in 0..dh_len {
let dh_i = deserialize_g2(&bytes[i..i + 96]).ok_or_else(|| {
DkgError::new_deserialization_failure(
format!("Node.dh_{}", j),
"invalid curve point",
)
})?;
dh.push(dh_i);
i += 96;
}
let e = deserialize_g2(&bytes[i..]).ok_or_else(|| {
DkgError::new_deserialization_failure("Node.h", "invalid curve point")
})?;
Ok(Node {
tau,
a,
b,
ds,
dh,
e,
})
}
}
// produces public key and a decryption key for the root of the tree
pub fn keygen(params: &Params, mut rng: impl RngCore) -> (DecryptionKey, PublicKeyWithProof) {
let g1 = G1Projective::generator();
@@ -317,11 +27,10 @@ pub fn keygen(params: &Params, mut rng: impl RngCore) -> (DecryptionKey, PublicK
let a = g1 * rho;
let b = g2 * x + params.f0 * rho;
let ds = params.fs.iter().map(|f_i| f_i * rho).collect();
let dh = params.fh.iter().map(|fh_i| fh_i * rho).collect();
let e = params.h * rho;
let dk = DecryptionKey::new_root(Node::new_root(a, b, ds, dh, e));
let dk = DecryptionKey::new_root(a, b, dh, e);
let public_key = PublicKey(y);
let key_with_proof = PublicKeyWithProof {
@@ -351,6 +60,20 @@ pub struct PublicKeyWithProof {
pub(crate) proof: ProofOfDiscreteLog,
}
impl PemStorableKey for PublicKeyWithProof {
type Error = DkgError;
fn pem_type() -> &'static str {
"DKG PUBLIC KEY WITH PROOF"
}
fn to_bytes(&self) -> Vec<u8> {
self.to_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<Self, Self::Error> {
Self::try_from_bytes(bytes)
}
}
impl PublicKeyWithProof {
pub fn verify(&self) -> bool {
self.key.verify(&self.proof)
@@ -412,233 +135,159 @@ impl PublicKeyWithProof {
#[derive(Debug, Zeroize)]
#[zeroize(drop)]
#[cfg_attr(test, derive(PartialEq))]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct DecryptionKey {
// note that the nodes are ordered from "right" to "left"
pub(crate) nodes: Vec<Node>,
// g1^rho
pub(crate) a: G1Projective,
// g2^x * f0^rho
pub(crate) b: G2Projective,
// fh_i^rho, always lambda_h elements
pub(crate) dh: Vec<G2Projective>,
// h^rho
pub(crate) e: G2Projective,
}
impl PemStorableKey for DecryptionKey {
type Error = DkgError;
fn pem_type() -> &'static str {
"DKG DECRYPTION KEY"
}
fn to_bytes(&self) -> Vec<u8> {
self.to_bytes()
}
fn from_bytes(bytes: &[u8]) -> Result<Self, Self::Error> {
Self::try_from_bytes(bytes)
}
}
impl DecryptionKey {
fn new_root(root_node: Node) -> Self {
DecryptionKey {
nodes: vec![root_node],
}
}
fn current(&self) -> Result<&Node, DkgError> {
// we must have at least a single node, otherwise we have a malformed key
self.nodes.last().ok_or(DkgError::MalformedDecryptionKey)
}
pub fn current_epoch(&self, params: &Params) -> Result<Option<Epoch>, DkgError> {
let current_node = self.current()?;
if current_node.is_root() {
Ok(None)
} else {
Epoch::try_from_tau(&current_node.tau, params).map(Option::Some)
}
}
pub(crate) fn try_get_compatible_node(&self, epoch: Epoch) -> Result<&Node, DkgError> {
let tau = epoch.as_tau();
self.nodes
.iter()
.rev()
.find(|node| node.tau.is_parent_of(&tau))
.ok_or(DkgError::ExpiredKey)
}
pub fn try_update_to_next_epoch(
&mut self,
params: &Params,
mut rng: impl RngCore,
) -> Result<(), DkgError> {
if self.nodes.is_empty() {
return Err(DkgError::MalformedDecryptionKey);
}
let mut target_epoch = Epoch::new(0);
if self.nodes.len() == 1 && self.nodes[0].is_root() {
return self.try_update_to(target_epoch, params, &mut rng);
}
// unwrap is fine as we have asserted self.nodes is not empty
self.nodes.pop().unwrap();
if let Some(tail) = self.nodes.last() {
target_epoch = tail.tau.lowest_valid_epoch_child(params)?;
} else {
// essentially our key consisted of only a single node and it wasn't a root,
// so either it was malformed or we somehow reached the final epoch and wanted to update
// beyond that. Either way, update to l + 1 is impossible
return Err(DkgError::MalformedDecryptionKey);
}
self.try_update_to(target_epoch, params, &mut rng)
}
/// Attempts to update `self` to the provided `epoch`. If the update is not possible,
/// because the target was in the past or the key is malformed, an error is returned.
///
/// Note that this method mutates the key in place and if the original key was malformed,
/// there are no guarantees about its internal state post-call.
pub fn try_update_to(
&mut self,
target_epoch: Epoch,
params: &Params,
mut rng: impl RngCore,
) -> Result<(), DkgError> {
if self.nodes.is_empty() {
// somehow we have an empty decryption key
return Err(DkgError::MalformedDecryptionKey);
}
// makes it easier to work with since we will be generating non-leaf nodes
let target_tau = target_epoch.as_tau();
let current_tau = &self.current()?.tau;
if current_tau == &target_tau {
// our key is already updated to the target
return Ok(());
}
if current_tau > &target_tau {
// we cannot derive keys for past epochs
return Err(DkgError::TargetEpochUpdateInThePast);
}
// drop the nodes that are no longer required and get the most direct parent for the target epoch available
let mut parent = loop {
// if pop() fails the key is malformed since we checked that the target_epoch > current_epoch,
// hence the update should have been possible
let tail = self.nodes.pop().ok_or(DkgError::MalformedDecryptionKey)?;
if tail.tau.is_parent_of(&target_tau) {
break tail;
}
};
// essentially the case of updating epoch n to n + 1, where n is even;
// in that case the last two nodes are [..., epoch_{n+1}, epoch_n]
// so we just have to reblind the n+1 node and we're done
if parent.tau == target_tau {
parent.reblind(params, &mut rng);
self.nodes.push(parent);
return Ok(());
}
// accumulators, note that the previous elements have already been included by the parent,
// i.e. for example for parent at height l <= n, b = g2^x * f0^rho * d1^{tau_1} * ... * dl^{tau_l}
// new_b_accumulator = b * d1^{tau_1} * d2^{tau_2} * ... * dn^{tau_n}
// new_f_accumulator = f0 * f1^{tau_1} * f2^{tau_2} * ... * fn^{tau_n} (up to lambda_t)
let mut new_b_accumulator = parent.b;
let mut new_f_accumulator = parent.tau.evaluate_partial_f(params);
let parent_height = parent.tau.height();
// path from the parent to the child
for (n, bit) in target_tau
.0
.iter()
.by_vals()
.skip(parent.tau.height())
.enumerate()
{
// ith bit of the [child] epoch
// note that n represents height difference between parent and the current bit
let i = n + parent_height;
// if the bit is NOT set, push the right '1' subtree (for future keys)
// so for example if given parent with some `PREFIX` tau and target_epoch being `PREFIX || 010`,
// in the first loop iteration we're going to look at bit `0` and
// derive child node `PREFIX || 1` so that in the future we could derive keys for all other epochs starting with `PREFIX || 1`
// in the next loop iteration we're going to look at bit `1` and simply update the accumulators,
// as we don't need to generate any "left" nodes as all of them would have constructed epochs that are already in the past
// finally, in the last iteration, we look at the bit `0` and derive node `PREFIX || 011`,
// i.e. the one that FOLLOWS the target node.
if !bit {
let direct_parent = target_tau.try_get_parent_at_height(i)?;
self.nodes
.push(parent.derive_right_nonfinal_child_of_with_partials(
params,
direct_parent,
&new_b_accumulator,
&new_f_accumulator,
&mut rng,
));
} else {
// only update the accumulators when the bit is set, as d^0 == identity, so there's
// no point in doing anything else;
// note that we don't have to generate any new nodes when going into the right branch
// of the tree as everything on the left would have been in the past, so we don't care about them
new_b_accumulator += parent.ds[n]; // add d0
new_f_accumulator += params.fs[i]; // f_i
}
}
self.nodes.push(parent.derive_target_child_with_partials(
params,
target_epoch.as_tau(),
&new_b_accumulator,
&new_f_accumulator,
&mut rng,
));
Ok(())
fn new_root(a: G1Projective, b: G2Projective, dh: Vec<G2Projective>, e: G2Projective) -> Self {
DecryptionKey { a, b, dh, e }
}
pub fn to_bytes(&self) -> Vec<u8> {
let num_nodes = self.nodes.len() as u32;
let g1_elements = 1;
let g2_elements = self.dh.len() + 2;
// unfortunately we're not going to know the expected capacity
let mut bytes = Vec::new();
bytes.extend_from_slice(&num_nodes.to_be_bytes());
// the extra 8 comes from the triple u32 we use for encoding lengths of ds and dh
let mut bytes = Vec::with_capacity(g1_elements * 48 + g2_elements * 96 + 8);
for node in &self.nodes {
let mut node_bytes = node.to_bytes();
bytes.extend_from_slice(&((node_bytes.len() as u32).to_be_bytes()));
bytes.append(&mut node_bytes)
bytes.extend_from_slice(self.a.to_bytes().as_ref());
bytes.extend_from_slice(self.b.to_bytes().as_ref());
bytes.extend_from_slice(&((self.dh.len() as u32).to_be_bytes()));
for dh_i in &self.dh {
bytes.extend_from_slice(dh_i.to_bytes().as_ref());
}
bytes.extend_from_slice(self.e.to_bytes().as_ref());
bytes
}
pub fn try_from_bytes(b: &[u8]) -> Result<Self, DkgError> {
// we have to be able to read the length of nodes
if b.len() < 4 {
pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, DkgError> {
// at the very least we require bytes for:
// - a ( 48 )
// - b ( 96 )
// - length indication of dh ( 4 )
// - e ( 96 )
if bytes.len() < 48 + 96 + 4 + 96 {
return Err(DkgError::new_deserialization_failure(
"DecryptionKey",
"Node",
"insufficient number of bytes provided",
));
}
let nodes_len = u32::from_be_bytes([b[0], b[1], b[2], b[3]]) as usize;
let mut nodes = Vec::with_capacity(nodes_len);
let mut i = 4;
for _ in 0..nodes_len {
// check if we can actually read the length...
if b[i..].len() < 4 {
return Err(DkgError::new_deserialization_failure(
"DecryptionKey.Node",
"insufficient number of bytes provided for BTE Node recovery",
));
}
let mut i = 0;
let a = deserialize_g1(&bytes[i..i + 48]).ok_or_else(|| {
DkgError::new_deserialization_failure("Node.a", "invalid curve point")
})?;
i += 48;
let node_bytes = u32::from_be_bytes([b[i], b[i + 1], b[i + 2], b[i + 3]]) as usize;
if b[i + 4..].len() < node_bytes {
return Err(DkgError::new_deserialization_failure(
"DecryptionKey.Node",
"insufficient number of bytes provided for BTE Node recovery",
));
}
i += 4;
let b = deserialize_g2(&bytes[i..i + 96]).ok_or_else(|| {
DkgError::new_deserialization_failure("Node.b", "invalid curve point")
})?;
i += 96;
let node = Node::try_from_bytes(&b[i..i + node_bytes])?;
nodes.push(node);
i += node_bytes;
let dh_len = u32::from_be_bytes((&bytes[i..i + 4]).try_into().unwrap()) as usize;
i += 4;
if bytes[i..].len() != (dh_len + 1) * 96 {
return Err(DkgError::new_deserialization_failure(
"Node",
"insufficient number of bytes provided (dh)",
));
}
Ok(DecryptionKey { nodes })
let mut dh = Vec::with_capacity(dh_len);
for j in 0..dh_len {
let dh_i = deserialize_g2(&bytes[i..i + 96]).ok_or_else(|| {
DkgError::new_deserialization_failure(
format!("Node.dh_{}", j),
"invalid curve point",
)
})?;
dh.push(dh_i);
i += 96;
}
let e = deserialize_g2(&bytes[i..]).ok_or_else(|| {
DkgError::new_deserialization_failure("Node.h", "invalid curve point")
})?;
Ok(Self { a, b, dh, e })
}
}
pub struct KeyPair {
pub(crate) private_key: DecryptionKey,
pub(crate) public_key: PublicKeyWithProof,
}
impl KeyPair {
pub fn new(params: &Params, rng: impl RngCore) -> Self {
let (dk, pk) = keygen(params, rng);
Self {
private_key: dk,
public_key: pk,
}
}
pub fn private_key(&self) -> &DecryptionKey {
&self.private_key
}
pub fn public_key(&self) -> &PublicKeyWithProof {
&self.public_key
}
pub fn from_bytes(priv_bytes: &[u8], pub_bytes: &[u8]) -> Result<Self, DkgError> {
Ok(KeyPair {
private_key: DecryptionKey::try_from_bytes(priv_bytes)?,
public_key: PublicKeyWithProof::try_from_bytes(pub_bytes)?,
})
}
}
impl PemStorableKeyPair for KeyPair {
type PrivatePemKey = DecryptionKey;
type PublicPemKey = PublicKeyWithProof;
fn private_key(&self) -> &Self::PrivatePemKey {
self.private_key()
}
fn public_key(&self) -> &Self::PublicPemKey {
self.public_key()
}
fn from_keys(private_key: Self::PrivatePemKey, public_key: Self::PublicPemKey) -> Self {
KeyPair {
private_key,
public_key,
}
}
}
@@ -646,163 +295,8 @@ impl DecryptionKey {
mod tests {
use super::*;
use crate::bte::setup;
use bitvec::bitvec;
use bitvec::order::Msb0;
use rand_core::SeedableRng;
#[test]
#[ignore] // expensive test
fn basic_coverage_nodes() {
// it's some basic test I've been performing when writing the update function, but figured
// might as well put it into a unit test. note that it doesn't check the entire structure,
// but just the few last nodes of low height
let params = setup();
let dummy_seed = [1u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let (mut dk, _) = keygen(&params, &mut rng);
let root_node_copy = dk.nodes.clone();
// this is a root node
assert_eq!(dk.nodes.len(), 1);
assert!(dk.nodes[0].is_root());
// we have to have a node for right branch on each height (1, 01, 001, ... etc)
// plus an additional one for the two left-most leaves (epochs "0" and "1")
dk.try_update_to(Epoch::new(0), &params, &mut rng).unwrap();
assert_eq!(dk.nodes.len(), 33);
let expected_last = Tau::new(0);
// (and yes, I had to look up those names in a thesaurus)
let expected_penultimate = Tau::new(1);
// note that this value is 31bit long
let expected_antepenultimate = Tau(bitvec![u32, Msb0;
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 1
]);
let mut nodes_iter = dk.nodes.iter().rev();
assert_eq!(expected_last, nodes_iter.next().unwrap().tau);
assert_eq!(expected_penultimate, nodes_iter.next().unwrap().tau);
assert_eq!(expected_antepenultimate, nodes_iter.next().unwrap().tau);
let mut epoch_zero_nodes = dk.nodes.clone();
// nodes for epoch1 should be identical for those for epoch0 minus the 00..00 leaf
dk.try_update_to(Epoch::new(1), &params, &mut rng).unwrap();
assert_eq!(dk.nodes.len(), 32);
epoch_zero_nodes.pop().unwrap();
assert_eq!(
epoch_zero_nodes
.iter()
.map(|node| node.tau.clone())
.collect::<Vec<_>>(),
dk.nodes
.iter()
.map(|node| node.tau.clone())
.collect::<Vec<_>>()
);
dk.try_update_to(Epoch::new(2), &params, &mut rng).unwrap();
dk.try_update_to(Epoch::new(3), &params, &mut rng).unwrap();
dk.try_update_to(Epoch::new(4), &params, &mut rng).unwrap();
let expected_last = Tau::new(4);
let expected_penultimate = Tau::new(5);
let expected_antepenultimate = Tau(bitvec![u32, Msb0;
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1
]);
let expected_preantepenultimate = Tau(bitvec![u32, Msb0;
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1
]);
assert_eq!(dk.nodes.len(), 32);
let mut nodes_iter = dk.nodes.iter().rev();
assert_eq!(expected_last, nodes_iter.next().unwrap().tau);
assert_eq!(expected_penultimate, nodes_iter.next().unwrap().tau);
assert_eq!(expected_antepenultimate, nodes_iter.next().unwrap().tau);
assert_eq!(expected_preantepenultimate, nodes_iter.next().unwrap().tau);
// the result should be the same of regardless if we update incrementally or go to the target immediately
let mut new_root = DecryptionKey {
nodes: root_node_copy,
};
new_root
.try_update_to(Epoch::new(4), &params, &mut rng)
.unwrap();
assert_eq!(
dk.nodes
.iter()
.map(|node| node.tau.clone())
.collect::<Vec<_>>(),
new_root
.nodes
.iter()
.map(|node| node.tau.clone())
.collect::<Vec<_>>()
);
// getting expected nodes for those epochs is non-trivial for test purposes, but the last node
// should ALWAYS be equal to the target epoch
dk.try_update_to(Epoch::new(42), &params, &mut rng).unwrap();
assert_eq!(dk.nodes.last().unwrap().tau, Tau::new(42));
dk.try_update_to(Epoch::new(123456), &params, &mut rng)
.unwrap();
assert_eq!(dk.nodes.last().unwrap().tau, Tau::new(123456));
dk.try_update_to(Epoch::new(3292547435), &params, &mut rng)
.unwrap();
assert_eq!(dk.nodes.last().unwrap().tau, Tau::new(3292547435));
// trying to go to past epochs fails
assert!(dk
.try_update_to(Epoch::new(531), &params, &mut rng)
.is_err())
}
#[test]
#[ignore] // expensive test
fn updating_to_next_epoch() {
let params = setup();
let dummy_seed = [1u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let (mut dk, _) = keygen(&params, &mut rng);
// for root node current epoch is `None`
assert_eq!(None, dk.current_epoch(&params).unwrap());
// for root node it should result in epoch 0
dk.try_update_to_next_epoch(&params, &mut rng).unwrap();
assert_eq!(Some(Epoch::new(0)), dk.current_epoch(&params).unwrap());
dk.try_update_to_next_epoch(&params, &mut rng).unwrap();
assert_eq!(Some(Epoch::new(1)), dk.current_epoch(&params).unwrap());
dk.try_update_to_next_epoch(&params, &mut rng).unwrap();
assert_eq!(Some(Epoch::new(2)), dk.current_epoch(&params).unwrap());
// if we start from some non-root epoch, it should result in l + 1
dk.try_update_to(Epoch::new(42), &params, &mut rng).unwrap();
dk.try_update_to_next_epoch(&params, &mut rng).unwrap();
assert_eq!(Some(Epoch::new(43)), dk.current_epoch(&params).unwrap());
dk.try_update_to(Epoch::new(12345), &params, &mut rng)
.unwrap();
dk.try_update_to_next_epoch(&params, &mut rng).unwrap();
assert_eq!(Some(Epoch::new(12346)), dk.current_epoch(&params).unwrap());
dk.try_update_to(Epoch::new(3292547435), &params, &mut rng)
.unwrap();
dk.try_update_to_next_epoch(&params, &mut rng).unwrap();
assert_eq!(
Some(Epoch::new(3292547436)),
dk.current_epoch(&params).unwrap()
);
}
#[test]
fn public_key_with_proof_roundtrip() {
let params = setup();
@@ -816,64 +310,4 @@ mod tests {
assert_eq!(pk, recovered)
}
#[test]
#[ignore] // expensive test
fn bte_node_roundtrip() {
let params = setup();
let dummy_seed = [1u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let (mut dk, _) = keygen(&params, &mut rng);
let root_node = dk.nodes[0].clone();
let bytes = root_node.to_bytes();
let recovered = Node::try_from_bytes(&bytes).unwrap();
assert_eq!(root_node, recovered);
dk.try_update_to(Epoch::new(3292547435), &params, &mut rng)
.unwrap();
for node in &dk.nodes {
let bytes = node.to_bytes();
let recovered = Node::try_from_bytes(&bytes).unwrap();
assert_eq!(node, &recovered);
}
}
#[test]
#[ignore] // expensive test
fn decryption_key_node_roundtrip() {
let params = setup();
let dummy_seed = [1u8; 32];
let mut rng = rand_chacha::ChaCha20Rng::from_seed(dummy_seed);
let (mut dk, _) = keygen(&params, &mut rng);
let bytes = dk.to_bytes();
let recovered = DecryptionKey::try_from_bytes(&bytes).unwrap();
assert_eq!(dk, recovered);
dk.try_update_to(Epoch::new(0), &params, &mut rng).unwrap();
let bytes = dk.to_bytes();
let recovered = DecryptionKey::try_from_bytes(&bytes).unwrap();
assert_eq!(dk, recovered);
dk.try_update_to(Epoch::new(1), &params, &mut rng).unwrap();
let bytes = dk.to_bytes();
let recovered = DecryptionKey::try_from_bytes(&bytes).unwrap();
assert_eq!(dk, recovered);
dk.try_update_to(Epoch::new(42), &params, &mut rng).unwrap();
let bytes = dk.to_bytes();
let recovered = DecryptionKey::try_from_bytes(&bytes).unwrap();
assert_eq!(dk, recovered);
dk.try_update_to(Epoch::new(3292547435), &params, &mut rng)
.unwrap();
let bytes = dk.to_bytes();
let recovered = DecryptionKey::try_from_bytes(&bytes).unwrap();
assert_eq!(dk, recovered);
}
}
+6 -394
View File
@@ -1,17 +1,11 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::DkgError;
use crate::utils::{hash_g2, RandomOracleBuilder};
use crate::utils::hash_g2;
use crate::{Chunk, Share};
use bitvec::field::BitField;
use bitvec::order::Msb0;
use bitvec::vec::BitVec;
use bitvec::view::BitView;
use bls12_381::{G1Affine, G1Projective, G2Affine, G2Prepared, G2Projective, Gt};
use bls12_381::{G1Affine, G2Affine, G2Prepared, G2Projective, Gt};
use group::Curve;
use lazy_static::lazy_static;
use zeroize::Zeroize;
pub mod encryption;
pub mod keys;
@@ -35,10 +29,6 @@ lazy_static! {
// https://datatracker.ietf.org/doc/html/draft-irtf-cfrg-hash-to-curve-11#section-3.1
const SETUP_DOMAIN: &[u8] = b"NYM_COCONUT_NIDKG_V01_CS01_WITH_BLS12381G2_XMD:SHA-256_SSWU_RO_SETUP";
// this particular domain is not for curve hashing, but might as well also follow the same naming pattern
const TREE_TAU_EXTENSION_DOMAIN: &[u8] = b"NYM_COCONUT_NIDKG_V01_CS01_SHA-256_TREE_EXTENSION";
const MAX_EPOCHS_EXP: usize = 32;
const HASH_SECURITY_PARAM: usize = 256;
// note: CHUNK_BYTES * NUM_CHUNKS must equal to SCALAR_SIZE
@@ -49,253 +39,17 @@ pub const SCALAR_SIZE: usize = 32;
/// In paper B; number of distinct chunks
pub const CHUNK_SIZE: usize = 1 << (CHUNK_BYTES << 3);
pub(crate) type EpochStore = u32;
#[derive(Clone, Debug, PartialEq, PartialOrd)]
// None empty bitvec implies this is a root node
pub(crate) struct Tau(BitVec<EpochStore, Msb0>);
impl Tau {
pub fn new_root() -> Self {
Tau(BitVec::new())
}
// TODO: perhaps this should be explicitly moved to some test module
#[cfg(test)]
pub(crate) fn new(epoch: EpochStore) -> Self {
Tau(epoch.view_bits().to_bitvec())
}
#[allow(unused)]
pub fn left_child(&self) -> Self {
let mut child = self.0.clone();
child.push(false);
Tau(child)
}
pub fn right_child(&self) -> Self {
let mut child = self.0.clone();
child.push(true);
Tau(child)
}
pub fn is_leaf(&self, params: &Params) -> bool {
self.height() == params.lambda_t
}
pub fn try_get_parent_at_height(&self, height: usize) -> Result<Self, DkgError> {
if height > self.0.len() {
return Err(DkgError::NotAValidParent);
}
Ok(Tau(self.0[..height].to_bitvec()))
}
// essentially is this tau prefixing the other
pub fn is_parent_of(&self, other: &Tau) -> bool {
if self.0.len() > other.0.len() {
return false;
}
for (i, b) in self.0.iter().enumerate() {
if b != other.0[i] {
return false;
}
}
true
}
pub fn lowest_valid_epoch_child(&self, params: &Params) -> Result<Epoch, DkgError> {
if self.0.len() > params.lambda_t {
// this node is already BELOW a valid leaf-epoch node. it can only happen
// if either some invariant was broken or additional data was pushed to `tau`
// in order compute some intermediate results, but in that case this method should have
// never been called anyway. tl;dr: if this is called, the underlying key is malformed
return Err(DkgError::NotAValidParent);
}
let mut child = self.0.clone();
for _ in 0..(params.lambda_t - self.0.len()) {
child.push(false)
}
// the unwrap here is fine as we ensure we have exactly `params.tree_height` bits here
// (we could just propagate the error instead of unwraping and putting it behind an `Ok` anyway
// but I'd prefer to just blow up since this would be a serious error
Ok(Epoch::try_from_tau(&Tau(child), params).unwrap())
}
pub fn height(&self) -> usize {
self.0.len()
}
fn extend(
&self,
rr: &[G1Projective; NUM_CHUNKS],
ss: &[G1Projective; NUM_CHUNKS],
cc: &[[G1Projective; NUM_CHUNKS]],
) -> Self {
let mut random_oracle_builder = RandomOracleBuilder::new(TREE_TAU_EXTENSION_DOMAIN);
random_oracle_builder.update_with_g1_elements(rr.iter());
random_oracle_builder.update_with_g1_elements(ss.iter());
for ciphertext_chunks in cc {
random_oracle_builder.update_with_g1_elements(ciphertext_chunks.iter());
}
let tau_mem = self.0.as_raw_slice();
assert_eq!(tau_mem.len(), 1, "tau length invariant was broken");
random_oracle_builder.update(tau_mem[0].to_be_bytes());
let oracle_output = random_oracle_builder.finalize();
debug_assert_eq!(oracle_output.len() * 8, HASH_SECURITY_PARAM);
let mut extended_tau = self.clone();
for byte in oracle_output {
extended_tau
.0
.extend_from_bitslice(byte.view_bits::<Msb0>())
}
extended_tau
}
// considers all lambda_t + lambda_h bits
fn evaluate_f(&self, params: &Params) -> G2Projective {
self.0
.iter()
.by_vals()
.zip(params.fs.iter().chain(params.fh.iter()))
.filter(|(i, _)| *i)
.map(|(_, f_i)| f_i)
.fold(params.f0, |acc, f_i| acc + f_i)
}
// only considers up to lambda_t bits
fn evaluate_partial_f(&self, params: &Params) -> G2Projective {
self.0
.iter()
.by_vals()
.zip(params.fs.iter())
.filter(|(i, _)| *i)
.map(|(_, f_i)| f_i)
.fold(params.f0, |acc, f_i| acc + f_i)
}
pub(crate) fn to_bytes(&self) -> Vec<u8> {
let len_bytes = (self.0.len() as u32).to_be_bytes();
len_bytes
.into_iter()
.chain(self.0.chunks(8).map(BitField::load_be))
.collect()
}
pub(crate) fn try_from_bytes(b: &[u8]) -> Result<Self, DkgError> {
if b.len() < 4 {
return Err(DkgError::new_deserialization_failure(
"Tau",
"insufficient number of bytes provided",
));
}
let tau_len = u32::from_be_bytes([b[0], b[1], b[2], b[3]]) as usize;
// maximum theoretical length
if tau_len > MAX_EPOCHS_EXP + HASH_SECURITY_PARAM {
return Err(DkgError::new_deserialization_failure(
"Tau",
format!(
"malformed length {} is greater than maximum {}",
tau_len,
MAX_EPOCHS_EXP + HASH_SECURITY_PARAM
),
));
}
if tau_len == 0 {
if b.len() != 4 {
Err(DkgError::new_deserialization_failure(
"Tau",
"malformed bytes",
))
} else {
Ok(Tau::new_root())
}
} else if b.len() == 4 {
Err(DkgError::new_deserialization_failure(
"Tau",
"insufficient number of bytes provided",
))
} else {
let mut inner = BitVec::repeat(false, tau_len);
for (slot, &byte) in inner.chunks_mut(8).zip(b[4..].iter()) {
slot.store_be(byte);
}
Ok(Tau(inner))
}
}
}
impl Zeroize for Tau {
fn zeroize(&mut self) {
for v in self.0.as_raw_mut_slice() {
v.zeroize()
}
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd)]
pub struct Epoch(EpochStore);
impl Epoch {
pub fn new(value: EpochStore) -> Self {
Epoch(value)
}
pub(crate) fn as_tau(&self) -> Tau {
(*self).into()
}
pub(crate) fn as_extended_tau(
&self,
rr: &[G1Projective; NUM_CHUNKS],
ss: &[G1Projective; NUM_CHUNKS],
cc: &[[G1Projective; NUM_CHUNKS]],
) -> Tau {
self.as_tau().extend(rr, ss, cc)
}
pub(crate) fn try_from_tau(tau: &Tau, params: &Params) -> Result<Self, DkgError> {
if !tau.is_leaf(params) {
Err(DkgError::MalformedEpoch)
} else {
Ok(Epoch(tau.0.load_be()))
}
}
}
impl From<Epoch> for Tau {
fn from(epoch: Epoch) -> Self {
Tau(epoch.0.view_bits().to_bitvec())
}
}
impl From<EpochStore> for Epoch {
fn from(epoch: EpochStore) -> Self {
Epoch(epoch)
}
// considers all lambda_h bits
pub fn evaluate_f(params: &Params) -> G2Projective {
params.fh.iter().fold(params.f0, |acc, f_i| acc + f_i)
}
pub struct Params {
/// Maximum size of an epoch, in bits.
pub lambda_t: usize,
/// Security parameter of our $H_{\Lamda_H}$ hash function
pub lambda_h: usize,
// keeping f0 separate from the rest of the curve points makes it easier to work with tau
f0: G2Projective,
fs: Vec<G2Projective>, // f_1, f_2, .... f_{lambda_t} in the paper
fh: Vec<G2Projective>, // f_{lambda_t+1}, f_{lambda_t+1}, .... f_{lambda_t+lambda_h} in the paper
fh: Vec<G2Projective>, // f_{lambda_h}, f_{lambda_h+1}, .... f_{lambda_h} in the paper
h: G2Projective,
/// Precomputed `h` used for the miller loop
@@ -305,10 +59,6 @@ pub struct Params {
pub fn setup() -> Params {
let f0 = hash_g2(b"f0", SETUP_DOMAIN);
let fs = (1..=MAX_EPOCHS_EXP)
.map(|i| hash_g2(format!("f{}", i), SETUP_DOMAIN))
.collect();
let fh = (0..HASH_SECURITY_PARAM)
.map(|i| hash_g2(format!("fh{}", i), SETUP_DOMAIN))
.collect();
@@ -316,148 +66,10 @@ pub fn setup() -> Params {
let h = hash_g2(b"h", SETUP_DOMAIN);
Params {
lambda_t: MAX_EPOCHS_EXP,
lambda_h: HASH_SECURITY_PARAM,
f0,
fs,
fh,
h,
_h_prepared: G2Prepared::from(h.to_affine()),
}
}
#[cfg(test)]
mod tests {
use super::*;
use bitvec::bitvec;
use bitvec::order::Msb0;
#[test]
fn creating_tau_from_epoch() {
assert!(Tau::new_root().0.is_empty());
let zero = Tau::new(0);
assert!(zero.0.iter().by_vals().all(|b| !b));
let one = Tau::new(1);
let mut iter = one.0.iter().by_vals();
// first 31 bits are 0, the last one is 1
for _ in 0..31 {
assert!(!iter.next().unwrap())
}
assert!(iter.next().unwrap());
// 101010 in binary
let forty_two = Tau::new(42);
// first 26 bits are not set
let mut iter = forty_two.0.iter().by_vals();
for _ in 0..26 {
assert!(!iter.next().unwrap())
}
assert!(iter.next().unwrap());
assert!(!iter.next().unwrap());
assert!(iter.next().unwrap());
assert!(!iter.next().unwrap());
assert!(iter.next().unwrap());
assert!(!iter.next().unwrap());
// value that requires an actual u32 (i.e. takes 4 bytes to represent)
// 11000100_01000000_01001001_01101011 in binary
let big_val = Tau::new(3292547435);
let expected = bitvec![u32, Msb0;
1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1,
0, 1, 1
];
assert_eq!(expected, big_val.0)
}
#[test]
fn getting_parent_at_height() {
let tau = Tau(bitvec![u32, Msb0; 1,0,1,1,0,0,1]);
let expected_0 = Tau(BitVec::new());
let expected_1 = Tau(bitvec![u32, Msb0; 1]);
let expected_5 = Tau(bitvec![u32, Msb0; 1,0,1,1,0]);
assert_eq!(expected_0, tau.try_get_parent_at_height(0).unwrap());
assert_eq!(expected_1, tau.try_get_parent_at_height(1).unwrap());
assert_eq!(expected_5, tau.try_get_parent_at_height(5).unwrap());
assert_eq!(tau, tau.try_get_parent_at_height(7).unwrap());
assert!(tau.try_get_parent_at_height(8).is_err())
}
#[test]
fn converting_tau_to_epoch() {
let params = setup();
let tau0: Tau = Epoch::new(0).into();
let tau1: Tau = Epoch::new(1).into();
let tau42: Tau = Epoch::new(42).into();
let tau_big: Tau = Epoch::new(3292547435).into();
assert_eq!(Epoch::new(0), Epoch::try_from_tau(&tau0, &params).unwrap());
assert_eq!(Epoch::new(1), Epoch::try_from_tau(&tau1, &params).unwrap());
assert_eq!(
Epoch::new(42),
Epoch::try_from_tau(&tau42, &params).unwrap()
);
assert_eq!(
Epoch::new(3292547435),
Epoch::try_from_tau(&tau_big, &params).unwrap()
);
assert!(Epoch::try_from_tau(&Tau(BitVec::new()), &params).is_err());
assert!(Epoch::try_from_tau(&Tau(bitvec![u32, Msb0; 1,0,1,1,0]), &params).is_err());
let _31bit_tau = Tau(bitvec![u32, Msb0;
1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1,
0, 1
]);
assert!(Epoch::try_from_tau(&_31bit_tau, &params).is_err());
let _33bit_tau = Tau(bitvec![u32, Msb0;
1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1,
0, 1, 1, 0
]);
assert!(Epoch::try_from_tau(&_33bit_tau, &params).is_err());
}
#[test]
fn tau_roundtrip() {
let good_taus = vec![
Tau::new_root(),
Tau::new(0),
Tau::new(1),
Tau::new(2),
Tau::new(42),
Tau::new(123456),
Tau::new(3292547435),
Tau::new(u32::MAX),
];
for tau in good_taus {
let bytes = tau.to_bytes();
let recovered = Tau::try_from_bytes(&bytes).unwrap();
assert_eq!(tau, recovered);
}
// more valid variants
let mut another_tau = Tau::new(u32::MAX);
another_tau.0.push(true);
another_tau.0.push(false);
another_tau.0.push(true);
let bytes = another_tau.to_bytes();
let recovered = Tau::try_from_bytes(&bytes).unwrap();
assert_eq!(another_tau, recovered);
// ensure there are no panics
let big_length_bytes = [255, 255, 255, 255, 42];
assert!(Tau::try_from_bytes(&big_length_bytes).is_err());
assert!(Tau::try_from_bytes(&[]).is_err());
assert!(Tau::try_from_bytes(&[1, 1, 1, 1]).is_err());
assert!(Tau::try_from_bytes(&[0, 0, 0, 1]).is_err());
assert!(Tau::try_from_bytes(&[1, 0, 0, 0]).is_err());
assert!(Tau::try_from_bytes(&[1, 0, 0]).is_err());
}
}
+26 -45
View File
@@ -3,9 +3,7 @@
use crate::bte::proof_chunking::ProofOfChunking;
use crate::bte::proof_sharing::ProofOfSecretSharing;
use crate::bte::{
encrypt_shares, proof_chunking, proof_sharing, Ciphertexts, Epoch, Params, PublicKey,
};
use crate::bte::{encrypt_shares, proof_chunking, proof_sharing, Ciphertexts, Params, PublicKey};
use crate::error::DkgError;
use crate::interpolation::polynomial::{Polynomial, PublicCoefficients};
use crate::interpolation::{
@@ -17,6 +15,13 @@ use rand_core::RngCore;
use std::collections::BTreeMap;
use zeroize::Zeroize;
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct RecoveredVerificationKeys {
pub recovered_master: G2Projective,
pub recovered_partials: Vec<G2Projective>,
}
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct Dealing {
@@ -34,7 +39,6 @@ impl Dealing {
params: &Params,
dealer_index: NodeIndex,
threshold: Threshold,
epoch: Epoch,
// BTreeMap ensures the keys are sorted by their indices
receivers: &BTreeMap<NodeIndex, PublicKey>,
prior_resharing_secret: Option<Scalar>,
@@ -58,8 +62,7 @@ impl Dealing {
.collect::<Vec<_>>();
let ordered_public_keys = receivers.values().copied().collect::<Vec<_>>();
let (ciphertexts, hazmat) =
encrypt_shares(&remote_share_key_pairs, epoch, params, &mut rng);
let (ciphertexts, hazmat) = encrypt_shares(&remote_share_key_pairs, params, &mut rng);
// create proofs of knowledge
let chunking_instance = proof_chunking::Instance::new(&ordered_public_keys, &ciphertexts);
@@ -108,7 +111,6 @@ impl Dealing {
pub fn verify(
&self,
params: &Params,
epoch: Epoch,
threshold: Threshold,
receivers: &BTreeMap<NodeIndex, PublicKey>,
prior_resharing_public: Option<G2Projective>,
@@ -134,7 +136,7 @@ impl Dealing {
});
}
if !self.ciphertexts.verify_integrity(params, epoch) {
if !self.ciphertexts.verify_integrity(params) {
return Err(DkgError::FailedCiphertextIntegrityCheck);
}
@@ -250,7 +252,7 @@ pub fn try_recover_verification_keys(
dealings: &[Dealing],
threshold: Threshold,
receivers: &BTreeMap<NodeIndex, PublicKey>,
) -> Result<(G2Projective, Vec<G2Projective>), DkgError> {
) -> Result<RecoveredVerificationKeys, DkgError> {
if dealings.is_empty() {
return Err(DkgError::NoDealingsAvailable);
}
@@ -297,7 +299,10 @@ pub fn try_recover_verification_keys(
.map(|index| interpolated_coefficients.evaluate_at(&Scalar::from(*index)))
.collect();
Ok((master_verification_key, verification_key_shares))
Ok(RecoveredVerificationKeys {
recovered_master: master_verification_key,
recovered_partials: verification_key_shares,
})
}
pub fn verify_verification_keys(
@@ -369,32 +374,18 @@ mod tests {
full_keys.push((dk, pk))
}
// start off in a defined epoch (i.e. not root);
let epoch = Epoch::new(2);
let dealings = node_indices
.iter()
.map(|&dealer_index| {
Dealing::create(
&mut rng,
&params,
dealer_index,
threshold,
epoch,
&receivers,
None,
)
.0
Dealing::create(&mut rng, &params, dealer_index, threshold, &receivers, None).0
})
.collect::<Vec<_>>();
let mut derived_secrets = Vec::new();
for (i, (ref mut dk, _)) in full_keys.iter_mut().enumerate() {
dk.try_update_to(epoch, &params, &mut rng).unwrap();
let shares = dealings
.iter()
.map(|dealing| decrypt_share(dk, i, &dealing.ciphertexts, epoch, None).unwrap())
.map(|dealing| decrypt_share(dk, i, &dealing.ciphertexts, None).unwrap())
.collect();
derived_secrets.push(
combine_shares(shares, &receivers.keys().copied().collect::<Vec<_>>()).unwrap(),
@@ -408,8 +399,10 @@ mod tests {
.unwrap();
// END OF SETUP
let (recovered_master, recovered_partials) =
try_recover_verification_keys(&dealings, threshold, &receivers).unwrap();
let RecoveredVerificationKeys {
recovered_master,
recovered_partials,
} = try_recover_verification_keys(&dealings, threshold, &receivers).unwrap();
let g2 = G2Projective::generator();
assert_eq!(g2 * master_secret, recovered_master);
@@ -437,27 +430,17 @@ mod tests {
full_keys.push((dk, pk))
}
// start off in a defined epoch (i.e. not root);
let epoch = Epoch::new(2);
let dealings = node_indices
.iter()
.map(|&dealer_index| {
Dealing::create(
&mut rng,
&params,
dealer_index,
threshold,
epoch,
&receivers,
None,
)
.0
Dealing::create(&mut rng, &params, dealer_index, threshold, &receivers, None).0
})
.collect::<Vec<_>>();
let (recovered_master, recovered_partials) =
try_recover_verification_keys(&dealings, threshold, &receivers).unwrap();
let RecoveredVerificationKeys {
recovered_master,
recovered_partials,
} = try_recover_verification_keys(&dealings, threshold, &receivers).unwrap();
assert!(verify_verification_keys(
&recovered_master,
@@ -478,7 +461,6 @@ mod tests {
let parties = 5;
let threshold = ((parties as f32 * 2.) / 3. + 1.) as Threshold;
let node_indices = (1..=parties).collect::<Vec<_>>();
let epoch = Epoch::new(2);
let mut receivers = BTreeMap::new();
for index in &node_indices {
@@ -491,7 +473,6 @@ mod tests {
&params,
node_indices[0],
threshold,
epoch,
&receivers,
None,
);
+24 -56
View File
@@ -2,7 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
use bls12_381::{G2Projective, Scalar};
use dkg::bte::{decrypt_share, keygen, setup, Epoch};
use dkg::bte::{decrypt_share, keygen, setup};
use dkg::dealing::RecoveredVerificationKeys;
use dkg::interpolation::perform_lagrangian_interpolation_at_origin;
use dkg::{combine_shares, try_recover_verification_keys, Dealing};
use rand_core::SeedableRng;
@@ -32,9 +33,6 @@ fn single_sender() {
full_keys.push((dk, pk))
}
// start off in a defined epoch (i.e. not root);
let epoch = Epoch::new(2);
// TODO: HERE BE SERIALIZATION / DESERIALIZATION THAT'S NOT IMPLEMENTED YET
// verify remote proofs of key possession
for key in full_keys.iter() {
@@ -46,23 +44,20 @@ fn single_sender() {
&params,
node_indices[0],
threshold,
epoch,
&receivers,
None,
);
dealing
.verify(&params, epoch, threshold, &receivers, None)
.verify(&params, threshold, &receivers, None)
.unwrap();
// make sure each share is actually decryptable (even though proofs say they must be, perform this sanity check)
for (i, (ref mut dk, _)) in full_keys.iter_mut().enumerate() {
dk.try_update_to(epoch, &params, &mut rng).unwrap();
let _recovered = decrypt_share(dk, i, &dealing.ciphertexts, epoch, None).unwrap();
let _recovered = decrypt_share(dk, i, &dealing.ciphertexts, None).unwrap();
}
// and for good measure, check that the dealer's share matches decryption result
let recovered_dealer =
decrypt_share(&full_keys[0].0, 0, &dealing.ciphertexts, epoch, None).unwrap();
let recovered_dealer = decrypt_share(&full_keys[0].0, 0, &dealing.ciphertexts, None).unwrap();
assert_eq!(recovered_dealer, dealer_share.unwrap());
}
@@ -87,9 +82,6 @@ fn full_threshold_secret_sharing() {
full_keys.push((dk, pk))
}
// start off in a defined epoch (i.e. not root);
let epoch = Epoch::new(2);
// TODO: HERE BE SERIALIZATION / DESERIALIZATION THAT'S NOT IMPLEMENTED YET
// verify remote proofs of key possession
for key in full_keys.iter() {
@@ -99,37 +91,28 @@ fn full_threshold_secret_sharing() {
let dealings = node_indices
.iter()
.map(|&dealer_index| {
Dealing::create(
&mut rng,
&params,
dealer_index,
threshold,
epoch,
&receivers,
None,
)
.0
Dealing::create(&mut rng, &params, dealer_index, threshold, &receivers, None).0
})
.collect::<Vec<_>>();
for dealing in dealings.iter() {
dealing
.verify(&params, epoch, threshold, &receivers, None)
.verify(&params, threshold, &receivers, None)
.unwrap();
}
// recover verification keys
let (recovered_master, recovered_partials) =
try_recover_verification_keys(&dealings, threshold, &receivers).unwrap();
let RecoveredVerificationKeys {
recovered_master,
recovered_partials,
} = try_recover_verification_keys(&dealings, threshold, &receivers).unwrap();
let g2 = G2Projective::generator();
let mut derived_secrets = Vec::new();
for (i, (ref mut dk, _)) in full_keys.iter_mut().enumerate() {
dk.try_update_to(epoch, &params, &mut rng).unwrap();
let shares = dealings
.iter()
.map(|dealing| decrypt_share(dk, i, &dealing.ciphertexts, epoch, None).unwrap())
.map(|dealing| decrypt_share(dk, i, &dealing.ciphertexts, None).unwrap())
.collect();
// we know dealer_share matches, but it would be inconvenient to try to put them in here,
@@ -183,36 +166,24 @@ fn full_threshold_secret_resharing() {
full_keys.push((dk, pk))
}
// start off in a defined epoch (i.e. not root);
let epoch = Epoch::new(2);
let first_dealings = node_indices
.iter()
.map(|&dealer_index| {
Dealing::create(
&mut rng,
&params,
dealer_index,
threshold,
epoch,
&receivers,
None,
)
.0
Dealing::create(&mut rng, &params, dealer_index, threshold, &receivers, None).0
})
.collect::<Vec<_>>();
// recover verification keys
let (public_original_master, recovered_partials) =
try_recover_verification_keys(&first_dealings, threshold, &receivers).unwrap();
let RecoveredVerificationKeys {
recovered_master: public_original_master,
recovered_partials,
} = try_recover_verification_keys(&first_dealings, threshold, &receivers).unwrap();
let mut derived_secrets = Vec::new();
for (i, (ref mut dk, _)) in full_keys.iter_mut().enumerate() {
dk.try_update_to(epoch, &params, &mut rng).unwrap();
let shares = first_dealings
.iter()
.map(|dealing| decrypt_share(dk, i, &dealing.ciphertexts, epoch, None).unwrap())
.map(|dealing| decrypt_share(dk, i, &dealing.ciphertexts, None).unwrap())
.collect();
let recovered_secret =
@@ -227,8 +198,6 @@ fn full_threshold_secret_resharing() {
])
.unwrap();
let next_epoch = Epoch::new(3);
// attempt to create resharing dealings!
let resharing_dealings = node_indices
.iter()
@@ -239,7 +208,6 @@ fn full_threshold_secret_resharing() {
&params,
dealer_index,
threshold,
next_epoch,
&receivers,
Some(*prior_secret),
)
@@ -249,21 +217,21 @@ fn full_threshold_secret_resharing() {
for (reshared_dealing, prior_vk) in resharing_dealings.iter().zip(recovered_partials.iter()) {
reshared_dealing
.verify(&params, next_epoch, threshold, &receivers, Some(*prior_vk))
.verify(&params, threshold, &receivers, Some(*prior_vk))
.unwrap();
}
// recover verification keys
let (public_reshared_master, reshared_partials) =
try_recover_verification_keys(&resharing_dealings, threshold, &receivers).unwrap();
let RecoveredVerificationKeys {
recovered_master: public_reshared_master,
recovered_partials: reshared_partials,
} = try_recover_verification_keys(&resharing_dealings, threshold, &receivers).unwrap();
let mut reshared_secrets = Vec::new();
for (i, (ref mut dk, _)) in full_keys.iter_mut().enumerate() {
dk.try_update_to(next_epoch, &params, &mut rng).unwrap();
let shares = resharing_dealings
.iter()
.map(|dealing| decrypt_share(dk, i, &dealing.ciphertexts, next_epoch, None).unwrap())
.map(|dealing| decrypt_share(dk, i, &dealing.ciphertexts, None).unwrap())
.collect();
let recovered_secret =
+13
View File
@@ -36,6 +36,7 @@ pub struct NymContracts {
pub bandwidth_claim_contract_address: Option<String>,
pub coconut_bandwidth_contract_address: Option<String>,
pub multisig_contract_address: Option<String>,
pub coconut_dkg_contract_address: Option<String>,
}
// I wanted to use the simpler `NetworkDetails` name, but there's a clash
@@ -96,6 +97,9 @@ impl NymNetworkDetails {
.with_multisig_contract(Some(
var(var_names::MULTISIG_CONTRACT_ADDRESS).expect("multisig contract not set"),
))
.with_coconut_dkg_contract(Some(
var(var_names::COCONUT_DKG_CONTRACT_ADDRESS).expect("coconut dkg contract not set"),
))
}
pub fn new_mainnet() -> Self {
@@ -121,6 +125,9 @@ impl NymNetworkDetails {
mainnet::COCONUT_BANDWIDTH_CONTRACT_ADDRESS,
),
multisig_contract_address: parse_optional_str(mainnet::MULTISIG_CONTRACT_ADDRESS),
coconut_dkg_contract_address: parse_optional_str(
mainnet::COCONUT_DKG_CONTRACT_ADDRESS,
),
},
}
}
@@ -190,6 +197,12 @@ impl NymNetworkDetails {
self.contracts.multisig_contract_address = contract.map(Into::into);
self
}
#[must_use]
pub fn with_coconut_dkg_contract<S: Into<String>>(mut self, contract: Option<S>) -> Self {
self.contracts.coconut_dkg_contract_address = contract.map(Into::into);
self
}
}
#[derive(Debug, Copy, Serialize, Deserialize, Clone, PartialEq, Eq)]

Some files were not shown because too many files have changed in this diff Show More