Compare commits

...

186 Commits

Author SHA1 Message Date
mfahampshire 0f8a8ddf7e Trim obvious comments, add architecture.md stub 2026-03-17 15:56:04 +00:00
mfahampshire 3c92ce60ca sdk: remove superseded stream_wrapper module 2026-03-17 15:29:34 +00:00
mfahampshire 846dbba363 sdk: add ipr_wrapper module with IpMixStream
- IpMixStream wraps MixnetStream for IPR tunnel over mixnet
- LP Stream framing handled automatically by MixnetStream
- Gateway discovery, connect handshake, IP packet send/receive
2026-03-17 15:29:07 +00:00
mfahampshire 94ab9d5466 IPR: support LP Stream-framed client connections
- Detect and route LP Stream frames in mixnet_listener
- Wrap inline responses in LP Stream frames
- Thread stream_id to ConnectedClientHandler for TUN responses
2026-03-17 15:28:39 +00:00
mfahampshire c78d942383 Replace MixnetStream with LP framing
- Replace custom header with LpFrameHeader
- Added sequence number for message ordering
2026-03-17 12:03:12 +00:00
mfahampshire 0b6166d20e Add LpFrameKind::Stream variant with StreamFrameAttributes
- Define LP wire format for stream multiplexing
- Handle new variant in entry gateway match arm
2026-03-17 12:02:12 +00:00
mfahampshire 6384467526 Reset rebase contamination: restore develop state for shared code
Mass-reset ~50 files that were accidentally modified during rebase
(PollSender/InputMessageCodec/&mut self changes from old experimental
commits). Disable stream_wrapper module (will be rebuilt on MixnetStream
+ LP frame envelope). Remove IpMixStream refs from ip_packet_client
helpers temporarily.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-16 16:15:40 +00:00
mfahampshire fdd3823585 clean 2026-03-16 15:35:30 +00:00
mfahampshire 892a3bd826 add scratch notes to gitignore 2026-03-16 15:35:29 +00:00
mfahampshire 59ff7d6588 comment 2026-03-16 15:35:29 +00:00
mfahampshire 20c4553bca clippy 2026-03-16 15:35:29 +00:00
mfahampshire 4c38481c36 Fix env discovery 2026-03-16 15:35:29 +00:00
mfahampshire 07680db2c7 mod ignore 2026-03-16 15:35:29 +00:00
mfahampshire 59cbce50f7 mod logging to with poisoning retry 2026-03-16 15:35:29 +00:00
mfahampshire ac13ddbda8 remove unnecessary logging from unit tests 2026-03-16 15:35:29 +00:00
mfahampshire 67803930b6 dont always run dns ping tests 2026-03-16 15:35:29 +00:00
mfahampshire 7052e2e902 fix ipmixstream new() 2026-03-16 15:35:28 +00:00
mfahampshire cccfa76336 clippy 2026-03-16 15:35:28 +00:00
mfahampshire a946336e67 add missing network env 2026-03-16 15:35:28 +00:00
mfahampshire e5836bc1cb fmt 2026-03-16 15:35:28 +00:00
mfahampshire f12108a7db clippy 2026-03-16 15:35:28 +00:00
mfahampshire 70bdbce23f Clippy 2026-03-16 15:35:28 +00:00
mfahampshire e6f9b551ed clippy 2026-03-16 15:35:28 +00:00
mfahampshire fcfa0b604e rustfmt 2026-03-16 15:35:28 +00:00
mfahampshire 8b086e0239 Fix useragent 2026-03-16 15:35:27 +00:00
mfahampshire 6c76834b6c Example code 2026-03-16 15:35:27 +00:00
mfahampshire 071589237b Add network_envs 2026-03-16 15:35:27 +00:00
mfahampshire 771ee10ba2 Update inline examples 2026-03-16 15:35:27 +00:00
mfahampshire 33ce05a3df Add bootstrap network config 2026-03-16 15:35:27 +00:00
mfahampshire 73016ed687 Docs first pass 2026-03-16 15:35:27 +00:00
mfahampshire 8a5205ac4c Include err for no surb tag or peer 2026-03-16 15:35:27 +00:00
mfahampshire aaa7e317bf Fix Mixstream::new() with new configurable network 2026-03-16 15:35:27 +00:00
mfahampshire f28c49e9d6 Update docs + make network configurable 2026-03-16 15:35:27 +00:00
mfahampshire e2ceaf48ed new message borrow 2026-03-16 15:35:27 +00:00
mfahampshire 3e2137a33e First pass rework to bytes in bytes out 2026-03-16 15:35:26 +00:00
mfahampshire 984fa065e3 remove .expect()s and add some encode and decode tests 2026-03-16 15:35:26 +00:00
mfahampshire da46ea7485 remove unused connction type enum 2026-03-16 15:35:26 +00:00
mfahampshire b1bc359806 Fix double copy + deserialisation -> none loop 2026-03-16 15:35:26 +00:00
mfahampshire b338644620 Switch frm default bincode in nymsphinx 2026-03-16 15:35:26 +00:00
mfahampshire 1ec0bf868b use make_bincode_serializer instead of bincode default in client-core 2026-03-16 15:35:26 +00:00
mfahampshire 07842661b9 properly fail on version checks 2026-03-16 15:35:26 +00:00
mfahampshire 0cd4dd5747 Removed unneccesary panics with self.peer_surb_tag 2026-03-16 15:35:26 +00:00
Jędrzej Stuczyński abdd960b20 removed dependency on nym-gateway-directory 2026-03-16 15:35:25 +00:00
mfahampshire db2f3bff05 Use workspace import for mixtcp rustls 2026-03-16 15:35:14 +00:00
mfahampshire be56c79106 remove commented out code 2026-03-16 15:34:54 +00:00
mfahampshire 3ccfbee834 add doc info for other sdk modules 2026-03-16 15:34:53 +00:00
mfahampshire 942ab3c8e8 follow convention for to_v2_bytes 2026-03-16 15:34:53 +00:00
mfahampshire 9ec937dd30 fix comment and duplication in root cargo 2026-03-16 15:34:53 +00:00
mfahampshire 6ccc4a988a use workspace base64 version 2026-03-16 15:34:44 +00:00
mfahampshire 27890eb1a3 remove external patch 2026-03-16 15:34:43 +00:00
mfahampshire fa327a1b2a add license to mixtcp cargo 2026-03-16 15:34:43 +00:00
mfahampshire cea66c1237 edition matches workspace 2026-03-16 15:34:43 +00:00
mfahampshire 757a89c5d7 clippy 2026-03-16 15:34:43 +00:00
mfahampshire 1e3f531e15 remove last nym vpn api deps 2026-03-16 15:34:43 +00:00
mfahampshire 7cc33d8df7 remove nymvpnapi - always use http fallback 2026-03-16 15:34:43 +00:00
mfahampshire 1bd0bfeee1 temp before big mod 2026-03-16 15:34:43 +00:00
mfahampshire f297af2a8c cont removing unnecessary types 2026-03-16 15:34:43 +00:00
mfahampshire d9190e5899 remove unused 2026-03-16 15:34:43 +00:00
mfahampshire a562812ad9 added stream module to mixnet readme 2026-03-16 15:34:43 +00:00
mfahampshire 7368692629 remove external dep on nymvpn repo in sdk 2026-03-16 15:34:42 +00:00
mfahampshire c185f485a7 lint 2026-03-16 15:34:42 +00:00
mfahampshire 6930968e88 lock 2026-03-16 15:34:42 +00:00
mfahampshire 8294191913 remove old commented out imports 2026-03-16 15:34:42 +00:00
mfahampshire 9b2fb45270 temp get rid of logging for ci again 2026-03-16 15:34:42 +00:00
mfahampshire cb8747abb8 temp get rid of logging for ci 2026-03-16 15:34:42 +00:00
mfahampshire 47d37d8aed clippy 2026-03-16 15:34:42 +00:00
mfahampshire d452932b18 clippy 2026-03-16 15:34:42 +00:00
mfahampshire 702dfdc927 clippy warnings: remove 2026-03-16 15:34:30 +00:00
mfahampshire 18e8dfe394 Fix FFI shared lib 2026-03-16 15:34:29 +00:00
mfahampshire 0208a84b77 Mod to mixnet client mutability from traits elsewhere 2026-03-16 15:34:29 +00:00
mfahampshire 7105bbf4b4 Add RwLock to wasm client helper 2026-03-16 15:34:11 +00:00
mfahampshire 39692502df remove accidental import from merge 2026-03-16 15:34:10 +00:00
mfahampshire fcefa079b0 reintroduce import 2026-03-16 15:34:10 +00:00
mfahampshire 371422f27b lint 2026-03-16 15:34:10 +00:00
mfahampshire 5541f242ff smol mixtcp readme 2026-03-16 15:34:10 +00:00
mfahampshire 348e93dd70 rename smolmix - mixtcp 2026-03-16 15:34:10 +00:00
mfahampshire 7f8b7eea8c strip down commenting that is triggering compiler err 2026-03-16 15:34:10 +00:00
mfahampshire 8760c40d46 info -> debug logging for serialised bytes written by stream_wrapper 2026-03-16 15:34:10 +00:00
mfahampshire 8ae4b8fee2 Move files to examples + split examples apart 2026-03-16 15:34:10 +00:00
mfahampshire 4f4885fe50 Remove unused imports 2026-03-16 15:34:09 +00:00
mfahampshire bc52db53b7 remove comments and unused imports 2026-03-16 15:34:09 +00:00
mfahampshire 08d49a6f2e remove unwraps in place of error types 2026-03-16 15:34:09 +00:00
mfahampshire 6f53192dbf deprecate notice for tcpproxy module 2026-03-16 15:34:09 +00:00
mfahampshire b5afb77f19 Clean up unused imports 2026-03-16 15:34:09 +00:00
mfahampshire 29714dea76 Fix gw directory api change in ipr wrapper 2026-03-16 15:34:09 +00:00
mfahampshire 8fd9cee189 almost sorted new version gw dir 2026-03-16 15:34:09 +00:00
mfahampshire 2b4a11e273 linting 2026-03-16 15:34:09 +00:00
mfahampshire a58b32703c add missed stuff from rebase 2026-03-16 15:34:09 +00:00
mfahampshire de80b4ce48 Made explicit error types 2026-03-16 15:34:08 +00:00
mfahampshire 85a3b25be9 Fix logging in tests 2026-03-16 15:33:44 +00:00
mfahampshire 708bd71a56 framing > byte buffer 2026-03-16 15:33:44 +00:00
mfahampshire 40b886e0bd Fix inverted buffer slice logic 2026-03-16 15:33:44 +00:00
mfahampshire 23c1c4bdac Tests + getting reuable client in new() for speedup 2026-03-16 15:33:44 +00:00
mfahampshire 2dd8707725 rough first reqwest client poc 2026-03-16 15:33:44 +00:00
mfahampshire 0bb3c4b2bf remove clunky old examples in place of unit tests 2026-03-16 15:33:43 +00:00
mfahampshire 72e8180abe TLS first version 2026-03-16 15:33:43 +00:00
mfahampshire 2d5b1d577c update readme with new logging 2026-03-16 15:33:43 +00:00
mfahampshire b5e45040ca change logging for nym provider 2026-03-16 15:33:43 +00:00
mfahampshire e420081512 remove old note 2026-03-16 15:33:43 +00:00
mfahampshire 0da4ee985b smolmix device + example 2026-03-16 15:33:43 +00:00
mfahampshire 6d8cacc900 commenting 2026-03-16 15:33:43 +00:00
mfahampshire 49543fcd98 export extra types from ipmixstream 2026-03-16 15:33:43 +00:00
mfahampshire 7b80716c9a split ipmixstream + tests 2026-03-16 15:33:43 +00:00
mfahampshire a4a48c60ae err handling on surb send between split 2026-03-16 15:33:42 +00:00
mfahampshire e027b5a1fe removed IpMixSocket; was a bit unnecessary given connection logic 2026-03-16 15:33:42 +00:00
mfahampshire 723df5584e Remove unnecessary MixnetClient from IpSocket: streamlining 2026-03-16 15:33:42 +00:00
mfahampshire 2ca5155748 more comments 2026-03-16 15:33:42 +00:00
mfahampshire 4f0cc58a11 commenting 2026-03-16 15:33:42 +00:00
mfahampshire 2ccdfedd65 commenting format change + comment out logging in test 2026-03-16 15:33:42 +00:00
mfahampshire d7ddb7592c comment out logging in test 2026-03-16 15:33:42 +00:00
mfahampshire 7371ce3e36 * got ipr pings working with stream_wrapper_ipr
* updated stream_wrapper with debug methods
2026-03-16 15:33:42 +00:00
mfahampshire cd7bb9931e pull in + mod nym-gateway 2026-03-16 15:33:24 +00:00
mfahampshire b77dbdd87e * pulled in helpers from various files
* added readme to explain
2026-03-16 15:33:23 +00:00
mfahampshire 83dcf3fd13 got ipr wrapper connected 2026-03-16 15:33:23 +00:00
mfahampshire a5c6e9d0e2 mod ip_packet_client 2026-03-16 15:33:23 +00:00
mfahampshire a417411184 out of dependency hell 2026-03-16 15:33:23 +00:00
mfahampshire 24d5e4aba9 removed circular dep from gateway-directory 2026-03-16 15:33:23 +00:00
mfahampshire 6cb2fc8445 before directory modification 2026-03-16 15:33:23 +00:00
mfahampshire 4ea2c3beb3 temp commit: got gateway dir dependency working, moving on to vpn-api-client 2026-03-16 15:33:23 +00:00
mfahampshire be8c1191f3 commit before messing with reexport stuff 2026-03-16 15:32:53 +00:00
mfahampshire d969979c8c reorg 2026-03-16 15:32:53 +00:00
mfahampshire c6fd3c8527 added surbs to split r/w + some streamlining + comments + tests 2026-03-16 15:32:53 +00:00
mfahampshire 6ac4d93909 adding surbs + anon reply functionality 2026-03-16 15:32:53 +00:00
mfahampshire 197a7eaec8 make inputmessage anonymous type over simple 2026-03-16 15:32:53 +00:00
mfahampshire f598ee2916 first full pass @ stream + split wrappers 2026-03-16 15:32:53 +00:00
mfahampshire b2fa6cdf8f temp 2026-03-16 15:32:53 +00:00
mfahampshire 97dbef155d initial pass streamwrapper 2026-03-16 15:32:52 +00:00
durch 9dbd91d93e Address part of PR comments 2026-03-16 15:32:52 +00:00
durch 7914cbdbb7 fmt 2026-03-16 15:32:52 +00:00
durch 99febfb3aa Log decoding error 2026-03-16 15:32:52 +00:00
durch 2b00188983 Cleanup prints 2026-03-16 15:32:52 +00:00
durch 82f270329f Update IPR sig 2026-03-16 15:32:52 +00:00
mfahampshire 3cb17e76bd tweaks to tcpproxy example 2026-03-16 15:32:52 +00:00
durch 7b2f8a4ed1 WASM changes 2026-03-16 15:32:40 +00:00
durch 438e745cb3 AsyncWrite 2026-03-16 15:32:40 +00:00
mfahampshire 674fd511f4 remove double asyncwrite 2026-03-16 15:32:40 +00:00
durch 66d85a7c0d Use tokio AsyncRead 2026-03-16 15:32:39 +00:00
durch d12a5d754a ReconstructedMessageCodec 2026-03-16 15:32:21 +00:00
Drazen 3a78d62240 InputMessageCodec, Serde for MixPacket 2026-03-16 15:32:21 +00:00
mfahampshire 5e651b55fc minor changes with new files / fixes 2026-03-16 15:32:05 +00:00
Drazen 8a6bf4a03d Use Sink always 2026-03-16 15:32:04 +00:00
mfahampshire 6a2f1a67ed temp 2026-03-16 15:31:39 +00:00
Drazen d56ab91a2e Switch to PollSender 2026-03-16 15:31:33 +00:00
durch 8f670f467b AsyncRead for MixnetClient 2026-03-16 15:31:08 +00:00
durch d013168823 serde for ReconstructedMessage 2026-03-16 15:29:35 +00:00
dynco-nym 8dc3ba4ec3 Add LP to NS UI (#6562)
* Add LP column to gateway view

* Add LP to graphs
2026-03-16 14:07:19 +01:00
Lawrence Stalder 712e3f5183 Change runner from ubuntu-latest to arc-linux-latest 2026-03-13 14:25:33 +01:00
Lawrence Stalder 5229df47ab Change runner to arc-linux-latest for SonarQube job 2026-03-13 14:24:04 +01:00
Lawrence Stalder 32cffed36b Change runner from ubuntu-latest to arc-linux-latest 2026-03-13 14:16:42 +01:00
Jędrzej Stuczyński 49c710e651 feat: nyxd watcher (#6561)
* removed explicit storage_tx within MsgModule, TxModule and BlockModule impls

* created a NyxdWatcher that does not persist processed block info

* removed unused imports
2026-03-13 13:15:36 +00:00
Lawrence Stalder 0a5227a894 Remove cron schedule from nightly-build workflow
Removed scheduled cron job from nightly build workflow.
2026-03-13 14:01:02 +01:00
mfahampshire b231eb4f04 Max/asyncread asyncwrite nym client (#6318)
* Remove AsyncRead/Write traits from native client - moving them to
stream/

* Substream model first push

* Update / add examples

* Update lockfile

* Clippy

* clippy examples

* remove codecs

* Remove unused bincode setup

* Revert a lot of changes when SDK client itself implemented
AsyncRead/Write

* Remove unnecessary mut

* Use local PollSender in MixnetStream instead of client_input.input_sender

Now that client-core's input_sender is back to mpsc::Sender (reverted
PollSender migration), MixnetStream creates its own PollSender wrapper
for the AsyncWrite impl's poll_ready/start_send calls.

* Remove now-unnecessary parameter

* Clippy

* Cleanup more stragglers from previous setup (Async traits on
MixnetClient)

* Rename files (remove module inception)

* - Shrink StreamId from 16 bytes to u64, add version byte to wire format
  - Introduce MixStreamHeader/MixStreamFrame structs for decode
  - Replace StreamMap type alias with struct using tokio::sync::Mutex
  - Add StreamMap helper methods, eliminate lock().expect() panics
  - Rename stream.rs -> mixnet_stream.rs to avoid module inception
  - Document irrevocable stream mode, add LP integration TODO

* - Remove dummy channel
- Add err variant for reciever alredy taken
- Remove panics

* add timeout to stream

* clippy
2026-03-13 09:40:45 +00:00
mfahampshire fdd2c8fac2 update nymvpn cli docs (#6559)
* update nymvpn cli docs

* update nymvpn cli docs again
2026-03-12 16:32:39 +00:00
Jędrzej Stuczyński e2dd8ac743 feat: localnet v2 (#6277)
* squashing localnet-v2 commits (again)

cargo fmt

fixes to localnet purge

provide path in the error message

output args

log failed exec

print based on tty

check-prerequisites cmd

checked iptables update

basic kernel features check

enable ipv6 rules

add forwarding rules

squashing localnet-v2 commits

additional changes

propagate custom-dns flag to all run containers

remove is_mock from EcashManager

another localnet squash

unused import

chore: remove redundant testnet manager

missing impl

additional linux fixes

command to rebuild container image

wait for at least 2 blocks

additional node startup fixes

added --custom-dns flag to nym node setup

add gateway probe + wait for DKG magic file

fixed localnet down on linux

container ls

re-enable state resync

additional feature locking

macos adjustments

working nyxd startup on linux

wip linux box

wip

separating network inspect betweewn macos and linux

initial linux feature locking

moved all container commands into a single location

finally working initial node performance

squashing orchestrator commits

cleanup

fixed condition for naive rearrangement

added cache of cosmwasm contracts for speed up on subsequent runs

'down' command

refreshing described cache after nodes are bonded

nym nodes setup + wip on nym api refresh

nodes setup WIP

first pass cleanup

placeholder for nym-node setup

bypassing the dkg

further progress on nym-api setup

wip: api setup

up/down/purge placeholders

persisting contract setup data

fix contract upload by forcing amd64 container platform

wip: contracts setup4

wip: contracts setup3

wip: contracts setup2

wip: contracts setup

include network setup

init and spawn nyxd

build nyxd image in dedicated orchestrator

build nyxd image

squashed cherry-picked lp changes

Bits and bobs to make everything work

Title

MacOS setup instructions

Docker/Container localnet

* clippy

* fixes on non-unix targets

---------

Co-authored-by: durch <durch@users.noreply.github.com>
2026-03-12 14:46:00 +00:00
import this 8001fa7f40 [HOTFIX/DOCs]: Get Vercel deployment to work (#6557)
* try rebuild

* update package.json
2026-03-12 13:53:04 +00:00
dynco-nym 80370b98ec Additional ticket for agent (#6551)
* Additional ticket type for LP tests

* Remove hardcoded comments

* bump cargo version

* Nuke fallback edge case in the probe

* Cleanup unused code

* Bump API & agent versions
- agent bump required due to probe changes
2026-03-12 14:49:03 +01:00
import this 3524089ad8 [DOCs/operators]: Release notes for v2026.5 raclette (#6556)
* update changelog

* bump up versions

* bump up stats

* update stats

* rephrase probe info
2026-03-12 13:05:25 +00:00
mfahampshire ec7ee49282 Version bump (#6553)
* Version bump

* update docs dep
2026-03-12 10:40:07 +00:00
import this 653d1c2dea [NTM]: Open ports according to NIP-8 and NIP-9 (#6545)
* add nip-9 to NTM

* update ntm nip 8

* fix symbol syntax
2026-03-12 10:10:50 +00:00
mfahampshire b579f987b1 Max/mixfetch concurrentcy tweak (#6539)
* Remove debug connect logging

* Add random suffix to addressmapping for concurrent outgoing requests to
same endpoint

* Comment + renaming + pulling apart of mapping key & URL.

* Add certs file + remove hardcoding + add certs script

* Add cleanbuild helper script

* Update DEVELOPERS.md

* Add cleanbuild script info to DEVELOPERS.md

* Remove notice about blocking on concurrent same endpoint reqs
2026-03-11 18:42:07 +00:00
Jędrzej Stuczyński 59254c92c3 bugfix: make sure to use old values from metrics debug config during v12 migration (#6546) (#6547) 2026-03-11 08:33:53 +00:00
Simon Wicky 69887921cc typo (#6543) 2026-03-10 16:27:02 +01:00
import this e075b07632 Hotfix: Add a missing commit with an ansible role (#6542)
* Create ansible playbook for trimming and rotationg logs

* add docs for triming and log rotation

* update ansible docs

* add info on logic

* cleanup the cleanup guide

* update scraped stats

* ready for review

* address review

* add main default values
2026-03-10 14:23:02 +00:00
import this d32b680351 Server Ansible maintenance & documentation (#6514)
* Create ansible playbook for trimming and rotationg logs

* add docs for triming and log rotation

* update ansible docs

* add info on logic

* cleanup the cleanup guide

* update scraped stats

* ready for review

* address review
2026-03-10 13:28:39 +00:00
Simon Wicky fcd59a19be rng changes for a Send variant (#6541) 2026-03-10 13:43:49 +01:00
dynco-nym 08b20ac2ab Add LP fields (#6535)
* Add lp field to /dvpn/gateways

* Expand unit tests

* Add lp ports, keys, hashes

* Include the whole struct

* Update Toml version
2026-03-10 13:06:56 +01:00
Jędrzej Stuczyński 4c007669f9 chore: update ts-rs dep (#6517) 2026-03-10 11:51:30 +00:00
benedetta davico c3a8fa8d0d Merge pull request #6536 from nymtech/release/2026.5-raclette
Raclette to develop
2026-03-10 12:06:56 +01:00
Simon Wicky d8769157fd enable LP registration in registration client (#6534) 2026-03-10 11:35:48 +01:00
benedettadavico 7cccf3cfff update changelog 2026-03-10 10:46:12 +01:00
Jędrzej Stuczyński 02eec164f8 bugfix: lp information to have proper snake_case on API endpoints (#6531) 2026-03-09 14:56:31 +00:00
Jędrzej Stuczyński 4f13ab1e0a Merge pull request #6532 from nymtech/chore/reg-metrics
chore: introduce additional prometheus metrics for registration times
2026-03-09 14:56:18 +00:00
benedetta davico a34c7ef19f Merge pull request #6533 from nymtech/bugfix/lp-gateway-probe
bugfix: correctly populate gateway probe LP data
2026-03-09 15:55:00 +01:00
Jędrzej Stuczyński f00b18298c bugfix: correctly populate gateway probe LP data 2026-03-09 14:10:24 +00:00
Jędrzej Stuczyński 0426adc94e chore: introduce additional prometheus metrics for registration times 2026-03-09 13:50:46 +00:00
Jędrzej Stuczyński 4b4a2fe387 Merge pull request #6530 from nymtech/chore/rename-lp-message
chore: rename LpMessage to LpFrame
2026-03-09 13:44:39 +00:00
Jędrzej Stuczyński 1ebb7e06c7 chore: rename LpMessage to LpFrame 2026-03-09 13:21:39 +00:00
Jędrzej Stuczyński 1fd17c5cb3 Merge pull request #6526 from nymtech/chore/lp-improvements
chore: LP improvements
2026-03-09 10:57:26 +00:00
Jędrzej Stuczyński ef65cf4c9e additional adjustments 2026-03-06 16:34:42 +00:00
Jędrzej Stuczyński 48dad0f16b bugfix: setting correct LpPeerConfig during handshake 2026-03-06 16:09:28 +00:00
Jędrzej Stuczyński 93ac638765 importing over changes from 'lp/persistent-node-connection' 2026-03-06 16:07:35 +00:00
Jędrzej Stuczyński c6589ca92c chore: add unit test for mutual KKT 2026-03-06 15:40:13 +00:00
Sachin Kamath 03d5a87826 Merge pull request #6525 from nymtech/readme-midnight-attribution
chore: add midnight attribution
2026-03-06 16:47:16 +05:30
Sachin Kamath 512cfd1b74 chore: add midnight attribution 2026-03-06 16:40:45 +05:30
Bogdan-Ștefan Neacşu ba0625cd97 Remove dep leak of strum iterator (#6522)
strum iterator over an enum leaks the version needed to iterate over it,
which can cause problems to dependent crates that use a different strum
version.

While at it, bump the strum crates as well
2026-03-06 10:44:14 +02:00
mfahampshire a2c489dc5b Max/sitemap generation fix (#6515)
* Tweak README ordering

* Linting

* Add sitemap generation + NEXT env var to CI

* Update domain for sitemap generation

* Inc. sitemap -0

* test remove lockfile

* fix borked name in package

* add redoc

* add framer

* Add pnpm-lock file

* Add sitemap to remote + ci workflow

* remove extra sitemap

* remove static files from remote for vercel

* add sitemap gen to next build step for vercel
2026-03-04 16:01:51 +00:00
384 changed files with 35069 additions and 14415 deletions
+1 -1
View File
@@ -7,7 +7,7 @@ jobs:
build:
runs-on: arc-ubuntu-22.04
env:
NEXT_PUBLIC_SITE_URL: https://nymtech.net/docs
NEXT_PUBLIC_SITE_URL: https://nym.com/docs
defaults:
run:
working-directory: documentation/docs
+2
View File
@@ -48,6 +48,8 @@ jobs:
run: pnpm i
- name: Build project
run: pnpm run build
- name: Generate sitemap
run: npx next-sitemap
- name: Move files to /dist/
run: ../scripts/move-to-dist.sh
+1 -1
View File
@@ -8,7 +8,7 @@ on:
jobs:
sonarqube:
name: SonarQube
runs-on: ubuntu-latest
runs-on: arc-linux-latest
steps:
- uses: actions/checkout@v6
with:
-2
View File
@@ -2,8 +2,6 @@ name: nightly-build
on:
workflow_dispatch:
schedule:
- cron: '14 1 * * *'
jobs:
build:
@@ -9,7 +9,7 @@ on:
jobs:
integration-tests:
runs-on: ubuntu-latest
runs-on: arc-linux-latest
env:
API_BASE_URL: http://localhost:8000
+1 -1
View File
@@ -23,7 +23,7 @@ env:
jobs:
check-milestone:
name: Check Milestone
runs-on: ubuntu-latest
runs-on: arc-linux-latest
steps:
- if: github.event.pull_request.milestone == null && contains( env.LABELS, 'no-milestone' ) == false
run: exit 1
+3
View File
@@ -36,6 +36,9 @@ jobs:
with:
go-version: "1.24.6"
- name: Update root CA certificate bundle
run: ./wasm/mix-fetch/go-mix-conn/scripts/update-root-certs.sh
- name: Install dependencies
run: yarn
+1
View File
@@ -46,6 +46,7 @@ storybook-static
**/.DS_Store
cpu-cycles/libcpucycles/build
foxyfox.env
scratch.txt
.next
ppa-private-key.b64
+46
View File
@@ -4,6 +4,52 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2026.5-raclette] (2026-03-10)
- bugfix: correctly populate gateway probe LP data ([#6533])
- chore: introduce additional prometheus metrics for registration times ([#6532])
- bugfix: lp information to have proper snake_case on API endpoints ([#6531])
- removed redundant LP states ([#6509])
- chore: removed all matrix notifications from github actions ([#6495])
- feat: Lewes Protocol with PSQv2 ([#6491])
- build(deps): bump minimatch from 3.1.2 to 3.1.4 in /documentation/docs ([#6486])
- build(deps): bump bn.js from 4.12.2 to 4.12.3 in /documentation/docs ([#6484])
- build(deps): bump bn.js from 4.12.2 to 4.12.3 ([#6483])
- build(deps): bump ajv from 8.17.1 to 8.18.0 in /clients/native/examples/js-examples/websocket ([#6478])
- build(deps): bump ajv from 6.12.6 to 6.14.0 in /documentation/docs ([#6477])
- build(deps): bump minimatch and glob in /documentation/scripts/post-process ([#6476])
- build(deps): bump hono from 4.11.9 to 4.12.0 ([#6475])
- build(deps): bump keccak from 0.1.5 to 0.1.6 ([#6472])
- build(deps-dev): bump qs from 6.14.1 to 6.14.2 in /clients/native/examples/js-examples/websocket ([#6466])
- build(deps): bump mikefarah/yq from 4.52.2 to 4.52.4 ([#6465])
- Otel minimal v2 ([#6464])
- build(deps): bump qs and express in /wasm/client/internal-dev ([#6461])
- bugfix: restore 'latest_measurement' field for nym-node /verloc endpoint ([#6452])
- build(deps-dev): bump webpack from 5.77.0 to 5.104.1 in /wasm/node-tester/internal-dev ([#6451])
- Max/mixfetch concurrent test ([#6417])
[#6533]: https://github.com/nymtech/nym/pull/6533
[#6532]: https://github.com/nymtech/nym/pull/6532
[#6531]: https://github.com/nymtech/nym/pull/6531
[#6509]: https://github.com/nymtech/nym/pull/6509
[#6495]: https://github.com/nymtech/nym/pull/6495
[#6491]: https://github.com/nymtech/nym/pull/6491
[#6486]: https://github.com/nymtech/nym/pull/6486
[#6484]: https://github.com/nymtech/nym/pull/6484
[#6483]: https://github.com/nymtech/nym/pull/6483
[#6478]: https://github.com/nymtech/nym/pull/6478
[#6477]: https://github.com/nymtech/nym/pull/6477
[#6476]: https://github.com/nymtech/nym/pull/6476
[#6475]: https://github.com/nymtech/nym/pull/6475
[#6472]: https://github.com/nymtech/nym/pull/6472
[#6466]: https://github.com/nymtech/nym/pull/6466
[#6465]: https://github.com/nymtech/nym/pull/6465
[#6464]: https://github.com/nymtech/nym/pull/6464
[#6461]: https://github.com/nymtech/nym/pull/6461
[#6452]: https://github.com/nymtech/nym/pull/6452
[#6451]: https://github.com/nymtech/nym/pull/6451
[#6417]: https://github.com/nymtech/nym/pull/6417
## [2026.4-quark] (2026-02-24)
- Enhance CI workflow with feature inputs ([#6462])
Generated
+1945 -1410
View File
File diff suppressed because it is too large Load Diff
+12 -8
View File
@@ -157,8 +157,8 @@ members = [
"tools/internal/mixnet-connectivity-check",
# "tools/internal/sdk-version-bump",
"tools/internal/ssl-inject",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/internal/localnet-orchestrator",
"tools/internal/localnet-orchestrator/dkg-bypass-contract",
"tools/internal/validator-status-check",
"tools/nym-cli",
"tools/nym-id-cli",
@@ -171,9 +171,10 @@ members = [
"wasm/mix-fetch",
"wasm/node-tester",
"wasm/zknym-lib",
# "nym-gateway-probe",
"nym-gateway-probe",
"integration-tests",
"common/nym-kkt-ciphersuite", "common/nym-kkt-context",
"common/nym-kkt-ciphersuite",
"common/nym-kkt-context",
]
default-members = [
@@ -190,7 +191,8 @@ default-members = [
"service-providers/ip-packet-router",
"service-providers/network-requester",
"tools/nymvisor",
"nym-registration-client"
"nym-registration-client",
"tools/internal/localnet-orchestrator"
]
exclude = ["contracts", "nym-wallet", "cpu-cycles"]
@@ -232,6 +234,7 @@ bloomfilter = "3.0.1"
bs58 = "0.5.1"
bytecodec = "0.4.15"
bytes = "1.11.1"
cargo-edit = "0.13.8"
cargo_metadata = "0.19.2"
celes = "2.6.0"
cfg-if = "1.0.0"
@@ -347,8 +350,8 @@ si-scale = "0.2.3"
snow = "0.9.6"
sphinx-packet = "=0.6.0"
sqlx = "0.8.6"
strum = "0.27.2"
strum_macros = "0.27.2"
strum = "0.28.0"
strum_macros = "0.28.0"
subtle-encoding = "0.5"
syn = "2"
sysinfo = "0.37.0"
@@ -375,7 +378,7 @@ tracing-opentelemetry = "0.32.1"
tracing-subscriber = "0.3.20"
tracing-indicatif = "0.3.9"
tracing-test = "0.2.5"
ts-rs = "10.1.0"
ts-rs = "12.0.1"
tungstenite = { version = "0.20.1", default-features = false }
typed-builder = "0.23.0"
uniffi = "0.29.2"
@@ -438,6 +441,7 @@ nym-ecash-time = { version = "1.20.4", path = "common/ecash-time" }
nym-exit-policy = { version = "1.20.4", path = "common/exit-policy" }
nym-ffi-shared = { version = "1.20.4", path = "sdk/ffi/shared" }
nym-gateway-client = { version = "1.20.4", path = "common/client-libs/gateway-client", default-features = false }
nym-gateway-probe = { version = "1.18.0", path = "nym-gateway-probe" }
nym-gateway-requests = { version = "1.20.4", path = "common/gateway-requests" }
nym-gateway-storage = { version = "1.20.4", path = "common/gateway-storage" }
nym-gateway-stats-storage = { version = "1.20.4", path = "common/gateway-stats-storage" }
+3
View File
@@ -30,8 +30,11 @@ client ───► Gateway ──┘ mix │ mix ┌─►mix ───►
```
<!-- This is broken
[![Build Status](https://img.shields.io/github/actions/workflow/status/nymtech/nym/build.yml?branch=develop&style=for-the-badge&logo=github-actions)](https://github.com/nymtech/nym/actions?query=branch%3Adevelop)
-->
> This project integrates with the Midnight Network
### Building
+89 -13
View File
@@ -1,32 +1,38 @@
---
ansible_ssh_private_key_file: ~/.ssh/<SSH_KEY>
# nym_version: "v2025.21-mozzarella"
#
# NOTE:
# if you want to pin Nym to a specific version instead of using the
# latest release from GitHub in /tasks/main.yml then
# uncomment the line above and set the tag
cli_url: "https://github.com/nymtech/nym/releases/download/nym-binaries-{{ nym_version }}/nym-cli"
tunnel_manager_url: "https://github.com/nymtech/nym/raw/refs/heads/develop/scripts/nym-node-setup/network-tunnel-manager.sh"
quic_bridge_deployment_url: "https://raw.githubusercontent.com/nymtech/nym/refs/heads/develop/scripts/nym-node-setup/quic_bridge_deployment.sh"
# NOTE: These values will be used globally unless overwritten per node in inventory/all
###############################################################################
## GLOBAL VARS
## These values will be used globally unless overwritten per node in inventory/all
###############################################################################
ansible_user: root # used for ssh, like `ssh root@nym-exit.ch-1.mynodes.net`
email: "<EMAIL>" # used in certbot, description.toml and landing page
website: "<WEBSITE>" # it is used in the description.toml
description: "<NODE_PUBLIC_DESCRIPTION>" # or define per node in inventory/all
# operator_name: "<OPERATOR_NAME>" # used in landing page if provided
###############################################################################
## GLOBAL VARS
## These values will be used globally unless overwritten per node in inventory/all
## Set these vars only if you want them globally for all nodes
## Per node changes in inventory/all will overwrite these global vars
###############################################################################
# NOTE: Set these vars if you want them globally for all nodes
# Per node changes in inventory/all will overwrite these global ones:
hostname: "" # this is a fallback, keep it and setup hostname per node in inventory/all
# moniker: "<MONIKER>" # if not setup here not in inventory/all it get's derived from the hostname
# mode: <MODE> # entry-gateway/exit-gateway/mixnode
# wireguard_enabled: <WIREGUARD_ENABLED> # true/false
hostname: "" # this is a fallback, keep it and setup hostname per node in inventory/all
###############################################################################
## GLOBAL PACKAGES
## These will be installed during deployment
###############################################################################
# NOTE: Possible vars to incule on landing page, etc.
# operator_name: "<OPERATOR_NAME>"
packages:
- tmux
@@ -42,3 +48,73 @@ packages:
- jq
- wget
- ufw
###############################################################################
## OPTIONAL OVERRIDES
## All values below already have defaults in the playbook/roles
## Uncomment only if you want to override them
###############################################################################
###############################################################################
## SYSTEM MAINTENANCE PLAYBOOK KNOBS
###############################################################################
# nym_version: "v2025.21-mozzarella"
## NOTE:
## if you want to pin Nym to a specific version instead of using the
## latest release from GitHub in /tasks/main.yml then
## uncomment the line above and set the tag
###############################################################################
## SYSTEM MAINTENANCE PLAYBOOK KNOBS
###############################################################################
## JOURNALD LIMITS
# journald_system_max_use: "100M" # max persistent journal size
# journald_runtime_max_use: "50M" # max runtime journal size
# journald_system_max_file_size: "25M" # max single journal file
# journald_runtime_max_file_size: "10M" # max runtime journal file
# journald_max_retention_sec: "3day" # retention time
# journald_rate_limit_interval: "30s" # rate limit window
# journald_rate_limit_burst: "1000" # rate limit burst
## NYM-NODE LOG CONTROL
# nymnode_log_level_max: "warning" # drop INFO logs
# nymnode_rate_limit_interval: "30s" # per nym-node rate limit window
# nymnode_rate_limit_burst: "200" # per nym-node rate limit burst
## JOURNAL VACUUM TARGETS
# journal_vacuum_size: "100M"
# journal_vacuum_time: "3days"
## RSYSLOG
# disable_rsyslog: true
## FSTRIM SCHEDULE
# fstrim_every_calendar: "*:0/15" # Aggressive
# fstrim_every_calendar: "hourly" # Less aggressive
## OPTIONAL CLEANUPS
# enable_apt_cleanup: true
# enable_snap_cleanup: true
## WRITEBACK TUNING
# enable_writeback_tuning: true
# writeback_dirty_writeback_centisecs: 1500
# writeback_dirty_expire_centisecs: 6000
@@ -0,0 +1,38 @@
---
- name: Restrict logging, vacuum journals, and enable periodic trim
hosts: all
become: true
gather_facts: false
# global knobs - override in inventory/group_vars/host_vars as needed
vars:
journald_system_max_use: "100M"
journald_runtime_max_use: "50M"
journald_system_max_file_size: "25M"
journald_runtime_max_file_size: "10M"
journald_max_retention_sec: "3day"
journald_rate_limit_interval: "30s"
journald_rate_limit_burst: "1000"
# per nym-node rate limit + level cap
nymnode_log_level_max: "warning"
nymnode_rate_limit_interval: "30s"
nymnode_rate_limit_burst: "200"
# journal vacuum targets
journal_vacuum_size: "100M"
journal_vacuum_time: "3days"
# fstrim cadence (note: the systemd override uses cron-like calendar)
fstrim_every_calendar: "*:0/15"
roles:
- role: journald_limits
- role: nymnode_logging
- role: rsyslog_disable
- role: journal_vacuum
- role: classic_log_cleanup
- role: apt_cleanup
- role: snap_cleanup
- role: fstrim_15min
- role: report
@@ -0,0 +1,21 @@
---
- name: Clean apt cache
command: apt-get clean
ignore_errors: true
- name: Autoremove unused packages
command: apt-get -y autoremove
ignore_errors: true
- name: Remove apt lists to reclaim space (they will be re-fetched on update)
file:
path: /var/lib/apt/lists
state: absent
ignore_errors: true
- name: Recreate apt lists directory
file:
path: /var/lib/apt/lists
state: directory
mode: "0755"
ignore_errors: true
@@ -0,0 +1,20 @@
---
- name: Remove classic /var/log files if present (optional)
file:
path: "{{ item }}"
state: absent
loop:
- /var/log/syslog
- /var/log/syslog.1
- /var/log/kern.log
- /var/log/kern.log.1
- /var/log/auth.log
- /var/log/auth.log.1
- /var/log/ufw.log
- /var/log/ufw.log.1
ignore_errors: true
# This is best-effort and may still fail if other packages' postrotate scripts assume services exist.
- name: Force logrotate (best-effort)
command: "logrotate --force /etc/logrotate.conf"
ignore_errors: true
@@ -0,0 +1,3 @@
---
fstrim_timer_dropin_dir: "/etc/systemd/system/fstrim.timer.d"
fstrim_every_calendar: "*:0/15"
@@ -0,0 +1,31 @@
---
- name: Ensure systemd drop-in dir for fstrim.timer exists
file:
path: "{{ fstrim_timer_dropin_dir }}"
state: directory
mode: "0755"
- name: Override fstrim.timer schedule
copy:
dest: "{{ fstrim_timer_dropin_dir }}/override.conf"
mode: "0644"
content: |
[Timer]
OnCalendar=
OnCalendar={{ fstrim_every_calendar }}
Persistent=true
RandomizedDelaySec=0
- name: Reload systemd after fstrim override
systemd:
daemon_reload: true
- name: Enable and start fstrim timer
systemd:
name: fstrim.timer
enabled: true
state: started
- name: Run fstrim now (best-effort)
command: fstrim -av
ignore_errors: true
@@ -0,0 +1,3 @@
---
journal_vacuum_size: "100M"
journal_vacuum_time: "3days"
@@ -0,0 +1,6 @@
---
- name: Vacuum journal to size cap (hard)
command: "journalctl --vacuum-size={{ journal_vacuum_size }}"
- name: Vacuum journal older than retention window (time)
command: "journalctl --vacuum-time={{ journal_vacuum_time }}"
@@ -0,0 +1,8 @@
---
journald_system_max_use: "100M"
journald_runtime_max_use: "50M"
journald_system_max_file_size: "25M"
journald_runtime_max_file_size: "10M"
journald_max_retention_sec: "3day"
journald_rate_limit_interval: "30s"
journald_rate_limit_burst: "1000"
@@ -0,0 +1,5 @@
---
- name: Restart journald
systemd:
name: systemd-journald
state: restarted
@@ -0,0 +1,20 @@
---
- name: Configure journald limits (persistent, capped, rate-limited)
copy:
dest: /etc/systemd/journald.conf
mode: "0644"
content: |
[Journal]
Storage=persistent
Compress=yes
Seal=yes
SystemMaxUse={{ journald_system_max_use }}
RuntimeMaxUse={{ journald_runtime_max_use }}
SystemMaxFileSize={{ journald_system_max_file_size }}
RuntimeMaxFileSize={{ journald_runtime_max_file_size }}
MaxRetentionSec={{ journald_max_retention_sec }}
RateLimitIntervalSec={{ journald_rate_limit_interval }}
RateLimitBurst={{ journald_rate_limit_burst }}
notify: Restart journald
@@ -0,0 +1,7 @@
---
nymnode_log_level_max: "warning"
nymnode_rate_limit_interval: "30s"
nymnode_rate_limit_burst: "200"
nymnode_unit_name: "nym-node" # set to "nym-node.service" if your distro expects it
nymnode_dropin_dir: "/etc/systemd/system/nym-node.service.d"
nymnode_dropin_file: "10-logging.conf"
@@ -0,0 +1,26 @@
---
- name: Ensure systemd drop-in dir for nym-node exists
file:
path: "{{ nymnode_dropin_dir }}"
state: directory
mode: "0755"
- name: Cap nym-node logs + apply per-unit rate limiting
copy:
dest: "{{ nymnode_dropin_dir }}/{{ nymnode_dropin_file }}"
mode: "0644"
content: |
[Service]
LogLevelMax={{ nymnode_log_level_max }}
LogRateLimitIntervalSec={{ nymnode_rate_limit_interval }}
LogRateLimitBurst={{ nymnode_rate_limit_burst }}
- name: Reload systemd after nym-node drop-in
systemd:
daemon_reload: true
- name: Restart nym-node to apply new logging limits (best-effort)
systemd:
name: "{{ nymnode_unit_name }}"
state: restarted
ignore_errors: true
@@ -0,0 +1,8 @@
---
- name: Show journal disk usage
command: journalctl --disk-usage
register: journal_usage
changed_when: false
- debug:
var: journal_usage.stdout
@@ -0,0 +1,13 @@
---
- name: Stop/disable rsyslog if installed (best-effort)
systemd:
name: rsyslog
state: stopped
enabled: false
ignore_errors: true
- name: Remove rsyslog logrotate stanza if present (prevents logrotate failures)
file:
path: /etc/logrotate.d/rsyslog
state: absent
ignore_errors: true
@@ -0,0 +1,10 @@
---
- name: Remove disabled snap revisions (best-effort)
shell: |
set -euo pipefail
snap list --all | awk '/disabled/{print $1, $3}' | while read -r name rev; do
snap remove "$name" --revision="$rev" || true
done
args:
executable: /bin/bash
ignore_errors: true
+1 -1
View File
@@ -2,7 +2,7 @@
name = "nym-client-core"
version.workspace = true
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
edition = "2021"
edition = "2024"
rust-version = "1.85"
license.workspace = true
description = "Crate containing core client functionality and configs, used by all other Nym client implentations"
@@ -32,6 +32,7 @@ const DEFAULT_MIN_MIXNODE_PERFORMANCE: u8 = 50;
const DEFAULT_MIN_GATEWAY_PERFORMANCE: u8 = 50;
const DEFAULT_MAX_STARTUP_GATEWAY_WAITING_PERIOD: Duration = Duration::from_secs(70 * 60); // 70min -> full epoch (1h) + a bit of overhead
const DEFAULT_MAX_STARTUP_TOPOLOGY_WAITING_PERIOD: Duration = Duration::from_secs(70 * 60); // 70min -> full epoch (1h) + a bit of overhead
// Set this to a high value for now, so that we don't risk sporadic timeouts that might cause
// bought bandwidth tokens to not have time to be spent; Once we remove the gateway from the
@@ -555,6 +556,11 @@ pub struct Topology {
#[serde(with = "humantime_serde")]
pub max_startup_gateway_waiting_period: Duration,
/// Defines how long the client is going to wait on startup for minimal topology to become online,
/// before abandoning the procedure.
#[serde(with = "humantime_serde")]
pub max_startup_network_waiting_period: Duration,
/// Specifies a minimum performance of a mixnode that is used on route construction.
/// This setting is only applicable when `NymApi` topology is used.
pub minimum_mixnode_performance: u8,
@@ -583,6 +589,7 @@ impl Default for Topology {
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
disable_refreshing: false,
max_startup_gateway_waiting_period: DEFAULT_MAX_STARTUP_GATEWAY_WAITING_PERIOD,
max_startup_network_waiting_period: DEFAULT_MAX_STARTUP_TOPOLOGY_WAITING_PERIOD,
minimum_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
minimum_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
use_extended_topology: false,
@@ -159,6 +159,7 @@ impl From<ConfigV6> for Config {
use_extended_topology: value.debug.topology.use_extended_topology,
ignore_egress_epoch_role: value.debug.topology.ignore_egress_epoch_role,
ignore_ingress_epoch_role: value.debug.topology.ignore_ingress_epoch_role,
..Default::default()
},
reply_surbs: ReplySurbs {
minimum_reply_surb_storage_threshold: value
@@ -160,7 +160,10 @@ where
)
.await?;
} else {
info!("registered with new gateway {} (under address {address}), but this will not be our default address", gateway_details.gateway_id);
info!(
"registered with new gateway {} (under address {address}), but this will not be our default address",
gateway_details.gateway_id
);
}
Ok(GatewayInfo {
@@ -4,13 +4,13 @@
use super::mix_traffic::ClientRequestSender;
use super::received_buffer::ReceivedBufferMessage;
use super::statistics_control::StatisticsControl;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
use crate::client::event_control::EventControl;
use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
use crate::client::real_messages_control;
@@ -52,12 +52,12 @@ use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_sphinx::receiver::{ReconstructedMessage, SphinxMessageReceiver};
use nym_statistics_common::clients::ClientStatsSender;
use nym_statistics_common::generate_client_stats_id;
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use nym_task::ShutdownTracker;
use nym_topology::provider_trait::TopologyProvider;
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use nym_topology::HardcodedTopologyProvider;
use nym_topology::provider_trait::TopologyProvider;
use nym_validator_client::nym_api::NymApiClientExt;
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
use nym_validator_client::{UserAgent, nyxd::contract_traits::DkgQueryClient};
use rand::prelude::SliceRandom;
use rand::rngs::OsRng;
use rand::thread_rng;
@@ -220,6 +220,7 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
nym_api_urls: Option<Vec<nym_network_defaults::ApiUrl>>,
wait_for_gateway: bool,
wait_for_initial_topology: bool,
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
shutdown: Option<ShutdownTracker>,
@@ -250,6 +251,7 @@ where
dkg_query_client,
nym_api_urls: None,
wait_for_gateway: false,
wait_for_initial_topology: false,
custom_topology_provider: None,
custom_gateway_transceiver: None,
shutdown: None,
@@ -305,6 +307,12 @@ where
self
}
#[must_use]
pub fn with_wait_for_initial_topology(mut self, wait_for_initial_topology: bool) -> Self {
self.wait_for_initial_topology = wait_for_initial_topology;
self
}
#[must_use]
pub fn with_topology_provider(
mut self,
@@ -674,6 +682,7 @@ where
topology_accessor: TopologyAccessor,
local_gateway: NodeIdentity,
wait_for_gateway: bool,
wait_for_initial_topology: bool,
shutdown_tracker: &ShutdownTracker,
) -> Result<(), ClientCoreError> {
let topology_refresher_config =
@@ -694,6 +703,46 @@ where
tracing::info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;
// 1. wait for the minimum topology (if applicable)
if topology_refresher
.ensure_topology_is_routable()
.await
.is_err()
&& wait_for_initial_topology
{
if let Err(err) = topology_refresher
.wait_for_initial_network(topology_config.max_startup_network_waiting_period)
.await
{
tracing::error!(
"the network did not come become online within the specified timeout: {err}"
);
return Err(err.into());
}
}
// 2. wait for our gateway (if applicable)
if topology_refresher
.ensure_contains_routable_egress(local_gateway)
.await
.is_err()
&& wait_for_gateway
{
if let Err(err) = topology_refresher
.wait_for_gateway(
local_gateway,
topology_config.max_startup_gateway_waiting_period,
)
.await
{
tracing::error!(
"the gateway did not come back online within the specified timeout: {err}"
);
return Err(err.into());
}
}
// 3. check if the topology is routable (in case we were NOT waiting for it)
if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
tracing::error!(
"The current network topology seem to be insufficient to route any packets through \
@@ -702,30 +751,15 @@ where
return Err(ClientCoreError::InsufficientNetworkTopology(err));
}
let gateway_wait_timeout = if wait_for_gateway {
Some(topology_config.max_startup_gateway_waiting_period)
} else {
None
};
// 4. check if the gateway exists (in case we were NOT waiting for it)
if let Err(err) = topology_refresher
.ensure_contains_routable_egress(local_gateway)
.await
{
if let Some(waiting_timeout) = gateway_wait_timeout {
if let Err(err) = topology_refresher
.wait_for_gateway(local_gateway, waiting_timeout)
.await
{
tracing::error!(
"the gateway did not come back online within the specified timeout: {err}"
);
return Err(err.into());
}
} else {
tracing::error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
return Err(err.into());
}
tracing::error!(
"the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}"
);
return Err(err.into());
}
if !topology_config.disable_refreshing {
@@ -1024,6 +1058,7 @@ where
shared_topology_accessor.clone(),
self_address.gateway(),
self.wait_for_gateway,
self.wait_for_initial_topology,
&shutdown_tracker.clone(),
)
.await?;
@@ -1195,9 +1230,11 @@ mod tests {
]);
assert_eq!(network_details.nym_api_urls.as_ref().unwrap().len(), 2);
assert!(network_details.nym_api_urls.as_ref().unwrap()[1]
.front_hosts
.is_some());
assert!(
network_details.nym_api_urls.as_ref().unwrap()[1]
.front_hosts
.is_some()
);
}
#[test]
@@ -1210,11 +1247,13 @@ mod tests {
assert_eq!(api_url.url, "https://nym-frontdoor.vercel.app/api/");
assert_eq!(api_url.front_hosts.as_ref().unwrap().len(), 2);
assert!(api_url
.front_hosts
.as_ref()
.unwrap()
.contains(&"vercel.app".to_string()));
assert!(
api_url
.front_hosts
.as_ref()
.unwrap()
.contains(&"vercel.app".to_string())
);
}
#[test]
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::{
client::replies::reply_storage::{fs_backend, CombinedReplyStorage, ReplyStorageBackend},
client::replies::reply_storage::{CombinedReplyStorage, ReplyStorageBackend, fs_backend},
config,
config::Config,
error::ClientCoreError,
@@ -10,7 +10,7 @@ use crate::{
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient};
use nym_validator_client::{QueryHttpRpcNyxdClient, nyxd};
use std::{io, path::Path};
use time::OffsetDateTime;
use tracing::{error, info, trace};
@@ -24,7 +24,9 @@ async fn setup_fresh_backend<P: AsRef<Path>>(
let mut storage_backend = match fs_backend::Backend::init(db_path).await {
Ok(backend) => backend,
Err(err) => {
error!("setup_fresh_backend: Failed to setup persistent storage backend for our reply needs: {err}");
error!(
"setup_fresh_backend: Failed to setup persistent storage backend for our reply needs: {err}"
);
return Err(ClientCoreError::SurbStorageError {
source: Box::new(err),
});
@@ -93,7 +95,9 @@ pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
match fs_backend::Backend::try_load(db_path).await {
Ok(backend) => Ok(backend),
Err(err) => {
error!("setup_fs_reply_surb_backend: Failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future");
error!(
"setup_fs_reply_surb_backend: Failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future"
);
archive_corrupted_database(db_path).await?;
setup_fresh_backend(db_path, surb_config).await
}
@@ -1,8 +1,8 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::error::ClientCoreError;
use nym_client_core_gateways_storage::{
ActiveGateway, GatewayPublishedData, GatewayRegistration, GatewaysDetailsStore,
@@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
pub mod v1_1_33 {
use crate::config::disk_persistence::old_v1_1_33::CommonClientPathsV1_1_33;
use crate::config::disk_persistence::CommonClientPaths;
use crate::config::disk_persistence::old_v1_1_33::CommonClientPathsV1_1_33;
use crate::config::old_config_v1_1_33::OldGatewayEndpointConfigV1_1_33;
use crate::error::ClientCoreError;
@@ -11,8 +11,8 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::cover::generate_loop_cover_packet;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use rand::{rngs::OsRng, CryptoRng, Rng};
use nym_statistics_common::clients::{ClientStatsSender, packet_statistics::PacketStatisticsEvent};
use rand::{CryptoRng, Rng, rngs::OsRng};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -20,10 +20,10 @@ use tokio::sync::mpsc::error::TrySendError;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, Sleep};
use tokio::time::{Sleep, sleep};
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::{sleep, Sleep};
use wasmtimer::tokio::{Sleep, sleep};
pub struct LoopCoverTrafficStream<R>
where
@@ -179,7 +179,9 @@ impl LoopCoverTrafficStream<OsRng> {
) {
Ok(topology) => topology,
Err(err) => {
warn!("We're not going to send any loop cover message this time, as the current topology seem to be invalid - {err}");
warn!(
"We're not going to send any loop cover message this time, as the current topology seem to be invalid - {err}"
);
return;
}
};
@@ -13,10 +13,10 @@ use crate::config::disk_persistence::ClientKeysPaths;
#[cfg(not(target_arch = "wasm32"))]
use nym_crypto::asymmetric::{ed25519, x25519};
#[cfg(not(target_arch = "wasm32"))]
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
#[cfg(not(target_arch = "wasm32"))]
use nym_pemstore::KeyPairPath;
#[cfg(not(target_arch = "wasm32"))]
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
#[cfg(not(target_arch = "wasm32"))]
use nym_sphinx::acknowledgements::AckKey;
// we have to define it as an async trait since wasm storage is async
@@ -4,8 +4,8 @@
use async_trait::async_trait;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::error::GatewayClientError;
use nym_gateway_client::GatewayClient;
use nym_gateway_client::error::GatewayClientError;
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
@@ -2,13 +2,13 @@
// SPDX-License-Identifier: Apache-2.0
use super::action_controller::{AckActionSender, Action};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_statistics_common::clients::{ClientStatsSender, packet_statistics::PacketStatisticsEvent};
use futures::StreamExt;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::{
acknowledgements::{identifier::recover_identifier, AckKey},
chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID},
acknowledgements::{AckKey, identifier::recover_identifier},
chunking::fragment::{COVER_FRAG_ID, FragmentIdentifier},
};
use nym_task::ShutdownToken;
use std::sync::Arc;
@@ -3,11 +3,11 @@
use super::PendingAcknowledgement;
use crate::client::real_messages_control::acknowledgement_control::RetransmissionRequestSender;
use futures::channel::mpsc;
use futures::StreamExt;
use futures::channel::mpsc;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_sphinx::Delay as SphinxDelay;
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_task::ShutdownToken;
use std::collections::HashMap;
use std::sync::Arc;
@@ -9,8 +9,8 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use nym_task::ShutdownToken;
use nym_task::connections::TransmissionLane;
use rand::{CryptoRng, Rng};
use tracing::*;
@@ -16,10 +16,10 @@ use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::{
Delay as SphinxDelay,
acknowledgements::AckKey,
addressing::clients::Recipient,
chunking::fragment::{Fragment, FragmentIdentifier},
Delay as SphinxDelay,
};
use nym_statistics_common::clients::ClientStatsSender;
use rand::{CryptoRng, Rng};
@@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
use super::{
action_controller::{AckActionSender, Action},
PendingAcknowledgement, RetransmissionRequestReceiver,
action_controller::{AckActionSender, Action},
};
use crate::client::real_messages_control::acknowledgement_control::PacketDestination;
use crate::client::real_messages_control::message_handler::{MessageHandler, PreparationError};
@@ -13,7 +13,7 @@ use futures::StreamExt;
use nym_sphinx::chunking::fragment::Fragment;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::{addressing::clients::Recipient, params::PacketType};
use nym_task::{connections::TransmissionLane, ShutdownToken};
use nym_task::{ShutdownToken, connections::TransmissionLane};
use rand::{CryptoRng, Rng};
use std::sync::{Arc, Weak};
use tracing::*;
@@ -1,10 +1,10 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::action_controller::{AckActionSender, Action};
use super::SentPacketNotificationReceiver;
use super::action_controller::{AckActionSender, Action};
use futures::StreamExt;
use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
use nym_sphinx::chunking::fragment::{COVER_FRAG_ID, FragmentIdentifier};
use tracing::*;
/// Module responsible for starting up retransmission timers.
@@ -10,17 +10,17 @@ use crate::client::replies::reply_controller::MaxRetransmissions;
use crate::client::replies::reply_storage::{ReceivedReplySurbsMap, SentReplyKeys, UsedSenderTags};
use crate::client::topology_control::{TopologyAccessor, TopologyReadPermit};
use nym_client_core_surb_storage::RetrievedReplySurb;
use nym_sphinx::Delay;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessage, ReplyMessage};
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessage, ReplyMessage};
use nym_sphinx::chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx::message::NymMessage;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
use nym_sphinx::Delay;
use nym_task::connections::TransmissionLane;
use nym_task::ShutdownToken;
use nym_task::connections::TransmissionLane;
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
@@ -272,7 +272,9 @@ where
let primary_count = msg.required_packets(self.config.primary_packet_size);
let secondary_count = msg.required_packets(secondary_packet);
trace!("This message would require: {primary_count} primary packets or {secondary_count} secondary packets...");
trace!(
"This message would require: {primary_count} primary packets or {secondary_count} secondary packets..."
);
// if there would be no benefit in using the secondary packet - use the primary (duh)
if primary_count <= secondary_count {
trace!("so choosing primary for this message");
@@ -25,9 +25,9 @@ use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use nym_task::ShutdownToken;
use rand::{rngs::OsRng, CryptoRng, Rng};
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use rand::{CryptoRng, Rng, rngs::OsRng};
use std::sync::Arc;
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
@@ -17,11 +17,11 @@ use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketSize;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_statistics_common::clients::{ClientStatsSender, packet_statistics::PacketStatisticsEvent};
use nym_task::ShutdownToken;
use nym_task::connections::{
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::ShutdownToken;
use rand::{CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
@@ -29,11 +29,11 @@ use std::time::Duration;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, Sleep};
use tokio::time::{Sleep, sleep};
// use nym_wasm_utils::console_log;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::{sleep, Sleep};
use wasmtimer::tokio::{Sleep, sleep};
mod sending_delay_controller;
/// Configurable parameters of the `OutQueueControl`
@@ -230,7 +230,9 @@ where
let (next_message, fragment_id, packet_size) = match next_message {
StreamMessage::Cover => {
let cover_traffic_packet_size = self.loop_cover_message_size();
trace!("the next loop cover message will be put in a {cover_traffic_packet_size} packet");
trace!(
"the next loop cover message will be put in a {cover_traffic_packet_size} packet"
);
// TODO for way down the line: in very rare cases (during topology update) we might have
// to wait a really tiny bit before actually obtaining the permit hence messing with our
@@ -244,7 +246,9 @@ where
) {
Ok(topology) => topology,
Err(err) => {
warn!("We're not going to send any loop cover message this time, as the current topology seem to be invalid - {err}");
warn!(
"We're not going to send any loop cover message this time, as the current topology seem to be invalid - {err}"
);
return;
}
};
@@ -436,7 +440,7 @@ where
}
}
if let Some(ref mut next_delay) = &mut self.next_delay {
if let Some(next_delay) = &mut self.next_delay {
// it is not yet time to return a message
if next_delay.as_mut().poll(cx).is_pending() {
return Poll::Pending;
@@ -1,7 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::helpers::{get_time_now, Instant};
use crate::client::helpers::{Instant, get_time_now};
use std::time::Duration;
// The minimum time between increasing the average delay between packets. If we hit the ceiling in
@@ -5,20 +5,20 @@ use crate::client::helpers::get_time_now;
use crate::client::replies::{
reply_controller::ReplyControllerSender, reply_storage::SentReplyKeys,
};
use futures::StreamExt;
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::StreamExt;
use nym_crypto::asymmetric::x25519;
use nym_crypto::Digest;
use nym_crypto::asymmetric::x25519;
use nym_gateway_client::MixnetMessageReceiver;
use nym_sphinx::anonymous_replies::requests::{
RepliableMessage, RepliableMessageContent, ReplyMessage, ReplyMessageContent,
};
use nym_sphinx::anonymous_replies::{encryption_key::EncryptionKeyDigest, SurbEncryptionKey};
use nym_sphinx::anonymous_replies::{SurbEncryptionKey, encryption_key::EncryptionKeyDigest};
use nym_sphinx::message::{NymMessage, PlainMessage};
use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_statistics_common::clients::{ClientStatsSender, packet_statistics::PacketStatisticsEvent};
use nym_task::ShutdownToken;
use std::collections::HashSet;
use std::sync::Arc;
@@ -78,14 +78,19 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
let fragment = match self.message_receiver.recover_fragment(fragment_data) {
Err(err) => {
warn!("failed to recover fragment from raw data: {err}. The whole underlying message might be corrupted and unrecoverable!");
warn!(
"failed to recover fragment from raw data: {err}. The whole underlying message might be corrupted and unrecoverable!"
);
return None;
}
Ok(frag) => frag,
};
if self.recently_reconstructed.contains(&fragment.id()) {
debug!("Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost", fragment.id());
debug!(
"Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost",
fragment.id()
);
return None;
}
@@ -93,7 +98,9 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
match self.message_receiver.insert_new_fragment(fragment) {
Err(err) => match err {
MessageRecoveryError::MalformedReconstructedMessage { source, used_sets } => {
error!("message reconstruction failed - {source}. Attempting to re-use the message sets...");
error!(
"message reconstruction failed - {source}. Attempting to re-use the message sets..."
);
// TODO: should we really insert reconstructed sets? could this be abused for some attack?
for set_id in used_sets {
if !self.recently_reconstructed.insert(set_id) {
@@ -144,7 +151,9 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
&mut raw_fragment,
) {
Err(err) => {
warn!("failed to recover fragment data: {err}. The whole underlying message might be corrupted and unrecoverable!");
warn!(
"failed to recover fragment data: {err}. The whole underlying message might be corrupted and unrecoverable!"
);
return None;
}
Ok(frag_data) => frag_data,
@@ -275,7 +284,9 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
RepliableMessageContent::Heartbeat(content) => {
let additional_reply_surbs = content.additional_reply_surbs;
error!("received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)");
error!(
"received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)"
);
(additional_reply_surbs, false)
}
RepliableMessageContent::DataV2(content) => {
@@ -304,7 +315,9 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
RepliableMessageContent::HeartbeatV2(content) => {
let additional_reply_surbs = content.additional_reply_surbs;
error!("received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)");
error!(
"received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)"
);
(additional_reply_surbs, false)
}
};
@@ -380,7 +393,9 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
if let Some(sender) = &inner_guard.message_sender {
trace!("Sending reconstructed messages to announced sender");
if let Err(err) = sender.unbounded_send(reconstructed_messages) {
warn!("The reconstructed message receiver went offline without explicit notification (relevant error: - {err})");
warn!(
"The reconstructed message receiver went offline without explicit notification (relevant error: - {err})"
);
inner_guard.message_sender = None;
inner_guard.messages.extend(err.into_inner());
}
@@ -5,15 +5,15 @@ use crate::client::real_messages_control::acknowledgement_control::PendingAcknow
use crate::client::real_messages_control::message_handler::{
FragmentWithMaxRetransmissions, MessageHandler, PreparationError,
};
use crate::client::replies::reply_controller::key_rotation_helpers::SurbRefreshState;
use crate::client::replies::reply_controller::Config;
use crate::client::replies::reply_controller::key_rotation_helpers::SurbRefreshState;
use crate::client::topology_control::TopologyAccessor;
use crate::client::transmission_buffer::TransmissionBuffer;
use futures::channel::oneshot;
use nym_client_core_surb_storage::{ReceivedReplySurb, ReceivedReplySurbsMap};
use nym_crypto::aes::cipher::crypto_common::rand_core::CryptoRng;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_task::connections::{ConnectionId, TransmissionLane};
use nym_topology::NymTopologyMetadata;
@@ -50,7 +50,9 @@ impl SenderData {
let pending_retransmissions = self.pending_retransmissions.len();
let total_pending = pending_retransmissions + pending_replies;
debug!("total queue size: {total_pending} = pending data {pending_replies} + pending retransmission {pending_retransmissions}");
debug!(
"total queue size: {total_pending} = pending data {pending_replies} + pending retransmission {pending_retransmissions}"
);
total_pending
}
@@ -200,7 +202,9 @@ where
let total_required_surbs = total_queue + target_surbs_after_clearing_queue;
let total_available_surbs = pending_surbs + available_surbs;
debug!("available surbs: {available_surbs} pending surbs: {pending_surbs} threshold range: {min_surbs_threshold}..+{min_surbs_threshold_buffer}..{max_surbs_threshold}");
debug!(
"available surbs: {available_surbs} pending surbs: {pending_surbs} threshold range: {min_surbs_threshold}..+{min_surbs_threshold_buffer}..{max_surbs_threshold}"
);
// We should request more surbs if:
// 1. We haven't hit the maximum surb threshold, and
@@ -225,9 +229,13 @@ where
.is_none()
{
// don't report it every single time
warn!("received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!");
warn!(
"received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!"
);
} else {
trace!("received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!");
trace!(
"received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!"
);
}
return;
}
@@ -383,7 +391,9 @@ where
let (surbs_for_reply, _) = self.surbs_storage.get_reply_surbs(&target, to_take.len());
let Some(surbs_for_reply) = surbs_for_reply else {
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
error!(
"somehow different task has stolen our reply surbs! - this should have been impossible"
);
self.re_insert_pending_retransmission(&target, to_take);
return;
};
@@ -459,7 +469,9 @@ where
.get_reply_surbs(&target, to_send_clone.len());
let Some(surbs_for_reply) = surbs_for_reply else {
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
error!(
"somehow different task has stolen our reply surbs! - this should have been impossible"
);
self.re_insert_pending_replies(&target, to_send);
return;
};
@@ -543,7 +555,9 @@ where
let ack_ref = match timed_out_ack.upgrade() {
Some(ack) => ack,
None => {
debug!("we received the ack for one of the reply packets as we were putting it in the retransmission queue");
debug!(
"we received the ack for one of the reply packets as we were putting it in the retransmission queue"
);
return;
}
};
@@ -657,9 +671,13 @@ where
// only log at higher level if it's the first time this error has occurred in a while
if now - last_failure > time::Duration::seconds(30) {
warn!("failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}")
warn!(
"failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}"
)
} else {
debug!("failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}")
debug!(
"failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}"
)
}
}
}
@@ -681,7 +699,10 @@ where
.surbs_storage
.surbs_last_received_at(pending_reply_target)
else {
error!("we have {} pending replies for {pending_reply_target}, but we somehow never received any reply surbs from them!", retransmission_buf.total_size());
error!(
"we have {} pending replies for {pending_reply_target}, but we somehow never received any reply surbs from them!",
retransmission_buf.total_size()
);
to_remove.push(*pending_reply_target);
continue;
};
@@ -702,7 +723,9 @@ where
// if client is offline)
if vals.current_clear_rerequest_counter > max_rerequests {
to_remove.push(*pending_reply_target);
debug!("we have reached the maximum threshold of attempting to request surbs from {pending_reply_target}. dropping the sender");
debug!(
"we have reached the maximum threshold of attempting to request surbs from {pending_reply_target}. dropping the sender"
);
continue;
}
@@ -710,7 +733,10 @@ where
if diff > max_drop_wait {
to_remove.push(*pending_reply_target)
} else {
debug!("We haven't received any surbs in {} from {pending_reply_target}. Going to explicitly ask for more", humantime::format_duration(diff.unsigned_abs()));
debug!(
"We haven't received any surbs in {} from {pending_reply_target}. Going to explicitly ask for more",
humantime::format_duration(diff.unsigned_abs())
);
vals.increment_current_clear_rerequest_counter();
to_request.push(*pending_reply_target);
}
@@ -4,8 +4,8 @@
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
use futures::channel::{mpsc, oneshot};
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_task::connections::{ConnectionId, TransmissionLane};
use std::sync::Weak;
@@ -43,7 +43,9 @@ where
// 1. check whether we sent any surbs in the past to this recipient, otherwise
// they have no business in asking for more
if !self.tags_storage.exists(&recipient) {
warn!("{recipient} asked us for reply SURBs even though we never sent them any anonymous messages before!");
warn!(
"{recipient} asked us for reply SURBs even though we never sent them any anonymous messages before!"
);
return;
}
@@ -54,7 +56,12 @@ where
.reply_surbs
.maximum_allowed_reply_surb_request_size
{
warn!("The requested reply surb amount is larger than our maximum allowed ({amount} > {}). Lowering it to a more sane value...", self.config.reply_surbs.maximum_allowed_reply_surb_request_size);
warn!(
"The requested reply surb amount is larger than our maximum allowed ({amount} > {}). Lowering it to a more sane value...",
self.config
.reply_surbs
.maximum_allowed_reply_surb_request_size
);
amount = self
.config
.reply_surbs
@@ -23,7 +23,7 @@ use nym_sphinx::addressing::Recipient;
use nym_statistics_common::clients::{
ClientStatsController, ClientStatsReceiver, ClientStatsSender,
};
use nym_task::{connections::TransmissionLane, ShutdownToken, ShutdownTracker};
use nym_task::{ShutdownToken, ShutdownTracker, connections::TransmissionLane};
use std::time::Duration;
/// Time interval between reporting statistics locally (logging/shutdown_token)
@@ -5,8 +5,8 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError, NymTopologyMetadata};
use nym_validator_client::models::KeyRotationId;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::{Notify, RwLock, RwLockReadGuard};
#[derive(Debug)]
@@ -63,7 +63,9 @@ impl TopologyRefresher {
trace!("Refreshing the topology");
if self.topology_accessor.controlled_manually() {
info!("topology is being controlled manually - we're going to wait until the control is released...");
info!(
"topology is being controlled manually - we're going to wait until the control is released..."
);
self.topology_accessor
.wait_for_released_manual_control()
.await;
@@ -138,6 +140,35 @@ impl TopologyRefresher {
}
}
pub async fn wait_for_initial_network(
&mut self,
timeout_duration: Duration,
) -> Result<(), NymTopologyError> {
info!(
"going to wait for at most {timeout_duration:?} for initial network to become online"
);
let deadline = sleep(timeout_duration);
tokio::pin!(deadline);
loop {
tokio::select! {
_ = &mut deadline => {
return Err(NymTopologyError::TimedOutWaitingForTopology)
}
_ = self.try_refresh() => {
if let Err(err) = self.ensure_topology_is_routable().await {
info!("network is still not routable...: {err}");
} else {
return Ok(())
}
sleep(self.refresh_rate).await
}
}
}
}
// it's perfectly fine if task is interrupted mid-refresh
// there's no data to persist or send over
pub async fn run(&mut self) {
@@ -3,8 +3,8 @@
use async_trait::async_trait;
use nym_mixnet_contract_common::EpochRewardedSet;
use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
use nym_topology::NymTopology;
use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
use nym_validator_client::nym_api::NymApiClientExt;
use rand::prelude::SliceRandom;
use rand::thread_rng;
@@ -82,7 +82,9 @@ impl NymApiTopologyProvider {
fn use_next_nym_api(&mut self) {
if self.nym_api_urls.len() == 1 {
warn!("There's only a single nym API available - it won't be possible to use a different one");
warn!(
"There's only a single nym API available - it won't be possible to use a different one"
);
return;
}
@@ -155,7 +157,10 @@ impl NymApiTopologyProvider {
let mixnodes = mixnodes_res.nodes;
if !gateways_res.metadata.consistency_check(&metadata) {
warn!("inconsistent nodes metadata between mixnodes and gateways calls! {metadata:?} and {:?}", gateways_res.metadata);
warn!(
"inconsistent nodes metadata between mixnodes and gateways calls! {metadata:?} and {:?}",
gateways_res.metadata
);
return None;
}
@@ -1,11 +1,11 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::helpers::{get_time_now, Instant};
use crate::client::helpers::{Instant, get_time_now};
use crate::client::real_messages_control::real_traffic_stream::RealMessage;
use nym_sphinx::chunking::fragment::Fragment;
use nym_task::connections::TransmissionLane;
use rand::{seq::SliceRandom, Rng};
use rand::{Rng, seq::SliceRandom};
use std::{
collections::{HashMap, HashSet, VecDeque},
time::Duration,
+13 -5
View File
@@ -7,9 +7,9 @@ use nym_gateway_client::error::GatewayClientError;
use nym_task::RegistryAccessError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_validator_client::ValidatorClientError;
use nym_validator_client::nym_api::error::NymAPIError;
use nym_validator_client::nyxd::error::NyxdError;
use nym_validator_client::ValidatorClientError;
use rand::distributions::WeightedError;
use std::error::Error;
use std::path::PathBuf;
@@ -56,7 +56,9 @@ pub enum ClientCoreError {
#[error("no gateways on network")]
NoGatewaysOnNetwork,
#[error("there are no more new gateways on the network - it seems this client has already registered with all nodes it could have")]
#[error(
"there are no more new gateways on the network - it seems this client has already registered with all nodes it could have"
)]
NoNewGatewaysAvailable,
#[error("list of nym apis is empty")]
@@ -127,7 +129,9 @@ pub enum ClientCoreError {
#[error("unexpected exit")]
UnexpectedExit,
#[error("this operation would have resulted in the gateway {gateway_id:?} key being overwritten without permission")]
#[error(
"this operation would have resulted in the gateway {gateway_id:?} key being overwritten without permission"
)]
ForbiddenGatewayKeyOverwrite { gateway_id: String },
#[error(
@@ -151,7 +155,9 @@ pub enum ClientCoreError {
#[error("attempted to obtain fresh gateway details whilst already knowing about one")]
UnexpectedGatewayDetails,
#[error("the provided gateway details (for gateway {gateway_id}) do not correspond to the shared keys")]
#[error(
"the provided gateway details (for gateway {gateway_id}) do not correspond to the shared keys"
)]
MismatchedGatewayDetails { gateway_id: String },
#[error("unable to upgrade config file from `{current_version}`")]
@@ -227,7 +233,9 @@ pub enum ClientCoreError {
source: url::ParseError,
},
#[error("this client (id: '{client_id}') has already been initialised before. If you want to add additional gateway, use `add-gateway` command")]
#[error(
"this client (id: '{client_id}') has already been initialised before. If you want to add additional gateway, use `add-gateway` command"
)]
AlreadyInitialised { client_id: String },
#[error("this client has already registered with gateway {gateway_id}")]
+5 -5
View File
@@ -5,13 +5,13 @@ use crate::error::ClientCoreError;
use crate::init::types::RegistrationResult;
use futures::{SinkExt, StreamExt};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::client::GatewayListeners;
use nym_gateway_client::GatewayClient;
use nym_gateway_client::client::GatewayListeners;
use nym_topology::node::RoutingNode;
use nym_validator_client::UserAgent;
use nym_validator_client::client::{IdentityKeyRef, NymApiClientExt};
use nym_validator_client::nym_nodes::SkimmedNodesWithMetadata;
use nym_validator_client::UserAgent;
use rand::{seq::SliceRandom, Rng};
use rand::{Rng, seq::SliceRandom};
#[cfg(unix)]
use std::os::fd::RawFd;
use std::{sync::Arc, time::Duration};
@@ -28,10 +28,10 @@ use nym_wasm_utils::websocket::JSWebsocket;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
+1 -1
View File
@@ -7,8 +7,8 @@ use crate::client::base_client::storage::helpers::{
has_gateway_details, load_active_gateway_details, load_client_keys, load_gateway_details,
store_gateway_details, update_stored_published_data_gateway,
};
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::error::ClientCoreError;
use crate::init::helpers::{
choose_gateway_by_latency, get_specified_gateway, uniformly_random_gateway,
+2 -2
View File
@@ -1,8 +1,8 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::config::Config;
use crate::error::ClientCoreError;
use crate::init::{setup_gateway, use_loaded_gateway_details};
@@ -10,8 +10,8 @@ use nym_client_core_gateways_storage::{
GatewayRegistration, GatewaysDetailsStore, RemoteGatewayDetails,
};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::client::{GatewayListeners, InitGatewayClient};
use nym_gateway_client::SharedSymmetricKey;
use nym_gateway_client::client::{GatewayListeners, InitGatewayClient};
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::IdentityKey;
+1
View File
@@ -1,3 +1,4 @@
#![allow(deprecated)] // silences clippy warning: use of deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::clone_from_slice`: please upgrade to generic-array 1.x - TODO
use std::future::Future;
#[cfg(all(
@@ -1,6 +1,7 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: use of deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::from_exact_iter`: please upgrade to generic-array 1.x - TODO
pub use backend::*;
pub use combined::CombinedReplyStorage;
pub use key_storage::SentReplyKeys;
@@ -130,7 +130,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
let req = QueryBalanceRequest {
address: address.to_string(),
denom: search_denom.to_string(),
denom: search_denom,
};
let res = self
@@ -199,6 +199,18 @@ impl NyxdClient<HttpClient, DirectSecp256k1HdWallet> {
let wallet = DirectSecp256k1HdWallet::checked_from_mnemonic(prefix, mnemonic)?;
Ok(Self::connect_with_signer(config, client, wallet))
}
pub fn connect_with_mnemonic_and_network_details<U>(
endpoint: U,
network_details: NymNetworkDetails,
mnemonic: bip39::Mnemonic,
) -> Result<DirectSigningHttpRpcNyxdClient, NyxdError>
where
U: TryInto<HttpClientUrl, Error = TendermintRpcError>,
{
let config = Config::try_from_nym_network_details(&network_details)?;
Self::connect_with_mnemonic(config, endpoint, mnemonic)
}
}
#[allow(deprecated)]
@@ -1,6 +1,8 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(clippy::derivable_impls)]
// MAX: surpressing warning for the moment, will be dealt with in a different PR (TODO)
use cosmwasm_schema::cw_serde;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
+5
View File
@@ -4,6 +4,7 @@
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use strum::IntoEnumIterator as _;
use thiserror::Error;
use time::{Date, OffsetDateTime};
@@ -315,6 +316,10 @@ impl TicketType {
_ => Err(UnknownTicketType),
}
}
pub fn exposed_iter() -> impl Iterator<Item = TicketType> {
TicketType::iter()
}
}
impl From<TicketType> for TicketTypeRepr {
+2
View File
@@ -1,6 +1,8 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated associated function `generic_array::GenericArray::<T, N>::from_exact_iter`: please upgrade to generic-array 1.x - TODO
#[cfg(feature = "asymmetric")]
pub mod asymmetric;
pub mod bech32_address_validation;
+1
View File
@@ -1,5 +1,6 @@
// Copyright 2020-2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::clone_from_slice`: please upgrade to generic-array 1.x - TODO
pub use nym_crypto::generic_array;
use nym_crypto::OutputSizeUser;
+3
View File
@@ -1,6 +1,9 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)]
// silences clippy warning: use of deprecated tuple variant `HttpClientError::GenericRequestFailure`: use another more strongly typed variant - this variant is only left for compatibility reasons - TODO
//! Nym HTTP API Client
//!
//! Centralizes and implements the core API client functionality. This crate provides custom,
+11 -5
View File
@@ -22,6 +22,16 @@ pub struct ChainDetails {
pub stake_denom: DenomDetailsOwned,
}
impl ChainDetails {
pub fn mainnet() -> Self {
ChainDetails {
bech32_account_prefix: mainnet::BECH32_PREFIX.into(),
mix_denom: mainnet::MIX_DENOM.into(),
stake_denom: mainnet::STAKE_DENOM.into(),
}
}
}
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize, JsonSchema)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NymContracts {
@@ -181,11 +191,7 @@ impl NymNetworkDetails {
// Consider caching this process (lazy static)
NymNetworkDetails {
network_name: mainnet::NETWORK_NAME.into(),
chain_details: ChainDetails {
bech32_account_prefix: mainnet::BECH32_PREFIX.into(),
mix_denom: mainnet::MIX_DENOM.into(),
stake_denom: mainnet::STAKE_DENOM.into(),
},
chain_details: ChainDetails::mainnet(),
endpoints: mainnet::validators(),
contracts: NymContracts {
mixnet_contract_address: parse_optional_str(mainnet::MIXNET_CONTRACT_ADDRESS),
+1
View File
@@ -30,6 +30,7 @@ libcrux-ml-kem = { workspace = true }
[dev-dependencies]
rand_chacha = "0.9.0"
anyhow = { workspace = true }
nym-test-utils = { workspace = true }
[lints]
+122 -59
View File
@@ -16,10 +16,6 @@ pub use nym_kkt_context as context;
#[cfg(test)]
mod test {
use nym_kkt_ciphersuite::{Ciphersuite, HashFunction, HashLength, KEM, SignatureScheme};
use rand09::RngCore;
use std::collections::BTreeMap;
use crate::keys::KEMKeys;
use crate::{
initiator::KKTInitiator,
@@ -29,9 +25,13 @@ mod test {
},
responder::KKTResponder,
};
use nym_kkt_ciphersuite::{Ciphersuite, HashFunction, HashLength, KEM, SignatureScheme};
use nym_test_utils::helpers::deterministic_rng_09;
use rand09::RngCore;
use std::collections::BTreeMap;
#[test]
fn test_kkt_psq_e2e_encrypted_carrier() {
fn test_kkt_psq_e2e_one_way_encrypted_carrier() {
let mut rng = rand09::rng();
let mut payload: Vec<u8> = vec![0u8; 900_000];
@@ -47,7 +47,6 @@ mod test {
HashFunction::Shake256,
] {
// generate kem public keys
let responder_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let responder_mceliece_keypair = generate_keypair_mceliece(&mut rng);
@@ -64,20 +63,6 @@ mod test {
HashLength::Default.value(),
responder_kem.mc_eliece_encapsulation_key().as_ref(),
);
let initiator_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let initiator_mceliece_keypair = generate_keypair_mceliece(&mut rng);
let _i_dir_hash_mlkem = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
initiator_mlkem_keypair.public_key().as_slice(),
);
let _i_dir_hash_mceliece = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
initiator_mceliece_keypair.pk.as_ref(),
);
let init_hashes = BTreeMap::new();
@@ -128,41 +113,6 @@ mod test {
responder_kem.ml_kem768_encapsulation_key().as_slice(),
)
}
// Mutual - MlKem
{
let ciphersuite = Ciphersuite::resolve_ciphersuite(
KEM::MlKem768,
hash_function,
SignatureScheme::Ed25519,
None,
)
.unwrap();
let (mut initiator, request) = KKTInitiator::generate_one_way_request(
&mut rng,
ciphersuite,
&responder_x25519_keypair.pk,
&r_dir_hash_mlkem,
1u8,
Some(payload.clone()),
)
.unwrap();
let processed_request = responder.process_request(request, payload.len()).unwrap();
assert_eq!(processed_request.request_payload, payload);
// if we keep unverified keys, this should change
assert!(processed_request.remote_encapsulation_key.is_none());
let processed_response = initiator
.process_response(processed_request.response, 0)
.unwrap();
assert_eq!(
processed_response.encapsulation_key.as_bytes(),
responder_kem.ml_kem768_encapsulation_key().as_slice(),
)
}
// OneWay - McEliece
{
@@ -195,7 +145,110 @@ mod test {
responder_kem.mc_eliece_encapsulation_key().as_ref()
)
}
}
}
#[test]
fn test_kkt_psq_e2e_mutual_encrypted_carrier() {
let mut rng = deterministic_rng_09();
let mut payload: Vec<u8> = vec![0u8; 50000];
rng.fill_bytes(&mut payload);
// generate kem public keys
let initiator_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let initiator_mceliece_keypair = generate_keypair_mceliece(&mut rng);
let responder_mlkem_keypair = generate_keypair_mlkem(&mut rng);
let responder_mceliece_keypair = generate_keypair_mceliece(&mut rng);
let responder_x25519_keypair = generate_lp_keypair_x25519(&mut rng);
let initiator_kem = KEMKeys::new(initiator_mceliece_keypair, initiator_mlkem_keypair);
let responder_kem = KEMKeys::new(responder_mceliece_keypair, responder_mlkem_keypair);
let init_hashes = initiator_kem.encapsulation_keys_digests();
let responder = KKTResponder::new(
&responder_x25519_keypair,
&responder_kem,
&init_hashes,
&[
HashFunction::Blake3,
HashFunction::SHA256,
HashFunction::Shake128,
HashFunction::Shake256,
],
&[SignatureScheme::Ed25519],
&[1],
)
.unwrap();
for hash_function in [
HashFunction::Blake3,
HashFunction::SHA256,
HashFunction::Shake128,
HashFunction::Shake256,
] {
let r_dir_hash_mlkem = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
responder_kem.ml_kem768_encapsulation_key().as_slice(),
);
let r_dir_hash_mceliece = hash_encapsulation_key(
hash_function,
HashLength::Default.value(),
responder_kem.mc_eliece_encapsulation_key().as_ref(),
);
// Mutual - MlKem
{
let ciphersuite = Ciphersuite::resolve_ciphersuite(
KEM::MlKem768,
hash_function,
SignatureScheme::Ed25519,
None,
)
.unwrap();
let (mut initiator, request) = KKTInitiator::generate_mutual_request(
&mut rng,
ciphersuite,
initiator_kem
.encoded_encapsulation_key(KEM::MlKem768)
.unwrap(),
&responder_x25519_keypair.pk,
&r_dir_hash_mlkem,
1u8,
Some(payload.clone()),
)
.unwrap();
let processed_request = responder.process_request(request, payload.len()).unwrap();
assert_eq!(processed_request.request_payload, payload);
assert_eq!(
processed_request
.remote_encapsulation_key
.unwrap()
.as_bytes(),
initiator_kem
.encapsulation_key(KEM::MlKem768)
.unwrap()
.as_bytes()
);
let processed_response = initiator
.process_response(processed_request.response, 0)
.unwrap();
assert_eq!(
processed_response.encapsulation_key.as_bytes(),
responder_kem.ml_kem768_encapsulation_key().as_slice(),
)
}
// Mutual - McEliece is not supported due to the key being too large
{
let ciphersuite = Ciphersuite::resolve_ciphersuite(
KEM::McEliece,
@@ -204,9 +257,12 @@ mod test {
None,
)
.unwrap();
let (mut initiator, request) = KKTInitiator::generate_one_way_request(
let (mut initiator, request) = KKTInitiator::generate_mutual_request(
&mut rng,
ciphersuite,
initiator_kem
.encoded_encapsulation_key(KEM::McEliece)
.unwrap(),
&responder_x25519_keypair.pk,
&r_dir_hash_mceliece,
1u8,
@@ -217,9 +273,16 @@ mod test {
let processed_request = responder.process_request(request, payload.len()).unwrap();
assert_eq!(processed_request.request_payload, payload);
// if we keep unverified keys, this should change
assert!(processed_request.remote_encapsulation_key.is_none());
assert_eq!(
processed_request
.remote_encapsulation_key
.unwrap()
.as_bytes(),
initiator_kem
.encapsulation_key(KEM::McEliece)
.unwrap()
.as_bytes()
);
let processed_response = initiator
.process_response(processed_request.response, 0)
+9 -9
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::LpError;
use crate::packet::{EncryptedLpPacket, InnerHeader, LpHeader, LpMessage, LpPacket};
use crate::packet::{EncryptedLpPacket, InnerHeader, LpFrame, LpHeader, LpPacket};
use bytes::BytesMut;
use libcrux_psq::Channel;
@@ -46,9 +46,9 @@ pub(crate) fn encrypt_lp_packet(
packet: LpPacket,
transport: &mut libcrux_psq::session::Transport,
) -> Result<EncryptedLpPacket, LpError> {
let mut plaintext = BytesMut::with_capacity(InnerHeader::SIZE + packet.message().len());
let mut plaintext = BytesMut::with_capacity(InnerHeader::SIZE + packet.frame().len());
packet.header().inner.encode(&mut plaintext);
packet.message().encode(&mut plaintext);
packet.frame().encode(&mut plaintext);
let ciphertext = encrypt_data(plaintext.as_ref(), transport)?;
@@ -67,14 +67,14 @@ pub(crate) fn decrypt_lp_packet(
let inner_header = InnerHeader::parse(&plaintext)?;
let payload = &plaintext[InnerHeader::SIZE..];
let message = LpMessage::decode(payload)?;
let frame = LpFrame::decode(payload)?;
Ok(LpPacket::new(
LpHeader {
outer: packet.outer_header(),
inner: inner_header,
},
message,
frame,
))
}
@@ -82,7 +82,7 @@ pub(crate) fn decrypt_lp_packet(
mod tests {
use crate::LpError;
use crate::codec::{decrypt_data, decrypt_lp_packet, encrypt_data, encrypt_lp_packet};
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
use crate::packet::{EncryptedLpPacket, LpFrame, LpHeader, LpPacket};
use crate::peer::mock_peers;
use crate::psq::initiator::{build_psq_ciphersuite, build_psq_principal};
use crate::psq::{PSQ_MSG2_SIZE, psq_msg1_size, responder};
@@ -261,7 +261,7 @@ mod tests {
// happy path
let packet = LpPacket::new(
LpHeader::new(123, 0, 1),
LpMessage::new_opaque(b"foomp".to_vec()),
LpFrame::new_opaque(b"foomp".to_vec()),
);
let ciphertext = encrypt_lp_packet(packet.clone(), &mut init_transport).unwrap();
@@ -273,7 +273,7 @@ mod tests {
// incomplete ciphertext
let packet = LpPacket::new(
LpHeader::new(123, 1, 1),
LpMessage::new_opaque(b"foomp".to_vec()),
LpFrame::new_opaque(b"foomp".to_vec()),
);
let ciphertext2 = encrypt_lp_packet(packet, &mut init_transport).unwrap();
let l = ciphertext2.ciphertext().len();
@@ -285,7 +285,7 @@ mod tests {
// too small buffer
let packet = LpPacket::new(
LpHeader::new(123, 1, 1),
LpMessage::new_opaque(b"foomp".to_vec()),
LpFrame::new_opaque(b"foomp".to_vec()),
);
let ciphertext3 = encrypt_lp_packet(packet, &mut resp_transport).unwrap();
let malformed = EncryptedLpPacket::new(ciphertext3.outer_header(), vec![]);
+4 -4
View File
@@ -11,8 +11,8 @@ pub enum MalformedLpPacketError {
#[error("provided insufficient data to fully deserialise the struct")]
InsufficientData,
#[error("{0} is not a valid LpDataKind")]
InvalidLpDataKind(u16),
#[error("{0} is not a valid LpFrameKind value")]
InvalidLpFrameKind(u16),
#[error("invalid payload size: expected {expected}, got {actual}")]
InvalidPayloadSize { expected: usize, actual: usize },
@@ -27,7 +27,7 @@ pub enum MalformedLpPacketError {
}
impl MalformedLpPacketError {
pub fn invalid_data_kind(message_type: u16) -> Self {
MalformedLpPacketError::InvalidLpDataKind(message_type)
pub fn invalid_data_kind(frame_kind: u16) -> Self {
MalformedLpPacketError::InvalidLpFrameKind(frame_kind)
}
}
@@ -7,32 +7,32 @@ use num_enum::{IntoPrimitive, TryFromPrimitive};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
#[derive(Debug, Clone, PartialEq)]
pub struct LpMessageHeader {
pub kind: LpMessageType,
pub message_attributes: [u8; 14],
pub struct LpFrameHeader {
pub kind: LpFrameKind,
pub frame_attributes: [u8; 14],
}
impl LpMessageHeader {
impl LpFrameHeader {
pub const SIZE: usize = 16; // message_kind(2) + message_attributes(14)
pub fn new(kind: LpMessageType, message_attributes: [u8; 14]) -> Self {
pub fn new(kind: LpFrameKind, frame_attributes: [u8; 14]) -> Self {
Self {
kind,
message_attributes,
frame_attributes,
}
}
pub fn new_no_attributes(kind: LpMessageType) -> Self {
pub fn new_no_attributes(kind: LpFrameKind) -> Self {
Self {
kind,
message_attributes: [0; 14],
frame_attributes: [0; 14],
}
}
/// Encode directly into a BytesMut buffer
pub fn encode(&self, dst: &mut BytesMut) {
dst.put_u16_le(self.kind as u16);
dst.put_slice(&self.message_attributes);
dst.put_slice(&self.frame_attributes);
}
pub fn parse(src: &[u8]) -> Result<Self, MalformedLpPacketError> {
@@ -41,35 +41,35 @@ impl LpMessageHeader {
}
let raw_kind = u16::from_le_bytes([src[0], src[1]]);
let kind = LpMessageType::try_from(raw_kind)
let kind = LpFrameKind::try_from(raw_kind)
.map_err(|_| MalformedLpPacketError::invalid_data_kind(raw_kind))?;
#[allow(clippy::unwrap_used)]
let message_attributes = src[2..16].try_into().unwrap();
Ok(Self {
kind,
message_attributes,
frame_attributes: message_attributes,
})
}
}
/// Represent application data being sent in Transport mode
#[derive(Debug, Clone, PartialEq)]
pub struct LpMessage {
pub header: LpMessageHeader,
pub struct LpFrame {
pub header: LpFrameHeader,
pub content: Bytes,
}
impl AsRef<[u8]> for LpMessage {
impl AsRef<[u8]> for LpFrame {
fn as_ref(&self) -> &[u8] {
&self.content
}
}
impl LpMessage {
pub fn new(kind: LpMessageType, content: impl Into<Bytes>) -> Self {
impl LpFrame {
pub fn new(kind: LpFrameKind, content: impl Into<Bytes>) -> Self {
Self {
header: LpMessageHeader::new_no_attributes(kind),
header: LpFrameHeader::new_no_attributes(kind),
content: content.into(),
}
}
@@ -81,40 +81,103 @@ impl LpMessage {
}
pub fn decode(src: &[u8]) -> Result<Self, MalformedLpPacketError> {
let header = LpMessageHeader::parse(src)?;
let content = src[LpMessageHeader::SIZE..].to_vec().into();
let header = LpFrameHeader::parse(src)?;
let content = src[LpFrameHeader::SIZE..].to_vec().into();
Ok(Self { header, content })
}
pub fn kind(&self) -> LpMessageType {
pub fn kind(&self) -> LpFrameKind {
self.header.kind
}
pub fn new_opaque(content: impl Into<Bytes>) -> Self {
Self::new(LpMessageType::Opaque, content)
Self::new(LpFrameKind::Opaque, content)
}
pub fn new_registration(data: impl Into<Bytes>) -> Self {
Self::new(LpMessageType::Registration, data)
Self::new(LpFrameKind::Registration, data)
}
pub fn new_forward(data: impl Into<Bytes>) -> Self {
Self::new(LpMessageType::Forward, data)
Self::new(LpFrameKind::Forward, data)
}
pub fn new_stream(attrs: StreamFrameAttributes, content: impl Into<Bytes>) -> Self {
Self {
header: LpFrameHeader::new(LpFrameKind::Stream, attrs.encode()),
content: content.into(),
}
}
pub(crate) fn len(&self) -> usize {
LpMessageHeader::SIZE + self.content.len()
LpFrameHeader::SIZE + self.content.len()
}
}
/// Represent kind of application data being sent in Transport mode
#[derive(Clone, Copy, PartialEq, Eq, Debug, IntoPrimitive, TryFromPrimitive)]
#[repr(u16)]
pub enum LpMessageType {
pub enum LpFrameKind {
Opaque = 0,
Registration = 1,
Forward = 2,
Stream = 3,
}
/// Message type within a `LpFrameKind::Stream` frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StreamMsgType {
/// Open a new stream. Content is optional initial data.
Open = 0,
/// Data on an existing stream.
Data = 1,
}
/// Parsed form of the 14-byte `frame_attributes` for `LpFrameKind::Stream`.
///
/// Wire layout (big-endian):
/// ```text
/// [0..8 ) stream_id : u64
/// [8 ) msg_type : u8 (0 = Open, 1 = Data)
/// [9..13) sequence_num : u32
/// [13 ) reserved : u8
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamFrameAttributes {
pub stream_id: u64,
pub msg_type: StreamMsgType,
pub sequence_num: u32,
}
impl StreamFrameAttributes {
pub fn encode(&self) -> [u8; 14] {
let mut buf = [0u8; 14];
buf[0..8].copy_from_slice(&self.stream_id.to_be_bytes());
buf[8] = self.msg_type as u8;
buf[9..13].copy_from_slice(&self.sequence_num.to_be_bytes());
buf
}
pub fn parse(attrs: &[u8; 14]) -> Result<Self, MalformedLpPacketError> {
let stream_id = u64::from_be_bytes(attrs[0..8].try_into().unwrap());
let msg_type = match attrs[8] {
0 => StreamMsgType::Open,
1 => StreamMsgType::Data,
other => {
return Err(MalformedLpPacketError::DeserialisationFailure(format!(
"invalid stream msg_type: {other}"
)));
}
};
let sequence_num = u32::from_be_bytes(attrs[9..13].try_into().unwrap());
Ok(Self {
stream_id,
msg_type,
sequence_num,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
+10 -10
View File
@@ -6,12 +6,12 @@ use bytes::{BufMut, BytesMut};
use std::fmt::{Debug, Formatter};
pub use error::MalformedLpPacketError;
pub use frame::{ForwardPacketData, LpFrame};
pub use header::{InnerHeader, LpHeader, OuterHeader};
pub use message::{ForwardPacketData, LpMessage};
pub mod error;
pub mod frame;
pub mod header;
pub mod message;
pub mod replay;
pub mod utils;
@@ -81,7 +81,7 @@ impl EncryptedLpPacket {
#[derive(Clone, PartialEq)]
pub struct LpPacket {
pub(crate) header: LpHeader,
pub(crate) message: LpMessage,
pub(crate) frame: LpFrame,
}
impl Debug for LpPacket {
@@ -91,16 +91,16 @@ impl Debug for LpPacket {
}
impl LpPacket {
pub fn new(header: LpHeader, message: LpMessage) -> Self {
Self { header, message }
pub fn new(header: LpHeader, frame: LpFrame) -> Self {
Self { header, frame }
}
pub fn message(&self) -> &LpMessage {
&self.message
pub fn frame(&self) -> &LpFrame {
&self.frame
}
pub fn into_message(self) -> LpMessage {
self.message
pub fn into_frame(self) -> LpFrame {
self.frame
}
pub fn header(&self) -> &LpHeader {
@@ -115,6 +115,6 @@ impl LpPacket {
pub(crate) fn dbg_encode(&self, dst: &mut BytesMut) {
self.header.dbg_encode(dst);
self.message.encode(dst)
self.frame.encode(dst)
}
}
+4
View File
@@ -143,6 +143,10 @@ impl LpRemotePeer {
.ok_or(LpError::NoKnownKEMKeyDigests { kem, hash_function })
.cloned()
}
pub fn kem_key_digests(&self) -> &BTreeMap<KEM, KEMKeyDigests> {
&self.expected_kem_key_digests
}
}
impl From<DHPublicKey> for LpRemotePeer {
+5
View File
@@ -65,6 +65,7 @@ impl LpPeerConfig {
rng.random(),
)
}
/// Creates a new client to exit config.
/// Inputs:
/// hop_id: this value must be in the range (1..=15). This function returns an error if this is not the case.
@@ -79,6 +80,7 @@ impl LpPeerConfig {
{
Self::new(rng, hop_id, true, false, censorship_resistance)
}
/// Creates a new client to an intermediate node config.
/// Inputs:
/// hop_id: this value must be in the range (1..=14). This function returns an error if this is not the case.
@@ -130,6 +132,7 @@ impl LpPeerConfig {
rng.random(),
)
}
fn build(
hop_id: u8,
is_exit: bool,
@@ -147,6 +150,7 @@ impl LpPeerConfig {
seed,
}
}
fn build_checked(
hop_id: u8,
is_exit: bool,
@@ -203,6 +207,7 @@ impl LpPeerConfig {
output_bytes[4..].copy_from_slice(&self.seed);
output_bytes
}
pub fn deserialize(bytes: &[u8]) -> Result<Self, LpError> {
if bytes.len() != LP_PEER_CONFIG_SIZE {
return Err(LpError::DeserializationError(format!(
+43 -13
View File
@@ -24,10 +24,31 @@ use nym_kkt::message::{KKTRequest, KKTResponse};
use rand09::SeedableRng;
use tracing::debug;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandshakeMode {
// Client <> Entry
OneWayEntry,
// Client <> Exit
OneWayExit,
// Entry <> Exit
MutualInternode,
// in the future more variants will be supported (such as individual mix hops)
}
impl HandshakeMode {
pub fn is_mutual(&self) -> bool {
matches!(self, HandshakeMode::MutualInternode)
}
}
pub struct PSQHandshakeStateInitiator<'a, S> {
pub(super) inner_state: PSQHandshakeState<'a, S>,
pub(super) initiator_data: InitiatorData,
pub(super) mutual: bool,
/// The mode of the handshake (mutual node-node, client-entry, entry-exit)
pub(super) mode: HandshakeMode,
}
pub(crate) fn build_psq_principal<R>(
@@ -78,13 +99,23 @@ impl<'a, S> PSQHandshakeStateInitiator<'a, S>
where
S: LpHandshakeChannel + Unpin,
{
pub fn set_mutual_kkt(mut self) -> Result<Self, LpError> {
if self.inner_state.local_peer.kem_keypairs.is_none() {
return Err(LpError::PSQMutualInitiatorMissingKemKey);
}
fn lp_peer_config<R>(&self, rng: &mut R) -> Result<LpPeerConfig, LpError>
where
R: rand09::CryptoRng,
{
// for now we don't support censorship resistance flag
let censorship_resistance = false;
self.mutual = true;
Ok(self)
match self.mode {
HandshakeMode::OneWayEntry => Ok(LpPeerConfig::new_client_to_entry(
rng,
censorship_resistance,
)),
HandshakeMode::OneWayExit => {
LpPeerConfig::new_client_to_exit(rng, 1, censorship_resistance)
}
HandshakeMode::MutualInternode => LpPeerConfig::new_node_to_node(rng),
}
}
/// Attempt to send KKT request to begin the handshake
@@ -132,7 +163,7 @@ where
let ciphersuite = self.inner_state.local_peer.ciphersuite();
let kem = ciphersuite.kem();
let lp_peer_config = LpPeerConfig::new_client_to_entry(rng, false);
let lp_peer_config = self.lp_peer_config(rng)?;
// 1. retrieve the expected kem key hash. if we don't know it,
let dir_hash = self
@@ -141,7 +172,7 @@ where
.expected_kem_key_hash(ciphersuite)?;
// 2. prepare and send KKT request
let (mut initiator, kkt_request) = if self.mutual {
let (mut initiator, kkt_request) = if self.mode.is_mutual() {
// this has been verified when setting the mutual flag
let Some(local_encapsulation_key) = self.inner_state.local_peer.encoded_kem_key(kem)
else {
@@ -273,8 +304,8 @@ mod tests {
resp.ciphersuite = ciphersuite;
let initiator_data = InitiatorData::new(1, resp_remote);
let handshake_init =
PSQHandshakeState::new(conn_init, init).as_initiator(initiator_data);
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(initiator_data, HandshakeMode::OneWayEntry)?;
let mut init_rng = DeterministicRng09Send::new(u64_seeded_rng_09(1));
@@ -396,8 +427,7 @@ mod tests {
let initiator_data = InitiatorData::new(1, resp_remote);
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(initiator_data)
.set_mutual_kkt()?;
.as_initiator(initiator_data, HandshakeMode::MutualInternode)?;
let mut init_rng = DeterministicRng09Send::new(u64_seeded_rng_09(1));
+22 -9
View File
@@ -12,6 +12,8 @@ mod helpers;
pub mod initiator;
pub mod responder;
use crate::LpError;
use crate::psq::initiator::HandshakeMode;
pub use initiator::PSQHandshakeStateInitiator;
pub use responder::PSQHandshakeStateResponder;
@@ -107,12 +109,20 @@ where
}
}
pub fn as_initiator(self, initiator_data: InitiatorData) -> PSQHandshakeStateInitiator<'a, S> {
PSQHandshakeStateInitiator {
pub fn as_initiator(
self,
initiator_data: InitiatorData,
mode: HandshakeMode,
) -> Result<PSQHandshakeStateInitiator<'a, S>, LpError> {
if mode.is_mutual() && self.local_peer.kem_keypairs.is_none() {
return Err(LpError::PSQMutualInitiatorMissingKemKey);
}
Ok(PSQHandshakeStateInitiator {
initiator_data,
inner_state: self,
mutual: false,
}
mode,
})
}
pub fn as_responder(self, responder_data: ResponderData) -> PSQHandshakeStateResponder<'a, S> {
@@ -160,8 +170,10 @@ mod tests {
resp.ciphersuite = ciphersuite;
let resp_remote = resp.as_remote();
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(InitiatorData::new(1, resp_remote));
let handshake_init = PSQHandshakeState::new(conn_init, init).as_initiator(
InitiatorData::new(1, resp_remote),
HandshakeMode::OneWayEntry,
)?;
let handshake_resp =
PSQHandshakeState::new(conn_resp, resp).as_responder(ResponderData::default());
@@ -232,9 +244,10 @@ mod tests {
let resp_remote = resp.as_remote();
let init_remote = init.as_remote();
let handshake_init = PSQHandshakeState::new(conn_init, init)
.as_initiator(InitiatorData::new(1, resp_remote))
.set_mutual_kkt()?;
let handshake_init = PSQHandshakeState::new(conn_init, init).as_initiator(
InitiatorData::new(1, resp_remote),
HandshakeMode::MutualInternode,
)?;
let handshake_resp = PSQHandshakeState::new(conn_resp, resp).as_responder(
ResponderData::default()
.with_initiator_kem_hashes(init_remote.expected_kem_key_digests),
+63 -26
View File
@@ -6,9 +6,10 @@
//! This module implements session management functionality, including replay protection
use crate::codec::{decrypt_lp_packet, encrypt_lp_packet};
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
use crate::packet::{EncryptedLpPacket, LpFrame, LpHeader, LpPacket};
use crate::peer::{LpLocalPeer, LpRemotePeer};
use crate::peer_config::LpReceiverIndex;
use crate::psq::initiator::HandshakeMode;
use crate::psq::{
InitiatorData, PSQHandshakeState, PSQHandshakeStateInitiator, PSQHandshakeStateResponder,
ResponderData,
@@ -19,6 +20,8 @@ use crate::{LpError, replay::ReceivingKeyCounterValidator};
use libcrux_psq::handshake::types::{Authenticator, DHPublicKey};
use libcrux_psq::session::{Session, SessionBinding};
use nym_kkt::keys::EncapsulationKey;
use nym_kkt_ciphersuite::{KEM, KEMKeyDigests};
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
/// Represents inputs that drive the state machine transitions.
@@ -29,7 +32,7 @@ pub enum LpInput {
ReceivePacket(EncryptedLpPacket),
/// Application wants to send data (only valid in Transport state).
SendData(LpMessage),
SendFrame(LpFrame),
}
/// Represents actions the state machine requests the environment to perform.
@@ -39,7 +42,7 @@ pub enum LpAction {
SendPacket(EncryptedLpPacket),
/// Deliver decrypted application data received from the peer.
DeliverData(LpMessage),
DeliverFrame(LpFrame),
}
pub type SessionId = [u8; 32];
@@ -154,12 +157,34 @@ impl LpTransportSession {
local_peer: LpLocalPeer,
remote_peer: LpRemotePeer,
remote_protocol_version: u8,
) -> PSQHandshakeStateInitiator<'_, S>
mode: HandshakeMode,
) -> Result<PSQHandshakeStateInitiator<'_, S>, LpError>
where
S: LpHandshakeChannel + Unpin,
{
PSQHandshakeState::new(connection, local_peer)
.as_initiator(InitiatorData::new(remote_protocol_version, remote_peer))
PSQHandshakeState::new(connection, local_peer).as_initiator(
InitiatorData::new(remote_protocol_version, remote_peer),
mode,
)
}
/// Helper function to create `PSQHandshakeState` for the handshake initiator for mutual KKT
pub fn psq_handshake_initiator_mutual_internode<S>(
connection: &'_ mut S,
local_peer: LpLocalPeer,
remote_peer: LpRemotePeer,
remote_protocol_version: u8,
) -> Result<PSQHandshakeStateInitiator<'_, S>, LpError>
where
S: LpHandshakeChannel + Unpin,
{
Self::psq_handshake_initiator(
connection,
local_peer,
remote_peer,
remote_protocol_version,
HandshakeMode::MutualInternode,
)
}
/// Helper function to create `PSQHandshakeState` for the handshake responder
@@ -173,6 +198,19 @@ impl LpTransportSession {
PSQHandshakeState::new(connection, local_peer).as_responder(ResponderData::default())
}
/// Helper function to create `PSQHandshakeState` for the handshake responder for mutual KKT
pub fn psq_handshake_responder_mutual<S>(
connection: &'_ mut S,
local_peer: LpLocalPeer,
initiator_kem_hashes: BTreeMap<KEM, KEMKeyDigests>,
) -> PSQHandshakeStateResponder<'_, S>
where
S: LpHandshakeChannel + Unpin,
{
PSQHandshakeState::new(connection, local_peer)
.as_responder(ResponderData::default().with_initiator_kem_hashes(initiator_kem_hashes))
}
pub fn session_binding(&self) -> &PersistentSessionBinding {
&self.session_binding
}
@@ -196,10 +234,10 @@ impl LpTransportSession {
self.protocol_version
}
pub fn next_packet(&mut self, message: LpMessage) -> Result<LpPacket, LpError> {
pub fn next_packet(&mut self, frame: LpFrame) -> Result<LpPacket, LpError> {
let counter = self.next_counter();
let header = LpHeader::new(self.receiver_index(), counter, self.protocol_version);
let packet = LpPacket::new(header, message);
let packet = LpPacket::new(header, frame);
Ok(packet)
}
@@ -261,22 +299,19 @@ impl LpTransportSession {
self.receiving_counter.current_packet_cnt()
}
/// Encrypts a produced application using the established transport session
/// and produce an `EncryptedLpPacket`
/// Wrap the provided `LpFrame` into an `LpPacket` and encrypt its content using the established transport session
/// to produce an `EncryptedLpPacket`
///
/// # Arguments
///
/// * `data` - plaintext data to encrypt
/// * `frame` - structured `LpFrame` to wrap and encrypt
///
/// # Returns
///
/// * `Ok(EncryptedLpPacket)` containing the encrypted message ciphertext.
/// * `Err(LpError)` if the session is not in transport mode or encryption fails.
pub(crate) fn encrypt_application_data(
&mut self,
data: LpMessage,
) -> Result<EncryptedLpPacket, LpError> {
let packet = self.next_packet(data)?;
pub(crate) fn wrap_lp_frame(&mut self, frame: LpFrame) -> Result<EncryptedLpPacket, LpError> {
let packet = self.next_packet(frame)?;
encrypt_lp_packet(packet, &mut self.active_transport)
}
@@ -284,7 +319,7 @@ impl LpTransportSession {
///
/// # Arguments
///
/// * `ciphertext` - The encrypted packet
/// * `packet` - The encrypted packet
///
/// # Returns
///
@@ -320,11 +355,11 @@ impl LpTransportSession {
self.receiving_counter_mark(ctr)?;
// 4. deliver the message
Ok(LpAction::DeliverData(packet.message))
Ok(LpAction::DeliverFrame(packet.frame))
}
LpInput::SendData(data) => {
LpInput::SendFrame(data) => {
// Encrypt and send application data
match self.encrypt_application_data(data) {
match self.wrap_lp_frame(data) {
Ok(packet) => Ok(LpAction::SendPacket(packet)),
Err(e) => Err(e),
}
@@ -438,8 +473,9 @@ mod tests {
// --- Transport Phase ---
println!("--- Step 1: Initiator sends data ---");
let data_to_send_1 = LpMessage::new_opaque(b"hello responder".to_vec());
let init_actions_4 = initiator.process_input(LpInput::SendData(data_to_send_1.clone()));
let data_to_send_1 = LpFrame::new_opaque(b"hello responder".to_vec());
let init_actions_4 =
initiator.process_input(LpInput::SendFrame(data_to_send_1.clone()));
let data_packet_1 = if let Ok(LpAction::SendPacket(packet)) = init_actions_4 {
packet.clone()
} else {
@@ -449,7 +485,7 @@ mod tests {
println!("--- Step 2: Responder receives data ---");
let resp_actions_5 = responder.process_input(LpInput::ReceivePacket(data_packet_1));
let resp_data_1 = if let Ok(LpAction::DeliverData(data)) = resp_actions_5 {
let resp_data_1 = if let Ok(LpAction::DeliverFrame(data)) = resp_actions_5 {
data
} else {
panic!("Responder should deliver data");
@@ -457,8 +493,9 @@ mod tests {
assert_eq!(resp_data_1, data_to_send_1);
println!("--- Step 3: Responder sends data ---");
let data_to_send_2 = LpMessage::new_opaque(b"hello initiator".to_vec());
let resp_actions_6 = responder.process_input(LpInput::SendData(data_to_send_2.clone()));
let data_to_send_2 = LpFrame::new_opaque(b"hello initiator".to_vec());
let resp_actions_6 =
responder.process_input(LpInput::SendFrame(data_to_send_2.clone()));
let data_packet_2 = if let Ok(LpAction::SendPacket(packet)) = resp_actions_6 {
packet.clone()
} else {
@@ -468,7 +505,7 @@ mod tests {
println!("--- Step 4: Initiator receives data ---");
let init_actions_5 = initiator.process_input(LpInput::ReceivePacket(data_packet_2));
if let Ok(LpAction::DeliverData(data)) = init_actions_5 {
if let Ok(LpAction::DeliverFrame(data)) = init_actions_5 {
assert_eq!(data, data_to_send_2);
} else {
panic!("Initiator should deliver data");
+18 -20
View File
@@ -1,6 +1,6 @@
#[cfg(test)]
mod tests {
use crate::packet::{EncryptedLpPacket, LpMessage};
use crate::packet::{EncryptedLpPacket, LpFrame};
use crate::session::{LpAction, LpInput};
use crate::{LpError, SessionManager, SessionsMock};
use nym_kkt_ciphersuite::{IntoEnumIterator, KEM};
@@ -9,7 +9,7 @@ mod tests {
trait ActionExtract {
fn ciphertext(self) -> EncryptedLpPacket;
fn data(self) -> LpMessage;
fn data(self) -> LpFrame;
}
impl ActionExtract for LpAction {
@@ -21,8 +21,8 @@ mod tests {
}
}
fn data(self) -> LpMessage {
if let LpAction::DeliverData(data) = self {
fn data(self) -> LpFrame {
if let LpAction::DeliverFrame(data) = self {
data
} else {
panic!("invalid action");
@@ -54,7 +54,7 @@ mod tests {
// --- A sends to B ---
let plaintext_a = format!("A->B Message {i}").into_bytes();
let ciphertext_a = session_manager_1
.send_data(peer_a_sm, LpMessage::new_opaque(plaintext_a.clone()))
.send_frame(peer_a_sm, LpFrame::new_opaque(plaintext_a.clone()))
.unwrap()
.ciphertext();
@@ -68,7 +68,7 @@ mod tests {
// --- B sends to A ---
let plaintext_b = format!("B->A Message {i}").into_bytes();
let ciphertext_b = session_manager_2
.send_data(peer_b_sm, LpMessage::new_opaque(plaintext_b.clone()))
.send_frame(peer_b_sm, LpFrame::new_opaque(plaintext_b.clone()))
.unwrap()
.ciphertext();
@@ -183,15 +183,13 @@ mod tests {
// --- 3. Simulate Data Transfer via process_input ---
println!("Starting data transfer simulation via process_input...");
let plaintext_a_to_b =
LpMessage::new_opaque(b"Hello from A via process_input!".to_vec());
let plaintext_b_to_a =
LpMessage::new_opaque(b"Hello from B via process_input!".to_vec());
let plaintext_a_to_b = LpFrame::new_opaque(b"Hello from A via process_input!".to_vec());
let plaintext_b_to_a = LpFrame::new_opaque(b"Hello from B via process_input!".to_vec());
// --- A sends to B ---
println!(" A sends to B");
let action_a_send = session_manager_1
.process_input(session_id, LpInput::SendData(plaintext_a_to_b.clone()))
.process_input(session_id, LpInput::SendFrame(plaintext_a_to_b.clone()))
.expect("A SendData failed");
let data_packet_a = action_a_send.ciphertext();
@@ -202,7 +200,7 @@ mod tests {
.process_input(session_id, LpInput::ReceivePacket(data_packet_a))
.expect("B ReceivePacket (data) failed");
if let LpAction::DeliverData(data) = action_b_recv {
if let LpAction::DeliverFrame(data) = action_b_recv {
assert_eq!(data, plaintext_a_to_b, "Decrypted data mismatch A->B");
println!(
" B successfully decrypted: {:?}",
@@ -215,7 +213,7 @@ mod tests {
// --- B sends to A ---
println!(" B sends to A");
let action_b_send = session_manager_2
.process_input(session_id, LpInput::SendData(plaintext_b_to_a.clone()))
.process_input(session_id, LpInput::SendFrame(plaintext_b_to_a.clone()))
.expect("B SendData failed");
let data_packet_b = action_b_send.ciphertext();
@@ -229,7 +227,7 @@ mod tests {
.process_input(session_id, LpInput::ReceivePacket(data_packet_b))
.expect("A ReceivePacket (data) failed");
if let LpAction::DeliverData(data) = action_a_recv {
if let LpAction::DeliverFrame(data) = action_a_recv {
assert_eq!(data, plaintext_b_to_a, "Decrypted data mismatch B->A");
println!(
" A successfully decrypted: {:?}",
@@ -258,11 +256,11 @@ mod tests {
println!("Testing out-of-order reception via process_input...");
// A prepares N+1 then N
let data_n_plus_1 = LpMessage::new_opaque(b"Message N+1".to_vec());
let data_n = LpMessage::new_opaque(b"Message N".to_vec());
let data_n_plus_1 = LpFrame::new_opaque(b"Message N+1".to_vec());
let data_n = LpFrame::new_opaque(b"Message N".to_vec());
let action_send_n1 = session_manager_1
.process_input(session_id, LpInput::SendData(data_n_plus_1.clone()))
.process_input(session_id, LpInput::SendFrame(data_n_plus_1.clone()))
.unwrap();
let packet_n1 = match action_send_n1 {
LpAction::SendPacket(p) => p,
@@ -270,7 +268,7 @@ mod tests {
};
let action_send_n = session_manager_1
.process_input(session_id, LpInput::SendData(data_n.clone()))
.process_input(session_id, LpInput::SendFrame(data_n.clone()))
.unwrap();
let packet_n = match action_send_n {
LpAction::SendPacket(p) => p,
@@ -284,7 +282,7 @@ mod tests {
.process_input(session_id, LpInput::ReceivePacket(packet_n1))
.unwrap();
match action_recv_n1 {
LpAction::DeliverData(d) => assert_eq!(d, data_n_plus_1, "Data N+1 mismatch"),
LpAction::DeliverFrame(d) => assert_eq!(d, data_n_plus_1, "Data N+1 mismatch"),
_ => panic!("Expected DeliverData for N+1"),
}
@@ -294,7 +292,7 @@ mod tests {
.process_input(session_id, LpInput::ReceivePacket(packet_n))
.unwrap();
match action_recv_n {
LpAction::DeliverData(d) => assert_eq!(d, data_n, "Data N mismatch"),
LpAction::DeliverFrame(d) => assert_eq!(d, data_n, "Data N mismatch"),
_ => panic!("Expected DeliverData for N"),
}
+4 -4
View File
@@ -6,7 +6,7 @@
//! This module implements session lifecycle management functionality, handling
//! creation, retrieval, and storage of sessions.
use crate::packet::{EncryptedLpPacket, LpMessage};
use crate::packet::{EncryptedLpPacket, LpFrame};
use crate::peer_config::LpReceiverIndex;
use crate::{LpError, LpTransportSession};
use std::collections::HashMap;
@@ -39,12 +39,12 @@ impl SessionManager {
self.with_session_mut(lp_id, |sm| sm.process_input(input))?
}
pub fn send_data(
pub fn send_frame(
&mut self,
lp_id: LpReceiverIndex,
data: LpMessage,
frame: LpFrame,
) -> Result<LpAction, LpError> {
self.process_input(lp_id, LpInput::SendData(data))
self.process_input(lp_id, LpInput::SendFrame(frame))
}
pub fn receive_packet(
@@ -1,5 +1,6 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::clone_from_slice`: please upgrade to generic-array 1.x - TODO
pub mod identifier;
pub mod key;
@@ -1,6 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated struct `nym_crypto::generic_array::GenericArray`: please upgrade to generic-array 1.x - TODO
pub mod encryption_key;
pub mod reply_surb;
pub mod requests;
@@ -0,0 +1,64 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nyxd_scraper_shared::error::ScraperError;
use nyxd_scraper_shared::storage::FullBlockInformation;
use nyxd_scraper_shared::watcher::{NyxdWatcher, WatcherConfig};
use nyxd_scraper_shared::{BlockModule, ParsedTransactionResponse, TxModule};
struct FancyBlockModule;
struct FancyTxModule;
#[async_trait::async_trait]
impl BlockModule for FancyBlockModule {
async fn handle_block(&mut self, block: &FullBlockInformation) -> Result<(), ScraperError> {
println!("🚀 got new block for height {}", block.block.header.height);
// should be false
println!("results scraped: {}", block.results.is_some());
// should be false
println!("validators scraped: {}", block.validators.is_some());
// should be true
println!("transactions scraped: {}", block.transactions.is_some());
println!();
Ok(())
}
}
#[async_trait::async_trait]
impl TxModule for FancyTxModule {
async fn handle_tx(&mut self, tx: &ParsedTransactionResponse) -> Result<(), ScraperError> {
println!(
"✨ got new tx for height {}: {} ({} msgs)",
tx.block.header.height,
tx.hash,
tx.parsed_messages.len()
);
Ok(())
}
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
let cfg = WatcherConfig {
websocket_url: "wss://rpc.nymtech.net/websocket".parse()?,
rpc_url: "https://rpc.nymtech.net".parse()?,
};
let watcher = NyxdWatcher::builder(cfg)
.with_block_module(FancyBlockModule)
.with_tx_module(FancyTxModule)
.build_and_start()
.await?;
// run for 30s before shutting down
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
watcher.stop().await;
Ok(())
}
@@ -0,0 +1,94 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::NyxdScraperStorageError;
use crate::{NyxdScraperStorage, NyxdScraperTransaction, ParsedTransactionResponse};
use tendermint::Block;
use tendermint::block::Commit;
use tendermint_rpc::endpoint::validators::Response;
use thiserror::Error;
#[derive(Clone)]
pub struct Ephemeral;
#[derive(Debug, Error)]
#[error("no storage backend enabled")]
pub struct EphemeralStorageError;
pub struct EphemeralTransaction;
#[async_trait::async_trait]
impl NyxdScraperTransaction for EphemeralTransaction {
async fn commit(self) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_validators(&mut self, _: &Response) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_block_data(
&mut self,
_: &Block,
_: i64,
) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_commits(
&mut self,
_: &Commit,
_: &Response,
) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_txs(
&mut self,
_: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_messages(
&mut self,
_: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn update_last_processed(&mut self, _: i64) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
}
#[async_trait::async_trait]
impl NyxdScraperStorage for Ephemeral {
type StorageTransaction = EphemeralTransaction;
async fn initialise(_: &str, _: &bool) -> Result<Self, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn begin_processing_tx(
&self,
) -> Result<Self::StorageTransaction, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn prune_storage(&self, _: u32, _: u32) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
}
@@ -2,13 +2,16 @@
// SPDX-License-Identifier: Apache-2.0
use crate::PruningOptions;
use crate::block_processor::ephemeral_storage::Ephemeral;
use crate::block_processor::helpers::split_request_range;
use crate::block_processor::types::BlockToProcess;
use crate::block_requester::BlockRequest;
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::storage::{NyxdScraperStorage, NyxdScraperTransaction, persist_block};
use crate::rpc_client::{RetrievalConfig, RpcClient};
use crate::storage::{
FullBlockInformation, NyxdScraperStorage, NyxdScraperTransaction, persist_block,
};
use futures::StreamExt;
use std::cmp::max;
use std::collections::{BTreeMap, HashSet, VecDeque};
@@ -22,6 +25,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, trace, warn};
mod ephemeral_storage;
mod helpers;
pub(crate) mod pruning;
pub(crate) mod types;
@@ -77,20 +81,148 @@ impl BlockProcessorConfig {
}
}
pub struct BlockProcessor<S> {
pub struct BlockProcessorPersistence<S> {
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
last_processed_height: u32,
last_pruned_height: u32,
storage: S,
}
impl<S> BlockProcessorPersistence<S>
where
S: NyxdScraperStorage,
{
pub(crate) async fn new(
config: BlockProcessorConfig,
synced: Arc<Notify>,
storage: S,
) -> Result<Self, ScraperError> {
let last_pruned = storage.get_pruned_height().await?;
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
debug!(pruned_height = %last_pruned_height, "setting up block processor...");
Ok(Self {
config,
synced,
last_pruned_height,
storage,
})
}
#[must_use]
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.config.pruning_options = pruning_options;
self
}
async fn stored_last_processed_height(&self) -> Result<u32, ScraperError> {
let last_processed = self.storage.get_last_processed_height().await?;
let last_processed_height = last_processed.try_into().unwrap_or_default();
Ok(last_processed_height)
}
async fn persist_block(
&mut self,
full_info: &FullBlockInformation,
) -> Result<(), ScraperError> {
// process the entire block as a transaction so that if anything fails,
// we wouldn't end up with a corrupted storage.
let mut tx = self
.storage
.begin_processing_tx()
.await
.map_err(ScraperError::tx_begin_failure)?;
persist_block(full_info, &mut tx, self.config.store_precommits).await?;
let commit_start = Instant::now();
tx.commit().await.map_err(ScraperError::tx_commit_failure)?;
crate::storage::helpers::log_db_operation_time("committing processing tx", commit_start);
let last_processed_height = full_info.block.header.height.value() as u32;
if let Err(err) = self.maybe_prune_storage(last_processed_height).await {
error!("failed to prune the storage: {err}");
}
Ok(())
}
#[instrument(skip(self))]
async fn prune_storage(&mut self, last_processed_height: u32) -> Result<(), ScraperError> {
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = last_processed_height - keep_recent;
info!(
keep_recent,
oldest_to_keep = last_to_keep,
"pruning the storage"
);
let lowest: u32 = self
.storage
.lowest_block_height()
.await?
.unwrap_or_default()
.try_into()
.unwrap_or_default();
let to_prune = last_to_keep.saturating_sub(lowest);
match to_prune {
v if v > 1000 => warn!("approximately {v} blocks worth of data will be pruned"),
v if v > 100 => info!("approximately {v} blocks worth of data will be pruned"),
0 => trace!("no blocks to prune"),
v => debug!("approximately {v} blocks worth of data will be pruned"),
}
if to_prune == 0 {
self.last_pruned_height = last_processed_height;
return Ok(());
}
self.storage
.prune_storage(last_to_keep, last_processed_height)
.await?;
self.last_pruned_height = last_processed_height;
Ok(())
}
async fn maybe_prune_storage(
&mut self,
last_processed_height: u32,
) -> Result<(), ScraperError> {
debug!("checking for storage pruning");
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= last_processed_height {
self.prune_storage(last_processed_height).await?;
}
Ok(())
}
}
pub struct BlockProcessor<S = Ephemeral> {
cancel: CancellationToken,
last_processed_height: u32,
last_processed_at: Instant,
pending_sync: PendingSync,
queued_blocks: BTreeMap<u32, BlockToProcess>,
/// Specifies how much data to actually retrieve per block
retrieval_config: RetrievalConfig,
rpc_client: RpcClient,
incoming: UnboundedReceiverStream<BlockToProcess>,
block_requester: Sender<BlockRequest>,
storage: S,
persistence: Option<BlockProcessorPersistence<S>>,
// future work: rather than sending each msg to every msg module,
// let them subscribe based on `type_url` inside the message itself
@@ -105,108 +237,97 @@ impl<S> BlockProcessor<S>
where
S: NyxdScraperStorage,
{
pub async fn new(
config: BlockProcessorConfig,
pub fn new(
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
block_requester: Sender<BlockRequest>,
storage: S,
rpc_client: RpcClient,
) -> Result<Self, ScraperError> {
let last_processed = storage.get_last_processed_height().await?;
let last_processed_height = last_processed.try_into().unwrap_or_default();
let last_pruned = storage.get_pruned_height().await?;
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
debug!(last_processed_height = %last_processed_height, pruned_height = %last_pruned_height, "setting up block processor...");
Ok(BlockProcessor {
config,
) -> Self {
BlockProcessor {
cancel,
synced,
last_processed_height,
last_pruned_height,
last_processed_height: Default::default(),
last_processed_at: Instant::now(),
pending_sync: Default::default(),
queued_blocks: Default::default(),
retrieval_config: RetrievalConfig::default(),
rpc_client,
incoming: incoming.into(),
block_requester,
storage,
persistence: None,
block_modules: vec![],
tx_modules: vec![],
msg_modules: vec![],
})
}
}
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.config.pruning_options = pruning_options;
#[must_use]
pub fn with_retrieval_config(mut self, retrieval_config: RetrievalConfig) -> Self {
self.retrieval_config = retrieval_config;
self
}
pub async fn with_persistence(
mut self,
persistence: BlockProcessorPersistence<S>,
) -> Result<Self, ScraperError> {
let last_processed_height = persistence.stored_last_processed_height().await?;
debug!(last_processed_height = %last_processed_height, "setting up block processor...");
self.persistence = Some(persistence);
Ok(self)
}
pub(super) async fn process_block(
&mut self,
block: BlockToProcess,
) -> Result<(), ScraperError> {
info!("processing block at height {}", block.height);
let full_info = self.rpc_client.try_get_full_details(block).await?;
let full_info = self
.rpc_client
.try_get_full_details(block, self.retrieval_config)
.await?;
debug!(
"this block has {} transaction(s)",
full_info.transactions.len()
);
for tx in &full_info.transactions {
debug!("{} has {} message(s)", tx.hash, tx.tx.body.messages.len());
for (index, msg) in tx.tx.body.messages.iter().enumerate() {
debug!("{index}: {:?}", msg.type_url)
if let Some(tx_info) = &full_info.transactions {
debug!("this block has {} transaction(s)", tx_info.len());
for tx in tx_info {
debug!("{} has {} message(s)", tx.hash, tx.tx.body.messages.len());
for (index, msg) in tx.tx.body.messages.iter().enumerate() {
debug!("{index}: {:?}", msg.type_url)
}
}
}
// process the entire block as a transaction so that if anything fails,
// we won't end up with a corrupted storage.
let mut tx = self
.storage
.begin_processing_tx()
.await
.map_err(ScraperError::tx_begin_failure)?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
// if we have enabled persistence, do try to store the block information
if let Some(persistence) = &mut self.persistence {
persistence.persist_block(&full_info).await?;
}
// let the modules do whatever they want
// the ones wanting the full block:
for block_module in &mut self.block_modules {
block_module.handle_block(&full_info, &mut tx).await?;
block_module.handle_block(&full_info).await?;
}
// the ones wanting transactions:
for block_tx in full_info.transactions {
for tx_module in &mut self.tx_modules {
tx_module.handle_tx(&block_tx, &mut tx).await?;
}
// the ones concerned with individual messages
for (index, msg) in block_tx.tx.body.messages.iter().enumerate() {
for msg_module in &mut self.msg_modules {
if msg.type_url == msg_module.type_url() {
msg_module
.handle_msg(index, msg, &block_tx, &mut tx)
.await?
// the ones wanting transactions (assuming tx retrieval is enabled):
if let Some(tx_info) = &full_info.transactions {
for block_tx in tx_info {
for tx_module in &mut self.tx_modules {
tx_module.handle_tx(block_tx).await?;
}
// the ones concerned with individual messages
for (index, msg) in block_tx.tx.body.messages.iter().enumerate() {
for msg_module in &mut self.msg_modules {
if msg.type_url == msg_module.type_url() {
msg_module.handle_msg(index, msg, block_tx).await?
}
}
}
}
}
let commit_start = Instant::now();
tx.commit().await.map_err(ScraperError::tx_commit_failure)?;
crate::storage::helpers::log_db_operation_time("committing processing tx", commit_start);
self.last_processed_height = full_info.block.header.height.value() as u32;
self.last_processed_at = Instant::now();
if let Err(err) = self.maybe_prune_storage().await {
error!("failed to prune the storage: {err}");
}
Ok(())
}
@@ -284,62 +405,6 @@ where
Ok(())
}
#[instrument(skip(self))]
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = self.last_processed_height - keep_recent;
info!(
keep_recent,
oldest_to_keep = last_to_keep,
"pruning the storage"
);
let lowest: u32 = self
.storage
.lowest_block_height()
.await?
.unwrap_or_default()
.try_into()
.unwrap_or_default();
let to_prune = last_to_keep.saturating_sub(lowest);
match to_prune {
v if v > 1000 => warn!("approximately {v} blocks worth of data will be pruned"),
v if v > 100 => info!("approximately {v} blocks worth of data will be pruned"),
0 => trace!("no blocks to prune"),
v => debug!("approximately {v} blocks worth of data will be pruned"),
}
if to_prune == 0 {
self.last_pruned_height = self.last_processed_height;
return Ok(());
}
self.storage
.prune_storage(last_to_keep, self.last_processed_height)
.await?;
self.last_pruned_height = self.last_processed_height;
Ok(())
}
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
debug!("checking for storage pruning");
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= self.last_processed_height {
self.prune_storage().await?;
}
Ok(())
}
async fn next_incoming(&mut self, block: BlockToProcess) {
let height = block.height;
@@ -379,8 +444,10 @@ where
self.try_request_pending().await;
if self.pending_sync.is_empty() {
self.synced.notify_one();
if let Some(persistence) = &self.persistence
&& self.pending_sync.is_empty()
{
persistence.synced.notify_one();
}
}
@@ -410,7 +477,14 @@ where
assert!(self.pending_sync.is_empty());
info!("attempting to run startup resync...");
self.maybe_prune_storage().await?;
let Some(persistence) = self.persistence.as_mut() else {
// without data persistence, we're always starting from scratch
return Ok(());
};
persistence
.maybe_prune_storage(self.last_processed_height)
.await?;
let latest_block = self.rpc_client.current_block_height().await? as u32;
info!("obtained latest block height: {latest_block}");
@@ -419,10 +493,10 @@ where
info!("we have already processed some blocks in the past - attempting to resume...");
// in case we were offline for a while,
// make sure we don't request blocks we'd have to prune anyway
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let keep_recent = persistence.config.pruning_options.strategy_keep_recent();
let last_to_keep = latest_block - keep_recent;
if !self.config.pruning_options.strategy.is_nothing() {
if !persistence.config.pruning_options.strategy.is_nothing() {
self.last_processed_height = max(self.last_processed_height, last_to_keep);
}
@@ -440,7 +514,7 @@ where
// this is the first time starting up
if self.last_processed_height == 0 {
info!("this is the first time starting up");
let Some(starting_height) = self.config.explicit_starting_block_height else {
let Some(starting_height) = persistence.config.explicit_starting_block_height else {
info!("no starting block height set - will use the default behaviour");
// nothing to do
return Ok(());
@@ -451,7 +525,9 @@ where
self.rpc_client.earliest_available_block_height().await? as u32;
info!("earliest available block height: {earliest_available}");
if earliest_available > starting_height && self.config.use_best_effort_start_height {
if earliest_available > starting_height
&& persistence.config.use_best_effort_start_height
{
error!("the earliest available block is higher than the desired starting height");
return Err(ScraperError::BlocksUnavailable {
height: starting_height,
@@ -2,8 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::ScraperError;
use crate::helpers;
use std::collections::HashMap;
use std::collections::BTreeMap;
use tendermint::{Block, Hash, abci, block, tx};
use tendermint_rpc::endpoint::{block as block_endpoint, block_results, validators};
use tendermint_rpc::event::{Event, EventData};
@@ -28,9 +27,9 @@ pub struct ParsedTransactionResponse {
pub proof: Option<tx::Proof>,
pub parsed_messages: HashMap<usize, serde_json::Value>,
pub parsed_messages: BTreeMap<usize, serde_json::Value>,
pub parsed_message_urls: HashMap<usize, String>,
pub parsed_message_urls: BTreeMap<usize, String>,
pub block: Block,
}
@@ -41,32 +40,13 @@ pub struct FullBlockInformation {
pub block: Block,
/// All of the emitted events alongside any tx results.
pub results: block_results::Response,
pub results: Option<block_results::Response>,
/// Validator set for this particular block
pub validators: validators::Response,
pub validators: Option<validators::Response>,
/// Transaction results from this particular block
pub transactions: Vec<ParsedTransactionResponse>,
}
impl FullBlockInformation {
pub fn ensure_proposer(&self) -> Result<(), ScraperError> {
let block_proposer = self.block.header.proposer_address;
if !self
.validators
.validators
.iter()
.any(|v| v.address == block_proposer)
{
let proposer = helpers::validator_consensus_address(block_proposer)?;
return Err(ScraperError::BlockProposerNotInValidatorSet {
height: self.block.header.height.value() as u32,
proposer: proposer.to_string(),
});
}
Ok(())
}
pub transactions: Option<Vec<ParsedTransactionResponse>>,
}
pub(crate) struct BlockToProcess {
+8 -4
View File
@@ -172,16 +172,20 @@ pub enum ScraperError {
#[source]
error: base64::DecodeError,
},
#[error("no modules configured for the chain watcher")]
NoModulesConfigured,
#[error("no storage configured for the chain watcher")]
NoStorageConfigured,
}
impl ScraperError {
pub fn tx_begin_failure(source: NyxdScraperStorageError) -> ScraperError
where {
pub fn tx_begin_failure(source: NyxdScraperStorageError) -> ScraperError {
ScraperError::StorageTxBeginFailure { source }
}
pub fn tx_commit_failure(source: NyxdScraperStorageError) -> ScraperError
where {
pub fn tx_commit_failure(source: NyxdScraperStorageError) -> ScraperError {
ScraperError::StorageTxCommitFailure { source }
}
}
+2
View File
@@ -11,6 +11,8 @@ pub mod modules;
pub(crate) mod rpc_client;
pub(crate) mod scraper;
pub mod storage;
pub(crate) mod subscriber;
pub mod watcher;
pub use block_processor::pruning::{PruningOptions, PruningStrategy};
pub use block_processor::types::ParsedTransactionResponse;
@@ -3,14 +3,9 @@
use crate::block_processor::types::FullBlockInformation;
use crate::error::ScraperError;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
#[async_trait]
pub trait BlockModule {
async fn handle_block(
&mut self,
block: &FullBlockInformation,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
async fn handle_block(&mut self, block: &FullBlockInformation) -> Result<(), ScraperError>;
}
@@ -3,7 +3,6 @@
use crate::block_processor::types::ParsedTransactionResponse;
use crate::error::ScraperError;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
use cosmrs::Any;
@@ -16,6 +15,5 @@ pub trait MsgModule {
index: usize,
msg: &Any,
tx: &ParsedTransactionResponse,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -3,14 +3,9 @@
use crate::block_processor::types::ParsedTransactionResponse;
use crate::error::ScraperError;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
#[async_trait]
pub trait TxModule {
async fn handle_tx(
&mut self,
tx: &ParsedTransactionResponse,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
async fn handle_tx(&mut self, tx: &ParsedTransactionResponse) -> Result<(), ScraperError>;
}
+86 -21
View File
@@ -9,15 +9,32 @@ use crate::helpers::tx_hash;
use crate::{Any, MessageRegistry, default_message_registry};
use futures::StreamExt;
use futures::future::join3;
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::sync::Arc;
use tendermint::Hash;
use tendermint::{Block, Hash};
use tendermint_rpc::endpoint::{block, block_results, tx, validators};
use tendermint_rpc::{Client, HttpClient, Paging};
use tokio::sync::Mutex;
use tracing::{debug, instrument, warn};
use url::Url;
#[derive(Debug, Clone, Copy)]
pub struct RetrievalConfig {
pub get_validators: bool,
pub get_transactions: bool,
pub get_block_results: bool,
}
impl Default for RetrievalConfig {
fn default() -> Self {
Self {
get_validators: true,
get_transactions: true,
get_block_results: true,
}
}
}
#[derive(Clone)]
pub struct RpcClient {
// right now I don't care about anything nym specific, so a simple http client is sufficient,
@@ -53,27 +70,15 @@ impl RpcClient {
}
}
#[instrument(skip(self, block), fields(height = block.height))]
pub async fn try_get_full_details(
fn parse_transactions(
&self,
block: BlockToProcess,
) -> Result<FullBlockInformation, ScraperError> {
debug!("getting complete block details");
let height = block.height;
// make all the http requests concurrently
let (results, validators, raw_transactions) = join3(
self.get_block_results(height),
self.get_validators_details(height),
self.get_transaction_results(&block.block.data),
)
.await;
let raw_transactions = raw_transactions?;
raw_transactions: Vec<tx::Response>,
block: &Block,
) -> Result<Vec<ParsedTransactionResponse>, ScraperError> {
let mut transactions = Vec::with_capacity(raw_transactions.len());
for raw_tx in raw_transactions {
let mut parsed_messages = HashMap::new();
let mut parsed_message_urls = HashMap::new();
let mut parsed_messages = BTreeMap::new();
let mut parsed_message_urls = BTreeMap::new();
let tx = cosmrs::Tx::from_bytes(&raw_tx.tx).map_err(|source| {
ScraperError::TxParseFailure {
hash: raw_tx.hash,
@@ -97,9 +102,33 @@ impl RpcClient {
proof: raw_tx.proof,
parsed_messages,
parsed_message_urls,
block: block.block.clone(),
block: block.clone(),
})
}
Ok(transactions)
}
#[instrument(skip(self, block), fields(height = block.height))]
pub async fn try_get_full_details(
&self,
block: BlockToProcess,
config: RetrievalConfig,
) -> Result<FullBlockInformation, ScraperError> {
debug!("getting complete block details");
let height = block.height;
// make all the http requests run concurrently
let (results, validators, raw_transactions) = join3(
self.maybe_get_block_results(height, config.get_block_results),
self.maybe_get_validators_details(height, config.get_validators),
self.maybe_get_transaction_results(&block.block.data, config.get_transactions),
)
.await;
let transactions = match raw_transactions? {
Some(raw) => Some(self.parse_transactions(raw, &block.block)?),
None => None,
};
Ok(FullBlockInformation {
block: block.block,
@@ -140,6 +169,18 @@ impl RpcClient {
})
}
async fn maybe_get_block_results(
&self,
height: u32,
retrieve: bool,
) -> Result<Option<block_results::Response>, ScraperError> {
if retrieve {
self.get_block_results(height).await.map(Some)
} else {
Ok(None)
}
}
pub(crate) async fn current_block_height(&self) -> Result<u64, ScraperError> {
debug!("getting current block height");
@@ -196,6 +237,18 @@ impl RpcClient {
inner.into_values().collect()
}
async fn maybe_get_transaction_results(
&self,
raw: &[Vec<u8>],
retrieve: bool,
) -> Result<Option<Vec<tx::Response>>, ScraperError> {
if retrieve {
self.get_transaction_results(raw).await.map(Some)
} else {
Ok(None)
}
}
#[instrument(skip(self, tx_hash), fields(tx_hash = %tx_hash), err(Display))]
async fn get_transaction_result(&self, tx_hash: Hash) -> Result<tx::Response, ScraperError> {
debug!("getting tx results");
@@ -224,4 +277,16 @@ impl RpcClient {
source: Box::new(source),
})
}
async fn maybe_get_validators_details(
&self,
height: u32,
retrieve: bool,
) -> Result<Option<validators::Response>, ScraperError> {
if retrieve {
self.get_validators_details(height).await.map(Some)
} else {
Ok(None)
}
}
}
@@ -1,15 +1,15 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::PruningOptions;
use crate::block_processor::types::BlockToProcess;
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
use crate::block_processor::{BlockProcessor, BlockProcessorConfig, BlockProcessorPersistence};
use crate::block_requester::{BlockRequest, BlockRequester};
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::scraper::subscriber::ChainSubscriber;
use crate::storage::NyxdScraperStorage;
use crate::subscriber::ChainSubscriber;
use futures::future::join_all;
use std::marker::PhantomData;
use std::sync::Arc;
@@ -22,8 +22,6 @@ use tokio_util::task::TaskTracker;
use tracing::{error, info};
use url::Url;
mod subscriber;
#[derive(Default, Clone, Copy)]
pub struct StartingBlockOpts {
pub start_block_height: Option<u32>,
@@ -72,7 +70,7 @@ where
let (processing_tx, processing_rx) = unbounded_channel();
let (req_tx, req_rx) = channel(5);
let rpc_client = RpcClient::new(&scraper.config.rpc_url)?;
let rpc_client = scraper.rpc_client.clone();
// create the tasks
let block_requester = BlockRequester::new(
@@ -90,14 +88,19 @@ where
);
let mut block_processor = BlockProcessor::new(
block_processor_config,
scraper.cancel_token.clone(),
scraper.startup_sync.clone(),
processing_rx,
req_tx,
scraper.storage.clone(),
rpc_client,
)
.with_persistence(
BlockProcessorPersistence::new(
block_processor_config,
scraper.startup_sync.clone(),
scraper.storage.clone(),
)
.await?,
)
.await?;
block_processor.set_block_modules(self.block_modules);
block_processor.set_tx_modules(self.tx_modules);
@@ -126,16 +129,19 @@ where
}
}
#[must_use]
pub fn with_block_module<M: BlockModule + Send + 'static>(mut self, module: M) -> Self {
self.block_modules.push(Box::new(module));
self
}
#[must_use]
pub fn with_tx_module<M: TxModule + Send + 'static>(mut self, module: M) -> Self {
self.tx_modules.push(Box::new(module));
self
}
#[must_use]
pub fn with_msg_module<M: MsgModule + Send + 'static>(mut self, module: M) -> Self {
self.msg_modules.push(Box::new(module));
self
@@ -209,9 +215,12 @@ where
let (req_tx, _) = channel(5);
let mut block_processor = self
.new_block_processor(req_tx.clone(), processing_rx)
.await?
.with_pruning(PruningOptions::nothing());
.new_block_processor_with_persistence(
req_tx.clone(),
processing_rx,
PruningOptions::nothing(),
)
.await?;
let block = self.rpc_client.get_basic_block_details(height).await?;
@@ -234,9 +243,12 @@ where
let (req_tx, _) = channel(5);
let mut block_processor = self
.new_block_processor(req_tx.clone(), processing_rx)
.await?
.with_pruning(PruningOptions::nothing());
.new_block_processor_with_persistence(
req_tx.clone(),
processing_rx,
PruningOptions::nothing(),
)
.await?;
let mut current_height = self.rpc_client.current_block_height().await? as u32;
let last_processed = block_processor.last_process_height();
@@ -345,10 +357,24 @@ where
)
}
async fn new_block_processor(
fn new_block_processor(
&self,
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> BlockProcessor<S> {
BlockProcessor::<S>::new(
self.cancel_token.clone(),
processing_rx,
req_tx,
self.rpc_client.clone(),
)
}
async fn new_block_processor_with_persistence(
&self,
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
pruning_options: impl Into<Option<PruningOptions>>,
) -> Result<BlockProcessor<S>, ScraperError> {
let block_processor_config = BlockProcessorConfig::new(
self.config.pruning_options,
@@ -357,16 +383,27 @@ where
self.config.start_block.use_best_effort_start_height,
);
BlockProcessor::<S>::new(
block_processor_config,
self.cancel_token.clone(),
self.startup_sync.clone(),
processing_rx,
req_tx,
self.storage.clone(),
self.rpc_client.clone(),
)
.await
let persistence = match pruning_options.into() {
Some(options) => BlockProcessorPersistence::new(
block_processor_config,
self.startup_sync.clone(),
self.storage.clone(),
)
.await?
.with_pruning(options),
None => {
BlockProcessorPersistence::new(
block_processor_config,
self.startup_sync.clone(),
self.storage.clone(),
)
.await?
}
};
self.new_block_processor(req_tx, processing_rx)
.with_persistence(persistence)
.await
}
async fn new_chain_subscriber(
@@ -388,7 +425,9 @@ where
// create the tasks
let block_requester = self.new_block_requester(req_rx, processing_tx.clone());
let block_processor = self.new_block_processor(req_tx, processing_rx).await?;
let block_processor = self
.new_block_processor_with_persistence(req_tx, processing_rx, None)
.await?;
let chain_subscriber = self.new_chain_subscriber(processing_tx).await?;
// spawn them
+41 -15
View File
@@ -3,6 +3,7 @@
use crate::error::ScraperError;
use async_trait::async_trait;
use tendermint::block;
use thiserror::Error;
use tracing::warn;
@@ -89,6 +90,25 @@ pub trait NyxdScraperTransaction {
async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError>;
}
fn ensure_proposer_present(
block_header: &block::Header,
validators: &validators::Response,
) -> Result<(), ScraperError> {
let block_proposer = block_header.proposer_address;
if !validators
.validators
.iter()
.any(|v| v.address == block_proposer)
{
let proposer = crate::helpers::validator_consensus_address(block_proposer)?;
return Err(ScraperError::BlockProposerNotInValidatorSet {
height: block_header.height.value() as u32,
proposer: proposer.to_string(),
});
}
Ok(())
}
pub async fn persist_block<Tx>(
block: &FullBlockInformation,
tx: &mut Tx,
@@ -97,28 +117,34 @@ pub async fn persist_block<Tx>(
where
Tx: NyxdScraperTransaction,
{
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
// SANITY CHECK: make sure the block proposer is present in the validator set
block.ensure_proposer()?;
tx.persist_validators(&block.validators).await?;
let total_gas = match block.transactions.as_ref() {
Some(txs) => crate::helpers::tx_gas_sum(txs),
None => 0,
};
tx.persist_block_data(&block.block, total_gas).await?;
if store_precommits {
if let Some(commit) = &block.block.last_commit {
tx.persist_commits(commit, &block.validators).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
if let Some(validators) = &block.validators {
// SANITY CHECK: make sure the block proposer is present in the validator set
ensure_proposer_present(&block.block.header, validators)?;
tx.persist_validators(validators).await?;
if store_precommits {
if let Some(commit) = &block.block.last_commit {
tx.persist_commits(commit, validators).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
}
// persist txs
tx.persist_txs(&block.transactions).await?;
if let Some(transactions) = &block.transactions {
// persist txs
tx.persist_txs(transactions).await?;
// persist messages (inside the transactions)
tx.persist_messages(&block.transactions).await?;
// persist messages (inside the transactions)
tx.persist_messages(transactions).await?;
}
tx.update_last_processed(block.block.header.height.into())
.await?;

Some files were not shown because too many files have changed in this diff Show More