Compare commits

..

9 Commits

Author SHA1 Message Date
Jędrzej Stuczyński 0b97d2bf93 wip 2026-03-06 15:37:49 +00:00
Jędrzej Stuczyński 16928a48a9 bugfix: setting correct LpPeerConfig during handshake 2026-03-06 10:18:45 +00:00
Jędrzej Stuczyński 24891adddf additional wiring of nested connections control 2026-03-05 16:07:50 +00:00
Jędrzej Stuczyński 6a42a8dd49 chore: add unit test for mutual KKT 2026-03-05 10:56:26 +00:00
Jędrzej Stuczyński bc0c2e5d19 basic node<>node KKT 2026-03-05 10:38:13 +00:00
Jędrzej Stuczyński 225178f95a reorganise control structure 2026-03-04 16:58:49 +00:00
Jędrzej Stuczyński 32cfb3fff8 basic node LP handler 2026-03-04 15:59:40 +00:00
Jędrzej Stuczyński f62a74a6af wip 2026-03-04 13:48:11 +00:00
Jędrzej Stuczyński fe9275274c scaffolding persistent gateway connections 2026-03-04 13:47:53 +00:00
377 changed files with 14411 additions and 33918 deletions
+1 -1
View File
@@ -7,7 +7,7 @@ jobs:
build:
runs-on: arc-ubuntu-22.04
env:
NEXT_PUBLIC_SITE_URL: https://nym.com/docs
NEXT_PUBLIC_SITE_URL: https://nymtech.net/docs
defaults:
run:
working-directory: documentation/docs
-2
View File
@@ -48,8 +48,6 @@ jobs:
run: pnpm i
- name: Build project
run: pnpm run build
- name: Generate sitemap
run: npx next-sitemap
- name: Move files to /dist/
run: ../scripts/move-to-dist.sh
+1 -1
View File
@@ -8,7 +8,7 @@ on:
jobs:
sonarqube:
name: SonarQube
runs-on: arc-linux-latest
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
with:
+2
View File
@@ -2,6 +2,8 @@ name: nightly-build
on:
workflow_dispatch:
schedule:
- cron: '14 1 * * *'
jobs:
build:
@@ -9,7 +9,7 @@ on:
jobs:
integration-tests:
runs-on: arc-linux-latest
runs-on: ubuntu-latest
env:
API_BASE_URL: http://localhost:8000
+1 -1
View File
@@ -23,7 +23,7 @@ env:
jobs:
check-milestone:
name: Check Milestone
runs-on: arc-linux-latest
runs-on: ubuntu-latest
steps:
- if: github.event.pull_request.milestone == null && contains( env.LABELS, 'no-milestone' ) == false
run: exit 1
-3
View File
@@ -36,9 +36,6 @@ jobs:
with:
go-version: "1.24.6"
- name: Update root CA certificate bundle
run: ./wasm/mix-fetch/go-mix-conn/scripts/update-root-certs.sh
- name: Install dependencies
run: yarn
-1
View File
@@ -46,7 +46,6 @@ storybook-static
**/.DS_Store
cpu-cycles/libcpucycles/build
foxyfox.env
scratch.txt
.next
ppa-private-key.b64
-46
View File
@@ -4,52 +4,6 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2026.5-raclette] (2026-03-10)
- bugfix: correctly populate gateway probe LP data ([#6533])
- chore: introduce additional prometheus metrics for registration times ([#6532])
- bugfix: lp information to have proper snake_case on API endpoints ([#6531])
- removed redundant LP states ([#6509])
- chore: removed all matrix notifications from github actions ([#6495])
- feat: Lewes Protocol with PSQv2 ([#6491])
- build(deps): bump minimatch from 3.1.2 to 3.1.4 in /documentation/docs ([#6486])
- build(deps): bump bn.js from 4.12.2 to 4.12.3 in /documentation/docs ([#6484])
- build(deps): bump bn.js from 4.12.2 to 4.12.3 ([#6483])
- build(deps): bump ajv from 8.17.1 to 8.18.0 in /clients/native/examples/js-examples/websocket ([#6478])
- build(deps): bump ajv from 6.12.6 to 6.14.0 in /documentation/docs ([#6477])
- build(deps): bump minimatch and glob in /documentation/scripts/post-process ([#6476])
- build(deps): bump hono from 4.11.9 to 4.12.0 ([#6475])
- build(deps): bump keccak from 0.1.5 to 0.1.6 ([#6472])
- build(deps-dev): bump qs from 6.14.1 to 6.14.2 in /clients/native/examples/js-examples/websocket ([#6466])
- build(deps): bump mikefarah/yq from 4.52.2 to 4.52.4 ([#6465])
- Otel minimal v2 ([#6464])
- build(deps): bump qs and express in /wasm/client/internal-dev ([#6461])
- bugfix: restore 'latest_measurement' field for nym-node /verloc endpoint ([#6452])
- build(deps-dev): bump webpack from 5.77.0 to 5.104.1 in /wasm/node-tester/internal-dev ([#6451])
- Max/mixfetch concurrent test ([#6417])
[#6533]: https://github.com/nymtech/nym/pull/6533
[#6532]: https://github.com/nymtech/nym/pull/6532
[#6531]: https://github.com/nymtech/nym/pull/6531
[#6509]: https://github.com/nymtech/nym/pull/6509
[#6495]: https://github.com/nymtech/nym/pull/6495
[#6491]: https://github.com/nymtech/nym/pull/6491
[#6486]: https://github.com/nymtech/nym/pull/6486
[#6484]: https://github.com/nymtech/nym/pull/6484
[#6483]: https://github.com/nymtech/nym/pull/6483
[#6478]: https://github.com/nymtech/nym/pull/6478
[#6477]: https://github.com/nymtech/nym/pull/6477
[#6476]: https://github.com/nymtech/nym/pull/6476
[#6475]: https://github.com/nymtech/nym/pull/6475
[#6472]: https://github.com/nymtech/nym/pull/6472
[#6466]: https://github.com/nymtech/nym/pull/6466
[#6465]: https://github.com/nymtech/nym/pull/6465
[#6464]: https://github.com/nymtech/nym/pull/6464
[#6461]: https://github.com/nymtech/nym/pull/6461
[#6452]: https://github.com/nymtech/nym/pull/6452
[#6451]: https://github.com/nymtech/nym/pull/6451
[#6417]: https://github.com/nymtech/nym/pull/6417
## [2026.4-quark] (2026-02-24)
- Enhance CI workflow with feature inputs ([#6462])
Generated
+1374 -1908
View File
File diff suppressed because it is too large Load Diff
+8 -12
View File
@@ -157,8 +157,8 @@ members = [
"tools/internal/mixnet-connectivity-check",
# "tools/internal/sdk-version-bump",
"tools/internal/ssl-inject",
"tools/internal/localnet-orchestrator",
"tools/internal/localnet-orchestrator/dkg-bypass-contract",
"tools/internal/testnet-manager",
"tools/internal/testnet-manager/dkg-bypass-contract",
"tools/internal/validator-status-check",
"tools/nym-cli",
"tools/nym-id-cli",
@@ -171,10 +171,9 @@ members = [
"wasm/mix-fetch",
"wasm/node-tester",
"wasm/zknym-lib",
"nym-gateway-probe",
# "nym-gateway-probe",
"integration-tests",
"common/nym-kkt-ciphersuite",
"common/nym-kkt-context",
"common/nym-kkt-ciphersuite", "common/nym-kkt-context",
]
default-members = [
@@ -191,8 +190,7 @@ default-members = [
"service-providers/ip-packet-router",
"service-providers/network-requester",
"tools/nymvisor",
"nym-registration-client",
"tools/internal/localnet-orchestrator"
"nym-registration-client"
]
exclude = ["contracts", "nym-wallet", "cpu-cycles"]
@@ -234,7 +232,6 @@ bloomfilter = "3.0.1"
bs58 = "0.5.1"
bytecodec = "0.4.15"
bytes = "1.11.1"
cargo-edit = "0.13.8"
cargo_metadata = "0.19.2"
celes = "2.6.0"
cfg-if = "1.0.0"
@@ -350,8 +347,8 @@ si-scale = "0.2.3"
snow = "0.9.6"
sphinx-packet = "=0.6.0"
sqlx = "0.8.6"
strum = "0.28.0"
strum_macros = "0.28.0"
strum = "0.27.2"
strum_macros = "0.27.2"
subtle-encoding = "0.5"
syn = "2"
sysinfo = "0.37.0"
@@ -378,7 +375,7 @@ tracing-opentelemetry = "0.32.1"
tracing-subscriber = "0.3.20"
tracing-indicatif = "0.3.9"
tracing-test = "0.2.5"
ts-rs = "12.0.1"
ts-rs = "10.1.0"
tungstenite = { version = "0.20.1", default-features = false }
typed-builder = "0.23.0"
uniffi = "0.29.2"
@@ -441,7 +438,6 @@ nym-ecash-time = { version = "1.20.4", path = "common/ecash-time" }
nym-exit-policy = { version = "1.20.4", path = "common/exit-policy" }
nym-ffi-shared = { version = "1.20.4", path = "sdk/ffi/shared" }
nym-gateway-client = { version = "1.20.4", path = "common/client-libs/gateway-client", default-features = false }
nym-gateway-probe = { version = "1.18.0", path = "nym-gateway-probe" }
nym-gateway-requests = { version = "1.20.4", path = "common/gateway-requests" }
nym-gateway-storage = { version = "1.20.4", path = "common/gateway-storage" }
nym-gateway-stats-storage = { version = "1.20.4", path = "common/gateway-stats-storage" }
-3
View File
@@ -30,11 +30,8 @@ client ───► Gateway ──┘ mix │ mix ┌─►mix ───►
```
<!-- This is broken
[![Build Status](https://img.shields.io/github/actions/workflow/status/nymtech/nym/build.yml?branch=develop&style=for-the-badge&logo=github-actions)](https://github.com/nymtech/nym/actions?query=branch%3Adevelop)
-->
> This project integrates with the Midnight Network
### Building
+13 -89
View File
@@ -1,38 +1,32 @@
---
ansible_ssh_private_key_file: ~/.ssh/<SSH_KEY>
# nym_version: "v2025.21-mozzarella"
#
# NOTE:
# if you want to pin Nym to a specific version instead of using the
# latest release from GitHub in /tasks/main.yml then
# uncomment the line above and set the tag
cli_url: "https://github.com/nymtech/nym/releases/download/nym-binaries-{{ nym_version }}/nym-cli"
tunnel_manager_url: "https://github.com/nymtech/nym/raw/refs/heads/develop/scripts/nym-node-setup/network-tunnel-manager.sh"
quic_bridge_deployment_url: "https://raw.githubusercontent.com/nymtech/nym/refs/heads/develop/scripts/nym-node-setup/quic_bridge_deployment.sh"
###############################################################################
## GLOBAL VARS
## These values will be used globally unless overwritten per node in inventory/all
###############################################################################
# NOTE: These values will be used globally unless overwritten per node in inventory/all
ansible_user: root # used for ssh, like `ssh root@nym-exit.ch-1.mynodes.net`
email: "<EMAIL>" # used in certbot, description.toml and landing page
website: "<WEBSITE>" # it is used in the description.toml
description: "<NODE_PUBLIC_DESCRIPTION>" # or define per node in inventory/all
# operator_name: "<OPERATOR_NAME>" # used in landing page if provided
###############################################################################
## GLOBAL VARS
## These values will be used globally unless overwritten per node in inventory/all
## Set these vars only if you want them globally for all nodes
## Per node changes in inventory/all will overwrite these global vars
###############################################################################
# NOTE: Set these vars if you want them globally for all nodes
# Per node changes in inventory/all will overwrite these global ones:
hostname: "" # this is a fallback, keep it and setup hostname per node in inventory/all
# moniker: "<MONIKER>" # if not setup here not in inventory/all it get's derived from the hostname
# mode: <MODE> # entry-gateway/exit-gateway/mixnode
# wireguard_enabled: <WIREGUARD_ENABLED> # true/false
hostname: "" # this is a fallback, keep it and setup hostname per node in inventory/all
###############################################################################
## GLOBAL PACKAGES
## These will be installed during deployment
###############################################################################
# NOTE: Possible vars to incule on landing page, etc.
# operator_name: "<OPERATOR_NAME>"
packages:
- tmux
@@ -48,73 +42,3 @@ packages:
- jq
- wget
- ufw
###############################################################################
## OPTIONAL OVERRIDES
## All values below already have defaults in the playbook/roles
## Uncomment only if you want to override them
###############################################################################
###############################################################################
## SYSTEM MAINTENANCE PLAYBOOK KNOBS
###############################################################################
# nym_version: "v2025.21-mozzarella"
## NOTE:
## if you want to pin Nym to a specific version instead of using the
## latest release from GitHub in /tasks/main.yml then
## uncomment the line above and set the tag
###############################################################################
## SYSTEM MAINTENANCE PLAYBOOK KNOBS
###############################################################################
## JOURNALD LIMITS
# journald_system_max_use: "100M" # max persistent journal size
# journald_runtime_max_use: "50M" # max runtime journal size
# journald_system_max_file_size: "25M" # max single journal file
# journald_runtime_max_file_size: "10M" # max runtime journal file
# journald_max_retention_sec: "3day" # retention time
# journald_rate_limit_interval: "30s" # rate limit window
# journald_rate_limit_burst: "1000" # rate limit burst
## NYM-NODE LOG CONTROL
# nymnode_log_level_max: "warning" # drop INFO logs
# nymnode_rate_limit_interval: "30s" # per nym-node rate limit window
# nymnode_rate_limit_burst: "200" # per nym-node rate limit burst
## JOURNAL VACUUM TARGETS
# journal_vacuum_size: "100M"
# journal_vacuum_time: "3days"
## RSYSLOG
# disable_rsyslog: true
## FSTRIM SCHEDULE
# fstrim_every_calendar: "*:0/15" # Aggressive
# fstrim_every_calendar: "hourly" # Less aggressive
## OPTIONAL CLEANUPS
# enable_apt_cleanup: true
# enable_snap_cleanup: true
## WRITEBACK TUNING
# enable_writeback_tuning: true
# writeback_dirty_writeback_centisecs: 1500
# writeback_dirty_expire_centisecs: 6000
@@ -1,38 +0,0 @@
---
- name: Restrict logging, vacuum journals, and enable periodic trim
hosts: all
become: true
gather_facts: false
# global knobs - override in inventory/group_vars/host_vars as needed
vars:
journald_system_max_use: "100M"
journald_runtime_max_use: "50M"
journald_system_max_file_size: "25M"
journald_runtime_max_file_size: "10M"
journald_max_retention_sec: "3day"
journald_rate_limit_interval: "30s"
journald_rate_limit_burst: "1000"
# per nym-node rate limit + level cap
nymnode_log_level_max: "warning"
nymnode_rate_limit_interval: "30s"
nymnode_rate_limit_burst: "200"
# journal vacuum targets
journal_vacuum_size: "100M"
journal_vacuum_time: "3days"
# fstrim cadence (note: the systemd override uses cron-like calendar)
fstrim_every_calendar: "*:0/15"
roles:
- role: journald_limits
- role: nymnode_logging
- role: rsyslog_disable
- role: journal_vacuum
- role: classic_log_cleanup
- role: apt_cleanup
- role: snap_cleanup
- role: fstrim_15min
- role: report
@@ -1,21 +0,0 @@
---
- name: Clean apt cache
command: apt-get clean
ignore_errors: true
- name: Autoremove unused packages
command: apt-get -y autoremove
ignore_errors: true
- name: Remove apt lists to reclaim space (they will be re-fetched on update)
file:
path: /var/lib/apt/lists
state: absent
ignore_errors: true
- name: Recreate apt lists directory
file:
path: /var/lib/apt/lists
state: directory
mode: "0755"
ignore_errors: true
@@ -1,20 +0,0 @@
---
- name: Remove classic /var/log files if present (optional)
file:
path: "{{ item }}"
state: absent
loop:
- /var/log/syslog
- /var/log/syslog.1
- /var/log/kern.log
- /var/log/kern.log.1
- /var/log/auth.log
- /var/log/auth.log.1
- /var/log/ufw.log
- /var/log/ufw.log.1
ignore_errors: true
# This is best-effort and may still fail if other packages' postrotate scripts assume services exist.
- name: Force logrotate (best-effort)
command: "logrotate --force /etc/logrotate.conf"
ignore_errors: true
@@ -1,3 +0,0 @@
---
fstrim_timer_dropin_dir: "/etc/systemd/system/fstrim.timer.d"
fstrim_every_calendar: "*:0/15"
@@ -1,31 +0,0 @@
---
- name: Ensure systemd drop-in dir for fstrim.timer exists
file:
path: "{{ fstrim_timer_dropin_dir }}"
state: directory
mode: "0755"
- name: Override fstrim.timer schedule
copy:
dest: "{{ fstrim_timer_dropin_dir }}/override.conf"
mode: "0644"
content: |
[Timer]
OnCalendar=
OnCalendar={{ fstrim_every_calendar }}
Persistent=true
RandomizedDelaySec=0
- name: Reload systemd after fstrim override
systemd:
daemon_reload: true
- name: Enable and start fstrim timer
systemd:
name: fstrim.timer
enabled: true
state: started
- name: Run fstrim now (best-effort)
command: fstrim -av
ignore_errors: true
@@ -1,3 +0,0 @@
---
journal_vacuum_size: "100M"
journal_vacuum_time: "3days"
@@ -1,6 +0,0 @@
---
- name: Vacuum journal to size cap (hard)
command: "journalctl --vacuum-size={{ journal_vacuum_size }}"
- name: Vacuum journal older than retention window (time)
command: "journalctl --vacuum-time={{ journal_vacuum_time }}"
@@ -1,8 +0,0 @@
---
journald_system_max_use: "100M"
journald_runtime_max_use: "50M"
journald_system_max_file_size: "25M"
journald_runtime_max_file_size: "10M"
journald_max_retention_sec: "3day"
journald_rate_limit_interval: "30s"
journald_rate_limit_burst: "1000"
@@ -1,5 +0,0 @@
---
- name: Restart journald
systemd:
name: systemd-journald
state: restarted
@@ -1,20 +0,0 @@
---
- name: Configure journald limits (persistent, capped, rate-limited)
copy:
dest: /etc/systemd/journald.conf
mode: "0644"
content: |
[Journal]
Storage=persistent
Compress=yes
Seal=yes
SystemMaxUse={{ journald_system_max_use }}
RuntimeMaxUse={{ journald_runtime_max_use }}
SystemMaxFileSize={{ journald_system_max_file_size }}
RuntimeMaxFileSize={{ journald_runtime_max_file_size }}
MaxRetentionSec={{ journald_max_retention_sec }}
RateLimitIntervalSec={{ journald_rate_limit_interval }}
RateLimitBurst={{ journald_rate_limit_burst }}
notify: Restart journald
@@ -1,7 +0,0 @@
---
nymnode_log_level_max: "warning"
nymnode_rate_limit_interval: "30s"
nymnode_rate_limit_burst: "200"
nymnode_unit_name: "nym-node" # set to "nym-node.service" if your distro expects it
nymnode_dropin_dir: "/etc/systemd/system/nym-node.service.d"
nymnode_dropin_file: "10-logging.conf"
@@ -1,26 +0,0 @@
---
- name: Ensure systemd drop-in dir for nym-node exists
file:
path: "{{ nymnode_dropin_dir }}"
state: directory
mode: "0755"
- name: Cap nym-node logs + apply per-unit rate limiting
copy:
dest: "{{ nymnode_dropin_dir }}/{{ nymnode_dropin_file }}"
mode: "0644"
content: |
[Service]
LogLevelMax={{ nymnode_log_level_max }}
LogRateLimitIntervalSec={{ nymnode_rate_limit_interval }}
LogRateLimitBurst={{ nymnode_rate_limit_burst }}
- name: Reload systemd after nym-node drop-in
systemd:
daemon_reload: true
- name: Restart nym-node to apply new logging limits (best-effort)
systemd:
name: "{{ nymnode_unit_name }}"
state: restarted
ignore_errors: true
@@ -1,8 +0,0 @@
---
- name: Show journal disk usage
command: journalctl --disk-usage
register: journal_usage
changed_when: false
- debug:
var: journal_usage.stdout
@@ -1,13 +0,0 @@
---
- name: Stop/disable rsyslog if installed (best-effort)
systemd:
name: rsyslog
state: stopped
enabled: false
ignore_errors: true
- name: Remove rsyslog logrotate stanza if present (prevents logrotate failures)
file:
path: /etc/logrotate.d/rsyslog
state: absent
ignore_errors: true
@@ -1,10 +0,0 @@
---
- name: Remove disabled snap revisions (best-effort)
shell: |
set -euo pipefail
snap list --all | awk '/disabled/{print $1, $3}' | while read -r name rev; do
snap remove "$name" --revision="$rev" || true
done
args:
executable: /bin/bash
ignore_errors: true
+1 -1
View File
@@ -2,7 +2,7 @@
name = "nym-client-core"
version.workspace = true
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
edition = "2024"
edition = "2021"
rust-version = "1.85"
license.workspace = true
description = "Crate containing core client functionality and configs, used by all other Nym client implentations"
@@ -32,7 +32,6 @@ const DEFAULT_MIN_MIXNODE_PERFORMANCE: u8 = 50;
const DEFAULT_MIN_GATEWAY_PERFORMANCE: u8 = 50;
const DEFAULT_MAX_STARTUP_GATEWAY_WAITING_PERIOD: Duration = Duration::from_secs(70 * 60); // 70min -> full epoch (1h) + a bit of overhead
const DEFAULT_MAX_STARTUP_TOPOLOGY_WAITING_PERIOD: Duration = Duration::from_secs(70 * 60); // 70min -> full epoch (1h) + a bit of overhead
// Set this to a high value for now, so that we don't risk sporadic timeouts that might cause
// bought bandwidth tokens to not have time to be spent; Once we remove the gateway from the
@@ -556,11 +555,6 @@ pub struct Topology {
#[serde(with = "humantime_serde")]
pub max_startup_gateway_waiting_period: Duration,
/// Defines how long the client is going to wait on startup for minimal topology to become online,
/// before abandoning the procedure.
#[serde(with = "humantime_serde")]
pub max_startup_network_waiting_period: Duration,
/// Specifies a minimum performance of a mixnode that is used on route construction.
/// This setting is only applicable when `NymApi` topology is used.
pub minimum_mixnode_performance: u8,
@@ -589,7 +583,6 @@ impl Default for Topology {
topology_resolution_timeout: DEFAULT_TOPOLOGY_RESOLUTION_TIMEOUT,
disable_refreshing: false,
max_startup_gateway_waiting_period: DEFAULT_MAX_STARTUP_GATEWAY_WAITING_PERIOD,
max_startup_network_waiting_period: DEFAULT_MAX_STARTUP_TOPOLOGY_WAITING_PERIOD,
minimum_mixnode_performance: DEFAULT_MIN_MIXNODE_PERFORMANCE,
minimum_gateway_performance: DEFAULT_MIN_GATEWAY_PERFORMANCE,
use_extended_topology: false,
@@ -159,7 +159,6 @@ impl From<ConfigV6> for Config {
use_extended_topology: value.debug.topology.use_extended_topology,
ignore_egress_epoch_role: value.debug.topology.ignore_egress_epoch_role,
ignore_ingress_epoch_role: value.debug.topology.ignore_ingress_epoch_role,
..Default::default()
},
reply_surbs: ReplySurbs {
minimum_reply_surb_storage_threshold: value
@@ -160,10 +160,7 @@ where
)
.await?;
} else {
info!(
"registered with new gateway {} (under address {address}), but this will not be our default address",
gateway_details.gateway_id
);
info!("registered with new gateway {} (under address {address}), but this will not be our default address", gateway_details.gateway_id);
}
Ok(GatewayInfo {
@@ -4,13 +4,13 @@
use super::mix_traffic::ClientRequestSender;
use super::received_buffer::ReceivedBufferMessage;
use super::statistics_control::StatisticsControl;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
use crate::client::event_control::EventControl;
use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
use crate::client::real_messages_control;
@@ -52,12 +52,12 @@ use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_sphinx::receiver::{ReconstructedMessage, SphinxMessageReceiver};
use nym_statistics_common::clients::ClientStatsSender;
use nym_statistics_common::generate_client_stats_id;
use nym_task::ShutdownTracker;
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use nym_topology::HardcodedTopologyProvider;
use nym_task::ShutdownTracker;
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_validator_client::nym_api::NymApiClientExt;
use nym_validator_client::{UserAgent, nyxd::contract_traits::DkgQueryClient};
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
use rand::prelude::SliceRandom;
use rand::rngs::OsRng;
use rand::thread_rng;
@@ -220,7 +220,6 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
nym_api_urls: Option<Vec<nym_network_defaults::ApiUrl>>,
wait_for_gateway: bool,
wait_for_initial_topology: bool,
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
shutdown: Option<ShutdownTracker>,
@@ -251,7 +250,6 @@ where
dkg_query_client,
nym_api_urls: None,
wait_for_gateway: false,
wait_for_initial_topology: false,
custom_topology_provider: None,
custom_gateway_transceiver: None,
shutdown: None,
@@ -307,12 +305,6 @@ where
self
}
#[must_use]
pub fn with_wait_for_initial_topology(mut self, wait_for_initial_topology: bool) -> Self {
self.wait_for_initial_topology = wait_for_initial_topology;
self
}
#[must_use]
pub fn with_topology_provider(
mut self,
@@ -682,7 +674,6 @@ where
topology_accessor: TopologyAccessor,
local_gateway: NodeIdentity,
wait_for_gateway: bool,
wait_for_initial_topology: bool,
shutdown_tracker: &ShutdownTracker,
) -> Result<(), ClientCoreError> {
let topology_refresher_config =
@@ -703,46 +694,6 @@ where
tracing::info!("Obtaining initial network topology");
topology_refresher.try_refresh().await;
// 1. wait for the minimum topology (if applicable)
if topology_refresher
.ensure_topology_is_routable()
.await
.is_err()
&& wait_for_initial_topology
{
if let Err(err) = topology_refresher
.wait_for_initial_network(topology_config.max_startup_network_waiting_period)
.await
{
tracing::error!(
"the network did not come become online within the specified timeout: {err}"
);
return Err(err.into());
}
}
// 2. wait for our gateway (if applicable)
if topology_refresher
.ensure_contains_routable_egress(local_gateway)
.await
.is_err()
&& wait_for_gateway
{
if let Err(err) = topology_refresher
.wait_for_gateway(
local_gateway,
topology_config.max_startup_gateway_waiting_period,
)
.await
{
tracing::error!(
"the gateway did not come back online within the specified timeout: {err}"
);
return Err(err.into());
}
}
// 3. check if the topology is routable (in case we were NOT waiting for it)
if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
tracing::error!(
"The current network topology seem to be insufficient to route any packets through \
@@ -751,15 +702,30 @@ where
return Err(ClientCoreError::InsufficientNetworkTopology(err));
}
// 4. check if the gateway exists (in case we were NOT waiting for it)
let gateway_wait_timeout = if wait_for_gateway {
Some(topology_config.max_startup_gateway_waiting_period)
} else {
None
};
if let Err(err) = topology_refresher
.ensure_contains_routable_egress(local_gateway)
.await
{
tracing::error!(
"the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}"
);
return Err(err.into());
if let Some(waiting_timeout) = gateway_wait_timeout {
if let Err(err) = topology_refresher
.wait_for_gateway(local_gateway, waiting_timeout)
.await
{
tracing::error!(
"the gateway did not come back online within the specified timeout: {err}"
);
return Err(err.into());
}
} else {
tracing::error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
return Err(err.into());
}
}
if !topology_config.disable_refreshing {
@@ -1058,7 +1024,6 @@ where
shared_topology_accessor.clone(),
self_address.gateway(),
self.wait_for_gateway,
self.wait_for_initial_topology,
&shutdown_tracker.clone(),
)
.await?;
@@ -1230,11 +1195,9 @@ mod tests {
]);
assert_eq!(network_details.nym_api_urls.as_ref().unwrap().len(), 2);
assert!(
network_details.nym_api_urls.as_ref().unwrap()[1]
.front_hosts
.is_some()
);
assert!(network_details.nym_api_urls.as_ref().unwrap()[1]
.front_hosts
.is_some());
}
#[test]
@@ -1247,13 +1210,11 @@ mod tests {
assert_eq!(api_url.url, "https://nym-frontdoor.vercel.app/api/");
assert_eq!(api_url.front_hosts.as_ref().unwrap().len(), 2);
assert!(
api_url
.front_hosts
.as_ref()
.unwrap()
.contains(&"vercel.app".to_string())
);
assert!(api_url
.front_hosts
.as_ref()
.unwrap()
.contains(&"vercel.app".to_string()));
}
#[test]
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::{
client::replies::reply_storage::{CombinedReplyStorage, ReplyStorageBackend, fs_backend},
client::replies::reply_storage::{fs_backend, CombinedReplyStorage, ReplyStorageBackend},
config,
config::Config,
error::ClientCoreError,
@@ -10,7 +10,7 @@ use crate::{
use nym_bandwidth_controller::BandwidthController;
use nym_client_core_gateways_storage::OnDiskGatewaysDetails;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_validator_client::{QueryHttpRpcNyxdClient, nyxd};
use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient};
use std::{io, path::Path};
use time::OffsetDateTime;
use tracing::{error, info, trace};
@@ -24,9 +24,7 @@ async fn setup_fresh_backend<P: AsRef<Path>>(
let mut storage_backend = match fs_backend::Backend::init(db_path).await {
Ok(backend) => backend,
Err(err) => {
error!(
"setup_fresh_backend: Failed to setup persistent storage backend for our reply needs: {err}"
);
error!("setup_fresh_backend: Failed to setup persistent storage backend for our reply needs: {err}");
return Err(ClientCoreError::SurbStorageError {
source: Box::new(err),
});
@@ -95,9 +93,7 @@ pub async fn setup_fs_reply_surb_backend<P: AsRef<Path>>(
match fs_backend::Backend::try_load(db_path).await {
Ok(backend) => Ok(backend),
Err(err) => {
error!(
"setup_fs_reply_surb_backend: Failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future"
);
error!("setup_fs_reply_surb_backend: Failed to setup persistent storage backend for our reply needs: {err}. We're going to create a fresh database instead. This behaviour might change in the future");
archive_corrupted_database(db_path).await?;
setup_fresh_backend(db_path, surb_config).await
}
@@ -1,8 +1,8 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::error::ClientCoreError;
use nym_client_core_gateways_storage::{
ActiveGateway, GatewayPublishedData, GatewayRegistration, GatewaysDetailsStore,
@@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
pub mod v1_1_33 {
use crate::config::disk_persistence::CommonClientPaths;
use crate::config::disk_persistence::old_v1_1_33::CommonClientPathsV1_1_33;
use crate::config::disk_persistence::CommonClientPaths;
use crate::config::old_config_v1_1_33::OldGatewayEndpointConfigV1_1_33;
use crate::error::ClientCoreError;
@@ -11,8 +11,8 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::cover::generate_loop_cover_packet;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{ClientStatsSender, packet_statistics::PacketStatisticsEvent};
use rand::{CryptoRng, Rng, rngs::OsRng};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -20,10 +20,10 @@ use tokio::sync::mpsc::error::TrySendError;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{Sleep, sleep};
use tokio::time::{sleep, Sleep};
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::{Sleep, sleep};
use wasmtimer::tokio::{sleep, Sleep};
pub struct LoopCoverTrafficStream<R>
where
@@ -179,9 +179,7 @@ impl LoopCoverTrafficStream<OsRng> {
) {
Ok(topology) => topology,
Err(err) => {
warn!(
"We're not going to send any loop cover message this time, as the current topology seem to be invalid - {err}"
);
warn!("We're not going to send any loop cover message this time, as the current topology seem to be invalid - {err}");
return;
}
};
@@ -13,10 +13,10 @@ use crate::config::disk_persistence::ClientKeysPaths;
#[cfg(not(target_arch = "wasm32"))]
use nym_crypto::asymmetric::{ed25519, x25519};
#[cfg(not(target_arch = "wasm32"))]
use nym_pemstore::KeyPairPath;
#[cfg(not(target_arch = "wasm32"))]
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
#[cfg(not(target_arch = "wasm32"))]
use nym_pemstore::KeyPairPath;
#[cfg(not(target_arch = "wasm32"))]
use nym_sphinx::acknowledgements::AckKey;
// we have to define it as an async trait since wasm storage is async
@@ -4,8 +4,8 @@
use async_trait::async_trait;
use nym_credential_storage::storage::Storage as CredentialStorage;
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::GatewayClient;
use nym_gateway_client::error::GatewayClientError;
use nym_gateway_client::GatewayClient;
pub use nym_gateway_client::{GatewayPacketRouter, PacketRouter};
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
@@ -2,13 +2,13 @@
// SPDX-License-Identifier: Apache-2.0
use super::action_controller::{AckActionSender, Action};
use nym_statistics_common::clients::{ClientStatsSender, packet_statistics::PacketStatisticsEvent};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use futures::StreamExt;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::{
acknowledgements::{AckKey, identifier::recover_identifier},
chunking::fragment::{COVER_FRAG_ID, FragmentIdentifier},
acknowledgements::{identifier::recover_identifier, AckKey},
chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID},
};
use nym_task::ShutdownToken;
use std::sync::Arc;
@@ -3,11 +3,11 @@
use super::PendingAcknowledgement;
use crate::client::real_messages_control::acknowledgement_control::RetransmissionRequestSender;
use futures::StreamExt;
use futures::channel::mpsc;
use futures::StreamExt;
use nym_nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, QueueKey};
use nym_sphinx::Delay as SphinxDelay;
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_sphinx::Delay as SphinxDelay;
use nym_task::ShutdownToken;
use std::collections::HashMap;
use std::sync::Arc;
@@ -9,8 +9,8 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketType;
use nym_task::ShutdownToken;
use nym_task::connections::TransmissionLane;
use nym_task::ShutdownToken;
use rand::{CryptoRng, Rng};
use tracing::*;
@@ -16,10 +16,10 @@ use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::{
Delay as SphinxDelay,
acknowledgements::AckKey,
addressing::clients::Recipient,
chunking::fragment::{Fragment, FragmentIdentifier},
Delay as SphinxDelay,
};
use nym_statistics_common::clients::ClientStatsSender;
use rand::{CryptoRng, Rng};
@@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
use super::{
PendingAcknowledgement, RetransmissionRequestReceiver,
action_controller::{AckActionSender, Action},
PendingAcknowledgement, RetransmissionRequestReceiver,
};
use crate::client::real_messages_control::acknowledgement_control::PacketDestination;
use crate::client::real_messages_control::message_handler::{MessageHandler, PreparationError};
@@ -13,7 +13,7 @@ use futures::StreamExt;
use nym_sphinx::chunking::fragment::Fragment;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::{addressing::clients::Recipient, params::PacketType};
use nym_task::{ShutdownToken, connections::TransmissionLane};
use nym_task::{connections::TransmissionLane, ShutdownToken};
use rand::{CryptoRng, Rng};
use std::sync::{Arc, Weak};
use tracing::*;
@@ -1,10 +1,10 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use super::SentPacketNotificationReceiver;
use super::action_controller::{AckActionSender, Action};
use super::SentPacketNotificationReceiver;
use futures::StreamExt;
use nym_sphinx::chunking::fragment::{COVER_FRAG_ID, FragmentIdentifier};
use nym_sphinx::chunking::fragment::{FragmentIdentifier, COVER_FRAG_ID};
use tracing::*;
/// Module responsible for starting up retransmission timers.
@@ -10,17 +10,17 @@ use crate::client::replies::reply_controller::MaxRetransmissions;
use crate::client::replies::reply_storage::{ReceivedReplySurbsMap, SentReplyKeys, UsedSenderTags};
use crate::client::topology_control::{TopologyAccessor, TopologyReadPermit};
use nym_client_core_surb_storage::RetrievedReplySurb;
use nym_sphinx::Delay;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::anonymous_replies::requests::{AnonymousSenderTag, RepliableMessage, ReplyMessage};
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::chunking::fragment::{Fragment, FragmentIdentifier};
use nym_sphinx::message::NymMessage;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::preparer::{MessagePreparer, PreparedFragment};
use nym_task::ShutdownToken;
use nym_sphinx::Delay;
use nym_task::connections::TransmissionLane;
use nym_task::ShutdownToken;
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, Rng};
use std::collections::HashMap;
@@ -272,9 +272,7 @@ where
let primary_count = msg.required_packets(self.config.primary_packet_size);
let secondary_count = msg.required_packets(secondary_packet);
trace!(
"This message would require: {primary_count} primary packets or {secondary_count} secondary packets..."
);
trace!("This message would require: {primary_count} primary packets or {secondary_count} secondary packets...");
// if there would be no benefit in using the secondary packet - use the primary (duh)
if primary_count <= secondary_count {
trace!("so choosing primary for this message");
@@ -25,9 +25,9 @@ use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::ShutdownToken;
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use rand::{CryptoRng, Rng, rngs::OsRng};
use nym_task::ShutdownToken;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::sync::Arc;
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
@@ -17,11 +17,11 @@ use nym_sphinx::forwarding::packet::MixPacket;
use nym_sphinx::params::PacketSize;
use nym_sphinx::preparer::PreparedFragment;
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{ClientStatsSender, packet_statistics::PacketStatisticsEvent};
use nym_task::ShutdownToken;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_task::connections::{
ConnectionCommand, ConnectionCommandReceiver, ConnectionId, LaneQueueLengths, TransmissionLane,
};
use nym_task::ShutdownToken;
use rand::{CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
@@ -29,11 +29,11 @@ use std::time::Duration;
use tracing::*;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{Sleep, sleep};
use tokio::time::{sleep, Sleep};
// use nym_wasm_utils::console_log;
#[cfg(target_arch = "wasm32")]
use wasmtimer::tokio::{Sleep, sleep};
use wasmtimer::tokio::{sleep, Sleep};
mod sending_delay_controller;
/// Configurable parameters of the `OutQueueControl`
@@ -230,9 +230,7 @@ where
let (next_message, fragment_id, packet_size) = match next_message {
StreamMessage::Cover => {
let cover_traffic_packet_size = self.loop_cover_message_size();
trace!(
"the next loop cover message will be put in a {cover_traffic_packet_size} packet"
);
trace!("the next loop cover message will be put in a {cover_traffic_packet_size} packet");
// TODO for way down the line: in very rare cases (during topology update) we might have
// to wait a really tiny bit before actually obtaining the permit hence messing with our
@@ -246,9 +244,7 @@ where
) {
Ok(topology) => topology,
Err(err) => {
warn!(
"We're not going to send any loop cover message this time, as the current topology seem to be invalid - {err}"
);
warn!("We're not going to send any loop cover message this time, as the current topology seem to be invalid - {err}");
return;
}
};
@@ -440,7 +436,7 @@ where
}
}
if let Some(next_delay) = &mut self.next_delay {
if let Some(ref mut next_delay) = &mut self.next_delay {
// it is not yet time to return a message
if next_delay.as_mut().poll(cx).is_pending() {
return Poll::Pending;
@@ -1,7 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::helpers::{Instant, get_time_now};
use crate::client::helpers::{get_time_now, Instant};
use std::time::Duration;
// The minimum time between increasing the average delay between packets. If we hit the ceiling in
@@ -5,20 +5,20 @@ use crate::client::helpers::get_time_now;
use crate::client::replies::{
reply_controller::ReplyControllerSender, reply_storage::SentReplyKeys,
};
use futures::StreamExt;
use futures::channel::mpsc;
use futures::lock::Mutex;
use nym_crypto::Digest;
use futures::StreamExt;
use nym_crypto::asymmetric::x25519;
use nym_crypto::Digest;
use nym_gateway_client::MixnetMessageReceiver;
use nym_sphinx::anonymous_replies::requests::{
RepliableMessage, RepliableMessageContent, ReplyMessage, ReplyMessageContent,
};
use nym_sphinx::anonymous_replies::{SurbEncryptionKey, encryption_key::EncryptionKeyDigest};
use nym_sphinx::anonymous_replies::{encryption_key::EncryptionKeyDigest, SurbEncryptionKey};
use nym_sphinx::message::{NymMessage, PlainMessage};
use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
use nym_statistics_common::clients::{ClientStatsSender, packet_statistics::PacketStatisticsEvent};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use nym_task::ShutdownToken;
use std::collections::HashSet;
use std::sync::Arc;
@@ -78,19 +78,14 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
let fragment = match self.message_receiver.recover_fragment(fragment_data) {
Err(err) => {
warn!(
"failed to recover fragment from raw data: {err}. The whole underlying message might be corrupted and unrecoverable!"
);
warn!("failed to recover fragment from raw data: {err}. The whole underlying message might be corrupted and unrecoverable!");
return None;
}
Ok(frag) => frag,
};
if self.recently_reconstructed.contains(&fragment.id()) {
debug!(
"Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost",
fragment.id()
);
debug!("Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost", fragment.id());
return None;
}
@@ -98,9 +93,7 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
match self.message_receiver.insert_new_fragment(fragment) {
Err(err) => match err {
MessageRecoveryError::MalformedReconstructedMessage { source, used_sets } => {
error!(
"message reconstruction failed - {source}. Attempting to re-use the message sets..."
);
error!("message reconstruction failed - {source}. Attempting to re-use the message sets...");
// TODO: should we really insert reconstructed sets? could this be abused for some attack?
for set_id in used_sets {
if !self.recently_reconstructed.insert(set_id) {
@@ -151,9 +144,7 @@ impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
&mut raw_fragment,
) {
Err(err) => {
warn!(
"failed to recover fragment data: {err}. The whole underlying message might be corrupted and unrecoverable!"
);
warn!("failed to recover fragment data: {err}. The whole underlying message might be corrupted and unrecoverable!");
return None;
}
Ok(frag_data) => frag_data,
@@ -284,9 +275,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
RepliableMessageContent::Heartbeat(content) => {
let additional_reply_surbs = content.additional_reply_surbs;
error!(
"received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)"
);
error!("received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)");
(additional_reply_surbs, false)
}
RepliableMessageContent::DataV2(content) => {
@@ -315,9 +304,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
}
RepliableMessageContent::HeartbeatV2(content) => {
let additional_reply_surbs = content.additional_reply_surbs;
error!(
"received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)"
);
error!("received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)");
(additional_reply_surbs, false)
}
};
@@ -393,9 +380,7 @@ impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
if let Some(sender) = &inner_guard.message_sender {
trace!("Sending reconstructed messages to announced sender");
if let Err(err) = sender.unbounded_send(reconstructed_messages) {
warn!(
"The reconstructed message receiver went offline without explicit notification (relevant error: - {err})"
);
warn!("The reconstructed message receiver went offline without explicit notification (relevant error: - {err})");
inner_guard.message_sender = None;
inner_guard.messages.extend(err.into_inner());
}
@@ -5,15 +5,15 @@ use crate::client::real_messages_control::acknowledgement_control::PendingAcknow
use crate::client::real_messages_control::message_handler::{
FragmentWithMaxRetransmissions, MessageHandler, PreparationError,
};
use crate::client::replies::reply_controller::Config;
use crate::client::replies::reply_controller::key_rotation_helpers::SurbRefreshState;
use crate::client::replies::reply_controller::Config;
use crate::client::topology_control::TopologyAccessor;
use crate::client::transmission_buffer::TransmissionBuffer;
use futures::channel::oneshot;
use nym_client_core_surb_storage::{ReceivedReplySurb, ReceivedReplySurbsMap};
use nym_crypto::aes::cipher::crypto_common::rand_core::CryptoRng;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::chunking::fragment::FragmentIdentifier;
use nym_task::connections::{ConnectionId, TransmissionLane};
use nym_topology::NymTopologyMetadata;
@@ -50,9 +50,7 @@ impl SenderData {
let pending_retransmissions = self.pending_retransmissions.len();
let total_pending = pending_retransmissions + pending_replies;
debug!(
"total queue size: {total_pending} = pending data {pending_replies} + pending retransmission {pending_retransmissions}"
);
debug!("total queue size: {total_pending} = pending data {pending_replies} + pending retransmission {pending_retransmissions}");
total_pending
}
@@ -202,9 +200,7 @@ where
let total_required_surbs = total_queue + target_surbs_after_clearing_queue;
let total_available_surbs = pending_surbs + available_surbs;
debug!(
"available surbs: {available_surbs} pending surbs: {pending_surbs} threshold range: {min_surbs_threshold}..+{min_surbs_threshold_buffer}..{max_surbs_threshold}"
);
debug!("available surbs: {available_surbs} pending surbs: {pending_surbs} threshold range: {min_surbs_threshold}..+{min_surbs_threshold_buffer}..{max_surbs_threshold}");
// We should request more surbs if:
// 1. We haven't hit the maximum surb threshold, and
@@ -229,13 +225,9 @@ where
.is_none()
{
// don't report it every single time
warn!(
"received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!"
);
warn!("received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!");
} else {
trace!(
"received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!"
);
trace!("received reply request for {recipient_tag} but we don't have any surbs stored for that recipient!");
}
return;
}
@@ -391,9 +383,7 @@ where
let (surbs_for_reply, _) = self.surbs_storage.get_reply_surbs(&target, to_take.len());
let Some(surbs_for_reply) = surbs_for_reply else {
error!(
"somehow different task has stolen our reply surbs! - this should have been impossible"
);
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
self.re_insert_pending_retransmission(&target, to_take);
return;
};
@@ -469,9 +459,7 @@ where
.get_reply_surbs(&target, to_send_clone.len());
let Some(surbs_for_reply) = surbs_for_reply else {
error!(
"somehow different task has stolen our reply surbs! - this should have been impossible"
);
error!("somehow different task has stolen our reply surbs! - this should have been impossible");
self.re_insert_pending_replies(&target, to_send);
return;
};
@@ -555,9 +543,7 @@ where
let ack_ref = match timed_out_ack.upgrade() {
Some(ack) => ack,
None => {
debug!(
"we received the ack for one of the reply packets as we were putting it in the retransmission queue"
);
debug!("we received the ack for one of the reply packets as we were putting it in the retransmission queue");
return;
}
};
@@ -671,13 +657,9 @@ where
// only log at higher level if it's the first time this error has occurred in a while
if now - last_failure > time::Duration::seconds(30) {
warn!(
"failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}"
)
warn!("failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}")
} else {
debug!(
"failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}"
)
debug!("failed to request more surbs to clear pending queue of size {total_queue} (attempted to request: {request_size}): {err}")
}
}
}
@@ -699,10 +681,7 @@ where
.surbs_storage
.surbs_last_received_at(pending_reply_target)
else {
error!(
"we have {} pending replies for {pending_reply_target}, but we somehow never received any reply surbs from them!",
retransmission_buf.total_size()
);
error!("we have {} pending replies for {pending_reply_target}, but we somehow never received any reply surbs from them!", retransmission_buf.total_size());
to_remove.push(*pending_reply_target);
continue;
};
@@ -723,9 +702,7 @@ where
// if client is offline)
if vals.current_clear_rerequest_counter > max_rerequests {
to_remove.push(*pending_reply_target);
debug!(
"we have reached the maximum threshold of attempting to request surbs from {pending_reply_target}. dropping the sender"
);
debug!("we have reached the maximum threshold of attempting to request surbs from {pending_reply_target}. dropping the sender");
continue;
}
@@ -733,10 +710,7 @@ where
if diff > max_drop_wait {
to_remove.push(*pending_reply_target)
} else {
debug!(
"We haven't received any surbs in {} from {pending_reply_target}. Going to explicitly ask for more",
humantime::format_duration(diff.unsigned_abs())
);
debug!("We haven't received any surbs in {} from {pending_reply_target}. Going to explicitly ask for more", humantime::format_duration(diff.unsigned_abs()));
vals.increment_current_clear_rerequest_counter();
to_request.push(*pending_reply_target);
}
@@ -4,8 +4,8 @@
use crate::client::real_messages_control::acknowledgement_control::PendingAcknowledgement;
use futures::channel::{mpsc, oneshot};
use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag;
use nym_sphinx::anonymous_replies::ReplySurbWithKeyRotation;
use nym_task::connections::{ConnectionId, TransmissionLane};
use std::sync::Weak;
@@ -43,9 +43,7 @@ where
// 1. check whether we sent any surbs in the past to this recipient, otherwise
// they have no business in asking for more
if !self.tags_storage.exists(&recipient) {
warn!(
"{recipient} asked us for reply SURBs even though we never sent them any anonymous messages before!"
);
warn!("{recipient} asked us for reply SURBs even though we never sent them any anonymous messages before!");
return;
}
@@ -56,12 +54,7 @@ where
.reply_surbs
.maximum_allowed_reply_surb_request_size
{
warn!(
"The requested reply surb amount is larger than our maximum allowed ({amount} > {}). Lowering it to a more sane value...",
self.config
.reply_surbs
.maximum_allowed_reply_surb_request_size
);
warn!("The requested reply surb amount is larger than our maximum allowed ({amount} > {}). Lowering it to a more sane value...", self.config.reply_surbs.maximum_allowed_reply_surb_request_size);
amount = self
.config
.reply_surbs
@@ -23,7 +23,7 @@ use nym_sphinx::addressing::Recipient;
use nym_statistics_common::clients::{
ClientStatsController, ClientStatsReceiver, ClientStatsSender,
};
use nym_task::{ShutdownToken, ShutdownTracker, connections::TransmissionLane};
use nym_task::{connections::TransmissionLane, ShutdownToken, ShutdownTracker};
use std::time::Duration;
/// Time interval between reporting statistics locally (logging/shutdown_token)
@@ -5,8 +5,8 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_topology::{NymRouteProvider, NymTopology, NymTopologyError, NymTopologyMetadata};
use nym_validator_client::models::KeyRotationId;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::{Notify, RwLock, RwLockReadGuard};
#[derive(Debug)]
@@ -63,9 +63,7 @@ impl TopologyRefresher {
trace!("Refreshing the topology");
if self.topology_accessor.controlled_manually() {
info!(
"topology is being controlled manually - we're going to wait until the control is released..."
);
info!("topology is being controlled manually - we're going to wait until the control is released...");
self.topology_accessor
.wait_for_released_manual_control()
.await;
@@ -140,35 +138,6 @@ impl TopologyRefresher {
}
}
pub async fn wait_for_initial_network(
&mut self,
timeout_duration: Duration,
) -> Result<(), NymTopologyError> {
info!(
"going to wait for at most {timeout_duration:?} for initial network to become online"
);
let deadline = sleep(timeout_duration);
tokio::pin!(deadline);
loop {
tokio::select! {
_ = &mut deadline => {
return Err(NymTopologyError::TimedOutWaitingForTopology)
}
_ = self.try_refresh() => {
if let Err(err) = self.ensure_topology_is_routable().await {
info!("network is still not routable...: {err}");
} else {
return Ok(())
}
sleep(self.refresh_rate).await
}
}
}
}
// it's perfectly fine if task is interrupted mid-refresh
// there's no data to persist or send over
pub async fn run(&mut self) {
@@ -3,8 +3,8 @@
use async_trait::async_trait;
use nym_mixnet_contract_common::EpochRewardedSet;
use nym_topology::NymTopology;
use nym_topology::provider_trait::{ToTopologyMetadata, TopologyProvider};
use nym_topology::NymTopology;
use nym_validator_client::nym_api::NymApiClientExt;
use rand::prelude::SliceRandom;
use rand::thread_rng;
@@ -82,9 +82,7 @@ impl NymApiTopologyProvider {
fn use_next_nym_api(&mut self) {
if self.nym_api_urls.len() == 1 {
warn!(
"There's only a single nym API available - it won't be possible to use a different one"
);
warn!("There's only a single nym API available - it won't be possible to use a different one");
return;
}
@@ -157,10 +155,7 @@ impl NymApiTopologyProvider {
let mixnodes = mixnodes_res.nodes;
if !gateways_res.metadata.consistency_check(&metadata) {
warn!(
"inconsistent nodes metadata between mixnodes and gateways calls! {metadata:?} and {:?}",
gateways_res.metadata
);
warn!("inconsistent nodes metadata between mixnodes and gateways calls! {metadata:?} and {:?}", gateways_res.metadata);
return None;
}
@@ -1,11 +1,11 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::helpers::{Instant, get_time_now};
use crate::client::helpers::{get_time_now, Instant};
use crate::client::real_messages_control::real_traffic_stream::RealMessage;
use nym_sphinx::chunking::fragment::Fragment;
use nym_task::connections::TransmissionLane;
use rand::{Rng, seq::SliceRandom};
use rand::{seq::SliceRandom, Rng};
use std::{
collections::{HashMap, HashSet, VecDeque},
time::Duration,
+5 -13
View File
@@ -7,9 +7,9 @@ use nym_gateway_client::error::GatewayClientError;
use nym_task::RegistryAccessError;
use nym_topology::node::RoutingNodeError;
use nym_topology::{NodeId, NymTopologyError};
use nym_validator_client::ValidatorClientError;
use nym_validator_client::nym_api::error::NymAPIError;
use nym_validator_client::nyxd::error::NyxdError;
use nym_validator_client::ValidatorClientError;
use rand::distributions::WeightedError;
use std::error::Error;
use std::path::PathBuf;
@@ -56,9 +56,7 @@ pub enum ClientCoreError {
#[error("no gateways on network")]
NoGatewaysOnNetwork,
#[error(
"there are no more new gateways on the network - it seems this client has already registered with all nodes it could have"
)]
#[error("there are no more new gateways on the network - it seems this client has already registered with all nodes it could have")]
NoNewGatewaysAvailable,
#[error("list of nym apis is empty")]
@@ -129,9 +127,7 @@ pub enum ClientCoreError {
#[error("unexpected exit")]
UnexpectedExit,
#[error(
"this operation would have resulted in the gateway {gateway_id:?} key being overwritten without permission"
)]
#[error("this operation would have resulted in the gateway {gateway_id:?} key being overwritten without permission")]
ForbiddenGatewayKeyOverwrite { gateway_id: String },
#[error(
@@ -155,9 +151,7 @@ pub enum ClientCoreError {
#[error("attempted to obtain fresh gateway details whilst already knowing about one")]
UnexpectedGatewayDetails,
#[error(
"the provided gateway details (for gateway {gateway_id}) do not correspond to the shared keys"
)]
#[error("the provided gateway details (for gateway {gateway_id}) do not correspond to the shared keys")]
MismatchedGatewayDetails { gateway_id: String },
#[error("unable to upgrade config file from `{current_version}`")]
@@ -233,9 +227,7 @@ pub enum ClientCoreError {
source: url::ParseError,
},
#[error(
"this client (id: '{client_id}') has already been initialised before. If you want to add additional gateway, use `add-gateway` command"
)]
#[error("this client (id: '{client_id}') has already been initialised before. If you want to add additional gateway, use `add-gateway` command")]
AlreadyInitialised { client_id: String },
#[error("this client has already registered with gateway {gateway_id}")]
+5 -5
View File
@@ -5,13 +5,13 @@ use crate::error::ClientCoreError;
use crate::init::types::RegistrationResult;
use futures::{SinkExt, StreamExt};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::GatewayClient;
use nym_gateway_client::client::GatewayListeners;
use nym_gateway_client::GatewayClient;
use nym_topology::node::RoutingNode;
use nym_validator_client::UserAgent;
use nym_validator_client::client::{IdentityKeyRef, NymApiClientExt};
use nym_validator_client::nym_nodes::SkimmedNodesWithMetadata;
use rand::{Rng, seq::SliceRandom};
use nym_validator_client::UserAgent;
use rand::{seq::SliceRandom, Rng};
#[cfg(unix)]
use std::os::fd::RawFd;
use std::{sync::Arc, time::Duration};
@@ -28,10 +28,10 @@ use nym_wasm_utils::websocket::JSWebsocket;
#[cfg(not(target_arch = "wasm32"))]
use tokio::net::TcpStream;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
#[cfg(target_arch = "wasm32")]
use wasmtimer::std::Instant;
+1 -1
View File
@@ -7,8 +7,8 @@ use crate::client::base_client::storage::helpers::{
has_gateway_details, load_active_gateway_details, load_client_keys, load_gateway_details,
store_gateway_details, update_stored_published_data_gateway,
};
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::error::ClientCoreError;
use crate::init::helpers::{
choose_gateway_by_latency, get_specified_gateway, uniformly_random_gateway,
+2 -2
View File
@@ -1,8 +1,8 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::config::Config;
use crate::error::ClientCoreError;
use crate::init::{setup_gateway, use_loaded_gateway_details};
@@ -10,8 +10,8 @@ use nym_client_core_gateways_storage::{
GatewayRegistration, GatewaysDetailsStore, RemoteGatewayDetails,
};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::SharedSymmetricKey;
use nym_gateway_client::client::{GatewayListeners, InitGatewayClient};
use nym_gateway_client::SharedSymmetricKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_topology::node::RoutingNode;
use nym_validator_client::client::IdentityKey;
-1
View File
@@ -1,4 +1,3 @@
#![allow(deprecated)] // silences clippy warning: use of deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::clone_from_slice`: please upgrade to generic-array 1.x - TODO
use std::future::Future;
#[cfg(all(
@@ -1,7 +1,6 @@
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: use of deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::from_exact_iter`: please upgrade to generic-array 1.x - TODO
pub use backend::*;
pub use combined::CombinedReplyStorage;
pub use key_storage::SentReplyKeys;
@@ -130,7 +130,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
let req = QueryBalanceRequest {
address: address.to_string(),
denom: search_denom,
denom: search_denom.to_string(),
};
let res = self
@@ -199,18 +199,6 @@ impl NyxdClient<HttpClient, DirectSecp256k1HdWallet> {
let wallet = DirectSecp256k1HdWallet::checked_from_mnemonic(prefix, mnemonic)?;
Ok(Self::connect_with_signer(config, client, wallet))
}
pub fn connect_with_mnemonic_and_network_details<U>(
endpoint: U,
network_details: NymNetworkDetails,
mnemonic: bip39::Mnemonic,
) -> Result<DirectSigningHttpRpcNyxdClient, NyxdError>
where
U: TryInto<HttpClientUrl, Error = TendermintRpcError>,
{
let config = Config::try_from_nym_network_details(&network_details)?;
Self::connect_with_mnemonic(config, endpoint, mnemonic)
}
}
#[allow(deprecated)]
@@ -1,8 +1,6 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(clippy::derivable_impls)]
// MAX: surpressing warning for the moment, will be dealt with in a different PR (TODO)
use cosmwasm_schema::cw_serde;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
-5
View File
@@ -4,7 +4,6 @@
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use strum::IntoEnumIterator as _;
use thiserror::Error;
use time::{Date, OffsetDateTime};
@@ -316,10 +315,6 @@ impl TicketType {
_ => Err(UnknownTicketType),
}
}
pub fn exposed_iter() -> impl Iterator<Item = TicketType> {
TicketType::iter()
}
}
impl From<TicketType> for TicketTypeRepr {
-2
View File
@@ -1,8 +1,6 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated associated function `generic_array::GenericArray::<T, N>::from_exact_iter`: please upgrade to generic-array 1.x - TODO
#[cfg(feature = "asymmetric")]
pub mod asymmetric;
pub mod bech32_address_validation;
-1
View File
@@ -1,6 +1,5 @@
// Copyright 2020-2022 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::clone_from_slice`: please upgrade to generic-array 1.x - TODO
pub use nym_crypto::generic_array;
use nym_crypto::OutputSizeUser;
-3
View File
@@ -1,9 +1,6 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)]
// silences clippy warning: use of deprecated tuple variant `HttpClientError::GenericRequestFailure`: use another more strongly typed variant - this variant is only left for compatibility reasons - TODO
//! Nym HTTP API Client
//!
//! Centralizes and implements the core API client functionality. This crate provides custom,
+5 -11
View File
@@ -22,16 +22,6 @@ pub struct ChainDetails {
pub stake_denom: DenomDetailsOwned,
}
impl ChainDetails {
pub fn mainnet() -> Self {
ChainDetails {
bech32_account_prefix: mainnet::BECH32_PREFIX.into(),
mix_denom: mainnet::MIX_DENOM.into(),
stake_denom: mainnet::STAKE_DENOM.into(),
}
}
}
#[derive(Clone, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize, JsonSchema)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct NymContracts {
@@ -191,7 +181,11 @@ impl NymNetworkDetails {
// Consider caching this process (lazy static)
NymNetworkDetails {
network_name: mainnet::NETWORK_NAME.into(),
chain_details: ChainDetails::mainnet(),
chain_details: ChainDetails {
bech32_account_prefix: mainnet::BECH32_PREFIX.into(),
mix_denom: mainnet::MIX_DENOM.into(),
stake_denom: mainnet::STAKE_DENOM.into(),
},
endpoints: mainnet::validators(),
contracts: NymContracts {
mixnet_contract_address: parse_optional_str(mainnet::MIXNET_CONTRACT_ADDRESS),
+9 -9
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::LpError;
use crate::packet::{EncryptedLpPacket, InnerHeader, LpFrame, LpHeader, LpPacket};
use crate::packet::{EncryptedLpPacket, InnerHeader, LpHeader, LpMessage, LpPacket};
use bytes::BytesMut;
use libcrux_psq::Channel;
@@ -46,9 +46,9 @@ pub(crate) fn encrypt_lp_packet(
packet: LpPacket,
transport: &mut libcrux_psq::session::Transport,
) -> Result<EncryptedLpPacket, LpError> {
let mut plaintext = BytesMut::with_capacity(InnerHeader::SIZE + packet.frame().len());
let mut plaintext = BytesMut::with_capacity(InnerHeader::SIZE + packet.message().len());
packet.header().inner.encode(&mut plaintext);
packet.frame().encode(&mut plaintext);
packet.message().encode(&mut plaintext);
let ciphertext = encrypt_data(plaintext.as_ref(), transport)?;
@@ -67,14 +67,14 @@ pub(crate) fn decrypt_lp_packet(
let inner_header = InnerHeader::parse(&plaintext)?;
let payload = &plaintext[InnerHeader::SIZE..];
let frame = LpFrame::decode(payload)?;
let message = LpMessage::decode(payload)?;
Ok(LpPacket::new(
LpHeader {
outer: packet.outer_header(),
inner: inner_header,
},
frame,
message,
))
}
@@ -82,7 +82,7 @@ pub(crate) fn decrypt_lp_packet(
mod tests {
use crate::LpError;
use crate::codec::{decrypt_data, decrypt_lp_packet, encrypt_data, encrypt_lp_packet};
use crate::packet::{EncryptedLpPacket, LpFrame, LpHeader, LpPacket};
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
use crate::peer::mock_peers;
use crate::psq::initiator::{build_psq_ciphersuite, build_psq_principal};
use crate::psq::{PSQ_MSG2_SIZE, psq_msg1_size, responder};
@@ -261,7 +261,7 @@ mod tests {
// happy path
let packet = LpPacket::new(
LpHeader::new(123, 0, 1),
LpFrame::new_opaque(b"foomp".to_vec()),
LpMessage::new_opaque(b"foomp".to_vec()),
);
let ciphertext = encrypt_lp_packet(packet.clone(), &mut init_transport).unwrap();
@@ -273,7 +273,7 @@ mod tests {
// incomplete ciphertext
let packet = LpPacket::new(
LpHeader::new(123, 1, 1),
LpFrame::new_opaque(b"foomp".to_vec()),
LpMessage::new_opaque(b"foomp".to_vec()),
);
let ciphertext2 = encrypt_lp_packet(packet, &mut init_transport).unwrap();
let l = ciphertext2.ciphertext().len();
@@ -285,7 +285,7 @@ mod tests {
// too small buffer
let packet = LpPacket::new(
LpHeader::new(123, 1, 1),
LpFrame::new_opaque(b"foomp".to_vec()),
LpMessage::new_opaque(b"foomp".to_vec()),
);
let ciphertext3 = encrypt_lp_packet(packet, &mut resp_transport).unwrap();
let malformed = EncryptedLpPacket::new(ciphertext3.outer_header(), vec![]);
+4 -4
View File
@@ -11,8 +11,8 @@ pub enum MalformedLpPacketError {
#[error("provided insufficient data to fully deserialise the struct")]
InsufficientData,
#[error("{0} is not a valid LpFrameKind value")]
InvalidLpFrameKind(u16),
#[error("{0} is not a valid LpDataKind")]
InvalidLpDataKind(u16),
#[error("invalid payload size: expected {expected}, got {actual}")]
InvalidPayloadSize { expected: usize, actual: usize },
@@ -27,7 +27,7 @@ pub enum MalformedLpPacketError {
}
impl MalformedLpPacketError {
pub fn invalid_data_kind(frame_kind: u16) -> Self {
MalformedLpPacketError::InvalidLpFrameKind(frame_kind)
pub fn invalid_data_kind(message_type: u16) -> Self {
MalformedLpPacketError::InvalidLpDataKind(message_type)
}
}
@@ -7,32 +7,32 @@ use num_enum::{IntoPrimitive, TryFromPrimitive};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
#[derive(Debug, Clone, PartialEq)]
pub struct LpFrameHeader {
pub kind: LpFrameKind,
pub frame_attributes: [u8; 14],
pub struct LpMessageHeader {
pub kind: LpMessageType,
pub message_attributes: [u8; 14],
}
impl LpFrameHeader {
impl LpMessageHeader {
pub const SIZE: usize = 16; // message_kind(2) + message_attributes(14)
pub fn new(kind: LpFrameKind, frame_attributes: [u8; 14]) -> Self {
pub fn new(kind: LpMessageType, message_attributes: [u8; 14]) -> Self {
Self {
kind,
frame_attributes,
message_attributes,
}
}
pub fn new_no_attributes(kind: LpFrameKind) -> Self {
pub fn new_no_attributes(kind: LpMessageType) -> Self {
Self {
kind,
frame_attributes: [0; 14],
message_attributes: [0; 14],
}
}
/// Encode directly into a BytesMut buffer
pub fn encode(&self, dst: &mut BytesMut) {
dst.put_u16_le(self.kind as u16);
dst.put_slice(&self.frame_attributes);
dst.put_slice(&self.message_attributes);
}
pub fn parse(src: &[u8]) -> Result<Self, MalformedLpPacketError> {
@@ -41,35 +41,35 @@ impl LpFrameHeader {
}
let raw_kind = u16::from_le_bytes([src[0], src[1]]);
let kind = LpFrameKind::try_from(raw_kind)
let kind = LpMessageType::try_from(raw_kind)
.map_err(|_| MalformedLpPacketError::invalid_data_kind(raw_kind))?;
#[allow(clippy::unwrap_used)]
let message_attributes = src[2..16].try_into().unwrap();
Ok(Self {
kind,
frame_attributes: message_attributes,
message_attributes,
})
}
}
/// Represent application data being sent in Transport mode
#[derive(Debug, Clone, PartialEq)]
pub struct LpFrame {
pub header: LpFrameHeader,
pub struct LpMessage {
pub header: LpMessageHeader,
pub content: Bytes,
}
impl AsRef<[u8]> for LpFrame {
impl AsRef<[u8]> for LpMessage {
fn as_ref(&self) -> &[u8] {
&self.content
}
}
impl LpFrame {
pub fn new(kind: LpFrameKind, content: impl Into<Bytes>) -> Self {
impl LpMessage {
pub fn new(kind: LpMessageType, content: impl Into<Bytes>) -> Self {
Self {
header: LpFrameHeader::new_no_attributes(kind),
header: LpMessageHeader::new_no_attributes(kind),
content: content.into(),
}
}
@@ -81,103 +81,40 @@ impl LpFrame {
}
pub fn decode(src: &[u8]) -> Result<Self, MalformedLpPacketError> {
let header = LpFrameHeader::parse(src)?;
let content = src[LpFrameHeader::SIZE..].to_vec().into();
let header = LpMessageHeader::parse(src)?;
let content = src[LpMessageHeader::SIZE..].to_vec().into();
Ok(Self { header, content })
}
pub fn kind(&self) -> LpFrameKind {
pub fn kind(&self) -> LpMessageType {
self.header.kind
}
pub fn new_opaque(content: impl Into<Bytes>) -> Self {
Self::new(LpFrameKind::Opaque, content)
Self::new(LpMessageType::Opaque, content)
}
pub fn new_registration(data: impl Into<Bytes>) -> Self {
Self::new(LpFrameKind::Registration, data)
Self::new(LpMessageType::Registration, data)
}
pub fn new_forward(data: impl Into<Bytes>) -> Self {
Self::new(LpFrameKind::Forward, data)
}
pub fn new_stream(attrs: StreamFrameAttributes, content: impl Into<Bytes>) -> Self {
Self {
header: LpFrameHeader::new(LpFrameKind::Stream, attrs.encode()),
content: content.into(),
}
Self::new(LpMessageType::Forward, data)
}
pub(crate) fn len(&self) -> usize {
LpFrameHeader::SIZE + self.content.len()
LpMessageHeader::SIZE + self.content.len()
}
}
/// Represent kind of application data being sent in Transport mode
#[derive(Clone, Copy, PartialEq, Eq, Debug, IntoPrimitive, TryFromPrimitive)]
#[repr(u16)]
pub enum LpFrameKind {
pub enum LpMessageType {
Opaque = 0,
Registration = 1,
Forward = 2,
Stream = 3,
}
/// Message type within a `LpFrameKind::Stream` frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum StreamMsgType {
/// Open a new stream. Content is optional initial data.
Open = 0,
/// Data on an existing stream.
Data = 1,
}
/// Parsed form of the 14-byte `frame_attributes` for `LpFrameKind::Stream`.
///
/// Wire layout (big-endian):
/// ```text
/// [0..8 ) stream_id : u64
/// [8 ) msg_type : u8 (0 = Open, 1 = Data)
/// [9..13) sequence_num : u32
/// [13 ) reserved : u8
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamFrameAttributes {
pub stream_id: u64,
pub msg_type: StreamMsgType,
pub sequence_num: u32,
}
impl StreamFrameAttributes {
pub fn encode(&self) -> [u8; 14] {
let mut buf = [0u8; 14];
buf[0..8].copy_from_slice(&self.stream_id.to_be_bytes());
buf[8] = self.msg_type as u8;
buf[9..13].copy_from_slice(&self.sequence_num.to_be_bytes());
buf
}
pub fn parse(attrs: &[u8; 14]) -> Result<Self, MalformedLpPacketError> {
let stream_id = u64::from_be_bytes(attrs[0..8].try_into().unwrap());
let msg_type = match attrs[8] {
0 => StreamMsgType::Open,
1 => StreamMsgType::Data,
other => {
return Err(MalformedLpPacketError::DeserialisationFailure(format!(
"invalid stream msg_type: {other}"
)));
}
};
let sequence_num = u32::from_be_bytes(attrs[9..13].try_into().unwrap());
Ok(Self {
stream_id,
msg_type,
sequence_num,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
+10 -10
View File
@@ -6,12 +6,12 @@ use bytes::{BufMut, BytesMut};
use std::fmt::{Debug, Formatter};
pub use error::MalformedLpPacketError;
pub use frame::{ForwardPacketData, LpFrame};
pub use header::{InnerHeader, LpHeader, OuterHeader};
pub use message::{ForwardPacketData, LpMessage};
pub mod error;
pub mod frame;
pub mod header;
pub mod message;
pub mod replay;
pub mod utils;
@@ -81,7 +81,7 @@ impl EncryptedLpPacket {
#[derive(Clone, PartialEq)]
pub struct LpPacket {
pub(crate) header: LpHeader,
pub(crate) frame: LpFrame,
pub(crate) message: LpMessage,
}
impl Debug for LpPacket {
@@ -91,16 +91,16 @@ impl Debug for LpPacket {
}
impl LpPacket {
pub fn new(header: LpHeader, frame: LpFrame) -> Self {
Self { header, frame }
pub fn new(header: LpHeader, message: LpMessage) -> Self {
Self { header, message }
}
pub fn frame(&self) -> &LpFrame {
&self.frame
pub fn message(&self) -> &LpMessage {
&self.message
}
pub fn into_frame(self) -> LpFrame {
self.frame
pub fn into_message(self) -> LpMessage {
self.message
}
pub fn header(&self) -> &LpHeader {
@@ -115,6 +115,6 @@ impl LpPacket {
pub(crate) fn dbg_encode(&self, dst: &mut BytesMut) {
self.header.dbg_encode(dst);
self.frame.encode(dst)
self.message.encode(dst)
}
}
+1 -1
View File
@@ -1,5 +1,5 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// SPDX-License-Identifier: GPL-3.0-only
use crate::LpError;
use nym_kkt::keys::EncapsulationKey;
+23 -22
View File
@@ -6,7 +6,7 @@
//! This module implements session management functionality, including replay protection
use crate::codec::{decrypt_lp_packet, encrypt_lp_packet};
use crate::packet::{EncryptedLpPacket, LpFrame, LpHeader, LpPacket};
use crate::packet::{EncryptedLpPacket, LpHeader, LpMessage, LpPacket};
use crate::peer::{LpLocalPeer, LpRemotePeer};
use crate::peer_config::LpReceiverIndex;
use crate::psq::initiator::HandshakeMode;
@@ -32,7 +32,7 @@ pub enum LpInput {
ReceivePacket(EncryptedLpPacket),
/// Application wants to send data (only valid in Transport state).
SendFrame(LpFrame),
SendData(LpMessage),
}
/// Represents actions the state machine requests the environment to perform.
@@ -42,7 +42,7 @@ pub enum LpAction {
SendPacket(EncryptedLpPacket),
/// Deliver decrypted application data received from the peer.
DeliverFrame(LpFrame),
DeliverData(LpMessage),
}
pub type SessionId = [u8; 32];
@@ -234,10 +234,10 @@ impl LpTransportSession {
self.protocol_version
}
pub fn next_packet(&mut self, frame: LpFrame) -> Result<LpPacket, LpError> {
pub fn next_packet(&mut self, message: LpMessage) -> Result<LpPacket, LpError> {
let counter = self.next_counter();
let header = LpHeader::new(self.receiver_index(), counter, self.protocol_version);
let packet = LpPacket::new(header, frame);
let packet = LpPacket::new(header, message);
Ok(packet)
}
@@ -299,19 +299,22 @@ impl LpTransportSession {
self.receiving_counter.current_packet_cnt()
}
/// Wrap the provided `LpFrame` into an `LpPacket` and encrypt its content using the established transport session
/// to produce an `EncryptedLpPacket`
/// Encrypts a produced application using the established transport session
/// and produce an `EncryptedLpPacket`
///
/// # Arguments
///
/// * `frame` - structured `LpFrame` to wrap and encrypt
/// * `data` - plaintext data to encrypt
///
/// # Returns
///
/// * `Ok(EncryptedLpPacket)` containing the encrypted message ciphertext.
/// * `Err(LpError)` if the session is not in transport mode or encryption fails.
pub(crate) fn wrap_lp_frame(&mut self, frame: LpFrame) -> Result<EncryptedLpPacket, LpError> {
let packet = self.next_packet(frame)?;
pub(crate) fn encrypt_application_data(
&mut self,
data: LpMessage,
) -> Result<EncryptedLpPacket, LpError> {
let packet = self.next_packet(data)?;
encrypt_lp_packet(packet, &mut self.active_transport)
}
@@ -319,7 +322,7 @@ impl LpTransportSession {
///
/// # Arguments
///
/// * `packet` - The encrypted packet
/// * `ciphertext` - The encrypted packet
///
/// # Returns
///
@@ -355,11 +358,11 @@ impl LpTransportSession {
self.receiving_counter_mark(ctr)?;
// 4. deliver the message
Ok(LpAction::DeliverFrame(packet.frame))
Ok(LpAction::DeliverData(packet.message))
}
LpInput::SendFrame(data) => {
LpInput::SendData(data) => {
// Encrypt and send application data
match self.wrap_lp_frame(data) {
match self.encrypt_application_data(data) {
Ok(packet) => Ok(LpAction::SendPacket(packet)),
Err(e) => Err(e),
}
@@ -473,9 +476,8 @@ mod tests {
// --- Transport Phase ---
println!("--- Step 1: Initiator sends data ---");
let data_to_send_1 = LpFrame::new_opaque(b"hello responder".to_vec());
let init_actions_4 =
initiator.process_input(LpInput::SendFrame(data_to_send_1.clone()));
let data_to_send_1 = LpMessage::new_opaque(b"hello responder".to_vec());
let init_actions_4 = initiator.process_input(LpInput::SendData(data_to_send_1.clone()));
let data_packet_1 = if let Ok(LpAction::SendPacket(packet)) = init_actions_4 {
packet.clone()
} else {
@@ -485,7 +487,7 @@ mod tests {
println!("--- Step 2: Responder receives data ---");
let resp_actions_5 = responder.process_input(LpInput::ReceivePacket(data_packet_1));
let resp_data_1 = if let Ok(LpAction::DeliverFrame(data)) = resp_actions_5 {
let resp_data_1 = if let Ok(LpAction::DeliverData(data)) = resp_actions_5 {
data
} else {
panic!("Responder should deliver data");
@@ -493,9 +495,8 @@ mod tests {
assert_eq!(resp_data_1, data_to_send_1);
println!("--- Step 3: Responder sends data ---");
let data_to_send_2 = LpFrame::new_opaque(b"hello initiator".to_vec());
let resp_actions_6 =
responder.process_input(LpInput::SendFrame(data_to_send_2.clone()));
let data_to_send_2 = LpMessage::new_opaque(b"hello initiator".to_vec());
let resp_actions_6 = responder.process_input(LpInput::SendData(data_to_send_2.clone()));
let data_packet_2 = if let Ok(LpAction::SendPacket(packet)) = resp_actions_6 {
packet.clone()
} else {
@@ -505,7 +506,7 @@ mod tests {
println!("--- Step 4: Initiator receives data ---");
let init_actions_5 = initiator.process_input(LpInput::ReceivePacket(data_packet_2));
if let Ok(LpAction::DeliverFrame(data)) = init_actions_5 {
if let Ok(LpAction::DeliverData(data)) = init_actions_5 {
assert_eq!(data, data_to_send_2);
} else {
panic!("Initiator should deliver data");
+20 -18
View File
@@ -1,6 +1,6 @@
#[cfg(test)]
mod tests {
use crate::packet::{EncryptedLpPacket, LpFrame};
use crate::packet::{EncryptedLpPacket, LpMessage};
use crate::session::{LpAction, LpInput};
use crate::{LpError, SessionManager, SessionsMock};
use nym_kkt_ciphersuite::{IntoEnumIterator, KEM};
@@ -9,7 +9,7 @@ mod tests {
trait ActionExtract {
fn ciphertext(self) -> EncryptedLpPacket;
fn data(self) -> LpFrame;
fn data(self) -> LpMessage;
}
impl ActionExtract for LpAction {
@@ -21,8 +21,8 @@ mod tests {
}
}
fn data(self) -> LpFrame {
if let LpAction::DeliverFrame(data) = self {
fn data(self) -> LpMessage {
if let LpAction::DeliverData(data) = self {
data
} else {
panic!("invalid action");
@@ -54,7 +54,7 @@ mod tests {
// --- A sends to B ---
let plaintext_a = format!("A->B Message {i}").into_bytes();
let ciphertext_a = session_manager_1
.send_frame(peer_a_sm, LpFrame::new_opaque(plaintext_a.clone()))
.send_data(peer_a_sm, LpMessage::new_opaque(plaintext_a.clone()))
.unwrap()
.ciphertext();
@@ -68,7 +68,7 @@ mod tests {
// --- B sends to A ---
let plaintext_b = format!("B->A Message {i}").into_bytes();
let ciphertext_b = session_manager_2
.send_frame(peer_b_sm, LpFrame::new_opaque(plaintext_b.clone()))
.send_data(peer_b_sm, LpMessage::new_opaque(plaintext_b.clone()))
.unwrap()
.ciphertext();
@@ -183,13 +183,15 @@ mod tests {
// --- 3. Simulate Data Transfer via process_input ---
println!("Starting data transfer simulation via process_input...");
let plaintext_a_to_b = LpFrame::new_opaque(b"Hello from A via process_input!".to_vec());
let plaintext_b_to_a = LpFrame::new_opaque(b"Hello from B via process_input!".to_vec());
let plaintext_a_to_b =
LpMessage::new_opaque(b"Hello from A via process_input!".to_vec());
let plaintext_b_to_a =
LpMessage::new_opaque(b"Hello from B via process_input!".to_vec());
// --- A sends to B ---
println!(" A sends to B");
let action_a_send = session_manager_1
.process_input(session_id, LpInput::SendFrame(plaintext_a_to_b.clone()))
.process_input(session_id, LpInput::SendData(plaintext_a_to_b.clone()))
.expect("A SendData failed");
let data_packet_a = action_a_send.ciphertext();
@@ -200,7 +202,7 @@ mod tests {
.process_input(session_id, LpInput::ReceivePacket(data_packet_a))
.expect("B ReceivePacket (data) failed");
if let LpAction::DeliverFrame(data) = action_b_recv {
if let LpAction::DeliverData(data) = action_b_recv {
assert_eq!(data, plaintext_a_to_b, "Decrypted data mismatch A->B");
println!(
" B successfully decrypted: {:?}",
@@ -213,7 +215,7 @@ mod tests {
// --- B sends to A ---
println!(" B sends to A");
let action_b_send = session_manager_2
.process_input(session_id, LpInput::SendFrame(plaintext_b_to_a.clone()))
.process_input(session_id, LpInput::SendData(plaintext_b_to_a.clone()))
.expect("B SendData failed");
let data_packet_b = action_b_send.ciphertext();
@@ -227,7 +229,7 @@ mod tests {
.process_input(session_id, LpInput::ReceivePacket(data_packet_b))
.expect("A ReceivePacket (data) failed");
if let LpAction::DeliverFrame(data) = action_a_recv {
if let LpAction::DeliverData(data) = action_a_recv {
assert_eq!(data, plaintext_b_to_a, "Decrypted data mismatch B->A");
println!(
" A successfully decrypted: {:?}",
@@ -256,11 +258,11 @@ mod tests {
println!("Testing out-of-order reception via process_input...");
// A prepares N+1 then N
let data_n_plus_1 = LpFrame::new_opaque(b"Message N+1".to_vec());
let data_n = LpFrame::new_opaque(b"Message N".to_vec());
let data_n_plus_1 = LpMessage::new_opaque(b"Message N+1".to_vec());
let data_n = LpMessage::new_opaque(b"Message N".to_vec());
let action_send_n1 = session_manager_1
.process_input(session_id, LpInput::SendFrame(data_n_plus_1.clone()))
.process_input(session_id, LpInput::SendData(data_n_plus_1.clone()))
.unwrap();
let packet_n1 = match action_send_n1 {
LpAction::SendPacket(p) => p,
@@ -268,7 +270,7 @@ mod tests {
};
let action_send_n = session_manager_1
.process_input(session_id, LpInput::SendFrame(data_n.clone()))
.process_input(session_id, LpInput::SendData(data_n.clone()))
.unwrap();
let packet_n = match action_send_n {
LpAction::SendPacket(p) => p,
@@ -282,7 +284,7 @@ mod tests {
.process_input(session_id, LpInput::ReceivePacket(packet_n1))
.unwrap();
match action_recv_n1 {
LpAction::DeliverFrame(d) => assert_eq!(d, data_n_plus_1, "Data N+1 mismatch"),
LpAction::DeliverData(d) => assert_eq!(d, data_n_plus_1, "Data N+1 mismatch"),
_ => panic!("Expected DeliverData for N+1"),
}
@@ -292,7 +294,7 @@ mod tests {
.process_input(session_id, LpInput::ReceivePacket(packet_n))
.unwrap();
match action_recv_n {
LpAction::DeliverFrame(d) => assert_eq!(d, data_n, "Data N mismatch"),
LpAction::DeliverData(d) => assert_eq!(d, data_n, "Data N mismatch"),
_ => panic!("Expected DeliverData for N"),
}
+4 -4
View File
@@ -6,7 +6,7 @@
//! This module implements session lifecycle management functionality, handling
//! creation, retrieval, and storage of sessions.
use crate::packet::{EncryptedLpPacket, LpFrame};
use crate::packet::{EncryptedLpPacket, LpMessage};
use crate::peer_config::LpReceiverIndex;
use crate::{LpError, LpTransportSession};
use std::collections::HashMap;
@@ -39,12 +39,12 @@ impl SessionManager {
self.with_session_mut(lp_id, |sm| sm.process_input(input))?
}
pub fn send_frame(
pub fn send_data(
&mut self,
lp_id: LpReceiverIndex,
frame: LpFrame,
data: LpMessage,
) -> Result<LpAction, LpError> {
self.process_input(lp_id, LpInput::SendFrame(frame))
self.process_input(lp_id, LpInput::SendData(data))
}
pub fn receive_packet(
@@ -1,6 +1,5 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated associated function `nym_crypto::generic_array::GenericArray::<T, N>::clone_from_slice`: please upgrade to generic-array 1.x - TODO
pub mod identifier;
pub mod key;
@@ -1,7 +1,6 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)] // silences clippy warning: deprecated struct `nym_crypto::generic_array::GenericArray`: please upgrade to generic-array 1.x - TODO
pub mod encryption_key;
pub mod reply_surb;
pub mod requests;
@@ -1,64 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nyxd_scraper_shared::error::ScraperError;
use nyxd_scraper_shared::storage::FullBlockInformation;
use nyxd_scraper_shared::watcher::{NyxdWatcher, WatcherConfig};
use nyxd_scraper_shared::{BlockModule, ParsedTransactionResponse, TxModule};
struct FancyBlockModule;
struct FancyTxModule;
#[async_trait::async_trait]
impl BlockModule for FancyBlockModule {
async fn handle_block(&mut self, block: &FullBlockInformation) -> Result<(), ScraperError> {
println!("🚀 got new block for height {}", block.block.header.height);
// should be false
println!("results scraped: {}", block.results.is_some());
// should be false
println!("validators scraped: {}", block.validators.is_some());
// should be true
println!("transactions scraped: {}", block.transactions.is_some());
println!();
Ok(())
}
}
#[async_trait::async_trait]
impl TxModule for FancyTxModule {
async fn handle_tx(&mut self, tx: &ParsedTransactionResponse) -> Result<(), ScraperError> {
println!(
"✨ got new tx for height {}: {} ({} msgs)",
tx.block.header.height,
tx.hash,
tx.parsed_messages.len()
);
Ok(())
}
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
let cfg = WatcherConfig {
websocket_url: "wss://rpc.nymtech.net/websocket".parse()?,
rpc_url: "https://rpc.nymtech.net".parse()?,
};
let watcher = NyxdWatcher::builder(cfg)
.with_block_module(FancyBlockModule)
.with_tx_module(FancyTxModule)
.build_and_start()
.await?;
// run for 30s before shutting down
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
watcher.stop().await;
Ok(())
}
@@ -1,94 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::NyxdScraperStorageError;
use crate::{NyxdScraperStorage, NyxdScraperTransaction, ParsedTransactionResponse};
use tendermint::Block;
use tendermint::block::Commit;
use tendermint_rpc::endpoint::validators::Response;
use thiserror::Error;
#[derive(Clone)]
pub struct Ephemeral;
#[derive(Debug, Error)]
#[error("no storage backend enabled")]
pub struct EphemeralStorageError;
pub struct EphemeralTransaction;
#[async_trait::async_trait]
impl NyxdScraperTransaction for EphemeralTransaction {
async fn commit(self) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_validators(&mut self, _: &Response) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_block_data(
&mut self,
_: &Block,
_: i64,
) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_commits(
&mut self,
_: &Commit,
_: &Response,
) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_txs(
&mut self,
_: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn persist_messages(
&mut self,
_: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn update_last_processed(&mut self, _: i64) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
}
#[async_trait::async_trait]
impl NyxdScraperStorage for Ephemeral {
type StorageTransaction = EphemeralTransaction;
async fn initialise(_: &str, _: &bool) -> Result<Self, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn begin_processing_tx(
&self,
) -> Result<Self::StorageTransaction, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
async fn prune_storage(&self, _: u32, _: u32) -> Result<(), NyxdScraperStorageError> {
Err(NyxdScraperStorageError::new(EphemeralStorageError))
}
}
@@ -2,16 +2,13 @@
// SPDX-License-Identifier: Apache-2.0
use crate::PruningOptions;
use crate::block_processor::ephemeral_storage::Ephemeral;
use crate::block_processor::helpers::split_request_range;
use crate::block_processor::types::BlockToProcess;
use crate::block_requester::BlockRequest;
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::{RetrievalConfig, RpcClient};
use crate::storage::{
FullBlockInformation, NyxdScraperStorage, NyxdScraperTransaction, persist_block,
};
use crate::rpc_client::RpcClient;
use crate::storage::{NyxdScraperStorage, NyxdScraperTransaction, persist_block};
use futures::StreamExt;
use std::cmp::max;
use std::collections::{BTreeMap, HashSet, VecDeque};
@@ -25,7 +22,6 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, trace, warn};
mod ephemeral_storage;
mod helpers;
pub(crate) mod pruning;
pub(crate) mod types;
@@ -81,148 +77,20 @@ impl BlockProcessorConfig {
}
}
pub struct BlockProcessorPersistence<S> {
pub struct BlockProcessor<S> {
config: BlockProcessorConfig,
synced: Arc<Notify>,
last_pruned_height: u32,
storage: S,
}
impl<S> BlockProcessorPersistence<S>
where
S: NyxdScraperStorage,
{
pub(crate) async fn new(
config: BlockProcessorConfig,
synced: Arc<Notify>,
storage: S,
) -> Result<Self, ScraperError> {
let last_pruned = storage.get_pruned_height().await?;
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
debug!(pruned_height = %last_pruned_height, "setting up block processor...");
Ok(Self {
config,
synced,
last_pruned_height,
storage,
})
}
#[must_use]
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.config.pruning_options = pruning_options;
self
}
async fn stored_last_processed_height(&self) -> Result<u32, ScraperError> {
let last_processed = self.storage.get_last_processed_height().await?;
let last_processed_height = last_processed.try_into().unwrap_or_default();
Ok(last_processed_height)
}
async fn persist_block(
&mut self,
full_info: &FullBlockInformation,
) -> Result<(), ScraperError> {
// process the entire block as a transaction so that if anything fails,
// we wouldn't end up with a corrupted storage.
let mut tx = self
.storage
.begin_processing_tx()
.await
.map_err(ScraperError::tx_begin_failure)?;
persist_block(full_info, &mut tx, self.config.store_precommits).await?;
let commit_start = Instant::now();
tx.commit().await.map_err(ScraperError::tx_commit_failure)?;
crate::storage::helpers::log_db_operation_time("committing processing tx", commit_start);
let last_processed_height = full_info.block.header.height.value() as u32;
if let Err(err) = self.maybe_prune_storage(last_processed_height).await {
error!("failed to prune the storage: {err}");
}
Ok(())
}
#[instrument(skip(self))]
async fn prune_storage(&mut self, last_processed_height: u32) -> Result<(), ScraperError> {
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = last_processed_height - keep_recent;
info!(
keep_recent,
oldest_to_keep = last_to_keep,
"pruning the storage"
);
let lowest: u32 = self
.storage
.lowest_block_height()
.await?
.unwrap_or_default()
.try_into()
.unwrap_or_default();
let to_prune = last_to_keep.saturating_sub(lowest);
match to_prune {
v if v > 1000 => warn!("approximately {v} blocks worth of data will be pruned"),
v if v > 100 => info!("approximately {v} blocks worth of data will be pruned"),
0 => trace!("no blocks to prune"),
v => debug!("approximately {v} blocks worth of data will be pruned"),
}
if to_prune == 0 {
self.last_pruned_height = last_processed_height;
return Ok(());
}
self.storage
.prune_storage(last_to_keep, last_processed_height)
.await?;
self.last_pruned_height = last_processed_height;
Ok(())
}
async fn maybe_prune_storage(
&mut self,
last_processed_height: u32,
) -> Result<(), ScraperError> {
debug!("checking for storage pruning");
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= last_processed_height {
self.prune_storage(last_processed_height).await?;
}
Ok(())
}
}
pub struct BlockProcessor<S = Ephemeral> {
cancel: CancellationToken,
synced: Arc<Notify>,
last_processed_height: u32,
last_pruned_height: u32,
last_processed_at: Instant,
pending_sync: PendingSync,
queued_blocks: BTreeMap<u32, BlockToProcess>,
/// Specifies how much data to actually retrieve per block
retrieval_config: RetrievalConfig,
rpc_client: RpcClient,
incoming: UnboundedReceiverStream<BlockToProcess>,
block_requester: Sender<BlockRequest>,
persistence: Option<BlockProcessorPersistence<S>>,
storage: S,
// future work: rather than sending each msg to every msg module,
// let them subscribe based on `type_url` inside the message itself
@@ -237,97 +105,108 @@ impl<S> BlockProcessor<S>
where
S: NyxdScraperStorage,
{
pub fn new(
pub async fn new(
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
block_requester: Sender<BlockRequest>,
storage: S,
rpc_client: RpcClient,
) -> Self {
BlockProcessor {
) -> Result<Self, ScraperError> {
let last_processed = storage.get_last_processed_height().await?;
let last_processed_height = last_processed.try_into().unwrap_or_default();
let last_pruned = storage.get_pruned_height().await?;
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
debug!(last_processed_height = %last_processed_height, pruned_height = %last_pruned_height, "setting up block processor...");
Ok(BlockProcessor {
config,
cancel,
last_processed_height: Default::default(),
synced,
last_processed_height,
last_pruned_height,
last_processed_at: Instant::now(),
pending_sync: Default::default(),
queued_blocks: Default::default(),
retrieval_config: RetrievalConfig::default(),
rpc_client,
incoming: incoming.into(),
block_requester,
persistence: None,
storage,
block_modules: vec![],
tx_modules: vec![],
msg_modules: vec![],
}
})
}
#[must_use]
pub fn with_retrieval_config(mut self, retrieval_config: RetrievalConfig) -> Self {
self.retrieval_config = retrieval_config;
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.config.pruning_options = pruning_options;
self
}
pub async fn with_persistence(
mut self,
persistence: BlockProcessorPersistence<S>,
) -> Result<Self, ScraperError> {
let last_processed_height = persistence.stored_last_processed_height().await?;
debug!(last_processed_height = %last_processed_height, "setting up block processor...");
self.persistence = Some(persistence);
Ok(self)
}
pub(super) async fn process_block(
&mut self,
block: BlockToProcess,
) -> Result<(), ScraperError> {
info!("processing block at height {}", block.height);
let full_info = self
.rpc_client
.try_get_full_details(block, self.retrieval_config)
.await?;
let full_info = self.rpc_client.try_get_full_details(block).await?;
if let Some(tx_info) = &full_info.transactions {
debug!("this block has {} transaction(s)", tx_info.len());
for tx in tx_info {
debug!("{} has {} message(s)", tx.hash, tx.tx.body.messages.len());
for (index, msg) in tx.tx.body.messages.iter().enumerate() {
debug!("{index}: {:?}", msg.type_url)
}
debug!(
"this block has {} transaction(s)",
full_info.transactions.len()
);
for tx in &full_info.transactions {
debug!("{} has {} message(s)", tx.hash, tx.tx.body.messages.len());
for (index, msg) in tx.tx.body.messages.iter().enumerate() {
debug!("{index}: {:?}", msg.type_url)
}
}
// if we have enabled persistence, do try to store the block information
if let Some(persistence) = &mut self.persistence {
persistence.persist_block(&full_info).await?;
}
// process the entire block as a transaction so that if anything fails,
// we won't end up with a corrupted storage.
let mut tx = self
.storage
.begin_processing_tx()
.await
.map_err(ScraperError::tx_begin_failure)?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
// let the modules do whatever they want
// the ones wanting the full block:
for block_module in &mut self.block_modules {
block_module.handle_block(&full_info).await?;
block_module.handle_block(&full_info, &mut tx).await?;
}
// the ones wanting transactions (assuming tx retrieval is enabled):
if let Some(tx_info) = &full_info.transactions {
for block_tx in tx_info {
for tx_module in &mut self.tx_modules {
tx_module.handle_tx(block_tx).await?;
}
// the ones concerned with individual messages
for (index, msg) in block_tx.tx.body.messages.iter().enumerate() {
for msg_module in &mut self.msg_modules {
if msg.type_url == msg_module.type_url() {
msg_module.handle_msg(index, msg, block_tx).await?
}
// the ones wanting transactions:
for block_tx in full_info.transactions {
for tx_module in &mut self.tx_modules {
tx_module.handle_tx(&block_tx, &mut tx).await?;
}
// the ones concerned with individual messages
for (index, msg) in block_tx.tx.body.messages.iter().enumerate() {
for msg_module in &mut self.msg_modules {
if msg.type_url == msg_module.type_url() {
msg_module
.handle_msg(index, msg, &block_tx, &mut tx)
.await?
}
}
}
}
let commit_start = Instant::now();
tx.commit().await.map_err(ScraperError::tx_commit_failure)?;
crate::storage::helpers::log_db_operation_time("committing processing tx", commit_start);
self.last_processed_height = full_info.block.header.height.value() as u32;
self.last_processed_at = Instant::now();
if let Err(err) = self.maybe_prune_storage().await {
error!("failed to prune the storage: {err}");
}
Ok(())
}
@@ -405,6 +284,62 @@ where
Ok(())
}
#[instrument(skip(self))]
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = self.last_processed_height - keep_recent;
info!(
keep_recent,
oldest_to_keep = last_to_keep,
"pruning the storage"
);
let lowest: u32 = self
.storage
.lowest_block_height()
.await?
.unwrap_or_default()
.try_into()
.unwrap_or_default();
let to_prune = last_to_keep.saturating_sub(lowest);
match to_prune {
v if v > 1000 => warn!("approximately {v} blocks worth of data will be pruned"),
v if v > 100 => info!("approximately {v} blocks worth of data will be pruned"),
0 => trace!("no blocks to prune"),
v => debug!("approximately {v} blocks worth of data will be pruned"),
}
if to_prune == 0 {
self.last_pruned_height = self.last_processed_height;
return Ok(());
}
self.storage
.prune_storage(last_to_keep, self.last_processed_height)
.await?;
self.last_pruned_height = self.last_processed_height;
Ok(())
}
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
debug!("checking for storage pruning");
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= self.last_processed_height {
self.prune_storage().await?;
}
Ok(())
}
async fn next_incoming(&mut self, block: BlockToProcess) {
let height = block.height;
@@ -444,10 +379,8 @@ where
self.try_request_pending().await;
if let Some(persistence) = &self.persistence
&& self.pending_sync.is_empty()
{
persistence.synced.notify_one();
if self.pending_sync.is_empty() {
self.synced.notify_one();
}
}
@@ -477,14 +410,7 @@ where
assert!(self.pending_sync.is_empty());
info!("attempting to run startup resync...");
let Some(persistence) = self.persistence.as_mut() else {
// without data persistence, we're always starting from scratch
return Ok(());
};
persistence
.maybe_prune_storage(self.last_processed_height)
.await?;
self.maybe_prune_storage().await?;
let latest_block = self.rpc_client.current_block_height().await? as u32;
info!("obtained latest block height: {latest_block}");
@@ -493,10 +419,10 @@ where
info!("we have already processed some blocks in the past - attempting to resume...");
// in case we were offline for a while,
// make sure we don't request blocks we'd have to prune anyway
let keep_recent = persistence.config.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = latest_block - keep_recent;
if !persistence.config.pruning_options.strategy.is_nothing() {
if !self.config.pruning_options.strategy.is_nothing() {
self.last_processed_height = max(self.last_processed_height, last_to_keep);
}
@@ -514,7 +440,7 @@ where
// this is the first time starting up
if self.last_processed_height == 0 {
info!("this is the first time starting up");
let Some(starting_height) = persistence.config.explicit_starting_block_height else {
let Some(starting_height) = self.config.explicit_starting_block_height else {
info!("no starting block height set - will use the default behaviour");
// nothing to do
return Ok(());
@@ -525,9 +451,7 @@ where
self.rpc_client.earliest_available_block_height().await? as u32;
info!("earliest available block height: {earliest_available}");
if earliest_available > starting_height
&& persistence.config.use_best_effort_start_height
{
if earliest_available > starting_height && self.config.use_best_effort_start_height {
error!("the earliest available block is higher than the desired starting height");
return Err(ScraperError::BlocksUnavailable {
height: starting_height,
@@ -2,7 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::ScraperError;
use std::collections::BTreeMap;
use crate::helpers;
use std::collections::HashMap;
use tendermint::{Block, Hash, abci, block, tx};
use tendermint_rpc::endpoint::{block as block_endpoint, block_results, validators};
use tendermint_rpc::event::{Event, EventData};
@@ -27,9 +28,9 @@ pub struct ParsedTransactionResponse {
pub proof: Option<tx::Proof>,
pub parsed_messages: BTreeMap<usize, serde_json::Value>,
pub parsed_messages: HashMap<usize, serde_json::Value>,
pub parsed_message_urls: BTreeMap<usize, String>,
pub parsed_message_urls: HashMap<usize, String>,
pub block: Block,
}
@@ -40,13 +41,32 @@ pub struct FullBlockInformation {
pub block: Block,
/// All of the emitted events alongside any tx results.
pub results: Option<block_results::Response>,
pub results: block_results::Response,
/// Validator set for this particular block
pub validators: Option<validators::Response>,
pub validators: validators::Response,
/// Transaction results from this particular block
pub transactions: Option<Vec<ParsedTransactionResponse>>,
pub transactions: Vec<ParsedTransactionResponse>,
}
impl FullBlockInformation {
pub fn ensure_proposer(&self) -> Result<(), ScraperError> {
let block_proposer = self.block.header.proposer_address;
if !self
.validators
.validators
.iter()
.any(|v| v.address == block_proposer)
{
let proposer = helpers::validator_consensus_address(block_proposer)?;
return Err(ScraperError::BlockProposerNotInValidatorSet {
height: self.block.header.height.value() as u32,
proposer: proposer.to_string(),
});
}
Ok(())
}
}
pub(crate) struct BlockToProcess {
+4 -8
View File
@@ -172,20 +172,16 @@ pub enum ScraperError {
#[source]
error: base64::DecodeError,
},
#[error("no modules configured for the chain watcher")]
NoModulesConfigured,
#[error("no storage configured for the chain watcher")]
NoStorageConfigured,
}
impl ScraperError {
pub fn tx_begin_failure(source: NyxdScraperStorageError) -> ScraperError {
pub fn tx_begin_failure(source: NyxdScraperStorageError) -> ScraperError
where {
ScraperError::StorageTxBeginFailure { source }
}
pub fn tx_commit_failure(source: NyxdScraperStorageError) -> ScraperError {
pub fn tx_commit_failure(source: NyxdScraperStorageError) -> ScraperError
where {
ScraperError::StorageTxCommitFailure { source }
}
}
-2
View File
@@ -11,8 +11,6 @@ pub mod modules;
pub(crate) mod rpc_client;
pub(crate) mod scraper;
pub mod storage;
pub(crate) mod subscriber;
pub mod watcher;
pub use block_processor::pruning::{PruningOptions, PruningStrategy};
pub use block_processor::types::ParsedTransactionResponse;
@@ -3,9 +3,14 @@
use crate::block_processor::types::FullBlockInformation;
use crate::error::ScraperError;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
#[async_trait]
pub trait BlockModule {
async fn handle_block(&mut self, block: &FullBlockInformation) -> Result<(), ScraperError>;
async fn handle_block(
&mut self,
block: &FullBlockInformation,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -3,6 +3,7 @@
use crate::block_processor::types::ParsedTransactionResponse;
use crate::error::ScraperError;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
use cosmrs::Any;
@@ -15,5 +16,6 @@ pub trait MsgModule {
index: usize,
msg: &Any,
tx: &ParsedTransactionResponse,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -3,9 +3,14 @@
use crate::block_processor::types::ParsedTransactionResponse;
use crate::error::ScraperError;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
#[async_trait]
pub trait TxModule {
async fn handle_tx(&mut self, tx: &ParsedTransactionResponse) -> Result<(), ScraperError>;
async fn handle_tx(
&mut self,
tx: &ParsedTransactionResponse,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
+21 -86
View File
@@ -9,32 +9,15 @@ use crate::helpers::tx_hash;
use crate::{Any, MessageRegistry, default_message_registry};
use futures::StreamExt;
use futures::future::join3;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use tendermint::{Block, Hash};
use tendermint::Hash;
use tendermint_rpc::endpoint::{block, block_results, tx, validators};
use tendermint_rpc::{Client, HttpClient, Paging};
use tokio::sync::Mutex;
use tracing::{debug, instrument, warn};
use url::Url;
#[derive(Debug, Clone, Copy)]
pub struct RetrievalConfig {
pub get_validators: bool,
pub get_transactions: bool,
pub get_block_results: bool,
}
impl Default for RetrievalConfig {
fn default() -> Self {
Self {
get_validators: true,
get_transactions: true,
get_block_results: true,
}
}
}
#[derive(Clone)]
pub struct RpcClient {
// right now I don't care about anything nym specific, so a simple http client is sufficient,
@@ -70,15 +53,27 @@ impl RpcClient {
}
}
fn parse_transactions(
#[instrument(skip(self, block), fields(height = block.height))]
pub async fn try_get_full_details(
&self,
raw_transactions: Vec<tx::Response>,
block: &Block,
) -> Result<Vec<ParsedTransactionResponse>, ScraperError> {
block: BlockToProcess,
) -> Result<FullBlockInformation, ScraperError> {
debug!("getting complete block details");
let height = block.height;
// make all the http requests concurrently
let (results, validators, raw_transactions) = join3(
self.get_block_results(height),
self.get_validators_details(height),
self.get_transaction_results(&block.block.data),
)
.await;
let raw_transactions = raw_transactions?;
let mut transactions = Vec::with_capacity(raw_transactions.len());
for raw_tx in raw_transactions {
let mut parsed_messages = BTreeMap::new();
let mut parsed_message_urls = BTreeMap::new();
let mut parsed_messages = HashMap::new();
let mut parsed_message_urls = HashMap::new();
let tx = cosmrs::Tx::from_bytes(&raw_tx.tx).map_err(|source| {
ScraperError::TxParseFailure {
hash: raw_tx.hash,
@@ -102,33 +97,9 @@ impl RpcClient {
proof: raw_tx.proof,
parsed_messages,
parsed_message_urls,
block: block.clone(),
block: block.block.clone(),
})
}
Ok(transactions)
}
#[instrument(skip(self, block), fields(height = block.height))]
pub async fn try_get_full_details(
&self,
block: BlockToProcess,
config: RetrievalConfig,
) -> Result<FullBlockInformation, ScraperError> {
debug!("getting complete block details");
let height = block.height;
// make all the http requests run concurrently
let (results, validators, raw_transactions) = join3(
self.maybe_get_block_results(height, config.get_block_results),
self.maybe_get_validators_details(height, config.get_validators),
self.maybe_get_transaction_results(&block.block.data, config.get_transactions),
)
.await;
let transactions = match raw_transactions? {
Some(raw) => Some(self.parse_transactions(raw, &block.block)?),
None => None,
};
Ok(FullBlockInformation {
block: block.block,
@@ -169,18 +140,6 @@ impl RpcClient {
})
}
async fn maybe_get_block_results(
&self,
height: u32,
retrieve: bool,
) -> Result<Option<block_results::Response>, ScraperError> {
if retrieve {
self.get_block_results(height).await.map(Some)
} else {
Ok(None)
}
}
pub(crate) async fn current_block_height(&self) -> Result<u64, ScraperError> {
debug!("getting current block height");
@@ -237,18 +196,6 @@ impl RpcClient {
inner.into_values().collect()
}
async fn maybe_get_transaction_results(
&self,
raw: &[Vec<u8>],
retrieve: bool,
) -> Result<Option<Vec<tx::Response>>, ScraperError> {
if retrieve {
self.get_transaction_results(raw).await.map(Some)
} else {
Ok(None)
}
}
#[instrument(skip(self, tx_hash), fields(tx_hash = %tx_hash), err(Display))]
async fn get_transaction_result(&self, tx_hash: Hash) -> Result<tx::Response, ScraperError> {
debug!("getting tx results");
@@ -277,16 +224,4 @@ impl RpcClient {
source: Box::new(source),
})
}
async fn maybe_get_validators_details(
&self,
height: u32,
retrieve: bool,
) -> Result<Option<validators::Response>, ScraperError> {
if retrieve {
self.get_validators_details(height).await.map(Some)
} else {
Ok(None)
}
}
}
@@ -1,15 +1,15 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::PruningOptions;
use crate::block_processor::types::BlockToProcess;
use crate::block_processor::{BlockProcessor, BlockProcessorConfig, BlockProcessorPersistence};
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
use crate::block_requester::{BlockRequest, BlockRequester};
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::scraper::subscriber::ChainSubscriber;
use crate::storage::NyxdScraperStorage;
use crate::subscriber::ChainSubscriber;
use futures::future::join_all;
use std::marker::PhantomData;
use std::sync::Arc;
@@ -22,6 +22,8 @@ use tokio_util::task::TaskTracker;
use tracing::{error, info};
use url::Url;
mod subscriber;
#[derive(Default, Clone, Copy)]
pub struct StartingBlockOpts {
pub start_block_height: Option<u32>,
@@ -70,7 +72,7 @@ where
let (processing_tx, processing_rx) = unbounded_channel();
let (req_tx, req_rx) = channel(5);
let rpc_client = scraper.rpc_client.clone();
let rpc_client = RpcClient::new(&scraper.config.rpc_url)?;
// create the tasks
let block_requester = BlockRequester::new(
@@ -88,19 +90,14 @@ where
);
let mut block_processor = BlockProcessor::new(
block_processor_config,
scraper.cancel_token.clone(),
scraper.startup_sync.clone(),
processing_rx,
req_tx,
scraper.storage.clone(),
rpc_client,
)
.with_persistence(
BlockProcessorPersistence::new(
block_processor_config,
scraper.startup_sync.clone(),
scraper.storage.clone(),
)
.await?,
)
.await?;
block_processor.set_block_modules(self.block_modules);
block_processor.set_tx_modules(self.tx_modules);
@@ -129,19 +126,16 @@ where
}
}
#[must_use]
pub fn with_block_module<M: BlockModule + Send + 'static>(mut self, module: M) -> Self {
self.block_modules.push(Box::new(module));
self
}
#[must_use]
pub fn with_tx_module<M: TxModule + Send + 'static>(mut self, module: M) -> Self {
self.tx_modules.push(Box::new(module));
self
}
#[must_use]
pub fn with_msg_module<M: MsgModule + Send + 'static>(mut self, module: M) -> Self {
self.msg_modules.push(Box::new(module));
self
@@ -215,12 +209,9 @@ where
let (req_tx, _) = channel(5);
let mut block_processor = self
.new_block_processor_with_persistence(
req_tx.clone(),
processing_rx,
PruningOptions::nothing(),
)
.await?;
.new_block_processor(req_tx.clone(), processing_rx)
.await?
.with_pruning(PruningOptions::nothing());
let block = self.rpc_client.get_basic_block_details(height).await?;
@@ -243,12 +234,9 @@ where
let (req_tx, _) = channel(5);
let mut block_processor = self
.new_block_processor_with_persistence(
req_tx.clone(),
processing_rx,
PruningOptions::nothing(),
)
.await?;
.new_block_processor(req_tx.clone(), processing_rx)
.await?
.with_pruning(PruningOptions::nothing());
let mut current_height = self.rpc_client.current_block_height().await? as u32;
let last_processed = block_processor.last_process_height();
@@ -357,24 +345,10 @@ where
)
}
fn new_block_processor(
async fn new_block_processor(
&self,
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> BlockProcessor<S> {
BlockProcessor::<S>::new(
self.cancel_token.clone(),
processing_rx,
req_tx,
self.rpc_client.clone(),
)
}
async fn new_block_processor_with_persistence(
&self,
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
pruning_options: impl Into<Option<PruningOptions>>,
) -> Result<BlockProcessor<S>, ScraperError> {
let block_processor_config = BlockProcessorConfig::new(
self.config.pruning_options,
@@ -383,27 +357,16 @@ where
self.config.start_block.use_best_effort_start_height,
);
let persistence = match pruning_options.into() {
Some(options) => BlockProcessorPersistence::new(
block_processor_config,
self.startup_sync.clone(),
self.storage.clone(),
)
.await?
.with_pruning(options),
None => {
BlockProcessorPersistence::new(
block_processor_config,
self.startup_sync.clone(),
self.storage.clone(),
)
.await?
}
};
self.new_block_processor(req_tx, processing_rx)
.with_persistence(persistence)
.await
BlockProcessor::<S>::new(
block_processor_config,
self.cancel_token.clone(),
self.startup_sync.clone(),
processing_rx,
req_tx,
self.storage.clone(),
self.rpc_client.clone(),
)
.await
}
async fn new_chain_subscriber(
@@ -425,9 +388,7 @@ where
// create the tasks
let block_requester = self.new_block_requester(req_rx, processing_tx.clone());
let block_processor = self
.new_block_processor_with_persistence(req_tx, processing_rx, None)
.await?;
let block_processor = self.new_block_processor(req_tx, processing_rx).await?;
let chain_subscriber = self.new_chain_subscriber(processing_tx).await?;
// spawn them
+15 -41
View File
@@ -3,7 +3,6 @@
use crate::error::ScraperError;
use async_trait::async_trait;
use tendermint::block;
use thiserror::Error;
use tracing::warn;
@@ -90,25 +89,6 @@ pub trait NyxdScraperTransaction {
async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError>;
}
fn ensure_proposer_present(
block_header: &block::Header,
validators: &validators::Response,
) -> Result<(), ScraperError> {
let block_proposer = block_header.proposer_address;
if !validators
.validators
.iter()
.any(|v| v.address == block_proposer)
{
let proposer = crate::helpers::validator_consensus_address(block_proposer)?;
return Err(ScraperError::BlockProposerNotInValidatorSet {
height: block_header.height.value() as u32,
proposer: proposer.to_string(),
});
}
Ok(())
}
pub async fn persist_block<Tx>(
block: &FullBlockInformation,
tx: &mut Tx,
@@ -117,34 +97,28 @@ pub async fn persist_block<Tx>(
where
Tx: NyxdScraperTransaction,
{
let total_gas = match block.transactions.as_ref() {
Some(txs) => crate::helpers::tx_gas_sum(txs),
None => 0,
};
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
// SANITY CHECK: make sure the block proposer is present in the validator set
block.ensure_proposer()?;
tx.persist_validators(&block.validators).await?;
tx.persist_block_data(&block.block, total_gas).await?;
if let Some(validators) = &block.validators {
// SANITY CHECK: make sure the block proposer is present in the validator set
ensure_proposer_present(&block.block.header, validators)?;
tx.persist_validators(validators).await?;
if store_precommits {
if let Some(commit) = &block.block.last_commit {
tx.persist_commits(commit, validators).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
if store_precommits {
if let Some(commit) = &block.block.last_commit {
tx.persist_commits(commit, &block.validators).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
if let Some(transactions) = &block.transactions {
// persist txs
tx.persist_txs(transactions).await?;
// persist txs
tx.persist_txs(&block.transactions).await?;
// persist messages (inside the transactions)
tx.persist_messages(transactions).await?;
}
// persist messages (inside the transactions)
tx.persist_messages(&block.transactions).await?;
tx.update_last_processed(block.block.header.height.into())
.await?;
-160
View File
@@ -1,160 +0,0 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::BlockProcessor;
use crate::block_requester::BlockRequester;
use crate::error::ScraperError;
use crate::rpc_client::{RetrievalConfig, RpcClient};
use crate::subscriber::ChainSubscriber;
use crate::{BlockModule, MsgModule, TxModule};
use tokio::sync::mpsc::{channel, unbounded_channel};
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tracing::info;
use url::Url;
pub struct WatcherConfig {
/// Url to the websocket endpoint of a validator, for example `wss://rpc.nymtech.net/websocket`
pub websocket_url: Url,
/// Url to the rpc endpoint of a validator, for example `https://rpc.nymtech.net/`
pub rpc_url: Url,
}
pub struct NyxdWatcherBuilder {
config: WatcherConfig,
block_modules: Vec<Box<dyn BlockModule + Send>>,
tx_modules: Vec<Box<dyn TxModule + Send>>,
msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
impl NyxdWatcherBuilder {
pub fn new(config: WatcherConfig) -> Self {
NyxdWatcherBuilder {
config,
block_modules: vec![],
tx_modules: vec![],
msg_modules: vec![],
}
}
#[must_use]
pub fn with_block_module<M: BlockModule + Send + 'static>(mut self, module: M) -> Self {
self.block_modules.push(Box::new(module));
self
}
#[must_use]
pub fn with_tx_module<M: TxModule + Send + 'static>(mut self, module: M) -> Self {
self.tx_modules.push(Box::new(module));
self
}
#[must_use]
pub fn with_msg_module<M: MsgModule + Send + 'static>(mut self, module: M) -> Self {
self.msg_modules.push(Box::new(module));
self
}
pub async fn build_and_start(self) -> Result<NyxdWatcher, ScraperError> {
// we must have at least something configured to run the watcher
if self.block_modules.is_empty()
&& self.tx_modules.is_empty()
&& self.msg_modules.is_empty()
{
return Err(ScraperError::NoModulesConfigured);
}
let watcher = NyxdWatcher::new();
let rpc_client = RpcClient::new(&self.config.rpc_url)?;
let (processing_tx, processing_rx) = unbounded_channel();
let (req_tx, req_rx) = channel(5);
// create the tasks
let block_requester = BlockRequester::new(
watcher.cancel_token(),
rpc_client.clone(),
req_rx,
processing_tx.clone(),
);
let mut block_processor =
BlockProcessor::new(watcher.cancel_token(), processing_rx, req_tx, rpc_client)
.with_retrieval_config(RetrievalConfig {
get_validators: false,
get_transactions: true,
get_block_results: false,
});
block_processor.set_block_modules(self.block_modules);
block_processor.set_tx_modules(self.tx_modules);
block_processor.set_msg_modules(self.msg_modules);
let chain_subscriber = ChainSubscriber::new(
&self.config.websocket_url,
watcher.cancel_token(),
watcher.task_tracker.clone(),
processing_tx,
)
.await?;
watcher.start_tasks(block_requester, block_processor, chain_subscriber);
Ok(watcher)
}
}
/// A simpler alternative to the `NyxdScraper` that does not persist any received block information.
/// Instead, it only calls the registered modules on the processed data.
///
/// Furthermore, it also does not retrieve any validator information or detailed block information
pub struct NyxdWatcher {
task_tracker: TaskTracker,
cancel_token: CancellationToken,
}
impl NyxdWatcher {
pub fn builder(config: WatcherConfig) -> NyxdWatcherBuilder {
NyxdWatcherBuilder::new(config)
}
fn new() -> Self {
Self {
task_tracker: TaskTracker::new(),
cancel_token: CancellationToken::new(),
}
}
fn start_tasks(
&self,
mut block_requester: BlockRequester,
mut block_processor: BlockProcessor,
mut chain_subscriber: ChainSubscriber,
) {
self.task_tracker
.spawn(async move { block_requester.run().await });
self.task_tracker
.spawn(async move { block_processor.run().await });
self.task_tracker
.spawn(async move { chain_subscriber.run().await });
self.task_tracker.close();
}
pub async fn stop(self) {
info!("stopping the chain watcher");
assert!(self.task_tracker.is_closed());
self.cancel_token.cancel();
self.task_tracker.wait().await
}
pub fn cancel_token(&self) -> CancellationToken {
self.cancel_token.clone()
}
pub fn is_cancelled(&self) -> bool {
self.cancel_token.is_cancelled()
}
}
-1
View File
@@ -1,7 +1,6 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![allow(deprecated)]
use aes_gcm::aead::{Aead, Nonce};
use aes_gcm::{AeadCore, AeadInPlace, KeyInit};
use rand::{thread_rng, CryptoRng, Fill, RngCore};
-3
View File
@@ -38,9 +38,6 @@ pub enum NymTopologyError {
#[error("timed out while waiting for gateway '{identity_key}' to come online")]
TimedOutWaitingForGateway { identity_key: String },
#[error("timed out while waiting for minimum network topology to become online")]
TimedOutWaitingForTopology,
#[error(
"Wanted to create a mix route with {requested} hops, while only {available} layers are available"
)]
-10
View File
@@ -403,10 +403,6 @@ pub struct TopologyWasm {
/// before abandoning the procedure.
pub max_startup_gateway_waiting_period_ms: u32,
/// Defines how long the client is going to wait on startup for minimal topology to become online,
/// before abandoning the procedure.
pub max_startup_network_waiting_period_ms: u32,
/// Specifies whether the client should not refresh the network topology after obtaining
/// the first valid instance.
/// Supersedes `topology_refresh_rate_ms`.
@@ -450,9 +446,6 @@ impl From<TopologyWasm> for ConfigTopology {
max_startup_gateway_waiting_period: Duration::from_millis(
topology.max_startup_gateway_waiting_period_ms as u64,
),
max_startup_network_waiting_period: Duration::from_millis(
topology.max_startup_network_waiting_period_ms as u64,
),
minimum_mixnode_performance: topology.minimum_mixnode_performance,
minimum_gateway_performance: topology.minimum_gateway_performance,
use_extended_topology: topology.use_extended_topology,
@@ -470,9 +463,6 @@ impl From<ConfigTopology> for TopologyWasm {
max_startup_gateway_waiting_period_ms: topology
.max_startup_gateway_waiting_period
.as_millis() as u32,
max_startup_network_waiting_period_ms: topology
.max_startup_network_waiting_period
.as_millis() as u32,
disable_refreshing: topology.disable_refreshing,
minimum_mixnode_performance: topology.minimum_mixnode_performance,
minimum_gateway_performance: topology.minimum_gateway_performance,
@@ -276,11 +276,6 @@ pub struct TopologyWasmOverride {
#[tsify(optional)]
pub max_startup_gateway_waiting_period_ms: Option<u32>,
/// Defines how long the client is going to wait on startup for minimal topology to become online,
/// before abandoning the procedure.
#[tsify(optional)]
pub max_startup_network_waiting_period_ms: Option<u32>,
/// Specifies whether the client should not refresh the network topology after obtaining
/// the first valid instance.
/// Supersedes `topology_refresh_rate_ms`.
@@ -327,9 +322,6 @@ impl From<TopologyWasmOverride> for TopologyWasm {
max_startup_gateway_waiting_period_ms: value
.max_startup_gateway_waiting_period_ms
.unwrap_or(def.max_startup_gateway_waiting_period_ms),
max_startup_network_waiting_period_ms: value
.max_startup_network_waiting_period_ms
.unwrap_or(def.max_startup_network_waiting_period_ms),
disable_refreshing: value.disable_refreshing.unwrap_or(def.disable_refreshing),
minimum_mixnode_performance: value
.minimum_mixnode_performance

Some files were not shown because too many files have changed in this diff Show More