Compare commits
43 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d39ec4a048 | |||
| c03c580b1f | |||
| e0cc094ac8 | |||
| f1fcaa309b | |||
| 8771c1dfa6 | |||
| aea5872ad0 | |||
| 9e9abd74d7 | |||
| 3832508af7 | |||
| 69a4e33b17 | |||
| 83385421ff | |||
| ec53b570dc | |||
| ebcc658f98 | |||
| 6a155721c6 | |||
| 1bb8b3a3ec | |||
| 8d1a16eb02 | |||
| 8d10cf70e9 | |||
| e32df10b4d | |||
| d1660c01e6 | |||
| 14378b1db9 | |||
| 35bbf5fd84 | |||
| c374a4935a | |||
| 513f4f652d | |||
| 82b9425ca6 | |||
| 615e98b166 | |||
| b11f6c6c70 | |||
| 2f5e8e0bcd | |||
| 812a8782b4 | |||
| 089c47cce7 | |||
| 833114372a | |||
| a7b57d7e58 | |||
| 84e10a654c | |||
| d724f94319 | |||
| d0692a567a | |||
| 2ae38b9e49 | |||
| ef5990658a | |||
| 658dec8299 | |||
| 447352b8d6 | |||
| d6bb0979d0 | |||
| fa1d47e941 | |||
| 44ec6d6bc8 | |||
| 6d47046a38 | |||
| 5cfd09cd99 | |||
| 40b4670d80 |
@@ -31,33 +31,26 @@ jobs:
|
||||
- name: Install Rust stable
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable
|
||||
target: wasm32-unknown-unknown
|
||||
override: true
|
||||
|
||||
- name: Install wasm-opt
|
||||
uses: ./.github/actions/install-wasm-opt
|
||||
with:
|
||||
version: '114'
|
||||
|
||||
- name: Install cosmwasm-check
|
||||
run: cargo install cosmwasm-check
|
||||
|
||||
- name: Build release contracts
|
||||
run: make contracts
|
||||
run: make publish-contracts
|
||||
|
||||
- name: Prepare build output
|
||||
shell: bash
|
||||
env:
|
||||
OUTPUT_DIR: ci-contract-builds/${{ github.ref_name }}
|
||||
run: |
|
||||
cp contracts/target/wasm32-unknown-unknown/release/mixnet_contract.wasm $OUTPUT_DIR
|
||||
cp contracts/target/wasm32-unknown-unknown/release/vesting_contract.wasm $OUTPUT_DIR
|
||||
cp contracts/target/wasm32-unknown-unknown/release/nym_coconut_dkg.wasm $OUTPUT_DIR
|
||||
cp contracts/target/wasm32-unknown-unknown/release/cw3_flex_multisig.wasm $OUTPUT_DIR
|
||||
cp contracts/target/wasm32-unknown-unknown/release/cw4_group.wasm $OUTPUT_DIR
|
||||
cp contracts/target/wasm32-unknown-unknown/release/nym_ecash.wasm $OUTPUT_DIR
|
||||
cp contracts/target/wasm32-unknown-unknown/release/nym_pool_contract.wasm $OUTPUT_DIR
|
||||
cp contracts/target/wasm32-unknown-unknown/release/nym_performance_contract.wasm $OUTPUT_DIR
|
||||
find contracts/artifacts -maxdepth 1 -type f -name '*.wasm' -exec cp {} $OUTPUT_DIR \;
|
||||
# Also include the optimizer-generated checksums if present
|
||||
if [ -f contracts/artifacts/checksums.txt ]; then
|
||||
cp contracts/artifacts/checksums.txt $OUTPUT_DIR
|
||||
fi
|
||||
|
||||
- name: Deploy branch to CI www
|
||||
continue-on-error: true
|
||||
|
||||
@@ -0,0 +1,75 @@
|
||||
name: ci-nym-wallet-storybook
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
paths:
|
||||
- 'nym-wallet/**'
|
||||
- '.github/workflows/ci-nym-wallet-storybook.yml'
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: custom-linux
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Install rsync
|
||||
run: sudo apt-get install rsync
|
||||
continue-on-error: true
|
||||
|
||||
- uses: rlespinasse/github-slug-action@v3.x
|
||||
|
||||
- uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: 20
|
||||
|
||||
- name: Setup yarn
|
||||
run: npm install -g yarn
|
||||
|
||||
- name: Install Rust stable
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable
|
||||
|
||||
- name: Install wasm-pack
|
||||
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
|
||||
|
||||
- name: Build dependencies
|
||||
run: yarn && yarn build
|
||||
|
||||
- name: Build storybook
|
||||
run: yarn storybook:build
|
||||
working-directory: ./nym-wallet
|
||||
|
||||
- name: Deploy branch to CI www (storybook)
|
||||
continue-on-error: true
|
||||
uses: easingthemes/ssh-deploy@main
|
||||
env:
|
||||
SSH_PRIVATE_KEY: ${{ secrets.CI_WWW_SSH_PRIVATE_KEY }}
|
||||
ARGS: "-rltgoDzvO --delete"
|
||||
SOURCE: "nym-wallet/storybook-static/"
|
||||
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
|
||||
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
|
||||
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/wallet-${{ env.GITHUB_REF_SLUG }}
|
||||
EXCLUDE: "/dist/, /node_modules/"
|
||||
|
||||
- name: Matrix - Node Install
|
||||
run: npm install
|
||||
working-directory: .github/workflows/support-files
|
||||
|
||||
- name: Matrix - Send Notification
|
||||
env:
|
||||
NYM_NOTIFICATION_KIND: nym-wallet
|
||||
NYM_PROJECT_NAME: "nym-wallet"
|
||||
NYM_CI_WWW_BASE: "${{ secrets.NYM_CI_WWW_BASE }}"
|
||||
NYM_CI_WWW_LOCATION: "wallet-${{ env.GITHUB_REF_SLUG }}"
|
||||
GIT_COMMIT_MESSAGE: "${{ github.event.head_commit.message }}"
|
||||
GIT_BRANCH: "${GITHUB_REF##*/}"
|
||||
IS_SUCCESS: "${{ job.status == 'success' }}"
|
||||
MATRIX_SERVER: "${{ secrets.MATRIX_SERVER }}"
|
||||
MATRIX_ROOM: "${{ secrets.MATRIX_ROOM }}"
|
||||
MATRIX_USER_ID: "${{ secrets.MATRIX_USER_ID }}"
|
||||
MATRIX_TOKEN: "${{ secrets.MATRIX_TOKEN }}"
|
||||
MATRIX_DEVICE_ID: "${{ secrets.MATRIX_DEVICE_ID }}"
|
||||
uses: docker://keybaseio/client:stable-node
|
||||
with:
|
||||
args: .github/workflows/support-files/notifications/entry_point.sh
|
||||
@@ -0,0 +1,19 @@
|
||||
name: Run SonarQube Scan
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- develop
|
||||
# pull_request:
|
||||
# types: [opened, synchronize, reopened]
|
||||
jobs:
|
||||
sonarqube:
|
||||
name: SonarQube
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis
|
||||
- name: SonarQube Scan
|
||||
uses: SonarSource/sonarqube-scan-action@v5
|
||||
env:
|
||||
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
|
||||
@@ -49,6 +49,8 @@ jobs:
|
||||
run: |
|
||||
curl -L0 https://www.ssl.com/download/codesigntool-for-linux-and-macos/ -o codesigntool.zip
|
||||
unzip codesigntool.zip
|
||||
chmod +x CodeSignTool.sh
|
||||
|
||||
- name: Get EV certificate credential id
|
||||
working-directory: nym-wallet/src-tauri
|
||||
if: ${{ inputs.sign }}
|
||||
@@ -56,6 +58,7 @@ jobs:
|
||||
shell: bash
|
||||
run: |
|
||||
echo "SSL_COM_CREDENTIAL_ID=$(./CodeSignTool.sh get_credential_ids -username=${{ secrets.SSL_COM_USERNAME }} -password=${{ secrets.SSL_COM_PASSWORD }} | sed -n '1!p' | sed 's/- //')" >> "$GITHUB_OUTPUT"
|
||||
|
||||
- name: Add custom sign command to tauri.conf.json
|
||||
working-directory: nym-wallet/src-tauri
|
||||
if: ${{ inputs.sign }}
|
||||
@@ -79,6 +82,7 @@ jobs:
|
||||
]
|
||||
}
|
||||
}' tauri.conf.json
|
||||
|
||||
- name: Install project dependencies
|
||||
shell: bash
|
||||
run: cd .. && yarn --network-timeout 100000
|
||||
@@ -93,12 +97,14 @@ jobs:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
TAURI_SIGNING_PRIVATE_KEY: ${{ secrets.TAURI_PRIVATE_KEY }}
|
||||
TAURI_SIGNING_PRIVATE_KEY_PASSWORD: ${{ secrets.TAURI_KEY_PASSWORD }}
|
||||
SSL_COM_USERNAME: ${{ inputs.sign && secrets.SSL_COM_USERNAME }}
|
||||
SSL_COM_PASSWORD: ${{ inputs.sign && secrets.SSL_COM_PASSWORD }}
|
||||
SSL_COM_CREDENTIAL_ID: ${{ inputs.sign && steps.get_credential_ids.outputs.SSL_COM_CREDENTIAL_ID }}
|
||||
SSL_COM_TOTP_SECRET: ${{ inputs.sign && secrets.SSL_COM_TOTP_SECRET }}
|
||||
SSL_COM_USERNAME: ${{ inputs.sign && secrets.SSL_COM_USERNAME || '' }}
|
||||
SSL_COM_PASSWORD: ${{ inputs.sign && secrets.SSL_COM_PASSWORD || '' }}
|
||||
SSL_COM_CREDENTIAL_ID: ${{ inputs.sign && steps.get_credential_ids.outputs.SSL_COM_CREDENTIAL_ID || '' }}
|
||||
SSL_COM_TOTP_SECRET: ${{ inputs.sign && secrets.SSL_COM_TOTP_SECRET || '' }}
|
||||
CODE_SIGN_TOOL_PATH: ${{ inputs.sign && 'C:\\actions-runner\\_work\\nym\\nym\\nym-wallet\\src-tauri\\' || '' }}
|
||||
run: |
|
||||
echo "Starting build process..."
|
||||
echo "Signing enabled: ${{ inputs.sign }}"
|
||||
yarn build
|
||||
|
||||
- name: Check bundle directory
|
||||
@@ -147,7 +153,7 @@ jobs:
|
||||
nym-wallet/${{ env.BUNDLE_PATH }}/msi/*.msi.zip*
|
||||
nym-wallet/${{ env.BUNDLE_PATH }}/*/nym-wallet*.msi
|
||||
nym-wallet/src-tauri/target/release/bundle/msi/*.msi
|
||||
|
||||
|
||||
- name: Find MSI path for deployment
|
||||
id: find-msi
|
||||
shell: bash
|
||||
@@ -167,4 +173,4 @@ jobs:
|
||||
needs: publish-tauri
|
||||
with:
|
||||
release_tag: ${{ needs.publish-tauri.outputs.release_tag || github.ref_name }}
|
||||
secrets: inherit
|
||||
secrets: inherit
|
||||
|
||||
@@ -5,8 +5,15 @@ on:
|
||||
inputs:
|
||||
gateway_probe_git_ref:
|
||||
type: string
|
||||
default: nym-vpn-core-v1.4.0
|
||||
required: true
|
||||
description: Which gateway probe git ref to build the image with
|
||||
|
||||
release_image:
|
||||
description: 'Tag image as a release'
|
||||
required: true
|
||||
default: false
|
||||
type: boolean
|
||||
|
||||
env:
|
||||
WORKING_DIRECTORY: "nym-node-status-api/nym-node-status-agent"
|
||||
CONTAINER_NAME: "node-status-agent"
|
||||
@@ -43,19 +50,32 @@ jobs:
|
||||
GIT_REF_SLUG="${GATEWAY_PROBE_GIT_REF//\//-}"
|
||||
echo "git_ref=${GIT_REF_SLUG}" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Remove existing tag if exists
|
||||
run: |
|
||||
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }} >/dev/null 2>&1; then
|
||||
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
|
||||
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
|
||||
fi
|
||||
- name: Set GIT_TAG variable
|
||||
run: echo "GIT_TAG=${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}" >> $GITHUB_ENV
|
||||
|
||||
- name: Create tag
|
||||
run: |
|
||||
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }} -m "Version ${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}"
|
||||
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
|
||||
- name: Set RELEASE_TAG variable
|
||||
if: github.event.inputs.release_image == 'true'
|
||||
run: echo "RELEASE_TAG=golden-" >> $GITHUB_ENV
|
||||
|
||||
- name: Set IMAGE_NAME_AND_TAGS variable
|
||||
run: echo "IMAGE_NAME_AND_TAGS=${{ env.CONTAINER_NAME }}:${{ env.RELEASE_TAG }}${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}" >> $GITHUB_ENV
|
||||
|
||||
- name: New env vars
|
||||
run: echo "RELEASE_TAG='$RELEASE_TAG' GIT_TAG='$GIT_TAG' IMAGE_NAME_AND_TAGS='$IMAGE_NAME_AND_TAGS'"
|
||||
|
||||
# - name: Remove existing tag if exists
|
||||
# run: |
|
||||
# if git rev-parse $${{ env.GIT_TAG }} >/dev/null 2>&1; then
|
||||
# git push --delete origin $${{ env.GIT_TAG }}
|
||||
# git tag -d $${{ env.GIT_TAG }}
|
||||
# fi
|
||||
|
||||
# - name: Create tag
|
||||
# run: |
|
||||
# git tag -a $${{ env.GIT_TAG }} -m "Version ${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}"
|
||||
# git push origin $${{ env.GIT_TAG }}
|
||||
|
||||
- name: BuildAndPushImageOnHarbor
|
||||
run: |
|
||||
docker build --build-arg GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }} -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }}-${{ steps.cleanup_gateway_probe_ref.outputs.git_ref }}
|
||||
docker build --build-arg GIT_REF=${{ github.event.inputs.gateway_probe_git_ref }} -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.IMAGE_NAME_AND_TAGS }}
|
||||
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
name: Build and upload Node Status API container to harbor.nymte.ch
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
inputs:
|
||||
release_image:
|
||||
description: 'Tag image as a release'
|
||||
required: true
|
||||
default: false
|
||||
type: boolean
|
||||
|
||||
env:
|
||||
WORKING_DIRECTORY: "nym-node-status-api/nym-node-status-api"
|
||||
CONTAINER_NAME: "node-status-api"
|
||||
@@ -31,25 +37,35 @@ jobs:
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
- name: Check if tag exists
|
||||
run: |
|
||||
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
|
||||
echo "Tag ${{ steps.get_version.outputs.result }} already exists"
|
||||
fi
|
||||
- name: Set GIT_TAG variable
|
||||
run: echo "GIT_TAG=${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}" >> $GITHUB_ENV
|
||||
|
||||
- name: Remove existing tag if exists
|
||||
run: |
|
||||
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
|
||||
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
fi
|
||||
- name: Set RELEASE_TAG variable
|
||||
if: github.event.inputs.release_image == 'true'
|
||||
run: echo "RELEASE_TAG=golden-" >> $GITHUB_ENV
|
||||
|
||||
- name: Create tag
|
||||
run: |
|
||||
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
|
||||
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
- name: Set IMAGE_NAME_AND_TAGS variable
|
||||
run: echo "IMAGE_NAME_AND_TAGS=${{ env.CONTAINER_NAME }}:${{ env.RELEASE_TAG }}${{ steps.get_version.outputs.result }}" >> $GITHUB_ENV
|
||||
|
||||
- name: New env vars
|
||||
run: echo "RELEASE_TAG='$RELEASE_TAG' GIT_TAG='$GIT_TAG' IMAGE_NAME_AND_TAGS='$IMAGE_NAME_AND_TAGS'"
|
||||
|
||||
# - name: Remove existing tag if exists, then create
|
||||
# run: |
|
||||
# if git rev-parse "$GIT_TAG" >/dev/null 2>&1; then
|
||||
# echo "Tag '$GIT_TAG' already exists, deleting"
|
||||
# git push --delete origin "$GIT_TAG"
|
||||
# git tag -d "$GIT_TAG"
|
||||
# echo "Tag '$GIT_TAG' deleted"
|
||||
# else
|
||||
# echo "Tag '$GIT_TAG' does not exist, creating it"
|
||||
# git tag -a $GIT_TAG -m "Version ${{ steps.get_version.outputs.result }}"
|
||||
# git push origin $GIT_TAG
|
||||
# echo "Tag '$GIT_TAG' created"
|
||||
# fi
|
||||
|
||||
- name: BuildAndPushImageOnHarbor
|
||||
run: |
|
||||
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
|
||||
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.IMAGE_NAME_AND_TAGS }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
|
||||
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@
|
||||
>
|
||||
> ➡️➡️➡️➡️➡️ **View output:**
|
||||
>
|
||||
> `storybook`: https://{{ env.NYM_CI_WWW_LOCATION }}.{{ env.NYM_CI_WWW_BASE }}
|
||||
>
|
||||
> `branch` {{ env.GITHUB_SERVER_URL }}/{{ env.GITHUB_REPOSITORY }}/tree/{{ env.GIT_BRANCH_NAME }}
|
||||
>
|
||||
> `commit` {{ env.GITHUB_SERVER_URL }}/{{ env.GITHUB_REPOSITORY }}/commit/{{ env.GITHUB_SHA }}
|
||||
|
||||
@@ -35,6 +35,8 @@ validator-api/keypair
|
||||
contracts/mixnet/code_id
|
||||
contracts/mixnet/Justfile
|
||||
contracts/mixnet/Makefile
|
||||
artifacts
|
||||
contracts/artifacts
|
||||
validator-config
|
||||
*.patch
|
||||
validator-api-config.toml
|
||||
|
||||
@@ -4,6 +4,74 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [2025.12-dolcelatte] (2025-07-07)
|
||||
|
||||
- bugfix: key-rotation + reply SURBs ([#5876])
|
||||
- Bugfix/backwards compat ([#5865])
|
||||
- bugfix: allow gateways to permit authentication from v4 clients ([#5862])
|
||||
- fixed client route for obtaining v2 list of gateways ([#5859])
|
||||
- Updated browser extension piece removal ([#5849])
|
||||
- Remove/old env references ([#5848])
|
||||
- Remove qa env ([#5847])
|
||||
- remove not used old mock-api ([#5845])
|
||||
- remove bity dir ([#5844])
|
||||
- build(deps-dev): bump webpack-dev-server from 4.13.2 to 5.2.1 in /wasm/mix-fetch/internal-dev ([#5843])
|
||||
- Amended the buy section ([#5841])
|
||||
- Removing test-net faucet ([#5840])
|
||||
- Feature/node status dvpn directory ([#5829])
|
||||
- build(deps-dev): bump webpack-dev-server from 4.15.2 to 5.2.1 in /nym-credential-proxy/vpn-api-lib-wasm/internal-dev ([#5826])
|
||||
- bugfix: fix swapped total and circulating supplies ([#5822])
|
||||
- build(deps): bump tar-fs from 3.0.8 to 3.0.9 in /sdk/typescript/tests/integration-tests/mix-fetch ([#5821])
|
||||
- Url scheme warning log ([#5819])
|
||||
- chore: adjust heuristic for wireguard peer activity ([#5818])
|
||||
- Use the same client bandwidth for top up ([#5813])
|
||||
- Replace chrono with time in NS API ([#5811])
|
||||
- build(deps-dev): bump http-proxy-middleware from 2.0.4 to 2.0.9 in /clients/native/examples/js-examples/websocket ([#5810])
|
||||
- build(deps): bump tokio from 1.44.2 to 1.45.1 ([#5798])
|
||||
- Close sqlite pool before moving or reopening databases ([#5796])
|
||||
- HTTP Client Retries, Fallbacks, and Redirects ([#5789])
|
||||
- feat: key rotation ([#5777])
|
||||
- build(deps): bump next from 14.2.15 to 14.2.26 in /documentation/docs ([#5772])
|
||||
- build(deps): bump undici from 5.28.5 to 5.29.0 in /.github/actions/nym-hash-releases/src ([#5771])
|
||||
- build(deps): bump cargo_metadata from 0.18.1 to 0.19.2 ([#5765])
|
||||
- build(deps): bump tempfile from 3.19.1 to 3.20.0 ([#5764])
|
||||
- [Feature] Noise XKpsk3 integration (2025 version) ([#5692])
|
||||
- feature: nympool contract ([#5464])
|
||||
- chore: fixed typo in API endpoint parameter ([#5449])
|
||||
|
||||
[#5876]: https://github.com/nymtech/nym/pull/5876
|
||||
[#5865]: https://github.com/nymtech/nym/pull/5865
|
||||
[#5862]: https://github.com/nymtech/nym/pull/5862
|
||||
[#5859]: https://github.com/nymtech/nym/pull/5859
|
||||
[#5849]: https://github.com/nymtech/nym/pull/5849
|
||||
[#5848]: https://github.com/nymtech/nym/pull/5848
|
||||
[#5847]: https://github.com/nymtech/nym/pull/5847
|
||||
[#5845]: https://github.com/nymtech/nym/pull/5845
|
||||
[#5844]: https://github.com/nymtech/nym/pull/5844
|
||||
[#5843]: https://github.com/nymtech/nym/pull/5843
|
||||
[#5841]: https://github.com/nymtech/nym/pull/5841
|
||||
[#5840]: https://github.com/nymtech/nym/pull/5840
|
||||
[#5829]: https://github.com/nymtech/nym/pull/5829
|
||||
[#5826]: https://github.com/nymtech/nym/pull/5826
|
||||
[#5822]: https://github.com/nymtech/nym/pull/5822
|
||||
[#5821]: https://github.com/nymtech/nym/pull/5821
|
||||
[#5819]: https://github.com/nymtech/nym/pull/5819
|
||||
[#5818]: https://github.com/nymtech/nym/pull/5818
|
||||
[#5813]: https://github.com/nymtech/nym/pull/5813
|
||||
[#5811]: https://github.com/nymtech/nym/pull/5811
|
||||
[#5810]: https://github.com/nymtech/nym/pull/5810
|
||||
[#5798]: https://github.com/nymtech/nym/pull/5798
|
||||
[#5796]: https://github.com/nymtech/nym/pull/5796
|
||||
[#5789]: https://github.com/nymtech/nym/pull/5789
|
||||
[#5777]: https://github.com/nymtech/nym/pull/5777
|
||||
[#5772]: https://github.com/nymtech/nym/pull/5772
|
||||
[#5771]: https://github.com/nymtech/nym/pull/5771
|
||||
[#5765]: https://github.com/nymtech/nym/pull/5765
|
||||
[#5764]: https://github.com/nymtech/nym/pull/5764
|
||||
[#5692]: https://github.com/nymtech/nym/pull/5692
|
||||
[#5464]: https://github.com/nymtech/nym/pull/5464
|
||||
[#5449]: https://github.com/nymtech/nym/pull/5449
|
||||
|
||||
## [2025.11-cheddar] (2025-06-10)
|
||||
|
||||
- No autoremoval of peers ([#5831])
|
||||
|
||||
Generated
+20
-34
@@ -471,17 +471,6 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-recursion"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.98",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
version = "0.3.6"
|
||||
@@ -3075,9 +3064,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.4.9"
|
||||
version = "0.4.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "75249d144030531f8dee69fe9cea04d3edf809a017ae445e2abdff6629e86633"
|
||||
checksum = "17da50a276f1e01e0ba6c029e47b7100754904ee8a278f886546e98575380785"
|
||||
dependencies = [
|
||||
"atomic-waker",
|
||||
"bytes",
|
||||
@@ -3251,21 +3240,19 @@ checksum = "7ebdb29d2ea9ed0083cd8cece49bbd968021bd99b0849edb4a9a7ee0fdf6a4e0"
|
||||
|
||||
[[package]]
|
||||
name = "hickory-proto"
|
||||
version = "0.25.1"
|
||||
version = "0.25.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6d844af74f7b799e41c78221be863bade11c430d46042c3b49ca8ae0c6d27287"
|
||||
checksum = "f8a6fe56c0038198998a6f217ca4e7ef3a5e51f46163bd6dd60b5c71ca6c6502"
|
||||
dependencies = [
|
||||
"async-recursion",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"cfg-if",
|
||||
"critical-section",
|
||||
"data-encoding",
|
||||
"enum-as-inner",
|
||||
"futures-channel",
|
||||
"futures-io",
|
||||
"futures-util",
|
||||
"h2 0.4.9",
|
||||
"h2 0.4.11",
|
||||
"http 1.3.1",
|
||||
"idna",
|
||||
"ipnet",
|
||||
@@ -4815,7 +4802,7 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
|
||||
|
||||
[[package]]
|
||||
name = "nym-api"
|
||||
version = "1.1.60"
|
||||
version = "1.1.61"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -5104,7 +5091,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-cli"
|
||||
version = "1.1.57"
|
||||
version = "1.1.58"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64 0.22.1",
|
||||
@@ -5186,7 +5173,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-client"
|
||||
version = "1.1.57"
|
||||
version = "1.1.58"
|
||||
dependencies = [
|
||||
"bs58",
|
||||
"clap",
|
||||
@@ -5233,15 +5220,13 @@ dependencies = [
|
||||
"futures",
|
||||
"gloo-timers",
|
||||
"http-body-util",
|
||||
"humantime-serde",
|
||||
"humantime",
|
||||
"hyper 1.6.0",
|
||||
"hyper-util",
|
||||
"log",
|
||||
"nym-bandwidth-controller",
|
||||
"nym-client-core-config-types",
|
||||
"nym-client-core-gateways-storage",
|
||||
"nym-client-core-surb-storage",
|
||||
"nym-config 0.1.0",
|
||||
"nym-credential-storage",
|
||||
"nym-credentials-interface 0.1.0",
|
||||
"nym-crypto 0.4.0",
|
||||
@@ -5250,7 +5235,6 @@ dependencies = [
|
||||
"nym-gateway-requests",
|
||||
"nym-http-api-client 0.1.0",
|
||||
"nym-id",
|
||||
"nym-metrics 0.1.0",
|
||||
"nym-mixnet-client",
|
||||
"nym-network-defaults 0.1.0",
|
||||
"nym-nonexhaustive-delayqueue",
|
||||
@@ -5303,7 +5287,6 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"cosmrs",
|
||||
"log",
|
||||
"nym-crypto 0.4.0",
|
||||
"nym-gateway-requests",
|
||||
"serde",
|
||||
@@ -5311,6 +5294,7 @@ dependencies = [
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"url",
|
||||
"zeroize",
|
||||
]
|
||||
@@ -5321,7 +5305,6 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"dashmap",
|
||||
"log",
|
||||
"nym-crypto 0.4.0",
|
||||
"nym-sphinx 0.1.0",
|
||||
"nym-task 0.1.0",
|
||||
@@ -5330,6 +5313,7 @@ dependencies = [
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6135,6 +6119,7 @@ dependencies = [
|
||||
"futures",
|
||||
"mime",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_yaml",
|
||||
"subtle 2.6.1",
|
||||
"time",
|
||||
@@ -6473,7 +6458,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-network-requester"
|
||||
version = "1.1.58"
|
||||
version = "1.1.59"
|
||||
dependencies = [
|
||||
"addr",
|
||||
"anyhow",
|
||||
@@ -6523,7 +6508,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-node"
|
||||
version = "1.13.0"
|
||||
version = "1.14.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
@@ -6684,7 +6669,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-node-status-api"
|
||||
version = "3.1.0"
|
||||
version = "3.1.2"
|
||||
dependencies = [
|
||||
"ammonia",
|
||||
"anyhow",
|
||||
@@ -6998,6 +6983,7 @@ dependencies = [
|
||||
"tap",
|
||||
"tempfile",
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
@@ -7060,7 +7046,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-socks5-client"
|
||||
version = "1.1.57"
|
||||
version = "1.1.58"
|
||||
dependencies = [
|
||||
"bs58",
|
||||
"clap",
|
||||
@@ -7648,17 +7634,16 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"nym-api-requests 0.1.0",
|
||||
"nym-config 0.1.0",
|
||||
"nym-crypto 0.4.0",
|
||||
"nym-mixnet-contract-common 0.6.0",
|
||||
"nym-sphinx-addressing 0.1.0",
|
||||
"nym-sphinx-routing 0.1.0",
|
||||
"nym-sphinx-types 0.2.0",
|
||||
"rand 0.8.5",
|
||||
"reqwest 0.12.15",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
"tracing",
|
||||
"tsify",
|
||||
"wasm-bindgen",
|
||||
@@ -8017,7 +8002,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nymvisor"
|
||||
version = "0.1.22"
|
||||
version = "0.1.23"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
@@ -10792,6 +10777,7 @@ dependencies = [
|
||||
"console",
|
||||
"cw-utils",
|
||||
"dkg-bypass-contract",
|
||||
"humantime",
|
||||
"indicatif",
|
||||
"nym-bin-common 0.6.0",
|
||||
"nym-coconut-dkg-common 0.1.0",
|
||||
|
||||
@@ -12,7 +12,11 @@ help:
|
||||
@echo " clippy: run clippy for all workspaces"
|
||||
@echo " test: run clippy, unit tests, and formatting."
|
||||
@echo " test-all: like test, but also includes the expensive tests"
|
||||
@echo " deb: build debian packages
|
||||
@echo " deb: build debian packages"
|
||||
@echo ""
|
||||
@echo "Contract building targets:"
|
||||
@echo " contracts: build contracts for development (includes wasm-opt)"
|
||||
@echo " publish-contracts: build contracts using Docker optimizer (deterministic)"
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Meta targets
|
||||
@@ -130,25 +134,69 @@ cargo-test: sdk-wasm-test
|
||||
clippy: sdk-wasm-lint
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Build contracts ready for deploy
|
||||
# Build CosmWasm contracts (deterministic docker build)
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
CONTRACTS=vesting_contract mixnet_contract nym_ecash cw3_flex_multisig cw4_group nym_coconut_dkg nym_pool_contract nym_performance_contract
|
||||
CONTRACTS_WASM=$(addsuffix .wasm, $(CONTRACTS))
|
||||
CONTRACTS_OUT_DIR=contracts/target/wasm32-unknown-unknown/release
|
||||
|
||||
contracts: build-release-contracts wasm-opt-contracts cosmwasm-check-contracts
|
||||
WASM_CONTRACT_DIR := contracts/target/wasm32-unknown-unknown/release
|
||||
# Find every direct contract folder that contains a Cargo.toml
|
||||
CONTRACT_DIRS := $(shell find contracts -type f -name Cargo.toml \( ! -path "contracts/Cargo.toml" \) | grep -v integration-tests | xargs -n1 dirname | sort -u)
|
||||
|
||||
CONTRACTS_OUT_DIR = contracts/artifacts
|
||||
|
||||
# Build all contracts via the official CosmWasm optimizer image (one invocation per contract)
|
||||
# See : https://github.com/CosmWasm/optimizer?tab=readme-ov-file#contracts-excluded-from-workspace
|
||||
# The optimizer ships separate multi-arch images. ARM builds are *not* bit-for-bit identical to the
|
||||
# canonical x86_64 build (see README notice in CosmWasm/optimizer). For reproducible artefacts we
|
||||
# therefore always run the amd64 variant by default.
|
||||
# Override with :
|
||||
# $ COSMWASM_OPTIMIZER_IMAGE=cosmwasm/optimizer-arm64:0.17.0 make contracts-publish
|
||||
#
|
||||
COSMWASM_OPTIMIZER_IMAGE ?= cosmwasm/optimizer:0.17.0
|
||||
COSMWASM_OPTIMIZER_PLATFORM ?= linux/amd64
|
||||
|
||||
# Ensure clean build environment and run the optimizer
|
||||
optimize-contracts:
|
||||
@rm -rf artifacts 2>/dev/null || true
|
||||
@echo "=== Ensuring clean build environment"
|
||||
docker volume rm nym_contracts_cache 2>/dev/null || true
|
||||
docker volume rm registry_cache 2>/dev/null || true
|
||||
@for DIR in $(CONTRACT_DIRS); do \
|
||||
echo "=== Optimizing $${DIR}"; \
|
||||
docker run --rm --platform $(COSMWASM_OPTIMIZER_PLATFORM) \
|
||||
-v $(CURDIR):/code \
|
||||
--mount type=volume,source=nym_contracts_cache,target=/target \
|
||||
--mount type=volume,source=registry_cache,target=/usr/local/cargo/registry \
|
||||
-e CARGO_BUILD_INCREMENTAL=false \
|
||||
-e RUSTFLAGS="-C target-cpu=generic -C debuginfo=0" \
|
||||
-e SOURCE_DATE_EPOCH=1 \
|
||||
$(COSMWASM_OPTIMIZER_IMAGE) $${DIR}; \
|
||||
done
|
||||
@mkdir -p $(CONTRACTS_OUT_DIR)
|
||||
@cp artifacts/*.wasm $(CONTRACTS_OUT_DIR)/ 2>/dev/null || true
|
||||
|
||||
@cd $(CONTRACTS_OUT_DIR) && sha256sum *.wasm > checksums.txt
|
||||
# Cleanup temporary artefacts directory
|
||||
@rm -rf artifacts 2>/dev/null || true
|
||||
|
||||
wasm-opt-contracts:
|
||||
for contract in $(CONTRACTS_WASM); do \
|
||||
wasm-opt --signext-lowering -Os $(CONTRACTS_OUT_DIR)/$$contract -o $(CONTRACTS_OUT_DIR)/$$contract; \
|
||||
@for WASM in $(WASM_CONTRACT_DIR)/*.wasm; do \
|
||||
echo "Running wasm-opt on $$WASM"; \
|
||||
wasm-opt --signext-lowering -Os $$WASM -o $$WASM ; \
|
||||
done
|
||||
|
||||
cosmwasm-check-contracts:
|
||||
for contract in $(CONTRACTS_WASM); do \
|
||||
cosmwasm-check $(CONTRACTS_OUT_DIR)/$$contract; \
|
||||
@for WASM in $(WASM_CONTRACT_DIR)/*.wasm; do \
|
||||
echo "Checking $$WASM"; \
|
||||
cosmwasm-check $$WASM ; \
|
||||
done
|
||||
|
||||
# Default development build
|
||||
contracts: build-release-contracts wasm-opt-contracts cosmwasm-check-contracts
|
||||
|
||||
# Publishing build used by CI – deterministic Docker optimiser
|
||||
publish-contracts: optimize-contracts cosmwasm-check-contracts
|
||||
|
||||
# Consider adding 's' to make plural consistent (beware: used in github workflow)
|
||||
contract-schema:
|
||||
$(MAKE) -C contracts schema
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nym-client"
|
||||
version = "1.1.57"
|
||||
version = "1.1.58"
|
||||
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
|
||||
description = "Implementation of the Nym Client"
|
||||
edition = "2021"
|
||||
|
||||
@@ -318,7 +318,7 @@ impl Handler {
|
||||
|
||||
async fn handle_text_message(&mut self, msg: String) -> Option<WsMessage> {
|
||||
debug!("Handling text message request");
|
||||
trace!("Content: {:?}", msg);
|
||||
trace!("Content: {msg:?}");
|
||||
|
||||
self.received_response_type = ReceivedResponseType::Text;
|
||||
let client_request = ClientRequest::try_from_text(msg);
|
||||
|
||||
@@ -68,9 +68,9 @@ impl Listener {
|
||||
new_conn = tcp_listener.accept() => {
|
||||
match new_conn {
|
||||
Ok((mut socket, remote_addr)) => {
|
||||
debug!("Received connection from {:?}", remote_addr);
|
||||
debug!("Received connection from {remote_addr:?}");
|
||||
if self.state.is_connected() {
|
||||
warn!("Tried to open a duplicate websocket connection. The request came from {}", remote_addr);
|
||||
warn!("Tried to open a duplicate websocket connection. The request came from {remote_addr}");
|
||||
// if we've already got a connection, don't allow another one
|
||||
// while we only ever want to accept a single connection, we don't want
|
||||
// to leave clients hanging (and also allow for reconnection if it somehow
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "nym-socks5-client"
|
||||
version = "1.1.57"
|
||||
version = "1.1.58"
|
||||
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
|
||||
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
|
||||
edition = "2021"
|
||||
|
||||
@@ -137,7 +137,7 @@ impl AsyncFileWatcher {
|
||||
log::error!("the file watcher receiver has been dropped!");
|
||||
}
|
||||
} else {
|
||||
log::debug!("will not propagate information about {:?}", event);
|
||||
log::debug!("will not propagate information about {event:?}");
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
|
||||
@@ -11,7 +11,7 @@ impl std::fmt::Display for BandwidthStatusMessage {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
BandwidthStatusMessage::RemainingBandwidth(b) => {
|
||||
write!(f, "remaining bandwidth: {}", b)
|
||||
write!(f, "remaining bandwidth: {b}")
|
||||
}
|
||||
BandwidthStatusMessage::NoBandwidth => write!(f, "no bandwidth left"),
|
||||
}
|
||||
|
||||
@@ -15,8 +15,7 @@ bs58 = { workspace = true }
|
||||
clap = { workspace = true, optional = true }
|
||||
comfy-table = { workspace = true, optional = true }
|
||||
futures = { workspace = true }
|
||||
humantime-serde = { workspace = true }
|
||||
log = { workspace = true }
|
||||
humantime = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
rand_chacha = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
@@ -25,20 +24,18 @@ sha2 = { workspace = true }
|
||||
si-scale = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
url = { workspace = true, features = ["serde"] }
|
||||
tokio = { workspace = true, features = ["macros"] }
|
||||
time = { workspace = true }
|
||||
tokio = { workspace = true, features = ["sync", "macros"] }
|
||||
tracing = { workspace = true }
|
||||
zeroize = { workspace = true }
|
||||
|
||||
# internal
|
||||
nym-id = { path = "../nym-id" }
|
||||
nym-bandwidth-controller = { path = "../bandwidth-controller" }
|
||||
nym-config = { path = "../config" }
|
||||
nym-crypto = { path = "../crypto" }
|
||||
nym-gateway-client = { path = "../client-libs/gateway-client" }
|
||||
nym-gateway-requests = { path = "../gateway-requests" }
|
||||
nym-http-api-client = { path = "../http-api-client" }
|
||||
nym-metrics = { path = "../nym-metrics" }
|
||||
nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
|
||||
nym-sphinx = { path = "../nymsphinx" }
|
||||
nym-statistics-common = { path = "../statistics" }
|
||||
|
||||
@@ -57,9 +57,7 @@ const DEFAULT_MAXIMUM_ALLOWED_SURB_REQUEST_SIZE: u32 = 500;
|
||||
|
||||
const DEFAULT_MAXIMUM_REPLY_SURB_REREQUEST_WAITING_PERIOD: Duration = Duration::from_secs(10);
|
||||
const DEFAULT_MAXIMUM_REPLY_SURB_DROP_WAITING_PERIOD: Duration = Duration::from_secs(5 * 60);
|
||||
|
||||
// 12 hours
|
||||
const DEFAULT_MAXIMUM_REPLY_SURB_AGE: Duration = Duration::from_secs(12 * 60 * 60);
|
||||
const DEFAULT_MAXIMUM_REPLY_SURB_REREQUESTS: usize = 5;
|
||||
|
||||
// 24 hours
|
||||
const DEFAULT_MAXIMUM_REPLY_KEY_AGE: Duration = Duration::from_secs(24 * 60 * 60);
|
||||
@@ -418,6 +416,9 @@ pub struct Traffic {
|
||||
/// will be routed as usual, to the entry gateway, through three mix nodes, egressing
|
||||
/// through the exit gateway. If mix hops are disabled, traffic will be routed directly
|
||||
/// from the entry gateway to the exit gateway, bypassing the mix nodes.
|
||||
///
|
||||
/// This overrides the `use_legacy_sphinx_format` setting as reduced mix hops
|
||||
/// requires use of the updated SURB packet format.
|
||||
pub disable_mix_hops: bool,
|
||||
}
|
||||
|
||||
@@ -625,10 +626,9 @@ pub struct ReplySurbs {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub maximum_reply_surb_drop_waiting_period: Duration,
|
||||
|
||||
/// Defines maximum amount of time given reply surb is going to be valid for.
|
||||
/// This is going to be superseded by key rotation once implemented.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub maximum_reply_surb_age: Duration,
|
||||
/// Defines maximum number of times the client is going to re-request reply surbs
|
||||
/// for clearing pending messages before giving up after making no progress.
|
||||
pub maximum_reply_surbs_rerequests: usize,
|
||||
|
||||
/// Defines maximum amount of time given reply key is going to be valid for.
|
||||
/// This is going to be superseded by key rotation once implemented.
|
||||
@@ -638,9 +638,6 @@ pub struct ReplySurbs {
|
||||
/// Specifies the number of mixnet hops the packet should go through. If not specified, then
|
||||
/// the default value is used.
|
||||
pub surb_mix_hops: Option<u8>,
|
||||
|
||||
/// Specifies if we should reset all the sender tags on startup
|
||||
pub fresh_sender_tags: bool,
|
||||
}
|
||||
|
||||
impl Default for ReplySurbs {
|
||||
@@ -655,10 +652,9 @@ impl Default for ReplySurbs {
|
||||
maximum_reply_surb_rerequest_waiting_period:
|
||||
DEFAULT_MAXIMUM_REPLY_SURB_REREQUEST_WAITING_PERIOD,
|
||||
maximum_reply_surb_drop_waiting_period: DEFAULT_MAXIMUM_REPLY_SURB_DROP_WAITING_PERIOD,
|
||||
maximum_reply_surb_age: DEFAULT_MAXIMUM_REPLY_SURB_AGE,
|
||||
maximum_reply_surbs_rerequests: DEFAULT_MAXIMUM_REPLY_SURB_REREQUESTS,
|
||||
maximum_reply_key_age: DEFAULT_MAXIMUM_REPLY_KEY_AGE,
|
||||
surb_mix_hops: None,
|
||||
fresh_sender_tags: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,14 +189,13 @@ impl From<ConfigV6> for Config {
|
||||
.debug
|
||||
.reply_surbs
|
||||
.maximum_reply_surb_drop_waiting_period,
|
||||
maximum_reply_surb_age: value.debug.reply_surbs.maximum_reply_surb_age,
|
||||
maximum_reply_key_age: value.debug.reply_surbs.maximum_reply_key_age,
|
||||
surb_mix_hops: value.debug.reply_surbs.surb_mix_hops,
|
||||
minimum_reply_surb_threshold_buffer: value
|
||||
.debug
|
||||
.reply_surbs
|
||||
.minimum_reply_surb_threshold_buffer,
|
||||
fresh_sender_tags: value.debug.reply_surbs.fresh_sender_tags,
|
||||
..Default::default()
|
||||
},
|
||||
stats_reporting: StatsReporting {
|
||||
enabled: value.debug.stats_reporting.enabled,
|
||||
|
||||
@@ -9,11 +9,11 @@ license.workspace = true
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
cosmrs.workspace = true
|
||||
log.workspace = true
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
thiserror.workspace = true
|
||||
time.workspace = true
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
zeroize = { workspace = true, features = ["zeroize_derive"] }
|
||||
|
||||
|
||||
@@ -7,12 +7,12 @@ use crate::{
|
||||
RawActiveGateway, RawCustomGatewayDetails, RawRegisteredGateway, RawRemoteGatewayDetails,
|
||||
},
|
||||
};
|
||||
use log::{debug, error};
|
||||
use sqlx::{
|
||||
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
|
||||
ConnectOptions,
|
||||
};
|
||||
use std::path::Path;
|
||||
use tracing::{debug, error};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StorageManager {
|
||||
|
||||
@@ -12,12 +12,12 @@ use crate::{
|
||||
error::ClientCoreError,
|
||||
init::types::{GatewaySelectionSpecification, GatewaySetup},
|
||||
};
|
||||
use log::info;
|
||||
use nym_client_core_gateways_storage::GatewayDetails;
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
use nym_topology::NymTopology;
|
||||
use nym_validator_client::UserAgent;
|
||||
use std::path::PathBuf;
|
||||
use tracing::info;
|
||||
|
||||
#[cfg_attr(feature = "cli", derive(clap::Args))]
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -81,14 +81,14 @@ where
|
||||
|
||||
// Attempt to use a user-provided gateway, if possible
|
||||
let user_chosen_gateway_id = common_args.gateway_id;
|
||||
log::debug!("User chosen gateway id: {user_chosen_gateway_id:?}");
|
||||
tracing::debug!("User chosen gateway id: {user_chosen_gateway_id:?}");
|
||||
|
||||
let selection_spec = GatewaySelectionSpecification::new(
|
||||
user_chosen_gateway_id.map(|id| id.to_base58_string()),
|
||||
Some(common_args.latency_based_selection),
|
||||
common_args.force_tls_gateway,
|
||||
);
|
||||
log::debug!("Gateway selection specification: {selection_spec:?}");
|
||||
tracing::debug!("Gateway selection specification: {selection_spec:?}");
|
||||
|
||||
let registered_gateways = get_all_registered_identities(&details_store).await?;
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@ use crate::{
|
||||
},
|
||||
init::types::{GatewaySelectionSpecification, GatewaySetup, InitResults},
|
||||
};
|
||||
use log::info;
|
||||
use nym_client_core_gateways_storage::GatewayDetails;
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
use nym_sphinx::addressing::Recipient;
|
||||
@@ -20,6 +19,7 @@ use nym_topology::NymTopology;
|
||||
use nym_validator_client::UserAgent;
|
||||
use rand::rngs::OsRng;
|
||||
use std::path::PathBuf;
|
||||
use tracing::info;
|
||||
|
||||
// we can suppress this warning (as suggested by linter itself) since we're only using it in our own code
|
||||
#[allow(async_fn_in_trait)]
|
||||
@@ -130,23 +130,23 @@ where
|
||||
|
||||
// Attempt to use a user-provided gateway, if possible
|
||||
let user_chosen_gateway_id = common_args.gateway;
|
||||
log::debug!("User chosen gateway id: {user_chosen_gateway_id:?}");
|
||||
tracing::debug!("User chosen gateway id: {user_chosen_gateway_id:?}");
|
||||
|
||||
let selection_spec = GatewaySelectionSpecification::new(
|
||||
user_chosen_gateway_id.map(|id| id.to_base58_string()),
|
||||
Some(common_args.latency_based_selection),
|
||||
common_args.force_tls_gateway,
|
||||
);
|
||||
log::debug!("Gateway selection specification: {selection_spec:?}");
|
||||
tracing::debug!("Gateway selection specification: {selection_spec:?}");
|
||||
|
||||
// Load and potentially override config
|
||||
log::debug!("Init arguments: {init_args:#?}");
|
||||
tracing::debug!("Init arguments: {init_args:#?}");
|
||||
let config = C::construct_config(&init_args);
|
||||
log::debug!("Constructed config: {config:#?}");
|
||||
tracing::debug!("Constructed config: {config:#?}");
|
||||
let paths = config.common_paths();
|
||||
let core = config.core_config();
|
||||
|
||||
log::info!(
|
||||
tracing::info!(
|
||||
"Using nym-api: {}",
|
||||
core.client
|
||||
.nym_api_urls
|
||||
|
||||
@@ -18,6 +18,7 @@ use crate::client::received_buffer::{
|
||||
ReceivedBufferRequestReceiver, ReceivedBufferRequestSender, ReceivedMessagesBufferController,
|
||||
};
|
||||
use crate::client::replies::reply_controller;
|
||||
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
|
||||
use crate::client::replies::reply_controller::{ReplyControllerReceiver, ReplyControllerSender};
|
||||
use crate::client::replies::reply_storage::{
|
||||
CombinedReplyStorage, PersistentReplyStorage, ReplyStorageBackend, SentReplyKeys,
|
||||
@@ -34,7 +35,6 @@ use crate::init::{
|
||||
};
|
||||
use crate::{config, spawn_future};
|
||||
use futures::channel::mpsc;
|
||||
use log::*;
|
||||
use nym_bandwidth_controller::BandwidthController;
|
||||
use nym_client_core_config_types::{ForgetMe, RememberMe};
|
||||
use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore};
|
||||
@@ -56,13 +56,18 @@ use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender,
|
||||
use nym_task::{TaskClient, TaskHandle};
|
||||
use nym_topology::provider_trait::TopologyProvider;
|
||||
use nym_topology::HardcodedTopologyProvider;
|
||||
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
|
||||
use nym_validator_client::nym_api::NymApiClientExt;
|
||||
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, NymApiClient, UserAgent};
|
||||
use rand::prelude::SliceRandom;
|
||||
use rand::rngs::OsRng;
|
||||
use rand::thread_rng;
|
||||
use std::fmt::Debug;
|
||||
use std::os::raw::c_int as RawFd;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tracing::*;
|
||||
use url::Url;
|
||||
|
||||
#[cfg(all(
|
||||
@@ -338,6 +343,7 @@ where
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn start_real_traffic_controller(
|
||||
controller_config: real_messages_control::Config,
|
||||
key_rotation_config: KeyRotationConfig,
|
||||
topology_accessor: TopologyAccessor,
|
||||
ack_receiver: AcknowledgementReceiver,
|
||||
input_receiver: InputMessageReceiver,
|
||||
@@ -355,6 +361,7 @@ where
|
||||
|
||||
RealMessagesController::new(
|
||||
controller_config,
|
||||
key_rotation_config,
|
||||
ack_receiver,
|
||||
input_receiver,
|
||||
mix_sender,
|
||||
@@ -453,7 +460,7 @@ where
|
||||
};
|
||||
|
||||
let gateway_failure = |err| {
|
||||
log::error!("Could not authenticate and start up the gateway connection - {err}");
|
||||
tracing::error!("Could not authenticate and start up the gateway connection - {err}");
|
||||
ClientCoreError::GatewayClientError {
|
||||
gateway_id: details.gateway_id.to_base58_string(),
|
||||
source: Box::new(err),
|
||||
@@ -555,14 +562,14 @@ where
|
||||
custom_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
|
||||
config_topology: config::Topology,
|
||||
nym_api_urls: Vec<Url>,
|
||||
user_agent: Option<UserAgent>,
|
||||
nym_api_client: NymApiClient,
|
||||
) -> Box<dyn TopologyProvider + Send + Sync> {
|
||||
// if no custom provider was ... provided ..., create one using nym-api
|
||||
custom_provider.unwrap_or_else(|| {
|
||||
Box::new(NymApiTopologyProvider::new(
|
||||
config_topology,
|
||||
nym_api_urls,
|
||||
user_agent,
|
||||
nym_api_client,
|
||||
))
|
||||
})
|
||||
}
|
||||
@@ -598,7 +605,7 @@ where
|
||||
topology_refresher.try_refresh().await;
|
||||
|
||||
if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
|
||||
log::error!(
|
||||
tracing::error!(
|
||||
"The current network topology seem to be insufficient to route any packets through \
|
||||
- check if enough nodes and a gateway are online - source: {err}"
|
||||
);
|
||||
@@ -674,16 +681,26 @@ where
|
||||
// TODO: rename it as it implies the data is persistent whilst one can use InMemBackend
|
||||
async fn setup_persistent_reply_storage(
|
||||
backend: S::ReplyStore,
|
||||
key_rotation_config: KeyRotationConfig,
|
||||
shutdown: TaskClient,
|
||||
) -> Result<CombinedReplyStorage, ClientCoreError>
|
||||
where
|
||||
<S::ReplyStore as ReplyStorageBackend>::StorageError: Sync + Send,
|
||||
S::ReplyStore: Send + Sync,
|
||||
{
|
||||
log::trace!("Setup persistent reply storage");
|
||||
tracing::trace!("Setup persistent reply storage");
|
||||
let now = OffsetDateTime::now_utc();
|
||||
let expected_current_key_rotation_start =
|
||||
key_rotation_config.expected_current_key_rotation_start(now);
|
||||
// time of the start of one epoch BEFORE the CURRENT rotation has begun
|
||||
// this indicates the starting time of when packets with the current keys might have been constructed
|
||||
// (i.e. any surbs OLDER than that MUST BE invalid)
|
||||
let prior_epoch_start =
|
||||
expected_current_key_rotation_start - key_rotation_config.epoch_duration;
|
||||
|
||||
let persistent_storage = PersistentReplyStorage::new(backend);
|
||||
let mem_store = persistent_storage
|
||||
.load_state_from_backend()
|
||||
.load_state_from_backend(prior_epoch_start)
|
||||
.await
|
||||
.map_err(|err| ClientCoreError::SurbStorageError {
|
||||
source: Box::new(err),
|
||||
@@ -725,6 +742,23 @@ where
|
||||
setup_gateway(setup_method, key_store, details_store).await
|
||||
}
|
||||
|
||||
fn construct_nym_api_client(config: &Config, user_agent: Option<UserAgent>) -> NymApiClient {
|
||||
let mut nym_api_urls = config.get_nym_api_endpoints();
|
||||
nym_api_urls.shuffle(&mut thread_rng());
|
||||
|
||||
if let Some(user_agent) = user_agent {
|
||||
NymApiClient::new_with_user_agent(nym_api_urls[0].clone(), user_agent)
|
||||
} else {
|
||||
NymApiClient::new(nym_api_urls[0].clone())
|
||||
}
|
||||
}
|
||||
|
||||
async fn determine_key_rotation_state(
|
||||
client: &NymApiClient,
|
||||
) -> Result<KeyRotationConfig, ClientCoreError> {
|
||||
Ok(client.nym_api.get_key_rotation_info().await?.into())
|
||||
}
|
||||
|
||||
pub async fn start_base(mut self) -> Result<BaseClient, ClientCoreError>
|
||||
where
|
||||
S::ReplyStore: Send + Sync,
|
||||
@@ -789,11 +823,14 @@ where
|
||||
.dkg_query_client
|
||||
.map(|client| BandwidthController::new(credential_store, client));
|
||||
|
||||
let nym_api_client = Self::construct_nym_api_client(&self.config, self.user_agent.clone());
|
||||
let key_rotation_config = Self::determine_key_rotation_state(&nym_api_client).await?;
|
||||
|
||||
let topology_provider = Self::setup_topology_provider(
|
||||
self.custom_topology_provider.take(),
|
||||
self.config.debug.topology,
|
||||
self.config.get_nym_api_endpoints(),
|
||||
self.user_agent.clone(),
|
||||
nym_api_client,
|
||||
);
|
||||
|
||||
let stats_reporter = Self::start_statistics_control(
|
||||
@@ -838,6 +875,7 @@ where
|
||||
|
||||
let reply_storage = Self::setup_persistent_reply_storage(
|
||||
reply_storage_backend,
|
||||
key_rotation_config,
|
||||
shutdown.fork("persistent_reply_storage"),
|
||||
)
|
||||
.await?;
|
||||
@@ -878,6 +916,7 @@ where
|
||||
|
||||
Self::start_real_traffic_controller(
|
||||
controller_config,
|
||||
key_rotation_config,
|
||||
shared_topology_accessor.clone(),
|
||||
ack_receiver,
|
||||
input_receiver,
|
||||
|
||||
@@ -7,13 +7,13 @@ use crate::{
|
||||
config::Config,
|
||||
error::ClientCoreError,
|
||||
};
|
||||
use log::{error, info, trace};
|
||||
use nym_bandwidth_controller::BandwidthController;
|
||||
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
|
||||
use nym_credential_storage::storage::Storage as CredentialStorage;
|
||||
use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient};
|
||||
use std::{io, path::Path};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::{error, info, trace};
|
||||
use url::Url;
|
||||
|
||||
async fn setup_fresh_backend<P: AsRef<Path>>(
|
||||
@@ -90,7 +90,7 @@ pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
|
||||
let db_path = db_path.as_ref();
|
||||
if db_path.exists() {
|
||||
info!("Loading existing surb database");
|
||||
match fs_backend::Backend::try_load(db_path, surb_config.fresh_sender_tags).await {
|
||||
match fs_backend::Backend::try_load(db_path).await {
|
||||
Ok(backend) => Ok(backend),
|
||||
Err(err) => {
|
||||
error!("setup_fs_reply_surb_backend: Failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future");
|
||||
|
||||
@@ -6,7 +6,6 @@ use crate::client::topology_control::TopologyAccessor;
|
||||
use crate::{config, spawn_future};
|
||||
use futures::task::{Context, Poll};
|
||||
use futures::{Future, Stream, StreamExt};
|
||||
use log::*;
|
||||
use nym_sphinx::acknowledgements::AckKey;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::cover::generate_loop_cover_packet;
|
||||
@@ -19,6 +18,7 @@ use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tracing::*;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::time::{sleep, Sleep};
|
||||
@@ -210,10 +210,10 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
TrySendError::Full(_) => {
|
||||
// This isn't a problem, if the channel is full means we're already sending the
|
||||
// max amount of messages downstream can handle.
|
||||
log::debug!("Failed to send cover message - channel full");
|
||||
tracing::debug!("Failed to send cover message - channel full");
|
||||
}
|
||||
TrySendError::Closed(_) => {
|
||||
log::warn!("Failed to send cover message - channel closed");
|
||||
tracing::warn!("Failed to send cover message - channel closed");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -258,20 +258,20 @@ impl LoopCoverTrafficStream<OsRng> {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("LoopCoverTrafficStream: Received shutdown");
|
||||
tracing::trace!("LoopCoverTrafficStream: Received shutdown");
|
||||
}
|
||||
next = self.next() => {
|
||||
if next.is_some() {
|
||||
self.on_new_message().await;
|
||||
} else {
|
||||
log::trace!("LoopCoverTrafficStream: Stopping since channel closed");
|
||||
tracing::trace!("LoopCoverTrafficStream: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
shutdown.recv_timeout().await;
|
||||
log::debug!("LoopCoverTrafficStream: Exiting");
|
||||
tracing::debug!("LoopCoverTrafficStream: Exiting");
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,7 +135,9 @@ impl InputMessage {
|
||||
recipient_tag,
|
||||
data,
|
||||
lane,
|
||||
max_retransmissions: None,
|
||||
// \/ set it to SOME sane default so that if we run out of surbs and constantly
|
||||
// fail to request more, we wouldn't be stuck in limbo
|
||||
max_retransmissions: Some(10),
|
||||
};
|
||||
if let Some(packet_type) = packet_type {
|
||||
InputMessage::new_wrapper(message, packet_type)
|
||||
|
||||
@@ -4,10 +4,10 @@
|
||||
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
|
||||
use crate::error::ClientCoreError;
|
||||
use crate::spawn_future;
|
||||
use log::*;
|
||||
use nym_gateway_requests::ClientRequest;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_task::TaskClient;
|
||||
use tracing::*;
|
||||
use transceiver::ErasedGatewayError;
|
||||
|
||||
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
|
||||
@@ -138,7 +138,7 @@ impl MixTrafficController {
|
||||
}
|
||||
},
|
||||
None => {
|
||||
log::trace!("MixTrafficController: Stopping since channel closed");
|
||||
tracing::trace!("MixTrafficController: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
@@ -146,22 +146,22 @@ impl MixTrafficController {
|
||||
Some(client_request) => {
|
||||
match self.gateway_transceiver.send_client_request(client_request).await {
|
||||
Ok(_) => (),
|
||||
Err(e) => error!("Failed to send client request: {}", e),
|
||||
Err(e) => error!("Failed to send client request: {e}"),
|
||||
};
|
||||
},
|
||||
None => {
|
||||
log::trace!("MixTrafficController, client request channel closed");
|
||||
tracing::trace!("MixTrafficController, client request channel closed");
|
||||
}
|
||||
},
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("MixTrafficController: Received shutdown");
|
||||
tracing::trace!("MixTrafficController: Received shutdown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.task_client.recv_timeout().await;
|
||||
|
||||
log::debug!("MixTrafficController: Exiting");
|
||||
tracing::debug!("MixTrafficController: Exiting");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, error};
|
||||
use nym_credential_storage::storage::Storage as CredentialStorage;
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
use nym_gateway_client::error::GatewayClientError;
|
||||
@@ -14,6 +13,7 @@ use nym_validator_client::nyxd::contract_traits::DkgQueryClient;
|
||||
use std::fmt::Debug;
|
||||
use std::os::raw::c_int as RawFd;
|
||||
use thiserror::Error;
|
||||
use tracing::{debug, error};
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use futures::channel::oneshot;
|
||||
@@ -27,7 +27,7 @@ fn erase_err<E: std::error::Error + Send + Sync + 'static>(err: E) -> ErasedGate
|
||||
ErasedGatewayError(Box::new(err))
|
||||
}
|
||||
|
||||
/// This combines combines the functionalities of being able to send and receive mix packets.
|
||||
/// This combines the functionalities of being able to send and receive mix packets.
|
||||
#[async_trait]
|
||||
pub trait GatewayTransceiver: GatewaySender + GatewayReceiver {
|
||||
fn gateway_identity(&self) -> ed25519::PublicKey;
|
||||
@@ -87,7 +87,7 @@ impl<G: GatewayTransceiver + ?Sized + Send> GatewayTransceiver for Box<G> {
|
||||
message: ClientRequest,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
let _ = (**self).send_client_request(message.clone()).await?;
|
||||
log::debug!("Sent client request: {:?}", message);
|
||||
tracing::debug!("Sent client request: {:?}", message);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
+5
-5
@@ -5,7 +5,6 @@ use super::action_controller::{AckActionSender, Action};
|
||||
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
|
||||
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_gateway_client::AcknowledgementReceiver;
|
||||
use nym_sphinx::{
|
||||
acknowledgements::{identifier::recover_identifier, AckKey},
|
||||
@@ -13,6 +12,7 @@ use nym_sphinx::{
|
||||
};
|
||||
use nym_task::TaskClient;
|
||||
use std::sync::Arc;
|
||||
use tracing::*;
|
||||
|
||||
/// Module responsible for listening for any data resembling acknowledgements from the network
|
||||
/// and firing actions to remove them from the 'Pending' state.
|
||||
@@ -65,7 +65,7 @@ impl AcknowledgementListener {
|
||||
return;
|
||||
}
|
||||
|
||||
trace!("Received {} from the mix network", frag_id);
|
||||
trace!("Received {frag_id} from the mix network");
|
||||
self.stats_tx
|
||||
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
|
||||
if let Err(err) = self
|
||||
@@ -93,16 +93,16 @@ impl AcknowledgementListener {
|
||||
acks = self.ack_receiver.next() => match acks {
|
||||
Some(acks) => self.handle_ack_receiver_item(acks).await,
|
||||
None => {
|
||||
log::trace!("AcknowledgementListener: Stopping since channel closed");
|
||||
tracing::trace!("AcknowledgementListener: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("AcknowledgementListener: Received shutdown");
|
||||
tracing::trace!("AcknowledgementListener: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
self.task_client.recv_timeout().await;
|
||||
log::debug!("AcknowledgementListener: Exiting");
|
||||
tracing::debug!("AcknowledgementListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
+13
-25
@@ -5,7 +5,6 @@ use super::PendingAcknowledgement;
|
||||
use crate::client::real_messages_control::acknowledgement_control::RetransmissionRequestSender;
|
||||
use futures::channel::mpsc;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
|
||||
use nym_sphinx::chunking::fragment::FragmentIdentifier;
|
||||
use nym_sphinx::Delay as SphinxDelay;
|
||||
@@ -13,6 +12,7 @@ use nym_task::TaskClient;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::*;
|
||||
|
||||
pub(crate) type AckActionSender = mpsc::UnboundedSender<Action>;
|
||||
pub(crate) type AckActionReceiver = mpsc::UnboundedReceiver<Action>;
|
||||
@@ -126,7 +126,7 @@ impl ActionController {
|
||||
fn handle_insert(&mut self, pending_acks: Vec<PendingAcknowledgement>) {
|
||||
for pending_ack in pending_acks {
|
||||
let frag_id = pending_ack.message_chunk.fragment_identifier();
|
||||
trace!("{} is inserted", frag_id);
|
||||
trace!("{frag_id} is inserted");
|
||||
|
||||
if self
|
||||
.pending_acks_data
|
||||
@@ -161,22 +161,16 @@ impl ActionController {
|
||||
let new_queue_key = self.pending_acks_timers.insert(frag_id, timeout);
|
||||
*queue_key = Some(new_queue_key)
|
||||
} else {
|
||||
debug!(
|
||||
"Tried to START TIMER on pending ack that is already gone! - {}",
|
||||
frag_id
|
||||
);
|
||||
debug!("Tried to START TIMER on pending ack that is already gone! - {frag_id}");
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_remove(&mut self, frag_id: FragmentIdentifier) {
|
||||
trace!("{} is getting removed", frag_id);
|
||||
trace!("{frag_id} is getting removed");
|
||||
|
||||
match self.pending_acks_data.remove(&frag_id) {
|
||||
None => {
|
||||
debug!(
|
||||
"Tried to REMOVE pending ack that is already gone! - {}",
|
||||
frag_id
|
||||
);
|
||||
debug!("Tried to REMOVE pending ack that is already gone! - {frag_id}");
|
||||
}
|
||||
Some((_, queue_key)) => {
|
||||
if let Some(queue_key) = queue_key {
|
||||
@@ -188,10 +182,7 @@ impl ActionController {
|
||||
} else {
|
||||
// I'm not 100% sure if having a `None` key is even possible here
|
||||
// (REMOVE would have to be called before START TIMER),
|
||||
debug!(
|
||||
"Tried to REMOVE pending ack without TIMER active - {}",
|
||||
frag_id
|
||||
);
|
||||
debug!("Tried to REMOVE pending ack without TIMER active - {frag_id}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -200,7 +191,7 @@ impl ActionController {
|
||||
// initiated basically as a first step of retransmission. At first data has its delay updated
|
||||
// (as new sphinx packet was created with new expected delivery time)
|
||||
fn handle_update_pending_ack(&mut self, frag_id: FragmentIdentifier, delay: SphinxDelay) {
|
||||
trace!("{} is updating its delay", frag_id);
|
||||
trace!("{frag_id} is updating its delay");
|
||||
// TODO: is it possible to solve this without either locking or temporarily removing the value?
|
||||
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.remove(&frag_id) {
|
||||
// this Action is triggered by `RetransmissionRequestListener` (for 'normal' packets)
|
||||
@@ -213,10 +204,7 @@ impl ActionController {
|
||||
self.pending_acks_data
|
||||
.insert(frag_id, (Arc::new(inner_data), queue_key));
|
||||
} else {
|
||||
debug!(
|
||||
"Tried to UPDATE TIMER on pending ack that is already gone! - {}",
|
||||
frag_id
|
||||
);
|
||||
debug!("Tried to UPDATE TIMER on pending ack that is already gone! - {frag_id}");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -241,7 +229,7 @@ impl ActionController {
|
||||
.unbounded_send(Arc::downgrade(pending_ack_data))
|
||||
{
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
log::error!("Failed to send pending ack for retransmission: {err}");
|
||||
tracing::error!("Failed to send pending ack for retransmission: {err}");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -269,7 +257,7 @@ impl ActionController {
|
||||
action = self.incoming_actions.next() => match action {
|
||||
Some(action) => self.process_action(action),
|
||||
None => {
|
||||
log::trace!(
|
||||
tracing::trace!(
|
||||
"ActionController: Stopping since incoming actions channel closed"
|
||||
);
|
||||
break;
|
||||
@@ -278,17 +266,17 @@ impl ActionController {
|
||||
expired_ack = self.pending_acks_timers.next() => match expired_ack {
|
||||
Some(expired_ack) => self.handle_expired_ack_timer(expired_ack),
|
||||
None => {
|
||||
log::trace!("ActionController: Stopping since ack channel closed");
|
||||
tracing::trace!("ActionController: Stopping since ack channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("ActionController: Received shutdown");
|
||||
tracing::trace!("ActionController: Received shutdown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.task_client.recv_timeout().await;
|
||||
log::debug!("ActionController: Exiting");
|
||||
tracing::debug!("ActionController: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
+4
-4
@@ -5,7 +5,6 @@ use crate::client::inbound_messages::{InputMessage, InputMessageReceiver};
|
||||
use crate::client::real_messages_control::message_handler::MessageHandler;
|
||||
use crate::client::real_messages_control::real_traffic_stream::RealMessage;
|
||||
use crate::client::replies::reply_controller::ReplyControllerSender;
|
||||
use log::*;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
@@ -13,6 +12,7 @@ use nym_sphinx::params::PacketType;
|
||||
use nym_task::connections::TransmissionLane;
|
||||
use nym_task::TaskClient;
|
||||
use rand::{CryptoRng, Rng};
|
||||
use tracing::*;
|
||||
|
||||
/// Module responsible for dealing with the received messages: splitting them, creating acknowledgements,
|
||||
/// putting everything into sphinx packets, etc.
|
||||
@@ -228,16 +228,16 @@ where
|
||||
self.on_input_message(input_msg).await;
|
||||
},
|
||||
None => {
|
||||
log::trace!("InputMessageListener: Stopping since channel closed");
|
||||
tracing::trace!("InputMessageListener: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("InputMessageListener: Received shutdown");
|
||||
tracing::trace!("InputMessageListener: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
self.task_client.recv_timeout().await;
|
||||
log::debug!("InputMessageListener: Exiting");
|
||||
tracing::debug!("InputMessageListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ use crate::client::replies::reply_controller::ReplyControllerSender;
|
||||
use crate::spawn_future;
|
||||
use action_controller::AckActionReceiver;
|
||||
use futures::channel::mpsc;
|
||||
use log::*;
|
||||
use nym_gateway_client::AcknowledgementReceiver;
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_sphinx::params::{PacketSize, PacketType};
|
||||
@@ -30,6 +29,7 @@ use std::{
|
||||
sync::{Arc, Weak},
|
||||
time::Duration,
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
pub(crate) use action_controller::{AckActionSender, Action};
|
||||
|
||||
|
||||
+4
-4
@@ -10,13 +10,13 @@ use crate::client::real_messages_control::message_handler::{MessageHandler, Prep
|
||||
use crate::client::real_messages_control::real_traffic_stream::RealMessage;
|
||||
use crate::client::replies::reply_controller::ReplyControllerSender;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_sphinx::chunking::fragment::Fragment;
|
||||
use nym_sphinx::preparer::PreparedFragment;
|
||||
use nym_sphinx::{addressing::clients::Recipient, params::PacketType};
|
||||
use nym_task::{connections::TransmissionLane, TaskClient};
|
||||
use rand::{CryptoRng, Rng};
|
||||
use std::sync::{Arc, Weak};
|
||||
use tracing::*;
|
||||
|
||||
// responsible for packet retransmission upon fired timer
|
||||
pub(super) struct RetransmissionRequestListener<R> {
|
||||
@@ -182,16 +182,16 @@ where
|
||||
timed_out_ack = self.request_receiver.next() => match timed_out_ack {
|
||||
Some(timed_out_ack) => self.on_retransmission_request(timed_out_ack, packet_type).await,
|
||||
None => {
|
||||
log::trace!("RetransmissionRequestListener: Stopping since channel closed");
|
||||
tracing::trace!("RetransmissionRequestListener: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("RetransmissionRequestListener: Received shutdown");
|
||||
tracing::trace!("RetransmissionRequestListener: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
self.task_client.recv_timeout().await;
|
||||
log::debug!("RetransmissionRequestListener: Exiting");
|
||||
tracing::debug!("RetransmissionRequestListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
+4
-4
@@ -4,9 +4,9 @@
|
||||
use super::action_controller::{AckActionSender, Action};
|
||||
use super::SentPacketNotificationReceiver;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
|
||||
use nym_task::TaskClient;
|
||||
use tracing::*;
|
||||
|
||||
/// Module responsible for starting up retransmission timers.
|
||||
/// It is required because when we send our packet to the `real traffic stream` controlled
|
||||
@@ -56,17 +56,17 @@ impl SentNotificationListener {
|
||||
self.on_sent_message(frag_id).await;
|
||||
}
|
||||
None => {
|
||||
log::trace!("SentNotificationListener: Stopping since channel closed");
|
||||
tracing::trace!("SentNotificationListener: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("SentNotificationListener: Received shutdown");
|
||||
tracing::trace!("SentNotificationListener: Received shutdown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
assert!(self.task_client.is_shutdown_poll());
|
||||
log::debug!("SentNotificationListener: Exiting");
|
||||
tracing::debug!("SentNotificationListener: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::client::real_messages_control::{AckActionSender, Action};
|
||||
use crate::client::replies::reply_controller::MaxRetransmissions;
|
||||
use crate::client::replies::reply_storage::{ReceivedReplySurbsMap, SentReplyKeys, UsedSenderTags};
|
||||
use crate::client::topology_control::{TopologyAccessor, TopologyReadPermit};
|
||||
use nym_client_core_surb_storage::RetrievedReplySurb;
|
||||
use nym_sphinx::acknowledgements::AckKey;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessage, ReplyMessage};
|
||||
@@ -44,10 +45,7 @@ pub enum PreparationError {
|
||||
}
|
||||
|
||||
impl PreparationError {
|
||||
fn return_surbs(
|
||||
self,
|
||||
returned_surbs: Vec<ReplySurbWithKeyRotation>,
|
||||
) -> SurbWrappedPreparationError {
|
||||
fn return_surbs(self, returned_surbs: Vec<RetrievedReplySurb>) -> SurbWrappedPreparationError {
|
||||
SurbWrappedPreparationError {
|
||||
source: self,
|
||||
returned_surbs: Some(returned_surbs),
|
||||
@@ -61,7 +59,7 @@ pub struct SurbWrappedPreparationError {
|
||||
#[source]
|
||||
source: PreparationError,
|
||||
|
||||
returned_surbs: Option<Vec<ReplySurbWithKeyRotation>>,
|
||||
returned_surbs: Option<Vec<RetrievedReplySurb>>,
|
||||
}
|
||||
|
||||
impl<T> From<T> for SurbWrappedPreparationError
|
||||
@@ -83,7 +81,7 @@ impl SurbWrappedPreparationError {
|
||||
target: &AnonymousSenderTag,
|
||||
) -> PreparationError {
|
||||
if let Some(reply_surbs) = self.returned_surbs {
|
||||
surb_storage.insert_surbs(target, reply_surbs)
|
||||
surb_storage.re_insert_reply_surbs(target, reply_surbs)
|
||||
}
|
||||
self.source
|
||||
}
|
||||
@@ -105,6 +103,9 @@ pub(crate) struct Config {
|
||||
/// will be routed as usual, to the entry gateway, through three mix nodes, egressing
|
||||
/// through the exit gateway. If mix hops are disabled, traffic will be routed directly
|
||||
/// from the entry gateway to the exit gateway, bypassing the mix nodes.
|
||||
///
|
||||
/// This overrides the `use_legacy_sphinx_format` setting as reduced mix hops
|
||||
/// requires use of the updated SURB packet format.
|
||||
disable_mix_hops: bool,
|
||||
|
||||
/// Average delay a data packet is going to get delay at a single mixnode.
|
||||
@@ -159,8 +160,12 @@ impl Config {
|
||||
}
|
||||
|
||||
/// Configure whether messages senders using this config should use mix hops or not when sending messages.
|
||||
///
|
||||
/// This overrides the `use_legacy_sphinx_format` setting as disabled mix hops
|
||||
/// requires use of the updated SURB packet format.
|
||||
pub fn disable_mix_hops(mut self, disable_mix_hops: bool) -> Self {
|
||||
self.disable_mix_hops = disable_mix_hops;
|
||||
self.use_legacy_sphinx_format = false;
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -224,6 +229,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn topology_access_handle(&self) -> &TopologyAccessor {
|
||||
&self.topology_access
|
||||
}
|
||||
|
||||
fn get_or_create_sender_tag(&mut self, recipient: &Recipient) -> AnonymousSenderTag {
|
||||
if let Some(existing) = self.tag_storage.try_get_existing(recipient) {
|
||||
trace!("we already had sender tag for {recipient}");
|
||||
@@ -291,7 +300,7 @@ where
|
||||
&mut self,
|
||||
target: AnonymousSenderTag,
|
||||
message: ReplyMessage,
|
||||
reply_surb: ReplySurbWithKeyRotation,
|
||||
reply_surb: RetrievedReplySurb,
|
||||
is_extra_surb_request: bool,
|
||||
) -> Result<(), SurbWrappedPreparationError> {
|
||||
let msg = NymMessage::new_reply(message);
|
||||
@@ -322,7 +331,10 @@ where
|
||||
Some(chunk.fragment_identifier()),
|
||||
);
|
||||
let delay = prepared_fragment.total_delay;
|
||||
let max_retransmissions = None;
|
||||
|
||||
// we have to set a maximum number of retransmissions in case we fail to retrieve
|
||||
// surbs for a long period of time; we don't want to be stuck constantly resending the data
|
||||
let max_retransmissions = Some(10);
|
||||
let pending_ack = PendingAcknowledgement::new_anonymous(
|
||||
chunk,
|
||||
delay,
|
||||
@@ -345,7 +357,7 @@ where
|
||||
pub(crate) async fn try_request_additional_reply_surbs(
|
||||
&mut self,
|
||||
from: AnonymousSenderTag,
|
||||
reply_surb: ReplySurbWithKeyRotation,
|
||||
reply_surb: RetrievedReplySurb,
|
||||
amount: u32,
|
||||
) -> Result<(), SurbWrappedPreparationError> {
|
||||
debug!("requesting {amount} reply SURBs from {from}");
|
||||
@@ -385,11 +397,9 @@ where
|
||||
&mut self,
|
||||
target: AnonymousSenderTag,
|
||||
fragments: Vec<FragmentWithMaxRetransmissions>,
|
||||
reply_surbs: Vec<ReplySurbWithKeyRotation>,
|
||||
reply_surbs: impl IntoIterator<Item = RetrievedReplySurb>,
|
||||
lane: TransmissionLane,
|
||||
) -> Result<(), SurbWrappedPreparationError> {
|
||||
// TODO: technically this is performing an unnecessary cloning, but in the grand scheme of things
|
||||
// is it really that bad?
|
||||
self.try_send_reply_chunks(
|
||||
target,
|
||||
fragments.into_iter().map(|f| (lane, f)).collect(),
|
||||
@@ -402,7 +412,7 @@ where
|
||||
&mut self,
|
||||
target: AnonymousSenderTag,
|
||||
fragments: Vec<(TransmissionLane, FragmentWithMaxRetransmissions)>,
|
||||
reply_surbs: Vec<ReplySurbWithKeyRotation>,
|
||||
reply_surbs: impl IntoIterator<Item = RetrievedReplySurb>,
|
||||
) -> Result<(), SurbWrappedPreparationError> {
|
||||
let prepared_fragments = self
|
||||
.prepare_reply_chunks_for_sending(
|
||||
@@ -564,7 +574,7 @@ where
|
||||
)
|
||||
.await?;
|
||||
|
||||
log::trace!("storing {} reply keys", reply_keys.len());
|
||||
tracing::trace!("storing {} reply keys", reply_keys.len());
|
||||
self.reply_key_storage.insert_multiple(reply_keys);
|
||||
|
||||
Ok(())
|
||||
@@ -604,7 +614,7 @@ where
|
||||
)
|
||||
.await?;
|
||||
|
||||
log::trace!("storing {} reply keys", reply_keys.len());
|
||||
tracing::trace!("storing {} reply keys", reply_keys.len());
|
||||
self.reply_key_storage.insert_multiple(reply_keys);
|
||||
|
||||
Ok(())
|
||||
@@ -634,20 +644,12 @@ where
|
||||
pub(crate) async fn prepare_reply_chunks_for_sending(
|
||||
&mut self,
|
||||
fragments: Vec<Fragment>,
|
||||
reply_surbs: Vec<ReplySurbWithKeyRotation>,
|
||||
reply_surbs: impl IntoIterator<Item = RetrievedReplySurb>,
|
||||
) -> Result<Vec<PreparedFragment>, SurbWrappedPreparationError> {
|
||||
debug_assert_eq!(
|
||||
fragments.len(),
|
||||
reply_surbs.len(),
|
||||
"attempted to send {} fragments with {} reply surbs",
|
||||
fragments.len(),
|
||||
reply_surbs.len()
|
||||
);
|
||||
|
||||
let topology_permit = self.topology_access.get_read_permit().await;
|
||||
let topology = match self.get_topology(&topology_permit) {
|
||||
Ok(topology) => topology,
|
||||
Err(err) => return Err(err.return_surbs(reply_surbs)),
|
||||
Err(err) => return Err(err.return_surbs(reply_surbs.into_iter().collect())),
|
||||
};
|
||||
|
||||
Ok(fragments
|
||||
@@ -660,7 +662,7 @@ where
|
||||
fragment,
|
||||
topology,
|
||||
&self.config.ack_key,
|
||||
reply_surb,
|
||||
reply_surb.into(),
|
||||
PacketType::Mix,
|
||||
)
|
||||
.unwrap()
|
||||
@@ -670,7 +672,7 @@ where
|
||||
|
||||
pub(crate) async fn try_prepare_single_reply_chunk_for_sending(
|
||||
&mut self,
|
||||
reply_surb: ReplySurbWithKeyRotation,
|
||||
reply_surb: RetrievedReplySurb,
|
||||
chunk: Fragment,
|
||||
) -> Result<PreparedFragment, SurbWrappedPreparationError> {
|
||||
let topology_permit = self.topology_access.get_read_permit().await;
|
||||
@@ -683,7 +685,7 @@ where
|
||||
chunk,
|
||||
topology,
|
||||
&self.config.ack_key,
|
||||
reply_surb,
|
||||
reply_surb.into(),
|
||||
PacketType::Mix,
|
||||
)?;
|
||||
|
||||
|
||||
@@ -24,7 +24,6 @@ use crate::{
|
||||
spawn_future,
|
||||
};
|
||||
use futures::channel::mpsc;
|
||||
use log::*;
|
||||
use nym_gateway_client::AcknowledgementReceiver;
|
||||
use nym_sphinx::acknowledgements::AckKey;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
@@ -34,7 +33,9 @@ use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
|
||||
use nym_task::TaskClient;
|
||||
use rand::{rngs::OsRng, CryptoRng, Rng};
|
||||
use std::sync::Arc;
|
||||
use tracing::*;
|
||||
|
||||
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
|
||||
pub(crate) use acknowledgement_control::{AckActionSender, Action};
|
||||
|
||||
pub(crate) mod acknowledgement_control;
|
||||
@@ -85,12 +86,6 @@ impl<'a> From<&'a Config> for real_traffic_stream::Config {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a Config> for reply_controller::Config {
|
||||
fn from(cfg: &'a Config) -> Self {
|
||||
reply_controller::Config::new(cfg.reply_surbs)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a Config> for message_handler::Config {
|
||||
fn from(cfg: &'a Config) -> Self {
|
||||
message_handler::Config::new(
|
||||
@@ -139,6 +134,7 @@ impl RealMessagesController<OsRng> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
config: Config,
|
||||
key_rotation_config: KeyRotationConfig,
|
||||
ack_receiver: AcknowledgementReceiver,
|
||||
input_receiver: InputMessageReceiver,
|
||||
mix_sender: BatchMixMessageSender,
|
||||
@@ -169,7 +165,8 @@ impl RealMessagesController<OsRng> {
|
||||
// create all configs for the components
|
||||
let ack_control_config = (&config).into();
|
||||
let out_queue_config = (&config).into();
|
||||
let reply_controller_config = (&config).into();
|
||||
let reply_controller_config =
|
||||
reply_controller::Config::new(config.reply_surbs, key_rotation_config);
|
||||
let message_handler_config = (&config).into();
|
||||
|
||||
// create the actual components
|
||||
|
||||
@@ -9,7 +9,6 @@ use crate::client::transmission_buffer::TransmissionBuffer;
|
||||
use crate::config;
|
||||
use futures::task::{Context, Poll};
|
||||
use futures::{Future, Stream, StreamExt};
|
||||
use log::*;
|
||||
use nym_sphinx::acknowledgements::AckKey;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::chunking::fragment::FragmentIdentifier;
|
||||
@@ -27,6 +26,7 @@ use rand::{CryptoRng, Rng};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::*;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::time::{sleep, Sleep};
|
||||
@@ -202,7 +202,7 @@ where
|
||||
// well technically the message was not sent just yet, but now it's up to internal
|
||||
// queues and client load rather than the required delay. So realistically we can treat
|
||||
// whatever is about to happen as negligible additional delay.
|
||||
trace!("{} is about to get sent to the mixnet", frag_id);
|
||||
trace!("{frag_id} is about to get sent to the mixnet");
|
||||
if let Err(err) = self.sent_notifier.unbounded_send(frag_id) {
|
||||
error!("Failed to notify about sent message: {err}");
|
||||
}
|
||||
@@ -280,7 +280,7 @@ where
|
||||
|
||||
if let Err(err) = self.mix_tx.send(vec![next_message]).await {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
log::error!("Failed to send: {err}");
|
||||
tracing::error!("Failed to send: {err}");
|
||||
}
|
||||
} else {
|
||||
let event = if fragment_id.is_some() {
|
||||
@@ -313,7 +313,7 @@ where
|
||||
}
|
||||
|
||||
fn on_close_connection(&mut self, connection_id: ConnectionId) {
|
||||
log::debug!("Removing lane for connection: {connection_id}");
|
||||
tracing::debug!("Removing lane for connection: {connection_id}");
|
||||
self.transmission_buffer
|
||||
.remove(&TransmissionLane::ConnectionId(connection_id));
|
||||
}
|
||||
@@ -325,7 +325,7 @@ where
|
||||
|
||||
fn adjust_current_average_message_sending_delay(&mut self) {
|
||||
let used_slots = self.mix_tx.max_capacity() - self.mix_tx.capacity();
|
||||
log::trace!(
|
||||
tracing::trace!(
|
||||
"used_slots: {used_slots}, current_multiplier: {}",
|
||||
self.sending_delay_controller.current_multiplier()
|
||||
);
|
||||
@@ -334,7 +334,7 @@ where
|
||||
.sending_delay_controller
|
||||
.is_backpressure_currently_detected(used_slots)
|
||||
{
|
||||
log::trace!("Backpressure detected");
|
||||
tracing::trace!("Backpressure detected");
|
||||
self.sending_delay_controller.record_backpressure_detected();
|
||||
}
|
||||
|
||||
@@ -436,7 +436,7 @@ where
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
|
||||
Poll::Ready(Some((real_messages, conn_id))) => {
|
||||
log::trace!("handling real_messages: size: {}", real_messages.len());
|
||||
tracing::trace!("handling real_messages: size: {}", real_messages.len());
|
||||
|
||||
self.transmission_buffer.store(&conn_id, real_messages);
|
||||
let real_next = self.pop_next_message().expect("Just stored one");
|
||||
@@ -483,7 +483,7 @@ where
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
|
||||
Poll::Ready(Some((real_messages, conn_id))) => {
|
||||
log::trace!("handling real_messages: size: {}", real_messages.len());
|
||||
tracing::trace!("handling real_messages: size: {}", real_messages.len());
|
||||
|
||||
// First store what we got for the given connection id
|
||||
self.transmission_buffer.store(&conn_id, real_messages);
|
||||
@@ -538,11 +538,11 @@ where
|
||||
};
|
||||
|
||||
if packets > 1000 {
|
||||
log::warn!("{status_str}");
|
||||
tracing::warn!("{status_str}");
|
||||
} else if packets > 0 {
|
||||
log::info!("{status_str}");
|
||||
tracing::info!("{status_str}");
|
||||
} else {
|
||||
log::debug!("{status_str}");
|
||||
tracing::debug!("{status_str}");
|
||||
}
|
||||
|
||||
// Send status message to whoever is listening (possibly UI)
|
||||
@@ -566,7 +566,7 @@ where
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("OutQueueControl: Received shutdown");
|
||||
tracing::trace!("OutQueueControl: Received shutdown");
|
||||
break;
|
||||
}
|
||||
_ = status_timer.tick() => {
|
||||
@@ -575,7 +575,7 @@ where
|
||||
next_message = self.next() => if let Some(next_message) = next_message {
|
||||
self.on_message(next_message).await;
|
||||
} else {
|
||||
log::trace!("OutQueueControl: Stopping since channel closed");
|
||||
tracing::trace!("OutQueueControl: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -589,18 +589,18 @@ where
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("OutQueueControl: Received shutdown");
|
||||
tracing::trace!("OutQueueControl: Received shutdown");
|
||||
}
|
||||
next_message = self.next() => if let Some(next_message) = next_message {
|
||||
self.on_message(next_message).await;
|
||||
} else {
|
||||
log::trace!("OutQueueControl: Stopping since channel closed");
|
||||
tracing::trace!("OutQueueControl: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
log::debug!("OutQueueControl: Exiting");
|
||||
tracing::debug!("OutQueueControl: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+6
-6
@@ -98,12 +98,12 @@ impl SendingDelayController {
|
||||
self.current_multiplier =
|
||||
(self.current_multiplier + 1).clamp(self.lower_bound, self.upper_bound);
|
||||
self.time_when_changed = get_time_now();
|
||||
log::debug!(
|
||||
tracing::debug!(
|
||||
"Increasing sending delay multiplier to: {}",
|
||||
self.current_multiplier
|
||||
);
|
||||
} else {
|
||||
log::warn!("Trying to increase delay multipler higher than allowed");
|
||||
tracing::warn!("Trying to increase delay multipler higher than allowed");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,7 +112,7 @@ impl SendingDelayController {
|
||||
self.current_multiplier =
|
||||
(self.current_multiplier - 1).clamp(self.lower_bound, self.upper_bound);
|
||||
self.time_when_changed = get_time_now();
|
||||
log::debug!(
|
||||
tracing::debug!(
|
||||
"Decreasing sending delay multiplier to: {}",
|
||||
self.current_multiplier
|
||||
);
|
||||
@@ -164,11 +164,11 @@ impl SendingDelayController {
|
||||
self.current_multiplier()
|
||||
);
|
||||
if self.current_multiplier() > 0 {
|
||||
log::debug!("{}", status_str);
|
||||
tracing::debug!("{status_str}");
|
||||
} else if self.current_multiplier() > 1 {
|
||||
log::info!("{}", status_str);
|
||||
tracing::info!("{status_str}");
|
||||
} else if self.current_multiplier() > 2 {
|
||||
log::warn!("{}", status_str);
|
||||
tracing::warn!("{status_str}");
|
||||
}
|
||||
self.time_when_logged_about_elevated_multiplier = now;
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ use crate::spawn_future;
|
||||
use futures::channel::mpsc;
|
||||
use futures::lock::Mutex;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_crypto::asymmetric::x25519;
|
||||
use nym_crypto::Digest;
|
||||
use nym_gateway_client::MixnetMessageReceiver;
|
||||
@@ -24,6 +23,7 @@ use nym_task::TaskClient;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::*;
|
||||
|
||||
// The interval at which we check for stale buffers
|
||||
const STALE_BUFFER_CHECK_INTERVAL: Duration = Duration::from_secs(10);
|
||||
@@ -221,10 +221,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
let stored_messages = std::mem::take(&mut guard.messages);
|
||||
if !stored_messages.is_empty() {
|
||||
if let Err(err) = sender.unbounded_send(stored_messages) {
|
||||
error!(
|
||||
"The sender channel we just received is already invalidated - {:?}",
|
||||
err
|
||||
);
|
||||
error!("The sender channel we just received is already invalidated - {err:?}");
|
||||
// put the values back to the buffer
|
||||
// the returned error has two fields: err: SendError and val: T,
|
||||
// where val is the value that was failed to get sent;
|
||||
@@ -310,13 +307,15 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = self.reply_controller_sender.send_additional_surbs(
|
||||
msg.sender_tag,
|
||||
reply_surbs,
|
||||
from_surb_request,
|
||||
) {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("{err}");
|
||||
if !reply_surbs.is_empty() {
|
||||
if let Err(err) = self.reply_controller_sender.send_additional_surbs(
|
||||
msg.sender_tag,
|
||||
reply_surbs,
|
||||
from_surb_request,
|
||||
) {
|
||||
if !self.task_client.is_shutdown_poll() {
|
||||
error!("{err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -500,20 +499,20 @@ impl<R: MessageReceiver> RequestReceiver<R> {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("RequestReceiver: Received shutdown");
|
||||
tracing::trace!("RequestReceiver: Received shutdown");
|
||||
}
|
||||
request = self.query_receiver.next() => {
|
||||
if let Some(message) = request {
|
||||
self.handle_message(message).await
|
||||
} else {
|
||||
log::trace!("RequestReceiver: Stopping since channel closed");
|
||||
tracing::trace!("RequestReceiver: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
self.task_client.recv().await;
|
||||
log::debug!("RequestReceiver: Exiting");
|
||||
tracing::debug!("RequestReceiver: Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -544,17 +543,17 @@ impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
|
||||
if let Some(new_messages) = new_messages {
|
||||
self.received_buffer.handle_new_received(new_messages).await?;
|
||||
} else {
|
||||
log::trace!("FragmentedMessageReceiver: Stopping since channel closed");
|
||||
tracing::trace!("FragmentedMessageReceiver: Stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = self.task_client.recv_with_delay() => {
|
||||
log::trace!("FragmentedMessageReceiver: Received shutdown");
|
||||
tracing::trace!("FragmentedMessageReceiver: Received shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
self.task_client.recv_timeout().await;
|
||||
log::debug!("FragmentedMessageReceiver: Exiting");
|
||||
tracing::debug!("FragmentedMessageReceiver: Exiting");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,169 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_topology::NymTopologyMetadata;
|
||||
use nym_validator_client::models::{
|
||||
EpochId, KeyRotationId, KeyRotationInfoResponse, KeyRotationState,
|
||||
};
|
||||
use std::time::Duration;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub(crate) enum SurbRefreshState {
|
||||
WaitingForNextRotation { last_known: KeyRotationId },
|
||||
ScheduledForNextInvocation,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub(crate) struct ReferenceEpoch {
|
||||
pub(crate) absolute_epoch_id: EpochId,
|
||||
pub(crate) start_time: OffsetDateTime,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub(crate) struct KeyRotationConfig {
|
||||
pub(crate) epoch_duration: Duration,
|
||||
pub(crate) rotation_state: KeyRotationState,
|
||||
pub(crate) reference_epoch: ReferenceEpoch,
|
||||
}
|
||||
|
||||
impl From<KeyRotationInfoResponse> for KeyRotationConfig {
|
||||
fn from(value: KeyRotationInfoResponse) -> Self {
|
||||
KeyRotationConfig {
|
||||
epoch_duration: value.details.epoch_duration,
|
||||
rotation_state: value.details.key_rotation_state,
|
||||
reference_epoch: ReferenceEpoch {
|
||||
absolute_epoch_id: value.details.current_absolute_epoch_id,
|
||||
start_time: value.details.current_epoch_start,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl KeyRotationConfig {
|
||||
pub(crate) fn rotation_lifetime(&self) -> Duration {
|
||||
(self.rotation_state.validity_epochs + 1) * self.epoch_duration
|
||||
}
|
||||
|
||||
pub(crate) fn key_rotation_id(&self, current_absolute_epoch_id: EpochId) -> KeyRotationId {
|
||||
self.rotation_state
|
||||
.key_rotation_id(current_absolute_epoch_id)
|
||||
}
|
||||
|
||||
// this is called with the assumption that now is always > reference epoch start
|
||||
pub(crate) fn expected_current_epoch_id(&self, now: OffsetDateTime) -> EpochId {
|
||||
let diff_secs = (now - self.reference_epoch.start_time).as_seconds_f64();
|
||||
let epochs = (diff_secs / self.epoch_duration.as_secs_f64()).floor() as u32;
|
||||
|
||||
self.reference_epoch.absolute_epoch_id + epochs
|
||||
}
|
||||
|
||||
fn initial_rotation_epoch_start(&self) -> OffsetDateTime {
|
||||
let epochs_diff = self
|
||||
.reference_epoch
|
||||
.absolute_epoch_id
|
||||
.saturating_sub(self.rotation_state.initial_epoch_id);
|
||||
|
||||
self.reference_epoch.start_time - epochs_diff * self.epoch_duration
|
||||
}
|
||||
|
||||
pub(crate) fn key_rotation_start(&self, key_rotation_id: KeyRotationId) -> OffsetDateTime {
|
||||
let rotation_duration = self.rotation_state.validity_epochs * self.epoch_duration;
|
||||
let initial_start = self.initial_rotation_epoch_start();
|
||||
|
||||
// note: key rotation starts from 0
|
||||
initial_start + rotation_duration * key_rotation_id
|
||||
}
|
||||
|
||||
pub(crate) fn expected_current_key_rotation_id(&self, now: OffsetDateTime) -> KeyRotationId {
|
||||
let expected_current_epoch = self.expected_current_epoch_id(now);
|
||||
self.key_rotation_id(expected_current_epoch)
|
||||
}
|
||||
|
||||
pub(crate) fn expected_current_key_rotation_start(
|
||||
&self,
|
||||
now: OffsetDateTime,
|
||||
) -> OffsetDateTime {
|
||||
let expected_current_key_rotation_id = self.expected_current_key_rotation_id(now);
|
||||
self.key_rotation_start(expected_current_key_rotation_id)
|
||||
}
|
||||
|
||||
pub(crate) fn epoch_stuck(&self, topology_metadata: NymTopologyMetadata) -> bool {
|
||||
// add leeway of 2mins each direction since transition is not instantaneous
|
||||
let lower_bound = topology_metadata.refreshed_at - Duration::from_secs(2);
|
||||
let upper_bound = topology_metadata.refreshed_at + Duration::from_secs(2);
|
||||
|
||||
let expected_epoch_lower = self.expected_current_epoch_id(lower_bound);
|
||||
let expected_epoch_upper = self.expected_current_epoch_id(upper_bound);
|
||||
|
||||
topology_metadata.absolute_epoch_id != expected_epoch_lower
|
||||
&& topology_metadata.absolute_epoch_id != expected_epoch_upper
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use time::macros::datetime;
|
||||
|
||||
fn mock_config() -> KeyRotationConfig {
|
||||
KeyRotationConfig {
|
||||
epoch_duration: Duration::from_secs(60 * 60),
|
||||
rotation_state: KeyRotationState {
|
||||
validity_epochs: 10,
|
||||
initial_epoch_id: 80,
|
||||
},
|
||||
reference_epoch: ReferenceEpoch {
|
||||
absolute_epoch_id: 100,
|
||||
start_time: datetime!(2025-06-30 12:00:00+00:00),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn expected_current_key_rotation_start() {
|
||||
// rot0: 80-89
|
||||
// rot1: 90-99
|
||||
// rot2: 100-109
|
||||
// rot3: 110-119
|
||||
// ... etc
|
||||
let cfg = mock_config();
|
||||
|
||||
assert_eq!(
|
||||
cfg.initial_rotation_epoch_start(),
|
||||
datetime!(2025-06-29 16:00:00+00:00)
|
||||
);
|
||||
|
||||
let fake_now = datetime!(2025-06-30 12:00:00+00:00);
|
||||
assert_eq!(cfg.expected_current_epoch_id(fake_now), 100);
|
||||
assert_eq!(cfg.expected_current_key_rotation_id(fake_now), 2);
|
||||
assert_eq!(
|
||||
cfg.expected_current_key_rotation_start(fake_now),
|
||||
datetime!(2025-06-30 12:00:00+00:00)
|
||||
);
|
||||
|
||||
let fake_now = datetime!(2025-06-30 12:30:00+00:00);
|
||||
assert_eq!(cfg.expected_current_epoch_id(fake_now), 100);
|
||||
assert_eq!(cfg.expected_current_key_rotation_id(fake_now), 2);
|
||||
assert_eq!(
|
||||
cfg.expected_current_key_rotation_start(fake_now),
|
||||
datetime!(2025-06-30 12:00:00+00:00)
|
||||
);
|
||||
|
||||
let fake_now = datetime!(2025-06-30 13:01:00+00:00);
|
||||
assert_eq!(cfg.expected_current_epoch_id(fake_now), 101);
|
||||
assert_eq!(cfg.expected_current_key_rotation_id(fake_now), 2);
|
||||
assert_eq!(
|
||||
cfg.expected_current_key_rotation_start(fake_now),
|
||||
datetime!(2025-06-30 12:00:00+00:00)
|
||||
);
|
||||
|
||||
let fake_now = datetime!(2025-06-30 22:02:00+00:00);
|
||||
assert_eq!(cfg.expected_current_epoch_id(fake_now), 110);
|
||||
assert_eq!(cfg.expected_current_key_rotation_id(fake_now), 3);
|
||||
assert_eq!(
|
||||
cfg.expected_current_key_rotation_start(fake_now),
|
||||
datetime!(2025-06-30 22:00:00+00:00)
|
||||
);
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,899 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
|
||||
use crate::client::real_messages_control::message_handler::{
|
||||
FragmentWithMaxRetransmissions, MessageHandler, PreparationError,
|
||||
};
|
||||
use crate::client::replies::reply_controller::key_rotation_helpers::SurbRefreshState;
|
||||
use crate::client::replies::reply_controller::Config;
|
||||
use crate::client::topology_control::TopologyAccessor;
|
||||
use crate::client::transmission_buffer::TransmissionBuffer;
|
||||
use futures::channel::oneshot;
|
||||
use nym_client_core_surb_storage::{ReceivedReplySurb, ReceivedReplySurbsMap};
|
||||
use nym_crypto::aes::cipher::crypto_common::rand_core::CryptoRng;
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
|
||||
use nym_sphinx::chunking::fragment::FragmentIdentifier;
|
||||
use nym_task::connections::{ConnectionId, TransmissionLane};
|
||||
use nym_topology::NymTopologyMetadata;
|
||||
use rand::Rng;
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::btree_map::Entry;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::mem;
|
||||
use std::sync::{Arc, Weak};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
struct SenderData {
|
||||
current_clear_rerequest_counter: usize,
|
||||
pending_replies: TransmissionBuffer<FragmentWithMaxRetransmissions>,
|
||||
pending_retransmissions: BTreeMap<FragmentIdentifier, Weak<PendingAcknowledgement>>,
|
||||
last_request_failure: OffsetDateTime,
|
||||
}
|
||||
|
||||
impl Default for SenderData {
|
||||
fn default() -> Self {
|
||||
SenderData {
|
||||
current_clear_rerequest_counter: 0,
|
||||
pending_replies: Default::default(),
|
||||
pending_retransmissions: Default::default(),
|
||||
last_request_failure: OffsetDateTime::UNIX_EPOCH,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SenderData {
|
||||
fn total_pending(&self) -> usize {
|
||||
let pending_replies = self.pending_replies.total_size();
|
||||
let pending_retransmissions = self.pending_retransmissions.len();
|
||||
let total_pending = pending_retransmissions + pending_replies;
|
||||
|
||||
debug!("total queue size: {total_pending} = pending data {pending_replies} + pending retransmission {pending_retransmissions}");
|
||||
|
||||
total_pending
|
||||
}
|
||||
|
||||
pub(crate) fn increment_current_clear_rerequest_counter(&mut self) {
|
||||
self.current_clear_rerequest_counter += 1;
|
||||
}
|
||||
|
||||
pub(crate) fn reset_current_clear_rerequest_counter(&mut self) {
|
||||
self.current_clear_rerequest_counter = 0;
|
||||
}
|
||||
|
||||
pub(crate) fn reset_last_request_failure(&mut self, now: OffsetDateTime) -> OffsetDateTime {
|
||||
mem::replace(&mut self.last_request_failure, now)
|
||||
}
|
||||
}
|
||||
|
||||
/// Reply controller responsible for controlling receiver-related part
|
||||
/// of replies, such as requesting additional reply SURBs
|
||||
pub struct ReceiverReplyController<R> {
|
||||
config: Config,
|
||||
|
||||
surb_refresh_state: SurbRefreshState,
|
||||
topology_access: TopologyAccessor,
|
||||
|
||||
surb_senders: HashMap<AnonymousSenderTag, SenderData>,
|
||||
unavailable: HashMap<AnonymousSenderTag, OffsetDateTime>,
|
||||
surbs_storage: ReceivedReplySurbsMap,
|
||||
|
||||
// TODO: incorporate that field at some point
|
||||
// and use binomial distribution to determine the expected required number
|
||||
// of surbs required to send the message through
|
||||
// expected_reliability: f32,
|
||||
message_handler: MessageHandler<R>,
|
||||
}
|
||||
|
||||
impl<R> ReceiverReplyController<R>
|
||||
where
|
||||
R: CryptoRng + Rng,
|
||||
{
|
||||
pub(crate) fn new(
|
||||
config: Config,
|
||||
storage: ReceivedReplySurbsMap,
|
||||
message_handler: MessageHandler<R>,
|
||||
) -> Self {
|
||||
let topology_access = message_handler.topology_access_handle().clone();
|
||||
|
||||
ReceiverReplyController {
|
||||
config,
|
||||
surb_refresh_state: SurbRefreshState::WaitingForNextRotation {
|
||||
last_known: config
|
||||
.key_rotation
|
||||
.expected_current_key_rotation_id(OffsetDateTime::now_utc()),
|
||||
},
|
||||
topology_access,
|
||||
surb_senders: Default::default(),
|
||||
unavailable: Default::default(),
|
||||
surbs_storage: storage,
|
||||
message_handler,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_or_create_surb_sender(&mut self, tag: &AnonymousSenderTag) -> &mut SenderData {
|
||||
self.surb_senders.entry(*tag).or_default()
|
||||
}
|
||||
|
||||
async fn current_topology_metadata(&self) -> Option<NymTopologyMetadata> {
|
||||
self.topology_access.current_metadata().await
|
||||
}
|
||||
|
||||
fn insert_pending_replies<I: IntoIterator<Item = FragmentWithMaxRetransmissions>>(
|
||||
&mut self,
|
||||
recipient: &AnonymousSenderTag,
|
||||
fragments: I,
|
||||
lane: TransmissionLane,
|
||||
) {
|
||||
trace!("buffering pending replies for {recipient}");
|
||||
self.surb_senders
|
||||
.entry(*recipient)
|
||||
.or_default()
|
||||
.pending_replies
|
||||
.store(&lane, fragments)
|
||||
}
|
||||
|
||||
fn re_insert_pending_replies(
|
||||
&mut self,
|
||||
recipient: &AnonymousSenderTag,
|
||||
fragments: Vec<(TransmissionLane, FragmentWithMaxRetransmissions)>,
|
||||
) {
|
||||
trace!("re-inserting pending replies for {recipient}");
|
||||
// the buffer should ALWAYS exist at this point, if it doesn't, it's a bug...
|
||||
self.surb_senders
|
||||
.entry(*recipient)
|
||||
.or_default()
|
||||
.pending_replies
|
||||
.store_multiple(fragments)
|
||||
}
|
||||
|
||||
fn re_insert_pending_retransmission(
|
||||
&mut self,
|
||||
recipient: &AnonymousSenderTag,
|
||||
data: Vec<Arc<PendingAcknowledgement>>,
|
||||
) {
|
||||
trace!("re-inserting pending retransmissions for {recipient}");
|
||||
// the underlying entry MUST exist as we've just got data from there
|
||||
// and we hold a mut reference
|
||||
let map_entry = &mut self
|
||||
.surb_senders
|
||||
.get_mut(recipient)
|
||||
.expect("our pending retransmission entry is somehow gone!")
|
||||
.pending_retransmissions;
|
||||
|
||||
for pending in data {
|
||||
// if it's 0, we don't need to do anything - we just got that ack!
|
||||
if Arc::strong_count(&pending) > 1 {
|
||||
let id = pending.inner_fragment_identifier();
|
||||
let downgraded = Arc::downgrade(&pending);
|
||||
map_entry.insert(id, downgraded);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn should_request_more_surbs(&self, target: &AnonymousSenderTag) -> bool {
|
||||
trace!("checking if we should request more surbs from {target}");
|
||||
|
||||
let total_queue = self
|
||||
.surb_senders
|
||||
.get(target)
|
||||
.map(|pending| pending.total_pending())
|
||||
.unwrap_or_default();
|
||||
|
||||
// only consider 'fresh' surbs
|
||||
let available_surbs = self.surbs_storage.available_fresh_surbs(target);
|
||||
let pending_surbs = self.surbs_storage.pending_reception(target) as usize;
|
||||
let min_surbs_threshold = self.surbs_storage.min_surb_threshold();
|
||||
let max_surbs_threshold = self.surbs_storage.max_surb_threshold();
|
||||
let min_surbs_threshold_buffer =
|
||||
self.config.reply_surbs.minimum_reply_surb_threshold_buffer;
|
||||
|
||||
// After clearing the queue, we want to have at least `min_surbs_threshold` surbs available
|
||||
// and reserved for requesting additional surbs, and in addition to that we also want to
|
||||
// have `min_surbs_threshold_buffer` surbs available proactively.
|
||||
let target_surbs_after_clearing_queue = min_surbs_threshold + min_surbs_threshold_buffer;
|
||||
|
||||
// Check if we have enough surbs to handle the total queue and maintain minimum thresholds
|
||||
let total_required_surbs = total_queue + target_surbs_after_clearing_queue;
|
||||
let total_available_surbs = pending_surbs + available_surbs;
|
||||
|
||||
debug!("available surbs: {available_surbs} pending surbs: {pending_surbs} threshold range: {min_surbs_threshold}..+{min_surbs_threshold_buffer}..{max_surbs_threshold}");
|
||||
|
||||
// We should request more surbs if:
|
||||
// 1. We haven't hit the maximum surb threshold, and
|
||||
// 2. We don't have enough surbs to handle the queue plus minimum thresholds
|
||||
let is_below_max_threshold = total_available_surbs < max_surbs_threshold;
|
||||
let is_below_required_surbs = total_available_surbs < total_required_surbs;
|
||||
|
||||
is_below_max_threshold && is_below_required_surbs
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_send_reply(
|
||||
&mut self,
|
||||
recipient_tag: AnonymousSenderTag,
|
||||
data: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
max_retransmissions: Option<u32>,
|
||||
) {
|
||||
if !self.surbs_storage.contains_surbs_for(&recipient_tag) {
|
||||
if self
|
||||
.unavailable
|
||||
.insert(recipient_tag, OffsetDateTime::now_utc())
|
||||
.is_none()
|
||||
{
|
||||
// don't report it every single time
|
||||
warn!("received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!");
|
||||
} else {
|
||||
trace!("received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
trace!("handling reply to {recipient_tag}");
|
||||
let mut fragments = self.message_handler.split_reply_message(data);
|
||||
let total_size = fragments.len();
|
||||
trace!("This reply requires {total_size} SURBs");
|
||||
|
||||
// for the purposes of sending reply, do allow using possibly stale entries
|
||||
let available_surbs = self.surbs_storage.available_surbs(&recipient_tag);
|
||||
let min_surbs_threshold = self.surbs_storage.min_surb_threshold();
|
||||
|
||||
let max_to_send = if available_surbs > min_surbs_threshold {
|
||||
min(fragments.len(), available_surbs - min_surbs_threshold)
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
if max_to_send > 0 {
|
||||
let (surbs, surbs_left) = self
|
||||
.surbs_storage
|
||||
.get_reply_surbs(&recipient_tag, max_to_send);
|
||||
|
||||
debug!(
|
||||
"retrieved {} reply surbs. {surbs_left} surbs remaining in storage",
|
||||
surbs.as_ref().map(|s| s.len()).unwrap_or_default()
|
||||
);
|
||||
if let Some(reply_surbs) = surbs {
|
||||
let to_send = fragments
|
||||
.drain(..reply_surbs.len())
|
||||
.map(|f| FragmentWithMaxRetransmissions {
|
||||
fragment: f,
|
||||
max_retransmissions,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_send_reply_chunks_on_lane(
|
||||
recipient_tag,
|
||||
to_send.clone(),
|
||||
reply_surbs,
|
||||
lane,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let err = err.return_unused_surbs(&self.surbs_storage, &recipient_tag);
|
||||
warn!("failed to send reply to {recipient_tag}: {err}");
|
||||
info!(
|
||||
"buffering {no_fragments} fragments for {recipient_tag}",
|
||||
no_fragments = to_send.len()
|
||||
);
|
||||
self.insert_pending_replies(&recipient_tag, to_send, lane);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if there's leftover data we didn't send because we didn't have enough (or any) surbs - buffer it
|
||||
if !fragments.is_empty() {
|
||||
// Ideally we should have enough surbs above the minimum threshold to handle sending
|
||||
// new replies without having to first request more surbs. That's why I'd like to log
|
||||
// these cases as they might indicate a problem with the surb management.
|
||||
debug!(
|
||||
"buffering {no_fragments} fragments for {recipient_tag}",
|
||||
no_fragments = fragments.len()
|
||||
);
|
||||
let fragments: Vec<_> = fragments
|
||||
.into_iter()
|
||||
.map(|fragment| FragmentWithMaxRetransmissions {
|
||||
fragment,
|
||||
max_retransmissions,
|
||||
})
|
||||
.collect();
|
||||
self.insert_pending_replies(&recipient_tag, fragments, lane);
|
||||
}
|
||||
|
||||
if self.should_request_more_surbs(&recipient_tag) {
|
||||
self.request_reply_surbs_for_queue_clearing(recipient_tag)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn request_additional_reply_surbs(
|
||||
&mut self,
|
||||
target: AnonymousSenderTag,
|
||||
amount: u32,
|
||||
) -> Result<(), PreparationError> {
|
||||
debug!("requesting {amount} additional reply surbs for {target}");
|
||||
let (reply_surb, _) = self
|
||||
.surbs_storage
|
||||
.get_reply_surb_ignoring_threshold(&target);
|
||||
|
||||
let reply_surb = reply_surb.ok_or(PreparationError::NotEnoughSurbs {
|
||||
available: 0,
|
||||
required: 1,
|
||||
})?;
|
||||
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_request_additional_reply_surbs(target, reply_surb, amount)
|
||||
.await
|
||||
{
|
||||
let err = err.return_unused_surbs(&self.surbs_storage, &target);
|
||||
warn!("failed to request additional surbs from {target}: {err}",);
|
||||
return Err(err);
|
||||
} else {
|
||||
self.surbs_storage
|
||||
.increment_pending_reception(&target, amount);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn try_clear_pending_retransmission(&mut self, target: AnonymousSenderTag) {
|
||||
trace!("trying to clear pending retransmission queue");
|
||||
let available_surbs = self.surbs_storage.available_surbs(&target);
|
||||
let min_surbs_threshold = self.surbs_storage.min_surb_threshold();
|
||||
|
||||
let max_to_clear = if available_surbs > min_surbs_threshold {
|
||||
available_surbs - min_surbs_threshold
|
||||
} else {
|
||||
trace!("we don't have enough surbs for retransmission queue clearing...");
|
||||
return;
|
||||
};
|
||||
trace!("we can clear up to {max_to_clear} entries");
|
||||
|
||||
let Some(pending) = self.surb_senders.get_mut(&target) else {
|
||||
trace!("no pending entry for {target}!");
|
||||
return;
|
||||
};
|
||||
|
||||
let mut to_take = Vec::new();
|
||||
|
||||
while to_take.len() < max_to_clear {
|
||||
if let Some((_, data)) = pending.pending_retransmissions.pop_first() {
|
||||
// no need to do anything if we failed to upgrade the reference,
|
||||
// it means we got the ack while the data was waiting in the queue
|
||||
if let Some(upgraded) = data.upgrade() {
|
||||
to_take.push(upgraded)
|
||||
}
|
||||
} else {
|
||||
// our map is empty!
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if to_take.is_empty() {
|
||||
// no need to do anything
|
||||
return;
|
||||
}
|
||||
|
||||
let (surbs_for_reply, _) = self.surbs_storage.get_reply_surbs(&target, to_take.len());
|
||||
|
||||
let Some(surbs_for_reply) = surbs_for_reply else {
|
||||
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
|
||||
self.re_insert_pending_retransmission(&target, to_take);
|
||||
return;
|
||||
};
|
||||
|
||||
let to_send_vec = to_take.iter().map(|ack| ack.fragment_data()).collect();
|
||||
|
||||
let prepared_fragments = match self
|
||||
.message_handler
|
||||
.prepare_reply_chunks_for_sending(to_send_vec, surbs_for_reply)
|
||||
.await
|
||||
{
|
||||
Ok(prepared) => prepared,
|
||||
Err(err) => {
|
||||
let err = err.return_unused_surbs(&self.surbs_storage, &target);
|
||||
self.re_insert_pending_retransmission(&target, to_take);
|
||||
|
||||
warn!("failed to clear pending retransmission queue for {target}: {err}",);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// we can't fail at this point, so drop all references to acks so that timer updates wouldn't blow up
|
||||
drop(to_take);
|
||||
|
||||
self.message_handler
|
||||
.send_retransmission_reply_chunks(prepared_fragments, TransmissionLane::Retransmission)
|
||||
.await;
|
||||
}
|
||||
|
||||
fn pop_at_most_pending_replies(
|
||||
&mut self,
|
||||
from: &AnonymousSenderTag,
|
||||
amount: usize,
|
||||
) -> Option<Vec<(TransmissionLane, FragmentWithMaxRetransmissions)>> {
|
||||
// if possible, pop all pending replies, if not, pop only entries for which we'd have a reply surb
|
||||
let pending = self.surb_senders.get_mut(from)?;
|
||||
let total = pending.pending_replies.total_size();
|
||||
trace!("pending queue has {total} elements");
|
||||
if total == 0 {
|
||||
return None;
|
||||
}
|
||||
pending
|
||||
.pending_replies
|
||||
.pop_at_most_n_next_messages_at_random(amount)
|
||||
}
|
||||
|
||||
async fn try_clear_pending_queue(&mut self, target: AnonymousSenderTag) {
|
||||
trace!("trying to clear pending queue");
|
||||
let available_surbs = self.surbs_storage.available_surbs(&target);
|
||||
let min_surbs_threshold = self.surbs_storage.min_surb_threshold();
|
||||
|
||||
let max_to_clear = if available_surbs > min_surbs_threshold {
|
||||
available_surbs - min_surbs_threshold
|
||||
} else {
|
||||
trace!("we don't have enough surbs for queue clearing...");
|
||||
return;
|
||||
};
|
||||
trace!("we can clear up to {max_to_clear} entries");
|
||||
|
||||
// we're guaranteed to not get more entries than we have reply surbs for
|
||||
if let Some(to_send) = self.pop_at_most_pending_replies(&target, max_to_clear) {
|
||||
let to_send_clone = to_send.clone();
|
||||
|
||||
if to_send_clone.is_empty() {
|
||||
panic!(
|
||||
"please let the devs know if you ever see this message (reply_controller.rs)"
|
||||
);
|
||||
}
|
||||
|
||||
let (surbs_for_reply, _) = self
|
||||
.surbs_storage
|
||||
.get_reply_surbs(&target, to_send_clone.len());
|
||||
|
||||
let Some(surbs_for_reply) = surbs_for_reply else {
|
||||
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
|
||||
self.re_insert_pending_replies(&target, to_send);
|
||||
return;
|
||||
};
|
||||
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_send_reply_chunks(target, to_send_clone, surbs_for_reply)
|
||||
.await
|
||||
{
|
||||
let err = err.return_unused_surbs(&self.surbs_storage, &target);
|
||||
self.re_insert_pending_replies(&target, to_send);
|
||||
warn!("failed to clear pending queue for {target}: {err}");
|
||||
}
|
||||
} else {
|
||||
trace!("the pending queue is empty");
|
||||
}
|
||||
}
|
||||
|
||||
fn reset_rerequest_counter(&mut self, from: &AnonymousSenderTag) {
|
||||
if let Some(pending) = self.surb_senders.get_mut(from) {
|
||||
pending.reset_current_clear_rerequest_counter()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_received_surbs(
|
||||
&mut self,
|
||||
from: AnonymousSenderTag,
|
||||
reply_surbs: Vec<ReplySurbWithKeyRotation>,
|
||||
from_surb_request: bool,
|
||||
) {
|
||||
trace!("handling received surbs");
|
||||
|
||||
// clear the requesting flag since we should have been asking for surbs
|
||||
if from_surb_request {
|
||||
self.surbs_storage
|
||||
.decrement_pending_reception(&from, reply_surbs.len() as u32);
|
||||
}
|
||||
|
||||
// store received surbs
|
||||
self.surbs_storage.insert_fresh_surbs(&from, reply_surbs);
|
||||
|
||||
// reset, if applicable, request counter
|
||||
self.reset_rerequest_counter(&from);
|
||||
|
||||
// use as many as we can for clearing pending retransmission queue
|
||||
self.try_clear_pending_retransmission(from).await;
|
||||
|
||||
// use as many as we can for clearing pending 'normal' queue
|
||||
self.try_clear_pending_queue(from).await;
|
||||
|
||||
// if we have to, request more
|
||||
if self.should_request_more_surbs(&from) {
|
||||
self.request_reply_surbs_for_queue_clearing(from).await;
|
||||
}
|
||||
}
|
||||
fn buffer_pending_ack(
|
||||
&mut self,
|
||||
recipient: AnonymousSenderTag,
|
||||
ack_ref: Arc<PendingAcknowledgement>,
|
||||
weak_ack_ref: Weak<PendingAcknowledgement>,
|
||||
) {
|
||||
let frag_id = ack_ref.inner_fragment_identifier();
|
||||
|
||||
let pending = self.surb_senders.entry(recipient).or_default();
|
||||
if let Entry::Vacant(e) = pending.pending_retransmissions.entry(frag_id) {
|
||||
e.insert(weak_ack_ref);
|
||||
} else {
|
||||
warn!(
|
||||
"we're already trying to retransmit {frag_id}. We must be really behind in surbs!"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_reply_retransmission(
|
||||
&mut self,
|
||||
recipient_tag: AnonymousSenderTag,
|
||||
timed_out_ack: Weak<PendingAcknowledgement>,
|
||||
extra_surbs_request: bool,
|
||||
) {
|
||||
// seems we got the ack in the end
|
||||
let ack_ref = match timed_out_ack.upgrade() {
|
||||
Some(ack) => ack,
|
||||
None => {
|
||||
debug!("we received the ack for one of the reply packets as we were putting it in the retransmission queue");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// if this is retransmission for obtaining additional reply surbs,
|
||||
// we can dip below the storage threshold
|
||||
let (maybe_reply_surb, _) = if extra_surbs_request {
|
||||
self.surbs_storage
|
||||
.get_reply_surb_ignoring_threshold(&recipient_tag)
|
||||
} else {
|
||||
self.surbs_storage.get_reply_surb(&recipient_tag)
|
||||
};
|
||||
|
||||
if let Some(reply_surb) = maybe_reply_surb {
|
||||
match self
|
||||
.message_handler
|
||||
.try_prepare_single_reply_chunk_for_sending(reply_surb, ack_ref.fragment_data())
|
||||
.await
|
||||
{
|
||||
Ok(prepared) => {
|
||||
// drop the ack ref so that controller would not panic on `UpdateTimer` if that task
|
||||
// got to handle the action before this function terminated (which is very much
|
||||
// possible if `forward_messages` takes a while)
|
||||
drop(ack_ref);
|
||||
|
||||
self.message_handler
|
||||
.update_ack_delay(prepared.fragment_identifier, prepared.total_delay);
|
||||
self.message_handler
|
||||
.forward_messages(vec![prepared.into()], TransmissionLane::Retransmission)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let err = err.return_unused_surbs(&self.surbs_storage, &recipient_tag);
|
||||
warn!("failed to prepare message for retransmission - {err}");
|
||||
// we buffer that packet and to try another day
|
||||
self.buffer_pending_ack(recipient_tag, ack_ref, timed_out_ack);
|
||||
|
||||
if self.should_request_more_surbs(&recipient_tag) {
|
||||
self.request_reply_surbs_for_queue_clearing(recipient_tag)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
};
|
||||
} else {
|
||||
self.buffer_pending_ack(recipient_tag, ack_ref, timed_out_ack);
|
||||
|
||||
if self.should_request_more_surbs(&recipient_tag) {
|
||||
self.request_reply_surbs_for_queue_clearing(recipient_tag)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// to be honest this doesn't make a lot of sense in the context of `connection_id`,
|
||||
// it should really be asked per tag
|
||||
pub(crate) fn handle_lane_queue_length(
|
||||
&self,
|
||||
connection_id: ConnectionId,
|
||||
response_channel: oneshot::Sender<usize>,
|
||||
) {
|
||||
// TODO: if we ever have duplicate ids for different senders, it means our rng is super weak
|
||||
// thus I don't think we have to worry about it?
|
||||
let lane = TransmissionLane::ConnectionId(connection_id);
|
||||
for buf in self.surb_senders.values().map(|p| &p.pending_replies) {
|
||||
if let Some(length) = buf.lane_length(&lane) {
|
||||
if response_channel.send(length).is_err() {
|
||||
error!("the requester for lane queue length has dropped the response channel!")
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
// make sure that if we didn't find that lane, we reply with 0
|
||||
if response_channel.send(0).is_err() {
|
||||
error!("the requester for lane queue length has dropped the response channel!")
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: modify this method to more accurately determine the amount of surbs it needs to request
|
||||
// it should take into consideration the average latency, sending rate and queue size.
|
||||
// it should request as many surbs as it takes to saturate its sending rate before next batch arrives
|
||||
async fn request_reply_surbs_for_queue_clearing(&mut self, target: AnonymousSenderTag) {
|
||||
trace!("requesting surbs for queue clearing");
|
||||
|
||||
let total_queue = self
|
||||
.surb_senders
|
||||
.get(&target)
|
||||
.map(|pending| pending.total_pending() as u32)
|
||||
.unwrap_or_default();
|
||||
|
||||
let min_surbs_buffer = self.config.reply_surbs.minimum_reply_surb_threshold_buffer as u32;
|
||||
|
||||
// To proactively request additional surbs, we aim to have a buffer of extra surbs in our
|
||||
// storage.
|
||||
let total_queue_with_buffer = total_queue + min_surbs_buffer;
|
||||
|
||||
let request_size = min(
|
||||
self.config.reply_surbs.maximum_reply_surb_request_size,
|
||||
max(
|
||||
total_queue_with_buffer,
|
||||
self.config.reply_surbs.minimum_reply_surb_request_size,
|
||||
),
|
||||
);
|
||||
|
||||
if let Err(err) = self
|
||||
.request_additional_reply_surbs(target, request_size)
|
||||
.await
|
||||
{
|
||||
let now = OffsetDateTime::now_utc();
|
||||
let sender_info = self.get_or_create_surb_sender(&target);
|
||||
let last_failure = sender_info.reset_last_request_failure(now);
|
||||
|
||||
// only log at higher level if it's the first time this error has occurred in a while
|
||||
if now - last_failure > time::Duration::seconds(30) {
|
||||
warn!("failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}")
|
||||
} else {
|
||||
debug!("failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn inspect_stale_pending_data(&mut self) {
|
||||
let mut to_request = Vec::new();
|
||||
let mut to_remove = Vec::new();
|
||||
|
||||
let now = OffsetDateTime::now_utc();
|
||||
for (pending_reply_target, vals) in self.surb_senders.iter_mut() {
|
||||
// for now recreate old behaviour
|
||||
let retransmission_buf = &vals.pending_replies;
|
||||
|
||||
if retransmission_buf.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let Some(last_received_time) = self
|
||||
.surbs_storage
|
||||
.surbs_last_received_at(pending_reply_target)
|
||||
else {
|
||||
error!("we have {} pending replies for {pending_reply_target}, but we somehow never received any reply surbs from them!", retransmission_buf.total_size());
|
||||
to_remove.push(*pending_reply_target);
|
||||
continue;
|
||||
};
|
||||
|
||||
let diff = now - last_received_time;
|
||||
let max_rerequest_wait = self
|
||||
.config
|
||||
.reply_surbs
|
||||
.maximum_reply_surb_rerequest_waiting_period;
|
||||
let max_drop_wait = self
|
||||
.config
|
||||
.reply_surbs
|
||||
.maximum_reply_surb_drop_waiting_period;
|
||||
let max_rerequests = self.config.reply_surbs.maximum_reply_surbs_rerequests;
|
||||
|
||||
// if we have already requested extra surbs because of the stale entry,
|
||||
// don't do it again (otherwise we'll get stuck in a constant cycle of requesting more surbs
|
||||
// if client is offline)
|
||||
if vals.current_clear_rerequest_counter > max_rerequests {
|
||||
to_remove.push(*pending_reply_target);
|
||||
debug!("we have reached the maximum threshold of attempting to request surbs from {pending_reply_target}. dropping the sender");
|
||||
continue;
|
||||
}
|
||||
|
||||
if diff > max_rerequest_wait {
|
||||
if diff > max_drop_wait {
|
||||
to_remove.push(*pending_reply_target)
|
||||
} else {
|
||||
debug!("We haven't received any surbs in {} from {pending_reply_target}. Going to explicitly ask for more", humantime::format_duration(diff.unsigned_abs()));
|
||||
vals.increment_current_clear_rerequest_counter();
|
||||
to_request.push(*pending_reply_target);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for pending_reply_target in to_request {
|
||||
self.request_reply_surbs_for_queue_clearing(pending_reply_target)
|
||||
.await;
|
||||
self.surbs_storage
|
||||
.reset_pending_reception(&pending_reply_target)
|
||||
}
|
||||
for to_remove in to_remove {
|
||||
// TODO: in the 'old' version we just removed pending messages,
|
||||
// not retransmissions, but I think those should follow the same logic.
|
||||
// if something breaks because of that. I guess here is your explanation, future reader
|
||||
self.surb_senders.remove(&to_remove);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn check_surb_refresh(&mut self) {
|
||||
let Some(current_rotation_id) = self.topology_access.current_key_rotation_id().await else {
|
||||
warn!("failed to retrieve current key rotation id from the network topology");
|
||||
return;
|
||||
};
|
||||
|
||||
if let SurbRefreshState::WaitingForNextRotation { last_known } = self.surb_refresh_state {
|
||||
if last_known == current_rotation_id {
|
||||
trace!("no changes in key rotation id");
|
||||
} else {
|
||||
// key rotation actually changed and given the polling rate (1/8th epoch) we should have plenty
|
||||
// of time to perform the upgrade.
|
||||
// but wait for one more call before doing this so that the clients could also resync
|
||||
// their topologies and discover new rotation
|
||||
self.surb_refresh_state = SurbRefreshState::ScheduledForNextInvocation;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// here we are in `SurbRefreshState::ScheduledForNextInvocation` state
|
||||
|
||||
let mut marked_as_stale = HashMap::new();
|
||||
|
||||
// 1. mark all existing surbs we have as possibly stale
|
||||
for mut map_entry in self.surbs_storage.as_raw_iter_mut() {
|
||||
let (sender, received) = map_entry.pair_mut();
|
||||
let num_downgraded = received.downgrade_freshness();
|
||||
trace!("{sender}: {num_downgraded} downgraded");
|
||||
if num_downgraded != 0 {
|
||||
marked_as_stale.insert(*sender, num_downgraded);
|
||||
}
|
||||
}
|
||||
|
||||
// 2. attempt to re-request the equivalent number of fresh surbs
|
||||
// TODO PROBLEM: if our request gets lost, we might be in trouble...
|
||||
// we need some sort of retry mechanism
|
||||
for (sender, num_to_request) in marked_as_stale {
|
||||
if self
|
||||
.request_additional_reply_surbs(sender, num_to_request as u32)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
warn!("surb refresh request failed")
|
||||
}
|
||||
}
|
||||
|
||||
self.surb_refresh_state = SurbRefreshState::WaitingForNextRotation {
|
||||
last_known: current_rotation_id,
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) async fn inspect_and_clear_stale_data(&mut self, now: OffsetDateTime) {
|
||||
// technically we don't know if epoch is stuck, but we're flying in blind here,
|
||||
// so we have to assume the worst and not purge anything depending on proper epoch progression
|
||||
let is_epoch_stuck = self
|
||||
.current_topology_metadata()
|
||||
.await
|
||||
.map(|m| self.config.key_rotation.epoch_stuck(m))
|
||||
.unwrap_or(false);
|
||||
|
||||
// expected time of when the CURRENT key rotation has begun
|
||||
let expected_current_key_rotation_start = self
|
||||
.config
|
||||
.key_rotation
|
||||
.expected_current_key_rotation_start(now);
|
||||
|
||||
// expected ID of the CURRENT key rotation
|
||||
let expected_current_key_rotation = self
|
||||
.config
|
||||
.key_rotation
|
||||
.expected_current_key_rotation_id(now);
|
||||
|
||||
// time of the start of one epoch BEFORE the CURRENT rotation has begun
|
||||
// this indicates the starting time of when packets with the current keys might have been constructed
|
||||
let prior_epoch_start =
|
||||
expected_current_key_rotation_start - self.config.key_rotation.epoch_duration;
|
||||
|
||||
// time of the start of one epoch AFTER the current rotation has begun
|
||||
// this indicates the end of transition period and any packets constructed with keys different
|
||||
// from the current one are definitely invalid
|
||||
let following_epoch_start =
|
||||
expected_current_key_rotation_start + self.config.key_rotation.epoch_duration;
|
||||
|
||||
// define a closure for validating individual surbs
|
||||
// (we have to run it twice for different piles)
|
||||
let basic_surb_retention_logic = |received_surb: &ReceivedReplySurb| {
|
||||
if is_epoch_stuck {
|
||||
let diff = now - received_surb.received_at();
|
||||
return diff < self.config.key_rotation.rotation_lifetime();
|
||||
}
|
||||
|
||||
if received_surb.received_at() < prior_epoch_start {
|
||||
// it's definitely from previous rotation
|
||||
return false;
|
||||
}
|
||||
let surb_rotation = received_surb.key_rotation();
|
||||
|
||||
if surb_rotation.is_unknown() {
|
||||
// can't do anything, so just retain it
|
||||
return true;
|
||||
}
|
||||
|
||||
// TODO: will this backfire during transition period where we need surbs to refresh surbs
|
||||
// and we failed to send a request?
|
||||
if surb_rotation.is_even() && expected_current_key_rotation % 2 == 1 {
|
||||
return false;
|
||||
}
|
||||
|
||||
if surb_rotation.is_odd() && expected_current_key_rotation % 2 == 0 {
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
};
|
||||
|
||||
// 1. purge full old clients data (this applies to RECEIVER)
|
||||
self.surbs_storage.retain(|_, received| {
|
||||
if is_epoch_stuck {
|
||||
// if epoch is stuck, we can't do much (because we don't know for certain if rotation has advanced)
|
||||
// apart from the basic check of surbs being received more than maximum lifetime of a rotation
|
||||
// because at that point we know they must be invalid
|
||||
let diff = now - received.surbs_last_received_at();
|
||||
return diff < self.config.key_rotation.rotation_lifetime();
|
||||
}
|
||||
|
||||
// if surbs were received more than 1h before the start of the current rotation,
|
||||
// they're DEFINITELY invalid.
|
||||
// if it was up until 1h AFTER the start of the current rotation they MIGHT be valid -
|
||||
// we don't know for sure, unless the client explicitly attached rotation information
|
||||
// (which only applies to more recent versions of clients so we can't 100% rely on that)
|
||||
if received.surbs_last_received_at() < prior_epoch_start {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 1.1. check individual surbs (same basic logic applies)
|
||||
received.retain_fresh_surbs(&basic_surb_retention_logic);
|
||||
|
||||
// 1.2. check the possibly stale entries
|
||||
// 1.2.1. check if we're beyond the key rotation transition period,
|
||||
// if so those surbs are definitely unusable
|
||||
if now > following_epoch_start {
|
||||
received.drop_possibly_stale_surbs();
|
||||
}
|
||||
|
||||
// 1.2.2. otherwise continue with the same logic as the fresh ones
|
||||
received.retain_possibly_stale_surbs(&basic_surb_retention_logic);
|
||||
|
||||
// no surbs left, we're not expecting any AND we haven't received anything in a while
|
||||
// (i.e. sender probably abandoned us)
|
||||
let max_drop_wait = self
|
||||
.config
|
||||
.reply_surbs
|
||||
.maximum_reply_surb_drop_waiting_period;
|
||||
let last_received = received.surbs_last_received_at();
|
||||
|
||||
let possibly_abandoned = last_received + max_drop_wait < now;
|
||||
if received.is_empty() && received.pending_reception() == 0 && possibly_abandoned {
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
});
|
||||
|
||||
// 1.3 inspect old unavailable receivers to clear any stale data
|
||||
self.unavailable
|
||||
.retain(|_, last_reported| now - *last_reported < time::Duration::seconds(30));
|
||||
}
|
||||
}
|
||||
@@ -3,12 +3,12 @@
|
||||
|
||||
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use log::error;
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
|
||||
use nym_task::connections::{ConnectionId, TransmissionLane};
|
||||
use std::sync::Weak;
|
||||
use tracing::error;
|
||||
|
||||
pub(crate) fn new_control_channels() -> (ReplyControllerSender, ReplyControllerReceiver) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::client::real_messages_control::message_handler::MessageHandler;
|
||||
use crate::client::replies::reply_controller::Config;
|
||||
use nym_client_core_surb_storage::{CombinedReplyStorage, SentReplyKeys, UsedSenderTags};
|
||||
use nym_crypto::aes::cipher::crypto_common::rand_core::CryptoRng;
|
||||
use nym_sphinx::addressing::Recipient;
|
||||
use rand::Rng;
|
||||
use std::cmp::min;
|
||||
use std::time::Duration;
|
||||
use time::OffsetDateTime;
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
/// Reply controller responsible for controlling sender-related part
|
||||
/// of replies, such as checking if any reply keys are stale
|
||||
pub struct SenderReplyController<R> {
|
||||
config: Config,
|
||||
|
||||
tags_storage: UsedSenderTags,
|
||||
sent_reply_keys: SentReplyKeys,
|
||||
message_handler: MessageHandler<R>,
|
||||
}
|
||||
|
||||
impl<R> SenderReplyController<R>
|
||||
where
|
||||
R: CryptoRng + Rng,
|
||||
{
|
||||
pub(crate) fn new(
|
||||
config: Config,
|
||||
storage: &CombinedReplyStorage,
|
||||
message_handler: MessageHandler<R>,
|
||||
) -> Self {
|
||||
SenderReplyController {
|
||||
config,
|
||||
tags_storage: storage.tags_storage(),
|
||||
sent_reply_keys: storage.key_storage(),
|
||||
message_handler,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_surb_request(&mut self, recipient: Recipient, mut amount: u32) {
|
||||
// 1. check whether we sent any surbs in the past to this recipient, otherwise
|
||||
// they have no business in asking for more
|
||||
if !self.tags_storage.exists(&recipient) {
|
||||
warn!("{recipient} asked us for reply SURBs even though we never sent them any anonymous messages before!");
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. check whether the requested amount is within sane range
|
||||
if amount
|
||||
> self
|
||||
.config
|
||||
.reply_surbs
|
||||
.maximum_allowed_reply_surb_request_size
|
||||
{
|
||||
warn!("The requested reply surb amount is larger than our maximum allowed ({amount} > {}). Lowering it to a more sane value...", self.config.reply_surbs.maximum_allowed_reply_surb_request_size);
|
||||
amount = self
|
||||
.config
|
||||
.reply_surbs
|
||||
.maximum_allowed_reply_surb_request_size;
|
||||
}
|
||||
|
||||
// 3. construct and send the surbs away
|
||||
// (send them in smaller batches to make the experience a bit smoother
|
||||
let mut remaining = amount;
|
||||
while remaining > 0 {
|
||||
let to_send = min(remaining, 100);
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_send_additional_reply_surbs(
|
||||
recipient,
|
||||
to_send,
|
||||
nym_sphinx::params::PacketType::Mix,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("failed to send additional surbs to {recipient} - {err}");
|
||||
} else {
|
||||
trace!("sent {to_send} reply SURBs to {recipient}");
|
||||
}
|
||||
|
||||
remaining -= to_send;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn inspect_and_clear_stale_data(&self, now: OffsetDateTime) {
|
||||
// check reply keys (this applies to SENDER)
|
||||
self.sent_reply_keys.retain(|_, reply_key| {
|
||||
let diff = now - reply_key.sent_at;
|
||||
if diff > self.config.reply_surbs.maximum_reply_key_age {
|
||||
let std_diff = Duration::try_from(diff).unwrap_or_default();
|
||||
let diff_formatted = humantime::format_duration(std_diff);
|
||||
debug!("it's been {diff_formatted} since we created this reply key. it's probably never going to get used, so we're going to purge it...");
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -93,14 +93,14 @@ impl StatisticsControl {
|
||||
None,
|
||||
);
|
||||
if let Err(err) = self.report_tx.send(report_message).await {
|
||||
log::error!("Failed to report client stats: {:?}", err);
|
||||
tracing::error!("Failed to report client stats: {err:?}");
|
||||
} else {
|
||||
self.stats.reset();
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(&mut self) {
|
||||
log::debug!("Started StatisticsControl with graceful shutdown support");
|
||||
tracing::debug!("Started StatisticsControl with graceful shutdown support");
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let mut stats_report_interval = tokio_stream::wrappers::IntervalStream::new(
|
||||
@@ -133,13 +133,13 @@ impl StatisticsControl {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("StatisticsControl: Received shutdown");
|
||||
tracing::trace!("StatisticsControl: Received shutdown");
|
||||
break;
|
||||
},
|
||||
stats_event = self.stats_rx.recv() => match stats_event {
|
||||
Some(stats_event) => self.stats.handle_event(stats_event),
|
||||
None => {
|
||||
log::trace!("StatisticsControl: shutting down due to closed stats channel");
|
||||
tracing::trace!("StatisticsControl: shutting down due to closed stats channel");
|
||||
break;
|
||||
}
|
||||
},
|
||||
@@ -161,7 +161,7 @@ impl StatisticsControl {
|
||||
}
|
||||
}
|
||||
}
|
||||
log::debug!("StatisticsControl: Exiting");
|
||||
tracing::debug!("StatisticsControl: Exiting");
|
||||
}
|
||||
|
||||
pub(crate) fn start(mut self) {
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_sphinx::addressing::clients::Recipient;
|
||||
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError};
|
||||
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError, NymTopologyMetadata};
|
||||
use nym_validator_client::models::KeyRotationId;
|
||||
use std::ops::Deref;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
@@ -134,6 +135,21 @@ impl TopologyAccessor {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn current_mixnet_epoch_id(&self) -> Option<u32> {
|
||||
let route_provider = self.current_route_provider().await?;
|
||||
Some(route_provider.absolute_epoch_id())
|
||||
}
|
||||
|
||||
pub async fn current_key_rotation_id(&self) -> Option<KeyRotationId> {
|
||||
let route_provider = self.current_route_provider().await?;
|
||||
Some(route_provider.current_key_rotation())
|
||||
}
|
||||
|
||||
pub async fn current_metadata(&self) -> Option<NymTopologyMetadata> {
|
||||
let route_provider = self.current_route_provider().await?;
|
||||
Some(route_provider.metadata())
|
||||
}
|
||||
|
||||
pub async fn manually_change_topology(&self, new_topology: NymTopology) {
|
||||
self.inner.controlled_manually.store(true, Ordering::SeqCst);
|
||||
self.inner.update(Some(new_topology)).await;
|
||||
|
||||
@@ -4,11 +4,11 @@
|
||||
use crate::spawn_future;
|
||||
pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit};
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_sphinx::addressing::nodes::NodeIdentity;
|
||||
use nym_task::TaskClient;
|
||||
use nym_topology::NymTopologyError;
|
||||
use std::time::Duration;
|
||||
use tracing::*;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio::time::sleep;
|
||||
@@ -20,7 +20,7 @@ mod accessor;
|
||||
pub mod nym_api_provider;
|
||||
|
||||
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
|
||||
pub use nym_topology::provider_trait::TopologyProvider;
|
||||
pub use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
|
||||
|
||||
// TODO: move it to config later
|
||||
const MAX_FAILURE_COUNT: usize = 10;
|
||||
@@ -169,12 +169,12 @@ impl TopologyRefresher {
|
||||
self.try_refresh().await;
|
||||
},
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("TopologyRefresher: Received shutdown");
|
||||
tracing::trace!("TopologyRefresher: Received shutdown");
|
||||
},
|
||||
}
|
||||
}
|
||||
self.task_client.recv_timeout().await;
|
||||
log::debug!("TopologyRefresher: Exiting");
|
||||
tracing::debug!("TopologyRefresher: Exiting");
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,13 +2,12 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, error, warn};
|
||||
use nym_topology::provider_trait::TopologyProvider;
|
||||
use nym_topology::{NymTopology, NymTopologyMetadata};
|
||||
use nym_validator_client::UserAgent;
|
||||
use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
|
||||
use nym_topology::NymTopology;
|
||||
use rand::prelude::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use std::cmp::min;
|
||||
use tracing::{debug, error, warn};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -49,18 +48,10 @@ impl NymApiTopologyProvider {
|
||||
pub fn new(
|
||||
config: impl Into<Config>,
|
||||
mut nym_api_urls: Vec<Url>,
|
||||
user_agent: Option<UserAgent>,
|
||||
mut validator_client: nym_validator_client::client::NymApiClient,
|
||||
) -> Self {
|
||||
nym_api_urls.shuffle(&mut thread_rng());
|
||||
|
||||
let validator_client = if let Some(user_agent) = user_agent {
|
||||
nym_validator_client::client::NymApiClient::new_with_user_agent(
|
||||
nym_api_urls[0].clone(),
|
||||
user_agent,
|
||||
)
|
||||
} else {
|
||||
nym_validator_client::client::NymApiClient::new(nym_api_urls[0].clone())
|
||||
};
|
||||
validator_client.change_nym_api(nym_api_urls[0].clone());
|
||||
|
||||
NymApiTopologyProvider {
|
||||
config: config.into(),
|
||||
@@ -108,12 +99,8 @@ impl NymApiTopologyProvider {
|
||||
.filter(|n| n.performance.round_to_integer() >= self.config.min_node_performance())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
NymTopology::new(
|
||||
NymTopologyMetadata::new(metadata.rotation_id, metadata.absolute_epoch_id),
|
||||
rewarded_set,
|
||||
Vec::new(),
|
||||
)
|
||||
.with_skimmed_nodes(&nodes_filtered)
|
||||
NymTopology::new(metadata.to_topology_metadata(), rewarded_set, Vec::new())
|
||||
.with_skimmed_nodes(&nodes_filtered)
|
||||
} else {
|
||||
// if we're not using extended topology, we're only getting active set mixnodes and gateways
|
||||
|
||||
@@ -124,7 +111,7 @@ impl NymApiTopologyProvider {
|
||||
// TODO: we really should be getting ACTIVE gateways only
|
||||
let gateways_fut = self
|
||||
.validator_client
|
||||
.get_all_basic_entry_assigned_nodes_v2();
|
||||
.get_all_basic_entry_assigned_nodes_with_metadata();
|
||||
|
||||
let (rewarded_set, mixnodes_res, gateways_res) =
|
||||
futures::try_join!(rewarded_set_fut, mixnodes_fut, gateways_fut)
|
||||
@@ -136,7 +123,7 @@ impl NymApiTopologyProvider {
|
||||
let metadata = mixnodes_res.metadata;
|
||||
let mixnodes = mixnodes_res.nodes;
|
||||
|
||||
if gateways_res.metadata != metadata {
|
||||
if !gateways_res.metadata.consistency_check(&metadata) {
|
||||
warn!("inconsistent nodes metadata between mixnodes and gateways calls! {metadata:?} and {:?}", gateways_res.metadata);
|
||||
return None;
|
||||
}
|
||||
@@ -161,12 +148,8 @@ impl NymApiTopologyProvider {
|
||||
}
|
||||
}
|
||||
|
||||
NymTopology::new(
|
||||
NymTopologyMetadata::new(metadata.rotation_id, metadata.absolute_epoch_id),
|
||||
rewarded_set,
|
||||
Vec::new(),
|
||||
)
|
||||
.with_skimmed_nodes(&nodes)
|
||||
NymTopology::new(metadata.to_topology_metadata(), rewarded_set, Vec::new())
|
||||
.with_skimmed_nodes(&nodes)
|
||||
};
|
||||
|
||||
if !topology.is_minimally_routable() {
|
||||
|
||||
@@ -36,11 +36,18 @@ impl SizedData for Fragment {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct TransmissionBuffer<T> {
|
||||
buffer: HashMap<TransmissionLane, LaneBufferEntry<T>>,
|
||||
}
|
||||
|
||||
impl<T> Default for TransmissionBuffer<T> {
|
||||
fn default() -> Self {
|
||||
TransmissionBuffer {
|
||||
buffer: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> TransmissionBuffer<T> {
|
||||
pub(crate) fn new() -> Self {
|
||||
TransmissionBuffer {
|
||||
@@ -211,7 +218,7 @@ impl<T> TransmissionBuffer<T> {
|
||||
};
|
||||
|
||||
let msg = self.pop_front_from_lane(&lane)?;
|
||||
log::trace!("picking to send from lane: {:?}", lane);
|
||||
tracing::trace!("picking to send from lane: {lane:?}");
|
||||
Some((lane, msg))
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ use nym_crypto::asymmetric::ed25519::Ed25519RecoveryError;
|
||||
use nym_gateway_client::error::GatewayClientError;
|
||||
use nym_topology::node::RoutingNodeError;
|
||||
use nym_topology::{NodeId, NymTopologyError};
|
||||
use nym_validator_client::nym_api::error::NymAPIError;
|
||||
use nym_validator_client::ValidatorClientError;
|
||||
use std::error::Error;
|
||||
use std::path::PathBuf;
|
||||
@@ -52,7 +53,15 @@ pub enum ClientCoreError {
|
||||
#[error("list of nym apis is empty")]
|
||||
ListOfNymApisIsEmpty,
|
||||
|
||||
#[error("the current network topology seem to be insufficient to route any packets through")]
|
||||
#[error("failed to resolve a query to nym API: {source}")]
|
||||
NymApiQueryFailure {
|
||||
#[from]
|
||||
source: NymAPIError,
|
||||
},
|
||||
|
||||
#[error(
|
||||
"the current network topology seem to be insufficient to route any packets through:\n\t{0}"
|
||||
)]
|
||||
InsufficientNetworkTopology(#[from] NymTopologyError),
|
||||
|
||||
#[error("experienced a failure with our reply surb persistent storage: {source}")]
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
use crate::error::ClientCoreError;
|
||||
use crate::init::types::RegistrationResult;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use log::{debug, info, trace, warn};
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
use nym_gateway_client::GatewayClient;
|
||||
use nym_topology::node::RoutingNode;
|
||||
@@ -14,6 +13,7 @@ use rand::{seq::SliceRandom, Rng};
|
||||
#[cfg(unix)]
|
||||
use std::os::fd::RawFd;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tracing::{debug, info, trace, warn};
|
||||
use tungstenite::Message;
|
||||
use url::Url;
|
||||
|
||||
@@ -105,12 +105,15 @@ pub async fn gateways_for_init<R: Rng>(
|
||||
nym_validator_client::client::NymApiClient::new(nym_api.clone())
|
||||
};
|
||||
|
||||
log::debug!("Fetching list of gateways from: {nym_api}");
|
||||
tracing::debug!("Fetching list of gateways from: {nym_api}");
|
||||
|
||||
let gateways = client.get_all_basic_entry_assigned_nodes_v2().await?.nodes;
|
||||
let gateways = client
|
||||
.get_all_basic_entry_assigned_nodes_with_metadata()
|
||||
.await?
|
||||
.nodes;
|
||||
info!("nym api reports {} gateways", gateways.len());
|
||||
|
||||
log::trace!("Gateways: {:#?}", gateways);
|
||||
tracing::trace!("Gateways: {gateways:#?}");
|
||||
|
||||
// filter out gateways below minimum performance and ones that could operate as a mixnode
|
||||
// (we don't want instability)
|
||||
@@ -120,10 +123,10 @@ pub async fn gateways_for_init<R: Rng>(
|
||||
.filter(|g| g.performance.round_to_integer() >= minimum_performance)
|
||||
.filter_map(|gateway| gateway.try_into().ok())
|
||||
.collect::<Vec<_>>();
|
||||
log::debug!("After checking validity: {}", valid_gateways.len());
|
||||
log::trace!("Valid gateways: {:#?}", valid_gateways);
|
||||
tracing::debug!("After checking validity: {}", valid_gateways.len());
|
||||
tracing::trace!("Valid gateways: {valid_gateways:#?}");
|
||||
|
||||
log::info!(
|
||||
tracing::info!(
|
||||
"and {} after validity and performance filtering",
|
||||
valid_gateways.len()
|
||||
);
|
||||
@@ -286,7 +289,7 @@ pub(super) fn get_specified_gateway(
|
||||
gateways: &[RoutingNode],
|
||||
must_use_tls: bool,
|
||||
) -> Result<RoutingNode, ClientCoreError> {
|
||||
log::debug!("Requesting specified gateway: {}", gateway_identity);
|
||||
tracing::debug!("Requesting specified gateway: {gateway_identity}");
|
||||
let user_gateway = ed25519::PublicKey::from_base58_string(gateway_identity)
|
||||
.map_err(ClientCoreError::UnableToCreatePublicKeyFromGatewayId)?;
|
||||
|
||||
@@ -326,7 +329,7 @@ pub(super) async fn register_with_gateway(
|
||||
);
|
||||
|
||||
gateway_client.establish_connection().await.map_err(|err| {
|
||||
log::warn!("Failed to establish connection with gateway!");
|
||||
tracing::warn!("Failed to establish connection with gateway!");
|
||||
ClientCoreError::GatewayClientError {
|
||||
gateway_id: gateway_id.to_base58_string(),
|
||||
source: Box::new(err),
|
||||
@@ -336,7 +339,7 @@ pub(super) async fn register_with_gateway(
|
||||
.perform_initial_authentication()
|
||||
.await
|
||||
.map_err(|err| {
|
||||
log::warn!("Failed to register with the gateway {gateway_id}: {err}");
|
||||
tracing::warn!("Failed to register with the gateway {gateway_id}: {err}");
|
||||
ClientCoreError::GatewayClientError {
|
||||
gateway_id: gateway_id.to_base58_string(),
|
||||
source: Box::new(err),
|
||||
|
||||
@@ -63,7 +63,7 @@ where
|
||||
K::StorageError: Send + Sync + 'static,
|
||||
D::StorageError: Send + Sync + 'static,
|
||||
{
|
||||
log::trace!("Setting up new gateway");
|
||||
tracing::trace!("Setting up new gateway");
|
||||
|
||||
// if we're setting up new gateway, we must have had generated long-term client keys before
|
||||
let client_keys = load_client_keys(key_store).await?;
|
||||
@@ -202,10 +202,10 @@ where
|
||||
K::StorageError: Send + Sync + 'static,
|
||||
D::StorageError: Send + Sync + 'static,
|
||||
{
|
||||
log::debug!("Setting up gateway");
|
||||
tracing::debug!("Setting up gateway");
|
||||
match setup {
|
||||
GatewaySetup::MustLoad { gateway_id } => {
|
||||
log::debug!("GatewaySetup::MustLoad with id: {gateway_id:?}");
|
||||
tracing::debug!("GatewaySetup::MustLoad with id: {gateway_id:?}");
|
||||
use_loaded_gateway_details(key_store, details_store, gateway_id).await
|
||||
}
|
||||
GatewaySetup::New {
|
||||
@@ -214,7 +214,7 @@ where
|
||||
#[cfg(unix)]
|
||||
connection_fd_callback,
|
||||
} => {
|
||||
log::debug!("GatewaySetup::New with spec: {specification:?}");
|
||||
tracing::debug!("GatewaySetup::New with spec: {specification:?}");
|
||||
setup_new_gateway(
|
||||
key_store,
|
||||
details_store,
|
||||
@@ -230,7 +230,7 @@ where
|
||||
gateway_details,
|
||||
client_keys: managed_keys,
|
||||
} => {
|
||||
log::debug!("GatewaySetup::ReuseConnection");
|
||||
tracing::debug!("GatewaySetup::ReuseConnection");
|
||||
Ok(reuse_gateway_connection(
|
||||
*authenticated_ephemeral_client,
|
||||
*gateway_details,
|
||||
|
||||
@@ -9,7 +9,7 @@ license.workspace = true
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
dashmap.workspace = true
|
||||
log.workspace = true
|
||||
tracing.workspace = true
|
||||
thiserror.workspace = true
|
||||
time.workspace = true
|
||||
|
||||
@@ -23,7 +23,7 @@ features = ["fs"]
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx]
|
||||
workspace = true
|
||||
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"]
|
||||
features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"]
|
||||
optional = true
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.sqlx-pool-guard]
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
-- change `previous_flush_timestamp` unix timestamp to `previous_flush` timestamp
|
||||
CREATE TABLE status_new
|
||||
(
|
||||
flush_in_progress INTEGER NOT NULL,
|
||||
previous_flush TIMESTAMP WITHOUT TIME ZONE NOT NULL,
|
||||
client_in_use INTEGER NOT NULL
|
||||
);
|
||||
|
||||
INSERT INTO status_new (flush_in_progress, previous_flush, client_in_use)
|
||||
SELECT flush_in_progress,
|
||||
datetime(previous_flush_timestamp, 'unixepoch') AS previous_flush,
|
||||
client_in_use
|
||||
FROM status;
|
||||
|
||||
DROP TABLE status;
|
||||
ALTER TABLE status_new
|
||||
RENAME TO status;
|
||||
|
||||
|
||||
-- change `sent_at_timestamp` unix timestamp to `sent_at` timestamp
|
||||
CREATE TABLE reply_key_new
|
||||
(
|
||||
key_digest BLOB NOT NULL UNIQUE,
|
||||
reply_key BLOB NOT NULL UNIQUE,
|
||||
sent_at TIMESTAMP WITHOUT TIME ZONE NOT NULL
|
||||
);
|
||||
|
||||
INSERT INTO reply_key_new (key_digest, reply_key, sent_at)
|
||||
SELECT key_digest,
|
||||
reply_key,
|
||||
datetime(sent_at_timestamp, 'unixepoch') AS sent_at
|
||||
FROM reply_key;
|
||||
|
||||
DROP TABLE reply_key;
|
||||
ALTER TABLE reply_key_new
|
||||
RENAME TO reply_key;
|
||||
|
||||
|
||||
-- change `last_sent_timestamp` unix timestamp to `sent_at` last_sent
|
||||
CREATE TABLE reply_surb_sender_new
|
||||
(
|
||||
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
last_sent TIMESTAMP WITHOUT TIME ZONE NOT NULL,
|
||||
tag BLOB NOT NULL UNIQUE
|
||||
);
|
||||
|
||||
INSERT INTO reply_surb_sender_new (id, last_sent, tag)
|
||||
SELECT id,
|
||||
datetime(last_sent_timestamp, 'unixepoch') AS last_sent,
|
||||
tag
|
||||
FROM reply_surb_sender;
|
||||
|
||||
|
||||
-- recreate `reply_surb` table due to foreign key constraint
|
||||
CREATE TABLE reply_surb_new
|
||||
(
|
||||
reply_surb_sender_id INTEGER NOT NULL,
|
||||
reply_surb BLOB NOT NULL,
|
||||
encoded_key_rotation TINYINT NOT NULL,
|
||||
|
||||
FOREIGN KEY (reply_surb_sender_id) REFERENCES reply_surb_sender_new (id)
|
||||
);
|
||||
|
||||
INSERT INTO reply_surb_new
|
||||
SELECT *
|
||||
FROM reply_surb;
|
||||
|
||||
DROP TABLE reply_surb;
|
||||
ALTER TABLE reply_surb_new
|
||||
RENAME TO reply_surb;
|
||||
|
||||
DROP TABLE reply_surb_sender;
|
||||
ALTER TABLE reply_surb_sender_new
|
||||
RENAME TO reply_surb_sender;
|
||||
|
||||
|
||||
+12
@@ -0,0 +1,12 @@
|
||||
/*
|
||||
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
-- don't persist sender_tag in the DB. instead generate fresh one on each restart
|
||||
-- this will:
|
||||
-- A) further help against correlation attacks
|
||||
-- B) realistically after client restarts, we might be in new key rotation anyway meaning receiver would have to start
|
||||
-- "from scratch" with surbs
|
||||
|
||||
DROP TABLE sender_tag;
|
||||
@@ -4,6 +4,7 @@
|
||||
use crate::backend::Empty;
|
||||
use crate::{CombinedReplyStorage, ReplyStorageBackend};
|
||||
use async_trait::async_trait;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
// well, right now we don't have the browser storage : (
|
||||
// so we keep everything in memory
|
||||
@@ -38,7 +39,10 @@ impl ReplyStorageBackend for Backend {
|
||||
self.empty.init_fresh(fresh).await
|
||||
}
|
||||
|
||||
async fn load_surb_storage(&self) -> Result<CombinedReplyStorage, Self::StorageError> {
|
||||
self.empty.load_surb_storage().await
|
||||
async fn load_surb_storage(
|
||||
&self,
|
||||
surb_freshness_cutoff: OffsetDateTime,
|
||||
) -> Result<CombinedReplyStorage, Self::StorageError> {
|
||||
self.empty.load_surb_storage(surb_freshness_cutoff).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,17 +3,15 @@
|
||||
|
||||
use crate::backend::fs_backend::{
|
||||
error::StorageError,
|
||||
models::{
|
||||
ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSenderTag,
|
||||
StoredSurbSender,
|
||||
},
|
||||
models::{ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSurbSender},
|
||||
};
|
||||
use log::{error, info};
|
||||
use sqlx::{
|
||||
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
|
||||
ConnectOptions,
|
||||
};
|
||||
use std::path::Path;
|
||||
use time::OffsetDateTime;
|
||||
use tracing::{error, info};
|
||||
|
||||
use sqlx_pool_guard::SqlitePoolGuard;
|
||||
|
||||
@@ -81,9 +79,11 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
pub async fn create_status_table(&self) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!("INSERT INTO status(flush_in_progress, previous_flush_timestamp, client_in_use) VALUES (0, 0, 1)")
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
sqlx::query!(
|
||||
"INSERT INTO status(flush_in_progress, previous_flush, client_in_use) VALUES (0, 0, 1)"
|
||||
)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -94,18 +94,18 @@ impl StorageManager {
|
||||
.map(|r| r.flush_in_progress > 0)
|
||||
}
|
||||
|
||||
pub async fn set_previous_flush_timestamp(&self, timestamp: i64) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!("UPDATE status SET previous_flush_timestamp = ?", timestamp)
|
||||
pub async fn set_previous_flush(&self, timestamp: OffsetDateTime) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!("UPDATE status SET previous_flush = ?", timestamp)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_previous_flush_timestamp(&self) -> Result<i64, sqlx::Error> {
|
||||
sqlx::query!("SELECT previous_flush_timestamp FROM status;")
|
||||
pub async fn get_previous_flush_time(&self) -> Result<OffsetDateTime, sqlx::Error> {
|
||||
sqlx::query!(r#"SELECT previous_flush AS "previous_flush: OffsetDateTime" FROM status"#)
|
||||
.fetch_one(&*self.connection_pool)
|
||||
.await
|
||||
.map(|r| r.previous_flush_timestamp)
|
||||
.map(|r| r.previous_flush)
|
||||
}
|
||||
|
||||
pub async fn set_flush_status(&self, in_progress: bool) -> Result<(), sqlx::Error> {
|
||||
@@ -131,32 +131,6 @@ impl StorageManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_all_tags(&self) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!("DELETE FROM sender_tag;")
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_tags(&self) -> Result<Vec<StoredSenderTag>, sqlx::Error> {
|
||||
sqlx::query_as!(StoredSenderTag, "SELECT * FROM sender_tag;",)
|
||||
.fetch_all(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn insert_tag(&self, stored_tag: StoredSenderTag) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO sender_tag(recipient, tag) VALUES (?, ?);
|
||||
"#,
|
||||
stored_tag.recipient,
|
||||
stored_tag.tag
|
||||
)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_all_reply_keys(&self) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!("DELETE FROM reply_key;")
|
||||
.execute(&*self.connection_pool)
|
||||
@@ -165,7 +139,7 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
pub async fn get_reply_keys(&self) -> Result<Vec<StoredReplyKey>, sqlx::Error> {
|
||||
sqlx::query_as!(StoredReplyKey, "SELECT * FROM reply_key;",)
|
||||
sqlx::query_as("SELECT * FROM reply_key;")
|
||||
.fetch_all(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
@@ -176,11 +150,11 @@ impl StorageManager {
|
||||
) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO reply_key(key_digest, reply_key, sent_at_timestamp) VALUES (?, ?, ?);
|
||||
INSERT INTO reply_key(key_digest, reply_key, sent_at) VALUES (?, ?, ?);
|
||||
"#,
|
||||
stored_reply_key.key_digest,
|
||||
stored_reply_key.reply_key,
|
||||
stored_reply_key.sent_at_timestamp
|
||||
stored_reply_key.sent_at
|
||||
)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?;
|
||||
@@ -188,7 +162,7 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
pub async fn get_surb_senders(&self) -> Result<Vec<StoredSurbSender>, sqlx::Error> {
|
||||
sqlx::query_as!(StoredSurbSender, "SELECT * FROM reply_surb_sender;",)
|
||||
sqlx::query_as("SELECT * FROM reply_surb_sender;")
|
||||
.fetch_all(&*self.connection_pool)
|
||||
.await
|
||||
}
|
||||
@@ -199,10 +173,10 @@ impl StorageManager {
|
||||
) -> Result<i64, sqlx::Error> {
|
||||
let id = sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO reply_surb_sender(tag, last_sent_timestamp) VALUES (?, ?);
|
||||
INSERT INTO reply_surb_sender(tag, last_sent) VALUES (?, ?);
|
||||
"#,
|
||||
stored_surb_sender.tag,
|
||||
stored_surb_sender.last_sent_timestamp
|
||||
stored_surb_sender.last_sent
|
||||
)
|
||||
.execute(&*self.connection_pool)
|
||||
.await?
|
||||
@@ -217,7 +191,7 @@ impl StorageManager {
|
||||
sqlx::query_as!(
|
||||
StoredReplySurb,
|
||||
r#"
|
||||
SELECT reply_surb_sender_id, reply_surb, encoded_key_rotation as "encoded_key_rotation: u8" FROM reply_surb
|
||||
SELECT reply_surb_sender_id, reply_surb, encoded_key_rotation as "encoded_key_rotation: u8" FROM reply_surb
|
||||
WHERE reply_surb_sender_id = ?
|
||||
"#,
|
||||
sender_id
|
||||
|
||||
@@ -4,20 +4,16 @@
|
||||
use crate::{
|
||||
backend::fs_backend::{
|
||||
manager::StorageManager,
|
||||
models::{
|
||||
ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSenderTag,
|
||||
StoredSurbSender,
|
||||
},
|
||||
models::{ReplySurbStorageMetadata, StoredReplyKey, StoredReplySurb, StoredSurbSender},
|
||||
},
|
||||
surb_storage::ReceivedReplySurbs,
|
||||
CombinedReplyStorage, ReceivedReplySurbsMap, ReplyStorageBackend, SentReplyKeys,
|
||||
UsedSenderTags,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, error, info, warn};
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use std::path::{Path, PathBuf};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
pub use self::error::StorageError;
|
||||
|
||||
@@ -57,10 +53,7 @@ impl Backend {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn try_load<P: AsRef<Path>>(
|
||||
database_path: P,
|
||||
fresh_sender_tags: bool,
|
||||
) -> Result<Self, StorageError> {
|
||||
pub async fn try_load<P: AsRef<Path>>(database_path: P) -> Result<Self, StorageError> {
|
||||
let owned_path: PathBuf = database_path.as_ref().into();
|
||||
if owned_path.file_name().is_none() {
|
||||
return Err(StorageError::DatabasePathWithoutFilename {
|
||||
@@ -69,7 +62,7 @@ impl Backend {
|
||||
}
|
||||
|
||||
let manager = StorageManager::init(database_path, false).await?;
|
||||
match Self::try_load_inner(&manager, fresh_sender_tags).await {
|
||||
match Self::try_load_inner(&manager).await {
|
||||
Ok(()) => Ok(Backend {
|
||||
temporary_old_path: None,
|
||||
database_path: owned_path,
|
||||
@@ -87,18 +80,15 @@ impl Backend {
|
||||
self.manager.close_pool().await
|
||||
}
|
||||
|
||||
async fn try_load_inner(
|
||||
manager: &StorageManager,
|
||||
fresh_sender_tags: bool,
|
||||
) -> Result<(), StorageError> {
|
||||
async fn try_load_inner(manager: &StorageManager) -> Result<(), StorageError> {
|
||||
// the database flush wasn't fully finished and thus the data is in inconsistent state
|
||||
// (we don't really know what's properly saved or what's not)
|
||||
if manager.get_flush_status().await? {
|
||||
return Err(StorageError::IncompleteDataFlush);
|
||||
}
|
||||
|
||||
let last_flush_timestamp = manager.get_previous_flush_timestamp().await?;
|
||||
if last_flush_timestamp == 0 {
|
||||
let last_flush = manager.get_previous_flush_time().await?;
|
||||
if last_flush == OffsetDateTime::UNIX_EPOCH {
|
||||
// either this client has been running since 1970 or the flush failed
|
||||
return Err(StorageError::IncompleteDataFlush);
|
||||
}
|
||||
@@ -118,15 +108,6 @@ impl Backend {
|
||||
return Err(err.into());
|
||||
}
|
||||
|
||||
let last_flush = match OffsetDateTime::from_unix_timestamp(last_flush_timestamp) {
|
||||
Ok(last_flush) => last_flush,
|
||||
Err(err) => {
|
||||
return Err(StorageError::CorruptedData {
|
||||
details: format!("failed to parse stored timestamp - {err}"),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// in theory clients can use our reply surbs whenever they want, even a year in the future
|
||||
// (assuming no key rotation has happened)
|
||||
// but the way it's currently coded, everyone will purge old data
|
||||
@@ -144,14 +125,6 @@ impl Backend {
|
||||
manager.delete_all_reply_keys().await?;
|
||||
}
|
||||
|
||||
if days > 2 {
|
||||
info!("it's been over {days} days and {hours} hours since we last used our data store. our used sender tags are already outdated - we're going to purge them now.");
|
||||
manager.delete_all_tags().await?;
|
||||
} else if fresh_sender_tags {
|
||||
debug!("starting with fresh sender tags");
|
||||
manager.delete_all_tags().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -196,7 +169,7 @@ impl Backend {
|
||||
|
||||
async fn end_storage_flush(&self) -> Result<(), StorageError> {
|
||||
self.manager
|
||||
.set_previous_flush_timestamp(OffsetDateTime::now_utc().unix_timestamp())
|
||||
.set_previous_flush(OffsetDateTime::now_utc())
|
||||
.await?;
|
||||
Ok(self.manager.set_flush_status(false).await?)
|
||||
}
|
||||
@@ -209,29 +182,6 @@ impl Backend {
|
||||
Ok(self.manager.set_client_in_use_status(false).await?)
|
||||
}
|
||||
|
||||
async fn get_stored_tags(&self) -> Result<UsedSenderTags, StorageError> {
|
||||
let stored = self.manager.get_tags().await?;
|
||||
|
||||
// stop at the first instance of corruption. if even a single entry is malformed,
|
||||
// something weird has happened and we can't trust the rest of the data
|
||||
let raw = stored
|
||||
.into_iter()
|
||||
.map(TryInto::try_into)
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
||||
Ok(UsedSenderTags::from_raw(raw))
|
||||
}
|
||||
|
||||
async fn dump_sender_tags(&self, tags: &UsedSenderTags) -> Result<(), StorageError> {
|
||||
for map_ref in tags.as_raw_iter() {
|
||||
let (recipient, tag) = map_ref.pair();
|
||||
self.manager
|
||||
.insert_tag(StoredSenderTag::new(*recipient, *tag))
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_stored_reply_keys(&self) -> Result<SentReplyKeys, StorageError> {
|
||||
let stored = self.manager.get_reply_keys().await?;
|
||||
|
||||
@@ -255,14 +205,17 @@ impl Backend {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_stored_reply_surbs(&self) -> Result<ReceivedReplySurbsMap, StorageError> {
|
||||
async fn get_stored_reply_surbs(
|
||||
&self,
|
||||
surb_freshness_cutoff: OffsetDateTime,
|
||||
) -> Result<ReceivedReplySurbsMap, StorageError> {
|
||||
let surb_senders = self.manager.get_surb_senders().await?;
|
||||
|
||||
let metadata = self.get_reply_surb_storage_metadata().await?;
|
||||
let mut received_surbs = Vec::with_capacity(surb_senders.len());
|
||||
for sender in surb_senders {
|
||||
let sender_id = sender.id;
|
||||
let (sender_tag, surbs_last_received_at_timestamp): (AnonymousSenderTag, i64) =
|
||||
let (sender_tag, surbs_last_received_at): (AnonymousSenderTag, OffsetDateTime) =
|
||||
sender.try_into()?;
|
||||
let stored_surbs = self
|
||||
.manager
|
||||
@@ -274,15 +227,17 @@ impl Backend {
|
||||
|
||||
received_surbs.push((
|
||||
sender_tag,
|
||||
ReceivedReplySurbs::new_retrieved(stored_surbs, surbs_last_received_at_timestamp),
|
||||
ReceivedReplySurbs::new_retrieved(stored_surbs, surbs_last_received_at),
|
||||
))
|
||||
}
|
||||
|
||||
Ok(ReceivedReplySurbsMap::from_raw(
|
||||
let received_surbs = ReceivedReplySurbsMap::from_raw(
|
||||
metadata.min_reply_surb_threshold as usize,
|
||||
metadata.max_reply_surb_threshold as usize,
|
||||
received_surbs,
|
||||
))
|
||||
);
|
||||
received_surbs.drop_stale_loaded_surbs(surb_freshness_cutoff);
|
||||
Ok(received_surbs)
|
||||
}
|
||||
|
||||
async fn dump_reply_surbs(
|
||||
@@ -304,6 +259,14 @@ impl Backend {
|
||||
.insert_reply_surb(StoredReplySurb::new(sender_id, reply_surb))
|
||||
.await?
|
||||
}
|
||||
|
||||
// TODO: should we also retain the stale ones?
|
||||
if received_surbs.possibly_stale_left() != 0 {
|
||||
warn!(
|
||||
"dropping {} possibly stale surbs for {tag}",
|
||||
received_surbs.possibly_stale_left()
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -347,7 +310,6 @@ impl ReplyStorageBackend for Backend {
|
||||
self.rotate().await?;
|
||||
self.start_storage_flush().await?;
|
||||
|
||||
self.dump_sender_tags(storage.tags_storage_ref()).await?;
|
||||
self.dump_sender_reply_keys(storage.key_storage_ref())
|
||||
.await?;
|
||||
let surbs_ref = storage.surbs_storage_ref();
|
||||
@@ -364,12 +326,14 @@ impl ReplyStorageBackend for Backend {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn load_surb_storage(&self) -> Result<CombinedReplyStorage, Self::StorageError> {
|
||||
async fn load_surb_storage(
|
||||
&self,
|
||||
surb_freshness_cutoff: OffsetDateTime,
|
||||
) -> Result<CombinedReplyStorage, Self::StorageError> {
|
||||
let reply_keys = self.get_stored_reply_keys().await?;
|
||||
let tags = self.get_stored_tags().await?;
|
||||
let reply_surbs = self.get_stored_reply_surbs().await?;
|
||||
let reply_surbs = self.get_stored_reply_surbs(surb_freshness_cutoff).await?;
|
||||
|
||||
Ok(CombinedReplyStorage::load(reply_keys, reply_surbs, tags))
|
||||
Ok(CombinedReplyStorage::load(reply_keys, reply_surbs))
|
||||
}
|
||||
|
||||
async fn stop_storage_session(self) -> Result<(), Self::StorageError> {
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use crate::backend::fs_backend::error::StorageError;
|
||||
use crate::key_storage::UsedReplyKey;
|
||||
use crate::ReceivedReplySurb;
|
||||
use nym_crypto::generic_array::typenum::Unsigned;
|
||||
use nym_crypto::Digest;
|
||||
use nym_sphinx::addressing::clients::{Recipient, RecipientBytes};
|
||||
@@ -12,6 +13,8 @@ use nym_sphinx::anonymous_replies::{
|
||||
ReplySurb, ReplySurbWithKeyRotation, SurbEncryptionKey, SurbEncryptionKeySize,
|
||||
};
|
||||
use nym_sphinx::params::{ReplySurbKeyDigestAlgorithm, SphinxKeyRotation};
|
||||
use sqlx::FromRow;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StoredSenderTag {
|
||||
@@ -58,11 +61,11 @@ impl TryFrom<StoredSenderTag> for (RecipientBytes, AnonymousSenderTag) {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, FromRow)]
|
||||
pub struct StoredReplyKey {
|
||||
pub key_digest: Vec<u8>,
|
||||
pub reply_key: Vec<u8>,
|
||||
pub sent_at_timestamp: i64,
|
||||
pub sent_at: OffsetDateTime,
|
||||
}
|
||||
|
||||
impl StoredReplyKey {
|
||||
@@ -70,7 +73,7 @@ impl StoredReplyKey {
|
||||
StoredReplyKey {
|
||||
key_digest: key_digest.to_vec(),
|
||||
reply_key: (*reply_key).to_bytes(),
|
||||
sent_at_timestamp: reply_key.sent_at_timestamp,
|
||||
sent_at: reply_key.sent_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -100,32 +103,30 @@ impl TryFrom<StoredReplyKey> for (EncryptionKeyDigest, UsedReplyKey) {
|
||||
});
|
||||
};
|
||||
|
||||
Ok((
|
||||
digest,
|
||||
UsedReplyKey::new(reply_key, value.sent_at_timestamp),
|
||||
))
|
||||
Ok((digest, UsedReplyKey::new(reply_key, value.sent_at)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromRow)]
|
||||
pub struct StoredSurbSender {
|
||||
pub id: i64,
|
||||
pub tag: Vec<u8>,
|
||||
pub last_sent_timestamp: i64,
|
||||
pub last_sent: OffsetDateTime,
|
||||
}
|
||||
|
||||
impl StoredSurbSender {
|
||||
pub fn new(tag: AnonymousSenderTag, last_sent_timestamp: i64) -> Self {
|
||||
pub fn new(tag: AnonymousSenderTag, last_sent: OffsetDateTime) -> Self {
|
||||
StoredSurbSender {
|
||||
// for the purposes of STORING data,
|
||||
// we ignore that field anyway
|
||||
id: 0,
|
||||
tag: tag.to_bytes().to_vec(),
|
||||
last_sent_timestamp,
|
||||
last_sent,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<StoredSurbSender> for (AnonymousSenderTag, i64) {
|
||||
impl TryFrom<StoredSurbSender> for (AnonymousSenderTag, OffsetDateTime) {
|
||||
type Error = StorageError;
|
||||
|
||||
fn try_from(value: StoredSurbSender) -> Result<Self, Self::Error> {
|
||||
@@ -140,7 +141,7 @@ impl TryFrom<StoredSurbSender> for (AnonymousSenderTag, i64) {
|
||||
|
||||
Ok((
|
||||
AnonymousSenderTag::from_bytes(sender_tag_bytes),
|
||||
value.last_sent_timestamp,
|
||||
value.last_sent,
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -155,10 +156,10 @@ pub struct StoredReplySurb {
|
||||
}
|
||||
|
||||
impl StoredReplySurb {
|
||||
pub fn new(reply_surb_sender_id: i64, reply_surb: &ReplySurbWithKeyRotation) -> Self {
|
||||
pub fn new(reply_surb_sender_id: i64, reply_surb: &ReceivedReplySurb) -> Self {
|
||||
StoredReplySurb {
|
||||
reply_surb_sender_id,
|
||||
reply_surb: reply_surb.inner_reply_surb().to_bytes(),
|
||||
reply_surb: reply_surb.surb.inner_reply_surb().to_bytes(),
|
||||
encoded_key_rotation: reply_surb.key_rotation() as u8,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::CombinedReplyStorage;
|
||||
use async_trait::async_trait;
|
||||
use std::error::Error;
|
||||
use thiserror::Error;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
// TODO: this should now live inside our wasm/client-core
|
||||
pub mod browser_backend;
|
||||
@@ -53,7 +54,10 @@ impl ReplyStorageBackend for Empty {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn load_surb_storage(&self) -> Result<CombinedReplyStorage, Self::StorageError> {
|
||||
async fn load_surb_storage(
|
||||
&self,
|
||||
_: OffsetDateTime,
|
||||
) -> Result<CombinedReplyStorage, Self::StorageError> {
|
||||
Ok(CombinedReplyStorage::new(
|
||||
self.min_surb_threshold,
|
||||
self.max_surb_threshold,
|
||||
@@ -80,7 +84,10 @@ pub trait ReplyStorageBackend: Sized {
|
||||
/// (such as surb thresholds)
|
||||
async fn init_fresh(&mut self, fresh: &CombinedReplyStorage) -> Result<(), Self::StorageError>;
|
||||
|
||||
async fn load_surb_storage(&self) -> Result<CombinedReplyStorage, Self::StorageError>;
|
||||
async fn load_surb_storage(
|
||||
&self,
|
||||
surb_freshness_cutoff: OffsetDateTime,
|
||||
) -> Result<CombinedReplyStorage, Self::StorageError>;
|
||||
|
||||
async fn stop_storage_session(self) -> Result<(), Self::StorageError> {
|
||||
Ok(())
|
||||
|
||||
@@ -25,12 +25,11 @@ impl CombinedReplyStorage {
|
||||
pub fn load(
|
||||
sent_reply_keys: SentReplyKeys,
|
||||
received_reply_surbs: ReceivedReplySurbsMap,
|
||||
used_tags: UsedSenderTags,
|
||||
) -> Self {
|
||||
CombinedReplyStorage {
|
||||
sent_reply_keys,
|
||||
received_reply_surbs,
|
||||
used_tags,
|
||||
used_tags: UsedSenderTags::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -47,8 +47,12 @@ impl SentReplyKeys {
|
||||
self.inner.data.iter()
|
||||
}
|
||||
|
||||
pub fn retain(&self, f: impl FnMut(&EncryptionKeyDigest, &mut UsedReplyKey) -> bool) {
|
||||
self.inner.data.retain(f);
|
||||
}
|
||||
|
||||
pub fn insert_multiple(&self, keys: Vec<SurbEncryptionKey>) {
|
||||
let now = OffsetDateTime::now_utc().unix_timestamp();
|
||||
let now = OffsetDateTime::now_utc();
|
||||
for key in keys {
|
||||
self.insert(UsedReplyKey::new(key, now))
|
||||
}
|
||||
@@ -71,15 +75,12 @@ impl SentReplyKeys {
|
||||
pub struct UsedReplyKey {
|
||||
key: SurbEncryptionKey,
|
||||
// the purpose of this field is to perform invalidation at relatively very long intervals
|
||||
pub sent_at_timestamp: i64,
|
||||
pub sent_at: OffsetDateTime,
|
||||
}
|
||||
|
||||
impl UsedReplyKey {
|
||||
pub(crate) fn new(key: SurbEncryptionKey, sent_at_timestamp: i64) -> Self {
|
||||
UsedReplyKey {
|
||||
key,
|
||||
sent_at_timestamp,
|
||||
}
|
||||
pub(crate) fn new(key: SurbEncryptionKey, sent_at: OffsetDateTime) -> Self {
|
||||
UsedReplyKey { key, sent_at }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,8 +4,9 @@
|
||||
pub use backend::*;
|
||||
pub use combined::CombinedReplyStorage;
|
||||
pub use key_storage::SentReplyKeys;
|
||||
pub use surb_storage::ReceivedReplySurbsMap;
|
||||
pub use surb_storage::{ReceivedReplySurb, ReceivedReplySurbsMap, RetrievedReplySurb};
|
||||
pub use tag_storage::UsedSenderTags;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
mod backend;
|
||||
mod combined;
|
||||
@@ -29,8 +30,11 @@ where
|
||||
PersistentReplyStorage { backend }
|
||||
}
|
||||
|
||||
pub async fn load_state_from_backend(&self) -> Result<CombinedReplyStorage, T::StorageError> {
|
||||
self.backend.load_surb_storage().await
|
||||
pub async fn load_state_from_backend(
|
||||
&self,
|
||||
surb_freshness_cutoff: OffsetDateTime,
|
||||
) -> Result<CombinedReplyStorage, T::StorageError> {
|
||||
self.backend.load_surb_storage(surb_freshness_cutoff).await
|
||||
}
|
||||
|
||||
pub async fn flush_on_shutdown(
|
||||
@@ -38,7 +42,7 @@ where
|
||||
mem_state: CombinedReplyStorage,
|
||||
mut shutdown: nym_task::TaskClient,
|
||||
) {
|
||||
use log::{debug, error, info};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
debug!("Started PersistentReplyStorage");
|
||||
if let Err(err) = self.backend.start_storage_session().await {
|
||||
|
||||
@@ -1,15 +1,45 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use dashmap::iter::Iter;
|
||||
use dashmap::iter::{Iter, IterMut};
|
||||
use dashmap::DashMap;
|
||||
use log::trace;
|
||||
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
|
||||
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
|
||||
use nym_sphinx::params::SphinxKeyRotation;
|
||||
use std::cmp::min;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use tracing::{error, info, trace};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RetrievedReplySurb {
|
||||
pub(crate) reply_surb: ReceivedReplySurb,
|
||||
pub(crate) stale_pile: bool,
|
||||
}
|
||||
|
||||
impl RetrievedReplySurb {
|
||||
pub(crate) fn new_fresh(reply_surb: ReceivedReplySurb) -> Self {
|
||||
RetrievedReplySurb {
|
||||
reply_surb,
|
||||
stale_pile: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new_stale(reply_surb: ReceivedReplySurb) -> Self {
|
||||
RetrievedReplySurb {
|
||||
reply_surb,
|
||||
stale_pile: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RetrievedReplySurb> for ReplySurbWithKeyRotation {
|
||||
fn from(retrieved: RetrievedReplySurb) -> Self {
|
||||
retrieved.reply_surb.into()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ReceivedReplySurbsMap {
|
||||
@@ -57,17 +87,40 @@ impl ReceivedReplySurbsMap {
|
||||
self.inner.data.iter()
|
||||
}
|
||||
|
||||
pub fn remove(&self, target: &AnonymousSenderTag) {
|
||||
self.inner.data.remove(target);
|
||||
pub fn as_raw_iter_mut(&self) -> IterMut<'_, AnonymousSenderTag, ReceivedReplySurbs> {
|
||||
self.inner.data.iter_mut()
|
||||
}
|
||||
|
||||
pub fn reset_surbs_last_received_at(&self, target: &AnonymousSenderTag) {
|
||||
if let Some(mut entry) = self.inner.data.get_mut(target) {
|
||||
entry.surbs_last_received_at_timestamp = OffsetDateTime::now_utc().unix_timestamp();
|
||||
fn total_surbs(&self) -> usize {
|
||||
self.inner
|
||||
.data
|
||||
.iter()
|
||||
.map(|entry| entry.value().data.len())
|
||||
.sum()
|
||||
}
|
||||
|
||||
pub fn drop_stale_loaded_surbs(&self, cutoff: OffsetDateTime) {
|
||||
let before = self.total_surbs();
|
||||
self.inner.data.retain(|_, v| {
|
||||
if v.surbs_last_received_at() < cutoff {
|
||||
return false;
|
||||
}
|
||||
|
||||
v.data.retain(|s| s.received_at > cutoff);
|
||||
!v.data.is_empty()
|
||||
});
|
||||
let after = self.total_surbs();
|
||||
let diff = before - after;
|
||||
if diff != 0 {
|
||||
info!("removed {diff} stale reply SURBs")
|
||||
}
|
||||
}
|
||||
|
||||
pub fn surbs_last_received_at(&self, target: &AnonymousSenderTag) -> Option<i64> {
|
||||
pub fn retain(&self, f: impl FnMut(&AnonymousSenderTag, &mut ReceivedReplySurbs) -> bool) {
|
||||
self.inner.data.retain(f);
|
||||
}
|
||||
|
||||
pub fn surbs_last_received_at(&self, target: &AnonymousSenderTag) -> Option<OffsetDateTime> {
|
||||
self.inner
|
||||
.data
|
||||
.get(target)
|
||||
@@ -126,15 +179,25 @@ impl ReceivedReplySurbsMap {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub fn available_fresh_surbs(&self, target: &AnonymousSenderTag) -> usize {
|
||||
self.inner
|
||||
.data
|
||||
.get(target)
|
||||
.map(|entry| entry.fresh_left())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub fn contains_surbs_for(&self, target: &AnonymousSenderTag) -> bool {
|
||||
self.inner.data.contains_key(target)
|
||||
}
|
||||
|
||||
/// Attempt to retrieve the specified number of reply SURBs for the target sender
|
||||
/// and return the number of SURBs remaining in the storage after the call.
|
||||
pub fn get_reply_surbs(
|
||||
&self,
|
||||
target: &AnonymousSenderTag,
|
||||
amount: usize,
|
||||
) -> (Option<Vec<ReplySurbWithKeyRotation>>, usize) {
|
||||
) -> (Option<Vec<RetrievedReplySurb>>, usize) {
|
||||
if let Some(mut entry) = self.inner.data.get_mut(target) {
|
||||
let surbs_left = entry.items_left();
|
||||
if surbs_left < self.min_surb_threshold() + amount {
|
||||
@@ -150,34 +213,72 @@ impl ReceivedReplySurbsMap {
|
||||
pub fn get_reply_surb_ignoring_threshold(
|
||||
&self,
|
||||
target: &AnonymousSenderTag,
|
||||
) -> Option<(Option<ReplySurbWithKeyRotation>, usize)> {
|
||||
self.inner
|
||||
.data
|
||||
.get_mut(target)
|
||||
.map(|mut s| s.get_reply_surb())
|
||||
) -> (Option<RetrievedReplySurb>, usize) {
|
||||
let Some(mut entry) = self.inner.data.get_mut(target) else {
|
||||
return (None, 0);
|
||||
};
|
||||
|
||||
entry.get_reply_surb()
|
||||
}
|
||||
|
||||
pub fn get_reply_surb(
|
||||
&self,
|
||||
target: &AnonymousSenderTag,
|
||||
) -> Option<(Option<ReplySurbWithKeyRotation>, usize)> {
|
||||
self.inner.data.get_mut(target).map(|mut entry| {
|
||||
let surbs_left = entry.items_left();
|
||||
if surbs_left < self.min_surb_threshold() {
|
||||
(None, surbs_left)
|
||||
} else {
|
||||
entry.get_reply_surb()
|
||||
}
|
||||
})
|
||||
) -> (Option<RetrievedReplySurb>, usize) {
|
||||
let Some(mut entry) = self.inner.data.get_mut(target) else {
|
||||
return (None, 0);
|
||||
};
|
||||
|
||||
let surbs_left = entry.items_left();
|
||||
if surbs_left < self.min_surb_threshold() {
|
||||
(None, surbs_left)
|
||||
} else {
|
||||
entry.get_reply_surb()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_surbs<I: IntoIterator<Item = ReplySurbWithKeyRotation>>(
|
||||
pub fn re_insert_reply_surbs(
|
||||
&self,
|
||||
target: &AnonymousSenderTag,
|
||||
surbs: Vec<RetrievedReplySurb>,
|
||||
) {
|
||||
error!("re-inserting {} unused surbs", surbs.len());
|
||||
let mut entry = self.inner.data.entry(*target).or_insert_with(|| {
|
||||
// this branch should realistically NEVER happen, but software be software, so let's not crash
|
||||
error!("attempting to return surbs to no longer existing entry {target}");
|
||||
ReceivedReplySurbs::new(VecDeque::new())
|
||||
});
|
||||
|
||||
let entry = entry.value_mut();
|
||||
for returned_surb in surbs.into_iter().rev() {
|
||||
if returned_surb.stale_pile {
|
||||
entry.possibly_stale.push_front(returned_surb.reply_surb)
|
||||
} else {
|
||||
entry.data.push_front(returned_surb.reply_surb)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_fresh_surbs<I: IntoIterator<Item = ReplySurbWithKeyRotation>>(
|
||||
&self,
|
||||
target: &AnonymousSenderTag,
|
||||
surbs: I,
|
||||
) {
|
||||
if let Some(mut existing_data) = self.inner.data.get_mut(target) {
|
||||
existing_data.insert_reply_surbs(surbs)
|
||||
existing_data.insert_fresh_reply_surbs(surbs);
|
||||
|
||||
if existing_data.possibly_stale.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
// if we're above the minimum threshold, remove stale surbs
|
||||
let threshold = self.min_surb_threshold();
|
||||
let diff = existing_data.data.len().saturating_sub(threshold);
|
||||
|
||||
trace!("will attempt to remove up to {diff} stale surbs");
|
||||
if diff > 0 {
|
||||
existing_data.remove_stale_surbs(diff);
|
||||
}
|
||||
} else {
|
||||
let new_entry = ReceivedReplySurbs::new(surbs.into_iter().collect());
|
||||
self.inner.data.insert(*target, new_entry);
|
||||
@@ -185,44 +286,102 @@ impl ReceivedReplySurbsMap {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ReceivedReplySurb {
|
||||
pub(crate) surb: ReplySurbWithKeyRotation,
|
||||
pub(crate) received_at: OffsetDateTime,
|
||||
}
|
||||
|
||||
impl From<ReceivedReplySurb> for ReplySurbWithKeyRotation {
|
||||
fn from(surb: ReceivedReplySurb) -> Self {
|
||||
surb.surb
|
||||
}
|
||||
}
|
||||
|
||||
impl ReceivedReplySurb {
|
||||
pub fn received_at(&self) -> OffsetDateTime {
|
||||
self.received_at
|
||||
}
|
||||
|
||||
pub fn key_rotation(&self) -> SphinxKeyRotation {
|
||||
self.surb.key_rotation()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ReceivedReplySurbs {
|
||||
// in the future we'd probably want to put extra data here to indicate when the SURBs got received
|
||||
// so we could invalidate entries from the previous key rotations
|
||||
data: VecDeque<ReplySurbWithKeyRotation>,
|
||||
data: VecDeque<ReceivedReplySurb>,
|
||||
possibly_stale: VecDeque<ReceivedReplySurb>,
|
||||
|
||||
pending_reception: u32,
|
||||
surbs_last_received_at_timestamp: i64,
|
||||
surbs_last_received_at: OffsetDateTime,
|
||||
}
|
||||
|
||||
impl ReceivedReplySurbs {
|
||||
fn new(initial_surbs: VecDeque<ReplySurbWithKeyRotation>) -> Self {
|
||||
ReceivedReplySurbs {
|
||||
data: initial_surbs,
|
||||
let mut this = ReceivedReplySurbs {
|
||||
data: Default::default(),
|
||||
possibly_stale: Default::default(),
|
||||
pending_reception: 0,
|
||||
surbs_last_received_at_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
|
||||
}
|
||||
surbs_last_received_at: OffsetDateTime::now_utc(),
|
||||
};
|
||||
this.insert_fresh_reply_surbs(initial_surbs);
|
||||
this
|
||||
}
|
||||
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "fs-surb-storage"))]
|
||||
pub fn new_retrieved(
|
||||
surbs: Vec<ReplySurbWithKeyRotation>,
|
||||
surbs_last_received_at_timestamp: i64,
|
||||
surbs_last_received_at: OffsetDateTime,
|
||||
) -> ReceivedReplySurbs {
|
||||
ReceivedReplySurbs {
|
||||
data: surbs.into(),
|
||||
let mut this = ReceivedReplySurbs {
|
||||
data: Default::default(),
|
||||
possibly_stale: Default::default(),
|
||||
pending_reception: 0,
|
||||
surbs_last_received_at_timestamp,
|
||||
}
|
||||
surbs_last_received_at,
|
||||
};
|
||||
this.insert_fresh_reply_surbs(surbs);
|
||||
this.surbs_last_received_at = surbs_last_received_at;
|
||||
this
|
||||
}
|
||||
|
||||
pub fn downgrade_freshness(&mut self) -> usize {
|
||||
debug_assert!(self.possibly_stale.is_empty());
|
||||
std::mem::swap(&mut self.data, &mut self.possibly_stale);
|
||||
self.possibly_stale.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.data.is_empty() && self.possibly_stale.is_empty()
|
||||
}
|
||||
|
||||
#[cfg(all(not(target_arch = "wasm32"), feature = "fs-surb-storage"))]
|
||||
pub fn surbs_ref(&self) -> &VecDeque<ReplySurbWithKeyRotation> {
|
||||
pub fn surbs_ref(&self) -> &VecDeque<ReceivedReplySurb> {
|
||||
&self.data
|
||||
}
|
||||
|
||||
pub fn surbs_last_received_at(&self) -> i64 {
|
||||
self.surbs_last_received_at_timestamp
|
||||
pub fn retain_fresh_surbs(&mut self, f: impl FnMut(&ReceivedReplySurb) -> bool) {
|
||||
self.data.retain(f);
|
||||
}
|
||||
|
||||
pub fn retain_possibly_stale_surbs(&mut self, f: impl FnMut(&ReceivedReplySurb) -> bool) {
|
||||
self.possibly_stale.retain(f);
|
||||
}
|
||||
|
||||
pub fn fresh_left(&self) -> usize {
|
||||
self.data.len()
|
||||
}
|
||||
|
||||
pub fn possibly_stale_left(&self) -> usize {
|
||||
self.possibly_stale.len()
|
||||
}
|
||||
|
||||
pub fn drop_possibly_stale_surbs(&mut self) {
|
||||
self.possibly_stale = VecDeque::new();
|
||||
}
|
||||
|
||||
pub fn surbs_last_received_at(&self) -> OffsetDateTime {
|
||||
self.surbs_last_received_at
|
||||
}
|
||||
|
||||
pub fn pending_reception(&self) -> u32 {
|
||||
@@ -243,39 +402,78 @@ impl ReceivedReplySurbs {
|
||||
self.pending_reception = 0;
|
||||
}
|
||||
|
||||
pub fn get_reply_surbs(
|
||||
&mut self,
|
||||
amount: usize,
|
||||
) -> (Option<Vec<ReplySurbWithKeyRotation>>, usize) {
|
||||
/// Attempt to retrieve the specified number of reply SURBs (if at least that many are present)
|
||||
/// and return the number of SURBs remaining in the storage after the call.
|
||||
pub fn get_reply_surbs(&mut self, amount: usize) -> (Option<Vec<RetrievedReplySurb>>, usize) {
|
||||
if self.items_left() < amount {
|
||||
(None, self.items_left())
|
||||
} else {
|
||||
let surbs = self.data.drain(..amount).collect();
|
||||
(Some(surbs), self.items_left())
|
||||
let available_fresh = self.fresh_left();
|
||||
|
||||
// prefer the 'fresh' data if available. otherwise fallback to the possibly stale entries
|
||||
let mut reply_surbs = Vec::with_capacity(amount);
|
||||
|
||||
let fresh_to_retrieve = min(available_fresh, amount);
|
||||
|
||||
for surb in self.data.drain(..fresh_to_retrieve) {
|
||||
reply_surbs.push(RetrievedReplySurb::new_fresh(surb))
|
||||
}
|
||||
|
||||
if available_fresh < amount {
|
||||
let stale_to_retrieve = amount - fresh_to_retrieve;
|
||||
for surb in self.possibly_stale.drain(..stale_to_retrieve) {
|
||||
reply_surbs.push(RetrievedReplySurb::new_stale(surb))
|
||||
}
|
||||
}
|
||||
|
||||
(Some(reply_surbs), self.items_left())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_reply_surb(&mut self) -> (Option<ReplySurbWithKeyRotation>, usize) {
|
||||
pub fn get_reply_surb(&mut self) -> (Option<RetrievedReplySurb>, usize) {
|
||||
(self.pop_surb(), self.items_left())
|
||||
}
|
||||
|
||||
fn pop_surb(&mut self) -> Option<ReplySurbWithKeyRotation> {
|
||||
self.data.pop_front()
|
||||
fn pop_surb(&mut self) -> Option<RetrievedReplySurb> {
|
||||
// prefer the 'fresh' data if available. otherwise fallback to the possibly stale entries
|
||||
if let Some(fresh) = self.data.pop_front() {
|
||||
return Some(RetrievedReplySurb::new_fresh(fresh));
|
||||
}
|
||||
if let Some(stale) = self.possibly_stale.pop_front() {
|
||||
return Some(RetrievedReplySurb::new_stale(stale));
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn items_left(&self) -> usize {
|
||||
self.data.len()
|
||||
self.data.len() + self.possibly_stale.len()
|
||||
}
|
||||
|
||||
pub fn remove_stale_surbs(&mut self, amount: usize) {
|
||||
// remove up to amount number of possibly stale surbs
|
||||
let amount = min(amount, self.possibly_stale.len());
|
||||
|
||||
self.possibly_stale.drain(..amount);
|
||||
}
|
||||
|
||||
// realistically we're always going to be getting multiple surbs at once
|
||||
pub fn insert_reply_surbs<I: IntoIterator<Item = ReplySurbWithKeyRotation>>(
|
||||
pub(crate) fn insert_fresh_reply_surbs<I: IntoIterator<Item = ReplySurbWithKeyRotation>>(
|
||||
&mut self,
|
||||
surbs: I,
|
||||
) {
|
||||
let mut v = surbs.into_iter().collect::<VecDeque<_>>();
|
||||
let received_at = OffsetDateTime::now_utc();
|
||||
let mut v = surbs
|
||||
.into_iter()
|
||||
.map(|surb| ReceivedReplySurb { surb, received_at })
|
||||
.collect::<VecDeque<_>>();
|
||||
|
||||
if v.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
trace!("storing {} surbs in the storage", v.len());
|
||||
self.data.append(&mut v);
|
||||
self.surbs_last_received_at_timestamp = OffsetDateTime::now_utc().unix_timestamp();
|
||||
self.surbs_last_received_at = received_at;
|
||||
trace!("we now have {} surbs!", self.data.len());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -272,7 +272,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
) -> Result<(), GatewayClientError> {
|
||||
if let Some(shared_key) = self.shared_key() {
|
||||
let encrypted = message.encrypt(&*shared_key)?;
|
||||
Box::pin(self.send_websocket_message(encrypted)).await?;
|
||||
Box::pin(self.send_websocket_message_without_response(encrypted)).await?;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(GatewayClientError::ConnectionInInvalidState)
|
||||
@@ -330,9 +330,80 @@ impl<C, St> GatewayClient<C, St> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to send a websocket message to the gateway without waiting for any response
|
||||
async fn send_websocket_message_without_response(
|
||||
&mut self,
|
||||
msg: impl Into<Message>,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
match self.connection {
|
||||
SocketState::Available(ref mut conn) => Ok(conn.send(msg.into()).await?),
|
||||
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
|
||||
if let Err(err) = partially_delegated.send_without_response(msg.into()).await {
|
||||
error!("failed to send message without response - {err}...");
|
||||
// we must ensure we do not leave the task still active
|
||||
if let Err(err) = self.recover_socket_connection().await {
|
||||
error!("... and the delegated stream has also errored out - {err}")
|
||||
}
|
||||
Err(err)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
SocketState::NotConnected => Err(GatewayClientError::ConnectionNotEstablished),
|
||||
_ => Err(GatewayClientError::ConnectionInInvalidState),
|
||||
}
|
||||
}
|
||||
|
||||
// A very nasty hack due to lack of id tags on messages - send a non-sphinx packet websocket
|
||||
// message and wait until first non 'Send' response within timeout
|
||||
pub async fn send_websocket_message_with_non_send_response(
|
||||
&mut self,
|
||||
msg: impl Into<Message>,
|
||||
) -> Result<ServerResponse, GatewayClientError> {
|
||||
let should_restart_mixnet_listener = if self.connection.is_partially_delegated() {
|
||||
self.recover_socket_connection().await?;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
let conn = match self.connection {
|
||||
SocketState::Available(ref mut conn) => conn,
|
||||
SocketState::NotConnected => return Err(GatewayClientError::ConnectionNotEstablished),
|
||||
_ => return Err(GatewayClientError::ConnectionInInvalidState),
|
||||
};
|
||||
conn.send(msg.into()).await?;
|
||||
|
||||
let timeout = sleep(self.cfg.connection.response_timeout_duration);
|
||||
tokio::pin!(timeout);
|
||||
|
||||
let response = loop {
|
||||
tokio::select! {
|
||||
_ = &mut timeout => {
|
||||
break Err(GatewayClientError::Timeout);
|
||||
}
|
||||
// note: the below will also listen for shutdown signals
|
||||
msg = self.read_control_response() => {
|
||||
match msg {
|
||||
Ok(res) => if !res.is_send() {
|
||||
break Ok(res);
|
||||
},
|
||||
Err(err) => break Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if should_restart_mixnet_listener {
|
||||
self.start_listening_for_mixnet_messages()?;
|
||||
}
|
||||
response
|
||||
}
|
||||
|
||||
/// Attempt to send a websocket message to the gateway and wait until we receive a response.
|
||||
// If we want to send a message (with response), we need to have a full control over the socket,
|
||||
// as we need to be able to write the request and read the subsequent response
|
||||
pub async fn send_websocket_message(
|
||||
pub async fn send_websocket_message_with_response(
|
||||
&mut self,
|
||||
msg: impl Into<Message>,
|
||||
) -> Result<ServerResponse, GatewayClientError> {
|
||||
@@ -387,29 +458,6 @@ impl<C, St> GatewayClient<C, St> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_websocket_message_without_response(
|
||||
&mut self,
|
||||
msg: Message,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
match self.connection {
|
||||
SocketState::Available(ref mut conn) => Ok(conn.send(msg).await?),
|
||||
SocketState::PartiallyDelegated(ref mut partially_delegated) => {
|
||||
if let Err(err) = partially_delegated.send_without_response(msg).await {
|
||||
error!("failed to send message without response - {err}...");
|
||||
// we must ensure we do not leave the task still active
|
||||
if let Err(err) = self.recover_socket_connection().await {
|
||||
error!("... and the delegated stream has also errored out - {err}")
|
||||
}
|
||||
Err(err)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
SocketState::NotConnected => Err(GatewayClientError::ConnectionNotEstablished),
|
||||
_ => Err(GatewayClientError::ConnectionInInvalidState),
|
||||
}
|
||||
}
|
||||
|
||||
fn check_gateway_protocol(
|
||||
&self,
|
||||
gateway_protocol: Option<u8>,
|
||||
@@ -535,7 +583,10 @@ impl<C, St> GatewayClient<C, St> {
|
||||
.encrypt(legacy_key)?;
|
||||
|
||||
info!("sending upgrade request and awaiting the acknowledgement back");
|
||||
let (ciphertext, nonce) = match self.send_websocket_message(upgrade_request).await? {
|
||||
let (ciphertext, nonce) = match self
|
||||
.send_websocket_message_with_response(upgrade_request)
|
||||
.await?
|
||||
{
|
||||
ServerResponse::EncryptedResponse { ciphertext, nonce } => (ciphertext, nonce),
|
||||
ServerResponse::Error { message } => {
|
||||
return Err(GatewayClientError::GatewayError(message))
|
||||
@@ -567,7 +618,7 @@ impl<C, St> GatewayClient<C, St> {
|
||||
&mut self,
|
||||
msg: ClientControlRequest,
|
||||
) -> Result<(), GatewayClientError> {
|
||||
match self.send_websocket_message(msg).await? {
|
||||
match self.send_websocket_message_with_response(msg).await? {
|
||||
ServerResponse::Authenticate {
|
||||
protocol_version,
|
||||
status,
|
||||
@@ -717,13 +768,16 @@ impl<C, St> GatewayClient<C, St> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to retrieve the currently supported gateway protocol version of the remote.
|
||||
pub async fn get_gateway_protocol(&mut self) -> Result<u8, GatewayClientError> {
|
||||
if !self.connection.is_established() {
|
||||
return Err(GatewayClientError::ConnectionNotEstablished);
|
||||
}
|
||||
|
||||
match self
|
||||
.send_websocket_message(ClientControlRequest::SupportedProtocol {})
|
||||
.send_websocket_message_with_non_send_response(
|
||||
ClientControlRequest::SupportedProtocol {},
|
||||
)
|
||||
.await?
|
||||
{
|
||||
ServerResponse::SupportedProtocol { version } => Ok(version),
|
||||
@@ -740,7 +794,10 @@ impl<C, St> GatewayClient<C, St> {
|
||||
credential,
|
||||
self.shared_key.as_ref().unwrap(),
|
||||
)?;
|
||||
let bandwidth_remaining = match self.send_websocket_message(msg).await? {
|
||||
let bandwidth_remaining = match self
|
||||
.send_websocket_message_with_non_send_response(msg)
|
||||
.await?
|
||||
{
|
||||
ServerResponse::Bandwidth { available_total } => Ok(available_total),
|
||||
ServerResponse::Error { message } => Err(GatewayClientError::GatewayError(message)),
|
||||
ServerResponse::TypedError { error } => {
|
||||
@@ -758,7 +815,10 @@ impl<C, St> GatewayClient<C, St> {
|
||||
|
||||
async fn try_claim_testnet_bandwidth(&mut self) -> Result<(), GatewayClientError> {
|
||||
let msg = ClientControlRequest::ClaimFreeTestnetBandwidth;
|
||||
let bandwidth_remaining = match self.send_websocket_message(msg).await? {
|
||||
let bandwidth_remaining = match self
|
||||
.send_websocket_message_with_non_send_response(msg)
|
||||
.await?
|
||||
{
|
||||
ServerResponse::Bandwidth { available_total } => Ok(available_total),
|
||||
ServerResponse::Error { message } => Err(GatewayClientError::GatewayError(message)),
|
||||
other => Err(GatewayClientError::UnexpectedResponse { name: other.name() }),
|
||||
|
||||
@@ -28,6 +28,7 @@ pub struct Config {
|
||||
pub maximum_reconnection_backoff: Duration,
|
||||
pub initial_connection_timeout: Duration,
|
||||
pub maximum_connection_buffer_size: usize,
|
||||
pub use_legacy_packet_encoding: bool,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
@@ -36,12 +37,14 @@ impl Config {
|
||||
maximum_reconnection_backoff: Duration,
|
||||
initial_connection_timeout: Duration,
|
||||
maximum_connection_buffer_size: usize,
|
||||
use_legacy_packet_encoding: bool,
|
||||
) -> Self {
|
||||
Config {
|
||||
initial_reconnection_backoff,
|
||||
maximum_reconnection_backoff,
|
||||
initial_connection_timeout,
|
||||
maximum_connection_buffer_size,
|
||||
use_legacy_packet_encoding,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -267,7 +270,12 @@ impl SendWithoutResponse for Client {
|
||||
fn send_without_response(&self, packet: MixPacket) -> io::Result<()> {
|
||||
let address = packet.next_hop_address();
|
||||
trace!("Sending packet to {address}");
|
||||
let framed_packet = FramedNymPacket::from(packet);
|
||||
|
||||
// TODO: optimisation for the future: rather than constantly using legacy encoding,
|
||||
// once we're addressing by node_id (and thus have full node info here),
|
||||
// we could simply infer supported encoding based on their version
|
||||
let framed_packet =
|
||||
FramedNymPacket::from_mix_packet(packet, self.config.use_legacy_packet_encoding);
|
||||
|
||||
let Some(sender) = self.active_connections.get_mut(&address) else {
|
||||
// there was never a connection to begin with
|
||||
@@ -328,6 +336,7 @@ mod tests {
|
||||
maximum_reconnection_backoff: Duration::from_millis(300_000),
|
||||
initial_connection_timeout: Duration::from_millis(1_500),
|
||||
maximum_connection_buffer_size: 128,
|
||||
use_legacy_packet_encoding: false,
|
||||
},
|
||||
NoiseConfig::new(
|
||||
Arc::new(x25519::KeyPair::new(&mut rng)),
|
||||
|
||||
@@ -72,7 +72,7 @@ macro_rules! collect_paged_skimmed_v2 {
|
||||
.$f(false, Some(page), None, $self.use_bincode)
|
||||
.await?;
|
||||
|
||||
if metadata != res.metadata {
|
||||
if !metadata.consistency_check(&res.metadata) {
|
||||
return Err(ValidatorClientError::InconsistentPagedMetadata);
|
||||
}
|
||||
|
||||
@@ -471,12 +471,12 @@ impl NymApiClient {
|
||||
pub async fn get_all_basic_entry_assigned_nodes(
|
||||
&self,
|
||||
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
|
||||
self.get_all_basic_entry_assigned_nodes_v2()
|
||||
self.get_all_basic_entry_assigned_nodes_with_metadata()
|
||||
.await
|
||||
.map(|res| res.nodes)
|
||||
}
|
||||
|
||||
pub async fn get_all_basic_entry_assigned_nodes_v2(
|
||||
pub async fn get_all_basic_entry_assigned_nodes_with_metadata(
|
||||
&self,
|
||||
) -> Result<SkimmedNodesWithMetadata, ValidatorClientError> {
|
||||
collect_paged_skimmed_v2!(self, get_basic_entry_assigned_nodes_v2)
|
||||
|
||||
@@ -389,7 +389,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
"entry-gateways",
|
||||
"all",
|
||||
],
|
||||
¶ms,
|
||||
)
|
||||
|
||||
+16
-10
@@ -7,18 +7,17 @@ use crate::nyxd::error::NyxdError;
|
||||
use crate::nyxd::CosmWasmClient;
|
||||
use async_trait::async_trait;
|
||||
use cosmrs::AccountId;
|
||||
pub use nym_performance_contract_common::{
|
||||
msg::QueryMsg as PerformanceQueryMsg, types::NetworkMonitorResponse,
|
||||
};
|
||||
use nym_performance_contract_common::{
|
||||
EpochId, EpochMeasurementsPagedResponse, EpochNodePerformance, EpochPerformancePagedResponse,
|
||||
FullHistoricalPerformancePagedResponse, HistoricalPerformance, NetworkMonitorInformation,
|
||||
NetworkMonitorsPagedResponse, NodeId, NodeMeasurement, NodeMeasurementsResponse,
|
||||
NodePerformance, NodePerformancePagedResponse, NodePerformanceResponse, RetiredNetworkMonitor,
|
||||
RetiredNetworkMonitorsPagedResponse,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
|
||||
pub use nym_performance_contract_common::{
|
||||
msg::QueryMsg as PerformanceQueryMsg, types::NetworkMonitorResponse, EpochId,
|
||||
EpochMeasurementsPagedResponse, EpochNodePerformance, EpochPerformancePagedResponse,
|
||||
FullHistoricalPerformancePagedResponse, HistoricalPerformance, LastSubmission,
|
||||
NetworkMonitorInformation, NetworkMonitorsPagedResponse, NodeId, NodeMeasurement,
|
||||
NodeMeasurementsResponse, NodePerformance, NodePerformancePagedResponse,
|
||||
NodePerformanceResponse, RetiredNetworkMonitor, RetiredNetworkMonitorsPagedResponse,
|
||||
};
|
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
|
||||
pub trait PerformanceQueryClient {
|
||||
@@ -139,6 +138,11 @@ pub trait PerformanceQueryClient {
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_last_submission(&self) -> Result<LastSubmission, NyxdError> {
|
||||
self.query_performance_contract(PerformanceQueryMsg::LastSubmittedMeasurement {})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
// extension trait to the query client to deal with the paged queries
|
||||
@@ -212,6 +216,7 @@ where
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::nyxd::contract_traits::tests::IgnoreValue;
|
||||
use nym_performance_contract_common::QueryMsg;
|
||||
|
||||
// it's enough that this compiles and clippy is happy about it
|
||||
#[allow(dead_code)]
|
||||
@@ -260,6 +265,7 @@ mod tests {
|
||||
PerformanceQueryMsg::RetiredNetworkMonitorsPaged { start_after, limit } => client
|
||||
.get_retired_network_monitors_paged(start_after, limit)
|
||||
.ignore(),
|
||||
QueryMsg::LastSubmittedMeasurement {} => client.get_last_submission().ignore(),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,14 +49,14 @@ pub fn show_error<E>(e: E)
|
||||
where
|
||||
E: Display,
|
||||
{
|
||||
error!("{}", e);
|
||||
error!("{e}");
|
||||
}
|
||||
|
||||
pub fn show_error_passthrough<E>(e: E) -> E
|
||||
where
|
||||
E: Error + Display,
|
||||
{
|
||||
error!("{}", e);
|
||||
error!("{e}");
|
||||
e
|
||||
}
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ pub async fn query_balance(
|
||||
.address
|
||||
.unwrap_or_else(|| address_from_mnemonic.expect("please provide a mnemonic"));
|
||||
|
||||
info!("Getting balance for {}...", address);
|
||||
info!("Getting balance for {address}...");
|
||||
|
||||
match client.get_all_balances(&address).await {
|
||||
Ok(coins) => {
|
||||
|
||||
@@ -57,17 +57,17 @@ pub fn get_pubkey_from_mnemonic(address: AccountId, prefix: &str, mnemonic: bip3
|
||||
println!("{}", account.public_key().to_string());
|
||||
}
|
||||
None => {
|
||||
error!("Could not derive key that matches {}", address)
|
||||
error!("Could not derive key that matches {address}")
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Failed to derive accounts. {}", e);
|
||||
error!("Failed to derive accounts. {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_pubkey_from_chain(address: AccountId, client: &QueryClient) {
|
||||
info!("Getting public key for address {} from chain...", address);
|
||||
info!("Getting public key for address {address} from chain...");
|
||||
match client.get_account(&address).await {
|
||||
Ok(Some(account)) => {
|
||||
if let Ok(base_account) = account.try_get_base_account() {
|
||||
|
||||
@@ -37,7 +37,7 @@ pub async fn send_multiple(args: Args, client: &SigningClient) {
|
||||
|
||||
let rows = InputFileReader::new(&args.input);
|
||||
if let Err(e) = rows {
|
||||
error!("Failed to read input file: {}", e);
|
||||
error!("Failed to read input file: {e}");
|
||||
return;
|
||||
}
|
||||
let rows = rows.unwrap();
|
||||
@@ -67,7 +67,7 @@ pub async fn send_multiple(args: Args, client: &SigningClient) {
|
||||
.prompt();
|
||||
|
||||
if let Err(e) = ans {
|
||||
info!("Aborting, {}...", e);
|
||||
info!("Aborting, {e}...");
|
||||
return;
|
||||
}
|
||||
if let Ok(false) = ans {
|
||||
@@ -100,13 +100,10 @@ pub async fn send_multiple(args: Args, client: &SigningClient) {
|
||||
println!("Transaction hash: {}", &res.hash);
|
||||
|
||||
if let Some(output_filename) = args.output {
|
||||
println!("\nWriting output log to {}", output_filename);
|
||||
println!("\nWriting output log to {output_filename}");
|
||||
|
||||
if let Err(e) = write_output_file(rows, res, &output_filename) {
|
||||
error!(
|
||||
"Failed to write output file {} with error {}",
|
||||
output_filename, e
|
||||
);
|
||||
error!("Failed to write output file {output_filename} with error {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -136,7 +133,7 @@ fn write_output_file(
|
||||
.collect::<Vec<String>>()
|
||||
.join("\n");
|
||||
|
||||
Ok(file.write_all(format!("{}\n", data).as_bytes())?)
|
||||
Ok(file.write_all(format!("{data}\n").as_bytes())?)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -171,7 +168,7 @@ impl InputFileReader {
|
||||
|
||||
// multiply when a whole token amount, e.g. 50nym (50.123456nym is not allowed, that must be input as 50123456unym)
|
||||
let (amount, denom) = if !denom.starts_with('u') {
|
||||
(amount * 1_000_000u128, format!("u{}", denom))
|
||||
(amount * 1_000_000u128, format!("u{denom}"))
|
||||
} else {
|
||||
(amount, denom)
|
||||
};
|
||||
|
||||
@@ -55,6 +55,6 @@ pub async fn execute(args: Args, client: SigningClient) {
|
||||
.await
|
||||
{
|
||||
Ok(res) => info!("SUCCESS ✅\n{}", json!(res)),
|
||||
Err(e) => error!("FAILURE ❌\n{}", e),
|
||||
Err(e) => error!("FAILURE ❌\n{e}"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ pub struct Args {
|
||||
pub async fn generate(args: Args) {
|
||||
info!("Starting to generate vesting contract instantiate msg");
|
||||
|
||||
debug!("Received arguments: {:?}", args);
|
||||
debug!("Received arguments: {args:?}");
|
||||
|
||||
let multisig_addr = args.multisig_addr.unwrap_or_else(|| {
|
||||
let address = std::env::var(nym_network_defaults::var_names::REWARDING_VALIDATOR_ADDRESS)
|
||||
@@ -97,7 +97,7 @@ pub async fn generate(args: Args) {
|
||||
key_size: DEFAULT_DEALINGS as u32,
|
||||
};
|
||||
|
||||
debug!("instantiate_msg: {:?}", instantiate_msg);
|
||||
debug!("instantiate_msg: {instantiate_msg:?}");
|
||||
|
||||
let res =
|
||||
serde_json::to_string(&instantiate_msg).expect("failed to convert instantiate msg to json");
|
||||
|
||||
@@ -28,7 +28,7 @@ pub struct Args {
|
||||
pub async fn generate(args: Args) {
|
||||
info!("Starting to generate vesting contract instantiate msg");
|
||||
|
||||
debug!("Received arguments: {:?}", args);
|
||||
debug!("Received arguments: {args:?}");
|
||||
|
||||
let group_addr = args.group_addr.unwrap_or_else(|| {
|
||||
let address = std::env::var(nym_network_defaults::var_names::GROUP_CONTRACT_ADDRESS)
|
||||
@@ -51,7 +51,7 @@ pub async fn generate(args: Args) {
|
||||
deposit_amount: args.deposit_amount,
|
||||
};
|
||||
|
||||
debug!("instantiate_msg: {:?}", instantiate_msg);
|
||||
debug!("instantiate_msg: {instantiate_msg:?}");
|
||||
|
||||
let res =
|
||||
serde_json::to_string(&instantiate_msg).expect("failed to convert instantiate msg to json");
|
||||
|
||||
@@ -88,7 +88,7 @@ pub struct Args {
|
||||
pub async fn generate(args: Args) {
|
||||
info!("Starting to generate mixnet contract instantiate msg");
|
||||
|
||||
debug!("Received arguments: {:?}", args);
|
||||
debug!("Received arguments: {args:?}");
|
||||
|
||||
let initial_rewarding_params = InitialRewardingParams {
|
||||
initial_reward_pool: Decimal::from_atomics(args.initial_reward_pool, 0)
|
||||
@@ -114,7 +114,7 @@ pub async fn generate(args: Args) {
|
||||
},
|
||||
};
|
||||
|
||||
debug!("initial_rewarding_params: {:?}", initial_rewarding_params);
|
||||
debug!("initial_rewarding_params: {initial_rewarding_params:?}");
|
||||
|
||||
let rewarding_validator_address = args.rewarding_validator_address.unwrap_or_else(|| {
|
||||
let address = std::env::var(nym_network_defaults::var_names::REWARDING_VALIDATOR_ADDRESS)
|
||||
@@ -160,7 +160,7 @@ pub async fn generate(args: Args) {
|
||||
key_validity_in_epochs: None,
|
||||
};
|
||||
|
||||
debug!("instantiate_msg: {:?}", instantiate_msg);
|
||||
debug!("instantiate_msg: {instantiate_msg:?}");
|
||||
|
||||
let res =
|
||||
serde_json::to_string(&instantiate_msg).expect("failed to convert instantiate msg to json");
|
||||
|
||||
@@ -31,7 +31,7 @@ pub struct Args {
|
||||
pub async fn generate(args: Args) {
|
||||
info!("Starting to generate vesting contract instantiate msg");
|
||||
|
||||
debug!("Received arguments: {:?}", args);
|
||||
debug!("Received arguments: {args:?}");
|
||||
|
||||
let ecash_contract_address = args.ecash_contract_address.unwrap_or_else(|| {
|
||||
let address = std::env::var(nym_network_defaults::var_names::ECASH_CONTRACT_ADDRESS)
|
||||
@@ -60,7 +60,7 @@ pub async fn generate(args: Args) {
|
||||
coconut_dkg_contract_address: coconut_dkg_contract_address.to_string(),
|
||||
};
|
||||
|
||||
debug!("instantiate_msg: {:?}", instantiate_msg);
|
||||
debug!("instantiate_msg: {instantiate_msg:?}");
|
||||
|
||||
let res =
|
||||
serde_json::to_string(&instantiate_msg).expect("failed to convert instantiate msg to json");
|
||||
|
||||
@@ -21,7 +21,7 @@ pub struct Args {
|
||||
pub async fn generate(args: Args) {
|
||||
info!("Starting to generate vesting contract instantiate msg");
|
||||
|
||||
debug!("Received arguments: {:?}", args);
|
||||
debug!("Received arguments: {args:?}");
|
||||
|
||||
let mixnet_contract_address = args.mixnet_contract_address.unwrap_or_else(|| {
|
||||
let address = std::env::var(nym_network_defaults::var_names::MIXNET_CONTRACT_ADDRESS)
|
||||
@@ -39,7 +39,7 @@ pub async fn generate(args: Args) {
|
||||
mix_denom,
|
||||
};
|
||||
|
||||
debug!("instantiate_msg: {:?}", instantiate_msg);
|
||||
debug!("instantiate_msg: {instantiate_msg:?}");
|
||||
|
||||
let res =
|
||||
serde_json::to_string(&instantiate_msg).expect("failed to convert instantiate msg to json");
|
||||
|
||||
@@ -72,7 +72,7 @@ pub async fn init(args: Args, client: SigningClient, network_details: &NymNetwor
|
||||
.await
|
||||
.expect("failed to instantiate the contract!");
|
||||
|
||||
info!("Init result: {:?}", res);
|
||||
info!("Init result: {res:?}");
|
||||
|
||||
println!("{}", res.contract_address)
|
||||
}
|
||||
|
||||
@@ -47,5 +47,5 @@ pub async fn migrate(args: Args, client: SigningClient) {
|
||||
.expect("failed to migrate the contract!")
|
||||
};
|
||||
|
||||
info!("Migrate result: {:?}", res);
|
||||
info!("Migrate result: {res:?}");
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ pub async fn upload(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to upload the contract!");
|
||||
|
||||
info!("Upload result: {:?}", res);
|
||||
info!("Upload result: {res:?}");
|
||||
|
||||
println!("{}", res.code_id)
|
||||
}
|
||||
|
||||
@@ -47,5 +47,5 @@ pub async fn delegate_to_mixnode(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to delegate to mixnode!");
|
||||
|
||||
info!("delegating to mixnode: {:?}", res);
|
||||
info!("delegating to mixnode: {res:?}");
|
||||
}
|
||||
|
||||
@@ -196,7 +196,7 @@ pub async fn delegate_to_multiple_mixnodes(args: Args, client: SigningClient) {
|
||||
let records = match InputFileReader::new(&args.input) {
|
||||
Ok(records) => records,
|
||||
Err(e) => {
|
||||
println!("Error reading input file: {}", e);
|
||||
println!("Error reading input file: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -262,11 +262,11 @@ pub async fn delegate_to_multiple_mixnodes(args: Args, client: SigningClient) {
|
||||
}
|
||||
|
||||
if !undelegation_msgs.is_empty() {
|
||||
println!("Undelegation records : \n{}\n\n", undelegation_table);
|
||||
println!("Undelegation records : \n{undelegation_table}\n\n");
|
||||
}
|
||||
|
||||
if !delegation_msgs.is_empty() {
|
||||
println!("Delegation records : \n{}\n\n", delegation_table);
|
||||
println!("Delegation records : \n{delegation_table}\n\n");
|
||||
}
|
||||
|
||||
let ans = inquire::Confirm::new("Do you want to continue with the shown operations?")
|
||||
@@ -275,7 +275,7 @@ pub async fn delegate_to_multiple_mixnodes(args: Args, client: SigningClient) {
|
||||
.prompt();
|
||||
|
||||
if let Err(e) = ans {
|
||||
info!("Aborting, {}...", e);
|
||||
info!("Aborting, {e}...");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -348,7 +348,7 @@ pub async fn delegate_to_multiple_mixnodes(args: Args, client: SigningClient) {
|
||||
|
||||
if args.output.is_some() {
|
||||
if let Err(e) = write_to_csv(output_details, args.output) {
|
||||
info!("Failed to write to CSV, {}", e);
|
||||
info!("Failed to write to CSV, {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,5 +38,5 @@ pub async fn migrate_vested_delegation(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to migrate delegation!");
|
||||
|
||||
info!("migration result: {:?}", res)
|
||||
info!("migration result: {res:?}")
|
||||
}
|
||||
|
||||
@@ -40,5 +40,5 @@ pub async fn claim_delegator_reward(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to claim delegator-reward");
|
||||
|
||||
info!("Claiming delegator reward: {:?}", res)
|
||||
info!("Claiming delegator reward: {res:?}")
|
||||
}
|
||||
|
||||
+1
-1
@@ -40,5 +40,5 @@ pub async fn vesting_claim_delegator_reward(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to claim vesting delegator-reward");
|
||||
|
||||
info!("Claiming vesting delegator reward: {:?}", res)
|
||||
info!("Claiming vesting delegator reward: {res:?}")
|
||||
}
|
||||
|
||||
@@ -40,5 +40,5 @@ pub async fn undelegate_from_mixnode(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to remove stake from mixnode!");
|
||||
|
||||
info!("removing stake from mixnode: {:?}", res)
|
||||
info!("removing stake from mixnode: {res:?}")
|
||||
}
|
||||
|
||||
@@ -53,5 +53,5 @@ pub async fn vesting_delegate_to_mixnode(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to delegate to mixnode!");
|
||||
|
||||
info!("vesting delegating to mixnode: {:?}", res);
|
||||
info!("vesting delegating to mixnode: {res:?}");
|
||||
}
|
||||
|
||||
@@ -45,5 +45,5 @@ pub async fn vesting_undelegate_from_mixnode(args: Args, client: SigningClient)
|
||||
.await
|
||||
.expect("failed to remove stake from vesting account on mixnode!");
|
||||
|
||||
info!("removing stake from vesting mixnode: {:?}", res)
|
||||
info!("removing stake from vesting mixnode: {res:?}")
|
||||
}
|
||||
|
||||
@@ -73,5 +73,5 @@ pub async fn bond_gateway(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to bond gateway!");
|
||||
|
||||
info!("Bonding result: {:?}", res)
|
||||
info!("Bonding result: {res:?}")
|
||||
}
|
||||
|
||||
@@ -52,5 +52,5 @@ pub async fn migrate_to_nymnode(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to migrate gateway!");
|
||||
|
||||
info!("migration result: {:?}", res)
|
||||
info!("migration result: {res:?}")
|
||||
}
|
||||
|
||||
@@ -56,5 +56,5 @@ pub async fn update_config(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("updating gateway config");
|
||||
|
||||
info!("gateway config updated: {:?}", res)
|
||||
info!("gateway config updated: {res:?}")
|
||||
}
|
||||
|
||||
+1
-1
@@ -57,5 +57,5 @@ pub async fn vesting_update_config(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("updating vesting gateway config");
|
||||
|
||||
info!("gateway config updated: {:?}", res)
|
||||
info!("gateway config updated: {res:?}")
|
||||
}
|
||||
|
||||
@@ -17,5 +17,5 @@ pub async fn unbond_gateway(client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to unbond gateway!");
|
||||
|
||||
info!("Unbonding result: {:?}", res)
|
||||
info!("Unbonding result: {res:?}")
|
||||
}
|
||||
|
||||
@@ -73,5 +73,5 @@ pub async fn vesting_bond_gateway(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to bond gateway!");
|
||||
|
||||
info!("Vesting bonding gateway result: {:?}", res)
|
||||
info!("Vesting bonding gateway result: {res:?}")
|
||||
}
|
||||
|
||||
@@ -17,5 +17,5 @@ pub async fn vesting_unbond_gateway(client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to unbond vesting gateway!");
|
||||
|
||||
info!("Unbonding vesting result: {:?}", res)
|
||||
info!("Unbonding vesting result: {res:?}")
|
||||
}
|
||||
|
||||
@@ -106,5 +106,5 @@ pub async fn bond_mixnode(args: Args, client: SigningClient) {
|
||||
.await
|
||||
.expect("failed to bond mixnode!");
|
||||
|
||||
info!("Bonding result: {:?}", res)
|
||||
info!("Bonding result: {res:?}")
|
||||
}
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user