Compare commits

...

69 Commits

Author SHA1 Message Date
Simon Wicky 9a38f1c3a6 parking branch 2026-05-29 15:07:59 +02:00
Simon Wicky fc79fe4738 trait rework, removed Ts and NdId generic 2026-05-28 14:09:36 +02:00
Simon Wicky 187c6a51fd nymnode pipeline in mix-sim pre trait rework, not optimal 2026-05-27 13:33:06 +02:00
Simon Wicky c93d106ca3 misc 2026-05-26 14:29:46 +02:00
Simon Wicky 5f1553d589 gateway_forwarding sphinx packet 2026-05-26 11:48:53 +02:00
Simon Wicky 258ceded26 back to separate pipelines 2026-05-22 15:47:23 +02:00
Simon Wicky be76065c66 tweak 2026-05-22 15:11:00 +02:00
Simon Wicky d2558d96e0 tiny fix 2026-05-22 11:53:29 +02:00
Simon Wicky 05ed775686 new addressing + pipeline unification and routing stubs 2026-05-22 11:49:37 +02:00
Simon Wicky c8f9959d7a stub gateway pipeline 2026-05-20 14:05:51 +02:00
Simon Wicky 8293870461 lock 2026-05-20 11:44:00 +02:00
Simon Wicky c0a8f97a20 different fragmentation 2026-05-20 11:42:32 +02:00
Simon Wicky 804b17517f tweak 2026-05-20 11:42:32 +02:00
Simon Wicky 2722544c86 lp metrics 2026-05-20 11:42:32 +02:00
Simon Wicky 732a09aa41 worker pool for processing 2026-05-20 11:42:32 +02:00
Simon Wicky e1c4085217 routing filter 2026-05-20 11:42:32 +02:00
Simon Wicky 34045d02b9 tweak 2026-05-20 11:42:31 +02:00
Simon Wicky b7a36373e5 pipeline unit test 2026-05-20 11:42:31 +02:00
Simon Wicky 17d16503a7 metrics 2026-05-20 11:42:31 +02:00
Simon Wicky df566933ba prepare multi threaded node 2026-05-20 11:42:31 +02:00
Simon Wicky f73f1a5219 sphinx and outfox processing 2026-05-20 11:42:31 +02:00
Simon Wicky 62a5d1437d tweak 2026-05-20 11:42:31 +02:00
Simon Wicky e952f9df24 clean reconstruction buffer 2026-05-20 11:42:31 +02:00
Simon Wicky 525e9314b4 prototype mixnode with dummy pipeline 2026-05-20 11:42:30 +02:00
Simon Wicky 8573004c34 rebasing cleanup 2026-05-20 11:38:35 +02:00
Simon Wicky 5636c5afc4 name change 2026-05-20 11:34:03 +02:00
Simon Wicky f505c29926 some PR review 2026-05-20 11:33:44 +02:00
Simon Wicky 95bec7422c tweaks and checked arithmetic 2026-05-20 11:33:31 +02:00
Simon Wicky c02c28f7cb add mut to transport layer 2026-05-20 11:33:31 +02:00
Simon Wicky 6fb4a98667 comments update 2026-05-20 11:33:30 +02:00
Simon Wicky 4a50f6dcd0 options in framing layer 2026-05-20 11:33:30 +02:00
Simon Wicky 53dec68378 remove anyhow error for in trait one 2026-05-20 11:33:30 +02:00
Simon Wicky f0ecdfd295 delete unnecessary unfinished type 2026-05-20 11:33:30 +02:00
Simon Wicky 668477c5c3 remove unnecessary imports 2026-05-20 11:33:30 +02:00
Simon Wicky 53aaa71178 cargo fmt 2026-05-20 11:33:30 +02:00
Simon Wicky 35517f1df6 nym-mix-sim crate 2026-05-20 11:33:29 +02:00
Simon Wicky ed5ddf0170 nym-lp-data crate 2026-05-20 11:33:14 +02:00
Simon Wicky 644e669a15 helper changes 2026-05-20 11:32:02 +02:00
Simon Wicky 1fd25529ce crate description 2026-05-20 11:28:36 +02:00
Simon Wicky 8677b98bcb fmt 2026-05-20 11:18:04 +02:00
Simon Wicky ca031af69a one more bit 2026-05-20 11:14:44 +02:00
Simon Wicky 7c0264b839 moving lp packets in lp-data crate 2026-05-20 11:10:46 +02:00
import this bde2b07d0d NTM: NIP-11 exit policy update (#6807) 2026-05-20 08:30:49 +00:00
import this 26538f5a40 Patch: Linux kernel vulnerability patch (#6773)
* add guide component

* add mitigate kernel playbook

* add to troubleshooting

* remove redundant

* remove redundant

* FIX ISSUES

* fix

* fix url to raw

* update docs and add new playbook

* update and simplify docs and ansible

* create ntm explanation component and import it

* rm mistaken empty file

* rm crap

* rm crap

* rm all crap

* try to fix nextra screaming seagul

* try to fix nextra screaming seagul

* try to fix nextra screaming seagul

* UX improvement by logic refactoring

* UX improvement by logic refactoring

* UX improvement by logic refactoring

* UX improvement by logic refactoring

* fix header urls

* fix command syntax

* fix indentation

* update auto-stats

* resolve review comments

* resolve review comments in docs

* fix remove kernel book

* soften warning

* address comments

* address comments

* update stats
2026-05-20 09:17:36 +02:00
dependabot[bot] 483bb6f477 build(deps): bump pnpm/action-setup from 4.2.0 to 5.0.0 (#6571)
Bumps [pnpm/action-setup](https://github.com/pnpm/action-setup) from 4.2.0 to 5.0.0.
- [Release notes](https://github.com/pnpm/action-setup/releases)
- [Commits](https://github.com/pnpm/action-setup/compare/v4.2.0...v5.0.0)

---
updated-dependencies:
- dependency-name: pnpm/action-setup
  dependency-version: 5.0.0
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-19 16:37:52 +01:00
dependabot[bot] a68355a75a Bump tauri from 2.10.3 to 2.11.1 in /nym-wallet (#6742)
Bumps [tauri](https://github.com/tauri-apps/tauri) from 2.10.3 to 2.11.1.
- [Release notes](https://github.com/tauri-apps/tauri/releases)
- [Commits](https://github.com/tauri-apps/tauri/compare/tauri-v2.10.3...tauri-v2.11.1)

---
updated-dependencies:
- dependency-name: tauri
  dependency-version: 2.11.1
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-19 16:36:30 +01:00
dependabot[bot] 1572d8e5c2 Bump rand from 0.8.5 to 0.8.6 in /contracts (#6702)
Bumps [rand](https://github.com/rust-random/rand) from 0.8.5 to 0.8.6.
- [Release notes](https://github.com/rust-random/rand/releases)
- [Changelog](https://github.com/rust-random/rand/blob/0.8.6/CHANGELOG.md)
- [Commits](https://github.com/rust-random/rand/compare/0.8.5...0.8.6)

---
updated-dependencies:
- dependency-name: rand
  dependency-version: 0.8.6
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-19 15:49:54 +01:00
dependabot[bot] fd76c5ca4d build(deps): bump microsoft/setup-msbuild from 2 to 3 (#6602)
Bumps [microsoft/setup-msbuild](https://github.com/microsoft/setup-msbuild) from 2 to 3.
- [Release notes](https://github.com/microsoft/setup-msbuild/releases)
- [Commits](https://github.com/microsoft/setup-msbuild/compare/v2...v3)

---
updated-dependencies:
- dependency-name: microsoft/setup-msbuild
  dependency-version: '3'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-19 15:49:24 +01:00
dependabot[bot] f94589c2d1 build(deps): bump tar from 0.4.44 to 0.4.45 (#6595)
Bumps [tar](https://github.com/alexcrichton/tar-rs) from 0.4.44 to 0.4.45.
- [Commits](https://github.com/alexcrichton/tar-rs/compare/0.4.44...0.4.45)

---
updated-dependencies:
- dependency-name: tar
  dependency-version: 0.4.45
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-19 15:49:14 +01:00
dependabot[bot] 1c40499829 build(deps): bump quinn-proto from 0.11.12 to 0.11.14 (#6549)
Bumps [quinn-proto](https://github.com/quinn-rs/quinn) from 0.11.12 to 0.11.14.
- [Release notes](https://github.com/quinn-rs/quinn/releases)
- [Commits](https://github.com/quinn-rs/quinn/compare/quinn-proto-0.11.12...quinn-proto-0.11.14)

---
updated-dependencies:
- dependency-name: quinn-proto
  dependency-version: 0.11.14
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-19 15:48:31 +01:00
dependabot[bot] f8a4d5f1ff build(deps): bump quinn-proto from 0.11.10 to 0.11.14 in /nym-wallet (#6548)
Bumps [quinn-proto](https://github.com/quinn-rs/quinn) from 0.11.10 to 0.11.14.
- [Release notes](https://github.com/quinn-rs/quinn/releases)
- [Commits](https://github.com/quinn-rs/quinn/compare/quinn-proto-0.11.10...quinn-proto-0.11.14)

---
updated-dependencies:
- dependency-name: quinn-proto
  dependency-version: 0.11.14
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-19 15:48:20 +01:00
dependabot[bot] 42807890af build(deps): bump docker/login-action from 3 to 4 (#6518)
Bumps [docker/login-action](https://github.com/docker/login-action) from 3 to 4.
- [Release notes](https://github.com/docker/login-action/releases)
- [Commits](https://github.com/docker/login-action/compare/v3...v4)

---
updated-dependencies:
- dependency-name: docker/login-action
  dependency-version: '4'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-19 15:48:09 +01:00
dependabot[bot] 5aa576b596 build(deps): bump actions/download-artifact from 7 to 8 (#6497)
Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 7 to 8.
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](https://github.com/actions/download-artifact/compare/v7...v8)

---
updated-dependencies:
- dependency-name: actions/download-artifact
  dependency-version: '8'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-19 15:47:59 +01:00
dependabot[bot] 0215ad9294 build(deps): bump actions/upload-artifact from 6 to 7 (#6496)
Bumps [actions/upload-artifact](https://github.com/actions/upload-artifact) from 6 to 7.
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](https://github.com/actions/upload-artifact/compare/v6...v7)

---
updated-dependencies:
- dependency-name: actions/upload-artifact
  dependency-version: '7'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-19 15:47:26 +01:00
ZM 227e6a10e1 fix(ecash): cast usize to u64 in to_bytes() for 32-bit platform compatibility (#6528)
VerificationKeyAuth::to_bytes() and SecretKeyAuth::to_bytes() used
usize::to_le_bytes() to serialize vector lengths, producing 4 bytes on
32-bit and 8 bytes on 64-bit. Since from_bytes() always reads 8 bytes
(u64), this caused ZK proof challenge hash mismatches when a 32-bit
client's proof was verified by a 64-bit gateway, resulting in
"the provided ticket failed to get verified" on all 32-bit platforms.
2026-05-19 15:17:24 +01:00
Jędrzej Stuczyński d3b6a270de chore: expose admin method for migrating vesting delegations/mixnodes (#6795)
* chore: expose admin method for migrating vesting delegations/mixnodes

* don't error out on vested delegation no longer existing - perform a noop instead

* cargo fmt

* add message for batch migration
2026-05-19 15:13:03 +01:00
mfahampshire e12ada0105 Point mobile reference section at nymvpn setup (#6776) 2026-05-19 13:00:51 +00:00
Simon Wicky 71d50d79c2 fix clippy 1.95 lints (#6794) 2026-05-19 14:21:12 +02:00
Jędrzej Stuczyński a21a01cf1a node families (#6715)
* start node families topic branch

* start node families topic branch

* initialise node families contract

* define contract storage

* registering new family in storage

* accepting family invitation

* add_pending_invitation

* revoke_pending_invitation

* remove_family_member

* reject_pending_invitation

* disband_family

* added unit tests for the storage methods

* added restriction on uniquness of family names

* update rustc version for node families contract common

* clippy

* basic queries by id

* query_families_paged

* change family membership storage and expose query for all members of a family

* queries for pending invitations

* queries for past invitations

* queries for past data per node

* queries for past family members

* query_past_members_for_node_paged

* queries for family by name and by owner

* fixup family name normalisation

* fixed incorrect lower bound for queries for past data

* implement contract and storage initialisation

* stubbing tx messages that are to be exposed by the contract

* handler for updating config

* removed partial fee return

* wip: create family

* move mixnet contract interaction traits to shared location

* store original family name alongside the normalised variant

* prevent family creation if owner has a node in another family

* try_disband_family

* try_invite_to_family + shared helpers

* try_revoke_family_invitation

* accept_family_invitation

* stub method for node unbonding

* try_reject_family_invitation

* unit tests for family name normalisation

* try_leave_family

* try_kick_from_family

* fix outdated comments and add paid fee event attribute

* feat: NMv3: leave family upon node unbonding

* NF contract handling of unbonding

* lints

* init node families contract when creating performance contract tester

* clippy

* avoid self-dep in the contract dev deps

* introduced client traits for interacting with the node families contract

* add node families contract to cache refresher

* added query for all node family members (globally) and started scaffolding nym-api caches

* docs and cache -> api conversion

* calculating average node age based on individual timestamps

* wire up node families cache

* http stubs

* filled in the implementation

* route tests + extracting shared code

* review fixes

* feat: expose family information for all dvpn gateway endpoints within NS API

* expose family information for explorer v3 route

* clippy

* review comments and optimise db family update

* feat: Node Families: expose stake information inside DVpnGateway

* chore: update lock files after rebase

* chore: sort workspace members

* explicitly require providing node families contract address for mixnet contract migration

* fix missing node families contract address env export

* dont swallow cache overwrite failures in fixture

* pin network-defaults rustc version due to contracts dep

* further version pinning

* chore: update mixnet contract schema
2026-05-19 10:36:20 +01:00
Jack Wampler 362f84b5f6 Handle Rate Limit Challenge Response (#6786)
rotate urls on HTTP response error indicating API rate limiting
2026-05-18 08:47:41 -06:00
benedettadavico daed9cd15b Merge branch 'release/2026.9-venaco' into develop 2026-05-16 06:27:52 +02:00
Jack Wampler a53ca71bd2 Re-order default API urls for network details (#6767) 2026-05-15 09:46:33 -06:00
mfahampshire a70e68c7bd Max/smolmix docs (#6716)
* Smolmix documentation

* Add smolmix docs: landing page, tutorials, and developer page links

* Add Exit Gateway services page (NR vs IPR) and link from existing docs

* Update auto-generated command and API outputs

* Reorg of tutorials and architecture pages

* License information + remove TODO from docs.rs visibile comment + reorg
readme

* Add versions file for doc-wide versioning

* Relative -> absolute links

* Relative -> absolute links

* Update license + add old tutorial code as examples

* Streamline smolmix docs

* Clippy

* Clean up doc comments

* Last pass

* Add larger file download to list

* set new versions

* Clippy

* Remove blake pin from docs + add version range to root Cargo.toml

* Format example logging

* Remove crate blocked component

* Loose whitespace

* Add doc verification script for inline mdx

* Formatting

* Components regen

* Reorg + tighten text

* Voicing cohesion pass + remove bloated examples

* Voicing cont.

* Reduce max download size

* Small suggested clarifications

* Max/docs voicing consistency (#6769)

* Reduce max download size

* voicing consistency across docs

* New landing order w smolmix

* Tweaks

* Final tweaks
2026-05-13 11:19:44 +00:00
import this fdebed7c38 Bugfix: nym-node-cly.py argument mismatch fix and sync up with NTM updates (#6743)
* fix argument missmatch and sync args with recent NTM update

* fix wg_enabled check & name consistency

* correct env.os saving persisting vars logic

* fix naming issue
2026-05-12 11:52:46 +02:00
benedetta davico f576a4ee2d Merge pull request #6764 from nymtech/bdq/add-ci-build-NM-agents
add ci for NM agent binary
2026-05-12 10:40:23 +02:00
benedettadavico a9aafd785e publish NM agent binary 2026-05-12 10:34:34 +02:00
benedetta davico 0f7dbb94a8 fix for crates (#6745)
* version fix

* try to publish core crates first

* bump version ci

* fix to yaml

* Slight modifications to ordering, remove core-crates and rely on  ordering as test + sed tweak

* crates release: bump version to 1.21.0 (#6744)

Co-authored-by: Nym bot <nym-bot@users.noreply.github.com>
Co-authored-by: mfahampshire <maxhampshire@pm.me>

* Remove unnecessary verification step becase of dryrun (doubled)

* Revert some changes to develop

* Add preflight to its own workflow

* Clippy

* Update crate publishing file

* Clippy

---------

Co-authored-by: benedettadavico <benedettadavico@users.noreply.github.com>
Co-authored-by: mfahampshire <maxhampshire@pm.me>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Nym bot <nym-bot@users.noreply.github.com>
2026-05-11 14:50:14 +00:00
Jędrzej Stuczyński 2d72b1b201 feat: introduce shared contract caches within Nym API (#6760)
it has been extracted from the mix stress testing branch and it is going to be used within node families branch
2026-05-11 13:02:37 +01:00
Jędrzej Stuczyński 412657f773 chore: removed dead code for redundant mixnet-vesting integration tests (#6759) 2026-05-11 10:03:56 +01:00
448 changed files with 34888 additions and 6144 deletions
+2 -2
View File
@@ -25,14 +25,14 @@ jobs:
echo "file2=$(ls nym-vpn*.deb)" >> $GITHUB_ENV
- name: Upload nym-repo-setup
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v7
with:
name: ${{ env.file1 }}
path: ppa/packages/nym-repo-setup*.deb
retention-days: 10
- name: Upload nym-vpn
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v7
with:
name: ${{ env.file2 }}
path: ppa/packages/nym-vpn*.deb
+4 -1
View File
@@ -21,7 +21,7 @@ jobs:
run: sudo apt-get install -y rsync
- uses: rlespinasse/github-slug-action@v3.x
- name: Setup pnpm
uses: pnpm/action-setup@v4.2.0
uses: pnpm/action-setup@v5.0.0
with:
version: 9
- uses: actions/setup-node@v4
@@ -37,6 +37,9 @@ jobs:
command: build
args: --workspace --release
- name: Verify doc versions
run: ${{ github.workspace }}/documentation/scripts/verify-doc-versions.sh
working-directory: ${{ github.workspace }}
- name: Install project dependencies
run: pnpm i
- name: Generate llms-full.txt
@@ -110,7 +110,7 @@ jobs:
- name: Upload Artifact
if: github.event_name == 'workflow_dispatch'
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v7
with:
name: nym-binaries-artifacts
path: |
@@ -0,0 +1,63 @@
name: ci-build-upload-network-monitor-agent
on:
workflow_dispatch:
jobs:
build-and-upload:
strategy:
fail-fast: false
matrix:
platform: [arc-ubuntu-22.04]
runs-on: ${{ matrix.platform }}
env:
CARGO_TERM_COLOR: always
RUSTUP_PERMIT_COPY_RENAME: 1
steps:
- uses: actions/checkout@v6
- name: Prepare build output directory
shell: bash
env:
OUTPUT_DIR: ci-builds/${{ github.ref_name }}
run: |
rm -rf ci-builds || true
mkdir -p "$OUTPUT_DIR"
- name: Install Dependencies (Linux)
run: sudo apt-get update && sudo apt-get -y install libudev-dev
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ vars.REQUIRED_RUSTC_VERSION }}
- name: Build nym-network-monitor-agent
shell: bash
run: cargo build -p nym-network-monitor-agent --release
- name: Upload artifact
uses: actions/upload-artifact@v6
with:
name: nym-network-monitor-agent
path: target/release/nym-network-monitor-agent
retention-days: 30
- name: Prepare build output
shell: bash
env:
OUTPUT_DIR: ci-builds/${{ github.ref_name }}
run: cp target/release/nym-network-monitor-agent "$OUTPUT_DIR"
- name: Deploy to CI www
uses: easingthemes/ssh-deploy@main
env:
SSH_PRIVATE_KEY: ${{ secrets.CI_WWW_SSH_PRIVATE_KEY }}
ARGS: "-avzr"
SOURCE: "ci-builds/"
REMOTE_HOST: ${{ secrets.CI_WWW_REMOTE_HOST }}
REMOTE_USER: ${{ secrets.CI_WWW_REMOTE_USER }}
TARGET: ${{ secrets.CI_WWW_REMOTE_TARGET }}/builds/
EXCLUDE: "/dist/, /node_modules/"
+19
View File
@@ -0,0 +1,19 @@
name: ci-crates-preflight
on:
workflow_dispatch:
pull_request:
paths:
- 'Cargo.toml'
- '**/Cargo.toml'
- 'tools/internal/check_publish_preflight.py'
- '.github/workflows/ci-crates-preflight.yml'
jobs:
preflight:
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v6
- name: Preflight publish checks
run: python3 tools/internal/check_publish_preflight.py
@@ -57,7 +57,8 @@ jobs:
- name: Update workspace dependencies
run: |
sed -i '/path = /s/version = "${{ steps.current_version.outputs.version }}"/version = "${{ inputs.version }}"/g' Cargo.toml
# Match any semver version on lines with `path = `, not just the current workspace version.
sed -i '/path = /s/version = "[0-9][0-9]*\.[0-9][0-9]*\.[0-9][0-9]*"/version = "${{ inputs.version }}"/g' Cargo.toml
- name: Bump versions (local only)
run: |
+5 -1
View File
@@ -33,7 +33,11 @@ jobs:
- name: Install cargo-workspaces
run: cargo install cargo-workspaces
# `--publish-as-is` skips version bumping since that's done in a separate CI job.
- name: Preflight publish checks
run: |
python3 tools/internal/check_publish_preflight.py
# --publish-as-is skips version bumping since that's done in a separate CI job.
- name: Publish
env:
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
+29 -2
View File
@@ -19,6 +19,7 @@ jobs:
RUSTUP_PERMIT_COPY_RENAME: 1
permissions:
contents: write
pull-requests: write
steps:
- name: Checkout repo
uses: actions/checkout@v6
@@ -58,7 +59,9 @@ jobs:
- name: Update workspace dependencies
run: |
sed -i '/path = /s/version = "${{ steps.current_version.outputs.version }}"/version = "${{ inputs.version }}"/g' Cargo.toml
# Match any semver version on lines with `path = `, not just the current workspace version.
# This catches entries whose version has drifted (e.g. nym-sqlx-pool-guard at 1.2.0).
sed -i '/path = /s/version = "[0-9][0-9]*\.[0-9][0-9]*\.[0-9][0-9]*"/version = "${{ inputs.version }}"/g' Cargo.toml
- name: Bump versions
run: |
@@ -68,9 +71,33 @@ jobs:
- name: Commit and push version bump
run: |
set -euo pipefail
BASE_BRANCH="${GITHUB_REF_NAME}"
PR_BRANCH="ci/crates-version-bump-${{ inputs.version }}-${GITHUB_RUN_ID}"
git checkout -b "$PR_BRANCH"
git add -A
git commit -m "crates release: bump version to ${{ inputs.version }}"
git push
git push -u origin "$PR_BRANCH"
cat > /tmp/crates-version-bump-pr-body.md <<'EOF'
This PR was created by CI because direct pushes to the release branch are blocked by branch protection rules.
## Summary
- Bump workspace crate versions to the requested release version.
- Update workspace dependency versions accordingly.
## Notes
- Merge this PR to proceed with crates.io publishing.
EOF
gh pr create \
--base "$BASE_BRANCH" \
--head "$PR_BRANCH" \
--title "crates release: bump version to ${{ inputs.version }}" \
--body-file /tmp/crates-version-bump-pr-body.md
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Show package versions
run: cargo workspaces list --long
+4 -1
View File
@@ -28,7 +28,7 @@ jobs:
run: sudo apt-get install -y rsync
- uses: rlespinasse/github-slug-action@v3.x
- name: Setup pnpm
uses: pnpm/action-setup@v4.2.0
uses: pnpm/action-setup@v5.0.0
with:
version: 9
- uses: actions/setup-node@v4
@@ -61,6 +61,9 @@ jobs:
cd ${{ github.workspace }}/sdk/typescript/packages/sdk && typedoc --skipErrorChecking
cd ${{ github.workspace }}/sdk/typescript/packages/mix-fetch && typedoc --skipErrorChecking
- name: Verify doc versions
run: ${{ github.workspace }}/documentation/scripts/verify-doc-versions.sh
working-directory: ${{ github.workspace }}
- name: Install project dependencies
run: pnpm i
- name: Generate llms-full.txt
+1 -1
View File
@@ -20,7 +20,7 @@ jobs:
find . -name Cargo.toml -exec cargo deny --manifest-path {} check \
advisories -A advisory-not-detected --hide-inclusion-graph \; &> \
>(uniq &> .github/workflows/support-files/notifications/deny.message )
- uses: actions/upload-artifact@v6
- uses: actions/upload-artifact@v7
with:
name: report
path: .github/workflows/support-files/notifications/deny.message
+1 -1
View File
@@ -66,7 +66,7 @@ jobs:
args: --workspace --release ${{ env.CARGO_FEATURES }}
- name: Upload Artifact
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v7
with:
name: my-artifact
path: |
+2 -2
View File
@@ -27,14 +27,14 @@ jobs:
run: make contracts
- name: Upload Mixnet Contract Artifact
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v7
with:
name: mixnet_contract.wasm
path: contracts/target/wasm32-unknown-unknown/release/mixnet_contract.wasm
retention-days: 5
- name: Upload Vesting Contract Artifact
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v7
with:
name: vesting_contract.wasm
path: contracts/target/wasm32-unknown-unknown/release/vesting_contract.wasm
@@ -108,7 +108,7 @@ jobs:
cd -
- name: Upload Artifact
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v7
with:
name: nym-wallet.app.tar.gz
path: nym-wallet/target/x86_64-apple-darwin/release/bundle/macos/nym-wallet.app.tar.gz
@@ -132,7 +132,7 @@ jobs:
fi
- name: Upload Artifact
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v7
with:
name: nym-wallet-appimage.tar.gz
path: |
@@ -38,7 +38,7 @@ jobs:
toolchain: ${{ vars.REQUIRED_RUSTC_VERSION }}
- name: Setup MSBuild.exe
uses: microsoft/setup-msbuild@v2
uses: microsoft/setup-msbuild@v3
# No cache:yarn here: setup-node needs yarn on PATH to populate the cache, but this runner
# only gets yarn from the step below.
@@ -165,7 +165,7 @@ jobs:
find . -name "*.msi" -type f
- name: Upload Artifact
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v7
with:
name: nym-wallet.msi
path: |
@@ -76,7 +76,7 @@ jobs:
apk/nyms5-arch64-release.apk
- name: Upload APKs
uses: actions/upload-artifact@v6
uses: actions/upload-artifact@v7
with:
name: nyms5-apk-arch64
path: |
@@ -91,7 +91,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v6
- name: Download binary artifact
uses: actions/download-artifact@v7
uses: actions/download-artifact@v8
with:
name: nyms5-apk-arch64
path: apk
+1 -1
View File
@@ -11,7 +11,7 @@ jobs:
runs-on: arc-linux-latest-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
uses: docker/login-action@v4
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
+1 -1
View File
@@ -11,7 +11,7 @@ jobs:
runs-on: arc-linux-latest-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
uses: docker/login-action@v4
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
+1 -1
View File
@@ -11,7 +11,7 @@ jobs:
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
uses: docker/login-action@v4
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
@@ -18,7 +18,7 @@ jobs:
runs-on: arc-linux-latest-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
uses: docker/login-action@v4
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
+1 -1
View File
@@ -17,7 +17,7 @@ jobs:
runs-on: arc-linux-latest-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
uses: docker/login-action@v4
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
+1 -1
View File
@@ -11,7 +11,7 @@ jobs:
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
uses: docker/login-action@v4
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
+1 -1
View File
@@ -11,7 +11,7 @@ jobs:
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
uses: docker/login-action@v4
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
@@ -11,7 +11,7 @@ jobs:
runs-on: arc-linux-latest-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
uses: docker/login-action@v4
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
@@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Login to Harbor
uses: docker/login-action@v3
uses: docker/login-action@v4
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
@@ -11,7 +11,7 @@ jobs:
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
uses: docker/login-action@v4
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
+1 -1
View File
@@ -30,7 +30,7 @@ jobs:
with:
release-tag-or-name-or-id: ${{ inputs.release_tag }}
- uses: actions/upload-artifact@v6
- uses: actions/upload-artifact@v7
with:
name: Asset Hashes
path: hashes.json
+1
View File
@@ -78,3 +78,4 @@ CLAUDE.md
/notes
/target-otel
test-tutorials/
Generated
+694 -417
View File
File diff suppressed because it is too large Load Diff
+129 -121
View File
@@ -31,7 +31,6 @@ members = [
"common/client-libs/mixnet-client",
"common/client-libs/validator-client",
"common/commands",
"common/nym-common",
"common/config",
"common/cosmwasm-smart-contracts/coconut-dkg",
"common/cosmwasm-smart-contracts/contracts-common",
@@ -41,6 +40,7 @@ members = [
"common/cosmwasm-smart-contracts/group-contract",
"common/cosmwasm-smart-contracts/mixnet-contract",
"common/cosmwasm-smart-contracts/multisig-contract",
"common/cosmwasm-smart-contracts/node-families-contract",
"common/cosmwasm-smart-contracts/nym-performance-contract",
"common/cosmwasm-smart-contracts/nym-pool-contract",
"common/cosmwasm-smart-contracts/vesting-contract",
@@ -70,11 +70,15 @@ members = [
"common/node-tester-utils",
"common/nonexhaustive-delayqueue",
"common/nym-cache",
"common/nym-common",
"common/nym-connection-monitor",
"common/nym-id",
"common/nym-kcp",
"common/nym-lp",
"common/nym-kkt",
"common/nym-kkt-ciphersuite",
"common/nym-kkt-context",
"common/nym-lp",
"common/nym-lp-data",
"common/nym-metrics",
"common/nym_offline_compact_ecash",
"common/nymnoise",
@@ -90,9 +94,9 @@ members = [
"common/nymsphinx/params",
"common/nymsphinx/routing",
"common/nymsphinx/types",
"common/nyxd-scraper-sqlite",
"common/nyxd-scraper-psql",
"common/nyxd-scraper-shared",
"common/nyxd-scraper-sqlite",
"common/pemstore",
"common/registration",
"common/serde-helpers",
@@ -122,6 +126,7 @@ members = [
"common/zulip-client",
"documentation/autodoc",
"gateway",
"integration-tests",
"nym-api",
"nym-api/nym-api-requests",
"nym-authenticator-client",
@@ -129,7 +134,9 @@ members = [
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-data-observatory",
"nym-gateway-probe",
"nym-ip-packet-client",
"nym-mix-sim",
"nym-network-monitor",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
@@ -140,6 +147,7 @@ members = [
"nym-outfox",
"nym-registration-client",
"nym-signers-monitor",
"nym-sqlx-pool-guard",
"nym-statistics-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
@@ -147,19 +155,18 @@ members = [
"sdk/ffi/go",
"sdk/ffi/shared",
"sdk/rust/nym-sdk",
"smolmix/core",
"service-providers/common",
"service-providers/ip-packet-router",
"service-providers/network-requester",
"nym-sqlx-pool-guard",
"smolmix/core",
"tools/echo-server",
"tools/internal/contract-state-importer/importer-cli",
"tools/internal/contract-state-importer/importer-contract",
"tools/internal/localnet-orchestrator",
"tools/internal/localnet-orchestrator/dkg-bypass-contract",
"tools/internal/mixnet-connectivity-check",
# "tools/internal/sdk-version-bump",
"tools/internal/ssl-inject",
"tools/internal/localnet-orchestrator",
"tools/internal/localnet-orchestrator/dkg-bypass-contract",
"tools/internal/validator-status-check",
"tools/nym-cli",
"tools/nym-id-cli",
@@ -172,27 +179,24 @@ members = [
"wasm/mix-fetch",
"wasm/node-tester",
"wasm/zknym-lib",
"nym-gateway-probe",
"integration-tests",
"common/nym-kkt-ciphersuite",
"common/nym-kkt-context",
]
default-members = [
"clients/native",
"clients/socks5",
"nym-authenticator-client",
"nym-api",
"nym-authenticator-client",
"nym-credential-proxy/nym-credential-proxy",
"nym-mix-sim",
"nym-node",
"nym-registration-client",
"nym-statistics-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"service-providers/ip-packet-router",
"service-providers/network-requester",
"tools/internal/localnet-orchestrator",
"tools/nymvisor",
"nym-registration-client",
"tools/internal/localnet-orchestrator"
]
exclude = ["contracts", "nym-wallet", "cpu-cycles"]
@@ -206,7 +210,7 @@ edition = "2024"
license = "Apache-2.0"
rust-version = "1.87.0"
readme = "README.md"
version = "1.20.4"
version = "1.21.0"
[workspace.dependencies]
addr = "0.15.6"
@@ -229,7 +233,7 @@ base85rs = "0.1.3"
bincode = "1.3.3"
bip39 = { version = "2.0.0", features = ["zeroize"] }
bitvec = "1.0.0"
blake3 = "1.7.0"
blake3 = ">=1.7, <1.8.4" # blake3 1.8.4+ requires digest 0.11; workspace is on 0.10
bloomfilter = "3.0.1"
bs58 = "0.5.1"
bytecodec = "0.4.15"
@@ -325,7 +329,7 @@ pnet_packet = "0.35.0"
publicsuffix = "2.3.0"
proc_pidinfo = "0.1.3"
quote = "1"
rand = "0.8.5"
rand = "0.8.6"
rand09 = { package = "rand", version = "=0.9.2" }
rand_chacha = "0.3"
rand_chacha09 = { package = "rand_chacha", version = "=0.9.0" }
@@ -349,7 +353,6 @@ serde_yaml = "0.9.25"
serde_plain = "1.0.2"
sha2 = "0.10.3"
si-scale = "0.2.3"
smolmix = { version = "0.0.1", path = "smolmix/core" }
smoltcp = "0.12"
snow = "0.9.6"
sphinx-packet = "=0.6.0"
@@ -360,7 +363,7 @@ subtle-encoding = "0.5"
syn = "2"
sysinfo = "0.37.0"
tap = "1.0.1"
tar = "0.4.44"
tar = "0.4.45"
test-with = { version = "0.15.4", default-features = false }
tempfile = "3.20"
thiserror = "2.0"
@@ -414,110 +417,114 @@ libcrux-sha3 = "0.0.8"
libcrux-traits = "0.0.6"
# Workspace dep definitions required by crates.io publication - we need a workspace version since `cargo workspaces` doesn't work with path imports from crate manifests
nym-api-requests = { version = "1.20.4", path = "nym-api/nym-api-requests" }
nym-authenticator-requests = { version = "1.20.4", path = "common/authenticator-requests" }
nym-async-file-watcher = { version = "1.20.4", path = "common/async-file-watcher" }
nym-authenticator-client = { version = "1.20.4", path = "nym-authenticator-client" }
nym-bandwidth-controller = { version = "1.20.4", path = "common/bandwidth-controller" }
nym-bin-common = { version = "1.20.4", path = "common/bin-common" }
nym-cache = { version = "1.20.4", path = "common/nym-cache" }
nym-client-core = { version = "1.20.4", path = "common/client-core", default-features = false }
nym-client-core-config-types = { version = "1.20.4", path = "common/client-core/config-types" }
nym-client-core-gateways-storage = { version = "1.20.4", path = "common/client-core/gateways-storage" }
nym-client-core-surb-storage = { version = "1.20.4", path = "common/client-core/surb-storage" }
nym-client-websocket-requests = { version = "1.20.4", path = "clients/native/websocket-requests" }
nym-common = { version = "1.20.4", path = "common/nym-common" }
nym-compact-ecash = { version = "1.20.4", path = "common/nym_offline_compact_ecash" }
nym-config = { version = "1.20.4", path = "common/config" }
nym-contracts-common = { version = "1.20.4", path = "common/cosmwasm-smart-contracts/contracts-common" }
nym-coconut-dkg-common = { version = "1.20.4", path = "common/cosmwasm-smart-contracts/coconut-dkg" }
nym-credential-storage = { version = "1.20.4", path = "common/credential-storage" }
nym-credential-utils = { version = "1.20.4", path = "common/credential-utils" }
nym-credential-proxy-lib = { version = "1.20.4", path = "common/credential-proxy" }
nym-credentials = { version = "1.20.4", path = "common/credentials", default-features = false }
nym-credentials-interface = { version = "1.20.4", path = "common/credentials-interface" }
nym-credential-proxy-requests = { version = "1.20.4", path = "nym-credential-proxy/nym-credential-proxy-requests", default-features = false }
nym-credential-verification = { version = "1.20.4", path = "common/credential-verification" }
nym-crypto = { version = "1.20.4", path = "common/crypto", default-features = false }
nym-dkg = { version = "1.20.4", path = "common/dkg" }
nym-ecash-contract-common = { version = "1.20.4", path = "common/cosmwasm-smart-contracts/ecash-contract" }
nym-ecash-signer-check = { version = "1.20.4", path = "common/ecash-signer-check" }
nym-ecash-signer-check-types = { version = "1.20.4", path = "common/ecash-signer-check-types" }
nym-ecash-time = { version = "1.20.4", path = "common/ecash-time" }
nym-exit-policy = { version = "1.20.4", path = "common/exit-policy" }
nym-ffi-shared = { version = "1.20.4", path = "sdk/ffi/shared" }
nym-gateway-client = { version = "1.20.4", path = "common/client-libs/gateway-client", default-features = false }
nym-api-requests = { version = "1.21.0", path = "nym-api/nym-api-requests" }
nym-authenticator-requests = { version = "1.21.0", path = "common/authenticator-requests" }
nym-async-file-watcher = { version = "1.21.0", path = "common/async-file-watcher" }
nym-authenticator-client = { version = "1.21.0", path = "nym-authenticator-client" }
nym-bandwidth-controller = { version = "1.21.0", path = "common/bandwidth-controller" }
nym-bin-common = { version = "1.21.0", path = "common/bin-common" }
nym-cache = { version = "1.21.0", path = "common/nym-cache" }
nym-client-core = { version = "1.21.0", path = "common/client-core", default-features = false }
nym-client-core-config-types = { version = "1.21.0", path = "common/client-core/config-types" }
nym-client-core-gateways-storage = { version = "1.21.0", path = "common/client-core/gateways-storage" }
nym-client-core-surb-storage = { version = "1.21.0", path = "common/client-core/surb-storage" }
nym-client-websocket-requests = { version = "1.21.0", path = "clients/native/websocket-requests" }
nym-common = { version = "1.21.0", path = "common/nym-common" }
nym-compact-ecash = { version = "1.21.0", path = "common/nym_offline_compact_ecash" }
nym-config = { version = "1.21.0", path = "common/config" }
nym-contracts-common = { version = "1.21.0", path = "common/cosmwasm-smart-contracts/contracts-common" }
nym-coconut-dkg-common = { version = "1.21.0", path = "common/cosmwasm-smart-contracts/coconut-dkg" }
nym-credential-storage = { version = "1.21.0", path = "common/credential-storage" }
nym-credential-utils = { version = "1.21.0", path = "common/credential-utils" }
nym-credential-proxy-lib = { version = "1.21.0", path = "common/credential-proxy" }
nym-credentials = { version = "1.21.0", path = "common/credentials", default-features = false }
nym-credentials-interface = { version = "1.21.0", path = "common/credentials-interface" }
nym-credential-proxy-requests = { version = "1.21.0", path = "nym-credential-proxy/nym-credential-proxy-requests", default-features = false }
nym-credential-verification = { version = "1.21.0", path = "common/credential-verification" }
nym-crypto = { version = "1.21.0", path = "common/crypto", default-features = false }
nym-dkg = { version = "1.21.0", path = "common/dkg" }
nym-ecash-contract-common = { version = "1.21.0", path = "common/cosmwasm-smart-contracts/ecash-contract" }
nym-ecash-signer-check = { version = "1.21.0", path = "common/ecash-signer-check" }
nym-ecash-signer-check-types = { version = "1.21.0", path = "common/ecash-signer-check-types" }
nym-ecash-time = { version = "1.21.0", path = "common/ecash-time" }
nym-exit-policy = { version = "1.21.0", path = "common/exit-policy" }
nym-ffi-shared = { version = "1.21.0", path = "sdk/ffi/shared" }
nym-gateway-client = { version = "1.21.0", path = "common/client-libs/gateway-client", default-features = false }
nym-gateway-probe = { version = "1.18.0", path = "nym-gateway-probe" }
nym-gateway-requests = { version = "1.20.4", path = "common/gateway-requests" }
nym-gateway-storage = { version = "1.20.4", path = "common/gateway-storage" }
nym-gateway-stats-storage = { version = "1.20.4", path = "common/gateway-stats-storage" }
nym-group-contract-common = { version = "1.20.4", path = "common/cosmwasm-smart-contracts/group-contract" }
nym-http-api-client = { version = "1.20.4", path = "common/http-api-client" }
nym-http-api-client-macro = { version = "1.20.4", path = "common/http-api-client-macro" }
nym-http-api-common = { version = "1.20.4", path = "common/http-api-common", default-features = false }
nym-id = { version = "1.20.4", path = "common/nym-id" }
nym-ip-packet-client = { version = "1.20.4", path = "nym-ip-packet-client" }
nym-ip-packet-requests = { version = "1.20.4", path = "common/ip-packet-requests" }
nym-lp = { version = "1.20.4", path = "common/nym-lp" }
nym-kkt = { version = "0.1.0", path = "common/nym-kkt" }
nym-kkt-ciphersuite = { version = "1.20.4", path = "common/nym-kkt-ciphersuite" }
nym-kkt-context = { version = "1.20.4", path = "common/nym-kkt-context" }
nym-metrics = { version = "1.20.4", path = "common/nym-metrics" }
nym-mixnet-client = { version = "1.20.4", path = "common/client-libs/mixnet-client" }
nym-mixnet-contract-common = { version = "1.20.4", path = "common/cosmwasm-smart-contracts/mixnet-contract" }
nym-multisig-contract-common = { version = "1.20.4", path = "common/cosmwasm-smart-contracts/multisig-contract" }
nym-network-defaults = { version = "1.20.4", path = "common/network-defaults" }
nym-node-tester-utils = { version = "1.20.4", path = "common/node-tester-utils" }
nym-noise = { version = "1.20.4", path = "common/nymnoise" }
nym-noise-keys = { version = "1.20.4", path = "common/nymnoise/keys" }
nym-nonexhaustive-delayqueue = { version = "1.20.4", path = "common/nonexhaustive-delayqueue" }
nym-node-requests = { version = "1.20.4", path = "nym-node/nym-node-requests", default-features = false }
nym-node-metrics = { version = "1.20.4", path = "nym-node/nym-node-metrics" }
nym-ordered-buffer = { version = "1.20.4", path = "common/socks5/ordered-buffer" }
nym-outfox = { version = "1.20.4", path = "nym-outfox" }
nym-registration-common = { version = "1.20.4", path = "common/registration" }
nym-pemstore = { version = "1.20.4", path = "common/pemstore" }
nym-performance-contract-common = { version = "1.20.4", path = "common/cosmwasm-smart-contracts/nym-performance-contract" }
nym-sdk = { version = "1.20.4", path = "sdk/rust/nym-sdk" }
nym-serde-helpers = { version = "1.20.4", path = "common/serde-helpers" }
nym-service-providers-common = { version = "1.20.4", path = "service-providers/common" }
nym-service-provider-requests-common = { version = "1.20.4", path = "common/service-provider-requests-common" }
nym-socks5-client-core = { version = "1.20.4", path = "common/socks5-client-core" }
nym-socks5-proxy-helpers = { version = "1.20.4", path = "common/socks5/proxy-helpers" }
nym-socks5-requests = { version = "1.20.4", path = "common/socks5/requests" }
nym-sphinx = { version = "1.20.4", path = "common/nymsphinx" }
nym-sphinx-acknowledgements = { version = "1.20.4", path = "common/nymsphinx/acknowledgements" }
nym-sphinx-addressing = { version = "1.20.4", path = "common/nymsphinx/addressing" }
nym-sphinx-anonymous-replies = { version = "1.20.4", path = "common/nymsphinx/anonymous-replies" }
nym-sphinx-chunking = { version = "1.20.4", path = "common/nymsphinx/chunking" }
nym-sphinx-cover = { version = "1.20.4", path = "common/nymsphinx/cover" }
nym-sphinx-forwarding = { version = "1.20.4", path = "common/nymsphinx/forwarding" }
nym-sphinx-framing = { version = "1.20.4", path = "common/nymsphinx/framing" }
nym-sphinx-params = { version = "1.20.4", path = "common/nymsphinx/params" }
nym-sphinx-routing = { version = "1.20.4", path = "common/nymsphinx/routing" }
nym-sphinx-types = { version = "1.20.4", path = "common/nymsphinx/types" }
nym-statistics-common = { version = "1.20.4", path = "common/statistics" }
nym-store-cipher = { version = "1.20.4", path = "common/store-cipher" }
nym-task = { version = "1.20.4", path = "common/task" }
nym-tun = { version = "1.20.4", path = "common/tun" }
nym-test-utils = { version = "1.20.4", path = "common/test-utils" }
nym-ticketbooks-merkle = { version = "1.20.4", path = "common/ticketbooks-merkle" }
nym-topology = { version = "1.20.4", path = "common/topology" }
nym-types = { version = "1.20.4", path = "common/types" }
nym-upgrade-mode-check = { version = "1.20.4", path = "common/upgrade-mode-check" }
nym-validator-client = { version = "1.20.4", path = "common/client-libs/validator-client", default-features = false }
nym-vesting-contract-common = { version = "1.20.4", path = "common/cosmwasm-smart-contracts/vesting-contract" }
nym-verloc = { version = "1.20.4", path = "common/verloc" }
nym-wireguard = { version = "1.20.4", path = "common/wireguard" }
nym-wireguard-types = { version = "1.20.4", path = "common/wireguard-types" }
nym-wireguard-private-metadata-shared = { version = "1.20.4", path = "common/wireguard-private-metadata/shared" }
nym-wireguard-private-metadata-client = { version = "1.20.4", path = "common/wireguard-private-metadata/client" }
nym-wireguard-private-metadata-server = { version = "1.20.4", path = "common/wireguard-private-metadata/server" }
nym-gateway-requests = { version = "1.21.0", path = "common/gateway-requests" }
nym-gateway-storage = { version = "1.21.0", path = "common/gateway-storage" }
nym-gateway-stats-storage = { version = "1.21.0", path = "common/gateway-stats-storage" }
nym-group-contract-common = { version = "1.21.0", path = "common/cosmwasm-smart-contracts/group-contract" }
nym-http-api-client = { version = "1.21.0", path = "common/http-api-client" }
nym-http-api-client-macro = { version = "1.21.0", path = "common/http-api-client-macro" }
nym-http-api-common = { version = "1.21.0", path = "common/http-api-common", default-features = false }
nym-id = { version = "1.21.0", path = "common/nym-id" }
nym-ip-packet-client = { version = "1.21.0", path = "nym-ip-packet-client" }
nym-ip-packet-requests = { version = "1.21.0", path = "common/ip-packet-requests" }
nym-lp = { version = "1.21.0", path = "common/nym-lp" }
nym-lp-data = { version = "1.21.0", path = "common/nym-lp-data" }
nym-kkt = { version = "1.21.0", path = "common/nym-kkt" }
nym-kkt-ciphersuite = { version = "1.21.0", path = "common/nym-kkt-ciphersuite" }
nym-kkt-context = { version = "1.21.0", path = "common/nym-kkt-context" }
nym-metrics = { version = "1.21.0", path = "common/nym-metrics" }
nym-mixnet-client = { version = "1.21.0", path = "common/client-libs/mixnet-client" }
nym-mixnet-contract-common = { version = "1.21.0", path = "common/cosmwasm-smart-contracts/mixnet-contract" }
nym-multisig-contract-common = { version = "1.21.0", path = "common/cosmwasm-smart-contracts/multisig-contract" }
nym-network-defaults = { version = "1.21.0", path = "common/network-defaults" }
nym-node-tester-utils = { version = "1.21.0", path = "common/node-tester-utils" }
nym-noise = { version = "1.21.0", path = "common/nymnoise" }
nym-noise-keys = { version = "1.21.0", path = "common/nymnoise/keys" }
nym-nonexhaustive-delayqueue = { version = "1.21.0", path = "common/nonexhaustive-delayqueue" }
nym-node-requests = { version = "1.21.0", path = "nym-node/nym-node-requests", default-features = false }
nym-node-metrics = { version = "1.21.0", path = "nym-node/nym-node-metrics" }
nym-node-families-contract-common = { version = "1.21.0", path = "common/cosmwasm-smart-contracts/node-families-contract" }
nym-ordered-buffer = { version = "1.21.0", path = "common/socks5/ordered-buffer" }
nym-outfox = { version = "1.21.0", path = "nym-outfox" }
nym-registration-common = { version = "1.21.0", path = "common/registration" }
nym-pemstore = { version = "1.21.0", path = "common/pemstore" }
nym-performance-contract-common = { version = "1.21.0", path = "common/cosmwasm-smart-contracts/nym-performance-contract" }
nym-sdk = { version = "1.21.0", path = "sdk/rust/nym-sdk" }
nym-serde-helpers = { version = "1.21.0", path = "common/serde-helpers" }
nym-service-providers-common = { version = "1.21.0", path = "service-providers/common" }
nym-service-provider-requests-common = { version = "1.21.0", path = "common/service-provider-requests-common" }
nym-socks5-client-core = { version = "1.21.0", path = "common/socks5-client-core" }
nym-socks5-proxy-helpers = { version = "1.21.0", path = "common/socks5/proxy-helpers" }
nym-socks5-requests = { version = "1.21.0", path = "common/socks5/requests" }
nym-sphinx = { version = "1.21.0", path = "common/nymsphinx" }
nym-sphinx-acknowledgements = { version = "1.21.0", path = "common/nymsphinx/acknowledgements" }
nym-sphinx-addressing = { version = "1.21.0", path = "common/nymsphinx/addressing" }
nym-sphinx-anonymous-replies = { version = "1.21.0", path = "common/nymsphinx/anonymous-replies" }
nym-sphinx-chunking = { version = "1.21.0", path = "common/nymsphinx/chunking" }
nym-sphinx-cover = { version = "1.21.0", path = "common/nymsphinx/cover" }
nym-sphinx-forwarding = { version = "1.21.0", path = "common/nymsphinx/forwarding" }
nym-sphinx-framing = { version = "1.21.0", path = "common/nymsphinx/framing" }
nym-sphinx-params = { version = "1.21.0", path = "common/nymsphinx/params" }
nym-sphinx-routing = { version = "1.21.0", path = "common/nymsphinx/routing" }
nym-sphinx-types = { version = "1.21.0", path = "common/nymsphinx/types" }
nym-statistics-common = { version = "1.21.0", path = "common/statistics" }
nym-store-cipher = { version = "1.21.0", path = "common/store-cipher" }
nym-task = { version = "1.21.0", path = "common/task" }
nym-tun = { version = "1.21.0", path = "common/tun" }
nym-test-utils = { version = "1.21.0", path = "common/test-utils" }
nym-ticketbooks-merkle = { version = "1.21.0", path = "common/ticketbooks-merkle" }
nym-topology = { version = "1.21.0", path = "common/topology" }
nym-types = { version = "1.21.0", path = "common/types" }
nym-upgrade-mode-check = { version = "1.21.0", path = "common/upgrade-mode-check" }
nym-validator-client = { version = "1.21.0", path = "common/client-libs/validator-client", default-features = false }
nym-vesting-contract-common = { version = "1.21.0", path = "common/cosmwasm-smart-contracts/vesting-contract" }
nym-verloc = { version = "1.21.0", path = "common/verloc" }
nym-wireguard = { version = "1.21.0", path = "common/wireguard" }
nym-wireguard-types = { version = "1.21.0", path = "common/wireguard-types" }
nym-wireguard-private-metadata-shared = { version = "1.21.0", path = "common/wireguard-private-metadata/shared" }
nym-wireguard-private-metadata-client = { version = "1.21.0", path = "common/wireguard-private-metadata/client" }
nym-wireguard-private-metadata-server = { version = "1.21.0", path = "common/wireguard-private-metadata/server" }
nym-sqlx-pool-guard = { version = "1.2.0", path = "nym-sqlx-pool-guard" }
nym-wasm-client-core = { version = "1.20.4", path = "common/wasm/client-core" }
nym-wasm-storage = { version = "1.20.4", path = "common/wasm/storage" }
nym-wasm-utils = { version = "1.20.4", path = "common/wasm/utils", default-features = false }
nyxd-scraper-shared = { version = "1.20.4", path = "common/nyxd-scraper-shared" }
nym-wasm-client-core = { version = "1.21.0", path = "common/wasm/client-core" }
nym-wasm-storage = { version = "1.21.0", path = "common/wasm/storage" }
nym-wasm-utils = { version = "1.21.0", path = "common/wasm/utils", default-features = false }
nyxd-scraper-shared = { version = "1.21.0", path = "common/nyxd-scraper-shared" }
smolmix = { version = "1.21.0", path = "smolmix/core" }
# coconut/DKG related
# unfortunately until https://github.com/zkcrypto/nym-bls12_381-fork/issues/10 is resolved, we have to rely on the fork
@@ -616,3 +623,4 @@ exit = "deny"
panic = "deny"
unimplemented = "deny"
unreachable = "deny"
@@ -0,0 +1,42 @@
# Mitigation playbook for CopyFail (CVE-2026-31431) and DirtyFrag (CVE-2026-43284 / CVE-2026-43500)
# This playbook applies interim module blacklists only
# Kernel patches are not yet available (May 2026)
# Once patched kernels ship, use remove_kernel_CVE_mitigations.yml to reverse everything
# This playbook is idempotent - safe to re-run if mitigations were already applied
- name: Mitigate Copy Fail + Dirty Frag
hosts: all
become: true
tasks:
- name: Blacklist algif_aead (Copy Fail)
copy:
dest: /etc/modprobe.d/disable-algif_aead.conf
content: "install algif_aead /bin/false\n"
owner: root
group: root
mode: "0644"
- name: Blacklist esp4, esp6, rxrpc (Dirty Frag)
copy:
dest: /etc/modprobe.d/dirtyfrag.conf
content: |
install esp4 /bin/false
install esp6 /bin/false
install rxrpc /bin/false
owner: root
group: root
mode: "0644"
- name: Unload all affected modules
modprobe:
name: "{{ item }}"
state: absent
loop:
- algif_aead
- esp4
- esp6
- rxrpc
ignore_errors: true
- name: Drop page cache to clear any contamination
shell: echo 3 > /proc/sys/vm/drop_caches
@@ -0,0 +1,111 @@
############################################################################################
############################################################################################
############################################################################################
#### THIS PLAYBOOK IS NOT MEANT TO BE RUN YET, IT IS NOT REFERRED IN ANY DOCUMENTATION! ####
############################################################################################
############################################################################################
############################################################################################
#
# Reversal playbook for mitigate_kernel_CVE.yml (CopyFail CVE-2026-31431 / DirtyFrag CVE-2026-43284 / CVE-2026-43500).
#
# Run this AFTER your distro has shipped the patched kernel.
# This playbook:
# 1. Updates the kernel via apt
# 2. Reboots and waits for reconnect
# 3. Verifies the running kernel is newer than the pre-patch version
# 4. Removes the interim module blacklists
# 5. Re-enables the affected modules live (no second reboot needed)
#
# Debian family only (Debian, Ubuntu). Tested on Debian 11, Debian 12, Ubuntu 20.04, 22.04, 24.04.
#
# For exit-gateway nodes with --wireguard-enabled true:
# After this playbook completes, run the networking restore step on each node via:
# ansible-playbook deploy.yml -t ntm
# See the CVE patch documentation for details.
- name: Remove CVE mitigations and apply patched kernel
hosts: all
become: true
tasks:
- name: Verify OS is Debian family
assert:
that:
- ansible_os_family == "Debian"
fail_msg: "This playbook supports Debian-family distros only (Debian, Ubuntu). For other distros, apply the kernel update and mitigation removal manually."
- name: Update apt cache
apt:
update_cache: true
cache_valid_time: 0
- name: Upgrade kernel packages
apt:
upgrade: full
only_upgrade: false
register: apt_upgrade_result
- name: Record pre-reboot kernel version
command: uname -r
register: kernel_before
changed_when: false
- name: Reboot to load patched kernel
reboot:
msg: "Rebooting to apply patched kernel (CVE-2026-31431 / CVE-2026-43284 / CVE-2026-43500)"
reboot_timeout: 300
pre_reboot_delay: 5
post_reboot_delay: 15
- name: Record post-reboot kernel version
command: uname -r
register: kernel_after
changed_when: false
- name: Show kernel versions before and after reboot
debug:
msg:
- "Kernel before reboot: {{ kernel_before.stdout }}"
- "Kernel after reboot: {{ kernel_after.stdout }}"
- name: Warn if kernel did not change after reboot
debug:
msg: >
WARNING: kernel version did not change after reboot ({{ kernel_after.stdout }}).
The patched kernel may not have been selected by GRUB, or no kernel update was available.
Do NOT remove the interim mitigations until you have confirmed the running kernel is patched.
Check: apt-cache policy linux-image-amd64 # Debian
Check: apt-cache policy linux-image-generic # Ubuntu
when: kernel_before.stdout == kernel_after.stdout
- name: Remove algif_aead blacklist
file:
path: /etc/modprobe.d/disable-algif_aead.conf
state: absent
- name: Remove DirtyFrag blacklist (esp4, esp6, rxrpc)
file:
path: /etc/modprobe.d/dirtyfrag.conf
state: absent
- name: Re-enable affected modules live
modprobe:
name: "{{ item }}"
state: present
loop:
- esp4
- esp6
- rxrpc
- algif_aead
ignore_errors: true
- name: Confirm nym-node service is still running
systemd:
name: nym-node
state: started
register: nym_node_status
failed_when: false
- name: Show nym-node status
debug:
msg: "nym-node service state: {{ nym_node_status.state | default('unknown - service may not exist on this node') }}"
+2
View File
@@ -472,6 +472,7 @@ impl Handler {
fn prepare_reconstructed_binary(
reconstructed_messages: Vec<ReconstructedMessage>,
) -> Vec<Result<WsMessage, WsError>> {
#[allow(clippy::result_large_err)] // TODO : remove this once tungstenite is updated
reconstructed_messages
.into_iter()
.map(ServerResponse::Received)
@@ -484,6 +485,7 @@ fn prepare_reconstructed_binary(
fn prepare_reconstructed_text(
reconstructed_messages: Vec<ReconstructedMessage>,
) -> Vec<Result<WsMessage, WsError>> {
#[allow(clippy::result_large_err)] // TODO : remove this once tungstenite is updated
reconstructed_messages
.into_iter()
.map(ServerResponse::Received)
+1
View File
@@ -60,6 +60,7 @@ nym-client-core-surb-storage = { workspace = true }
nym-client-core-gateways-storage = { workspace = true }
nym-ecash-time = { workspace = true }
nym-mixnet-contract-common = { workspace = true }
nym-lp-data = { workspace = true }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
nym-mixnet-client = { workspace = true }
@@ -11,6 +11,8 @@ use crate::client::event_control::EventControl;
use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::client::lp::data::LpDataSetup;
use crate::client::lp::data::shared::SharedLpDataState;
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
use crate::client::real_messages_control;
@@ -636,7 +638,6 @@ where
{
Err(ClientCoreError::CustomGatewaySelectionExpected)
} else {
// and make sure to invalidate the task client, so we wouldn't cause premature shutdown
custom_gateway_transceiver.set_packet_router(packet_router)?;
Ok(custom_gateway_transceiver)
};
@@ -817,6 +818,24 @@ where
(mix_tx, client_tx)
}
#[allow(dead_code)]
fn build_lp_data_tasks(
config: &Config,
encryption_keys: Arc<x25519::KeyPair>,
identity_keys: Arc<ed25519::KeyPair>,
input_receiver: InputMessageReceiver,
shutdown_tracker: &ShutdownTracker,
) -> Result<LpDataSetup, ClientCoreError> {
let shared_state = SharedLpDataState::new(
config.debug,
encryption_keys,
identity_keys,
shutdown_tracker.clone_shutdown_token(),
);
LpDataSetup::new(shared_state, input_receiver, shutdown_tracker.clone())
}
// TODO: rename it as it implies the data is persistent whilst one can use InMemBackend
async fn setup_persistent_reply_storage(
backend: S::ReplyStore,
@@ -1063,12 +1082,27 @@ where
)
.await?;
// SW keep all the above
// LP Data channel
// let lp_data_tasks = Self::build_lp_data_tasks(
// &self.config,
// encryption_keys.clone(),
// identity_keys.clone(),
// input_receiver,
// &shutdown_tracker.clone(),
// )?;
// lp_data_tasks.start_tasks();
// SW Piping between inbound and outbound
let gateway_packet_router = PacketRouter::new(
ack_sender,
mixnet_messages_sender,
shutdown_tracker.clone_shutdown_token(),
);
// SW this needs to become the IO handler
let gateway_transceiver = Self::setup_gateway_transceiver(
self.custom_gateway_transceiver,
&self.config,
@@ -1090,6 +1124,7 @@ where
)
.await?;
// SW turn into inbound pipeline
Self::start_received_messages_buffer_controller(
encryption_keys,
received_buffer_request_receiver,
@@ -1100,6 +1135,8 @@ where
&shutdown_tracker.clone(),
);
// SW the rest below is outbound pipeline
// The message_sender is the transmitter for any component generating sphinx packets
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
@@ -0,0 +1,52 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_lp_data::packet::frame::LpFrameKind;
use nym_sphinx::addressing::nodes::NymNodeRoutingAddressError;
use nym_sphinx::forwarding::packet::MixPacketFormattingError;
use nym_sphinx::framing::processing::PacketProcessingError;
use nym_sphinx::{OutfoxError, SphinxError};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum LpDataHandlerError {
#[error(transparent)]
PacketFormattingError(#[from] MixPacketFormattingError),
#[error(transparent)]
PacketProcessingError(#[from] PacketProcessingError),
#[error(transparent)]
NymNodeRoutingAddressError(#[from] NymNodeRoutingAddressError),
#[error("failed to process received sphinx packet: {0}")]
SphinxProcessingError(#[from] SphinxError),
#[error("failed to process received outfox packet: {0}")]
OutfoxProcessingError(#[from] OutfoxError),
#[error("received payload type of an unexpected type: {typ:?}")]
UnexpectedLpPayload { typ: LpFrameKind },
#[error("received an Lp Frame kind that we don't support: {typ:?}")]
UnsupportedLpFrameKind { typ: LpFrameKind },
#[error("unwrapped a packet into a forward hop packet. This is no longer supported")]
ForwardHop,
#[error("{0}")]
Internal(String),
#[error("{0}")]
Other(String),
}
impl LpDataHandlerError {
pub fn internal(message: impl Into<String>) -> Self {
LpDataHandlerError::Internal(message.into())
}
pub fn other(message: impl Into<String>) -> Self {
LpDataHandlerError::Other(message.into())
}
}
@@ -0,0 +1,56 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_lp_data::packet::frame::{LpFrameAttributes, LpFrameHeader, LpFrameKind};
use nym_sphinx::forwarding::packet::MixPacketFormattingError;
use nym_sphinx::params::SphinxKeyRotation;
use crate::client::lp::data::handler::error::LpDataHandlerError;
/// Message types supported by clients
#[derive(Debug, Clone, Copy)]
pub enum ClientMessage {
Sphinx(SphinxMessage),
Outfox(OutfoxMessage),
}
impl ClientMessage {
pub fn from_frame_header(header: LpFrameHeader) -> Result<Self, LpDataHandlerError> {
match header.kind {
LpFrameKind::SphinxPacket => {
Ok(ClientMessage::Sphinx(header.frame_attributes.try_into()?))
}
LpFrameKind::OutfoxPacket => {
Ok(ClientMessage::Outfox(header.frame_attributes.try_into()?))
}
_ => Err(LpDataHandlerError::UnsupportedLpFrameKind { typ: header.kind }),
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct SphinxMessage {
pub key_rotation: SphinxKeyRotation,
}
impl TryFrom<LpFrameAttributes> for SphinxMessage {
type Error = LpDataHandlerError;
fn try_from(value: LpFrameAttributes) -> Result<Self, Self::Error> {
let key_rotation = value[0]
.try_into()
.map_err(MixPacketFormattingError::InvalidKeyRotation)?;
Ok(SphinxMessage { key_rotation })
}
}
impl From<SphinxMessage> for LpFrameAttributes {
fn from(value: SphinxMessage) -> Self {
let mut attrs = [0; 14];
attrs[0] = value.key_rotation as u8;
attrs
}
}
// For now there are no differences. We can augment this variant when we will need it
pub type OutfoxMessage = SphinxMessage;
@@ -0,0 +1,216 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::inbound_messages::InputMessageReceiver;
use crate::client::lp::LpDataHandlerError;
use crate::client::lp::data::PACKET_BUFFER_SIZE;
use crate::client::lp::data::shared::SharedLpDataState;
use nym_lp_data::clients::traits::ClientUnwrappingPipeline;
use nym_lp_data::common::traits::TransportUnwrap;
use nym_lp_data::packet::{EncryptedLpPacket, MalformedLpPacketError};
use nym_lp_data::{AddressedTimedData, TimedData};
use std::sync::{Arc, mpsc};
use std::time::Instant;
use std::{net::SocketAddr, time::Duration};
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::interval;
use tracing::*;
pub mod error;
pub mod messages;
pub mod pipeline;
mod processing;
const PIPELINE_TICKING_DURATION: Duration = Duration::from_millis(1);
/// Bounded queue depth in front of each worker; keeps memory bounded under
/// bursty load and provides drop-based backpressure.
const WORKER_QUEUE_DEPTH: usize = 128;
type WorkerOutput = Result<Option<Vec<u8>>, MalformedLpPacketError>;
/// LP Data Handler for UDP data plane, acts as a pipeline driver and buffer
/// for delaying packets. Heavy per-packet processing is fanned out across a
/// pool of worker threads spawned on the shared blocking pool tracked by the
/// surrounding [`nym_task::ShutdownTracker`].
pub struct LpDataHandler {
/// Shared state
shared_state: Arc<SharedLpDataState>,
// Outbound pipeline
/// Channel to receive data for the outbound pipeline
outbound_input_rx: InputMessageReceiver,
/// Buffer for outbound packet
outbound_pkt_buffer: Vec<AddressedTimedData<EncryptedLpPacket>>,
/// Channel to send outgoing data from the outbound pipeline
outbound_output_tx: tokio::sync::mpsc::Sender<(EncryptedLpPacket, SocketAddr)>,
// Inbound pipeline
/// Channel to receive incoming data for the inbound pipeline
inbound_input_rx: mpsc::Receiver<EncryptedLpPacket>,
/// Per-worker job queues (round-robin dispatch).
worker_input_txs: Vec<mpsc::SyncSender<TimedData<EncryptedLpPacket>>>,
/// Aggregated processed packets returned by the workers. (Inbound data)
worker_output_rx: mpsc::Receiver<WorkerOutput>,
/// Shutdown token
shutdown: nym_task::ShutdownToken,
}
impl LpDataHandler {
pub(crate) fn new(
shared_state: Arc<SharedLpDataState>,
outbound_input_rx: InputMessageReceiver,
outbound_output_tx: tokio::sync::mpsc::Sender<(EncryptedLpPacket, SocketAddr)>,
inbound_input_rx: mpsc::Receiver<EncryptedLpPacket>,
// SW TODO : inbound output (worker_output_rx)
shutdown_tracker: &nym_task::ShutdownTracker,
) -> Result<Self, LpDataHandlerError> {
let (worker_output_tx, worker_output_rx) = mpsc::sync_channel(PACKET_BUFFER_SIZE);
// Allow at least one worker, even if the config says 0
let worker_count = 4; // SW Put that in the config
// Create workers. They will stop naturally when worker_output_rx is dropped.
// The mode is decided once here; each closure picks the right pipeline type so
// the worker loop monomorphizes against a single concrete pipeline.
let worker_input_txs = (0..worker_count)
.map(|_| {
let (worker_input_tx, _worker_input_rx) = mpsc::sync_channel(WORKER_QUEUE_DEPTH);
let _worker_state = shared_state.clone();
let _worker_output = worker_output_tx.clone();
shutdown_tracker.spawn_blocking(move || {
// Instantiat pipeline
todo!()
//Self::run_worker(pipeline, worker_input_rx, worker_output);
});
worker_input_tx
})
.collect();
Ok(Self {
shared_state,
outbound_input_rx,
outbound_pkt_buffer: Vec::new(),
outbound_output_tx,
inbound_input_rx,
worker_input_txs,
worker_output_rx,
shutdown: shutdown_tracker.clone_shutdown_token(),
})
}
pub async fn run(&mut self) {
info!(
workers = self.worker_input_txs.len(),
"Starting LP data handler"
);
let mut ticking_interval = interval(PIPELINE_TICKING_DURATION);
let mut next_worker = 0;
loop {
tokio::select! {
biased;
_ = self.shutdown.cancelled() => {
info!("LP data handler: received shutdown signal");
break;
}
timestamp = ticking_interval.tick() => {
let std_timestamp: Instant = timestamp.into();
// Drain processed packets returned by workers.
while let Ok(processing_result) = self.worker_output_rx.try_recv() {
match processing_result {
Ok(_packets) => {
// Dispatch to application
todo!()
},
Err(e) => {
warn!("LP data worker: error processing packet : {e}");
},
}
}
// Dispatch incoming packets to workers.
while let Ok(input) = self.inbound_input_rx.try_recv() {
next_worker = self.dispatch_to_workers(
TimedData::new(std_timestamp, input),
next_worker,
);
}
// Run outbound pipeline
while let Ok(_input) = self.outbound_input_rx.try_recv() {
// Run outbound pipeline and stack result in outbound_pkt_buffer
todo!()
}
// Send packets that needs sending
for pkt in self.outbound_pkt_buffer.extract_if(.., |p| p.data.timestamp <= std_timestamp) {
if let Err(e) = self.outbound_output_tx.try_send((pkt.data.data, pkt.dst)) {
match e {
TrySendError::Full(_) => {
warn!("LP data handler: packet sending buffer is full, the client might be overloaded");
},
TrySendError::Closed(_) => {
break;
},
}
}
}
}
}
}
// Workers will stop because we are dropping the receiving channel
info!("LP data handler shutdown complete");
}
/// Round-robin dispatch a job across worker queues. If the chosen worker is
/// full, fall through to the next one; if all are saturated, drop the packet
/// (UDP-style) and bump a metric. Returns the worker index to start from on
/// the next dispatch.
fn dispatch_to_workers(&self, mut job: TimedData<EncryptedLpPacket>, start: usize) -> usize {
let n = self.worker_input_txs.len();
for offset in 0..n {
let idx = (start + offset) % n;
match self.worker_input_txs[idx].try_send(job) {
Ok(()) => return (idx + 1) % n,
Err(mpsc::TrySendError::Full(returned)) => {
job = returned;
}
Err(mpsc::TrySendError::Disconnected(returned)) => {
error!(
"LP data worker {idx} disconnected; this shouldn't happen outside of shut down"
);
job = returned;
}
}
}
warn!("LP data handler: all workers saturated, dropping packet");
start
}
fn run_worker<P>(
mut pipeline: P,
input_rx: mpsc::Receiver<TimedData<EncryptedLpPacket>>,
output_tx: mpsc::SyncSender<WorkerOutput>,
) where
P: ClientUnwrappingPipeline<EncryptedLpPacket, ()> // SW fill in message kind
+ TransportUnwrap<EncryptedLpPacket, Error = MalformedLpPacketError>, // This is needed to specify the error type
{
while let Ok(input) = input_rx.recv() {
// Blocking is fine, we don't want to unclog ourself and process a new packet that will be dropped anyway
if let Err(e) = output_tx.send(pipeline.unwrap(input.data, input.timestamp)) {
trace!(
"Failed to send processing data back to handler : {e}. We are probably shutting down"
);
return;
}
}
}
}
@@ -0,0 +1,4 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// TODO
@@ -0,0 +1,5 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub(crate) mod outfox;
pub(crate) mod sphinx;
@@ -0,0 +1,37 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_lp_data::TimedPayload;
use nym_sphinx::OutfoxPacket;
use tracing::warn;
use crate::client::lp::data::{
handler::{error::LpDataHandlerError, messages::OutfoxMessage},
shared::SharedLpDataState,
};
pub(crate) fn process(
shared_state: &SharedLpDataState,
outfox_packet: TimedPayload,
_metadata: OutfoxMessage,
) -> Result<TimedPayload, LpDataHandlerError> {
let TimedPayload {
data: outfox_bytes,
timestamp: arrival_timestamp,
} = outfox_packet;
let mut outfox_packet = OutfoxPacket::try_from(outfox_bytes.as_slice())?;
let _next_address =
outfox_packet.decode_next_layer(shared_state.encryption_keys.private_key().as_ref())?;
if outfox_packet.is_final_hop() {
Ok(TimedPayload::new(
arrival_timestamp,
outfox_packet.payload().to_vec(),
))
} else {
warn!("Dropping forward hop packet in a client");
Err(LpDataHandlerError::ForwardHop)
}
}
@@ -0,0 +1,39 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_lp_data::TimedPayload;
use nym_sphinx::{ProcessedPacketData, SphinxPacket};
use tracing::warn;
use crate::client::lp::data::{
handler::{error::LpDataHandlerError, messages::SphinxMessage},
shared::SharedLpDataState,
};
pub(crate) fn process(
shared_state: &SharedLpDataState,
sphinx_packet: TimedPayload,
_metadata: SphinxMessage,
) -> Result<TimedPayload, LpDataHandlerError> {
let TimedPayload {
data: sphinx_bytes,
timestamp: arrival_timestamp,
} = sphinx_packet;
let sphinx_packet = SphinxPacket::from_bytes(&sphinx_bytes)?;
// Final processing
let processed_packet =
sphinx_packet.process(shared_state.encryption_keys.private_key().as_ref())?;
match processed_packet.data {
ProcessedPacketData::ForwardHop { .. } => {
warn!("Dropping forward hop packet in a client");
Err(LpDataHandlerError::ForwardHop)
}
ProcessedPacketData::FinalHop { payload, .. } => Ok(TimedPayload::new(
arrival_timestamp,
payload.recover_plaintext()?,
)),
}
}
@@ -0,0 +1,107 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::lp::data::MAX_UDP_PACKET_SIZE;
use crate::client::lp::data::shared::SharedLpDataState;
use crate::error::ClientCoreError;
use nym_lp_data::packet::EncryptedLpPacket;
use std::net::SocketAddr;
use std::sync::{Arc, mpsc, mpsc::TrySendError};
use tokio::net::UdpSocket;
use tracing::log::warn;
use tracing::{error, info};
/// LP UDP listener that accepts TCP connections on port 51264 (by default)
pub(crate) struct LpDataListener {
/// Shared state
shared_state: Arc<SharedLpDataState>,
/// Channel to send incoming data to the processing pipeline
inbound_input_tx: mpsc::SyncSender<EncryptedLpPacket>,
// This has to be a tokio channel, to be async and bounded
/// Channel to receive outgoing data from the processling pipeline
outbound_output_rx: tokio::sync::mpsc::Receiver<(EncryptedLpPacket, SocketAddr)>,
/// Shutdown token
shutdown: nym_task::ShutdownToken,
}
impl LpDataListener {
pub fn new(
shared_state: Arc<SharedLpDataState>,
inbound_input_tx: mpsc::SyncSender<EncryptedLpPacket>,
outbound_output_rx: tokio::sync::mpsc::Receiver<(EncryptedLpPacket, SocketAddr)>,
shutdown: nym_task::ShutdownToken,
) -> Self {
Self {
shared_state,
inbound_input_tx,
outbound_output_rx,
shutdown,
}
}
pub async fn run(&mut self) -> Result<(), ClientCoreError> {
let socket = UdpSocket::bind("[::]:0").await.map_err(|source| {
error!("Failed to bind LP data socket: {source}");
ClientCoreError::LpBindFailure { source }
})?;
info!("Started LP data socket on {}", socket.local_addr()?);
let mut buf = vec![0u8; MAX_UDP_PACKET_SIZE];
loop {
tokio::select! {
biased;
_ = self.shutdown.cancelled() => {
info!("LP data listener: received shutdown signal");
break;
}
result = self.outbound_output_rx.recv() => {
match result {
Some((payload, dst_addr)) => {
if let Err(e) = socket.send_to(&payload.to_bytes(), dst_addr).await {
warn!("LP data packet error to {dst_addr}: {e}");
}
}
None => {
warn!("LP outgoing packet channel closed");
break;
}
}
}
result = socket.recv_from(&mut buf) => {
match result {
Ok((len, src_addr)) => {
info!("received {len} bytes from {src_addr} on the LP Data socket");
if let Ok(encrypted_packet) = EncryptedLpPacket::decode(&buf[..len]) {
if let Err(e) = self.inbound_input_tx.try_send(encrypted_packet) {
match e {
TrySendError::Full(_) => {
warn!("LP data listener: packet sending buffer is full, the client might be overloaded");
},
TrySendError::Disconnected(_) => {
warn!("LP data listener: incoming packet channel is closed");
break;
},
}
}
} else {
warn!("Error reading LP packet from wire");
}
}
Err(e) => {
warn!("LP data socket recv error: {e}");
}
}
}
}
}
info!("LP data handler shutdown complete");
Ok(())
}
}
@@ -0,0 +1,103 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// Parking the branch
#![allow(clippy::todo)]
#![allow(dead_code)]
#![allow(clippy::incompatible_msrv)]
use std::sync::{Arc, mpsc};
use crate::client::inbound_messages::InputMessageReceiver;
use crate::client::lp::data::handler::LpDataHandler;
use crate::client::lp::data::listener::LpDataListener;
use crate::client::lp::data::shared::SharedLpDataState;
use crate::error::ClientCoreError;
use nym_task::ShutdownTracker;
use tracing::error;
/// Maximum UDP packet size we'll accept
/// Sphinx packets are typically ~2KB, LP overhead is ~50 bytes, so 4KB is plenty
const MAX_UDP_PACKET_SIZE: usize = 4096;
pub(crate) const PACKET_BUFFER_SIZE: usize = 100;
pub mod handler;
mod listener;
pub mod shared;
pub struct LpDataSetup {
listener: LpDataListener,
handler: LpDataHandler,
/// Shutdown coordination
shutdown: ShutdownTracker,
}
impl LpDataSetup {
pub(crate) fn new(
shared_state: SharedLpDataState,
outbound_input_rx: InputMessageReceiver,
shutdown: ShutdownTracker,
) -> Result<Self, ClientCoreError> {
let (inbound_input_tx, inbound_input_rx) = mpsc::sync_channel(PACKET_BUFFER_SIZE);
let (outbound_output_tx, outbound_output_rx) =
tokio::sync::mpsc::channel(PACKET_BUFFER_SIZE);
let shared_state = Arc::new(shared_state);
let listener = LpDataListener::new(
shared_state.clone(),
inbound_input_tx,
outbound_output_rx,
shutdown.clone_shutdown_token(),
);
let handler = LpDataHandler::new(
shared_state,
outbound_input_rx,
outbound_output_tx,
inbound_input_rx,
&shutdown,
)?;
Ok(LpDataSetup {
listener,
handler,
shutdown,
})
}
pub fn start_tasks(mut self) {
// Spawn the UDP data handler for LP data plane
// The data handler listens on UDP port 51264 and processes LP-wrapped Sphinx packets
// from registered clients. It decrypts the LP layer and forwards the Sphinx packets
let shutdown_token = self.shutdown.clone_shutdown_token();
let mut listener = self.listener;
self.shutdown.try_spawn_named(
async move {
if let Err(err) = listener.run().await {
shutdown_token.cancel();
error!("LP data listener error: {err}");
}
},
"LP::LpDataListener",
);
self.shutdown
.try_spawn_named(async move { self.handler.run().await }, "LP::LpDataHandler");
}
}
#[cfg(test)]
mod tests {
use super::*;
// Sphinx packets are typically around 2KB
// 4KB should be plenty with room to spare
const _: () = {
assert!(MAX_UDP_PACKET_SIZE >= 2048 + 100);
};
}
@@ -0,0 +1,38 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use nym_client_core_config_types::DebugConfig;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_lp_data::fragmentation::reconstruction::MessageReconstructor;
use nym_task::ShutdownToken;
/// Shared state for LP data plane
pub struct SharedLpDataState {
pub(crate) config: DebugConfig,
pub(crate) encryption_keys: Arc<x25519::KeyPair>,
pub(crate) identity_keys: Arc<ed25519::KeyPair>,
pub(crate) message_reconstructor: MessageReconstructor,
pub(crate) shutdown_token: ShutdownToken,
}
impl SharedLpDataState {
pub(crate) fn new(
config: DebugConfig,
encryption_keys: Arc<x25519::KeyPair>,
identity_keys: Arc<ed25519::KeyPair>,
shutdown_token: ShutdownToken,
) -> Self {
SharedLpDataState {
config,
encryption_keys,
identity_keys,
message_reconstructor: Default::default(),
shutdown_token,
}
}
}
+6
View File
@@ -0,0 +1,6 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub use data::handler::error::LpDataHandlerError;
pub mod data;
+1
View File
@@ -7,6 +7,7 @@ pub(crate) mod event_control;
pub(crate) mod helpers;
pub mod inbound_messages;
pub mod key_manager;
pub mod lp;
pub mod mix_traffic;
pub mod real_messages_control;
pub mod received_buffer;
@@ -439,7 +439,7 @@ where
let mut pending_acks = Vec::with_capacity(fragments.len());
let mut to_forward: HashMap<_, Vec<_>> = HashMap::new();
for (raw, prepared) in fragments.into_iter().zip(prepared_fragments.into_iter()) {
for (raw, prepared) in fragments.into_iter().zip(prepared_fragments) {
let lane = raw.0;
let FragmentWithMaxRetransmissions {
fragment,
@@ -670,7 +670,7 @@ where
Ok(fragments
.into_iter()
.zip(reply_surbs.into_iter())
.zip(reply_surbs)
.map(|(fragment, reply_surb)| {
// unwrap here is fine as we know we have a valid topology
#[allow(clippy::unwrap_used)]
+7
View File
@@ -1,6 +1,7 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::lp::LpDataHandlerError;
use crate::client::mix_traffic::transceiver::ErasedGatewayError;
use nym_crypto::asymmetric::ed25519::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
@@ -263,6 +264,12 @@ pub enum ClientCoreError {
#[error("Could not access task registry, {0}")]
RegistryAccess(#[from] RegistryAccessError),
#[error("failed to bind LP UDP socket: {source}")]
LpBindFailure { source: std::io::Error },
#[error(transparent)]
LpFailure(#[from] LpDataHandlerError),
}
impl From<tungstenite::Error> for ClientCoreError {
@@ -5,6 +5,7 @@ use dashmap::DashMap;
use futures::StreamExt;
use nym_noise::config::NoiseConfig;
use nym_noise::upgrade_noise_initiator;
use nym_sphinx::addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
@@ -309,7 +310,13 @@ impl Client {
impl SendWithoutResponse for Client {
fn send_without_response(&self, packet: MixPacket) -> io::Result<()> {
let address = packet.next_hop_address();
let address = match packet.next_hop() {
NymNodeRoutingAddress::Client(_) => {
warn!("mix packet addressed to a client in the legacy send_without_response path. This should never happen!");
return Ok(());
}
NymNodeRoutingAddress::Node(address) => address,
};
trace!("Sending packet to {address}");
// TODO: optimisation for the future: rather than constantly using legacy encoding,
@@ -26,6 +26,7 @@ nym-ecash-contract-common = { workspace = true }
nym-multisig-contract-common = { workspace = true }
nym-group-contract-common = { workspace = true }
nym-performance-contract-common = { workspace = true }
nym-node-families-contract-common = { workspace = true }
nym-serde-helpers = { workspace = true, features = ["hex", "base64"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
@@ -15,6 +15,7 @@ use nym_api_requests::ecash::models::{
VerifyEcashTicketBody,
};
use nym_api_requests::ecash::VerificationKeyResponse;
use nym_api_requests::models::node_families::NodeFamily;
use nym_api_requests::models::{
AnnotationResponse, ApiHealthResponse, BinaryBuildInformationOwned, ChainBlocksStatusResponse,
ChainStatusResponse, KeyRotationInfoResponse, NodePerformanceResponse, NodeRefreshBody,
@@ -389,6 +390,45 @@ pub trait NymApiClientExt: ApiClient {
Ok(bonds)
}
#[tracing::instrument(level = "debug", skip_all)]
async fn get_node_families(
&self,
page: Option<u32>,
per_page: Option<u32>,
) -> Result<PaginatedResponse<NodeFamily>, NymAPIError> {
let mut params = Vec::new();
if let Some(page) = page {
params.push(("page", page.to_string()))
}
if let Some(per_page) = per_page {
params.push(("per_page", per_page.to_string()))
}
self.get_json(
&[routes::V1_API_VERSION, routes::NODE_FAMILIES_ROUTES],
&params,
)
.await
}
async fn get_all_node_families(&self) -> Result<Vec<NodeFamily>, NymAPIError> {
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
let mut page = 0;
let mut families = Vec::new();
loop {
let mut res = self.get_node_families(Some(page), None).await?;
families.append(&mut res.data);
if families.len() < res.pagination.total {
page += 1
} else {
break;
}
}
Ok(families)
}
#[deprecated]
#[tracing::instrument(level = "debug", skip_all)]
async fn get_basic_mixnodes(&self) -> Result<CachedNodesResponse<SkimmedNodeV1>, NymAPIError> {
@@ -38,6 +38,7 @@ pub mod ecash {
}
pub const NYM_NODES_ROUTES: &str = "nym-nodes";
pub const NODE_FAMILIES_ROUTES: &str = "node-families";
pub use nym_nodes::*;
pub mod nym_nodes {
@@ -867,6 +867,10 @@ mod tests {
MixnetExecuteMsg::TestingResolveAllPendingEvents { .. } => {
client.testing_resolve_all_pending_events(None).ignore()
}
// not expected to be exposed by the client
ExecuteMsg::AdminMigrateVestedMixNode { .. }
| ExecuteMsg::AdminMigrateVestedDelegation { .. }
| ExecuteMsg::AdminBatchMigrateVestedDelegations { .. } => ().ignore(),
};
}
}
@@ -13,6 +13,7 @@ pub mod ecash_query_client;
pub mod group_query_client;
pub mod mixnet_query_client;
pub mod multisig_query_client;
pub mod node_families_query_client;
pub mod performance_query_client;
pub mod vesting_query_client;
@@ -22,6 +23,7 @@ pub mod ecash_signing_client;
pub mod group_signing_client;
pub mod mixnet_signing_client;
pub mod multisig_signing_client;
pub mod node_families_signing_client;
pub mod performance_signing_client;
pub mod vesting_signing_client;
@@ -31,6 +33,7 @@ pub use ecash_query_client::{EcashQueryClient, PagedEcashQueryClient};
pub use group_query_client::{GroupQueryClient, PagedGroupQueryClient};
pub use mixnet_query_client::{MixnetQueryClient, PagedMixnetQueryClient};
pub use multisig_query_client::{MultisigQueryClient, PagedMultisigQueryClient};
pub use node_families_query_client::{NodeFamiliesQueryClient, PagedNodeFamiliesQueryClient};
pub use performance_query_client::{PagedPerformanceQueryClient, PerformanceQueryClient};
pub use vesting_query_client::{PagedVestingQueryClient, VestingQueryClient};
@@ -40,6 +43,7 @@ pub use ecash_signing_client::EcashSigningClient;
pub use group_signing_client::GroupSigningClient;
pub use mixnet_signing_client::MixnetSigningClient;
pub use multisig_signing_client::MultisigSigningClient;
pub use node_families_signing_client::NodeFamiliesSigningClient;
pub use performance_signing_client::PerformanceSigningClient;
pub use vesting_signing_client::VestingSigningClient;
@@ -49,6 +53,7 @@ pub trait NymContractsProvider {
fn mixnet_contract_address(&self) -> Option<&AccountId>;
fn vesting_contract_address(&self) -> Option<&AccountId>;
fn performance_contract_address(&self) -> Option<&AccountId>;
fn node_families_contract_address(&self) -> Option<&AccountId>;
// coconut-related
fn ecash_contract_address(&self) -> Option<&AccountId>;
@@ -62,6 +67,7 @@ pub struct TypedNymContracts {
pub mixnet_contract_address: Option<AccountId>,
pub vesting_contract_address: Option<AccountId>,
pub performance_contract_address: Option<AccountId>,
pub node_families_contract_address: Option<AccountId>,
pub ecash_contract_address: Option<AccountId>,
pub group_contract_address: Option<AccountId>,
@@ -86,6 +92,10 @@ impl TryFrom<NymContracts> for TypedNymContracts {
.performance_contract_address
.map(|addr| addr.parse())
.transpose()?,
node_families_contract_address: value
.node_families_contract_address
.map(|addr| addr.parse())
.transpose()?,
ecash_contract_address: value
.ecash_contract_address
.map(|addr| addr.parse())
@@ -0,0 +1,441 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::collect_paged;
use crate::nyxd::contract_traits::NymContractsProvider;
use crate::nyxd::error::NyxdError;
use crate::nyxd::CosmWasmClient;
use async_trait::async_trait;
use cosmrs::AccountId;
use serde::Deserialize;
use nym_mixnet_contract_common::NodeId;
pub use nym_node_families_contract_common::{
msg::QueryMsg as NodeFamiliesQueryMsg, AllFamilyMembersPagedResponse,
AllPastFamilyInvitationsPagedResponse, FamiliesPagedResponse, FamilyMemberRecord,
FamilyMembersPagedResponse, GlobalPastFamilyInvitationCursor, NodeFamily,
NodeFamilyByNameResponse, NodeFamilyByOwnerResponse, NodeFamilyId,
NodeFamilyMembershipResponse, NodeFamilyResponse, PastFamilyInvitation,
PastFamilyInvitationCursor, PastFamilyInvitationForNodeCursor,
PastFamilyInvitationsForNodePagedResponse, PastFamilyInvitationsPagedResponse,
PastFamilyMember, PastFamilyMemberCursor, PastFamilyMemberForNodeCursor,
PastFamilyMembersForNodePagedResponse, PastFamilyMembersPagedResponse,
PendingFamilyInvitationDetails, PendingFamilyInvitationResponse,
PendingFamilyInvitationsPagedResponse, PendingInvitationsForNodePagedResponse,
PendingInvitationsPagedResponse,
};
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait NodeFamiliesQueryClient {
async fn query_node_families_contract<T>(
&self,
query: NodeFamiliesQueryMsg,
) -> Result<T, NyxdError>
where
for<'a> T: Deserialize<'a>;
async fn get_family_by_id(
&self,
family_id: NodeFamilyId,
) -> Result<NodeFamilyResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetFamilyById { family_id })
.await
}
async fn get_family_by_owner(
&self,
owner: &AccountId,
) -> Result<NodeFamilyByOwnerResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetFamilyByOwner {
owner: owner.to_string(),
})
.await
}
async fn get_family_by_name(
&self,
name: String,
) -> Result<NodeFamilyByNameResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetFamilyByName { name })
.await
}
async fn get_families_paged(
&self,
start_after: Option<NodeFamilyId>,
limit: Option<u32>,
) -> Result<FamiliesPagedResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetFamiliesPaged {
start_after,
limit,
})
.await
}
async fn get_family_membership(
&self,
node_id: NodeId,
) -> Result<NodeFamilyMembershipResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetFamilyMembership { node_id })
.await
}
async fn get_family_members_paged(
&self,
family_id: NodeFamilyId,
start_after: Option<NodeId>,
limit: Option<u32>,
) -> Result<FamilyMembersPagedResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetFamilyMembersPaged {
family_id,
start_after,
limit,
})
.await
}
async fn get_all_family_members_paged(
&self,
start_after: Option<NodeId>,
limit: Option<u32>,
) -> Result<AllFamilyMembersPagedResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetAllFamilyMembersPaged {
start_after,
limit,
})
.await
}
async fn get_pending_invitation(
&self,
family_id: NodeFamilyId,
node_id: NodeId,
) -> Result<PendingFamilyInvitationResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetPendingInvitation {
family_id,
node_id,
})
.await
}
async fn get_pending_invitations_for_family_paged(
&self,
family_id: NodeFamilyId,
start_after: Option<NodeId>,
limit: Option<u32>,
) -> Result<PendingFamilyInvitationsPagedResponse, NyxdError> {
self.query_node_families_contract(
NodeFamiliesQueryMsg::GetPendingInvitationsForFamilyPaged {
family_id,
start_after,
limit,
},
)
.await
}
async fn get_pending_invitations_for_node_paged(
&self,
node_id: NodeId,
start_after: Option<NodeFamilyId>,
limit: Option<u32>,
) -> Result<PendingInvitationsForNodePagedResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetPendingInvitationsForNodePaged {
node_id,
start_after,
limit,
})
.await
}
async fn get_all_pending_invitations_paged(
&self,
start_after: Option<(NodeFamilyId, NodeId)>,
limit: Option<u32>,
) -> Result<PendingInvitationsPagedResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetAllPendingInvitationsPaged {
start_after,
limit,
})
.await
}
async fn get_past_invitations_for_family_paged(
&self,
family_id: NodeFamilyId,
start_after: Option<PastFamilyInvitationCursor>,
limit: Option<u32>,
) -> Result<PastFamilyInvitationsPagedResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetPastInvitationsForFamilyPaged {
family_id,
start_after,
limit,
})
.await
}
async fn get_past_invitations_for_node_paged(
&self,
node_id: NodeId,
start_after: Option<PastFamilyInvitationForNodeCursor>,
limit: Option<u32>,
) -> Result<PastFamilyInvitationsForNodePagedResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetPastInvitationsForNodePaged {
node_id,
start_after,
limit,
})
.await
}
async fn get_all_past_invitations_paged(
&self,
start_after: Option<GlobalPastFamilyInvitationCursor>,
limit: Option<u32>,
) -> Result<AllPastFamilyInvitationsPagedResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetAllPastInvitationsPaged {
start_after,
limit,
})
.await
}
async fn get_past_members_for_family_paged(
&self,
family_id: NodeFamilyId,
start_after: Option<PastFamilyMemberCursor>,
limit: Option<u32>,
) -> Result<PastFamilyMembersPagedResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetPastMembersForFamilyPaged {
family_id,
start_after,
limit,
})
.await
}
async fn get_past_members_for_node_paged(
&self,
node_id: NodeId,
start_after: Option<PastFamilyMemberForNodeCursor>,
limit: Option<u32>,
) -> Result<PastFamilyMembersForNodePagedResponse, NyxdError> {
self.query_node_families_contract(NodeFamiliesQueryMsg::GetPastMembersForNodePaged {
node_id,
start_after,
limit,
})
.await
}
}
// extension trait to the query client to deal with the paged queries
// (it didn't feel appropriate to combine it with the existing trait)
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait PagedNodeFamiliesQueryClient: NodeFamiliesQueryClient {
async fn get_all_families(&self) -> Result<Vec<NodeFamily>, NyxdError> {
collect_paged!(self, get_families_paged, families)
}
async fn get_all_family_members_for_family(
&self,
family_id: NodeFamilyId,
) -> Result<Vec<FamilyMemberRecord>, NyxdError> {
collect_paged!(self, get_family_members_paged, members, family_id)
}
async fn get_all_family_members(&self) -> Result<Vec<FamilyMemberRecord>, NyxdError> {
collect_paged!(self, get_all_family_members_paged, members)
}
async fn get_all_pending_invitations_for_family(
&self,
family_id: NodeFamilyId,
) -> Result<Vec<PendingFamilyInvitationDetails>, NyxdError> {
collect_paged!(
self,
get_pending_invitations_for_family_paged,
invitations,
family_id
)
}
async fn get_all_pending_invitations_for_node(
&self,
node_id: NodeId,
) -> Result<Vec<PendingFamilyInvitationDetails>, NyxdError> {
collect_paged!(
self,
get_pending_invitations_for_node_paged,
invitations,
node_id
)
}
async fn get_all_pending_invitations(
&self,
) -> Result<Vec<PendingFamilyInvitationDetails>, NyxdError> {
collect_paged!(self, get_all_pending_invitations_paged, invitations)
}
async fn get_all_past_invitations_for_family(
&self,
family_id: NodeFamilyId,
) -> Result<Vec<PastFamilyInvitation>, NyxdError> {
collect_paged!(
self,
get_past_invitations_for_family_paged,
invitations,
family_id
)
}
async fn get_all_past_invitations_for_node(
&self,
node_id: NodeId,
) -> Result<Vec<PastFamilyInvitation>, NyxdError> {
collect_paged!(
self,
get_past_invitations_for_node_paged,
invitations,
node_id
)
}
async fn get_all_past_invitations(&self) -> Result<Vec<PastFamilyInvitation>, NyxdError> {
collect_paged!(self, get_all_past_invitations_paged, invitations)
}
async fn get_all_past_members_for_family(
&self,
family_id: NodeFamilyId,
) -> Result<Vec<PastFamilyMember>, NyxdError> {
collect_paged!(self, get_past_members_for_family_paged, members, family_id)
}
async fn get_all_past_members_for_node(
&self,
node_id: NodeId,
) -> Result<Vec<PastFamilyMember>, NyxdError> {
collect_paged!(self, get_past_members_for_node_paged, members, node_id)
}
}
#[async_trait]
impl<T> PagedNodeFamiliesQueryClient for T where T: NodeFamiliesQueryClient {}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<C> NodeFamiliesQueryClient for C
where
C: CosmWasmClient + NymContractsProvider + Send + Sync,
{
async fn query_node_families_contract<T>(
&self,
query: NodeFamiliesQueryMsg,
) -> Result<T, NyxdError>
where
for<'a> T: Deserialize<'a>,
{
let node_families_contract_address = &self
.node_families_contract_address()
.ok_or_else(|| NyxdError::unavailable_contract_address("node families contract"))?;
self.query_contract_smart(node_families_contract_address, &query)
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::nyxd::contract_traits::tests::IgnoreValue;
use nym_node_families_contract_common::QueryMsg;
// it's enough that this compiles and clippy is happy about it
#[allow(dead_code)]
fn all_query_variants_are_covered<C: NodeFamiliesQueryClient + Send + Sync>(
client: C,
msg: NodeFamiliesQueryMsg,
) {
match msg {
NodeFamiliesQueryMsg::GetFamilyById { family_id } => {
client.get_family_by_id(family_id).ignore()
}
NodeFamiliesQueryMsg::GetFamilyByOwner { owner } => {
client.get_family_by_owner(&owner.parse().unwrap()).ignore()
}
NodeFamiliesQueryMsg::GetFamilyByName { name } => {
client.get_family_by_name(name).ignore()
}
NodeFamiliesQueryMsg::GetFamiliesPaged { start_after, limit } => {
client.get_families_paged(start_after, limit).ignore()
}
NodeFamiliesQueryMsg::GetFamilyMembership { node_id } => {
client.get_family_membership(node_id).ignore()
}
NodeFamiliesQueryMsg::GetFamilyMembersPaged {
family_id,
start_after,
limit,
} => client
.get_family_members_paged(family_id, start_after, limit)
.ignore(),
NodeFamiliesQueryMsg::GetAllFamilyMembersPaged { start_after, limit } => client
.get_all_family_members_paged(start_after, limit)
.ignore(),
NodeFamiliesQueryMsg::GetPendingInvitation { family_id, node_id } => {
client.get_pending_invitation(family_id, node_id).ignore()
}
NodeFamiliesQueryMsg::GetPendingInvitationsForFamilyPaged {
family_id,
start_after,
limit,
} => client
.get_pending_invitations_for_family_paged(family_id, start_after, limit)
.ignore(),
NodeFamiliesQueryMsg::GetPendingInvitationsForNodePaged {
node_id,
start_after,
limit,
} => client
.get_pending_invitations_for_node_paged(node_id, start_after, limit)
.ignore(),
NodeFamiliesQueryMsg::GetAllPendingInvitationsPaged { start_after, limit } => client
.get_all_pending_invitations_paged(start_after, limit)
.ignore(),
NodeFamiliesQueryMsg::GetPastInvitationsForFamilyPaged {
family_id,
start_after,
limit,
} => client
.get_past_invitations_for_family_paged(family_id, start_after, limit)
.ignore(),
NodeFamiliesQueryMsg::GetPastInvitationsForNodePaged {
node_id,
start_after,
limit,
} => client
.get_past_invitations_for_node_paged(node_id, start_after, limit)
.ignore(),
NodeFamiliesQueryMsg::GetAllPastInvitationsPaged { start_after, limit } => client
.get_all_past_invitations_paged(start_after, limit)
.ignore(),
NodeFamiliesQueryMsg::GetPastMembersForFamilyPaged {
family_id,
start_after,
limit,
} => client
.get_past_members_for_family_paged(family_id, start_after, limit)
.ignore(),
QueryMsg::GetPastMembersForNodePaged {
node_id,
start_after,
limit,
} => client
.get_past_members_for_node_paged(node_id, start_after, limit)
.ignore(),
};
}
}
@@ -0,0 +1,254 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::nyxd::coin::Coin;
use crate::nyxd::contract_traits::NymContractsProvider;
use crate::nyxd::cosmwasm_client::types::ExecuteResult;
use crate::nyxd::error::NyxdError;
use crate::nyxd::{Fee, SigningCosmWasmClient};
use crate::signing::signer::OfflineSigner;
use async_trait::async_trait;
use nym_mixnet_contract_common::NodeId;
use nym_node_families_contract_common::{
Config, ExecuteMsg as NodeFamiliesExecuteMsg, NodeFamilyId,
};
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait NodeFamiliesSigningClient {
async fn execute_node_families_contract(
&self,
fee: Option<Fee>,
msg: NodeFamiliesExecuteMsg,
memo: String,
funds: Vec<Coin>,
) -> Result<ExecuteResult, NyxdError>;
async fn update_node_families_config(
&self,
config: Config,
fee: Option<Fee>,
) -> Result<ExecuteResult, NyxdError> {
self.execute_node_families_contract(
fee,
NodeFamiliesExecuteMsg::UpdateConfig { config },
"NodeFamiliesContract::UpdateConfig".to_string(),
vec![],
)
.await
}
async fn create_family(
&self,
name: String,
description: String,
fee: Option<Fee>,
creation_fee: Vec<Coin>,
) -> Result<ExecuteResult, NyxdError> {
self.execute_node_families_contract(
fee,
NodeFamiliesExecuteMsg::CreateFamily { name, description },
"NodeFamiliesContract::CreateFamily".to_string(),
creation_fee,
)
.await
}
async fn disband_family(&self, fee: Option<Fee>) -> Result<ExecuteResult, NyxdError> {
self.execute_node_families_contract(
fee,
NodeFamiliesExecuteMsg::DisbandFamily {},
"NodeFamiliesContract::DisbandFamily".to_string(),
vec![],
)
.await
}
async fn invite_to_family(
&self,
node_id: NodeId,
validity_secs: Option<u64>,
fee: Option<Fee>,
) -> Result<ExecuteResult, NyxdError> {
self.execute_node_families_contract(
fee,
NodeFamiliesExecuteMsg::InviteToFamily {
node_id,
validity_secs,
},
"NodeFamiliesContract::InviteToFamily".to_string(),
vec![],
)
.await
}
async fn revoke_family_invitation(
&self,
node_id: NodeId,
fee: Option<Fee>,
) -> Result<ExecuteResult, NyxdError> {
self.execute_node_families_contract(
fee,
NodeFamiliesExecuteMsg::RevokeFamilyInvitation { node_id },
"NodeFamiliesContract::RevokeFamilyInvitation".to_string(),
vec![],
)
.await
}
async fn accept_family_invitation(
&self,
family_id: NodeFamilyId,
node_id: NodeId,
fee: Option<Fee>,
) -> Result<ExecuteResult, NyxdError> {
self.execute_node_families_contract(
fee,
NodeFamiliesExecuteMsg::AcceptFamilyInvitation { family_id, node_id },
"NodeFamiliesContract::AcceptFamilyInvitation".to_string(),
vec![],
)
.await
}
async fn reject_family_invitation(
&self,
family_id: NodeFamilyId,
node_id: NodeId,
fee: Option<Fee>,
) -> Result<ExecuteResult, NyxdError> {
self.execute_node_families_contract(
fee,
NodeFamiliesExecuteMsg::RejectFamilyInvitation { family_id, node_id },
"NodeFamiliesContract::RejectFamilyInvitation".to_string(),
vec![],
)
.await
}
async fn leave_family(
&self,
node_id: NodeId,
fee: Option<Fee>,
) -> Result<ExecuteResult, NyxdError> {
self.execute_node_families_contract(
fee,
NodeFamiliesExecuteMsg::LeaveFamily { node_id },
"NodeFamiliesContract::LeaveFamily".to_string(),
vec![],
)
.await
}
async fn kick_from_family(
&self,
node_id: NodeId,
fee: Option<Fee>,
) -> Result<ExecuteResult, NyxdError> {
self.execute_node_families_contract(
fee,
NodeFamiliesExecuteMsg::KickFromFamily { node_id },
"NodeFamiliesContract::KickFromFamily".to_string(),
vec![],
)
.await
}
/// Cross-contract callback fired by the mixnet contract on node unbonding.
/// Exposed for completeness; the families contract rejects this call from
/// any sender other than the configured mixnet contract address.
async fn on_nym_node_unbond(
&self,
node_id: NodeId,
fee: Option<Fee>,
) -> Result<ExecuteResult, NyxdError> {
self.execute_node_families_contract(
fee,
NodeFamiliesExecuteMsg::OnNymNodeUnbond { node_id },
"NodeFamiliesContract::OnNymNodeUnbond".to_string(),
vec![],
)
.await
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<C> NodeFamiliesSigningClient for C
where
C: SigningCosmWasmClient + NymContractsProvider + Sync,
NyxdError: From<<Self as OfflineSigner>::Error>,
{
async fn execute_node_families_contract(
&self,
fee: Option<Fee>,
msg: NodeFamiliesExecuteMsg,
memo: String,
funds: Vec<Coin>,
) -> Result<ExecuteResult, NyxdError> {
let node_families_contract_address = &self
.node_families_contract_address()
.ok_or_else(|| NyxdError::unavailable_contract_address("node families contract"))?;
let fee = fee.unwrap_or(Fee::Auto(Some(self.simulated_gas_multiplier())));
let signer_address = &self.signer_addresses()[0];
self.execute(
signer_address,
node_families_contract_address,
&msg,
fee,
memo,
funds,
)
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::nyxd::contract_traits::tests::IgnoreValue;
use nym_node_families_contract_common::ExecuteMsg;
// it's enough that this compiles and clippy is happy about it
#[allow(dead_code)]
fn all_execute_variants_are_covered<C: NodeFamiliesSigningClient + Send + Sync>(
client: C,
msg: NodeFamiliesExecuteMsg,
) {
match msg {
NodeFamiliesExecuteMsg::UpdateConfig { config } => {
client.update_node_families_config(config, None).ignore()
}
NodeFamiliesExecuteMsg::CreateFamily { name, description } => client
.create_family(name, description, None, vec![])
.ignore(),
NodeFamiliesExecuteMsg::DisbandFamily {} => client.disband_family(None).ignore(),
NodeFamiliesExecuteMsg::InviteToFamily {
node_id,
validity_secs,
} => client
.invite_to_family(node_id, validity_secs, None)
.ignore(),
NodeFamiliesExecuteMsg::RevokeFamilyInvitation { node_id } => {
client.revoke_family_invitation(node_id, None).ignore()
}
NodeFamiliesExecuteMsg::AcceptFamilyInvitation { family_id, node_id } => client
.accept_family_invitation(family_id, node_id, None)
.ignore(),
NodeFamiliesExecuteMsg::RejectFamilyInvitation { family_id, node_id } => client
.reject_family_invitation(family_id, node_id, None)
.ignore(),
NodeFamiliesExecuteMsg::LeaveFamily { node_id } => {
client.leave_family(node_id, None).ignore()
}
NodeFamiliesExecuteMsg::KickFromFamily { node_id } => {
client.kick_from_family(node_id, None).ignore()
}
ExecuteMsg::OnNymNodeUnbond { node_id } => {
client.on_nym_node_unbond(node_id, None).ignore()
}
};
}
}
@@ -286,6 +286,10 @@ impl<C, S> NyxdClient<C, S> {
self.config.contracts.multisig_contract_address = Some(address);
}
pub fn set_node_families_contract_address(&mut self, address: AccountId) {
self.config.contracts.node_families_contract_address = Some(address);
}
pub fn set_simulated_gas_multiplier(&mut self, multiplier: f32) {
self.config.simulated_gas_multiplier = multiplier;
}
@@ -304,6 +308,13 @@ impl<C, S> NymContractsProvider for NyxdClient<C, S> {
self.config.contracts.performance_contract_address.as_ref()
}
fn node_families_contract_address(&self) -> Option<&AccountId> {
self.config
.contracts
.node_families_contract_address
.as_ref()
}
fn ecash_contract_address(&self) -> Option<&AccountId> {
self.config.contracts.ecash_contract_address.as_ref()
}
@@ -30,6 +30,9 @@ pub struct Args {
#[clap(long)]
pub vesting_contract_address: Option<AccountId>,
#[clap(long)]
pub node_families_contract_address: Option<AccountId>,
#[clap(long)]
pub rewarding_denom: Option<String>,
@@ -130,6 +133,14 @@ pub async fn generate(args: Args) {
.expect("Failed converting vesting contract address to AccountId")
});
let node_families_contract_address = args.node_families_contract_address.unwrap_or_else(|| {
let address =
std::env::var(nym_network_defaults::var_names::NODE_FAMILIES_CONTRACT_ADDRESS)
.expect("node families contract address has to be set");
AccountId::from_str(address.as_str())
.expect("Failed converting node families contract address to AccountId")
});
let rewarding_denom = args.rewarding_denom.unwrap_or_else(|| {
std::env::var(nym_network_defaults::var_names::MIX_DENOM)
.expect("Rewarding (mix) denom has to be set")
@@ -142,6 +153,7 @@ pub async fn generate(args: Args) {
let instantiate_msg = InstantiateMsg {
rewarding_validator_address: rewarding_validator_address.to_string(),
vesting_contract_address: vesting_contract_address.to_string(),
node_families_contract_address: node_families_contract_address.to_string(),
rewarding_denom,
epochs_in_interval: args.epochs_in_interval,
epoch_duration: Duration::from_secs(args.epoch_duration),
@@ -26,6 +26,14 @@ pub trait ContractOpts {
fn addr_make(&self, input: &str) -> Addr;
fn make_sender_with_funds(&self, input: &str, funds: &[Coin]) -> MessageInfo {
message_info(&self.addr_make(input), funds)
}
fn make_sender(&self, input: &str) -> MessageInfo {
self.make_sender_with_funds(input, &[])
}
fn deps_mut_env(&mut self) -> (DepsMut<'_>, Env) {
let env = self.env().clone();
(self.deps_mut(), env)
@@ -3,12 +3,121 @@
use crate::error::MixnetContractError;
use crate::mixnode::PendingMixNodeChanges;
use crate::nym_node::NodeOwnershipResponse;
use crate::{
EpochEventId, IntervalEventId, MixNodeBond, MixNodeDetails, NodeId, NodeRewarding, NymNodeBond,
NymNodeDetails, PendingNodeChanges,
EpochEventId, EpochId, Interval, IntervalEventId, MixNodeBond, MixNodeDetails, NodeId,
NodeRewarding, NymNodeBond, NymNodeDetails, PendingNodeChanges, QueryMsg,
};
use cosmwasm_std::{Coin, Decimal, StdError, StdResult, Uint128};
use cosmwasm_std::{
Addr, Binary, Coin, CustomQuery, Decimal, QuerierWrapper, StdError, StdResult, Uint128,
from_json,
};
use cw_storage_plus::{Key, Namespace, Path, PrimaryKey};
use nym_contracts_common::IdentityKeyRef;
use serde::de::DeserializeOwned;
use std::ops::Deref;
pub trait MixnetContractQuerier {
#[allow(dead_code)]
fn query_mixnet_contract<T: DeserializeOwned>(
&self,
address: impl Into<String>,
msg: &QueryMsg,
) -> StdResult<T>;
fn query_mixnet_contract_storage(
&self,
address: impl Into<String>,
key: impl Into<Binary>,
) -> StdResult<Option<Vec<u8>>>;
fn query_mixnet_contract_storage_value<T: DeserializeOwned>(
&self,
address: impl Into<String>,
key: impl Into<Binary>,
) -> StdResult<Option<T>> {
match self.query_mixnet_contract_storage(address, key)? {
None => Ok(None),
Some(value) => Ok(Some(from_json(&value)?)),
}
}
fn query_current_mixnet_interval(&self, address: impl Into<String>) -> StdResult<Interval> {
self.query_mixnet_contract_storage_value(address, b"ci")?
.ok_or(StdError::not_found(
"unable to retrieve interval information from the mixnet contract storage",
))
}
fn query_current_absolute_mixnet_epoch_id(
&self,
address: impl Into<String>,
) -> StdResult<EpochId> {
self.query_current_mixnet_interval(address)
.map(|interval| interval.current_epoch_absolute_id())
}
fn check_node_existence(&self, address: impl Into<String>, node_id: NodeId) -> StdResult<bool> {
let mixnet_contract_address = address.into();
if let Some(nym_node) = self.query_nymnode_bond(mixnet_contract_address.clone(), node_id)? {
return Ok(!nym_node.is_unbonding);
}
Ok(false)
}
fn query_nymnode_bond(
&self,
address: impl Into<String>,
node_id: NodeId,
) -> StdResult<Option<NymNodeBond>> {
// construct proper map key
let pk_namespace = "nn";
let path: Path<NymNodeBond> = Path::new(
Namespace::from_static_str(pk_namespace).as_slice(),
&node_id.key().iter().map(Key::as_ref).collect::<Vec<_>>(),
);
let storage_key = path.deref();
self.query_mixnet_contract_storage_value(address, storage_key)
}
fn query_nymnode_ownership(
&self,
address: impl Into<String>,
owner: &Addr,
) -> StdResult<Option<NymNodeBond>> {
let resp: NodeOwnershipResponse = self.query_mixnet_contract(
address,
&QueryMsg::GetOwnedNymNode {
address: owner.to_string(),
},
)?;
Ok(resp.details.map(|d| d.bond_information))
}
}
impl<C> MixnetContractQuerier for QuerierWrapper<'_, C>
where
C: CustomQuery,
{
fn query_mixnet_contract<T: DeserializeOwned>(
&self,
address: impl Into<String>,
msg: &QueryMsg,
) -> StdResult<T> {
self.query_wasm_smart(address, msg)
}
fn query_mixnet_contract_storage(
&self,
address: impl Into<String>,
key: impl Into<Binary>,
) -> StdResult<Option<Vec<u8>>> {
self.query_wasm_raw(address, key)
}
}
#[track_caller]
pub fn compare_decimals(a: Decimal, b: Decimal, epsilon: Option<Decimal>) {
@@ -30,6 +30,7 @@ pub use gateway::{
Gateway, GatewayBond, GatewayBondResponse, GatewayConfigUpdate, GatewayOwnershipResponse,
PagedGatewayResponse,
};
pub use helpers::MixnetContractQuerier;
pub use interval::{
CurrentIntervalResponse, EpochId, EpochState, EpochStatus, Interval, IntervalId,
};
@@ -190,6 +190,10 @@ impl NodeRewarding {
truncate_reward(self.operator, denom)
}
pub fn delegations_with_reward(&self, denom: impl Into<String>) -> Coin {
truncate_reward(self.delegates, denom)
}
pub fn pending_delegator_reward(&self, delegation: &Delegation) -> StdResult<Coin> {
let delegator_reward = self.determine_delegation_reward(delegation)?;
Ok(truncate_reward(delegator_reward, &delegation.amount.denom))
@@ -63,6 +63,7 @@ use nym_contracts_common::{ContractBuildInformation, signing::Nonce};
pub struct InstantiateMsg {
pub rewarding_validator_address: String,
pub vesting_contract_address: String,
pub node_families_contract_address: String,
pub rewarding_denom: String,
pub epochs_in_interval: u32,
@@ -305,6 +306,22 @@ pub enum ExecuteMsg {
MigrateVestedDelegation {
mix_id: NodeId,
},
/// Admin-only: forcibly migrate the vested mixnode owned by `owner`.
/// Used to drain the last vested entries so the mixnet contract can drop its dependency on the vesting contract.
AdminMigrateVestedMixNode {
owner: String,
},
/// Admin-only: forcibly migrate the vested delegation `(mix_id, owner)`.
/// Used to drain the last vested entries so the mixnet contract can drop its dependency on the vesting contract.
AdminMigrateVestedDelegation {
mix_id: NodeId,
owner: String,
},
/// Admin-only: batch variant of [`ExecuteMsg::AdminMigrateVestedDelegation`].
/// Reverts the entire batch on the first error, so callers should treat it as all-or-nothing.
AdminBatchMigrateVestedDelegations {
entries: Vec<VestedDelegationMigrationEntry>,
},
// testing-only
#[cfg(feature = "contract-testing")]
@@ -394,6 +411,15 @@ impl ExecuteMsg {
}
ExecuteMsg::MigrateVestedMixNode { .. } => "migrate vested mixnode".into(),
ExecuteMsg::MigrateVestedDelegation { .. } => "migrate vested delegation".to_string(),
ExecuteMsg::AdminMigrateVestedMixNode { owner } => {
format!("admin migrating vested mixnode of {owner}")
}
ExecuteMsg::AdminMigrateVestedDelegation { mix_id, owner } => {
format!("admin migrating vested delegation of {owner} on mixnode {mix_id}")
}
ExecuteMsg::AdminBatchMigrateVestedDelegations { entries } => {
format!("admin batch migrating {} vested delegations", entries.len())
}
ExecuteMsg::AssignRoles { .. } => "assigning epoch roles".into(),
ExecuteMsg::MigrateMixnode { .. } => "migrating legacy mixnode".into(),
ExecuteMsg::MigrateGateway { .. } => "migrating legacy gateway".into(),
@@ -881,8 +907,15 @@ pub enum QueryMsg {
GetKeyRotationId {},
}
#[cw_serde]
pub struct VestedDelegationMigrationEntry {
pub mix_id: NodeId,
pub owner: String,
}
#[cw_serde]
pub struct MigrateMsg {
pub unsafe_skip_state_updates: Option<bool>,
pub vesting_contract_address: Option<String>,
pub node_families_contract_address: String,
}
@@ -212,6 +212,10 @@ pub struct ContractState {
/// track-related messages.
pub vesting_contract_address: Addr,
/// Address of the node families contract. It is called whenever nym-node unbonds
/// so that it could be removed from any family it belongs to.
pub node_families_contract_address: Addr,
/// The expected denom used for rewarding (and realistically any other operation).
/// Default: `unym`
pub rewarding_denom: String,
@@ -0,0 +1,32 @@
[package]
name = "nym-node-families-contract-common"
description = "Common crate for Nym's node families contract"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
rust-version = "1.85"
readme.workspace = true
publish = true
[dependencies]
thiserror = { workspace = true }
serde = { workspace = true }
schemars = { workspace = true }
cosmwasm-std = { workspace = true }
cosmwasm-schema = { workspace = true }
cw-controllers = { workspace = true }
cw-utils = { workspace = true }
nym-contracts-common = { workspace = true }
nym-mixnet-contract-common = { workspace = true }
[features]
schema = []
[lints]
workspace = true
@@ -0,0 +1,104 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
/// Storage key constants used by the node families contract.
///
/// They are kept in the common crate so that off-chain tooling (indexers, migration
/// scripts) can reference them without depending on the contract crate itself.
/// Changing any of these values is a breaking change for already-deployed contracts.
pub mod storage_keys {
/// `Item<Addr>`: address of the mixnet contract used to validate node existence.
pub const MIXNET_CONTRACT_ADDRESS: &str = "mixnet-contract-address";
/// `Item<Config>`: runtime configuration (fees, length limits) set at instantiation.
pub const CONFIG: &str = "config";
/// `Admin` (cw-controllers): admin allowed to perform privileged operations.
pub const CONTRACT_ADMIN: &str = "contract-admin";
/// `Item<NodeFamilyId>`: monotonically increasing id counter for new families.
pub const NODE_FAMILY_ID_COUNTER: &str = "node-family-id-counter";
/// Primary namespace for the current family-members `IndexedMap`,
/// keyed by `NodeId` with value [`crate::FamilyMembership`].
pub const NODE_FAMILY_MEMBERS: &str = "node-family-members";
/// Multi-index over current family members keyed by family id —
/// enables paginated listing of all nodes in a given family.
pub const NODE_FAMILY_MEMBERS_FAMILY_IDX_NAMESPACE: &str = "node-family-members__family";
/// Primary namespace for the families `IndexedMap`.
pub const FAMILIES_NAMESPACE: &str = "families";
/// Secondary unique index keyed by `owner` (one family per owner).
pub const FAMILIES_OWNER_IDX_NAMESPACE: &str = "families__owner";
/// Secondary unique index keyed by `name` (family names are globally unique).
pub const FAMILIES_NAME_IDX_NAMESPACE: &str = "families__name";
/// Primary namespace for the pending invitations `IndexedMap`.
pub const INVITATIONS_NAMESPACE: &str = "invitations";
/// Multi-index over pending invitations keyed by family id.
pub const INVITATIONS_FAMILY_IDX_NAMESPACE: &str = "invitations__family";
/// Multi-index over pending invitations keyed by node id
/// (a node can be invited to multiple families simultaneously).
pub const INVITATIONS_NODE_IDX_NAMESPACE: &str = "invitations__node";
/// Primary namespace for the archived (accepted/rejected/revoked) invitations `IndexedMap`.
pub const PAST_INVITATIONS_NAMESPACE: &str = "past-invitations";
/// Multi-index over past invitations keyed by family id.
pub const PAST_INVITATIONS_FAMILY_IDX_NAMESPACE: &str = "past-invitations__family";
/// Multi-index over past invitations keyed by node id.
pub const PAST_INVITATIONS_NODE_IDX_NAMESPACE: &str = "past-invitations__node";
/// `Map<(NodeFamilyId, NodeId), u64>`: per-`(family, node)` counter used to
/// disambiguate repeat archive entries (a node can be invited and have the
/// invitation reach a terminal state more than once).
pub const PAST_INVITATIONS_COUNTER_NAMESPACE: &str = "past-invitations-counter";
/// Primary namespace for the past-members `IndexedMap`.
pub const PAST_FAMILY_MEMBER_NAMESPACE: &str = "past-family-member";
/// Multi-index over past members keyed by family id.
pub const PAST_FAMILY_MEMBER_FAMILY_IDX_NAMESPACE: &str = "past-family-member__family";
/// Multi-index over past members keyed by node id.
pub const PAST_FAMILY_MEMBER_NODE_IDX_NAMESPACE: &str = "past-family-member__node";
/// `Map<(NodeFamilyId, NodeId), u64>`: per-`(family, node)` counter used to
/// disambiguate repeat past-membership entries (a node can join and leave
/// the same family more than once).
pub const PAST_FAMILY_MEMBER_COUNTER_NAMESPACE: &str = "past-family-member-counter";
}
pub mod events {
pub const FAMILY_CREATION_EVENT_NAME: &str = "family_creation";
pub const FAMILY_CREATION_EVENT_FAMILY_NAME: &str = "family_name";
pub const FAMILY_CREATION_EVENT_OWNER_ADDRESS: &str = "owner_address";
pub const FAMILY_CREATION_EVENT_FAMILY_ID: &str = "family_id";
pub const FAMILY_CREATION_EVENT_PAID_FEE: &str = "paid_fee";
pub const FAMILY_DISBAND_EVENT_NAME: &str = "family_disband";
pub const FAMILY_DISBAND_EVENT_FAMILY_ID: &str = "family_id";
pub const FAMILY_DISBAND_EVENT_OWNER_ADDRESS: &str = "owner_address";
pub const FAMILY_DISBAND_EVENT_REFUNDED_FEE: &str = "refunded_fee";
pub const FAMILY_INVITATION_EVENT_NAME: &str = "family_invitation";
pub const FAMILY_INVITATION_EVENT_FAMILY_ID: &str = "family_id";
pub const FAMILY_INVITATION_EVENT_NODE_ID: &str = "node_id";
pub const FAMILY_INVITATION_EVENT_EXPIRES_AT: &str = "expires_at";
pub const FAMILY_INVITATION_REVOKED_EVENT_NAME: &str = "family_invitation_revoked";
pub const FAMILY_INVITATION_REVOKED_EVENT_FAMILY_ID: &str = "family_id";
pub const FAMILY_INVITATION_REVOKED_EVENT_NODE_ID: &str = "node_id";
pub const FAMILY_INVITATION_ACCEPTED_EVENT_NAME: &str = "family_invitation_accepted";
pub const FAMILY_INVITATION_ACCEPTED_EVENT_FAMILY_ID: &str = "family_id";
pub const FAMILY_INVITATION_ACCEPTED_EVENT_NODE_ID: &str = "node_id";
pub const FAMILY_INVITATION_REJECTED_EVENT_NAME: &str = "family_invitation_rejected";
pub const FAMILY_INVITATION_REJECTED_EVENT_FAMILY_ID: &str = "family_id";
pub const FAMILY_INVITATION_REJECTED_EVENT_NODE_ID: &str = "node_id";
pub const FAMILY_MEMBER_LEFT_EVENT_NAME: &str = "family_member_left";
pub const FAMILY_MEMBER_LEFT_EVENT_FAMILY_ID: &str = "family_id";
pub const FAMILY_MEMBER_LEFT_EVENT_NODE_ID: &str = "node_id";
pub const FAMILY_MEMBER_KICKED_EVENT_NAME: &str = "family_member_kicked";
pub const FAMILY_MEMBER_KICKED_EVENT_FAMILY_ID: &str = "family_id";
pub const FAMILY_MEMBER_KICKED_EVENT_NODE_ID: &str = "node_id";
pub const NODE_UNBOND_CLEANUP_EVENT_NAME: &str = "family_node_unbond_cleanup";
pub const NODE_UNBOND_CLEANUP_EVENT_NODE_ID: &str = "node_id";
}
@@ -0,0 +1,161 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::NodeFamilyId;
use cosmwasm_std::{Addr, Coin};
use cw_controllers::AdminError;
use cw_utils::PaymentError;
use nym_mixnet_contract_common::NodeId;
use thiserror::Error;
/// Errors returned from any entry point of the node families contract.
#[derive(Error, Debug, PartialEq)]
pub enum NodeFamiliesContractError {
/// Returned from `migrate` when the on-chain state cannot be brought forward
/// to the current contract version (e.g. unsupported source version, malformed
/// stored data).
#[error("could not perform contract migration: {comment}")]
FailedMigration { comment: String },
/// The referenced family does not exist (or no longer exists).
#[error("family with id {family_id} does not exist")]
FamilyNotFound { family_id: NodeFamilyId },
/// Disbanding was requested on a family that still has members.
#[error("family {family_id} cannot be disbanded: it still has {members} member(s)")]
FamilyNotEmpty {
family_id: NodeFamilyId,
members: u64,
},
/// The given node is not currently a member of any family.
#[error("node {node_id} is not currently a member of any family")]
NodeNotInFamily { node_id: NodeId },
/// The given node is a member of a different family than the one the
/// caller is acting on. Distinct from [`NodeNotInFamily`] (which means the
/// node has no membership at all) — surfaces when, e.g., a family owner
/// tries to kick a node that belongs to someone else's family.
#[error("node {node_id} is not a member of family {family_id}")]
NodeNotMemberOfFamily {
node_id: NodeId,
family_id: NodeFamilyId,
},
/// A cross-contract callback (e.g. `OnNymNodeUnbond`) was received from a
/// sender that is not the configured mixnet contract address.
#[error("address {sender} is not authorised to invoke the mixnet-contract callback")]
UnauthorisedMixnetCallback { sender: Addr },
/// No pending invitation exists for the given `(family, node)` pair.
#[error("no pending invitation for node {node_id} from family {family_id}")]
InvitationNotFound {
family_id: NodeFamilyId,
node_id: NodeId,
},
/// A pending invitation for the given `(family, node)` pair already exists;
/// issuing a new one would silently overwrite it.
#[error("a pending invitation for node {node_id} from family {family_id} already exists")]
PendingInvitationAlreadyExists {
family_id: NodeFamilyId,
node_id: NodeId,
},
/// The invitation exists but its `expires_at` is at or before the current
/// block time, so it can no longer be acted on.
#[error(
"invitation for node {node_id} from family {family_id} expired at {expires_at} (now: {now})"
)]
InvitationExpired {
family_id: NodeFamilyId,
node_id: NodeId,
expires_at: u64,
now: u64,
},
/// The funds attached to a paid execution failed `cw_utils` payment
/// validation (no funds, wrong/extra denom).
#[error("invalid fee provided: {0}")]
InvalidDeposit(#[from] PaymentError),
/// The funds attached to a `CreateFamily` execution don't match the
/// configured `create_family_fee`.
#[error("expected exactly {expected} as family creation fee; received {received:?}")]
InvalidFamilyCreationFee { expected: Coin, received: Vec<Coin> },
/// The submitted family name normalised to the empty string (i.e. it
/// contained no ASCII alphanumeric characters).
#[error("family name cannot be empty after normalisation")]
EmptyFamilyName,
/// The submitted family name exceeds the configured length limit.
#[error("family name length {length} exceeds the configured limit of {limit}")]
FamilyNameTooLong { length: usize, limit: usize },
/// The submitted family description exceeds the configured length limit.
#[error("family description length {length} exceeds the configured limit of {limit}")]
FamilyDescriptionTooLong { length: usize, limit: usize },
/// The transaction sender already owns a family.
#[error("address {address} already owns family {family_id}")]
SenderAlreadyOwnsAFamily {
address: Addr,
family_id: NodeFamilyId,
},
/// The transaction sender does not currently own any family - emitted by
/// owner-gated operations like `disband_family` when the sender has
/// nothing to act on.
#[error("address {address} does not currently own any family")]
SenderDoesntOwnAFamily { address: Addr },
/// The transaction sender is not the controller of the bonded node
/// referenced by the message. Covers all of: sender controls no bonded
/// node, sender controls a different node id, and sender's node has
/// entered the unbonding state.
#[error("address {address} is not the controller of bonded node {node_id}")]
SenderDoesntControlNode { address: Addr, node_id: NodeId },
/// A family with the requested (normalised) name already exists.
#[error("a family with name {name:?} already exists (id {family_id})")]
FamilyNameAlreadyTaken {
name: String,
family_id: NodeFamilyId,
},
/// A node controlled by the address is currently a member of a family,
/// so the address cannot also become a family owner or join another family.
#[error("address {address} controls node {node_id} which is currently in family {family_id}")]
AlreadyInFamily {
address: Addr,
node_id: NodeId,
family_id: NodeFamilyId,
},
/// The node referenced by an invitation does not exist as a bonded node
/// in the mixnet contract (or has already unbonded).
#[error("node {node_id} is not a bonded node in the mixnet contract")]
NodeDoesntExist { node_id: NodeId },
/// The node referenced by an invitation is already a member of a family,
/// so it cannot be invited to another one until it leaves / is removed.
#[error("node {node_id} is already a member of family {family_id}")]
NodeAlreadyInFamily {
node_id: NodeId,
family_id: NodeFamilyId,
},
/// The sender supplied a `validity_secs` of `0` for an invitation, which
/// would create one that is already expired at the moment it is stored.
#[error("invitation validity must be strictly positive")]
ZeroInvitationValidity,
/// Wraps errors raised by `cw-controllers::Admin` (e.g. caller is not admin).
#[error(transparent)]
Admin(#[from] AdminError),
/// Wraps any underlying `cosmwasm_std::StdError` (storage, serialization, etc.).
#[error(transparent)]
StdErr(#[from] cosmwasm_std::StdError),
}
@@ -0,0 +1,22 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Common types, messages, errors and storage-key constants shared between the
//! node families contract and any off-chain client.
//!
//! Keeping these in a separate crate allows clients to depend on the contract's
//! public surface without pulling in `cw-storage-plus` and other on-chain-only
//! dependencies.
/// Storage-key string constants. See [`constants::storage_keys`].
pub mod constants;
/// Contract-level error type.
pub mod error;
/// `InstantiateMsg`, `ExecuteMsg`, `QueryMsg`, `MigrateMsg` definitions.
pub mod msg;
/// Domain types stored in / returned by the contract.
pub mod types;
pub use error::*;
pub use msg::{ExecuteMsg, InstantiateMsg, MigrateMsg, QueryMsg};
pub use types::*;
@@ -0,0 +1,211 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::{
Config, GlobalPastFamilyInvitationCursor, NodeFamilyId, PastFamilyInvitationCursor,
PastFamilyInvitationForNodeCursor, PastFamilyMemberCursor, PastFamilyMemberForNodeCursor,
};
use cosmwasm_schema::cw_serde;
use nym_mixnet_contract_common::NodeId;
#[cfg(feature = "schema")]
use crate::{
AllFamilyMembersPagedResponse, AllPastFamilyInvitationsPagedResponse, FamiliesPagedResponse,
FamilyMembersPagedResponse, NodeFamilyByNameResponse, NodeFamilyByOwnerResponse,
NodeFamilyMembershipResponse, NodeFamilyResponse, PastFamilyInvitationsForNodePagedResponse,
PastFamilyInvitationsPagedResponse, PastFamilyMembersForNodePagedResponse,
PastFamilyMembersPagedResponse, PendingFamilyInvitationResponse,
PendingFamilyInvitationsPagedResponse, PendingInvitationsForNodePagedResponse,
PendingInvitationsPagedResponse,
};
/// Message used to instantiate the node families contract.
#[cw_serde]
pub struct InstantiateMsg {
pub config: Config,
pub mixnet_contract_address: String,
}
/// Execute messages accepted by the contract.
#[cw_serde]
pub enum ExecuteMsg {
/// Replace the contract's runtime [`Config`]. Restricted to the contract
/// admin.
UpdateConfig { config: Config },
/// Create a new family owned by the message sender. The configured
/// `create_family_fee` must be attached as funds.
CreateFamily { name: String, description: String },
/// Disband the family owned by the message sender. The family must have
/// no current members; any still-pending invitations are revoked.
DisbandFamily {},
/// Invite a node to the family owned by the message sender. If
/// `validity_secs` is omitted the invitation expires
/// `default_invitation_validity_secs` seconds (from [`Config`]) after the
/// current block time.
InviteToFamily {
node_id: NodeId,
validity_secs: Option<u64>,
},
/// Revoke a still-pending invitation previously issued by the sender's
/// family.
RevokeFamilyInvitation { node_id: NodeId },
/// Accept a pending invitation. The sender must control `node_id`.
AcceptFamilyInvitation {
family_id: NodeFamilyId,
node_id: NodeId,
},
/// Reject a pending invitation. The sender must control `node_id`.
RejectFamilyInvitation {
family_id: NodeFamilyId,
node_id: NodeId,
},
/// Leave the family `node_id` currently belongs to. The sender must
/// control `node_id`.
LeaveFamily { node_id: NodeId },
/// Remove `node_id` from the family owned by the message sender.
KickFromFamily { node_id: NodeId },
/// Cross-contract callback fired by the mixnet contract the moment
/// node with `node_id` initiates unbonding.
/// Removes the node from any family it currently
/// belongs to and rejects every pending invitation issued to it.
/// Sender must be the configured mixnet contract address.
OnNymNodeUnbond { node_id: NodeId },
}
/// Query messages accepted by the contract.
#[cw_serde]
#[cfg_attr(feature = "schema", derive(cosmwasm_schema::QueryResponses))]
pub enum QueryMsg {
/// Look up a single family by its id.
#[cfg_attr(feature = "schema", returns(NodeFamilyResponse))]
GetFamilyById { family_id: NodeFamilyId },
/// Look up the (at most one) family owned by a given address.
#[cfg_attr(feature = "schema", returns(NodeFamilyByOwnerResponse))]
GetFamilyByOwner { owner: String },
/// Look up a single family by its name. The lookup is normalised
/// contract-side (lowercased, non-alphanumerics stripped), so equivalent
/// inputs resolve to the same family.
#[cfg_attr(feature = "schema", returns(NodeFamilyByNameResponse))]
GetFamilyByName { name: String },
#[cfg_attr(feature = "schema", returns(FamiliesPagedResponse))]
GetFamiliesPaged {
start_after: Option<NodeFamilyId>,
limit: Option<u32>,
},
/// Look up which family — if any — a node currently belongs to.
#[cfg_attr(feature = "schema", returns(NodeFamilyMembershipResponse))]
GetFamilyMembership { node_id: NodeId },
/// Page through every node currently in a given family.
#[cfg_attr(feature = "schema", returns(FamilyMembersPagedResponse))]
GetFamilyMembersPaged {
family_id: NodeFamilyId,
start_after: Option<NodeId>,
limit: Option<u32>,
},
/// Page through every current family member across all families, in
/// ascending [`NodeId`] order. Each entry carries the membership record
/// (which in turn names the family the node belongs to).
#[cfg_attr(feature = "schema", returns(AllFamilyMembersPagedResponse))]
GetAllFamilyMembersPaged {
start_after: Option<NodeId>,
limit: Option<u32>,
},
/// Look up the pending invitation for a specific `(family_id, node_id)`
/// pair.
#[cfg_attr(feature = "schema", returns(PendingFamilyInvitationResponse))]
GetPendingInvitation {
family_id: NodeFamilyId,
node_id: NodeId,
},
/// Page through every pending invitation issued by a given family.
#[cfg_attr(feature = "schema", returns(PendingFamilyInvitationsPagedResponse))]
GetPendingInvitationsForFamilyPaged {
family_id: NodeFamilyId,
start_after: Option<NodeId>,
limit: Option<u32>,
},
/// Page through every pending invitation issued for a given node.
#[cfg_attr(feature = "schema", returns(PendingInvitationsForNodePagedResponse))]
GetPendingInvitationsForNodePaged {
node_id: NodeId,
start_after: Option<NodeFamilyId>,
limit: Option<u32>,
},
/// Page through every pending invitation across all families.
#[cfg_attr(feature = "schema", returns(PendingInvitationsPagedResponse))]
GetAllPendingInvitationsPaged {
start_after: Option<(NodeFamilyId, NodeId)>,
limit: Option<u32>,
},
/// Page through every archived (terminal-state) invitation issued by a
/// given family.
#[cfg_attr(feature = "schema", returns(PastFamilyInvitationsPagedResponse))]
GetPastInvitationsForFamilyPaged {
family_id: NodeFamilyId,
start_after: Option<PastFamilyInvitationCursor>,
limit: Option<u32>,
},
/// Page through every archived (terminal-state) invitation issued to a
/// given node.
#[cfg_attr(feature = "schema", returns(PastFamilyInvitationsForNodePagedResponse))]
GetPastInvitationsForNodePaged {
node_id: NodeId,
start_after: Option<PastFamilyInvitationForNodeCursor>,
limit: Option<u32>,
},
/// Page through every archived (terminal-state) invitation across all
/// families.
#[cfg_attr(feature = "schema", returns(AllPastFamilyInvitationsPagedResponse))]
GetAllPastInvitationsPaged {
start_after: Option<GlobalPastFamilyInvitationCursor>,
limit: Option<u32>,
},
/// Page through every archived membership record for a given family
/// (nodes that used to belong to it but have since been removed).
#[cfg_attr(feature = "schema", returns(PastFamilyMembersPagedResponse))]
GetPastMembersForFamilyPaged {
family_id: NodeFamilyId,
start_after: Option<PastFamilyMemberCursor>,
limit: Option<u32>,
},
/// Page through every archived membership record for a given node
/// (every family the node used to belong to but has since been removed
/// from), across all families.
#[cfg_attr(feature = "schema", returns(PastFamilyMembersForNodePagedResponse))]
GetPastMembersForNodePaged {
node_id: NodeId,
start_after: Option<PastFamilyMemberForNodeCursor>,
limit: Option<u32>,
},
}
/// Message passed to the contract's `migrate` entry point.
#[cw_serde]
pub struct MigrateMsg {
//
}
@@ -0,0 +1,403 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use cosmwasm_schema::cw_serde;
use cosmwasm_std::{Addr, Coin};
use nym_mixnet_contract_common::NodeId;
/// Identifier of a node family.
///
/// Issued sequentially by the contract on family creation; never reused even if the
/// family is later disbanded.
pub type NodeFamilyId = u32;
/// Runtime configuration of the node families contract.
#[cw_serde]
pub struct Config {
/// Fee charged on each successful `create_family` execution.
pub create_family_fee: Coin,
/// Maximum allowed length, in characters, of a family name.
pub family_name_length_limit: usize,
/// Maximum allowed length, in characters, of a family description.
pub family_description_length_limit: usize,
/// Default lifetime, in seconds, used by `invite_to_family` when the
/// sender doesn't supply an explicit value. Senders may override this
/// per-invitation via the optional `validity_secs` argument.
pub default_invitation_validity_secs: u64,
}
/// On-chain representation of a node family.
#[cw_serde]
pub struct NodeFamily {
/// The id of the node family
pub id: NodeFamilyId,
/// The name of the node family
pub name: String,
/// Normalised name of the node family used for uniqueness checks
pub normalised_name: String,
/// The optional description of the node family
pub description: String,
/// The owner of the node family
pub owner: Addr,
/// Records the fee paid when the family was created,
/// so that the appropriate amount could be returned upon it getting disbanded.
pub paid_fee: Coin,
/// Memoized value of the current number of members in the node family
/// Used to detect if the family is empty
pub members: u64,
/// Timestamp of the creation of the node family
pub created_at: u64,
}
/// A pending invitation for a node to join a particular family.
///
/// Invitations are stored until they are accepted, rejected, revoked, or until the
/// chain advances past `expires_at` (in which case they remain in storage but are
/// treated as inert — there is no background process clearing expired invitations).
#[cw_serde]
pub struct FamilyInvitation {
/// The family that issued the invitation.
pub family_id: NodeFamilyId,
/// The node being invited.
pub node_id: NodeId,
/// Block timestamp (unix seconds) after which the invitation is no longer valid.
pub expires_at: u64,
}
/// On-chain record of a node's current family membership.
///
/// A node belongs to at most one family at a time, so this is keyed by
/// `NodeId` alone — `family_id` is carried in the value to support reverse
/// lookups (all nodes in a given family) via a secondary index.
#[cw_serde]
pub struct FamilyMembership {
/// The family the node is currently a member of.
pub family_id: NodeFamilyId,
/// Block timestamp (unix seconds) at which the node accepted its
/// invitation and joined the family.
pub joined_at: u64,
}
/// Historical record of a node that used to be part of a family but has since been
/// removed (kicked, left voluntarily, or because the family was disbanded).
#[cw_serde]
pub struct PastFamilyMember {
/// The family the node used to belong to.
pub family_id: NodeFamilyId,
/// The node that was removed.
pub node_id: NodeId,
/// Block timestamp (unix seconds) at which the membership was terminated.
pub removed_at: u64,
}
/// Terminal status for an invitation that has been moved out of the pending set.
///
/// Note: timed-out invitations are not represented here — they are simply left in
/// the pending set (see `FamilyInvitation::expires_at`).
#[cw_serde]
pub enum FamilyInvitationStatus {
/// Still awaiting a response. Recorded with a timestamp for completeness even
/// though pending invitations live in a separate map.
Pending { at: u64 },
/// The invitee accepted and joined the family at the given timestamp.
Accepted { at: u64 },
/// The invitee explicitly rejected the invitation at the given timestamp.
Rejected { at: u64 },
/// The family revoked the invitation at the given timestamp before it could
/// be accepted or rejected.
Revoked { at: u64 },
}
/// Historical record of an invitation that has reached a terminal state
/// (`Accepted`, `Rejected`, or `Revoked`). Timed-out invitations are **not**
/// archived here — they remain in the pending map until explicitly cleared.
#[cw_serde]
pub struct PastFamilyInvitation {
/// The original invitation as it was issued.
pub invitation: FamilyInvitation,
/// What ultimately happened to it.
pub status: FamilyInvitationStatus,
}
/// Response to [`QueryMsg::GetFamilyById`](crate::QueryMsg::GetFamilyById).
#[cw_serde]
pub struct NodeFamilyResponse {
/// The id that was queried, echoed back so paginated callers can correlate.
pub family_id: NodeFamilyId,
/// The matching family, or `None` if no family with `family_id` exists.
pub family: Option<NodeFamily>,
}
/// Response to [`QueryMsg::GetFamilyByOwner`](crate::QueryMsg::GetFamilyByOwner).
#[cw_serde]
pub struct NodeFamilyByOwnerResponse {
/// The (validated) owner address that was queried, echoed back so callers
/// can correlate.
pub owner: Addr,
/// The matching family, or `None` if `owner` does not currently own one.
pub family: Option<NodeFamily>,
}
/// Response to [`QueryMsg::GetFamilyByName`](crate::QueryMsg::GetFamilyByName).
#[cw_serde]
pub struct NodeFamilyByNameResponse {
/// The name that was queried, echoed back so callers can correlate.
pub name: String,
/// The matching family, or `None` if no family with that name exists.
pub family: Option<NodeFamily>,
}
/// Response to [`QueryMsg::GetFamilyMembership`](crate::QueryMsg::GetFamilyMembership).
#[cw_serde]
pub struct NodeFamilyMembershipResponse {
/// The node that was queried.
pub node_id: NodeId,
/// The id of the family the node currently belongs to, or `None` if the
/// node is not currently a member of any family.
pub family_id: Option<NodeFamilyId>,
}
/// A pending [`FamilyInvitation`] paired with whether it has already timed
/// out at the time the query was served.
#[cw_serde]
pub struct PendingFamilyInvitationDetails {
/// The stored invitation as it was issued.
pub invitation: FamilyInvitation,
/// `true` iff `now >= invitation.expires_at` at query time, i.e. the
/// invitation is still in the pending map but can no longer be acted on.
pub expired: bool,
}
/// Response to [`QueryMsg::GetPendingInvitation`](crate::QueryMsg::GetPendingInvitation).
#[cw_serde]
pub struct PendingFamilyInvitationResponse {
/// The family component of the queried `(family_id, node_id)` key.
pub family_id: NodeFamilyId,
/// The node component of the queried `(family_id, node_id)` key.
pub node_id: NodeId,
/// The matching pending invitation along with an explicit expiry flag,
/// or `None` if no such invitation exists.
pub invitation: Option<PendingFamilyInvitationDetails>,
}
/// One entry in a [`FamilyMembersPagedResponse`] page — pairs a node id with
/// its [`FamilyMembership`] record (notably its `joined_at` timestamp).
#[cw_serde]
pub struct FamilyMemberRecord {
/// The node currently in the family.
pub node_id: NodeId,
/// The membership record (carries `family_id` and `joined_at`).
pub membership: FamilyMembership,
}
/// Response to [`QueryMsg::GetFamilyMembersPaged`](crate::QueryMsg::GetFamilyMembersPaged).
#[cw_serde]
pub struct FamilyMembersPagedResponse {
/// The family whose members were queried, echoed back so paginated
/// callers can correlate.
pub family_id: NodeFamilyId,
/// The members on this page, in ascending [`NodeId`] order.
pub members: Vec<FamilyMemberRecord>,
/// Cursor to pass as `start_after` on the next call, or `None` if this
/// page is empty (which the caller should treat as end-of-list).
pub start_next_after: Option<NodeId>,
}
/// Response to [`QueryMsg::GetAllFamilyMembersPaged`](crate::QueryMsg::GetAllFamilyMembersPaged).
#[cw_serde]
pub struct AllFamilyMembersPagedResponse {
/// The members on this page, in ascending [`NodeId`] order across every
/// family.
pub members: Vec<FamilyMemberRecord>,
/// Cursor (last `node_id`) to pass as `start_after` on the next call,
/// or `None` if this page is empty (treat as end-of-list).
pub start_next_after: Option<NodeId>,
}
/// Response to [`QueryMsg::GetPendingInvitationsForFamilyPaged`](crate::QueryMsg::GetPendingInvitationsForFamilyPaged).
#[cw_serde]
pub struct PendingFamilyInvitationsPagedResponse {
/// The family whose pending invitations were queried, echoed back so
/// paginated callers can correlate.
pub family_id: NodeFamilyId,
/// The pending invitations on this page, in ascending invitee
/// [`NodeId`] order, each stamped with whether it had already timed out
/// at the time the query was served.
pub invitations: Vec<PendingFamilyInvitationDetails>,
/// Cursor (last invitee node id) to pass as `start_after` on the next
/// call, or `None` if this page is empty (treat as end-of-list).
pub start_next_after: Option<NodeId>,
}
/// Response to [`QueryMsg::GetPendingInvitationsForNodePaged`](crate::QueryMsg::GetPendingInvitationsForNodePaged).
#[cw_serde]
pub struct PendingInvitationsForNodePagedResponse {
/// The node whose pending invitations were queried, echoed back so
/// paginated callers can correlate.
pub node_id: NodeId,
/// The pending invitations addressed to this node on this page, in
/// ascending [`NodeFamilyId`] order, each stamped with whether it had
/// already timed out at the time the query was served.
pub invitations: Vec<PendingFamilyInvitationDetails>,
/// Cursor (last issuing family id) to pass as `start_after` on the
/// next call, or `None` if this page is empty (treat as end-of-list).
pub start_next_after: Option<NodeFamilyId>,
}
/// Response to [`QueryMsg::GetAllPendingInvitationsPaged`](crate::QueryMsg::GetAllPendingInvitationsPaged).
#[cw_serde]
pub struct PendingInvitationsPagedResponse {
/// The pending invitations on this page, in ascending
/// `(family_id, node_id)` order, each stamped with whether it had
/// already timed out at the time the query was served.
pub invitations: Vec<PendingFamilyInvitationDetails>,
/// Cursor (last `(family_id, node_id)` pair) to pass as `start_after`
/// on the next call, or `None` if this page is empty (treat as
/// end-of-list).
pub start_next_after: Option<(NodeFamilyId, NodeId)>,
}
/// Cursor for paginating per-family past-invitation listings: identifies a
/// single archive entry within a family by `(node_id, counter)`. The
/// `counter` is the per-`(family, node)` archive slot — multiple archived
/// invitations can exist for the same `(family, node)` pair (a node may be
/// invited and have the invitation reach a terminal state more than once).
pub type PastFamilyInvitationCursor = (NodeId, u64);
/// Cursor for paginating per-node past-invitation listings: identifies a
/// single archive entry addressed to a fixed node by `(family_id, counter)`.
pub type PastFamilyInvitationForNodeCursor = (NodeFamilyId, u64);
/// Cursor for paginating global past-invitation listings: identifies a
/// single archive entry across all families by `((family_id, node_id), counter)`.
pub type GlobalPastFamilyInvitationCursor = ((NodeFamilyId, NodeId), u64);
/// Response to [`QueryMsg::GetPastInvitationsForFamilyPaged`](crate::QueryMsg::GetPastInvitationsForFamilyPaged).
#[cw_serde]
pub struct PastFamilyInvitationsPagedResponse {
/// The family whose archived invitations were queried, echoed back so
/// paginated callers can correlate.
pub family_id: NodeFamilyId,
/// The archived invitations on this page, in ascending
/// `(node_id, counter)` order across all terminal statuses.
pub invitations: Vec<PastFamilyInvitation>,
/// Cursor to pass as `start_after` on the next call, or `None` if this
/// page is empty (treat as end-of-list).
pub start_next_after: Option<PastFamilyInvitationCursor>,
}
/// Response to [`QueryMsg::GetPastInvitationsForNodePaged`](crate::QueryMsg::GetPastInvitationsForNodePaged).
#[cw_serde]
pub struct PastFamilyInvitationsForNodePagedResponse {
/// The node whose past invitations were queried, echoed back so
/// paginated callers can correlate.
pub node_id: NodeId,
/// The archived invitations addressed to this node on this page, in
/// ascending `(family_id, counter)` order across all terminal statuses.
pub invitations: Vec<PastFamilyInvitation>,
/// Cursor to pass as `start_after` on the next call, or `None` if this
/// page is empty (treat as end-of-list).
pub start_next_after: Option<PastFamilyInvitationForNodeCursor>,
}
/// Response to [`QueryMsg::GetAllPastInvitationsPaged`](crate::QueryMsg::GetAllPastInvitationsPaged).
#[cw_serde]
pub struct AllPastFamilyInvitationsPagedResponse {
/// The archived invitations on this page, in ascending
/// `((family_id, node_id), counter)` order across all terminal statuses.
pub invitations: Vec<PastFamilyInvitation>,
/// Cursor to pass as `start_after` on the next call, or `None` if this
/// page is empty (treat as end-of-list).
pub start_next_after: Option<GlobalPastFamilyInvitationCursor>,
}
/// Cursor for paginating per-family past-member listings: identifies a single
/// archive entry within a family by `(node_id, counter)`. The `counter` is the
/// per-`(family, node)` archive slot — multiple archived membership entries
/// can exist for the same `(family, node)` pair (a node may join, leave, and
/// re-join the same family more than once).
pub type PastFamilyMemberCursor = (NodeId, u64);
/// Cursor for paginating per-node past-member listings: identifies a single
/// archive entry for a fixed node by `(family_id, counter)`.
pub type PastFamilyMemberForNodeCursor = (NodeFamilyId, u64);
/// Response to [`QueryMsg::GetPastMembersForFamilyPaged`](crate::QueryMsg::GetPastMembersForFamilyPaged).
#[cw_serde]
pub struct PastFamilyMembersPagedResponse {
/// The family whose archived memberships were queried, echoed back so
/// paginated callers can correlate.
pub family_id: NodeFamilyId,
/// The archived membership records on this page, in ascending
/// `(node_id, counter)` order.
pub members: Vec<PastFamilyMember>,
/// Cursor to pass as `start_after` on the next call, or `None` if this
/// page is empty (treat as end-of-list).
pub start_next_after: Option<PastFamilyMemberCursor>,
}
/// Response to [`QueryMsg::GetPastMembersForNodePaged`](crate::QueryMsg::GetPastMembersForNodePaged).
#[cw_serde]
pub struct PastFamilyMembersForNodePagedResponse {
/// The node whose archived memberships were queried, echoed back so
/// paginated callers can correlate.
pub node_id: NodeId,
/// The archived membership records for this node on this page, in
/// ascending `(family_id, counter)` order.
pub members: Vec<PastFamilyMember>,
/// Cursor to pass as `start_after` on the next call, or `None` if this
/// page is empty (treat as end-of-list).
pub start_next_after: Option<PastFamilyMemberForNodeCursor>,
}
/// Response to [`QueryMsg::GetFamiliesPaged`](crate::QueryMsg::GetFamiliesPaged).
#[cw_serde]
pub struct FamiliesPagedResponse {
/// The families on this page, in ascending [`NodeFamilyId`] order.
pub families: Vec<NodeFamily>,
/// Cursor to pass as `start_after` on the next call, or `None` if this
/// page is empty (which the caller should treat as end-of-list).
pub start_next_after: Option<NodeFamilyId>,
}
+3 -1
View File
@@ -8,7 +8,9 @@ license = { workspace = true }
repository = { workspace = true }
homepage.workspace = true
documentation.workspace = true
rust-version.workspace = true
# pinned (not inherited from workspace) because this crate is imported by the ecash contract,
# and the contracts workspace cannot be built with rustc more recent than 1.86
rust-version = "1.86.0"
readme.workspace = true
publish = true
@@ -1,7 +1,21 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::PublicKey;
use super::{PrivateKey, PublicKey};
pub mod bs58_x25519_private_key {
use super::*;
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S: Serializer>(key: &PrivateKey, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_str(&key.to_base58_string())
}
pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<PrivateKey, D::Error> {
let s = String::deserialize(deserializer)?;
PrivateKey::from_base58_string(s).map_err(serde::de::Error::custom)
}
}
pub mod bs58_x25519_pubkey {
use super::*;
+46 -8
View File
@@ -1173,7 +1173,16 @@ impl ApiClientCore for Client {
};
match response {
Ok(resp) => return Ok(resp),
Ok(resp) => {
// Check if the response includes a rate limit error from the vercel API
if is_http_rate_limit_err(&resp) {
warn!("encountered vercel rate limit error for {}", url.as_str());
// if we have multiple urls, update to the next
self.maybe_rotate_hosts(Some(url.clone()));
}
return Ok(resp);
}
Err(err) => {
#[cfg(target_arch = "wasm32")]
let is_network_err = err.is_timeout();
@@ -1226,17 +1235,39 @@ impl ApiClientCore for Client {
}
}
const VERCEL_CHALLENGE_HEADER: &str = "x-vercel-mitigated";
const VERCEL_CHALLENGE_VALUE: &[u8] = b"challenge";
/// Check for Rate Limit challenge response from the vercel API
pub(crate) fn is_http_rate_limit_err(resp: &Response) -> bool {
let status = resp.status() == StatusCode::FORBIDDEN;
let header = resp
.headers()
.get(VERCEL_CHALLENGE_HEADER)
.is_some_and(|v| v.as_bytes() == VERCEL_CHALLENGE_VALUE);
let content_type = resp
.headers()
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<Mime>().ok())
.is_some_and(|mime_type| {
mime_type.type_() == mime::TEXT && mime_type.subtype() == mime::HTML
});
status && header && content_type
}
#[cfg(not(target_arch = "wasm32"))]
const MAX_ERR_SOURCE_ITERATIONS: usize = 4;
/// This functions attempts to check the error returned by reqwest to see if
/// rotating host informtion (for clients with mutliple hosts defined) could be
/// helpful. This looks for situations where the error could plausibly be caused
/// by a network adversary, or where rotating to an equival hostname might help.
/// This functions attempts to check the error returned by reqwest to see if rotating host
/// information (for clients with multiple hosts defined) could be helpful. This looks for
/// situations where the error could plausibly be caused by a network adversary, or where rotating
/// to an equivalent hostname might help.
///
/// For example --> NetworkUnreachable will not be helped by rotating domains,
/// but ConnectionReset might be caused by a network adversary blocking by SNI
/// which could possibly benefit from rotating domains.
/// For example --> NetworkUnreachable will not be helped by rotating domains, but ConnectionReset
/// might be caused by a network adversary blocking by SNI which could possibly benefit from
/// rotating domains.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn might_be_network_interference(err: &reqwest::Error) -> bool {
if err.is_timeout() {
@@ -1697,6 +1728,13 @@ where
decode_raw_response(&headers, full)
} else if res.status() == StatusCode::NOT_FOUND {
Err(HttpClientError::NotFound { url: Box::new(url) })
} else if is_http_rate_limit_err(&res) {
Err(HttpClientError::EndpointFailure {
url: Box::new(url),
status,
headers: Box::new(headers),
error: String::from("received vercel rate limit challenge response"),
})
} else {
let Ok(plaintext) = res.text().await else {
return Err(HttpClientError::RequestFailure {
@@ -129,6 +129,41 @@ where
}
}
#[derive(Default, Debug, Serialize, Deserialize, Copy, Clone)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "lowercase")]
pub enum OutputV2 {
#[default]
Json,
Yaml,
}
#[derive(Default, Debug, Serialize, Deserialize, Copy, Clone)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams, utoipa::ToSchema))]
#[serde(default)]
pub struct OutputParamsV2 {
pub output: Option<OutputV2>,
}
impl OutputParamsV2 {
pub fn get_output(&self) -> OutputV2 {
self.output.unwrap_or_default()
}
pub fn to_response<T: Serialize>(self, data: T) -> FormattedResponse<T> {
self.get_output().to_response(data)
}
}
impl OutputV2 {
pub fn to_response<T: Serialize>(self, data: T) -> FormattedResponse<T> {
match self {
OutputV2::Json => FormattedResponse::Json(Json::from(data)),
OutputV2::Yaml => FormattedResponse::Yaml(Yaml::from(data)),
}
}
}
#[derive(Default, Debug, Serialize, Deserialize, Copy, Clone)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "lowercase")]
+1
View File
@@ -32,6 +32,7 @@ mod tests {
fn stream_transport_threshold_is_consistent() {
assert_eq!(MAX_NON_STREAM_VERSION, 8);
assert_eq!(SPHINX_STREAM_VERSION_THRESHOLD, 9);
const _: () = assert!(SPHINX_STREAM_VERSION_THRESHOLD > MAX_NON_STREAM_VERSION);
}
}
+3 -1
View File
@@ -8,7 +8,9 @@ license.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
rust-version.workspace = true
# pinned (not inherited from workspace) because this crate is imported by the ecash contract,
# and the contracts workspace cannot be built with rustc more recent than 1.86
rust-version = "1.86.0"
readme.workspace = true
publish = true
# Exclude build.rs from published crate - it's only used for dev-time sync
+5 -5
View File
@@ -22,6 +22,10 @@ pub const VESTING_CONTRACT_ADDRESS: &str =
pub const PERFORMANCE_CONTRACT_ADDRESS: &str = "";
// /\ TODO: this has to be updated once the contract is deployed
// \/ TODO: this has to be updated once the contract is deployed
pub const NODE_FAMILIES_CONTRACT_ADDRESS: &str = "";
// /\ TODO: this has to be updated once the contract is deployed
pub const ECASH_CONTRACT_ADDRESS: &str =
"n1r7s6aksyc6pqardx88k3rkgfagwvj4z4zum9mmz2sfk3zm2mha0sd4dnun";
pub const GROUP_CONTRACT_ADDRESS: &str =
@@ -43,10 +47,6 @@ pub const NYM_APIS: &[ApiUrlConst] = &[
url: NYM_API,
front_hosts: None,
},
ApiUrlConst {
url: "https://nym-frontdoor.vercel.app/api/",
front_hosts: Some(&["vercel.app", "vercel.com"]),
},
ApiUrlConst {
url: "https://nym-frontdoor.global.ssl.fastly.net/api/",
front_hosts: Some(&["yelp.global.ssl.fastly.net"]),
@@ -68,7 +68,7 @@ pub const UPGRADE_MODE_ATTESTER_ED25519_BS58_PUBKEY: &str =
pub const NYM_VPN_APIS: &[ApiUrlConst] = &[
ApiUrlConst {
url: NYM_VPN_API,
front_hosts: Some(&["vercel.app", "vercel.com"]),
front_hosts: None,
},
ApiUrlConst {
url: "https://nymvpn-frontdoor.global.ssl.fastly.net/api/",
+15
View File
@@ -39,6 +39,8 @@ pub struct NymContracts {
pub vesting_contract_address: Option<String>,
#[serde(default)]
pub performance_contract_address: Option<String>,
#[serde(default)]
pub node_families_contract_address: Option<String>,
pub ecash_contract_address: Option<String>,
pub group_contract_address: Option<String>,
pub multisig_contract_address: Option<String>,
@@ -174,6 +176,9 @@ impl NymNetworkDetails {
))
.with_mixnet_contract(get_optional_env(var_names::MIXNET_CONTRACT_ADDRESS))
.with_vesting_contract(get_optional_env(var_names::VESTING_CONTRACT_ADDRESS))
.with_node_families_contract(get_optional_env(
var_names::NODE_FAMILIES_CONTRACT_ADDRESS,
))
.with_ecash_contract(get_optional_env(var_names::ECASH_CONTRACT_ADDRESS))
.with_group_contract(get_optional_env(var_names::GROUP_CONTRACT_ADDRESS))
.with_multisig_contract(get_optional_env(var_names::MULTISIG_CONTRACT_ADDRESS))
@@ -199,6 +204,9 @@ impl NymNetworkDetails {
performance_contract_address: parse_optional_str(
mainnet::PERFORMANCE_CONTRACT_ADDRESS,
),
node_families_contract_address: parse_optional_str(
mainnet::NODE_FAMILIES_CONTRACT_ADDRESS,
),
ecash_contract_address: parse_optional_str(mainnet::ECASH_CONTRACT_ADDRESS),
group_contract_address: parse_optional_str(mainnet::GROUP_CONTRACT_ADDRESS),
multisig_contract_address: parse_optional_str(mainnet::MULTISIG_CONTRACT_ADDRESS),
@@ -252,6 +260,7 @@ impl NymNetworkDetails {
set_optional_var(var_names::MIXNET_CONTRACT_ADDRESS, self.contracts.mixnet_contract_address);
set_optional_var(var_names::VESTING_CONTRACT_ADDRESS, self.contracts.vesting_contract_address);
set_optional_var(var_names::NODE_FAMILIES_CONTRACT_ADDRESS, self.contracts.node_families_contract_address);
set_optional_var(var_names::ECASH_CONTRACT_ADDRESS, self.contracts.ecash_contract_address);
set_optional_var(var_names::GROUP_CONTRACT_ADDRESS, self.contracts.group_contract_address);
set_optional_var(var_names::MULTISIG_CONTRACT_ADDRESS, self.contracts.multisig_contract_address);
@@ -340,6 +349,12 @@ impl NymNetworkDetails {
self
}
#[must_use]
pub fn with_node_families_contract<S: Into<String>>(mut self, contract: Option<S>) -> Self {
self.contracts.node_families_contract_address = contract.map(Into::into);
self
}
#[must_use]
pub fn with_ecash_contract<S: Into<String>>(mut self, contract: Option<S>) -> Self {
self.contracts.ecash_contract_address = contract.map(Into::into);
+1
View File
@@ -17,6 +17,7 @@ pub const VESTING_CONTRACT_ADDRESS: &str = "VESTING_CONTRACT_ADDRESS";
pub const ECASH_CONTRACT_ADDRESS: &str = "ECASH_CONTRACT_ADDRESS";
pub const GROUP_CONTRACT_ADDRESS: &str = "GROUP_CONTRACT_ADDRESS";
pub const MULTISIG_CONTRACT_ADDRESS: &str = "MULTISIG_CONTRACT_ADDRESS";
pub const NODE_FAMILIES_CONTRACT_ADDRESS: &str = "NODE_FAMILIES_CONTRACT_ADDRESS";
pub const COCONUT_DKG_CONTRACT_ADDRESS: &str = "COCONUT_DKG_CONTRACT_ADDRESS";
pub const REWARDING_VALIDATOR_ADDRESS: &str = "REWARDING_VALIDATOR_ADDRESS";
pub const NYXD: &str = "NYXD";
+1 -1
View File
@@ -1,7 +1,7 @@
[package]
name = "nym-kkt"
description = "Key transport protocol for the Nym network"
version = "0.1.0"
version = "1.21.0"
authors = ["Georgio Nicolas <georgio@nymtech.net>"]
edition = { workspace = true }
license.workspace = true
+31
View File
@@ -0,0 +1,31 @@
[package]
name = "nym-lp-data"
description = "Lewes Protocol data structure for the Nym network"
version.workspace = true
authors.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bytes.workspace = true
dashmap.workspace = true
num_enum.workspace = true
tracing.workspace = true
thiserror.workspace = true
rand.workspace = true
nym-common.workspace = true
[dev-dependencies]
nym-lp.workspace = true
[lints]
workspace = true
+103
View File
@@ -0,0 +1,103 @@
# nym-lp-data
Trait definitions and data structures for Lewes Protocol (LP) processing pipelines in the Nym mixnet.
This crate is a *vocabulary* crate — it defines the traits that clients and mix nodes implement to compose a packet-processing pipeline, plus a few generic data wrappers (`TimedData`, `AddressedTimedData`, `PipelineData`) that thread per-packet state through every stage. It contains no concrete cryptography, transport, or network code. A concrete implementation live in [`nym-mix-sim`](../../nym-mix-sim).
## Crate layout
| Module | Purpose |
|--------|---------|
| [`common`](src/common) | Wire-layer traits ([`Framing`], [`FramingUnwrap`], [`Transport`], [`TransportUnwrap`]) and their composed supertraits ([`WireWrappingPipeline`], [`WireUnwrappingPipeline`]) shared by both clients and mixnodes, plus [`NoOpWireWrapper`] / [`NoOpWireUnwrapper`] marker traits for opting into a pass-through wire layer |
| [`clients`](src/clients) | Client-side outbound/inbound pipeline traits: [`Chunking`], [`Reliability`], [`Obfuscation`], [`RoutingSecurity`], plus the supertraits [`ClientWrappingPipeline`] / [`ClientUnwrappingPipeline`], a `Pipeline` composition struct, no-op marker traits, and a tick-driven [`ClientWrappingPipelineDriver`] |
| [`mixnodes`](src/mixnodes) | Mixnode processing trait [`NymNodeProcessingPipeline`] (unwrap → mix → re-wrap) and a `Pipeline` composition struct |
[`Framing`]: src/common/traits.rs
[`FramingUnwrap`]: src/common/traits.rs
[`Transport`]: src/common/traits.rs
[`TransportUnwrap`]: src/common/traits.rs
[`WireWrappingPipeline`]: src/common/traits.rs
[`WireUnwrappingPipeline`]: src/common/traits.rs
[`NoOpWireWrapper`]: src/common/helpers.rs
[`NoOpWireUnwrapper`]: src/common/helpers.rs
[`Chunking`]: src/clients/traits.rs
[`Reliability`]: src/clients/traits.rs
[`Obfuscation`]: src/clients/traits.rs
[`RoutingSecurity`]: src/clients/traits.rs
[`ClientWrappingPipeline`]: src/clients/traits.rs
[`ClientUnwrappingPipeline`]: src/clients/traits.rs
[`ClientWrappingPipelineDriver`]: src/clients/driver.rs
[`NymNodeProcessingPipeline`]: src/mixnodes/traits.rs
## Core data types
```text
TimedData<Ts, D> ── pairs a value of type D with a timestamp Ts
TimedPayload<Ts> ── alias for TimedData<Ts, Vec<u8>>
AddressedTimedData<Ts, D, NdId> ── TimedData plus a destination address
AddressedTimedPayload<Ts, NdId> ── alias for AddressedTimedData<Ts, Vec<u8>, NdId>
PipelineData<Ts, D, Opts, NdId> ── TimedData plus per-message Opts
(used inside the client wrapping pipeline)
PipelinePayload<Ts, Opts, NdId> ── alias for PipelineData<Ts, Vec<u8>, Opts, NdId>
```
`Ts` is the timestamp / tick-context type, `NdId` is the next-hop identifier type, and `Opts` is an [`InputOptions`](src/clients/mod.rs)-implementing per-message marker that toggles which optional pipeline stages run for a given payload (reliability, obfuscation, routing security).
## Client wrapping pipeline
The outbound client pipeline composes six stages, each represented by its own trait:
```text
Vec<u8> ──▶ Chunking ──▶ Reliability ──▶ Obfuscation
AddressedTimedData<Ts, Pkt, NdId> ◀── Transport ◀── Framing ◀── RoutingSecurity
```
[`ClientWrappingPipeline`] is the supertrait that ties them together and provides a default `process()` method which runs all six stages in order on every tick. Each stage is opt-in per message via the active [`InputOptions`].
### Pipeline tick semantics
`process()` is intended to be called on every tick (with or without an input payload):
- [`Reliability::reliable_encode`] is always called once with `Some(input)` (when present), then once more with `None` so that timer-driven retransmissions can fire even when no new payload arrived.
- [`Obfuscation::obfuscate`] follows the same pattern — once with the real input and once with `None` so that cover-traffic loops can fire on idle ticks.
- [`Chunking`] and [`RoutingSecurity`] only run when a payload is actually present.
This convention is what allows pipelines to support Poisson cover traffic and SURB-ACK retransmission without the caller having to know whether anything is in flight.
## Mixnode processing pipeline
The mixnode pipeline is simpler — three stages that consume a packet and emit zero or more re-wrapped output packets:
```text
Pkt ──▶ WireUnwrappingPipeline ──▶ mix ──▶ WireWrappingPipeline ──▶ Vec<AddressedTimedData<Ts, Pkt, NdId>>
(TransportUnwrap + ▲ (Framing + Transport)
FramingUnwrap) │
└── implementor decrypts, routes,
schedules delays, etc.
```
Implementors fill in `mix()`; everything else is provided by the [`NymNodeProcessingPipeline`] supertrait's default `process()`.
## Helpers
- **Client-stage no-op marker traits** ([`NoOpReliability`], [`NoOpRoutingSecurity`], [`NoOpObfuscation`] in [`clients/helpers.rs`](src/clients/helpers.rs)) — implement these to opt out of a pipeline stage with zero overhead. Useful for stub or testing pipelines.
- **Wire-layer no-op marker traits** ([`NoOpWireWrapper`], [`NoOpWireUnwrapper`] in [`common/helpers.rs`](src/common/helpers.rs)) — collapse the entire wire layer (framing + transport, or their inverses) to a pass-through. Use these when your packet type is already self-contained on the wire (e.g. a Sphinx packet) and needs no extra framing or transport header. `NoOpWireWrapper` requires `Pkt: From<Vec<u8>>`; `NoOpWireUnwrapper` requires `Pkt: Into<Vec<u8>>` and `Mk: Default`.
- **`Pipeline` composition structs** (in [`clients/types.rs`](src/clients/types.rs)) — generic structs that aggregate one component per pipeline stage and provide blanket impls of the relevant supertraits, so you can build a working pipeline by plugging in any combination of stage implementations.
- **[`ClientWrappingPipelineDriver`](src/clients/driver.rs)** — wraps a dyn-compatible client pipeline behind a tick-driven `tick(timestamp) -> Vec<(Pkt, NdId)>` interface, with an internal mpsc channel for application-supplied input payloads. Reads new input only when the internal buffer is empty so buffered packets do not stack additional latency on top.
[`NoOpReliability`]: src/clients/helpers.rs
[`NoOpRoutingSecurity`]: src/clients/helpers.rs
[`NoOpObfuscation`]: src/clients/helpers.rs
[`InputOptions`]: src/clients/mod.rs
[`Reliability::reliable_encode`]: src/clients/traits.rs
[`Obfuscation::obfuscate`]: src/clients/traits.rs
## Example users
[`nym-mix-sim`](../../nym-mix-sim) is the reference consumer: it ships two complete pipeline implementations (a pass-through `Simple*` family and a full Sphinx + Poisson + SURB-ACK family) on top of the traits defined here. See its source for end-to-end examples of implementing each pipeline stage.
The integration test under [`tests/integration`](tests/integration) wires together a small synthetic pipeline (`MockChunking`, `KcpReliability`, `SphinxSecurity`, `KekwObfuscation`, `LpFraming`, `LpTransport`) against the [`nym-lp`](../nym-lp) packet types — a useful starting point if you want to read a self-contained example of every trait being implemented.
+85
View File
@@ -0,0 +1,85 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::net::SocketAddr;
use std::sync::mpsc;
use std::time::Instant;
use crate::AddressedTimedData;
use crate::clients::traits::DynClientWrappingPipeline;
/// Drives a [`DynClientWrappingPipeline`] tick-by-tick, feeding it raw application
/// payloads and emitting transport packets whose scheduled timestamp is due.
///
/// ## How it works
///
/// 1. The caller submits raw byte payloads via [`ClientWrappingPipelineDriver::input_sender`].
/// 2. On each call to [`ClientWrappingPipelineDriver::tick`], the driver reads one pending
/// payload (only when both the packet buffer and the obfuscation buffer are
/// empty, to avoid adding extra latency on top of buffered data), runs it
/// through the pipeline, and appends the resulting timestamped packets to an
/// internal buffer.
/// 3. Packets whose `timestamp ≤ now` are extracted from the buffer and
/// returned to the caller for sending.
///
/// Timestamps are [`Instant`]s, compared with `≤` to decide which packets are due.
///
pub struct ClientWrappingPipelineDriver<Pkt, Opts> {
pipeline: Box<dyn DynClientWrappingPipeline<Pkt, Opts>>,
packet_buffer: Vec<AddressedTimedData<Pkt>>,
input: mpsc::Receiver<(Vec<u8>, Opts, SocketAddr)>,
// Keeping a ref so we don't have problem about it being dropped
input_sender: mpsc::SyncSender<(Vec<u8>, Opts, SocketAddr)>,
}
impl<Pkt, Opts> ClientWrappingPipelineDriver<Pkt, Opts> {
/// Create a new driver wrapping `pipeline`.
///
/// Internally allocates a zero-capacity `sync_channel` for input payloads.
pub fn new(pipeline: impl DynClientWrappingPipeline<Pkt, Opts> + 'static) -> Self {
let (input_sender, input_receiver) = mpsc::sync_channel(0);
Self {
pipeline: Box::new(pipeline),
packet_buffer: Vec::new(),
input: input_receiver,
input_sender,
}
}
/// Return a clone of the sender half of the input channel.
///
/// Send raw application payloads here; they will be picked up on the next
/// tick when the pipeline's internal buffers are empty.
pub fn input_sender(&self) -> mpsc::SyncSender<(Vec<u8>, Opts, SocketAddr)> {
self.input_sender.clone()
}
/// Advance the driver by one tick.
///
/// Reads a pending input payload (if both the packet buffer and the
/// obfuscation buffer are empty), runs it through the pipeline, then
/// returns all packets whose `timestamp ≤ now`.
pub fn tick(&mut self, timestamp: Instant) -> Vec<(Pkt, SocketAddr)> {
// We're reading a message only if our buffer is empty
// Otherwise, we will have buffers adding latencies to data
let next_message = if self.packet_buffer.is_empty() {
self.input
.try_recv()
.inspect_err(|_| tracing::trace!("No message in the queue"))
.ok()
} else {
None
};
self.packet_buffer
.extend(self.pipeline.process(next_message, timestamp));
self.packet_buffer
.extract_if(.., |p| p.data.timestamp <= timestamp)
.map(|pkt| (pkt.data.data, pkt.dst))
.collect()
}
}
+68
View File
@@ -0,0 +1,68 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::time::Instant;
use crate::PipelinePayload;
use crate::clients::traits::{Obfuscation, Reliability, RoutingSecurity};
/// Marker trait for a no-op [`Reliability`] implementation.
///
/// Implement this for your pipeline type to get a [`Reliability`] impl that
/// passes the payload through unchanged with zero byte overhead.
pub trait NoOpReliability {}
impl<T, Opts> Reliability<Opts> for T
where
T: NoOpReliability,
{
const OVERHEAD_SIZE: usize = 0;
fn reliable_encode(
&mut self,
input: Option<PipelinePayload<Opts>>,
_: Instant,
) -> Vec<PipelinePayload<Opts>> {
input.map(|payload| vec![payload]).unwrap_or_default()
}
}
/// Marker trait for a no-op [`RoutingSecurity`] implementation.
///
/// Implement this for your pipeline type to get a [`RoutingSecurity`] impl that
/// passes the payload through unchanged with zero byte overhead and `nb_frames() == 1`.
pub trait NoOpRoutingSecurity {}
impl<T, Opts> RoutingSecurity<Opts> for T
where
T: NoOpRoutingSecurity,
{
const OVERHEAD_SIZE: usize = 0;
fn nb_frames(&self) -> usize {
1
}
fn encrypt(&mut self, input: PipelinePayload<Opts>) -> PipelinePayload<Opts> {
input
}
}
/// Marker trait for a no-op [`Obfuscation`] implementation.
///
/// Implement this for your pipeline type to get an [`Obfuscation`] impl that
/// passes the input through unchanged with no cover traffic, delay, or
/// buffering.
pub trait NoOpObfuscation {}
impl<T, Opts> Obfuscation<Opts> for T
where
T: NoOpObfuscation,
{
fn obfuscate(
&mut self,
input: Option<PipelinePayload<Opts>>,
_: Instant,
) -> Vec<PipelinePayload<Opts>> {
input.map(|payload| vec![payload]).unwrap_or_default()
}
}
+7
View File
@@ -0,0 +1,7 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod driver;
pub mod helpers;
pub mod traits;
pub mod types;
+250
View File
@@ -0,0 +1,250 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::net::SocketAddr;
use std::time::Instant;
use crate::PipelinePayload;
use crate::common::traits::{WireUnwrappingPipeline, WireWrappingPipeline};
use crate::{AddressedTimedData, TimedPayload};
/// Trait for splitting an incoming payload into timestamped chunks.
///
/// # Type Parameters
/// - `Opts`: Opaque per-message metadata carried by each produced [`PipelinePayload`].
///
/// # Required Methods
/// - `chunked`: Split `input` (a [`PipelinePayload`] carrying the raw bytes,
/// per-message options, and destination) into chunks of at most `chunk_size`
/// bytes. Each output [`PipelinePayload`] inherits the input's options and
/// destination and is stamped with `timestamp`, ready to be fed through the
/// rest of the pipeline.
pub trait Chunking<Opts> {
fn chunked(
&mut self,
input: PipelinePayload<Opts>,
chunk_size: usize,
timestamp: Instant,
) -> Vec<PipelinePayload<Opts>>;
}
/// Trait for applying reliability encoding (e.g. SURB ACKs, retransmissions) to
/// a timed payload.
///
/// # Type Parameters
/// - `Opts`: Opaque per-message metadata carried by the [`PipelinePayload`].
///
/// # Associated Constants
/// - `OVERHEAD_SIZE`: Number of additional bytes added by the reliability scheme.
///
/// # Required Methods
/// - `reliable_encode`: Encode `input` with the reliability mechanism. When
/// `input` is `None`, the method is still called every tick so the layer can
/// emit pending retransmissions or scheduled control packets.
pub trait Reliability<Opts> {
const OVERHEAD_SIZE: usize;
fn reliable_encode(
&mut self,
input: Option<PipelinePayload<Opts>>,
timestamp: Instant,
) -> Vec<PipelinePayload<Opts>>;
}
/// Trait for applying obfuscation (cover traffic, traffic shaping) to a timed payload.
///
/// When obfuscation is enabled, `obfuscate` must be called on every tick — not
/// only on ticks that carry input — so the layer can produce cover traffic on
/// schedule even when the application has nothing to send.
///
/// # Type Parameters
/// - `Opts`: Opaque per-message metadata carried by the [`PipelinePayload`].
pub trait Obfuscation<Opts> {
/// Obfuscate `input` at the given `timestamp`.
///
/// # Parameters
/// - `input`: Payload to obfuscate, or `None` when the pipeline is ticking
/// with no real message available.
/// - `timestamp`: Current timestamp.
///
/// # Returns
/// A `Vec` of obfuscated payloads, possibly empty when no packet is due to be
/// emitted at this tick.
fn obfuscate(
&mut self,
input: Option<PipelinePayload<Opts>>,
timestamp: Instant,
) -> Vec<PipelinePayload<Opts>>;
}
/// Trait for applying routing-security encryption (e.g. Sphinx) to a timed payload.
///
/// # Type Parameters
/// - `Opts`: Opaque per-message metadata carried by the [`PipelinePayload`].
///
/// # Associated Constants
/// - `OVERHEAD_SIZE`: Number of additional bytes added by the encryption scheme.
///
/// # Required Methods
/// - `encrypt`: Encrypt the given payload, returning a new [`PipelinePayload`].
///
/// # Provided Methods
/// - `nb_frames`: Number of transport frames that one encrypted payload expands
/// into; defaults to `1`. Override when the encryption scheme (e.g. Sphinx)
/// produces multiple frames per input chunk.
pub trait RoutingSecurity<Opts> {
const OVERHEAD_SIZE: usize;
fn nb_frames(&self) -> usize;
fn encrypt(&mut self, input: PipelinePayload<Opts>) -> PipelinePayload<Opts>;
}
/// Full client-side outbound message pipeline.
///
/// Composes all six processing stages — [`Chunking`], [`Reliability`],
/// [`Obfuscation`], [`RoutingSecurity`], and the shared [`WireWrappingPipeline`]
/// (framing + transport) — into a single `process` call that takes a raw byte
/// payload and returns a list of timestamped transport packets ready for sending.
///
/// Every stage runs unconditionally; a pipeline that does not want a given stage
/// composes a no-op implementation for it (see the `NoOp*` marker traits), whose
/// `OVERHEAD_SIZE` is `0`.
///
/// # Type Parameters
/// - `Pkt`: Final transport packet type produced by transport.
/// - `Opts`: Opaque per-message metadata threaded through the pipeline.
///
/// # Provided Methods
/// - `chunk_size`: Derived from `frame_size` (via [`WireWrappingPipeline`]) minus
/// routing-security and reliability overheads, accounting for `nb_frames` expansion.
/// - `process`: Runs the full pipeline in order:
/// chunk → reliability encode → obfuscate → encrypt → frame → transport.
pub trait ClientWrappingPipeline<Pkt, Opts>:
Chunking<Opts>
+ Reliability<Opts>
+ Obfuscation<Opts>
+ RoutingSecurity<Opts>
+ WireWrappingPipeline<Pkt, Opts>
{
fn chunk_size(&self) -> usize {
// Frame size comes from WireWrappingPipeline
// SAFETY : While this CAN technically fail, it means that something is wrong in the code and it's pointless to continue anyway
#[allow(clippy::expect_used)]
(self.frame_size() * self.nb_frames())
.checked_sub(<Self as RoutingSecurity<_>>::OVERHEAD_SIZE)
.expect("not enough room in a packet for routing security overhead")
.checked_sub(<Self as Reliability<_>>::OVERHEAD_SIZE)
.expect("not enough room in a packet for reliability overhead")
}
fn process(
&mut self,
input: Option<(Vec<u8>, Opts, SocketAddr)>, // Optional to be able to tick the pipeline without input
timestamp: Instant,
) -> Vec<AddressedTimedData<Pkt>> {
let chunk_size = self.chunk_size();
let mut chunks = if let Some((input_data, input_options, next_hop)) = input {
let input_payload =
PipelinePayload::new(timestamp, input_data, input_options, next_hop);
self.chunked(input_payload, chunk_size, timestamp)
} else {
Vec::new()
};
// Reliability stage
chunks = if chunks.is_empty() {
// Even if we had nothing go into the reliability stage, we need to catch potential retransmissions
self.reliable_encode(None, timestamp)
} else {
chunks
.into_iter()
.flat_map(|chunk| self.reliable_encode(Some(chunk), timestamp))
.collect()
};
// Obfuscation stage
chunks = if chunks.is_empty() {
// Even if we had nothing go into the obfuscation stage, we need to catch potential cover traffic
self.obfuscate(None, timestamp)
} else {
chunks
.into_iter()
.flat_map(|chunk| self.obfuscate(Some(chunk), timestamp))
.collect()
};
// Routing-security stage
chunks = chunks
.into_iter()
.map(|chunk| self.encrypt(chunk))
.collect();
chunks
.into_iter()
.flat_map(|payload| self.wire_wrap(payload))
.collect::<Vec<_>>()
}
}
/// Dyn-compatible mirror of [`ClientWrappingPipeline`].
///
/// All associated constants from the sub-traits are exposed as methods so the
/// trait can be used as `dyn DynClientWrappingPipeline<Pkt, Opts>`, erasing the
/// concrete pipeline type while keeping `Pkt` and `Opts` visible.
///
/// Implement [`ClientWrappingPipeline`] on your concrete type; the blanket impl
/// below provides `DynClientWrappingPipeline` for free.
pub trait DynClientWrappingPipeline<Pkt, Opts> {
/// On-wire size of an output packet in bytes.
fn packet_size(&self) -> usize;
/// Run the full client wrapping pipeline; see [`ClientWrappingPipeline::process`].
fn process(
&mut self,
input: Option<(Vec<u8>, Opts, SocketAddr)>,
timestamp: Instant,
) -> Vec<AddressedTimedData<Pkt>>;
}
impl<T, Pkt, Opts> DynClientWrappingPipeline<Pkt, Opts> for T
where
T: ClientWrappingPipeline<Pkt, Opts>,
{
fn packet_size(&self) -> usize {
WireWrappingPipeline::packet_size(self)
}
fn process(
&mut self,
input: Option<(Vec<u8>, Opts, SocketAddr)>,
timestamp: Instant,
) -> Vec<AddressedTimedData<Pkt>> {
ClientWrappingPipeline::process(self, input, timestamp)
}
}
/// Full client-side inbound pipeline.
///
/// Combines the shared [`WireUnwrappingPipeline`] (transport + framing unwrap) with a
/// blank [`process_unwrapped`](Self::process_unwrapped) step that the implementor
/// fills in (routing-security decrypt, reliability decode, chunk reassembly, etc.).
///
/// # Type Parameters
/// - `Pkt`: Transport packet type consumed as input.
/// - `Mk`: Message-kind marker returned alongside reassembled payloads.
///
/// # Required Methods
/// - `process_unwrapped`: Called with the reassembled payload and its message kind
/// once a complete message is available. Returns the decoded application bytes,
/// or `None` if reassembly is still in progress.
///
/// # Provided Methods
/// - `unwrap`: Strips the wire layers via [`WireUnwrappingPipeline::wire_unwrap`],
/// then delegates to `process_unwrapped`.
pub trait ClientUnwrappingPipeline<Pkt, Mk>: WireUnwrappingPipeline<Pkt, Mk> {
fn process_unwrapped(&mut self, payload: TimedPayload, kind: Mk) -> Option<Vec<u8>>;
fn unwrap(&mut self, input: Pkt, timestamp: Instant) -> Result<Option<Vec<u8>>, Self::Error> {
Ok(self
.wire_unwrap(input, timestamp)?
.and_then(|(payload, kind)| self.process_unwrapped(payload, kind)))
}
}
+147
View File
@@ -0,0 +1,147 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::time::Instant;
use crate::clients::traits::{
Chunking, ClientWrappingPipeline, Obfuscation, Reliability, RoutingSecurity,
};
use crate::common::traits::{Framing, Transport, WireWrappingPipeline};
use crate::{AddressedTimedData, PipelinePayload};
/// Generic composition struct that implements [`ClientWrappingPipeline`] by
/// delegating each stage to a held component.
///
/// Type parameters correspond to the six pipeline stages:
/// - `C`: [`Chunking`]
/// - `R`: [`Reliability`]
/// - `O`: [`Obfuscation`]
/// - `Rs`: [`RoutingSecurity`]
/// - `F`: [`Framing`]
/// - `T`: [`Transport`]
pub struct Pipeline<C, R, O, Rs, F, T> {
/// On-wire size of an output packet in bytes; returned by
/// [`WireWrappingPipeline::packet_size`].
pub packet_size: usize,
/// [`Chunking`] stage.
pub chunking: C,
/// [`Reliability`] stage.
pub reliability: R,
/// [`Obfuscation`] stage.
pub obfuscation: O,
/// [`RoutingSecurity`] stage.
pub security: Rs,
/// [`Framing`] stage.
pub framing: F,
/// [`Transport`] stage.
pub transport: T,
}
impl<Opts, C, R, O, Rs, F, T> Chunking<Opts> for Pipeline<C, R, O, Rs, F, T>
where
C: Chunking<Opts>,
{
fn chunked(
&mut self,
input: PipelinePayload<Opts>,
chunk_size: usize,
timestamp: Instant,
) -> Vec<PipelinePayload<Opts>> {
self.chunking.chunked(input, chunk_size, timestamp)
}
}
impl<Opts, C, R, O, Rs, F, T> Reliability<Opts> for Pipeline<C, R, O, Rs, F, T>
where
R: Reliability<Opts>,
{
const OVERHEAD_SIZE: usize = R::OVERHEAD_SIZE;
fn reliable_encode(
&mut self,
input: Option<PipelinePayload<Opts>>,
timestamp: Instant,
) -> Vec<PipelinePayload<Opts>> {
self.reliability.reliable_encode(input, timestamp)
}
}
impl<Opts, C, R, O, Rs, F, T> Obfuscation<Opts> for Pipeline<C, R, O, Rs, F, T>
where
O: Obfuscation<Opts>,
{
fn obfuscate(
&mut self,
input: Option<PipelinePayload<Opts>>,
timestamp: Instant,
) -> Vec<PipelinePayload<Opts>> {
self.obfuscation.obfuscate(input, timestamp)
}
}
impl<Opts, C, R, O, Rs, F, T> RoutingSecurity<Opts> for Pipeline<C, R, O, Rs, F, T>
where
Rs: RoutingSecurity<Opts>,
{
const OVERHEAD_SIZE: usize = Rs::OVERHEAD_SIZE;
fn nb_frames(&self) -> usize {
self.security.nb_frames()
}
fn encrypt(&mut self, input: PipelinePayload<Opts>) -> PipelinePayload<Opts> {
self.security.encrypt(input)
}
}
impl<Opts, C, R, O, Rs, F, T> Framing<Opts> for Pipeline<C, R, O, Rs, F, T>
where
F: Framing<Opts>,
{
type Frame = F::Frame;
const OVERHEAD_SIZE: usize = F::OVERHEAD_SIZE;
fn to_frame(
&mut self,
payload: PipelinePayload<Opts>,
frame_size: usize,
) -> Vec<AddressedTimedData<F::Frame>> {
self.framing.to_frame(payload, frame_size)
}
}
impl<Pkt, C, R, O, Rs, F, T> Transport<Pkt> for Pipeline<C, R, O, Rs, F, T>
where
T: Transport<Pkt>,
{
type Frame = T::Frame;
const OVERHEAD_SIZE: usize = T::OVERHEAD_SIZE;
fn to_transport_packet(
&mut self,
frame: AddressedTimedData<T::Frame>,
) -> AddressedTimedData<Pkt> {
self.transport.to_transport_packet(frame)
}
}
impl<Pkt, Opts, C, R, O, Rs, F, T> WireWrappingPipeline<Pkt, Opts> for Pipeline<C, R, O, Rs, F, T>
where
F: Framing<Opts>,
T: Transport<Pkt, Frame = F::Frame>,
{
fn packet_size(&self) -> usize {
self.packet_size
}
}
impl<Pkt, Opts, C, R, O, Rs, F, T> ClientWrappingPipeline<Pkt, Opts> for Pipeline<C, R, O, Rs, F, T>
where
C: Chunking<Opts>,
R: Reliability<Opts>,
O: Obfuscation<Opts>,
Rs: RoutingSecurity<Opts>,
F: Framing<Opts>,
T: Transport<Pkt, Frame = F::Frame>,
{
}
+104
View File
@@ -0,0 +1,104 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::time::Instant;
use crate::{
AddressedTimedData, AddressedTimedPayload, PipelinePayload, TimedData, TimedPayload,
common::traits::{
Framing, FramingUnwrap, Transport, TransportUnwrap, WireUnwrappingPipeline,
WireWrappingPipeline,
},
};
/// Marker trait for a no-op [`WireWrappingPipeline`] implementation.
///
/// Implement this for your pipeline type to get a [`WireWrappingPipeline`] impl that
/// passes the payload through unchanged with zero byte overhead.
pub trait NoOpWireWrapper {
const PACKET_SIZE: usize = 1500;
}
impl<T, Opts> Framing<Opts> for T
where
T: NoOpWireWrapper,
{
type Frame = Vec<u8>;
const OVERHEAD_SIZE: usize = 0;
fn to_frame(
&mut self,
payload: PipelinePayload<Opts>,
_: usize,
) -> Vec<AddressedTimedPayload> {
vec![payload.into_addressed()]
}
}
impl<T, Pkt> Transport<Pkt> for T
where
T: NoOpWireWrapper,
Pkt: From<Vec<u8>>,
{
type Frame = Vec<u8>;
const OVERHEAD_SIZE: usize = 0;
fn to_transport_packet(
&mut self,
frame: AddressedTimedPayload,
) -> AddressedTimedData<Pkt> {
frame.data_transform(|data| data.into())
}
}
impl<T, Pkt, Opts> WireWrappingPipeline<Pkt, Opts> for T
where
T: NoOpWireWrapper,
Pkt: From<Vec<u8>>,
{
fn packet_size(&self) -> usize {
T::PACKET_SIZE
}
}
/// Marker trait for a no-op [`WireUnwrappingPipeline`] implementation.
///
/// Implement this for your pipeline type to get a [`WireUnwrappingPipeline`] impl that
/// passes the payload through unchanged.
pub trait NoOpWireUnwrapper {}
impl<T, Mk> FramingUnwrap<Mk> for T
where
T: NoOpWireUnwrapper,
Mk: Default,
{
type Frame = Vec<u8>;
fn frame_to_message(&mut self, frame: TimedPayload) -> Option<(TimedPayload, Mk)> {
Some((frame, Default::default()))
}
}
impl<T, Pkt> TransportUnwrap<Pkt> for T
where
T: NoOpWireUnwrapper,
Pkt: Into<Vec<u8>>,
{
type Frame = Vec<u8>;
type Error = std::convert::Infallible;
fn packet_to_frame(
&mut self,
packet: Pkt,
timestamp: Instant,
) -> Result<TimedPayload, Self::Error> {
Ok(TimedData {
timestamp,
data: packet.into(),
})
}
}
impl<T, Pkt, Mk> WireUnwrappingPipeline<Pkt, Mk> for T
where
T: NoOpWireUnwrapper,
Pkt: Into<Vec<u8>>,
Mk: Default,
{
}
+5
View File
@@ -0,0 +1,5 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod helpers;
pub mod traits;
+163
View File
@@ -0,0 +1,163 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::time::Instant;
use crate::{AddressedTimedData, PipelinePayload, TimedData, TimedPayload};
/// Trait for applying framing to a timed payload.
///
/// # Type Parameters
/// - `Opts` : Opts type carried by the `PipelinePayload`
///
/// # Associated Types
/// - `Frame`: Frame type produced by the framing operation.
///
/// # Associated Constants
/// - `OVERHEAD_SIZE`: Number of additional bytes added by the framing scheme.
///
/// # Required Methods
/// - `to_frame`: Splits the payload into a `Vec<AddressedTimedData<Self::Frame>>` of frames of the given size.
pub trait Framing<Opts> {
type Frame;
const OVERHEAD_SIZE: usize;
fn to_frame(
&mut self,
payload: PipelinePayload<Opts>,
frame_size: usize,
) -> Vec<AddressedTimedData<Self::Frame>>;
}
/// Trait for unwrapping framing from a frame back into a payload.
///
/// # Type Parameters
/// - `Mk`: Enum describing the kind of message that can be returned.
///
/// # Associated Types
/// - `Frame`: Frame type consumed as input.
///
/// # Required Methods
/// - `frame_to_message`: Attempts to reassemble a payload from the given frame, returning
/// `Some((payload, kind))` when a complete message is available, or `None` otherwise.
pub trait FramingUnwrap<Mk> {
type Frame;
fn frame_to_message(&mut self, frame: TimedData<Self::Frame>) -> Option<(TimedPayload, Mk)>;
}
/// Trait for applying a transport layer to a framed payload.
///
/// # Type Parameters
/// - `Pkt`: Transport packet type produced as output.
///
/// # Associated Types
/// - `Frame`: Frame type consumed as input.
///
/// # Associated Constants
/// - `OVERHEAD_SIZE`: Number of additional bytes added by the transport scheme.
///
/// # Required Methods
/// - `to_transport_packet`: Wraps a frame into a transport packet.
pub trait Transport<Pkt> {
type Frame;
const OVERHEAD_SIZE: usize;
fn to_transport_packet(
&mut self,
frame: AddressedTimedData<Self::Frame>,
) -> AddressedTimedData<Pkt>;
}
/// Trait for unwrapping a transport packet back into a frame.
///
/// # Type Parameters
/// - `Pkt`: Transport packet type consumed as input.
///
/// # Associated Types
/// - `Frame`: Frame type produced as output.
/// - `Error`: Error type
///
/// # Required Methods
/// - `packet_to_frame`: Strips the transport layer from a packet, returning the inner frame
/// tagged with the given timestamp.
pub trait TransportUnwrap<Pkt> {
type Frame;
type Error;
fn packet_to_frame(
&mut self,
packet: Pkt,
timestamp: Instant,
) -> Result<TimedData<Self::Frame>, Self::Error>;
}
/// Supertrait combining [`Framing`] and [`Transport`] into a reusable wire-wrapping layer.
///
/// Used as the bottom stage of any outbound pipeline (client or mixnode).
///
/// # Type Parameters
/// - `Pkt`: Final transport packet type.
/// - `Opts` : Option type
///
/// Both [`Framing`] and [`Transport`] declare their own `type Frame`; this
/// supertrait cross-constrains them so `to_frame`'s output feeds directly into
/// `to_transport_packet`.
///
/// # Required Methods
/// - `packet_size`: Total on-wire size of an output packet in bytes.
///
/// # Provided Methods
/// - `frame_size`: Derived from `packet_size` minus transport and framing overheads.
/// - `wire_wrap`: Frames a payload and wraps each frame into a transport packet.
pub trait WireWrappingPipeline<Pkt, Opts>:
Transport<Pkt> + Framing<Opts, Frame = <Self as Transport<Pkt>>::Frame>
{
// IMPORTANT NOTE : This fn can be not constant to allow e.g. flexible MTU
// However, every possible value must be able to accommodate the different overhead.
// If it doesn't, the pipeline becomes unusable
fn packet_size(&self) -> usize;
fn frame_size(&self) -> usize {
// SAFETY : While this CAN technically fail, it means that something is wrong in the code and it's pointless to continue anyway
#[allow(clippy::expect_used)]
self.packet_size()
.checked_sub(
<Self as Transport<Pkt>>::OVERHEAD_SIZE + <Self as Framing<Opts>>::OVERHEAD_SIZE,
)
.expect("packet_size smaller than transport + framing overhead")
}
fn wire_wrap(&mut self, payload: PipelinePayload<Opts>) -> Vec<AddressedTimedData<Pkt>> {
let frame_size = self.frame_size();
self.to_frame(payload, frame_size)
.into_iter()
.map(|frame| self.to_transport_packet(frame))
.collect()
}
}
/// Supertrait combining [`TransportUnwrap`] and [`FramingUnwrap`] into a reusable
/// wire-unwrapping layer.
///
/// Used as the bottom stage of any inbound pipeline (client or mixnode).
///
/// # Type Parameters
/// - `Pkt`: Transport packet type consumed as input.
/// - `Mk`: Message-kind marker returned alongside the reassembled payload.
///
/// Both [`TransportUnwrap`] and [`FramingUnwrap`] declare their own `type Frame`;
/// this supertrait cross-constrains them so `packet_to_frame`'s output feeds
/// directly into `frame_to_message`.
///
/// # Provided Methods
/// - `wire_unwrap`: Strips the transport layer from a packet and attempts to reassemble
/// a payload, returning `Some((payload, kind))` when a complete message is available.
pub trait WireUnwrappingPipeline<Pkt, Mk>:
TransportUnwrap<Pkt> + FramingUnwrap<Mk, Frame = <Self as TransportUnwrap<Pkt>>::Frame>
{
fn wire_unwrap(
&mut self,
input: Pkt,
timestamp: Instant,
) -> Result<Option<(TimedPayload, Mk)>, Self::Error> {
let frame = self.packet_to_frame(input, timestamp)?;
Ok(self.frame_to_message(frame))
}
}
@@ -0,0 +1,156 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{
fragmentation::FragmentationError,
packet::{
LpFrame,
frame::{LpFrameAttributes, LpFrameKind},
},
};
#[derive(PartialEq, Clone, Debug)]
pub struct FragmentHeader {
/// ID associated to this particular `Fragment`.
id: u64,
/// Total number of `Fragment`s, used to be able to determine if entire
/// set was fully received as well as to perform bound checks.
total_fragments: u8,
/// Index of this fragment, in (0..total_fragments)
current_fragment: u8,
reserved: [u8; 4],
}
impl FragmentHeader {
// It's up to the caller to make sure values are valid
fn new(id: u64, total_fragments: u8, current_fragment: u8) -> Self {
FragmentHeader {
id,
total_fragments,
current_fragment,
reserved: [0; 4],
}
}
}
impl From<FragmentHeader> for LpFrameAttributes {
fn from(value: FragmentHeader) -> Self {
let mut buf = [0u8; 14];
buf[0..8].copy_from_slice(&value.id.to_be_bytes());
buf[8] = value.total_fragments;
buf[9] = value.current_fragment;
buf[10..14].copy_from_slice(&value.reserved);
buf
}
}
impl TryFrom<LpFrameAttributes> for FragmentHeader {
type Error = FragmentationError;
fn try_from(value: LpFrameAttributes) -> Result<Self, Self::Error> {
let total_fragments = value[8];
let current_fragment = value[9];
if current_fragment >= total_fragments {
return Err(FragmentationError::FragmentIndexOutOfBounds);
}
// SAFETY : Three conversion from slices to arrays with correct size
Ok(FragmentHeader {
#[allow(clippy::unwrap_used)]
id: u64::from_be_bytes(value[0..8].try_into().unwrap()),
total_fragments,
current_fragment,
#[allow(clippy::unwrap_used)]
reserved: value[10..14].try_into().unwrap(),
})
}
}
#[derive(PartialEq, Clone, Debug)]
pub struct Fragment {
header: FragmentHeader,
payload: Vec<u8>,
}
impl Fragment {
// It's up to the caller to make sure values are valid
fn new(payload: &[u8], id: u64, total_fragments: u8, current_fragment: u8) -> Self {
let header = FragmentHeader::new(id, total_fragments, current_fragment);
Fragment {
header,
payload: payload.to_vec(),
}
}
pub fn into_lp_frame(self) -> LpFrame {
LpFrame::new_with_attributes(LpFrameKind::FragmentedData, self.header, self.payload)
}
/// Extracts id of this `Fragment`.
pub fn id(&self) -> u64 {
self.header.id
}
/// Extracts total number of fragments associated with this particular `Fragment` (belonging to
/// the same `FragmentSet`).
pub fn total_fragments(&self) -> u8 {
self.header.total_fragments
}
/// Extracts position of this `Fragment` in a `FragmentSet`.
pub fn current_fragment(&self) -> u8 {
self.header.current_fragment
}
/// Consumes `self` to obtain payload (i.e. part of original message) associated with this
/// `Fragment`.
pub(crate) fn extract_payload(self) -> Vec<u8> {
self.payload
}
}
impl TryFrom<LpFrame> for Fragment {
type Error = FragmentationError;
fn try_from(value: LpFrame) -> Result<Self, Self::Error> {
match value.kind() {
LpFrameKind::FragmentedData => Ok(Fragment {
header: value.header.frame_attributes.try_into()?,
payload: value.content.to_vec(),
}),
_ => Err(FragmentationError::InvalidFrameKind),
}
}
}
/// Splits an LpFrame into multiple `Fragment`s
/// This is meant to be used during Framing, not Chunking. This way we can ensure it fits in less than 255 fragments
pub fn fragment_lp_message<R: rand::Rng>(
rng: &mut R,
message: LpFrame,
fragment_payload_size: usize,
) -> Vec<Fragment> {
debug_assert!(message.len() <= u8::MAX as usize * fragment_payload_size);
let message_bytes = message.to_bytes();
let id = rng.r#gen();
let num_fragments = (message_bytes.len() as f64 / fragment_payload_size as f64).ceil() as u8;
let mut fragments = Vec::with_capacity(num_fragments as usize);
for i in 0..num_fragments as usize {
let lb = i * fragment_payload_size;
let ub = usize::min(message_bytes.len(), (i + 1) * fragment_payload_size);
fragments.push(Fragment::new(
&message_bytes[lb..ub],
id,
num_fragments,
i as u8,
))
}
fragments
}
@@ -0,0 +1,16 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use thiserror::Error;
pub mod fragment;
pub mod reconstruction;
#[derive(Debug, Error)]
pub enum FragmentationError {
#[error("Fragment index is out of bounds for the announced lentgh")]
FragmentIndexOutOfBounds,
#[error("Provided frame isn't fragmented")]
InvalidFrameKind,
}
@@ -0,0 +1,503 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::fragmentation::fragment::Fragment;
use crate::packet::{LpFrame, MalformedLpPacketError};
use dashmap::DashMap;
use dashmap::mapref::entry::Entry;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, trace, warn};
pub const DEFAULT_FRAGMENT_TIMEOUT_DURATION: Duration = Duration::from_secs(30);
/// Per-message buffer that collects every `Fragment` of a fragmented message
/// and reassembles the original payload once they are all in.
#[derive(Debug, Clone)]
struct MessageBuffer {
/// Cached completion flag, set as soon as the last missing slot has been
/// filled. Avoids re-scanning `fragments` on every read.
is_complete: bool,
/// Position-indexed slots for the message's fragments. Allocated up front
/// to `total_fragments` `None` entries on first sight of the message,
/// giving O(1) inserts and O(n) reassembly while preserving order.
fragments: Vec<Option<Fragment>>,
/// Timestamp of the most recently inserted fragment. Read by
/// [`MessageReconstructor::cleanup_stale_buffers`] to evict messages whose
/// remaining fragments never showed up.
last_fragment_timestamp: Instant,
}
impl MessageBuffer {
/// Create an empty buffer sized for `total_fragments` slots.
/// The `u8` argument bounds the allocation at `u8::MAX`.
fn new(total_fragments: u8, timestamp: Instant) -> Self {
// `new` should never be called with size 0: `total_fragments` is taken
// from the first received `Fragment` of the message, and decoding
// rejects any header where `current_fragment >= total_fragments`, so
// the smallest valid value is 1.
debug_assert!(total_fragments > 0);
MessageBuffer {
is_complete: false,
fragments: vec![None; total_fragments as usize],
last_fragment_timestamp: timestamp,
}
}
/// Consume the buffer and concatenate every fragment payload into the
/// original message bytes. The caller is expected to have observed
/// `is_complete == true` first.
fn into_message(self) -> Vec<u8> {
debug_assert!(self.is_complete);
// SAFETY: `is_complete` is only set inside `insert_fragment` after
// `is_done_receiving` confirms every slot is `Some`. The
// `debug_assert!` above pins this invariant, so reading slot 0 and
// unwrapping every slot below cannot panic.
#[allow(clippy::unwrap_used)]
let id = self.fragments[0].as_ref().unwrap().id();
debug!(
"Got {} fragments for message id {}",
self.fragments.len(),
id
);
// SAFETY: same invariant as above — every slot is `Some`.
#[allow(clippy::unwrap_used)]
self.fragments
.into_iter()
.flat_map(|fragment| fragment.unwrap().extract_payload())
.collect()
}
/// Whether every fragment slot has been filled.
fn is_done_receiving(&self) -> bool {
!self.fragments.contains(&None)
}
/// Insert `fragment` into the slot at `fragment.current_fragment()` and
/// update `last_fragment_timestamp` and `is_complete` accordingly.
///
/// Duplicate fragments are logged, then ignored
fn insert_fragment(&mut self, fragment: Fragment, timestamp: Instant) {
self.last_fragment_timestamp = timestamp;
// All fragments routed into a given buffer must share the same id —
// it is part of the buffer's lookup key, so a mismatch would
// indicate a routing bug upstream.
debug_assert!({
let present = self.fragments.iter().find(|frag| frag.is_some());
// SAFETY: `find` returned a slot that satisfied `is_some`, so
// the inner `unwrap` cannot panic.
#[allow(clippy::unwrap_used)]
let same_id = present.is_none_or(|p| p.as_ref().unwrap().id() == fragment.id());
same_id
});
let fragment_index = fragment.current_fragment() as usize;
if self.fragments[fragment_index].is_some() {
// If we receive a duplicate, we ignore it
warn!(
"duplicate fragment received! - frag - {} (message id: {})",
fragment.current_fragment(),
fragment.id()
);
} else {
self.fragments[fragment_index] = Some(fragment);
if self.is_done_receiving() {
self.is_complete = true;
}
}
}
}
/// Public reassembly state for fragmented messages. Buffers in-flight
/// messages keyed on their fragment id and yields the original bytes
/// once every fragment of a given message has been received.
#[derive(Debug, Clone)]
pub struct MessageReconstructor {
/// In-flight messages keyed on the random 64-bit fragment id.
in_flight_messages: Arc<DashMap<u64, MessageBuffer>>,
/// How long an incomplete message is allowed to sit before it is
/// dropped on the next `cleanup_stale_buffers` pass.
incomplete_message_timeout: Duration,
}
impl MessageReconstructor {
/// Create an empty `MessageReconstructor`.
pub fn new(incomplete_message_timeout: Duration) -> Self {
Self {
in_flight_messages: Default::default(),
incomplete_message_timeout,
}
}
/// Insert `fragment` into the buffer for its message and, if it was the
/// last outstanding fragment, return the reassembled LpFrame
///
/// Stale incomplete messages are evicted on every call.
pub fn insert_new_fragment(
&self,
fragment: Fragment,
timestamp: Instant,
) -> Option<Result<LpFrame, MalformedLpPacketError>> {
let frag_id = fragment.id();
let total_fragments = fragment.total_fragments();
let maybe_message = match self.in_flight_messages.entry(frag_id) {
Entry::Occupied(mut entry) => {
entry.get_mut().insert_fragment(fragment, timestamp);
entry
.get()
.is_complete
.then(|| LpFrame::decode(&entry.remove().into_message()))
}
Entry::Vacant(entry) => {
let mut buf = MessageBuffer::new(total_fragments, timestamp);
buf.insert_fragment(fragment, timestamp);
if buf.is_complete {
Some(LpFrame::decode(&buf.into_message()))
} else {
entry.insert(buf);
None
}
}
};
// This might be a bit slow, keep an eye on it
self.cleanup_stale_buffers(timestamp);
maybe_message
}
/// Drop incomplete messages whose `last_fragment_timestamp` is older
/// than `incomplete_message_timeout` ago.
pub fn cleanup_stale_buffers(&self, timestamp: Instant) {
trace!("Cleaning up stale buffers");
self.in_flight_messages.retain(|_, buf| {
let keep = buf.last_fragment_timestamp + self.incomplete_message_timeout > timestamp;
if !keep {
debug!(
"Removing stale buffer for message id {:?}",
buf.fragments
.first()
.and_then(|f| f.as_ref().map(|f| f.id()))
);
}
keep
});
}
}
impl Default for MessageReconstructor {
fn default() -> Self {
MessageReconstructor::new(DEFAULT_FRAGMENT_TIMEOUT_DURATION)
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::unwrap_used)]
use super::*;
use crate::fragmentation::fragment::fragment_lp_message;
use crate::packet::LpFrame;
use crate::packet::frame::LpFrameKind;
use rand::SeedableRng;
use rand::rngs::StdRng;
const SPHINX: LpFrameKind = LpFrameKind::SphinxPacket;
/// Build a `Fragment` with explicit header values via the public
/// `LpFrame` round-trip, so tests can craft duplicates, out-of-order
/// inserts and id collisions without depending on RNG output.
fn make_fragment(
id: u64,
total_fragments: u8,
current_fragment: u8,
inner_kind: LpFrameKind,
payload: Vec<u8>,
) -> Fragment {
let mut attrs = [0u8; 14];
attrs[0..8].copy_from_slice(&id.to_be_bytes());
attrs[8] = total_fragments;
attrs[9] = current_fragment;
attrs[10..12].copy_from_slice(&u16::to_be_bytes(inner_kind.into()));
let frame = LpFrame::new_with_attributes(LpFrameKind::FragmentedData, attrs, payload);
Fragment::try_from(frame).unwrap()
}
fn split(message: LpFrame, fragment_size: usize) -> Vec<Fragment> {
let mut rng = StdRng::seed_from_u64(0xdead_beef);
fragment_lp_message(&mut rng, message, fragment_size)
}
/// Shared base instant for the test module. `Instant` cannot be constructed
/// from an absolute value, so we anchor on a single `now()` and express the
/// formerly-`u64` tick timestamps as offsets from it — only differences
/// matter for buffering/eviction logic, so determinism is preserved.
static BASE: std::sync::LazyLock<Instant> = std::sync::LazyLock::new(Instant::now);
/// A timestamp `ms` milliseconds after [`BASE`] (replaces the old `u64` ticks).
fn at(ms: u64) -> Instant {
*BASE + Duration::from_millis(ms)
}
/// A timeout of `ms` milliseconds (replaces the old `u64` offsets).
fn timeout(ms: u64) -> Duration {
Duration::from_millis(ms)
}
/// Build a deterministic, *decodable* set of `Fragment`s for a message of
/// `inner_kind` carrying `content`, tagged with `id` and split into exactly
/// `count` fragments.
///
/// Unlike [`make_fragment`], which crafts a single fragment from a raw
/// payload, this encodes a real [`LpFrame`] first (header + content) and
/// slices the encoded bytes — matching what `fragment_lp_message` does in
/// production, so the reassembled bytes decode back into the original frame.
fn make_message_fragments(
id: u64,
inner_kind: LpFrameKind,
content: &[u8],
count: u8,
) -> Vec<Fragment> {
let encoded = LpFrame::new(inner_kind, content.to_vec()).to_bytes();
let frag_size = encoded.len().div_ceil(count as usize);
let frags: Vec<Fragment> = encoded
.chunks(frag_size)
.enumerate()
.map(|(i, chunk)| make_fragment(id, count, i as u8, inner_kind, chunk.to_vec()))
.collect();
assert_eq!(
frags.len() as u8,
count,
"content/count combination did not split into exactly {count} fragments"
);
frags
}
// ---------- MessageBuffer ----------
#[test]
fn buffer_completes_on_single_fragment() {
let f = make_fragment(1, 1, 0, SPHINX, b"hi".to_vec());
let mut buf = MessageBuffer::new(1, at(0));
assert!(!buf.is_complete);
buf.insert_fragment(f, at(0));
assert!(buf.is_complete);
assert_eq!(buf.into_message(), b"hi");
}
#[test]
fn buffer_completes_only_after_last_fragment() {
let mut buf = MessageBuffer::new(3, at(0));
buf.insert_fragment(make_fragment(7, 3, 0, SPHINX, vec![0xaa]), at(1));
assert!(!buf.is_complete);
buf.insert_fragment(make_fragment(7, 3, 1, SPHINX, vec![0xbb]), at(2));
assert!(!buf.is_complete);
buf.insert_fragment(make_fragment(7, 3, 2, SPHINX, vec![0xcc]), at(3));
assert!(buf.is_complete);
assert_eq!(buf.into_message(), vec![0xaa, 0xbb, 0xcc]);
}
#[test]
fn buffer_reassembles_in_order_regardless_of_insertion_order() {
let mut buf = MessageBuffer::new(4, at(0));
buf.insert_fragment(make_fragment(1, 4, 2, SPHINX, vec![3]), at(0));
buf.insert_fragment(make_fragment(1, 4, 0, SPHINX, vec![1]), at(0));
buf.insert_fragment(make_fragment(1, 4, 3, SPHINX, vec![4]), at(0));
buf.insert_fragment(make_fragment(1, 4, 1, SPHINX, vec![2]), at(0));
assert!(buf.is_complete);
assert_eq!(buf.into_message(), vec![1, 2, 3, 4]);
}
#[test]
fn buffer_tracks_last_fragment_timestamp() {
let mut buf = MessageBuffer::new(2, at(100));
assert_eq!(buf.last_fragment_timestamp, at(100));
buf.insert_fragment(make_fragment(1, 2, 0, SPHINX, vec![0]), at(250));
assert_eq!(buf.last_fragment_timestamp, at(250));
buf.insert_fragment(make_fragment(1, 2, 1, SPHINX, vec![1]), at(400));
assert_eq!(buf.last_fragment_timestamp, at(400));
}
#[test]
fn buffer_duplicate_fragment_does_not_break_completion() {
let mut buf = MessageBuffer::new(2, at(0));
buf.insert_fragment(make_fragment(1, 2, 0, SPHINX, vec![0xaa]), at(0));
// Same slot twice
buf.insert_fragment(make_fragment(1, 2, 0, SPHINX, vec![0xaa]), at(0));
assert!(!buf.is_complete);
buf.insert_fragment(make_fragment(1, 2, 1, SPHINX, vec![0xbb]), at(0));
assert!(buf.is_complete);
assert_eq!(buf.into_message(), vec![0xaa, 0xbb]);
}
#[test]
fn buffer_empty_payloads_reassemble_to_empty_message() {
let mut buf = MessageBuffer::new(2, at(0));
buf.insert_fragment(make_fragment(1, 2, 0, SPHINX, vec![]), at(0));
buf.insert_fragment(make_fragment(1, 2, 1, SPHINX, vec![]), at(0));
assert!(buf.is_complete);
assert!(buf.into_message().is_empty());
}
// ---------- MessageReconstructor: round trip via fragment_payload ----------
#[test]
fn reconstructor_round_trip_single_fragment_message() {
let message = LpFrame::new(SPHINX, b"small".as_slice());
let mut fragments = split(message.clone(), 64);
assert_eq!(fragments.len(), 1);
let rec = MessageReconstructor::new(timeout(60));
let out = rec.insert_new_fragment(fragments.pop().unwrap(), at(0));
let recovered_frame = out
.expect("single fragment must complete the message")
.unwrap();
assert_eq!(recovered_frame, message);
}
#[test]
fn reconstructor_round_trip_multi_fragment_message() {
let message = LpFrame::new(SPHINX, (0u8..=200).collect::<Vec<_>>());
let fragments = split(message.clone(), 16);
assert!(fragments.len() > 1);
let rec = MessageReconstructor::new(timeout(60));
let total = fragments.len();
let mut out = None;
for (i, f) in fragments.into_iter().enumerate() {
out = rec.insert_new_fragment(f, at(i as u64));
if i + 1 < total {
assert!(out.is_none(), "premature completion at fragment {i}");
}
}
let recovered_frame = out
.expect("last fragment must complete the message")
.unwrap();
assert_eq!(recovered_frame, message);
}
#[test]
fn reconstructor_handles_out_of_order_arrival() {
let message = LpFrame::new(SPHINX, (0u8..=200).collect::<Vec<_>>());
let mut fragments = split(message.clone(), 18);
// Reverse arrival order.
fragments.reverse();
let rec = MessageReconstructor::new(timeout(60));
let mut out = None;
for (i, f) in fragments.into_iter().enumerate() {
out = rec.insert_new_fragment(f, at(i as u64));
}
let recovered_frame = out
.expect("last fragment must complete the message")
.unwrap();
assert_eq!(recovered_frame, message);
}
#[test]
fn reconstructor_keeps_distinct_messages_separate() {
// Two messages with different ids interleaved.
let mut a = make_message_fragments(1, SPHINX, &[0xa1, 0xa2], 2);
let mut b = make_message_fragments(2, SPHINX, &[0xb1, 0xb2], 2);
let rec = MessageReconstructor::new(timeout(60));
// Interleave.
assert!(rec.insert_new_fragment(a.remove(0), at(0)).is_none());
assert!(rec.insert_new_fragment(b.remove(0), at(1)).is_none());
let msg_a = rec
.insert_new_fragment(a.remove(0), at(2))
.unwrap()
.unwrap();
let msg_b = rec
.insert_new_fragment(b.remove(0), at(3))
.unwrap()
.unwrap();
assert_eq!(msg_a.content, vec![0xa1, 0xa2]);
assert_eq!(msg_b.content, vec![0xb1, 0xb2]);
}
#[test]
fn reconstructor_clears_buffer_after_emitting_message() {
let f = make_message_fragments(99, SPHINX, &[0xff], 1).remove(0);
let rec = MessageReconstructor::new(timeout(60));
rec.insert_new_fragment(f, at(0))
.expect("single fragment must complete the message")
.unwrap();
assert!(
rec.in_flight_messages.is_empty(),
"completed messages must not linger in the in-flight map"
);
}
// ---------- cleanup_stale_buffers ----------
#[test]
fn cleanup_evicts_buffers_older_than_timeout() {
let f = make_fragment(1, 2, 0, SPHINX, vec![0]);
let rec = MessageReconstructor::new(timeout(10));
// First (and only) fragment received at t=0; the message stays
// incomplete.
assert!(rec.insert_new_fragment(f, at(0)).is_none());
assert_eq!(rec.in_flight_messages.len(), 1);
// Within the timeout window — buffer must survive.
rec.cleanup_stale_buffers(at(5));
assert_eq!(rec.in_flight_messages.len(), 1);
// Past the window — evicted.
rec.cleanup_stale_buffers(at(100));
assert!(rec.in_flight_messages.is_empty());
}
#[test]
fn cleanup_runs_implicitly_on_insert() {
// Stale message at t=0, then a brand new message arrives well past
// the timeout. The implicit cleanup inside `insert_new_fragment`
// must drop the stale entry.
// Only the first of the stale message's two fragments is ever delivered.
let stale = make_message_fragments(1, SPHINX, &[0x00, 0x01], 2).remove(0);
let fresh = make_message_fragments(2, SPHINX, &[0xff], 1).remove(0);
let rec = MessageReconstructor::new(timeout(10));
assert!(rec.insert_new_fragment(stale, at(0)).is_none());
assert_eq!(rec.in_flight_messages.len(), 1);
let msg = rec.insert_new_fragment(fresh, at(1_000)).unwrap().unwrap();
assert_eq!(msg.content, vec![0xff]);
// `fresh` was a single-fragment message and is removed on emission;
// the stale buffer must also be gone.
assert!(rec.in_flight_messages.is_empty());
}
#[test]
fn cleanup_resets_idle_timer_on_each_fragment() {
// A buffer that keeps receiving fragments must not be evicted
// even if the absolute time exceeds the timeout, as long as the
// gap between fragments stays under it.
let rec = MessageReconstructor::new(timeout(10));
let mut frags = make_message_fragments(1, SPHINX, &[0xa, 0xb, 0xc], 3).into_iter();
assert!(
rec.insert_new_fragment(frags.next().unwrap(), at(0))
.is_none()
);
assert!(
rec.insert_new_fragment(frags.next().unwrap(), at(8))
.is_none()
);
// Absolute time is now 16 (> 10), but the gap from the previous
// fragment (8) to now (16) is 8, still within the 10-tick timeout.
let out = rec.insert_new_fragment(frags.next().unwrap(), at(16));
let msg = out.expect("buffer must still be alive").unwrap();
assert_eq!(msg.content, vec![0xa, 0xb, 0xc]);
}
}
+189
View File
@@ -0,0 +1,189 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
//! Trait definitions and data structures for low-level packet (LP) processing
//! pipelines in the Nym mixnet.
//!
//! ## Crate layout
//!
//! | Module | Purpose |
//! |--------|---------|
//! | [`clients`] | Client-side pipeline traits and types: chunking, reliability, obfuscation, routing security, framing, transport |
//! | [`common`] | Shared framing and transport traits used by both clients and mixnodes |
//! | [`nymnodes`] | Mixnode-side pipeline traits: unwrap incoming packets, re-wrap and forward them |
//!
//! ## Core types
//!
//! [`TimedData`] is the foundational wrapper that pairs any piece of data with an
//! [`Instant`] timestamp, threading timing information through every stage of the
//! pipeline. [`TimedPayload`] is a convenience alias for `TimedData<Vec<u8>>`.
use std::net::SocketAddr;
use std::time::Instant;
pub mod clients;
pub mod common;
pub mod fragmentation;
pub mod nymnodes;
pub mod packet;
/// Convenience alias for [`TimedData`] when the payload is a raw byte buffer.
pub type TimedPayload = TimedData<Vec<u8>>;
/// Convenience alias for [`AddressedTimedData`] when the payload is a raw byte buffer.
pub type AddressedTimedPayload = AddressedTimedData<Vec<u8>>;
/// Convenience alias for [`PipelineData`] when the payload is a raw byte buffer.
pub type PipelinePayload<Opts, NdId = SocketAddr> = PipelineData<Vec<u8>, Opts, NdId>;
/// A value of type `D` tagged with an [`Instant`] timestamp.
///
/// `TimedData` threads timing information through every stage of the LP
/// pipeline. It is produced by [`clients::traits::Chunking`] and propagated
/// unchanged (or with its timestamp replaced via [`TimedData::with_timestamp`])
/// through every subsequent pipeline stage until the packet is sent on the wire.
#[derive(Clone, Debug)]
pub struct TimedData<D> {
pub timestamp: Instant,
pub data: D,
}
impl<D> TimedData<D> {
pub fn new(timestamp: Instant, data: D) -> Self {
TimedData { timestamp, data }
}
/// Apply `op` to the data component, leaving the timestamp unchanged.
///
/// `Nd` can differ from `D`, so this also acts as a type transform.
pub fn data_transform<F, Nd>(self, mut op: F) -> TimedData<Nd>
where
F: FnMut(D) -> Nd,
{
TimedData {
data: op(self.data),
timestamp: self.timestamp,
}
}
/// Set a new timestamp
pub fn with_timestamp(self, new_timestamp: Instant) -> Self {
TimedData {
data: self.data,
timestamp: new_timestamp,
}
}
}
/// A timestamped payload extended with pipeline-stage options and a destination address.
///
/// `PipelineData` is the value flowing between client-side pipeline stages
/// ([`Chunking`], [`Reliability`], [`Obfuscation`], [`RoutingSecurity`], [`Framing`],
/// [`Transport`]). It carries:
///
/// - `data`: a [`TimedData`] pairing the payload with its scheduled timestamp,
/// - `options`: opaque per-message metadata threaded through the pipeline (`()`
/// once the message is reduced to an addressed payload),
/// - `dst`: the next-hop socket address the wire layer should send to.
///
/// [`Chunking`]: crate::clients::traits::Chunking
/// [`Reliability`]: crate::clients::traits::Reliability
/// [`Obfuscation`]: crate::clients::traits::Obfuscation
/// [`RoutingSecurity`]: crate::clients::traits::RoutingSecurity
/// [`Framing`]: crate::common::traits::Framing
/// [`Transport`]: crate::common::traits::Transport
#[derive(Clone, Debug)]
pub struct PipelineData<D, Opts, NdId = SocketAddr> {
pub data: TimedData<D>,
pub options: Opts,
pub dst: NdId,
}
impl<D, Opts, NdId> PipelineData<D, Opts, NdId> {
/// Construct a new [`PipelineData`] from its parts.
pub fn new(timestamp: Instant, data: D, options: Opts, dst: NdId) -> Self {
PipelineData {
data: TimedData::new(timestamp, data),
options,
dst,
}
}
/// Apply `op` to the data component, leaving the timestamp, options, and
/// destination unchanged.
///
/// `Nd` can differ from `D`, so this also acts as a type transform.
pub fn data_transform<F, Nd>(self, op: F) -> PipelineData<Nd, Opts, NdId>
where
F: FnMut(D) -> Nd,
{
PipelineData {
data: self.data.data_transform(op),
options: self.options,
dst: self.dst,
}
}
/// Set a new timestamp
pub fn with_timestamp(self, new_timestamp: Instant) -> Self {
PipelineData {
data: self.data.with_timestamp(new_timestamp),
options: self.options,
dst: self.dst,
}
}
/// Apply `op` to the options component, leaving the timestamp, data, and
/// destination unchanged.
///
/// `No` can differ from `O`, so this also acts as a type transform.
pub fn options_transform<F, No>(self, mut op: F) -> PipelineData<D, No, NdId>
where
F: FnMut(Opts) -> No,
{
PipelineData {
data: self.data,
options: op(self.options),
dst: self.dst,
}
}
/// Set a new destination
pub fn with_dst<NewNdId>(self, new_dst: NewNdId) -> PipelineData<D, Opts, NewNdId> {
PipelineData {
data: self.data,
options: self.options,
dst: new_dst,
}
}
/// Drop the pipeline options, producing a plain addressed payload.
pub fn into_addressed(self) -> AddressedTimedData<D, NdId> {
AddressedTimedData {
data: self.data,
options: (),
dst: self.dst,
}
}
}
/// Convenience alias for [`PipelineData`] when no per-message pipeline options
/// are needed. Avoids duplicating the pipeline data structure.
pub type AddressedTimedData<D, NdId = SocketAddr> = PipelineData<D, (), NdId>;
impl<D, NdId> AddressedTimedData<D, NdId> {
/// Construct a new [`AddressedTimedData`] with unit `options`.
pub fn new_addressed(timestamp: Instant, data: D, dst: NdId) -> Self {
AddressedTimedData {
data: TimedData::new(timestamp, data),
options: (),
dst,
}
}
/// Convert a [`AddressedTimedData`] into a [`PipelineData`] with the provided options.
pub fn with_options<Opts>(self, opts: Opts) -> PipelineData<D, Opts, NdId> {
PipelineData {
data: self.data,
options: opts,
dst: self.dst,
}
}
}
+4
View File
@@ -0,0 +1,4 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod traits;
+67
View File
@@ -0,0 +1,67 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::time::Instant;
use crate::{AddressedTimedData, PipelinePayload, TimedPayload};
use crate::common::traits::{WireUnwrappingPipeline, WireWrappingPipeline};
/// Top-level processing trait for a mix node.
///
/// Combines [`WireUnwrappingPipeline`] and [`WireWrappingPipeline`] with a blank [`mix`]
/// step that the implementor fills in (decrypt, route, re-encrypt, cover traffic, etc.).
///
/// # Type Parameters
/// - `Pkt`: Transport packet type; the same type is consumed and produced.
///
/// # Associated Types
/// - `Options`: Per-message pipeline options carried into the re-wrapping side.
/// - `MessageKind`: Message-kind marker returned by the unwrap side.
///
/// Both are properties of the concrete pipeline rather than something a caller
/// varies, so they live as associated types. This keeps consumers (e.g. a
/// generic worker driver) free of `Options` / `MessageKind` bounds.
///
/// Frame types are owned by the wire sub-traits as associated items and do not
/// appear in this trait's parameter list.
///
/// # Required Methods
/// - `mix`: Given a reassembled payload and the current timestamp, return zero or more
/// [`PipelinePayload`]s carrying their next-hop addresses to be re-wrapped and forwarded.
///
/// # Provided Methods
/// - `process`: Unwraps the incoming packet via [`WireUnwrappingPipeline::wire_unwrap`],
/// passes the result to [`mix`], and re-wraps each output payload via
/// [`WireWrappingPipeline::wire_wrap`].
///
/// [`mix`]: NymNodeProcessingPipeline::mix
pub trait NymNodeProcessingPipeline<Pkt>:
WireUnwrappingPipeline<Pkt, <Self as NymNodeProcessingPipeline<Pkt>>::MessageKind>
+ WireWrappingPipeline<Pkt, <Self as NymNodeProcessingPipeline<Pkt>>::Options>
{
type Options;
type MessageKind;
fn mix(
&mut self,
message_kind: Self::MessageKind,
payload: TimedPayload,
timestamp: Instant,
) -> Vec<PipelinePayload<Self::Options>>;
fn process(
&mut self,
input: Pkt,
timestamp: Instant,
) -> Result<Vec<AddressedTimedData<Pkt>>, Self::Error> {
let Some((payload, kind)) = self.wire_unwrap(input, timestamp)? else {
return Ok(Vec::new());
};
let mixed = self.mix(kind, payload, timestamp);
Ok(mixed
.into_iter()
.flat_map(|addressed_data| self.wire_wrap(addressed_data).into_iter())
.collect())
}
}
@@ -3,9 +3,31 @@
use crate::packet::error::MalformedLpPacketError;
use bytes::{BufMut, Bytes, BytesMut};
use num_enum::{IntoPrimitive, TryFromPrimitive};
use num_enum::{FromPrimitive, IntoPrimitive};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
/// Represent kind of application data being sent in Transport mode
#[derive(Clone, Copy, PartialEq, Eq, Debug, IntoPrimitive, FromPrimitive, Hash)]
#[repr(u16)]
pub enum LpFrameKind {
Opaque = 0,
Registration = 1,
Forward = 2,
SphinxStream = 3,
FragmentedData = 4,
SphinxPacket = 5, // Sphinx Packet to process, delay and forward
OutfoxPacket = 6, // Outfox Packet to process, delay and forward
ForwardSphinxPacket = 7, // Sphinx Packet to immediately forward
ForwardOutfoxPacket = 8, // Outfox Packet to immediately forward
#[num_enum(catch_all)]
Unknown(u16),
}
/// Raw 14-byte frame attributes field in every [`LpFrameHeader`].
/// Interpretation depends on the [`LpFrameKind`].
pub type LpFrameAttributes = [u8; 14];
#[derive(Debug, Clone, PartialEq)]
pub struct LpFrameHeader {
pub kind: LpFrameKind,
@@ -15,10 +37,10 @@ pub struct LpFrameHeader {
impl LpFrameHeader {
pub const SIZE: usize = 16; // message_kind(2) + message_attributes(14)
pub fn new(kind: LpFrameKind, frame_attributes: LpFrameAttributes) -> Self {
pub fn new(kind: LpFrameKind, frame_attributes: impl Into<LpFrameAttributes>) -> Self {
Self {
kind,
frame_attributes,
frame_attributes: frame_attributes.into(),
}
}
@@ -31,7 +53,7 @@ impl LpFrameHeader {
/// Encode directly into a BytesMut buffer
pub fn encode(&self, dst: &mut BytesMut) {
dst.put_u16_le(self.kind as u16);
dst.put_u16_le(self.kind.into());
dst.put_slice(&self.frame_attributes);
}
@@ -41,8 +63,7 @@ impl LpFrameHeader {
}
let raw_kind = u16::from_le_bytes([src[0], src[1]]);
let kind = LpFrameKind::try_from(raw_kind)
.map_err(|_| MalformedLpPacketError::invalid_data_kind(raw_kind))?;
let kind = LpFrameKind::from(raw_kind);
#[allow(clippy::unwrap_used)]
let message_attributes = src[2..16].try_into().unwrap();
@@ -60,12 +81,6 @@ pub struct LpFrame {
pub content: Bytes,
}
impl AsRef<[u8]> for LpFrame {
fn as_ref(&self) -> &[u8] {
&self.content
}
}
impl LpFrame {
pub fn new(kind: LpFrameKind, content: impl Into<Bytes>) -> Self {
Self {
@@ -74,6 +89,17 @@ impl LpFrame {
}
}
pub fn new_with_attributes(
kind: LpFrameKind,
attrs: impl Into<LpFrameAttributes>,
content: impl Into<Bytes>,
) -> Self {
Self {
header: LpFrameHeader::new(kind, attrs),
content: content.into(),
}
}
pub fn encode(&self, dst: &mut BytesMut) {
self.header.encode(dst);
@@ -87,6 +113,12 @@ impl LpFrame {
Ok(Self { header, content })
}
pub fn to_bytes(self) -> Vec<u8> {
let mut bytes = BytesMut::new();
self.encode(&mut bytes);
bytes.freeze().to_vec()
}
pub fn kind(&self) -> LpFrameKind {
self.header.kind
}
@@ -110,21 +142,13 @@ impl LpFrame {
}
}
pub(crate) fn len(&self) -> usize {
// is_empty in the sense len == 0 doesn't make sense in that case
#[allow(clippy::len_without_is_empty)]
pub fn len(&self) -> usize {
LpFrameHeader::SIZE + self.content.len()
}
}
/// Represent kind of application data being sent in Transport mode
#[derive(Clone, Copy, PartialEq, Eq, Debug, IntoPrimitive, TryFromPrimitive)]
#[repr(u16)]
pub enum LpFrameKind {
Opaque = 0,
Registration = 1,
Forward = 2,
SphinxStream = 3,
}
/// Message type within a `LpFrameKind::SphinxStream` frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
@@ -151,10 +175,6 @@ pub struct SphinxStreamFrameAttributes {
pub sequence_num: u32,
}
/// Raw 14-byte frame attributes field in every [`LpFrameHeader`].
/// Interpretation depends on the [`LpFrameKind`].
pub type LpFrameAttributes = [u8; 14];
impl SphinxStreamFrameAttributes {
pub fn encode(&self) -> LpFrameAttributes {
let mut buf = [0u8; 14];
@@ -165,6 +185,8 @@ impl SphinxStreamFrameAttributes {
}
pub fn parse(attrs: &LpFrameAttributes) -> Result<Self, MalformedLpPacketError> {
// SAFETY : 8 bytes slice into 8 bytes array
#[allow(clippy::unwrap_used)]
let stream_id = u64::from_be_bytes(attrs[0..8].try_into().unwrap());
let msg_type = match attrs[8] {
0 => SphinxStreamMsgType::Open,
@@ -175,6 +197,8 @@ impl SphinxStreamFrameAttributes {
)));
}
};
// SAFETY : 4 bytes slice into 4 bytes array
#[allow(clippy::unwrap_used)]
let sequence_num = u32::from_be_bytes(attrs[9..13].try_into().unwrap());
Ok(Self {
stream_id,
@@ -1,11 +1,13 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::packet::error::MalformedLpPacketError;
use crate::packet::version;
use crate::{packet::error::MalformedLpPacketError, peer_config::LpReceiverIndex};
use bytes::{BufMut, BytesMut};
use tracing::warn;
pub type LpReceiverIndex = u32;
/// Outer header (12 bytes) - always cleartext, used for routing.
///
/// This is the first 12 bytes of every LP packet, containing only the fields
@@ -118,6 +120,8 @@ pub struct LpHeader {
}
impl LpHeader {
pub const SIZE: usize = OuterHeader::SIZE + InnerHeader::SIZE;
pub fn new(receiver_idx: LpReceiverIndex, counter: u64, protocol_version: u8) -> Self {
Self {
outer: OuterHeader {

Some files were not shown because too many files have changed in this diff Show More