Compare commits

...

43 Commits

Author SHA1 Message Date
Mark Sinclair e28f5809ff nym-data-observatory: bump version 2026-03-17 15:46:44 +00:00
Mark Sinclair 598a9341a4 nym-data-observatory: add CLI command to process blocks from a range 2026-03-17 15:26:56 +00:00
dynco-nym 8dc3ba4ec3 Add LP to NS UI (#6562)
* Add LP column to gateway view

* Add LP to graphs
2026-03-16 14:07:19 +01:00
Lawrence Stalder 712e3f5183 Change runner from ubuntu-latest to arc-linux-latest 2026-03-13 14:25:33 +01:00
Lawrence Stalder 5229df47ab Change runner to arc-linux-latest for SonarQube job 2026-03-13 14:24:04 +01:00
Lawrence Stalder 32cffed36b Change runner from ubuntu-latest to arc-linux-latest 2026-03-13 14:16:42 +01:00
Jędrzej Stuczyński 49c710e651 feat: nyxd watcher (#6561)
* removed explicit storage_tx within MsgModule, TxModule and BlockModule impls

* created a NyxdWatcher that does not persist processed block info

* removed unused imports
2026-03-13 13:15:36 +00:00
Lawrence Stalder 0a5227a894 Remove cron schedule from nightly-build workflow
Removed scheduled cron job from nightly build workflow.
2026-03-13 14:01:02 +01:00
mfahampshire b231eb4f04 Max/asyncread asyncwrite nym client (#6318)
* Remove AsyncRead/Write traits from native client - moving them to
stream/

* Substream model first push

* Update / add examples

* Update lockfile

* Clippy

* clippy examples

* remove codecs

* Remove unused bincode setup

* Revert a lot of changes when SDK client itself implemented
AsyncRead/Write

* Remove unnecessary mut

* Use local PollSender in MixnetStream instead of client_input.input_sender

Now that client-core's input_sender is back to mpsc::Sender (reverted
PollSender migration), MixnetStream creates its own PollSender wrapper
for the AsyncWrite impl's poll_ready/start_send calls.

* Remove now-unnecessary parameter

* Clippy

* Cleanup more stragglers from previous setup (Async traits on
MixnetClient)

* Rename files (remove module inception)

* - Shrink StreamId from 16 bytes to u64, add version byte to wire format
  - Introduce MixStreamHeader/MixStreamFrame structs for decode
  - Replace StreamMap type alias with struct using tokio::sync::Mutex
  - Add StreamMap helper methods, eliminate lock().expect() panics
  - Rename stream.rs -> mixnet_stream.rs to avoid module inception
  - Document irrevocable stream mode, add LP integration TODO

* - Remove dummy channel
- Add err variant for reciever alredy taken
- Remove panics

* add timeout to stream

* clippy
2026-03-13 09:40:45 +00:00
mfahampshire fdd2c8fac2 update nymvpn cli docs (#6559)
* update nymvpn cli docs

* update nymvpn cli docs again
2026-03-12 16:32:39 +00:00
Jędrzej Stuczyński e2dd8ac743 feat: localnet v2 (#6277)
* squashing localnet-v2 commits (again)

cargo fmt

fixes to localnet purge

provide path in the error message

output args

log failed exec

print based on tty

check-prerequisites cmd

checked iptables update

basic kernel features check

enable ipv6 rules

add forwarding rules

squashing localnet-v2 commits

additional changes

propagate custom-dns flag to all run containers

remove is_mock from EcashManager

another localnet squash

unused import

chore: remove redundant testnet manager

missing impl

additional linux fixes

command to rebuild container image

wait for at least 2 blocks

additional node startup fixes

added --custom-dns flag to nym node setup

add gateway probe + wait for DKG magic file

fixed localnet down on linux

container ls

re-enable state resync

additional feature locking

macos adjustments

working nyxd startup on linux

wip linux box

wip

separating network inspect betweewn macos and linux

initial linux feature locking

moved all container commands into a single location

finally working initial node performance

squashing orchestrator commits

cleanup

fixed condition for naive rearrangement

added cache of cosmwasm contracts for speed up on subsequent runs

'down' command

refreshing described cache after nodes are bonded

nym nodes setup + wip on nym api refresh

nodes setup WIP

first pass cleanup

placeholder for nym-node setup

bypassing the dkg

further progress on nym-api setup

wip: api setup

up/down/purge placeholders

persisting contract setup data

fix contract upload by forcing amd64 container platform

wip: contracts setup4

wip: contracts setup3

wip: contracts setup2

wip: contracts setup

include network setup

init and spawn nyxd

build nyxd image in dedicated orchestrator

build nyxd image

squashed cherry-picked lp changes

Bits and bobs to make everything work

Title

MacOS setup instructions

Docker/Container localnet

* clippy

* fixes on non-unix targets

---------

Co-authored-by: durch <durch@users.noreply.github.com>
2026-03-12 14:46:00 +00:00
import this 8001fa7f40 [HOTFIX/DOCs]: Get Vercel deployment to work (#6557)
* try rebuild

* update package.json
2026-03-12 13:53:04 +00:00
dynco-nym 80370b98ec Additional ticket for agent (#6551)
* Additional ticket type for LP tests

* Remove hardcoded comments

* bump cargo version

* Nuke fallback edge case in the probe

* Cleanup unused code

* Bump API & agent versions
- agent bump required due to probe changes
2026-03-12 14:49:03 +01:00
import this 3524089ad8 [DOCs/operators]: Release notes for v2026.5 raclette (#6556)
* update changelog

* bump up versions

* bump up stats

* update stats

* rephrase probe info
2026-03-12 13:05:25 +00:00
mfahampshire ec7ee49282 Version bump (#6553)
* Version bump

* update docs dep
2026-03-12 10:40:07 +00:00
import this 653d1c2dea [NTM]: Open ports according to NIP-8 and NIP-9 (#6545)
* add nip-9 to NTM

* update ntm nip 8

* fix symbol syntax
2026-03-12 10:10:50 +00:00
mfahampshire b579f987b1 Max/mixfetch concurrentcy tweak (#6539)
* Remove debug connect logging

* Add random suffix to addressmapping for concurrent outgoing requests to
same endpoint

* Comment + renaming + pulling apart of mapping key & URL.

* Add certs file + remove hardcoding + add certs script

* Add cleanbuild helper script

* Update DEVELOPERS.md

* Add cleanbuild script info to DEVELOPERS.md

* Remove notice about blocking on concurrent same endpoint reqs
2026-03-11 18:42:07 +00:00
Jędrzej Stuczyński 59254c92c3 bugfix: make sure to use old values from metrics debug config during v12 migration (#6546) (#6547) 2026-03-11 08:33:53 +00:00
Simon Wicky 69887921cc typo (#6543) 2026-03-10 16:27:02 +01:00
import this e075b07632 Hotfix: Add a missing commit with an ansible role (#6542)
* Create ansible playbook for trimming and rotationg logs

* add docs for triming and log rotation

* update ansible docs

* add info on logic

* cleanup the cleanup guide

* update scraped stats

* ready for review

* address review

* add main default values
2026-03-10 14:23:02 +00:00
import this d32b680351 Server Ansible maintenance & documentation (#6514)
* Create ansible playbook for trimming and rotationg logs

* add docs for triming and log rotation

* update ansible docs

* add info on logic

* cleanup the cleanup guide

* update scraped stats

* ready for review

* address review
2026-03-10 13:28:39 +00:00
Simon Wicky fcd59a19be rng changes for a Send variant (#6541) 2026-03-10 13:43:49 +01:00
dynco-nym 08b20ac2ab Add LP fields (#6535)
* Add lp field to /dvpn/gateways

* Expand unit tests

* Add lp ports, keys, hashes

* Include the whole struct

* Update Toml version
2026-03-10 13:06:56 +01:00
Jędrzej Stuczyński 4c007669f9 chore: update ts-rs dep (#6517) 2026-03-10 11:51:30 +00:00
benedetta davico c3a8fa8d0d Merge pull request #6536 from nymtech/release/2026.5-raclette
Raclette to develop
2026-03-10 12:06:56 +01:00
Simon Wicky d8769157fd enable LP registration in registration client (#6534) 2026-03-10 11:35:48 +01:00
benedettadavico 7cccf3cfff update changelog 2026-03-10 10:46:12 +01:00
Jędrzej Stuczyński 02eec164f8 bugfix: lp information to have proper snake_case on API endpoints (#6531) 2026-03-09 14:56:31 +00:00
Jędrzej Stuczyński 4f13ab1e0a Merge pull request #6532 from nymtech/chore/reg-metrics
chore: introduce additional prometheus metrics for registration times
2026-03-09 14:56:18 +00:00
benedetta davico a34c7ef19f Merge pull request #6533 from nymtech/bugfix/lp-gateway-probe
bugfix: correctly populate gateway probe LP data
2026-03-09 15:55:00 +01:00
Jędrzej Stuczyński f00b18298c bugfix: correctly populate gateway probe LP data 2026-03-09 14:10:24 +00:00
Jędrzej Stuczyński 0426adc94e chore: introduce additional prometheus metrics for registration times 2026-03-09 13:50:46 +00:00
Jędrzej Stuczyński 4b4a2fe387 Merge pull request #6530 from nymtech/chore/rename-lp-message
chore: rename LpMessage to LpFrame
2026-03-09 13:44:39 +00:00
Jędrzej Stuczyński 1ebb7e06c7 chore: rename LpMessage to LpFrame 2026-03-09 13:21:39 +00:00
Jędrzej Stuczyński 1fd17c5cb3 Merge pull request #6526 from nymtech/chore/lp-improvements
chore: LP improvements
2026-03-09 10:57:26 +00:00
Jędrzej Stuczyński ef65cf4c9e additional adjustments 2026-03-06 16:34:42 +00:00
Jędrzej Stuczyński 48dad0f16b bugfix: setting correct LpPeerConfig during handshake 2026-03-06 16:09:28 +00:00
Jędrzej Stuczyński 93ac638765 importing over changes from 'lp/persistent-node-connection' 2026-03-06 16:07:35 +00:00
Jędrzej Stuczyński c6589ca92c chore: add unit test for mutual KKT 2026-03-06 15:40:13 +00:00
Sachin Kamath 03d5a87826 Merge pull request #6525 from nymtech/readme-midnight-attribution
chore: add midnight attribution
2026-03-06 16:47:16 +05:30
Sachin Kamath 512cfd1b74 chore: add midnight attribution 2026-03-06 16:40:45 +05:30
Bogdan-Ștefan Neacşu ba0625cd97 Remove dep leak of strum iterator (#6522)
strum iterator over an enum leaks the version needed to iterate over it,
which can cause problems to dependent crates that use a different strum
version.

While at it, bump the strum crates as well
2026-03-06 10:44:14 +02:00
mfahampshire a2c489dc5b Max/sitemap generation fix (#6515)
* Tweak README ordering

* Linting

* Add sitemap generation + NEXT env var to CI

* Update domain for sitemap generation

* Inc. sitemap -0

* test remove lockfile

* fix borked name in package

* add redoc

* add framer

* Add pnpm-lock file

* Add sitemap to remote + ci workflow

* remove extra sitemap

* remove static files from remote for vercel

* add sitemap gen to next build step for vercel
2026-03-04 16:01:51 +00:00
363 changed files with 25187 additions and 13156 deletions
+1 -1
View File
@@ -7,7 +7,7 @@ jobs:
build:
runs-on: arc-ubuntu-22.04
env:
NEXT_PUBLIC_SITE_URL: https://nymtech.net/docs
NEXT_PUBLIC_SITE_URL: https://nym.com/docs
defaults:
run:
working-directory: documentation/docs
+2
View File
@@ -48,6 +48,8 @@ jobs:
run: pnpm i
- name: Build project
run: pnpm run build
- name: Generate sitemap
run: npx next-sitemap
- name: Move files to /dist/
run: ../scripts/move-to-dist.sh
+1 -1
View File
@@ -8,7 +8,7 @@ on:
jobs:
sonarqube:
name: SonarQube
runs-on: ubuntu-latest
runs-on: arc-linux-latest
steps:
- uses: actions/checkout@v6
with:
-2
View File
@@ -2,8 +2,6 @@ name: nightly-build
on:
workflow_dispatch:
schedule:
- cron: '14 1 * * *'
jobs:
build:
@@ -9,7 +9,7 @@ on:
jobs:
integration-tests:
runs-on: ubuntu-latest
runs-on: arc-linux-latest
env:
API_BASE_URL: http://localhost:8000
+1 -1
View File
@@ -23,7 +23,7 @@ env:
jobs:
check-milestone:
name: Check Milestone
runs-on: ubuntu-latest
runs-on: arc-linux-latest
steps:
- if: github.event.pull_request.milestone == null && contains( env.LABELS, 'no-milestone' ) == false
run: exit 1
+3
View File
@@ -36,6 +36,9 @@ jobs:
with:
go-version: "1.24.6"
- name: Update root CA certificate bundle
run: ./wasm/mix-fetch/go-mix-conn/scripts/update-root-certs.sh
- name: Install dependencies
run: yarn
+46
View File
@@ -4,6 +4,52 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2026.5-raclette] (2026-03-10)
- bugfix: correctly populate gateway probe LP data ([#6533])
- chore: introduce additional prometheus metrics for registration times ([#6532])
- bugfix: lp information to have proper snake_case on API endpoints ([#6531])
- removed redundant LP states ([#6509])
- chore: removed all matrix notifications from github actions ([#6495])
- feat: Lewes Protocol with PSQv2 ([#6491])
- build(deps): bump minimatch from 3.1.2 to 3.1.4 in /documentation/docs ([#6486])
- build(deps): bump bn.js from 4.12.2 to 4.12.3 in /documentation/docs ([#6484])
- build(deps): bump bn.js from 4.12.2 to 4.12.3 ([#6483])
- build(deps): bump ajv from 8.17.1 to 8.18.0 in /clients/native/examples/js-examples/websocket ([#6478])
- build(deps): bump ajv from 6.12.6 to 6.14.0 in /documentation/docs ([#6477])
- build(deps): bump minimatch and glob in /documentation/scripts/post-process ([#6476])
- build(deps): bump hono from 4.11.9 to 4.12.0 ([#6475])
- build(deps): bump keccak from 0.1.5 to 0.1.6 ([#6472])
- build(deps-dev): bump qs from 6.14.1 to 6.14.2 in /clients/native/examples/js-examples/websocket ([#6466])
- build(deps): bump mikefarah/yq from 4.52.2 to 4.52.4 ([#6465])
- Otel minimal v2 ([#6464])
- build(deps): bump qs and express in /wasm/client/internal-dev ([#6461])
- bugfix: restore 'latest_measurement' field for nym-node /verloc endpoint ([#6452])
- build(deps-dev): bump webpack from 5.77.0 to 5.104.1 in /wasm/node-tester/internal-dev ([#6451])
- Max/mixfetch concurrent test ([#6417])
[#6533]: https://github.com/nymtech/nym/pull/6533
[#6532]: https://github.com/nymtech/nym/pull/6532
[#6531]: https://github.com/nymtech/nym/pull/6531
[#6509]: https://github.com/nymtech/nym/pull/6509
[#6495]: https://github.com/nymtech/nym/pull/6495
[#6491]: https://github.com/nymtech/nym/pull/6491
[#6486]: https://github.com/nymtech/nym/pull/6486
[#6484]: https://github.com/nymtech/nym/pull/6484
[#6483]: https://github.com/nymtech/nym/pull/6483
[#6478]: https://github.com/nymtech/nym/pull/6478
[#6477]: https://github.com/nymtech/nym/pull/6477
[#6476]: https://github.com/nymtech/nym/pull/6476
[#6475]: https://github.com/nymtech/nym/pull/6475
[#6472]: https://github.com/nymtech/nym/pull/6472
[#6466]: https://github.com/nymtech/nym/pull/6466
[#6465]: https://github.com/nymtech/nym/pull/6465
[#6464]: https://github.com/nymtech/nym/pull/6464
[#6461]: https://github.com/nymtech/nym/pull/6461
[#6452]: https://github.com/nymtech/nym/pull/6452
[#6451]: https://github.com/nymtech/nym/pull/6451
[#6417]: https://github.com/nymtech/nym/pull/6417
## [2026.4-quark] (2026-02-24)
- Enhance CI workflow with feature inputs ([#6462])
Generated
+530 -156
View File
File diff suppressed because it is too large Load Diff
+12 -8
View File
@@ -157,8 +157,8 @@ members = [
"tools/internal/mixnet-connectivity-check",
# "tools/internal/sdk-version-bump",
"tools/internal/ssl-inject",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/internal/localnet-orchestrator",
"tools/internal/localnet-orchestrator/dkg-bypass-contract",
"tools/internal/validator-status-check",
"tools/nym-cli",
"tools/nym-id-cli",
@@ -171,9 +171,10 @@ members = [
"wasm/mix-fetch",
"wasm/node-tester",
"wasm/zknym-lib",
# "nym-gateway-probe",
"nym-gateway-probe",
"integration-tests",
"common/nym-kkt-ciphersuite", "common/nym-kkt-context",
"common/nym-kkt-ciphersuite",
"common/nym-kkt-context",
]
default-members = [
@@ -190,7 +191,8 @@ default-members = [
"service-providers/ip-packet-router",
"service-providers/network-requester",
"tools/nymvisor",
"nym-registration-client"
"nym-registration-client",
"tools/internal/localnet-orchestrator"
]
exclude = ["contracts", "nym-wallet", "cpu-cycles"]
@@ -232,6 +234,7 @@ bloomfilter = "3.0.1"
bs58 = "0.5.1"
bytecodec = "0.4.15"
bytes = "1.11.1"
cargo-edit = "0.13.8"
cargo_metadata = "0.19.2"
celes = "2.6.0"
cfg-if = "1.0.0"
@@ -347,8 +350,8 @@ si-scale = "0.2.3"
snow = "0.9.6"
sphinx-packet = "=0.6.0"
sqlx = "0.8.6"
strum = "0.27.2"
strum_macros = "0.27.2"
strum = "0.28.0"
strum_macros = "0.28.0"
subtle-encoding = "0.5"
syn = "2"
sysinfo = "0.37.0"
@@ -375,7 +378,7 @@ tracing-opentelemetry = "0.32.1"
tracing-subscriber = "0.3.20"
tracing-indicatif = "0.3.9"
tracing-test = "0.2.5"
ts-rs = "10.1.0"
ts-rs = "12.0.1"
tungstenite = { version = "0.20.1", default-features = false }
typed-builder = "0.23.0"
uniffi = "0.29.2"
@@ -438,6 +441,7 @@ nym-ecash-time = { version = "1.20.4", path = "common/ecash-time" }
nym-exit-policy = { version = "1.20.4", path = "common/exit-policy" }
nym-ffi-shared = { version = "1.20.4", path = "sdk/ffi/shared" }
nym-gateway-client = { version = "1.20.4", path = "common/client-libs/gateway-client", default-features = false }
nym-gateway-probe = { version = "1.18.0", path = "nym-gateway-probe" }
nym-gateway-requests = { version = "1.20.4", path = "common/gateway-requests" }
nym-gateway-storage = { version = "1.20.4", path = "common/gateway-storage" }
nym-gateway-stats-storage = { version = "1.20.4", path = "common/gateway-stats-storage" }
+3
View File
@@ -30,8 +30,11 @@ client ───► Gateway ──┘ mix │ mix ┌─►mix ───►
```
<!-- This is broken
[![Build Status](https://img.shields.io/github/actions/workflow/status/nymtech/nym/build.yml?branch=develop&style=for-the-badge&logo=github-actions)](https://github.com/nymtech/nym/actions?query=branch%3Adevelop)
-->
> This project integrates with the Midnight Network
### Building
+89 -13
View File
@@ -1,32 +1,38 @@
---
ansible_ssh_private_key_file: ~/.ssh/<SSH_KEY>
# nym_version: "v2025.21-mozzarella"
#
# NOTE:
# if you want to pin Nym to a specific version instead of using the
# latest release from GitHub in /tasks/main.yml then
# uncomment the line above and set the tag
cli_url: "https://github.com/nymtech/nym/releases/download/nym-binaries-{{ nym_version }}/nym-cli"
tunnel_manager_url: "https://github.com/nymtech/nym/raw/refs/heads/develop/scripts/nym-node-setup/network-tunnel-manager.sh"
quic_bridge_deployment_url: "https://raw.githubusercontent.com/nymtech/nym/refs/heads/develop/scripts/nym-node-setup/quic_bridge_deployment.sh"
# NOTE: These values will be used globally unless overwritten per node in inventory/all
###############################################################################
## GLOBAL VARS
## These values will be used globally unless overwritten per node in inventory/all
###############################################################################
ansible_user: root # used for ssh, like `ssh root@nym-exit.ch-1.mynodes.net`
email: "<EMAIL>" # used in certbot, description.toml and landing page
website: "<WEBSITE>" # it is used in the description.toml
description: "<NODE_PUBLIC_DESCRIPTION>" # or define per node in inventory/all
# operator_name: "<OPERATOR_NAME>" # used in landing page if provided
###############################################################################
## GLOBAL VARS
## These values will be used globally unless overwritten per node in inventory/all
## Set these vars only if you want them globally for all nodes
## Per node changes in inventory/all will overwrite these global vars
###############################################################################
# NOTE: Set these vars if you want them globally for all nodes
# Per node changes in inventory/all will overwrite these global ones:
hostname: "" # this is a fallback, keep it and setup hostname per node in inventory/all
# moniker: "<MONIKER>" # if not setup here not in inventory/all it get's derived from the hostname
# mode: <MODE> # entry-gateway/exit-gateway/mixnode
# wireguard_enabled: <WIREGUARD_ENABLED> # true/false
hostname: "" # this is a fallback, keep it and setup hostname per node in inventory/all
###############################################################################
## GLOBAL PACKAGES
## These will be installed during deployment
###############################################################################
# NOTE: Possible vars to incule on landing page, etc.
# operator_name: "<OPERATOR_NAME>"
packages:
- tmux
@@ -42,3 +48,73 @@ packages:
- jq
- wget
- ufw
###############################################################################
## OPTIONAL OVERRIDES
## All values below already have defaults in the playbook/roles
## Uncomment only if you want to override them
###############################################################################
###############################################################################
## SYSTEM MAINTENANCE PLAYBOOK KNOBS
###############################################################################
# nym_version: "v2025.21-mozzarella"
## NOTE:
## if you want to pin Nym to a specific version instead of using the
## latest release from GitHub in /tasks/main.yml then
## uncomment the line above and set the tag
###############################################################################
## SYSTEM MAINTENANCE PLAYBOOK KNOBS
###############################################################################
## JOURNALD LIMITS
# journald_system_max_use: "100M" # max persistent journal size
# journald_runtime_max_use: "50M" # max runtime journal size
# journald_system_max_file_size: "25M" # max single journal file
# journald_runtime_max_file_size: "10M" # max runtime journal file
# journald_max_retention_sec: "3day" # retention time
# journald_rate_limit_interval: "30s" # rate limit window
# journald_rate_limit_burst: "1000" # rate limit burst
## NYM-NODE LOG CONTROL
# nymnode_log_level_max: "warning" # drop INFO logs
# nymnode_rate_limit_interval: "30s" # per nym-node rate limit window
# nymnode_rate_limit_burst: "200" # per nym-node rate limit burst
## JOURNAL VACUUM TARGETS
# journal_vacuum_size: "100M"
# journal_vacuum_time: "3days"
## RSYSLOG
# disable_rsyslog: true
## FSTRIM SCHEDULE
# fstrim_every_calendar: "*:0/15" # Aggressive
# fstrim_every_calendar: "hourly" # Less aggressive
## OPTIONAL CLEANUPS
# enable_apt_cleanup: true
# enable_snap_cleanup: true
## WRITEBACK TUNING
# enable_writeback_tuning: true
# writeback_dirty_writeback_centisecs: 1500
# writeback_dirty_expire_centisecs: 6000
@@ -0,0 +1,38 @@
---
- name: Restrict logging, vacuum journals, and enable periodic trim
hosts: all
become: true
gather_facts: false
# global knobs - override in inventory/group_vars/host_vars as needed
vars:
journald_system_max_use: "100M"
journald_runtime_max_use: "50M"
journald_system_max_file_size: "25M"
journald_runtime_max_file_size: "10M"
journald_max_retention_sec: "3day"
journald_rate_limit_interval: "30s"
journald_rate_limit_burst: "1000"
# per nym-node rate limit + level cap
nymnode_log_level_max: "warning"
nymnode_rate_limit_interval: "30s"
nymnode_rate_limit_burst: "200"
# journal vacuum targets
journal_vacuum_size: "100M"
journal_vacuum_time: "3days"
# fstrim cadence (note: the systemd override uses cron-like calendar)
fstrim_every_calendar: "*:0/15"
roles:
- role: journald_limits
- role: nymnode_logging
- role: rsyslog_disable
- role: journal_vacuum
- role: classic_log_cleanup
- role: apt_cleanup
- role: snap_cleanup
- role: fstrim_15min
- role: report
@@ -0,0 +1,21 @@
---
- name: Clean apt cache
command: apt-get clean
ignore_errors: true
- name: Autoremove unused packages
command: apt-get -y autoremove
ignore_errors: true
- name: Remove apt lists to reclaim space (they will be re-fetched on update)
file:
path: /var/lib/apt/lists
state: absent
ignore_errors: true
- name: Recreate apt lists directory
file:
path: /var/lib/apt/lists
state: directory
mode: "0755"
ignore_errors: true
@@ -0,0 +1,20 @@
---
- name: Remove classic /var/log files if present (optional)
file:
path: "{{ item }}"
state: absent
loop:
- /var/log/syslog
- /var/log/syslog.1
- /var/log/kern.log
- /var/log/kern.log.1
- /var/log/auth.log
- /var/log/auth.log.1
- /var/log/ufw.log
- /var/log/ufw.log.1
ignore_errors: true
# This is best-effort and may still fail if other packages' postrotate scripts assume services exist.
- name: Force logrotate (best-effort)
command: "logrotate --force /etc/logrotate.conf"
ignore_errors: true
@@ -0,0 +1,3 @@
---
fstrim_timer_dropin_dir: "/etc/systemd/system/fstrim.timer.d"
fstrim_every_calendar: "*:0/15"
@@ -0,0 +1,31 @@
---
- name: Ensure systemd drop-in dir for fstrim.timer exists
file:
path: "{{ fstrim_timer_dropin_dir }}"
state: directory
mode: "0755"
- name: Override fstrim.timer schedule
copy:
dest: "{{ fstrim_timer_dropin_dir }}/override.conf"
mode: "0644"
content: |
[Timer]
OnCalendar=
OnCalendar={{ fstrim_every_calendar }}
Persistent=true
RandomizedDelaySec=0
- name: Reload systemd after fstrim override
systemd:
daemon_reload: true
- name: Enable and start fstrim timer
systemd:
name: fstrim.timer
enabled: true
state: started
- name: Run fstrim now (best-effort)
command: fstrim -av
ignore_errors: true
@@ -0,0 +1,3 @@
---
journal_vacuum_size: "100M"
journal_vacuum_time: "3days"
@@ -0,0 +1,6 @@
---
- name: Vacuum journal to size cap (hard)
command: "journalctl --vacuum-size={{ journal_vacuum_size }}"
- name: Vacuum journal older than retention window (time)
command: "journalctl --vacuum-time={{ journal_vacuum_time }}"
@@ -0,0 +1,8 @@
---
journald_system_max_use: "100M"
journald_runtime_max_use: "50M"
journald_system_max_file_size: "25M"
journald_runtime_max_file_size: "10M"
journald_max_retention_sec: "3day"
journald_rate_limit_interval: "30s"
journald_rate_limit_burst: "1000"
@@ -0,0 +1,5 @@
---
- name: Restart journald
systemd:
name: systemd-journald
state: restarted
@@ -0,0 +1,20 @@
---
- name: Configure journald limits (persistent, capped, rate-limited)
copy:
dest: /etc/systemd/journald.conf
mode: "0644"
content: |
[Journal]
Storage=persistent
Compress=yes
Seal=yes
SystemMaxUse={{ journald_system_max_use }}
RuntimeMaxUse={{ journald_runtime_max_use }}
SystemMaxFileSize={{ journald_system_max_file_size }}
RuntimeMaxFileSize={{ journald_runtime_max_file_size }}
MaxRetentionSec={{ journald_max_retention_sec }}
RateLimitIntervalSec={{ journald_rate_limit_interval }}
RateLimitBurst={{ journald_rate_limit_burst }}
notify: Restart journald
@@ -0,0 +1,7 @@
---
nymnode_log_level_max: "warning"
nymnode_rate_limit_interval: "30s"
nymnode_rate_limit_burst: "200"
nymnode_unit_name: "nym-node" # set to "nym-node.service" if your distro expects it
nymnode_dropin_dir: "/etc/systemd/system/nym-node.service.d"
nymnode_dropin_file: "10-logging.conf"
@@ -0,0 +1,26 @@
---
- name: Ensure systemd drop-in dir for nym-node exists
file:
path: "{{ nymnode_dropin_dir }}"
state: directory
mode: "0755"
- name: Cap nym-node logs + apply per-unit rate limiting
copy:
dest: "{{ nymnode_dropin_dir }}/{{ nymnode_dropin_file }}"
mode: "0644"
content: |
[Service]
LogLevelMax={{ nymnode_log_level_max }}
LogRateLimitIntervalSec={{ nymnode_rate_limit_interval }}
LogRateLimitBurst={{ nymnode_rate_limit_burst }}
- name: Reload systemd after nym-node drop-in
systemd:
daemon_reload: true
- name: Restart nym-node to apply new logging limits (best-effort)
systemd:
name: "{{ nymnode_unit_name }}"
state: restarted
ignore_errors: true
@@ -0,0 +1,8 @@
---
- name: Show journal disk usage
command: journalctl --disk-usage
register: journal_usage
changed_when: false
- debug:
var: journal_usage.stdout
@@ -0,0 +1,13 @@
---
- name: Stop/disable rsyslog if installed (best-effort)
systemd:
name: rsyslog
state: stopped
enabled: false
ignore_errors: true
- name: Remove rsyslog logrotate stanza if present (prevents logrotate failures)
file:
path: /etc/logrotate.d/rsyslog
state: absent
ignore_errors: true
@@ -0,0 +1,10 @@
---
- name: Remove disabled snap revisions (best-effort)
shell: |
set -euo pipefail
snap list --all | awk '/disabled/{print $1, $3}' | while read -r name rev; do
snap remove "$name" --revision="$rev" || true
done
args:
executable: /bin/bash
ignore_errors: true
+1 -1
View File
@@ -2,7 +2,7 @@
name = "nym-client-core"
version.workspace = true
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
edition = "2021"
edition = "2024"
rust-version = "1.85"
license.workspace = true
description = "Crate containing core client functionality and configs, used by all other Nym client implentations"
@@ -32,6 +32,7 @@ const DEFAULT_MIN_MIXNODE_PERFORMANCE: u8 = 50;
const DEFAULT_MIN_GATEWAY_PERFORMANCE: u8 = 50;
const DEFAULT_MAX_STARTUP_GATEWAY_WAITING_PERIOD: Duration = Duration::from_secs(70 * 60); // 70min -> full epoch (1h) + a bit of overhead
const DEFAULT_MAX_STARTUP_TOPOLOGY_WAITING_PERIOD: Duration = Duration::from_secs(70 * 60); // 70min -> full epoch (1h) + a bit of overhead
// Set this to a high value for now, so that we don't risk sporadic timeouts that might cause
// bought bandwidth tokens to not have time to be spent; Once we remove the gateway from the
@@ -555,6 +556,11 @@ pub struct Topology {
#[serde(with = "humantime_serde")]
pub max_startup_gateway_waiting_period: Duration,
/// Defines how long the client is going to wait on startup for minimal topology to become online,
/// before abandoning the procedure.
#[serde(with = "humantime_serde")]
pub max_startup_network_waiting_period: Duration,
/// Specifies a minimum performance of a mixnode that is used on route construction.
/// This setting is only applicable when `NymApi` topology is used.
pub minimum_mixnode_performance: u8,
@@ -583,6 +589,7 @@ impl Default for Topology {
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
disable_refreshing: false,
max_startup_gateway_waiting_period: DEFAULT_MAX_STARTUP_GATEWAY_WAITING_PERIOD,
max_startup_network_waiting_period: DEFAULT_MAX_STARTUP_TOPOLOGY_WAITING_PERIOD,
minimum_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
minimum_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
use_extended_topology: false,
@@ -159,6 +159,7 @@ impl From<ConfigV6> for Config {
use_extended_topology: value.debug.topology.use_extended_topology,
ignore_egress_epoch_role: value.debug.topology.ignore_egress_epoch_role,
ignore_ingress_epoch_role: value.debug.topology.ignore_ingress_epoch_role,
..Default::default()
},
reply_surbs: ReplySurbs {
minimum_reply_surb_storage_threshold: value
@@ -160,7 +160,10 @@ where
)
.await?;
} else {
info!("registered with new gateway {} (under address {address}), but this will not be our default address", gateway_details.gateway_id);
info!(
"registered with new gateway {} (under address {address}), but this will not be our default address",
gateway_details.gateway_id
);
}
Ok(GatewayInfo {
@@ -4,13 +4,13 @@
use super::mix_traffic::ClientRequestSender;
use super::received_buffer::ReceivedBufferMessage;
use super::statistics_control::StatisticsControl;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
use crate::client::event_control::EventControl;
use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
use crate::client::real_messages_control;
@@ -52,12 +52,12 @@ use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_sphinx::receiver::{ReconstructedMessage, SphinxMessageReceiver};
use nym_statistics_common::clients::ClientStatsSender;
use nym_statistics_common::generate_client_stats_id;
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use nym_task::ShutdownTracker;
use nym_topology::provider_trait::TopologyProvider;
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use nym_topology::HardcodedTopologyProvider;
use nym_topology::provider_trait::TopologyProvider;
use nym_validator_client::nym_api::NymApiClientExt;
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
use nym_validator_client::{UserAgent, nyxd::contract_traits::DkgQueryClient};
use rand::prelude::SliceRandom;
use rand::rngs::OsRng;
use rand::thread_rng;
@@ -220,6 +220,7 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
nym_api_urls: Option<Vec<nym_network_defaults::ApiUrl>>,
wait_for_gateway: bool,
wait_for_initial_topology: bool,
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
shutdown: Option<ShutdownTracker>,
@@ -250,6 +251,7 @@ where
dkg_query_client,
nym_api_urls: None,
wait_for_gateway: false,
wait_for_initial_topology: false,
custom_topology_provider: None,
custom_gateway_transceiver: None,
shutdown: None,
@@ -305,6 +307,12 @@ where
self
}
#[must_use]
pub fn with_wait_for_initial_topology(mut self, wait_for_initial_topology: bool) -> Self {
self.wait_for_initial_topology = wait_for_initial_topology;
self
}
#[must_use]
pub fn with_topology_provider(
mut self,
@@ -674,6 +682,7 @@ where
topology_accessor: TopologyAccessor,
local_gateway: NodeIdentity,
wait_for_gateway: bool,
wait_for_initial_topology: bool,
shutdown_tracker: &ShutdownTracker,
) -> Result<(), ClientCoreError> {
let topology_refresher_config =
@@ -694,6 +703,46 @@ where
tracing::info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;
// 1. wait for the minimum topology (if applicable)
if topology_refresher
.ensure_topology_is_routable()
.await
.is_err()
&& wait_for_initial_topology
{
if let Err(err) = topology_refresher
.wait_for_initial_network(topology_config.max_startup_network_waiting_period)
.await
{
tracing::error!(
"the network did not come become online within the specified timeout: {err}"
);
return Err(err.into());
}
}
// 2. wait for our gateway (if applicable)
if topology_refresher
.ensure_contains_routable_egress(local_gateway)
.await
.is_err()
&& wait_for_gateway
{
if let Err(err) = topology_refresher
.wait_for_gateway(
local_gateway,
topology_config.max_startup_gateway_waiting_period,
)
.await
{
tracing::error!(
"the gateway did not come back online within the specified timeout: {err}"
);
return Err(err.into());
}
}
// 3. check if the topology is routable (in case we were NOT waiting for it)
if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
tracing::error!(
"The current network topology seem to be insufficient to route any packets through \
@@ -702,30 +751,15 @@ where
return Err(ClientCoreError::InsufficientNetworkTopology(err));
}
let gateway_wait_timeout = if wait_for_gateway {
Some(topology_config.max_startup_gateway_waiting_period)
} else {
None
};
// 4. check if the gateway exists (in case we were NOT waiting for it)
if let Err(err) = topology_refresher
.ensure_contains_routable_egress(local_gateway)
.await
{
if let Some(waiting_timeout) = gateway_wait_timeout {
if let Err(err) = topology_refresher
.wait_for_gateway(local_gateway, waiting_timeout)
.await
{
tracing::error!(
"the gateway did not come back online within the specified timeout: {err}"
);
return Err(err.into());
}
} else {
tracing::error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
return Err(err.into());
}
tracing::error!(
"the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}"
);
return Err(err.into());
}
if !topology_config.disable_refreshing {
@@ -1024,6 +1058,7 @@ where
shared_topology_accessor.clone(),
self_address.gateway(),
self.wait_for_gateway,
self.wait_for_initial_topology,
&shutdown_tracker.clone(),
)
.await?;
@@ -1195,9 +1230,11 @@ mod tests {
]);
assert_eq!(network_details.nym_api_urls.as_ref().unwrap().len(), 2);
assert!(network_details.nym_api_urls.as_ref().unwrap()[1]
.front_hosts
.is_some());
assert!(
network_details.nym_api_urls.as_ref().unwrap()[1]
.front_hosts
.is_some()
);
}
#[test]
@@ -1210,11 +1247,13 @@ mod tests {
assert_eq!(api_url.url, "https://nym-frontdoor.vercel.app/api/");
assert_eq!(api_url.front_hosts.as_ref().unwrap().len(), 2);
assert!(api_url
.front_hosts
.as_ref()
.unwrap()
.contains(&"vercel.app".to_string()));
assert!(
api_url
.front_hosts
.as_ref()
.unwrap()
.contains(&"vercel.app".to_string())
);
}
#[test]
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::{
client::replies::reply_storage::{fs_backend, CombinedReplyStorage, ReplyStorageBackend},
client::replies::reply_storage::{CombinedReplyStorage, ReplyStorageBackend, fs_backend},
config,
config::Config,
error::ClientCoreError,
@@ -10,7 +10,7 @@ use crate::{
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient};
use nym_validator_client::{QueryHttpRpcNyxdClient, nyxd};
use std::{io, path::Path};
use time::OffsetDateTime;
use tracing::{error, info, trace};
@@ -24,7 +24,9 @@ async fn setup_fresh_backend<P: AsRef<Path>>(
let mut storage_backend = match fs_backend::Backend::init(db_path).await {
Ok(backend) => backend,
Err(err) => {
error!("setup_fresh_backend: Failed to setup persistent storage backend for our reply needs: {err}");
error!(
"setup_fresh_backend: Failed to setup persistent storage backend for our reply needs: {err}"
);
return Err(ClientCoreError::SurbStorageError {
source: Box::new(err),
});
@@ -93,7 +95,9 @@ pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
match fs_backend::Backend::try_load(db_path).await {
Ok(backend) => Ok(backend),
Err(err) => {
error!("setup_fs_reply_surb_backend: Failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future");
error!(
"setup_fs_reply_surb_backend: Failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future"
);
archive_corrupted_database(db_path).await?;
setup_fresh_backend(db_path, surb_config).await
}
@@ -1,8 +1,8 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::error::ClientCoreError;
use nym_client_core_gateways_storage::{
ActiveGateway, GatewayPublishedData, GatewayRegistration, GatewaysDetailsStore,
@@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
pub mod v1_1_33 {
use crate::config::disk_persistence::old_v1_1_33::CommonClientPathsV1_1_33;
use crate::config::disk_persistence::CommonClientPaths;
use crate::config::disk_persistence::old_v1_1_33::CommonClientPathsV1_1_33;
use crate::config::old_config_v1_1_33::OldGatewayEndpointConfigV1_1_33;
use crate::error::ClientCoreError;
@@ -11,8 +11,8 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::cover::generate_loop_cover_packet;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use rand::{rngs::OsRng, CryptoRng, Rng};
use nym_statistics_common::clients::{ClientStatsSender, packet_statistics::PacketStatisticsEvent};
use rand::{CryptoRng, Rng, rngs::OsRng};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -20,10 +20,10 @@ use tokio::sync::mpsc::error::TrySendError;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, Sleep};
use tokio::time::{Sleep, sleep};
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::{sleep, Sleep};
use wasmtimer::tokio::{Sleep, sleep};
pub struct LoopCoverTrafficStream<R>
where
@@ -179,7 +179,9 @@ impl LoopCoverTrafficStream<OsRng> {
) {
Ok(topology) => topology,
Err(err) => {
warn!("We're not going to send any loop cover message this time, as the current topology seem to be invalid - {err}");
warn!(
"We're not going to send any loop cover message this time, as the current topology seem to be invalid - {err}"
);
return;
}
};
@@ -13,10 +13,10 @@ use crate::config::disk_persistence::ClientKeysPaths;
#[cfg(not(target_arch = "wasm32"))]
use nym_crypto::asymmetric::{ed25519, x25519};
#[cfg(not(target_arch = "wasm32"))]
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
#[cfg(not(target_arch = "wasm32"))]
use nym_pemstore::KeyPairPath;
#[cfg(not(target_arch = "wasm32"))]
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
#[cfg(not(target_arch = "wasm32"))]
use nym_sphinx::acknowledgements::AckKey;
// we have to define it as an async trait since wasm storage is async
@@ -4,8 +4,8 @@
use async_trait::async_trait;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::error::GatewayClientError;
use nym_gateway_client::GatewayClient;
use nym_gateway_client::error::GatewayClientError;
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
@@ -2,13 +2,13 @@
// SPDX-License-Identifier: Apache-2.0
use super::action_controller::{AckActionSender, Action};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_statistics_common::clients::{ClientStatsSender, packet_statistics::PacketStatisticsEvent};
use futures::StreamExt;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::{
acknowledgements::{identifier::recover_identifier, AckKey},
chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID},
acknowledgements::{AckKey, identifier::recover_identifier},
chunking::fragment::{COVER_FRAG_ID, FragmentIdentifier},
};
use nym_task::ShutdownToken;
use std::sync::Arc;
@@ -3,11 +3,11 @@
use super::PendingAcknowledgement;
use crate::client::real_messages_control::acknowledgement_control::RetransmissionRequestSender;
use futures::channel::mpsc;
use futures::StreamExt;
use futures::channel::mpsc;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_sphinx::Delay as SphinxDelay;
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_task::ShutdownToken;
use std::collections::HashMap;
use std::sync::Arc;
@@ -9,8 +9,8 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use nym_task::ShutdownToken;
use nym_task::connections::TransmissionLane;
use rand::{CryptoRng, Rng};
use tracing::*;
@@ -16,10 +16,10 @@ use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::{
Delay as SphinxDelay,
acknowledgements::AckKey,
addressing::clients::Recipient,
chunking::fragment::{Fragment, FragmentIdentifier},
Delay as SphinxDelay,
};
use nym_statistics_common::clients::ClientStatsSender;
use rand::{CryptoRng, Rng};
@@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
use super::{
action_controller::{AckActionSender, Action},
PendingAcknowledgement, RetransmissionRequestReceiver,
action_controller::{AckActionSender, Action},
};
use crate::client::real_messages_control::acknowledgement_control::PacketDestination;
use crate::client::real_messages_control::message_handler::{MessageHandler, PreparationError};
@@ -13,7 +13,7 @@ use futures::StreamExt;
use nym_sphinx::chunking::fragment::Fragment;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::{addressing::clients::Recipient, params::PacketType};
use nym_task::{connections::TransmissionLane, ShutdownToken};
use nym_task::{ShutdownToken, connections::TransmissionLane};
use rand::{CryptoRng, Rng};
use std::sync::{Arc, Weak};
use tracing::*;
@@ -1,10 +1,10 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::action_controller::{AckActionSender, Action};
use super::SentPacketNotificationReceiver;
use super::action_controller::{AckActionSender, Action};
use futures::StreamExt;
use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
use nym_sphinx::chunking::fragment::{COVER_FRAG_ID, FragmentIdentifier};
use tracing::*;
/// Module responsible for starting up retransmission timers.
@@ -10,17 +10,17 @@ use crate::client::replies::reply_controller::MaxRetransmissions;
use crate::client::replies::reply_storage::{ReceivedReplySurbsMap, SentReplyKeys, UsedSenderTags};
use crate::client::topology_control::{TopologyAccessor, TopologyReadPermit};
use nym_client_core_surb_storage::RetrievedReplySurb;
use nym_sphinx::Delay;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessage, ReplyMessage};
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessage, ReplyMessage};
use nym_sphinx::chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx::message::NymMessage;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
use nym_sphinx::Delay;
use nym_task::connections::TransmissionLane;
use nym_task::ShutdownToken;
use nym_task::connections::TransmissionLane;
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
@@ -272,7 +272,9 @@ where
let primary_count = msg.required_packets(self.config.primary_packet_size);
let secondary_count = msg.required_packets(secondary_packet);
trace!("This message would require: {primary_count} primary packets or {secondary_count} secondary packets...");
trace!(
"This message would require: {primary_count} primary packets or {secondary_count} secondary packets..."
);
// if there would be no benefit in using the secondary packet - use the primary (duh)
if primary_count <= secondary_count {
trace!("so choosing primary for this message");
@@ -25,9 +25,9 @@ use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use nym_task::ShutdownToken;
use rand::{rngs::OsRng, CryptoRng, Rng};
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use rand::{CryptoRng, Rng, rngs::OsRng};
use std::sync::Arc;
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
@@ -17,11 +17,11 @@ use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketSize;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_statistics_common::clients::{ClientStatsSender, packet_statistics::PacketStatisticsEvent};
use nym_task::ShutdownToken;
use nym_task::connections::{
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::ShutdownToken;
use rand::{CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
@@ -29,11 +29,11 @@ use std::time::Duration;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, Sleep};
use tokio::time::{Sleep, sleep};
// use nym_wasm_utils::console_log;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::{sleep, Sleep};
use wasmtimer::tokio::{Sleep, sleep};
mod sending_delay_controller;
/// Configurable parameters of the `OutQueueControl`
@@ -230,7 +230,9 @@ where
let (next_message, fragment_id, packet_size) = match next_message {
StreamMessage::Cover => {
let cover_traffic_packet_size = self.loop_cover_message_size();
trace!("the next loop cover message will be put in a {cover_traffic_packet_size} packet");
trace!(
"the next loop cover message will be put in a {cover_traffic_packet_size} packet"
);
// TODO for way down the line: in very rare cases (during topology update) we might have
// to wait a really tiny bit before actually obtaining the permit hence messing with our
@@ -244,7 +246,9 @@ where
) {
Ok(topology) => topology,
Err(err) => {
warn!("We're not going to send any loop cover message this time, as the current topology seem to be invalid - {err}");
warn!(
"We're not going to send any loop cover message this time, as the current topology seem to be invalid - {err}"
);
return;
}
};
@@ -436,7 +440,7 @@ where
}
}
if let Some(ref mut next_delay) = &mut self.next_delay {
if let Some(next_delay) = &mut self.next_delay {
// it is not yet time to return a message
if next_delay.as_mut().poll(cx).is_pending() {
return Poll::Pending;
@@ -1,7 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::helpers::{get_time_now, Instant};
use crate::client::helpers::{Instant, get_time_now};
use std::time::Duration;
// The minimum time between increasing the average delay between packets. If we hit the ceiling in
@@ -5,20 +5,20 @@ use crate::client::helpers::get_time_now;
use crate::client::replies::{
reply_controller::ReplyControllerSender, reply_storage::SentReplyKeys,
};
use futures::StreamExt;
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::StreamExt;
use nym_crypto::asymmetric::x25519;
use nym_crypto::Digest;
use nym_crypto::asymmetric::x25519;
use nym_gateway_client::MixnetMessageReceiver;
use nym_sphinx::anonymous_replies::requests::{
RepliableMessage, RepliableMessageContent, ReplyMessage, ReplyMessageContent,
};
use nym_sphinx::anonymous_replies::{encryption_key::EncryptionKeyDigest, SurbEncryptionKey};
use nym_sphinx::anonymous_replies::{SurbEncryptionKey, encryption_key::EncryptionKeyDigest};
use nym_sphinx::message::{NymMessage, PlainMessage};
use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_statistics_common::clients::{ClientStatsSender, packet_statistics::PacketStatisticsEvent};
use nym_task::ShutdownToken;
use std::collections::HashSet;
use std::sync::Arc;
@@ -78,14 +78,19 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
let fragment = match self.message_receiver.recover_fragment(fragment_data) {
Err(err) => {
warn!("failed to recover fragment from raw data: {err}. The whole underlying message might be corrupted and unrecoverable!");
warn!(
"failed to recover fragment from raw data: {err}. The whole underlying message might be corrupted and unrecoverable!"
);
return None;
}
Ok(frag) => frag,
};
if self.recently_reconstructed.contains(&fragment.id()) {
debug!("Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost", fragment.id());
debug!(
"Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost",
fragment.id()
);
return None;
}
@@ -93,7 +98,9 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
match self.message_receiver.insert_new_fragment(fragment) {
Err(err) => match err {
MessageRecoveryError::MalformedReconstructedMessage { source, used_sets } => {
error!("message reconstruction failed - {source}. Attempting to re-use the message sets...");
error!(
"message reconstruction failed - {source}. Attempting to re-use the message sets..."
);
// TODO: should we really insert reconstructed sets? could this be abused for some attack?
for set_id in used_sets {
if !self.recently_reconstructed.insert(set_id) {
@@ -144,7 +151,9 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
&mut raw_fragment,
) {
Err(err) => {
warn!("failed to recover fragment data: {err}. The whole underlying message might be corrupted and unrecoverable!");
warn!(
"failed to recover fragment data: {err}. The whole underlying message might be corrupted and unrecoverable!"
);
return None;
}
Ok(frag_data) => frag_data,
@@ -275,7 +284,9 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
RepliableMessageContent::Heartbeat(content) => {
let additional_reply_surbs = content.additional_reply_surbs;
error!("received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)");
error!(
"received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)"
);
(additional_reply_surbs, false)
}
RepliableMessageContent::DataV2(content) => {
@@ -304,7 +315,9 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
RepliableMessageContent::HeartbeatV2(content) => {
let additional_reply_surbs = content.additional_reply_surbs;
error!("received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)");
error!(
"received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)"
);
(additional_reply_surbs, false)
}
};
@@ -380,7 +393,9 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
if let Some(sender) = &inner_guard.message_sender {
trace!("Sending reconstructed messages to announced sender");
if let Err(err) = sender.unbounded_send(reconstructed_messages) {
warn!("The reconstructed message receiver went offline without explicit notification (relevant error: - {err})");
warn!(
"The reconstructed message receiver went offline without explicit notification (relevant error: - {err})"
);
inner_guard.message_sender = None;
inner_guard.messages.extend(err.into_inner());
}
@@ -5,15 +5,15 @@ use crate::client::real_messages_control::acknowledgement_control::PendingAcknow
use crate::client::real_messages_control::message_handler::{
FragmentWithMaxRetransmissions, MessageHandler, PreparationError,
};
use crate::client::replies::reply_controller::key_rotation_helpers::SurbRefreshState;
use crate::client::replies::reply_controller::Config;
use crate::client::replies::reply_controller::key_rotation_helpers::SurbRefreshState;
use crate::client::topology_control::TopologyAccessor;
use crate::client::transmission_buffer::TransmissionBuffer;
use futures::channel::oneshot;
use nym_client_core_surb_storage::{ReceivedReplySurb, ReceivedReplySurbsMap};
use nym_crypto::aes::cipher::crypto_common::rand_core::CryptoRng;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_task::connections::{ConnectionId, TransmissionLane};
use nym_topology::NymTopologyMetadata;
@@ -50,7 +50,9 @@ impl SenderData {
let pending_retransmissions = self.pending_retransmissions.len();
let total_pending = pending_retransmissions + pending_replies;
debug!("total queue size: {total_pending} = pending data {pending_replies} + pending retransmission {pending_retransmissions}");
debug!(
"total queue size: {total_pending} = pending data {pending_replies} + pending retransmission {pending_retransmissions}"
);
total_pending
}
@@ -200,7 +202,9 @@ where
let total_required_surbs = total_queue + target_surbs_after_clearing_queue;
let total_available_surbs = pending_surbs + available_surbs;
debug!("available surbs: {available_surbs} pending surbs: {pending_surbs} threshold range: {min_surbs_threshold}..+{min_surbs_threshold_buffer}..{max_surbs_threshold}");
debug!(
"available surbs: {available_surbs} pending surbs: {pending_surbs} threshold range: {min_surbs_threshold}..+{min_surbs_threshold_buffer}..{max_surbs_threshold}"
);
// We should request more surbs if:
// 1. We haven't hit the maximum surb threshold, and
@@ -225,9 +229,13 @@ where
.is_none()
{
// don't report it every single time
warn!("received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!");
warn!(
"received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!"
);
} else {
trace!("received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!");
trace!(
"received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!"
);
}
return;
}
@@ -383,7 +391,9 @@ where
let (surbs_for_reply, _) = self.surbs_storage.get_reply_surbs(&target, to_take.len());
let Some(surbs_for_reply) = surbs_for_reply else {
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
error!(
"somehow different task has stolen our reply surbs! - this should have been impossible"
);
self.re_insert_pending_retransmission(&target, to_take);
return;
};
@@ -459,7 +469,9 @@ where
.get_reply_surbs(&target, to_send_clone.len());
let Some(surbs_for_reply) = surbs_for_reply else {
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
error!(
"somehow different task has stolen our reply surbs! - this should have been impossible"
);
self.re_insert_pending_replies(&target, to_send);
return;
};
@@ -543,7 +555,9 @@ where
let ack_ref = match timed_out_ack.upgrade() {
Some(ack) => ack,
None => {
debug!("we received the ack for one of the reply packets as we were putting it in the retransmission queue");
debug!(
"we received the ack for one of the reply packets as we were putting it in the retransmission queue"
);
return;
}
};
@@ -657,9 +671,13 @@ where
// only log at higher level if it's the first time this error has occurred in a while
if now - last_failure > time::Duration::seconds(30) {
warn!("failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}")
warn!(
"failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}"
)
} else {
debug!("failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}")
debug!(
"failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}"
)
}
}
}
@@ -681,7 +699,10 @@ where
.surbs_storage
.surbs_last_received_at(pending_reply_target)
else {
error!("we have {} pending replies for {pending_reply_target}, but we somehow never received any reply surbs from them!", retransmission_buf.total_size());
error!(
"we have {} pending replies for {pending_reply_target}, but we somehow never received any reply surbs from them!",
retransmission_buf.total_size()
);
to_remove.push(*pending_reply_target);
continue;
};
@@ -702,7 +723,9 @@ where
// if client is offline)
if vals.current_clear_rerequest_counter > max_rerequests {
to_remove.push(*pending_reply_target);
debug!("we have reached the maximum threshold of attempting to request surbs from {pending_reply_target}. dropping the sender");
debug!(
"we have reached the maximum threshold of attempting to request surbs from {pending_reply_target}. dropping the sender"
);
continue;
}
@@ -710,7 +733,10 @@ where
if diff > max_drop_wait {
to_remove.push(*pending_reply_target)
} else {
debug!("We haven't received any surbs in {} from {pending_reply_target}. Going to explicitly ask for more", humantime::format_duration(diff.unsigned_abs()));
debug!(
"We haven't received any surbs in {} from {pending_reply_target}. Going to explicitly ask for more",
humantime::format_duration(diff.unsigned_abs())
);
vals.increment_current_clear_rerequest_counter();
to_request.push(*pending_reply_target);
}
@@ -4,8 +4,8 @@
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
use futures::channel::{mpsc, oneshot};
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_task::connections::{ConnectionId, TransmissionLane};
use std::sync::Weak;
@@ -43,7 +43,9 @@ where
// 1. check whether we sent any surbs in the past to this recipient, otherwise
// they have no business in asking for more
if !self.tags_storage.exists(&recipient) {
warn!("{recipient} asked us for reply SURBs even though we never sent them any anonymous messages before!");
warn!(
"{recipient} asked us for reply SURBs even though we never sent them any anonymous messages before!"
);
return;
}
@@ -54,7 +56,12 @@ where
.reply_surbs
.maximum_allowed_reply_surb_request_size
{
warn!("The requested reply surb amount is larger than our maximum allowed ({amount} > {}). Lowering it to a more sane value...", self.config.reply_surbs.maximum_allowed_reply_surb_request_size);
warn!(
"The requested reply surb amount is larger than our maximum allowed ({amount} > {}). Lowering it to a more sane value...",
self.config
.reply_surbs
.maximum_allowed_reply_surb_request_size
);
amount = self
.config
.reply_surbs
@@ -23,7 +23,7 @@ use nym_sphinx::addressing::Recipient;
use nym_statistics_common::clients::{
ClientStatsController, ClientStatsReceiver, ClientStatsSender,
};
use nym_task::{connections::TransmissionLane, ShutdownToken, ShutdownTracker};
use nym_task::{ShutdownToken, ShutdownTracker, connections::TransmissionLane};
use std::time::Duration;
/// Time interval between reporting statistics locally (logging/shutdown_token)
@@ -5,8 +5,8 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError, NymTopologyMetadata};
use nym_validator_client::models::KeyRotationId;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::{Notify, RwLock, RwLockReadGuard};
#[derive(Debug)]
@@ -63,7 +63,9 @@ impl TopologyRefresher {
trace!("Refreshing the topology");
if self.topology_accessor.controlled_manually() {
info!("topology is being controlled manually - we're going to wait until the control is released...");
info!(
"topology is being controlled manually - we're going to wait until the control is released..."
);
self.topology_accessor
.wait_for_released_manual_control()
.await;
@@ -138,6 +140,35 @@ impl TopologyRefresher {
}
}
pub async fn wait_for_initial_network(
&mut self,
timeout_duration: Duration,
) -> Result<(), NymTopologyError> {
info!(
"going to wait for at most {timeout_duration:?} for initial network to become online"
);
let deadline = sleep(timeout_duration);
tokio::pin!(deadline);
loop {
tokio::select! {
_ = &mut deadline => {
return Err(NymTopologyError::TimedOutWaitingForTopology)
}
_ = self.try_refresh() => {
if let Err(err) = self.ensure_topology_is_routable().await {
info!("network is still not routable...: {err}");
} else {
return Ok(())
}
sleep(self.refresh_rate).await
}
}
}
}
// it's perfectly fine if task is interrupted mid-refresh
// there's no data to persist or send over
pub async fn run(&mut self) {
@@ -3,8 +3,8 @@
use async_trait::async_trait;
use nym_mixnet_contract_common::EpochRewardedSet;
use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
use nym_topology::NymTopology;
use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
use nym_validator_client::nym_api::NymApiClientExt;
use rand::prelude::SliceRandom;
use rand::thread_rng;
@@ -82,7 +82,9 @@ impl NymApiTopologyProvider {
fn use_next_nym_api(&mut self) {
if self.nym_api_urls.len() == 1 {
warn!("There's only a single nym API available - it won't be possible to use a different one");
warn!(
"There's only a single nym API available - it won't be possible to use a different one"
);
return;
}
@@ -155,7 +157,10 @@ impl NymApiTopologyProvider {
let mixnodes = mixnodes_res.nodes;
if !gateways_res.metadata.consistency_check(&metadata) {
warn!("inconsistent nodes metadata between mixnodes and gateways calls! {metadata:?} and {:?}", gateways_res.metadata);
warn!(
"inconsistent nodes metadata between mixnodes and gateways calls! {metadata:?} and {:?}",
gateways_res.metadata
);
return None;
}
@@ -1,11 +1,11 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::helpers::{get_time_now, Instant};
use crate::client::helpers::{Instant, get_time_now};
use crate::client::real_messages_control::real_traffic_stream::RealMessage;
use nym_sphinx::chunking::fragment::Fragment;
use nym_task::connections::TransmissionLane;
use rand::{seq::SliceRandom, Rng};
use rand::{Rng, seq::SliceRandom};
use std::{
collections::{HashMap, HashSet, VecDeque},
time::Duration,
+13 -5
View File
@@ -7,9 +7,9 @@ use nym_gateway_client::error::GatewayClientError;
use nym_task::RegistryAccessError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_validator_client::ValidatorClientError;
use nym_validator_client::nym_api::error::NymAPIError;
use nym_validator_client::nyxd::error::NyxdError;
use nym_validator_client::ValidatorClientError;
use rand::distributions::WeightedError;
use std::error::Error;
use std::path::PathBuf;
@@ -56,7 +56,9 @@ pub enum ClientCoreError {
#[error("no gateways on network")]
NoGatewaysOnNetwork,
#[error("there are no more new gateways on the network - it seems this client has already registered with all nodes it could have")]
#[error(
"there are no more new gateways on the network - it seems this client has already registered with all nodes it could have"
)]
NoNewGatewaysAvailable,
#[error("list of nym apis is empty")]
@@ -127,7 +129,9 @@ pub enum ClientCoreError {
#[error("unexpected exit")]
UnexpectedExit,
#[error("this operation would have resulted in the gateway {gateway_id:?} key being overwritten without permission")]
#[error(
"this operation would have resulted in the gateway {gateway_id:?} key being overwritten without permission"
)]
ForbiddenGatewayKeyOverwrite { gateway_id: String },
#[error(
@@ -151,7 +155,9 @@ pub enum ClientCoreError {
#[error("attempted to obtain fresh gateway details whilst already knowing about one")]
UnexpectedGatewayDetails,
#[error("the provided gateway details (for gateway {gateway_id}) do not correspond to the shared keys")]
#[error(
"the provided gateway details (for gateway {gateway_id}) do not correspond to the shared keys"
)]
MismatchedGatewayDetails { gateway_id: String },
#[error("unable to upgrade config file from `{current_version}`")]
@@ -227,7 +233,9 @@ pub enum ClientCoreError {
source: url::ParseError,
},
#[error("this client (id: '{client_id}') has already been initialised before. If you want to add additional gateway, use `add-gateway` command")]
#[error(
"this client (id: '{client_id}') has already been initialised before. If you want to add additional gateway, use `add-gateway` command"
)]
AlreadyInitialised { client_id: String },
#[error("this client has already registered with gateway {gateway_id}")]
+5 -5
View File
@@ -5,13 +5,13 @@ use crate::error::ClientCoreError;
use crate::init::types::RegistrationResult;
use futures::{SinkExt, StreamExt};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::client::GatewayListeners;
use nym_gateway_client::GatewayClient;
use nym_gateway_client::client::GatewayListeners;
use nym_topology::node::RoutingNode;
use nym_validator_client::UserAgent;
use nym_validator_client::client::{IdentityKeyRef, NymApiClientExt};
use nym_validator_client::nym_nodes::SkimmedNodesWithMetadata;
use nym_validator_client::UserAgent;
use rand::{seq::SliceRandom, Rng};
use rand::{Rng, seq::SliceRandom};
#[cfg(unix)]
use std::os::fd::RawFd;
use std::{sync::Arc, time::Duration};
@@ -28,10 +28,10 @@ use nym_wasm_utils::websocket::JSWebsocket;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
+1 -1
View File
@@ -7,8 +7,8 @@ use crate::client::base_client::storage::helpers::{
has_gateway_details, load_active_gateway_details, load_client_keys, load_gateway_details,
store_gateway_details, update_stored_published_data_gateway,
};
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::error::ClientCoreError;
use crate::init::helpers::{
choose_gateway_by_latency, get_specified_gateway, uniformly_random_gateway,
+2 -2
View File
@@ -1,8 +1,8 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::config::Config;
use crate::error::ClientCoreError;
use crate::init::{setup_gateway, use_loaded_gateway_details};
@@ -10,8 +10,8 @@ use nym_client_core_gateways_storage::{
GatewayRegistration, GatewaysDetailsStore, RemoteGatewayDetails,
};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::client::{GatewayListeners, InitGatewayClient};
use nym_gateway_client::SharedSymmetricKey;
use nym_gateway_client::client::{GatewayListeners, InitGatewayClient};
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::IdentityKey;
+1
View File
@@ -1,3 +1,4 @@
#![allow(deprecated)] // silences clippy warning: use of deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::clone_from_slice`: please upgrade to generic-array 1.x - TODO
use std::future::Future;
#[cfg(all(
@@ -1,6 +1,7 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: use of deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::from_exact_iter`: please upgrade to generic-array 1.x - TODO
pub use backend::*;
pub use combined::CombinedReplyStorage;
pub use key_storage::SentReplyKeys;
@@ -130,7 +130,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
let req = QueryBalanceRequest {
address: address.to_string(),
denom: search_denom.to_string(),
denom: search_denom,
};
let res = self
@@ -199,6 +199,18 @@ impl NyxdClient<HttpClient, DirectSecp256k1HdWallet> {
let wallet = DirectSecp256k1HdWallet::checked_from_mnemonic(prefix, mnemonic)?;
Ok(Self::connect_with_signer(config, client, wallet))
}
pub fn connect_with_mnemonic_and_network_details<U>(
endpoint: U,
network_details: NymNetworkDetails,
mnemonic: bip39::Mnemonic,
) -> Result<DirectSigningHttpRpcNyxdClient, NyxdError>
where
U: TryInto<HttpClientUrl, Error = TendermintRpcError>,
{
let config = Config::try_from_nym_network_details(&network_details)?;
Self::connect_with_mnemonic(config, endpoint, mnemonic)
}
}
#[allow(deprecated)]
@@ -1,6 +1,8 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(clippy::derivable_impls)]
// MAX: surpressing warning for the moment, will be dealt with in a different PR (TODO)
use cosmwasm_schema::cw_serde;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
+5
View File
@@ -4,6 +4,7 @@
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use strum::IntoEnumIterator as _;
use thiserror::Error;
use time::{Date, OffsetDateTime};
@@ -315,6 +316,10 @@ impl TicketType {
_ => Err(UnknownTicketType),
}
}
pub fn exposed_iter() -> impl Iterator<Item = TicketType> {
TicketType::iter()
}
}
impl From<TicketType> for TicketTypeRepr {
+2
View File
@@ -1,6 +1,8 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated associated function `generic_array::GenericArray::<T, N>::from_exact_iter`: please upgrade to generic-array 1.x - TODO
#[cfg(feature = "asymmetric")]
pub mod asymmetric;
pub mod bech32_address_validation;
+1
View File
@@ -1,5 +1,6 @@
// Copyright 2020-2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::clone_from_slice`: please upgrade to generic-array 1.x - TODO
pub use nym_crypto::generic_array;
use nym_crypto::OutputSizeUser;
+3
View File
@@ -1,6 +1,9 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)]
// silences clippy warning: use of deprecated tuple variant `HttpClientError::GenericRequestFailure`: use another more strongly typed variant - this variant is only left for compatibility reasons - TODO
//! Nym HTTP API Client
//!
//! Centralizes and implements the core API client functionality. This crate provides custom,
+11 -5
View File
@@ -22,6 +22,16 @@ pub struct ChainDetails {
pub stake_denom: DenomDetailsOwned,
}
impl ChainDetails {
pub fn mainnet() -> Self {
ChainDetails {
bech32_account_prefix: mainnet::BECH32_PREFIX.into(),
mix_denom: mainnet::MIX_DENOM.into(),
stake_denom: mainnet::STAKE_DENOM.into(),
}
}
}
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize, JsonSchema)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NymContracts {
@@ -181,11 +191,7 @@ impl NymNetworkDetails {
// Consider caching this process (lazy static)
NymNetworkDetails {
network_name: mainnet::NETWORK_NAME.into(),
chain_details: ChainDetails {
bech32_account_prefix: mainnet::BECH32_PREFIX.into(),
mix_denom: mainnet::MIX_DENOM.into(),
stake_denom: mainnet::STAKE_DENOM.into(),
},
chain_details: ChainDetails::mainnet(),
endpoints: mainnet::validators(),
contracts: NymContracts {
mixnet_contract_address: parse_optional_str(mainnet::MIXNET_CONTRACT_ADDRESS),
+1
View File
@@ -30,6 +30,7 @@ libcrux-ml-kem = { workspace = true }
[dev-dependencies]
rand_chacha = "0.9.0"
anyhow = { workspace = true }
nym-test-utils = { workspace = true }
[lints]
+122 -59
View File
@@ -16,10 +16,6 @@ pub use nym_kkt_context as context;
#[cfg(test)]
mod test {
use nym_kkt_ciphersuite::{Ciphersuite, HashFunction, HashLength, KEM, SignatureScheme};
use rand09::RngCore;
use std::collections::BTreeMap;
use crate::keys::KEMKeys;
use crate::{
initiator::KKTInitiator,
@@ -29,9 +25,13 @@ mod test {
},
responder::KKTResponder,
};
use nym_kkt_ciphersuite::{Ciphersuite, HashFunction, HashLength, KEM, SignatureScheme};
use nym_test_utils::helpers::deterministic_rng_09;
use rand09::RngCore;
use std::collections::BTreeMap;
#[test]
fn test_kkt_psq_e2e_encrypted_carrier() {
fn test_kkt_psq_e2e_one_way_encrypted_carrier() {
let mut rng = rand09::rng();
let mut payload: Vec<u8> = vec![0u8; 900_000];
@@ -47,7 +47,6 @@ mod test {
HashFunction::Shake256,
] {
// generate kem public keys
let responder_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let responder_mceliece_keypair = generate_keypair_mceliece(&mut rng);
@@ -64,20 +63,6 @@ mod test {
HashLength::Default.value(),
responder_kem.mc_eliece_encapsulation_key().as_ref(),
);
let initiator_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let initiator_mceliece_keypair = generate_keypair_mceliece(&mut rng);
let _i_dir_hash_mlkem = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
initiator_mlkem_keypair.public_key().as_slice(),
);
let _i_dir_hash_mceliece = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
initiator_mceliece_keypair.pk.as_ref(),
);
let init_hashes = BTreeMap::new();
@@ -128,41 +113,6 @@ mod test {
responder_kem.ml_kem768_encapsulation_key().as_slice(),
)
}
// Mutual - MlKem
{
let ciphersuite = Ciphersuite::resolve_ciphersuite(
KEM::MlKem768,
hash_function,
SignatureScheme::Ed25519,
None,
)
.unwrap();
let (mut initiator, request) = KKTInitiator::generate_one_way_request(
&mut rng,
ciphersuite,
&responder_x25519_keypair.pk,
&r_dir_hash_mlkem,
1u8,
Some(payload.clone()),
)
.unwrap();
let processed_request = responder.process_request(request, payload.len()).unwrap();
assert_eq!(processed_request.request_payload, payload);
// if we keep unverified keys, this should change
assert!(processed_request.remote_encapsulation_key.is_none());
let processed_response = initiator
.process_response(processed_request.response, 0)
.unwrap();
assert_eq!(
processed_response.encapsulation_key.as_bytes(),
responder_kem.ml_kem768_encapsulation_key().as_slice(),
)
}
// OneWay - McEliece
{
@@ -195,7 +145,110 @@ mod test {
responder_kem.mc_eliece_encapsulation_key().as_ref()
)
}
}
}
#[test]
fn test_kkt_psq_e2e_mutual_encrypted_carrier() {
let mut rng = deterministic_rng_09();
let mut payload: Vec<u8> = vec![0u8; 50000];
rng.fill_bytes(&mut payload);
// generate kem public keys
let initiator_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let initiator_mceliece_keypair = generate_keypair_mceliece(&mut rng);
let responder_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let responder_mceliece_keypair = generate_keypair_mceliece(&mut rng);
let responder_x25519_keypair = generate_lp_keypair_x25519(&mut rng);
let initiator_kem = KEMKeys::new(initiator_mceliece_keypair, initiator_mlkem_keypair);
let responder_kem = KEMKeys::new(responder_mceliece_keypair, responder_mlkem_keypair);
let init_hashes = initiator_kem.encapsulation_keys_digests();
let responder = KKTResponder::new(
&responder_x25519_keypair,
&responder_kem,
&init_hashes,
&[
HashFunction::Blake3,
HashFunction::SHA256,
HashFunction::Shake128,
HashFunction::Shake256,
],
&[SignatureScheme::Ed25519],
&[1],
)
.unwrap();
for hash_function in [
HashFunction::Blake3,
HashFunction::SHA256,
HashFunction::Shake128,
HashFunction::Shake256,
] {
let r_dir_hash_mlkem = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
responder_kem.ml_kem768_encapsulation_key().as_slice(),
);
let r_dir_hash_mceliece = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
responder_kem.mc_eliece_encapsulation_key().as_ref(),
);
// Mutual - MlKem
{
let ciphersuite = Ciphersuite::resolve_ciphersuite(
KEM::MlKem768,
hash_function,
SignatureScheme::Ed25519,
None,
)
.unwrap();
let (mut initiator, request) = KKTInitiator::generate_mutual_request(
&mut rng,
ciphersuite,
initiator_kem
.encoded_encapsulation_key(KEM::MlKem768)
.unwrap(),
&responder_x25519_keypair.pk,
&r_dir_hash_mlkem,
1u8,
Some(payload.clone()),
)
.unwrap();
let processed_request = responder.process_request(request, payload.len()).unwrap();
assert_eq!(processed_request.request_payload, payload);
assert_eq!(
processed_request
.remote_encapsulation_key
.unwrap()
.as_bytes(),
initiator_kem
.encapsulation_key(KEM::MlKem768)
.unwrap()
.as_bytes()
);
let processed_response = initiator
.process_response(processed_request.response, 0)
.unwrap();
assert_eq!(
processed_response.encapsulation_key.as_bytes(),
responder_kem.ml_kem768_encapsulation_key().as_slice(),
)
}
// Mutual - McEliece is not supported due to the key being too large
{
let ciphersuite = Ciphersuite::resolve_ciphersuite(
KEM::McEliece,
@@ -204,9 +257,12 @@ mod test {
None,
)
.unwrap();
let (mut initiator, request) = KKTInitiator::generate_one_way_request(
let (mut initiator, request) = KKTInitiator::generate_mutual_request(
&mut rng,
ciphersuite,
initiator_kem
.encoded_encapsulation_key(KEM::McEliece)
.unwrap(),
&responder_x25519_keypair.pk,
&r_dir_hash_mceliece,
1u8,
@@ -217,9 +273,16 @@ mod test {
let processed_request = responder.process_request(request, payload.len()).unwrap();
assert_eq!(processed_request.request_payload, payload);
// if we keep unverified keys, this should change
assert!(processed_request.remote_encapsulation_key.is_none());
assert_eq!(
processed_request
.remote_encapsulation_key
.unwrap()
.as_bytes(),
initiator_kem
.encapsulation_key(KEM::McEliece)
.unwrap()
.as_bytes()
);
let processed_response = initiator
.process_response(processed_request.response, 0)
+9 -9
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::LpError;
use crate::packet::{EncryptedLpPacket, InnerHeader, LpHeader, LpMessage, LpPacket};
use crate::packet::{EncryptedLpPacket, InnerHeader, LpFrame, LpHeader, LpPacket};
use bytes::BytesMut;
use libcrux_psq::Channel;
@@ -46,9 +46,9 @@ pub(crate) fn encrypt_lp_packet(
packet: LpPacket,
transport: &mut libcrux_psq::session::Transport,
) -> Result<EncryptedLpPacket, LpError> {
let mut plaintext = BytesMut::with_capacity(InnerHeader::SIZE + packet.message().len());
let mut plaintext = BytesMut::with_capacity(InnerHeader::SIZE + packet.frame().len());
packet.header().inner.encode(&mut plaintext);
packet.message().encode(&mut plaintext);
packet.frame().encode(&mut plaintext);
let ciphertext = encrypt_data(plaintext.as_ref(), transport)?;
@@ -67,14 +67,14 @@ pub(crate) fn decrypt_lp_packet(
let inner_header = InnerHeader::parse(&plaintext)?;
let payload = &plaintext[InnerHeader::SIZE..];
let message = LpMessage::decode(payload)?;
let frame = LpFrame::decode(payload)?;
Ok(LpPacket::new(
LpHeader {
outer: packet.outer_header(),
inner: inner_header,
},
message,
frame,
))
}
@@ -82,7 +82,7 @@ pub(crate) fn decrypt_lp_packet(
mod tests {
use crate::LpError;
use crate::codec::{decrypt_data, decrypt_lp_packet, encrypt_data, encrypt_lp_packet};
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
use crate::packet::{EncryptedLpPacket, LpFrame, LpHeader, LpPacket};
use crate::peer::mock_peers;
use crate::psq::initiator::{build_psq_ciphersuite, build_psq_principal};
use crate::psq::{PSQ_MSG2_SIZE, psq_msg1_size, responder};
@@ -261,7 +261,7 @@ mod tests {
// happy path
let packet = LpPacket::new(
LpHeader::new(123, 0, 1),
LpMessage::new_opaque(b"foomp".to_vec()),
LpFrame::new_opaque(b"foomp".to_vec()),
);
let ciphertext = encrypt_lp_packet(packet.clone(), &mut init_transport).unwrap();
@@ -273,7 +273,7 @@ mod tests {
// incomplete ciphertext
let packet = LpPacket::new(
LpHeader::new(123, 1, 1),
LpMessage::new_opaque(b"foomp".to_vec()),
LpFrame::new_opaque(b"foomp".to_vec()),
);
let ciphertext2 = encrypt_lp_packet(packet, &mut init_transport).unwrap();
let l = ciphertext2.ciphertext().len();
@@ -285,7 +285,7 @@ mod tests {
// too small buffer
let packet = LpPacket::new(
LpHeader::new(123, 1, 1),
LpMessage::new_opaque(b"foomp".to_vec()),
LpFrame::new_opaque(b"foomp".to_vec()),
);
let ciphertext3 = encrypt_lp_packet(packet, &mut resp_transport).unwrap();
let malformed = EncryptedLpPacket::new(ciphertext3.outer_header(), vec![]);
+4 -4
View File
@@ -11,8 +11,8 @@ pub enum MalformedLpPacketError {
#[error("provided insufficient data to fully deserialise the struct")]
InsufficientData,
#[error("{0} is not a valid LpDataKind")]
InvalidLpDataKind(u16),
#[error("{0} is not a valid LpFrameKind value")]
InvalidLpFrameKind(u16),
#[error("invalid payload size: expected {expected}, got {actual}")]
InvalidPayloadSize { expected: usize, actual: usize },
@@ -27,7 +27,7 @@ pub enum MalformedLpPacketError {
}
impl MalformedLpPacketError {
pub fn invalid_data_kind(message_type: u16) -> Self {
MalformedLpPacketError::InvalidLpDataKind(message_type)
pub fn invalid_data_kind(frame_kind: u16) -> Self {
MalformedLpPacketError::InvalidLpFrameKind(frame_kind)
}
}
@@ -7,32 +7,32 @@ use num_enum::{IntoPrimitive, TryFromPrimitive};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
#[derive(Debug, Clone, PartialEq)]
pub struct LpMessageHeader {
pub kind: LpMessageType,
pub message_attributes: [u8; 14],
pub struct LpFrameHeader {
pub kind: LpFrameKind,
pub frame_attributes: [u8; 14],
}
impl LpMessageHeader {
impl LpFrameHeader {
pub const SIZE: usize = 16; // message_kind(2) + message_attributes(14)
pub fn new(kind: LpMessageType, message_attributes: [u8; 14]) -> Self {
pub fn new(kind: LpFrameKind, frame_attributes: [u8; 14]) -> Self {
Self {
kind,
message_attributes,
frame_attributes,
}
}
pub fn new_no_attributes(kind: LpMessageType) -> Self {
pub fn new_no_attributes(kind: LpFrameKind) -> Self {
Self {
kind,
message_attributes: [0; 14],
frame_attributes: [0; 14],
}
}
/// Encode directly into a BytesMut buffer
pub fn encode(&self, dst: &mut BytesMut) {
dst.put_u16_le(self.kind as u16);
dst.put_slice(&self.message_attributes);
dst.put_slice(&self.frame_attributes);
}
pub fn parse(src: &[u8]) -> Result<Self, MalformedLpPacketError> {
@@ -41,35 +41,35 @@ impl LpMessageHeader {
}
let raw_kind = u16::from_le_bytes([src[0], src[1]]);
let kind = LpMessageType::try_from(raw_kind)
let kind = LpFrameKind::try_from(raw_kind)
.map_err(|_| MalformedLpPacketError::invalid_data_kind(raw_kind))?;
#[allow(clippy::unwrap_used)]
let message_attributes = src[2..16].try_into().unwrap();
Ok(Self {
kind,
message_attributes,
frame_attributes: message_attributes,
})
}
}
/// Represent application data being sent in Transport mode
#[derive(Debug, Clone, PartialEq)]
pub struct LpMessage {
pub header: LpMessageHeader,
pub struct LpFrame {
pub header: LpFrameHeader,
pub content: Bytes,
}
impl AsRef<[u8]> for LpMessage {
impl AsRef<[u8]> for LpFrame {
fn as_ref(&self) -> &[u8] {
&self.content
}
}
impl LpMessage {
pub fn new(kind: LpMessageType, content: impl Into<Bytes>) -> Self {
impl LpFrame {
pub fn new(kind: LpFrameKind, content: impl Into<Bytes>) -> Self {
Self {
header: LpMessageHeader::new_no_attributes(kind),
header: LpFrameHeader::new_no_attributes(kind),
content: content.into(),
}
}
@@ -81,37 +81,37 @@ impl LpMessage {
}
pub fn decode(src: &[u8]) -> Result<Self, MalformedLpPacketError> {
let header = LpMessageHeader::parse(src)?;
let content = src[LpMessageHeader::SIZE..].to_vec().into();
let header = LpFrameHeader::parse(src)?;
let content = src[LpFrameHeader::SIZE..].to_vec().into();
Ok(Self { header, content })
}
pub fn kind(&self) -> LpMessageType {
pub fn kind(&self) -> LpFrameKind {
self.header.kind
}
pub fn new_opaque(content: impl Into<Bytes>) -> Self {
Self::new(LpMessageType::Opaque, content)
Self::new(LpFrameKind::Opaque, content)
}
pub fn new_registration(data: impl Into<Bytes>) -> Self {
Self::new(LpMessageType::Registration, data)
Self::new(LpFrameKind::Registration, data)
}
pub fn new_forward(data: impl Into<Bytes>) -> Self {
Self::new(LpMessageType::Forward, data)
Self::new(LpFrameKind::Forward, data)
}
pub(crate) fn len(&self) -> usize {
LpMessageHeader::SIZE + self.content.len()
LpFrameHeader::SIZE + self.content.len()
}
}
/// Represent kind of application data being sent in Transport mode
#[derive(Clone, Copy, PartialEq, Eq, Debug, IntoPrimitive, TryFromPrimitive)]
#[repr(u16)]
pub enum LpMessageType {
pub enum LpFrameKind {
Opaque = 0,
Registration = 1,
Forward = 2,
+10 -10
View File
@@ -6,12 +6,12 @@ use bytes::{BufMut, BytesMut};
use std::fmt::{Debug, Formatter};
pub use error::MalformedLpPacketError;
pub use frame::{ForwardPacketData, LpFrame};
pub use header::{InnerHeader, LpHeader, OuterHeader};
pub use message::{ForwardPacketData, LpMessage};
pub mod error;
pub mod frame;
pub mod header;
pub mod message;
pub mod replay;
pub mod utils;
@@ -81,7 +81,7 @@ impl EncryptedLpPacket {
#[derive(Clone, PartialEq)]
pub struct LpPacket {
pub(crate) header: LpHeader,
pub(crate) message: LpMessage,
pub(crate) frame: LpFrame,
}
impl Debug for LpPacket {
@@ -91,16 +91,16 @@ impl Debug for LpPacket {
}
impl LpPacket {
pub fn new(header: LpHeader, message: LpMessage) -> Self {
Self { header, message }
pub fn new(header: LpHeader, frame: LpFrame) -> Self {
Self { header, frame }
}
pub fn message(&self) -> &LpMessage {
&self.message
pub fn frame(&self) -> &LpFrame {
&self.frame
}
pub fn into_message(self) -> LpMessage {
self.message
pub fn into_frame(self) -> LpFrame {
self.frame
}
pub fn header(&self) -> &LpHeader {
@@ -115,6 +115,6 @@ impl LpPacket {
pub(crate) fn dbg_encode(&self, dst: &mut BytesMut) {
self.header.dbg_encode(dst);
self.message.encode(dst)
self.frame.encode(dst)
}
}
+4
View File
@@ -143,6 +143,10 @@ impl LpRemotePeer {
.ok_or(LpError::NoKnownKEMKeyDigests { kem, hash_function })
.cloned()
}
pub fn kem_key_digests(&self) -> &BTreeMap<KEM, KEMKeyDigests> {
&self.expected_kem_key_digests
}
}
impl From<DHPublicKey> for LpRemotePeer {
+5
View File
@@ -65,6 +65,7 @@ impl LpPeerConfig {
rng.random(),
)
}
/// Creates a new client to exit config.
/// Inputs:
/// hop_id: this value must be in the range (1..=15). This function returns an error if this is not the case.
@@ -79,6 +80,7 @@ impl LpPeerConfig {
{
Self::new(rng, hop_id, true, false, censorship_resistance)
}
/// Creates a new client to an intermediate node config.
/// Inputs:
/// hop_id: this value must be in the range (1..=14). This function returns an error if this is not the case.
@@ -130,6 +132,7 @@ impl LpPeerConfig {
rng.random(),
)
}
fn build(
hop_id: u8,
is_exit: bool,
@@ -147,6 +150,7 @@ impl LpPeerConfig {
seed,
}
}
fn build_checked(
hop_id: u8,
is_exit: bool,
@@ -203,6 +207,7 @@ impl LpPeerConfig {
output_bytes[4..].copy_from_slice(&self.seed);
output_bytes
}
pub fn deserialize(bytes: &[u8]) -> Result<Self, LpError> {
if bytes.len() != LP_PEER_CONFIG_SIZE {
return Err(LpError::DeserializationError(format!(
+43 -13
View File
@@ -24,10 +24,31 @@ use nym_kkt::message::{KKTRequest, KKTResponse};
use rand09::SeedableRng;
use tracing::debug;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandshakeMode {
// Client <> Entry
OneWayEntry,
// Client <> Exit
OneWayExit,
// Entry <> Exit
MutualInternode,
// in the future more variants will be supported (such as individual mix hops)
}
impl HandshakeMode {
pub fn is_mutual(&self) -> bool {
matches!(self, HandshakeMode::MutualInternode)
}
}
pub struct PSQHandshakeStateInitiator<'a, S> {
pub(super) inner_state: PSQHandshakeState<'a, S>,
pub(super) initiator_data: InitiatorData,
pub(super) mutual: bool,
/// The mode of the handshake (mutual node-node, client-entry, entry-exit)
pub(super) mode: HandshakeMode,
}
pub(crate) fn build_psq_principal<R>(
@@ -78,13 +99,23 @@ impl<'a, S> PSQHandshakeStateInitiator<'a, S>
where
S: LpHandshakeChannel + Unpin,
{
pub fn set_mutual_kkt(mut self) -> Result<Self, LpError> {
if self.inner_state.local_peer.kem_keypairs.is_none() {
return Err(LpError::PSQMutualInitiatorMissingKemKey);
}
fn lp_peer_config<R>(&self, rng: &mut R) -> Result<LpPeerConfig, LpError>
where
R: rand09::CryptoRng,
{
// for now we don't support censorship resistance flag
let censorship_resistance = false;
self.mutual = true;
Ok(self)
match self.mode {
HandshakeMode::OneWayEntry => Ok(LpPeerConfig::new_client_to_entry(
rng,
censorship_resistance,
)),
HandshakeMode::OneWayExit => {
LpPeerConfig::new_client_to_exit(rng, 1, censorship_resistance)
}
HandshakeMode::MutualInternode => LpPeerConfig::new_node_to_node(rng),
}
}
/// Attempt to send KKT request to begin the handshake
@@ -132,7 +163,7 @@ where
let ciphersuite = self.inner_state.local_peer.ciphersuite();
let kem = ciphersuite.kem();
let lp_peer_config = LpPeerConfig::new_client_to_entry(rng, false);
let lp_peer_config = self.lp_peer_config(rng)?;
// 1. retrieve the expected kem key hash. if we don't know it,
let dir_hash = self
@@ -141,7 +172,7 @@ where
.expected_kem_key_hash(ciphersuite)?;
// 2. prepare and send KKT request
let (mut initiator, kkt_request) = if self.mutual {
let (mut initiator, kkt_request) = if self.mode.is_mutual() {
// this has been verified when setting the mutual flag
let Some(local_encapsulation_key) = self.inner_state.local_peer.encoded_kem_key(kem)
else {
@@ -273,8 +304,8 @@ mod tests {
resp.ciphersuite = ciphersuite;
let initiator_data = InitiatorData::new(1, resp_remote);
let handshake_init =
PSQHandshakeState::new(conn_init, init).as_initiator(initiator_data);
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(initiator_data, HandshakeMode::OneWayEntry)?;
let mut init_rng = DeterministicRng09Send::new(u64_seeded_rng_09(1));
@@ -396,8 +427,7 @@ mod tests {
let initiator_data = InitiatorData::new(1, resp_remote);
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(initiator_data)
.set_mutual_kkt()?;
.as_initiator(initiator_data, HandshakeMode::MutualInternode)?;
let mut init_rng = DeterministicRng09Send::new(u64_seeded_rng_09(1));
+22 -9
View File
@@ -12,6 +12,8 @@ mod helpers;
pub mod initiator;
pub mod responder;
use crate::LpError;
use crate::psq::initiator::HandshakeMode;
pub use initiator::PSQHandshakeStateInitiator;
pub use responder::PSQHandshakeStateResponder;
@@ -107,12 +109,20 @@ where
}
}
pub fn as_initiator(self, initiator_data: InitiatorData) -> PSQHandshakeStateInitiator<'a, S> {
PSQHandshakeStateInitiator {
pub fn as_initiator(
self,
initiator_data: InitiatorData,
mode: HandshakeMode,
) -> Result<PSQHandshakeStateInitiator<'a, S>, LpError> {
if mode.is_mutual() && self.local_peer.kem_keypairs.is_none() {
return Err(LpError::PSQMutualInitiatorMissingKemKey);
}
Ok(PSQHandshakeStateInitiator {
initiator_data,
inner_state: self,
mutual: false,
}
mode,
})
}
pub fn as_responder(self, responder_data: ResponderData) -> PSQHandshakeStateResponder<'a, S> {
@@ -160,8 +170,10 @@ mod tests {
resp.ciphersuite = ciphersuite;
let resp_remote = resp.as_remote();
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(InitiatorData::new(1, resp_remote));
let handshake_init = PSQHandshakeState::new(conn_init, init).as_initiator(
InitiatorData::new(1, resp_remote),
HandshakeMode::OneWayEntry,
)?;
let handshake_resp =
PSQHandshakeState::new(conn_resp, resp).as_responder(ResponderData::default());
@@ -232,9 +244,10 @@ mod tests {
let resp_remote = resp.as_remote();
let init_remote = init.as_remote();
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(InitiatorData::new(1, resp_remote))
.set_mutual_kkt()?;
let handshake_init = PSQHandshakeState::new(conn_init, init).as_initiator(
InitiatorData::new(1, resp_remote),
HandshakeMode::MutualInternode,
)?;
let handshake_resp = PSQHandshakeState::new(conn_resp, resp).as_responder(
ResponderData::default()
.with_initiator_kem_hashes(init_remote.expected_kem_key_digests),
+63 -26
View File
@@ -6,9 +6,10 @@
//! This module implements session management functionality, including replay protection
use crate::codec::{decrypt_lp_packet, encrypt_lp_packet};
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
use crate::packet::{EncryptedLpPacket, LpFrame, LpHeader, LpPacket};
use crate::peer::{LpLocalPeer, LpRemotePeer};
use crate::peer_config::LpReceiverIndex;
use crate::psq::initiator::HandshakeMode;
use crate::psq::{
InitiatorData, PSQHandshakeState, PSQHandshakeStateInitiator, PSQHandshakeStateResponder,
ResponderData,
@@ -19,6 +20,8 @@ use crate::{LpError, replay::ReceivingKeyCounterValidator};
use libcrux_psq::handshake::types::{Authenticator, DHPublicKey};
use libcrux_psq::session::{Session, SessionBinding};
use nym_kkt::keys::EncapsulationKey;
use nym_kkt_ciphersuite::{KEM, KEMKeyDigests};
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
/// Represents inputs that drive the state machine transitions.
@@ -29,7 +32,7 @@ pub enum LpInput {
ReceivePacket(EncryptedLpPacket),
/// Application wants to send data (only valid in Transport state).
SendData(LpMessage),
SendFrame(LpFrame),
}
/// Represents actions the state machine requests the environment to perform.
@@ -39,7 +42,7 @@ pub enum LpAction {
SendPacket(EncryptedLpPacket),
/// Deliver decrypted application data received from the peer.
DeliverData(LpMessage),
DeliverFrame(LpFrame),
}
pub type SessionId = [u8; 32];
@@ -154,12 +157,34 @@ impl LpTransportSession {
local_peer: LpLocalPeer,
remote_peer: LpRemotePeer,
remote_protocol_version: u8,
) -> PSQHandshakeStateInitiator<'_, S>
mode: HandshakeMode,
) -> Result<PSQHandshakeStateInitiator<'_, S>, LpError>
where
S: LpHandshakeChannel + Unpin,
{
PSQHandshakeState::new(connection, local_peer)
.as_initiator(InitiatorData::new(remote_protocol_version, remote_peer))
PSQHandshakeState::new(connection, local_peer).as_initiator(
InitiatorData::new(remote_protocol_version, remote_peer),
mode,
)
}
/// Helper function to create `PSQHandshakeState` for the handshake initiator for mutual KKT
pub fn psq_handshake_initiator_mutual_internode<S>(
connection: &'_ mut S,
local_peer: LpLocalPeer,
remote_peer: LpRemotePeer,
remote_protocol_version: u8,
) -> Result<PSQHandshakeStateInitiator<'_, S>, LpError>
where
S: LpHandshakeChannel + Unpin,
{
Self::psq_handshake_initiator(
connection,
local_peer,
remote_peer,
remote_protocol_version,
HandshakeMode::MutualInternode,
)
}
/// Helper function to create `PSQHandshakeState` for the handshake responder
@@ -173,6 +198,19 @@ impl LpTransportSession {
PSQHandshakeState::new(connection, local_peer).as_responder(ResponderData::default())
}
/// Helper function to create `PSQHandshakeState` for the handshake responder for mutual KKT
pub fn psq_handshake_responder_mutual<S>(
connection: &'_ mut S,
local_peer: LpLocalPeer,
initiator_kem_hashes: BTreeMap<KEM, KEMKeyDigests>,
) -> PSQHandshakeStateResponder<'_, S>
where
S: LpHandshakeChannel + Unpin,
{
PSQHandshakeState::new(connection, local_peer)
.as_responder(ResponderData::default().with_initiator_kem_hashes(initiator_kem_hashes))
}
pub fn session_binding(&self) -> &PersistentSessionBinding {
&self.session_binding
}
@@ -196,10 +234,10 @@ impl LpTransportSession {
self.protocol_version
}
pub fn next_packet(&mut self, message: LpMessage) -> Result<LpPacket, LpError> {
pub fn next_packet(&mut self, frame: LpFrame) -> Result<LpPacket, LpError> {
let counter = self.next_counter();
let header = LpHeader::new(self.receiver_index(), counter, self.protocol_version);
let packet = LpPacket::new(header, message);
let packet = LpPacket::new(header, frame);
Ok(packet)
}
@@ -261,22 +299,19 @@ impl LpTransportSession {
self.receiving_counter.current_packet_cnt()
}
/// Encrypts a produced application using the established transport session
/// and produce an `EncryptedLpPacket`
/// Wrap the provided `LpFrame` into an `LpPacket` and encrypt its content using the established transport session
/// to produce an `EncryptedLpPacket`
///
/// # Arguments
///
/// * `data` - plaintext data to encrypt
/// * `frame` - structured `LpFrame` to wrap and encrypt
///
/// # Returns
///
/// * `Ok(EncryptedLpPacket)` containing the encrypted message ciphertext.
/// * `Err(LpError)` if the session is not in transport mode or encryption fails.
pub(crate) fn encrypt_application_data(
&mut self,
data: LpMessage,
) -> Result<EncryptedLpPacket, LpError> {
let packet = self.next_packet(data)?;
pub(crate) fn wrap_lp_frame(&mut self, frame: LpFrame) -> Result<EncryptedLpPacket, LpError> {
let packet = self.next_packet(frame)?;
encrypt_lp_packet(packet, &mut self.active_transport)
}
@@ -284,7 +319,7 @@ impl LpTransportSession {
///
/// # Arguments
///
/// * `ciphertext` - The encrypted packet
/// * `packet` - The encrypted packet
///
/// # Returns
///
@@ -320,11 +355,11 @@ impl LpTransportSession {
self.receiving_counter_mark(ctr)?;
// 4. deliver the message
Ok(LpAction::DeliverData(packet.message))
Ok(LpAction::DeliverFrame(packet.frame))
}
LpInput::SendData(data) => {
LpInput::SendFrame(data) => {
// Encrypt and send application data
match self.encrypt_application_data(data) {
match self.wrap_lp_frame(data) {
Ok(packet) => Ok(LpAction::SendPacket(packet)),
Err(e) => Err(e),
}
@@ -438,8 +473,9 @@ mod tests {
// --- Transport Phase ---
println!("--- Step 1: Initiator sends data ---");
let data_to_send_1 = LpMessage::new_opaque(b"hello responder".to_vec());
let init_actions_4 = initiator.process_input(LpInput::SendData(data_to_send_1.clone()));
let data_to_send_1 = LpFrame::new_opaque(b"hello responder".to_vec());
let init_actions_4 =
initiator.process_input(LpInput::SendFrame(data_to_send_1.clone()));
let data_packet_1 = if let Ok(LpAction::SendPacket(packet)) = init_actions_4 {
packet.clone()
} else {
@@ -449,7 +485,7 @@ mod tests {
println!("--- Step 2: Responder receives data ---");
let resp_actions_5 = responder.process_input(LpInput::ReceivePacket(data_packet_1));
let resp_data_1 = if let Ok(LpAction::DeliverData(data)) = resp_actions_5 {
let resp_data_1 = if let Ok(LpAction::DeliverFrame(data)) = resp_actions_5 {
data
} else {
panic!("Responder should deliver data");
@@ -457,8 +493,9 @@ mod tests {
assert_eq!(resp_data_1, data_to_send_1);
println!("--- Step 3: Responder sends data ---");
let data_to_send_2 = LpMessage::new_opaque(b"hello initiator".to_vec());
let resp_actions_6 = responder.process_input(LpInput::SendData(data_to_send_2.clone()));
let data_to_send_2 = LpFrame::new_opaque(b"hello initiator".to_vec());
let resp_actions_6 =
responder.process_input(LpInput::SendFrame(data_to_send_2.clone()));
let data_packet_2 = if let Ok(LpAction::SendPacket(packet)) = resp_actions_6 {
packet.clone()
} else {
@@ -468,7 +505,7 @@ mod tests {
println!("--- Step 4: Initiator receives data ---");
let init_actions_5 = initiator.process_input(LpInput::ReceivePacket(data_packet_2));
if let Ok(LpAction::DeliverData(data)) = init_actions_5 {
if let Ok(LpAction::DeliverFrame(data)) = init_actions_5 {
assert_eq!(data, data_to_send_2);
} else {
panic!("Initiator should deliver data");
+18 -20
View File
@@ -1,6 +1,6 @@
#[cfg(test)]
mod tests {
use crate::packet::{EncryptedLpPacket, LpMessage};
use crate::packet::{EncryptedLpPacket, LpFrame};
use crate::session::{LpAction, LpInput};
use crate::{LpError, SessionManager, SessionsMock};
use nym_kkt_ciphersuite::{IntoEnumIterator, KEM};
@@ -9,7 +9,7 @@ mod tests {
trait ActionExtract {
fn ciphertext(self) -> EncryptedLpPacket;
fn data(self) -> LpMessage;
fn data(self) -> LpFrame;
}
impl ActionExtract for LpAction {
@@ -21,8 +21,8 @@ mod tests {
}
}
fn data(self) -> LpMessage {
if let LpAction::DeliverData(data) = self {
fn data(self) -> LpFrame {
if let LpAction::DeliverFrame(data) = self {
data
} else {
panic!("invalid action");
@@ -54,7 +54,7 @@ mod tests {
// --- A sends to B ---
let plaintext_a = format!("A->B Message {i}").into_bytes();
let ciphertext_a = session_manager_1
.send_data(peer_a_sm, LpMessage::new_opaque(plaintext_a.clone()))
.send_frame(peer_a_sm, LpFrame::new_opaque(plaintext_a.clone()))
.unwrap()
.ciphertext();
@@ -68,7 +68,7 @@ mod tests {
// --- B sends to A ---
let plaintext_b = format!("B->A Message {i}").into_bytes();
let ciphertext_b = session_manager_2
.send_data(peer_b_sm, LpMessage::new_opaque(plaintext_b.clone()))
.send_frame(peer_b_sm, LpFrame::new_opaque(plaintext_b.clone()))
.unwrap()
.ciphertext();
@@ -183,15 +183,13 @@ mod tests {
// --- 3. Simulate Data Transfer via process_input ---
println!("Starting data transfer simulation via process_input...");
let plaintext_a_to_b =
LpMessage::new_opaque(b"Hello from A via process_input!".to_vec());
let plaintext_b_to_a =
LpMessage::new_opaque(b"Hello from B via process_input!".to_vec());
let plaintext_a_to_b = LpFrame::new_opaque(b"Hello from A via process_input!".to_vec());
let plaintext_b_to_a = LpFrame::new_opaque(b"Hello from B via process_input!".to_vec());
// --- A sends to B ---
println!(" A sends to B");
let action_a_send = session_manager_1
.process_input(session_id, LpInput::SendData(plaintext_a_to_b.clone()))
.process_input(session_id, LpInput::SendFrame(plaintext_a_to_b.clone()))
.expect("A SendData failed");
let data_packet_a = action_a_send.ciphertext();
@@ -202,7 +200,7 @@ mod tests {
.process_input(session_id, LpInput::ReceivePacket(data_packet_a))
.expect("B ReceivePacket (data) failed");
if let LpAction::DeliverData(data) = action_b_recv {
if let LpAction::DeliverFrame(data) = action_b_recv {
assert_eq!(data, plaintext_a_to_b, "Decrypted data mismatch A->B");
println!(
" B successfully decrypted: {:?}",
@@ -215,7 +213,7 @@ mod tests {
// --- B sends to A ---
println!(" B sends to A");
let action_b_send = session_manager_2
.process_input(session_id, LpInput::SendData(plaintext_b_to_a.clone()))
.process_input(session_id, LpInput::SendFrame(plaintext_b_to_a.clone()))
.expect("B SendData failed");
let data_packet_b = action_b_send.ciphertext();
@@ -229,7 +227,7 @@ mod tests {
.process_input(session_id, LpInput::ReceivePacket(data_packet_b))
.expect("A ReceivePacket (data) failed");
if let LpAction::DeliverData(data) = action_a_recv {
if let LpAction::DeliverFrame(data) = action_a_recv {
assert_eq!(data, plaintext_b_to_a, "Decrypted data mismatch B->A");
println!(
" A successfully decrypted: {:?}",
@@ -258,11 +256,11 @@ mod tests {
println!("Testing out-of-order reception via process_input...");
// A prepares N+1 then N
let data_n_plus_1 = LpMessage::new_opaque(b"Message N+1".to_vec());
let data_n = LpMessage::new_opaque(b"Message N".to_vec());
let data_n_plus_1 = LpFrame::new_opaque(b"Message N+1".to_vec());
let data_n = LpFrame::new_opaque(b"Message N".to_vec());
let action_send_n1 = session_manager_1
.process_input(session_id, LpInput::SendData(data_n_plus_1.clone()))
.process_input(session_id, LpInput::SendFrame(data_n_plus_1.clone()))
.unwrap();
let packet_n1 = match action_send_n1 {
LpAction::SendPacket(p) => p,
@@ -270,7 +268,7 @@ mod tests {
};
let action_send_n = session_manager_1
.process_input(session_id, LpInput::SendData(data_n.clone()))
.process_input(session_id, LpInput::SendFrame(data_n.clone()))
.unwrap();
let packet_n = match action_send_n {
LpAction::SendPacket(p) => p,
@@ -284,7 +282,7 @@ mod tests {
.process_input(session_id, LpInput::ReceivePacket(packet_n1))
.unwrap();
match action_recv_n1 {
LpAction::DeliverData(d) => assert_eq!(d, data_n_plus_1, "Data N+1 mismatch"),
LpAction::DeliverFrame(d) => assert_eq!(d, data_n_plus_1, "Data N+1 mismatch"),
_ => panic!("Expected DeliverData for N+1"),
}
@@ -294,7 +292,7 @@ mod tests {
.process_input(session_id, LpInput::ReceivePacket(packet_n))
.unwrap();
match action_recv_n {
LpAction::DeliverData(d) => assert_eq!(d, data_n, "Data N mismatch"),
LpAction::DeliverFrame(d) => assert_eq!(d, data_n, "Data N mismatch"),
_ => panic!("Expected DeliverData for N"),
}
+4 -4
View File
@@ -6,7 +6,7 @@
//! This module implements session lifecycle management functionality, handling
//! creation, retrieval, and storage of sessions.
use crate::packet::{EncryptedLpPacket, LpMessage};
use crate::packet::{EncryptedLpPacket, LpFrame};
use crate::peer_config::LpReceiverIndex;
use crate::{LpError, LpTransportSession};
use std::collections::HashMap;
@@ -39,12 +39,12 @@ impl SessionManager {
self.with_session_mut(lp_id, |sm| sm.process_input(input))?
}
pub fn send_data(
pub fn send_frame(
&mut self,
lp_id: LpReceiverIndex,
data: LpMessage,
frame: LpFrame,
) -> Result<LpAction, LpError> {
self.process_input(lp_id, LpInput::SendData(data))
self.process_input(lp_id, LpInput::SendFrame(frame))
}
pub fn receive_packet(
@@ -1,5 +1,6 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::clone_from_slice`: please upgrade to generic-array 1.x - TODO
pub mod identifier;
pub mod key;
@@ -1,6 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated struct `nym_crypto::generic_array::GenericArray`: please upgrade to generic-array 1.x - TODO
pub mod encryption_key;
pub mod reply_surb;
pub mod requests;
@@ -0,0 +1,64 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nyxd_scraper_shared::error::ScraperError;
use nyxd_scraper_shared::storage::FullBlockInformation;
use nyxd_scraper_shared::watcher::{NyxdWatcher, WatcherConfig};
use nyxd_scraper_shared::{BlockModule, ParsedTransactionResponse, TxModule};
struct FancyBlockModule;
struct FancyTxModule;
#[async_trait::async_trait]
impl BlockModule for FancyBlockModule {
async fn handle_block(&mut self, block: &FullBlockInformation) -> Result<(), ScraperError> {
println!("🚀 got new block for height {}", block.block.header.height);
// should be false
println!("results scraped: {}", block.results.is_some());
// should be false
println!("validators scraped: {}", block.validators.is_some());
// should be true
println!("transactions scraped: {}", block.transactions.is_some());
println!();
Ok(())
}
}
#[async_trait::async_trait]
impl TxModule for FancyTxModule {
async fn handle_tx(&mut self, tx: &ParsedTransactionResponse) -> Result<(), ScraperError> {
println!(
"✨ got new tx for height {}: {} ({} msgs)",
tx.block.header.height,
tx.hash,
tx.parsed_messages.len()
);
Ok(())
}
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
let cfg = WatcherConfig {
websocket_url: "wss://rpc.nymtech.net/websocket".parse()?,
rpc_url: "https://rpc.nymtech.net".parse()?,
};
let watcher = NyxdWatcher::builder(cfg)
.with_block_module(FancyBlockModule)
.with_tx_module(FancyTxModule)
.build_and_start()
.await?;
// run for 30s before shutting down
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
watcher.stop().await;
Ok(())
}
@@ -0,0 +1,94 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::NyxdScraperStorageError;
use crate::{NyxdScraperStorage, NyxdScraperTransaction, ParsedTransactionResponse};
use tendermint::Block;
use tendermint::block::Commit;
use tendermint_rpc::endpoint::validators::Response;
use thiserror::Error;
#[derive(Clone)]
pub struct Ephemeral;
#[derive(Debug, Error)]
#[error("no storage backend enabled")]
pub struct EphemeralStorageError;
pub struct EphemeralTransaction;
#[async_trait::async_trait]
impl NyxdScraperTransaction for EphemeralTransaction {
async fn commit(self) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_validators(&mut self, _: &Response) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_block_data(
&mut self,
_: &Block,
_: i64,
) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_commits(
&mut self,
_: &Commit,
_: &Response,
) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_txs(
&mut self,
_: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_messages(
&mut self,
_: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn update_last_processed(&mut self, _: i64) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
}
#[async_trait::async_trait]
impl NyxdScraperStorage for Ephemeral {
type StorageTransaction = EphemeralTransaction;
async fn initialise(_: &str, _: &bool) -> Result<Self, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn begin_processing_tx(
&self,
) -> Result<Self::StorageTransaction, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn prune_storage(&self, _: u32, _: u32) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
}
@@ -2,13 +2,16 @@
// SPDX-License-Identifier: Apache-2.0
use crate::PruningOptions;
use crate::block_processor::ephemeral_storage::Ephemeral;
use crate::block_processor::helpers::split_request_range;
use crate::block_processor::types::BlockToProcess;
use crate::block_requester::BlockRequest;
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::storage::{NyxdScraperStorage, NyxdScraperTransaction, persist_block};
use crate::rpc_client::{RetrievalConfig, RpcClient};
use crate::storage::{
FullBlockInformation, NyxdScraperStorage, NyxdScraperTransaction, persist_block,
};
use futures::StreamExt;
use std::cmp::max;
use std::collections::{BTreeMap, HashSet, VecDeque};
@@ -22,6 +25,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, trace, warn};
mod ephemeral_storage;
mod helpers;
pub(crate) mod pruning;
pub(crate) mod types;
@@ -77,20 +81,148 @@ impl BlockProcessorConfig {
}
}
pub struct BlockProcessor<S> {
pub struct BlockProcessorPersistence<S> {
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
last_processed_height: u32,
last_pruned_height: u32,
storage: S,
}
impl<S> BlockProcessorPersistence<S>
where
S: NyxdScraperStorage,
{
pub(crate) async fn new(
config: BlockProcessorConfig,
synced: Arc<Notify>,
storage: S,
) -> Result<Self, ScraperError> {
let last_pruned = storage.get_pruned_height().await?;
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
debug!(pruned_height = %last_pruned_height, "setting up block processor...");
Ok(Self {
config,
synced,
last_pruned_height,
storage,
})
}
#[must_use]
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.config.pruning_options = pruning_options;
self
}
async fn stored_last_processed_height(&self) -> Result<u32, ScraperError> {
let last_processed = self.storage.get_last_processed_height().await?;
let last_processed_height = last_processed.try_into().unwrap_or_default();
Ok(last_processed_height)
}
async fn persist_block(
&mut self,
full_info: &FullBlockInformation,
) -> Result<(), ScraperError> {
// process the entire block as a transaction so that if anything fails,
// we wouldn't end up with a corrupted storage.
let mut tx = self
.storage
.begin_processing_tx()
.await
.map_err(ScraperError::tx_begin_failure)?;
persist_block(full_info, &mut tx, self.config.store_precommits).await?;
let commit_start = Instant::now();
tx.commit().await.map_err(ScraperError::tx_commit_failure)?;
crate::storage::helpers::log_db_operation_time("committing processing tx", commit_start);
let last_processed_height = full_info.block.header.height.value() as u32;
if let Err(err) = self.maybe_prune_storage(last_processed_height).await {
error!("failed to prune the storage: {err}");
}
Ok(())
}
#[instrument(skip(self))]
async fn prune_storage(&mut self, last_processed_height: u32) -> Result<(), ScraperError> {
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = last_processed_height - keep_recent;
info!(
keep_recent,
oldest_to_keep = last_to_keep,
"pruning the storage"
);
let lowest: u32 = self
.storage
.lowest_block_height()
.await?
.unwrap_or_default()
.try_into()
.unwrap_or_default();
let to_prune = last_to_keep.saturating_sub(lowest);
match to_prune {
v if v > 1000 => warn!("approximately {v} blocks worth of data will be pruned"),
v if v > 100 => info!("approximately {v} blocks worth of data will be pruned"),
0 => trace!("no blocks to prune"),
v => debug!("approximately {v} blocks worth of data will be pruned"),
}
if to_prune == 0 {
self.last_pruned_height = last_processed_height;
return Ok(());
}
self.storage
.prune_storage(last_to_keep, last_processed_height)
.await?;
self.last_pruned_height = last_processed_height;
Ok(())
}
async fn maybe_prune_storage(
&mut self,
last_processed_height: u32,
) -> Result<(), ScraperError> {
debug!("checking for storage pruning");
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= last_processed_height {
self.prune_storage(last_processed_height).await?;
}
Ok(())
}
}
pub struct BlockProcessor<S = Ephemeral> {
cancel: CancellationToken,
last_processed_height: u32,
last_processed_at: Instant,
pending_sync: PendingSync,
queued_blocks: BTreeMap<u32, BlockToProcess>,
/// Specifies how much data to actually retrieve per block
retrieval_config: RetrievalConfig,
rpc_client: RpcClient,
incoming: UnboundedReceiverStream<BlockToProcess>,
block_requester: Sender<BlockRequest>,
storage: S,
persistence: Option<BlockProcessorPersistence<S>>,
// future work: rather than sending each msg to every msg module,
// let them subscribe based on `type_url` inside the message itself
@@ -105,108 +237,97 @@ impl<S> BlockProcessor<S>
where
S: NyxdScraperStorage,
{
pub async fn new(
config: BlockProcessorConfig,
pub fn new(
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
block_requester: Sender<BlockRequest>,
storage: S,
rpc_client: RpcClient,
) -> Result<Self, ScraperError> {
let last_processed = storage.get_last_processed_height().await?;
let last_processed_height = last_processed.try_into().unwrap_or_default();
let last_pruned = storage.get_pruned_height().await?;
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
debug!(last_processed_height = %last_processed_height, pruned_height = %last_pruned_height, "setting up block processor...");
Ok(BlockProcessor {
config,
) -> Self {
BlockProcessor {
cancel,
synced,
last_processed_height,
last_pruned_height,
last_processed_height: Default::default(),
last_processed_at: Instant::now(),
pending_sync: Default::default(),
queued_blocks: Default::default(),
retrieval_config: RetrievalConfig::default(),
rpc_client,
incoming: incoming.into(),
block_requester,
storage,
persistence: None,
block_modules: vec![],
tx_modules: vec![],
msg_modules: vec![],
})
}
}
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.config.pruning_options = pruning_options;
#[must_use]
pub fn with_retrieval_config(mut self, retrieval_config: RetrievalConfig) -> Self {
self.retrieval_config = retrieval_config;
self
}
pub async fn with_persistence(
mut self,
persistence: BlockProcessorPersistence<S>,
) -> Result<Self, ScraperError> {
let last_processed_height = persistence.stored_last_processed_height().await?;
debug!(last_processed_height = %last_processed_height, "setting up block processor...");
self.persistence = Some(persistence);
Ok(self)
}
pub(super) async fn process_block(
&mut self,
block: BlockToProcess,
) -> Result<(), ScraperError> {
info!("processing block at height {}", block.height);
let full_info = self.rpc_client.try_get_full_details(block).await?;
let full_info = self
.rpc_client
.try_get_full_details(block, self.retrieval_config)
.await?;
debug!(
"this block has {} transaction(s)",
full_info.transactions.len()
);
for tx in &full_info.transactions {
debug!("{} has {} message(s)", tx.hash, tx.tx.body.messages.len());
for (index, msg) in tx.tx.body.messages.iter().enumerate() {
debug!("{index}: {:?}", msg.type_url)
if let Some(tx_info) = &full_info.transactions {
debug!("this block has {} transaction(s)", tx_info.len());
for tx in tx_info {
debug!("{} has {} message(s)", tx.hash, tx.tx.body.messages.len());
for (index, msg) in tx.tx.body.messages.iter().enumerate() {
debug!("{index}: {:?}", msg.type_url)
}
}
}
// process the entire block as a transaction so that if anything fails,
// we won't end up with a corrupted storage.
let mut tx = self
.storage
.begin_processing_tx()
.await
.map_err(ScraperError::tx_begin_failure)?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
// if we have enabled persistence, do try to store the block information
if let Some(persistence) = &mut self.persistence {
persistence.persist_block(&full_info).await?;
}
// let the modules do whatever they want
// the ones wanting the full block:
for block_module in &mut self.block_modules {
block_module.handle_block(&full_info, &mut tx).await?;
block_module.handle_block(&full_info).await?;
}
// the ones wanting transactions:
for block_tx in full_info.transactions {
for tx_module in &mut self.tx_modules {
tx_module.handle_tx(&block_tx, &mut tx).await?;
}
// the ones concerned with individual messages
for (index, msg) in block_tx.tx.body.messages.iter().enumerate() {
for msg_module in &mut self.msg_modules {
if msg.type_url == msg_module.type_url() {
msg_module
.handle_msg(index, msg, &block_tx, &mut tx)
.await?
// the ones wanting transactions (assuming tx retrieval is enabled):
if let Some(tx_info) = &full_info.transactions {
for block_tx in tx_info {
for tx_module in &mut self.tx_modules {
tx_module.handle_tx(block_tx).await?;
}
// the ones concerned with individual messages
for (index, msg) in block_tx.tx.body.messages.iter().enumerate() {
for msg_module in &mut self.msg_modules {
if msg.type_url == msg_module.type_url() {
msg_module.handle_msg(index, msg, block_tx).await?
}
}
}
}
}
let commit_start = Instant::now();
tx.commit().await.map_err(ScraperError::tx_commit_failure)?;
crate::storage::helpers::log_db_operation_time("committing processing tx", commit_start);
self.last_processed_height = full_info.block.header.height.value() as u32;
self.last_processed_at = Instant::now();
if let Err(err) = self.maybe_prune_storage().await {
error!("failed to prune the storage: {err}");
}
Ok(())
}
@@ -284,62 +405,6 @@ where
Ok(())
}
#[instrument(skip(self))]
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = self.last_processed_height - keep_recent;
info!(
keep_recent,
oldest_to_keep = last_to_keep,
"pruning the storage"
);
let lowest: u32 = self
.storage
.lowest_block_height()
.await?
.unwrap_or_default()
.try_into()
.unwrap_or_default();
let to_prune = last_to_keep.saturating_sub(lowest);
match to_prune {
v if v > 1000 => warn!("approximately {v} blocks worth of data will be pruned"),
v if v > 100 => info!("approximately {v} blocks worth of data will be pruned"),
0 => trace!("no blocks to prune"),
v => debug!("approximately {v} blocks worth of data will be pruned"),
}
if to_prune == 0 {
self.last_pruned_height = self.last_processed_height;
return Ok(());
}
self.storage
.prune_storage(last_to_keep, self.last_processed_height)
.await?;
self.last_pruned_height = self.last_processed_height;
Ok(())
}
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
debug!("checking for storage pruning");
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= self.last_processed_height {
self.prune_storage().await?;
}
Ok(())
}
async fn next_incoming(&mut self, block: BlockToProcess) {
let height = block.height;
@@ -379,8 +444,10 @@ where
self.try_request_pending().await;
if self.pending_sync.is_empty() {
self.synced.notify_one();
if let Some(persistence) = &self.persistence
&& self.pending_sync.is_empty()
{
persistence.synced.notify_one();
}
}
@@ -410,7 +477,14 @@ where
assert!(self.pending_sync.is_empty());
info!("attempting to run startup resync...");
self.maybe_prune_storage().await?;
let Some(persistence) = self.persistence.as_mut() else {
// without data persistence, we're always starting from scratch
return Ok(());
};
persistence
.maybe_prune_storage(self.last_processed_height)
.await?;
let latest_block = self.rpc_client.current_block_height().await? as u32;
info!("obtained latest block height: {latest_block}");
@@ -419,10 +493,10 @@ where
info!("we have already processed some blocks in the past - attempting to resume...");
// in case we were offline for a while,
// make sure we don't request blocks we'd have to prune anyway
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let keep_recent = persistence.config.pruning_options.strategy_keep_recent();
let last_to_keep = latest_block - keep_recent;
if !self.config.pruning_options.strategy.is_nothing() {
if !persistence.config.pruning_options.strategy.is_nothing() {
self.last_processed_height = max(self.last_processed_height, last_to_keep);
}
@@ -440,7 +514,7 @@ where
// this is the first time starting up
if self.last_processed_height == 0 {
info!("this is the first time starting up");
let Some(starting_height) = self.config.explicit_starting_block_height else {
let Some(starting_height) = persistence.config.explicit_starting_block_height else {
info!("no starting block height set - will use the default behaviour");
// nothing to do
return Ok(());
@@ -451,7 +525,9 @@ where
self.rpc_client.earliest_available_block_height().await? as u32;
info!("earliest available block height: {earliest_available}");
if earliest_available > starting_height && self.config.use_best_effort_start_height {
if earliest_available > starting_height
&& persistence.config.use_best_effort_start_height
{
error!("the earliest available block is higher than the desired starting height");
return Err(ScraperError::BlocksUnavailable {
height: starting_height,
@@ -2,8 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::ScraperError;
use crate::helpers;
use std::collections::HashMap;
use std::collections::BTreeMap;
use tendermint::{Block, Hash, abci, block, tx};
use tendermint_rpc::endpoint::{block as block_endpoint, block_results, validators};
use tendermint_rpc::event::{Event, EventData};
@@ -28,9 +27,9 @@ pub struct ParsedTransactionResponse {
pub proof: Option<tx::Proof>,
pub parsed_messages: HashMap<usize, serde_json::Value>,
pub parsed_messages: BTreeMap<usize, serde_json::Value>,
pub parsed_message_urls: HashMap<usize, String>,
pub parsed_message_urls: BTreeMap<usize, String>,
pub block: Block,
}
@@ -41,32 +40,13 @@ pub struct FullBlockInformation {
pub block: Block,
/// All of the emitted events alongside any tx results.
pub results: block_results::Response,
pub results: Option<block_results::Response>,
/// Validator set for this particular block
pub validators: validators::Response,
pub validators: Option<validators::Response>,
/// Transaction results from this particular block
pub transactions: Vec<ParsedTransactionResponse>,
}
impl FullBlockInformation {
pub fn ensure_proposer(&self) -> Result<(), ScraperError> {
let block_proposer = self.block.header.proposer_address;
if !self
.validators
.validators
.iter()
.any(|v| v.address == block_proposer)
{
let proposer = helpers::validator_consensus_address(block_proposer)?;
return Err(ScraperError::BlockProposerNotInValidatorSet {
height: self.block.header.height.value() as u32,
proposer: proposer.to_string(),
});
}
Ok(())
}
pub transactions: Option<Vec<ParsedTransactionResponse>>,
}
pub(crate) struct BlockToProcess {
+8 -4
View File
@@ -172,16 +172,20 @@ pub enum ScraperError {
#[source]
error: base64::DecodeError,
},
#[error("no modules configured for the chain watcher")]
NoModulesConfigured,
#[error("no storage configured for the chain watcher")]
NoStorageConfigured,
}
impl ScraperError {
pub fn tx_begin_failure(source: NyxdScraperStorageError) -> ScraperError
where {
pub fn tx_begin_failure(source: NyxdScraperStorageError) -> ScraperError {
ScraperError::StorageTxBeginFailure { source }
}
pub fn tx_commit_failure(source: NyxdScraperStorageError) -> ScraperError
where {
pub fn tx_commit_failure(source: NyxdScraperStorageError) -> ScraperError {
ScraperError::StorageTxCommitFailure { source }
}
}
+2
View File
@@ -11,6 +11,8 @@ pub mod modules;
pub(crate) mod rpc_client;
pub(crate) mod scraper;
pub mod storage;
pub(crate) mod subscriber;
pub mod watcher;
pub use block_processor::pruning::{PruningOptions, PruningStrategy};
pub use block_processor::types::ParsedTransactionResponse;
@@ -3,14 +3,9 @@
use crate::block_processor::types::FullBlockInformation;
use crate::error::ScraperError;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
#[async_trait]
pub trait BlockModule {
async fn handle_block(
&mut self,
block: &FullBlockInformation,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
async fn handle_block(&mut self, block: &FullBlockInformation) -> Result<(), ScraperError>;
}
@@ -3,7 +3,6 @@
use crate::block_processor::types::ParsedTransactionResponse;
use crate::error::ScraperError;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
use cosmrs::Any;
@@ -16,6 +15,5 @@ pub trait MsgModule {
index: usize,
msg: &Any,
tx: &ParsedTransactionResponse,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -3,14 +3,9 @@
use crate::block_processor::types::ParsedTransactionResponse;
use crate::error::ScraperError;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
#[async_trait]
pub trait TxModule {
async fn handle_tx(
&mut self,
tx: &ParsedTransactionResponse,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
async fn handle_tx(&mut self, tx: &ParsedTransactionResponse) -> Result<(), ScraperError>;
}
+86 -21
View File
@@ -9,15 +9,32 @@ use crate::helpers::tx_hash;
use crate::{Any, MessageRegistry, default_message_registry};
use futures::StreamExt;
use futures::future::join3;
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::sync::Arc;
use tendermint::Hash;
use tendermint::{Block, Hash};
use tendermint_rpc::endpoint::{block, block_results, tx, validators};
use tendermint_rpc::{Client, HttpClient, Paging};
use tokio::sync::Mutex;
use tracing::{debug, instrument, warn};
use url::Url;
#[derive(Debug, Clone, Copy)]
pub struct RetrievalConfig {
pub get_validators: bool,
pub get_transactions: bool,
pub get_block_results: bool,
}
impl Default for RetrievalConfig {
fn default() -> Self {
Self {
get_validators: true,
get_transactions: true,
get_block_results: true,
}
}
}
#[derive(Clone)]
pub struct RpcClient {
// right now I don't care about anything nym specific, so a simple http client is sufficient,
@@ -53,27 +70,15 @@ impl RpcClient {
}
}
#[instrument(skip(self, block), fields(height = block.height))]
pub async fn try_get_full_details(
fn parse_transactions(
&self,
block: BlockToProcess,
) -> Result<FullBlockInformation, ScraperError> {
debug!("getting complete block details");
let height = block.height;
// make all the http requests concurrently
let (results, validators, raw_transactions) = join3(
self.get_block_results(height),
self.get_validators_details(height),
self.get_transaction_results(&block.block.data),
)
.await;
let raw_transactions = raw_transactions?;
raw_transactions: Vec<tx::Response>,
block: &Block,
) -> Result<Vec<ParsedTransactionResponse>, ScraperError> {
let mut transactions = Vec::with_capacity(raw_transactions.len());
for raw_tx in raw_transactions {
let mut parsed_messages = HashMap::new();
let mut parsed_message_urls = HashMap::new();
let mut parsed_messages = BTreeMap::new();
let mut parsed_message_urls = BTreeMap::new();
let tx = cosmrs::Tx::from_bytes(&raw_tx.tx).map_err(|source| {
ScraperError::TxParseFailure {
hash: raw_tx.hash,
@@ -97,9 +102,33 @@ impl RpcClient {
proof: raw_tx.proof,
parsed_messages,
parsed_message_urls,
block: block.block.clone(),
block: block.clone(),
})
}
Ok(transactions)
}
#[instrument(skip(self, block), fields(height = block.height))]
pub async fn try_get_full_details(
&self,
block: BlockToProcess,
config: RetrievalConfig,
) -> Result<FullBlockInformation, ScraperError> {
debug!("getting complete block details");
let height = block.height;
// make all the http requests run concurrently
let (results, validators, raw_transactions) = join3(
self.maybe_get_block_results(height, config.get_block_results),
self.maybe_get_validators_details(height, config.get_validators),
self.maybe_get_transaction_results(&block.block.data, config.get_transactions),
)
.await;
let transactions = match raw_transactions? {
Some(raw) => Some(self.parse_transactions(raw, &block.block)?),
None => None,
};
Ok(FullBlockInformation {
block: block.block,
@@ -140,6 +169,18 @@ impl RpcClient {
})
}
async fn maybe_get_block_results(
&self,
height: u32,
retrieve: bool,
) -> Result<Option<block_results::Response>, ScraperError> {
if retrieve {
self.get_block_results(height).await.map(Some)
} else {
Ok(None)
}
}
pub(crate) async fn current_block_height(&self) -> Result<u64, ScraperError> {
debug!("getting current block height");
@@ -196,6 +237,18 @@ impl RpcClient {
inner.into_values().collect()
}
async fn maybe_get_transaction_results(
&self,
raw: &[Vec<u8>],
retrieve: bool,
) -> Result<Option<Vec<tx::Response>>, ScraperError> {
if retrieve {
self.get_transaction_results(raw).await.map(Some)
} else {
Ok(None)
}
}
#[instrument(skip(self, tx_hash), fields(tx_hash = %tx_hash), err(Display))]
async fn get_transaction_result(&self, tx_hash: Hash) -> Result<tx::Response, ScraperError> {
debug!("getting tx results");
@@ -224,4 +277,16 @@ impl RpcClient {
source: Box::new(source),
})
}
async fn maybe_get_validators_details(
&self,
height: u32,
retrieve: bool,
) -> Result<Option<validators::Response>, ScraperError> {
if retrieve {
self.get_validators_details(height).await.map(Some)
} else {
Ok(None)
}
}
}
@@ -1,15 +1,15 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::PruningOptions;
use crate::block_processor::types::BlockToProcess;
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
use crate::block_processor::{BlockProcessor, BlockProcessorConfig, BlockProcessorPersistence};
use crate::block_requester::{BlockRequest, BlockRequester};
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::scraper::subscriber::ChainSubscriber;
use crate::storage::NyxdScraperStorage;
use crate::subscriber::ChainSubscriber;
use futures::future::join_all;
use std::marker::PhantomData;
use std::sync::Arc;
@@ -22,8 +22,6 @@ use tokio_util::task::TaskTracker;
use tracing::{error, info};
use url::Url;
mod subscriber;
#[derive(Default, Clone, Copy)]
pub struct StartingBlockOpts {
pub start_block_height: Option<u32>,
@@ -72,7 +70,7 @@ where
let (processing_tx, processing_rx) = unbounded_channel();
let (req_tx, req_rx) = channel(5);
let rpc_client = RpcClient::new(&scraper.config.rpc_url)?;
let rpc_client = scraper.rpc_client.clone();
// create the tasks
let block_requester = BlockRequester::new(
@@ -90,14 +88,19 @@ where
);
let mut block_processor = BlockProcessor::new(
block_processor_config,
scraper.cancel_token.clone(),
scraper.startup_sync.clone(),
processing_rx,
req_tx,
scraper.storage.clone(),
rpc_client,
)
.with_persistence(
BlockProcessorPersistence::new(
block_processor_config,
scraper.startup_sync.clone(),
scraper.storage.clone(),
)
.await?,
)
.await?;
block_processor.set_block_modules(self.block_modules);
block_processor.set_tx_modules(self.tx_modules);
@@ -116,6 +119,22 @@ where
Ok(scraper)
}
pub async fn build_unsafe(self) -> Result<NyxdScraper<S>, ScraperError> {
self.config.pruning_options.validate()?;
let storage =
S::initialise(&self.config.database_storage, &self.config.run_migrations).await?;
let rpc_client = RpcClient::new(&self.config.rpc_url)?;
Ok(NyxdScraper {
config: self.config,
task_tracker: TaskTracker::new(),
cancel_token: CancellationToken::new(),
startup_sync: Arc::new(Default::default()),
storage,
rpc_client,
})
}
pub fn new(config: Config) -> Self {
NyxdScraperBuilder {
_storage: PhantomData,
@@ -126,16 +145,19 @@ where
}
}
#[must_use]
pub fn with_block_module<M: BlockModule + Send + 'static>(mut self, module: M) -> Self {
self.block_modules.push(Box::new(module));
self
}
#[must_use]
pub fn with_tx_module<M: TxModule + Send + 'static>(mut self, module: M) -> Self {
self.tx_modules.push(Box::new(module));
self
}
#[must_use]
pub fn with_msg_module<M: MsgModule + Send + 'static>(mut self, module: M) -> Self {
self.msg_modules.push(Box::new(module));
self
@@ -209,9 +231,12 @@ where
let (req_tx, _) = channel(5);
let mut block_processor = self
.new_block_processor(req_tx.clone(), processing_rx)
.await?
.with_pruning(PruningOptions::nothing());
.new_block_processor_with_persistence(
req_tx.clone(),
processing_rx,
PruningOptions::nothing(),
)
.await?;
let block = self.rpc_client.get_basic_block_details(height).await?;
@@ -234,9 +259,12 @@ where
let (req_tx, _) = channel(5);
let mut block_processor = self
.new_block_processor(req_tx.clone(), processing_rx)
.await?
.with_pruning(PruningOptions::nothing());
.new_block_processor_with_persistence(
req_tx.clone(),
processing_rx,
PruningOptions::nothing(),
)
.await?;
let mut current_height = self.rpc_client.current_block_height().await? as u32;
let last_processed = block_processor.last_process_height();
@@ -345,10 +373,24 @@ where
)
}
async fn new_block_processor(
fn new_block_processor(
&self,
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> BlockProcessor<S> {
BlockProcessor::<S>::new(
self.cancel_token.clone(),
processing_rx,
req_tx,
self.rpc_client.clone(),
)
}
async fn new_block_processor_with_persistence(
&self,
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
pruning_options: impl Into<Option<PruningOptions>>,
) -> Result<BlockProcessor<S>, ScraperError> {
let block_processor_config = BlockProcessorConfig::new(
self.config.pruning_options,
@@ -357,16 +399,27 @@ where
self.config.start_block.use_best_effort_start_height,
);
BlockProcessor::<S>::new(
block_processor_config,
self.cancel_token.clone(),
self.startup_sync.clone(),
processing_rx,
req_tx,
self.storage.clone(),
self.rpc_client.clone(),
)
.await
let persistence = match pruning_options.into() {
Some(options) => BlockProcessorPersistence::new(
block_processor_config,
self.startup_sync.clone(),
self.storage.clone(),
)
.await?
.with_pruning(options),
None => {
BlockProcessorPersistence::new(
block_processor_config,
self.startup_sync.clone(),
self.storage.clone(),
)
.await?
}
};
self.new_block_processor(req_tx, processing_rx)
.with_persistence(persistence)
.await
}
async fn new_chain_subscriber(
@@ -388,7 +441,9 @@ where
// create the tasks
let block_requester = self.new_block_requester(req_rx, processing_tx.clone());
let block_processor = self.new_block_processor(req_tx, processing_rx).await?;
let block_processor = self
.new_block_processor_with_persistence(req_tx, processing_rx, None)
.await?;
let chain_subscriber = self.new_chain_subscriber(processing_tx).await?;
// spawn them
+41 -15
View File
@@ -3,6 +3,7 @@
use crate::error::ScraperError;
use async_trait::async_trait;
use tendermint::block;
use thiserror::Error;
use tracing::warn;
@@ -89,6 +90,25 @@ pub trait NyxdScraperTransaction {
async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError>;
}
fn ensure_proposer_present(
block_header: &block::Header,
validators: &validators::Response,
) -> Result<(), ScraperError> {
let block_proposer = block_header.proposer_address;
if !validators
.validators
.iter()
.any(|v| v.address == block_proposer)
{
let proposer = crate::helpers::validator_consensus_address(block_proposer)?;
return Err(ScraperError::BlockProposerNotInValidatorSet {
height: block_header.height.value() as u32,
proposer: proposer.to_string(),
});
}
Ok(())
}
pub async fn persist_block<Tx>(
block: &FullBlockInformation,
tx: &mut Tx,
@@ -97,28 +117,34 @@ pub async fn persist_block<Tx>(
where
Tx: NyxdScraperTransaction,
{
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
// SANITY CHECK: make sure the block proposer is present in the validator set
block.ensure_proposer()?;
tx.persist_validators(&block.validators).await?;
let total_gas = match block.transactions.as_ref() {
Some(txs) => crate::helpers::tx_gas_sum(txs),
None => 0,
};
tx.persist_block_data(&block.block, total_gas).await?;
if store_precommits {
if let Some(commit) = &block.block.last_commit {
tx.persist_commits(commit, &block.validators).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
if let Some(validators) = &block.validators {
// SANITY CHECK: make sure the block proposer is present in the validator set
ensure_proposer_present(&block.block.header, validators)?;
tx.persist_validators(validators).await?;
if store_precommits {
if let Some(commit) = &block.block.last_commit {
tx.persist_commits(commit, validators).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
}
// persist txs
tx.persist_txs(&block.transactions).await?;
if let Some(transactions) = &block.transactions {
// persist txs
tx.persist_txs(transactions).await?;
// persist messages (inside the transactions)
tx.persist_messages(&block.transactions).await?;
// persist messages (inside the transactions)
tx.persist_messages(transactions).await?;
}
tx.update_last_processed(block.block.header.height.into())
.await?;
+160
View File
@@ -0,0 +1,160 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::BlockProcessor;
use crate::block_requester::BlockRequester;
use crate::error::ScraperError;
use crate::rpc_client::{RetrievalConfig, RpcClient};
use crate::subscriber::ChainSubscriber;
use crate::{BlockModule, MsgModule, TxModule};
use tokio::sync::mpsc::{channel, unbounded_channel};
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tracing::info;
use url::Url;
pub struct WatcherConfig {
/// Url to the websocket endpoint of a validator, for example `wss://rpc.nymtech.net/websocket`
pub websocket_url: Url,
/// Url to the rpc endpoint of a validator, for example `https://rpc.nymtech.net/`
pub rpc_url: Url,
}
pub struct NyxdWatcherBuilder {
config: WatcherConfig,
block_modules: Vec<Box<dyn BlockModule + Send>>,
tx_modules: Vec<Box<dyn TxModule + Send>>,
msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
impl NyxdWatcherBuilder {
pub fn new(config: WatcherConfig) -> Self {
NyxdWatcherBuilder {
config,
block_modules: vec![],
tx_modules: vec![],
msg_modules: vec![],
}
}
#[must_use]
pub fn with_block_module<M: BlockModule + Send + 'static>(mut self, module: M) -> Self {
self.block_modules.push(Box::new(module));
self
}
#[must_use]
pub fn with_tx_module<M: TxModule + Send + 'static>(mut self, module: M) -> Self {
self.tx_modules.push(Box::new(module));
self
}
#[must_use]
pub fn with_msg_module<M: MsgModule + Send + 'static>(mut self, module: M) -> Self {
self.msg_modules.push(Box::new(module));
self
}
pub async fn build_and_start(self) -> Result<NyxdWatcher, ScraperError> {
// we must have at least something configured to run the watcher
if self.block_modules.is_empty()
&& self.tx_modules.is_empty()
&& self.msg_modules.is_empty()
{
return Err(ScraperError::NoModulesConfigured);
}
let watcher = NyxdWatcher::new();
let rpc_client = RpcClient::new(&self.config.rpc_url)?;
let (processing_tx, processing_rx) = unbounded_channel();
let (req_tx, req_rx) = channel(5);
// create the tasks
let block_requester = BlockRequester::new(
watcher.cancel_token(),
rpc_client.clone(),
req_rx,
processing_tx.clone(),
);
let mut block_processor =
BlockProcessor::new(watcher.cancel_token(), processing_rx, req_tx, rpc_client)
.with_retrieval_config(RetrievalConfig {
get_validators: false,
get_transactions: true,
get_block_results: false,
});
block_processor.set_block_modules(self.block_modules);
block_processor.set_tx_modules(self.tx_modules);
block_processor.set_msg_modules(self.msg_modules);
let chain_subscriber = ChainSubscriber::new(
&self.config.websocket_url,
watcher.cancel_token(),
watcher.task_tracker.clone(),
processing_tx,
)
.await?;
watcher.start_tasks(block_requester, block_processor, chain_subscriber);
Ok(watcher)
}
}
/// A simpler alternative to the `NyxdScraper` that does not persist any received block information.
/// Instead, it only calls the registered modules on the processed data.
///
/// Furthermore, it also does not retrieve any validator information or detailed block information
pub struct NyxdWatcher {
task_tracker: TaskTracker,
cancel_token: CancellationToken,
}
impl NyxdWatcher {
pub fn builder(config: WatcherConfig) -> NyxdWatcherBuilder {
NyxdWatcherBuilder::new(config)
}
fn new() -> Self {
Self {
task_tracker: TaskTracker::new(),
cancel_token: CancellationToken::new(),
}
}
fn start_tasks(
&self,
mut block_requester: BlockRequester,
mut block_processor: BlockProcessor,
mut chain_subscriber: ChainSubscriber,
) {
self.task_tracker
.spawn(async move { block_requester.run().await });
self.task_tracker
.spawn(async move { block_processor.run().await });
self.task_tracker
.spawn(async move { chain_subscriber.run().await });
self.task_tracker.close();
}
pub async fn stop(self) {
info!("stopping the chain watcher");
assert!(self.task_tracker.is_closed());
self.cancel_token.cancel();
self.task_tracker.wait().await
}
pub fn cancel_token(&self) -> CancellationToken {
self.cancel_token.clone()
}
pub fn is_cancelled(&self) -> bool {
self.cancel_token.is_cancelled()
}
}

Some files were not shown because too many files have changed in this diff Show More