Compare commits

..

3 Commits

Author SHA1 Message Date
Sachin Kamath 1bdcf9c3cf fix review comments 2024-11-06 14:16:13 +05:30
Sachin Kamath 4ebb9cd239 clippy 2024-11-05 16:01:18 +05:30
Sachin Kamath 620d68ea2f nyxd-scraper: add config to make pre-commit storage optional 2024-11-05 15:49:30 +05:30
965 changed files with 17086 additions and 44578 deletions
+28 -15
View File
@@ -6,27 +6,20 @@ on:
jobs:
build:
runs-on: arc-ubuntu-20.04
defaults:
run:
working-directory: documentation/docs
steps:
- uses: actions/checkout@v4
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get install -y build-essential curl wget libssl-dev libudev-dev squashfs-tools protobuf-compiler git python3 && sudo apt-get update --fix-missing
- name: Install pip3
run: sudo apt install -y python3-pip
run: sudo apt install -y python3-pip
- name: Install Python3 modules
run: sudo pip3 install pandas tabulate
- name: Install rsync
run: sudo apt-get install rsync
- uses: rlespinasse/github-slug-action@v3.x
- name: Setup pnpm
uses: pnpm/action-setup@v4.0.0
with:
version: 9
- uses: actions/setup-node@v4
with:
node-version: 20
node-version: 18
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
@@ -36,13 +29,33 @@ jobs:
with:
command: build
args: --workspace --release
- name: Install mdbook and plugins
run: cd documentation && ./install_mdbook_deps.sh
- name: Remove existing Nym config directory (`~/.nym/`)
run: cd documentation && ./remove_existing_config.sh
continue-on-error: false
# This is the original flow
# - name: Build all projects in documentation/ & move to ~/dist/docs/
# run: cd documentation && ./build_all_to_dist.sh
- name: Install project dependencies
run: pnpm i
- name: Build project
run: pnpm run build
- name: Move files to /dist/
run: ../scripts/move-to-dist.sh
# This is a workaround replacement which builds on the last working commit b332a6b55668f60988e36961f3f62a794ba82ddb and then on current branch
- name: Save current branch to ~/current_branch
run: git rev-parse --abbrev-ref HEAD > ~/current_branch
- name: Git pull, reset & switch to b332a6b55668f60988e36961f3f62a794ba82ddb
run: git pull && git reset --hard && git checkout b332a6b55668f60988e36961f3f62a794ba82ddb
- name: Build all projects in documentation/ & move to ~/dist/docs/ from b332a6b55668f60988e36961f3f62a794ba82ddb
run: cd documentation && ./build_all_to_dist.sh
- name: Switch to current branch
run: git checkout $echo "$(cat ~/current_branch)"
- name: Build all projects in documentation/ & move to ~/dist/docs/ on current branch
run: cd documentation && ./build_all_to_dist.sh && rm ~/current_branch
# End of replacemet
- name: Post process
run: cd documentation && ./post_process.sh
continue-on-error: false
- name: Create Vercel project file
uses: mobiledevops/secret-to-file-action@v1
+29 -19
View File
@@ -3,35 +3,28 @@ name: ci-docs
on:
workflow_dispatch:
push:
branches-ignore: [master]
branches-ignore: master
paths:
- "documentation/docs/**"
- ".github/workflows/ci-docs.yml"
- 'documentation/docs/**'
- '.github/workflows/ci-docs.yml'
jobs:
build:
runs-on: arc-ubuntu-20.04
defaults:
run:
working-directory: documentation/docs
steps:
- uses: actions/checkout@v4
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get install -y build-essential curl wget libssl-dev libudev-dev squashfs-tools protobuf-compiler git python3 && sudo apt-get update --fix-missing
- name: Install pip3
run: sudo apt install -y python3-pip
run: sudo apt install -y python3-pip
- name: Install Python3 modules
run: sudo pip3 install pandas tabulate
- name: Install rsync
run: sudo apt-get install rsync
- uses: rlespinasse/github-slug-action@v3.x
- name: Setup pnpm
uses: pnpm/action-setup@v4.0.0
with:
version: 9
- uses: actions/setup-node@v4
with:
node-version: 20
node-version: 18
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
@@ -41,13 +34,30 @@ jobs:
with:
command: build
args: --workspace --release
- name: Install mdbook and plugins
run: cd documentation && ./install_mdbook_deps.sh
- name: Remove existing Nym config directory (`~/.nym/`)
run: cd documentation && ./remove_existing_config.sh
continue-on-error: false
- name: Install project dependencies
run: pnpm i
- name: Build project
run: pnpm run build
- name: Move files to /dist/
run: ../scripts/move-to-dist.sh
# This is the original flow
# - name: Build all projects in documentation/ & move to ~/dist/docs/
# run: cd documentation && ./build_all_to_dist.sh
# This is a workaround replacement which builds on the last working commit b332a6b55668f60988e36961f3f62a794ba82ddb and then on current branch
- name: Save current branch to ~/current_branch
run: git rev-parse --abbrev-ref HEAD > ~/current_branch
- name: Git pull, reset & switch to b332a6b55668f60988e36961f3f62a794ba82ddb
run: git pull && git reset --hard && git checkout b332a6b55668f60988e36961f3f62a794ba82ddb
- name: Build all projects in documentation/ & move to ~/dist/docs/ from b332a6b55668f60988e36961f3f62a794ba82ddb
run: cd documentation && ./build_all_to_dist.sh
- name: Switch to current branch
run: git checkout $echo "$(cat ~/current_branch)"
- name: Build all projects in documentation/ & move to ~/dist/docs/ on current branch
run: cd documentation && ./build_all_to_dist.sh && rm ~/current_branch
# End of replacemet
- name: Deploy branch to CI www
continue-on-error: true
@@ -58,5 +68,5 @@ jobs:
SOURCE: "dist/docs/"
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/docs-nextra-${{ env.GITHUB_REF_SLUG }}
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/docs-${{ env.GITHUB_REF_SLUG }}
EXCLUDE: "/node_modules/"
@@ -0,0 +1,45 @@
name: ci-nym-credential-proxy
on:
pull_request:
paths:
- 'common/**'
- 'nym-credential-proxy/**'
- '.github/workspace/ci-nym-credential-proxy.yml'
workflow_dispatch:
jobs:
build:
runs-on: arc-ubuntu-22.04
env:
CARGO_TERM_COLOR: always
MANIFEST_PATH: "--manifest-path nym-credential-proxy/Cargo.toml"
steps:
- name: Check out repository code
uses: actions/checkout@v4
- name: Install rust toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
components: rustfmt, clippy
- name: Check formatting
uses: actions-rs/cargo@v1
with:
command: fmt
args: ${{ env.MANIFEST_PATH }} --all -- --check
- name: Build
uses: actions-rs/cargo@v1
with:
command: build
args: ${{ env.MANIFEST_PATH }} --workspace --all-targets
- name: Clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: ${{ env.MANIFEST_PATH }} --workspace --all-targets -- -D warnings
@@ -0,0 +1,79 @@
name: ci-sdk-docs-typescript
on:
pull_request:
paths:
- "sdk/typescript/**"
- "wasm/**"
- '.github/workflows/ci-sdk-docs-typescript.yml'
jobs:
build:
runs-on: custom-linux
steps:
- uses: actions/checkout@v4
- name: Install rsync
run: sudo apt-get install rsync
continue-on-error: true
- uses: rlespinasse/github-slug-action@v3.x
- uses: actions/setup-node@v4
with:
node-version: 18.17
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: Setup yarn
run: npm install -g yarn
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.20'
- name: Install wasm-pack
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
- name: Install wasm-opt
uses: ./.github/actions/install-wasm-opt
with:
version: '116'
- name: Build branch WASM packages
run: make sdk-wasm-build
- name: Install
run: yarn
- name: Build
run: yarn docs:prod:build
- name: Deploy branch to CI www (docs)
continue-on-error: true
uses: easingthemes/ssh-deploy@main
env:
SSH_PRIVATE_KEY: ${{ secrets.CI_WWW_SSH_PRIVATE_KEY }}
ARGS: "-rltgoDzvO --delete"
SOURCE: "dist/ts/"
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/sdk-ts-docs-${{ env.GITHUB_REF_SLUG }}
EXCLUDE: "/dist/, /node_modules/"
- name: Matrix - Node Install
run: npm install
working-directory: .github/workflows/support-files
- name: Matrix - Send Notification
env:
NYM_NOTIFICATION_KIND: ts-packages
NYM_PROJECT_NAME: "sdk-ts-docs"
NYM_CI_WWW_BASE: "${{ secrets.NYM_CI_WWW_BASE }}/docs/sdk/typescript"
NYM_CI_WWW_LOCATION: "sdk-ts-docs-${{ env.GITHUB_REF_SLUG }}"
GIT_COMMIT_MESSAGE: "${{ github.event.head_commit.message }}"
GIT_BRANCH: "${GITHUB_REF##*/}"
IS_SUCCESS: "${{ job.status == 'success' }}"
MATRIX_SERVER: "${{ secrets.MATRIX_SERVER }}"
MATRIX_ROOM: "${{ secrets.MATRIX_ROOM }}"
MATRIX_USER_ID: "${{ secrets.MATRIX_USER_ID }}"
MATRIX_TOKEN: "${{ secrets.MATRIX_TOKEN }}"
MATRIX_DEVICE_ID: "${{ secrets.MATRIX_DEVICE_ID }}"
uses: docker://keybaseio/client:stable-node
with:
args: .github/workflows/support-files/notifications/entry_point.sh
+1 -3
View File
@@ -4,9 +4,7 @@ on:
pull_request:
branches:
- develop
- "release/**"
paths-ignore:
- "documentation/**"
- 'release/**'
types:
- labeled
- unlabeled
+10 -15
View File
@@ -2,10 +2,6 @@ name: Build and upload Node Status agent container to harbor.nymte.ch
on:
workflow_dispatch:
inputs:
gateway_probe_git_ref:
type: string
description: Which gateway probe git ref to build the image with
env:
WORKING_DIRECTORY: "nym-node-status-agent"
@@ -36,26 +32,25 @@ jobs:
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: cleanup-gateway-probe-ref
id: cleanup_gateway_probe_ref
- name: Check if tag exists
run: |
GATEWAY_PROBE_GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }}
GIT_REF_SLUG="${GATEWAY_PROBE_GIT_REF//\//-}"
echo "git_ref=${GIT_REF_SLUG}" >> $GITHUB_OUTPUT
if git rev-parse ${{ steps.get_version.outputs.value }} >/dev/null 2>&1; then
echo "Tag ${{ steps.get_version.outputs.value }} already exists"
fi
- name: Remove existing tag if exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
fi
- name: Create tag
run: |
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }} -m "Version ${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
- name: BuildAndPushImageOnHarbor
run: |
docker build --build-arg GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }} -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
-55
View File
@@ -1,55 +0,0 @@
name: Build and upload nym node container to harbor.nymte.ch
on:
workflow_dispatch:
env:
WORKING_DIRECTORY: "nym-node"
CONTAINER_NAME: "nym-node"
jobs:
build-container:
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
- name: Checkout repo
uses: actions/checkout@v4
- name: Configure git identity
run: |
git config --global user.email "lawrence@nymtech.net"
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.44.3
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: Check if tag exists
run: |
if git rev-parse ${{ steps.get_version.outputs.value }} >/dev/null 2>&1; then
echo "Tag ${{ steps.get_version.outputs.value }} already exists"
fi
- name: Remove existing tag if exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
fi
- name: Create tag
run: |
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
- name: BuildAndPushImageOnHarbor
run: |
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
@@ -1,49 +0,0 @@
name: Build and upload Validator Rewarder container to harbor.nymte.ch
on:
workflow_dispatch:
env:
WORKING_DIRECTORY: "nym-validator-rewarder"
CONTAINER_NAME: "validator-rewarder"
jobs:
build-container:
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
- name: Checkout repo
uses: actions/checkout@v4
- name: Configure git identity
run: |
git config --global user.email "lawrence@nymtech.net"
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.44.3
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/nym-credential-proxy/Cargo.toml
- name: Remove existing tag if exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
fi
- name: Create tag
run: |
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
- name: BuildAndPushImageOnHarbor
run: |
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
Generated
+719 -470
View File
File diff suppressed because it is too large Load Diff
+23 -25
View File
@@ -19,33 +19,33 @@ members = [
"clients/native",
"clients/native/websocket-requests",
"clients/socks5",
"common/async-file-watcher",
"common/authenticator-requests",
"common/async-file-watcher",
"common/bandwidth-controller",
"common/bin-common",
"common/client-core",
"common/client-core/config-types",
"common/client-core/gateways-storage",
"common/client-core/surb-storage",
"common/client-core/gateways-storage",
"common/client-libs/gateway-client",
"common/client-libs/mixnet-client",
"common/client-libs/validator-client",
"common/commands",
"common/config",
"common/cosmwasm-smart-contracts/coconut-bandwidth-contract",
"common/cosmwasm-smart-contracts/ecash-contract",
"common/cosmwasm-smart-contracts/coconut-dkg",
"common/cosmwasm-smart-contracts/contracts-common",
"common/cosmwasm-smart-contracts/ecash-contract",
"common/cosmwasm-smart-contracts/group-contract",
"common/cosmwasm-smart-contracts/mixnet-contract",
"common/cosmwasm-smart-contracts/multisig-contract",
"common/cosmwasm-smart-contracts/vesting-contract",
"common/country-group",
"common/credential-storage",
"common/credential-utils",
"common/credential-verification",
"common/credentials",
"common/credential-utils",
"common/credentials-interface",
"common/credential-verification",
"common/crypto",
"common/dkg",
"common/ecash-double-spending",
@@ -65,10 +65,10 @@ members = [
"common/network-defaults",
"common/node-tester-utils",
"common/nonexhaustive-delayqueue",
"common/nymcoconut",
"common/nym_offline_compact_ecash",
"common/nym-id",
"common/nym-metrics",
"common/nym_offline_compact_ecash",
"common/nymcoconut",
"common/nymsphinx",
"common/nymsphinx/acknowledgements",
"common/nymsphinx/addressing",
@@ -98,30 +98,26 @@ members = [
"common/wasm/utils",
"common/wireguard",
"common/wireguard-types",
# "documentation/autodoc",
"explorer-api",
"explorer-api/explorer-api-requests",
"explorer-api/explorer-client",
"gateway",
"integrations/bity",
"mixnode",
"sdk/ffi/cpp",
"sdk/ffi/go",
"sdk/ffi/shared",
"sdk/lib/socks5-listener",
"sdk/rust/nym-sdk",
"sdk/ffi/shared",
"sdk/ffi/go",
"sdk/ffi/cpp",
"service-providers/authenticator",
"service-providers/common",
"service-providers/ip-packet-router",
"service-providers/network-requester",
"nym-api",
"nym-api/nym-api-requests",
"nym-browser-extension/storage",
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-data-observatory",
"nym-network-monitor",
"nym-api",
"nym-browser-extension/storage",
"nym-api/nym-api-requests",
"nym-data-observatory",
"nym-node",
"nym-node/nym-node-http-api",
"nym-node/nym-node-requests",
@@ -144,11 +140,11 @@ members = [
"wasm/mix-fetch",
"wasm/node-tester",
"wasm/zknym-lib",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/echo-server",
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
]
default-members = [
@@ -159,7 +155,6 @@ default-members = [
"gateway",
"mixnode",
"nym-api",
"nym-credential-proxy/nym-credential-proxy",
"nym-data-observatory",
"nym-node",
"nym-node-status-api",
@@ -198,14 +193,16 @@ aead = "0.5.2"
anyhow = "1.0.90"
argon2 = "0.5.0"
async-trait = "0.1.83"
axum-client-ip = "0.6.1"
axum = "0.7.5"
axum-extra = "0.9.4"
base64 = "0.22.1"
bincode = "1.3.3"
bip39 = { version = "2.0.0", features = ["zeroize"] }
bit-vec = "0.7.0" # can we unify those?
# can we unify those?
bit-vec = "0.7.0"
bitvec = "1.0.0"
blake3 = "1.5.4"
bloomfilter = "1.0.14"
bs58 = "0.5.1"
@@ -272,7 +269,7 @@ ipnetwork = "0.20"
isocountry = "0.3.2"
itertools = "0.13.0"
k256 = "0.13"
lazy_static = "1.5.0"
lazy_static = "1.4.0"
ledger-transport = "0.10.0"
ledger-transport-hid = "0.10.0"
log = "0.4"
@@ -282,7 +279,7 @@ moka = { version = "0.12", features = ["future"] }
nix = "0.27.1"
notify = "5.1.0"
okapi = "0.7.0"
once_cell = "1.20.2"
once_cell = "1.7.2"
opentelemetry = "0.19.0"
opentelemetry-jaeger = "0.18.0"
parking_lot = "0.12.3"
@@ -410,6 +407,7 @@ wasm-bindgen-futures = "0.4.45"
wasmtimer = "0.2.0"
web-sys = "0.3.72"
# Profile settings for individual crates
# Compile-time verified queries do quite a bit of work at compile time. Incremental
@@ -102,10 +102,5 @@ average_ack_delay = '{{ debug.acknowledgements.average_ack_delay }}'
[debug.cover_traffic]
loop_cover_traffic_average_delay = '{{ debug.cover_traffic.loop_cover_traffic_average_delay }}'
[debug.stats_reporting]
enabled = {{ debug.stats_reporting.enabled }}
provider_address = '{{ debug.stats_reporting.provider_address }}'
reporting_interval = '{{ debug.stats_reporting.reporting_interval }}'
"#;
-1
View File
@@ -81,7 +81,6 @@ impl From<Init> for OverrideConfig {
nyxd_urls: init_config.common_args.nyxd_urls,
enabled_credentials_mode: init_config.common_args.enabled_credentials_mode,
stats_reporting_address: init_config.common_args.stats_reporting_address,
}
}
}
-7
View File
@@ -13,7 +13,6 @@ use clap::{Parser, Subcommand};
use log::{error, info};
use nym_bin_common::bin_info;
use nym_bin_common::completions::{fig_generate, ArgShell};
use nym_client::client::Recipient;
use nym_client_core::cli_helpers::CliClient;
use nym_client_core::client::base_client::storage::migration_helpers::v1_1_33;
use nym_config::OptionalSet;
@@ -105,7 +104,6 @@ pub(crate) struct OverrideConfig {
no_cover: bool,
nyxd_urls: Option<Vec<url::Url>>,
enabled_credentials_mode: Option<bool>,
stats_reporting_address: Option<Recipient>,
}
pub(crate) async fn execute(args: Cli) -> Result<(), Box<dyn Error + Send + Sync>> {
@@ -151,11 +149,6 @@ pub(crate) fn override_config(config: Config, args: OverrideConfig) -> Config {
BaseClientConfig::with_disabled_credentials,
args.enabled_credentials_mode.map(|b| !b),
)
.with_optional_env_ext(
BaseClientConfig::with_enabled_stats_reporting_address,
args.stats_reporting_address,
nym_network_defaults::var_names::CLIENT_STATS_COLLECTION_PROVIDER,
)
}
async fn try_upgrade_v1_1_13_config(id: &str) -> Result<bool, ClientError> {
-1
View File
@@ -43,7 +43,6 @@ impl From<Run> for OverrideConfig {
no_cover: run_config.common_args.no_cover,
nyxd_urls: run_config.common_args.nyxd_urls,
enabled_credentials_mode: run_config.common_args.enabled_credentials_mode,
stats_reporting_address: run_config.common_args.stats_reporting_address,
}
}
}
-1
View File
@@ -92,7 +92,6 @@ impl From<Init> for OverrideConfig {
nyxd_urls: init_config.common_args.nyxd_urls,
enabled_credentials_mode: init_config.common_args.enabled_credentials_mode,
outfox: false,
stats_reporting_address: init_config.common_args.stats_reporting_address,
}
}
}
-7
View File
@@ -19,7 +19,6 @@ use nym_client_core::client::base_client::storage::migration_helpers::v1_1_33;
use nym_client_core::client::topology_control::geo_aware_provider::CountryGroup;
use nym_client_core::config::{GroupBy, TopologyStructure};
use nym_config::OptionalSet;
use nym_sphinx::addressing::Recipient;
use nym_sphinx::params::{PacketSize, PacketType};
use std::error::Error;
use std::net::IpAddr;
@@ -112,7 +111,6 @@ pub(crate) struct OverrideConfig {
nyxd_urls: Option<Vec<url::Url>>,
enabled_credentials_mode: Option<bool>,
outfox: bool,
stats_reporting_address: Option<Recipient>,
}
pub(crate) async fn execute(args: Cli) -> Result<(), Box<dyn Error + Send + Sync>> {
@@ -198,11 +196,6 @@ pub(crate) fn override_config(config: Config, args: OverrideConfig) -> Config {
BaseClientConfig::with_disabled_credentials,
args.enabled_credentials_mode.map(|b| !b),
)
.with_optional_base_env(
BaseClientConfig::with_enabled_stats_reporting_address,
args.stats_reporting_address,
nym_network_defaults::var_names::CLIENT_STATS_COLLECTION_PROVIDER,
)
}
async fn try_upgrade_v1_1_13_config(id: &str) -> Result<bool, Socks5ClientError> {
-1
View File
@@ -70,7 +70,6 @@ impl From<Run> for OverrideConfig {
nyxd_urls: run_config.common_args.nyxd_urls,
enabled_credentials_mode: run_config.common_args.enabled_credentials_mode,
outfox: run_config.outfox,
stats_reporting_address: run_config.common_args.stats_reporting_address,
}
}
}
-5
View File
@@ -108,9 +108,4 @@ average_ack_delay = '{{ core.debug.acknowledgements.average_ack_delay }}'
[core.debug.cover_traffic]
loop_cover_traffic_average_delay = '{{ core.debug.cover_traffic.loop_cover_traffic_average_delay }}'
[core.debug.stats_reporting]
enabled = {{ core.debug.stats_reporting.enabled }}
provider_address = '{{ core.debug.stats_reporting.provider_address }}'
reporting_interval = '{{ core.debug.stats_reporting.reporting_interval }}'
"#;
-1
View File
@@ -17,7 +17,6 @@ thiserror = { workspace = true }
nym-credentials-interface = { path = "../credentials-interface" }
nym-crypto = { path = "../crypto", features = ["asymmetric"] }
nym-network-defaults = { path = "../network-defaults" }
nym-service-provider-requests-common = { path = "../service-provider-requests-common" }
nym-sphinx = { path = "../nymsphinx" }
nym-wireguard-types = { path = "../wireguard-types" }
+2 -3
View File
@@ -4,14 +4,13 @@
pub mod v1;
pub mod v2;
pub mod v3;
pub mod v4;
mod error;
pub use error::Error;
pub use v4 as latest;
pub use v3 as latest;
pub const CURRENT_VERSION: u8 = 4;
pub const CURRENT_VERSION: u8 = 3;
fn make_bincode_serializer() -> impl bincode::Options {
use bincode::Options;
@@ -9,7 +9,7 @@ impl From<v2::request::AuthenticatorRequest> for v3::request::AuthenticatorReque
fn from(authenticator_request: v2::request::AuthenticatorRequest) -> Self {
Self {
protocol: Protocol {
version: 3,
version: 2,
service_provider_type: ServiceProviderType::Authenticator,
},
data: authenticator_request.data.into(),
@@ -1,200 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use crate::{v3, v4};
impl From<v3::request::AuthenticatorRequest> for v4::request::AuthenticatorRequest {
fn from(authenticator_request: v3::request::AuthenticatorRequest) -> Self {
Self {
protocol: Protocol {
version: 4,
service_provider_type: ServiceProviderType::Authenticator,
},
data: authenticator_request.data.into(),
reply_to: authenticator_request.reply_to,
request_id: authenticator_request.request_id,
}
}
}
impl From<v3::request::AuthenticatorRequestData> for v4::request::AuthenticatorRequestData {
fn from(authenticator_request_data: v3::request::AuthenticatorRequestData) -> Self {
match authenticator_request_data {
v3::request::AuthenticatorRequestData::Initial(init_msg) => {
v4::request::AuthenticatorRequestData::Initial(init_msg.into())
}
v3::request::AuthenticatorRequestData::Final(gw_client) => {
v4::request::AuthenticatorRequestData::Final(gw_client.into())
}
v3::request::AuthenticatorRequestData::QueryBandwidth(pub_key) => {
v4::request::AuthenticatorRequestData::QueryBandwidth(pub_key)
}
v3::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
v4::request::AuthenticatorRequestData::TopUpBandwidth(top_up_message.into())
}
}
}
}
impl From<v3::registration::InitMessage> for v4::registration::InitMessage {
fn from(init_msg: v3::registration::InitMessage) -> Self {
Self {
pub_key: init_msg.pub_key,
}
}
}
impl From<Box<v3::registration::FinalMessage>> for Box<v4::registration::FinalMessage> {
fn from(gw_client: Box<v3::registration::FinalMessage>) -> Self {
Box::new(v4::registration::FinalMessage {
gateway_client: gw_client.gateway_client.into(),
credential: gw_client.credential,
})
}
}
impl From<Box<v3::topup::TopUpMessage>> for Box<v4::topup::TopUpMessage> {
fn from(top_up_message: Box<v3::topup::TopUpMessage>) -> Self {
Box::new(v4::topup::TopUpMessage {
pub_key: top_up_message.pub_key,
credential: top_up_message.credential,
})
}
}
impl From<v3::registration::GatewayClient> for v4::registration::GatewayClient {
fn from(gw_client: v3::registration::GatewayClient) -> Self {
Self {
pub_key: gw_client.pub_key,
private_ips: gw_client.private_ip.into(),
mac: gw_client.mac.into(),
}
}
}
impl From<v4::registration::GatewayClient> for v3::registration::GatewayClient {
fn from(gw_client: v4::registration::GatewayClient) -> Self {
Self {
pub_key: gw_client.pub_key,
private_ip: gw_client.private_ips.ipv4.into(),
mac: gw_client.mac.into(),
}
}
}
impl From<v3::registration::ClientMac> for v4::registration::ClientMac {
fn from(mac: v3::registration::ClientMac) -> Self {
Self::new(mac.to_vec())
}
}
impl From<v4::registration::ClientMac> for v3::registration::ClientMac {
fn from(mac: v4::registration::ClientMac) -> Self {
Self::new(mac.to_vec())
}
}
impl TryFrom<v4::response::AuthenticatorResponse> for v3::response::AuthenticatorResponse {
type Error = crate::Error;
fn try_from(
authenticator_response: v4::response::AuthenticatorResponse,
) -> Result<Self, Self::Error> {
Ok(Self {
data: authenticator_response.data.try_into()?,
reply_to: authenticator_response.reply_to,
protocol: authenticator_response.protocol,
})
}
}
impl TryFrom<v4::response::AuthenticatorResponseData> for v3::response::AuthenticatorResponseData {
type Error = crate::Error;
fn try_from(
authenticator_response_data: v4::response::AuthenticatorResponseData,
) -> Result<Self, Self::Error> {
match authenticator_response_data {
v4::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response,
) => Ok(
v3::response::AuthenticatorResponseData::PendingRegistration(
pending_registration_response.into(),
),
),
v4::response::AuthenticatorResponseData::Registered(registered_response) => Ok(
v3::response::AuthenticatorResponseData::Registered(registered_response.into()),
),
v4::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response,
) => Ok(v3::response::AuthenticatorResponseData::RemainingBandwidth(
remaining_bandwidth_response.into(),
)),
v4::response::AuthenticatorResponseData::TopUpBandwidth(_) => {
Err(Self::Error::Conversion(
"a v3 request couldn't produce a v4 only type of response".to_string(),
))
}
}
}
}
impl From<v4::response::PendingRegistrationResponse> for v3::response::PendingRegistrationResponse {
fn from(value: v4::response::PendingRegistrationResponse) -> Self {
Self {
request_id: value.request_id,
reply_to: value.reply_to,
reply: value.reply.into(),
}
}
}
impl From<v4::response::RegisteredResponse> for v3::response::RegisteredResponse {
fn from(value: v4::response::RegisteredResponse) -> Self {
Self {
request_id: value.request_id,
reply_to: value.reply_to,
reply: value.reply.into(),
}
}
}
impl From<v4::response::RemainingBandwidthResponse> for v3::response::RemainingBandwidthResponse {
fn from(value: v4::response::RemainingBandwidthResponse) -> Self {
Self {
request_id: value.request_id,
reply_to: value.reply_to,
reply: value.reply.map(Into::into),
}
}
}
impl From<v4::registration::RegistrationData> for v3::registration::RegistrationData {
fn from(value: v4::registration::RegistrationData) -> Self {
Self {
nonce: value.nonce,
gateway_data: value.gateway_data.into(),
wg_port: value.wg_port,
}
}
}
impl From<v4::registration::RegistredData> for v3::registration::RegistredData {
fn from(value: v4::registration::RegistredData) -> Self {
Self {
pub_key: value.pub_key,
private_ip: value.private_ips.ipv4.into(),
wg_port: value.wg_port,
}
}
}
impl From<v4::registration::RemainingBandwidthData> for v3::registration::RemainingBandwidthData {
fn from(value: v4::registration::RemainingBandwidthData) -> Self {
Self {
available_bandwidth: value.available_bandwidth,
}
}
}
@@ -1,10 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod conversion;
pub mod registration;
pub mod request;
pub mod response;
pub mod topup;
pub const VERSION: u8 = 4;
@@ -1,281 +0,0 @@
// -2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use base64::{engine::general_purpose, Engine};
use nym_credentials_interface::CredentialSpendingData;
use nym_network_defaults::constants::{WG_TUN_DEVICE_IP_ADDRESS_V4, WG_TUN_DEVICE_IP_ADDRESS_V6};
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::time::SystemTime;
use std::{fmt, ops::Deref, str::FromStr};
#[cfg(feature = "verify")]
use hmac::{Hmac, Mac};
#[cfg(feature = "verify")]
use nym_crypto::asymmetric::encryption::PrivateKey;
#[cfg(feature = "verify")]
use sha2::Sha256;
pub type PendingRegistrations = HashMap<PeerPublicKey, RegistrationData>;
pub type PrivateIPs = HashMap<IpPair, Taken>;
#[cfg(feature = "verify")]
pub type HmacSha256 = Hmac<Sha256>;
pub type Nonce = u64;
pub type Taken = Option<SystemTime>;
pub const BANDWIDTH_CAP_PER_DAY: u64 = 1024 * 1024 * 1024; // 1 GB
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct IpPair {
pub ipv4: Ipv4Addr,
pub ipv6: Ipv6Addr,
}
impl IpPair {
pub fn new(ipv4: Ipv4Addr, ipv6: Ipv6Addr) -> Self {
IpPair { ipv4, ipv6 }
}
}
impl fmt::Display for IpPair {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "({}, {})", self.ipv4, self.ipv6)
}
}
impl From<IpAddr> for IpPair {
fn from(value: IpAddr) -> Self {
let (before_last_byte, last_byte) = match value {
std::net::IpAddr::V4(ipv4_addr) => (ipv4_addr.octets()[2], ipv4_addr.octets()[3]),
std::net::IpAddr::V6(ipv6_addr) => (ipv6_addr.octets()[14], ipv6_addr.octets()[15]),
};
let last_bytes = (before_last_byte as u16) << 8 | last_byte as u16;
let ipv4 = Ipv4Addr::new(
WG_TUN_DEVICE_IP_ADDRESS_V4.octets()[0],
WG_TUN_DEVICE_IP_ADDRESS_V4.octets()[1],
before_last_byte,
last_byte,
);
let ipv6 = Ipv6Addr::new(
WG_TUN_DEVICE_IP_ADDRESS_V6.segments()[0],
WG_TUN_DEVICE_IP_ADDRESS_V6.segments()[1],
WG_TUN_DEVICE_IP_ADDRESS_V6.segments()[2],
WG_TUN_DEVICE_IP_ADDRESS_V6.segments()[3],
WG_TUN_DEVICE_IP_ADDRESS_V6.segments()[4],
WG_TUN_DEVICE_IP_ADDRESS_V6.segments()[5],
WG_TUN_DEVICE_IP_ADDRESS_V6.segments()[6],
last_bytes,
);
IpPair::new(ipv4, ipv6)
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct InitMessage {
/// Base64 encoded x25519 public key
pub pub_key: PeerPublicKey,
}
impl InitMessage {
pub fn new(pub_key: PeerPublicKey) -> Self {
InitMessage { pub_key }
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FinalMessage {
/// Gateway client data
pub gateway_client: GatewayClient,
/// Ecash credential
pub credential: Option<CredentialSpendingData>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RegistrationData {
pub nonce: u64,
pub gateway_data: GatewayClient,
pub wg_port: u16,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RegistredData {
pub pub_key: PeerPublicKey,
pub private_ips: IpPair,
pub wg_port: u16,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RemainingBandwidthData {
pub available_bandwidth: i64,
}
/// Client that wants to register sends its PublicKey bytes mac digest encrypted with a DH shared secret.
/// Gateway/Nym node can then verify pub_key payload using the same process
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GatewayClient {
/// Base64 encoded x25519 public key
pub pub_key: PeerPublicKey,
/// Assigned private IPs (v4 and v6)
pub private_ips: IpPair,
/// Sha256 hmac on the data (alongside the prior nonce)
pub mac: ClientMac,
}
impl GatewayClient {
#[cfg(feature = "verify")]
pub fn new(
local_secret: &PrivateKey,
remote_public: x25519_dalek::PublicKey,
private_ips: IpPair,
nonce: u64,
) -> Self {
// convert from 1.0 x25519-dalek private key into 2.0 x25519-dalek
#[allow(clippy::expect_used)]
let static_secret = x25519_dalek::StaticSecret::from(local_secret.to_bytes());
let local_public: x25519_dalek::PublicKey = (&static_secret).into();
let dh = static_secret.diffie_hellman(&remote_public);
// TODO: change that to use our nym_crypto::hmac module instead
#[allow(clippy::expect_used)]
let mut mac = HmacSha256::new_from_slice(dh.as_bytes())
.expect("x25519 shared secret is always 32 bytes long");
mac.update(local_public.as_bytes());
mac.update(private_ips.to_string().as_bytes());
mac.update(&nonce.to_le_bytes());
GatewayClient {
pub_key: PeerPublicKey::new(local_public),
private_ips,
mac: ClientMac(mac.finalize().into_bytes().to_vec()),
}
}
// Reusable secret should be gateways Wireguard PK
// Client should perform this step when generating its payload, using its own WG PK
#[cfg(feature = "verify")]
pub fn verify(&self, gateway_key: &PrivateKey, nonce: u64) -> Result<(), Error> {
// convert from 1.0 x25519-dalek private key into 2.0 x25519-dalek
#[allow(clippy::expect_used)]
let static_secret = x25519_dalek::StaticSecret::from(gateway_key.to_bytes());
let dh = static_secret.diffie_hellman(&self.pub_key);
// TODO: change that to use our nym_crypto::hmac module instead
#[allow(clippy::expect_used)]
let mut mac = HmacSha256::new_from_slice(dh.as_bytes())
.expect("x25519 shared secret is always 32 bytes long");
mac.update(self.pub_key.as_bytes());
mac.update(self.private_ips.to_string().as_bytes());
mac.update(&nonce.to_le_bytes());
mac.verify_slice(&self.mac)
.map_err(|source| Error::FailedClientMacVerification {
client: self.pub_key.to_string(),
source,
})
}
pub fn pub_key(&self) -> PeerPublicKey {
self.pub_key
}
}
// TODO: change the inner type into generic array of size HmacSha256::OutputSize
// TODO2: rely on our internal crypto/hmac
#[derive(Debug, Clone)]
pub struct ClientMac(Vec<u8>);
impl fmt::Display for ClientMac {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", general_purpose::STANDARD.encode(&self.0))
}
}
impl ClientMac {
#[allow(dead_code)]
pub fn new(mac: Vec<u8>) -> Self {
ClientMac(mac)
}
}
impl Deref for ClientMac {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl FromStr for ClientMac {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mac_bytes: Vec<u8> =
general_purpose::STANDARD
.decode(s)
.map_err(|source| Error::MalformedClientMac {
mac: s.to_string(),
source,
})?;
Ok(ClientMac(mac_bytes))
}
}
impl Serialize for ClientMac {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let encoded_key = general_purpose::STANDARD.encode(self.0.clone());
serializer.serialize_str(&encoded_key)
}
}
impl<'de> Deserialize<'de> for ClientMac {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let encoded_key = String::deserialize(deserializer)?;
ClientMac::from_str(&encoded_key).map_err(serde::de::Error::custom)
}
}
#[cfg(test)]
mod tests {
use super::*;
use nym_crypto::asymmetric::encryption;
#[test]
fn create_ip_pair() {
let ipv4: IpAddr = Ipv4Addr::from_str("10.1.10.50").unwrap().into();
let ipv6: IpAddr = Ipv6Addr::from_str("fc01::0a32").unwrap().into();
assert_eq!(IpPair::from(ipv4), IpPair::from(ipv6));
}
#[test]
#[cfg(feature = "verify")]
fn client_request_roundtrip() {
let mut rng = rand::thread_rng();
let gateway_key_pair = encryption::KeyPair::new(&mut rng);
let client_key_pair = encryption::KeyPair::new(&mut rng);
let nonce = 1234567890;
let client = GatewayClient::new(
client_key_pair.private_key(),
x25519_dalek::PublicKey::from(gateway_key_pair.public_key().to_bytes()),
IpPair::new("10.0.0.42".parse().unwrap(), "fc00::42".parse().unwrap()),
nonce,
);
assert!(client.verify(gateway_key_pair.private_key(), nonce).is_ok())
}
}
@@ -1,136 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::{
registration::{FinalMessage, InitMessage},
topup::TopUpMessage,
};
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use nym_sphinx::addressing::Recipient;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
use crate::make_bincode_serializer;
use super::VERSION;
fn generate_random() -> u64 {
use rand::RngCore;
let mut rng = rand::rngs::OsRng;
rng.next_u64()
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AuthenticatorRequest {
pub protocol: Protocol,
pub data: AuthenticatorRequestData,
pub reply_to: Recipient,
pub request_id: u64,
}
impl AuthenticatorRequest {
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
pub fn new_initial_request(init_message: InitMessage, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::Initial(init_message),
reply_to,
request_id,
},
request_id,
)
}
pub fn new_final_request(final_message: FinalMessage, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::Final(Box::new(final_message)),
reply_to,
request_id,
},
request_id,
)
}
pub fn new_query_request(peer_public_key: PeerPublicKey, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::QueryBandwidth(peer_public_key),
reply_to,
request_id,
},
request_id,
)
}
pub fn new_topup_request(top_up_message: TopUpMessage, reply_to: Recipient) -> (Self, u64) {
let request_id = generate_random();
(
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorRequestData::TopUpBandwidth(Box::new(top_up_message)),
reply_to,
request_id,
},
request_id,
)
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error> {
use bincode::Options;
make_bincode_serializer().serialize(self)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AuthenticatorRequestData {
Initial(InitMessage),
Final(Box<FinalMessage>),
QueryBandwidth(PeerPublicKey),
TopUpBandwidth(Box<TopUpMessage>),
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn check_first_bytes_protocol() {
let version = 4;
let data = AuthenticatorRequest {
protocol: Protocol { version, service_provider_type: ServiceProviderType::Authenticator },
data: AuthenticatorRequestData::Initial(InitMessage::new(
PeerPublicKey::from_str("yvNUDpT5l7W/xDhiu6HkqTHDQwbs/B3J5UrLmORl1EQ=").unwrap(),
)),
reply_to: Recipient::try_from_base58_string("D1rrpsysCGCYXy9saP8y3kmNpGtJZUXN9SvFoUcqAsM9.9Ssso1ea5NfkbMASdiseDSjTN1fSWda5SgEVjdSN4CvV@GJqd3ZxpXWSNxTfx7B1pPtswpetH4LnJdFeLeuY5KUuN").unwrap(),
request_id: 1,
};
let bytes = *data.to_bytes().unwrap().first_chunk::<2>().unwrap();
assert_eq!(bytes, [version, ServiceProviderType::Authenticator as u8]);
}
}
@@ -1,157 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::registration::{RegistrationData, RegistredData, RemainingBandwidthData};
use nym_service_provider_requests_common::{Protocol, ServiceProviderType};
use nym_sphinx::addressing::Recipient;
use serde::{Deserialize, Serialize};
use crate::make_bincode_serializer;
use super::VERSION;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AuthenticatorResponse {
pub protocol: Protocol,
pub data: AuthenticatorResponseData,
pub reply_to: Recipient,
}
impl AuthenticatorResponse {
pub fn new_pending_registration_success(
registration_data: RegistrationData,
request_id: u64,
reply_to: Recipient,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::PendingRegistration(PendingRegistrationResponse {
reply: registration_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn new_registered(
registred_data: RegistredData,
reply_to: Recipient,
request_id: u64,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::Registered(RegisteredResponse {
reply: registred_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn new_remaining_bandwidth(
remaining_bandwidth_data: Option<RemainingBandwidthData>,
reply_to: Recipient,
request_id: u64,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::RemainingBandwidth(RemainingBandwidthResponse {
reply: remaining_bandwidth_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn new_topup_bandwidth(
remaining_bandwidth_data: RemainingBandwidthData,
reply_to: Recipient,
request_id: u64,
) -> Self {
Self {
protocol: Protocol {
service_provider_type: ServiceProviderType::Authenticator,
version: VERSION,
},
data: AuthenticatorResponseData::TopUpBandwidth(TopUpBandwidthResponse {
reply: remaining_bandwidth_data,
reply_to,
request_id,
}),
reply_to,
}
}
pub fn recipient(&self) -> Recipient {
self.reply_to
}
pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error> {
use bincode::Options;
make_bincode_serializer().serialize(self)
}
pub fn from_reconstructed_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
pub fn id(&self) -> Option<u64> {
match &self.data {
AuthenticatorResponseData::PendingRegistration(response) => Some(response.request_id),
AuthenticatorResponseData::Registered(response) => Some(response.request_id),
AuthenticatorResponseData::RemainingBandwidth(response) => Some(response.request_id),
AuthenticatorResponseData::TopUpBandwidth(response) => Some(response.request_id),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AuthenticatorResponseData {
PendingRegistration(PendingRegistrationResponse),
Registered(RegisteredResponse),
RemainingBandwidth(RemainingBandwidthResponse),
TopUpBandwidth(TopUpBandwidthResponse),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingRegistrationResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: RegistrationData,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RegisteredResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: RegistredData,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RemainingBandwidthResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: Option<RemainingBandwidthData>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TopUpBandwidthResponse {
pub request_id: u64,
pub reply_to: Recipient,
pub reply: RemainingBandwidthData,
}
@@ -1,15 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_credentials_interface::CredentialSpendingData;
use nym_wireguard_types::PeerPublicKey;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TopUpMessage {
/// Base64 encoded x25519 public key
pub pub_key: PeerPublicKey,
/// Ecash credential
pub credential: CredentialSpendingData,
}
+2 -3
View File
@@ -14,15 +14,14 @@ thiserror = { workspace = true }
url = { workspace = true }
zeroize = { workspace = true }
nym-ecash-time = { path = "../ecash-time" }
nym-credential-storage = { path = "../credential-storage" }
nym-credentials = { path = "../credentials" }
nym-credentials-interface = { path = "../credentials-interface" }
nym-crypto = { path = "../crypto", features = ["rand", "asymmetric", "stream_cipher", "aes", "hashing"] }
nym-ecash-contract-common = { path = "../cosmwasm-smart-contracts/ecash-contract" }
nym-ecash-time = { path = "../ecash-time" }
nym-network-defaults = { path = "../network-defaults" }
nym-task = { path = "../task" }
nym-validator-client = { path = "../client-libs/validator-client", default-features = false }
nym-ecash-contract-common = { path = "../cosmwasm-smart-contracts/ecash-contract" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.nym-validator-client]
path = "../client-libs/validator-client"
+6 -18
View File
@@ -1,25 +1,13 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#[derive(Debug)]
// See other comments for other TaskStatus message enumds about abusing the Error trait when we
// should have a new trait for TaskStatus messages
#[derive(Debug, thiserror::Error)]
pub enum BandwidthStatusMessage {
#[error("remaining bandwidth: {0}")]
RemainingBandwidth(i64),
#[error("no bandwidth left")]
NoBandwidth,
}
impl std::fmt::Display for BandwidthStatusMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BandwidthStatusMessage::RemainingBandwidth(b) => {
write!(f, "remaining bandwidth: {}", b)
}
BandwidthStatusMessage::NoBandwidth => write!(f, "no bandwidth left"),
}
}
}
impl nym_task::TaskStatusEvent for BandwidthStatusMessage {
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
-1
View File
@@ -43,7 +43,6 @@ nym-gateway-requests = { path = "../gateway-requests" }
nym-metrics = { path = "../nym-metrics" }
nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
nym-statistics-common = { path = "../statistics" }
nym-pemstore = { path = "../pemstore" }
nym-topology = { path = "../topology", features = ["serializable"] }
nym-validator-client = { path = "../client-libs/validator-client", default-features = false }
+1 -2
View File
@@ -9,7 +9,6 @@ license.workspace = true
[dependencies]
humantime-serde = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_with = { workspace = true, features = ["macros"] }
thiserror.workspace = true
url = { workspace = true, features = ["serde"] }
@@ -24,4 +23,4 @@ nym-sphinx-addressing = { path = "../../nymsphinx/addressing" }
[features]
disk-persistence = ["nym-pemstore"]
disk-persistence = ["nym-pemstore"]
@@ -5,7 +5,6 @@ use nym_config::defaults::NymNetworkDetails;
use nym_sphinx_addressing::Recipient;
use nym_sphinx_params::{PacketSize, PacketType};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::time::Duration;
use url::Url;
@@ -62,11 +61,6 @@ const DEFAULT_MAXIMUM_REPLY_SURB_AGE: Duration = Duration::from_secs(12 * 60 * 6
// 24 hours
const DEFAULT_MAXIMUM_REPLY_KEY_AGE: Duration = Duration::from_secs(24 * 60 * 60);
// stats reporting related
/// Time interval between reporting statistics to the given provider if it exist
const STATS_REPORT_INTERVAL_SECS: Duration = Duration::from_secs(300);
use crate::error::InvalidTrafficModeFailure;
pub use nym_country_group::CountryGroup;
@@ -139,12 +133,6 @@ impl Config {
self
}
pub fn with_enabled_stats_reporting_address(mut self, address: Recipient) -> Self {
self.debug.stats_reporting.provider_address = Some(address);
self.debug.stats_reporting.enabled = true; //since we are overriding the address, we assume the reporting should be enabled
self
}
// TODO: this should be refactored properly
// as of 12.09.23 the below is true (not sure how this comment will rot in the future)
// medium_toggle:
@@ -643,32 +631,6 @@ impl Default for ReplySurbs {
}
}
#[serde_as]
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct StatsReporting {
/// Is stats reporting enabled
pub enabled: bool,
/// Address of the stats collector. If this is none, no reporting will happen, regardless of `enabled`
#[serde_as(as = "Option<DisplayFromStr>")]
pub provider_address: Option<Recipient>,
/// With what frequence will statistics be sent
#[serde(with = "humantime_serde")]
pub reporting_interval: Duration,
}
impl Default for StatsReporting {
fn default() -> Self {
StatsReporting {
enabled: true,
provider_address: None,
reporting_interval: STATS_REPORT_INTERVAL_SECS,
}
}
}
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct DebugConfig {
@@ -689,9 +651,6 @@ pub struct DebugConfig {
/// Defines all configuration options related to reply SURBs.
pub reply_surbs: ReplySurbs,
/// Defines all configuration options related to stats reporting.
pub stats_reporting: StatsReporting,
}
impl DebugConfig {
@@ -713,7 +672,6 @@ impl Default for DebugConfig {
acknowledgements: Default::default(),
topology: Default::default(),
reply_surbs: Default::default(),
stats_reporting: Default::default(),
}
}
}
@@ -181,7 +181,6 @@ impl From<ConfigV5> for Config {
maximum_reply_key_age: value.debug.reply_surbs.maximum_reply_key_age,
surb_mix_hops: value.debug.reply_surbs.surb_mix_hops,
},
stats_reporting: Default::default(),
},
}
}
@@ -15,7 +15,6 @@ use crate::{
use log::info;
use nym_client_core_gateways_storage::GatewayDetails;
use nym_crypto::asymmetric::identity;
use nym_sphinx::addressing::Recipient;
use nym_topology::NymTopology;
use nym_validator_client::UserAgent;
use rand::rngs::OsRng;
@@ -89,10 +88,6 @@ pub struct CommonClientInitArgs {
/// Disable loop cover traffic and the Poisson rate limiter (for debugging only)
#[cfg_attr(feature = "cli", clap(long, hide = true))]
pub no_cover: bool,
/// Sets the address to report statistics
#[cfg_attr(feature = "cli", clap(long, hide = true))]
pub stats_reporting_address: Option<Recipient>,
}
pub struct InitResultsWithConfig<T> {
@@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0
use nym_crypto::asymmetric::identity;
use nym_sphinx::addressing::Recipient;
use std::path::PathBuf;
#[cfg_attr(feature = "cli", derive(clap::Args))]
@@ -57,8 +56,4 @@ pub struct CommonClientRunArgs {
// has defined the conflict on that field itself
#[cfg_attr(feature = "cli", clap(long, hide = true))]
pub no_cover: bool,
/// Sets the address to report statistics
#[cfg_attr(feature = "cli", clap(long, hide = true))]
pub stats_reporting_address: Option<Recipient>,
}
@@ -1,8 +1,8 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::packet_statistics_control::PacketStatisticsReporter;
use super::received_buffer::ReceivedBufferMessage;
use super::statistics_control::StatisticsControl;
use super::topology_control::geo_aware_provider::GeoAwareTopologyProvider;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
@@ -12,6 +12,7 @@ use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
use crate::client::packet_statistics_control::PacketStatisticsControl;
use crate::client::real_messages_control;
use crate::client::real_messages_control::RealMessagesController;
use crate::client::received_buffer::{
@@ -48,19 +49,16 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_sphinx::params::PacketType;
use nym_sphinx::receiver::{ReconstructedMessage, SphinxMessageReceiver};
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use nym_task::{TaskClient, TaskHandle};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
use rand::rngs::OsRng;
use sha2::Digest;
use std::fmt::Debug;
use std::os::raw::c_int as RawFd;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use url::Url;
#[cfg(all(
@@ -275,7 +273,7 @@ where
self_address: Recipient,
topology_accessor: TopologyAccessor,
mix_tx: BatchMixMessageSender,
stats_tx: ClientStatsSender,
stats_tx: PacketStatisticsReporter,
shutdown: TaskClient,
) {
info!("Starting loop cover traffic stream...");
@@ -308,7 +306,7 @@ where
client_connection_rx: ConnectionCommandReceiver,
shutdown: TaskClient,
packet_type: PacketType,
stats_tx: ClientStatsSender,
stats_tx: PacketStatisticsReporter,
) {
info!("Starting real traffic stream...");
@@ -337,7 +335,7 @@ where
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
shutdown: TaskClient,
metrics_reporter: ClientStatsSender,
packet_statistics_control: PacketStatisticsReporter,
) {
info!("Starting received messages buffer controller...");
let controller: ReceivedMessagesBufferController<SphinxMessageReceiver> =
@@ -347,7 +345,7 @@ where
mixnet_receiver,
reply_key_storage,
reply_controller_sender,
metrics_reporter,
packet_statistics_control,
);
controller.start_with_shutdown(shutdown)
}
@@ -588,23 +586,11 @@ where
Ok(())
}
fn start_statistics_control(
config: &Config,
user_agent: Option<UserAgent>,
client_stats_id: String,
input_sender: Sender<InputMessage>,
shutdown: TaskClient,
) -> ClientStatsSender {
info!("Starting statistics control...");
StatisticsControl::create_and_start_with_shutdown(
config.debug.stats_reporting,
user_agent
.map(|u| u.application)
.unwrap_or("unknown".to_string()),
client_stats_id,
input_sender.clone(),
shutdown.with_suffix("controller"),
)
fn start_packet_statistics_control(shutdown: TaskClient) -> PacketStatisticsReporter {
info!("Starting packet statistics control...");
let (packet_statistics_control, packet_stats_reporter) = PacketStatisticsControl::new();
packet_statistics_control.start_with_shutdown(shutdown);
packet_stats_reporter
}
fn start_mix_traffic_controller(
@@ -734,19 +720,6 @@ where
self.user_agent.clone(),
);
//make sure we don't accidentally get the same id as gateways are reporting
let client_stats_id = format!(
"stats_id_{:x}",
sha2::Sha256::digest(self_address.identity().to_bytes())
);
let stats_reporter = Self::start_statistics_control(
self.config,
self.user_agent.clone(),
client_stats_id,
input_sender.clone(),
shutdown.fork("statistics_control"),
);
// needs to be started as the first thing to block if required waiting for the gateway
Self::start_topology_refresher(
topology_provider,
@@ -758,6 +731,9 @@ where
)
.await?;
let packet_stats_reporter =
Self::start_packet_statistics_control(shutdown.fork("packet_statistics_control"));
let gateway_packet_router = PacketRouter::new(
ack_sender,
mixnet_messages_sender,
@@ -789,7 +765,7 @@ where
reply_storage.key_storage(),
reply_controller_sender.clone(),
shutdown.fork("received_messages_buffer"),
stats_reporter.clone(),
packet_stats_reporter.clone(),
);
// The message_sender is the transmitter for any component generating sphinx packets
@@ -828,7 +804,7 @@ where
client_connection_rx,
shutdown.fork("real_traffic_controller"),
self.config.debug.traffic.packet_type,
stats_reporter.clone(),
packet_stats_reporter.clone(),
);
if !self
@@ -843,7 +819,7 @@ where
self_address,
shared_topology_accessor.clone(),
message_sender,
stats_reporter.clone(),
packet_stats_reporter,
shutdown.fork("cover_traffic_stream"),
);
}
@@ -871,7 +847,6 @@ where
topology_accessor: shared_topology_accessor,
gateway_connection: GatewayConnection { gateway_ws_fd },
},
stats_reporter,
task_handle: shutdown,
})
}
@@ -883,7 +858,6 @@ pub struct BaseClient {
pub client_input: ClientInputStatus,
pub client_output: ClientOutputStatus,
pub client_state: ClientState,
pub stats_reporter: ClientStatsSender,
pub task_handle: TaskHandle,
}
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter};
use crate::client::topology_control::TopologyAccessor;
use crate::{config, spawn_future};
use futures::task::{Context, Poll};
@@ -12,7 +13,6 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::cover::generate_loop_cover_packet;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
@@ -63,7 +63,7 @@ where
packet_type: PacketType,
stats_tx: ClientStatsSender,
stats_tx: PacketStatisticsReporter,
}
impl<R> Stream for LoopCoverTrafficStream<R>
@@ -109,7 +109,7 @@ impl LoopCoverTrafficStream<OsRng> {
topology_access: TopologyAccessor,
traffic_config: config::Traffic,
cover_config: config::CoverTraffic,
stats_tx: ClientStatsSender,
stats_tx: PacketStatisticsReporter,
) -> Self {
let rng = OsRng;
@@ -198,9 +198,9 @@ impl LoopCoverTrafficStream<OsRng> {
}
}
} else {
self.stats_tx.report(
PacketStatisticsEvent::CoverPacketSent(cover_traffic_packet_size.size()).into(),
);
self.stats_tx.report(PacketStatisticsEvent::CoverPacketSent(
cover_traffic_packet_size.size(),
));
}
// TODO: I'm not entirely sure whether this is really required, because I'm not 100%
+1 -1
View File
@@ -7,9 +7,9 @@ pub(crate) mod helpers;
pub mod inbound_messages;
pub mod key_manager;
pub mod mix_traffic;
pub(crate) mod packet_statistics_control;
pub mod real_messages_control;
pub mod received_buffer;
pub mod replies;
pub mod statistics_control;
pub mod topology_control;
pub(crate) mod transmission_buffer;
@@ -1,25 +1,48 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::ClientStatsEvents;
use core::fmt;
use std::{
collections::VecDeque,
time::{Duration, Instant},
};
use nym_metrics::{inc, inc_by};
use serde::{Deserialize, Serialize};
use si_scale::helpers::bibytes2;
// Metrics server
use futures::future::{FusedFuture, OptionFuture};
use futures::FutureExt;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
use http_body_util::Full;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
use hyper::body::Bytes;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
use hyper::server::conn::http1;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
use hyper::service::service_fn;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
use hyper::{Request, Response};
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
use hyper_util::rt::TokioIo;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
use std::convert::Infallible;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
#[cfg(feature = "metrics-server")]
use std::net::SocketAddr;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
use tokio::net::TcpListener;
use crate::spawn_future;
// Time interval between reporting packet statistics
const PACKET_REPORT_INTERVAL_SECS: u64 = 2;
// Interval for taking snapshots of the packet statistics
const SNAPSHOT_INTERVAL_MS: u64 = 500;
// When computing rates, we include snapshots that are up to this old. We set it to some odd number
// a tad larger than an integer number of snapshot intervals, so that we don't have to worry about
// threshold effects.
// Also, set it larger than the packet report interval so that we don't miss notable singular events
const RECORDING_WINDOW_MS: u64 = 2300;
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct PacketStatistics {
#[derive(Default, Debug, Clone)]
struct PacketStatistics {
// Sent
real_packets_sent: u64,
real_packets_sent_size: usize,
@@ -49,7 +72,7 @@ pub(crate) struct PacketStatistics {
}
impl PacketStatistics {
fn handle(&mut self, event: PacketStatisticsEvent) {
fn handle_event(&mut self, event: PacketStatisticsEvent) {
match event {
PacketStatisticsEvent::RealPacketSent(packet_size) => {
self.real_packets_sent += 1;
@@ -166,64 +189,29 @@ impl std::ops::Sub for PacketStatistics {
}
}
pub struct MixnetBandwidthStatisticsEvent {
pub rates: PacketRates,
}
impl MixnetBandwidthStatisticsEvent {
pub fn new(rates: PacketRates) -> Self {
Self { rates }
}
}
impl nym_task::TaskStatusEvent for MixnetBandwidthStatisticsEvent {
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
impl fmt::Display for MixnetBandwidthStatisticsEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.rates.summary())
}
}
#[derive(Debug, Clone)]
pub struct PacketRates {
pub real_packets_sent: f64,
pub real_packets_sent_size: f64,
pub cover_packets_sent: f64,
pub cover_packets_sent_size: f64,
struct PacketRates {
real_packets_sent: f64,
real_packets_sent_size: f64,
cover_packets_sent: f64,
cover_packets_sent_size: f64,
pub real_packets_received: f64,
pub real_packets_received_size: f64,
pub cover_packets_received: f64,
pub cover_packets_received_size: f64,
real_packets_received: f64,
real_packets_received_size: f64,
cover_packets_received: f64,
cover_packets_received_size: f64,
pub total_acks_received: f64,
pub total_acks_received_size: f64,
pub real_acks_received: f64,
pub real_acks_received_size: f64,
pub cover_acks_received: f64,
pub cover_acks_received_size: f64,
total_acks_received: f64,
total_acks_received_size: f64,
real_acks_received: f64,
real_acks_received_size: f64,
cover_acks_received: f64,
cover_acks_received_size: f64,
pub real_packets_queued: f64,
pub retransmissions_queued: f64,
pub reply_surbs_queued: f64,
pub additional_reply_surbs_queued: f64,
}
impl fmt::Display for PacketRates {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"down: {}/s, up: {}/s (cover down: {}/s, cover up: {}/s)",
bibytes2(self.real_packets_received_size),
bibytes2(self.real_packets_sent_size),
bibytes2(self.cover_packets_received_size),
bibytes2(self.cover_packets_sent_size),
)
}
real_packets_queued: f64,
retransmissions_queued: f64,
reply_surbs_queued: f64,
additional_reply_surbs_queued: f64,
}
impl From<PacketStatistics> for PacketRates {
@@ -342,46 +330,56 @@ impl PacketRates {
}
}
/// Event Space used for counting the Packet types used in a connection.
#[derive(Debug)]
pub enum PacketStatisticsEvent {
/// The real packets sent. Recall that acks are sent by the gateway, so it's not included here.
pub(crate) enum PacketStatisticsEvent {
// The real packets sent. Recall that acks are sent by the gateway, so it's not included here.
RealPacketSent(usize),
/// The cover packets sent
// The cover packets sent
CoverPacketSent(usize),
/// Real packets received
// Real packets received
RealPacketReceived(usize),
/// Cover packets received
// Cover packets received
CoverPacketReceived(usize),
/// Ack of any type received. This is mostly used as a consistency check, and should be the sum
/// of real and cover acks received.
// Ack of any type received. This is mostly used as a consistency check, and should be the sum
// of real and cover acks received.
AckReceived(usize),
/// Out of the total acks received, this is the subset of those that were real
// Out of the total acks received, this is the subset of those that were real
RealAckReceived(usize),
/// Out of the total acks received, this is the subset of those that were for cover traffic
// Out of the total acks received, this is the subset of those that were for cover traffic
CoverAckReceived(usize),
/// Types of packets queued
// Types of packets queued
RealPacketQueued,
/// Types of packets queued
RetransmissionQueued,
/// Types of packets queued
ReplySurbRequestQueued,
/// Types of packets queued
AdditionalReplySurbRequestQueued,
}
impl From<PacketStatisticsEvent> for ClientStatsEvents {
fn from(event: PacketStatisticsEvent) -> ClientStatsEvents {
ClientStatsEvents::PacketStatistics(event)
type PacketStatisticsReceiver = tokio::sync::mpsc::UnboundedReceiver<PacketStatisticsEvent>;
#[derive(Clone)]
pub(crate) struct PacketStatisticsReporter {
stats_tx: tokio::sync::mpsc::UnboundedSender<PacketStatisticsEvent>,
}
impl PacketStatisticsReporter {
pub(crate) fn new(stats_tx: tokio::sync::mpsc::UnboundedSender<PacketStatisticsEvent>) -> Self {
Self { stats_tx }
}
pub(crate) fn report(&self, event: PacketStatisticsEvent) {
self.stats_tx.send(event).unwrap_or_else(|err| {
log::error!("Failed to report packet stat: {:?}", err);
});
}
}
/// Statistics tracking for Packet based I/O
#[derive(Default)]
pub struct PacketStatisticsControl {
pub(crate) struct PacketStatisticsControl {
// Incoming packet stats events from other tasks
stats_rx: PacketStatisticsReceiver,
// Keep track of packet statistics over time
stats: PacketStatistics,
@@ -394,28 +392,18 @@ pub struct PacketStatisticsControl {
}
impl PacketStatisticsControl {
pub(crate) fn handle_event(&mut self, event: PacketStatisticsEvent) {
self.stats.handle(event)
}
pub(crate) fn new() -> (Self, PacketStatisticsReporter) {
let (stats_tx, stats_rx) = tokio::sync::mpsc::unbounded_channel();
pub(crate) fn snapshot(&mut self) {
self.update_history();
self.update_rates();
}
pub(crate) fn report(&self) -> PacketStatistics {
self.stats.clone()
}
pub(crate) fn local_report(&mut self, task_client: &mut nym_task::TaskClient) {
let rates = self.report_rates();
self.check_for_notable_events();
self.report_counters();
// Report our current bandwidth used to e.g a GUI client
if let Some(rates) = rates {
task_client.send_status_msg(Box::new(MixnetBandwidthStatisticsEvent::new(rates)));
}
(
Self {
stats_rx,
stats: PacketStatistics::default(),
history: VecDeque::new(),
rates: VecDeque::new(),
},
PacketStatisticsReporter::new(stats_tx),
)
}
// Add the current stats to the history, and remove old ones.
@@ -468,13 +456,11 @@ impl PacketStatisticsControl {
}
}
fn report_rates(&self) -> Option<PacketRates> {
fn report_rates(&self) {
if let Some((_, rates)) = self.rates.back() {
log::debug!("{}", rates.summary());
log::debug!("{}", rates.detailed_summary());
return Some(rates.clone());
}
None
}
fn report_counters(&self) {
@@ -512,4 +498,124 @@ impl PacketStatisticsControl {
// IDEA: if there is a burst of acks, that could indicate tokio task starvation.
}
pub(crate) async fn run_with_shutdown(&mut self, mut shutdown: nym_task::TaskClient) {
log::debug!("Started PacketStatisticsControl with graceful shutdown support");
let report_interval = Duration::from_secs(PACKET_REPORT_INTERVAL_SECS);
let mut report_interval = tokio::time::interval(report_interval);
let snapshot_interval = Duration::from_millis(SNAPSHOT_INTERVAL_MS);
let mut snapshot_interval = tokio::time::interval(snapshot_interval);
cfg_if::cfg_if! {
if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
log::warn!("Metrics server is not supported on wasm32-unknown-unknown");
let listener: Option<WasmEmpty> = None;
} else if #[cfg(feature = "metrics-server")] {
let mut metrics_port = 18000;
let listener: Option<TcpListener>;
loop {
let addr = SocketAddr::from(([0, 0, 0, 0], metrics_port));
match TcpListener::bind(addr).await {
Ok(l) => {
log::info!("###############################");
log::info!("Metrics endpoint is at: {:?}", l.local_addr());
log::info!("###############################");
listener = Some(l);
break;
},
Err(err) => {
log::warn!("Failed to bind metrics server: {:?}", err);
metrics_port += 1;
}
};
}
} else {
log::info!("Metrics server is disabled!");
let listener: Option<TcpListener> = None;
}
}
loop {
// it seems at some point tokio changed its select precondition evaluation,
// and it's no longer checked before the future is evaluated.
let accept_future: OptionFuture<_> = listener
.as_ref()
.map(|l| l.accept())
.map(FutureExt::fuse)
.into();
tokio::select! {
stats_event = self.stats_rx.recv() => match stats_event {
Some(stats_event) => {
log::trace!("PacketStatisticsControl: Received stats event");
self.stats.handle_event(stats_event);
},
None => {
log::trace!("PacketStatisticsControl: stopping since stats channel was closed");
break;
}
},
// conditional will disable the branch if we're in wasm32-unknown-unknown
// use `_` to calm down clippy when running for wasm
_result = accept_future, if !accept_future.is_terminated() => {
cfg_if::cfg_if! {
if #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] {
if let Some(Ok((stream, _))) = _result {
let io = TokioIo::new(stream);
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(io, service_fn(serve_metrics))
.await
{
log::warn!("Error serving connection: {:?}", err);
}
});
} else {
log::warn!("Error accepting connection");
}
}
}
}
_ = snapshot_interval.tick() => {
self.update_history();
self.update_rates();
}
_ = report_interval.tick() => {
self.report_rates();
self.check_for_notable_events();
self.report_counters();
}
_ = shutdown.recv_with_delay() => {
log::trace!("PacketStatisticsControl: Received shutdown");
break;
},
}
}
log::debug!("PacketStatisticsControl: Exiting");
}
pub(crate) fn start_with_shutdown(mut self, task_client: nym_task::TaskClient) {
spawn_future(async move {
self.run_with_shutdown(task_client).await;
})
}
}
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
async fn serve_metrics(
_: Request<hyper::body::Incoming>,
) -> Result<Response<Full<Bytes>>, Infallible> {
use nym_metrics::metrics;
Ok(Response::new(Full::new(Bytes::from(metrics!()))))
}
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
struct WasmEmpty;
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
impl WasmEmpty {
async fn accept(&self) {}
}
@@ -1,9 +1,9 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::action_controller::{AckActionSender, Action};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter};
use super::action_controller::{AckActionSender, Action};
use futures::StreamExt;
use log::*;
use nym_gateway_client::AcknowledgementReceiver;
@@ -19,7 +19,7 @@ pub(super) struct AcknowledgementListener {
ack_key: Arc<AckKey>,
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: ClientStatsSender,
stats_tx: PacketStatisticsReporter,
}
impl AcknowledgementListener {
@@ -27,7 +27,7 @@ impl AcknowledgementListener {
ack_key: Arc<AckKey>,
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: ClientStatsSender,
stats_tx: PacketStatisticsReporter,
) -> Self {
AcknowledgementListener {
ack_key,
@@ -40,7 +40,7 @@ impl AcknowledgementListener {
async fn on_ack(&mut self, ack_content: Vec<u8>) {
trace!("Received an ack");
self.stats_tx
.report(PacketStatisticsEvent::AckReceived(ack_content.len()).into());
.report(PacketStatisticsEvent::AckReceived(ack_content.len()));
let frag_id = match recover_identifier(&self.ack_key, &ack_content)
.map(FragmentIdentifier::try_from_bytes)
@@ -57,13 +57,13 @@ impl AcknowledgementListener {
if frag_id == COVER_FRAG_ID {
trace!("Received an ack for a cover message - no need to do anything");
self.stats_tx
.report(PacketStatisticsEvent::CoverAckReceived(ack_content.len()).into());
.report(PacketStatisticsEvent::CoverAckReceived(ack_content.len()));
return;
}
trace!("Received {} from the mix network", frag_id);
self.stats_tx
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()));
self.action_sender
.unbounded_send(Action::new_remove(frag_id))
.unwrap();
@@ -8,6 +8,7 @@ use self::{
sent_notification_listener::SentNotificationListener,
};
use crate::client::inbound_messages::InputMessageReceiver;
use crate::client::packet_statistics_control::PacketStatisticsReporter;
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::replies::reply_controller::ReplyControllerSender;
use crate::spawn_future;
@@ -23,7 +24,6 @@ use nym_sphinx::{
chunking::fragment::{Fragment, FragmentIdentifier},
Delay as SphinxDelay,
};
use nym_statistics_common::clients::ClientStatsSender;
use rand::{CryptoRng, Rng};
use std::{
sync::{Arc, Weak},
@@ -209,7 +209,7 @@ where
connectors: AcknowledgementControllerConnectors,
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
stats_tx: ClientStatsSender,
stats_tx: PacketStatisticsReporter,
) -> Self {
let (retransmission_tx, retransmission_rx) = mpsc::unbounded();
@@ -35,7 +35,7 @@ use crate::client::replies::reply_controller;
use crate::config;
pub(crate) use acknowledgement_control::{AckActionSender, Action};
use nym_statistics_common::clients::ClientStatsSender;
use super::packet_statistics_control::PacketStatisticsReporter;
pub(crate) mod acknowledgement_control;
pub(crate) mod message_handler;
@@ -145,7 +145,7 @@ impl RealMessagesController<OsRng> {
reply_controller_receiver: ReplyControllerReceiver,
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: ClientStatsSender,
stats_tx: PacketStatisticsReporter,
) -> Self {
let rng = OsRng;
@@ -3,6 +3,7 @@
use self::sending_delay_controller::SendingDelayController;
use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter};
use crate::client::real_messages_control::acknowledgement_control::SentPacketNotificationSender;
use crate::client::topology_control::TopologyAccessor;
use crate::client::transmission_buffer::TransmissionBuffer;
@@ -18,7 +19,6 @@ use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketSize;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_task::connections::{
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
@@ -115,8 +115,8 @@ where
/// Report queue lengths so that upstream can backoff sending data, and keep connections open.
lane_queue_lengths: LaneQueueLengths,
/// Channel used for sending metrics events (specifically `PacketStatistics` events) to the metrics tracker.
stats_tx: ClientStatsSender,
/// Channel used for sending statistics events to `PacketStatisticsControl`.
stats_tx: PacketStatisticsReporter,
}
#[derive(Debug)]
@@ -175,7 +175,7 @@ where
topology_access: TopologyAccessor,
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: ClientStatsSender,
stats_tx: PacketStatisticsReporter,
) -> Self {
OutQueueControl {
config,
@@ -277,7 +277,7 @@ where
} else {
PacketStatisticsEvent::CoverPacketSent(packet_size)
};
self.stats_tx.report(event.into());
self.stats_tx.report(event);
}
// notify ack controller about sending our message only after we actually managed to push it
@@ -373,13 +373,13 @@ where
TransmissionLane::Retransmission => Some(PacketStatisticsEvent::RetransmissionQueued),
};
if let Some(stat_event) = stat_event {
self.stats_tx.report(stat_event.into());
self.stats_tx.report(stat_event);
}
// To avoid comparing apples to oranges when presenting the fraction of packets that are
// retransmissions, we also need to keep track to the total number of real messages queued,
// even though we also track the actual number of messages sent later in the pipeline.
self.stats_tx
.report(PacketStatisticsEvent::RealPacketQueued.into());
.report(PacketStatisticsEvent::RealPacketQueued);
Some(real_next)
}
@@ -1,8 +1,9 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::replies::{
reply_controller::ReplyControllerSender, reply_storage::SentReplyKeys,
use crate::client::{
packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter},
replies::{reply_controller::ReplyControllerSender, reply_storage::SentReplyKeys},
};
use crate::spawn_future;
use futures::channel::mpsc;
@@ -19,7 +20,6 @@ use nym_sphinx::anonymous_replies::{encryption_key::EncryptionKeyDigest, SurbEnc
use nym_sphinx::message::{NymMessage, PlainMessage};
use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use std::collections::HashSet;
use std::sync::Arc;
@@ -46,7 +46,7 @@ struct ReceivedMessagesBufferInner<R: MessageReceiver> {
// and every now and then remove ids older than X
recently_reconstructed: HashSet<i32>,
stats_tx: ClientStatsSender,
stats_tx: PacketStatisticsReporter,
}
impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
@@ -61,12 +61,16 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
// received and sent packets due to the sphinx layers being removed by the exit gateway
// before it reaches the mixnet client.
self.stats_tx
.report(PacketStatisticsEvent::CoverPacketReceived(fragment_data_size).into());
.report(PacketStatisticsEvent::CoverPacketReceived(
fragment_data_size,
));
return None;
}
self.stats_tx
.report(PacketStatisticsEvent::RealPacketReceived(fragment_data_size).into());
.report(PacketStatisticsEvent::RealPacketReceived(
fragment_data_size,
));
let fragment = match self.message_receiver.recover_fragment(fragment_data) {
Err(err) => {
@@ -159,7 +163,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
local_encryption_keypair: Arc<encryption::KeyPair>,
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
stats_tx: ClientStatsSender,
stats_tx: PacketStatisticsReporter,
) -> Self {
ReceivedMessagesBuffer {
inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
@@ -500,13 +504,13 @@ impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferControll
mixnet_packet_receiver: MixnetMessageReceiver,
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
metrics_reporter: ClientStatsSender,
packet_statistics_reporter: PacketStatisticsReporter,
) -> Self {
let received_buffer = ReceivedMessagesBuffer::new(
local_encryption_keypair,
reply_key_storage,
reply_controller_sender,
metrics_reporter,
packet_statistics_reporter,
);
ReceivedMessagesBufferController {
@@ -1,151 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! # Statistics collection and reporting.
//!
//! Modular metrics collection and reporting system. submodules can be added to collect different types of metrics.
//! On creation the Statistics controller will start a task that will listen for incoming stats events and
//! multiplex them out to the appropriate metrics module based on type.
//!
//! Adding A new module you need to write a new module that implements the `StatsObj` trait and add it to
//! the `stats` hashmap in the `StatisticsControl` struct during it's initialization in the `new` function in
//! this file.
#![warn(clippy::expect_used)]
#![warn(clippy::unwrap_used)]
#![warn(clippy::todo)]
#![warn(clippy::dbg_macro)]
use std::time::Duration;
use nym_client_core_config_types::StatsReporting;
use nym_sphinx::addressing::Recipient;
use nym_statistics_common::clients::{
ClientStatsController, ClientStatsReceiver, ClientStatsSender,
};
use nym_task::connections::TransmissionLane;
use crate::{
client::inbound_messages::{InputMessage, InputMessageSender},
spawn_future,
};
/// Time interval between reporting statistics locally (logging/task_client)
const LOCAL_REPORT_INTERVAL: Duration = Duration::from_secs(2);
/// Interval for taking snapshots of the statistics
const SNAPSHOT_INTERVAL: Duration = Duration::from_millis(500);
/// Launches and manages metrics collection and reporting.
///
/// This is designed to be generic to allow for multiple types of metrics to be collected and
/// reported.
pub(crate) struct StatisticsControl {
/// Keep store the different types of metrics collectors
stats: ClientStatsController,
/// Incoming packet stats events from other tasks
stats_rx: ClientStatsReceiver,
/// Channel to send stats report through the mixnet
report_tx: InputMessageSender,
/// Config for stats reporting (enabled, address, interval)
reporting_config: StatsReporting,
}
impl StatisticsControl {
pub(crate) fn create(
reporting_config: StatsReporting,
client_type: String,
client_stats_id: String,
report_tx: InputMessageSender,
) -> (Self, ClientStatsSender) {
let (stats_tx, stats_rx) = tokio::sync::mpsc::unbounded_channel();
let stats = ClientStatsController::new(client_stats_id, client_type);
(
StatisticsControl {
stats,
stats_rx,
report_tx,
reporting_config,
},
ClientStatsSender::new(Some(stats_tx)),
)
}
async fn report_stats(&mut self, recipient: Recipient) {
let stats_report = self.stats.build_report();
let report_message = InputMessage::new_regular(
recipient,
stats_report.into(),
TransmissionLane::General,
None,
);
if let Err(err) = self.report_tx.send(report_message).await {
log::error!("Failed to report client stats: {:?}", err);
} else {
self.stats.reset();
}
}
async fn run_with_shutdown(&mut self, mut task_client: nym_task::TaskClient) {
log::debug!("Started StatisticsControl with graceful shutdown support");
let mut stats_report_interval =
tokio::time::interval(self.reporting_config.reporting_interval);
let mut local_report_interval = tokio::time::interval(LOCAL_REPORT_INTERVAL);
let mut snapshot_interval = tokio::time::interval(SNAPSHOT_INTERVAL);
loop {
tokio::select! {
stats_event = self.stats_rx.recv() => match stats_event {
Some(stats_event) => self.stats.handle_event(stats_event),
None => {
log::trace!("StatisticsControl: shutting down due to closed stats channel");
break;
}
},
_ = snapshot_interval.tick() => {
self.stats.snapshot();
}
_ = stats_report_interval.tick(), if self.reporting_config.enabled && self.reporting_config.provider_address.is_some() => {
// SAFTEY : this branch executes only if reporting is not none, so unwrapp is fine
#[allow(clippy::unwrap_used)]
self.report_stats(self.reporting_config.provider_address.unwrap()).await;
}
_ = local_report_interval.tick() => {
self.stats.local_report(&mut task_client);
}
_ = task_client.recv_with_delay() => {
log::trace!("StatisticsControl: Received shutdown");
break;
},
}
}
task_client.recv_timeout().await;
log::debug!("StatisticsControl: Exiting");
}
pub(crate) fn start_with_shutdown(mut self, task_client: nym_task::TaskClient) {
spawn_future(async move {
self.run_with_shutdown(task_client).await;
})
}
pub(crate) fn create_and_start_with_shutdown(
reporting_config: StatsReporting,
client_type: String,
client_stats_id: String,
report_tx: InputMessageSender,
task_client: nym_task::TaskClient,
) -> ClientStatsSender {
let (controller, sender) =
Self::create(reporting_config, client_type, client_stats_id, report_tx);
controller.start_with_shutdown(task_client);
sender
}
}
@@ -112,7 +112,7 @@ impl GeoAwareTopologyProvider {
async fn get_topology(&self) -> Option<NymTopology> {
let mixnodes = match self
.validator_client
.get_all_basic_active_mixing_assigned_nodes(Some(self.client_version.clone()))
.get_basic_active_mixing_assigned_nodes(Some(self.client_version.clone()))
.await
{
Err(err) => {
@@ -6,6 +6,7 @@ pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit};
use futures::StreamExt;
use log::*;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::NymTopologyError;
use std::time::Duration;
@@ -17,11 +18,7 @@ use wasmtimer::tokio::sleep;
mod accessor;
pub mod geo_aware_provider;
pub mod nym_api_provider;
pub use geo_aware_provider::GeoAwareTopologyProvider;
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
pub use nym_topology::provider_trait::TopologyProvider;
pub(crate) mod nym_api_provider;
// TODO: move it to config later
const MAX_FAILURE_COUNT: usize = 10;
@@ -14,10 +14,9 @@ use url::Url;
pub const DEFAULT_MIN_MIXNODE_PERFORMANCE: u8 = 50;
pub const DEFAULT_MIN_GATEWAY_PERFORMANCE: u8 = 50;
#[derive(Debug)]
pub struct Config {
pub min_mixnode_performance: u8,
pub min_gateway_performance: u8,
pub(crate) struct Config {
pub(crate) min_mixnode_performance: u8,
pub(crate) min_gateway_performance: u8,
}
impl Default for Config {
@@ -30,7 +29,7 @@ impl Default for Config {
}
}
pub struct NymApiTopologyProvider {
pub(crate) struct NymApiTopologyProvider {
config: Config,
validator_client: nym_validator_client::client::NymApiClient,
@@ -41,7 +40,7 @@ pub struct NymApiTopologyProvider {
}
impl NymApiTopologyProvider {
pub fn new(
pub(crate) fn new(
config: Config,
mut nym_api_urls: Vec<Url>,
client_version: String,
@@ -99,7 +98,7 @@ impl NymApiTopologyProvider {
async fn get_current_compatible_topology(&mut self) -> Option<NymTopology> {
let mixnodes = match self
.validator_client
.get_all_basic_active_mixing_assigned_nodes(Some(self.client_version.clone()))
.get_basic_active_mixing_assigned_nodes(Some(self.client_version.clone()))
.await
{
Err(err) => {
+5 -22
View File
@@ -212,29 +212,12 @@ pub enum ClientCoreError {
}
/// Set of messages that the client can send to listeners via the task manager
#[derive(Debug)]
#[derive(thiserror::Error, Debug)]
pub enum ClientCoreStatusMessage {
// NOTE: The nym-connect frontend listens for these strings, so don't change them until we have a more robust mechanism in place
#[error("The connected gateway is slow, or the connection to it is slow")]
GatewayIsSlow,
// NOTE: The nym-connect frontend listens for these strings, so don't change them until we have a more robust mechanism in place
#[error("The connected gateway is very slow, or the connection to it is very slow")]
GatewayIsVerySlow,
}
impl std::fmt::Display for ClientCoreStatusMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ClientCoreStatusMessage::GatewayIsSlow => write!(
f,
"The connected gateway is slow, or the connection to it is slow"
),
ClientCoreStatusMessage::GatewayIsVerySlow => write!(
f,
"The connected gateway is very slow, or the connection to it is very slow"
),
}
}
}
impl nym_task::TaskStatusEvent for ClientCoreStatusMessage {
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
+1 -3
View File
@@ -121,9 +121,7 @@ pub async fn current_mixnodes<R: Rng>(
log::trace!("Fetching list of mixnodes from: {nym_api}");
let mixnodes = client
.get_all_basic_active_mixing_assigned_nodes(None)
.await?;
let mixnodes = client.get_basic_active_mixing_assigned_nodes(None).await?;
let valid_mixnodes = mixnodes
.iter()
.filter_map(|mixnode| mixnode.try_into().ok())
@@ -18,8 +18,8 @@ use nym_api_requests::ecash::{
PartialExpirationDateSignatureResponse, VerificationKeyResponse,
};
use nym_api_requests::models::{
ApiHealthResponse, GatewayCoreStatusResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse,
NymNodeDescription, RewardEstimationResponse, StakeSaturationResponse,
GatewayCoreStatusResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse,
RewardEstimationResponse, StakeSaturationResponse,
};
use nym_api_requests::models::{LegacyDescribedGateway, MixNodeBondAnnotated};
use nym_api_requests::nym_nodes::SkimmedNode;
@@ -30,10 +30,10 @@ use time::Date;
use url::Url;
pub use crate::nym_api::NymApiClientExt;
use nym_mixnet_contract_common::NymNodeDetails;
pub use nym_mixnet_contract_common::{
mixnode::MixNodeDetails, GatewayBond, IdentityKey, IdentityKeyRef, NodeId,
};
// re-export the type to not break existing imports
pub use crate::coconut::EcashApiClient;
@@ -106,9 +106,7 @@ impl Config {
pub struct Client<C, S = NoSigner> {
// ideally they would have been read-only, but unfortunately rust doesn't have such features
// #[deprecated(note = "please use `nym_api_client` instead")]
pub nym_api: nym_api::Client,
// pub nym_api_client: NymApiClient,
pub nyxd: NyxdClient<C, S>,
}
@@ -192,8 +190,6 @@ impl<C, S> Client<C, S> {
}
// validator-api wrappers
// we have to allow the use of deprecated method here as they're calling the deprecated trait methods
#[allow(deprecated)]
impl<C, S> Client<C, S> {
pub fn api_url(&self) -> &Url {
self.nym_api.current_url()
@@ -203,102 +199,50 @@ impl<C, S> Client<C, S> {
self.nym_api.change_base_url(new_endpoint)
}
#[deprecated]
pub async fn get_cached_mixnodes(&self) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_mixnodes().await?)
}
#[deprecated]
pub async fn get_cached_mixnodes_detailed(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, ValidatorClientError> {
Ok(self.nym_api.get_mixnodes_detailed().await?)
}
#[deprecated]
pub async fn get_cached_mixnodes_detailed_unfiltered(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, ValidatorClientError> {
Ok(self.nym_api.get_mixnodes_detailed_unfiltered().await?)
}
#[deprecated]
pub async fn get_cached_rewarded_mixnodes(
&self,
) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_rewarded_mixnodes().await?)
}
#[deprecated]
pub async fn get_cached_rewarded_mixnodes_detailed(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, ValidatorClientError> {
Ok(self.nym_api.get_rewarded_mixnodes_detailed().await?)
}
#[deprecated]
pub async fn get_cached_active_mixnodes(
&self,
) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_active_mixnodes().await?)
}
#[deprecated]
pub async fn get_cached_active_mixnodes_detailed(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, ValidatorClientError> {
Ok(self.nym_api.get_active_mixnodes_detailed().await?)
}
#[deprecated]
pub async fn get_cached_gateways(&self) -> Result<Vec<GatewayBond>, ValidatorClientError> {
Ok(self.nym_api.get_gateways().await?)
}
// TODO: combine with NymApiClient...
pub async fn get_all_cached_described_nodes(
&self,
) -> Result<Vec<NymNodeDescription>, ValidatorClientError> {
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
let mut page = 0;
let mut descriptions = Vec::new();
loop {
let mut res = self.nym_api.get_nodes_described(Some(page), None).await?;
descriptions.append(&mut res.data);
if descriptions.len() < res.pagination.total {
page += 1
} else {
break;
}
}
Ok(descriptions)
}
// TODO: combine with NymApiClient...
pub async fn get_all_cached_bonded_nym_nodes(
&self,
) -> Result<Vec<NymNodeDetails>, ValidatorClientError> {
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
let mut page = 0;
let mut bonds = Vec::new();
loop {
let mut res = self.nym_api.get_nym_nodes(Some(page), None).await?;
bonds.append(&mut res.data);
if bonds.len() < res.pagination.total {
page += 1
} else {
break;
}
}
Ok(bonds)
}
pub async fn blind_sign(
&self,
request_body: &BlindSignRequestBody,
@@ -314,8 +258,6 @@ pub struct NymApiClient {
// we could re-implement the communication with the REST API on port 1317
}
// we have to allow the use of deprecated method here as they're calling the deprecated trait methods
#[allow(deprecated)]
impl NymApiClient {
pub fn new(api_url: Url) -> Self {
let nym_api = nym_api::Client::new(api_url, None);
@@ -348,7 +290,7 @@ impl NymApiClient {
self.nym_api.change_base_url(new_endpoint);
}
#[deprecated(note = "use get_all_basic_active_mixing_assigned_nodes instead")]
#[deprecated(note = "use get_basic_active_mixing_assigned_nodes instead")]
pub async fn get_basic_mixnodes(
&self,
semver_compatibility: Option<String>,
@@ -385,7 +327,7 @@ impl NymApiClient {
loop {
let mut res = self
.nym_api
.get_basic_entry_assigned_nodes(
.get_all_basic_entry_assigned_nodes(
semver_compatibility.clone(),
false,
Some(page),
@@ -406,7 +348,7 @@ impl NymApiClient {
/// retrieve basic information for nodes that got assigned 'mixing' node in this epoch
/// this includes legacy mixnodes and nym-nodes
pub async fn get_all_basic_active_mixing_assigned_nodes(
pub async fn get_basic_active_mixing_assigned_nodes(
&self,
semver_compatibility: Option<String>,
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
@@ -436,142 +378,32 @@ impl NymApiClient {
Ok(nodes)
}
/// retrieve basic information for nodes are capable of operating as a mixnode
/// this includes legacy mixnodes and nym-nodes
pub async fn get_all_basic_mixing_capable_nodes(
&self,
semver_compatibility: Option<String>,
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
let mut page = 0;
let mut nodes = Vec::new();
loop {
let mut res = self
.nym_api
.get_basic_mixing_capable_nodes(
semver_compatibility.clone(),
false,
Some(page),
None,
)
.await?;
nodes.append(&mut res.nodes.data);
if nodes.len() < res.nodes.pagination.total {
page += 1
} else {
break;
}
}
Ok(nodes)
}
/// retrieve basic information for all bonded nodes on the network
pub async fn get_all_basic_nodes(
&self,
semver_compatibility: Option<String>,
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
let mut page = 0;
let mut nodes = Vec::new();
loop {
let mut res = self
.nym_api
.get_basic_nodes(semver_compatibility.clone(), false, Some(page), None)
.await?;
nodes.append(&mut res.nodes.data);
if nodes.len() < res.nodes.pagination.total {
page += 1
} else {
break;
}
}
Ok(nodes)
}
pub async fn health(&self) -> Result<ApiHealthResponse, ValidatorClientError> {
Ok(self.nym_api.health().await?)
}
#[deprecated]
pub async fn get_cached_active_mixnodes(
&self,
) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_active_mixnodes().await?)
}
#[deprecated]
pub async fn get_cached_rewarded_mixnodes(
&self,
) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_rewarded_mixnodes().await?)
}
#[deprecated]
pub async fn get_cached_mixnodes(&self) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
Ok(self.nym_api.get_mixnodes().await?)
}
#[deprecated]
pub async fn get_cached_gateways(&self) -> Result<Vec<GatewayBond>, ValidatorClientError> {
Ok(self.nym_api.get_gateways().await?)
}
#[deprecated]
pub async fn get_cached_described_gateways(
&self,
) -> Result<Vec<LegacyDescribedGateway>, ValidatorClientError> {
Ok(self.nym_api.get_gateways_described().await?)
}
pub async fn get_all_described_nodes(
&self,
) -> Result<Vec<NymNodeDescription>, ValidatorClientError> {
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
let mut page = 0;
let mut descriptions = Vec::new();
loop {
let mut res = self.nym_api.get_nodes_described(Some(page), None).await?;
descriptions.append(&mut res.data);
if descriptions.len() < res.pagination.total {
page += 1
} else {
break;
}
}
Ok(descriptions)
}
pub async fn get_all_bonded_nym_nodes(
&self,
) -> Result<Vec<NymNodeDetails>, ValidatorClientError> {
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
let mut page = 0;
let mut bonds = Vec::new();
loop {
let mut res = self.nym_api.get_nym_nodes(Some(page), None).await?;
bonds.append(&mut res.data);
if bonds.len() < res.pagination.total {
page += 1
} else {
break;
}
}
Ok(bonds)
}
#[deprecated]
pub async fn get_gateway_core_status_count(
&self,
identity: IdentityKeyRef<'_>,
@@ -583,7 +415,6 @@ impl NymApiClient {
.await?)
}
#[deprecated]
pub async fn get_mixnode_core_status_count(
&self,
mix_id: NodeId,
@@ -595,7 +426,6 @@ impl NymApiClient {
.await?)
}
#[deprecated]
pub async fn get_mixnode_status(
&self,
mix_id: NodeId,
@@ -603,7 +433,6 @@ impl NymApiClient {
Ok(self.nym_api.get_mixnode_status(mix_id).await?)
}
#[deprecated]
pub async fn get_mixnode_reward_estimation(
&self,
mix_id: NodeId,
@@ -611,7 +440,6 @@ impl NymApiClient {
Ok(self.nym_api.get_mixnode_reward_estimation(mix_id).await?)
}
#[deprecated]
pub async fn get_mixnode_stake_saturation(
&self,
mix_id: NodeId,
@@ -643,7 +471,6 @@ impl NymApiClient {
.await?)
}
#[deprecated]
pub async fn spent_credentials_filter(
&self,
) -> Result<SpentCredentialsResponse, ValidatorClientError> {
@@ -164,7 +164,7 @@ async fn test_nym_api_connection(
) -> ConnectionResult {
let result = match timeout(
Duration::from_secs(CONNECTION_TEST_TIMEOUT_SEC),
client.health(),
client.get_cached_mixnodes(),
)
.await
{
@@ -11,11 +11,9 @@ use nym_api_requests::ecash::models::{
};
use nym_api_requests::ecash::VerificationKeyResponse;
use nym_api_requests::models::{
AnnotationResponse, ApiHealthResponse, LegacyDescribedMixNode, NodePerformanceResponse,
NymNodeDescription,
AnnotationResponse, LegacyDescribedMixNode, NodePerformanceResponse,
};
use nym_api_requests::nym_nodes::PaginatedCachedNodesResponse;
use nym_api_requests::pagination::PaginatedResponse;
pub use nym_api_requests::{
ecash::{
models::{
@@ -40,7 +38,7 @@ use nym_contracts_common::IdentityKey;
pub use nym_http_api_client::Client;
use nym_http_api_client::{ApiClient, NO_PARAMS};
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, NodeId, NymNodeDetails};
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, NodeId};
use time::format_description::BorrowedFormatItem;
use time::Date;
use tracing::instrument;
@@ -55,26 +53,12 @@ pub fn rfc_3339_date() -> Vec<BorrowedFormatItem<'static>> {
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait NymApiClientExt: ApiClient {
async fn health(&self) -> Result<ApiHealthResponse, NymAPIError> {
self.get_json(
&[
routes::API_VERSION,
routes::API_STATUS_ROUTES,
routes::HEALTH,
],
NO_PARAMS,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(&[routes::API_VERSION, routes::MIXNODES], NO_PARAMS)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
self.get_json(
@@ -89,7 +73,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateways_detailed(&self) -> Result<Vec<GatewayBondAnnotated>, NymAPIError> {
self.get_json(
@@ -104,7 +87,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_detailed_unfiltered(
&self,
@@ -121,14 +103,12 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateways(&self) -> Result<Vec<GatewayBond>, NymAPIError> {
self.get_json(&[routes::API_VERSION, routes::GATEWAYS], NO_PARAMS)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateways_described(&self) -> Result<Vec<LegacyDescribedGateway>, NymAPIError> {
self.get_json(
@@ -138,7 +118,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_described(&self) -> Result<Vec<LegacyDescribedMixNode>, NymAPIError> {
self.get_json(
@@ -148,47 +127,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[tracing::instrument(level = "debug", skip_all)]
async fn get_nodes_described(
&self,
page: Option<u32>,
per_page: Option<u32>,
) -> Result<PaginatedResponse<NymNodeDescription>, NymAPIError> {
let mut params = Vec::new();
if let Some(page) = page {
params.push(("page", page.to_string()))
}
if let Some(per_page) = per_page {
params.push(("per_page", per_page.to_string()))
}
self.get_json(&[routes::API_VERSION, "nym-nodes", "described"], &params)
.await
}
#[tracing::instrument(level = "debug", skip_all)]
async fn get_nym_nodes(
&self,
page: Option<u32>,
per_page: Option<u32>,
) -> Result<PaginatedResponse<NymNodeDetails>, NymAPIError> {
let mut params = Vec::new();
if let Some(page) = page {
params.push(("page", page.to_string()))
}
if let Some(per_page) = per_page {
params.push(("per_page", per_page.to_string()))
}
self.get_json(&[routes::API_VERSION, "nym-nodes", "bonded"], &params)
.await
}
#[deprecated]
#[tracing::instrument(level = "debug", skip_all)]
async fn get_basic_mixnodes(
&self,
@@ -213,7 +151,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_basic_gateways(
&self,
@@ -241,7 +178,7 @@ pub trait NymApiClientExt: ApiClient {
/// retrieve basic information for nodes are capable of operating as an entry gateway
/// this includes legacy gateways and nym-nodes
#[instrument(level = "debug", skip(self))]
async fn get_basic_entry_assigned_nodes(
async fn get_all_basic_entry_assigned_nodes(
&self,
semver_compatibility: Option<String>,
no_legacy: bool,
@@ -322,82 +259,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
/// retrieve basic information for nodes that got assigned 'mixing' node in this epoch
/// this includes legacy mixnodes and nym-nodes
#[instrument(level = "debug", skip(self))]
async fn get_basic_mixing_capable_nodes(
&self,
semver_compatibility: Option<String>,
no_legacy: bool,
page: Option<u32>,
per_page: Option<u32>,
) -> Result<PaginatedCachedNodesResponse<SkimmedNode>, NymAPIError> {
let mut params = Vec::new();
if let Some(arg) = &semver_compatibility {
params.push(("semver_compatibility", arg.clone()))
}
if no_legacy {
params.push(("no_legacy", "true".to_string()))
}
if let Some(page) = page {
params.push(("page", page.to_string()))
}
if let Some(per_page) = per_page {
params.push(("per_page", per_page.to_string()))
}
self.get_json(
&[
routes::API_VERSION,
"unstable",
"nym-nodes",
"skimmed",
"mixnodes",
"all",
],
&params,
)
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_basic_nodes(
&self,
semver_compatibility: Option<String>,
no_legacy: bool,
page: Option<u32>,
per_page: Option<u32>,
) -> Result<PaginatedCachedNodesResponse<SkimmedNode>, NymAPIError> {
let mut params = Vec::new();
if let Some(arg) = &semver_compatibility {
params.push(("semver_compatibility", arg.clone()))
}
if no_legacy {
params.push(("no_legacy", "true".to_string()))
}
if let Some(page) = page {
params.push(("page", page.to_string()))
}
if let Some(per_page) = per_page {
params.push(("per_page", per_page.to_string()))
}
self.get_json(
&[routes::API_VERSION, "unstable", "nym-nodes", "skimmed"],
&params,
)
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_active_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(
@@ -407,7 +268,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_active_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
self.get_json(
@@ -423,7 +283,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(
@@ -433,7 +292,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_report(
&self,
@@ -452,7 +310,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateway_report(
&self,
@@ -471,7 +328,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_history(
&self,
@@ -490,7 +346,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateway_history(
&self,
@@ -509,7 +364,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_mixnodes_detailed(
&self,
@@ -527,7 +381,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateway_core_status_count(
&self,
@@ -560,7 +413,6 @@ pub trait NymApiClientExt: ApiClient {
}
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_core_status_count(
&self,
@@ -594,7 +446,6 @@ pub trait NymApiClientExt: ApiClient {
}
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_status(
&self,
@@ -613,7 +464,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_reward_estimation(
&self,
@@ -632,7 +482,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn compute_mixnode_reward_estimation(
&self,
@@ -653,7 +502,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_stake_saturation(
&self,
@@ -672,7 +520,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_inclusion_probability(
&self,
@@ -708,7 +555,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
async fn get_mixnode_avg_uptime(&self, mix_id: NodeId) -> Result<UptimeResponse, NymAPIError> {
self.get_json(
&[
@@ -723,7 +569,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_blacklisted(&self) -> Result<Vec<NodeId>, NymAPIError> {
self.get_json(
@@ -733,7 +578,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn get_gateways_blacklisted(&self) -> Result<Vec<IdentityKey>, NymAPIError> {
self.get_json(
@@ -794,7 +638,6 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[deprecated]
#[instrument(level = "debug", skip(self))]
async fn double_spending_filter_v1(&self) -> Result<SpentCredentialsResponse, NymAPIError> {
self.get_json(
@@ -36,8 +36,6 @@ pub mod ecash {
}
pub const STATUS_ROUTES: &str = "status";
pub const API_STATUS_ROUTES: &str = "api-status";
pub const HEALTH: &str = "health";
pub const MIXNODE: &str = "mixnode";
pub const GATEWAY: &str = "gateway";
pub const NYM_NODES: &str = "nym-nodes";
@@ -2,9 +2,10 @@
// SPDX-License-Identifier: Apache-2.0
use crate::context::QueryClientWithNyxd;
use crate::utils::show_error;
use crate::utils::{pretty_cosmwasm_coin, show_error};
use clap::Parser;
use comfy_table::Table;
use nym_validator_client::client::NymApiClientExt;
#[derive(Debug, Parser)]
pub struct Args {
@@ -14,11 +15,12 @@ pub struct Args {
}
pub async fn query(args: Args, client: &QueryClientWithNyxd) {
match client.get_all_cached_described_nodes().await {
match client.nym_api.get_gateways().await {
Ok(res) => match args.identity_key {
Some(identity_key) => {
let node = res.iter().find(|node| {
node.ed25519_identity_key()
node.gateway
.identity_key
.to_string()
.eq_ignore_ascii_case(&identity_key)
});
@@ -30,16 +32,14 @@ pub async fn query(args: Args, client: &QueryClientWithNyxd) {
None => {
let mut table = Table::new();
table.set_header(vec!["Node Id", "Identity Key", "Version", "Is Legacy"]);
for node in res
.into_iter()
.filter(|node| node.description.declared_role.entry)
{
table.set_header(vec!["Identity Key", "Owner", "Host", "Bond", "Version"]);
for node in res {
table.add_row(vec![
node.node_id.to_string(),
node.ed25519_identity_key().to_base58_string(),
node.description.build_information.build_version,
(!node.contract_node_type.is_nym_node()).to_string(),
node.gateway.identity_key.to_string(),
node.owner.to_string(),
node.gateway.host.to_string(),
pretty_cosmwasm_coin(&node.pledge_amount),
node.gateway.version.clone(),
]);
}
@@ -2,9 +2,10 @@
// SPDX-License-Identifier: Apache-2.0
use crate::context::QueryClientWithNyxd;
use crate::utils::show_error;
use crate::utils::{pretty_decimal_with_denom, show_error};
use clap::Parser;
use comfy_table::Table;
use nym_validator_client::client::NymApiClientExt;
#[derive(Debug, Parser)]
pub struct Args {
@@ -14,11 +15,13 @@ pub struct Args {
}
pub async fn query(args: Args, client: &QueryClientWithNyxd) {
match client.get_all_cached_described_nodes().await {
match client.nym_api.get_mixnodes().await {
Ok(res) => match args.identity_key {
Some(identity_key) => {
let node = res.iter().find(|node| {
node.ed25519_identity_key()
node.bond_information
.mix_node
.identity_key
.to_string()
.eq_ignore_ascii_case(&identity_key)
});
@@ -30,16 +33,25 @@ pub async fn query(args: Args, client: &QueryClientWithNyxd) {
None => {
let mut table = Table::new();
table.set_header(vec!["Node Id", "Identity Key", "Version", "Is Legacy"]);
for node in res
.into_iter()
.filter(|node| node.description.declared_role.mixnode)
{
table.set_header(vec![
"Mix id",
"Identity Key",
"Owner",
"Host",
"Bond",
"Total Delegations",
"Version",
]);
for node in res {
let denom = &node.bond_information.original_pledge().denom;
table.add_row(vec![
node.node_id.to_string(),
node.ed25519_identity_key().to_base58_string(),
node.description.build_information.build_version,
(!node.contract_node_type.is_nym_node()).to_string(),
node.mix_id().to_string(),
node.bond_information.mix_node.identity_key.clone(),
node.bond_information.owner.clone().into_string(),
node.bond_information.mix_node.host.clone(),
pretty_decimal_with_denom(node.rewarding_details.operator, denom),
pretty_decimal_with_denom(node.rewarding_details.delegates, denom),
node.bond_information.mix_node.version,
]);
}
@@ -17,7 +17,6 @@ use crate::{
use cosmwasm_schema::cw_serde;
use cosmwasm_std::{Addr, Coin, Decimal, StdResult, Uint128};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
/// Full details associated with given mixnode.
@@ -648,39 +647,14 @@ impl From<LegacyMixLayer> for u8 {
export_to = "ts-packages/types/src/types/rust/PendingMixnodeChanges.ts"
)
)]
// note: we had to remove `#[cw_serde]` as it enforces `#[serde(deny_unknown_fields)]` which we do not want
// with the addition of .cost_params_change field
#[derive(
::cosmwasm_schema::serde::Serialize,
::cosmwasm_schema::serde::Deserialize,
::std::clone::Clone,
::std::fmt::Debug,
::std::cmp::PartialEq,
::cosmwasm_schema::schemars::JsonSchema,
Default,
Copy,
)]
#[schemars(crate = "::cosmwasm_schema::schemars")]
#[cw_serde]
#[derive(Default, Copy)]
pub struct PendingMixNodeChanges {
pub pledge_change: Option<EpochEventId>,
#[serde(default)]
pub cost_params_change: Option<IntervalEventId>,
}
#[derive(Default, Copy, Clone, Debug, Serialize, Deserialize, JsonSchema)]
pub struct LegacyPendingMixNodeChanges {
pub pledge_change: Option<EpochEventId>,
}
impl From<PendingMixNodeChanges> for LegacyPendingMixNodeChanges {
fn from(value: PendingMixNodeChanges) -> Self {
LegacyPendingMixNodeChanges {
pledge_change: value.pledge_change,
}
}
}
impl PendingMixNodeChanges {
pub fn new_empty() -> PendingMixNodeChanges {
PendingMixNodeChanges {
@@ -21,7 +21,7 @@ pub mod error;
mod helpers;
mod state;
pub const TIME_RANGE_SEC: i64 = 30;
const TIME_RANGE_SEC: i64 = 30;
pub struct EcashManager<S> {
shared_state: SharedState<S>,
+2 -2
View File
@@ -6,7 +6,7 @@ use std::sync::Arc;
use time::{Date, OffsetDateTime};
use tracing::*;
use nym_credentials::ecash::utils::{cred_exp_date, ecash_today, EcashTime};
use nym_credentials::ecash::utils::{ecash_today, EcashTime};
use nym_credentials_interface::{Bandwidth, ClientTicket, TicketType};
use nym_gateway_requests::models::CredentialSpendingRequest;
use nym_gateway_storage::Storage;
@@ -131,7 +131,7 @@ impl<S: Storage + Clone + 'static> CredentialVerifier<S> {
let bandwidth = Bandwidth::ticket_amount(credential_type.into());
self.bandwidth_storage_manager
.increase_bandwidth(bandwidth, cred_exp_date())
.increase_bandwidth(bandwidth, spend_date)
.await?;
Ok(self
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
use std::fmt::{self, Debug, Display, Formatter};
use std::fmt::{self, Display, Formatter};
use std::str::FromStr;
use thiserror::Error;
use zeroize::{Zeroize, ZeroizeOnDrop};
@@ -112,18 +112,12 @@ impl PemStorableKeyPair for KeyPair {
}
}
#[derive(PartialEq, Eq, Hash, Copy, Clone)]
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)]
pub struct PublicKey(x25519_dalek::PublicKey);
impl Display for PublicKey {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Display::fmt(&self.to_base58_string(), f)
}
}
impl Debug for PublicKey {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Debug::fmt(&self.to_base58_string(), f)
write!(f, "{}", self.to_base58_string())
}
}
@@ -31,16 +31,8 @@ pub mod option_bs58_x25519_pubkey {
pub fn deserialize<'de, D: Deserializer<'de>>(
deserializer: D,
) -> Result<Option<PublicKey>, D::Error> {
match Option::<String>::deserialize(deserializer)? {
None => Ok(None),
Some(s) => {
if s.is_empty() {
Ok(None)
} else {
Some(PublicKey::from_base58_string(&s).map_err(serde::de::Error::custom))
.transpose()
}
}
}
let s = Option::<String>::deserialize(deserializer)?;
s.map(|s| PublicKey::from_base58_string(&s).map_err(serde::de::Error::custom))
.transpose()
}
}
+3 -9
View File
@@ -5,7 +5,7 @@ pub use ed25519_dalek::SignatureError;
use ed25519_dalek::{Signer, SigningKey};
pub use ed25519_dalek::{Verifier, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH, SIGNATURE_LENGTH};
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
use std::fmt::{self, Debug, Display, Formatter};
use std::fmt::{self, Display, Formatter};
use std::str::FromStr;
use thiserror::Error;
use zeroize::{Zeroize, ZeroizeOnDrop};
@@ -119,18 +119,12 @@ impl PemStorableKeyPair for KeyPair {
}
/// ed25519 EdDSA Public Key
#[derive(Copy, Clone, Eq, PartialEq)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct PublicKey(ed25519_dalek::VerifyingKey);
impl Display for PublicKey {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Display::fmt(&self.to_base58_string(), f)
}
}
impl Debug for PublicKey {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Debug::fmt(&self.to_base58_string(), f)
write!(f, "{}", self.to_base58_string())
}
}
+2 -2
View File
@@ -104,8 +104,8 @@ impl PersistentStatsStorage {
.await?)
}
pub async fn get_unique_users(&self, date: Date) -> Result<Vec<String>, StatsStorageError> {
Ok(self.session_manager.get_unique_users(date).await?)
pub async fn get_unique_users_count(&self, date: Date) -> Result<i32, StatsStorageError> {
Ok(self.session_manager.get_unique_users_count(date).await?)
}
pub async fn delete_unique_users(&self, before_date: Date) -> Result<(), StatsStorageError> {
+6 -5
View File
@@ -71,13 +71,14 @@ impl SessionManager {
Ok(())
}
pub(crate) async fn get_unique_users(&self, date: Date) -> Result<Vec<String>> {
sqlx::query_scalar!(
"SELECT client_address as count FROM sessions_unique_users WHERE day = ?",
pub(crate) async fn get_unique_users_count(&self, date: Date) -> Result<i32> {
Ok(sqlx::query!(
"SELECT COUNT(*) as count FROM sessions_unique_users WHERE day = ?",
date
)
.fetch_all(&self.connection_pool)
.await
.fetch_one(&self.connection_pool)
.await?
.count)
}
pub(crate) async fn delete_unique_users(&self, before_date: Date) -> Result<()> {
-3
View File
@@ -35,9 +35,6 @@ pub enum HttpClientError<E: Display = String> {
source: reqwest::Error,
},
#[error("failed to deserialise received response: {source}")]
ResponseDeserialisationFailure { source: serde_json::Error },
#[error("provided url is malformed: {source}")]
MalformedUrl {
#[from]
-1
View File
@@ -11,7 +11,6 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
axum-client-ip.workspace = true
axum.workspace = true
bytes = { workspace = true }
colored.workspace = true
+3 -3
View File
@@ -1,18 +1,18 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use axum::extract::Request;
use axum::extract::{ConnectInfo, Request};
use axum::http::header::{HOST, USER_AGENT};
use axum::http::HeaderValue;
use axum::middleware::Next;
use axum::response::IntoResponse;
use axum_client_ip::InsecureClientIp;
use colored::Colorize;
use std::net::SocketAddr;
use std::time::Instant;
use tracing::info;
pub async fn logger(
InsecureClientIp(addr): InsecureClientIp,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
request: Request,
next: Next,
) -> impl IntoResponse {
+1 -1
View File
@@ -248,7 +248,7 @@ mod tests {
data: IpPacketRequestData::StaticConnect(
StaticConnectRequest {
request_id: 123,
ips: IpPair::new(Ipv4Addr::from_str("10.0.0.1").unwrap(), Ipv6Addr::from_str("fc00::1").unwrap()),
ips: IpPair::new(Ipv4Addr::from_str("10.0.0.1").unwrap(), Ipv6Addr::from_str("2001:db8:a160::1").unwrap()),
reply_to: Recipient::try_from_base58_string("D1rrpsysCGCYXy9saP8y3kmNpGtJZUXN9SvFoUcqAsM9.9Ssso1ea5NfkbMASdiseDSjTN1fSWda5SgEVjdSN4CvV@GJqd3ZxpXWSNxTfx7B1pPtswpetH4LnJdFeLeuY5KUuN").unwrap(),
reply_to_hops: None,
reply_to_avg_mix_delays: None,
+1 -1
View File
@@ -429,7 +429,7 @@ mod tests {
SignedStaticConnectRequest {
request: StaticConnectRequest {
request_id: 123,
ips: IpPair::new(Ipv4Addr::from_str("10.0.0.1").unwrap(), Ipv6Addr::from_str("fc00::1").unwrap()),
ips: IpPair::new(Ipv4Addr::from_str("10.0.0.1").unwrap(), Ipv6Addr::from_str("2001:db8:a160::1").unwrap()),
reply_to: Recipient::try_from_base58_string("D1rrpsysCGCYXy9saP8y3kmNpGtJZUXN9SvFoUcqAsM9.9Ssso1ea5NfkbMASdiseDSjTN1fSWda5SgEVjdSN4CvV@GJqd3ZxpXWSNxTfx7B1pPtswpetH4LnJdFeLeuY5KUuN").unwrap(),
reply_to_hops: None,
reply_to_avg_mix_delays: None,
+16 -7
View File
@@ -14,6 +14,7 @@ use nym_task::TaskClient;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
@@ -312,7 +313,7 @@ impl VerlocMeasurer {
info!("Starting verloc measurements");
// TODO: should we also measure gateways?
let all_mixes = match self.validator_client.get_all_described_nodes().await {
let all_mixes = match self.validator_client.get_cached_mixnodes().await {
Ok(nodes) => nodes,
Err(err) => {
error!(
@@ -331,14 +332,22 @@ impl VerlocMeasurer {
// we only care about address and identity
let tested_nodes = all_mixes
.into_iter()
.filter(|n| n.description.declared_role.mixnode)
.filter_map(|node| {
// try to parse the identity and host
let node_identity = node.ed25519_identity_key();
let mix_node = node.bond_information.mix_node;
// check if the node has sufficient version to be able to understand the packets
let node_version = parse_version(&mix_node.version).ok()?;
if node_version < self.config.minimum_compatible_node_version {
return None;
}
let ip = node.description.host_information.ip_address.first()?;
let verloc_port = node.description.verloc_port();
let verloc_host = SocketAddr::new(*ip, verloc_port);
// try to parse the identity and host
let node_identity =
identity::PublicKey::from_base58_string(mix_node.identity_key).ok()?;
let verloc_host = (&*mix_node.host, mix_node.verloc_port)
.to_socket_addrs()
.ok()?
.next()?;
// TODO: possible problem in the future, this does name resolution and theoretically
// if a lot of nodes maliciously mis-configured themselves, it might take a while to resolve them all
+2 -1
View File
@@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TestrunAssignment {
/// has nothing to do with GW identity key. This is PK from `gateways` table
pub testrun_id: i64,
pub gateway_identity_key: String,
pub gateway_pk_id: i64,
}
-4
View File
@@ -23,7 +23,3 @@ default = ["env", "network"]
env = ["dotenvy", "log"]
network = ["schemars", "serde", "url"]
utoipa = [ "dep:utoipa" ]
[build-dependencies]
regex = { workspace = true }
cargo_metadata = { version = "0.18" }
-85
View File
@@ -1,85 +0,0 @@
use cargo_metadata::MetadataCommand;
use regex::Regex;
use std::{collections::HashMap, fs, path::PathBuf};
/// Sync variable values defined in code with .env file
fn main() {
let source_of_truth = include_str!("src/mainnet.rs");
let mut output_path = workspace_root();
output_path.push("envs");
output_path.push("mainnet.env");
println!("{}", output_path.display());
let variables_to_track = [
"NETWORK_NAME",
"BECH32_PREFIX",
"MIXNET_CONTRACT_ADDRESS",
"VESTING_CONTRACT_ADDRESS",
"GROUP_CONTRACT_ADDRESS",
"ECASH_CONTRACT_ADDRESS",
"MULTISIG_CONTRACT_ADDRESS",
"COCONUT_DKG_CONTRACT_ADDRESS",
"REWARDING_VALIDATOR_ADDRESS",
"NYM_API",
"NYXD_WS",
"EXPLORER_API",
"NYM_VPN_API",
];
let mut replace_with = HashMap::new();
for var in variables_to_track {
// if script fails, debug with `cargo check -vv``
println!("Looking for {}", var);
// read pattern that looks like:
// <var>: &str = "<whatever is between quotes>"
let pattern = format!(r#"{}: &str\s*=\s*"([^"]*)""#, regex::escape(var));
let re = Regex::new(&pattern).unwrap();
let value = re
.captures(source_of_truth)
.and_then(|caps| caps.get(1).map(|match_| match_.as_str().to_string()))
.expect("Couldn't find var in source file");
println!("Storing {}={}", var, value);
replace_with.insert(var, value);
}
let mut contents = fs::read_to_string(&output_path).unwrap();
for (var, value) in replace_with {
// match a pattern that looks like:
// <var> = <value>
// where `<var>` is a variable name inserted into search pattern
let pattern = format!(r#"{}\s*=\s*([^\n]*)"#, regex::escape(var));
// replace matched pattern with
// <var>=<value>
let re = Regex::new(&pattern).unwrap();
contents = re
.replace(&contents, |_: &regex::Captures| {
format!(r#"{}={}"#, var, value)
})
.to_string();
}
println!("File contents to write:\n{}", contents);
if output_path.exists() {
fs::write(output_path, contents).unwrap();
} else {
panic!("{} doesn't exist", output_path.display());
}
}
fn workspace_root() -> PathBuf {
let metadata = MetadataCommand::new()
.exec()
.expect("Failed to get cargo metadata");
metadata
.workspace_root
.into_std_path_buf()
.canonicalize()
.expect("Failed to canonicalize path")
}
+4 -7
View File
@@ -45,16 +45,13 @@ pub mod nyx {
}
pub mod wireguard {
use std::net::{Ipv4Addr, Ipv6Addr};
use std::net::{IpAddr, Ipv4Addr};
pub const WG_PORT: u16 = 51822;
// The interface used to route traffic
pub const WG_TUN_BASE_NAME_V4: &str = "nymwg";
pub const WG_TUN_BASE_NAME_V6: &str = "nymwgv6";
pub const WG_TUN_BASE_NAME: &str = "nymwg";
pub const WG_TUN_DEVICE_ADDRESS: &str = "10.1.0.1";
pub const WG_TUN_DEVICE_IP_ADDRESS_V4: Ipv4Addr = Ipv4Addr::new(10, 1, 0, 1);
pub const WG_TUN_DEVICE_NETMASK_V4: u8 = 16;
pub const WG_TUN_DEVICE_IP_ADDRESS_V6: Ipv6Addr = Ipv6Addr::new(0xfc01, 0, 0, 0, 0, 0, 0, 0x1); // fc01::1
pub const WG_TUN_DEVICE_NETMASK_V6: u8 = 112;
pub const WG_TUN_DEVICE_IP_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::new(10, 1, 0, 1));
pub const WG_TUN_DEVICE_NETMASK: &str = "255.255.255.0";
}
-1
View File
@@ -25,7 +25,6 @@ pub const NYXD_WEBSOCKET: &str = "NYXD_WS";
pub const EXPLORER_API: &str = "EXPLORER_API";
pub const EXIT_POLICY_URL: &str = "EXIT_POLICY";
pub const NYM_VPN_API: &str = "NYM_VPN_API";
pub const CLIENT_STATS_COLLECTION_PROVIDER: &str = "CLIENT_STATS_COLLECTION_PROVIDER";
pub const DKG_TIME_CONFIGURATION: &str = "DKG_TIME_CONFIGURATION";
+34 -9
View File
@@ -42,8 +42,32 @@ impl PendingSync {
}
}
#[derive(Debug, Clone)]
pub struct BlockProcessorConfig {
pub pruning_options: PruningOptions,
pub store_precommits: bool,
}
impl Default for BlockProcessorConfig {
fn default() -> Self {
Self {
pruning_options: PruningOptions::nothing(),
store_precommits: true,
}
}
}
impl BlockProcessorConfig {
pub fn new(pruning_options: PruningOptions, store_precommits: bool) -> Self {
Self {
pruning_options,
store_precommits,
}
}
}
pub struct BlockProcessor {
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
last_processed_height: u32,
@@ -65,9 +89,10 @@ pub struct BlockProcessor {
msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
#[allow(clippy::too_many_arguments)]
impl BlockProcessor {
pub async fn new(
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
@@ -82,7 +107,7 @@ impl BlockProcessor {
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
Ok(BlockProcessor {
pruning_options,
config,
cancel,
synced,
last_processed_height,
@@ -101,7 +126,7 @@ impl BlockProcessor {
}
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.pruning_options = pruning_options;
self.config.pruning_options = pruning_options;
self
}
@@ -128,7 +153,7 @@ impl BlockProcessor {
// we won't end up with a corrupted storage.
let mut tx = self.storage.begin_processing_tx().await?;
persist_block(&full_info, &mut tx).await?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
// let the modules do whatever they want
// the ones wanting the full block:
@@ -241,7 +266,7 @@ impl BlockProcessor {
#[instrument(skip(self))]
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = self.last_processed_height - keep_recent;
info!(
@@ -282,12 +307,12 @@ impl BlockProcessor {
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
debug!("checking for storage pruning");
if self.pruning_options.strategy.is_nothing() {
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}
let interval = self.pruning_options.strategy_interval();
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= self.last_processed_height {
self.prune_storage().await?;
}
@@ -371,7 +396,7 @@ impl BlockProcessor {
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
// in case we were offline for a while,
// make sure we don't request blocks we'd have to prune anyway
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = latest_block - keep_recent;
self.last_processed_height = max(self.last_processed_height, last_to_keep);
+14 -3
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::BlockToProcess;
use crate::block_processor::BlockProcessor;
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
use crate::block_requester::{BlockRequest, BlockRequester};
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
@@ -34,6 +34,8 @@ pub struct Config {
pub database_path: PathBuf,
pub pruning_options: PruningOptions,
pub store_precommits: bool,
}
pub struct NyxdScraperBuilder {
@@ -60,8 +62,14 @@ impl NyxdScraperBuilder {
req_rx,
processing_tx.clone(),
);
let mut block_processor = BlockProcessor::new(
let block_processor_config = BlockProcessorConfig::new(
scraper.config.pruning_options,
scraper.config.store_precommits,
);
let mut block_processor = BlockProcessor::new(
block_processor_config,
scraper.cancel_token.clone(),
scraper.startup_sync.clone(),
processing_rx,
@@ -275,8 +283,11 @@ impl NyxdScraper {
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> Result<BlockProcessor, ScraperError> {
let block_processor_config =
BlockProcessorConfig::new(self.config.pruning_options, self.config.store_precommits);
BlockProcessor::new(
self.config.pruning_options,
block_processor_config,
self.cancel_token.clone(),
self.startup_sync.clone(),
processing_rx,
+7 -5
View File
@@ -212,6 +212,7 @@ impl ScraperStorage {
pub async fn persist_block(
block: &FullBlockInformation,
tx: &mut StorageTransaction,
store_precommits: bool,
) -> Result<(), ScraperError> {
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
@@ -224,11 +225,12 @@ pub async fn persist_block(
// persist block data
persist_block_data(&block.block, total_gas, tx).await?;
// persist commits
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
if store_precommits {
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
// persist txs
-6
View File
@@ -28,9 +28,3 @@ impl From<ConnectionError> for Socks5ClientCoreError {
}
}
}
impl nym_task::TaskStatusEvent for Socks5ClientCoreError {
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
+2 -1
View File
@@ -23,7 +23,8 @@ use nym_client_core::init::types::GatewaySetup;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::params::PacketType;
use nym_task::{TaskClient, TaskHandle, TaskStatus};
use nym_task::manager::TaskStatus;
use nym_task::{TaskClient, TaskHandle};
use anyhow::anyhow;
use nym_validator_client::UserAgent;
-9
View File
@@ -11,16 +11,7 @@ license.workspace = true
[dependencies]
futures = { workspace = true }
log = { workspace = true }
sysinfo = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true }
tokio = { workspace = true }
si-scale = { workspace = true }
nym-sphinx = { path = "../nymsphinx" }
nym-credentials-interface = { path = "../credentials-interface" }
nym-metrics = { path = "../nym-metrics" }
nym-task = { path = "../task" }
@@ -1,83 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! # Gateway Connection statistics
//!
//! Metrics collected by the client while establishing and maintaining connections to the gateway.
use super::ClientStatsEvents;
use std::collections::VecDeque;
use nym_metrics::{inc, inc_by};
use serde::{Deserialize, Serialize};
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct GatewayStats {
// Sent
real_packets_sent: u64,
real_packets_sent_size: usize,
/// failed connection statistics
failures: VecDeque<()>, // TODO
}
impl GatewayStats {
fn handle(&mut self, event: GatewayStatsEvent) {
match event {
GatewayStatsEvent::RealPacketSent(packet_size) => {
self.real_packets_sent += 1;
self.real_packets_sent_size += packet_size;
inc!("real_packets_sent");
inc_by!("real_packets_sent_size", packet_size);
}
}
}
fn summary(&self) -> (String, String) {
(
format!("packets sent: {}", self.real_packets_sent),
"packets received: todo".to_owned(),
)
}
}
impl From<GatewayStatsEvent> for ClientStatsEvents {
fn from(event: GatewayStatsEvent) -> ClientStatsEvents {
ClientStatsEvents::GatewayConn(event)
}
}
/// Event space for Gateway Connection Events
#[derive(Debug)]
pub enum GatewayStatsEvent {
/// The real packets sent. Recall that acks are sent by the gateway, so it's not included here.
RealPacketSent(usize),
}
/// Gateway Statistics Tracking
#[derive(Default)]
pub struct GatewayStatsControl {
// Keep track of packet statistics over time
stats: GatewayStats,
}
impl GatewayStatsControl {
pub(crate) fn handle_event(&mut self, event: GatewayStatsEvent) {
self.stats.handle(event)
}
pub(crate) fn report(&self) -> GatewayStats {
self.stats.clone()
}
pub(crate) fn local_report(&self) {
self.report_counters();
}
fn report_counters(&self) {
log::trace!("packet statistics: {:?}", &self.stats);
let (summary_sent, summary_recv) = self.stats.summary();
log::debug!("{}", summary_sent);
log::debug!("{}", summary_recv);
}
}
-135
View File
@@ -1,135 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::report::{ClientStatsReport, OsInformation};
use nym_task::TaskClient;
use time::{OffsetDateTime, Time};
use tokio::sync::mpsc::UnboundedSender;
/// Active gateway connection statistics.
pub mod gateway_conn_statistics;
/// Nym API connection statistics.
pub mod nym_api_statistics;
/// Packet count based statistics.
pub mod packet_statistics;
/// Channel receiving generic stats events to be used by a statistics aggregator.
pub type ClientStatsReceiver = tokio::sync::mpsc::UnboundedReceiver<ClientStatsEvents>;
/// Channel allowing generic statistics events to be reported to a stats event aggregator
#[derive(Clone)]
pub struct ClientStatsSender {
stats_tx: Option<UnboundedSender<ClientStatsEvents>>,
}
impl ClientStatsSender {
/// Create a new statistics Sender
pub fn new(stats_tx: Option<UnboundedSender<ClientStatsEvents>>) -> Self {
ClientStatsSender { stats_tx }
}
/// Report a statistics event using the sender.
pub fn report(&self, event: ClientStatsEvents) {
if let Some(tx) = &self.stats_tx {
if let Err(err) = tx.send(event) {
log::error!("Failed to send stats event: {:?}", err);
}
}
}
}
/// Client Statistics events (static for now)
pub enum ClientStatsEvents {
/// Packet count events
PacketStatistics(packet_statistics::PacketStatisticsEvent),
/// Gateway Connection events
GatewayConn(gateway_conn_statistics::GatewayStatsEvent),
/// Nym API connection events
NymApi(nym_api_statistics::NymApiStatsEvent),
}
/// Controls stats event handling and reporting
pub struct ClientStatsController {
//static infos
last_update_time: OffsetDateTime,
client_id: String,
client_type: String,
os_information: OsInformation,
// stats collection modules
packet_stats: packet_statistics::PacketStatisticsControl,
gateway_conn_stats: gateway_conn_statistics::GatewayStatsControl,
nym_api_stats: nym_api_statistics::NymApiStatsControl,
}
impl ClientStatsController {
/// Creates a ClientStatsController given a client_id
pub fn new(client_id: String, client_type: String) -> Self {
ClientStatsController {
last_update_time: ClientStatsController::get_update_time(),
client_id,
client_type,
os_information: OsInformation::new(),
packet_stats: Default::default(),
gateway_conn_stats: Default::default(),
nym_api_stats: Default::default(),
}
}
/// Returns a static ClientStatsReport that can be sent somewhere
pub fn build_report(&self) -> ClientStatsReport {
ClientStatsReport {
last_update_time: self.last_update_time,
client_id: self.client_id.clone(),
client_type: self.client_type.clone(),
os_information: self.os_information.clone(),
packet_stats: self.packet_stats.report(),
gateway_conn_stats: self.gateway_conn_stats.report(),
nym_api_stats: self.nym_api_stats.report(),
}
}
/// Handle and dispatch incoming stats event
pub fn handle_event(&mut self, stats_event: ClientStatsEvents) {
match stats_event {
ClientStatsEvents::PacketStatistics(event) => self.packet_stats.handle_event(event),
ClientStatsEvents::GatewayConn(event) => self.gateway_conn_stats.handle_event(event),
ClientStatsEvents::NymApi(event) => self.nym_api_stats.handle_event(event),
}
}
/// Reset the metrics to their initial state.
///
/// Used to periodically reset the metrics in accordance with periodic reporting strategy
pub fn reset(&mut self) {
self.nym_api_stats = Default::default();
self.gateway_conn_stats = Default::default();
//no periodic reset for packet stats
self.last_update_time = ClientStatsController::get_update_time();
}
/// snapshot the current state of the metrics for module that needs it
pub fn snapshot(&mut self) {
//no snapshot for gateway_conn_stats
//no snapshot for nym_api_stats
self.packet_stats.snapshot();
}
pub fn local_report(&mut self, task_client: &mut TaskClient) {
self.packet_stats.local_report(task_client);
self.gateway_conn_stats.local_report();
self.nym_api_stats.local_report();
}
fn get_update_time() -> OffsetDateTime {
let now = OffsetDateTime::now_utc();
#[allow(clippy::unwrap_used)]
//Safety : 0 is always a valid number of seconds, hours and minutes comes from a valid source
let new_time = Time::from_hms(now.hour(), now.minute(), 0).unwrap();
//allows a bigger anonymity by hiding exact sending time
now.replace_time(new_time)
}
}
@@ -1,83 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! # API Connection statistics
//!
//! Metrics collected by the client while attempting to pull config from the API.
use super::ClientStatsEvents;
use std::collections::VecDeque;
use nym_metrics::{inc, inc_by};
use serde::{Deserialize, Serialize};
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct NymApiStats {
// Sent
real_packets_sent: u64,
real_packets_sent_size: usize,
/// API connection failure statistics
failures: VecDeque<()>, // TODO
}
impl NymApiStats {
fn handle(&mut self, event: NymApiStatsEvent) {
match event {
NymApiStatsEvent::RealPacketSent(packet_size) => {
self.real_packets_sent += 1;
self.real_packets_sent_size += packet_size;
inc!("real_packets_sent");
inc_by!("real_packets_sent_size", packet_size);
}
}
}
fn summary(&self) -> (String, String) {
(
format!("packets sent: {}", self.real_packets_sent,),
"packets received: todo".to_owned(),
)
}
}
/// Event space for Nym API statistics tracking
#[derive(Debug)]
pub enum NymApiStatsEvent {
/// The real packets sent. Recall that acks are sent by the Api, so it's not included here.
RealPacketSent(usize),
}
impl From<NymApiStatsEvent> for ClientStatsEvents {
fn from(event: NymApiStatsEvent) -> ClientStatsEvents {
ClientStatsEvents::NymApi(event)
}
}
/// Nym API statistics tracking object
#[derive(Default)]
pub struct NymApiStatsControl {
// Keep track of packet statistics over time
stats: NymApiStats,
}
impl NymApiStatsControl {
pub(crate) fn handle_event(&mut self, event: NymApiStatsEvent) {
self.stats.handle(event)
}
pub(crate) fn report(&self) -> NymApiStats {
self.stats.clone()
}
pub(crate) fn local_report(&self) {
self.report_counters();
}
fn report_counters(&self) {
log::trace!("packet statistics: {:?}", &self.stats);
let (summary_sent, summary_recv) = self.stats.summary();
log::debug!("{}", summary_sent);
log::debug!("{}", summary_recv);
}
}
-15
View File
@@ -1,15 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use thiserror::Error;
/// Error types occurring while processing statistics events and reporting.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum StatsError {
#[error("Failed to (de)serialize stats report : {0}")]
ReportJsonSerialization(#[from] serde_json::Error),
}
/// Result of a statistics operation.
pub type Result<T> = core::result::Result<T, StatsError>;
+54
View File
@@ -0,0 +1,54 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use futures::channel::mpsc;
use nym_credentials_interface::TicketType;
use nym_sphinx::DestinationAddressBytes;
use time::OffsetDateTime;
pub type StatsEventSender = mpsc::UnboundedSender<StatsEvent>;
pub type StatsEventReceiver = mpsc::UnboundedReceiver<StatsEvent>;
pub enum StatsEvent {
SessionStatsEvent(SessionEvent),
}
impl StatsEvent {
pub fn new_session_start(client: DestinationAddressBytes) -> StatsEvent {
StatsEvent::SessionStatsEvent(SessionEvent::SessionStart {
start_time: OffsetDateTime::now_utc(),
client,
})
}
pub fn new_session_stop(client: DestinationAddressBytes) -> StatsEvent {
StatsEvent::SessionStatsEvent(SessionEvent::SessionStop {
stop_time: OffsetDateTime::now_utc(),
client,
})
}
pub fn new_ecash_ticket(
client: DestinationAddressBytes,
ticket_type: TicketType,
) -> StatsEvent {
StatsEvent::SessionStatsEvent(SessionEvent::EcashTicket {
ticket_type,
client,
})
}
}
pub enum SessionEvent {
SessionStart {
start_time: OffsetDateTime,
client: DestinationAddressBytes,
},
SessionStop {
stop_time: OffsetDateTime,
client: DestinationAddressBytes,
},
EcashTicket {
ticket_type: TicketType,
client: DestinationAddressBytes,
},
}
-89
View File
@@ -1,89 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_credentials_interface::TicketType;
use nym_sphinx::DestinationAddressBytes;
use time::OffsetDateTime;
/// Channel for receiving incoming Stats events
pub type GatewayStatsReceiver = tokio::sync::mpsc::UnboundedReceiver<GatewayStatsEvent>;
/// Channel allowing for generic statistics events to be reported to a stats event aggregator.
#[derive(Clone)]
pub struct GatewayStatsReporter {
stats_tx: tokio::sync::mpsc::UnboundedSender<GatewayStatsEvent>,
}
impl GatewayStatsReporter {
/// Construct a new gateway statistics event reporter
pub fn new(stats_tx: tokio::sync::mpsc::UnboundedSender<GatewayStatsEvent>) -> Self {
Self { stats_tx }
}
/// Report a gateway statistivs event using the reporter
pub fn report(&self, event: GatewayStatsEvent) {
self.stats_tx.send(event).unwrap_or_else(|err| {
log::error!("Failed to report gateway stat event : {err}");
});
}
}
/// Gateway Statistics events
pub enum GatewayStatsEvent {
/// Events in the lifecycle of an established client tunnel
SessionStatsEvent(SessionEvent),
}
impl GatewayStatsEvent {
/// A new session between this gateway and the client remote has successfully opened
pub fn new_session_start(client: DestinationAddressBytes) -> GatewayStatsEvent {
GatewayStatsEvent::SessionStatsEvent(SessionEvent::SessionStart {
start_time: OffsetDateTime::now_utc(),
client,
})
}
/// An existing session with the client remote has ended
pub fn new_session_stop(client: DestinationAddressBytes) -> GatewayStatsEvent {
GatewayStatsEvent::SessionStatsEvent(SessionEvent::SessionStop {
stop_time: OffsetDateTime::now_utc(),
client,
})
}
/// A new ecash ticket has been added / requested
pub fn new_ecash_ticket(
client: DestinationAddressBytes,
ticket_type: TicketType,
) -> GatewayStatsEvent {
GatewayStatsEvent::SessionStatsEvent(SessionEvent::EcashTicket {
ticket_type,
client,
})
}
}
/// Events in the lifecycle of an established client tunnel
pub enum SessionEvent {
/// A new session between this gateway and the client remote has successfully opened
SessionStart {
/// The timestamp of the session open event
start_time: OffsetDateTime,
/// Address of the remote client opening the connection
client: DestinationAddressBytes,
},
/// An existing session with the client remote has ended
SessionStop {
/// Timestamp of the session end event
stop_time: OffsetDateTime,
/// Address of the remote client opening the connection
client: DestinationAddressBytes,
},
/// A new ecash ticket has been added / requested
EcashTicket {
/// Type of ecash ticket that has been created as part of the session
ticket_type: TicketType,
/// Address of the remote client opening the connection
client: DestinationAddressBytes,
},
}
+2 -19
View File
@@ -1,21 +1,4 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// SPDX-License-Identifier: GPL-3.0-only
//! Nym Statistics
//!
//! This crate contains basic statistics utilities and abstractions to be re-used and
//! applied throughout both the client and gateway implementations.
#![warn(clippy::expect_used)]
#![warn(clippy::unwrap_used)]
#![warn(clippy::todo)]
#![warn(clippy::dbg_macro)]
/// Client specific statistics interfaces and events.
pub mod clients;
/// Statistics related errors.
pub mod error;
/// Gateway specific statistics interfaces and events.
pub mod gateways;
/// Statistics reporting abstractions and implementations.
pub mod report;
pub mod events;
-64
View File
@@ -1,64 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::clients::{
gateway_conn_statistics::GatewayStats, nym_api_statistics::NymApiStats,
packet_statistics::PacketStatistics,
};
use super::error::StatsError;
use serde::{Deserialize, Serialize};
use sysinfo::System;
use time::OffsetDateTime;
/// Report object containing both data to be reported and client / device context. We take extra care not to overcapture context information.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClientStatsReport {
pub(crate) last_update_time: OffsetDateTime,
pub(crate) client_id: String,
pub(crate) client_type: String,
pub(crate) os_information: OsInformation,
pub(crate) packet_stats: PacketStatistics,
pub(crate) gateway_conn_stats: GatewayStats,
pub(crate) nym_api_stats: NymApiStats,
}
impl From<ClientStatsReport> for Vec<u8> {
fn from(value: ClientStatsReport) -> Self {
// safety, no custom serialisation
#[allow(clippy::unwrap_used)]
let report_json = serde_json::to_string(&value).unwrap();
report_json.as_bytes().to_vec()
}
}
impl TryFrom<&[u8]> for ClientStatsReport {
type Error = StatsError;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
Ok(serde_json::from_slice(value)?)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OsInformation {
pub(crate) os_type: String,
pub(crate) os_version: Option<String>,
pub(crate) os_arch: Option<String>,
}
impl OsInformation {
pub fn new() -> Self {
OsInformation {
os_type: System::distribution_id(),
os_version: System::long_os_version(),
os_arch: System::cpu_arch(),
}
}
}
impl Default for OsInformation {
fn default() -> Self {
Self::new()
}
}
-35
View File
@@ -1,35 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::{any::Any, fmt};
pub type SentStatus = Box<dyn TaskStatusEvent>;
pub type StatusSender = futures::channel::mpsc::Sender<SentStatus>;
pub type StatusReceiver = futures::channel::mpsc::Receiver<SentStatus>;
pub trait TaskStatusEvent: Send + Sync + Any + fmt::Display {
fn as_any(&self) -> &dyn Any;
}
#[derive(Debug, PartialEq, Eq)]
pub enum TaskStatus {
Ready,
ReadyWithGateway(String),
}
impl fmt::Display for TaskStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TaskStatus::Ready => write!(f, "Ready"),
TaskStatus::ReadyWithGateway(gateway) => {
write!(f, "Ready and connected to gateway: {gateway}")
}
}
}
}
impl TaskStatusEvent for TaskStatus {
fn as_any(&self) -> &dyn Any {
self
}
}
+1 -3
View File
@@ -2,14 +2,12 @@
// SPDX-License-Identifier: Apache-2.0
pub mod connections;
pub mod event;
pub mod manager;
#[cfg(not(target_arch = "wasm32"))]
pub mod signal;
pub mod spawn;
pub use event::{StatusReceiver, StatusSender, TaskStatus, TaskStatusEvent};
pub use manager::{TaskClient, TaskHandle, TaskManager};
pub use manager::{StatusReceiver, StatusSender, TaskClient, TaskHandle, TaskManager};
#[cfg(not(target_arch = "wasm32"))]
pub use signal::wait_for_signal_and_error;
+15 -8
View File
@@ -1,21 +1,15 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::{
error::Error,
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use futures::{future::pending, FutureExt, SinkExt, StreamExt};
use log::{log, Level};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{error::Error, time::Duration};
use tokio::sync::{
mpsc,
watch::{self, error::SendError},
};
use crate::event::{SentStatus, StatusReceiver, StatusSender, TaskStatus};
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, timeout};
@@ -28,6 +22,10 @@ pub(crate) type SentError = Box<dyn Error + Send + Sync>;
type ErrorSender = mpsc::UnboundedSender<SentError>;
type ErrorReceiver = mpsc::UnboundedReceiver<SentError>;
pub type SentStatus = Box<dyn Error + Send + Sync>;
pub type StatusSender = futures::channel::mpsc::Sender<SentStatus>;
pub type StatusReceiver = futures::channel::mpsc::Receiver<SentStatus>;
fn try_recover_name(name: &Option<String>) -> String {
if let Some(name) = name {
name.clone()
@@ -42,6 +40,15 @@ enum TaskError {
UnexpectedHalt { shutdown_name: Option<String> },
}
// TODO: possibly we should create a `Status` trait instead of reusing `Error`
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum TaskStatus {
#[error("Ready")]
Ready,
#[error("Ready and connected to gateway: {0}")]
ReadyWithGateway(String),
}
/// Listens to status and error messages from tasks, as well as notifying them to gracefully
/// shutdown. Keeps track of if task stop unexpectedly, such as in a panic.
#[derive(Debug)]
-4
View File
@@ -286,10 +286,6 @@ impl NymTopology {
self.get_gateway(gateway_identity).is_some()
}
pub fn insert_gateway(&mut self, gateway: gateway::LegacyNode) {
self.gateways.push(gateway)
}
pub fn set_gateways(&mut self, gateways: Vec<gateway::LegacyNode>) {
self.gateways = gateways
}
+1 -1
View File
@@ -116,7 +116,7 @@ impl<'a> TryFrom<&'a SkimmedNode> for LegacyNode {
});
}
let layer = match value.role {
let layer = match value.epoch_role {
NodeRole::Mixnode { layer } => layer
.try_into()
.map_err(|_| MixnodeConversionError::InvalidLayer)?,
+2 -51
View File
@@ -17,7 +17,7 @@ pub use nym_client_core::config::{
Acknowledgements as ConfigAcknowledgements, Config as BaseClientConfig,
CoverTraffic as ConfigCoverTraffic, DebugConfig as ConfigDebug,
GatewayConnection as ConfigGatewayConnection, ReplySurbs as ConfigReplySurbs,
StatsReporting as ConfigStatsReporting, Topology as ConfigTopology, Traffic as ConfigTraffic,
Topology as ConfigTopology, Traffic as ConfigTraffic,
};
pub fn new_base_client_config(
@@ -86,7 +86,7 @@ pub fn no_cover_debug_obj() -> JsValue {
// just a helper structure to more easily pass through the JS boundary
#[wasm_bindgen(inspectable)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct DebugWasm {
/// Defines all configuration options related to traffic streams.
@@ -106,10 +106,6 @@ pub struct DebugWasm {
/// Defines all configuration options related to reply SURBs.
pub reply_surbs: ReplySurbsWasm,
/// Defines all configuration options related to stats reporting.
#[wasm_bindgen(getter_with_clone)]
pub stats_reporting: StatsReportingWasm,
}
impl Default for DebugWasm {
@@ -127,7 +123,6 @@ impl From<DebugWasm> for ConfigDebug {
acknowledgements: debug.acknowledgements.into(),
topology: debug.topology.into(),
reply_surbs: debug.reply_surbs.into(),
stats_reporting: debug.stats_reporting.into(),
}
}
}
@@ -141,7 +136,6 @@ impl From<ConfigDebug> for DebugWasm {
acknowledgements: debug.acknowledgements.into(),
topology: debug.topology.into(),
reply_surbs: debug.reply_surbs.into(),
stats_reporting: debug.stats_reporting.into(),
}
}
}
@@ -511,46 +505,3 @@ impl From<ConfigReplySurbs> for ReplySurbsWasm {
}
}
}
#[wasm_bindgen(inspectable)]
#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct StatsReportingWasm {
/// Is stats reporting enabled
pub enabled: bool,
/// Address of the stats collector. If this is none, no reporting will happen, regardless of `enabled`
#[wasm_bindgen(getter_with_clone)]
pub provider_address: Option<String>,
/// With what frequence will statistics be sent
pub reporting_interval_ms: u32,
}
impl Default for StatsReportingWasm {
fn default() -> Self {
ConfigStatsReporting::default().into()
}
}
impl From<StatsReportingWasm> for ConfigStatsReporting {
fn from(stats_reporting: StatsReportingWasm) -> Self {
ConfigStatsReporting {
enabled: stats_reporting.enabled,
provider_address: stats_reporting
.provider_address
.map(|address| address.parse().expect("Invalid provider address")),
reporting_interval: Duration::from_millis(stats_reporting.reporting_interval_ms as u64),
}
}
}
impl From<ConfigStatsReporting> for StatsReportingWasm {
fn from(stats_reporting: ConfigStatsReporting) -> Self {
StatsReportingWasm {
enabled: stats_reporting.enabled,
provider_address: stats_reporting.provider_address.map(|r| r.to_string()),
reporting_interval_ms: stats_reporting.reporting_interval.as_millis() as u32,
}
}
}
+2 -38
View File
@@ -9,14 +9,14 @@
use super::{
AcknowledgementsWasm, CoverTrafficWasm, DebugWasm, GatewayConnectionWasm, ReplySurbsWasm,
StatsReportingWasm, TopologyWasm, TrafficWasm,
TopologyWasm, TrafficWasm,
};
use crate::config::ConfigDebug;
use serde::{Deserialize, Serialize};
use tsify::Tsify;
// just a helper structure to more easily pass through the JS boundary
#[derive(Tsify, Debug, Clone, Serialize, Deserialize)]
#[derive(Tsify, Debug, Copy, Clone, Serialize, Deserialize)]
#[tsify(into_wasm_abi, from_wasm_abi)]
#[serde(rename_all = "camelCase")]
pub struct DebugWasmOverride {
@@ -43,10 +43,6 @@ pub struct DebugWasmOverride {
/// Defines all configuration options related to reply SURBs.
#[tsify(optional)]
pub reply_surbs: Option<ReplySurbsWasmOverride>,
/// Defines all configuration options related to stats reporting.
#[tsify(optional)]
pub stats_reporting: Option<StatsReportingWasmOverride>,
}
impl From<DebugWasmOverride> for DebugWasm {
@@ -58,7 +54,6 @@ impl From<DebugWasmOverride> for DebugWasm {
acknowledgements: value.acknowledgements.map(Into::into).unwrap_or_default(),
topology: value.topology.map(Into::into).unwrap_or_default(),
reply_surbs: value.reply_surbs.map(Into::into).unwrap_or_default(),
stats_reporting: value.stats_reporting.map(Into::into).unwrap_or_default(),
}
}
}
@@ -371,34 +366,3 @@ impl From<ReplySurbsWasmOverride> for ReplySurbsWasm {
}
}
}
#[derive(Tsify, Debug, Clone, Serialize, Deserialize)]
#[tsify(into_wasm_abi, from_wasm_abi)]
#[serde(rename_all = "camelCase")]
pub struct StatsReportingWasmOverride {
/// Is stats reporting enabled
#[tsify(optional)]
pub enabled: Option<bool>,
/// Address of the stats collector. If this is none, no reporting will happen, regardless of `enabled`
#[tsify(optional)]
pub provider_address: Option<Option<String>>,
/// With what frequence will statistics be sent
#[tsify(optional)]
pub reporting_interval_ms: Option<u32>,
}
impl From<StatsReportingWasmOverride> for StatsReportingWasm {
fn from(value: StatsReportingWasmOverride) -> Self {
let def = StatsReportingWasm::default();
StatsReportingWasm {
enabled: value.enabled.unwrap_or(def.enabled),
provider_address: value.provider_address.unwrap_or(def.provider_address),
reporting_interval_ms: value
.reporting_interval_ms
.unwrap_or(def.reporting_interval_ms),
}
}
}
+1 -1
View File
@@ -68,7 +68,7 @@ pub async fn current_network_topology_async(
let api_client = NymApiClient::new(url);
let mixnodes = api_client
.get_all_basic_active_mixing_assigned_nodes(None)
.get_basic_active_mixing_assigned_nodes(None)
.await?;
let gateways = api_client.get_all_basic_entry_assigned_nodes(None).await?;
+6 -14
View File
@@ -1,7 +1,7 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::net::{IpAddr, SocketAddr};
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug)]
pub struct Config {
@@ -9,23 +9,15 @@ pub struct Config {
/// default: `0.0.0.0:51822`
pub bind_address: SocketAddr,
/// Private IPv4 address of the wireguard gateway.
/// Private IP address of the wireguard gateway.
/// default: `10.1.0.1`
pub private_ipv4: Ipv4Addr,
/// Private IPv6 address of the wireguard gateway.
/// default: `fc01::1`
pub private_ipv6: Ipv6Addr,
pub private_ip: IpAddr,
/// Port announced to external clients wishing to connect to the wireguard interface.
/// Useful in the instances where the node is behind a proxy.
pub announced_port: u16,
/// The prefix denoting the maximum number of the clients that can be connected via Wireguard using IPv4.
/// The maximum value for IPv4 is 32
pub private_network_prefix_v4: u8,
/// The prefix denoting the maximum number of the clients that can be connected via Wireguard using IPv6.
/// The maximum value for IPv6 is 128
pub private_network_prefix_v6: u8,
/// The prefix denoting the maximum number of the clients that can be connected via Wireguard.
/// The maximum value for IPv4 is 32 and for IPv6 is 128
pub private_network_prefix: u8,
}
-3
View File
@@ -23,9 +23,6 @@ futures = { workspace = true }
x25519-dalek = { workspace = true }
ip_network = { workspace = true }
log.workspace = true
serde = { workspace = true, features = [
"derive",
] } # for config serialization/deserialization
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
tokio-stream = { workspace = true }
-27
View File
@@ -1,27 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use defguard_wireguard_rs::{host::Peer, key::Key};
use serde::{Deserialize, Serialize};
#[derive(Default, Clone, Serialize, Deserialize)]
pub struct Hosts {
host_v4: defguard_wireguard_rs::host::Host,
host_v6: defguard_wireguard_rs::host::Host,
}
impl Hosts {
pub fn new(
host_v4: defguard_wireguard_rs::host::Host,
host_v6: defguard_wireguard_rs::host::Host,
) -> Self {
Self { host_v4, host_v6 }
}
pub fn get(&self, key: &Key) -> Option<&Peer> {
self.host_v4
.peers
.get(key)
.or_else(|| self.host_v6.peers.get(key))
}
}

Some files were not shown because too many files have changed in this diff Show More