Compare commits

..

1 Commits

Author SHA1 Message Date
Tommy Verrall 2fa21dc4e9 amend wallet for sandbox link 2025-06-26 17:20:29 +02:00
695 changed files with 13931 additions and 29885 deletions
@@ -38,14 +38,15 @@ jobs:
rm -rf ci-builds || true
mkdir -p $OUTPUT_DIR
echo $OUTPUT_DIR
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get -y install libudev-dev
- name: Sets env vars for tokio if set in manual dispatch inputs
if: github.event_name == 'workflow_dispatch' && inputs.add_tokio_unstable == true
run: |
echo "RUSTFLAGS=--cfg tokio_unstable" >> $GITHUB_ENV
echo "CARGO_FEATURES=--features tokio-console" >> $GITHUB_ENV
echo 'RUSTFLAGS="--cfg tokio_unstable"' >> $GITHUB_ENV
if: github.event_name == 'workflow_dispatch' && inputs.add_tokio_unstable == true
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
@@ -102,6 +103,7 @@ jobs:
if [ ${{ github.event_name == 'workflow_dispatch' && inputs.enable_deb == true }} = true ]; then
cp target/debian/*.deb $OUTPUT_DIR
fi
- name: Deploy branch to CI www
continue-on-error: true
uses: easingthemes/ssh-deploy@main
+7 -7
View File
@@ -38,7 +38,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ arc-linux-latest, custom-windows-11, custom-macos-15 ]
os: [ arc-ubuntu-22.04, custom-windows-11, custom-macos-15 ]
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
@@ -46,9 +46,9 @@ jobs:
RUSTUP_PERMIT_COPY_RENAME: 1
steps:
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get -y install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev libudev-dev squashfs-tools protobuf-compiler cmake
run: sudo apt-get update && sudo apt-get -y install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev libudev-dev squashfs-tools protobuf-compiler
continue-on-error: true
if: contains(matrix.os, 'linux')
if: contains(matrix.os, 'ubuntu')
- name: Check out repository code
uses: actions/checkout@v4
@@ -63,7 +63,7 @@ jobs:
# To avoid running out of disk space, skip generating debug symbols
- name: Set debug to false (unix)
if: contains(matrix.os, 'linux') || contains(matrix.os, 'mac')
if: contains(matrix.os, 'ubuntu') || contains(matrix.os, 'mac')
run: |
sed -i.bak 's/\[profile.dev\]/\[profile.dev\]\ndebug = false/' Cargo.toml
git diff
@@ -93,14 +93,14 @@ jobs:
command: build
- name: Build all examples
if: contains(matrix.os, 'linux')
if: contains(matrix.os, 'ubuntu')
uses: actions-rs/cargo@v1
with:
command: build
args: --workspace --examples
- name: Run all tests
if: contains(matrix.os, 'linux')
if: contains(matrix.os, 'ubuntu')
uses: actions-rs/cargo@v1
env:
NYM_API: https://sandbox-nym-api1.nymtech.net/api
@@ -109,7 +109,7 @@ jobs:
args: --workspace
- name: Run expensive tests
if: (github.ref == 'refs/heads/develop' || github.event.pull_request.base.ref == 'develop' || github.event.pull_request.base.ref == 'master') && contains(matrix.os, 'linux')
if: (github.ref == 'refs/heads/develop' || github.event.pull_request.base.ref == 'develop' || github.event.pull_request.base.ref == 'master') && contains(matrix.os, 'ubuntu')
uses: actions-rs/cargo@v1
with:
command: test
@@ -31,26 +31,33 @@ jobs:
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
toolchain: stable
target: wasm32-unknown-unknown
override: true
- name: Install wasm-opt
uses: ./.github/actions/install-wasm-opt
with:
version: '114'
- name: Install cosmwasm-check
run: cargo install cosmwasm-check
- name: Build release contracts
run: make publish-contracts
run: make contracts
- name: Prepare build output
shell: bash
env:
OUTPUT_DIR: ci-contract-builds/${{ github.ref_name }}
run: |
find contracts/artifacts -maxdepth 1 -type f -name '*.wasm' -exec cp {} $OUTPUT_DIR \;
# Also include the optimizer-generated checksums if present
if [ -f contracts/artifacts/checksums.txt ]; then
cp contracts/artifacts/checksums.txt $OUTPUT_DIR
fi
cp contracts/target/wasm32-unknown-unknown/release/mixnet_contract.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/vesting_contract.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/nym_coconut_dkg.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/cw3_flex_multisig.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/cw4_group.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/nym_ecash.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/nym_pool_contract.wasm $OUTPUT_DIR
cp contracts/target/wasm32-unknown-unknown/release/nym_performance_contract.wasm $OUTPUT_DIR
- name: Deploy branch to CI www
continue-on-error: true
-19
View File
@@ -1,19 +0,0 @@
name: Run SonarQube Scan
on:
push:
branches:
- develop
# pull_request:
# types: [opened, synchronize, reopened]
jobs:
sonarqube:
name: SonarQube
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis
- name: SonarQube Scan
uses: SonarSource/sonarqube-scan-action@v5
env:
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
+15 -38
View File
@@ -5,15 +5,8 @@ on:
inputs:
gateway_probe_git_ref:
type: string
default: nym-vpn-core-v1.4.0
required: true
description: Which gateway probe git ref to build the image with
release_image:
description: 'Tag image as a release'
required: true
default: false
type: boolean
env:
WORKING_DIRECTORY: "nym-node-status-api/nym-node-status-agent"
CONTAINER_NAME: "node-status-agent"
@@ -38,10 +31,10 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
id: get_version
run: |
VERSION=$(yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml)
echo "result=$VERSION" >> $GITHUB_OUTPUT
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: cleanup-gateway-probe-ref
id: cleanup_gateway_probe_ref
@@ -50,35 +43,19 @@ jobs:
GIT_REF_SLUG="${GATEWAY_PROBE_GIT_REF//\//-}"
echo "git_ref=${GIT_REF_SLUG}" >> $GITHUB_OUTPUT
- name: Set GIT_TAG variable
run: echo "GIT_TAG=${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}" >> $GITHUB_ENV
- name: Remove existing tag if exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
fi
- name: Initialize RELEASE_TAG
run: echo "RELEASE_TAG=" >> $GITHUB_ENV
- name: Set RELEASE_TAG for release
if: github.event.inputs.release_image == 'true'
run: echo "RELEASE_TAG=golden-" >> $GITHUB_ENV
- name: Set IMAGE_NAME_AND_TAGS variable
run: echo "IMAGE_NAME_AND_TAGS=${{ env.CONTAINER_NAME }}:${{ env.RELEASE_TAG }}${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}" >> $GITHUB_ENV
- name: New env vars
run: echo "RELEASE_TAG='$RELEASE_TAG' GIT_TAG='$GIT_TAG' IMAGE_NAME_AND_TAGS='$IMAGE_NAME_AND_TAGS'"
# - name: Remove existing tag if exists
# run: |
# if git rev-parse $${{ env.GIT_TAG }} >/dev/null 2>&1; then
# git push --delete origin $${{ env.GIT_TAG }}
# git tag -d $${{ env.GIT_TAG }}
# fi
# - name: Create tag
# run: |
# git tag -a $${{ env.GIT_TAG }} -m "Version ${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}"
# git push origin $${{ env.GIT_TAG }}
- name: Create tag
run: |
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }} -m "Version ${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
- name: BuildAndPushImageOnHarbor
run: |
docker build --build-arg GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }} -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.IMAGE_NAME_AND_TAGS }}
docker build --build-arg GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }} -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
+20 -39
View File
@@ -1,13 +1,7 @@
name: Build and upload Node Status API container to harbor.nymte.ch
on:
workflow_dispatch:
inputs:
release_image:
description: 'Tag image as a release'
required: true
default: false
type: boolean
env:
WORKING_DIRECTORY: "nym-node-status-api/nym-node-status-api"
CONTAINER_NAME: "node-status-api"
@@ -32,43 +26,30 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: Check if tag exists
run: |
VERSION=$(yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml)
echo "result=$VERSION" >> $GITHUB_OUTPUT
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
echo "Tag ${{ steps.get_version.outputs.result }} already exists"
fi
- name: Set GIT_TAG variable
run: echo "GIT_TAG=${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}" >> $GITHUB_ENV
- name: Remove existing tag if exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
fi
- name: Initialise RELEASE_TAG
run: echo "RELEASE_TAG=" >> $GITHUB_ENV
- name: Set RELEASE_TAG for release
if: github.event.inputs.release_image == 'true'
run: echo "RELEASE_TAG=golden-" >> $GITHUB_ENV
- name: Set IMAGE_NAME_AND_TAGS variable
run: echo "IMAGE_NAME_AND_TAGS=${{ env.CONTAINER_NAME }}:${{ env.RELEASE_TAG }}${{ steps.get_version.outputs.result }}" >> $GITHUB_ENV
- name: New env vars
run: echo "RELEASE_TAG='$RELEASE_TAG' GIT_TAG='$GIT_TAG' IMAGE_NAME_AND_TAGS='$IMAGE_NAME_AND_TAGS'"
# - name: Remove existing tag if exists, then create
# run: |
# if git rev-parse "$GIT_TAG" >/dev/null 2>&1; then
# echo "Tag '$GIT_TAG' already exists, deleting"
# git push --delete origin "$GIT_TAG"
# git tag -d "$GIT_TAG"
# echo "Tag '$GIT_TAG' deleted"
# else
# echo "Tag '$GIT_TAG' does not exist, creating it"
# git tag -a $GIT_TAG -m "Version ${{ steps.get_version.outputs.result }}"
# git push origin $GIT_TAG
# echo "Tag '$GIT_TAG' created"
# fi
- name: Create tag
run: |
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
- name: BuildAndPushImageOnHarbor
run: |
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile-sqlite . -t harbor.nymte.ch/nym/${{ env.IMAGE_NAME_AND_TAGS }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
-2
View File
@@ -35,8 +35,6 @@ validator-api/keypair
contracts/mixnet/code_id
contracts/mixnet/Justfile
contracts/mixnet/Makefile
artifacts
contracts/artifacts
validator-config
*.patch
validator-api-config.toml
-152
View File
@@ -4,158 +4,6 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2025.15-gruyere] (2025-08-20)
- Migrate strum to 0.27.2 ([#5960])
- WG exit policy scripts update ([#5921])
- Make DNS Resolver fallback optional ([#5920])
- nym-node debug command to reset providers db ([#5914])
- basic zulip client for sending messages ([#5913])
- chore: allow compatibility with 'CDLA-Permissive-2.0' ([#5910])
- feat: ecash liveness check ([#5890])
- Remove old free credential handle ([#5864])
[#5960]: https://github.com/nymtech/nym/pull/5960
[#5921]: https://github.com/nymtech/nym/pull/5921
[#5920]: https://github.com/nymtech/nym/pull/5920
[#5914]: https://github.com/nymtech/nym/pull/5914
[#5913]: https://github.com/nymtech/nym/pull/5913
[#5910]: https://github.com/nymtech/nym/pull/5910
[#5890]: https://github.com/nymtech/nym/pull/5890
[#5864]: https://github.com/nymtech/nym/pull/5864
## [2025.14-feta] (2025-08-05)
- chore: nym node tokio console ([#5909])
- Feature/dkg snapshot epoch ([#5900])
- Feature/dkg epoch dealers query ([#5899])
- sqlx-pool-guard: allocate more memory on windows ([#5896])
- Support mnemonic in the NS agent ([#5883])
- Allow PG database backend ([#5880])
[#5909]: https://github.com/nymtech/nym/pull/5909
[#5900]: https://github.com/nymtech/nym/pull/5900
[#5899]: https://github.com/nymtech/nym/pull/5899
[#5896]: https://github.com/nymtech/nym/pull/5896
[#5883]: https://github.com/nymtech/nym/pull/5883
[#5880]: https://github.com/nymtech/nym/pull/5880
## [2025.13-emmental] (2025-07-22)
- fix: don't allow mixnode running in exit mode ([#5898])
- fix contract build process in Makefile ([#5892])
- bugfix: ignore 'Send' responses when claiming bandwidth ([#5884])
- Update push-node-status-agent.yaml ([#5882])
- listen for shutdown signals during nym-node startup ([#5879])
- feat: forbid running mixnode + entry on the same node ([#5878])
- chore: 1.88 clippy ([#5877])
- Batch SQL writes for packet stats ([#5874])
- fix the broken link ([#5873])
- Set busy_timeout in sqlx ([#5872])
- feat: basic performance contract integration [within Nym API] ([#5871])
- scraper bugfix: ignore precommits from missing validators ([#5867])
- Return true remaining ([#5866])
- Make Mix hops optional for Mixnet Client SURBs ([#5861])
- Check gateway supported versions ([#5860])
- Add build info endpoints ([#5857])
- Clear out screaming logs ([#5856])
- fix removal of qa env ([#5855])
- Use display when printing paths ([#5853])
- feat: initial performance contract ([#5833])
- Security patches for the `dkg` crate ([#5828])
- HTTP Discovery objects & network defaults ([#5814])
[#5898]: https://github.com/nymtech/nym/pull/5898
[#5892]: https://github.com/nymtech/nym/pull/5892
[#5884]: https://github.com/nymtech/nym/pull/5884
[#5882]: https://github.com/nymtech/nym/pull/5882
[#5879]: https://github.com/nymtech/nym/pull/5879
[#5878]: https://github.com/nymtech/nym/pull/5878
[#5877]: https://github.com/nymtech/nym/pull/5877
[#5874]: https://github.com/nymtech/nym/pull/5874
[#5873]: https://github.com/nymtech/nym/pull/5873
[#5872]: https://github.com/nymtech/nym/pull/5872
[#5871]: https://github.com/nymtech/nym/pull/5871
[#5867]: https://github.com/nymtech/nym/pull/5867
[#5866]: https://github.com/nymtech/nym/pull/5866
[#5861]: https://github.com/nymtech/nym/pull/5861
[#5860]: https://github.com/nymtech/nym/pull/5860
[#5857]: https://github.com/nymtech/nym/pull/5857
[#5856]: https://github.com/nymtech/nym/pull/5856
[#5855]: https://github.com/nymtech/nym/pull/5855
[#5853]: https://github.com/nymtech/nym/pull/5853
[#5833]: https://github.com/nymtech/nym/pull/5833
[#5828]: https://github.com/nymtech/nym/pull/5828
[#5814]: https://github.com/nymtech/nym/pull/5814
## [2025.12-dolcelatte] (2025-07-07)
- bugfix: key-rotation + reply SURBs ([#5876])
- Bugfix/backwards compat ([#5865])
- bugfix: allow gateways to permit authentication from v4 clients ([#5862])
- fixed client route for obtaining v2 list of gateways ([#5859])
- Updated browser extension piece removal ([#5849])
- Remove/old env references ([#5848])
- Remove qa env ([#5847])
- remove not used old mock-api ([#5845])
- remove bity dir ([#5844])
- build(deps-dev): bump webpack-dev-server from 4.13.2 to 5.2.1 in /wasm/mix-fetch/internal-dev ([#5843])
- Amended the buy section ([#5841])
- Removing test-net faucet ([#5840])
- Feature/node status dvpn directory ([#5829])
- build(deps-dev): bump webpack-dev-server from 4.15.2 to 5.2.1 in /nym-credential-proxy/vpn-api-lib-wasm/internal-dev ([#5826])
- bugfix: fix swapped total and circulating supplies ([#5822])
- build(deps): bump tar-fs from 3.0.8 to 3.0.9 in /sdk/typescript/tests/integration-tests/mix-fetch ([#5821])
- Url scheme warning log ([#5819])
- chore: adjust heuristic for wireguard peer activity ([#5818])
- Use the same client bandwidth for top up ([#5813])
- Replace chrono with time in NS API ([#5811])
- build(deps-dev): bump http-proxy-middleware from 2.0.4 to 2.0.9 in /clients/native/examples/js-examples/websocket ([#5810])
- build(deps): bump tokio from 1.44.2 to 1.45.1 ([#5798])
- Close sqlite pool before moving or reopening databases ([#5796])
- HTTP Client Retries, Fallbacks, and Redirects ([#5789])
- feat: key rotation ([#5777])
- build(deps): bump next from 14.2.15 to 14.2.26 in /documentation/docs ([#5772])
- build(deps): bump undici from 5.28.5 to 5.29.0 in /.github/actions/nym-hash-releases/src ([#5771])
- build(deps): bump cargo_metadata from 0.18.1 to 0.19.2 ([#5765])
- build(deps): bump tempfile from 3.19.1 to 3.20.0 ([#5764])
- [Feature] Noise XKpsk3 integration (2025 version) ([#5692])
- feature: nympool contract ([#5464])
- chore: fixed typo in API endpoint parameter ([#5449])
[#5876]: https://github.com/nymtech/nym/pull/5876
[#5865]: https://github.com/nymtech/nym/pull/5865
[#5862]: https://github.com/nymtech/nym/pull/5862
[#5859]: https://github.com/nymtech/nym/pull/5859
[#5849]: https://github.com/nymtech/nym/pull/5849
[#5848]: https://github.com/nymtech/nym/pull/5848
[#5847]: https://github.com/nymtech/nym/pull/5847
[#5845]: https://github.com/nymtech/nym/pull/5845
[#5844]: https://github.com/nymtech/nym/pull/5844
[#5843]: https://github.com/nymtech/nym/pull/5843
[#5841]: https://github.com/nymtech/nym/pull/5841
[#5840]: https://github.com/nymtech/nym/pull/5840
[#5829]: https://github.com/nymtech/nym/pull/5829
[#5826]: https://github.com/nymtech/nym/pull/5826
[#5822]: https://github.com/nymtech/nym/pull/5822
[#5821]: https://github.com/nymtech/nym/pull/5821
[#5819]: https://github.com/nymtech/nym/pull/5819
[#5818]: https://github.com/nymtech/nym/pull/5818
[#5813]: https://github.com/nymtech/nym/pull/5813
[#5811]: https://github.com/nymtech/nym/pull/5811
[#5810]: https://github.com/nymtech/nym/pull/5810
[#5798]: https://github.com/nymtech/nym/pull/5798
[#5796]: https://github.com/nymtech/nym/pull/5796
[#5789]: https://github.com/nymtech/nym/pull/5789
[#5777]: https://github.com/nymtech/nym/pull/5777
[#5772]: https://github.com/nymtech/nym/pull/5772
[#5771]: https://github.com/nymtech/nym/pull/5771
[#5765]: https://github.com/nymtech/nym/pull/5765
[#5764]: https://github.com/nymtech/nym/pull/5764
[#5692]: https://github.com/nymtech/nym/pull/5692
[#5464]: https://github.com/nymtech/nym/pull/5464
[#5449]: https://github.com/nymtech/nym/pull/5449
## [2025.11-cheddar] (2025-06-10)
- No autoremoval of peers ([#5831])
-686
View File
@@ -1,686 +0,0 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Nym is a privacy platform that uses mixnet technology to protect against metadata surveillance. The platform consists of several key components:
- Mixnet nodes (mixnodes) for packet mixing
- Gateways (entry/exit points for the network)
- Clients for interacting with the network
- Network monitoring tools
- Validators for network consensus
- Various service providers and integrations
## Build Commands
### Rust Components
```bash
# Default build (debug)
cargo build
# Release build
cargo build --release
# Build a specific package
cargo build -p <package-name>
# Build main components
make build
# Build release versions of main binaries and contracts
make build-release
# Build specific binaries
make build-nym-cli
cargo build -p nym-node --release
cargo build -p nym-api --release
```
### Testing
```bash
# Run clippy, unit tests, and formatting
make test
# Run all tests including slow tests
make test-all
# Run clippy on all workspaces
make clippy
# Run unit tests for a specific package
cargo test -p <package-name>
# Run only expensive/ignored tests
cargo test --workspace -- --ignored
# Run API tests
dotenv -f envs/sandbox.env -- cargo test --test public-api-tests
# Run tests with specific log level
RUST_LOG=debug cargo test -p <package-name>
# Run specific test scripts
./nym-node/tests/test_apis.sh
./scripts/wireguard-exit-policy/exit-policy-tests.sh
```
### Linting and Formatting
```bash
# Run rustfmt on all code
make fmt
# Check formatting without modifying
cargo fmt --all -- --check
# Run clippy with all targets
cargo clippy --workspace --all-targets -- -D warnings
# TypeScript linting
yarn lint
yarn lint:fix
yarn types:lint:fix
# Check dependencies for security/licensing issues
cargo deny check
```
### WASM Components
```bash
# Build all WASM components
make sdk-wasm-build
# Build TypeScript SDK
yarn build:sdk
npx lerna run --scope @nymproject/sdk build --stream
# Build and test WASM components
make sdk-wasm
# Build specific WASM packages
cd wasm/client && make
cd wasm/mix-fetch && make
cd wasm/node-tester && make
```
### Contract Development
```bash
# Build all contracts
make contracts
# Build contracts in release mode
make build-release-contracts
# Generate contract schemas
make contract-schema
# Run wasm-opt on contracts
make wasm-opt-contracts
# Check contracts with cosmwasm-check
make cosmwasm-check-contracts
```
### Running Components
```bash
# Run nym-node as a mixnode
cargo run -p nym-node -- run --mode mixnode
# Run nym-node as a gateway
cargo run -p nym-node -- run --mode gateway
# Run the network monitor
cargo run -p nym-network-monitor
# Run the API server
cargo run -p nym-api
# Run with specific environment
dotenv -f envs/sandbox.env -- cargo run -p nym-api
# Start a local network
./scripts/localnet_start.sh
```
## Architecture
The Nym platform consists of various components organized as a monorepo:
1. **Core Mixnet Infrastructure**:
- `nym-node`: Core binary supporting mixnode and gateway modes
- `common/nymsphinx`: Implementation of the Sphinx packet format
- `common/topology`: Network topology management
- `common/types`: Shared data types across components
2. **Network Monitoring**:
- `nym-network-monitor`: Monitors the network's reliability and performance
- `nym-api`: API server for network stats and monitoring data
- Metrics tracking for nodes, routes, and overall network health
3. **Client Implementations**:
- `clients/native`: Native Rust client implementation
- `clients/socks5`: SOCKS5 proxy client for standard applications
- `wasm`: WebAssembly client implementations (for browsers)
- `nym-connect`: Desktop and mobile clients
4. **Blockchain & Smart Contracts**:
- `common/cosmwasm-smart-contracts`: Smart contract implementations
- `contracts`: CosmWasm contracts for the Nym network
- `common/ledger`: Blockchain integration
5. **Utilities & Tools**:
- `tools`: Various CLI tools and utilities
- `sdk`: SDKs for different languages and platforms
- `documentation`: Documentation generation and management
## Packet System
Nym uses a modified Sphinx packet format for its mixnet:
1. **Message Chunking**:
- Messages are divided into "sets" and "fragments"
- Each fragment fits in a single Sphinx packet
- The `common/nymsphinx/chunking` module handles message fragmentation
2. **Routing**:
- Packets traverse through 3 layers of mixnodes
- Routing information is encrypted in layers (onion routing)
- The final gateway receives and processes the messages
3. **Monitoring**:
- Monitoring system tracks packet delivery through the network
- Routes are analyzed for reliability statistics
- Node performance metrics are collected
## Network Protocol
Nym implements the Loopix mixnet design with several key privacy features:
1. **Continuous-time Mixing**:
- Each mixnode delays messages independently with an exponential distribution
- This creates random reordering of packets, destroying timing correlations
- Offers better anonymity properties than batch mixing approaches
2. **Cover Traffic**:
- Clients and nodes generate dummy "loop" packets that circulate through the network
- These packets are indistinguishable from real traffic
- Creates a baseline level of traffic that hides actual communication patterns
- Provides unobservability (hiding when and how much real traffic is being sent)
3. **Stratified Network Architecture**:
- Traffic flows through Entry Gateway → 3 Mixnode Layers → Exit Gateway
- Path selection is independent per-message (unlike Tor)
- Each node connects only to adjacent layers
4. **Anonymous Replies**:
- Single-Use Reply Blocks (SURBs) allow receiving messages without revealing identity
- Enables bidirectional communication while maintaining privacy
## Network Monitoring Architecture
The network monitoring system is a core component that measures mixnet reliability:
1. The `nym-network-monitor` sends test packets through the network
2. These packets follow predefined routes through multiple mixnodes
3. Metrics are collected about:
- Successful and failed packet deliveries
- Node reliability (percentage of successful packet handling)
- Route reliability (which specific route combinations work best)
4. Results are stored in the database and used by `nym-api` to:
- Present node performance statistics
- Determine network rewards
- Provide route selection guidance to clients
In the current branch, metrics collection is being enhanced with a fanout approach to submit to multiple API endpoints.
## Development Environment
### Required Dependencies
- Rust toolchain (stable, 1.80+)
- Node.js (v20+) and yarn for TypeScript components
- SQLite for local database development
- PostgreSQL for API database (optional, for full API functionality)
- CosmWasm tools for contract development
- For building contracts: `wasm-opt` tool from `binaryen`
- Python 3.8+ for some scripts
- Docker (optional, for containerized development)
- protoc (Protocol Buffers compiler) for some components
### Environment Configurations
The `envs/` directory contains pre-configured environments:
#### Available Environments
- **`local.env`**: Local development environment
- Points to local services (localhost)
- Uses test mnemonics and keys
- Ideal for testing without external dependencies
- **`sandbox.env`**: Sandbox test network
- Public test network with real nodes
- Test tokens available from faucet
- Contract addresses for sandbox deployment
- API: https://sandbox-nym-api1.nymtech.net
- **`mainnet.env`**: Production mainnet
- Real network with real tokens
- Production contract addresses
- API: https://validator.nymtech.net
- Use with caution!
- **`canary.env`**: Canary deployment
- Pre-release testing environment
- Tests new features before mainnet
- **`mainnet-local-api.env`**: Hybrid environment
- Uses mainnet contracts but local API
- Useful for API development against mainnet data
#### Key Environment Variables
```bash
# Network configuration
NETWORK_NAME=sandbox # Network identifier
BECH32_PREFIX=n # Address prefix (n for sandbox, n for mainnet)
NYM_API=https://sandbox-nym-api1.nymtech.net/api
NYXD=https://rpc.sandbox.nymtech.net
NYM_API_NETWORK=sandbox
# Contract addresses (network-specific)
MIXNET_CONTRACT_ADDRESS=n1xr3rq8yvd7qplsw5yx90ftsr2zdhg4e9z60h5duusgxpv72hud3sjkxkav
VESTING_CONTRACT_ADDRESS=n1unyuj8qnmygvzuex3dwmg9yzt9alhvyeat0uu0jedg2wj33efl5qackslz
# ... other contract addresses
# Mnemonic for testing (NEVER use in production)
MNEMONIC="clutch captain shoe salt awake harvest setup primary inmate ugly among become"
# API Keys and tokens
IPINFO_API_TOKEN=your_token_here
AUTHENTICATOR_PASSWORD=password_here
# Logging
RUST_LOG=info # Options: error, warn, info, debug, trace
RUST_BACKTRACE=1 # Enable backtraces
# Database
DATABASE_URL=postgresql://user:pass@localhost/nym_api
```
#### Using Environment Files
```bash
# Load environment and run command
dotenv -f envs/sandbox.env -- cargo run -p nym-api
# Export to shell
source envs/sandbox.env
# Use with make targets
dotenv -f envs/sandbox.env -- make run-api-tests
```
## Initial Setup
### First Time Setup
1. **Install Prerequisites**
```bash
# Install Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# Install Node.js and yarn
# Via nvm (recommended):
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.0/install.sh | bash
nvm install 20
npm install -g yarn
# Install build tools
# Ubuntu/Debian:
sudo apt-get install build-essential pkg-config libssl-dev protobuf-compiler libpq-dev
# macOS:
brew install protobuf postgresql
# Install wasm-opt for contract builds
npm install -g wasm-opt
# Add wasm target for Rust
rustup target add wasm32-unknown-unknown
```
2. **Clone and Setup Repository**
```bash
git clone https://github.com/nymtech/nym.git
cd nym/nym
# Install JavaScript dependencies
yarn install
# Build the project
make build
```
3. **Database Setup (Optional, for API development)**
```bash
# Install PostgreSQL
# Create database
createdb nym_api
# Run migrations (from nym-api directory)
cd nym-api
sqlx migrate run
```
### Quick Start
```bash
# Run a mixnode locally
dotenv -f envs/sandbox.env -- cargo run -p nym-node -- run --mode mixnode --id my-mixnode
# Run a gateway locally
dotenv -f envs/sandbox.env -- cargo run -p nym-node -- run --mode gateway --id my-gateway
# Run the API server
dotenv -f envs/sandbox.env -- cargo run -p nym-api
# Run a client
cargo run -p nym-client -- init --id my-client
cargo run -p nym-client -- run --id my-client
```
## CI/CD Pipeline
The project uses GitHub Actions for CI/CD with several key workflows:
1. **Build and Test**:
- `ci-build.yml`: Main build workflow for Rust components
- Tests are run on multiple platforms (Linux, Windows, macOS)
- Includes formatting check (rustfmt) and linting (clippy)
2. **Release Process**:
- Binary artifacts are published on release tags
- Multiple platform builds are created
3. **Documentation**:
- Documentation is automatically built and deployed
## Database Structure
The system uses SQLite databases with tables like:
- `mixnode_status`: Status information about mixnodes
- `gateway_status`: Status information about gateways
- `routes`: Route performance information (success/failure of specific paths)
- `monitor_run`: Information about monitoring test runs
## Development Workflows
### Running a Node
To run the mixnode or gateway:
```bash
# Run nym-node as a mixnode with specified identity
cargo run -p nym-node -- run --mode mixnode --id my-mixnode
# Run nym-node as a gateway
cargo run -p nym-node -- run --mode gateway --id my-gateway
```
### Configuration
Nodes can be configured with files in various locations:
- Command-line arguments
- Environment variables
- `.env` files specified with `--config-env-file`
### Monitoring
To monitor the health of your node:
- View logs for real-time information
- Use the node's HTTP API for status information
- Check the explorer for public node statistics
## Common Libraries
- `common/types`: Shared data types across all components
- `common/crypto`: Cryptographic primitives and wrappers
- `common/client-core`: Core client functionality
- `common/gateway-client`: Client-gateway communication
- `common/task`: Task management and concurrency utilities
- `common/nymsphinx`: Sphinx packet implementation for mixnet
- `common/topology`: Network topology management
- `common/credentials`: Credential system for privacy-preserving authentication
- `common/bandwidth-controller`: Bandwidth management and accounting
## Code Conventions
- Error handling: Use anyhow/thiserror for structured error handling
- Logging: Use the tracing framework for logging and diagnostics
- State management: Generally use Tokio/futures for async code
- Configuration: Use the config crate and env vars with defaults
- Database: Use sqlx for type-safe database queries
- Follow clippy recommendations and rustfmt formatting
- Use semantic commit messages: feat, fix, docs, refactor, test, chore
## When Making Changes
- Run `make test` before submitting PRs
- Follow Rust naming conventions
- Use `clippy` to check for common issues
- Update SQLx query caches when modifying DB queries: `cargo sqlx prepare`
- Consider backward compatibility for protocol changes
- Use lefthook pre-commit hooks for TypeScript formatting
- Run `cargo deny check` to verify dependency compliance
- Test against both sandbox and local environments when possible
- Update relevant documentation and CHANGELOG.md
## Development Tools
### Useful Cargo Commands
```bash
# Check for outdated dependencies
cargo outdated
# Analyze binary size
cargo bloat --release -p nym-node
# Generate dependency graph
cargo tree -p nym-api
# Run with instrumentation
cargo run --features profiling -p nym-node
# Check for security advisories
cargo audit
```
### Database Tools
```bash
# SQLx CLI for migrations
cargo install sqlx-cli
# Create new migration
cd nym-api && sqlx migrate add <migration_name>
# Prepare query metadata for offline compilation
cargo sqlx prepare --workspace
# View database schema
./nym-api/enter_db.sh
```
### Development Scripts
- `scripts/build_topology.py`: Generate network topology files
- `scripts/node_api_check.py`: Verify node API endpoints
- `scripts/network_tunnel_manager.sh`: Manage network tunnels
- `scripts/localnet_start.sh`: Start a local test network
- Various deployment scripts in `deployment/` for different environments
## Debugging
- Enable more verbose logging with the RUST_LOG environment variable:
```
RUST_LOG=debug,nym_node=trace cargo run -p nym-node -- run --mode mixnode
```
- Use the HTTP API endpoints for status information
- Check monitoring data in the database for network performance metrics
- For complex issues, use tracing tools to follow packet flow
- Enable backtraces: `RUST_BACKTRACE=full`
- For WASM debugging: Use browser developer tools with source maps
## Deployment and Advanced Configurations
### Deployment Structure
The `deployment/` directory contains Ansible playbooks and configurations for various deployment scenarios:
- **`aws/`**: AWS-specific deployment configurations
- **`mixnode/`**: Mixnode deployment playbooks
- **`gateway/`**: Gateway deployment playbooks
- **`validator/`**: Validator node deployment
- **`sandbox-v2/`**: Complete sandbox environment setup
- **`big-dipper-2/`**: Block explorer deployment
### Sandbox V2 Deployment
The sandbox-v2 deployment (`deployment/sandbox-v2/`) provides a complete test environment:
```bash
# Key playbooks:
- deploy.yaml # Main deployment orchestrator
- deploy-mixnodes.yaml # Deploy mixnodes
- deploy-gateways.yaml # Deploy gateways
- deploy-validators.yaml # Deploy validator nodes
- deploy-nym-api.yaml # Deploy API services
```
### Custom Environment Setup
To create a custom environment:
1. Copy an existing env file: `cp envs/sandbox.env envs/custom.env`
2. Modify the network endpoints and contract addresses
3. Update the `NETWORK_NAME` to your identifier
4. Set appropriate mnemonics and keys (use fresh ones for production!)
### Contract Addresses
Contract addresses are network-specific and defined in environment files:
- Mixnet contract: Manages mixnode/gateway registry
- Vesting contract: Handles token vesting schedules
- Coconut contracts: Privacy-preserving credentials
- Name service: Human-readable address mapping
- Ecash contract: Electronic cash functionality
### Local Network Setup
For a completely local network:
```bash
# Start local chain
./scripts/localnet_start.sh
# Deploy contracts
cd contracts
make deploy-local
# Start nodes with local config
dotenv -f envs/local.env -- cargo run -p nym-node -- run --mode mixnode
```
## Common Issues and Troubleshooting
### Database Issues
- When modifying database queries, you must update SQLx query caches:
```bash
cargo sqlx prepare
```
- If you see SQLx errors about missing query files, this is likely the cause
- For "database is locked" errors with SQLite, ensure only one process accesses the DB
- For PostgreSQL connection issues, verify DATABASE_URL and that the server is running
### API Connection Issues
- Check the environment variables pointing to the APIs (NYM_API, NYXD)
- Verify network connectivity and API health endpoints
- For authentication issues, check node keys and credentials
- Common endpoints to verify:
- API health: `$NYM_API/health`
- Chain status: `$NYXD/status`
- Contract info: `$NYXD/cosmwasm/wasm/v1/contract/$CONTRACT_ADDRESS`
### Build Problems
- Clean dependencies with `cargo clean` for a fresh build
- Check for compatible Rust version (1.80+ recommended)
- For smart contract builds, ensure wasm-opt is installed: `npm install -g wasm-opt`
- For cross-compilation issues, check target-specific dependencies
- WASM build issues: Ensure wasm32-unknown-unknown target is installed:
```bash
rustup target add wasm32-unknown-unknown
```
- For "cannot find -lpq" errors, install PostgreSQL development files:
```bash
# Ubuntu/Debian
sudo apt-get install libpq-dev
# macOS
brew install postgresql
```
### Environment Issues
- Contract address mismatches: Ensure you're using the correct environment file
- "Account sequence mismatch": The account nonce is out of sync, wait and retry
- Token decimal issues: Sandbox uses different decimal places than mainnet
- API version mismatches: Ensure your local API version matches the network
- "Insufficient funds": Get test tokens from faucet (sandbox) or check balance
- Gateway/mixnode bonding issues: Verify minimum stake requirements
## Working with Routes and Monitoring
1. Route monitoring metrics are stored in a `routes` table with:
- Layer node IDs (layer1, layer2, layer3, gw)
- Success flag (boolean)
- Timestamp
2. To analyze routes:
- Check `NetworkAccount` and `AccountingRoute` in `nym-network-monitor/src/accounting.rs`
- View monitoring logic in `common/nymsphinx/chunking/monitoring.rs`
- Observe how routes are submitted to the database in the `submit_accounting_routes_to_db` function
## Performance Optimization
### Profiling and Benchmarking
```bash
# Run benchmarks
cargo bench -p nym-node
# Profile with perf (Linux)
cargo build --release --features profiling
perf record --call-graph=dwarf ./target/release/nym-node run --mode mixnode
perf report
# Generate flamegraph
cargo install flamegraph
cargo flamegraph --bin nym-node -- run --mode mixnode
```
### Common Performance Considerations
- Use bounded channels for backpressure
- Batch database operations where possible
- Monitor memory usage with `RUST_LOG=nym_node::metrics=debug`
- Use connection pooling for database connections
- Consider using `jemalloc` for better memory allocation performance
Generated
+2312 -1581
View File
File diff suppressed because it is too large Load Diff
+5 -13
View File
@@ -39,8 +39,7 @@ members = [
"common/cosmwasm-smart-contracts/ecash-contract",
"common/cosmwasm-smart-contracts/group-contract",
"common/cosmwasm-smart-contracts/mixnet-contract",
"common/cosmwasm-smart-contracts/multisig-contract",
"common/cosmwasm-smart-contracts/nym-performance-contract",
"common/cosmwasm-smart-contracts/multisig-contract", "common/cosmwasm-smart-contracts/nym-performance-contract",
"common/cosmwasm-smart-contracts/nym-pool-contract",
"common/cosmwasm-smart-contracts/vesting-contract",
"common/credential-storage",
@@ -50,8 +49,6 @@ members = [
"common/credentials-interface",
"common/crypto",
"common/dkg",
"common/ecash-signer-check",
"common/ecash-signer-check-types",
"common/ecash-time",
"common/execute",
"common/exit-policy",
@@ -92,7 +89,7 @@ members = [
"common/socks5/requests",
"common/statistics",
"common/store-cipher",
"common/task", "common/test-utils",
"common/task",
"common/ticketbooks-merkle",
"common/topology",
"common/tun",
@@ -103,7 +100,6 @@ members = [
"common/wasm/utils",
"common/wireguard",
"common/wireguard-types",
"common/zulip-client",
"documentation/autodoc",
"gateway",
"nym-api",
@@ -222,7 +218,7 @@ clap_complete_fig = "4.5"
colored = "2.2"
comfy-table = "7.1.4"
console = "0.15.11"
console-subscriber = "0.4.1"
console-subscriber = "0.1.1"
console_error_panic_hook = "0.1"
const-str = "0.5.6"
const_format = "0.2.34"
@@ -238,7 +234,6 @@ digest = "0.10.7"
dirs = "5.0"
doc-comment = "0.3"
dotenvy = "0.15.6"
dyn-clone = "1.0.19"
ecdsa = "0.16"
ed25519-dalek = "2.1"
encoding_rs = "0.8.35"
@@ -321,8 +316,8 @@ si-scale = "0.2.3"
snow = "0.9.6"
sphinx-packet = "=0.6.0"
sqlx = "0.8.6"
strum = "0.27.2"
strum_macros = "0.27.2"
strum = "0.26"
strum_macros = "0.26"
subtle-encoding = "0.5"
syn = "1"
sysinfo = "0.33.0"
@@ -439,9 +434,6 @@ opt-level = 'z'
# lto = true
opt-level = 'z'
[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
[workspace.lints.clippy]
unwrap_used = "deny"
expect_used = "deny"
+10 -58
View File
@@ -12,11 +12,7 @@ help:
@echo " clippy: run clippy for all workspaces"
@echo " test: run clippy, unit tests, and formatting."
@echo " test-all: like test, but also includes the expensive tests"
@echo " deb: build debian packages"
@echo ""
@echo "Contract building targets:"
@echo " contracts: build contracts for development (includes wasm-opt)"
@echo " publish-contracts: build contracts using Docker optimizer (deterministic)"
@echo " deb: build debian packages
# -----------------------------------------------------------------------------
# Meta targets
@@ -134,69 +130,25 @@ cargo-test: sdk-wasm-test
clippy: sdk-wasm-lint
# -----------------------------------------------------------------------------
# Build CosmWasm contracts (deterministic docker build)
# Build contracts ready for deploy
# -----------------------------------------------------------------------------
CONTRACTS=vesting_contract mixnet_contract nym_ecash cw3_flex_multisig cw4_group nym_coconut_dkg nym_pool_contract nym_performance_contract
CONTRACTS_WASM=$(addsuffix .wasm, $(CONTRACTS))
CONTRACTS_OUT_DIR=contracts/target/wasm32-unknown-unknown/release
WASM_CONTRACT_DIR := contracts/target/wasm32-unknown-unknown/release
# Find every direct contract folder that contains a Cargo.toml
CONTRACT_DIRS := $(shell find contracts -type f -name Cargo.toml \( ! -path "contracts/Cargo.toml" \) | grep -v integration-tests | xargs -n1 dirname | sort -u)
CONTRACTS_OUT_DIR = contracts/artifacts
# Build all contracts via the official CosmWasm optimizer image (one invocation per contract)
# See : https://github.com/CosmWasm/optimizer?tab=readme-ov-file#contracts-excluded-from-workspace
# The optimizer ships separate multi-arch images. ARM builds are *not* bit-for-bit identical to the
# canonical x86_64 build (see README notice in CosmWasm/optimizer). For reproducible artefacts we
# therefore always run the amd64 variant by default.
# Override with :
# $ COSMWASM_OPTIMIZER_IMAGE=cosmwasm/optimizer-arm64:0.17.0 make contracts-publish
#
COSMWASM_OPTIMIZER_IMAGE ?= cosmwasm/optimizer:0.17.0
COSMWASM_OPTIMIZER_PLATFORM ?= linux/amd64
# Ensure clean build environment and run the optimizer
optimize-contracts:
@rm -rf artifacts 2>/dev/null || true
@echo "=== Ensuring clean build environment"
docker volume rm nym_contracts_cache 2>/dev/null || true
docker volume rm registry_cache 2>/dev/null || true
@for DIR in $(CONTRACT_DIRS); do \
echo "=== Optimizing $${DIR}"; \
docker run --rm --platform $(COSMWASM_OPTIMIZER_PLATFORM) \
-v $(CURDIR):/code \
--mount type=volume,source=nym_contracts_cache,target=/target \
--mount type=volume,source=registry_cache,target=/usr/local/cargo/registry \
-e CARGO_BUILD_INCREMENTAL=false \
-e RUSTFLAGS="-C target-cpu=generic -C debuginfo=0" \
-e SOURCE_DATE_EPOCH=1 \
$(COSMWASM_OPTIMIZER_IMAGE) $${DIR}; \
done
@mkdir -p $(CONTRACTS_OUT_DIR)
@cp artifacts/*.wasm $(CONTRACTS_OUT_DIR)/ 2>/dev/null || true
@cd $(CONTRACTS_OUT_DIR) && sha256sum *.wasm > checksums.txt
# Cleanup temporary artefacts directory
@rm -rf artifacts 2>/dev/null || true
contracts: build-release-contracts wasm-opt-contracts cosmwasm-check-contracts
wasm-opt-contracts:
@for WASM in $(WASM_CONTRACT_DIR)/*.wasm; do \
echo "Running wasm-opt on $$WASM"; \
wasm-opt --signext-lowering -Os $$WASM -o $$WASM ; \
for contract in $(CONTRACTS_WASM); do \
wasm-opt --signext-lowering -Os $(CONTRACTS_OUT_DIR)/$$contract -o $(CONTRACTS_OUT_DIR)/$$contract; \
done
cosmwasm-check-contracts:
@for WASM in $(WASM_CONTRACT_DIR)/*.wasm; do \
echo "Checking $$WASM"; \
cosmwasm-check $$WASM ; \
for contract in $(CONTRACTS_WASM); do \
cosmwasm-check $(CONTRACTS_OUT_DIR)/$$contract; \
done
# Default development build
contracts: build-release-contracts wasm-opt-contracts cosmwasm-check-contracts
# Publishing build used by CI deterministic Docker optimiser
publish-contracts: optimize-contracts cosmwasm-check-contracts
# Consider adding 's' to make plural consistent (beware: used in github workflow)
contract-schema:
$(MAKE) -C contracts schema
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-client"
version = "1.1.61"
version = "1.1.57"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
+1 -1
View File
@@ -111,7 +111,7 @@ impl SocketClient {
let dkg_query_client = if self.config.base.client.disabled_credentials_mode {
None
} else {
Some(default_query_dkg_client_from_config(&self.config.base)?)
Some(default_query_dkg_client_from_config(&self.config.base))
};
let storage = self.initialise_storage().await?;
+1 -1
View File
@@ -318,7 +318,7 @@ impl Handler {
async fn handle_text_message(&mut self, msg: String) -> Option<WsMessage> {
debug!("Handling text message request");
trace!("Content: {msg:?}");
trace!("Content: {:?}", msg);
self.received_response_type = ReceivedResponseType::Text;
let client_request = ClientRequest::try_from_text(msg);
+2 -2
View File
@@ -68,9 +68,9 @@ impl Listener {
new_conn = tcp_listener.accept() => {
match new_conn {
Ok((mut socket, remote_addr)) => {
debug!("Received connection from {remote_addr:?}");
debug!("Received connection from {:?}", remote_addr);
if self.state.is_connected() {
warn!("Tried to open a duplicate websocket connection. The request came from {remote_addr}");
warn!("Tried to open a duplicate websocket connection. The request came from {}", remote_addr);
// if we've already got a connection, don't allow another one
// while we only ever want to accept a single connection, we don't want
// to leave clients hanging (and also allow for reconnection if it somehow
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-socks5-client"
version = "1.1.61"
version = "1.1.57"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
+1 -1
View File
@@ -137,7 +137,7 @@ impl AsyncFileWatcher {
log::error!("the file watcher receiver has been dropped!");
}
} else {
log::debug!("will not propagate information about {event:?}");
log::debug!("will not propagate information about {:?}", event);
}
}
Err(err) => {
@@ -28,6 +28,8 @@ pub type HmacSha256 = Hmac<Sha256>;
pub type Nonce = u64;
pub type Taken = Option<SystemTime>;
pub const BANDWIDTH_CAP_PER_DAY: u64 = 250 * 1024 * 1024 * 1024; // 250 GB
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct IpPair {
pub ipv4: Ipv4Addr,
+1 -1
View File
@@ -11,7 +11,7 @@ impl std::fmt::Display for BandwidthStatusMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BandwidthStatusMessage::RemainingBandwidth(b) => {
write!(f, "remaining bandwidth: {b}")
write!(f, "remaining bandwidth: {}", b)
}
BandwidthStatusMessage::NoBandwidth => write!(f, "no bandwidth left"),
}
+2 -2
View File
@@ -207,7 +207,7 @@ where
<St as Storage>::StorageError: Send + Sync + 'static,
{
if let Some(stored) = storage
.get_expiration_date_signatures(expiration_date, epoch_id)
.get_expiration_date_signatures(expiration_date)
.await
.map_err(BandwidthControllerError::credential_storage_error)?
{
@@ -220,7 +220,7 @@ where
ecash_apis,
|api| async move {
api.api_client
.global_expiration_date_signatures(Some(expiration_date), Some(epoch_id))
.global_expiration_date_signatures(Some(expiration_date))
.await
},
format!("aggregated coin index signatures for date {expiration_date}"),
+5 -6
View File
@@ -13,10 +13,10 @@ async-trait = { workspace = true }
base64 = { workspace = true }
bs58 = { workspace = true }
clap = { workspace = true, optional = true }
cfg-if = { workspace = true }
comfy-table = { workspace = true, optional = true }
futures = { workspace = true }
humantime = { workspace = true }
humantime-serde = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
rand_chacha = { workspace = true }
serde = { workspace = true, features = ["derive"] }
@@ -25,18 +25,20 @@ sha2 = { workspace = true }
si-scale = { workspace = true }
thiserror = { workspace = true }
url = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = ["macros"] }
time = { workspace = true }
tokio = { workspace = true, features = ["sync", "macros"] }
tracing = { workspace = true }
zeroize = { workspace = true }
# internal
nym-id = { path = "../nym-id" }
nym-bandwidth-controller = { path = "../bandwidth-controller" }
nym-config = { path = "../config" }
nym-crypto = { path = "../crypto" }
nym-gateway-client = { path = "../client-libs/gateway-client" }
nym-gateway-requests = { path = "../gateway-requests" }
nym-http-api-client = { path = "../http-api-client" }
nym-metrics = { path = "../nym-metrics" }
nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
nym-statistics-common = { path = "../statistics" }
@@ -124,6 +126,3 @@ fs-surb-storage = ["nym-client-core-surb-storage/fs-surb-storage"]
fs-gateways-storage = ["nym-client-core-gateways-storage/fs-gateways-storage"]
wasm = ["nym-gateway-client/wasm"]
metrics-server = []
[lints]
workspace = true
+12 -8
View File
@@ -57,7 +57,9 @@ const DEFAULT_MAXIMUM_ALLOWED_SURB_REQUEST_SIZE: u32 = 500;
const DEFAULT_MAXIMUM_REPLY_SURB_REREQUEST_WAITING_PERIOD: Duration = Duration::from_secs(10);
const DEFAULT_MAXIMUM_REPLY_SURB_DROP_WAITING_PERIOD: Duration = Duration::from_secs(5 * 60);
const DEFAULT_MAXIMUM_REPLY_SURB_REREQUESTS: usize = 5;
// 12 hours
const DEFAULT_MAXIMUM_REPLY_SURB_AGE: Duration = Duration::from_secs(12 * 60 * 60);
// 24 hours
const DEFAULT_MAXIMUM_REPLY_KEY_AGE: Duration = Duration::from_secs(24 * 60 * 60);
@@ -416,9 +418,6 @@ pub struct Traffic {
/// will be routed as usual, to the entry gateway, through three mix nodes, egressing
/// through the exit gateway. If mix hops are disabled, traffic will be routed directly
/// from the entry gateway to the exit gateway, bypassing the mix nodes.
///
/// This overrides the `use_legacy_sphinx_format` setting as reduced mix hops
/// requires use of the updated SURB packet format.
pub disable_mix_hops: bool,
}
@@ -626,9 +625,10 @@ pub struct ReplySurbs {
#[serde(with = "humantime_serde")]
pub maximum_reply_surb_drop_waiting_period: Duration,
/// Defines maximum number of times the client is going to re-request reply surbs
/// for clearing pending messages before giving up after making no progress.
pub maximum_reply_surbs_rerequests: usize,
/// Defines maximum amount of time given reply surb is going to be valid for.
/// This is going to be superseded by key rotation once implemented.
#[serde(with = "humantime_serde")]
pub maximum_reply_surb_age: Duration,
/// Defines maximum amount of time given reply key is going to be valid for.
/// This is going to be superseded by key rotation once implemented.
@@ -638,6 +638,9 @@ pub struct ReplySurbs {
/// Specifies the number of mixnet hops the packet should go through. If not specified, then
/// the default value is used.
pub surb_mix_hops: Option<u8>,
/// Specifies if we should reset all the sender tags on startup
pub fresh_sender_tags: bool,
}
impl Default for ReplySurbs {
@@ -652,9 +655,10 @@ impl Default for ReplySurbs {
maximum_reply_surb_rerequest_waiting_period:
DEFAULT_MAXIMUM_REPLY_SURB_REREQUEST_WAITING_PERIOD,
maximum_reply_surb_drop_waiting_period: DEFAULT_MAXIMUM_REPLY_SURB_DROP_WAITING_PERIOD,
maximum_reply_surbs_rerequests: DEFAULT_MAXIMUM_REPLY_SURB_REREQUESTS,
maximum_reply_surb_age: DEFAULT_MAXIMUM_REPLY_SURB_AGE,
maximum_reply_key_age: DEFAULT_MAXIMUM_REPLY_KEY_AGE,
surb_mix_hops: None,
fresh_sender_tags: false,
}
}
}
@@ -189,13 +189,14 @@ impl From<ConfigV6> for Config {
.debug
.reply_surbs
.maximum_reply_surb_drop_waiting_period,
maximum_reply_surb_age: value.debug.reply_surbs.maximum_reply_surb_age,
maximum_reply_key_age: value.debug.reply_surbs.maximum_reply_key_age,
surb_mix_hops: value.debug.reply_surbs.surb_mix_hops,
minimum_reply_surb_threshold_buffer: value
.debug
.reply_surbs
.minimum_reply_surb_threshold_buffer,
..Default::default()
fresh_sender_tags: value.debug.reply_surbs.fresh_sender_tags,
},
stats_reporting: StatsReporting {
enabled: value.debug.stats_reporting.enabled,
@@ -9,11 +9,11 @@ license.workspace = true
[dependencies]
async-trait.workspace = true
cosmrs.workspace = true
log.workspace = true
serde = { workspace = true, features = ["derive"] }
thiserror.workspace = true
time.workspace = true
tokio = { workspace = true, features = ["sync"] }
tracing.workspace = true
url.workspace = true
zeroize = { workspace = true, features = ["zeroize_derive"] }
@@ -7,12 +7,12 @@ use crate::{
RawActiveGateway, RawCustomGatewayDetails, RawRegisteredGateway, RawRemoteGatewayDetails,
},
};
use log::{debug, error};
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use tracing::{debug, error};
#[derive(Debug, Clone)]
pub struct StorageManager {
@@ -12,12 +12,12 @@ use crate::{
error::ClientCoreError,
init::types::{GatewaySelectionSpecification, GatewaySetup},
};
use log::info;
use nym_client_core_gateways_storage::GatewayDetails;
use nym_crypto::asymmetric::ed25519;
use nym_topology::NymTopology;
use nym_validator_client::UserAgent;
use std::path::PathBuf;
use tracing::info;
#[cfg_attr(feature = "cli", derive(clap::Args))]
#[derive(Debug, Clone)]
@@ -81,14 +81,14 @@ where
// Attempt to use a user-provided gateway, if possible
let user_chosen_gateway_id = common_args.gateway_id;
tracing::debug!("User chosen gateway id: {user_chosen_gateway_id:?}");
log::debug!("User chosen gateway id: {user_chosen_gateway_id:?}");
let selection_spec = GatewaySelectionSpecification::new(
user_chosen_gateway_id.map(|id| id.to_base58_string()),
Some(common_args.latency_based_selection),
common_args.force_tls_gateway,
);
tracing::debug!("Gateway selection specification: {selection_spec:?}");
log::debug!("Gateway selection specification: {selection_spec:?}");
let registered_gateways = get_all_registered_identities(&details_store).await?;
@@ -58,7 +58,6 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.signatures_path.unwrap())?
}
};
@@ -64,7 +64,6 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.credential_path.unwrap())?
}
};
@@ -58,7 +58,6 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.signatures_path.unwrap())?
}
};
@@ -58,7 +58,6 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.key_path.unwrap())?
}
};
@@ -12,6 +12,7 @@ use crate::{
},
init::types::{GatewaySelectionSpecification, GatewaySetup, InitResults},
};
use log::info;
use nym_client_core_gateways_storage::GatewayDetails;
use nym_crypto::asymmetric::ed25519;
use nym_sphinx::addressing::Recipient;
@@ -19,7 +20,6 @@ use nym_topology::NymTopology;
use nym_validator_client::UserAgent;
use rand::rngs::OsRng;
use std::path::PathBuf;
use tracing::info;
// we can suppress this warning (as suggested by linter itself) since we're only using it in our own code
#[allow(async_fn_in_trait)]
@@ -130,23 +130,23 @@ where
// Attempt to use a user-provided gateway, if possible
let user_chosen_gateway_id = common_args.gateway;
tracing::debug!("User chosen gateway id: {user_chosen_gateway_id:?}");
log::debug!("User chosen gateway id: {user_chosen_gateway_id:?}");
let selection_spec = GatewaySelectionSpecification::new(
user_chosen_gateway_id.map(|id| id.to_base58_string()),
Some(common_args.latency_based_selection),
common_args.force_tls_gateway,
);
tracing::debug!("Gateway selection specification: {selection_spec:?}");
log::debug!("Gateway selection specification: {selection_spec:?}");
// Load and potentially override config
tracing::debug!("Init arguments: {init_args:#?}");
log::debug!("Init arguments: {init_args:#?}");
let config = C::construct_config(&init_args);
tracing::debug!("Constructed config: {config:#?}");
log::debug!("Constructed config: {config:#?}");
let paths = config.common_paths();
let core = config.core_config();
tracing::info!(
log::info!(
"Using nym-api: {}",
core.client
.nym_api_urls
@@ -18,7 +18,6 @@ use crate::client::received_buffer::{
ReceivedBufferRequestReceiver, ReceivedBufferRequestSender, ReceivedMessagesBufferController,
};
use crate::client::replies::reply_controller;
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
use crate::client::replies::reply_controller::{ReplyControllerReceiver, ReplyControllerSender};
use crate::client::replies::reply_storage::{
CombinedReplyStorage, PersistentReplyStorage, ReplyStorageBackend, SentReplyKeys,
@@ -35,6 +34,7 @@ use crate::init::{
};
use crate::{config, spawn_future};
use futures::channel::mpsc;
use log::*;
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_config_types::{ForgetMe, RememberMe};
use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore};
@@ -56,18 +56,13 @@ use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender,
use nym_task::{TaskClient, TaskHandle};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_validator_client::nym_api::NymApiClientExt;
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, NymApiClient, UserAgent};
use rand::prelude::SliceRandom;
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
use rand::rngs::OsRng;
use rand::thread_rng;
use std::fmt::Debug;
use std::os::raw::c_int as RawFd;
use std::path::Path;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::mpsc::Sender;
use tracing::*;
use url::Url;
#[cfg(all(
@@ -135,11 +130,9 @@ pub enum ClientInputStatus {
}
impl ClientInputStatus {
#[allow(clippy::panic)]
pub fn register_producer(&mut self) -> ClientInput {
match std::mem::replace(self, ClientInputStatus::Connected) {
ClientInputStatus::AwaitingProducer { client_input } => client_input,
// critical failure implying misuse of software
ClientInputStatus::Connected => panic!("producer was already registered before"),
}
}
@@ -151,11 +144,9 @@ pub enum ClientOutputStatus {
}
impl ClientOutputStatus {
#[allow(clippy::panic)]
pub fn register_consumer(&mut self) -> ClientOutput {
match std::mem::replace(self, ClientOutputStatus::Connected) {
ClientOutputStatus::AwaitingConsumer { client_output } => client_output,
// critical failure implying misuse of software
ClientOutputStatus::Connected => panic!("consumer was already registered before"),
}
}
@@ -347,7 +338,6 @@ where
#[allow(clippy::too_many_arguments)]
fn start_real_traffic_controller(
controller_config: real_messages_control::Config,
key_rotation_config: KeyRotationConfig,
topology_accessor: TopologyAccessor,
ack_receiver: AcknowledgementReceiver,
input_receiver: InputMessageReceiver,
@@ -365,7 +355,6 @@ where
RealMessagesController::new(
controller_config,
key_rotation_config,
ack_receiver,
input_receiver,
mix_sender,
@@ -464,7 +453,7 @@ where
};
let gateway_failure = |err| {
tracing::error!("Could not authenticate and start up the gateway connection - {err}");
log::error!("Could not authenticate and start up the gateway connection - {err}");
ClientCoreError::GatewayClientError {
gateway_id: details.gateway_id.to_base58_string(),
source: Box::new(err),
@@ -566,14 +555,14 @@ where
custom_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
config_topology: config::Topology,
nym_api_urls: Vec<Url>,
nym_api_client: NymApiClient,
user_agent: Option<UserAgent>,
) -> Box<dyn TopologyProvider + Send + Sync> {
// if no custom provider was ... provided ..., create one using nym-api
custom_provider.unwrap_or_else(|| {
Box::new(NymApiTopologyProvider::new(
config_topology,
nym_api_urls,
nym_api_client,
user_agent,
))
})
}
@@ -609,7 +598,7 @@ where
topology_refresher.try_refresh().await;
if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
tracing::error!(
log::error!(
"The current network topology seem to be insufficient to route any packets through \
- check if enough nodes and a gateway are online - source: {err}"
);
@@ -685,40 +674,27 @@ where
// TODO: rename it as it implies the data is persistent whilst one can use InMemBackend
async fn setup_persistent_reply_storage(
backend: S::ReplyStore,
key_rotation_config: KeyRotationConfig,
shutdown: TaskClient,
) -> Result<CombinedReplyStorage, ClientCoreError>
where
<S::ReplyStore as ReplyStorageBackend>::StorageError: Sync + Send,
S::ReplyStore: Send + Sync,
{
tracing::trace!("Setup persistent reply storage");
let now = OffsetDateTime::now_utc();
let expected_current_key_rotation_start =
key_rotation_config.expected_current_key_rotation_start(now);
// time of the start of one epoch BEFORE the CURRENT rotation has begun
// this indicates the starting time of when packets with the current keys might have been constructed
// (i.e. any surbs OLDER than that MUST BE invalid)
let prior_epoch_start =
expected_current_key_rotation_start - key_rotation_config.epoch_duration;
log::trace!("Setup persistent reply storage");
let persistent_storage = PersistentReplyStorage::new(backend);
let mem_store = persistent_storage
.load_state_from_backend(prior_epoch_start)
.load_state_from_backend()
.await
.map_err(|err| ClientCoreError::SurbStorageError {
source: Box::new(err),
})?;
let store_clone = mem_store.clone();
spawn_future!(
async move {
persistent_storage
.flush_on_shutdown(store_clone, shutdown)
.await
},
"PersistentReplyStorage::flush_on_shutdown"
);
spawn_future(async move {
persistent_storage
.flush_on_shutdown(store_clone, shutdown)
.await
});
Ok(mem_store)
}
@@ -739,7 +715,7 @@ where
let mut rng = OsRng;
let keys = if let Some(derivation_material) = derivation_material {
ClientKeys::from_master_key(&mut rng, &derivation_material)
.map_err(|_| ClientCoreError::HkdfDerivationError)?
.map_err(|_| ClientCoreError::HkdfDerivationError {})?
} else {
ClientKeys::generate_new(&mut rng)
};
@@ -749,23 +725,6 @@ where
setup_gateway(setup_method, key_store, details_store).await
}
fn construct_nym_api_client(config: &Config, user_agent: Option<UserAgent>) -> NymApiClient {
let mut nym_api_urls = config.get_nym_api_endpoints();
nym_api_urls.shuffle(&mut thread_rng());
if let Some(user_agent) = user_agent {
NymApiClient::new_with_user_agent(nym_api_urls[0].clone(), user_agent)
} else {
NymApiClient::new(nym_api_urls[0].clone())
}
}
async fn determine_key_rotation_state(
client: &NymApiClient,
) -> Result<KeyRotationConfig, ClientCoreError> {
Ok(client.nym_api.get_key_rotation_info().await?.into())
}
pub async fn start_base(mut self) -> Result<BaseClient, ClientCoreError>
where
S::ReplyStore: Send + Sync,
@@ -830,14 +789,11 @@ where
.dkg_query_client
.map(|client| BandwidthController::new(credential_store, client));
let nym_api_client = Self::construct_nym_api_client(&self.config, self.user_agent.clone());
let key_rotation_config = Self::determine_key_rotation_state(&nym_api_client).await?;
let topology_provider = Self::setup_topology_provider(
self.custom_topology_provider.take(),
self.config.debug.topology,
self.config.get_nym_api_endpoints(),
nym_api_client,
self.user_agent.clone(),
);
let stats_reporter = Self::start_statistics_control(
@@ -882,7 +838,6 @@ where
let reply_storage = Self::setup_persistent_reply_storage(
reply_storage_backend,
key_rotation_config,
shutdown.fork("persistent_reply_storage"),
)
.await?;
@@ -923,7 +878,6 @@ where
Self::start_real_traffic_controller(
controller_config,
key_rotation_config,
shared_topology_accessor.clone(),
ack_receiver,
input_receiver,
@@ -7,13 +7,13 @@ use crate::{
config::Config,
error::ClientCoreError,
};
use log::{error, info, trace};
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient};
use std::{io, path::Path};
use time::OffsetDateTime;
use tracing::{error, info, trace};
use url::Url;
async fn setup_fresh_backend<P: AsRef<Path>>(
@@ -90,7 +90,7 @@ pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
let db_path = db_path.as_ref();
if db_path.exists() {
info!("Loading existing surb database");
match fs_backend::Backend::try_load(db_path).await {
match fs_backend::Backend::try_load(db_path, surb_config.fresh_sender_tags).await {
Ok(backend) => Ok(backend),
Err(err) => {
error!("setup_fs_reply_surb_backend: Failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future");
@@ -114,32 +114,41 @@ pub async fn setup_fs_gateways_storage<P: AsRef<Path>>(
})
}
pub fn create_bandwidth_controller_with_urls<St: CredentialStorage>(
nyxd_url: Url,
storage: St,
) -> Result<BandwidthController<QueryHttpRpcNyxdClient, St>, ClientCoreError> {
let client = default_query_dkg_client(nyxd_url)?;
Ok(BandwidthController::new(storage, client))
}
pub fn default_query_dkg_client_from_config(
pub fn create_bandwidth_controller<St: CredentialStorage>(
config: &Config,
) -> Result<QueryHttpRpcNyxdClient, ClientCoreError> {
storage: St,
) -> BandwidthController<QueryHttpRpcNyxdClient, St> {
let nyxd_url = config
.get_validator_endpoints()
.pop()
.ok_or(ClientCoreError::RpcClientMissingUrl)?;
.expect("No nyxd validator endpoint provided");
create_bandwidth_controller_with_urls(nyxd_url, storage)
}
pub fn create_bandwidth_controller_with_urls<St: CredentialStorage>(
nyxd_url: Url,
storage: St,
) -> BandwidthController<QueryHttpRpcNyxdClient, St> {
let client = default_query_dkg_client(nyxd_url);
BandwidthController::new(storage, client)
}
pub fn default_query_dkg_client_from_config(config: &Config) -> QueryHttpRpcNyxdClient {
let nyxd_url = config
.get_validator_endpoints()
.pop()
.expect("No nyxd validator endpoint provided");
default_query_dkg_client(nyxd_url)
}
pub fn default_query_dkg_client(nyxd_url: Url) -> Result<QueryHttpRpcNyxdClient, ClientCoreError> {
pub fn default_query_dkg_client(nyxd_url: Url) -> QueryHttpRpcNyxdClient {
let details = nym_network_defaults::NymNetworkDetails::new_from_env();
let client_config = nyxd::Config::try_from_nym_network_details(&details)
.map_err(|source| ClientCoreError::InvalidNetworkDetails { source })?;
.expect("failed to construct validator client config");
// overwrite env configuration with config URLs
QueryHttpRpcNyxdClient::connect(client_config, nyxd_url.as_str())
.map_err(|source| ClientCoreError::RpcClientCreationFailure { source })
.expect("Could not construct query client")
}
@@ -6,6 +6,7 @@ use crate::client::topology_control::TopologyAccessor;
use crate::{config, spawn_future};
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use log::*;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::cover::generate_loop_cover_packet;
@@ -18,7 +19,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::error::TrySendError;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, Sleep};
@@ -210,10 +210,10 @@ impl LoopCoverTrafficStream<OsRng> {
TrySendError::Full(_) => {
// This isn't a problem, if the channel is full means we're already sending the
// max amount of messages downstream can handle.
tracing::debug!("Failed to send cover message - channel full");
log::debug!("Failed to send cover message - channel full");
}
TrySendError::Closed(_) => {
tracing::warn!("Failed to send cover message - channel closed");
log::warn!("Failed to send cover message - channel closed");
}
}
} else {
@@ -235,7 +235,6 @@ impl LoopCoverTrafficStream<OsRng> {
tokio::task::yield_now().await;
}
#[allow(clippy::panic)]
pub fn start(mut self) {
if self.cover_traffic.disable_loop_cover_traffic_stream {
// we should have never got here in the first place - the task should have never been created to begin with
@@ -252,30 +251,27 @@ impl LoopCoverTrafficStream<OsRng> {
let mut shutdown = self.task_client.fork("select");
spawn_future!(
async move {
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
spawn_future(async move {
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
tracing::trace!("LoopCoverTrafficStream: Received shutdown");
}
next = self.next() => {
if next.is_some() {
self.on_new_message().await;
} else {
tracing::trace!("LoopCoverTrafficStream: Stopping since channel closed");
break;
}
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
log::trace!("LoopCoverTrafficStream: Received shutdown");
}
next = self.next() => {
if next.is_some() {
self.on_new_message().await;
} else {
log::trace!("LoopCoverTrafficStream: Stopping since channel closed");
break;
}
}
}
shutdown.recv_timeout().await;
tracing::debug!("LoopCoverTrafficStream: Exiting");
},
"LoopCoverTrafficStream"
)
}
shutdown.recv_timeout().await;
log::debug!("LoopCoverTrafficStream: Exiting");
})
}
}
@@ -135,9 +135,7 @@ impl InputMessage {
recipient_tag,
data,
lane,
// \/ set it to SOME sane default so that if we run out of surbs and constantly
// fail to request more, we wouldn't be stuck in limbo
max_retransmissions: Some(10),
max_retransmissions: None,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -4,10 +4,10 @@
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::error::ClientCoreError;
use crate::spawn_future;
use log::*;
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::TaskClient;
use tracing::*;
use transceiver::ErasedGatewayError;
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
@@ -96,93 +96,72 @@ impl MixTrafficController {
mut mix_packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
debug_assert!(!mix_packets.is_empty());
let send_future = if mix_packets.len() == 1 {
// SAFETY: we just checked we have one packet
#[allow(clippy::unwrap_used)]
let result = if mix_packets.len() == 1 {
let mix_packet = mix_packets.pop().unwrap();
self.gateway_transceiver.send_mix_packet(mix_packet)
self.gateway_transceiver.send_mix_packet(mix_packet).await
} else {
self.gateway_transceiver.batch_send_mix_packets(mix_packets)
self.gateway_transceiver
.batch_send_mix_packets(mix_packets)
.await
};
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while handling messages");
Ok(())
}
result = send_future => {
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
result
}
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
}
async fn on_client_request(&mut self, client_request: ClientRequest) {
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while handling client request");
}
result = self.gateway_transceiver.send_client_request(client_request) => {
if let Err(err) = result {
error!("Failed to send client request: {err}")
}
}
}
result
}
pub fn start(mut self) {
spawn_future!(
async move {
debug!("Started MixTrafficController with graceful shutdown support");
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("MixTrafficController: Received shutdown");
spawn_future(async move {
debug!("Started MixTrafficController with graceful shutdown support");
while !self.task_client.is_shutdown() {
tokio::select! {
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.task_client.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
break;
}
}
},
None => {
log::trace!("MixTrafficController: Stopping since channel closed");
break;
}
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.task_client.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
break;
}
}
},
None => {
tracing::trace!("MixTrafficController: Stopping since channel closed");
break;
}
},
client_request = self.client_rx.recv() => match client_request {
Some(client_request) => {
self.on_client_request(client_request).await;
},
None => {
tracing::trace!("MixTrafficController, client request channel closed");
}
},
client_request = self.client_rx.recv() => match client_request {
Some(client_request) => {
match self.gateway_transceiver.send_client_request(client_request).await {
Ok(_) => (),
Err(e) => error!("Failed to send client request: {}", e),
};
},
None => {
log::trace!("MixTrafficController, client request channel closed");
}
},
_ = self.task_client.recv() => {
log::trace!("MixTrafficController: Received shutdown");
break;
}
}
self.task_client.recv_timeout().await;
tracing::debug!("MixTrafficController: Exiting");
},
"MixTrafficController"
);
}
self.task_client.recv_timeout().await;
log::debug!("MixTrafficController: Exiting");
});
}
}
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use log::{debug, error};
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::error::GatewayClientError;
@@ -13,7 +14,6 @@ use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use std::fmt::Debug;
use std::os::raw::c_int as RawFd;
use thiserror::Error;
use tracing::{debug, error};
#[cfg(not(target_arch = "wasm32"))]
use futures::channel::oneshot;
@@ -27,7 +27,7 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
ErasedGatewayError(Box::new(err))
}
/// This combines the functionalities of being able to send and receive mix packets.
/// This combines combines the functionalities of being able to send and receive mix packets.
#[async_trait]
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
fn gateway_identity(&self) -> ed25519::PublicKey;
@@ -87,7 +87,7 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
message: ClientRequest,
) -> Result<(), GatewayClientError> {
let _ = (**self).send_client_request(message.clone()).await?;
tracing::debug!("Sent client request: {:?}", message);
log::debug!("Sent client request: {:?}", message);
Ok(())
}
}
@@ -269,8 +269,6 @@ pub struct MockGateway {
}
impl Default for MockGateway {
// test code
#[allow(clippy::unwrap_used)]
fn default() -> Self {
MockGateway {
dummy_identity: "3ebjp1Fb9hdcS1AR6AZihgeJiMHkB5jjJUsvqNnfQwU7"
@@ -5,6 +5,7 @@ use super::action_controller::{AckActionSender, Action};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use futures::StreamExt;
use log::*;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::{
acknowledgements::{identifier::recover_identifier, AckKey},
@@ -12,7 +13,6 @@ use nym_sphinx::{
};
use nym_task::TaskClient;
use std::sync::Arc;
use tracing::*;
/// Module responsible for listening for any data resembling acknowledgements from the network
/// and firing actions to remove them from the 'Pending' state.
@@ -65,7 +65,7 @@ impl AcknowledgementListener {
return;
}
trace!("Received {frag_id} from the mix network");
trace!("Received {} from the mix network", frag_id);
self.stats_tx
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
if let Err(err) = self
@@ -93,16 +93,16 @@ impl AcknowledgementListener {
acks = self.ack_receiver.next() => match acks {
Some(acks) => self.handle_ack_receiver_item(acks).await,
None => {
tracing::trace!("AcknowledgementListener: Stopping since channel closed");
log::trace!("AcknowledgementListener: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("AcknowledgementListener: Received shutdown");
log::trace!("AcknowledgementListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("AcknowledgementListener: Exiting");
log::debug!("AcknowledgementListener: Exiting");
}
}
@@ -5,6 +5,7 @@ use super::PendingAcknowledgement;
use crate::client::real_messages_control::acknowledgement_control::RetransmissionRequestSender;
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_sphinx::Delay as SphinxDelay;
@@ -12,7 +13,6 @@ use nym_task::TaskClient;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tracing::*;
pub(crate) type AckActionSender = mpsc::UnboundedSender<Action>;
pub(crate) type AckActionReceiver = mpsc::UnboundedReceiver<Action>;
@@ -126,7 +126,7 @@ impl ActionController {
fn handle_insert(&mut self, pending_acks: Vec<PendingAcknowledgement>) {
for pending_ack in pending_acks {
let frag_id = pending_ack.message_chunk.fragment_identifier();
trace!("{frag_id} is inserted");
trace!("{} is inserted", frag_id);
if self
.pending_acks_data
@@ -161,16 +161,22 @@ impl ActionController {
let new_queue_key = self.pending_acks_timers.insert(frag_id, timeout);
*queue_key = Some(new_queue_key)
} else {
debug!("Tried to START TIMER on pending ack that is already gone! - {frag_id}");
debug!(
"Tried to START TIMER on pending ack that is already gone! - {}",
frag_id
);
}
}
fn handle_remove(&mut self, frag_id: FragmentIdentifier) {
trace!("{frag_id} is getting removed");
trace!("{} is getting removed", frag_id);
match self.pending_acks_data.remove(&frag_id) {
None => {
debug!("Tried to REMOVE pending ack that is already gone! - {frag_id}");
debug!(
"Tried to REMOVE pending ack that is already gone! - {}",
frag_id
);
}
Some((_, queue_key)) => {
if let Some(queue_key) = queue_key {
@@ -182,7 +188,10 @@ impl ActionController {
} else {
// I'm not 100% sure if having a `None` key is even possible here
// (REMOVE would have to be called before START TIMER),
debug!("Tried to REMOVE pending ack without TIMER active - {frag_id}");
debug!(
"Tried to REMOVE pending ack without TIMER active - {}",
frag_id
);
}
}
}
@@ -191,26 +200,27 @@ impl ActionController {
// initiated basically as a first step of retransmission. At first data has its delay updated
// (as new sphinx packet was created with new expected delivery time)
fn handle_update_pending_ack(&mut self, frag_id: FragmentIdentifier, delay: SphinxDelay) {
trace!("{frag_id} is updating its delay");
trace!("{} is updating its delay", frag_id);
// TODO: is it possible to solve this without either locking or temporarily removing the value?
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.remove(&frag_id) {
// SAFETY: this Action is triggered by `RetransmissionRequestListener` (for 'normal' packets)
// this Action is triggered by `RetransmissionRequestListener` (for 'normal' packets)
// or `ReplyController` (for 'reply' packets) which held the other potential
// reference to this Arc. HOWEVER, before the Action was pushed onto the queue, the reference
// was dropped hence this unwrap is safe.
#[allow(clippy::unwrap_used)]
let mut inner_data = Arc::try_unwrap(pending_ack_data).unwrap();
inner_data.update_retransmitted(delay);
self.pending_acks_data
.insert(frag_id, (Arc::new(inner_data), queue_key));
} else {
debug!("Tried to UPDATE TIMER on pending ack that is already gone! - {frag_id}");
debug!(
"Tried to UPDATE TIMER on pending ack that is already gone! - {}",
frag_id
);
}
}
// note: when the entry expires it's automatically removed from pending_acks_timers
#[allow(clippy::panic)]
fn handle_expired_ack_timer(&mut self, expired_ack: Expired<FragmentIdentifier>) {
let frag_id = expired_ack.into_inner();
@@ -231,7 +241,7 @@ impl ActionController {
.unbounded_send(Arc::downgrade(pending_ack_data))
{
if !self.task_client.is_shutdown_poll() {
tracing::error!("Failed to send pending ack for retransmission: {err}");
log::error!("Failed to send pending ack for retransmission: {err}");
}
}
} else {
@@ -259,7 +269,7 @@ impl ActionController {
action = self.incoming_actions.next() => match action {
Some(action) => self.process_action(action),
None => {
tracing::trace!(
log::trace!(
"ActionController: Stopping since incoming actions channel closed"
);
break;
@@ -268,17 +278,17 @@ impl ActionController {
expired_ack = self.pending_acks_timers.next() => match expired_ack {
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack),
None => {
tracing::trace!("ActionController: Stopping since ack channel closed");
log::trace!("ActionController: Stopping since ack channel closed");
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("ActionController: Received shutdown");
log::trace!("ActionController: Received shutdown");
break;
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("ActionController: Exiting");
log::debug!("ActionController: Exiting");
}
}
@@ -5,6 +5,7 @@ use crate::client::inbound_messages::{InputMessage, InputMessageReceiver};
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::real_messages_control::real_traffic_stream::RealMessage;
use crate::client::replies::reply_controller::ReplyControllerSender;
use log::*;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
@@ -12,7 +13,6 @@ use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
use tracing::*;
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
/// putting everything into sphinx packets, etc.
@@ -120,7 +120,6 @@ where
}
}
#[allow(clippy::panic)]
async fn on_input_message(&mut self, msg: InputMessage) {
match msg {
InputMessage::Regular {
@@ -214,9 +213,7 @@ where
self.handle_premade_packets(msgs, lane).await
}
// MessageWrappers can't be nested
InputMessage::MessageWrapper { .. } => {
panic!("attempted to use nested MessageWrapper")
}
InputMessage::MessageWrapper { .. } => unimplemented!(),
},
};
}
@@ -226,24 +223,21 @@ where
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("InputMessageListener: Received shutdown");
break;
}
input_msg = self.input_receiver.recv() => match input_msg {
Some(input_msg) => {
self.on_input_message(input_msg).await;
},
None => {
tracing::trace!("InputMessageListener: Stopping since channel closed");
log::trace!("InputMessageListener: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv() => {
log::trace!("InputMessageListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("InputMessageListener: Exiting");
log::debug!("InputMessageListener: Exiting");
}
}
@@ -13,6 +13,7 @@ use crate::client::replies::reply_controller::ReplyControllerSender;
use crate::spawn_future;
use action_controller::AckActionReceiver;
use futures::channel::mpsc;
use log::*;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::params::{PacketSize, PacketType};
@@ -29,7 +30,6 @@ use std::{
sync::{Arc, Weak},
time::Duration,
};
use tracing::*;
pub(crate) use action_controller::{AckActionSender, Action};
@@ -298,44 +298,29 @@ where
let mut sent_notification_listener = self.sent_notification_listener;
let mut action_controller = self.action_controller;
spawn_future!(
async move {
acknowledgement_listener.run().await;
debug!("The acknowledgement listener has finished execution!");
},
"AcknowledgementController::AcknowledgementListener"
);
spawn_future(async move {
acknowledgement_listener.run().await;
debug!("The acknowledgement listener has finished execution!");
});
spawn_future!(
async move {
input_message_listener.run().await;
debug!("The input listener has finished execution!");
},
"AcknowledgementController::InputMessageListener"
);
spawn_future(async move {
input_message_listener.run().await;
debug!("The input listener has finished execution!");
});
spawn_future!(
async move {
retransmission_request_listener.run(packet_type).await;
debug!("The retransmission request listener has finished execution!");
},
"AcknowledgementController::RetransmissionRequestListener"
);
spawn_future(async move {
retransmission_request_listener.run(packet_type).await;
debug!("The retransmission request listener has finished execution!");
});
spawn_future!(
async move {
sent_notification_listener.run().await;
debug!("The sent notification listener has finished execution!");
},
"AcknowledgementController::SentNotificationListener"
);
spawn_future(async move {
sent_notification_listener.run().await;
debug!("The sent notification listener has finished execution!");
});
spawn_future!(
async move {
action_controller.run().await;
debug!("The controller has finished execution!");
},
"AcknowledgementController::ActionController"
);
spawn_future(async move {
action_controller.run().await;
debug!("The controller has finished execution!");
});
}
}
@@ -10,13 +10,13 @@ use crate::client::real_messages_control::message_handler::{MessageHandler, Prep
use crate::client::real_messages_control::real_traffic_stream::RealMessage;
use crate::client::replies::reply_controller::ReplyControllerSender;
use futures::StreamExt;
use log::*;
use nym_sphinx::chunking::fragment::Fragment;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::{addressing::clients::Recipient, params::PacketType};
use nym_task::{connections::TransmissionLane, TaskClient};
use rand::{CryptoRng, Rng};
use std::sync::{Arc, Weak};
use tracing::*;
// responsible for packet retransmission upon fired timer
pub(super) struct RetransmissionRequestListener<R> {
@@ -179,22 +179,19 @@ where
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("RetransmissionRequestListener: Received shutdown");
break;
}
timed_out_ack = self.request_receiver.next() => match timed_out_ack {
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await,
None => {
tracing::trace!("RetransmissionRequestListener: Stopping since channel closed");
log::trace!("RetransmissionRequestListener: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv() => {
log::trace!("RetransmissionRequestListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("RetransmissionRequestListener: Exiting");
log::debug!("RetransmissionRequestListener: Exiting");
}
}
@@ -4,9 +4,9 @@
use super::action_controller::{AckActionSender, Action};
use super::SentPacketNotificationReceiver;
use futures::StreamExt;
use log::*;
use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
use nym_task::TaskClient;
use tracing::*;
/// Module responsible for starting up retransmission timers.
/// It is required because when we send our packet to the `real traffic stream` controlled
@@ -56,17 +56,17 @@ impl SentNotificationListener {
self.on_sent_message(frag_id).await;
}
None => {
tracing::trace!("SentNotificationListener: Stopping since channel closed");
log::trace!("SentNotificationListener: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("SentNotificationListener: Received shutdown");
log::trace!("SentNotificationListener: Received shutdown");
break;
}
}
}
assert!(self.task_client.is_shutdown_poll());
tracing::debug!("SentNotificationListener: Exiting");
log::debug!("SentNotificationListener: Exiting");
}
}
@@ -9,7 +9,6 @@ use crate::client::real_messages_control::{AckActionSender, Action};
use crate::client::replies::reply_controller::MaxRetransmissions;
use crate::client::replies::reply_storage::{ReceivedReplySurbsMap, SentReplyKeys, UsedSenderTags};
use crate::client::topology_control::{TopologyAccessor, TopologyReadPermit};
use nym_client_core_surb_storage::RetrievedReplySurb;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessage, ReplyMessage};
@@ -35,9 +34,6 @@ pub enum PreparationError {
#[error(transparent)]
NymTopologyError(#[from] NymTopologyError),
#[error("message wasn't split into any fragments!")]
EmptyFragments,
#[error("message too long for a single SURB, splitting into {fragments} fragments.")]
MessageTooLongForSingleSurb { fragments: usize },
@@ -48,7 +44,10 @@ pub enum PreparationError {
}
impl PreparationError {
fn return_surbs(self, returned_surbs: Vec<RetrievedReplySurb>) -> SurbWrappedPreparationError {
fn return_surbs(
self,
returned_surbs: Vec<ReplySurbWithKeyRotation>,
) -> SurbWrappedPreparationError {
SurbWrappedPreparationError {
source: self,
returned_surbs: Some(returned_surbs),
@@ -62,7 +61,7 @@ pub struct SurbWrappedPreparationError {
#[source]
source: PreparationError,
returned_surbs: Option<Vec<RetrievedReplySurb>>,
returned_surbs: Option<Vec<ReplySurbWithKeyRotation>>,
}
impl<T> From<T> for SurbWrappedPreparationError
@@ -84,7 +83,7 @@ impl SurbWrappedPreparationError {
target: &AnonymousSenderTag,
) -> PreparationError {
if let Some(reply_surbs) = self.returned_surbs {
surb_storage.re_insert_reply_surbs(target, reply_surbs)
surb_storage.insert_surbs(target, reply_surbs)
}
self.source
}
@@ -106,9 +105,6 @@ pub(crate) struct Config {
/// will be routed as usual, to the entry gateway, through three mix nodes, egressing
/// through the exit gateway. If mix hops are disabled, traffic will be routed directly
/// from the entry gateway to the exit gateway, bypassing the mix nodes.
///
/// This overrides the `use_legacy_sphinx_format` setting as reduced mix hops
/// requires use of the updated SURB packet format.
disable_mix_hops: bool,
/// Average delay a data packet is going to get delay at a single mixnode.
@@ -163,12 +159,8 @@ impl Config {
}
/// Configure whether messages senders using this config should use mix hops or not when sending messages.
///
/// This overrides the `use_legacy_sphinx_format` setting as disabled mix hops
/// requires use of the updated SURB packet format.
pub fn disable_mix_hops(mut self, disable_mix_hops: bool) -> Self {
self.disable_mix_hops = disable_mix_hops;
self.use_legacy_sphinx_format = false;
self
}
}
@@ -232,10 +224,6 @@ where
}
}
pub(crate) fn topology_access_handle(&self) -> &TopologyAccessor {
&self.topology_access
}
fn get_or_create_sender_tag(&mut self, recipient: &Recipient) -> AnonymousSenderTag {
if let Some(existing) = self.tag_storage.try_get_existing(recipient) {
trace!("we already had sender tag for {recipient}");
@@ -303,7 +291,7 @@ where
&mut self,
target: AnonymousSenderTag,
message: ReplyMessage,
reply_surb: RetrievedReplySurb,
reply_surb: ReplySurbWithKeyRotation,
is_extra_surb_request: bool,
) -> Result<(), SurbWrappedPreparationError> {
let msg = NymMessage::new_reply(message);
@@ -323,16 +311,6 @@ where
});
}
if fragment.is_empty() {
error!("CRITICAL FAILURE: our split message didn't result in any sendable fragments");
return Err(SurbWrappedPreparationError {
source: PreparationError::EmptyFragments,
returned_surbs: Some(vec![reply_surb]),
});
}
// SAFETY: we just checked we have one fragment
#[allow(clippy::unwrap_used)]
let chunk = fragment.pop().unwrap();
let chunk_clone = chunk.clone();
let prepared_fragment = self
@@ -344,10 +322,7 @@ where
Some(chunk.fragment_identifier()),
);
let delay = prepared_fragment.total_delay;
// we have to set a maximum number of retransmissions in case we fail to retrieve
// surbs for a long period of time; we don't want to be stuck constantly resending the data
let max_retransmissions = Some(10);
let max_retransmissions = None;
let pending_ack = PendingAcknowledgement::new_anonymous(
chunk,
delay,
@@ -370,7 +345,7 @@ where
pub(crate) async fn try_request_additional_reply_surbs(
&mut self,
from: AnonymousSenderTag,
reply_surb: RetrievedReplySurb,
reply_surb: ReplySurbWithKeyRotation,
amount: u32,
) -> Result<(), SurbWrappedPreparationError> {
debug!("requesting {amount} reply SURBs from {from}");
@@ -410,9 +385,11 @@ where
&mut self,
target: AnonymousSenderTag,
fragments: Vec<FragmentWithMaxRetransmissions>,
reply_surbs: impl IntoIterator<Item = RetrievedReplySurb>,
reply_surbs: Vec<ReplySurbWithKeyRotation>,
lane: TransmissionLane,
) -> Result<(), SurbWrappedPreparationError> {
// TODO: technically this is performing an unnecessary cloning, but in the grand scheme of things
// is it really that bad?
self.try_send_reply_chunks(
target,
fragments.into_iter().map(|f| (lane, f)).collect(),
@@ -425,7 +402,7 @@ where
&mut self,
target: AnonymousSenderTag,
fragments: Vec<(TransmissionLane, FragmentWithMaxRetransmissions)>,
reply_surbs: impl IntoIterator<Item = RetrievedReplySurb>,
reply_surbs: Vec<ReplySurbWithKeyRotation>,
) -> Result<(), SurbWrappedPreparationError> {
let prepared_fragments = self
.prepare_reply_chunks_for_sending(
@@ -548,7 +525,6 @@ where
pending_acks.push(pending_ack);
}
drop(topology_permit);
self.insert_pending_acks(pending_acks);
self.forward_messages(real_messages, lane).await;
@@ -588,7 +564,7 @@ where
)
.await?;
tracing::trace!("storing {} reply keys", reply_keys.len());
log::trace!("storing {} reply keys", reply_keys.len());
self.reply_key_storage.insert_multiple(reply_keys);
Ok(())
@@ -628,7 +604,7 @@ where
)
.await?;
tracing::trace!("storing {} reply keys", reply_keys.len());
log::trace!("storing {} reply keys", reply_keys.len());
self.reply_key_storage.insert_multiple(reply_keys);
Ok(())
@@ -658,12 +634,20 @@ where
pub(crate) async fn prepare_reply_chunks_for_sending(
&mut self,
fragments: Vec<Fragment>,
reply_surbs: impl IntoIterator<Item = RetrievedReplySurb>,
reply_surbs: Vec<ReplySurbWithKeyRotation>,
) -> Result<Vec<PreparedFragment>, SurbWrappedPreparationError> {
debug_assert_eq!(
fragments.len(),
reply_surbs.len(),
"attempted to send {} fragments with {} reply surbs",
fragments.len(),
reply_surbs.len()
);
let topology_permit = self.topology_access.get_read_permit().await;
let topology = match self.get_topology(&topology_permit) {
Ok(topology) => topology,
Err(err) => return Err(err.return_surbs(reply_surbs.into_iter().collect())),
Err(err) => return Err(err.return_surbs(reply_surbs)),
};
Ok(fragments
@@ -671,13 +655,12 @@ where
.zip(reply_surbs.into_iter())
.map(|(fragment, reply_surb)| {
// unwrap here is fine as we know we have a valid topology
#[allow(clippy::unwrap_used)]
self.message_preparer
.prepare_reply_chunk_for_sending(
fragment,
topology,
&self.config.ack_key,
reply_surb.into(),
reply_surb,
PacketType::Mix,
)
.unwrap()
@@ -687,7 +670,7 @@ where
pub(crate) async fn try_prepare_single_reply_chunk_for_sending(
&mut self,
reply_surb: RetrievedReplySurb,
reply_surb: ReplySurbWithKeyRotation,
chunk: Fragment,
) -> Result<PreparedFragment, SurbWrappedPreparationError> {
let topology_permit = self.topology_access.get_read_permit().await;
@@ -700,7 +683,7 @@ where
chunk,
topology,
&self.config.ack_key,
reply_surb.into(),
reply_surb,
PacketType::Mix,
)?;
@@ -731,21 +714,17 @@ where
// tells real message sender (with the poisson timer) to send this to the mix network
pub(crate) async fn forward_messages(
&mut self,
&self,
messages: Vec<RealMessage>,
transmission_lane: TransmissionLane,
) {
tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown while attempting to forward mixnet messages");
}
sending_res = self.real_message_sender.send((messages, transmission_lane)) => {
if sending_res.is_err() {
error!(
"failed to forward mixnet messages due to closed channel (outside of shutdown!)"
);
}
if let Err(err) = self
.real_message_sender
.send((messages, transmission_lane))
.await
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to forward messages to the real message sender: {err}");
}
}
}
@@ -24,6 +24,7 @@ use crate::{
spawn_future,
};
use futures::channel::mpsc;
use log::*;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
@@ -33,9 +34,7 @@ use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use nym_task::TaskClient;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::sync::Arc;
use tracing::*;
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
pub(crate) use acknowledgement_control::{AckActionSender, Action};
pub(crate) mod acknowledgement_control;
@@ -86,6 +85,12 @@ impl<'a> From<&'a Config> for real_traffic_stream::Config {
}
}
impl<'a> From<&'a Config> for reply_controller::Config {
fn from(cfg: &'a Config) -> Self {
reply_controller::Config::new(cfg.reply_surbs)
}
}
impl<'a> From<&'a Config> for message_handler::Config {
fn from(cfg: &'a Config) -> Self {
message_handler::Config::new(
@@ -134,7 +139,6 @@ impl RealMessagesController<OsRng> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
config: Config,
key_rotation_config: KeyRotationConfig,
ack_receiver: AcknowledgementReceiver,
input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender,
@@ -165,8 +169,7 @@ impl RealMessagesController<OsRng> {
// create all configs for the components
let ack_control_config = (&config).into();
let out_queue_config = (&config).into();
let reply_controller_config =
reply_controller::Config::new(config.reply_surbs, key_rotation_config);
let reply_controller_config = (&config).into();
let message_handler_config = (&config).into();
// create the actual components
@@ -224,20 +227,14 @@ impl RealMessagesController<OsRng> {
let ack_control = self.ack_control;
let mut reply_control = self.reply_control;
spawn_future!(
async move {
out_queue_control.run().await;
debug!("The out queue controller has finished execution!");
},
"RealMessagesController::OutQueueControl)"
);
spawn_future!(
async move {
reply_control.run().await;
debug!("The reply controller has finished execution!");
},
"RealMessagesController::ReplyController"
);
spawn_future(async move {
out_queue_control.run().await;
debug!("The out queue controller has finished execution!");
});
spawn_future(async move {
reply_control.run().await;
debug!("The reply controller has finished execution!");
});
ack_control.start(packet_type);
}
@@ -9,6 +9,7 @@ use crate::client::transmission_buffer::TransmissionBuffer;
use crate::config;
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use log::*;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::chunking::fragment::FragmentIdentifier;
@@ -26,7 +27,6 @@ use rand::{CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, Sleep};
@@ -202,7 +202,7 @@ where
// well technically the message was not sent just yet, but now it's up to internal
// queues and client load rather than the required delay. So realistically we can treat
// whatever is about to happen as negligible additional delay.
trace!("{frag_id} is about to get sent to the mixnet");
trace!("{} is about to get sent to the mixnet", frag_id);
if let Err(err) = self.sent_notifier.unbounded_send(frag_id) {
error!("Failed to notify about sent message: {err}");
}
@@ -249,8 +249,6 @@ where
}
};
// SAFETY: our topology must be valid at this point
#[allow(clippy::expect_used)]
(
generate_loop_cover_packet(
&mut self.rng,
@@ -280,33 +278,17 @@ where
}
};
let sending_res = tokio::select! {
biased;
_ = self.task_client.recv() => {
trace!("received shutdown signal while attempting to send mix message");
return
}
sending_res = self.mix_tx.send(vec![next_message]) => {
sending_res
}
};
match sending_res {
Err(_) => {
if !self.task_client.is_shutdown_poll() {
tracing::error!(
"failed to send mixnet packet due to closed channel (outside of shutdown!)"
);
}
}
Ok(_) => {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
if !self.task_client.is_shutdown_poll() {
log::error!("Failed to send: {err}");
}
} else {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
}
// notify ack controller about sending our message only after we actually managed to push it
@@ -331,7 +313,7 @@ where
}
fn on_close_connection(&mut self, connection_id: ConnectionId) {
tracing::debug!("Removing lane for connection: {connection_id}");
log::debug!("Removing lane for connection: {connection_id}");
self.transmission_buffer
.remove(&TransmissionLane::ConnectionId(connection_id));
}
@@ -343,7 +325,7 @@ where
fn adjust_current_average_message_sending_delay(&mut self) {
let used_slots = self.mix_tx.max_capacity() - self.mix_tx.capacity();
tracing::trace!(
log::trace!(
"used_slots: {used_slots}, current_multiplier: {}",
self.sending_delay_controller.current_multiplier()
);
@@ -352,7 +334,7 @@ where
.sending_delay_controller
.is_backpressure_currently_detected(used_slots)
{
tracing::trace!("Backpressure detected");
log::trace!("Backpressure detected");
self.sending_delay_controller.record_backpressure_detected();
}
@@ -454,11 +436,9 @@ where
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some((real_messages, conn_id))) => {
tracing::trace!("handling real_messages: size: {}", real_messages.len());
log::trace!("handling real_messages: size: {}", real_messages.len());
self.transmission_buffer.store(&conn_id, real_messages);
// SAFETY: we just stored the message
#[allow(clippy::expect_used)]
let real_next = self.pop_next_message().expect("Just stored one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
@@ -503,12 +483,10 @@ where
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some((real_messages, conn_id))) => {
tracing::trace!("handling real_messages: size: {}", real_messages.len());
log::trace!("handling real_messages: size: {}", real_messages.len());
// First store what we got for the given connection id
self.transmission_buffer.store(&conn_id, real_messages);
// SAFETY: we just stored the message
#[allow(clippy::expect_used)]
let real_next = self.pop_next_message().expect("we just added one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
@@ -560,11 +538,11 @@ where
};
if packets > 1000 {
tracing::warn!("{status_str}");
log::warn!("{status_str}");
} else if packets > 0 {
tracing::info!("{status_str}");
log::info!("{status_str}");
} else {
tracing::debug!("{status_str}");
log::debug!("{status_str}");
}
// Send status message to whoever is listening (possibly UI)
@@ -588,7 +566,7 @@ where
tokio::select! {
biased;
_ = shutdown.recv() => {
tracing::trace!("OutQueueControl: Received shutdown");
log::trace!("OutQueueControl: Received shutdown");
break;
}
_ = status_timer.tick() => {
@@ -597,7 +575,7 @@ where
next_message = self.next() => if let Some(next_message) = next_message {
self.on_message(next_message).await;
} else {
tracing::trace!("OutQueueControl: Stopping since channel closed");
log::trace!("OutQueueControl: Stopping since channel closed");
break;
}
}
@@ -611,18 +589,18 @@ where
tokio::select! {
biased;
_ = shutdown.recv() => {
tracing::trace!("OutQueueControl: Received shutdown");
log::trace!("OutQueueControl: Received shutdown");
}
next_message = self.next() => if let Some(next_message) = next_message {
self.on_message(next_message).await;
} else {
tracing::trace!("OutQueueControl: Stopping since channel closed");
log::trace!("OutQueueControl: Stopping since channel closed");
break;
}
}
}
}
tracing::debug!("OutQueueControl: Exiting");
log::debug!("OutQueueControl: Exiting");
}
}
@@ -98,12 +98,12 @@ impl SendingDelayController {
self.current_multiplier =
(self.current_multiplier + 1).clamp(self.lower_bound, self.upper_bound);
self.time_when_changed = get_time_now();
tracing::debug!(
log::debug!(
"Increasing sending delay multiplier to: {}",
self.current_multiplier
);
} else {
tracing::warn!("Trying to increase delay multipler higher than allowed");
log::warn!("Trying to increase delay multipler higher than allowed");
}
}
@@ -112,7 +112,7 @@ impl SendingDelayController {
self.current_multiplier =
(self.current_multiplier - 1).clamp(self.lower_bound, self.upper_bound);
self.time_when_changed = get_time_now();
tracing::debug!(
log::debug!(
"Decreasing sending delay multiplier to: {}",
self.current_multiplier
);
@@ -164,11 +164,11 @@ impl SendingDelayController {
self.current_multiplier()
);
if self.current_multiplier() > 0 {
tracing::debug!("{status_str}");
log::debug!("{}", status_str);
} else if self.current_multiplier() > 1 {
tracing::info!("{status_str}");
log::info!("{}", status_str);
} else if self.current_multiplier() > 2 {
tracing::warn!("{status_str}");
log::warn!("{}", status_str);
}
self.time_when_logged_about_elevated_multiplier = now;
}
@@ -8,6 +8,7 @@ use crate::spawn_future;
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::StreamExt;
use log::*;
use nym_crypto::asymmetric::x25519;
use nym_crypto::Digest;
use nym_gateway_client::MixnetMessageReceiver;
@@ -23,7 +24,6 @@ use nym_task::TaskClient;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::*;
// The interval at which we check for stale buffers
const STALE_BUFFER_CHECK_INTERVAL: Duration = Duration::from_secs(10);
@@ -198,7 +198,6 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
}
#[allow(clippy::panic)]
async fn disconnect_sender(&mut self) {
let mut guard = self.inner.lock().await;
if guard.message_sender.is_none() {
@@ -209,7 +208,6 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
guard.message_sender = None;
}
#[allow(clippy::panic)]
async fn connect_sender(&mut self, sender: ReconstructedMessagesSender) {
let mut guard = self.inner.lock().await;
if guard.message_sender.is_some() {
@@ -223,7 +221,10 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
let stored_messages = std::mem::take(&mut guard.messages);
if !stored_messages.is_empty() {
if let Err(err) = sender.unbounded_send(stored_messages) {
error!("The sender channel we just received is already invalidated - {err:?}");
error!(
"The sender channel we just received is already invalidated - {:?}",
err
);
// put the values back to the buffer
// the returned error has two fields: err: SendError and val: T,
// where val is the value that was failed to get sent;
@@ -309,15 +310,13 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
};
if !reply_surbs.is_empty() {
if let Err(err) = self.reply_controller_sender.send_additional_surbs(
msg.sender_tag,
reply_surbs,
from_surb_request,
) {
if !self.task_client.is_shutdown_poll() {
error!("{err}");
}
if let Err(err) = self.reply_controller_sender.send_additional_surbs(
msg.sender_tag,
reply_surbs,
from_surb_request,
) {
if !self.task_client.is_shutdown_poll() {
error!("{err}");
}
}
}
@@ -501,20 +500,20 @@ impl<R: MessageReceiver> RequestReceiver<R> {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("RequestReceiver: Received shutdown");
log::trace!("RequestReceiver: Received shutdown");
}
request = self.query_receiver.next() => {
if let Some(message) = request {
self.handle_message(message).await
} else {
tracing::trace!("RequestReceiver: Stopping since channel closed");
log::trace!("RequestReceiver: Stopping since channel closed");
break;
}
},
}
}
self.task_client.recv().await;
tracing::debug!("RequestReceiver: Exiting");
log::debug!("RequestReceiver: Exiting");
}
}
@@ -545,17 +544,17 @@ impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
if let Some(new_messages) = new_messages {
self.received_buffer.handle_new_received(new_messages).await?;
} else {
tracing::trace!("FragmentedMessageReceiver: Stopping since channel closed");
log::trace!("FragmentedMessageReceiver: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv_with_delay() => {
tracing::trace!("FragmentedMessageReceiver: Received shutdown");
log::trace!("FragmentedMessageReceiver: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("FragmentedMessageReceiver: Exiting");
log::debug!("FragmentedMessageReceiver: Exiting");
Ok(())
}
}
@@ -601,20 +600,14 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
let mut fragmented_message_receiver = self.fragmented_message_receiver;
let mut request_receiver = self.request_receiver;
spawn_future!(
async move {
match fragmented_message_receiver.run().await {
Ok(_) => {}
Err(e) => error!("{e}"),
}
},
"ReceivedMessagesBufferController::FragmentedMessageReceiver"
);
spawn_future!(
async move {
request_receiver.run().await;
},
"ReceivedMessagesBufferController::RequestReceiver"
);
spawn_future(async move {
match fragmented_message_receiver.run().await {
Ok(_) => {}
Err(e) => error!("{e}"),
}
});
spawn_future(async move {
request_receiver.run().await;
});
}
}
@@ -1,169 +0,0 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_topology::NymTopologyMetadata;
use nym_validator_client::models::{
EpochId, KeyRotationId, KeyRotationInfoResponse, KeyRotationState,
};
use std::time::Duration;
use time::OffsetDateTime;
#[derive(Clone, Copy)]
pub(crate) enum SurbRefreshState {
WaitingForNextRotation { last_known: KeyRotationId },
ScheduledForNextInvocation,
}
#[derive(Clone, Copy)]
pub(crate) struct ReferenceEpoch {
pub(crate) absolute_epoch_id: EpochId,
pub(crate) start_time: OffsetDateTime,
}
#[derive(Clone, Copy)]
pub(crate) struct KeyRotationConfig {
pub(crate) epoch_duration: Duration,
pub(crate) rotation_state: KeyRotationState,
pub(crate) reference_epoch: ReferenceEpoch,
}
impl From<KeyRotationInfoResponse> for KeyRotationConfig {
fn from(value: KeyRotationInfoResponse) -> Self {
KeyRotationConfig {
epoch_duration: value.details.epoch_duration,
rotation_state: value.details.key_rotation_state,
reference_epoch: ReferenceEpoch {
absolute_epoch_id: value.details.current_absolute_epoch_id,
start_time: value.details.current_epoch_start,
},
}
}
}
impl KeyRotationConfig {
pub(crate) fn rotation_lifetime(&self) -> Duration {
(self.rotation_state.validity_epochs + 1) * self.epoch_duration
}
pub(crate) fn key_rotation_id(&self, current_absolute_epoch_id: EpochId) -> KeyRotationId {
self.rotation_state
.key_rotation_id(current_absolute_epoch_id)
}
// this is called with the assumption that now is always > reference epoch start
pub(crate) fn expected_current_epoch_id(&self, now: OffsetDateTime) -> EpochId {
let diff_secs = (now - self.reference_epoch.start_time).as_seconds_f64();
let epochs = (diff_secs / self.epoch_duration.as_secs_f64()).floor() as u32;
self.reference_epoch.absolute_epoch_id + epochs
}
fn initial_rotation_epoch_start(&self) -> OffsetDateTime {
let epochs_diff = self
.reference_epoch
.absolute_epoch_id
.saturating_sub(self.rotation_state.initial_epoch_id);
self.reference_epoch.start_time - epochs_diff * self.epoch_duration
}
pub(crate) fn key_rotation_start(&self, key_rotation_id: KeyRotationId) -> OffsetDateTime {
let rotation_duration = self.rotation_state.validity_epochs * self.epoch_duration;
let initial_start = self.initial_rotation_epoch_start();
// note: key rotation starts from 0
initial_start + rotation_duration * key_rotation_id
}
pub(crate) fn expected_current_key_rotation_id(&self, now: OffsetDateTime) -> KeyRotationId {
let expected_current_epoch = self.expected_current_epoch_id(now);
self.key_rotation_id(expected_current_epoch)
}
pub(crate) fn expected_current_key_rotation_start(
&self,
now: OffsetDateTime,
) -> OffsetDateTime {
let expected_current_key_rotation_id = self.expected_current_key_rotation_id(now);
self.key_rotation_start(expected_current_key_rotation_id)
}
pub(crate) fn epoch_stuck(&self, topology_metadata: NymTopologyMetadata) -> bool {
// add leeway of 2mins each direction since transition is not instantaneous
let lower_bound = topology_metadata.refreshed_at - Duration::from_secs(2);
let upper_bound = topology_metadata.refreshed_at + Duration::from_secs(2);
let expected_epoch_lower = self.expected_current_epoch_id(lower_bound);
let expected_epoch_upper = self.expected_current_epoch_id(upper_bound);
topology_metadata.absolute_epoch_id != expected_epoch_lower
&& topology_metadata.absolute_epoch_id != expected_epoch_upper
}
}
#[cfg(test)]
mod tests {
use super::*;
use time::macros::datetime;
fn mock_config() -> KeyRotationConfig {
KeyRotationConfig {
epoch_duration: Duration::from_secs(60 * 60),
rotation_state: KeyRotationState {
validity_epochs: 10,
initial_epoch_id: 80,
},
reference_epoch: ReferenceEpoch {
absolute_epoch_id: 100,
start_time: datetime!(2025-06-30 12:00:00+00:00),
},
}
}
#[test]
fn expected_current_key_rotation_start() {
// rot0: 80-89
// rot1: 90-99
// rot2: 100-109
// rot3: 110-119
// ... etc
let cfg = mock_config();
assert_eq!(
cfg.initial_rotation_epoch_start(),
datetime!(2025-06-29 16:00:00+00:00)
);
let fake_now = datetime!(2025-06-30 12:00:00+00:00);
assert_eq!(cfg.expected_current_epoch_id(fake_now), 100);
assert_eq!(cfg.expected_current_key_rotation_id(fake_now), 2);
assert_eq!(
cfg.expected_current_key_rotation_start(fake_now),
datetime!(2025-06-30 12:00:00+00:00)
);
let fake_now = datetime!(2025-06-30 12:30:00+00:00);
assert_eq!(cfg.expected_current_epoch_id(fake_now), 100);
assert_eq!(cfg.expected_current_key_rotation_id(fake_now), 2);
assert_eq!(
cfg.expected_current_key_rotation_start(fake_now),
datetime!(2025-06-30 12:00:00+00:00)
);
let fake_now = datetime!(2025-06-30 13:01:00+00:00);
assert_eq!(cfg.expected_current_epoch_id(fake_now), 101);
assert_eq!(cfg.expected_current_key_rotation_id(fake_now), 2);
assert_eq!(
cfg.expected_current_key_rotation_start(fake_now),
datetime!(2025-06-30 12:00:00+00:00)
);
let fake_now = datetime!(2025-06-30 22:02:00+00:00);
assert_eq!(cfg.expected_current_epoch_id(fake_now), 110);
assert_eq!(cfg.expected_current_key_rotation_id(fake_now), 3);
assert_eq!(
cfg.expected_current_key_rotation_start(fake_now),
datetime!(2025-06-30 22:00:00+00:00)
);
}
}
File diff suppressed because it is too large Load Diff
@@ -1,901 +0,0 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
use crate::client::real_messages_control::message_handler::{
FragmentWithMaxRetransmissions, MessageHandler, PreparationError,
};
use crate::client::replies::reply_controller::key_rotation_helpers::SurbRefreshState;
use crate::client::replies::reply_controller::Config;
use crate::client::topology_control::TopologyAccessor;
use crate::client::transmission_buffer::TransmissionBuffer;
use futures::channel::oneshot;
use nym_client_core_surb_storage::{ReceivedReplySurb, ReceivedReplySurbsMap};
use nym_crypto::aes::cipher::crypto_common::rand_core::CryptoRng;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_task::connections::{ConnectionId, TransmissionLane};
use nym_topology::NymTopologyMetadata;
use rand::Rng;
use std::cmp::{max, min};
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::mem;
use std::sync::{Arc, Weak};
use time::OffsetDateTime;
use tracing::{debug, error, info, trace, warn};
struct SenderData {
current_clear_rerequest_counter: usize,
pending_replies: TransmissionBuffer<FragmentWithMaxRetransmissions>,
pending_retransmissions: BTreeMap<FragmentIdentifier, Weak<PendingAcknowledgement>>,
last_request_failure: OffsetDateTime,
}
impl Default for SenderData {
fn default() -> Self {
SenderData {
current_clear_rerequest_counter: 0,
pending_replies: Default::default(),
pending_retransmissions: Default::default(),
last_request_failure: OffsetDateTime::UNIX_EPOCH,
}
}
}
impl SenderData {
fn total_pending(&self) -> usize {
let pending_replies = self.pending_replies.total_size();
let pending_retransmissions = self.pending_retransmissions.len();
let total_pending = pending_retransmissions + pending_replies;
debug!("total queue size: {total_pending} = pending data {pending_replies} + pending retransmission {pending_retransmissions}");
total_pending
}
pub(crate) fn increment_current_clear_rerequest_counter(&mut self) {
self.current_clear_rerequest_counter += 1;
}
pub(crate) fn reset_current_clear_rerequest_counter(&mut self) {
self.current_clear_rerequest_counter = 0;
}
pub(crate) fn reset_last_request_failure(&mut self, now: OffsetDateTime) -> OffsetDateTime {
mem::replace(&mut self.last_request_failure, now)
}
}
/// Reply controller responsible for controlling receiver-related part
/// of replies, such as requesting additional reply SURBs
pub struct ReceiverReplyController<R> {
config: Config,
surb_refresh_state: SurbRefreshState,
topology_access: TopologyAccessor,
surb_senders: HashMap<AnonymousSenderTag, SenderData>,
unavailable: HashMap<AnonymousSenderTag, OffsetDateTime>,
surbs_storage: ReceivedReplySurbsMap,
// TODO: incorporate that field at some point
// and use binomial distribution to determine the expected required number
// of surbs required to send the message through
// expected_reliability: f32,
message_handler: MessageHandler<R>,
}
impl<R> ReceiverReplyController<R>
where
R: CryptoRng + Rng,
{
pub(crate) fn new(
config: Config,
storage: ReceivedReplySurbsMap,
message_handler: MessageHandler<R>,
) -> Self {
let topology_access = message_handler.topology_access_handle().clone();
ReceiverReplyController {
config,
surb_refresh_state: SurbRefreshState::WaitingForNextRotation {
last_known: config
.key_rotation
.expected_current_key_rotation_id(OffsetDateTime::now_utc()),
},
topology_access,
surb_senders: Default::default(),
unavailable: Default::default(),
surbs_storage: storage,
message_handler,
}
}
fn get_or_create_surb_sender(&mut self, tag: &AnonymousSenderTag) -> &mut SenderData {
self.surb_senders.entry(*tag).or_default()
}
async fn current_topology_metadata(&self) -> Option<NymTopologyMetadata> {
self.topology_access.current_metadata().await
}
fn insert_pending_replies<I: IntoIterator<Item = FragmentWithMaxRetransmissions>>(
&mut self,
recipient: &AnonymousSenderTag,
fragments: I,
lane: TransmissionLane,
) {
trace!("buffering pending replies for {recipient}");
self.surb_senders
.entry(*recipient)
.or_default()
.pending_replies
.store(&lane, fragments)
}
fn re_insert_pending_replies(
&mut self,
recipient: &AnonymousSenderTag,
fragments: Vec<(TransmissionLane, FragmentWithMaxRetransmissions)>,
) {
trace!("re-inserting pending replies for {recipient}");
// the buffer should ALWAYS exist at this point, if it doesn't, it's a bug...
self.surb_senders
.entry(*recipient)
.or_default()
.pending_replies
.store_multiple(fragments)
}
fn re_insert_pending_retransmission(
&mut self,
recipient: &AnonymousSenderTag,
data: Vec<Arc<PendingAcknowledgement>>,
) {
trace!("re-inserting pending retransmissions for {recipient}");
// SAFETY: the underlying entry MUST exist as we've just got data from there
// and we hold a mut reference
#[allow(clippy::expect_used)]
let map_entry = &mut self
.surb_senders
.get_mut(recipient)
.expect("our pending retransmission entry is somehow gone!")
.pending_retransmissions;
for pending in data {
// if it's 0, we don't need to do anything - we just got that ack!
if Arc::strong_count(&pending) > 1 {
let id = pending.inner_fragment_identifier();
let downgraded = Arc::downgrade(&pending);
map_entry.insert(id, downgraded);
}
}
}
fn should_request_more_surbs(&self, target: &AnonymousSenderTag) -> bool {
trace!("checking if we should request more surbs from {target}");
let total_queue = self
.surb_senders
.get(target)
.map(|pending| pending.total_pending())
.unwrap_or_default();
// only consider 'fresh' surbs
let available_surbs = self.surbs_storage.available_fresh_surbs(target);
let pending_surbs = self.surbs_storage.pending_reception(target) as usize;
let min_surbs_threshold = self.surbs_storage.min_surb_threshold();
let max_surbs_threshold = self.surbs_storage.max_surb_threshold();
let min_surbs_threshold_buffer =
self.config.reply_surbs.minimum_reply_surb_threshold_buffer;
// After clearing the queue, we want to have at least `min_surbs_threshold` surbs available
// and reserved for requesting additional surbs, and in addition to that we also want to
// have `min_surbs_threshold_buffer` surbs available proactively.
let target_surbs_after_clearing_queue = min_surbs_threshold + min_surbs_threshold_buffer;
// Check if we have enough surbs to handle the total queue and maintain minimum thresholds
let total_required_surbs = total_queue + target_surbs_after_clearing_queue;
let total_available_surbs = pending_surbs + available_surbs;
debug!("available surbs: {available_surbs} pending surbs: {pending_surbs} threshold range: {min_surbs_threshold}..+{min_surbs_threshold_buffer}..{max_surbs_threshold}");
// We should request more surbs if:
// 1. We haven't hit the maximum surb threshold, and
// 2. We don't have enough surbs to handle the queue plus minimum thresholds
let is_below_max_threshold = total_available_surbs < max_surbs_threshold;
let is_below_required_surbs = total_available_surbs < total_required_surbs;
is_below_max_threshold && is_below_required_surbs
}
pub(crate) async fn handle_send_reply(
&mut self,
recipient_tag: AnonymousSenderTag,
data: Vec<u8>,
lane: TransmissionLane,
max_retransmissions: Option<u32>,
) {
if !self.surbs_storage.contains_surbs_for(&recipient_tag) {
if self
.unavailable
.insert(recipient_tag, OffsetDateTime::now_utc())
.is_none()
{
// don't report it every single time
warn!("received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!");
} else {
trace!("received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!");
}
return;
}
trace!("handling reply to {recipient_tag}");
let mut fragments = self.message_handler.split_reply_message(data);
let total_size = fragments.len();
trace!("This reply requires {total_size} SURBs");
// for the purposes of sending reply, do allow using possibly stale entries
let available_surbs = self.surbs_storage.available_surbs(&recipient_tag);
let min_surbs_threshold = self.surbs_storage.min_surb_threshold();
let max_to_send = if available_surbs > min_surbs_threshold {
min(fragments.len(), available_surbs - min_surbs_threshold)
} else {
0
};
if max_to_send > 0 {
let (surbs, surbs_left) = self
.surbs_storage
.get_reply_surbs(&recipient_tag, max_to_send);
debug!(
"retrieved {} reply surbs. {surbs_left} surbs remaining in storage",
surbs.as_ref().map(|s| s.len()).unwrap_or_default()
);
if let Some(reply_surbs) = surbs {
let to_send = fragments
.drain(..reply_surbs.len())
.map(|f| FragmentWithMaxRetransmissions {
fragment: f,
max_retransmissions,
})
.collect::<Vec<_>>();
if let Err(err) = self
.message_handler
.try_send_reply_chunks_on_lane(
recipient_tag,
to_send.clone(),
reply_surbs,
lane,
)
.await
{
let err = err.return_unused_surbs(&self.surbs_storage, &recipient_tag);
warn!("failed to send reply to {recipient_tag}: {err}");
info!(
"buffering {no_fragments} fragments for {recipient_tag}",
no_fragments = to_send.len()
);
self.insert_pending_replies(&recipient_tag, to_send, lane);
}
}
}
// if there's leftover data we didn't send because we didn't have enough (or any) surbs - buffer it
if !fragments.is_empty() {
// Ideally we should have enough surbs above the minimum threshold to handle sending
// new replies without having to first request more surbs. That's why I'd like to log
// these cases as they might indicate a problem with the surb management.
debug!(
"buffering {no_fragments} fragments for {recipient_tag}",
no_fragments = fragments.len()
);
let fragments: Vec<_> = fragments
.into_iter()
.map(|fragment| FragmentWithMaxRetransmissions {
fragment,
max_retransmissions,
})
.collect();
self.insert_pending_replies(&recipient_tag, fragments, lane);
}
if self.should_request_more_surbs(&recipient_tag) {
self.request_reply_surbs_for_queue_clearing(recipient_tag)
.await;
}
}
async fn request_additional_reply_surbs(
&mut self,
target: AnonymousSenderTag,
amount: u32,
) -> Result<(), PreparationError> {
debug!("requesting {amount} additional reply surbs for {target}");
let (reply_surb, _) = self
.surbs_storage
.get_reply_surb_ignoring_threshold(&target);
let reply_surb = reply_surb.ok_or(PreparationError::NotEnoughSurbs {
available: 0,
required: 1,
})?;
if let Err(err) = self
.message_handler
.try_request_additional_reply_surbs(target, reply_surb, amount)
.await
{
let err = err.return_unused_surbs(&self.surbs_storage, &target);
warn!("failed to request additional surbs from {target}: {err}",);
return Err(err);
} else {
self.surbs_storage
.increment_pending_reception(&target, amount);
}
Ok(())
}
async fn try_clear_pending_retransmission(&mut self, target: AnonymousSenderTag) {
trace!("trying to clear pending retransmission queue");
let available_surbs = self.surbs_storage.available_surbs(&target);
let min_surbs_threshold = self.surbs_storage.min_surb_threshold();
let max_to_clear = if available_surbs > min_surbs_threshold {
available_surbs - min_surbs_threshold
} else {
trace!("we don't have enough surbs for retransmission queue clearing...");
return;
};
trace!("we can clear up to {max_to_clear} entries");
let Some(pending) = self.surb_senders.get_mut(&target) else {
trace!("no pending entry for {target}!");
return;
};
let mut to_take = Vec::new();
while to_take.len() < max_to_clear {
if let Some((_, data)) = pending.pending_retransmissions.pop_first() {
// no need to do anything if we failed to upgrade the reference,
// it means we got the ack while the data was waiting in the queue
if let Some(upgraded) = data.upgrade() {
to_take.push(upgraded)
}
} else {
// our map is empty!
break;
}
}
if to_take.is_empty() {
// no need to do anything
return;
}
let (surbs_for_reply, _) = self.surbs_storage.get_reply_surbs(&target, to_take.len());
let Some(surbs_for_reply) = surbs_for_reply else {
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
self.re_insert_pending_retransmission(&target, to_take);
return;
};
let to_send_vec = to_take.iter().map(|ack| ack.fragment_data()).collect();
let prepared_fragments = match self
.message_handler
.prepare_reply_chunks_for_sending(to_send_vec, surbs_for_reply)
.await
{
Ok(prepared) => prepared,
Err(err) => {
let err = err.return_unused_surbs(&self.surbs_storage, &target);
self.re_insert_pending_retransmission(&target, to_take);
warn!("failed to clear pending retransmission queue for {target}: {err}",);
return;
}
};
// we can't fail at this point, so drop all references to acks so that timer updates wouldn't blow up
drop(to_take);
self.message_handler
.send_retransmission_reply_chunks(prepared_fragments, TransmissionLane::Retransmission)
.await;
}
fn pop_at_most_pending_replies(
&mut self,
from: &AnonymousSenderTag,
amount: usize,
) -> Option<Vec<(TransmissionLane, FragmentWithMaxRetransmissions)>> {
// if possible, pop all pending replies, if not, pop only entries for which we'd have a reply surb
let pending = self.surb_senders.get_mut(from)?;
let total = pending.pending_replies.total_size();
trace!("pending queue has {total} elements");
if total == 0 {
return None;
}
pending
.pending_replies
.pop_at_most_n_next_messages_at_random(amount)
}
#[allow(clippy::panic)]
async fn try_clear_pending_queue(&mut self, target: AnonymousSenderTag) {
trace!("trying to clear pending queue");
let available_surbs = self.surbs_storage.available_surbs(&target);
let min_surbs_threshold = self.surbs_storage.min_surb_threshold();
let max_to_clear = if available_surbs > min_surbs_threshold {
available_surbs - min_surbs_threshold
} else {
trace!("we don't have enough surbs for queue clearing...");
return;
};
trace!("we can clear up to {max_to_clear} entries");
// we're guaranteed to not get more entries than we have reply surbs for
if let Some(to_send) = self.pop_at_most_pending_replies(&target, max_to_clear) {
let to_send_clone = to_send.clone();
if to_send_clone.is_empty() {
panic!(
"please let the devs know if you ever see this message (reply_controller.rs)"
);
}
let (surbs_for_reply, _) = self
.surbs_storage
.get_reply_surbs(&target, to_send_clone.len());
let Some(surbs_for_reply) = surbs_for_reply else {
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
self.re_insert_pending_replies(&target, to_send);
return;
};
if let Err(err) = self
.message_handler
.try_send_reply_chunks(target, to_send_clone, surbs_for_reply)
.await
{
let err = err.return_unused_surbs(&self.surbs_storage, &target);
self.re_insert_pending_replies(&target, to_send);
warn!("failed to clear pending queue for {target}: {err}");
}
} else {
trace!("the pending queue is empty");
}
}
fn reset_rerequest_counter(&mut self, from: &AnonymousSenderTag) {
if let Some(pending) = self.surb_senders.get_mut(from) {
pending.reset_current_clear_rerequest_counter()
}
}
pub(crate) async fn handle_received_surbs(
&mut self,
from: AnonymousSenderTag,
reply_surbs: Vec<ReplySurbWithKeyRotation>,
from_surb_request: bool,
) {
trace!("handling received surbs");
// clear the requesting flag since we should have been asking for surbs
if from_surb_request {
self.surbs_storage
.decrement_pending_reception(&from, reply_surbs.len() as u32);
}
// store received surbs
self.surbs_storage.insert_fresh_surbs(&from, reply_surbs);
// reset, if applicable, request counter
self.reset_rerequest_counter(&from);
// use as many as we can for clearing pending retransmission queue
self.try_clear_pending_retransmission(from).await;
// use as many as we can for clearing pending 'normal' queue
self.try_clear_pending_queue(from).await;
// if we have to, request more
if self.should_request_more_surbs(&from) {
self.request_reply_surbs_for_queue_clearing(from).await;
}
}
fn buffer_pending_ack(
&mut self,
recipient: AnonymousSenderTag,
ack_ref: Arc<PendingAcknowledgement>,
weak_ack_ref: Weak<PendingAcknowledgement>,
) {
let frag_id = ack_ref.inner_fragment_identifier();
let pending = self.surb_senders.entry(recipient).or_default();
if let Entry::Vacant(e) = pending.pending_retransmissions.entry(frag_id) {
e.insert(weak_ack_ref);
} else {
warn!(
"we're already trying to retransmit {frag_id}. We must be really behind in surbs!"
);
}
}
pub(crate) async fn handle_reply_retransmission(
&mut self,
recipient_tag: AnonymousSenderTag,
timed_out_ack: Weak<PendingAcknowledgement>,
extra_surbs_request: bool,
) {
// seems we got the ack in the end
let ack_ref = match timed_out_ack.upgrade() {
Some(ack) => ack,
None => {
debug!("we received the ack for one of the reply packets as we were putting it in the retransmission queue");
return;
}
};
// if this is retransmission for obtaining additional reply surbs,
// we can dip below the storage threshold
let (maybe_reply_surb, _) = if extra_surbs_request {
self.surbs_storage
.get_reply_surb_ignoring_threshold(&recipient_tag)
} else {
self.surbs_storage.get_reply_surb(&recipient_tag)
};
if let Some(reply_surb) = maybe_reply_surb {
match self
.message_handler
.try_prepare_single_reply_chunk_for_sending(reply_surb, ack_ref.fragment_data())
.await
{
Ok(prepared) => {
// drop the ack ref so that controller would not panic on `UpdateTimer` if that task
// got to handle the action before this function terminated (which is very much
// possible if `forward_messages` takes a while)
drop(ack_ref);
self.message_handler
.update_ack_delay(prepared.fragment_identifier, prepared.total_delay);
self.message_handler
.forward_messages(vec![prepared.into()], TransmissionLane::Retransmission)
.await;
}
Err(err) => {
let err = err.return_unused_surbs(&self.surbs_storage, &recipient_tag);
warn!("failed to prepare message for retransmission - {err}");
// we buffer that packet and to try another day
self.buffer_pending_ack(recipient_tag, ack_ref, timed_out_ack);
if self.should_request_more_surbs(&recipient_tag) {
self.request_reply_surbs_for_queue_clearing(recipient_tag)
.await;
}
}
};
} else {
self.buffer_pending_ack(recipient_tag, ack_ref, timed_out_ack);
if self.should_request_more_surbs(&recipient_tag) {
self.request_reply_surbs_for_queue_clearing(recipient_tag)
.await;
}
}
}
// to be honest this doesn't make a lot of sense in the context of `connection_id`,
// it should really be asked per tag
pub(crate) fn handle_lane_queue_length(
&self,
connection_id: ConnectionId,
response_channel: oneshot::Sender<usize>,
) {
// TODO: if we ever have duplicate ids for different senders, it means our rng is super weak
// thus I don't think we have to worry about it?
let lane = TransmissionLane::ConnectionId(connection_id);
for buf in self.surb_senders.values().map(|p| &p.pending_replies) {
if let Some(length) = buf.lane_length(&lane) {
if response_channel.send(length).is_err() {
error!("the requester for lane queue length has dropped the response channel!")
}
return;
}
}
// make sure that if we didn't find that lane, we reply with 0
if response_channel.send(0).is_err() {
error!("the requester for lane queue length has dropped the response channel!")
}
}
// TODO: modify this method to more accurately determine the amount of surbs it needs to request
// it should take into consideration the average latency, sending rate and queue size.
// it should request as many surbs as it takes to saturate its sending rate before next batch arrives
async fn request_reply_surbs_for_queue_clearing(&mut self, target: AnonymousSenderTag) {
trace!("requesting surbs for queue clearing");
let total_queue = self
.surb_senders
.get(&target)
.map(|pending| pending.total_pending() as u32)
.unwrap_or_default();
let min_surbs_buffer = self.config.reply_surbs.minimum_reply_surb_threshold_buffer as u32;
// To proactively request additional surbs, we aim to have a buffer of extra surbs in our
// storage.
let total_queue_with_buffer = total_queue + min_surbs_buffer;
let request_size = min(
self.config.reply_surbs.maximum_reply_surb_request_size,
max(
total_queue_with_buffer,
self.config.reply_surbs.minimum_reply_surb_request_size,
),
);
if let Err(err) = self
.request_additional_reply_surbs(target, request_size)
.await
{
let now = OffsetDateTime::now_utc();
let sender_info = self.get_or_create_surb_sender(&target);
let last_failure = sender_info.reset_last_request_failure(now);
// only log at higher level if it's the first time this error has occurred in a while
if now - last_failure > time::Duration::seconds(30) {
warn!("failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}")
} else {
debug!("failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}")
}
}
}
pub(crate) async fn inspect_stale_pending_data(&mut self) {
let mut to_request = Vec::new();
let mut to_remove = Vec::new();
let now = OffsetDateTime::now_utc();
for (pending_reply_target, vals) in self.surb_senders.iter_mut() {
// for now recreate old behaviour
let retransmission_buf = &vals.pending_replies;
if retransmission_buf.is_empty() {
continue;
}
let Some(last_received_time) = self
.surbs_storage
.surbs_last_received_at(pending_reply_target)
else {
error!("we have {} pending replies for {pending_reply_target}, but we somehow never received any reply surbs from them!", retransmission_buf.total_size());
to_remove.push(*pending_reply_target);
continue;
};
let diff = now - last_received_time;
let max_rerequest_wait = self
.config
.reply_surbs
.maximum_reply_surb_rerequest_waiting_period;
let max_drop_wait = self
.config
.reply_surbs
.maximum_reply_surb_drop_waiting_period;
let max_rerequests = self.config.reply_surbs.maximum_reply_surbs_rerequests;
// if we have already requested extra surbs because of the stale entry,
// don't do it again (otherwise we'll get stuck in a constant cycle of requesting more surbs
// if client is offline)
if vals.current_clear_rerequest_counter > max_rerequests {
to_remove.push(*pending_reply_target);
debug!("we have reached the maximum threshold of attempting to request surbs from {pending_reply_target}. dropping the sender");
continue;
}
if diff > max_rerequest_wait {
if diff > max_drop_wait {
to_remove.push(*pending_reply_target)
} else {
debug!("We haven't received any surbs in {} from {pending_reply_target}. Going to explicitly ask for more", humantime::format_duration(diff.unsigned_abs()));
vals.increment_current_clear_rerequest_counter();
to_request.push(*pending_reply_target);
}
}
}
for pending_reply_target in to_request {
self.request_reply_surbs_for_queue_clearing(pending_reply_target)
.await;
self.surbs_storage
.reset_pending_reception(&pending_reply_target)
}
for to_remove in to_remove {
// TODO: in the 'old' version we just removed pending messages,
// not retransmissions, but I think those should follow the same logic.
// if something breaks because of that. I guess here is your explanation, future reader
self.surb_senders.remove(&to_remove);
}
}
pub(crate) async fn check_surb_refresh(&mut self) {
let Some(current_rotation_id) = self.topology_access.current_key_rotation_id().await else {
warn!("failed to retrieve current key rotation id from the network topology");
return;
};
if let SurbRefreshState::WaitingForNextRotation { last_known } = self.surb_refresh_state {
if last_known == current_rotation_id {
trace!("no changes in key rotation id");
} else {
// key rotation actually changed and given the polling rate (1/8th epoch) we should have plenty
// of time to perform the upgrade.
// but wait for one more call before doing this so that the clients could also resync
// their topologies and discover new rotation
self.surb_refresh_state = SurbRefreshState::ScheduledForNextInvocation;
}
return;
}
// here we are in `SurbRefreshState::ScheduledForNextInvocation` state
let mut marked_as_stale = HashMap::new();
// 1. mark all existing surbs we have as possibly stale
for mut map_entry in self.surbs_storage.as_raw_iter_mut() {
let (sender, received) = map_entry.pair_mut();
let num_downgraded = received.downgrade_freshness();
trace!("{sender}: {num_downgraded} downgraded");
if num_downgraded != 0 {
marked_as_stale.insert(*sender, num_downgraded);
}
}
// 2. attempt to re-request the equivalent number of fresh surbs
// TODO PROBLEM: if our request gets lost, we might be in trouble...
// we need some sort of retry mechanism
for (sender, num_to_request) in marked_as_stale {
if self
.request_additional_reply_surbs(sender, num_to_request as u32)
.await
.is_err()
{
warn!("surb refresh request failed")
}
}
self.surb_refresh_state = SurbRefreshState::WaitingForNextRotation {
last_known: current_rotation_id,
};
}
pub(crate) async fn inspect_and_clear_stale_data(&mut self, now: OffsetDateTime) {
// technically we don't know if epoch is stuck, but we're flying in blind here,
// so we have to assume the worst and not purge anything depending on proper epoch progression
let is_epoch_stuck = self
.current_topology_metadata()
.await
.map(|m| self.config.key_rotation.epoch_stuck(m))
.unwrap_or(false);
// expected time of when the CURRENT key rotation has begun
let expected_current_key_rotation_start = self
.config
.key_rotation
.expected_current_key_rotation_start(now);
// expected ID of the CURRENT key rotation
let expected_current_key_rotation = self
.config
.key_rotation
.expected_current_key_rotation_id(now);
// time of the start of one epoch BEFORE the CURRENT rotation has begun
// this indicates the starting time of when packets with the current keys might have been constructed
let prior_epoch_start =
expected_current_key_rotation_start - self.config.key_rotation.epoch_duration;
// time of the start of one epoch AFTER the current rotation has begun
// this indicates the end of transition period and any packets constructed with keys different
// from the current one are definitely invalid
let following_epoch_start =
expected_current_key_rotation_start + self.config.key_rotation.epoch_duration;
// define a closure for validating individual surbs
// (we have to run it twice for different piles)
let basic_surb_retention_logic = |received_surb: &ReceivedReplySurb| {
if is_epoch_stuck {
let diff = now - received_surb.received_at();
return diff < self.config.key_rotation.rotation_lifetime();
}
if received_surb.received_at() < prior_epoch_start {
// it's definitely from previous rotation
return false;
}
let surb_rotation = received_surb.key_rotation();
if surb_rotation.is_unknown() {
// can't do anything, so just retain it
return true;
}
// TODO: will this backfire during transition period where we need surbs to refresh surbs
// and we failed to send a request?
if surb_rotation.is_even() && expected_current_key_rotation % 2 == 1 {
return false;
}
if surb_rotation.is_odd() && expected_current_key_rotation % 2 == 0 {
return false;
}
true
};
// 1. purge full old clients data (this applies to RECEIVER)
self.surbs_storage.retain(|_, received| {
if is_epoch_stuck {
// if epoch is stuck, we can't do much (because we don't know for certain if rotation has advanced)
// apart from the basic check of surbs being received more than maximum lifetime of a rotation
// because at that point we know they must be invalid
let diff = now - received.surbs_last_received_at();
return diff < self.config.key_rotation.rotation_lifetime();
}
// if surbs were received more than 1h before the start of the current rotation,
// they're DEFINITELY invalid.
// if it was up until 1h AFTER the start of the current rotation they MIGHT be valid -
// we don't know for sure, unless the client explicitly attached rotation information
// (which only applies to more recent versions of clients so we can't 100% rely on that)
if received.surbs_last_received_at() < prior_epoch_start {
return false;
}
// 1.1. check individual surbs (same basic logic applies)
received.retain_fresh_surbs(&basic_surb_retention_logic);
// 1.2. check the possibly stale entries
// 1.2.1. check if we're beyond the key rotation transition period,
// if so those surbs are definitely unusable
if now > following_epoch_start {
received.drop_possibly_stale_surbs();
}
// 1.2.2. otherwise continue with the same logic as the fresh ones
received.retain_possibly_stale_surbs(&basic_surb_retention_logic);
// no surbs left, we're not expecting any AND we haven't received anything in a while
// (i.e. sender probably abandoned us)
let max_drop_wait = self
.config
.reply_surbs
.maximum_reply_surb_drop_waiting_period;
let last_received = received.surbs_last_received_at();
let possibly_abandoned = last_received + max_drop_wait < now;
if received.is_empty() && received.pending_reception() == 0 && possibly_abandoned {
return false;
}
true
});
// 1.3 inspect old unavailable receivers to clear any stale data
self.unavailable
.retain(|_, last_reported| now - *last_reported < time::Duration::seconds(30));
}
}
@@ -3,12 +3,12 @@
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
use futures::channel::{mpsc, oneshot};
use log::error;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_task::connections::{ConnectionId, TransmissionLane};
use std::sync::Weak;
use tracing::error;
pub(crate) fn new_control_channels() -> (ReplyControllerSender, ReplyControllerReceiver) {
let (tx, rx) = mpsc::unbounded();
@@ -1,101 +0,0 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::replies::reply_controller::Config;
use nym_client_core_surb_storage::{CombinedReplyStorage, SentReplyKeys, UsedSenderTags};
use nym_crypto::aes::cipher::crypto_common::rand_core::CryptoRng;
use nym_sphinx::addressing::Recipient;
use rand::Rng;
use std::cmp::min;
use std::time::Duration;
use time::OffsetDateTime;
use tracing::{debug, trace, warn};
/// Reply controller responsible for controlling sender-related part
/// of replies, such as checking if any reply keys are stale
pub struct SenderReplyController<R> {
config: Config,
tags_storage: UsedSenderTags,
sent_reply_keys: SentReplyKeys,
message_handler: MessageHandler<R>,
}
impl<R> SenderReplyController<R>
where
R: CryptoRng + Rng,
{
pub(crate) fn new(
config: Config,
storage: &CombinedReplyStorage,
message_handler: MessageHandler<R>,
) -> Self {
SenderReplyController {
config,
tags_storage: storage.tags_storage(),
sent_reply_keys: storage.key_storage(),
message_handler,
}
}
pub(crate) async fn handle_surb_request(&mut self, recipient: Recipient, mut amount: u32) {
// 1. check whether we sent any surbs in the past to this recipient, otherwise
// they have no business in asking for more
if !self.tags_storage.exists(&recipient) {
warn!("{recipient} asked us for reply SURBs even though we never sent them any anonymous messages before!");
return;
}
// 2. check whether the requested amount is within sane range
if amount
> self
.config
.reply_surbs
.maximum_allowed_reply_surb_request_size
{
warn!("The requested reply surb amount is larger than our maximum allowed ({amount} > {}). Lowering it to a more sane value...", self.config.reply_surbs.maximum_allowed_reply_surb_request_size);
amount = self
.config
.reply_surbs
.maximum_allowed_reply_surb_request_size;
}
// 3. construct and send the surbs away
// (send them in smaller batches to make the experience a bit smoother
let mut remaining = amount;
while remaining > 0 {
let to_send = min(remaining, 100);
if let Err(err) = self
.message_handler
.try_send_additional_reply_surbs(
recipient,
to_send,
nym_sphinx::params::PacketType::Mix,
)
.await
{
warn!("failed to send additional surbs to {recipient} - {err}");
} else {
trace!("sent {to_send} reply SURBs to {recipient}");
}
remaining -= to_send;
}
}
pub(crate) fn inspect_and_clear_stale_data(&self, now: OffsetDateTime) {
// check reply keys (this applies to SENDER)
self.sent_reply_keys.retain(|_, reply_key| {
let diff = now - reply_key.sent_at;
if diff > self.config.reply_surbs.maximum_reply_key_age {
let std_diff = Duration::try_from(diff).unwrap_or_default();
let diff_formatted = humantime::format_duration(std_diff);
debug!("it's been {diff_formatted} since we created this reply key. it's probably never going to get used, so we're going to purge it...");
false
} else {
true
}
});
}
}
@@ -93,14 +93,14 @@ impl StatisticsControl {
None,
);
if let Err(err) = self.report_tx.send(report_message).await {
tracing::error!("Failed to report client stats: {err:?}");
log::error!("Failed to report client stats: {:?}", err);
} else {
self.stats.reset();
}
}
async fn run(&mut self) {
tracing::debug!("Started StatisticsControl with graceful shutdown support");
log::debug!("Started StatisticsControl with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
let mut stats_report_interval = tokio_stream::wrappers::IntervalStream::new(
@@ -133,13 +133,13 @@ impl StatisticsControl {
tokio::select! {
biased;
_ = self.task_client.recv() => {
tracing::trace!("StatisticsControl: Received shutdown");
log::trace!("StatisticsControl: Received shutdown");
break;
},
stats_event = self.stats_rx.recv() => match stats_event {
Some(stats_event) => self.stats.handle_event(stats_event),
None => {
tracing::trace!("StatisticsControl: shutting down due to closed stats channel");
log::trace!("StatisticsControl: shutting down due to closed stats channel");
break;
}
},
@@ -161,16 +161,13 @@ impl StatisticsControl {
}
}
}
tracing::debug!("StatisticsControl: Exiting");
log::debug!("StatisticsControl: Exiting");
}
pub(crate) fn start(mut self) {
spawn_future!(
async move {
self.run().await;
},
"StatisticsControl"
)
spawn_future(async move {
self.run().await;
})
}
pub(crate) fn create_and_start(
@@ -2,8 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError, NymTopologyMetadata};
use nym_validator_client::models::KeyRotationId;
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError};
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -126,7 +125,7 @@ impl TopologyAccessor {
.map(|p| p.topology.clone())
}
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<'_, NymRouteProvider>> {
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<NymRouteProvider>> {
let provider = self.inner.topology.read().await;
if provider.topology.is_empty() {
None
@@ -135,21 +134,6 @@ impl TopologyAccessor {
}
}
pub async fn current_mixnet_epoch_id(&self) -> Option<u32> {
let route_provider = self.current_route_provider().await?;
Some(route_provider.absolute_epoch_id())
}
pub async fn current_key_rotation_id(&self) -> Option<KeyRotationId> {
let route_provider = self.current_route_provider().await?;
Some(route_provider.current_key_rotation())
}
pub async fn current_metadata(&self) -> Option<NymTopologyMetadata> {
let route_provider = self.current_route_provider().await?;
Some(route_provider.metadata())
}
pub async fn manually_change_topology(&self, new_topology: NymTopology) {
self.inner.controlled_manually.store(true, Ordering::SeqCst);
self.inner.update(Some(new_topology)).await;
@@ -4,11 +4,11 @@
use crate::spawn_future;
pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit};
use futures::StreamExt;
use log::*;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_task::TaskClient;
use nym_topology::NymTopologyError;
use std::time::Duration;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
@@ -20,7 +20,7 @@ mod accessor;
pub mod nym_api_provider;
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
pub use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
pub use nym_topology::provider_trait::TopologyProvider;
// TODO: move it to config later
const MAX_FAILURE_COUNT: usize = 10;
@@ -145,39 +145,36 @@ impl TopologyRefresher {
}
pub fn start(mut self) {
spawn_future!(
async move {
debug!("Started TopologyRefresher with graceful shutdown support");
spawn_future(async move {
debug!("Started TopologyRefresher with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
let mut interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(self.refresh_rate),
);
#[cfg(not(target_arch = "wasm32"))]
let mut interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
self.refresh_rate,
));
#[cfg(target_arch = "wasm32")]
let mut interval =
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
#[cfg(target_arch = "wasm32")]
let mut interval =
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
// We already have an initial topology, so no need to refresh it immediately.
// My understanding is that js setInterval does not fire immediately, so it's not
// needed there.
#[cfg(not(target_arch = "wasm32"))]
interval.next().await;
// We already have an initial topology, so no need to refresh it immediately.
// My understanding is that js setInterval does not fire immediately, so it's not
// needed there.
#[cfg(not(target_arch = "wasm32"))]
interval.next().await;
while !self.task_client.is_shutdown() {
tokio::select! {
_ = interval.next() => {
self.try_refresh().await;
},
_ = self.task_client.recv() => {
tracing::trace!("TopologyRefresher: Received shutdown");
},
}
while !self.task_client.is_shutdown() {
tokio::select! {
_ = interval.next() => {
self.try_refresh().await;
},
_ = self.task_client.recv() => {
log::trace!("TopologyRefresher: Received shutdown");
},
}
self.task_client.recv_timeout().await;
tracing::debug!("TopologyRefresher: Exiting");
},
"TopologyRefresher"
)
}
self.task_client.recv_timeout().await;
log::debug!("TopologyRefresher: Exiting");
})
}
}
@@ -2,12 +2,13 @@
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
use nym_topology::NymTopology;
use log::{debug, error, warn};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::{NymTopology, NymTopologyMetadata};
use nym_validator_client::UserAgent;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::cmp::min;
use tracing::{debug, error, warn};
use url::Url;
#[derive(Debug)]
@@ -48,10 +49,18 @@ impl NymApiTopologyProvider {
pub fn new(
config: impl Into<Config>,
mut nym_api_urls: Vec<Url>,
mut validator_client: nym_validator_client::client::NymApiClient,
user_agent: Option<UserAgent>,
) -> Self {
nym_api_urls.shuffle(&mut thread_rng());
validator_client.change_nym_api(nym_api_urls[0].clone());
let validator_client = if let Some(user_agent) = user_agent {
nym_validator_client::client::NymApiClient::new_with_user_agent(
nym_api_urls[0].clone(),
user_agent,
)
} else {
nym_validator_client::client::NymApiClient::new(nym_api_urls[0].clone())
};
NymApiTopologyProvider {
config: config.into(),
@@ -99,8 +108,12 @@ impl NymApiTopologyProvider {
.filter(|n| n.performance.round_to_integer() >= self.config.min_node_performance())
.collect::<Vec<_>>();
NymTopology::new(metadata.to_topology_metadata(), rewarded_set, Vec::new())
.with_skimmed_nodes(&nodes_filtered)
NymTopology::new(
NymTopologyMetadata::new(metadata.rotation_id, metadata.absolute_epoch_id),
rewarded_set,
Vec::new(),
)
.with_skimmed_nodes(&nodes_filtered)
} else {
// if we're not using extended topology, we're only getting active set mixnodes and gateways
@@ -111,7 +124,7 @@ impl NymApiTopologyProvider {
// TODO: we really should be getting ACTIVE gateways only
let gateways_fut = self
.validator_client
.get_all_basic_entry_assigned_nodes_with_metadata();
.get_all_basic_entry_assigned_nodes_v2();
let (rewarded_set, mixnodes_res, gateways_res) =
futures::try_join!(rewarded_set_fut, mixnodes_fut, gateways_fut)
@@ -123,7 +136,7 @@ impl NymApiTopologyProvider {
let metadata = mixnodes_res.metadata;
let mixnodes = mixnodes_res.nodes;
if !gateways_res.metadata.consistency_check(&metadata) {
if gateways_res.metadata != metadata {
warn!("inconsistent nodes metadata between mixnodes and gateways calls! {metadata:?} and {:?}", gateways_res.metadata);
return None;
}
@@ -148,8 +161,12 @@ impl NymApiTopologyProvider {
}
}
NymTopology::new(metadata.to_topology_metadata(), rewarded_set, Vec::new())
.with_skimmed_nodes(&nodes)
NymTopology::new(
NymTopologyMetadata::new(metadata.rotation_id, metadata.absolute_epoch_id),
rewarded_set,
Vec::new(),
)
.with_skimmed_nodes(&nodes)
};
if !topology.is_minimally_routable() {
@@ -36,18 +36,11 @@ impl SizedData for Fragment {
}
}
#[derive(Default)]
pub(crate) struct TransmissionBuffer<T> {
buffer: HashMap<TransmissionLane, LaneBufferEntry<T>>,
}
impl<T> Default for TransmissionBuffer<T> {
fn default() -> Self {
TransmissionBuffer {
buffer: HashMap::new(),
}
}
}
impl<T> TransmissionBuffer<T> {
pub(crate) fn new() -> Self {
TransmissionBuffer {
@@ -218,7 +211,7 @@ impl<T> TransmissionBuffer<T> {
};
let msg = self.pop_front_from_lane(&lane)?;
tracing::trace!("picking to send from lane: {lane:?}");
log::trace!("picking to send from lane: {:?}", lane);
Some((lane, msg))
}
+2 -25
View File
@@ -6,10 +6,7 @@ use nym_crypto::asymmetric::ed25519::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_validator_client::nym_api::error::NymAPIError;
use nym_validator_client::nyxd::error::NyxdError;
use nym_validator_client::ValidatorClientError;
use rand::distributions::WeightedError;
use std::error::Error;
use std::path::PathBuf;
@@ -55,15 +52,7 @@ pub enum ClientCoreError {
#[error("list of nym apis is empty")]
ListOfNymApisIsEmpty,
#[error("failed to resolve a query to nym API: {source}")]
NymApiQueryFailure {
#[from]
source: NymAPIError,
},
#[error(
"the current network topology seem to be insufficient to route any packets through:\n\t{0}"
)]
#[error("the current network topology seem to be insufficient to route any packets through")]
InsufficientNetworkTopology(#[from] NymTopologyError),
#[error("experienced a failure with our reply surb persistent storage: {source}")]
@@ -232,19 +221,7 @@ pub enum ClientCoreError {
UnexpectedKeyUpgrade { gateway_id: String },
#[error("failed to derive keys from master key")]
HkdfDerivationError,
#[error("missing url for constructing RPC client")]
RpcClientMissingUrl,
#[error("provided nym network details were malformed: {source}")]
InvalidNetworkDetails { source: NyxdError },
#[error("failed to construct RPC client: {source}")]
RpcClientCreationFailure { source: NyxdError },
#[error("failed to select valid gateway due to incomputable latency")]
GatewaySelectionFailure { source: WeightedError },
HkdfDerivationError {},
}
impl From<tungstenite::Error> for ClientCoreError {
+12 -15
View File
@@ -4,6 +4,7 @@
use crate::error::ClientCoreError;
use crate::init::types::RegistrationResult;
use futures::{SinkExt, StreamExt};
use log::{debug, info, trace, warn};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::GatewayClient;
use nym_topology::node::RoutingNode;
@@ -13,7 +14,6 @@ use rand::{seq::SliceRandom, Rng};
#[cfg(unix)]
use std::os::fd::RawFd;
use std::{sync::Arc, time::Duration};
use tracing::{debug, info, trace, warn};
use tungstenite::Message;
use url::Url;
@@ -105,15 +105,12 @@ pub async fn gateways_for_init<R: Rng>(
nym_validator_client::client::NymApiClient::new(nym_api.clone())
};
tracing::debug!("Fetching list of gateways from: {nym_api}");
log::debug!("Fetching list of gateways from: {nym_api}");
let gateways = client
.get_all_basic_entry_assigned_nodes_with_metadata()
.await?
.nodes;
let gateways = client.get_all_basic_entry_assigned_nodes_v2().await?.nodes;
info!("nym api reports {} gateways", gateways.len());
tracing::trace!("Gateways: {gateways:#?}");
log::trace!("Gateways: {:#?}", gateways);
// filter out gateways below minimum performance and ones that could operate as a mixnode
// (we don't want instability)
@@ -123,10 +120,10 @@ pub async fn gateways_for_init<R: Rng>(
.filter(|g| g.performance.round_to_integer() >= minimum_performance)
.filter_map(|gateway| gateway.try_into().ok())
.collect::<Vec<_>>();
tracing::debug!("After checking validity: {}", valid_gateways.len());
tracing::trace!("Valid gateways: {valid_gateways:#?}");
log::debug!("After checking validity: {}", valid_gateways.len());
log::trace!("Valid gateways: {:#?}", valid_gateways);
tracing::info!(
log::info!(
"and {} after validity and performance filtering",
valid_gateways.len()
);
@@ -148,7 +145,7 @@ async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
JSWebsocket::new(endpoint).map_err(|_| ClientCoreError::GatewayJsConnectionFailure)
}
async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<'_, G>, ClientCoreError>
async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<G>, ClientCoreError>
where
G: ConnectableGateway,
{
@@ -245,7 +242,7 @@ pub async fn choose_gateway_by_latency<R: Rng, G: ConnectableGateway + Clone>(
let gateways_with_latency = gateways_with_latency.lock().await;
let chosen = gateways_with_latency
.choose_weighted(rng, |item| 1. / item.latency.as_secs_f32())
.map_err(|source| ClientCoreError::GatewaySelectionFailure { source })?;
.expect("invalid selection weight!");
info!(
"chose gateway {} with average latency of {:?}",
@@ -289,7 +286,7 @@ pub(super) fn get_specified_gateway(
gateways: &[RoutingNode],
must_use_tls: bool,
) -> Result<RoutingNode, ClientCoreError> {
tracing::debug!("Requesting specified gateway: {gateway_identity}");
log::debug!("Requesting specified gateway: {}", gateway_identity);
let user_gateway = ed25519::PublicKey::from_base58_string(gateway_identity)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
@@ -329,7 +326,7 @@ pub(super) async fn register_with_gateway(
);
gateway_client.establish_connection().await.map_err(|err| {
tracing::warn!("Failed to establish connection with gateway!");
log::warn!("Failed to establish connection with gateway!");
ClientCoreError::GatewayClientError {
gateway_id: gateway_id.to_base58_string(),
source: Box::new(err),
@@ -339,7 +336,7 @@ pub(super) async fn register_with_gateway(
.perform_initial_authentication()
.await
.map_err(|err| {
tracing::warn!("Failed to register with the gateway {gateway_id}: {err}");
log::warn!("Failed to register with the gateway {gateway_id}: {err}");
ClientCoreError::GatewayClientError {
gateway_id: gateway_id.to_base58_string(),
source: Box::new(err),
+5 -5
View File
@@ -63,7 +63,7 @@ where
K::StorageError: Send + Sync + 'static,
D::StorageError: Send + Sync + 'static,
{
tracing::trace!("Setting up new gateway");
log::trace!("Setting up new gateway");
// if we're setting up new gateway, we must have had generated long-term client keys before
let client_keys = load_client_keys(key_store).await?;
@@ -202,10 +202,10 @@ where
K::StorageError: Send + Sync + 'static,
D::StorageError: Send + Sync + 'static,
{
tracing::debug!("Setting up gateway");
log::debug!("Setting up gateway");
match setup {
GatewaySetup::MustLoad { gateway_id } => {
tracing::debug!("GatewaySetup::MustLoad with id: {gateway_id:?}");
log::debug!("GatewaySetup::MustLoad with id: {gateway_id:?}");
use_loaded_gateway_details(key_store, details_store, gateway_id).await
}
GatewaySetup::New {
@@ -214,7 +214,7 @@ where
#[cfg(unix)]
connection_fd_callback,
} => {
tracing::debug!("GatewaySetup::New with spec: {specification:?}");
log::debug!("GatewaySetup::New with spec: {specification:?}");
setup_new_gateway(
key_store,
details_store,
@@ -230,7 +230,7 @@ where
gateway_details,
client_keys: managed_keys,
} => {
tracing::debug!("GatewaySetup::ReuseConnection");
log::debug!("GatewaySetup::ReuseConnection");
Ok(reuse_gateway_connection(
*authenticated_ephemeral_client,
*gateway_details,
+2 -38
View File
@@ -18,54 +18,18 @@ pub use nym_topology::{
};
#[cfg(target_arch = "wasm32")]
pub fn spawn_future<F>(future: F)
pub(crate) fn spawn_future<F>(future: F)
where
F: Future<Output = ()> + 'static,
{
wasm_bindgen_futures::spawn_local(future);
}
// TODO: expose similar API to the rest of the codebase,
// perhaps with some simple trait for a task to define its name
#[cfg(not(target_arch = "wasm32"))]
#[track_caller]
pub fn spawn_future<F>(future: F)
pub(crate) fn spawn_future<F>(future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
tokio::spawn(future);
}
#[cfg(not(target_arch = "wasm32"))]
#[track_caller]
pub fn spawn_named_future<F>(future: F, name: &str)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
cfg_if::cfg_if! {if #[cfg(tokio_unstable)] {
#[allow(clippy::expect_used)]
tokio::task::Builder::new().name(name).spawn(future).expect("failed to spawn future");
} else {
let _ = name;
tracing::debug!(r#"the underlying binary hasn't been built with `RUSTFLAGS="--cfg tokio_unstable"` - the future naming won't do anything"#);
spawn_future(future);
}}
}
#[macro_export]
macro_rules! spawn_future {
($future:expr) => {{
$crate::spawn_future($future)
}};
($future:expr, $name:expr) => {{
cfg_if::cfg_if! {if #[cfg(not(target_arch = "wasm32"))] {
$crate::spawn_named_future($future, $name)
} else {
let _ = $name;
$crate::spawn_future($future)
}}
}};
}
+2 -2
View File
@@ -9,7 +9,7 @@ license.workspace = true
[dependencies]
async-trait.workspace = true
dashmap.workspace = true
tracing.workspace = true
log.workspace = true
thiserror.workspace = true
time.workspace = true
@@ -23,7 +23,7 @@ features = ["fs"]
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
workspace = true
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"]
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"]
optional = true
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx-pool-guard]
@@ -1,81 +0,0 @@
/*
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
-- change `previous_flush_timestamp` unix timestamp to `previous_flush` timestamp
CREATE TABLE status_new
(
flush_in_progress INTEGER NOT NULL,
previous_flush TIMESTAMP WITHOUT TIME ZONE NOT NULL,
client_in_use INTEGER NOT NULL
);
INSERT INTO status_new (flush_in_progress, previous_flush, client_in_use)
SELECT flush_in_progress,
datetime(previous_flush_timestamp, 'unixepoch') AS previous_flush,
client_in_use
FROM status;
DROP TABLE status;
ALTER TABLE status_new
RENAME TO status;
-- change `sent_at_timestamp` unix timestamp to `sent_at` timestamp
CREATE TABLE reply_key_new
(
key_digest BLOB NOT NULL UNIQUE,
reply_key BLOB NOT NULL UNIQUE,
sent_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
);
INSERT INTO reply_key_new (key_digest, reply_key, sent_at)
SELECT key_digest,
reply_key,
datetime(sent_at_timestamp, 'unixepoch') AS sent_at
FROM reply_key;
DROP TABLE reply_key;
ALTER TABLE reply_key_new
RENAME TO reply_key;
-- change `last_sent_timestamp` unix timestamp to `sent_at` last_sent
CREATE TABLE reply_surb_sender_new
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
last_sent TIMESTAMP WITHOUT TIME ZONE NOT NULL,
tag BLOB NOT NULL UNIQUE
);
INSERT INTO reply_surb_sender_new (id, last_sent, tag)
SELECT id,
datetime(last_sent_timestamp, 'unixepoch') AS last_sent,
tag
FROM reply_surb_sender;
-- recreate `reply_surb` table due to foreign key constraint
CREATE TABLE reply_surb_new
(
reply_surb_sender_id INTEGER NOT NULL,
reply_surb BLOB NOT NULL,
encoded_key_rotation TINYINT NOT NULL,
FOREIGN KEY (reply_surb_sender_id) REFERENCES reply_surb_sender_new (id)
);
INSERT INTO reply_surb_new
SELECT *
FROM reply_surb;
DROP TABLE reply_surb;
ALTER TABLE reply_surb_new
RENAME TO reply_surb;
DROP TABLE reply_surb_sender;
ALTER TABLE reply_surb_sender_new
RENAME TO reply_surb_sender;
@@ -1,12 +0,0 @@
/*
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
-- don't persist sender_tag in the DB. instead generate fresh one on each restart
-- this will:
-- A) further help against correlation attacks
-- B) realistically after client restarts, we might be in new key rotation anyway meaning receiver would have to start
-- "from scratch" with surbs
DROP TABLE sender_tag;
@@ -4,7 +4,6 @@
use crate::backend::Empty;
use crate::{CombinedReplyStorage, ReplyStorageBackend};
use async_trait::async_trait;
use time::OffsetDateTime;
// well, right now we don't have the browser storage : (
// so we keep everything in memory
@@ -39,10 +38,7 @@ impl ReplyStorageBackend for Backend {
self.empty.init_fresh(fresh).await
}
async fn load_surb_storage(
&self,
surb_freshness_cutoff: OffsetDateTime,
) -> Result<CombinedReplyStorage, Self::StorageError> {
self.empty.load_surb_storage(surb_freshness_cutoff).await
async fn load_surb_storage(&self) -> Result<CombinedReplyStorage, Self::StorageError> {
self.empty.load_surb_storage().await
}
}
@@ -3,15 +3,17 @@
use crate::backend::fs_backend::{
error::StorageError,
models::{ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSurbSender},
models::{
ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSenderTag,
StoredSurbSender,
},
};
use log::{error, info};
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use time::OffsetDateTime;
use tracing::{error, info};
use sqlx_pool_guard::SqlitePoolGuard;
@@ -37,7 +39,7 @@ impl StorageManager {
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.filename(&database_path)
.create_if_missing(fresh)
.disable_statement_logging();
@@ -49,7 +51,8 @@ impl StorageManager {
}
};
let connection_pool = SqlitePoolGuard::new(connection_pool);
let connection_pool =
SqlitePoolGuard::new(database_path.as_ref().to_path_buf(), connection_pool);
if let Err(err) = sqlx::migrate!("./fs_surbs_migrations")
.run(&*connection_pool)
@@ -78,11 +81,9 @@ impl StorageManager {
}
pub async fn create_status_table(&self) -> Result<(), sqlx::Error> {
sqlx::query!(
"INSERT INTO status(flush_in_progress, previous_flush, client_in_use) VALUES (0, 0, 1)"
)
.execute(&*self.connection_pool)
.await?;
sqlx::query!("INSERT INTO status(flush_in_progress, previous_flush_timestamp, client_in_use) VALUES (0, 0, 1)")
.execute(&*self.connection_pool)
.await?;
Ok(())
}
@@ -93,18 +94,18 @@ impl StorageManager {
.map(|r| r.flush_in_progress > 0)
}
pub async fn set_previous_flush(&self, timestamp: OffsetDateTime) -> Result<(), sqlx::Error> {
sqlx::query!("UPDATE status SET previous_flush = ?", timestamp)
pub async fn set_previous_flush_timestamp(&self, timestamp: i64) -> Result<(), sqlx::Error> {
sqlx::query!("UPDATE status SET previous_flush_timestamp = ?", timestamp)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn get_previous_flush_time(&self) -> Result<OffsetDateTime, sqlx::Error> {
sqlx::query!(r#"SELECT previous_flush AS "previous_flush: OffsetDateTime" FROM status"#)
pub async fn get_previous_flush_timestamp(&self) -> Result<i64, sqlx::Error> {
sqlx::query!("SELECT previous_flush_timestamp FROM status;")
.fetch_one(&*self.connection_pool)
.await
.map(|r| r.previous_flush)
.map(|r| r.previous_flush_timestamp)
}
pub async fn set_flush_status(&self, in_progress: bool) -> Result<(), sqlx::Error> {
@@ -130,6 +131,32 @@ impl StorageManager {
Ok(())
}
pub async fn delete_all_tags(&self) -> Result<(), sqlx::Error> {
sqlx::query!("DELETE FROM sender_tag;")
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn get_tags(&self) -> Result<Vec<StoredSenderTag>, sqlx::Error> {
sqlx::query_as!(StoredSenderTag, "SELECT * FROM sender_tag;",)
.fetch_all(&*self.connection_pool)
.await
}
pub async fn insert_tag(&self, stored_tag: StoredSenderTag) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO sender_tag(recipient, tag) VALUES (?, ?);
"#,
stored_tag.recipient,
stored_tag.tag
)
.execute(&*self.connection_pool)
.await?;
Ok(())
}
pub async fn delete_all_reply_keys(&self) -> Result<(), sqlx::Error> {
sqlx::query!("DELETE FROM reply_key;")
.execute(&*self.connection_pool)
@@ -138,7 +165,7 @@ impl StorageManager {
}
pub async fn get_reply_keys(&self) -> Result<Vec<StoredReplyKey>, sqlx::Error> {
sqlx::query_as("SELECT * FROM reply_key;")
sqlx::query_as!(StoredReplyKey, "SELECT * FROM reply_key;",)
.fetch_all(&*self.connection_pool)
.await
}
@@ -149,11 +176,11 @@ impl StorageManager {
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO reply_key(key_digest, reply_key, sent_at) VALUES (?, ?, ?);
INSERT INTO reply_key(key_digest, reply_key, sent_at_timestamp) VALUES (?, ?, ?);
"#,
stored_reply_key.key_digest,
stored_reply_key.reply_key,
stored_reply_key.sent_at
stored_reply_key.sent_at_timestamp
)
.execute(&*self.connection_pool)
.await?;
@@ -161,7 +188,7 @@ impl StorageManager {
}
pub async fn get_surb_senders(&self) -> Result<Vec<StoredSurbSender>, sqlx::Error> {
sqlx::query_as("SELECT * FROM reply_surb_sender;")
sqlx::query_as!(StoredSurbSender, "SELECT * FROM reply_surb_sender;",)
.fetch_all(&*self.connection_pool)
.await
}
@@ -172,10 +199,10 @@ impl StorageManager {
) -> Result<i64, sqlx::Error> {
let id = sqlx::query!(
r#"
INSERT INTO reply_surb_sender(tag, last_sent) VALUES (?, ?);
INSERT INTO reply_surb_sender(tag, last_sent_timestamp) VALUES (?, ?);
"#,
stored_surb_sender.tag,
stored_surb_sender.last_sent
stored_surb_sender.last_sent_timestamp
)
.execute(&*self.connection_pool)
.await?
@@ -190,7 +217,7 @@ impl StorageManager {
sqlx::query_as!(
StoredReplySurb,
r#"
SELECT reply_surb_sender_id, reply_surb, encoded_key_rotation as "encoded_key_rotation: u8" FROM reply_surb
SELECT reply_surb_sender_id, reply_surb, encoded_key_rotation as "encoded_key_rotation: u8" FROM reply_surb
WHERE reply_surb_sender_id = ?
"#,
sender_id
@@ -4,16 +4,20 @@
use crate::{
backend::fs_backend::{
manager::StorageManager,
models::{ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSurbSender},
models::{
ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSenderTag,
StoredSurbSender,
},
},
surb_storage::ReceivedReplySurbs,
CombinedReplyStorage, ReceivedReplySurbsMap, ReplyStorageBackend, SentReplyKeys,
UsedSenderTags,
};
use async_trait::async_trait;
use log::{debug, error, info, warn};
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use std::path::{Path, PathBuf};
use time::OffsetDateTime;
use tracing::{error, info, warn};
pub use self::error::StorageError;
@@ -53,7 +57,10 @@ impl Backend {
}
}
pub async fn try_load<P: AsRef<Path>>(database_path: P) -> Result<Self, StorageError> {
pub async fn try_load<P: AsRef<Path>>(
database_path: P,
fresh_sender_tags: bool,
) -> Result<Self, StorageError> {
let owned_path: PathBuf = database_path.as_ref().into();
if owned_path.file_name().is_none() {
return Err(StorageError::DatabasePathWithoutFilename {
@@ -62,7 +69,7 @@ impl Backend {
}
let manager = StorageManager::init(database_path, false).await?;
match Self::try_load_inner(&manager).await {
match Self::try_load_inner(&manager, fresh_sender_tags).await {
Ok(()) => Ok(Backend {
temporary_old_path: None,
database_path: owned_path,
@@ -80,15 +87,18 @@ impl Backend {
self.manager.close_pool().await
}
async fn try_load_inner(manager: &StorageManager) -> Result<(), StorageError> {
async fn try_load_inner(
manager: &StorageManager,
fresh_sender_tags: bool,
) -> Result<(), StorageError> {
// the database flush wasn't fully finished and thus the data is in inconsistent state
// (we don't really know what's properly saved or what's not)
if manager.get_flush_status().await? {
return Err(StorageError::IncompleteDataFlush);
}
let last_flush = manager.get_previous_flush_time().await?;
if last_flush == OffsetDateTime::UNIX_EPOCH {
let last_flush_timestamp = manager.get_previous_flush_timestamp().await?;
if last_flush_timestamp == 0 {
// either this client has been running since 1970 or the flush failed
return Err(StorageError::IncompleteDataFlush);
}
@@ -108,6 +118,15 @@ impl Backend {
return Err(err.into());
}
let last_flush = match OffsetDateTime::from_unix_timestamp(last_flush_timestamp) {
Ok(last_flush) => last_flush,
Err(err) => {
return Err(StorageError::CorruptedData {
details: format!("failed to parse stored timestamp - {err}"),
});
}
};
// in theory clients can use our reply surbs whenever they want, even a year in the future
// (assuming no key rotation has happened)
// but the way it's currently coded, everyone will purge old data
@@ -125,6 +144,14 @@ impl Backend {
manager.delete_all_reply_keys().await?;
}
if days > 2 {
info!("it's been over {days} days and {hours} hours since we last used our data store. our used sender tags are already outdated - we're going to purge them now.");
manager.delete_all_tags().await?;
} else if fresh_sender_tags {
debug!("starting with fresh sender tags");
manager.delete_all_tags().await?;
}
Ok(())
}
@@ -169,7 +196,7 @@ impl Backend {
async fn end_storage_flush(&self) -> Result<(), StorageError> {
self.manager
.set_previous_flush(OffsetDateTime::now_utc())
.set_previous_flush_timestamp(OffsetDateTime::now_utc().unix_timestamp())
.await?;
Ok(self.manager.set_flush_status(false).await?)
}
@@ -182,6 +209,29 @@ impl Backend {
Ok(self.manager.set_client_in_use_status(false).await?)
}
async fn get_stored_tags(&self) -> Result<UsedSenderTags, StorageError> {
let stored = self.manager.get_tags().await?;
// stop at the first instance of corruption. if even a single entry is malformed,
// something weird has happened and we can't trust the rest of the data
let raw = stored
.into_iter()
.map(TryInto::try_into)
.collect::<Result<_, _>>()?;
Ok(UsedSenderTags::from_raw(raw))
}
async fn dump_sender_tags(&self, tags: &UsedSenderTags) -> Result<(), StorageError> {
for map_ref in tags.as_raw_iter() {
let (recipient, tag) = map_ref.pair();
self.manager
.insert_tag(StoredSenderTag::new(*recipient, *tag))
.await?;
}
Ok(())
}
async fn get_stored_reply_keys(&self) -> Result<SentReplyKeys, StorageError> {
let stored = self.manager.get_reply_keys().await?;
@@ -205,17 +255,14 @@ impl Backend {
Ok(())
}
async fn get_stored_reply_surbs(
&self,
surb_freshness_cutoff: OffsetDateTime,
) -> Result<ReceivedReplySurbsMap, StorageError> {
async fn get_stored_reply_surbs(&self) -> Result<ReceivedReplySurbsMap, StorageError> {
let surb_senders = self.manager.get_surb_senders().await?;
let metadata = self.get_reply_surb_storage_metadata().await?;
let mut received_surbs = Vec::with_capacity(surb_senders.len());
for sender in surb_senders {
let sender_id = sender.id;
let (sender_tag, surbs_last_received_at): (AnonymousSenderTag, OffsetDateTime) =
let (sender_tag, surbs_last_received_at_timestamp): (AnonymousSenderTag, i64) =
sender.try_into()?;
let stored_surbs = self
.manager
@@ -227,17 +274,15 @@ impl Backend {
received_surbs.push((
sender_tag,
ReceivedReplySurbs::new_retrieved(stored_surbs, surbs_last_received_at),
ReceivedReplySurbs::new_retrieved(stored_surbs, surbs_last_received_at_timestamp),
))
}
let received_surbs = ReceivedReplySurbsMap::from_raw(
Ok(ReceivedReplySurbsMap::from_raw(
metadata.min_reply_surb_threshold as usize,
metadata.max_reply_surb_threshold as usize,
received_surbs,
);
received_surbs.drop_stale_loaded_surbs(surb_freshness_cutoff);
Ok(received_surbs)
))
}
async fn dump_reply_surbs(
@@ -259,14 +304,6 @@ impl Backend {
.insert_reply_surb(StoredReplySurb::new(sender_id, reply_surb))
.await?
}
// TODO: should we also retain the stale ones?
if received_surbs.possibly_stale_left() != 0 {
warn!(
"dropping {} possibly stale surbs for {tag}",
received_surbs.possibly_stale_left()
);
}
}
Ok(())
}
@@ -310,6 +347,7 @@ impl ReplyStorageBackend for Backend {
self.rotate().await?;
self.start_storage_flush().await?;
self.dump_sender_tags(storage.tags_storage_ref()).await?;
self.dump_sender_reply_keys(storage.key_storage_ref())
.await?;
let surbs_ref = storage.surbs_storage_ref();
@@ -326,14 +364,12 @@ impl ReplyStorageBackend for Backend {
.await
}
async fn load_surb_storage(
&self,
surb_freshness_cutoff: OffsetDateTime,
) -> Result<CombinedReplyStorage, Self::StorageError> {
async fn load_surb_storage(&self) -> Result<CombinedReplyStorage, Self::StorageError> {
let reply_keys = self.get_stored_reply_keys().await?;
let reply_surbs = self.get_stored_reply_surbs(surb_freshness_cutoff).await?;
let tags = self.get_stored_tags().await?;
let reply_surbs = self.get_stored_reply_surbs().await?;
Ok(CombinedReplyStorage::load(reply_keys, reply_surbs))
Ok(CombinedReplyStorage::load(reply_keys, reply_surbs, tags))
}
async fn stop_storage_session(self) -> Result<(), Self::StorageError> {
@@ -3,7 +3,6 @@
use crate::backend::fs_backend::error::StorageError;
use crate::key_storage::UsedReplyKey;
use crate::ReceivedReplySurb;
use nym_crypto::generic_array::typenum::Unsigned;
use nym_crypto::Digest;
use nym_sphinx::addressing::clients::{Recipient, RecipientBytes};
@@ -13,8 +12,6 @@ use nym_sphinx::anonymous_replies::{
ReplySurb, ReplySurbWithKeyRotation, SurbEncryptionKey, SurbEncryptionKeySize,
};
use nym_sphinx::params::{ReplySurbKeyDigestAlgorithm, SphinxKeyRotation};
use sqlx::FromRow;
use time::OffsetDateTime;
#[derive(Debug, Clone)]
pub struct StoredSenderTag {
@@ -61,11 +58,11 @@ impl TryFrom<StoredSenderTag> for (RecipientBytes, AnonymousSenderTag) {
}
}
#[derive(Debug, Clone, FromRow)]
#[derive(Debug, Clone)]
pub struct StoredReplyKey {
pub key_digest: Vec<u8>,
pub reply_key: Vec<u8>,
pub sent_at: OffsetDateTime,
pub sent_at_timestamp: i64,
}
impl StoredReplyKey {
@@ -73,7 +70,7 @@ impl StoredReplyKey {
StoredReplyKey {
key_digest: key_digest.to_vec(),
reply_key: (*reply_key).to_bytes(),
sent_at: reply_key.sent_at,
sent_at_timestamp: reply_key.sent_at_timestamp,
}
}
}
@@ -103,30 +100,32 @@ impl TryFrom<StoredReplyKey> for (EncryptionKeyDigest, UsedReplyKey) {
});
};
Ok((digest, UsedReplyKey::new(reply_key, value.sent_at)))
Ok((
digest,
UsedReplyKey::new(reply_key, value.sent_at_timestamp),
))
}
}
#[derive(FromRow)]
pub struct StoredSurbSender {
pub id: i64,
pub tag: Vec<u8>,
pub last_sent: OffsetDateTime,
pub last_sent_timestamp: i64,
}
impl StoredSurbSender {
pub fn new(tag: AnonymousSenderTag, last_sent: OffsetDateTime) -> Self {
pub fn new(tag: AnonymousSenderTag, last_sent_timestamp: i64) -> Self {
StoredSurbSender {
// for the purposes of STORING data,
// we ignore that field anyway
id: 0,
tag: tag.to_bytes().to_vec(),
last_sent,
last_sent_timestamp,
}
}
}
impl TryFrom<StoredSurbSender> for (AnonymousSenderTag, OffsetDateTime) {
impl TryFrom<StoredSurbSender> for (AnonymousSenderTag, i64) {
type Error = StorageError;
fn try_from(value: StoredSurbSender) -> Result<Self, Self::Error> {
@@ -141,7 +140,7 @@ impl TryFrom<StoredSurbSender> for (AnonymousSenderTag, OffsetDateTime) {
Ok((
AnonymousSenderTag::from_bytes(sender_tag_bytes),
value.last_sent,
value.last_sent_timestamp,
))
}
}
@@ -156,10 +155,10 @@ pub struct StoredReplySurb {
}
impl StoredReplySurb {
pub fn new(reply_surb_sender_id: i64, reply_surb: &ReceivedReplySurb) -> Self {
pub fn new(reply_surb_sender_id: i64, reply_surb: &ReplySurbWithKeyRotation) -> Self {
StoredReplySurb {
reply_surb_sender_id,
reply_surb: reply_surb.surb.inner_reply_surb().to_bytes(),
reply_surb: reply_surb.inner_reply_surb().to_bytes(),
encoded_key_rotation: reply_surb.key_rotation() as u8,
}
}
@@ -5,7 +5,6 @@ use crate::CombinedReplyStorage;
use async_trait::async_trait;
use std::error::Error;
use thiserror::Error;
use time::OffsetDateTime;
// TODO: this should now live inside our wasm/client-core
pub mod browser_backend;
@@ -54,10 +53,7 @@ impl ReplyStorageBackend for Empty {
Ok(())
}
async fn load_surb_storage(
&self,
_: OffsetDateTime,
) -> Result<CombinedReplyStorage, Self::StorageError> {
async fn load_surb_storage(&self) -> Result<CombinedReplyStorage, Self::StorageError> {
Ok(CombinedReplyStorage::new(
self.min_surb_threshold,
self.max_surb_threshold,
@@ -84,10 +80,7 @@ pub trait ReplyStorageBackend: Sized {
/// (such as surb thresholds)
async fn init_fresh(&mut self, fresh: &CombinedReplyStorage) -> Result<(), Self::StorageError>;
async fn load_surb_storage(
&self,
surb_freshness_cutoff: OffsetDateTime,
) -> Result<CombinedReplyStorage, Self::StorageError>;
async fn load_surb_storage(&self) -> Result<CombinedReplyStorage, Self::StorageError>;
async fn stop_storage_session(self) -> Result<(), Self::StorageError> {
Ok(())
@@ -25,11 +25,12 @@ impl CombinedReplyStorage {
pub fn load(
sent_reply_keys: SentReplyKeys,
received_reply_surbs: ReceivedReplySurbsMap,
used_tags: UsedSenderTags,
) -> Self {
CombinedReplyStorage {
sent_reply_keys,
received_reply_surbs,
used_tags: UsedSenderTags::new(),
used_tags,
}
}
@@ -47,12 +47,8 @@ impl SentReplyKeys {
self.inner.data.iter()
}
pub fn retain(&self, f: impl FnMut(&EncryptionKeyDigest, &mut UsedReplyKey) -> bool) {
self.inner.data.retain(f);
}
pub fn insert_multiple(&self, keys: Vec<SurbEncryptionKey>) {
let now = OffsetDateTime::now_utc();
let now = OffsetDateTime::now_utc().unix_timestamp();
for key in keys {
self.insert(UsedReplyKey::new(key, now))
}
@@ -75,12 +71,15 @@ impl SentReplyKeys {
pub struct UsedReplyKey {
key: SurbEncryptionKey,
// the purpose of this field is to perform invalidation at relatively very long intervals
pub sent_at: OffsetDateTime,
pub sent_at_timestamp: i64,
}
impl UsedReplyKey {
pub(crate) fn new(key: SurbEncryptionKey, sent_at: OffsetDateTime) -> Self {
UsedReplyKey { key, sent_at }
pub(crate) fn new(key: SurbEncryptionKey, sent_at_timestamp: i64) -> Self {
UsedReplyKey {
key,
sent_at_timestamp,
}
}
}
+4 -8
View File
@@ -4,9 +4,8 @@
pub use backend::*;
pub use combined::CombinedReplyStorage;
pub use key_storage::SentReplyKeys;
pub use surb_storage::{ReceivedReplySurb, ReceivedReplySurbsMap, RetrievedReplySurb};
pub use surb_storage::ReceivedReplySurbsMap;
pub use tag_storage::UsedSenderTags;
use time::OffsetDateTime;
mod backend;
mod combined;
@@ -30,11 +29,8 @@ where
PersistentReplyStorage { backend }
}
pub async fn load_state_from_backend(
&self,
surb_freshness_cutoff: OffsetDateTime,
) -> Result<CombinedReplyStorage, T::StorageError> {
self.backend.load_surb_storage(surb_freshness_cutoff).await
pub async fn load_state_from_backend(&self) -> Result<CombinedReplyStorage, T::StorageError> {
self.backend.load_surb_storage().await
}
pub async fn flush_on_shutdown(
@@ -42,7 +38,7 @@ where
mem_state: CombinedReplyStorage,
mut shutdown: nym_task::TaskClient,
) {
use tracing::{debug, error, info};
use log::{debug, error, info};
debug!("Started PersistentReplyStorage");
if let Err(err) = self.backend.start_storage_session().await {
@@ -1,45 +1,15 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use dashmap::iter::{Iter, IterMut};
use dashmap::iter::Iter;
use dashmap::DashMap;
use log::trace;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::params::SphinxKeyRotation;
use std::cmp::min;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use time::OffsetDateTime;
use tracing::{error, info, trace};
#[derive(Debug)]
pub struct RetrievedReplySurb {
pub(crate) reply_surb: ReceivedReplySurb,
pub(crate) stale_pile: bool,
}
impl RetrievedReplySurb {
pub(crate) fn new_fresh(reply_surb: ReceivedReplySurb) -> Self {
RetrievedReplySurb {
reply_surb,
stale_pile: false,
}
}
pub(crate) fn new_stale(reply_surb: ReceivedReplySurb) -> Self {
RetrievedReplySurb {
reply_surb,
stale_pile: true,
}
}
}
impl From<RetrievedReplySurb> for ReplySurbWithKeyRotation {
fn from(retrieved: RetrievedReplySurb) -> Self {
retrieved.reply_surb.into()
}
}
#[derive(Debug, Clone)]
pub struct ReceivedReplySurbsMap {
@@ -87,40 +57,17 @@ impl ReceivedReplySurbsMap {
self.inner.data.iter()
}
pub fn as_raw_iter_mut(&self) -> IterMut<'_, AnonymousSenderTag, ReceivedReplySurbs> {
self.inner.data.iter_mut()
pub fn remove(&self, target: &AnonymousSenderTag) {
self.inner.data.remove(target);
}
fn total_surbs(&self) -> usize {
self.inner
.data
.iter()
.map(|entry| entry.value().data.len())
.sum()
}
pub fn drop_stale_loaded_surbs(&self, cutoff: OffsetDateTime) {
let before = self.total_surbs();
self.inner.data.retain(|_, v| {
if v.surbs_last_received_at() < cutoff {
return false;
}
v.data.retain(|s| s.received_at > cutoff);
!v.data.is_empty()
});
let after = self.total_surbs();
let diff = before - after;
if diff != 0 {
info!("removed {diff} stale reply SURBs")
pub fn reset_surbs_last_received_at(&self, target: &AnonymousSenderTag) {
if let Some(mut entry) = self.inner.data.get_mut(target) {
entry.surbs_last_received_at_timestamp = OffsetDateTime::now_utc().unix_timestamp();
}
}
pub fn retain(&self, f: impl FnMut(&AnonymousSenderTag, &mut ReceivedReplySurbs) -> bool) {
self.inner.data.retain(f);
}
pub fn surbs_last_received_at(&self, target: &AnonymousSenderTag) -> Option<OffsetDateTime> {
pub fn surbs_last_received_at(&self, target: &AnonymousSenderTag) -> Option<i64> {
self.inner
.data
.get(target)
@@ -179,25 +126,15 @@ impl ReceivedReplySurbsMap {
.unwrap_or_default()
}
pub fn available_fresh_surbs(&self, target: &AnonymousSenderTag) -> usize {
self.inner
.data
.get(target)
.map(|entry| entry.fresh_left())
.unwrap_or_default()
}
pub fn contains_surbs_for(&self, target: &AnonymousSenderTag) -> bool {
self.inner.data.contains_key(target)
}
/// Attempt to retrieve the specified number of reply SURBs for the target sender
/// and return the number of SURBs remaining in the storage after the call.
pub fn get_reply_surbs(
&self,
target: &AnonymousSenderTag,
amount: usize,
) -> (Option<Vec<RetrievedReplySurb>>, usize) {
) -> (Option<Vec<ReplySurbWithKeyRotation>>, usize) {
if let Some(mut entry) = self.inner.data.get_mut(target) {
let surbs_left = entry.items_left();
if surbs_left < self.min_surb_threshold() + amount {
@@ -213,72 +150,34 @@ impl ReceivedReplySurbsMap {
pub fn get_reply_surb_ignoring_threshold(
&self,
target: &AnonymousSenderTag,
) -> (Option<RetrievedReplySurb>, usize) {
let Some(mut entry) = self.inner.data.get_mut(target) else {
return (None, 0);
};
entry.get_reply_surb()
) -> Option<(Option<ReplySurbWithKeyRotation>, usize)> {
self.inner
.data
.get_mut(target)
.map(|mut s| s.get_reply_surb())
}
pub fn get_reply_surb(
&self,
target: &AnonymousSenderTag,
) -> (Option<RetrievedReplySurb>, usize) {
let Some(mut entry) = self.inner.data.get_mut(target) else {
return (None, 0);
};
let surbs_left = entry.items_left();
if surbs_left < self.min_surb_threshold() {
(None, surbs_left)
} else {
entry.get_reply_surb()
}
}
pub fn re_insert_reply_surbs(
&self,
target: &AnonymousSenderTag,
surbs: Vec<RetrievedReplySurb>,
) {
error!("re-inserting {} unused surbs", surbs.len());
let mut entry = self.inner.data.entry(*target).or_insert_with(|| {
// this branch should realistically NEVER happen, but software be software, so let's not crash
error!("attempting to return surbs to no longer existing entry {target}");
ReceivedReplySurbs::new(VecDeque::new())
});
let entry = entry.value_mut();
for returned_surb in surbs.into_iter().rev() {
if returned_surb.stale_pile {
entry.possibly_stale.push_front(returned_surb.reply_surb)
) -> Option<(Option<ReplySurbWithKeyRotation>, usize)> {
self.inner.data.get_mut(target).map(|mut entry| {
let surbs_left = entry.items_left();
if surbs_left < self.min_surb_threshold() {
(None, surbs_left)
} else {
entry.data.push_front(returned_surb.reply_surb)
entry.get_reply_surb()
}
}
})
}
pub fn insert_fresh_surbs<I: IntoIterator<Item = ReplySurbWithKeyRotation>>(
pub fn insert_surbs<I: IntoIterator<Item = ReplySurbWithKeyRotation>>(
&self,
target: &AnonymousSenderTag,
surbs: I,
) {
if let Some(mut existing_data) = self.inner.data.get_mut(target) {
existing_data.insert_fresh_reply_surbs(surbs);
if existing_data.possibly_stale.is_empty() {
return;
}
// if we're above the minimum threshold, remove stale surbs
let threshold = self.min_surb_threshold();
let diff = existing_data.data.len().saturating_sub(threshold);
trace!("will attempt to remove up to {diff} stale surbs");
if diff > 0 {
existing_data.remove_stale_surbs(diff);
}
existing_data.insert_reply_surbs(surbs)
} else {
let new_entry = ReceivedReplySurbs::new(surbs.into_iter().collect());
self.inner.data.insert(*target, new_entry);
@@ -286,102 +185,44 @@ impl ReceivedReplySurbsMap {
}
}
#[derive(Debug)]
pub struct ReceivedReplySurb {
pub(crate) surb: ReplySurbWithKeyRotation,
pub(crate) received_at: OffsetDateTime,
}
impl From<ReceivedReplySurb> for ReplySurbWithKeyRotation {
fn from(surb: ReceivedReplySurb) -> Self {
surb.surb
}
}
impl ReceivedReplySurb {
pub fn received_at(&self) -> OffsetDateTime {
self.received_at
}
pub fn key_rotation(&self) -> SphinxKeyRotation {
self.surb.key_rotation()
}
}
#[derive(Debug)]
pub struct ReceivedReplySurbs {
data: VecDeque<ReceivedReplySurb>,
possibly_stale: VecDeque<ReceivedReplySurb>,
// in the future we'd probably want to put extra data here to indicate when the SURBs got received
// so we could invalidate entries from the previous key rotations
data: VecDeque<ReplySurbWithKeyRotation>,
pending_reception: u32,
surbs_last_received_at: OffsetDateTime,
surbs_last_received_at_timestamp: i64,
}
impl ReceivedReplySurbs {
fn new(initial_surbs: VecDeque<ReplySurbWithKeyRotation>) -> Self {
let mut this = ReceivedReplySurbs {
data: Default::default(),
possibly_stale: Default::default(),
ReceivedReplySurbs {
data: initial_surbs,
pending_reception: 0,
surbs_last_received_at: OffsetDateTime::now_utc(),
};
this.insert_fresh_reply_surbs(initial_surbs);
this
surbs_last_received_at_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
}
}
#[cfg(all(not(target_arch = "wasm32"), feature = "fs-surb-storage"))]
pub fn new_retrieved(
surbs: Vec<ReplySurbWithKeyRotation>,
surbs_last_received_at: OffsetDateTime,
surbs_last_received_at_timestamp: i64,
) -> ReceivedReplySurbs {
let mut this = ReceivedReplySurbs {
data: Default::default(),
possibly_stale: Default::default(),
ReceivedReplySurbs {
data: surbs.into(),
pending_reception: 0,
surbs_last_received_at,
};
this.insert_fresh_reply_surbs(surbs);
this.surbs_last_received_at = surbs_last_received_at;
this
}
pub fn downgrade_freshness(&mut self) -> usize {
debug_assert!(self.possibly_stale.is_empty());
std::mem::swap(&mut self.data, &mut self.possibly_stale);
self.possibly_stale.len()
}
pub fn is_empty(&self) -> bool {
self.data.is_empty() && self.possibly_stale.is_empty()
surbs_last_received_at_timestamp,
}
}
#[cfg(all(not(target_arch = "wasm32"), feature = "fs-surb-storage"))]
pub fn surbs_ref(&self) -> &VecDeque<ReceivedReplySurb> {
pub fn surbs_ref(&self) -> &VecDeque<ReplySurbWithKeyRotation> {
&self.data
}
pub fn retain_fresh_surbs(&mut self, f: impl FnMut(&ReceivedReplySurb) -> bool) {
self.data.retain(f);
}
pub fn retain_possibly_stale_surbs(&mut self, f: impl FnMut(&ReceivedReplySurb) -> bool) {
self.possibly_stale.retain(f);
}
pub fn fresh_left(&self) -> usize {
self.data.len()
}
pub fn possibly_stale_left(&self) -> usize {
self.possibly_stale.len()
}
pub fn drop_possibly_stale_surbs(&mut self) {
self.possibly_stale = VecDeque::new();
}
pub fn surbs_last_received_at(&self) -> OffsetDateTime {
self.surbs_last_received_at
pub fn surbs_last_received_at(&self) -> i64 {
self.surbs_last_received_at_timestamp
}
pub fn pending_reception(&self) -> u32 {
@@ -402,78 +243,39 @@ impl ReceivedReplySurbs {
self.pending_reception = 0;
}
/// Attempt to retrieve the specified number of reply SURBs (if at least that many are present)
/// and return the number of SURBs remaining in the storage after the call.
pub fn get_reply_surbs(&mut self, amount: usize) -> (Option<Vec<RetrievedReplySurb>>, usize) {
pub fn get_reply_surbs(
&mut self,
amount: usize,
) -> (Option<Vec<ReplySurbWithKeyRotation>>, usize) {
if self.items_left() < amount {
(None, self.items_left())
} else {
let available_fresh = self.fresh_left();
// prefer the 'fresh' data if available. otherwise fallback to the possibly stale entries
let mut reply_surbs = Vec::with_capacity(amount);
let fresh_to_retrieve = min(available_fresh, amount);
for surb in self.data.drain(..fresh_to_retrieve) {
reply_surbs.push(RetrievedReplySurb::new_fresh(surb))
}
if available_fresh < amount {
let stale_to_retrieve = amount - fresh_to_retrieve;
for surb in self.possibly_stale.drain(..stale_to_retrieve) {
reply_surbs.push(RetrievedReplySurb::new_stale(surb))
}
}
(Some(reply_surbs), self.items_left())
let surbs = self.data.drain(..amount).collect();
(Some(surbs), self.items_left())
}
}
pub fn get_reply_surb(&mut self) -> (Option<RetrievedReplySurb>, usize) {
pub fn get_reply_surb(&mut self) -> (Option<ReplySurbWithKeyRotation>, usize) {
(self.pop_surb(), self.items_left())
}
fn pop_surb(&mut self) -> Option<RetrievedReplySurb> {
// prefer the 'fresh' data if available. otherwise fallback to the possibly stale entries
if let Some(fresh) = self.data.pop_front() {
return Some(RetrievedReplySurb::new_fresh(fresh));
}
if let Some(stale) = self.possibly_stale.pop_front() {
return Some(RetrievedReplySurb::new_stale(stale));
}
None
fn pop_surb(&mut self) -> Option<ReplySurbWithKeyRotation> {
self.data.pop_front()
}
fn items_left(&self) -> usize {
self.data.len() + self.possibly_stale.len()
}
pub fn remove_stale_surbs(&mut self, amount: usize) {
// remove up to amount number of possibly stale surbs
let amount = min(amount, self.possibly_stale.len());
self.possibly_stale.drain(..amount);
self.data.len()
}
// realistically we're always going to be getting multiple surbs at once
pub(crate) fn insert_fresh_reply_surbs<I: IntoIterator<Item = ReplySurbWithKeyRotation>>(
pub fn insert_reply_surbs<I: IntoIterator<Item = ReplySurbWithKeyRotation>>(
&mut self,
surbs: I,
) {
let received_at = OffsetDateTime::now_utc();
let mut v = surbs
.into_iter()
.map(|surb| ReceivedReplySurb { surb, received_at })
.collect::<VecDeque<_>>();
if v.is_empty() {
return;
}
let mut v = surbs.into_iter().collect::<VecDeque<_>>();
trace!("storing {} surbs in the storage", v.len());
self.data.append(&mut v);
self.surbs_last_received_at = received_at;
self.surbs_last_received_at_timestamp = OffsetDateTime::now_utc().unix_timestamp();
trace!("we now have {} surbs!", self.data.len());
}
}
@@ -201,7 +201,7 @@ impl<C, St> GatewayClient<C, St> {
#[cfg(not(target_arch = "wasm32"))]
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
debug!(
"Attempting to establish connection to gateway at: {}",
"Attemting to establish connection to gateway at: {}",
self.gateway_address
);
let (ws_stream, _) = connect_async(
@@ -272,7 +272,7 @@ impl<C, St> GatewayClient<C, St> {
) -> Result<(), GatewayClientError> {
if let Some(shared_key) = self.shared_key() {
let encrypted = message.encrypt(&*shared_key)?;
Box::pin(self.send_websocket_message_without_response(encrypted)).await?;
Box::pin(self.send_websocket_message(encrypted)).await?;
Ok(())
} else {
Err(GatewayClientError::ConnectionInInvalidState)
@@ -330,80 +330,9 @@ impl<C, St> GatewayClient<C, St> {
}
}
/// Attempt to send a websocket message to the gateway without waiting for any response
async fn send_websocket_message_without_response(
&mut self,
msg: impl Into<Message>,
) -> Result<(), GatewayClientError> {
match self.connection {
SocketState::Available(ref mut conn) => Ok(conn.send(msg.into()).await?),
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
if let Err(err) = partially_delegated.send_without_response(msg.into()).await {
error!("failed to send message without response - {err}...");
// we must ensure we do not leave the task still active
if let Err(err) = self.recover_socket_connection().await {
error!("... and the delegated stream has also errored out - {err}")
}
Err(err)
} else {
Ok(())
}
}
SocketState::NotConnected => Err(GatewayClientError::ConnectionNotEstablished),
_ => Err(GatewayClientError::ConnectionInInvalidState),
}
}
// A very nasty hack due to lack of id tags on messages - send a non-sphinx packet websocket
// message and wait until first non 'Send' response within timeout
pub async fn send_websocket_message_with_non_send_response(
&mut self,
msg: impl Into<Message>,
) -> Result<ServerResponse, GatewayClientError> {
let should_restart_mixnet_listener = if self.connection.is_partially_delegated() {
self.recover_socket_connection().await?;
true
} else {
false
};
let conn = match self.connection {
SocketState::Available(ref mut conn) => conn,
SocketState::NotConnected => return Err(GatewayClientError::ConnectionNotEstablished),
_ => return Err(GatewayClientError::ConnectionInInvalidState),
};
conn.send(msg.into()).await?;
let timeout = sleep(self.cfg.connection.response_timeout_duration);
tokio::pin!(timeout);
let response = loop {
tokio::select! {
_ = &mut timeout => {
break Err(GatewayClientError::Timeout);
}
// note: the below will also listen for shutdown signals
msg = self.read_control_response() => {
match msg {
Ok(res) => if !res.is_send() {
break Ok(res);
},
Err(err) => break Err(err),
}
}
}
};
if should_restart_mixnet_listener {
self.start_listening_for_mixnet_messages()?;
}
response
}
/// Attempt to send a websocket message to the gateway and wait until we receive a response.
// If we want to send a message (with response), we need to have a full control over the socket,
// as we need to be able to write the request and read the subsequent response
pub async fn send_websocket_message_with_response(
pub async fn send_websocket_message(
&mut self,
msg: impl Into<Message>,
) -> Result<ServerResponse, GatewayClientError> {
@@ -458,6 +387,29 @@ impl<C, St> GatewayClient<C, St> {
}
}
async fn send_websocket_message_without_response(
&mut self,
msg: Message,
) -> Result<(), GatewayClientError> {
match self.connection {
SocketState::Available(ref mut conn) => Ok(conn.send(msg).await?),
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
if let Err(err) = partially_delegated.send_without_response(msg).await {
error!("failed to send message without response - {err}...");
// we must ensure we do not leave the task still active
if let Err(err) = self.recover_socket_connection().await {
error!("... and the delegated stream has also errored out - {err}")
}
Err(err)
} else {
Ok(())
}
}
SocketState::NotConnected => Err(GatewayClientError::ConnectionNotEstablished),
_ => Err(GatewayClientError::ConnectionInInvalidState),
}
}
fn check_gateway_protocol(
&self,
gateway_protocol: Option<u8>,
@@ -583,10 +535,7 @@ impl<C, St> GatewayClient<C, St> {
.encrypt(legacy_key)?;
info!("sending upgrade request and awaiting the acknowledgement back");
let (ciphertext, nonce) = match self
.send_websocket_message_with_response(upgrade_request)
.await?
{
let (ciphertext, nonce) = match self.send_websocket_message(upgrade_request).await? {
ServerResponse::EncryptedResponse { ciphertext, nonce } => (ciphertext, nonce),
ServerResponse::Error { message } => {
return Err(GatewayClientError::GatewayError(message))
@@ -618,7 +567,7 @@ impl<C, St> GatewayClient<C, St> {
&mut self,
msg: ClientControlRequest,
) -> Result<(), GatewayClientError> {
match self.send_websocket_message_with_response(msg).await? {
match self.send_websocket_message(msg).await? {
ServerResponse::Authenticate {
protocol_version,
status,
@@ -768,16 +717,13 @@ impl<C, St> GatewayClient<C, St> {
}
}
/// Attempt to retrieve the currently supported gateway protocol version of the remote.
pub async fn get_gateway_protocol(&mut self) -> Result<u8, GatewayClientError> {
if !self.connection.is_established() {
return Err(GatewayClientError::ConnectionNotEstablished);
}
match self
.send_websocket_message_with_non_send_response(
ClientControlRequest::SupportedProtocol {},
)
.send_websocket_message(ClientControlRequest::SupportedProtocol {})
.await?
{
ServerResponse::SupportedProtocol { version } => Ok(version),
@@ -794,10 +740,7 @@ impl<C, St> GatewayClient<C, St> {
credential,
self.shared_key.as_ref().unwrap(),
)?;
let bandwidth_remaining = match self
.send_websocket_message_with_non_send_response(msg)
.await?
{
let bandwidth_remaining = match self.send_websocket_message(msg).await? {
ServerResponse::Bandwidth { available_total } => Ok(available_total),
ServerResponse::Error { message } => Err(GatewayClientError::GatewayError(message)),
ServerResponse::TypedError { error } => {
@@ -815,10 +758,7 @@ impl<C, St> GatewayClient<C, St> {
async fn try_claim_testnet_bandwidth(&mut self) -> Result<(), GatewayClientError> {
let msg = ClientControlRequest::ClaimFreeTestnetBandwidth;
let bandwidth_remaining = match self
.send_websocket_message_with_non_send_response(msg)
.await?
{
let bandwidth_remaining = match self.send_websocket_message(msg).await? {
ServerResponse::Bandwidth { available_total } => Ok(available_total),
ServerResponse::Error { message } => Err(GatewayClientError::GatewayError(message)),
other => Err(GatewayClientError::UnexpectedResponse { name: other.name() }),
@@ -337,7 +337,7 @@ impl PartiallyDelegatedHandle {
// check if the split stream didn't error out
let receive_res = stream_receiver
.try_recv()
.map_err(|_| GatewayClientError::ConnectionAbruptlyClosed)?;
.expect("stream sender was somehow dropped without sending anything!");
if let Some(res) = receive_res {
let _res = res?;
+1 -10
View File
@@ -28,7 +28,6 @@ pub struct Config {
pub maximum_reconnection_backoff: Duration,
pub initial_connection_timeout: Duration,
pub maximum_connection_buffer_size: usize,
pub use_legacy_packet_encoding: bool,
}
impl Config {
@@ -37,14 +36,12 @@ impl Config {
maximum_reconnection_backoff: Duration,
initial_connection_timeout: Duration,
maximum_connection_buffer_size: usize,
use_legacy_packet_encoding: bool,
) -> Self {
Config {
initial_reconnection_backoff,
maximum_reconnection_backoff,
initial_connection_timeout,
maximum_connection_buffer_size,
use_legacy_packet_encoding,
}
}
}
@@ -270,12 +267,7 @@ impl SendWithoutResponse for Client {
fn send_without_response(&self, packet: MixPacket) -> io::Result<()> {
let address = packet.next_hop_address();
trace!("Sending packet to {address}");
// TODO: optimisation for the future: rather than constantly using legacy encoding,
// once we're addressing by node_id (and thus have full node info here),
// we could simply infer supported encoding based on their version
let framed_packet =
FramedNymPacket::from_mix_packet(packet, self.config.use_legacy_packet_encoding);
let framed_packet = FramedNymPacket::from(packet);
let Some(sender) = self.active_connections.get_mut(&address) else {
// there was never a connection to begin with
@@ -336,7 +328,6 @@ mod tests {
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
use_legacy_packet_encoding: false,
},
NoiseConfig::new(
Arc::new(x25519::KeyPair::new(&mut rng)),
@@ -72,7 +72,7 @@ macro_rules! collect_paged_skimmed_v2 {
.$f(false, Some(page), None, $self.use_bincode)
.await?;
if !metadata.consistency_check(&res.metadata) {
if metadata != res.metadata {
return Err(ValidatorClientError::InconsistentPagedMetadata);
}
@@ -471,12 +471,12 @@ impl NymApiClient {
pub async fn get_all_basic_entry_assigned_nodes(
&self,
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
self.get_all_basic_entry_assigned_nodes_with_metadata()
self.get_all_basic_entry_assigned_nodes_v2()
.await
.map(|res| res.nodes)
}
pub async fn get_all_basic_entry_assigned_nodes_with_metadata(
pub async fn get_all_basic_entry_assigned_nodes_v2(
&self,
) -> Result<SkimmedNodesWithMetadata, ValidatorClientError> {
collect_paged_skimmed_v2!(self, get_basic_entry_assigned_nodes_v2)
@@ -719,11 +719,10 @@ impl NymApiClient {
pub async fn partial_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
epoch_id: Option<EpochId>,
) -> Result<PartialExpirationDateSignatureResponse, ValidatorClientError> {
Ok(self
.nym_api
.partial_expiration_date_signatures(expiration_date, epoch_id)
.partial_expiration_date_signatures(expiration_date)
.await?)
}
@@ -740,11 +739,10 @@ impl NymApiClient {
pub async fn global_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
epoch_id: Option<EpochId>,
) -> Result<AggregatedExpirationDateSignatureResponse, ValidatorClientError> {
Ok(self
.nym_api
.global_expiration_date_signatures(expiration_date, epoch_id)
.global_expiration_date_signatures(expiration_date)
.await?)
}
@@ -6,18 +6,16 @@ use crate::nym_api::routes::{ecash, CORE_STATUS_COUNT, SINCE_ARG};
use async_trait::async_trait;
use nym_api_requests::ecash::models::{
AggregatedCoinIndicesSignatureResponse, AggregatedExpirationDateSignatureResponse,
BatchRedeemTicketsBody, EcashBatchTicketRedemptionResponse, EcashSignerStatusResponse,
EcashTicketVerificationResponse, IssuedTicketbooksChallengeCommitmentRequest,
IssuedTicketbooksChallengeCommitmentResponse, IssuedTicketbooksDataRequest,
IssuedTicketbooksDataResponse, IssuedTicketbooksForCountResponse, IssuedTicketbooksForResponse,
VerifyEcashTicketBody,
BatchRedeemTicketsBody, EcashBatchTicketRedemptionResponse, EcashTicketVerificationResponse,
IssuedTicketbooksChallengeCommitmentRequest, IssuedTicketbooksChallengeCommitmentResponse,
IssuedTicketbooksDataRequest, IssuedTicketbooksDataResponse, IssuedTicketbooksForCountResponse,
IssuedTicketbooksForResponse, VerifyEcashTicketBody,
};
use nym_api_requests::ecash::VerificationKeyResponse;
use nym_api_requests::models::{
AnnotationResponse, ApiHealthResponse, BinaryBuildInformationOwned, ChainBlocksStatusResponse,
ChainStatusResponse, KeyRotationInfoResponse, LegacyDescribedMixNode, NodePerformanceResponse,
NodeRefreshBody, NymNodeDescription, PerformanceHistoryResponse, RewardedSetResponse,
SignerInformationResponse,
AnnotationResponse, ApiHealthResponse, BinaryBuildInformationOwned, ChainStatusResponse,
KeyRotationInfoResponse, LegacyDescribedMixNode, NodePerformanceResponse, NodeRefreshBody,
NymNodeDescription, PerformanceHistoryResponse, RewardedSetResponse,
};
use nym_api_requests::nym_nodes::{
NodesByAddressesRequestBody, NodesByAddressesResponse, PaginatedCachedNodesResponseV1,
@@ -391,6 +389,7 @@ pub trait NymApiClientExt: ApiClient {
routes::NYM_NODES_ROUTES,
"skimmed",
"entry-gateways",
"all",
],
&params,
)
@@ -1103,9 +1102,8 @@ pub trait NymApiClientExt: ApiClient {
async fn partial_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
epoch_id: Option<EpochId>,
) -> Result<PartialExpirationDateSignatureResponse, NymAPIError> {
let mut params = match expiration_date {
let params = match expiration_date {
None => Vec::new(),
Some(exp) => vec![(
ecash::EXPIRATION_DATE_PARAM,
@@ -1113,10 +1111,6 @@ pub trait NymApiClientExt: ApiClient {
)],
};
if let Some(epoch_id) = epoch_id {
params.push((ecash::EPOCH_ID_PARAM, epoch_id.to_string()));
}
self.get_json(
&[
routes::V1_API_VERSION,
@@ -1153,9 +1147,8 @@ pub trait NymApiClientExt: ApiClient {
async fn global_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
epoch_id: Option<EpochId>,
) -> Result<AggregatedExpirationDateSignatureResponse, NymAPIError> {
let mut params = match expiration_date {
let params = match expiration_date {
None => Vec::new(),
Some(exp) => vec![(
ecash::EXPIRATION_DATE_PARAM,
@@ -1163,10 +1156,6 @@ pub trait NymApiClientExt: ApiClient {
)],
};
if let Some(epoch_id) = epoch_id {
params.push((ecash::EPOCH_ID_PARAM, epoch_id.to_string()));
}
self.get_json(
&[
routes::V1_API_VERSION,
@@ -1343,22 +1332,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
async fn get_chain_blocks_status(&self) -> Result<ChainBlocksStatusResponse, NymAPIError> {
self.get_json("/v1/network/chain-blocks-status", NO_PARAMS)
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_signer_status(&self) -> Result<EcashSignerStatusResponse, NymAPIError> {
self.get_json("/v1/ecash/signer-status", NO_PARAMS).await
}
#[instrument(level = "debug", skip(self))]
async fn get_signer_information(&self) -> Result<SignerInformationResponse, NymAPIError> {
self.get_json("/v1/api-status/signer-information", NO_PARAMS)
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_key_rotation_info(&self) -> Result<KeyRotationInfoResponse, NymAPIError> {
self.get_json(
@@ -8,11 +8,11 @@ use crate::nyxd::CosmWasmClient;
use async_trait::async_trait;
use cosmrs::AccountId;
use cosmwasm_std::Addr;
use nym_coconut_dkg_common::dealer::RegisteredDealerDetails;
use nym_coconut_dkg_common::types::{ChunkIndex, NodeIndex, StateAdvanceResponse};
use serde::Deserialize;
use tracing::trace;
use nym_coconut_dkg_common::dealer::RegisteredDealerDetails;
pub use nym_coconut_dkg_common::{
dealer::{DealerDetailsResponse, PagedDealerIndexResponse, PagedDealerResponse},
dealing::{
@@ -21,9 +21,7 @@ pub use nym_coconut_dkg_common::{
},
msg::QueryMsg as DkgQueryMsg,
types::{DealerDetails, DealingIndex, Epoch, EpochId, EpochState, State},
verification_key::{
ContractVKShare, PagedVKSharesResponse, VerificationKeyShare, VkShareResponse,
},
verification_key::{ContractVKShare, PagedVKSharesResponse, VkShareResponse},
};
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
@@ -43,11 +41,6 @@ pub trait DkgQueryClient {
self.query_dkg_contract(request).await
}
async fn get_epoch_at_height(&self, height: u64) -> Result<Option<Epoch>, NyxdError> {
let request = DkgQueryMsg::GetEpochStateAtHeight { height };
self.query_dkg_contract(request).await
}
async fn can_advance_state(&self) -> Result<StateAdvanceResponse, NyxdError> {
let request = DkgQueryMsg::CanAdvanceState {};
self.query_dkg_contract(request).await
@@ -94,34 +87,6 @@ pub trait DkgQueryClient {
self.query_dkg_contract(request).await
}
async fn get_epoch_dealers_paged(
&self,
epoch_id: EpochId,
start_after: Option<String>,
limit: Option<u32>,
) -> Result<PagedDealerResponse, NyxdError> {
let request = DkgQueryMsg::GetEpochDealers {
epoch_id,
start_after,
limit,
};
self.query_dkg_contract(request).await
}
async fn get_epoch_dealers_addresses_paged(
&self,
epoch_id: EpochId,
start_after: Option<String>,
limit: Option<u32>,
) -> Result<PagedDealerResponse, NyxdError> {
let request = DkgQueryMsg::GetEpochDealersAddresses {
epoch_id,
start_after,
limit,
};
self.query_dkg_contract(request).await
}
async fn get_dealer_indices_paged(
&self,
start_after: Option<String>,
@@ -243,20 +208,6 @@ pub trait PagedDkgQueryClient: DkgQueryClient {
collect_paged!(self, get_current_dealers_paged, dealers)
}
async fn get_all_epoch_dealers(
&self,
epoch_id: EpochId,
) -> Result<Vec<DealerDetails>, NyxdError> {
collect_paged!(self, get_epoch_dealers_paged, dealers, epoch_id)
}
async fn get_all_epoch_dealers_addresses(
&self,
epoch_id: EpochId,
) -> Result<Vec<DealerDetails>, NyxdError> {
collect_paged!(self, get_epoch_dealers_addresses_paged, dealers, epoch_id)
}
async fn get_all_dealer_indices(&self) -> Result<Vec<(Addr, NodeIndex)>, NyxdError> {
collect_paged!(self, get_dealer_indices_paged, indices)
}
@@ -306,9 +257,6 @@ mod tests {
match msg {
DkgQueryMsg::GetState {} => client.get_state().ignore(),
DkgQueryMsg::GetCurrentEpochState {} => client.get_current_epoch().ignore(),
DkgQueryMsg::GetEpochStateAtHeight { height } => {
client.get_epoch_at_height(height).ignore()
}
DkgQueryMsg::CanAdvanceState {} => client.can_advance_state().ignore(),
DkgQueryMsg::GetCurrentEpochThreshold {} => {
client.get_current_epoch_threshold().ignore()
@@ -328,20 +276,6 @@ mod tests {
DkgQueryMsg::GetCurrentDealers { limit, start_after } => client
.get_current_dealers_paged(start_after, limit)
.ignore(),
QueryMsg::GetEpochDealers {
epoch_id,
limit,
start_after,
} => client
.get_epoch_dealers_paged(epoch_id, start_after, limit)
.ignore(),
QueryMsg::GetEpochDealersAddresses {
epoch_id,
limit,
start_after,
} => client
.get_epoch_dealers_addresses_paged(epoch_id, start_after, limit)
.ignore(),
DkgQueryMsg::GetDealerIndices { limit, start_after } => {
client.get_dealer_indices_paged(start_after, limit).ignore()
}
@@ -7,16 +7,17 @@ use crate::nyxd::error::NyxdError;
use crate::nyxd::CosmWasmClient;
use async_trait::async_trait;
use cosmrs::AccountId;
use serde::Deserialize;
pub use nym_performance_contract_common::{
msg::QueryMsg as PerformanceQueryMsg, types::NetworkMonitorResponse, EpochId,
EpochMeasurementsPagedResponse, EpochNodePerformance, EpochPerformancePagedResponse,
FullHistoricalPerformancePagedResponse, HistoricalPerformance, LastSubmission,
NetworkMonitorInformation, NetworkMonitorsPagedResponse, NodeId, NodeMeasurement,
NodeMeasurementsResponse, NodePerformance, NodePerformancePagedResponse,
NodePerformanceResponse, RetiredNetworkMonitor, RetiredNetworkMonitorsPagedResponse,
msg::QueryMsg as PerformanceQueryMsg, types::NetworkMonitorResponse,
};
use nym_performance_contract_common::{
EpochId, EpochMeasurementsPagedResponse, EpochNodePerformance, EpochPerformancePagedResponse,
FullHistoricalPerformancePagedResponse, HistoricalPerformance, NetworkMonitorInformation,
NetworkMonitorsPagedResponse, NodeId, NodeMeasurement, NodeMeasurementsResponse,
NodePerformance, NodePerformancePagedResponse, NodePerformanceResponse, RetiredNetworkMonitor,
RetiredNetworkMonitorsPagedResponse,
};
use serde::Deserialize;
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
@@ -138,11 +139,6 @@ pub trait PerformanceQueryClient {
})
.await
}
async fn get_last_submission(&self) -> Result<LastSubmission, NyxdError> {
self.query_performance_contract(PerformanceQueryMsg::LastSubmittedMeasurement {})
.await
}
}
// extension trait to the query client to deal with the paged queries
@@ -216,7 +212,6 @@ where
mod tests {
use super::*;
use crate::nyxd::contract_traits::tests::IgnoreValue;
use nym_performance_contract_common::QueryMsg;
// it's enough that this compiles and clippy is happy about it
#[allow(dead_code)]
@@ -265,7 +260,6 @@ mod tests {
PerformanceQueryMsg::RetiredNetworkMonitorsPaged { start_after, limit } => client
.get_retired_network_monitors_paged(start_after, limit)
.ignore(),
QueryMsg::LastSubmittedMeasurement {} => client.get_last_submission().ignore(),
};
}
}
@@ -139,22 +139,12 @@ impl NyxdClient<HttpClient> {
})
}
pub fn connect_with_network_details<U>(
endpoint: U,
network_details: NymNetworkDetails,
) -> Result<QueryHttpRpcNyxdClient, NyxdError>
where
U: TryInto<HttpClientUrl, Error = TendermintRpcError>,
{
let config = Config::try_from_nym_network_details(&network_details)?;
Self::connect(config, endpoint)
}
pub fn connect_to_default_env<U>(endpoint: U) -> Result<QueryHttpRpcNyxdClient, NyxdError>
where
U: TryInto<HttpClientUrl, Error = TendermintRpcError>,
{
Self::connect_with_network_details(endpoint, NymNetworkDetails::new_from_env())
let config = Config::try_from_nym_network_details(&NymNetworkDetails::new_from_env())?;
Self::connect(config, endpoint)
}
}
+1 -1
View File
@@ -86,7 +86,7 @@ pub async fn execute(args: Args) -> anyhow::Result<()> {
anyhow!("ticketbook got incorrectly imported - the master verification key is missing")
})?;
let expiration_signatures = persistent_storage
.get_expiration_date_signatures(expiration_date, epoch_id)
.get_expiration_date_signatures(expiration_date)
.await?
.ok_or_else(|| {
anyhow!(
@@ -120,7 +120,7 @@ async fn issue_to_file(args: Args, client: SigningClient) -> anyhow::Result<()>
if args.include_expiration_date_signatures {
let signatures = credentials_store
.get_expiration_date_signatures(expiration_date, epoch_id)
.get_expiration_date_signatures(expiration_date)
.await?
.ok_or(anyhow!("missing expiration date signatures!"))?;
+2 -2
View File
@@ -49,14 +49,14 @@ pub fn show_error<E>(e: E)
where
E: Display,
{
error!("{e}");
error!("{}", e);
}
pub fn show_error_passthrough<E>(e: E) -> E
where
E: Error + Display,
{
error!("{e}");
error!("{}", e);
e
}
@@ -42,7 +42,7 @@ pub async fn query_balance(
.address
.unwrap_or_else(|| address_from_mnemonic.expect("please provide a mnemonic"));
info!("Getting balance for {address}...");
info!("Getting balance for {}...", address);
match client.get_all_balances(&address).await {
Ok(coins) => {
@@ -57,17 +57,17 @@ pub fn get_pubkey_from_mnemonic(address: AccountId, prefix: &str, mnemonic: bip3
println!("{}", account.public_key().to_string());
}
None => {
error!("Could not derive key that matches {address}")
error!("Could not derive key that matches {}", address)
}
},
Err(e) => {
error!("Failed to derive accounts. {e}");
error!("Failed to derive accounts. {}", e);
}
}
}
pub async fn get_pubkey_from_chain(address: AccountId, client: &QueryClient) {
info!("Getting public key for address {address} from chain...");
info!("Getting public key for address {} from chain...", address);
match client.get_account(&address).await {
Ok(Some(account)) => {
if let Ok(base_account) = account.try_get_base_account() {
@@ -37,7 +37,7 @@ pub async fn send_multiple(args: Args, client: &SigningClient) {
let rows = InputFileReader::new(&args.input);
if let Err(e) = rows {
error!("Failed to read input file: {e}");
error!("Failed to read input file: {}", e);
return;
}
let rows = rows.unwrap();
@@ -67,7 +67,7 @@ pub async fn send_multiple(args: Args, client: &SigningClient) {
.prompt();
if let Err(e) = ans {
info!("Aborting, {e}...");
info!("Aborting, {}...", e);
return;
}
if let Ok(false) = ans {
@@ -100,10 +100,13 @@ pub async fn send_multiple(args: Args, client: &SigningClient) {
println!("Transaction hash: {}", &res.hash);
if let Some(output_filename) = args.output {
println!("\nWriting output log to {output_filename}");
println!("\nWriting output log to {}", output_filename);
if let Err(e) = write_output_file(rows, res, &output_filename) {
error!("Failed to write output file {output_filename} with error {e}");
error!(
"Failed to write output file {} with error {}",
output_filename, e
);
}
}
}
@@ -133,7 +136,7 @@ fn write_output_file(
.collect::<Vec<String>>()
.join("\n");
Ok(file.write_all(format!("{data}\n").as_bytes())?)
Ok(file.write_all(format!("{}\n", data).as_bytes())?)
}
#[derive(Debug)]
@@ -168,7 +171,7 @@ impl InputFileReader {
// multiply when a whole token amount, e.g. 50nym (50.123456nym is not allowed, that must be input as 50123456unym)
let (amount, denom) = if !denom.starts_with('u') {
(amount * 1_000_000u128, format!("u{denom}"))
(amount * 1_000_000u128, format!("u{}", denom))
} else {
(amount, denom)
};
@@ -55,6 +55,6 @@ pub async fn execute(args: Args, client: SigningClient) {
.await
{
Ok(res) => info!("SUCCESS ✅\n{}", json!(res)),
Err(e) => error!("FAILURE ❌\n{e}"),
Err(e) => error!("FAILURE ❌\n{}", e),
}
}
@@ -43,7 +43,7 @@ pub struct Args {
pub async fn generate(args: Args) {
info!("Starting to generate vesting contract instantiate msg");
debug!("Received arguments: {args:?}");
debug!("Received arguments: {:?}", args);
let multisig_addr = args.multisig_addr.unwrap_or_else(|| {
let address = std::env::var(nym_network_defaults::var_names::REWARDING_VALIDATOR_ADDRESS)
@@ -97,7 +97,7 @@ pub async fn generate(args: Args) {
key_size: DEFAULT_DEALINGS as u32,
};
debug!("instantiate_msg: {instantiate_msg:?}");
debug!("instantiate_msg: {:?}", instantiate_msg);
let res =
serde_json::to_string(&instantiate_msg).expect("failed to convert instantiate msg to json");
@@ -28,7 +28,7 @@ pub struct Args {
pub async fn generate(args: Args) {
info!("Starting to generate vesting contract instantiate msg");
debug!("Received arguments: {args:?}");
debug!("Received arguments: {:?}", args);
let group_addr = args.group_addr.unwrap_or_else(|| {
let address = std::env::var(nym_network_defaults::var_names::GROUP_CONTRACT_ADDRESS)
@@ -51,7 +51,7 @@ pub async fn generate(args: Args) {
deposit_amount: args.deposit_amount,
};
debug!("instantiate_msg: {instantiate_msg:?}");
debug!("instantiate_msg: {:?}", instantiate_msg);
let res =
serde_json::to_string(&instantiate_msg).expect("failed to convert instantiate msg to json");
@@ -88,7 +88,7 @@ pub struct Args {
pub async fn generate(args: Args) {
info!("Starting to generate mixnet contract instantiate msg");
debug!("Received arguments: {args:?}");
debug!("Received arguments: {:?}", args);
let initial_rewarding_params = InitialRewardingParams {
initial_reward_pool: Decimal::from_atomics(args.initial_reward_pool, 0)
@@ -114,7 +114,7 @@ pub async fn generate(args: Args) {
},
};
debug!("initial_rewarding_params: {initial_rewarding_params:?}");
debug!("initial_rewarding_params: {:?}", initial_rewarding_params);
let rewarding_validator_address = args.rewarding_validator_address.unwrap_or_else(|| {
let address = std::env::var(nym_network_defaults::var_names::REWARDING_VALIDATOR_ADDRESS)
@@ -160,7 +160,7 @@ pub async fn generate(args: Args) {
key_validity_in_epochs: None,
};
debug!("instantiate_msg: {instantiate_msg:?}");
debug!("instantiate_msg: {:?}", instantiate_msg);
let res =
serde_json::to_string(&instantiate_msg).expect("failed to convert instantiate msg to json");
@@ -31,7 +31,7 @@ pub struct Args {
pub async fn generate(args: Args) {
info!("Starting to generate vesting contract instantiate msg");
debug!("Received arguments: {args:?}");
debug!("Received arguments: {:?}", args);
let ecash_contract_address = args.ecash_contract_address.unwrap_or_else(|| {
let address = std::env::var(nym_network_defaults::var_names::ECASH_CONTRACT_ADDRESS)
@@ -60,7 +60,7 @@ pub async fn generate(args: Args) {
coconut_dkg_contract_address: coconut_dkg_contract_address.to_string(),
};
debug!("instantiate_msg: {instantiate_msg:?}");
debug!("instantiate_msg: {:?}", instantiate_msg);
let res =
serde_json::to_string(&instantiate_msg).expect("failed to convert instantiate msg to json");
@@ -21,7 +21,7 @@ pub struct Args {
pub async fn generate(args: Args) {
info!("Starting to generate vesting contract instantiate msg");
debug!("Received arguments: {args:?}");
debug!("Received arguments: {:?}", args);
let mixnet_contract_address = args.mixnet_contract_address.unwrap_or_else(|| {
let address = std::env::var(nym_network_defaults::var_names::MIXNET_CONTRACT_ADDRESS)
@@ -39,7 +39,7 @@ pub async fn generate(args: Args) {
mix_denom,
};
debug!("instantiate_msg: {instantiate_msg:?}");
debug!("instantiate_msg: {:?}", instantiate_msg);
let res =
serde_json::to_string(&instantiate_msg).expect("failed to convert instantiate msg to json");
@@ -72,7 +72,7 @@ pub async fn init(args: Args, client: SigningClient, network_details: &NymNetwor
.await
.expect("failed to instantiate the contract!");
info!("Init result: {res:?}");
info!("Init result: {:?}", res);
println!("{}", res.contract_address)
}
@@ -47,5 +47,5 @@ pub async fn migrate(args: Args, client: SigningClient) {
.expect("failed to migrate the contract!")
};
info!("Migrate result: {res:?}");
info!("Migrate result: {:?}", res);
}
@@ -31,7 +31,7 @@ pub async fn upload(args: Args, client: SigningClient) {
.await
.expect("failed to upload the contract!");
info!("Upload result: {res:?}");
info!("Upload result: {:?}", res);
println!("{}", res.code_id)
}
@@ -47,5 +47,5 @@ pub async fn delegate_to_mixnode(args: Args, client: SigningClient) {
.await
.expect("failed to delegate to mixnode!");
info!("delegating to mixnode: {res:?}");
info!("delegating to mixnode: {:?}", res);
}
@@ -196,7 +196,7 @@ pub async fn delegate_to_multiple_mixnodes(args: Args, client: SigningClient) {
let records = match InputFileReader::new(&args.input) {
Ok(records) => records,
Err(e) => {
println!("Error reading input file: {e}");
println!("Error reading input file: {}", e);
return;
}
};
@@ -241,7 +241,7 @@ pub async fn delegate_to_multiple_mixnodes(args: Args, client: SigningClient) {
let node_id = row.node_id.clone().parse::<u32>().unwrap();
let coins: Vec<Coin> = vec![];
undelegation_msgs.push((ExecuteMsg::Undelegate { node_id }, coins));
undelegation_table.add_row(std::slice::from_ref(&row.node_id));
undelegation_table.add_row(&[row.node_id.clone()]);
if row.amount.amount > 0 {
delegation_msgs
@@ -262,11 +262,11 @@ pub async fn delegate_to_multiple_mixnodes(args: Args, client: SigningClient) {
}
if !undelegation_msgs.is_empty() {
println!("Undelegation records : \n{undelegation_table}\n\n");
println!("Undelegation records : \n{}\n\n", undelegation_table);
}
if !delegation_msgs.is_empty() {
println!("Delegation records : \n{delegation_table}\n\n");
println!("Delegation records : \n{}\n\n", delegation_table);
}
let ans = inquire::Confirm::new("Do you want to continue with the shown operations?")
@@ -275,7 +275,7 @@ pub async fn delegate_to_multiple_mixnodes(args: Args, client: SigningClient) {
.prompt();
if let Err(e) = ans {
info!("Aborting, {e}...");
info!("Aborting, {}...", e);
return;
}
@@ -348,7 +348,7 @@ pub async fn delegate_to_multiple_mixnodes(args: Args, client: SigningClient) {
if args.output.is_some() {
if let Err(e) = write_to_csv(output_details, args.output) {
info!("Failed to write to CSV, {e}");
info!("Failed to write to CSV, {}", e);
}
}
}

Some files were not shown because too many files have changed in this diff Show More