Compare commits

..

1 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu 7d0a865cd4 Make allowed_ips unique 2025-07-24 10:38:42 +03:00
1286 changed files with 34642 additions and 61800 deletions
@@ -21,7 +21,7 @@ jobs:
strategy:
fail-fast: false
matrix:
platform: [ arc-linux-latest ]
platform: [ arc-ubuntu-22.04 ]
runs-on: ${{ matrix.platform }}
env:
@@ -38,14 +38,15 @@ jobs:
rm -rf ci-builds || true
mkdir -p $OUTPUT_DIR
echo $OUTPUT_DIR
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get -y install libudev-dev
- name: Sets env vars for tokio if set in manual dispatch inputs
if: github.event_name == 'workflow_dispatch' && inputs.add_tokio_unstable == true
run: |
echo "RUSTFLAGS=--cfg tokio_unstable" >> $GITHUB_ENV
echo "CARGO_FEATURES=--features tokio-console" >> $GITHUB_ENV
echo 'RUSTFLAGS="--cfg tokio_unstable"' >> $GITHUB_ENV
if: github.event_name == 'workflow_dispatch' && inputs.add_tokio_unstable == true
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
@@ -102,6 +103,7 @@ jobs:
if [ ${{ github.event_name == 'workflow_dispatch' && inputs.enable_deb == true }} = true ]; then
cp target/debian/*.deb $OUTPUT_DIR
fi
- name: Deploy branch to CI www
continue-on-error: true
uses: easingthemes/ssh-deploy@main
+1 -1
View File
@@ -9,7 +9,7 @@ on:
jobs:
wasm:
runs-on: arc-linux-latest
runs-on: arc-ubuntu-22.04
env:
CARGO_TERM_COLOR: always
RUSTUP_PERMIT_COPY_RENAME: 1
+8 -20
View File
@@ -8,13 +8,10 @@ on:
- 'gateway/**'
- 'integrations/**'
- 'nym-api/**'
- 'nym-authenticator-client/**'
- 'nym-credential-proxy/**'
- 'nym-ip-packet-client/**'
- 'nym-network-monitor/**'
- 'nym-node/**'
- 'nym-node-status-api/**'
- 'nym-registration-client/**'
- 'nym-statistics-api/**'
- 'nym-outfox/**'
- 'nym-validator-rewarder/**'
@@ -41,7 +38,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ arc-linux-latest, custom-windows-11, custom-macos-15 ]
os: [ arc-ubuntu-22.04, custom-windows-11, custom-macos-15 ]
runs-on: ${{ matrix.os }}
env:
CARGO_TERM_COLOR: always
@@ -49,9 +46,9 @@ jobs:
RUSTUP_PERMIT_COPY_RENAME: 1
steps:
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get -y install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev libudev-dev squashfs-tools protobuf-compiler cmake
run: sudo apt-get update && sudo apt-get -y install libwebkit2gtk-4.0-dev build-essential curl wget libssl-dev libgtk-3-dev libudev-dev squashfs-tools protobuf-compiler
continue-on-error: true
if: contains(matrix.os, 'linux')
if: contains(matrix.os, 'ubuntu')
- name: Check out repository code
uses: actions/checkout@v4
@@ -66,7 +63,7 @@ jobs:
# To avoid running out of disk space, skip generating debug symbols
- name: Set debug to false (unix)
if: contains(matrix.os, 'linux') || contains(matrix.os, 'mac')
if: contains(matrix.os, 'ubuntu') || contains(matrix.os, 'mac')
run: |
sed -i.bak 's/\[profile.dev\]/\[profile.dev\]\ndebug = false/' Cargo.toml
git diff
@@ -84,35 +81,26 @@ jobs:
command: fmt
args: --all -- --check
- name: Clippy (macos)
if: contains(matrix.os, 'mac')
uses: actions-rs/cargo@v1
with:
command: clippy
args: --workspace --all-targets --exclude nym-gateway-probe -- -D warnings
- name: Clippy (non-macos)
if: contains(matrix.os, 'linux') || contains(matrix.os, 'windows')
- name: Clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: --workspace --all-targets -- -D warnings
- name: Build all binaries
uses: actions-rs/cargo@v1
with:
command: build
- name: Build all examples
if: contains(matrix.os, 'linux')
if: contains(matrix.os, 'ubuntu')
uses: actions-rs/cargo@v1
with:
command: build
args: --workspace --examples
- name: Run all tests
if: contains(matrix.os, 'linux')
if: contains(matrix.os, 'ubuntu')
uses: actions-rs/cargo@v1
env:
NYM_API: https://sandbox-nym-api1.nymtech.net/api
@@ -121,7 +109,7 @@ jobs:
args: --workspace
- name: Run expensive tests
if: (github.ref == 'refs/heads/develop' || github.event.pull_request.base.ref == 'develop' || github.event.pull_request.base.ref == 'master') && contains(matrix.os, 'linux')
if: (github.ref == 'refs/heads/develop' || github.event.pull_request.base.ref == 'develop' || github.event.pull_request.base.ref == 'master') && contains(matrix.os, 'ubuntu')
uses: actions-rs/cargo@v1
with:
command: test
@@ -10,13 +10,13 @@ env:
jobs:
check-if-tag-exists:
runs-on: arc-linux-latest-dind
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Checkout repo
uses: actions/checkout@v4
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.47.1
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -16,7 +16,7 @@ jobs:
uses: actions/checkout@v4
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.47.1
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -11,7 +11,7 @@ jobs:
strategy:
fail-fast: false
matrix:
platform: [ arc-linux-latest-dind ]
platform: [ arc-ubuntu-22.04 ]
runs-on: ${{ matrix.platform }}
env:
@@ -28,11 +28,18 @@ jobs:
mkdir -p $OUTPUT_DIR
echo $OUTPUT_DIR
- name: Build contracts
run: make optimize-contracts
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
toolchain: stable
target: wasm32-unknown-unknown
override: true
- name: Check optimized contracts
run: make docker-check-contracts
- name: Install cosmwasm-check
run: cargo install cosmwasm-check
- name: Build release contracts
run: make publish-contracts
- name: Prepare build output
shell: bash
+2 -2
View File
@@ -17,7 +17,7 @@ jobs:
build:
# since it's going to be compiled into wasm, there's absolutely
# no point in running CI on different OS-es
runs-on: arc-linux-latest
runs-on: ubuntu-22.04
env:
CARGO_TERM_COLOR: always
RUSTUP_PERMIT_COPY_RENAME: 1
@@ -54,7 +54,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --lib --manifest-path contracts/Cargo.toml --all-features
args: --lib --manifest-path contracts/Cargo.toml
- name: Check formatting
uses: actions-rs/cargo@v1
+1 -1
View File
@@ -10,7 +10,7 @@ on:
jobs:
build:
runs-on: arc-linux-latest
runs-on: arc-ubuntu-22.04
env:
RUSTUP_PERMIT_COPY_RENAME: 1
defaults:
+9 -10
View File
@@ -6,14 +6,16 @@ on:
paths:
- "ts-packages/**"
- "sdk/typescript/**"
- "nym-connect/desktop/src/**"
- "nym-connect/desktop/package.json"
- "nym-wallet/src/**"
- "nym-wallet/package.json"
- "explorer-v2/**"
- "explorer/**"
- ".github/workflows/ci-lint-typescript.yml"
jobs:
build:
runs-on: arc-linux-latest
runs-on: ubuntu-22.04
env:
RUSTUP_PERMIT_COPY_RENAME: 1
steps:
@@ -23,7 +25,6 @@ jobs:
- uses: actions/setup-node@v4
with:
node-version: 20
- name: Setup yarn
run: npm install -g yarn
@@ -36,12 +37,14 @@ jobs:
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
- name: Install wasm-opt
run: cargo install wasm-opt
uses: ./.github/actions/install-wasm-opt
with:
version: '116'
- name: Set up Go
uses: actions/setup-go@v6
uses: actions/setup-go@v5
with:
go-version: "1.24.6"
go-version: "1.23.7"
- name: Install
run: yarn
@@ -49,11 +52,7 @@ jobs:
- name: Build packages
run: yarn build:ci
- name: Install again
run: yarn
- name: Lint
run: yarn lint
- name: Typecheck with tsc
run: yarn tsc
+1 -1
View File
@@ -11,7 +11,7 @@ on:
jobs:
build:
runs-on: arc-linux-latest
runs-on: arc-ubuntu-22.04
env:
CARGO_TERM_COLOR: always
RUSTUP_PERMIT_COPY_RENAME: 1
@@ -8,7 +8,7 @@ on:
jobs:
build:
runs-on: arc-linux-latest
runs-on: custom-linux
steps:
- uses: actions/checkout@v4
+7 -7
View File
@@ -4,14 +4,14 @@ on:
workflow_dispatch:
pull_request:
paths:
- "wasm/**"
- "clients/client-core/**"
- "common/**"
- ".github/workflows/ci-sdk-wasm.yml"
- 'wasm/**'
- 'clients/client-core/**'
- 'common/**'
- '.github/workflows/ci-sdk-wasm.yml'
jobs:
wasm:
runs-on: arc-linux-latest
runs-on: arc-ubuntu-22.04
env:
CARGO_TERM_COLOR: always
RUSTUP_PERMIT_COPY_RENAME: 1
@@ -33,7 +33,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.24.6"
go-version: "1.23.7"
- name: Install wasm-pack
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
@@ -41,7 +41,7 @@ jobs:
- name: Install wasm-opt
uses: ./.github/actions/install-wasm-opt
with:
version: "116"
version: '116'
- name: Install wasm-bindgen-cli
run: cargo install wasm-bindgen-cli
+1 -1
View File
@@ -6,7 +6,7 @@ jobs:
greeting:
runs-on: ubuntu-latest
steps:
- uses: actions/first-interaction@v3
- uses: actions/first-interaction@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
issue-message: 'Thank you for raising this issue'
+1 -1
View File
@@ -31,7 +31,7 @@ jobs:
- name: Check out repository code
uses: actions/checkout@v4
- name: Download report from previous job
uses: actions/download-artifact@v5
uses: actions/download-artifact@v4
with:
name: report
path: .github/workflows/support-files/notifications
+2 -2
View File
@@ -21,7 +21,7 @@ jobs:
fail-fast: false
matrix:
include:
- os: arc-linux-latest
- os: arc-ubuntu-22.04
target: x86_64-unknown-linux-gnu
runs-on: ${{ matrix.os }}
@@ -56,7 +56,7 @@ jobs:
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
toolchain: 1.88.0
toolchain: 1.86.0
override: true
- name: Build all binaries
@@ -25,7 +25,7 @@ jobs:
uses: actions/checkout@v4
- name: Install Java
uses: actions/setup-java@v5
uses: actions/setup-java@v4
with:
distribution: "temurin"
java-version: "17"
@@ -91,7 +91,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Download binary artifact
uses: actions/download-artifact@v5
uses: actions/download-artifact@v4
with:
name: nyms5-apk-arch64
path: apk
+4 -7
View File
@@ -4,7 +4,7 @@ on:
jobs:
publish:
runs-on: ubuntu-latest
runs-on: arc-ubuntu-22.04
steps:
- uses: actions/checkout@v4
@@ -17,13 +17,10 @@ jobs:
- name: Setup yarn
run: npm install -g yarn
- name: Install rust toolchain
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
components: rustfmt, clippy
- name: Install wasm-pack
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
@@ -32,9 +29,9 @@ jobs:
run: cargo install wasm-opt
- name: Set up Go
uses: actions/setup-go@v6
uses: actions/setup-go@v5
with:
go-version: "1.24.6"
go-version: "1.23.7"
- name: Install dependencies
run: yarn
+2 -2
View File
@@ -8,7 +8,7 @@ env:
jobs:
build-container:
runs-on: arc-linux-latest-dind
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.47.1
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-credential-proxy/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.47.1
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.47.1
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-network-monitor/Cargo.toml
+35 -12
View File
@@ -3,6 +3,11 @@ name: Build and upload Node Status agent container to harbor.nymte.ch
on:
workflow_dispatch:
inputs:
gateway_probe_git_ref:
type: string
default: nym-vpn-core-v1.4.0
required: true
description: Which gateway probe git ref to build the image with
release_image:
description: 'Tag image as a release'
required: true
@@ -15,7 +20,7 @@ env:
jobs:
build-container:
runs-on: arc-linux-latest-dind
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
@@ -35,23 +40,41 @@ jobs:
- name: Get version from cargo.toml
id: get_version
run: |
VERSION=$(yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml)
echo "result=$VERSION" >> $GITHUB_OUTPUT
yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: Initialize RELEASE_TAG
run: echo "RELEASE_TAG=" >> $GITHUB_ENV
- name: Set RELEASE_TAG for release
- name: cleanup-gateway-probe-ref
id: cleanup_gateway_probe_ref
run: |
GATEWAY_PROBE_GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }}
GIT_REF_SLUG="${GATEWAY_PROBE_GIT_REF//\//-}"
echo "git_ref=${GIT_REF_SLUG}" >> $GITHUB_OUTPUT
- name: Set GIT_TAG variable
run: echo "GIT_TAG=${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}" >> $GITHUB_ENV
- name: Set RELEASE_TAG variable
if: github.event.inputs.release_image == 'true'
run: echo "RELEASE_TAG=golden-" >> $GITHUB_ENV
- name: Set IMAGE_NAME_AND_TAGS variable
run: echo "IMAGE_NAME_AND_TAGS=${{ env.CONTAINER_NAME }}:${{ env.RELEASE_TAG }}${{ steps.get_version.outputs.result }}" >> $GITHUB_ENV
run: echo "IMAGE_NAME_AND_TAGS=${{ env.CONTAINER_NAME }}:${{ env.RELEASE_TAG }}${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}" >> $GITHUB_ENV
- name: New env vars
run: echo "RELEASE_TAG='$RELEASE_TAG' IMAGE_NAME_AND_TAGS='$IMAGE_NAME_AND_TAGS'"
run: echo "RELEASE_TAG='$RELEASE_TAG' GIT_TAG='$GIT_TAG' IMAGE_NAME_AND_TAGS='$IMAGE_NAME_AND_TAGS'"
# - name: Remove existing tag if exists
# run: |
# if git rev-parse $${{ env.GIT_TAG }} >/dev/null 2>&1; then
# git push --delete origin $${{ env.GIT_TAG }}
# git tag -d $${{ env.GIT_TAG }}
# fi
# - name: Create tag
# run: |
# git tag -a $${{ env.GIT_TAG }} -m "Version ${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}"
# git push origin $${{ env.GIT_TAG }}
- name: BuildAndPushImageOnHarbor
run: |
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.IMAGE_NAME_AND_TAGS }}
docker build --build-arg GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }} -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.IMAGE_NAME_AND_TAGS }}
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
+5 -9
View File
@@ -14,7 +14,7 @@ env:
jobs:
build-container:
runs-on: arc-linux-latest-dind
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
@@ -34,22 +34,18 @@ jobs:
- name: Get version from cargo.toml
id: get_version
run: |
VERSION=$(yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml)
echo "result=$VERSION" >> $GITHUB_OUTPUT
yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: Set GIT_TAG variable
run: echo "GIT_TAG=${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}" >> $GITHUB_ENV
- name: Initialise RELEASE_TAG
run: echo "RELEASE_TAG=" >> $GITHUB_ENV
- name: Set RELEASE_TAG for release
- name: Set RELEASE_TAG variable
if: github.event.inputs.release_image == 'true'
run: echo "RELEASE_TAG=golden-" >> $GITHUB_ENV
- name: Set IMAGE_NAME_AND_TAGS variable
run: echo "IMAGE_NAME_AND_TAGS=${{ env.CONTAINER_NAME }}:${{ env.RELEASE_TAG }}${{ steps.get_version.outputs.result }}" >> $GITHUB_ENV
- name: New env vars
run: echo "RELEASE_TAG='$RELEASE_TAG' GIT_TAG='$GIT_TAG' IMAGE_NAME_AND_TAGS='$IMAGE_NAME_AND_TAGS'"
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.47.1
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-api/Cargo.toml
+1 -1
View File
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.47.1
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -8,7 +8,7 @@ env:
jobs:
build-container:
runs-on: arc-linux-latest-dind
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.47.1
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.47.1
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
@@ -26,7 +26,7 @@ jobs:
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.47.1
uses: mikefarah/yq@v4.45.4
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
-240
View File
@@ -4,246 +4,6 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2025.19-kase] (2025-10-30)
- update ns agent workflow ([#6154])
- Cherry pick - request #6143 from nymtech/bugfix/mix-tx-closed-v2 ([#6153])
- bugfix: nym-credential-proxy query params parsing regression ([#6121])
- bugfix: revert some dep updates introduced in #6043 ([#6120])
- Skip ipv6 metadata endpoint request ([#6118])
- update to no longer use 1mb files ([#6117])
- chore: restore pending dkg contract state migration ([#6116])
- Revert "Propagate cancel token to mixnet client" ([#6115])
- Update dirs to 6.0 ([#6109])
- Propagate cancel token to mixnet client ([#6105])
- bugfix: retrieve and update ticketbook in the same query ([#6101])
- bugfix: include network name in the default gateway probe config path ([#6100])
- Bugfix/incompatibility fixes ([#6099])
- [DOCs/operators] QUIC deployment script & docs ([#6098])
- bugfix: testnet manager 02sql migration ([#6096])
- feat: move gateway probe to monorepo (and update to rust edition 2024) ([#6094])
- bugfix: use custom topology provider for list of init gateways ([#6092])
- Max/fix wasm client + build commands ([#6043])
[#6154]: https://github.com/nymtech/nym/pull/6154
[#6153]: https://github.com/nymtech/nym/pull/6153
[#6121]: https://github.com/nymtech/nym/pull/6121
[#6120]: https://github.com/nymtech/nym/pull/6120
[#6118]: https://github.com/nymtech/nym/pull/6118
[#6117]: https://github.com/nymtech/nym/pull/6117
[#6116]: https://github.com/nymtech/nym/pull/6116
[#6115]: https://github.com/nymtech/nym/pull/6115
[#6109]: https://github.com/nymtech/nym/pull/6109
[#6105]: https://github.com/nymtech/nym/pull/6105
[#6101]: https://github.com/nymtech/nym/pull/6101
[#6100]: https://github.com/nymtech/nym/pull/6100
[#6099]: https://github.com/nymtech/nym/pull/6099
[#6098]: https://github.com/nymtech/nym/pull/6098
[#6096]: https://github.com/nymtech/nym/pull/6096
[#6094]: https://github.com/nymtech/nym/pull/6094
[#6092]: https://github.com/nymtech/nym/pull/6092
[#6043]: https://github.com/nymtech/nym/pull/6043
## [2025.18-jarlsberg] (2025-10-14)
- ns-api: add descriptions to dVPN gateway responses ([#6102])
- NS API: use new probe download filesize and milliseconds field ([#6097])
- ns-api: use download files size from probes instead of parsing filenames ([#6095])
- ns-api: add new fields for probe output for query_metadata and download file size and duration in ms ([#6091])
- Bugfix/bloomfilters purge ([#6089])
- Hotfix: Update API source in node ping tester script ([#6082])
- Get wireguard keypair as arg instead of reading it from disk ([#6078])
- Feature: Ping probe all nodes /described nodes from a server ([#6074])
- Node Status API: add bridge information to dVPN endpoint ([#6069])
- frontdoor typo fix ([#6067])
- Feature: Node rewards tracker ([#6064])
- [chore] Clippy fix ([#6060])
- Registration Client ([#6059])
- Bugfix: Nym node CLI download nym-node exception ([#6058])
- Feature: Nym node html landing page ([#6053])
- feat: DKG contract method for updating announce address ([#6050])
- feat: NS ticket faucet ([#6047])
- Bridge proto client params in Self-Described ([#6035])
- Node Status API: remove sqlite support ([#6004])
- Benny/ci contract fix ([#5962])
[#6102]: https://github.com/nymtech/nym/pull/6102
[#6097]: https://github.com/nymtech/nym/pull/6097
[#6095]: https://github.com/nymtech/nym/pull/6095
[#6091]: https://github.com/nymtech/nym/pull/6091
[#6089]: https://github.com/nymtech/nym/pull/6089
[#6082]: https://github.com/nymtech/nym/pull/6082
[#6078]: https://github.com/nymtech/nym/pull/6078
[#6074]: https://github.com/nymtech/nym/pull/6074
[#6069]: https://github.com/nymtech/nym/pull/6069
[#6067]: https://github.com/nymtech/nym/pull/6067
[#6064]: https://github.com/nymtech/nym/pull/6064
[#6060]: https://github.com/nymtech/nym/pull/6060
[#6059]: https://github.com/nymtech/nym/pull/6059
[#6058]: https://github.com/nymtech/nym/pull/6058
[#6053]: https://github.com/nymtech/nym/pull/6053
[#6050]: https://github.com/nymtech/nym/pull/6050
[#6047]: https://github.com/nymtech/nym/pull/6047
[#6035]: https://github.com/nymtech/nym/pull/6035
[#6004]: https://github.com/nymtech/nym/pull/6004
[#5962]: https://github.com/nymtech/nym/pull/5962
## [2025.17-isabirra] (2025-09-29)
- Bugfix | Fix the registration handshake ([#6062])
- Convenience for ShutdownTracker ([#6038])
- chore: made http-api-client-macro doctest compile ([#6037])
- feat: refresh mixnet contract on epoch progression ([#6023])
- chore: remove legacy nodes from nym api [and kinda-ish from node status api] ([#6021])
- Feature/credential proxy crate ([#6018])
- Moving clients crate from vpn-client repo to here ([#6015])
- Feature/cancellation migration ([#6014])
- Use default value for the ports until api is deployed ([#6007])
- bugfix: return from MixTrafficController if client request channel has closed ([#6002])
- Revert "Create an axum_test client for more integrated unit testing (… ([#5999])
- chore: upgraded syn to 2.0 and removed nym-execute ([#5998])
- feat: use `ShutdownToken` (`CancellationToken` inside) for nym-api ([#5997])
- bugfix: Recipient deserialisation for deserialisers missing bytes specialisation ([#5991])
- chore: use updated version of simulate endpoint ([#5988])
- chore: purge temp databases on build ([#5984])
- Bump sha.js from 2.4.11 to 2.4.12 ([#5983])
- Feature: Delegation program stake checker and adjuster ([#5980])
- build(deps): bump actions/setup-java from 4 to 5 ([#5975])
- Domain fronting integration ([#5974])
- chore: internal hidden command to force advance nyx epoch ([#5964])
- Create an axum_test client for more integrated unit testing ([#5956])
- feat: shared library for attempting to retrieve update mode attestation ([#5954])
- Bump slab from 0.4.10 to 0.4.11 ([#5952])
- build(deps): bump actions/first-interaction from 1 to 3 ([#5950])
- fix: use WASM compatible time API in client ([#5948])
- feat: credential proxy deposit pool ([#5945])
- build(deps): bump actions/download-artifact from 4 to 5 ([#5939])
- feat: nym signers monitor ([#5933])
- Bump console from 0.15.11 to 0.16.0 ([#5931])
- Bump mock_instant from 0.5.3 to 0.6.0 ([#5930])
- Bump tokio from 1.46.1 to 1.47.1 ([#5929])
- Bump defguard_wireguard_rs from v0.4.7 to v0.7.5 ([#5928])
- Bump indicatif from 0.17.11 to 0.18.0 ([#5924])
- Feature: Nym node autorun CLI ([#5916])
- build(deps): bump mikefarah/yq from 4.45.4 to 4.47.1 ([#5911])
- build(deps): bump pbkdf2 from 3.1.2 to 3.1.3 ([#5869])
[#6062]: https://github.com/nymtech/nym/pull/6062
[#6038]: https://github.com/nymtech/nym/pull/6038
[#6037]: https://github.com/nymtech/nym/pull/6037
[#6023]: https://github.com/nymtech/nym/pull/6023
[#6021]: https://github.com/nymtech/nym/pull/6021
[#6018]: https://github.com/nymtech/nym/pull/6018
[#6015]: https://github.com/nymtech/nym/pull/6015
[#6014]: https://github.com/nymtech/nym/pull/6014
[#6007]: https://github.com/nymtech/nym/pull/6007
[#6002]: https://github.com/nymtech/nym/pull/6002
[#5999]: https://github.com/nymtech/nym/pull/5999
[#5998]: https://github.com/nymtech/nym/pull/5998
[#5997]: https://github.com/nymtech/nym/pull/5997
[#5991]: https://github.com/nymtech/nym/pull/5991
[#5988]: https://github.com/nymtech/nym/pull/5988
[#5984]: https://github.com/nymtech/nym/pull/5984
[#5983]: https://github.com/nymtech/nym/pull/5983
[#5980]: https://github.com/nymtech/nym/pull/5980
[#5975]: https://github.com/nymtech/nym/pull/5975
[#5974]: https://github.com/nymtech/nym/pull/5974
[#5964]: https://github.com/nymtech/nym/pull/5964
[#5956]: https://github.com/nymtech/nym/pull/5956
[#5954]: https://github.com/nymtech/nym/pull/5954
[#5952]: https://github.com/nymtech/nym/pull/5952
[#5950]: https://github.com/nymtech/nym/pull/5950
[#5948]: https://github.com/nymtech/nym/pull/5948
[#5945]: https://github.com/nymtech/nym/pull/5945
[#5939]: https://github.com/nymtech/nym/pull/5939
[#5933]: https://github.com/nymtech/nym/pull/5933
[#5931]: https://github.com/nymtech/nym/pull/5931
[#5930]: https://github.com/nymtech/nym/pull/5930
[#5929]: https://github.com/nymtech/nym/pull/5929
[#5928]: https://github.com/nymtech/nym/pull/5928
[#5924]: https://github.com/nymtech/nym/pull/5924
[#5916]: https://github.com/nymtech/nym/pull/5916
[#5911]: https://github.com/nymtech/nym/pull/5911
[#5869]: https://github.com/nymtech/nym/pull/5869
## [2025.16-halloumi] (2025-09-16)
- Backport metadata endpoint ([#6010])
- bugfix: make sure tables are removed in correct order to not trigger FK constraint issue ([#5987])
- chore: move authenticator into gateway crate ([#5982])
- Fix the ns api ci workflow ([#5981])
- Remove freshness check on testrun submit ([#5977])
- Update sysinfo to the latest ([#5976])
- bugfix: manually calculate per node work on rewarded set changes ([#5972])
- fixing the ci for ns agent ([#5965])
- Feature/testing utils ([#5963])
- bugfix: fix ci-build for linux (and use updated runner) ([#5958])
- chore: updated refs to cheddar rev of nym repo ([#5955])
- http api client adjustment ([#5953])
- chore: fix rust 1.89 clippy issues ([#5944])
- Wireguard metadata client library ([#5943])
- chore: remove unused import ([#5942])
- feat: introduce additional checks when attempting to send to bounded channels ([#5941])
- Move credential verifier in peer controller ([#5938])
- change PK/FK on expiration date signatures tables ([#5934])
- Wireguard private metadata ([#5915])
[#6010]: https://github.com/nymtech/nym/pull/6010
[#5987]: https://github.com/nymtech/nym/pull/5987
[#5982]: https://github.com/nymtech/nym/pull/5982
[#5981]: https://github.com/nymtech/nym/pull/5981
[#5977]: https://github.com/nymtech/nym/pull/5977
[#5976]: https://github.com/nymtech/nym/pull/5976
[#5972]: https://github.com/nymtech/nym/pull/5972
[#5965]: https://github.com/nymtech/nym/pull/5965
[#5963]: https://github.com/nymtech/nym/pull/5963
[#5958]: https://github.com/nymtech/nym/pull/5958
[#5955]: https://github.com/nymtech/nym/pull/5955
[#5953]: https://github.com/nymtech/nym/pull/5953
[#5944]: https://github.com/nymtech/nym/pull/5944
[#5943]: https://github.com/nymtech/nym/pull/5943
[#5942]: https://github.com/nymtech/nym/pull/5942
[#5941]: https://github.com/nymtech/nym/pull/5941
[#5938]: https://github.com/nymtech/nym/pull/5938
[#5934]: https://github.com/nymtech/nym/pull/5934
[#5915]: https://github.com/nymtech/nym/pull/5915
## [2025.15-gruyere] (2025-08-20)
- Migrate strum to 0.27.2 ([#5960])
- WG exit policy scripts update ([#5921])
- Make DNS Resolver fallback optional ([#5920])
- nym-node debug command to reset providers db ([#5914])
- basic zulip client for sending messages ([#5913])
- chore: allow compatibility with 'CDLA-Permissive-2.0' ([#5910])
- feat: ecash liveness check ([#5890])
- Remove old free credential handle ([#5864])
[#5960]: https://github.com/nymtech/nym/pull/5960
[#5921]: https://github.com/nymtech/nym/pull/5921
[#5920]: https://github.com/nymtech/nym/pull/5920
[#5914]: https://github.com/nymtech/nym/pull/5914
[#5913]: https://github.com/nymtech/nym/pull/5913
[#5910]: https://github.com/nymtech/nym/pull/5910
[#5890]: https://github.com/nymtech/nym/pull/5890
[#5864]: https://github.com/nymtech/nym/pull/5864
## [2025.14-feta] (2025-08-05)
- chore: nym node tokio console ([#5909])
- Feature/dkg snapshot epoch ([#5900])
- Feature/dkg epoch dealers query ([#5899])
- sqlx-pool-guard: allocate more memory on windows ([#5896])
- Support mnemonic in the NS agent ([#5883])
- Allow PG database backend ([#5880])
[#5909]: https://github.com/nymtech/nym/pull/5909
[#5900]: https://github.com/nymtech/nym/pull/5900
[#5899]: https://github.com/nymtech/nym/pull/5899
[#5896]: https://github.com/nymtech/nym/pull/5896
[#5883]: https://github.com/nymtech/nym/pull/5883
[#5880]: https://github.com/nymtech/nym/pull/5880
## [2025.13-emmental] (2025-07-22)
- fix: don't allow mixnode running in exit mode ([#5898])
Generated
+1748 -1493
View File
File diff suppressed because it is too large Load Diff
+32 -54
View File
@@ -31,7 +31,6 @@ members = [
"common/client-libs/mixnet-client",
"common/client-libs/validator-client",
"common/commands",
"common/nym-common",
"common/config",
"common/cosmwasm-smart-contracts/coconut-dkg",
"common/cosmwasm-smart-contracts/contracts-common",
@@ -40,11 +39,9 @@ members = [
"common/cosmwasm-smart-contracts/ecash-contract",
"common/cosmwasm-smart-contracts/group-contract",
"common/cosmwasm-smart-contracts/mixnet-contract",
"common/cosmwasm-smart-contracts/multisig-contract",
"common/cosmwasm-smart-contracts/nym-performance-contract",
"common/cosmwasm-smart-contracts/multisig-contract", "common/cosmwasm-smart-contracts/nym-performance-contract",
"common/cosmwasm-smart-contracts/nym-pool-contract",
"common/cosmwasm-smart-contracts/vesting-contract",
"common/credential-proxy",
"common/credential-storage",
"common/credential-utils",
"common/credential-verification",
@@ -52,15 +49,13 @@ members = [
"common/credentials-interface",
"common/crypto",
"common/dkg",
"common/ecash-signer-check",
"common/ecash-signer-check-types",
"common/ecash-time",
"common/execute",
"common/exit-policy",
"common/gateway-requests",
"common/gateway-stats-storage",
"common/gateway-storage",
"common/http-api-client",
"common/http-api-client-macro",
"common/http-api-common",
"common/inclusion-probability",
"common/ip-packet-requests",
@@ -69,8 +64,6 @@ members = [
"common/network-defaults",
"common/node-tester-utils",
"common/nonexhaustive-delayqueue",
"common/nym-cache",
"common/nym-connection-monitor",
"common/nym-id",
"common/nym-metrics",
"common/nym_offline_compact_ecash",
@@ -89,7 +82,6 @@ members = [
"common/nymsphinx/types",
"common/nyxd-scraper",
"common/pemstore",
"common/registration",
"common/serde-helpers",
"common/service-provider-requests-common",
"common/socks5-client-core",
@@ -98,33 +90,24 @@ members = [
"common/statistics",
"common/store-cipher",
"common/task",
"common/test-utils",
"common/ticketbooks-merkle",
"common/topology",
"common/tun",
"common/types",
"common/upgrade-mode-check",
"common/verloc",
"common/wasm/client-core",
"common/wasm/storage",
"common/wasm/utils",
"common/wireguard",
"common/wireguard-private-metadata/client",
"common/wireguard-private-metadata/server",
"common/wireguard-private-metadata/shared",
"common/wireguard-private-metadata/tests",
"common/wireguard-types",
"common/zulip-client",
"documentation/autodoc",
"gateway",
"nym-api",
"nym-api/nym-api-requests",
"nym-authenticator-client",
"nym-browser-extension/storage",
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-ip-packet-client",
"nym-network-monitor",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
@@ -133,8 +116,6 @@ members = [
"nym-node/nym-node-metrics",
"nym-node/nym-node-requests",
"nym-outfox",
"nym-registration-client",
"nym-signers-monitor",
"nym-statistics-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
@@ -142,6 +123,7 @@ members = [
"sdk/ffi/go",
"sdk/ffi/shared",
"sdk/rust/nym-sdk",
"service-providers/authenticator",
"service-providers/common",
"service-providers/ip-packet-router",
"service-providers/network-requester",
@@ -150,9 +132,10 @@ members = [
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/mixnet-connectivity-check",
# "tools/internal/sdk-version-bump",
# "tools/internal/sdk-version-bump",
"tools/internal/ssl-inject",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/internal/validator-status-check",
"tools/nym-cli",
@@ -165,8 +148,7 @@ members = [
"wasm/mix-fetch",
"wasm/node-tester",
"wasm/zknym-lib",
"nym-gateway-probe"
, "query-tester"]
]
default-members = [
"clients/native",
@@ -179,21 +161,22 @@ default-members = [
"nym-statistics-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
"tools/nymvisor",
]
exclude = ["contracts", "nym-wallet", "cpu-cycles"]
exclude = ["explorer", "contracts", "nym-wallet", "cpu-cycles"]
[workspace.package]
authors = ["Nym Technologies SA"]
repository = "https://github.com/nymtech/nym"
homepage = "https://nymtech.net"
documentation = "https://nymtech.net"
edition = "2024"
edition = "2021"
license = "Apache-2.0"
rust-version = "1.85"
rust-version = "1.80"
readme = "README.md"
[workspace.dependencies]
@@ -215,6 +198,7 @@ base64 = "0.22.1"
base85rs = "0.1.3"
bincode = "1.3.3"
bip39 = { version = "2.0.0", features = ["zeroize"] }
bit-vec = "0.7.0" # can we unify those?
bitvec = "1.0.0"
blake3 = "1.7.0"
bloomfilter = "3.0.1"
@@ -233,8 +217,8 @@ clap_complete = "4.5"
clap_complete_fig = "4.5"
colored = "2.2"
comfy-table = "7.1.4"
console = "0.16.0"
console-subscriber = "0.4.1"
console = "0.15.11"
console-subscriber = "0.1.1"
console_error_panic_hook = "0.1"
const-str = "0.5.6"
const_format = "0.2.34"
@@ -242,11 +226,13 @@ criterion = "0.5"
csv = "1.3.1"
ctr = "0.9.1"
cupid = "0.6.1"
curve25519-dalek = "4.1"
dashmap = "5.5.3"
# We want https://github.com/DefGuard/wireguard-rs/pull/64 , but there's no crates.io release being pushed out anymore
defguard_wireguard_rs = { git = "https://github.com/DefGuard/wireguard-rs.git", rev = "v0.4.7" }
digest = "0.10.7"
dirs = "6.0"
dirs = "5.0"
doc-comment = "0.3"
dotenvy = "0.15.6"
dyn-clone = "1.0.19"
ecdsa = "0.16"
@@ -262,8 +248,11 @@ futures = "0.3.31"
futures-util = "0.3"
generic-array = "0.14.7"
getrandom = "0.2.10"
getset = "0.1.5"
handlebars = "3.5.5"
headers = "0.4.0"
hex = "0.4.3"
hex-literal = "0.3.3"
hickory-resolver = "0.25"
hkdf = "0.12.3"
hmac = "0.12.1"
@@ -275,22 +264,22 @@ humantime = "2.2.0"
humantime-serde = "1.1.1"
hyper = "1.6.0"
hyper-util = "0.1"
indicatif = "0.18.0"
indicatif = "0.17.11"
inquire = "0.6.2"
inventory = "0.3.21"
ip_network = "0.4.1"
ipnetwork = "0.20"
itertools = "0.14.0"
jwt-simple = { version = "0.12.12", default-features = false, features = ["pure-rust"] }
k256 = "0.13"
lazy_static = "1.5.0"
ledger-transport = "0.10.0"
ledger-transport-hid = "0.10.0"
log = "0.4"
maxminddb = "0.23.0"
mime = "0.3.17"
moka = { version = "0.12", features = ["future"] }
nix = "0.27.1"
notify = "5.1.0"
okapi = "0.7.0"
once_cell = "1.21.3"
opentelemetry = "0.19.0"
opentelemetry-jaeger = "0.18.0"
@@ -298,7 +287,7 @@ parking_lot = "0.12.3"
pem = "0.8"
petgraph = "0.6.5"
pin-project = "1.1"
pnet_packet = "0.35.0"
pin-project-lite = "0.2.16"
publicsuffix = "2.3.0"
proc_pidinfo = "0.1.3"
quote = "1"
@@ -306,10 +295,13 @@ rand = "0.8.5"
rand_chacha = "0.3"
rand_core = "0.6.3"
rand_distr = "0.4"
rand_pcg = "0.3.1"
rand_seeder = "0.2.3"
rayon = "1.5.1"
regex = "1.10.6"
reqwest = { version = "0.12.15", default-features = false }
rs_merkle = "1.5.0"
safer-ffi = "0.1.13"
schemars = "0.8.22"
semver = "1.0.26"
serde = "1.0.219"
@@ -320,24 +312,22 @@ serde_json_path = "0.7.2"
serde_repr = "0.1"
serde_with = "3.9.0"
serde_yaml = "0.9.25"
serde_plain = "1.0.2"
sha2 = "0.10.9"
si-scale = "0.2.3"
snow = "0.9.6"
sphinx-packet = "=0.6.0"
sqlx = "0.8.6"
strum = "0.27.2"
strum_macros = "0.27.2"
strum = "0.26"
strum_macros = "0.26"
subtle-encoding = "0.5"
syn = "2"
sysinfo = "0.37.0"
syn = "1"
sysinfo = "0.33.0"
tap = "1.0.1"
tar = "0.4.44"
test-with = { version = "0.15.4", default-features = false }
tempfile = "3.20"
thiserror = "2.0"
time = "0.3.41"
tokio = "1.47"
tokio = "1.45"
tokio-postgres = "0.7"
tokio-stream = "0.1.17"
tokio-test = "0.4.4"
@@ -353,10 +343,8 @@ tracing-opentelemetry = "0.19.0"
tracing-subscriber = "0.3.19"
tracing-tree = "0.2.2"
tracing-indicatif = "0.3.9"
tracing-test = "0.2.5"
ts-rs = "10.1.0"
tungstenite = { version = "0.20.1", default-features = false }
typed-builder = "0.23.0"
uniffi = "0.29.2"
uniffi_build = "0.29.0"
url = "2.5"
@@ -365,7 +353,6 @@ utoipa-swagger-ui = "8.1"
utoipauto = "0.2"
uuid = "*"
vergen = { version = "=8.3.1", default-features = false }
vergen-gitcl = { version = "1.0.8", default-features = false }
walkdir = "2"
x25519-dalek = "2.0.0"
zeroize = "1.7.0"
@@ -405,19 +392,18 @@ prost = { version = "0.13", default-features = false }
# wasm-related dependencies
gloo-utils = "0.2.0"
gloo-net = "0.6.0"
gloo-timers = "0.3.0"
indexed_db_futures = "0.6.4"
js-sys = "0.3.76"
serde-wasm-bindgen = "0.6.5"
tsify = "0.4.5"
tokio_with_wasm = { version = "0.8.7" }
wasm-bindgen = "0.2.99"
wasm-bindgen-futures = "0.4.49"
wasm-bindgen-test = "0.3.49"
wasmtimer = "0.4.1"
web-sys = "0.3.76"
# for local development:
#[patch.crates-io]
#sphinx-packet = { path = "../sphinx" }
@@ -449,15 +435,7 @@ opt-level = 'z'
# lto = true
opt-level = 'z'
[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
[workspace.lints.clippy]
suspicious = "deny"
complexity = "deny"
perf = "deny"
style = "deny"
unwrap_used = "deny"
expect_used = "deny"
todo = "deny"
+2 -11
View File
@@ -109,7 +109,7 @@ sdk-wasm-build:
$(MAKE) -C wasm/node-tester
$(MAKE) -C wasm/mix-fetch
$(MAKE) -C wasm/zknym-lib
# $(MAKE) -C wasm/full-nym-wasm
#$(MAKE) -C wasm/full-nym-wasm
# run this from npm/yarn to ensure tools are in the path, e.g. yarn build:sdk from root of repo
sdk-typescript-build:
@@ -140,8 +140,7 @@ clippy: sdk-wasm-lint
WASM_CONTRACT_DIR := contracts/target/wasm32-unknown-unknown/release
# Find every direct contract folder that contains a Cargo.toml
#CONTRACT_DIRS := $(shell find contracts -type f -name Cargo.toml \( ! -path "contracts/Cargo.toml" \) | grep -v integration-tests | xargs -n1 dirname | sort -u)
CONTRACT_DIRS := contracts/example-contract
CONTRACT_DIRS := $(shell find contracts -type f -name Cargo.toml \( ! -path "contracts/Cargo.toml" \) | grep -v integration-tests | xargs -n1 dirname | sort -u)
CONTRACTS_OUT_DIR = contracts/artifacts
@@ -155,7 +154,6 @@ CONTRACTS_OUT_DIR = contracts/artifacts
#
COSMWASM_OPTIMIZER_IMAGE ?= cosmwasm/optimizer:0.17.0
COSMWASM_OPTIMIZER_PLATFORM ?= linux/amd64
COSMWASM_CHECK_IMAGE ?= rust:1.88
# Ensure clean build environment and run the optimizer
optimize-contracts:
@@ -181,13 +179,6 @@ optimize-contracts:
# Cleanup temporary artefacts directory
@rm -rf artifacts 2>/dev/null || true
# Check artifacts with cosmwasm-check inside the optimizer image
docker-check-contracts:
@docker run --rm --platform $(COSMWASM_OPTIMIZER_PLATFORM) \
-v $(CURDIR):/code --workdir /code \
--entrypoint /bin/sh \
$(COSMWASM_CHECK_IMAGE) -lc 'apt-get update && apt-get install -y --no-install-recommends llvm-dev libclang-dev pkg-config && export PATH="/usr/local/cargo/bin:/usr/local/rustup/bin:$$PATH" && cargo install cosmwasm-check --locked && WASMER_ENGINE=universal WASMER_COMPILER=singlepass cosmwasm-check contracts/artifacts/*.wasm'
wasm-opt-contracts:
@for WASM in $(WASM_CONTRACT_DIR)/*.wasm; do \
echo "Running wasm-opt on $$WASM"; \
+2 -2
View File
@@ -1,10 +1,10 @@
[package]
name = "nym-client"
version = "1.1.65"
version = "1.1.59"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
rust-version = "1.85"
rust-version = "1.70"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+11 -16
View File
@@ -11,7 +11,7 @@ use nym_client_core::client::base_client::{
BaseClientBuilder, ClientInput, ClientOutput, ClientState,
};
use nym_sphinx::params::PacketType;
use nym_task::ShutdownManager;
use nym_task::TaskHandle;
use nym_validator_client::QueryHttpRpcNyxdClient;
use std::error::Error;
use std::path::PathBuf;
@@ -29,8 +29,6 @@ pub struct SocketClient {
/// Optional path to a .json file containing standalone network details.
custom_mixnet: Option<PathBuf>,
shutdown_manager: ShutdownManager,
}
impl SocketClient {
@@ -42,7 +40,6 @@ impl SocketClient {
SocketClient {
config,
custom_mixnet,
shutdown_manager: Default::default(),
}
}
@@ -52,7 +49,7 @@ impl SocketClient {
client_output: ClientOutput,
client_state: ClientState,
self_address: &Recipient,
shutdown_token: nym_task::ShutdownToken,
task_client: nym_task::TaskClient,
packet_type: PacketType,
) {
info!("Starting websocket listener...");
@@ -60,7 +57,6 @@ impl SocketClient {
let ClientInput {
connection_command_sender,
input_sender,
..
} = client_input;
let ClientOutput {
@@ -81,24 +77,24 @@ impl SocketClient {
shared_lane_queue_lengths,
reply_controller_sender,
Some(packet_type),
shutdown_token.clone(),
task_client.fork("websocket_handler"),
);
websocket::Listener::new(
config.socket.host,
config.socket.listening_port,
shutdown_token.child_token(),
task_client.with_suffix("websocket_listener"),
)
.start(websocket_handler);
}
/// blocking version of `start_socket` method. Will run forever (or until SIGINT is sent)
pub async fn run_socket_forever(self) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut shutdown = self.start_socket().await?;
let shutdown = self.start_socket().await?;
shutdown.run_until_shutdown().await;
let res = shutdown.wait_for_shutdown().await;
log::info!("Stopping nym-client");
Ok(())
res
}
async fn initialise_storage(&self) -> Result<OnDiskPersistent, ClientError> {
@@ -115,7 +111,7 @@ impl SocketClient {
let dkg_query_client = if self.config.base.client.disabled_credentials_mode {
None
} else {
Some(default_query_dkg_client_from_config(&self.config.base)?)
Some(default_query_dkg_client_from_config(&self.config.base))
};
let storage = self.initialise_storage().await?;
@@ -123,7 +119,6 @@ impl SocketClient {
let mut base_client =
BaseClientBuilder::new(self.config().base(), storage, dkg_query_client)
.with_shutdown(self.shutdown_manager.shutdown_tracker_owned())
.with_user_agent(user_agent);
if let Some(custom_mixnet) = &self.custom_mixnet {
@@ -133,7 +128,7 @@ impl SocketClient {
Ok(base_client)
}
pub async fn start_socket(self) -> Result<ShutdownManager, ClientError> {
pub async fn start_socket(self) -> Result<TaskHandle, ClientError> {
if !self.config.socket.socket_type.is_websocket() {
return Err(ClientError::InvalidSocketMode);
}
@@ -152,13 +147,13 @@ impl SocketClient {
client_output,
client_state,
&self_address,
self.shutdown_manager.child_shutdown_token(),
started_client.task_handle.get_handle(),
packet_type,
);
info!("Client startup finished!");
info!("The address of this client is: {self_address}");
Ok(self.shutdown_manager)
Ok(started_client.task_handle)
}
}
+27 -21
View File
@@ -19,7 +19,7 @@ use nym_sphinx::receiver::ReconstructedMessage;
use nym_task::connections::{
ConnectionCommand, ConnectionCommandSender, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::ShutdownToken;
use nym_task::TaskClient;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::Instant;
@@ -44,7 +44,7 @@ pub(crate) struct HandlerBuilder {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
shutdown_token: ShutdownToken,
task_client: TaskClient,
}
impl HandlerBuilder {
@@ -57,7 +57,7 @@ impl HandlerBuilder {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
shutdown_token: ShutdownToken,
task_client: TaskClient,
) -> Self {
Self {
msg_input,
@@ -67,13 +67,14 @@ impl HandlerBuilder {
lane_queue_lengths,
reply_controller_sender,
packet_type,
shutdown_token,
task_client,
}
}
// TODO: make sure we only ever have one active handler
pub fn create_active_handler(&self) -> Handler {
let shutdown_token = self.shutdown_token.clone();
let mut task_client = self.task_client.fork("active_handler");
task_client.disarm();
Handler {
msg_input: self.msg_input.clone(),
client_connection_tx: self.client_connection_tx.clone(),
@@ -84,7 +85,7 @@ impl HandlerBuilder {
lane_queue_lengths: self.lane_queue_lengths.clone(),
reply_controller_sender: self.reply_controller_sender.clone(),
packet_type: self.packet_type,
shutdown_token,
task_client,
}
}
}
@@ -99,14 +100,19 @@ pub(crate) struct Handler {
lane_queue_lengths: LaneQueueLengths,
reply_controller_sender: ReplyControllerSender,
packet_type: Option<PacketType>,
shutdown_token: ShutdownToken,
task_client: TaskClient,
}
impl Drop for Handler {
fn drop(&mut self) {
let _ = self
if let Err(err) = self
.buffer_requester
.unbounded_send(ReceivedBufferMessage::ReceiverDisconnect);
.unbounded_send(ReceivedBufferMessage::ReceiverDisconnect)
{
if !self.task_client.is_shutdown_poll() {
error!("failed to disconnect the receiver from the buffer: {err}");
}
}
}
}
@@ -136,7 +142,7 @@ impl Handler {
{
Ok(length) => length,
Err(err) => {
if !self.shutdown_token.is_cancelled() {
if !self.task_client.is_shutdown_poll() {
error!(
"Failed to get reply queue length for connection {connection_id}: {err}"
);
@@ -186,7 +192,7 @@ impl Handler {
// the ack control is now responsible for chunking, etc.
let input_msg = InputMessage::new_regular(recipient, message, lane, self.packet_type);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.shutdown_token.is_cancelled() {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send message to the input buffer: {err}");
}
}
@@ -219,7 +225,7 @@ impl Handler {
let input_msg =
InputMessage::new_anonymous(recipient, message, reply_surbs, lane, self.packet_type);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.shutdown_token.is_cancelled() {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send anonymous message to the input buffer: {err}");
}
}
@@ -247,7 +253,7 @@ impl Handler {
let input_msg = InputMessage::new_reply(recipient_tag, message, lane, self.packet_type);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.shutdown_token.is_cancelled() {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send reply message to the input buffer: {err}");
}
}
@@ -269,7 +275,7 @@ impl Handler {
.client_connection_tx
.unbounded_send(ConnectionCommand::Close(connection_id))
{
if !self.shutdown_token.is_cancelled() {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send close connection command: {err}");
}
}
@@ -388,14 +394,11 @@ impl Handler {
}
async fn listen_for_requests(&mut self, mut msg_receiver: ReconstructedMessagesReceiver) {
let shutdown_token = self.shutdown_token.clone();
let mut task_client = self.task_client.fork("select");
task_client.disarm();
loop {
while !task_client.is_shutdown() {
tokio::select! {
_ = shutdown_token.cancelled() => {
log::trace!("Websocket handler: Received shutdown");
break;
}
// we can either get a client request from the websocket
socket_msg = self.next_websocket_request() => {
if socket_msg.is_none() {
@@ -433,6 +436,9 @@ impl Handler {
break;
}
}
_ = task_client.recv() => {
log::trace!("Websocket handler: Received shutdown");
}
}
}
log::debug!("Websocket handler: Exiting");
@@ -458,7 +464,7 @@ impl Handler {
reconstructed_sender,
))
{
if !self.shutdown_token.is_cancelled() {
if !self.task_client.is_shutdown_poll() {
error!("failed to announce the receiver to the buffer: {err}");
}
}
+7 -7
View File
@@ -3,7 +3,7 @@
use super::handler::HandlerBuilder;
use log::*;
use nym_task::ShutdownToken;
use nym_task::TaskClient;
use std::net::IpAddr;
use std::{net::SocketAddr, process, sync::Arc};
use tokio::io::AsyncWriteExt;
@@ -23,15 +23,15 @@ impl State {
pub(crate) struct Listener {
address: SocketAddr,
state: State,
shutdown_token: ShutdownToken,
task_client: TaskClient,
}
impl Listener {
pub(crate) fn new(host: IpAddr, port: u16, shutdown_token: ShutdownToken) -> Self {
pub(crate) fn new(host: IpAddr, port: u16, task_client: TaskClient) -> Self {
Listener {
address: SocketAddr::new(host, port),
state: State::AwaitingConnection,
shutdown_token,
task_client,
}
}
@@ -46,11 +46,11 @@ impl Listener {
let notify = Arc::new(Notify::new());
while !self.shutdown_token.is_cancelled() {
while !self.task_client.is_shutdown() {
tokio::select! {
// When the handler finishes we check if shutdown is signalled
_ = notify.notified() => {
if self.shutdown_token.is_cancelled() {
if self.task_client.is_shutdown() {
log::trace!("Websocket listener: detected shutdown after connection closed");
break;
}
@@ -59,7 +59,7 @@ impl Listener {
}
// ... but when there is no connected client at the time of shutdown being
// signalled, we handle it here.
_ = self.shutdown_token.cancelled() => {
_ = self.task_client.recv() => {
if !self.state.is_connected() {
log::trace!("Not connected: shutting down");
break;
+2 -2
View File
@@ -1,10 +1,10 @@
[package]
name = "nym-socks5-client"
version = "1.1.65"
version = "1.1.59"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
rust-version = "1.85"
rust-version = "1.70"
license.workspace = true
[dependencies]
+5 -5
View File
@@ -1,8 +1,8 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::StreamExt;
use futures::channel::mpsc;
use futures::StreamExt;
use notify::event::{DataChange, MetadataKind, ModifyKind};
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap;
@@ -96,10 +96,10 @@ impl AsyncFileWatcher {
// when testing I was consistently getting two `Modify(Data(Any))` events in quick succession
// (probably to modify content and metadata).
// we really only want to propagate one of them
if let Some(previous) = self.last_received.get(&event.kind)
&& now.duration_since(*previous) < self.tick_duration
{
return false;
if let Some(previous) = self.last_received.get(&event.kind) {
if now.duration_since(*previous) < self.tick_duration {
return false;
}
}
let Some(filters) = &self.filters else {
-2
View File
@@ -13,8 +13,6 @@ base64 = { workspace = true }
bincode = { workspace = true }
rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
semver = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
nym-credentials-interface = { path = "../credentials-interface" }
@@ -1,273 +0,0 @@
// Copyright 2025 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_sphinx::addressing::Recipient;
use nym_wireguard_types::PeerPublicKey;
use crate::{
AuthenticatorVersion, Error,
latest::registration::IpPair,
traits::{FinalMessage, InitMessage, QueryBandwidthMessage, TopUpMessage, Versionable},
v2, v3, v4, v5,
};
// This is very redundant with AuthenticatorRequest and I reckon they could be smooshed.
// It is a bit out of scope for me at the moment though
#[derive(Debug)]
pub enum ClientMessage {
Initial(Box<dyn InitMessage + Send + Sync + 'static>),
Final(Box<dyn FinalMessage + Send + Sync + 'static>),
Query(Box<dyn QueryBandwidthMessage + Send + Sync + 'static>),
TopUp(Box<dyn TopUpMessage + Send + Sync + 'static>),
}
impl ClientMessage {
// check if message is wasteful e.g. contains a credential
pub fn is_wasteful(&self) -> bool {
match self {
Self::Final(msg) => msg.credential().is_some(),
Self::TopUp(_) => true,
Self::Initial(_) | Self::Query(_) => false,
}
}
fn version(&self) -> AuthenticatorVersion {
match self {
ClientMessage::Initial(msg) => msg.version(),
ClientMessage::Final(msg) => msg.version(),
ClientMessage::Query(msg) => msg.version(),
ClientMessage::TopUp(msg) => msg.version(),
}
}
pub fn bytes(&self, reply_to: Recipient) -> Result<(Vec<u8>, u64), Error> {
match self.version() {
AuthenticatorVersion::V1 => Err(Error::UnsupportedVersion),
AuthenticatorVersion::V2 => {
use v2::{
registration::{ClientMac, FinalMessage, GatewayClient, InitMessage},
request::AuthenticatorRequest,
};
match self {
ClientMessage::Initial(init_message) => {
let (req, id) = AuthenticatorRequest::new_initial_request(
InitMessage {
pub_key: init_message.pub_key(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::Final(final_message) => {
let (req, id) = AuthenticatorRequest::new_final_request(
FinalMessage {
gateway_client: GatewayClient {
pub_key: final_message.gateway_client_pub_key(),
private_ip: final_message
.gateway_client_ipv4()
.ok_or(Error::UnsupportedMessage)?
.into(),
mac: ClientMac::new(final_message.gateway_client_mac()),
},
credential: final_message.credential(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::Query(query_message) => {
let (req, id) = AuthenticatorRequest::new_query_request(
query_message.pub_key(),
reply_to,
);
Ok((req.to_bytes()?, id))
}
_ => Err(Error::UnsupportedMessage),
}
}
AuthenticatorVersion::V3 => {
use v3::{
registration::{ClientMac, FinalMessage, GatewayClient, InitMessage},
request::AuthenticatorRequest,
topup::TopUpMessage,
};
match self {
ClientMessage::Initial(init_message) => {
let (req, id) = AuthenticatorRequest::new_initial_request(
InitMessage {
pub_key: init_message.pub_key(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::Final(final_message) => {
let (req, id) = AuthenticatorRequest::new_final_request(
FinalMessage {
gateway_client: GatewayClient {
pub_key: final_message.gateway_client_pub_key(),
private_ip: final_message
.gateway_client_ipv4()
.ok_or(Error::UnsupportedMessage)?
.into(),
mac: ClientMac::new(final_message.gateway_client_mac()),
},
credential: final_message.credential(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::Query(query_message) => {
let (req, id) = AuthenticatorRequest::new_query_request(
query_message.pub_key(),
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::TopUp(top_up_message) => {
let (req, id) = AuthenticatorRequest::new_topup_request(
TopUpMessage {
pub_key: top_up_message.pub_key(),
credential: top_up_message.credential(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
}
}
AuthenticatorVersion::V4 => {
use v4::{
registration::{ClientMac, FinalMessage, GatewayClient, InitMessage},
request::AuthenticatorRequest,
topup::TopUpMessage,
};
match self {
ClientMessage::Initial(init_message) => {
let (req, id) = AuthenticatorRequest::new_initial_request(
InitMessage {
pub_key: init_message.pub_key(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::Final(final_message) => {
let (req, id) = AuthenticatorRequest::new_final_request(
FinalMessage {
gateway_client: GatewayClient {
pub_key: final_message.gateway_client_pub_key(),
private_ips: IpPair {
ipv4: final_message
.gateway_client_ipv4()
.ok_or(Error::UnsupportedMessage)?,
ipv6: final_message
.gateway_client_ipv6()
.ok_or(Error::UnsupportedMessage)?,
}
.into(),
mac: ClientMac::new(final_message.gateway_client_mac()),
},
credential: final_message.credential(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::Query(query_message) => {
let (req, id) = AuthenticatorRequest::new_query_request(
query_message.pub_key(),
reply_to,
);
Ok((req.to_bytes()?, id))
}
ClientMessage::TopUp(top_up_message) => {
let (req, id) = AuthenticatorRequest::new_topup_request(
TopUpMessage {
pub_key: top_up_message.pub_key(),
credential: top_up_message.credential(),
},
reply_to,
);
Ok((req.to_bytes()?, id))
}
}
}
AuthenticatorVersion::V5 => {
use v5::{
registration::{ClientMac, FinalMessage, GatewayClient, InitMessage},
request::AuthenticatorRequest,
topup::TopUpMessage,
};
match self {
ClientMessage::Initial(init_message) => {
let (req, id) = AuthenticatorRequest::new_initial_request(InitMessage {
pub_key: init_message.pub_key(),
});
Ok((req.to_bytes()?, id))
}
ClientMessage::Final(final_message) => {
let (req, id) = AuthenticatorRequest::new_final_request(FinalMessage {
gateway_client: GatewayClient {
pub_key: final_message.gateway_client_pub_key(),
private_ips: IpPair {
ipv4: final_message
.gateway_client_ipv4()
.ok_or(Error::UnsupportedMessage)?,
ipv6: final_message
.gateway_client_ipv6()
.ok_or(Error::UnsupportedMessage)?,
},
mac: ClientMac::new(final_message.gateway_client_mac()),
},
credential: final_message.credential(),
});
Ok((req.to_bytes()?, id))
}
ClientMessage::Query(query_message) => {
let (req, id) =
AuthenticatorRequest::new_query_request(query_message.pub_key());
Ok((req.to_bytes()?, id))
}
ClientMessage::TopUp(top_up_message) => {
let (req, id) = AuthenticatorRequest::new_topup_request(TopUpMessage {
pub_key: top_up_message.pub_key(),
credential: top_up_message.credential(),
});
Ok((req.to_bytes()?, id))
}
}
}
AuthenticatorVersion::UNKNOWN => Err(Error::UnknownVersion),
}
}
pub fn use_surbs(&self) -> bool {
use AuthenticatorVersion::*;
match self.version() {
V1 | V2 | V3 | V4 => false,
V5 => true,
UNKNOWN => true,
}
}
}
// Same comment as above struct
#[derive(Debug)]
pub struct QueryMessageImpl {
pub pub_key: PeerPublicKey,
pub version: AuthenticatorVersion,
}
impl Versionable for QueryMessageImpl {
fn version(&self) -> AuthenticatorVersion {
self.version
}
}
impl QueryBandwidthMessage for QueryMessageImpl {
fn pub_key(&self) -> PeerPublicKey {
self.pub_key
}
}
+2 -13
View File
@@ -23,17 +23,6 @@ pub enum Error {
#[error("conversion: {0}")]
Conversion(String),
// TODO add version number for debugging
#[error("unknown version number")]
UnknownVersion,
// TODO add version number for debugging
#[error("unsupported request version")]
UnsupportedVersion,
#[error("gateway doesn't support this type of message")]
UnsupportedMessage,
#[error(transparent)]
Bincode(#[from] bincode::Error),
#[error("failed to serialize response packet: {source}")]
FailedToSerializeResponsePacket { source: Box<bincode::ErrorKind> },
}
+1 -6
View File
@@ -1,9 +1,6 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod client_message;
pub mod request;
pub mod response;
pub mod traits;
pub mod v1;
pub mod v2;
@@ -13,13 +10,11 @@ pub mod v5;
mod error;
mod util;
mod version;
pub use error::Error;
pub use v5 as latest;
pub use version::AuthenticatorVersion;
pub const CURRENT_VERSION: u8 = latest::VERSION;
pub const CURRENT_VERSION: u8 = 5;
fn make_bincode_serializer() -> impl bincode::Options {
use bincode::Options;
@@ -1,204 +0,0 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use nym_sphinx::addressing::Recipient;
use crate::traits::{FinalMessage, InitMessage, QueryBandwidthMessage, TopUpMessage};
use crate::{v1, v2, v3, v4, v5};
#[derive(Debug)]
pub enum AuthenticatorRequest {
Initial {
msg: Box<dyn InitMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
Final {
msg: Box<dyn FinalMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
QueryBandwidth {
msg: Box<dyn QueryBandwidthMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
TopUpBandwidth {
msg: Box<dyn TopUpMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
}
impl From<v1::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v1::request::AuthenticatorRequest) -> Self {
match value.data {
v1::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: Protocol {
version: value.version,
service_provider_type: ServiceProviderType::Authenticator,
},
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v1::request::AuthenticatorRequestData::Final(gateway_client) => Self::Final {
msg: Box::new(gateway_client),
protocol: Protocol {
version: value.version,
service_provider_type: ServiceProviderType::Authenticator,
},
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v1::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: Protocol {
version: value.version,
service_provider_type: ServiceProviderType::Authenticator,
},
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
}
}
impl From<v2::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v2::request::AuthenticatorRequest) -> Self {
match value.data {
v2::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v2::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v2::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
}
}
impl From<v3::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v3::request::AuthenticatorRequest) -> Self {
match value.data {
v3::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v3::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v3::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
v3::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
Self::TopUpBandwidth {
msg: top_up_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
}
}
impl From<v4::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v4::request::AuthenticatorRequest) -> Self {
match value.data {
v4::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v4::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v4::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
v4::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
Self::TopUpBandwidth {
msg: top_up_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
}
}
impl From<v5::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v5::request::AuthenticatorRequest) -> Self {
match value.data {
v5::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
},
v5::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
},
v5::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
}
}
v5::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
Self::TopUpBandwidth {
msg: top_up_message,
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
}
}
}
}
}
@@ -1,106 +0,0 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::traits::{
Id, PendingRegistrationResponse, RegisteredResponse, RemainingBandwidthResponse,
TopUpBandwidthResponse,
};
use crate::{v2, v3, v4, v5};
#[derive(Debug)]
pub enum AuthenticatorResponse {
PendingRegistration(Box<dyn PendingRegistrationResponse + Send + Sync + 'static>),
Registered(Box<dyn RegisteredResponse + Send + Sync + 'static>),
RemainingBandwidth(Box<dyn RemainingBandwidthResponse + Send + Sync + 'static>),
TopUpBandwidth(Box<dyn TopUpBandwidthResponse + Send + Sync + 'static>),
}
impl Id for AuthenticatorResponse {
fn id(&self) -> u64 {
match self {
AuthenticatorResponse::PendingRegistration(pending_registration_response) => {
pending_registration_response.id()
}
AuthenticatorResponse::Registered(registered_response) => registered_response.id(),
AuthenticatorResponse::RemainingBandwidth(remaining_bandwidth_response) => {
remaining_bandwidth_response.id()
}
AuthenticatorResponse::TopUpBandwidth(top_up_bandwidth_response) => {
top_up_bandwidth_response.id()
}
}
}
}
impl From<v2::response::AuthenticatorResponse> for AuthenticatorResponse {
fn from(value: v2::response::AuthenticatorResponse) -> Self {
match value.data {
v2::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response,
) => Self::PendingRegistration(Box::new(pending_registration_response)),
v2::response::AuthenticatorResponseData::Registered(registered_response) => {
Self::Registered(Box::new(registered_response))
}
v2::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response,
) => Self::RemainingBandwidth(Box::new(remaining_bandwidth_response)),
}
}
}
impl From<v3::response::AuthenticatorResponse> for AuthenticatorResponse {
fn from(value: v3::response::AuthenticatorResponse) -> Self {
match value.data {
v3::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response,
) => Self::PendingRegistration(Box::new(pending_registration_response)),
v3::response::AuthenticatorResponseData::Registered(registered_response) => {
Self::Registered(Box::new(registered_response))
}
v3::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response,
) => Self::RemainingBandwidth(Box::new(remaining_bandwidth_response)),
v3::response::AuthenticatorResponseData::TopUpBandwidth(top_up_bandwidth_response) => {
Self::TopUpBandwidth(Box::new(top_up_bandwidth_response))
}
}
}
}
impl From<v4::response::AuthenticatorResponse> for AuthenticatorResponse {
fn from(value: v4::response::AuthenticatorResponse) -> Self {
match value.data {
v4::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response,
) => Self::PendingRegistration(Box::new(pending_registration_response)),
v4::response::AuthenticatorResponseData::Registered(registered_response) => {
Self::Registered(Box::new(registered_response))
}
v4::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response,
) => Self::RemainingBandwidth(Box::new(remaining_bandwidth_response)),
v4::response::AuthenticatorResponseData::TopUpBandwidth(top_up_bandwidth_response) => {
Self::TopUpBandwidth(Box::new(top_up_bandwidth_response))
}
}
}
}
impl From<v5::response::AuthenticatorResponse> for AuthenticatorResponse {
fn from(value: v5::response::AuthenticatorResponse) -> Self {
match value.data {
v5::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response,
) => Self::PendingRegistration(Box::new(pending_registration_response)),
v5::response::AuthenticatorResponseData::Registered(registered_response) => {
Self::Registered(Box::new(registered_response))
}
v5::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response,
) => Self::RemainingBandwidth(Box::new(remaining_bandwidth_response)),
v5::response::AuthenticatorResponseData::TopUpBandwidth(top_up_bandwidth_response) => {
Self::TopUpBandwidth(Box::new(top_up_bandwidth_response))
}
}
}
}
+220 -437
View File
@@ -1,105 +1,49 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::fmt;
use std::net::{Ipv4Addr, Ipv6Addr};
use nym_credentials_interface::CredentialSpendingData;
use nym_crypto::asymmetric::x25519::PrivateKey;
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use nym_sphinx::addressing::clients::Recipient;
use nym_wireguard_types::PeerPublicKey;
use crate::latest::registration::IpPair;
use crate::{AuthenticatorVersion, Error, v1, v2, v3, v4, v5};
use crate::{
v1, v2, v3, v4,
v5::{self, registration::IpPair},
Error,
};
pub trait Versionable {
fn version(&self) -> AuthenticatorVersion;
#[derive(Copy, Clone, Debug)]
pub enum AuthenticatorVersion {
V1,
V2,
V3,
V4,
V5,
UNKNOWN,
}
impl Versionable for v1::GatewayClient {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V1
impl From<Protocol> for AuthenticatorVersion {
fn from(value: Protocol) -> Self {
if value.service_provider_type != ServiceProviderType::Authenticator {
AuthenticatorVersion::UNKNOWN
} else if value.version == v1::VERSION {
AuthenticatorVersion::V1
} else if value.version == v2::VERSION {
AuthenticatorVersion::V2
} else if value.version == v3::VERSION {
AuthenticatorVersion::V3
} else if value.version == v4::VERSION {
AuthenticatorVersion::V4
} else if value.version == v5::VERSION {
AuthenticatorVersion::V5
} else {
AuthenticatorVersion::UNKNOWN
}
}
}
impl Versionable for v1::registration::InitMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V1
}
}
impl Versionable for v2::registration::InitMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V2
}
}
impl Versionable for v3::registration::InitMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V3
}
}
impl Versionable for v4::registration::InitMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V4
}
}
impl Versionable for v5::registration::InitMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V5
}
}
impl Versionable for v2::registration::FinalMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V2
}
}
impl Versionable for v3::registration::FinalMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V3
}
}
impl Versionable for v4::registration::FinalMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V4
}
}
impl Versionable for v5::registration::FinalMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V5
}
}
impl Versionable for PeerPublicKey {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V3
}
}
impl Versionable for v3::topup::TopUpMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V3
}
}
impl Versionable for v4::topup::TopUpMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V4
}
}
impl Versionable for v5::topup::TopUpMessage {
fn version(&self) -> AuthenticatorVersion {
AuthenticatorVersion::V5
}
}
pub trait InitMessage: Versionable + fmt::Debug {
pub trait InitMessage {
fn pub_key(&self) -> PeerPublicKey;
}
@@ -133,18 +77,15 @@ impl InitMessage for v5::registration::InitMessage {
}
}
pub trait FinalMessage: Versionable + fmt::Debug {
fn gateway_client_pub_key(&self) -> PeerPublicKey;
pub trait FinalMessage {
fn pub_key(&self) -> PeerPublicKey;
fn verify(&self, private_key: &PrivateKey, nonce: u64) -> Result<(), Error>;
fn private_ips(&self) -> IpPair;
fn gateway_client_ipv4(&self) -> Option<Ipv4Addr>;
fn gateway_client_ipv6(&self) -> Option<Ipv6Addr>;
fn gateway_client_mac(&self) -> Vec<u8>;
fn credential(&self) -> Option<CredentialSpendingData>;
}
impl FinalMessage for v1::GatewayClient {
fn gateway_client_pub_key(&self) -> PeerPublicKey {
fn pub_key(&self) -> PeerPublicKey {
self.pub_key
}
@@ -156,28 +97,13 @@ impl FinalMessage for v1::GatewayClient {
self.private_ip.into()
}
fn gateway_client_ipv4(&self) -> Option<Ipv4Addr> {
match self.private_ip {
std::net::IpAddr::V4(ipv4_addr) => Some(ipv4_addr),
std::net::IpAddr::V6(_) => None,
}
}
fn gateway_client_ipv6(&self) -> Option<Ipv6Addr> {
None
}
fn gateway_client_mac(&self) -> Vec<u8> {
self.mac.to_vec()
}
fn credential(&self) -> Option<CredentialSpendingData> {
None
}
}
impl FinalMessage for v2::registration::FinalMessage {
fn gateway_client_pub_key(&self) -> PeerPublicKey {
fn pub_key(&self) -> PeerPublicKey {
self.gateway_client.pub_key
}
@@ -189,28 +115,13 @@ impl FinalMessage for v2::registration::FinalMessage {
self.gateway_client.private_ip.into()
}
fn gateway_client_ipv4(&self) -> Option<Ipv4Addr> {
match self.gateway_client.private_ip {
std::net::IpAddr::V4(ipv4_addr) => Some(ipv4_addr),
std::net::IpAddr::V6(_) => None,
}
}
fn gateway_client_ipv6(&self) -> Option<Ipv6Addr> {
None
}
fn gateway_client_mac(&self) -> Vec<u8> {
self.gateway_client.mac.to_vec()
}
fn credential(&self) -> Option<CredentialSpendingData> {
self.credential.clone()
}
}
impl FinalMessage for v3::registration::FinalMessage {
fn gateway_client_pub_key(&self) -> PeerPublicKey {
fn pub_key(&self) -> PeerPublicKey {
self.gateway_client.pub_key
}
@@ -222,28 +133,13 @@ impl FinalMessage for v3::registration::FinalMessage {
self.gateway_client.private_ip.into()
}
fn gateway_client_ipv4(&self) -> Option<Ipv4Addr> {
match self.gateway_client.private_ip {
std::net::IpAddr::V4(ipv4_addr) => Some(ipv4_addr),
std::net::IpAddr::V6(_) => None,
}
}
fn gateway_client_ipv6(&self) -> Option<Ipv6Addr> {
None
}
fn gateway_client_mac(&self) -> Vec<u8> {
self.gateway_client.mac.to_vec()
}
fn credential(&self) -> Option<CredentialSpendingData> {
self.credential.clone()
}
}
impl FinalMessage for v4::registration::FinalMessage {
fn gateway_client_pub_key(&self) -> PeerPublicKey {
fn pub_key(&self) -> PeerPublicKey {
self.gateway_client.pub_key
}
@@ -255,25 +151,13 @@ impl FinalMessage for v4::registration::FinalMessage {
self.gateway_client.private_ips.into()
}
fn gateway_client_ipv4(&self) -> Option<Ipv4Addr> {
Some(self.gateway_client.private_ips.ipv4)
}
fn gateway_client_ipv6(&self) -> Option<Ipv6Addr> {
Some(self.gateway_client.private_ips.ipv6)
}
fn gateway_client_mac(&self) -> Vec<u8> {
self.gateway_client.mac.to_vec()
}
fn credential(&self) -> Option<CredentialSpendingData> {
self.credential.clone()
}
}
impl FinalMessage for v5::registration::FinalMessage {
fn gateway_client_pub_key(&self) -> PeerPublicKey {
fn pub_key(&self) -> PeerPublicKey {
self.gateway_client.pub_key
}
@@ -285,24 +169,12 @@ impl FinalMessage for v5::registration::FinalMessage {
self.gateway_client.private_ips
}
fn gateway_client_ipv4(&self) -> Option<Ipv4Addr> {
Some(self.gateway_client.private_ips.ipv4)
}
fn gateway_client_ipv6(&self) -> Option<Ipv6Addr> {
Some(self.gateway_client.private_ips.ipv6)
}
fn gateway_client_mac(&self) -> Vec<u8> {
self.gateway_client.mac.to_vec()
}
fn credential(&self) -> Option<CredentialSpendingData> {
self.credential.clone()
}
}
pub trait QueryBandwidthMessage: Versionable + fmt::Debug {
pub trait QueryBandwidthMessage {
fn pub_key(&self) -> PeerPublicKey;
}
@@ -312,7 +184,7 @@ impl QueryBandwidthMessage for PeerPublicKey {
}
}
pub trait TopUpMessage: Versionable + fmt::Debug {
pub trait TopUpMessage {
fn pub_key(&self) -> PeerPublicKey;
fn credential(&self) -> CredentialSpendingData;
}
@@ -347,286 +219,197 @@ impl TopUpMessage for v5::topup::TopUpMessage {
}
}
pub trait Id {
fn id(&self) -> u64;
pub enum AuthenticatorRequest {
Initial {
msg: Box<dyn InitMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
Final {
msg: Box<dyn FinalMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
QueryBandwidth {
msg: Box<dyn QueryBandwidthMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
TopUpBandwidth {
msg: Box<dyn TopUpMessage + Send + Sync + 'static>,
protocol: Protocol,
reply_to: Option<Recipient>,
request_id: u64,
},
}
impl Id for v2::response::PendingRegistrationResponse {
fn id(&self) -> u64 {
self.request_id
impl From<v1::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v1::request::AuthenticatorRequest) -> Self {
match value.data {
v1::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: Protocol {
version: value.version,
service_provider_type: ServiceProviderType::Authenticator,
},
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v1::request::AuthenticatorRequestData::Final(gateway_client) => Self::Final {
msg: Box::new(gateway_client),
protocol: Protocol {
version: value.version,
service_provider_type: ServiceProviderType::Authenticator,
},
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v1::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: Protocol {
version: value.version,
service_provider_type: ServiceProviderType::Authenticator,
},
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
}
}
impl Id for v3::response::PendingRegistrationResponse {
fn id(&self) -> u64 {
self.request_id
impl From<v2::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v2::request::AuthenticatorRequest) -> Self {
match value.data {
v2::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v2::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v2::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
}
}
impl Id for v4::response::PendingRegistrationResponse {
fn id(&self) -> u64 {
self.request_id
impl From<v3::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v3::request::AuthenticatorRequest) -> Self {
match value.data {
v3::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v3::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v3::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
v3::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
Self::TopUpBandwidth {
msg: top_up_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
}
}
impl Id for v5::response::PendingRegistrationResponse {
fn id(&self) -> u64 {
self.request_id
impl From<v4::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v4::request::AuthenticatorRequest) -> Self {
match value.data {
v4::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v4::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
},
v4::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
v4::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
Self::TopUpBandwidth {
msg: top_up_message,
protocol: value.protocol,
reply_to: Some(value.reply_to),
request_id: value.request_id,
}
}
}
}
}
impl Id for v2::response::RegisteredResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v3::response::RegisteredResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v4::response::RegisteredResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v5::response::RegisteredResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v2::response::RemainingBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v3::response::RemainingBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v4::response::RemainingBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v5::response::RemainingBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v3::response::TopUpBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v4::response::TopUpBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
impl Id for v5::response::TopUpBandwidthResponse {
fn id(&self) -> u64 {
self.request_id
}
}
pub trait PendingRegistrationResponse: Id + fmt::Debug {
fn nonce(&self) -> u64;
fn verify(&self, gateway_key: &PrivateKey) -> std::result::Result<(), Error>;
fn pub_key(&self) -> PeerPublicKey;
fn private_ips(&self) -> IpPair;
}
impl PendingRegistrationResponse for v2::response::PendingRegistrationResponse {
fn nonce(&self) -> u64 {
self.reply.nonce
}
fn verify(&self, gateway_key: &PrivateKey) -> std::result::Result<(), Error> {
self.reply.gateway_data.verify(gateway_key, self.nonce())
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.gateway_data.pub_key
}
fn private_ips(&self) -> IpPair {
self.reply.gateway_data.private_ip.into()
}
}
impl PendingRegistrationResponse for v3::response::PendingRegistrationResponse {
fn nonce(&self) -> u64 {
self.reply.nonce
}
fn verify(&self, gateway_key: &PrivateKey) -> std::result::Result<(), Error> {
self.reply.gateway_data.verify(gateway_key, self.nonce())
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.gateway_data.pub_key
}
fn private_ips(&self) -> IpPair {
self.reply.gateway_data.private_ip.into()
}
}
impl PendingRegistrationResponse for v4::response::PendingRegistrationResponse {
fn nonce(&self) -> u64 {
self.reply.nonce
}
fn verify(&self, gateway_key: &PrivateKey) -> std::result::Result<(), Error> {
self.reply.gateway_data.verify(gateway_key, self.nonce())
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.gateway_data.pub_key
}
fn private_ips(&self) -> IpPair {
self.reply.gateway_data.private_ips.into()
}
}
impl PendingRegistrationResponse for v5::response::PendingRegistrationResponse {
fn nonce(&self) -> u64 {
self.reply.nonce
}
fn verify(&self, gateway_key: &PrivateKey) -> std::result::Result<(), Error> {
self.reply.gateway_data.verify(gateway_key, self.nonce())
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.gateway_data.pub_key
}
fn private_ips(&self) -> IpPair {
self.reply.gateway_data.private_ips
}
}
pub trait RegisteredResponse: Id + fmt::Debug {
fn private_ips(&self) -> IpPair;
fn pub_key(&self) -> PeerPublicKey;
fn wg_port(&self) -> u16;
}
impl RegisteredResponse for v2::response::RegisteredResponse {
fn private_ips(&self) -> IpPair {
self.reply.private_ip.into()
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.pub_key
}
fn wg_port(&self) -> u16 {
self.reply.wg_port
}
}
impl RegisteredResponse for v3::response::RegisteredResponse {
fn private_ips(&self) -> IpPair {
self.reply.private_ip.into()
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.pub_key
}
fn wg_port(&self) -> u16 {
self.reply.wg_port
}
}
impl RegisteredResponse for v4::response::RegisteredResponse {
fn private_ips(&self) -> IpPair {
self.reply.private_ips.into()
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.pub_key
}
fn wg_port(&self) -> u16 {
self.reply.wg_port
}
}
impl RegisteredResponse for v5::response::RegisteredResponse {
fn private_ips(&self) -> IpPair {
self.reply.private_ips
}
fn pub_key(&self) -> PeerPublicKey {
self.reply.pub_key
}
fn wg_port(&self) -> u16 {
self.reply.wg_port
}
}
pub trait RemainingBandwidthResponse: Id + fmt::Debug {
fn available_bandwidth(&self) -> Option<i64>;
}
impl RemainingBandwidthResponse for v2::response::RemainingBandwidthResponse {
fn available_bandwidth(&self) -> Option<i64> {
self.reply.as_ref().map(|r| r.available_bandwidth)
}
}
impl RemainingBandwidthResponse for v3::response::RemainingBandwidthResponse {
fn available_bandwidth(&self) -> Option<i64> {
self.reply.as_ref().map(|r| r.available_bandwidth)
}
}
impl RemainingBandwidthResponse for v4::response::RemainingBandwidthResponse {
fn available_bandwidth(&self) -> Option<i64> {
self.reply.as_ref().map(|r| r.available_bandwidth)
}
}
impl RemainingBandwidthResponse for v5::response::RemainingBandwidthResponse {
fn available_bandwidth(&self) -> Option<i64> {
self.reply.as_ref().map(|r| r.available_bandwidth)
}
}
pub trait TopUpBandwidthResponse: Id + fmt::Debug {
fn available_bandwidth(&self) -> i64;
}
impl TopUpBandwidthResponse for v3::response::TopUpBandwidthResponse {
fn available_bandwidth(&self) -> i64 {
self.reply.available_bandwidth
}
}
impl TopUpBandwidthResponse for v4::response::TopUpBandwidthResponse {
fn available_bandwidth(&self) -> i64 {
self.reply.available_bandwidth
}
}
impl TopUpBandwidthResponse for v5::response::TopUpBandwidthResponse {
fn available_bandwidth(&self) -> i64 {
self.reply.available_bandwidth
impl From<v5::request::AuthenticatorRequest> for AuthenticatorRequest {
fn from(value: v5::request::AuthenticatorRequest) -> Self {
match value.data {
v5::request::AuthenticatorRequestData::Initial(init_message) => Self::Initial {
msg: Box::new(init_message),
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
},
v5::request::AuthenticatorRequestData::Final(final_message) => Self::Final {
msg: final_message,
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
},
v5::request::AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
Self::QueryBandwidth {
msg: Box::new(peer_public_key),
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
}
}
v5::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
Self::TopUpBandwidth {
msg: top_up_message,
protocol: value.protocol,
reply_to: None,
request_id: value.request_id,
}
}
}
}
}
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{Engine, engine::general_purpose};
use base64::{engine::general_purpose, Engine};
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{Engine, engine::general_purpose};
use base64::{engine::general_purpose, Engine};
use nym_credentials_interface::CredentialSpendingData;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{Engine, engine::general_purpose};
use base64::{engine::general_purpose, Engine};
use nym_credentials_interface::CredentialSpendingData;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{Engine, engine::general_purpose};
use base64::{engine::general_purpose, Engine};
use nym_credentials_interface::CredentialSpendingData;
use nym_network_defaults::constants::{WG_TUN_DEVICE_IP_ADDRESS_V4, WG_TUN_DEVICE_IP_ADDRESS_V6};
use nym_wireguard_types::PeerPublicKey;
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{Engine, engine::general_purpose};
use base64::{engine::general_purpose, Engine};
use nym_credentials_interface::CredentialSpendingData;
use nym_network_defaults::constants::{WG_TUN_DEVICE_IP_ADDRESS_V4, WG_TUN_DEVICE_IP_ADDRESS_V6};
use nym_wireguard_types::PeerPublicKey;
@@ -1,195 +0,0 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::{v1, v2, v3, v4, v5};
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
#[derive(Copy, Clone, Debug, PartialEq, strum_macros::Display)]
#[strum(serialize_all = "snake_case")]
pub enum AuthenticatorVersion {
/// introduced in wispa release (1.1.5)
V1,
/// introduced in aero release (1.1.9)
V2,
/// introduced in magura release (1.1.10)
V3,
/// introduced in crunch release (1.2.0)
V4,
/// introduced in dorina-patched release (1.6.1)
V5,
UNKNOWN,
}
impl AuthenticatorVersion {
pub const LATEST: Self = Self::V5;
pub const fn release_version(&self) -> semver::Version {
match self {
AuthenticatorVersion::V1 => semver::Version::new(1, 1, 5),
AuthenticatorVersion::V2 => semver::Version::new(1, 1, 9),
AuthenticatorVersion::V3 => semver::Version::new(1, 1, 10),
AuthenticatorVersion::V4 => semver::Version::new(1, 2, 0),
AuthenticatorVersion::V5 => semver::Version::new(1, 6, 1),
AuthenticatorVersion::UNKNOWN => semver::Version::new(0, 0, 0),
}
}
}
impl From<Protocol> for AuthenticatorVersion {
fn from(value: Protocol) -> Self {
if value.service_provider_type != ServiceProviderType::Authenticator {
AuthenticatorVersion::UNKNOWN
} else if value.version == v1::VERSION {
AuthenticatorVersion::V1
} else if value.version == v2::VERSION {
AuthenticatorVersion::V2
} else if value.version == v3::VERSION {
AuthenticatorVersion::V3
} else if value.version == v4::VERSION {
AuthenticatorVersion::V4
} else if value.version == v5::VERSION {
AuthenticatorVersion::V5
} else {
AuthenticatorVersion::UNKNOWN
}
}
}
impl From<u8> for AuthenticatorVersion {
fn from(value: u8) -> Self {
if value == v1::VERSION {
AuthenticatorVersion::V1
} else if value == v2::VERSION {
AuthenticatorVersion::V2
} else if value == v3::VERSION {
AuthenticatorVersion::V3
} else if value == v4::VERSION {
AuthenticatorVersion::V4
} else if value == v5::VERSION {
AuthenticatorVersion::V5
} else {
AuthenticatorVersion::UNKNOWN
}
}
}
impl From<&str> for AuthenticatorVersion {
fn from(value: &str) -> Self {
let Ok(semver) = semver::Version::parse(value) else {
return Self::UNKNOWN;
};
semver.into()
}
}
impl From<Option<&String>> for AuthenticatorVersion {
fn from(value: Option<&String>) -> Self {
match value {
None => Self::UNKNOWN,
Some(value) => value.as_str().into(),
}
}
}
impl From<String> for AuthenticatorVersion {
fn from(value: String) -> Self {
Self::from(value.as_str())
}
}
impl From<Option<String>> for AuthenticatorVersion {
fn from(value: Option<String>) -> Self {
value.as_ref().into()
}
}
impl From<semver::Version> for AuthenticatorVersion {
fn from(semver: semver::Version) -> Self {
if semver < AuthenticatorVersion::V1.release_version() {
return Self::UNKNOWN;
}
if semver < AuthenticatorVersion::V2.release_version() {
return Self::V1;
}
if semver < AuthenticatorVersion::V3.release_version() {
return Self::V2;
}
if semver < AuthenticatorVersion::V4.release_version() {
return Self::V3;
}
if semver < AuthenticatorVersion::V5.release_version() {
return Self::V4;
}
// if provided version is higher (or equal) to release version of V5,
// we return the latest (i.e. v5)
debug_assert_eq!(
Self::V5,
Self::LATEST,
"a new AuthenticatorVersion variant has been introduced without adjusting the `From<semver::Version>` trait"
);
Self::LATEST
}
}
#[cfg(test)]
mod tests {
use super::super::latest;
use super::*;
#[test]
fn strum_display() {
// sanity check on formatting and casing
assert_eq!("v1", AuthenticatorVersion::V1.to_string());
assert_eq!("v2", AuthenticatorVersion::V2.to_string());
assert_eq!("unknown", AuthenticatorVersion::UNKNOWN.to_string());
}
#[test]
fn u8_conversion() {
assert_eq!(AuthenticatorVersion::V1, AuthenticatorVersion::from(1u8));
assert_eq!(AuthenticatorVersion::V2, AuthenticatorVersion::from(2u8));
assert_eq!(
AuthenticatorVersion::UNKNOWN,
AuthenticatorVersion::from(latest::VERSION + 1)
);
assert_eq!(
AuthenticatorVersion::UNKNOWN,
AuthenticatorVersion::from(0u8)
);
assert_eq!(
AuthenticatorVersion::UNKNOWN,
AuthenticatorVersion::from(255u8)
);
}
#[test]
fn semver_checks() {
assert_eq!(AuthenticatorVersion::UNKNOWN, "1.1.4".into());
assert_eq!(AuthenticatorVersion::UNKNOWN, "0.1.0".into());
assert_eq!(AuthenticatorVersion::UNKNOWN, "1.0.4".into());
assert_eq!(AuthenticatorVersion::V1, "1.1.5".into());
assert_eq!(AuthenticatorVersion::V1, "1.1.6".into());
assert_eq!(AuthenticatorVersion::V1, "1.1.8".into());
assert_eq!(AuthenticatorVersion::V2, "1.1.9".into());
assert_eq!(AuthenticatorVersion::V3, "1.1.10".into());
assert_eq!(AuthenticatorVersion::V3, "1.1.11".into());
assert_eq!(AuthenticatorVersion::V3, "1.1.60".into());
assert_eq!(AuthenticatorVersion::V4, "1.2.0".into());
assert_eq!(AuthenticatorVersion::V4, "1.2.1".into());
assert_eq!(AuthenticatorVersion::V4, "1.5.1".into());
assert_eq!(AuthenticatorVersion::V4, "1.6.0".into());
assert_eq!(AuthenticatorVersion::V5, "1.6.1".into());
assert_eq!(AuthenticatorVersion::V5, "1.6.11".into());
assert_eq!(AuthenticatorVersion::V5, "1.7.0".into());
assert_eq!(AuthenticatorVersion::V5, "1.16.11".into());
assert_eq!(AuthenticatorVersion::V5, "1.17.0".into());
}
}
-1
View File
@@ -7,7 +7,6 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = { workspace = true }
bip39 = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
-2
View File
@@ -23,12 +23,10 @@ use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
pub use event::BandwidthStatusMessage;
pub use traits::{BandwidthTicketProvider, DEFAULT_TICKETS_TO_SPEND};
pub mod acquire;
pub mod error;
mod event;
mod traits;
mod utils;
#[derive(Debug)]
-42
View File
@@ -1,42 +0,0 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use nym_credential_storage::storage::Storage;
use nym_credentials_interface::TicketType;
use nym_crypto::asymmetric::ed25519;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use crate::{error::BandwidthControllerError, BandwidthController, PreparedCredential};
pub const DEFAULT_TICKETS_TO_SPEND: u32 = 1;
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait BandwidthTicketProvider: Send + Sync {
async fn get_ecash_ticket(
&self,
ticket_type: TicketType,
gateway_id: ed25519::PublicKey,
tickets_to_spend: u32,
) -> Result<PreparedCredential, BandwidthControllerError>;
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<C, St> BandwidthTicketProvider for BandwidthController<C, St>
where
C: DkgQueryClient + Sync + Send,
St: nym_credential_storage::storage::Storage,
<St as Storage>::StorageError: Send + Sync + 'static,
{
async fn get_ecash_ticket(
&self,
ticket_type: TicketType,
gateway_id: ed25519::PublicKey,
tickets_to_spend: u32,
) -> Result<PreparedCredential, BandwidthControllerError> {
self.prepare_ecash_ticket(ticket_type, gateway_id.to_bytes(), tickets_to_spend)
.await
}
}
+3 -3
View File
@@ -13,7 +13,7 @@ use nym_credentials_interface::{
};
use nym_ecash_time::Date;
use nym_validator_client::coconut::all_ecash_api_clients;
use nym_validator_client::nym_api::{EpochId, NymApiClientExt};
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use nym_validator_client::EcashApiClient;
use rand::prelude::SliceRandom;
@@ -207,7 +207,7 @@ where
<St as Storage>::StorageError: Send + Sync + 'static,
{
if let Some(stored) = storage
.get_expiration_date_signatures(expiration_date, epoch_id)
.get_expiration_date_signatures(expiration_date)
.await
.map_err(BandwidthControllerError::credential_storage_error)?
{
@@ -220,7 +220,7 @@ where
ecash_apis,
|api| async move {
api.api_client
.global_expiration_date_signatures(Some(expiration_date), Some(epoch_id))
.global_expiration_date_signatures(Some(expiration_date))
.await
},
format!("aggregated coin index signatures for date {expiration_date}"),
+2 -2
View File
@@ -1,8 +1,8 @@
use clap::Args;
use clap::builder::Command;
use clap::clap_derive::ValueEnum;
use clap_complete::Shell as ClapShell;
use clap::Args;
use clap_complete::generator::generate;
use clap_complete::Shell as ClapShell;
use std::io;
pub fn fig_generate(command: &mut Command, name: &str) {
+4 -12
View File
@@ -3,7 +3,7 @@ name = "nym-client-core"
version = "1.1.15"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
edition = "2021"
rust-version = "1.85"
rust-version = "1.76"
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -13,7 +13,6 @@ async-trait = { workspace = true }
base64 = { workspace = true }
bs58 = { workspace = true }
clap = { workspace = true, optional = true }
cfg-if = { workspace = true }
comfy-table = { workspace = true, optional = true }
futures = { workspace = true }
humantime = { workspace = true }
@@ -36,7 +35,7 @@ nym-bandwidth-controller = { path = "../bandwidth-controller" }
nym-crypto = { path = "../crypto" }
nym-gateway-client = { path = "../client-libs/gateway-client" }
nym-gateway-requests = { path = "../gateway-requests" }
nym-http-api-client = { path = "../http-api-client", features = ["network-defaults"] }
nym-http-api-client = { path = "../http-api-client" }
nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
nym-statistics-common = { path = "../statistics" }
@@ -53,7 +52,6 @@ nym-client-core-config-types = { path = "./config-types", features = [
nym-client-core-surb-storage = { path = "./surb-storage" }
nym-client-core-gateways-storage = { path = "./gateways-storage" }
nym-ecash-time = { path = "../ecash-time" }
nym-mixnet-contract-common = { path = "../cosmwasm-smart-contracts/mixnet-contract" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
nym-mixnet-client = { path = "../client-libs/mixnet-client", default-features = false }
@@ -69,6 +67,7 @@ workspace = true
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.hyper-util]
workspace = true
features = ["tokio"]
###
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio-stream]
workspace = true
@@ -102,7 +101,7 @@ workspace = true
features = ["tokio"]
[target."cfg(target_arch = \"wasm32\")".dependencies.gloo-timers]
workspace = true
version = "0.3.0"
features = ["futures"]
[target."cfg(target_arch = \"wasm32\")".dependencies.wasm-utils]
@@ -113,10 +112,6 @@ features = ["websocket"]
workspace = true
features = ["wasm-bindgen"]
[target."cfg(target_arch = \"wasm32\")".dependencies.tokio_with_wasm]
workspace = true
features = ["full"]
[dev-dependencies]
tempfile = { workspace = true }
@@ -128,6 +123,3 @@ fs-surb-storage = ["nym-client-core-surb-storage/fs-surb-storage"]
fs-gateways-storage = ["nym-client-core-gateways-storage/fs-gateways-storage"]
wasm = ["nym-gateway-client/wasm"]
metrics-server = []
[lints]
workspace = true
@@ -707,10 +707,13 @@ pub struct DebugConfig {
/// Defines all configuration options related to reply SURBs.
pub reply_surbs: ReplySurbs,
/// Defines all configuration options related to stats reporting.
pub stats_reporting: StatsReporting,
/// Defines all configuration options related to the forget me flag.
pub forget_me: ForgetMe,
/// Defines all configuration options related to the remember me flag.
pub remember_me: RememberMe,
}
@@ -543,8 +543,10 @@ pub struct DebugConfigV6 {
/// Defines all configuration options related to reply SURBs.
pub reply_surbs: ReplySurbsV6,
/// Defines all configuration options related to stats reporting.
pub stats_reporting: StatsReportingV6,
/// Defines all configuration options related to the forget me flag.
pub forget_me: ForgetMeV6,
@@ -3,7 +3,6 @@ name = "nym-client-core-gateways-storage"
version = "0.1.0"
edition = "2021"
license.workspace = true
rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -27,7 +26,6 @@ features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"]
optional = true
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
+4 -13
View File
@@ -2,30 +2,23 @@
// SPDX-License-Identifier: Apache-2.0
#[tokio::main]
async fn main() -> anyhow::Result<()> {
async fn main() {
#[cfg(feature = "fs-gateways-storage")]
{
use anyhow::Context;
use sqlx::{Connection, SqliteConnection};
use std::env;
let out_dir = env::var("OUT_DIR")?;
let out_dir = env::var("OUT_DIR").unwrap();
let database_path = format!("{out_dir}/gateways-storage-example.sqlite");
// remove the db file if it already existed from previous build
// in case it was from a different branch
if std::fs::exists(&database_path)? {
std::fs::remove_file(&database_path)?;
}
let mut conn = SqliteConnection::connect(&format!("sqlite://{database_path}?mode=rwc"))
.await
.context("Failed to create SQLx database connection")?;
.expect("Failed to create SQLx database connection");
sqlx::migrate!("./fs_gateways_migrations")
.run(&mut conn)
.await
.context("Failed to perform SQLx migrations")?;
.expect("Failed to perform SQLx migrations");
#[cfg(target_family = "unix")]
println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path);
@@ -35,6 +28,4 @@ async fn main() -> anyhow::Result<()> {
// not a valid windows path... but hey, it works...
println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
}
Ok(())
}
@@ -31,6 +31,7 @@ impl StorageManager {
}
})?;
}
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
@@ -114,12 +114,13 @@ where
})?;
hardcoded_topology.entry_capable_nodes().cloned().collect()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::gateways_for_init(
&mut rng,
&core.client.nym_api_urls,
user_agent,
core.debug.topology.minimum_gateway_performance,
core.debug.topology.ignore_ingress_epoch_role,
None,
)
.await?
};
@@ -58,7 +58,6 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.signatures_path.unwrap())?
}
};
@@ -64,7 +64,6 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.credential_path.unwrap())?
}
};
@@ -58,7 +58,6 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.signatures_path.unwrap())?
}
};
@@ -58,7 +58,6 @@ where
Some(data) => data,
None => {
// SAFETY: one of those arguments must have been set
#[allow(clippy::unwrap_used)]
fs::read(common_args.key_path.unwrap())?
}
};
@@ -173,12 +173,13 @@ where
})?;
hardcoded_topology.entry_capable_nodes().cloned().collect()
} else {
let mut rng = rand::thread_rng();
crate::init::helpers::gateways_for_init(
&mut rng,
&core.client.nym_api_urls,
user_agent,
core.debug.topology.minimum_gateway_performance,
core.debug.topology.ignore_ingress_epoch_role,
None,
)
.await?
};
+98 -360
View File
@@ -7,12 +7,11 @@ use super::statistics_control::StatisticsControl;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
use crate::client::event_control::EventControl;
use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
use crate::client::real_messages_control;
use crate::client::real_messages_control::RealMessagesController;
use crate::client::received_buffer::{
@@ -28,13 +27,13 @@ use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
use crate::client::topology_control::{
TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
};
use crate::config;
use crate::config::{Config, DebugConfig};
use crate::error::ClientCoreError;
use crate::init::{
setup_gateway,
types::{GatewaySetup, InitialisationResult},
};
use crate::{config, spawn_future};
use futures::channel::mpsc;
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_config_types::{ForgetMe, RememberMe};
@@ -49,15 +48,16 @@ use nym_gateway_client::{
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_sphinx::params::PacketType;
use nym_sphinx::receiver::{ReconstructedMessage, SphinxMessageReceiver};
use nym_statistics_common::clients::ClientStatsSender;
use nym_statistics_common::generate_client_stats_id;
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use nym_task::ShutdownTracker;
use nym_task::{TaskClient, TaskHandle};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_validator_client::nym_api::NymApiClientExt;
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, NymApiClient, UserAgent};
use rand::prelude::SliceRandom;
use rand::rngs::OsRng;
use rand::thread_rng;
@@ -67,16 +67,9 @@ use std::path::Path;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::mpsc::Sender;
use tracing::*;
use url::Url;
#[cfg(target_arch = "wasm32")]
#[cfg(debug_assertions)]
use wasm_utils::console_log;
/// Default number of retries for Nym API requests when using network details with domain fronting.
/// This allows the client to try alternative URLs if the primary endpoint is unavailable.
const DEFAULT_NYM_API_RETRIES: usize = 3;
#[cfg(all(
not(target_arch = "wasm32"),
feature = "fs-surb-storage",
@@ -87,28 +80,10 @@ pub mod non_wasm_helpers;
pub mod helpers;
pub mod storage;
#[derive(Clone, Copy, Debug)]
pub enum MixnetClientEvent {
Traffic(MixTrafficEvent),
}
pub type EventReceiver = mpsc::UnboundedReceiver<MixnetClientEvent>;
#[derive(Clone)]
pub struct EventSender(pub mpsc::UnboundedSender<MixnetClientEvent>);
impl EventSender {
pub fn send(&self, event: MixnetClientEvent) {
if let Err(err) = self.0.unbounded_send(event) {
tracing::warn!("Failed to send error event. The caller event reader was closed: {err}");
}
}
}
#[derive(Clone)]
pub struct ClientInput {
pub connection_command_sender: ConnectionCommandSender,
pub input_sender: InputMessageSender,
pub client_request_sender: ClientRequestSender,
}
impl ClientInput {
@@ -120,6 +95,7 @@ impl ClientInput {
}
}
#[derive(Clone)]
pub struct ClientOutput {
pub received_buffer_request_sender: ReceivedBufferRequestSender,
}
@@ -159,11 +135,9 @@ pub enum ClientInputStatus {
}
impl ClientInputStatus {
#[allow(clippy::panic)]
pub fn register_producer(&mut self) -> ClientInput {
match std::mem::replace(self, ClientInputStatus::Connected) {
ClientInputStatus::AwaitingProducer { client_input } => client_input,
// critical failure implying misuse of software
ClientInputStatus::Connected => panic!("producer was already registered before"),
}
}
@@ -175,11 +149,9 @@ pub enum ClientOutputStatus {
}
impl ClientOutputStatus {
#[allow(clippy::panic)]
pub fn register_consumer(&mut self) -> ClientOutput {
match std::mem::replace(self, ClientOutputStatus::Connected) {
ClientOutputStatus::AwaitingConsumer { client_output } => client_output,
// critical failure implying misuse of software
ClientOutputStatus::Connected => panic!("consumer was already registered before"),
}
}
@@ -216,14 +188,10 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
client_store: S,
dkg_query_client: Option<C>,
// Optional API URLs for domain fronting support
nym_api_urls: Option<Vec<nym_network_defaults::ApiUrl>>,
wait_for_gateway: bool,
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
shutdown: Option<ShutdownTracker>,
event_tx: Option<EventSender>,
shutdown: Option<TaskClient>,
user_agent: Option<UserAgent>,
setup_method: GatewaySetup,
@@ -248,12 +216,10 @@ where
config: base_config,
client_store,
dkg_query_client,
nym_api_urls: None,
wait_for_gateway: false,
custom_topology_provider: None,
custom_gateway_transceiver: None,
shutdown: None,
event_tx: None,
user_agent: None,
setup_method: GatewaySetup::MustLoad { gateway_id: None },
#[cfg(unix)]
@@ -271,16 +237,6 @@ where
self
}
/// Set Nym API URLs for domain fronting support.
///
/// When provided, the client will use these API URLs (which include front_hosts)
/// to construct HTTP clients with domain fronting enabled.
#[must_use]
pub fn with_nym_api_urls(mut self, nym_api_urls: Vec<nym_network_defaults::ApiUrl>) -> Self {
self.nym_api_urls = Some(nym_api_urls);
self
}
#[must_use]
pub fn with_forget_me(mut self, forget_me: &ForgetMe) -> Self {
self.config.debug.forget_me = *forget_me;
@@ -321,17 +277,11 @@ where
}
#[must_use]
pub fn with_shutdown(mut self, shutdown: ShutdownTracker) -> Self {
pub fn with_shutdown(mut self, shutdown: TaskClient) -> Self {
self.shutdown = Some(shutdown);
self
}
#[must_use]
pub fn with_event_tx(mut self, event_tx: EventSender) -> Self {
self.event_tx = Some(event_tx);
self
}
#[must_use]
pub fn with_user_agent(mut self, user_agent: UserAgent) -> Self {
self.user_agent = Some(user_agent);
@@ -362,18 +312,6 @@ where
details.client_address()
}
fn start_event_control(
parent_event_tx: Option<EventSender>,
children_event_rx: EventReceiver,
shutdown_tracker: &ShutdownTracker,
) {
let event_control = EventControl::new(parent_event_tx, children_event_rx);
shutdown_tracker.try_spawn_named_with_shutdown(
async move { event_control.run().await },
"EventControl",
);
}
// future constantly pumping loop cover traffic at some specified average rate
// the pumped traffic goes to the MixTrafficController
fn start_cover_traffic_stream(
@@ -383,11 +321,11 @@ where
topology_accessor: TopologyAccessor,
mix_tx: BatchMixMessageSender,
stats_tx: ClientStatsSender,
shutdown_tracker: &ShutdownTracker,
task_client: TaskClient,
) {
tracing::info!("Starting loop cover traffic stream...");
info!("Starting loop cover traffic stream...");
let mut stream = LoopCoverTrafficStream::new(
let stream = LoopCoverTrafficStream::new(
ack_key,
debug_config.acknowledgements.average_ack_delay,
mix_tx,
@@ -396,9 +334,10 @@ where
debug_config.traffic,
debug_config.cover_traffic,
stats_tx,
task_client,
);
shutdown_tracker
.try_spawn_named_with_shutdown(async move { stream.run().await }, "CoverTrafficStream");
stream.start();
}
#[allow(clippy::too_many_arguments)]
@@ -414,12 +353,13 @@ where
reply_controller_receiver: ReplyControllerReceiver,
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
task_client: TaskClient,
packet_type: PacketType,
stats_tx: ClientStatsSender,
shutdown_tracker: &ShutdownTracker,
) {
tracing::info!("Starting real traffic stream...");
info!("Starting real traffic stream...");
let real_messages_controller = RealMessagesController::new(
RealMessagesController::new(
controller_config,
key_rotation_config,
ack_receiver,
@@ -432,63 +372,9 @@ where
lane_queue_lengths,
client_connection_rx,
stats_tx,
shutdown_tracker.clone_shutdown_token(),
);
// break out all the subtasks
let (mut out_queue_control, mut reply_controller, ack_controller) =
real_messages_controller.into_tasks();
let (
mut ack_listener,
mut input_listener,
mut retransmission_listener,
mut sent_notification_listener,
mut ack_action_controller,
) = ack_controller.into_tasks();
shutdown_tracker.try_spawn_named(
async move { out_queue_control.run().await },
"RealMessagesController::OutQueueControl",
);
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move { reply_controller.run(shutdown_token).await },
"RealMessagesController::ReplyController",
);
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move { ack_listener.run(shutdown_token).await },
"AcknowledgementController::AcknowledgementListener",
);
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move { input_listener.run(shutdown_token).await },
"AcknowledgementController::InputMessageListener",
);
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move { retransmission_listener.run(shutdown_token).await },
"AcknowledgementController::RetransmissionRequestListener",
);
shutdown_tracker.try_spawn_named_with_shutdown(
async move {
sent_notification_listener.run().await;
},
"AcknowledgementController::SentNotificationListener",
);
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move { ack_action_controller.run(shutdown_token).await },
"AcknowledgementController::ActionController",
);
// .start(packet_type);
task_client,
)
.start(packet_type);
}
// buffer controlling all messages fetched from provider
@@ -499,29 +385,21 @@ where
mixnet_receiver: MixnetMessageReceiver,
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
shutdown: TaskClient,
metrics_reporter: ClientStatsSender,
shutdown_tracker: &ShutdownTracker,
) {
tracing::info!("Starting received messages buffer controller...");
let controller = ReceivedMessagesBufferController::<SphinxMessageReceiver>::new(
local_encryption_keypair,
query_receiver,
mixnet_receiver,
reply_key_storage,
reply_controller_sender,
metrics_reporter,
shutdown_tracker.clone_shutdown_token(),
);
let (mut msg_receiver, mut req_receiver) = controller.into_tasks();
shutdown_tracker.try_spawn_named(
async move { msg_receiver.run().await },
"ReceivedMessagesBufferController::FragmentedMessageReceiver",
);
shutdown_tracker.try_spawn_named(
async move { req_receiver.run().await },
"ReceivedMessagesBufferController::RequestReceiver",
);
info!("Starting received messages buffer controller...");
let controller: ReceivedMessagesBufferController<SphinxMessageReceiver> =
ReceivedMessagesBufferController::new(
local_encryption_keypair,
query_receiver,
mixnet_receiver,
reply_key_storage,
reply_controller_sender,
metrics_reporter,
shutdown,
);
controller.start()
}
#[allow(clippy::too_many_arguments)]
@@ -533,7 +411,7 @@ where
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
shutdown_tracker: &ShutdownTracker,
shutdown: TaskClient,
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
where
<S::KeyStore as KeyStore>::StorageError: Send + Sync + 'static,
@@ -552,7 +430,7 @@ where
packet_router,
bandwidth_controller,
stats_reporter,
shutdown_tracker.clone_shutdown_token(),
shutdown,
)
} else {
let cfg = GatewayConfig::new(
@@ -577,7 +455,7 @@ where
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown_tracker.clone_shutdown_token(),
shutdown,
)
};
@@ -613,7 +491,7 @@ where
details_store
.upgrade_stored_remote_gateway_key(gateway_client.gateway_identity(), &updated_key)
.await.map_err(|err| {
tracing::error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
})?
}
@@ -640,7 +518,7 @@ where
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
shutdown_tracker: &ShutdownTracker,
mut shutdown: TaskClient,
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
where
<S::KeyStore as KeyStore>::StorageError: Send + Sync + 'static,
@@ -657,6 +535,7 @@ where
Err(ClientCoreError::CustomGatewaySelectionExpected)
} else {
// and make sure to invalidate the task client, so we wouldn't cause premature shutdown
shutdown.disarm();
custom_gateway_transceiver.set_packet_router(packet_router)?;
Ok(custom_gateway_transceiver)
};
@@ -672,7 +551,7 @@ where
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
shutdown_tracker,
shutdown,
)
.await?;
@@ -683,7 +562,7 @@ where
custom_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
config_topology: config::Topology,
nym_api_urls: Vec<Url>,
nym_api_client: nym_http_api_client::Client,
nym_api_client: NymApiClient,
) -> Box<dyn TopologyProvider + Send + Sync> {
// if no custom provider was ... provided ..., create one using nym-api
custom_provider.unwrap_or_else(|| {
@@ -703,24 +582,26 @@ where
topology_accessor: TopologyAccessor,
local_gateway: NodeIdentity,
wait_for_gateway: bool,
shutdown_tracker: &ShutdownTracker,
mut task_client: TaskClient,
) -> Result<(), ClientCoreError> {
let topology_refresher_config =
TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
if topology_config.disable_refreshing {
// if we're not spawning the refresher, don't cause shutdown immediately
tracing::info!("The background topology refresher is not going to be started");
info!("The background topology refesher is not going to be started");
task_client.disarm();
}
let mut topology_refresher = TopologyRefresher::new(
topology_refresher_config,
topology_accessor,
topology_provider,
task_client,
);
// before returning, block entire runtime to refresh the current network view so that any
// components depending on topology would see a non-empty view
tracing::info!("Obtaining initial network topology");
info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;
if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
@@ -746,13 +627,13 @@ where
.wait_for_gateway(local_gateway, waiting_timeout)
.await
{
tracing::error!(
error!(
"the gateway did not come back online within the specified timeout: {err}"
);
return Err(err.into());
}
} else {
tracing::error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
return Err(err.into());
}
}
@@ -760,11 +641,8 @@ where
if !topology_config.disable_refreshing {
// don't spawn the refresher if we don't want to be refreshing the topology.
// only use the initial values obtained
tracing::info!("Starting topology refresher...");
shutdown_tracker.try_spawn_named_with_shutdown(
async move { topology_refresher.run().await },
"TopologyRefresher",
);
info!("Starting topology refresher...");
topology_refresher.start();
}
Ok(())
@@ -775,9 +653,9 @@ where
user_agent: Option<UserAgent>,
client_stats_id: String,
input_sender: Sender<InputMessage>,
shutdown_tracker: &ShutdownTracker,
task_client: TaskClient,
) -> ClientStatsSender {
tracing::info!("Starting statistics control...");
info!("Starting statistics control...");
StatisticsControl::create_and_start(
config.debug.stats_reporting,
user_agent
@@ -785,30 +663,18 @@ where
.unwrap_or("unknown".to_string()),
client_stats_id,
input_sender.clone(),
shutdown_tracker,
task_client,
)
}
fn start_mix_traffic_controller(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown_tracker: &ShutdownTracker,
event_tx: EventSender,
shutdown: TaskClient,
) -> (BatchMixMessageSender, ClientRequestSender) {
tracing::info!("Starting mix traffic controller...");
let mut mix_traffic_controller = MixTrafficController::new(
gateway_transceiver,
shutdown_tracker.clone_shutdown_token(),
event_tx,
);
let mix_tx = mix_traffic_controller.mix_tx();
let client_tx = mix_traffic_controller.client_tx();
shutdown_tracker.try_spawn_named(
async move { mix_traffic_controller.run().await },
"MixTrafficController",
);
info!("Starting mix traffic controller...");
let (mix_traffic_controller, mix_tx, client_tx) =
MixTrafficController::new(gateway_transceiver, shutdown);
mix_traffic_controller.start();
(mix_tx, client_tx)
}
@@ -816,7 +682,7 @@ where
async fn setup_persistent_reply_storage(
backend: S::ReplyStore,
key_rotation_config: KeyRotationConfig,
shutdown_tracker: &ShutdownTracker,
shutdown: TaskClient,
) -> Result<CombinedReplyStorage, ClientCoreError>
where
<S::ReplyStore as ReplyStorageBackend>::StorageError: Sync + Send,
@@ -841,15 +707,11 @@ where
})?;
let store_clone = mem_store.clone();
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move {
persistent_storage
.flush_on_shutdown(store_clone, shutdown_token)
.await
},
"PersistentReplyStorage::flush_on_shutdown",
);
spawn_future(async move {
persistent_storage
.flush_on_shutdown(store_clone, shutdown)
.await
});
Ok(mem_store)
}
@@ -866,11 +728,11 @@ where
{
// if client keys do not exist already, create and persist them
if key_store.load_keys().await.is_err() {
tracing::info!("could not find valid client keys - a new set will be generated");
info!("could not find valid client keys - a new set will be generated");
let mut rng = OsRng;
let keys = if let Some(derivation_material) = derivation_material {
ClientKeys::from_master_key(&mut rng, &derivation_material)
.map_err(|_| ClientCoreError::HkdfDerivationError)?
.map_err(|_| ClientCoreError::HkdfDerivationError {})?
} else {
ClientKeys::generate_new(&mut rng)
};
@@ -880,75 +742,21 @@ where
setup_gateway(setup_method, key_store, details_store).await
}
fn construct_nym_api_client(
nym_api_urls: Option<&Vec<nym_network_defaults::ApiUrl>>,
config: &Config,
user_agent: Option<UserAgent>,
) -> Result<nym_http_api_client::Client, ClientCoreError> {
tracing::debug!(
"construct_nym_api_client called with nym_api_urls: {}",
nym_api_urls.is_some()
);
// If API URLs are provided, use new_with_fronted_urls() which handles domain fronting
if let Some(nym_api_urls) = nym_api_urls {
if nym_api_urls.is_empty() {
tracing::warn!("Provided nym_api_urls is empty, falling back to config endpoints");
} else {
tracing::info!(
"Building nym-api client from provided URLs (with domain fronting support): {} URLs",
nym_api_urls.len()
);
let mut builder =
nym_http_api_client::ClientBuilder::new_with_fronted_urls(nym_api_urls.clone())
.map_err(ClientCoreError::from)?
.with_retries(DEFAULT_NYM_API_RETRIES);
if let Some(user_agent) = user_agent {
builder = builder.with_user_agent(user_agent);
}
return builder.build().map_err(ClientCoreError::from);
}
}
// Fallback to basic client for backwards compatibility
tracing::debug!("Building basic nym-api HTTP client from config endpoints");
fn construct_nym_api_client(config: &Config, user_agent: Option<UserAgent>) -> NymApiClient {
let mut nym_api_urls = config.get_nym_api_endpoints();
if nym_api_urls.is_empty() {
tracing::warn!("No API endpoints configured in config, this may cause issues");
}
nym_api_urls.shuffle(&mut thread_rng());
// Convert config URLs to ApiUrl format for consistency
let api_urls: Vec<nym_network_defaults::ApiUrl> = nym_api_urls
.into_iter()
.map(|url| nym_network_defaults::ApiUrl {
url: url.to_string(),
front_hosts: None,
})
.collect();
tracing::debug!("Using {} config API endpoints", api_urls.len());
let mut builder = nym_http_api_client::ClientBuilder::new_with_fronted_urls(api_urls)
.map_err(ClientCoreError::from)?
.with_retries(DEFAULT_NYM_API_RETRIES)
.with_bincode();
if let Some(user_agent) = user_agent {
builder = builder.with_user_agent(user_agent);
NymApiClient::new_with_user_agent(nym_api_urls[0].clone(), user_agent)
} else {
NymApiClient::new(nym_api_urls[0].clone())
}
builder.build().map_err(ClientCoreError::from)
}
async fn determine_key_rotation_state(
client: &nym_http_api_client::Client,
client: &NymApiClient,
) -> Result<KeyRotationConfig, ClientCoreError> {
Ok(client.get_key_rotation_info().await?.into())
Ok(client.nym_api.get_key_rotation_info().await?.into())
}
pub async fn start_base(mut self) -> Result<BaseClient, ClientCoreError>
@@ -959,12 +767,7 @@ where
<S::CredentialStore as CredentialStorage>::StorageError: Send + Sync + 'static,
<S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
{
tracing::info!("Starting nym client");
#[cfg(debug_assertions)]
#[cfg(target_arch = "wasm32")]
{
console_log!("Starting base Nym Client");
}
info!("Starting nym client");
// derive (or load) client keys and gateway configuration
let init_res = Self::initialise_keys_and_gateway(
@@ -993,22 +796,17 @@ where
// channels responsible for controlling real messages
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);
// channels responsible for event management
let (event_sender, event_receiver) = mpsc::unbounded();
// channels responsible for controlling ack messages
let (ack_sender, ack_receiver) = mpsc::unbounded();
let shared_topology_accessor =
TopologyAccessor::new(self.config.debug.topology.ignore_egress_epoch_role);
// Create a shutdown tracker for this client - either as a child of provided tracker
// or get one from the registry
let shutdown_tracker = match self.shutdown {
Some(parent_tracker) => parent_tracker.clone(),
None => nym_task::create_sdk_shutdown_tracker()?,
};
Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker);
// Shutdown notifier for signalling tasks to stop
let shutdown = self
.shutdown
.map(Into::<TaskHandle>::into)
.unwrap_or_default()
.name_if_unnamed("BaseNymClient");
// channels responsible for dealing with reply-related fun
let (reply_controller_sender, reply_controller_receiver) =
@@ -1025,11 +823,7 @@ where
.dkg_query_client
.map(|client| BandwidthController::new(credential_store, client));
let nym_api_client = Self::construct_nym_api_client(
self.nym_api_urls.as_ref(),
&self.config,
self.user_agent.clone(),
)?;
let nym_api_client = Self::construct_nym_api_client(&self.config, self.user_agent.clone());
let key_rotation_config = Self::determine_key_rotation_state(&nym_api_client).await?;
let topology_provider = Self::setup_topology_provider(
@@ -1044,7 +838,7 @@ where
self.user_agent.clone(),
generate_client_stats_id(*self_address.identity()),
input_sender.clone(),
&shutdown_tracker.clone(),
shutdown.fork("statistics_control"),
);
// needs to be started as the first thing to block if required waiting for the gateway
@@ -1054,14 +848,14 @@ where
shared_topology_accessor.clone(),
self_address.gateway(),
self.wait_for_gateway,
&shutdown_tracker.clone(),
shutdown.fork("topology_refresher"),
)
.await?;
let gateway_packet_router = PacketRouter::new(
ack_sender,
mixnet_messages_sender,
shutdown_tracker.clone_shutdown_token(),
shutdown.get_handle().named("gateway-packet-router"),
);
let gateway_transceiver = Self::setup_gateway_transceiver(
@@ -1074,7 +868,7 @@ where
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
&shutdown_tracker.clone(),
shutdown.fork("gateway_transceiver"),
)
.await?;
let gateway_ws_fd = gateway_transceiver.ws_fd();
@@ -1082,7 +876,7 @@ where
let reply_storage = Self::setup_persistent_reply_storage(
reply_storage_backend,
key_rotation_config,
&shutdown_tracker.clone(),
shutdown.fork("persistent_reply_storage"),
)
.await?;
@@ -1092,8 +886,8 @@ where
mixnet_messages_receiver,
reply_storage.key_storage(),
reply_controller_sender.clone(),
shutdown.fork("received_messages_buffer"),
stats_reporter.clone(),
&shutdown_tracker.clone(),
);
// The message_sender is the transmitter for any component generating sphinx packets
@@ -1103,8 +897,7 @@ where
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
gateway_transceiver,
&shutdown_tracker.clone(),
EventSender(event_sender),
shutdown.fork("mix_traffic_controller"),
);
// Channels that the websocket listener can use to signal downstream to the real traffic
@@ -1133,8 +926,9 @@ where
reply_controller_receiver,
shared_lane_queue_lengths.clone(),
client_connection_rx,
shutdown.fork("real_traffic_controller"),
self.config.debug.traffic.packet_type,
stats_reporter.clone(),
&shutdown_tracker.clone(),
);
if !self
@@ -1150,19 +944,12 @@ where
shared_topology_accessor.clone(),
message_sender,
stats_reporter.clone(),
&shutdown_tracker.clone(),
shutdown.fork("cover_traffic_stream"),
);
}
tracing::debug!("Core client startup finished!");
tracing::debug!("The address of this client is: {self_address}");
#[cfg(debug_assertions)]
#[cfg(target_arch = "wasm32")]
{
console_log!("Core client startup finished!");
console_log!("Rust::start_base: the address of this client is: {self_address}");
}
debug!("Core client startup finished!");
debug!("The address of this client is: {self_address}");
Ok(BaseClient {
address: self_address,
@@ -1171,7 +958,6 @@ where
client_input: ClientInput {
connection_command_sender: client_connection_tx,
input_sender,
client_request_sender,
},
},
client_output: ClientOutputStatus::AwaitingConsumer {
@@ -1186,7 +972,8 @@ where
gateway_connection: GatewayConnection { gateway_ws_fd },
},
stats_reporter,
shutdown_handle: shutdown_tracker, // The primary tracker for this client
task_handle: shutdown,
client_request_sender,
forget_me: self.config.debug.forget_me,
remember_me: self.config.debug.remember_me,
})
@@ -1200,57 +987,8 @@ pub struct BaseClient {
pub client_output: ClientOutputStatus,
pub client_state: ClientState,
pub stats_reporter: ClientStatsSender,
pub shutdown_handle: ShutdownTracker,
pub client_request_sender: ClientRequestSender,
pub task_handle: TaskHandle,
pub forget_me: ForgetMe,
pub remember_me: RememberMe,
}
#[cfg(test)]
mod tests {
use super::*;
use nym_network_defaults::{ApiUrl, NymNetworkDetails};
#[test]
fn test_network_details_with_multiple_urls() {
// Verify that network details can be configured with multiple API URLs
let mut network_details = NymNetworkDetails::new_empty();
network_details.nym_api_urls = Some(vec![
ApiUrl {
url: "https://validator.nymtech.net/api/".to_string(),
front_hosts: None,
},
ApiUrl {
url: "https://nym-frontdoor.vercel.app/api/".to_string(),
front_hosts: Some(vec!["vercel.app".to_string(), "vercel.com".to_string()]),
},
]);
assert_eq!(network_details.nym_api_urls.as_ref().unwrap().len(), 2);
assert!(network_details.nym_api_urls.as_ref().unwrap()[1]
.front_hosts
.is_some());
}
#[test]
fn test_network_details_with_front_hosts() {
// Verify that ApiUrl can store domain fronting configuration
let api_url = ApiUrl {
url: "https://nym-frontdoor.vercel.app/api/".to_string(),
front_hosts: Some(vec!["vercel.app".to_string(), "vercel.com".to_string()]),
};
assert_eq!(api_url.url, "https://nym-frontdoor.vercel.app/api/");
assert_eq!(api_url.front_hosts.as_ref().unwrap().len(), 2);
assert!(api_url
.front_hosts
.as_ref()
.unwrap()
.contains(&"vercel.app".to_string()));
}
#[test]
fn test_default_nym_api_retries_constant() {
// Verify the retry constant is set correctly
assert_eq!(DEFAULT_NYM_API_RETRIES, 3);
}
}
@@ -114,32 +114,41 @@ pub async fn setup_fs_gateways_storage<P: AsRef<Path>>(
})
}
pub fn create_bandwidth_controller_with_urls<St: CredentialStorage>(
nyxd_url: Url,
storage: St,
) -> Result<BandwidthController<QueryHttpRpcNyxdClient, St>, ClientCoreError> {
let client = default_query_dkg_client(nyxd_url)?;
Ok(BandwidthController::new(storage, client))
}
pub fn default_query_dkg_client_from_config(
pub fn create_bandwidth_controller<St: CredentialStorage>(
config: &Config,
) -> Result<QueryHttpRpcNyxdClient, ClientCoreError> {
storage: St,
) -> BandwidthController<QueryHttpRpcNyxdClient, St> {
let nyxd_url = config
.get_validator_endpoints()
.pop()
.ok_or(ClientCoreError::RpcClientMissingUrl)?;
.expect("No nyxd validator endpoint provided");
create_bandwidth_controller_with_urls(nyxd_url, storage)
}
pub fn create_bandwidth_controller_with_urls<St: CredentialStorage>(
nyxd_url: Url,
storage: St,
) -> BandwidthController<QueryHttpRpcNyxdClient, St> {
let client = default_query_dkg_client(nyxd_url);
BandwidthController::new(storage, client)
}
pub fn default_query_dkg_client_from_config(config: &Config) -> QueryHttpRpcNyxdClient {
let nyxd_url = config
.get_validator_endpoints()
.pop()
.expect("No nyxd validator endpoint provided");
default_query_dkg_client(nyxd_url)
}
pub fn default_query_dkg_client(nyxd_url: Url) -> Result<QueryHttpRpcNyxdClient, ClientCoreError> {
pub fn default_query_dkg_client(nyxd_url: Url) -> QueryHttpRpcNyxdClient {
let details = nym_network_defaults::NymNetworkDetails::new_from_env();
let client_config = nyxd::Config::try_from_nym_network_details(&details)
.map_err(|source| ClientCoreError::InvalidNetworkDetails { source })?;
.expect("failed to construct validator client config");
// overwrite env configuration with config URLs
QueryHttpRpcNyxdClient::connect(client_config, nyxd_url.as_str())
.map_err(|source| ClientCoreError::RpcClientCreationFailure { source })
.expect("Could not construct query client")
}
@@ -3,7 +3,7 @@
use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::topology_control::TopologyAccessor;
use crate::config;
use crate::{config, spawn_future};
use futures::task::{Context, Poll};
use futures::{Future, Stream, StreamExt};
use nym_sphinx::acknowledgements::AckKey;
@@ -12,6 +12,7 @@ use nym_sphinx::cover::generate_loop_cover_packet;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_task::TaskClient;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
@@ -68,6 +69,8 @@ where
packet_type: PacketType,
stats_tx: ClientStatsSender,
task_client: TaskClient,
}
impl<R> Stream for LoopCoverTrafficStream<R>
@@ -114,6 +117,7 @@ impl LoopCoverTrafficStream<OsRng> {
traffic_config: config::Traffic,
cover_config: config::CoverTraffic,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
let rng = OsRng;
@@ -133,6 +137,7 @@ impl LoopCoverTrafficStream<OsRng> {
use_legacy_sphinx_format: traffic_config.use_legacy_sphinx_format,
packet_type: traffic_config.packet_type,
stats_tx,
task_client,
}
}
@@ -205,7 +210,7 @@ impl LoopCoverTrafficStream<OsRng> {
TrySendError::Full(_) => {
// This isn't a problem, if the channel is full means we're already sending the
// max amount of messages downstream can handle.
tracing::trace!("Failed to send cover message - channel full");
tracing::debug!("Failed to send cover message - channel full");
}
TrySendError::Closed(_) => {
tracing::warn!("Failed to send cover message - channel closed");
@@ -225,24 +230,16 @@ impl LoopCoverTrafficStream<OsRng> {
// JS: due to identical logical structure to OutQueueControl::on_message(), this is also
// presumably required to prevent bugs in the future. Exact reason is still unknown to me.
// TODO: temporary and BAD workaround for wasm (we should find a way to yield here in wasm)
#[cfg(not(target_arch = "wasm32"))]
{
tokio::task::yield_now().await;
}
#[cfg(target_arch = "wasm32")]
{
tokio_with_wasm::task::yield_now().await;
}
tokio::task::yield_now().await;
}
// it's fine if cover traffic stream task gets killed whilst processing next message
#[allow(clippy::panic)]
pub async fn run(&mut self) {
pub fn start(mut self) {
if self.cover_traffic.disable_loop_cover_traffic_stream {
// we should have never got here in the first place - the task should have never been created to begin with
// so panic and review the code that lead to this branch
panic!("attempted to run LoopCoverTrafficStream while config explicitly disabled it.")
panic!("attempted to start LoopCoverTrafficStream while config explicitly disabled it.")
}
// we should set initial delay only when we actually start the stream
@@ -252,11 +249,29 @@ impl LoopCoverTrafficStream<OsRng> {
);
self.set_next_delay(sampled);
while self.next().await.is_some() {
self.on_new_message().await;
}
let mut shutdown = self.task_client.fork("select");
// this should never get triggered
error!("cover traffic stream has been exhausted!")
spawn_future(async move {
debug!("Started LoopCoverTrafficStream with graceful shutdown support");
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
tracing::trace!("LoopCoverTrafficStream: Received shutdown");
}
next = self.next() => {
if next.is_some() {
self.on_new_message().await;
} else {
tracing::trace!("LoopCoverTrafficStream: Stopping since channel closed");
break;
}
}
}
}
shutdown.recv_timeout().await;
tracing::debug!("LoopCoverTrafficStream: Exiting");
})
}
}
@@ -1,40 +0,0 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::StreamExt;
use crate::client::base_client::{EventReceiver, EventSender, MixnetClientEvent};
/// Launches and manages task events, propagating upwards what is not strictly internal.
pub(crate) struct EventControl {
parent_event_tx: Option<EventSender>,
children_event_rx: EventReceiver,
}
impl EventControl {
pub(crate) fn new(
parent_event_tx: Option<EventSender>,
children_event_rx: EventReceiver,
) -> Self {
EventControl {
parent_event_tx,
children_event_rx,
}
}
fn is_internal(event: MixnetClientEvent) -> bool {
match event {
MixnetClientEvent::Traffic(_) => false,
}
}
pub(crate) async fn run(mut self) {
while let Some(event) = self.children_event_rx.next().await {
if let Some(parent_event_tx) = &self.parent_event_tx {
if !Self::is_internal(event) {
parent_event_tx.send(event);
}
}
}
}
}
@@ -2,9 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
#![allow(unused_imports)]
use std::time::Duration;
#[cfg(target_arch = "wasm32")]
pub use wasmtimer::{std::Instant, tokio::*};
pub type IntervalStream = gloo_timers::future::IntervalStream;
+97 -109
View File
@@ -1,13 +1,12 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::{
base_client::{EventSender, MixnetClientEvent},
mix_traffic::transceiver::GatewayTransceiver,
};
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
use crate::error::ClientCoreError;
use crate::spawn_future;
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::ShutdownToken;
use nym_task::TaskClient;
use tracing::*;
use transceiver::ErasedGatewayError;
@@ -20,41 +19,33 @@ pub mod transceiver;
// We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer.
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32;
/// Reduced from 100 to 20 to fail fast (~1-2 seconds instead of ~6 seconds).
/// If we can't send 20 packets in a row, the gateway is unreachable.
const MAX_FAILURE_COUNT: usize = 20;
const MAX_FAILURE_COUNT: usize = 100;
// that's also disgusting.
pub struct Empty;
#[derive(Clone, Copy, Debug)]
pub enum MixTrafficEvent {
FailedSendingSphinx,
}
pub struct MixTrafficController {
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
mix_tx: BatchMixMessageSender,
mix_rx: BatchMixMessageReceiver,
client_rx: ClientRequestReceiver,
client_tx: ClientRequestSender,
// TODO: this is temporary work-around.
// in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
consecutive_gateway_failure_count: usize,
shutdown_token: ShutdownToken,
event_tx: EventSender,
task_client: TaskClient,
}
impl MixTrafficController {
pub fn new<T>(
gateway_transceiver: T,
shutdown_token: ShutdownToken,
event_tx: EventSender,
) -> MixTrafficController
task_client: TaskClient,
) -> (
MixTrafficController,
BatchMixMessageSender,
ClientRequestSender,
)
where
T: GatewayTransceiver + Send + 'static,
{
@@ -63,32 +54,41 @@ impl MixTrafficController {
let (client_sender, client_receiver) = tokio::sync::mpsc::channel(8);
MixTrafficController {
gateway_transceiver: Box::new(gateway_transceiver),
mix_tx: message_sender,
mix_rx: message_receiver,
client_rx: client_receiver,
client_tx: client_sender,
consecutive_gateway_failure_count: 0,
shutdown_token,
event_tx,
}
(
MixTrafficController {
gateway_transceiver: Box::new(gateway_transceiver),
mix_rx: message_receiver,
client_rx: client_receiver,
consecutive_gateway_failure_count: 0,
task_client,
},
message_sender,
client_sender,
)
}
pub fn new_dynamic(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown_token: ShutdownToken,
event_tx: EventSender,
) -> MixTrafficController {
Self::new(gateway_transceiver, shutdown_token, event_tx)
}
pub fn client_tx(&self) -> ClientRequestSender {
self.client_tx.clone()
}
pub fn mix_tx(&self) -> BatchMixMessageSender {
self.mix_tx.clone()
task_client: TaskClient,
) -> (
MixTrafficController,
BatchMixMessageSender,
ClientRequestSender,
) {
let (message_sender, message_receiver) =
tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
let (client_sender, client_receiver) = tokio::sync::mpsc::channel(8);
(
MixTrafficController {
gateway_transceiver,
mix_rx: message_receiver,
client_rx: client_receiver,
consecutive_gateway_failure_count: 0,
task_client,
},
message_sender,
client_sender,
)
}
async fn on_messages(
@@ -96,84 +96,72 @@ impl MixTrafficController {
mut mix_packets: Vec<MixPacket>,
) -> Result<(), ErasedGatewayError> {
debug_assert!(!mix_packets.is_empty());
let send_future = if mix_packets.len() == 1 {
// SAFETY: we just checked we have one packet
#[allow(clippy::unwrap_used)]
let result = if mix_packets.len() == 1 {
let mix_packet = mix_packets.pop().unwrap();
self.gateway_transceiver.send_mix_packet(mix_packet)
self.gateway_transceiver.send_mix_packet(mix_packet).await
} else {
self.gateway_transceiver.batch_send_mix_packets(mix_packets)
self.gateway_transceiver
.batch_send_mix_packets(mix_packets)
.await
};
tokio::select! {
biased;
_ = self.shutdown_token.cancelled() => {
trace!("received shutdown while handling messages");
Ok(())
}
result = send_future => {
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
result
}
if result.is_err() {
self.consecutive_gateway_failure_count += 1;
} else {
trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
self.consecutive_gateway_failure_count = 0;
}
result
}
async fn on_client_request(&mut self, client_request: ClientRequest) {
tokio::select! {
biased;
_ = self.shutdown_token.cancelled() => {
trace!("received shutdown while handling client request");
}
result = self.gateway_transceiver.send_client_request(client_request) => {
if let Err(err) = result {
error!("Failed to send client request: {err}")
}
}
}
}
pub fn start(mut self) {
spawn_future(async move {
debug!("Started MixTrafficController with graceful shutdown support");
pub async fn run(&mut self) {
debug!("Started MixTrafficController with graceful shutdown support");
loop {
tokio::select! {
biased;
_ = self.shutdown_token.cancelled() => {
trace!("MixTrafficController: Received shutdown");
break;
}
// mix_rx should never error out as we're holding one instance of the sender
Some(mix_packets) = self.mix_rx.recv() => {
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx));
// IMO it shouldn't be signalled from there but it is how it is
// TODO : report the failure upwards and shutdown from upwards
// Gateway is dead, we have to shut down currently
error!("Signalling shutdown from the MixTrafficController");
self.shutdown_token.cancel();
while !self.task_client.is_shutdown() {
tokio::select! {
mix_packets = self.mix_rx.recv() => match mix_packets {
Some(mix_packets) => {
if let Err(err) = self.on_messages(mix_packets).await {
error!("Failed to send sphinx packet(s) to the gateway: {err}");
if self.consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
// Disconnect from the gateway. If we should try to re-connect
// is handled at a higher layer.
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
// Do we need to handle the embedded mixnet client case
// separately?
self.task_client.send_we_stopped(Box::new(ClientCoreError::GatewayFailedToForwardMessages));
break;
}
}
},
None => {
tracing::trace!("MixTrafficController: Stopping since channel closed");
break;
}
},
client_request = self.client_rx.recv() => match client_request {
Some(client_request) => {
match self.gateway_transceiver.send_client_request(client_request).await {
Ok(_) => (),
Err(e) => error!("Failed to send client request: {e}"),
};
},
None => {
tracing::trace!("MixTrafficController, client request channel closed");
}
},
_ = self.task_client.recv() => {
tracing::trace!("MixTrafficController: Received shutdown");
break;
}
},
// client_rx should never error out as we're holding one instance of the sender
Some(client_request) = self.client_rx.recv() => {
self.on_client_request(client_request).await;
}
}
}
debug!("MixTrafficController: Exiting");
self.task_client.recv_timeout().await;
tracing::debug!("MixTrafficController: Exiting");
});
}
}
@@ -269,8 +269,6 @@ pub struct MockGateway {
}
impl Default for MockGateway {
// test code
#[allow(clippy::unwrap_used)]
fn default() -> Self {
MockGateway {
dummy_identity: "3ebjp1Fb9hdcS1AR6AZihgeJiMHkB5jjJUsvqNnfQwU7"
-1
View File
@@ -3,7 +3,6 @@
pub mod base_client;
pub mod cover_traffic_stream;
pub(crate) mod event_control;
pub(crate) mod helpers;
pub mod inbound_messages;
pub mod key_manager;
@@ -10,17 +10,18 @@ use nym_sphinx::{
acknowledgements::{identifier::recover_identifier, AckKey},
chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID},
};
use nym_task::ShutdownToken;
use nym_task::TaskClient;
use std::sync::Arc;
use tracing::*;
/// Module responsible for listening for any data resembling acknowledgements from the network
/// and firing actions to remove them from the 'Pending' state.
pub(crate) struct AcknowledgementListener {
pub(super) struct AcknowledgementListener {
ack_key: Arc<AckKey>,
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
}
impl AcknowledgementListener {
@@ -29,12 +30,14 @@ impl AcknowledgementListener {
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
AcknowledgementListener {
ack_key,
ack_receiver,
action_sender,
stats_tx,
task_client,
}
}
@@ -65,9 +68,14 @@ impl AcknowledgementListener {
trace!("Received {frag_id} from the mix network");
self.stats_tx
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
let _ = self
if let Err(err) = self
.action_sender
.unbounded_send(Action::new_remove(frag_id));
.unbounded_send(Action::new_remove(frag_id))
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send remove action to action controller: {err}");
}
}
}
async fn handle_ack_receiver_item(&mut self, item: Vec<Vec<u8>>) {
@@ -77,16 +85,11 @@ impl AcknowledgementListener {
}
}
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
pub(super) async fn run(&mut self) {
debug!("Started AcknowledgementListener with graceful shutdown support");
loop {
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
tracing::trace!("AcknowledgementListener: Received shutdown");
break;
}
acks = self.ack_receiver.next() => match acks {
Some(acks) => self.handle_ack_receiver_item(acks).await,
None => {
@@ -94,9 +97,12 @@ impl AcknowledgementListener {
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("AcknowledgementListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("AcknowledgementListener: Exiting");
}
}
@@ -8,7 +8,7 @@ use futures::StreamExt;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_sphinx::Delay as SphinxDelay;
use nym_task::ShutdownToken;
use nym_task::TaskClient;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
@@ -82,7 +82,7 @@ impl Config {
}
}
pub(crate) struct ActionController {
pub(super) struct ActionController {
/// Configurable parameters of the `ActionController`
config: Config,
@@ -102,6 +102,8 @@ pub(crate) struct ActionController {
/// Channel for notifying `RetransmissionRequestListener` about expired acknowledgements.
retransmission_sender: RetransmissionRequestSender,
task_client: TaskClient,
}
impl ActionController {
@@ -109,6 +111,7 @@ impl ActionController {
config: Config,
retransmission_sender: RetransmissionRequestSender,
incoming_actions: AckActionReceiver,
task_client: TaskClient,
) -> Self {
ActionController {
config,
@@ -116,6 +119,7 @@ impl ActionController {
pending_acks_timers: NonExhaustiveDelayQueue::new(),
incoming_actions,
retransmission_sender,
task_client,
}
}
@@ -190,11 +194,10 @@ impl ActionController {
trace!("{frag_id} is updating its delay");
// TODO: is it possible to solve this without either locking or temporarily removing the value?
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.remove(&frag_id) {
// SAFETY: this Action is triggered by `RetransmissionRequestListener` (for 'normal' packets)
// this Action is triggered by `RetransmissionRequestListener` (for 'normal' packets)
// or `ReplyController` (for 'reply' packets) which held the other potential
// reference to this Arc. HOWEVER, before the Action was pushed onto the queue, the reference
// was dropped hence this unwrap is safe.
#[allow(clippy::unwrap_used)]
let mut inner_data = Arc::try_unwrap(pending_ack_data).unwrap();
inner_data.update_retransmitted(delay);
@@ -206,7 +209,6 @@ impl ActionController {
}
// note: when the entry expires it's automatically removed from pending_acks_timers
#[allow(clippy::panic)]
fn handle_expired_ack_timer(&mut self, expired_ack: Expired<FragmentIdentifier>) {
let frag_id = expired_ack.into_inner();
@@ -222,9 +224,14 @@ impl ActionController {
// downgrading an arc and then upgrading vs cloning is difference of 30ns vs 15ns
// so it's literally a NO difference while it might prevent us from unnecessarily
// resending data (in maybe 1 in 1 million cases, but it's something)
let _ = self
if let Err(err) = self
.retransmission_sender
.unbounded_send(Arc::downgrade(pending_ack_data));
.unbounded_send(Arc::downgrade(pending_ack_data))
{
if !self.task_client.is_shutdown_poll() {
tracing::error!("Failed to send pending ack for retransmission: {err}");
}
}
} else {
// this shouldn't cause any issues but shouldn't have happened to begin with!
error!("An already removed pending ack has expired")
@@ -242,16 +249,11 @@ impl ActionController {
}
}
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
pub(super) async fn run(&mut self) {
debug!("Started ActionController with graceful shutdown support");
loop {
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
tracing::trace!("ActionController: Received shutdown");
break;
}
action = self.incoming_actions.next() => match action {
Some(action) => self.process_action(action),
None => {
@@ -268,8 +270,13 @@ impl ActionController {
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("ActionController: Received shutdown");
break;
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("ActionController: Exiting");
}
}
@@ -10,20 +10,21 @@ use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use nym_task::ShutdownToken;
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
use tracing::*;
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
/// putting everything into sphinx packets, etc.
/// It also makes an initial sending attempt for said messages.
pub(crate) struct InputMessageListener<R>
pub(super) struct InputMessageListener<R>
where
R: CryptoRng + Rng,
{
input_receiver: InputMessageReceiver,
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
}
impl<R> InputMessageListener<R>
@@ -37,11 +38,13 @@ where
input_receiver: InputMessageReceiver,
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
task_client: TaskClient,
) -> Self {
InputMessageListener {
input_receiver,
message_handler,
reply_controller_sender,
task_client,
}
}
@@ -65,9 +68,14 @@ where
max_retransmissions: Option<u32>,
) {
// offload reply handling to the dedicated task
let _ =
if let Err(err) =
self.reply_controller_sender
.send_reply(recipient_tag, data, lane, max_retransmissions);
.send_reply(recipient_tag, data, lane, max_retransmissions)
{
if !self.task_client.is_shutdown_poll() {
error!("failed to send a reply - {err}");
}
}
}
async fn handle_plain_message(
@@ -112,7 +120,6 @@ where
}
}
#[allow(clippy::panic)]
async fn on_input_message(&mut self, msg: InputMessage) {
match msg {
InputMessage::Regular {
@@ -206,23 +213,16 @@ where
self.handle_premade_packets(msgs, lane).await
}
// MessageWrappers can't be nested
InputMessage::MessageWrapper { .. } => {
panic!("attempted to use nested MessageWrapper")
}
InputMessage::MessageWrapper { .. } => unimplemented!(),
},
};
}
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
pub(super) async fn run(&mut self) {
debug!("Started InputMessageListener with graceful shutdown support");
loop {
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
tracing::trace!("InputMessageListener: Received shutdown");
break;
}
input_msg = self.input_receiver.recv() => match input_msg {
Some(input_msg) => {
self.on_input_message(input_msg).await;
@@ -232,9 +232,12 @@ where
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("InputMessageListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("InputMessageListener: Exiting");
}
}
@@ -10,6 +10,7 @@ use self::{
use crate::client::inbound_messages::InputMessageReceiver;
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::replies::reply_controller::ReplyControllerSender;
use crate::spawn_future;
use action_controller::AckActionReceiver;
use futures::channel::mpsc;
use nym_gateway_client::AcknowledgementReceiver;
@@ -22,11 +23,13 @@ use nym_sphinx::{
Delay as SphinxDelay,
};
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
use std::{
sync::{Arc, Weak},
time::Duration,
};
use tracing::*;
pub(crate) use action_controller::{AckActionSender, Action};
@@ -187,9 +190,6 @@ pub(super) struct Config {
/// Predefined packet size used for the encapsulated messages.
packet_size: PacketSize,
/// Type of packets used for retransmissions
packet_type: PacketType,
}
impl Config {
@@ -197,14 +197,12 @@ impl Config {
maximum_retransmissions: Option<u32>,
ack_wait_addition: Duration,
ack_wait_multiplier: f64,
packet_type: PacketType,
) -> Self {
Config {
maximum_retransmissions,
ack_wait_addition,
ack_wait_multiplier,
packet_size: Default::default(),
packet_type,
}
}
@@ -214,7 +212,7 @@ impl Config {
}
}
pub(crate) struct AcknowledgementController<R>
pub(super) struct AcknowledgementController<R>
where
R: CryptoRng + Rng,
{
@@ -236,6 +234,7 @@ where
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
stats_tx: ClientStatsSender,
task_client: TaskClient,
) -> Self {
let (retransmission_tx, retransmission_rx) = mpsc::unbounded();
@@ -245,6 +244,7 @@ where
action_config,
retransmission_tx,
connectors.ack_action_receiver,
task_client.fork("action_controller"),
);
// will listen for any acks coming from the network
@@ -253,6 +253,7 @@ where
connectors.ack_receiver,
connectors.ack_action_sender.clone(),
stats_tx,
task_client.fork("acknowledgement_listener"),
);
// will listen for any new messages from the client
@@ -260,6 +261,7 @@ where
connectors.input_receiver,
message_handler.clone(),
reply_controller_sender.clone(),
task_client.fork("input_message_listener"),
);
// will listen for any ack timeouts and trigger retransmission
@@ -269,13 +271,16 @@ where
message_handler,
retransmission_rx,
reply_controller_sender,
config.packet_type,
task_client.fork("retransmission_request_listener"),
);
// will listen for events indicating the packet was sent through the network so that
// the retransmission timer should be started.
let sent_notification_listener =
SentNotificationListener::new(connectors.sent_notifier, connectors.ack_action_sender);
let sent_notification_listener = SentNotificationListener::new(
connectors.sent_notifier,
connectors.ack_action_sender,
task_client.with_suffix("sent_notification_listener"),
);
AcknowledgementController {
acknowledgement_listener,
@@ -286,21 +291,36 @@ where
}
}
pub(crate) fn into_tasks(
self,
) -> (
AcknowledgementListener,
InputMessageListener<R>,
RetransmissionRequestListener<R>,
SentNotificationListener,
ActionController,
) {
(
self.acknowledgement_listener,
self.input_message_listener,
self.retransmission_request_listener,
self.sent_notification_listener,
self.action_controller,
)
pub(super) fn start(self, packet_type: PacketType) {
let mut acknowledgement_listener = self.acknowledgement_listener;
let mut input_message_listener = self.input_message_listener;
let mut retransmission_request_listener = self.retransmission_request_listener;
let mut sent_notification_listener = self.sent_notification_listener;
let mut action_controller = self.action_controller;
spawn_future(async move {
acknowledgement_listener.run().await;
debug!("The acknowledgement listener has finished execution!");
});
spawn_future(async move {
input_message_listener.run().await;
debug!("The input listener has finished execution!");
});
spawn_future(async move {
retransmission_request_listener.run(packet_type).await;
debug!("The retransmission request listener has finished execution!");
});
spawn_future(async move {
sent_notification_listener.run().await;
debug!("The sent notification listener has finished execution!");
});
spawn_future(async move {
action_controller.run().await;
debug!("The controller has finished execution!");
});
}
}
@@ -13,19 +13,19 @@ use futures::StreamExt;
use nym_sphinx::chunking::fragment::Fragment;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::{addressing::clients::Recipient, params::PacketType};
use nym_task::{connections::TransmissionLane, ShutdownToken};
use nym_task::{connections::TransmissionLane, TaskClient};
use rand::{CryptoRng, Rng};
use std::sync::{Arc, Weak};
use tracing::*;
// responsible for packet retransmission upon fired timer
pub(crate) struct RetransmissionRequestListener<R> {
pub(super) struct RetransmissionRequestListener<R> {
maximum_retransmissions: Option<u32>,
action_sender: AckActionSender,
message_handler: MessageHandler<R>,
request_receiver: RetransmissionRequestReceiver,
reply_controller_sender: ReplyControllerSender,
packet_type: PacketType,
task_client: TaskClient,
}
impl<R> RetransmissionRequestListener<R>
@@ -38,7 +38,7 @@ where
message_handler: MessageHandler<R>,
request_receiver: RetransmissionRequestReceiver,
reply_controller_sender: ReplyControllerSender,
packet_type: PacketType,
task_client: TaskClient,
) -> Self {
RetransmissionRequestListener {
maximum_retransmissions,
@@ -46,7 +46,7 @@ where
message_handler,
request_receiver,
reply_controller_sender,
packet_type,
task_client,
}
}
@@ -67,6 +67,7 @@ where
async fn on_retransmission_request(
&mut self,
weak_timed_out_ack: Weak<PendingAcknowledgement>,
packet_type: PacketType,
) {
let timed_out_ack = match weak_timed_out_ack.upgrade() {
Some(timed_out_ack) => timed_out_ack,
@@ -96,18 +97,22 @@ where
} => {
// if this is retransmission for reply, offload it to the dedicated task
// that deals with all the surbs
let _ = self.reply_controller_sender.send_retransmission_data(
if let Err(err) = self.reply_controller_sender.send_retransmission_data(
*recipient_tag,
weak_timed_out_ack,
*extra_surb_request,
);
) {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send retransmission data to the reply controller: {err}");
}
}
return;
}
PacketDestination::KnownRecipient(recipient) => {
self.prepare_normal_retransmission_chunk(
**recipient,
timed_out_ack.message_chunk.clone(),
self.packet_type,
packet_type,
)
.await
}
@@ -148,9 +153,14 @@ where
// is sent to the `OutQueueControl` and has gone through its internal queue
// with the additional poisson delay.
// And since Actions are executed in order `UpdateTimer` will HAVE TO be executed before `StartTimer`
let _ = self
if let Err(err) = self
.action_sender
.unbounded_send(Action::new_update_pending_ack(frag_id, new_delay));
.unbounded_send(Action::new_update_pending_ack(frag_id, new_delay))
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send update pending ack action to the controller: {err}");
}
}
// send to `OutQueueControl` to eventually send to the mix network
self.message_handler
@@ -164,26 +174,24 @@ where
.await
}
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
pub(super) async fn run(&mut self, packet_type: PacketType) {
debug!("Started RetransmissionRequestListener with graceful shutdown support");
loop {
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
tracing::trace!("RetransmissionRequestListener: Received shutdown");
break;
}
timed_out_ack = self.request_receiver.next() => match timed_out_ack {
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack).await,
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await,
None => {
tracing::trace!("RetransmissionRequestListener: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("RetransmissionRequestListener: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("RetransmissionRequestListener: Exiting");
}
}
@@ -5,25 +5,29 @@ use super::action_controller::{AckActionSender, Action};
use super::SentPacketNotificationReceiver;
use futures::StreamExt;
use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
use nym_task::TaskClient;
use tracing::*;
/// Module responsible for starting up retransmission timers.
/// It is required because when we send our packet to the `real traffic stream` controlled
/// by a poisson timer, there's no guarantee the message will be sent immediately, so we might
/// accidentally fire retransmission way quicker than we should have.
pub(crate) struct SentNotificationListener {
pub(super) struct SentNotificationListener {
sent_notifier: SentPacketNotificationReceiver,
action_sender: AckActionSender,
task_client: TaskClient,
}
impl SentNotificationListener {
pub(super) fn new(
sent_notifier: SentPacketNotificationReceiver,
action_sender: AckActionSender,
task_client: TaskClient,
) -> Self {
SentNotificationListener {
sent_notifier,
action_sender,
task_client,
}
}
@@ -32,18 +36,37 @@ impl SentNotificationListener {
trace!("sent off a cover message - no need to start retransmission timer!");
return;
}
let _ = self
if let Err(err) = self
.action_sender
.unbounded_send(Action::new_start_timer(frag_id));
.unbounded_send(Action::new_start_timer(frag_id))
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to send start timer action to action controller: {err}");
}
}
}
pub(crate) async fn run(&mut self) {
pub(super) async fn run(&mut self) {
debug!("Started SentNotificationListener with graceful shutdown support");
while let Some(frag_id) = self.sent_notifier.next().await {
self.on_sent_message(frag_id).await;
while !self.task_client.is_shutdown() {
tokio::select! {
frag_id = self.sent_notifier.next() => match frag_id {
Some(frag_id) => {
self.on_sent_message(frag_id).await;
}
None => {
tracing::trace!("SentNotificationListener: Stopping since channel closed");
break;
}
},
_ = self.task_client.recv() => {
tracing::trace!("SentNotificationListener: Received shutdown");
break;
}
}
}
assert!(self.task_client.is_shutdown_poll());
tracing::debug!("SentNotificationListener: Exiting");
}
}
@@ -20,7 +20,7 @@ use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
use nym_sphinx::Delay;
use nym_task::connections::TransmissionLane;
use nym_task::ShutdownToken;
use nym_task::TaskClient;
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
@@ -35,9 +35,6 @@ pub enum PreparationError {
#[error(transparent)]
NymTopologyError(#[from] NymTopologyError),
#[error("message wasn't split into any fragments!")]
EmptyFragments,
#[error("message too long for a single SURB, splitting into {fragments} fragments.")]
MessageTooLongForSingleSurb { fragments: usize },
@@ -189,7 +186,7 @@ pub(crate) struct MessageHandler<R> {
topology_access: TopologyAccessor,
reply_key_storage: SentReplyKeys,
tag_storage: UsedSenderTags,
shutdown_token: ShutdownToken,
task_client: TaskClient,
}
impl<R> MessageHandler<R>
@@ -205,7 +202,7 @@ where
topology_access: TopologyAccessor,
reply_key_storage: SentReplyKeys,
tag_storage: UsedSenderTags,
shutdown_token: ShutdownToken,
task_client: TaskClient,
) -> Self
where
R: Copy,
@@ -228,7 +225,7 @@ where
topology_access,
reply_key_storage,
tag_storage,
shutdown_token,
task_client,
}
}
@@ -323,16 +320,6 @@ where
});
}
if fragment.is_empty() {
error!("CRITICAL FAILURE: our split message didn't result in any sendable fragments");
return Err(SurbWrappedPreparationError {
source: PreparationError::EmptyFragments,
returned_surbs: Some(vec![reply_surb]),
});
}
// SAFETY: we just checked we have one fragment
#[allow(clippy::unwrap_used)]
let chunk = fragment.pop().unwrap();
let chunk_clone = chunk.clone();
let prepared_fragment = self
@@ -548,7 +535,6 @@ where
pending_acks.push(pending_ack);
}
drop(topology_permit);
self.insert_pending_acks(pending_acks);
self.forward_messages(real_messages, lane).await;
@@ -671,7 +657,6 @@ where
.zip(reply_surbs.into_iter())
.map(|(fragment, reply_surb)| {
// unwrap here is fine as we know we have a valid topology
#[allow(clippy::unwrap_used)]
self.message_preparer
.prepare_reply_chunk_for_sending(
fragment,
@@ -712,7 +697,7 @@ where
.action_sender
.unbounded_send(Action::UpdatePendingAck(id, new_delay))
{
if !self.shutdown_token.is_cancelled() {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send update action to the controller: {err}");
}
}
@@ -723,7 +708,7 @@ where
.action_sender
.unbounded_send(Action::new_insert(pending_acks))
{
if !self.shutdown_token.is_cancelled() {
if !self.task_client.is_shutdown_poll() {
error!("Failed to send insert action to the controller: {err}");
}
}
@@ -731,21 +716,17 @@ where
// tells real message sender (with the poisson timer) to send this to the mix network
pub(crate) async fn forward_messages(
&mut self,
&self,
messages: Vec<RealMessage>,
transmission_lane: TransmissionLane,
) {
tokio::select! {
biased;
_ = self.shutdown_token.cancelled() => {
trace!("received shutdown while attempting to forward mixnet messages");
}
sending_res = self.real_message_sender.send((messages, transmission_lane)) => {
if sending_res.is_err() {
error!(
"failed to forward mixnet messages due to closed channel (outside of shutdown!)"
);
}
if let Err(err) = self
.real_message_sender
.send((messages, transmission_lane))
.await
{
if !self.task_client.is_shutdown_poll() {
error!("Failed to forward messages to the real message sender: {err}");
}
}
}
@@ -14,21 +14,26 @@ use crate::client::replies::reply_controller::{
ReplyController, ReplyControllerReceiver, ReplyControllerSender,
};
use crate::client::replies::reply_storage::CombinedReplyStorage;
use crate::client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors,
topology_control::TopologyAccessor,
};
use crate::config;
use crate::{
client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors,
topology_control::TopologyAccessor,
},
spawn_future,
};
use futures::channel::mpsc;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::params::PacketType;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use nym_task::ShutdownToken;
use nym_task::TaskClient;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::sync::Arc;
use tracing::*;
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
pub(crate) use acknowledgement_control::{AckActionSender, Action};
@@ -64,7 +69,6 @@ impl<'a> From<&'a Config> for acknowledgement_control::Config {
cfg.traffic.maximum_number_of_retransmissions,
cfg.acks.ack_wait_addition,
cfg.acks.ack_wait_multiplier,
cfg.traffic.packet_type,
)
.with_custom_packet_size(cfg.traffic.primary_packet_size)
}
@@ -142,7 +146,7 @@ impl RealMessagesController<OsRng> {
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: ClientStatsSender,
shutdown_token: ShutdownToken,
task_client: TaskClient,
) -> Self {
let rng = OsRng;
@@ -174,7 +178,7 @@ impl RealMessagesController<OsRng> {
topology_access.clone(),
reply_storage.key_storage(),
reply_storage.tags_storage(),
shutdown_token.clone(),
task_client.fork("message_handler"),
);
let ack_control = AcknowledgementController::new(
@@ -184,6 +188,7 @@ impl RealMessagesController<OsRng> {
message_handler.clone(),
reply_controller_sender,
stats_tx.clone(),
task_client.fork("ack_control"),
);
let reply_control = ReplyController::new(
@@ -191,6 +196,7 @@ impl RealMessagesController<OsRng> {
message_handler,
reply_storage,
reply_controller_receiver,
task_client.fork("reply_controller"),
);
let out_queue_control = OutQueueControl::new(
@@ -203,7 +209,7 @@ impl RealMessagesController<OsRng> {
lane_queue_lengths,
client_connection_rx,
stats_tx,
shutdown_token.clone(),
task_client.with_suffix("out_queue_control"),
);
RealMessagesController {
@@ -213,13 +219,20 @@ impl RealMessagesController<OsRng> {
}
}
pub fn into_tasks(
self,
) -> (
OutQueueControl<OsRng>,
ReplyController<OsRng>,
AcknowledgementController<OsRng>,
) {
(self.out_queue_control, self.reply_control, self.ack_control)
pub fn start(self, packet_type: PacketType) {
let mut out_queue_control = self.out_queue_control;
let ack_control = self.ack_control;
let mut reply_control = self.reply_control;
spawn_future(async move {
out_queue_control.run().await;
debug!("The out queue controller has finished execution!");
});
spawn_future(async move {
reply_control.run().await;
debug!("The reply controller has finished execution!");
});
ack_control.start(packet_type);
}
}
@@ -21,7 +21,7 @@ use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, C
use nym_task::connections::{
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::ShutdownToken;
use nym_task::TaskClient;
use rand::{CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
@@ -31,9 +31,9 @@ use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, Sleep};
// use wasm_utils::console_log;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::{sleep, Sleep};
mod sending_delay_controller;
/// Configurable parameters of the `OutQueueControl`
@@ -119,7 +119,7 @@ where
/// Channel used for sending metrics events (specifically `PacketStatistics` events) to the metrics tracker.
stats_tx: ClientStatsSender,
shutdown_token: ShutdownToken,
task_client: TaskClient,
}
#[derive(Debug)]
@@ -179,7 +179,7 @@ where
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: ClientStatsSender,
shutdown_token: ShutdownToken,
task_client: TaskClient,
) -> Self {
OutQueueControl {
config,
@@ -194,7 +194,7 @@ where
client_connection_rx,
lane_queue_lengths,
stats_tx,
shutdown_token,
task_client,
}
}
@@ -249,8 +249,6 @@ where
}
};
// SAFETY: our topology must be valid at this point
#[allow(clippy::expect_used)]
(
generate_loop_cover_packet(
&mut self.rng,
@@ -280,35 +278,17 @@ where
}
};
let sending_res = tokio::select! {
biased;
_ = self.shutdown_token.cancelled() => {
trace!("received shutdown signal while attempting to send mix message");
return
}
sending_res = self.mix_tx.send(vec![next_message]) => {
sending_res
}
};
match sending_res {
Err(_) => {
if !self.shutdown_token.is_cancelled() {
tracing::error!(
"failed to send mixnet packet due to closed channel (outside of shutdown!)"
);
}
// Early return to avoid further processing when channel is closed
return;
}
Ok(_) => {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
if !self.task_client.is_shutdown_poll() {
tracing::error!("Failed to send: {err}");
}
} else {
let event = if fragment_id.is_some() {
PacketStatisticsEvent::RealPacketSent(packet_size)
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
}
// notify ack controller about sending our message only after we actually managed to push it
@@ -327,15 +307,9 @@ where
// ready and hence was immediately re-scheduled causing other tasks to be starved;
// yield makes it go back the scheduling queue regardless of its value availability
// TODO: temporary and BAD workaround for wasm (we should find a way to yield here in wasm)
#[cfg(not(target_arch = "wasm32"))]
{
tokio::task::yield_now().await;
}
#[cfg(target_arch = "wasm32")]
{
tokio_with_wasm::task::yield_now().await;
}
tokio::task::yield_now().await;
}
fn on_close_connection(&mut self, connection_id: ConnectionId) {
@@ -465,8 +439,6 @@ where
tracing::trace!("handling real_messages: size: {}", real_messages.len());
self.transmission_buffer.store(&conn_id, real_messages);
// SAFETY: we just stored the message
#[allow(clippy::expect_used)]
let real_next = self.pop_next_message().expect("Just stored one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
@@ -515,8 +487,6 @@ where
// First store what we got for the given connection id
self.transmission_buffer.store(&conn_id, real_messages);
// SAFETY: we just stored the message
#[allow(clippy::expect_used)]
let real_next = self.pop_next_message().expect("we just added one");
Poll::Ready(Some(StreamMessage::Real(Box::new(real_next))))
@@ -544,7 +514,9 @@ where
}
#[cfg(not(target_arch = "wasm32"))]
fn log_status(&self) {
fn log_status(&self, shutdown: &mut TaskClient) {
use crate::error::ClientCoreStatusMessage;
let packets = self.transmission_buffer.total_size();
let lanes = self.transmission_buffer.lanes();
let mult = self.sending_delay_controller.current_multiplier();
@@ -573,33 +545,32 @@ where
tracing::debug!("{status_str}");
}
// leave the code commented in case somebody wanted to restore this logic with a different channel
// // Send status message to whoever is listening (possibly UI)
// if mult == self.sending_delay_controller.max_multiplier() {
// shutdown.send_status_msg(Box::new(ClientCoreStatusMessage::GatewayIsVerySlow));
// } else if mult > self.sending_delay_controller.min_multiplier() {
// shutdown.send_status_msg(Box::new(ClientCoreStatusMessage::GatewayIsSlow));
// }
// Send status message to whoever is listening (possibly UI)
if mult == self.sending_delay_controller.max_multiplier() {
shutdown.send_status_msg(Box::new(ClientCoreStatusMessage::GatewayIsVerySlow));
} else if mult > self.sending_delay_controller.min_multiplier() {
shutdown.send_status_msg(Box::new(ClientCoreStatusMessage::GatewayIsSlow));
}
}
pub(crate) async fn run(&mut self) {
pub(super) async fn run(&mut self) {
debug!("Started OutQueueControl with graceful shutdown support");
// avoid borrow on self
let shutdown_token = self.shutdown_token.clone();
let mut shutdown = self.task_client.fork("select");
#[cfg(not(target_arch = "wasm32"))]
{
let mut status_timer = tokio::time::interval(Duration::from_secs(5));
loop {
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
_ = shutdown.recv() => {
tracing::trace!("OutQueueControl: Received shutdown");
break;
}
_ = status_timer.tick() => {
self.log_status();
self.log_status(&mut shutdown);
}
next_message = self.next() => if let Some(next_message) = next_message {
self.on_message(next_message).await;
@@ -609,16 +580,16 @@ where
}
}
}
shutdown.recv_timeout().await;
}
#[cfg(target_arch = "wasm32")]
{
loop {
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
_ = shutdown.recv() => {
tracing::trace!("OutQueueControl: Received shutdown");
break;
}
next_message = self.next() => if let Some(next_message) = next_message {
self.on_message(next_message).await;
@@ -83,13 +83,11 @@ impl SendingDelayController {
self.current_multiplier
}
#[allow(dead_code)]
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn min_multiplier(&self) -> u32 {
self.lower_bound
}
#[allow(dead_code)]
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn max_multiplier(&self) -> u32 {
self.upper_bound
@@ -1,10 +1,10 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::helpers::get_time_now;
use crate::client::replies::{
reply_controller::ReplyControllerSender, reply_storage::SentReplyKeys,
};
use crate::spawn_future;
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::StreamExt;
@@ -19,10 +19,10 @@ use nym_sphinx::message::{NymMessage, PlainMessage};
use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_task::ShutdownToken;
use nym_task::TaskClient;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use tracing::*;
// The interval at which we check for stale buffers
@@ -54,7 +54,7 @@ struct ReceivedMessagesBufferInner<R: MessageReceiver> {
stats_tx: ClientStatsSender,
// Periodically check for stale buffers to clean up
last_stale_check: crate::client::helpers::Instant,
last_stale_check: Instant,
}
impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
@@ -154,7 +154,7 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
}
fn cleanup_stale_buffers(&mut self) {
let now = get_time_now();
let now = Instant::now();
if now - self.last_stale_check > STALE_BUFFER_CHECK_INTERVAL {
self.last_stale_check = now;
self.message_receiver
@@ -171,7 +171,7 @@ struct ReceivedMessagesBuffer<R: MessageReceiver> {
inner: Arc<Mutex<ReceivedMessagesBufferInner<R>>>,
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
shutdown_token: ShutdownToken,
task_client: TaskClient,
}
impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
@@ -180,7 +180,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
stats_tx: ClientStatsSender,
shutdown_token: ShutdownToken,
task_client: TaskClient,
) -> Self {
ReceivedMessagesBuffer {
inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
@@ -190,15 +190,14 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
message_sender: None,
recently_reconstructed: HashSet::new(),
stats_tx,
last_stale_check: get_time_now(),
last_stale_check: Instant::now(),
})),
reply_key_storage,
reply_controller_sender,
shutdown_token,
task_client,
}
}
#[allow(clippy::panic)]
async fn disconnect_sender(&mut self) {
let mut guard = self.inner.lock().await;
if guard.message_sender.is_none() {
@@ -209,7 +208,6 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
guard.message_sender = None;
}
#[allow(clippy::panic)]
async fn connect_sender(&mut self, sender: ReconstructedMessagesSender) {
let mut guard = self.inner.lock().await;
if guard.message_sender.is_some() {
@@ -315,7 +313,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
reply_surbs,
from_surb_request,
) {
if !self.shutdown_token.is_cancelled() {
if !self.task_client.is_shutdown_poll() {
error!("{err}");
}
}
@@ -338,7 +336,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
.reply_controller_sender
.send_additional_surbs_request(*recipient, amount)
{
if !self.shutdown_token.is_cancelled() {
if !self.task_client.is_shutdown_poll() {
error!("{err}");
}
}
@@ -465,22 +463,22 @@ pub enum ReceivedBufferMessage {
ReceiverDisconnect,
}
pub(crate) struct RequestReceiver<R: MessageReceiver> {
struct RequestReceiver<R: MessageReceiver> {
received_buffer: ReceivedMessagesBuffer<R>,
query_receiver: ReceivedBufferRequestReceiver,
shutdown_token: ShutdownToken,
task_client: TaskClient,
}
impl<R: MessageReceiver> RequestReceiver<R> {
fn new(
received_buffer: ReceivedMessagesBuffer<R>,
query_receiver: ReceivedBufferRequestReceiver,
shutdown_token: ShutdownToken,
task_client: TaskClient,
) -> Self {
RequestReceiver {
received_buffer,
query_receiver,
shutdown_token,
task_client,
}
}
@@ -495,70 +493,66 @@ impl<R: MessageReceiver> RequestReceiver<R> {
}
}
pub(crate) async fn run(&mut self) {
async fn run(&mut self) {
debug!("Started RequestReceiver with graceful shutdown support");
loop {
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.shutdown_token.cancelled() => {
_ = self.task_client.recv() => {
tracing::trace!("RequestReceiver: Received shutdown");
break;
}
request = self.query_receiver.next() => {
if let Some(message) = request {
self.handle_message(message).await
} else {
tracing::trace!("RequestReceiver: Stopping since channel closed");
self.shutdown_token.cancelled().await;
break;
}
},
}
}
self.task_client.recv().await;
tracing::debug!("RequestReceiver: Exiting");
}
}
pub(crate) struct FragmentedMessageReceiver<R: MessageReceiver> {
struct FragmentedMessageReceiver<R: MessageReceiver> {
received_buffer: ReceivedMessagesBuffer<R>,
mixnet_packet_receiver: MixnetMessageReceiver,
shutdown_token: ShutdownToken,
task_client: TaskClient,
}
impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
fn new(
received_buffer: ReceivedMessagesBuffer<R>,
mixnet_packet_receiver: MixnetMessageReceiver,
shutdown_token: ShutdownToken,
task_client: TaskClient,
) -> Self {
FragmentedMessageReceiver {
received_buffer,
mixnet_packet_receiver,
shutdown_token,
task_client,
}
}
pub(crate) async fn run(&mut self) -> Result<(), MessageRecoveryError> {
async fn run(&mut self) -> Result<(), MessageRecoveryError> {
debug!("Started FragmentedMessageReceiver with graceful shutdown support");
loop {
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = self.shutdown_token.cancelled() => {
tracing::trace!("FragmentedMessageReceiver: Received shutdown");
break;
}
new_messages = self.mixnet_packet_receiver.next() => {
if let Some(new_messages) = new_messages {
self.received_buffer.handle_new_received(new_messages).await?;
} else {
tracing::trace!("FragmentedMessageReceiver: Stopping since channel closed");
self.shutdown_token.cancelled().await;
break;
}
},
_ = self.task_client.recv_with_delay() => {
tracing::trace!("FragmentedMessageReceiver: Received shutdown");
}
}
}
self.task_client.recv_timeout().await;
tracing::debug!("FragmentedMessageReceiver: Exiting");
Ok(())
}
@@ -577,31 +571,42 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
metrics_reporter: ClientStatsSender,
shutdown_token: ShutdownToken,
task_client: TaskClient,
) -> Self {
let received_buffer = ReceivedMessagesBuffer::new(
local_encryption_keypair,
reply_key_storage,
reply_controller_sender,
metrics_reporter,
shutdown_token.clone(),
task_client.fork("received_messages_buffer"),
);
ReceivedMessagesBufferController {
fragmented_message_receiver: FragmentedMessageReceiver::new(
received_buffer.clone(),
mixnet_packet_receiver,
shutdown_token.clone(),
task_client.fork("fragmented_message_receiver"),
),
request_receiver: RequestReceiver::new(
received_buffer,
query_receiver,
shutdown_token.clone(),
task_client.with_suffix("request_receiver"),
),
}
}
pub(crate) fn into_tasks(self) -> (FragmentedMessageReceiver<R>, RequestReceiver<R>) {
(self.fragmented_message_receiver, self.request_receiver)
pub fn start(self) {
let mut fragmented_message_receiver = self.fragmented_message_receiver;
let mut request_receiver = self.request_receiver;
spawn_future(async move {
match fragmented_message_receiver.run().await {
Ok(_) => {}
Err(e) => error!("{e}"),
}
});
spawn_future(async move {
request_receiver.run().await;
});
}
}
@@ -7,7 +7,7 @@ use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationC
use crate::client::replies::reply_storage::CombinedReplyStorage;
use crate::config;
use futures::StreamExt;
use nym_task::ShutdownToken;
use nym_task::TaskClient;
use rand::rngs::OsRng;
use rand::{CryptoRng, Rng};
use std::time::Duration;
@@ -60,6 +60,9 @@ pub struct ReplyController<R> {
receiver_controller: ReceiverReplyController<R>,
request_receiver: ReplyControllerReceiver,
// Listen for shutdown signals
task_client: TaskClient,
}
impl ReplyController<OsRng> {
@@ -68,6 +71,7 @@ impl ReplyController<OsRng> {
message_handler: MessageHandler<OsRng>,
full_reply_storage: CombinedReplyStorage,
request_receiver: ReplyControllerReceiver,
task_client: TaskClient,
) -> Self {
ReplyController {
config,
@@ -82,6 +86,7 @@ impl ReplyController<OsRng> {
message_handler,
),
request_receiver,
task_client,
}
}
}
@@ -143,21 +148,22 @@ where
self.sender_controller.inspect_and_clear_stale_data(now)
}
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
pub(crate) async fn run(&mut self) {
debug!("Started ReplyController with graceful shutdown support");
let mut shutdown = self.task_client.fork("reply-controller");
let polling_rate = Duration::from_secs(5);
let mut stale_inspection = new_interval_stream(polling_rate);
let polling_rate = self.config.key_rotation.epoch_duration / 8;
let mut invalidation_inspection = new_interval_stream(polling_rate);
loop {
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
_ = shutdown.recv() => {
tracing::trace!("ReplyController: Received shutdown");
break;
},
req = self.request_receiver.next() => match req {
Some(req) => self.handle_request(req).await,
@@ -175,6 +181,7 @@ where
}
}
}
assert!(shutdown.is_shutdown_poll());
tracing::debug!("ReplyController: Exiting");
}
}
@@ -155,9 +155,8 @@ where
data: Vec<Arc<PendingAcknowledgement>>,
) {
trace!("re-inserting pending retransmissions for {recipient}");
// SAFETY: the underlying entry MUST exist as we've just got data from there
// the underlying entry MUST exist as we've just got data from there
// and we hold a mut reference
#[allow(clippy::expect_used)]
let map_entry = &mut self
.surb_senders
.get_mut(recipient)
@@ -430,7 +429,6 @@ where
.pop_at_most_n_next_messages_at_random(amount)
}
#[allow(clippy::panic)]
async fn try_clear_pending_queue(&mut self, target: AnonymousSenderTag) {
trace!("trying to clear pending queue");
let available_surbs = self.surbs_storage.available_surbs(&target);
@@ -16,17 +16,21 @@
#![warn(clippy::todo)]
#![warn(clippy::dbg_macro)]
use crate::client::inbound_messages::{InputMessage, InputMessageSender};
use futures::StreamExt;
use nym_client_core_config_types::StatsReporting;
use nym_sphinx::addressing::Recipient;
use nym_statistics_common::clients::{
ClientStatsController, ClientStatsReceiver, ClientStatsSender,
};
use nym_task::{connections::TransmissionLane, ShutdownToken, ShutdownTracker};
use nym_task::{connections::TransmissionLane, TaskClient};
use std::time::Duration;
/// Time interval between reporting statistics locally (logging/shutdown_token)
use crate::{
client::inbound_messages::{InputMessage, InputMessageSender},
spawn_future,
};
/// Time interval between reporting statistics locally (logging/task_client)
const LOCAL_REPORT_INTERVAL: Duration = Duration::from_secs(2);
/// Interval for taking snapshots of the statistics
const SNAPSHOT_INTERVAL: Duration = Duration::from_millis(500);
@@ -47,6 +51,9 @@ pub(crate) struct StatisticsControl {
/// Config for stats reporting (enabled, address, interval)
reporting_config: StatsReporting,
/// Task client for listening for shutdown
task_client: TaskClient,
}
impl StatisticsControl {
@@ -55,20 +62,24 @@ impl StatisticsControl {
client_type: String,
client_stats_id: String,
report_tx: InputMessageSender,
shutdown_token: ShutdownToken,
task_client: TaskClient,
) -> (Self, ClientStatsSender) {
let (stats_tx, stats_rx) = tokio::sync::mpsc::unbounded_channel();
let stats = ClientStatsController::new(client_stats_id, client_type);
let mut task_client_stats_sender = task_client.fork("stats_sender");
task_client_stats_sender.disarm();
(
StatisticsControl {
stats,
stats_rx,
report_tx,
reporting_config,
task_client,
},
ClientStatsSender::new(Some(stats_tx), shutdown_token),
ClientStatsSender::new(Some(stats_tx), task_client_stats_sender),
)
}
@@ -88,8 +99,7 @@ impl StatisticsControl {
}
}
// manually control the shutdown mechanism as we don't want to get interrupted mid-snapshot
pub async fn run(&mut self, shutdown_token: ShutdownToken) {
async fn run(&mut self) {
tracing::debug!("Started StatisticsControl with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
@@ -119,10 +129,10 @@ impl StatisticsControl {
let mut snapshot_interval =
gloo_timers::future::IntervalStream::new(SNAPSHOT_INTERVAL.as_millis() as u32);
loop {
while !self.task_client.is_shutdown() {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
_ = self.task_client.recv() => {
tracing::trace!("StatisticsControl: Received shutdown");
break;
},
@@ -147,34 +157,34 @@ impl StatisticsControl {
}
_ = local_report_interval.next() => {
self.stats.local_report();
self.stats.local_report(&mut self.task_client);
}
}
}
tracing::debug!("StatisticsControl: Exiting");
}
pub(crate) fn start(mut self) {
spawn_future(async move {
self.run().await;
})
}
pub(crate) fn create_and_start(
reporting_config: StatsReporting,
client_type: String,
client_stats_id: String,
report_tx: InputMessageSender,
shutdown_tracker: &ShutdownTracker,
task_client: TaskClient,
) -> ClientStatsSender {
let (mut controller, sender) = Self::create(
let (controller, sender) = Self::create(
reporting_config,
client_type,
client_stats_id,
report_tx,
shutdown_tracker.child_shutdown_token(),
);
let shutdown_token = shutdown_tracker.clone_shutdown_token();
shutdown_tracker.try_spawn_named(
async move {
controller.run(shutdown_token).await;
},
"StatisticsControl",
task_client,
);
controller.start();
sender
}
}
@@ -126,7 +126,7 @@ impl TopologyAccessor {
.map(|p| p.topology.clone())
}
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<'_, NymRouteProvider>> {
pub async fn current_route_provider(&self) -> Option<RwLockReadGuard<NymRouteProvider>> {
let provider = self.inner.topology.read().await;
if provider.topology.is_empty() {
None
@@ -1,9 +1,11 @@
// Copyright 2021-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::spawn_future;
pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit};
use futures::StreamExt;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_task::TaskClient;
use nym_topology::NymTopologyError;
use std::time::Duration;
use tracing::*;
@@ -39,6 +41,8 @@ pub struct TopologyRefresher {
refresh_rate: Duration,
consecutive_failure_count: usize,
task_client: TaskClient,
}
impl TopologyRefresher {
@@ -46,12 +50,14 @@ impl TopologyRefresher {
cfg: TopologyRefresherConfig,
topology_accessor: TopologyAccessor,
topology_provider: Box<dyn TopologyProvider + Send + Sync>,
task_client: TaskClient,
) -> Self {
TopologyRefresher {
topology_provider,
topology_accessor,
refresh_rate: cfg.refresh_rate,
consecutive_failure_count: 0,
task_client,
}
}
@@ -138,30 +144,37 @@ impl TopologyRefresher {
}
}
// it's perfectly fine if task is interrupted mid-refresh
// there's no data to persist or send over
pub async fn run(&mut self) {
debug!("Started TopologyRefresher with graceful shutdown support");
pub fn start(mut self) {
spawn_future(async move {
debug!("Started TopologyRefresher with graceful shutdown support");
#[cfg(not(target_arch = "wasm32"))]
let mut interval =
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(self.refresh_rate));
#[cfg(not(target_arch = "wasm32"))]
let mut interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(
self.refresh_rate,
));
#[cfg(target_arch = "wasm32")]
let mut interval =
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
#[cfg(target_arch = "wasm32")]
let mut interval =
gloo_timers::future::IntervalStream::new(self.refresh_rate.as_millis() as u32);
// We already have an initial topology, so no need to refresh it immediately.
// My understanding is that js setInterval does not fire immediately, so it's not
// needed there.
#[cfg(not(target_arch = "wasm32"))]
interval.next().await;
// We already have an initial topology, so no need to refresh it immediately.
// My understanding is that js setInterval does not fire immediately, so it's not
// needed there.
#[cfg(not(target_arch = "wasm32"))]
interval.next().await;
while interval.next().await.is_some() {
self.try_refresh().await;
}
// this should never get triggered
error!("topology refresher interval has been exhausted!")
while !self.task_client.is_shutdown() {
tokio::select! {
_ = interval.next() => {
self.try_refresh().await;
},
_ = self.task_client.recv() => {
tracing::trace!("TopologyRefresher: Received shutdown");
},
}
}
self.task_client.recv_timeout().await;
tracing::debug!("TopologyRefresher: Exiting");
})
}
}
@@ -2,10 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use nym_mixnet_contract_common::EpochRewardedSet;
use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
use nym_topology::NymTopology;
use nym_validator_client::nym_api::NymApiClientExt;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::cmp::min;
@@ -41,43 +39,30 @@ impl Config {
pub struct NymApiTopologyProvider {
config: Config,
validator_client: nym_http_api_client::Client,
validator_client: nym_validator_client::client::NymApiClient,
nym_api_urls: Vec<Url>,
currently_used_api: usize,
use_bincode: bool,
}
impl NymApiTopologyProvider {
pub fn new(
config: impl Into<Config>,
mut nym_api_urls: Vec<Url>,
validator_client: nym_http_api_client::Client,
mut validator_client: nym_validator_client::client::NymApiClient,
) -> Self {
nym_api_urls.shuffle(&mut thread_rng());
let mut provider = NymApiTopologyProvider {
validator_client.change_nym_api(nym_api_urls[0].clone());
NymApiTopologyProvider {
config: config.into(),
validator_client,
nym_api_urls,
currently_used_api: 0,
use_bincode: true,
};
// Set all API URLs - the client will try them in order with automatic failover
provider.validator_client.change_base_urls(
provider
.nym_api_urls
.iter()
.map(|u| u.clone().into())
.collect(),
);
provider
}
}
pub fn disable_bincode(&mut self) {
self.use_bincode = false;
// Note: The unified client doesn't support toggling bincode after creation.
// This would require recreating the client without bincode.
// For now, we'll track the preference but it won't take effect.
warn!("Disabling bincode on existing client is not currently supported");
self.validator_client.use_bincode = false;
}
fn use_next_nym_api(&mut self) {
@@ -87,19 +72,8 @@ impl NymApiTopologyProvider {
}
self.currently_used_api = (self.currently_used_api + 1) % self.nym_api_urls.len();
// Provide all URLs starting from the next one in rotation order
// This enables automatic failover to other endpoints
let rotated_urls: Vec<_> = self
.nym_api_urls
.iter()
.cycle()
.skip(self.currently_used_api)
.take(self.nym_api_urls.len())
.map(|u| u.clone().into())
.collect();
self.validator_client.change_base_urls(rotated_urls)
self.validator_client
.change_nym_api(self.nym_api_urls[self.currently_used_api].clone())
}
async fn get_current_compatible_topology(&mut self) -> Option<NymTopology> {
@@ -125,13 +99,8 @@ impl NymApiTopologyProvider {
.filter(|n| n.performance.round_to_integer() >= self.config.min_node_performance())
.collect::<Vec<_>>();
let epoch_rewarded_set: EpochRewardedSet = rewarded_set.into();
NymTopology::new(
metadata.to_topology_metadata(),
epoch_rewarded_set,
Vec::new(),
)
.with_skimmed_nodes(&nodes_filtered)
NymTopology::new(metadata.to_topology_metadata(), rewarded_set, Vec::new())
.with_skimmed_nodes(&nodes_filtered)
} else {
// if we're not using extended topology, we're only getting active set mixnodes and gateways
@@ -179,13 +148,8 @@ impl NymApiTopologyProvider {
}
}
let epoch_rewarded_set: EpochRewardedSet = rewarded_set.into();
NymTopology::new(
metadata.to_topology_metadata(),
epoch_rewarded_set,
Vec::new(),
)
.with_skimmed_nodes(&nodes)
NymTopology::new(metadata.to_topology_metadata(), rewarded_set, Vec::new())
.with_skimmed_nodes(&nodes)
};
if !topology.is_minimally_routable() {
+5 -28
View File
@@ -4,13 +4,10 @@
use crate::client::mix_traffic::transceiver::ErasedGatewayError;
use nym_crypto::asymmetric::ed25519::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
use nym_task::RegistryAccessError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_validator_client::nym_api::error::NymAPIError;
use nym_validator_client::nyxd::error::NyxdError;
use nym_validator_client::ValidatorClientError;
use rand::distributions::WeightedError;
use std::error::Error;
use std::path::PathBuf;
@@ -57,7 +54,10 @@ pub enum ClientCoreError {
ListOfNymApisIsEmpty,
#[error("failed to resolve a query to nym API: {source}")]
NymApiQueryFailure { source: Box<NymAPIError> },
NymApiQueryFailure {
#[from]
source: NymAPIError,
},
#[error(
"the current network topology seem to be insufficient to route any packets through:\n\t{0}"
@@ -230,22 +230,7 @@ pub enum ClientCoreError {
UnexpectedKeyUpgrade { gateway_id: String },
#[error("failed to derive keys from master key")]
HkdfDerivationError,
#[error("missing url for constructing RPC client")]
RpcClientMissingUrl,
#[error("provided nym network details were malformed: {source}")]
InvalidNetworkDetails { source: NyxdError },
#[error("failed to construct RPC client: {source}")]
RpcClientCreationFailure { source: NyxdError },
#[error("failed to select valid gateway due to incomputable latency")]
GatewaySelectionFailure { source: WeightedError },
#[error("Could not access task registry, {0}")]
RegistryAccess(#[from] RegistryAccessError),
HkdfDerivationError {},
}
impl From<tungstenite::Error> for ClientCoreError {
@@ -256,14 +241,6 @@ impl From<tungstenite::Error> for ClientCoreError {
}
}
impl From<NymAPIError> for ClientCoreError {
fn from(err: NymAPIError) -> ClientCoreError {
ClientCoreError::NymApiQueryFailure {
source: Box::new(err),
}
}
}
/// Set of messages that the client can send to listeners via the task manager
#[derive(Debug)]
pub enum ClientCoreStatusMessage {
+22 -134
View File
@@ -7,8 +7,7 @@ use futures::{SinkExt, StreamExt};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::GatewayClient;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::{IdentityKeyRef, NymApiClientExt};
use nym_validator_client::nym_nodes::SkimmedNodesWithMetadata;
use nym_validator_client::client::IdentityKeyRef;
use nym_validator_client::UserAgent;
use rand::{seq::SliceRandom, Rng};
#[cfg(unix)]
@@ -45,7 +44,6 @@ type WsConn = JSWebsocket;
const CONCURRENT_GATEWAYS_MEASURED: usize = 20;
const MEASUREMENTS: usize = 3;
const DEFAULT_NYM_API_RETRIES: usize = 3;
#[cfg(not(target_arch = "wasm32"))]
const CONN_TIMEOUT: Duration = Duration::from_millis(1500);
@@ -85,88 +83,32 @@ struct GatewayWithLatency<'a, G: ConnectableGateway> {
latency: Duration,
}
// Helper to collect all pages of entry nodes - replicates NymApiClient's convenience method
async fn get_all_basic_entry_nodes_with_metadata(
client: &nym_http_api_client::Client,
use_bincode: bool,
) -> Result<SkimmedNodesWithMetadata, ClientCoreError> {
// Get first page to obtain metadata
let mut page = 0;
let res = client
.get_basic_entry_assigned_nodes_v2(false, Some(page), None, use_bincode)
.await?;
let mut nodes = res.nodes.data;
let metadata = res.metadata;
if res.nodes.pagination.total == nodes.len() {
return Ok(SkimmedNodesWithMetadata::new(nodes, metadata));
}
page += 1;
// Collect remaining pages
loop {
let mut res = client
.get_basic_entry_assigned_nodes_v2(false, Some(page), None, use_bincode)
.await?;
if !metadata.consistency_check(&res.metadata) {
return Err(ClientCoreError::ValidatorClientError(
nym_validator_client::ValidatorClientError::InconsistentPagedMetadata,
));
}
nodes.append(&mut res.nodes.data);
if nodes.len() < res.nodes.pagination.total {
page += 1
} else {
break;
}
}
Ok(SkimmedNodesWithMetadata::new(nodes, metadata))
}
impl<'a, G: ConnectableGateway> GatewayWithLatency<'a, G> {
fn new(gateway: &'a G, latency: Duration) -> Self {
GatewayWithLatency { gateway, latency }
}
}
pub async fn gateways_for_init(
pub async fn gateways_for_init<R: Rng>(
rng: &mut R,
nym_apis: &[Url],
user_agent: Option<UserAgent>,
minimum_performance: u8,
ignore_epoch_roles: bool,
retry_count: Option<usize>,
) -> Result<Vec<RoutingNode>, ClientCoreError> {
// Build client with ALL URLs for fallback support
let nym_api_urls: Vec<nym_http_api_client::Url> = nym_apis
.iter()
.map(|url| nym_http_api_client::Url::from(url.clone()))
.collect();
let nym_api = nym_apis
.choose(rng)
.ok_or(ClientCoreError::ListOfNymApisIsEmpty)?;
let client = if let Some(user_agent) = user_agent {
nym_validator_client::client::NymApiClient::new_with_user_agent(nym_api.clone(), user_agent)
} else {
nym_validator_client::client::NymApiClient::new(nym_api.clone())
};
if nym_api_urls.is_empty() {
return Err(ClientCoreError::ListOfNymApisIsEmpty);
}
tracing::debug!("Fetching list of gateways from: {nym_api}");
let retry_count = retry_count.unwrap_or(DEFAULT_NYM_API_RETRIES);
let mut builder = nym_http_api_client::ClientBuilder::new_with_urls(nym_api_urls.clone())?
.with_retries(retry_count)
.with_bincode();
if let Some(user_agent) = user_agent {
builder = builder.with_user_agent(user_agent);
}
let client = builder.build().map_err(|e| {
ClientCoreError::ValidatorClientError(nym_validator_client::ValidatorClientError::from(e))
})?;
tracing::debug!("Fetching list of gateways from: {:?}", nym_api_urls);
// Use our helper to handle pagination
let gateways = get_all_basic_entry_nodes_with_metadata(&client, true)
let gateways = client
.get_all_basic_entry_assigned_nodes_with_metadata()
.await?
.nodes;
info!("nym api reports {} gateways", gateways.len());
@@ -175,15 +117,17 @@ pub async fn gateways_for_init(
// filter out gateways below minimum performance and ones that could operate as a mixnode
// (we don't want instability)
let valid_gateways: Vec<RoutingNode> = gateways
let valid_gateways = gateways
.iter()
.filter(|g| ignore_epoch_roles || !g.supported_roles.mixnode)
.filter(|g| g.performance.round_to_integer() >= minimum_performance)
.filter_map(|gateway| gateway.try_into().ok())
.collect();
.collect::<Vec<_>>();
tracing::debug!("After checking validity: {}", valid_gateways.len());
tracing::trace!("Valid gateways: {valid_gateways:#?}");
tracing::info!(
"Found {} valid gateways after filtering",
"and {} after validity and performance filtering",
valid_gateways.len()
);
@@ -204,7 +148,7 @@ async fn connect(endpoint: &str) -> Result<WsConn, ClientCoreError> {
JSWebsocket::new(endpoint).map_err(|_| ClientCoreError::GatewayJsConnectionFailure)
}
async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<'_, G>, ClientCoreError>
async fn measure_latency<G>(gateway: &G) -> Result<GatewayWithLatency<G>, ClientCoreError>
where
G: ConnectableGateway,
{
@@ -301,7 +245,7 @@ pub async fn choose_gateway_by_latency<R: Rng, G: ConnectableGateway + Clone>(
let gateways_with_latency = gateways_with_latency.lock().await;
let chosen = gateways_with_latency
.choose_weighted(rng, |item| 1. / item.latency.as_secs_f32())
.map_err(|source| ClientCoreError::GatewaySelectionFailure { source })?;
.expect("invalid selection weight!");
info!(
"chose gateway {} with average latency of {:?}",
@@ -346,20 +290,13 @@ pub(super) fn get_specified_gateway(
must_use_tls: bool,
) -> Result<RoutingNode, ClientCoreError> {
tracing::debug!("Requesting specified gateway: {gateway_identity}");
let user_gateway = ed25519::PublicKey::from_base58_string(gateway_identity)
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
let gateway = gateways
.iter()
.find(|gateway| gateway.identity_key == user_gateway)
.ok_or_else(|| {
tracing::debug!(
"Gateway {gateway_identity} not found in {} available gateways",
gateways.len()
);
ClientCoreError::NoGatewayWithId(gateway_identity.to_string())
})?;
.ok_or_else(|| ClientCoreError::NoGatewayWithId(gateway_identity.to_string()))?;
let Some(entry_details) = gateway.entry.as_ref() else {
return Err(ClientCoreError::UnsupportedEntry {
@@ -422,52 +359,3 @@ pub(super) async fn register_with_gateway(
authenticated_ephemeral_client: gateway_client,
})
}
#[cfg(test)]
mod tests {
use url::Url;
#[test]
fn test_single_url_builds_without_retries() {
let urls = [Url::parse("https://api.nym.com").unwrap()];
let nym_api_urls: Vec<nym_http_api_client::Url> = urls
.iter()
.map(|url| nym_http_api_client::Url::from(url.clone()))
.collect();
assert_eq!(nym_api_urls.len(), 1, "Should have exactly one URL");
}
#[test]
fn test_multiple_urls_prepared_for_retries() {
let urls = [
Url::parse("https://api1.nym.com").unwrap(),
Url::parse("https://api2.nym.com").unwrap(),
Url::parse("https://api3.nym.com").unwrap(),
];
let nym_api_urls: Vec<nym_http_api_client::Url> = urls
.iter()
.map(|url| nym_http_api_client::Url::from(url.clone()))
.collect();
assert_eq!(nym_api_urls.len(), 3, "Should have all three URLs");
assert!(
nym_api_urls.len() > 1,
"Multiple URLs trigger retry behavior"
);
}
#[test]
fn test_empty_url_list_is_detected() {
let urls: Vec<Url> = vec![];
let nym_api_urls: Vec<nym_http_api_client::Url> = urls
.iter()
.map(|url| nym_http_api_client::Url::from(url.clone()))
.collect();
assert!(nym_api_urls.is_empty(), "Empty list should remain empty");
}
}
+3 -9
View File
@@ -17,22 +17,16 @@ pub use nym_topology::{
HardcodedTopologyProvider, NymRouteProvider, NymTopology, NymTopologyError, TopologyProvider,
};
#[deprecated(note = "use spawn_future from nym_task crate instead")]
#[cfg(target_arch = "wasm32")]
#[track_caller]
pub fn spawn_future<F>(future: F)
pub(crate) fn spawn_future<F>(future: F)
where
F: Future<Output = ()> + 'static,
{
wasm_bindgen_futures::spawn_local(async move {
future.await;
});
wasm_bindgen_futures::spawn_local(future);
}
#[deprecated(note = "use spawn_future from nym_task crate instead")]
#[cfg(not(target_arch = "wasm32"))]
#[track_caller]
pub fn spawn_future<F>(future: F)
pub(crate) fn spawn_future<F>(future: F)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
@@ -30,7 +30,6 @@ optional = true
path = "../../../sqlx-pool-guard"
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
+4 -7
View File
@@ -2,24 +2,23 @@
// SPDX-License-Identifier: Apache-2.0
#[tokio::main]
async fn main() -> anyhow::Result<()> {
async fn main() {
#[cfg(feature = "fs-surb-storage")]
{
use anyhow::Context;
use sqlx::{Connection, SqliteConnection};
use std::env;
let out_dir = env::var("OUT_DIR")?;
let out_dir = env::var("OUT_DIR").unwrap();
let database_path = format!("{out_dir}/fs-surbs-example.sqlite");
let mut conn = SqliteConnection::connect(&format!("sqlite://{database_path}?mode=rwc"))
.await
.context("Failed to create SQLx database connection")?;
.expect("Failed to create SQLx database connection");
sqlx::migrate!("./fs_surbs_migrations")
.run(&mut conn)
.await
.context("Failed to perform SQLx migrations")?;
.expect("Failed to perform SQLx migrations");
#[cfg(target_family = "unix")]
println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path);
@@ -29,6 +28,4 @@ async fn main() -> anyhow::Result<()> {
// not a valid windows path... but hey, it works...
println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
}
Ok(())
}
+2 -2
View File
@@ -40,7 +40,7 @@ where
pub async fn flush_on_shutdown(
mut self,
mem_state: CombinedReplyStorage,
shutdown: nym_task::ShutdownToken,
mut shutdown: nym_task::TaskClient,
) {
use tracing::{debug, error, info};
@@ -50,7 +50,7 @@ where
return;
}
shutdown.cancelled().await;
shutdown.recv().await;
info!("PersistentReplyStorage is flushing all reply-related data to underlying storage");
if let Err(err) = self.backend.flush_surb_storage(&mem_state).await {
@@ -12,7 +12,7 @@ use crate::socket_state::{ws_fd, PartiallyDelegatedHandle, SocketState};
use crate::traits::GatewayPacketRouter;
use crate::{cleanup_socket_message, try_decrypt_binary_message};
use futures::{SinkExt, StreamExt};
use nym_bandwidth_controller::BandwidthController;
use nym_bandwidth_controller::{BandwidthController, BandwidthStatusMessage};
use nym_credential_storage::ephemeral_storage::EphemeralStorage as EphemeralCredentialStorage;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_credentials::CredentialSpendingData;
@@ -27,7 +27,7 @@ use nym_gateway_requests::{
use nym_sphinx::forwarding::packet::MixPacket;
use nym_statistics_common::clients::connection::ConnectionStatsEvent;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::ShutdownToken;
use nym_task::TaskClient;
use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
use rand::rngs::OsRng;
use std::sync::Arc;
@@ -109,7 +109,7 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
/// Listen to shutdown messages and send notifications back to the task manager
shutdown_token: ShutdownToken,
task_client: TaskClient,
}
impl<C, St> GatewayClient<C, St> {
@@ -124,7 +124,7 @@ impl<C, St> GatewayClient<C, St> {
bandwidth_controller: Option<BandwidthController<C, St>>,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
shutdown_token: ShutdownToken,
task_client: TaskClient,
) -> Self {
GatewayClient {
cfg,
@@ -141,7 +141,7 @@ impl<C, St> GatewayClient<C, St> {
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
shutdown_token,
task_client,
}
}
@@ -201,7 +201,7 @@ impl<C, St> GatewayClient<C, St> {
#[cfg(not(target_arch = "wasm32"))]
pub async fn establish_connection(&mut self) -> Result<(), GatewayClientError> {
debug!(
"Attempting to establish connection to gateway at: {}",
"Attemting to establish connection to gateway at: {}",
self.gateway_address
);
let (ws_stream, _) = connect_async(
@@ -293,7 +293,7 @@ impl<C, St> GatewayClient<C, St> {
loop {
tokio::select! {
_ = self.shutdown_token.cancelled() => {
_ = self.task_client.recv() => {
log::trace!("GatewayClient control response: Received shutdown");
log::debug!("GatewayClient control response: Exiting");
break Err(GatewayClientError::ConnectionClosedGatewayShutdown);
@@ -514,7 +514,7 @@ impl<C, St> GatewayClient<C, St> {
self.cfg.bandwidth.require_tickets,
derive_aes256_gcm_siv_key,
#[cfg(not(target_arch = "wasm32"))]
self.shutdown_token.clone(),
self.task_client.clone(),
)
.await
.map_err(GatewayClientError::RegistrationFailure),
@@ -631,6 +631,9 @@ impl<C, St> GatewayClient<C, St> {
self.negotiated_protocol = protocol_version;
log::debug!("authenticated: {status}, bandwidth remaining: {bandwidth_remaining}");
self.task_client.send_status_msg(Box::new(
BandwidthStatusMessage::RemainingBandwidth(bandwidth_remaining),
));
Ok(())
}
ServerResponse::Error { message } => Err(GatewayClientError::GatewayError(message)),
@@ -1066,7 +1069,7 @@ impl<C, St> GatewayClient<C, St> {
.expect("no shared key present even though we're authenticated!"),
),
self.bandwidth.clone(),
self.shutdown_token.clone(),
self.task_client.clone(),
)
}
_ => unreachable!(),
@@ -1140,8 +1143,8 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
// perfectly fine here, because it's not meant to be used
let (ack_tx, _) = mpsc::unbounded();
let (mix_tx, _) = mpsc::unbounded();
let shutdown_token = ShutdownToken::default();
let packet_router = PacketRouter::new(ack_tx, mix_tx, shutdown_token.clone());
let task_client = TaskClient::dummy();
let packet_router = PacketRouter::new(ack_tx, mix_tx, task_client.clone());
GatewayClient {
cfg: GatewayClientConfig::default().with_disabled_credentials_mode(true),
@@ -1154,11 +1157,11 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
connection: SocketState::NotConnected,
packet_router,
bandwidth_controller: None,
stats_reporter: ClientStatsSender::new(None, shutdown_token.clone()),
stats_reporter: ClientStatsSender::new(None, task_client.clone()),
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
shutdown_token,
task_client,
}
}
@@ -1167,7 +1170,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
packet_router: PacketRouter,
bandwidth_controller: Option<BandwidthController<C, St>>,
stats_reporter: ClientStatsSender,
shutdown_token: ShutdownToken,
task_client: TaskClient,
) -> GatewayClient<C, St> {
// invariants that can't be broken
// (unless somebody decided to expose some field that wasn't meant to be exposed)
@@ -1190,7 +1193,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
negotiated_protocol: self.negotiated_protocol,
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback,
shutdown_token,
task_client,
}
}
}
@@ -7,7 +7,7 @@
use crate::error::GatewayClientError;
use crate::GatewayPacketRouter;
use futures::channel::mpsc;
use nym_task::ShutdownToken;
use nym_task::TaskClient;
pub type MixnetMessageSender = mpsc::UnboundedSender<Vec<Vec<u8>>>;
pub type MixnetMessageReceiver = mpsc::UnboundedReceiver<Vec<Vec<u8>>>;
@@ -19,14 +19,14 @@ pub type AcknowledgementReceiver = mpsc::UnboundedReceiver<Vec<Vec<u8>>>;
pub struct PacketRouter {
ack_sender: AcknowledgementSender,
mixnet_message_sender: MixnetMessageSender,
shutdown: ShutdownToken,
shutdown: TaskClient,
}
impl PacketRouter {
pub fn new(
ack_sender: AcknowledgementSender,
mixnet_message_sender: MixnetMessageSender,
shutdown: ShutdownToken,
shutdown: TaskClient,
) -> Self {
PacketRouter {
ack_sender,
@@ -42,7 +42,7 @@ impl PacketRouter {
if let Err(err) = self.mixnet_message_sender.unbounded_send(received_messages) {
// check if the failure is due to the shutdown being in progress and thus the receiver channel
// having already been dropped
if self.shutdown.is_cancelled() {
if self.shutdown.is_shutdown_poll() || self.shutdown.is_dummy() {
// This should ideally not happen, but it's ok
tracing::warn!("Failed to send mixnet messages due to receiver task shutdown");
return Err(GatewayClientError::ShutdownInProgress);
@@ -58,7 +58,7 @@ impl PacketRouter {
if let Err(err) = self.ack_sender.unbounded_send(received_acks) {
// check if the failure is due to the shutdown being in progress and thus the receiver channel
// having already been dropped
if self.shutdown.is_cancelled() {
if self.shutdown.is_shutdown_poll() || self.shutdown.is_dummy() {
// This should ideally not happen, but it's ok
tracing::warn!("Failed to send acks due to receiver task shutdown");
return Err(GatewayClientError::ShutdownInProgress);
@@ -69,6 +69,10 @@ impl PacketRouter {
}
Ok(())
}
pub fn disarm(&mut self) {
self.shutdown.disarm();
}
}
impl GatewayPacketRouter for PacketRouter {

Some files were not shown because too many files have changed in this diff Show More