Compare commits

...

23 Commits

Author SHA1 Message Date
Mark Sinclair 6131082cee Data Observatory: add optional coingecko API key 2026-02-20 14:55:41 +00:00
Mark Sinclair 8bd6a1006b Data Observatory: add more logging for coingecko API failures 2026-02-20 14:38:59 +00:00
Mark Sinclair 1678fb6b94 Data Observatory: add env var PG_CERT and bump version 2026-02-19 17:08:06 +00:00
import this 94a3599b4d [DOCs]: Fix missing diagnostic tool in developers menu (#6470)
* bump up stats and run prebuild

* fix typos
2026-02-19 15:08:04 +00:00
import this a6bc54461a [DOCs]: Diagnostic tool (#6467)
* create diagnostic-tool page

* add to menu

* add to list of tools

* syntax fix

* syntax fix

* syntax fix

* syntax fix

* rm old
2026-02-18 16:57:55 +00:00
Tommy Verrall 4f0c40dab7 Merge pull request #6464 from nymtech/otel-minimal-v2
Otel minimal v2
2026-02-18 14:23:35 +01:00
Tommy Verrall 3eff6e5e3b fix testthroughput 2026-02-18 11:06:42 +01:00
Tommy Verrall a519f4ccb8 pr feedback
- Moved OTel CLI options into a separate OtelArgs
- Otel is built behind the feature flag otel
- Store timing is in microseconds
- Restore comments to existing files
2026-02-18 10:48:54 +01:00
Tommy Verrall a3ba3bfc5a remove non OTEL work here 2026-02-17 10:17:22 +01:00
Tommy Verrall 988df7cff7 sampling to avoid costs
- add otel timeouts
2026-02-17 09:10:52 +01:00
Tommy Verrall 260f8e9714 revert docker/localnet to develop; localnet work to follow in separate PR 2026-02-17 08:37:49 +01:00
Tommy Verrall d28d0ac39e fix replay batch drop, harden error handling and scripts 2026-02-16 19:42:24 +01:00
Tommy Verrall dce4d6b34b otel: refactor key selection, add environment label, fix clippy 2026-02-16 19:13:11 +01:00
Tommy Verrall bc47e9a1b2 otel: explicit TLS config for https endpoints 2026-02-16 18:11:28 +01:00
Tommy Verrall 3b693741b2 Merge branch 'develop' of https://github.com/nymtech/nym into otel-minimal-v2 2026-02-16 16:41:16 +01:00
Tommy Verrall cb277fe487 otel: support signoz cloud ingestion key and TLS 2026-02-16 16:11:31 +01:00
Tommy Verrall 8bb29f4d07 localnet: add loadtest script and signoz docs 2026-02-16 15:44:55 +01:00
Tommy Verrall e753f24ed1 localnet: fix runtime and gateway flags 2026-02-16 15:21:45 +01:00
Tommy Verrall c7cd962627 localnet: multi-stage dockerfile 2026-02-16 14:45:05 +01:00
Tommy Verrall 00467e4440 fix upstream build: update lockfile and stabilise nym-lp 2026-02-16 14:11:40 +01:00
Tommy Verrall f3d1000472 Add gitignore 2026-02-16 13:57:04 +01:00
Tommy Verrall 597aae1a20 localnet: wire otel 2026-02-16 13:54:15 +01:00
Tommy Verrall 40a3cd28b7 otel: add tracing 2026-02-16 13:46:17 +01:00
27 changed files with 2534 additions and 1829 deletions
+1
View File
@@ -3,4 +3,5 @@
.gitignore
**/node_modules
**/target
target-otel
dist
+1
View File
@@ -76,3 +76,4 @@ CLAUDE.md
.claude/settings.json
/notes
/target-otel
Generated
+1627 -1626
View File
File diff suppressed because it is too large Load Diff
+5 -4
View File
@@ -309,8 +309,10 @@ nix = "0.30.1"
notify = "5.1.0"
num_enum = "0.7.5"
once_cell = "1.21.3"
opentelemetry = "0.19.0"
opentelemetry-jaeger = "0.18.0"
opentelemetry = "0.31.0"
opentelemetry_sdk = "0.31.0"
opentelemetry-otlp = "0.31.0"
tonic = "0.14.4"
parking_lot = "0.12.3"
pem = "0.8"
petgraph = "0.6.5"
@@ -368,9 +370,8 @@ tower = "0.5.2"
tower-http = "0.6.6"
tracing = "0.1.41"
tracing-log = "0.2"
tracing-opentelemetry = "0.19.0"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = "0.3.20"
tracing-tree = "0.2.2"
tracing-indicatif = "0.3.9"
tracing-test = "0.2.5"
ts-rs = "10.1.0"
+13 -9
View File
@@ -19,12 +19,15 @@ serde_json = { workspace = true, optional = true }
## tracing
tracing-subscriber = { workspace = true, features = ["env-filter"], optional = true }
tracing-tree = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
opentelemetry-jaeger = { workspace = true, features = ["rt-tokio", "collector_client", "isahc_collector_client"], optional = true }
tracing-opentelemetry = { workspace = true, optional = true }
utoipa = { workspace = true, optional = true }
opentelemetry = { workspace = true, features = ["rt-tokio"], optional = true }
opentelemetry = { workspace = true, features = ["trace"], optional = true }
## otel-otlp (modern OTLP export to SigNoz/any OTLP collector)
opentelemetry_sdk = { workspace = true, features = ["trace"], optional = true }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "trace", "tls-roots"], optional = true }
tonic = { workspace = true, optional = true }
[build-dependencies]
@@ -35,13 +38,14 @@ default = []
openapi = ["utoipa"]
output_format = ["serde_json", "dep:clap"]
bin_info_schema = ["schemars"]
basic_tracing = ["dep:tracing", "tracing-subscriber"]
tracing = [
basic_tracing = ["dep:tracing", "dep:tracing-subscriber"]
otel-otlp = [
"basic_tracing",
"tracing-tree",
"opentelemetry-jaeger",
"tracing-opentelemetry",
"opentelemetry",
"dep:opentelemetry",
"dep:opentelemetry_sdk",
"dep:opentelemetry-otlp",
"dep:tracing-opentelemetry",
"dep:tonic",
]
clap = ["dep:clap", "dep:clap_complete", "dep:clap_complete_fig"]
models = []
+97 -38
View File
@@ -4,16 +4,9 @@
use serde::{Deserialize, Serialize};
use std::io::IsTerminal;
#[cfg(feature = "tracing")]
pub use opentelemetry;
#[cfg(feature = "tracing")]
pub use opentelemetry_jaeger;
#[cfg(feature = "tracing")]
pub use tracing_opentelemetry;
// Re-export tracing_subscriber for consumers that need to compose layers
#[cfg(feature = "basic_tracing")]
pub use tracing_subscriber;
#[cfg(feature = "tracing")]
pub use tracing_tree;
#[derive(Debug, Default, Copy, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
@@ -69,40 +62,106 @@ pub fn setup_tracing_logger() {
build_tracing_logger().init()
}
// TODO: This has to be a macro, running it as a function does not work for the file_appender for some reason
#[cfg(feature = "tracing")]
#[macro_export]
macro_rules! setup_tracing {
($service_name: expr) => {
use nym_bin_common::logging::tracing_subscriber::layer::SubscriberExt;
use nym_bin_common::logging::tracing_subscriber::util::SubscriberInitExt;
/// Initialize an OpenTelemetry tracing layer that exports spans via OTLP/gRPC.
///
/// This produces a layer compatible with `tracing_subscriber::registry()` that
/// sends traces to any OTLP-compatible collector (SigNoz, Grafana Tempo, etc).
///
/// Returns both the tracing layer and the [`SdkTracerProvider`] so the caller
/// can invoke [`SdkTracerProvider::shutdown`] for graceful flush on exit.
///
/// # Arguments
/// * `service_name` - The service name reported to the collector (e.g. "nym-node")
/// * `endpoint` - The OTLP/gRPC collector endpoint (e.g. "http://localhost:4317"
/// or "https://ingest.eu.signoz.cloud:443" for SigNoz Cloud)
/// * `ingestion_key` - Optional SigNoz Cloud ingestion key. When provided, it is
/// sent as the `signoz-ingestion-key` gRPC metadata header on every export.
/// * `environment` - Deployment environment label (e.g. "sandbox", "mainnet", "canary").
/// Attached as the `deployment.environment` OTel resource attribute.
/// * `sample_ratio` - Trace sampling ratio in 0.0..=1.0 (e.g. 0.1 = 10% of traces).
/// Used to limit cost when exporting from many nodes; clamped to [0.0, 1.0].
/// * `export_timeout_secs` - Timeout in seconds for each OTLP export batch. Prevents
/// unbounded blocking if the collector is slow or unreachable.
#[cfg(feature = "otel-otlp")]
pub fn init_otel_layer<S>(
service_name: &str,
endpoint: &str,
ingestion_key: Option<&str>,
environment: &str,
sample_ratio: f64,
export_timeout_secs: u64,
) -> Result<
(
tracing_opentelemetry::OpenTelemetryLayer<S, opentelemetry_sdk::trace::SdkTracer>,
opentelemetry_sdk::trace::SdkTracerProvider,
),
Box<dyn std::error::Error + Send + Sync>,
>
where
S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::WithTonicConfig;
use opentelemetry_sdk::trace::Sampler;
use std::time::Duration;
let registry = nym_bin_common::logging::tracing_subscriber::Registry::default()
.with(nym_bin_common::logging::tracing_subscriber::EnvFilter::from_default_env())
.with(
nym_bin_common::logging::tracing_tree::HierarchicalLayer::new(4)
.with_targets(true)
.with_bracketed_fields(true),
);
// Validate endpoint URI early to fail with a clear message
if !endpoint.starts_with("http://") && !endpoint.starts_with("https://") {
return Err(format!(
"invalid OTLP endpoint URI: {endpoint} (must start with http:// or https://)"
)
.into());
}
let tracer = nym_bin_common::logging::opentelemetry_jaeger::new_collector_pipeline()
.with_endpoint("http://44.199.230.10:14268/api/traces")
.with_service_name($service_name)
.with_isahc()
.with_trace_config(
nym_bin_common::logging::opentelemetry::sdk::trace::config().with_sampler(
nym_bin_common::logging::opentelemetry::sdk::trace::Sampler::TraceIdRatioBased(
0.1,
),
),
)
.install_batch(nym_bin_common::logging::opentelemetry::runtime::Tokio)
.expect("Could not init tracer");
let sample_ratio_clamped = sample_ratio.clamp(0.0, 1.0);
let telemetry = nym_bin_common::logging::tracing_opentelemetry::layer().with_tracer(tracer);
let mut builder = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_timeout(Duration::from_secs(export_timeout_secs));
registry.with(telemetry).init();
};
// Explicitly configure TLS when the endpoint uses HTTPS
if endpoint.starts_with("https://") {
builder =
builder.with_tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots());
}
if let Some(key) = ingestion_key {
let mut metadata = tonic::metadata::MetadataMap::new();
metadata.insert(
"signoz-ingestion-key",
key.parse()
.map_err(|_| "invalid ingestion key format (value redacted)")?,
);
builder = builder.with_metadata(metadata);
}
let exporter = builder
.build()
.map_err(|e| format!("failed to build OTLP exporter for endpoint {endpoint}: {e}"))?;
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_sampler(Sampler::TraceIdRatioBased(sample_ratio_clamped))
.with_batch_exporter(exporter)
.with_resource(
opentelemetry_sdk::Resource::builder()
.with_service_name(service_name.to_owned())
.with_attribute(opentelemetry::KeyValue::new(
"deployment.environment",
environment.to_owned(),
))
.build(),
)
.build();
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
let tracer = tracer_provider.tracer(service_name.to_owned());
Ok((
tracing_opentelemetry::layer().with_tracer(tracer),
tracer_provider,
))
}
pub fn banner(crate_name: &str, crate_version: &str) -> String {
+80 -26
View File
@@ -128,54 +128,95 @@ impl ManagedConnection {
async fn run(self) {
let address = self.address;
let reconnection_attempt = self.current_reconnection.load(Ordering::Acquire);
let connect_start = tokio::time::Instant::now();
let connection_fut = TcpStream::connect(address);
let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to {}", self.address);
let connect_ms = connect_start.elapsed().as_millis() as u64;
debug!(
peer = %address,
connect_ms,
"Managed to establish connection to {}", self.address
);
let noise_start = tokio::time::Instant::now();
let noise_stream =
match upgrade_noise_initiator(stream, &self.noise_config).await {
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {address} - {err}");
// we failed to finish the noise handshake - increase reconnection attempt
let noise_handshake_ms = noise_start.elapsed().as_millis() as u64;
warn!(
event = "connection.failed.noise",
peer = %address,
error = %err,
connect_ms,
noise_handshake_ms,
reconnection_attempt,
exit_reason = "noise_error",
"Failed to perform Noise initiator handshake with {address}"
);
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
return;
}
};
// if we managed to connect AND do the noise handshake, reset the reconnection count (whatever it might have been)
let noise_handshake_ms = noise_start.elapsed().as_millis() as u64;
self.current_reconnection.store(0, Ordering::Release);
debug!("Noise initiator handshake completed for {:?}", address);
debug!(
peer = %address,
connect_ms,
noise_handshake_ms,
"Noise initiator handshake completed for {:?}", address
);
Framed::new(noise_stream, NymCodec)
}
Err(err) => {
debug!("failed to establish connection to {address} (err: {err})",);
let connect_ms = connect_start.elapsed().as_millis() as u64;
warn!(
event = "connection.failed.connect",
peer = %address,
error = %err,
connect_ms,
reconnection_attempt,
exit_reason = "connect_error",
"failed to establish connection to {address}"
);
return;
}
},
Err(_) => {
debug!(
let connect_ms = connect_start.elapsed().as_millis() as u64;
warn!(
event = "connection.failed.timeout",
peer = %address,
timeout_ms = self.connection_timeout.as_millis() as u64,
connect_ms,
reconnection_attempt,
exit_reason = "timeout",
"failed to connect to {address} within {:?}",
self.connection_timeout
);
// we failed to connect - increase reconnection attempt
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
return;
}
};
// Take whatever the receiver channel produces and put it on the connection.
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
// about neither receiver nor the connection, it doesn't matter which one gets consumed
if let Err(err) = self.message_receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {address}: {err}");
warn!(
event = "connection.forward_error",
peer = %address,
error = %err,
exit_reason = "forward_error",
"Failed to forward packets to {address}: {err}"
);
}
debug!(
"connection manager to {address} is finished. Either the connection failed or mixnet client got dropped",
peer = %address,
exit_reason = "sender_dropped",
"connection manager to {address} finished"
);
}
}
@@ -272,16 +313,18 @@ impl SendWithoutResponse for Client {
trace!("Sending packet to {address}");
// TODO: optimisation for the future: rather than constantly using legacy encoding,
// once we're addressing by node_id (and thus have full node info here),
// we could simply infer supported encoding based on their version
// use the mix packet type / flags to pick encoding per packet
let framed_packet =
FramedNymPacket::from_mix_packet(packet, self.config.use_legacy_packet_encoding);
let Some(sender) = self.active_connections.get_mut(&address) else {
// there was never a connection to begin with
debug!("establishing initial connection to {address}");
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
debug!(
event = "mixclient.try_send",
peer = %address,
result = "not_connected",
"establishing initial connection to {address}"
);
self.make_connection(address, framed_packet);
return Err(io::Error::new(
io::ErrorKind::NotConnected,
@@ -289,15 +332,24 @@ impl SendWithoutResponse for Client {
));
};
let channel_capacity = sender.channel.max_capacity();
let channel_available = sender.channel.capacity();
let channel_used = channel_capacity - channel_available;
let sending_res = sender.channel.try_send(framed_packet);
drop(sender);
sending_res.map_err(|err| {
match err {
TrySendError::Full(_) => {
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
warn!(
event = "mixclient.try_send",
peer = %address,
result = "full_dropped",
channel_capacity,
channel_used,
"dropping packet: connection buffer to {address} is full ({channel_used}/{channel_capacity})"
);
io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
@@ -305,11 +357,13 @@ impl SendWithoutResponse for Client {
}
TrySendError::Closed(dropped) => {
debug!(
"Connection to {address} seems to be dead. attempting to re-establish it...",
event = "mixclient.try_send",
peer = %address,
result = "closed_reconnecting",
channel_capacity,
channel_used,
"connection to {address} dead, attempting re-establishment"
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, dropped);
io::Error::new(
io::ErrorKind::ConnectionAborted,
@@ -1 +1 @@
Wednesday, February 11th 2026, 11:35:05 UTC
Thursday, February 19th 2026, 13:59:34 UTC
@@ -11,7 +11,7 @@ options:
--no_routing_history Display node stats without routing history
--no_verloc_metrics Display node stats without verloc metrics
-m, --markdown Display results in markdown format
-o, --output [OUTPUT]
-o [OUTPUT], --output [OUTPUT]
Save results to file (in current dir or supply with
path without filename)
```
@@ -12,7 +12,8 @@ usage: nym-node-cli install [-h] [-V] [-d BRANCH] [-v]
options:
-h, --help show this help message and exit
-V, --version show program's version number and exit
-d, --dev BRANCH Define github branch (default: develop)
-d BRANCH, --dev BRANCH
Define github branch (default: develop)
-v, --verbose Show full error tracebacks
--mode {mixnode,entry-gateway,exit-gateway}
Node mode: 'mixnode', 'entry-gateway', or 'exit-
@@ -62,8 +62,6 @@ Options:
Tunnel port announced to external clients wishing to connect to the wireguard interface. Useful in the instances where the node is behind a proxy [env: NYMNODE_WG_ANNOUNCED_PORT=]
--wireguard-private-network-prefix <WIREGUARD_PRIVATE_NETWORK_PREFIX>
The prefix denoting the maximum number of the clients that can be connected via Wireguard. The maximum value for IPv4 is 32 and for IPv6 is 128 [env: NYMNODE_WG_PRIVATE_NETWORK_PREFIX=]
--wireguard-userspace <WIREGUARD_USERSPACE>
Use userspace implementation of WireGuard (wireguard-go) instead of kernel module. Useful in containerized environments without kernel WireGuard support [env: NYMNODE_WG_USERSPACE=] [possible values: true, false]
--verloc-bind-address <VERLOC_BIND_ADDRESS>
Socket address this node will use for binding its verloc API. default: `[::]:1790` [env: NYMNODE_VERLOC_BIND_ADDRESS=]
--verloc-announce-port <VERLOC_ANNOUNCE_PORT>
@@ -82,8 +80,6 @@ Options:
Endpoint to query to retrieve current upgrade mode attestation. This argument should never be set outside testnets and local networks [env: NYMNODE_UPGRADE_MODE_ATTESTATION_URL=]
--upgrade-mode-attester-public-key <UPGRADE_MODE_ATTESTER_PUBLIC_KEY>
Expected public key of the entity signing the published attestation. This argument should never be set outside testnets and local networks [env: NYMNODE_UPGRADE_MODE_ATTESTER_PUBKEY=]
--lp-use-mock-ecash <LP_USE_MOCK_ECASH>
Use mock ecash manager for LP testing. WARNING: Only use this for local testing! Never enable in production. When enabled, the LP listener will accept any credential without blockchain verification [env: NYMNODE_LP_USE_MOCK_ECASH=] [possible values: true, false]
--upstream-exit-policy-url <UPSTREAM_EXIT_POLICY_URL>
Specifies the url for an upstream source of the exit policy used by this node [env: NYMNODE_UPSTREAM_EXIT_POLICY=]
--open-proxy <OPEN_PROXY>
@@ -1,5 +1,6 @@
{
"nym-cli": "Nym-cli",
"diagnostic-tool": "Diagnostic Tool",
"echo-server": "Echo Server",
"standalone-tcpproxy": "TcpProxy Binaries (Standalone)"
}
@@ -0,0 +1,134 @@
import { Steps } from 'nextra/components';
# Diagnostic Tool
The Diagnostic Tool is a standalone binary designed to perform various network tests, including DNS, HTTP, and gateway connectivity tests. This tool helps diagnose connectivity issues and provides insights into network performance.
Its also possible to run it within the daemon with the same CLI interface.
## Download Binary
To get `nym-diagnostic` follow these steps:
<Steps>
###### 1. Download `nym-vpn-core`
- Navigate to [github.com/nymtech/nym-vpn-client/releases](https://github.com/nymtech/nym-vpn-client/releases)
- Find latest `nym-vpn-core-<VERSION>`
- Download version for your system
###### 2. Install or extract and make executable
- If you downloaded `.deb` installer, install it with this command:
```sh
sudo dpkg -i <FILE_NAME>
```
- If you downloaded `.tar.gz`, in terminal you can extract the file with
```sh
tar -xvf <FILE_NAME>
```
- Navigate inside the directory and make executable:
```sh
cd nym-vpn-core-<VERSION>
chmod +x ./*
```
</ Steps>
## CLI Usage
The Diagnostic Tool can be executed from the command line interface (CLI). Below are the usage instructions and options available. Read in the chapter [*Tests Performed*](#tests-performed) about the purpose and outcome of these commands.
### Command Syntax
```sh
./nym-diagnostic [command] [options]
./nym-vpnc diagnostic [command] [options]
```
#### `run` command arguments
The most useful command is `run`, here are the options for that command:
```sh
-h, --help Display help information and exit.
--skip-dns Skip the DNS tests
--skip-http Skip the HTTP tests
--gateway <ID_KEY> Run the gateway connectivity test on the given gateway. Skip those tests if not provided
-v, --verbose Enable verbose output for detailed logging.
```
#### `register` command arguments
Command `register` requires valid credential. Here are the options for that command:
```sh
--gateway <ID_KEY> Register to the given gateway
--storage-path Path to the directory containing the credentials database. If it is not valid registration will be skipped.
--skip-wireguard Skip Wireguard tests
```
### Command Examples
- Run all tests on a gateway:
```sh
./nym-diagnostic run --gateway <ID_KEY>
```
- Run the DNS tests only:
```sh
./nym-diagnostic run --skip-http
```
- Register to a gateway:
```sh
sudo ./nym-diagnostic register --gateway <ID_KEY> --storage-path /var/lib/nym-vpnd/mainnet
# sudo is required to read the database
```
- You can also run DNS and HTTP tests from `nym-vpnc` (installation [here](/developers/nymvpncli)):
```sh
./nym-vpnc diagnostic run
```
## Tests Performed
The Diagnostic Tool runs the following tests:
### 1. DNS Test
- **Purpose**: To check the resolution DNS availability.
- **Process**: We try to resolve all the domain names present in a given nym network environment with different DNS configurations
- **Output**: Displays the resolved IP address and the time taken for the resolution.
### 2. HTTP Test
- **Purpose**: To verify the accessibility of the NymVPN API.
- **Process**: The tool query the `health` endpoint as well as the `nodes/described` endpoint.
- **Output**: Displays the response of the `health` endpoint, the time skew and the number of nodes in the network (sanity check)
### 3. Gateway Test
- **Purpose**: To check the connectivity to a given gateway.
- **Process**: The tool fetches information about the gateway, then establishes a TCP connection, upgrades it to WS and sends a request
- **Output**: Display the gateway reported information, the status of the connections and the WS response.
### 4. Registration Test
- **Purpose:** To check the correctness of the registration process.
- **Process:** The tool tries to build a mixnet client to the provided gateway and then tries to register to the entry authenticator
- **Output:** Display the status of the different steps
- **Caveat:** This test requires a credential to be spent, which is why it is available as a separate command only
### 5. Wireguard Test
- **Purpose:** To check the soundness of a wireguard connection
- **Process:** The tool uses the registration data from the previous step to establish a wireguard connection and ping an IP.
- **Output:** Display the ping RTTs and any error that might have happened
## Reports
Reports are logged in a JSON format and also returned by the commands for a future use
@@ -62,12 +62,13 @@ There are multiple ways to monitor performance of nodes and the machines on whic
### Guides to Setup Own Metrics
A list of different scripts, templates and guides for easier navigation:
A list of different tools, templates and guides for easier navigation:
* [`nym-gateway-probe`](performance-and-testing/gateway-probe.mdx): a useful tool used under the hood of [Node Status Observatory](https://harbourmaster.nymtech.net)
* [Diagnostic Tool](/developers/tools/diagnostic-tool): diagnose connectivity issues and provides insights into network performance
* [`nym-gateway-probe`](performance-and-testing/gateway-probe.mdx) - a useful tool used under the hood of [Node Status Observatory](https://harbourmaster.nymtech.net)
* [Prometheus and Grafana](performance-and-testing/prometheus-grafana.mdx) self-hosted setup
* [Nym-node CPU cron service](https://gist.github.com/tommyv1987/97e939a7adf491333d686a8eaa68d4bd) - an easy bash script by Nym core developer [@tommy1987](https://gist.github.com/tommyv1987), designed to monitor a CPU usage of your node, running locally
* Nym's script [`prom_targets.py`](https://github.com/nymtech/nym/blob/develop/scripts/prom_targets.py) - a useful python program to request data from API and can be run on its own or plugged to more sophisticated flows
### Collecting Testing Metrics
@@ -202,3 +202,13 @@ PING_RETRIES=10 PING_TIMEOUT=5 CONCURRENCY=16 ./test-nodes-pings.sh
You can look up the IPs from `ping_not_working.csv`, using some online database, like [ipinfo.io](https://ipinfo.io).
Feel invited to share the outcome with Nym team, mentors and the rest of the operators in our [Matrix Node Operators channel](https://matrix.to/#/#operators:nymtech.chat).
## Guides to Setup Own Metrics
A list of different tools, templates and guides for easier navigation:
* [`nym-gateway-probe`](performance-and-testing/gateway-probe.mdx): a useful tool used under the hood of [Node Status Observatory](https://harbourmaster.nymtech.net)
* [Diagnostic Tool](/developers/tools/diagnostic-tool): diagnose connectivity issues and provides insights into network performance
* [Prometheus and Grafana](performance-and-testing/prometheus-grafana.mdx) self-hosted setup
+1 -1
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-data-observatory"
version = "1.0.1"
version = "1.0.4"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
+10 -1
View File
@@ -1,5 +1,6 @@
use anyhow::{Result, anyhow};
use sqlx::{Postgres, migrate::Migrator, postgres::PgConnectOptions};
use std::env;
use std::str::FromStr;
use tracing::info;
@@ -19,7 +20,15 @@ pub(crate) struct Storage {
impl Storage {
pub async fn init(connection_url: String) -> Result<Self> {
let connect_options = PgConnectOptions::from_str(&connection_url)?;
let mut connect_options = PgConnectOptions::from_str(&connection_url)?;
let ssl_cert_path = env::var("PG_CERT").ok();
if let Some(ssl_cert) = ssl_cert_path {
connect_options = connect_options
.ssl_mode(sqlx::postgres::PgSslMode::Require)
.ssl_root_cert(ssl_cert);
}
let pool = DbPool::connect_with(connect_options)
.await
+19 -4
View File
@@ -3,6 +3,7 @@ use crate::db::{
queries::price::insert_nym_prices,
};
use core::str;
use std::env;
use tokio::time::Duration;
use crate::db::DbPool;
@@ -29,10 +30,24 @@ impl PriceScraper {
async fn get_coingecko_prices(&self) -> anyhow::Result<CoingeckoPriceResponse> {
tracing::info!("💰 Fetching CoinGecko prices from {COINGECKO_API_URL}");
let response = reqwest::get(COINGECKO_API_URL)
.await?
.json::<CoingeckoPriceResponse>()
.await;
let mut url = COINGECKO_API_URL.to_string();
let coin_gecko_api_key = env::var("COINGECKO_API_KEY").ok();
if let Some(api_key) = coin_gecko_api_key {
url = format!("{url}&x_cg_demo_api_key={api_key}")
}
let response = reqwest::get(url).await?;
if !response.status().is_success() {
tracing::error!(
"CoinGecko price query returned error: {}",
response.status()
);
}
let response = response.json::<CoingeckoPriceResponse>().await;
tracing::info!("Got response {:?}", response);
match response {
+3
View File
@@ -40,6 +40,8 @@ thiserror.workspace = true
tracing.workspace = true
tracing-indicatif = { workspace = true }
tracing-subscriber.workspace = true
opentelemetry = { workspace = true, features = ["trace"], optional = true }
opentelemetry_sdk = { workspace = true, features = ["trace"], optional = true }
tokio = { workspace = true, features = ["macros", "sync", "rt-multi-thread"] }
tokio-util = { workspace = true, features = ["codec"] }
tokio-stream = { workspace = true }
@@ -135,6 +137,7 @@ rand_chacha = { workspace = true }
[features]
tokio-console = ["console-subscriber", "nym-task/tokio-tracing"]
otel = ["nym-bin-common/otel-otlp", "dep:opentelemetry", "dep:opentelemetry_sdk"]
[lints]
workspace = true
+102 -28
View File
@@ -8,7 +8,6 @@ use crate::cli::commands::{
use crate::env::vars::{NYMNODE_CONFIG_ENV_FILE_ARG, NYMNODE_NO_BANNER_ARG};
use clap::{Args, Parser, Subcommand};
use nym_bin_common::bin_info;
use std::future::Future;
use std::sync::OnceLock;
pub(crate) mod commands;
@@ -22,6 +21,43 @@ fn pretty_build_info_static() -> &'static str {
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
}
/// OpenTelemetry-related CLI arguments. Only present when built with the `otel` feature.
#[cfg(feature = "otel")]
#[derive(Args, Debug, Clone)]
pub(crate) struct OtelArgs {
/// Enable OpenTelemetry tracing export via OTLP/gRPC.
#[clap(long, env = "NYMNODE_OTEL_ENABLE")]
pub(crate) otel: bool,
/// OpenTelemetry OTLP collector endpoint (gRPC).
/// Only used when --otel is enabled.
/// For SigNoz Cloud use https://ingest.<region>.signoz.cloud:443
#[clap(
long,
env = "NYMNODE_OTEL_ENDPOINT",
default_value = "http://localhost:4317"
)]
pub(crate) otel_endpoint: String,
/// SigNoz Cloud ingestion key for authenticated OTLP export.
/// Only needed for SigNoz Cloud (not self-hosted).
#[clap(long, env = "NYMNODE_OTEL_KEY")]
pub(crate) otel_key: Option<String>,
/// Deployment environment label attached to all exported traces.
/// Used to distinguish sandbox / mainnet / canary in the OTel backend.
#[clap(long, env = "NYMNODE_OTEL_ENV", default_value = "mainnet")]
pub(crate) otel_env: String,
/// Trace sampling ratio (0.0 to 1.0). e.g. 0.1 = 10%% of traces exported. Reduces cost.
#[clap(long, env = "NYMNODE_OTEL_SAMPLE_RATIO", default_value = "0.1")]
pub(crate) otel_sample_ratio: f64,
/// Timeout in seconds for each OTLP export batch. Prevents unbounded blocking.
#[clap(long, env = "NYMNODE_OTEL_EXPORT_TIMEOUT", default_value = "10")]
pub(crate) otel_export_timeout: u64,
}
#[derive(Parser, Debug)]
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
pub(crate) struct Cli {
@@ -40,44 +76,82 @@ pub(crate) struct Cli {
)]
pub(crate) no_banner: bool,
#[cfg(feature = "otel")]
#[clap(flatten)]
pub(crate) otel: OtelArgs,
#[clap(subcommand)]
command: Commands,
}
impl Cli {
fn execute_async<F: Future>(fut: F) -> anyhow::Result<F::Output> {
Ok(tokio::runtime::Builder::new_multi_thread()
pub(crate) fn execute(self) -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(fut))
.build()?;
// Set up tracing inside the runtime so the OTel batch exporter (when enabled)
// can spawn its background tasks on the tokio reactor.
let use_otel = matches!(self.command, Commands::Run(..));
let _otel_guard = runtime.block_on(async { self.setup_logging(use_otel) })?;
// `_otel_guard` is dropped at function exit, flushing pending spans via its Drop impl
runtime.block_on(async {
match self.command {
Commands::BuildInfo(args) => build_info::execute(args)?,
Commands::BondingInformation(args) => bonding_information::execute(args).await?,
Commands::NodeDetails(args) => node_details::execute(args).await?,
Commands::Run(args) => run::execute(*args).await?,
Commands::Migrate(args) => migrate::execute(*args)?,
Commands::Sign(args) => sign::execute(args).await?,
Commands::TestThroughput(args) => test_throughput::execute(args)?,
Commands::UnsafeResetSphinxKeys(args) => reset_sphinx_keys::execute(args).await?,
Commands::Debug(debug) => match debug.command {
DebugCommands::ResetProvidersGatewayDbs(args) => {
debug::reset_providers_dbs::execute(args).await?
}
},
}
Ok::<(), anyhow::Error>(())
})
}
pub(crate) fn execute(self) -> anyhow::Result<()> {
// NOTE: `test_throughput` sets up its own logger as it has to include additional layers
if !matches!(self.command, Commands::TestThroughput(..)) {
crate::logging::setup_tracing_logger()?;
#[cfg(feature = "otel")]
fn build_otel_config(&self) -> Option<crate::logging::OtelConfig> {
if self.otel.otel {
Some(crate::logging::OtelConfig {
endpoint: self.otel.otel_endpoint.clone(),
service_name: "nym-node".to_string(),
ingestion_key: self.otel.otel_key.clone(),
environment: self.otel.otel_env.clone(),
sample_ratio: self.otel.otel_sample_ratio,
export_timeout_secs: self.otel.otel_export_timeout,
})
} else {
None
}
}
match self.command {
Commands::BuildInfo(args) => build_info::execute(args)?,
Commands::BondingInformation(args) => {
{ Self::execute_async(bonding_information::execute(args))? }?
}
Commands::NodeDetails(args) => { Self::execute_async(node_details::execute(args))? }?,
Commands::Run(args) => { Self::execute_async(run::execute(*args))? }?,
Commands::Migrate(args) => migrate::execute(*args)?,
Commands::Sign(args) => { Self::execute_async(sign::execute(args))? }?,
Commands::TestThroughput(args) => test_throughput::execute(args)?,
Commands::UnsafeResetSphinxKeys(args) => {
{ Self::execute_async(reset_sphinx_keys::execute(args))? }?
}
Commands::Debug(debug) => match debug.command {
DebugCommands::ResetProvidersGatewayDbs(args) => {
{ Self::execute_async(debug::reset_providers_dbs::execute(args))? }?
}
},
#[cfg(feature = "otel")]
fn setup_logging(&self, use_otel: bool) -> anyhow::Result<Option<crate::logging::OtelGuard>> {
if matches!(self.command, Commands::TestThroughput(..)) {
return Ok(None);
}
Ok(())
let otel_config = if use_otel {
self.build_otel_config()
} else {
None
};
crate::logging::setup_tracing_logger(otel_config)
}
#[cfg(not(feature = "otel"))]
fn setup_logging(&self, _use_otel: bool) -> anyhow::Result<Option<()>> {
if matches!(self.command, Commands::TestThroughput(..)) {
return Ok(None);
}
crate::logging::setup_tracing_logger()?;
Ok(None)
}
}
+111 -1
View File
@@ -7,6 +7,42 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};
/// Configuration for OpenTelemetry OTLP export.
#[cfg(feature = "otel")]
pub(crate) struct OtelConfig {
/// OTLP/gRPC collector endpoint, e.g. `http://localhost:4317`
/// or `https://ingest.eu.signoz.cloud:443` for SigNoz Cloud.
pub endpoint: String,
/// Service name reported to the collector (appears in SigNoz "Services" view).
pub service_name: String,
/// Optional SigNoz Cloud ingestion key for authenticated export.
/// Sent as the `signoz-ingestion-key` gRPC metadata header.
pub ingestion_key: Option<String>,
/// Deployment environment label, e.g. `mainnet`, `sandbox`, `canary`.
/// Attached as the `deployment.environment` OTel resource attribute.
pub environment: String,
/// Trace sampling ratio in 0.0..=1.0 (e.g. 0.1 = 10% of traces). Used to limit cost.
pub sample_ratio: f64,
/// Timeout in seconds for each OTLP export batch. Prevents unbounded blocking.
pub export_timeout_secs: u64,
}
/// Handle returned when OTel is active. Flushes pending spans on drop
/// to prevent telemetry loss during panics or early exits.
#[cfg(feature = "otel")]
pub(crate) struct OtelGuard {
pub provider: opentelemetry_sdk::trace::SdkTracerProvider,
}
#[cfg(feature = "otel")]
impl Drop for OtelGuard {
fn drop(&mut self) {
if let Err(e) = self.provider.shutdown() {
eprintln!("OpenTelemetry shutdown error in Drop: {e}");
}
}
}
pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
fn directive_checked(directive: impl Into<String>) -> anyhow::Result<Directive> {
directive.into().parse().map_err(From::from)
@@ -22,12 +58,86 @@ pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
Ok(filter)
}
/// Initialise the tracing subscriber stack.
///
/// When the `otel` feature is enabled **and** an `OtelConfig` is supplied, an
/// OTLP exporter layer is added and the returned `OtelGuard` must be used to
/// flush pending spans on shutdown.
#[cfg(feature = "otel")]
pub(crate) fn setup_tracing_logger(otel: Option<OtelConfig>) -> anyhow::Result<Option<OtelGuard>> {
let stderr_layer =
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
let console_layer = console_subscriber::spawn();
if let Some(otel_config) = otel {
let (otel_layer, provider) = nym_bin_common::logging::init_otel_layer(
&otel_config.service_name,
&otel_config.endpoint,
otel_config.ingestion_key.as_deref(),
&otel_config.environment,
otel_config.sample_ratio,
otel_config.export_timeout_secs,
).map_err(|e| anyhow::anyhow!(
"failed to initialise OpenTelemetry exporter (endpoint: {}, service: {}): {e}",
otel_config.endpoint,
otel_config.service_name,
))?;
tracing_subscriber::registry()
.with(console_layer)
.with(stderr_layer)
.with(otel_layer)
.init();
Ok(Some(OtelGuard { provider }))
} else {
tracing_subscriber::registry()
.with(console_layer)
.with(stderr_layer)
.init();
Ok(None)
}
} else {
if let Some(otel_config) = otel {
let (otel_layer, provider) = nym_bin_common::logging::init_otel_layer(
&otel_config.service_name,
&otel_config.endpoint,
otel_config.ingestion_key.as_deref(),
&otel_config.environment,
otel_config.sample_ratio,
otel_config.export_timeout_secs,
).map_err(|e| anyhow::anyhow!(
"failed to initialise OpenTelemetry exporter (endpoint: {}, service: {}): {e}",
otel_config.endpoint,
otel_config.service_name,
))?;
tracing_subscriber::registry()
.with(stderr_layer)
.with(otel_layer)
.init();
Ok(Some(OtelGuard { provider }))
} else {
tracing_subscriber::registry()
.with(stderr_layer)
.init();
Ok(None)
}
}}
}
/// Non-OTel variant -- identical subscriber stack without the OTLP layer.
#[cfg(not(feature = "otel"))]
pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
let stderr_layer =
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
let console_layer = console_subscriber::spawn();
tracing_subscriber::registry()
+227 -65
View File
@@ -1,6 +1,7 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::key_rotation::active_keys::SphinxKeyGuard;
use crate::node::mixnet::shared::SharedData;
use futures::StreamExt;
use nym_noise::connection::Connection;
@@ -20,7 +21,10 @@ use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::codec::Framed;
use tracing::{debug, error, instrument, trace, warn};
use tracing::{Span, debug, error, instrument, trace, warn};
/// How often (in packets) the stream-level span updates its packet count.
const SPAN_UPDATE_INTERVAL: u64 = 10_000;
struct PendingReplayCheckPackets {
// map of rotation id used for packet creation to the packets
@@ -51,6 +55,10 @@ impl PendingReplayCheckPackets {
.push(packet.packet)
}
fn total_count(&self) -> usize {
self.packets.values().map(|v| v.len()).sum()
}
fn replay_tags(&self) -> HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>> {
let mut replay_tags = HashMap::with_capacity(self.packets.len());
'outer: for (rotation_id, packets) in &self.packets {
@@ -130,20 +138,54 @@ impl ConnectionHandler {
Some(now + delay)
}
#[instrument(
name = "mixnode.forward_packet",
skip(self, mix_packet, delay),
level = "debug",
fields(
remote_addr = %self.remote_address,
delay_ms = tracing::field::Empty,
)
)]
fn handle_forward_packet(&self, now: Instant, mix_packet: MixPacket, delay: Option<Delay>) {
if !self.shared.processing_config.forward_hop_processing_enabled {
trace!("this nym-node does not support forward hop packets");
warn!(
event = "packet.dropped.forward_disabled",
remote_addr = %self.remote_address,
"dropping packet: forward hop processing disabled"
);
self.shared.dropped_forward_packet(self.remote_address.ip());
return;
}
let forward_instant = self.create_delay_target(now, delay);
if let Some(target) = forward_instant {
Span::current().record(
"delay_ms",
target.saturating_duration_since(now).as_millis() as u64,
);
}
self.shared.forward_mix_packet(mix_packet, forward_instant);
}
#[instrument(
name = "mixnode.final_hop",
skip(self, final_hop_data),
level = "debug",
fields(
remote_addr = %self.remote_address,
client_online,
disk_fallback = false,
ack_forwarded = false,
)
)]
async fn handle_final_hop(&self, final_hop_data: ProcessedFinalHop) {
if !self.shared.processing_config.final_hop_processing_enabled {
trace!("this nym-node does not support final hop packets");
warn!(
event = "packet.dropped.final_hop_disabled",
remote_addr = %self.remote_address,
"dropping packet: final hop processing disabled"
);
self.shared
.dropped_final_hop_packet(self.remote_address.ip());
return;
@@ -151,11 +193,13 @@ impl ConnectionHandler {
let client = final_hop_data.destination;
let message = final_hop_data.message;
let has_ack = final_hop_data.forward_ack.is_some();
// if possible attempt to push message directly to the client
match self.shared.try_push_message_to_client(client, message) {
Err(unsent_plaintext) => {
// if that failed, store it on disk (to be 🔥 soon...)
// if that failed, store it on disk
Span::current().record("client_online", false);
match self
.shared
.store_processed_packet_payload(client, unsent_plaintext)
@@ -163,6 +207,7 @@ impl ConnectionHandler {
{
Err(err) => error!("Failed to store client data - {err}"),
Ok(_) => {
Span::current().record("disk_fallback", true);
self.shared
.metrics
.mixnet
@@ -172,13 +217,18 @@ impl ConnectionHandler {
}
}
}
Ok(_) => trace!("Pushed received packet to {client}"),
Ok(_) => {
Span::current().record("client_online", true);
trace!("Pushed received packet to {client}");
}
}
// if we managed to either push message directly to the [online] client or store it at
// its inbox, it means that it must exist at this gateway, hence we can send the
// received ack back into the network
// disk, forward the ack
self.shared.forward_ack_packet(final_hop_data.forward_ack);
if has_ack {
Span::current().record("ack_forwarded", true);
}
}
fn within_deferral_threshold(&self, now: Instant) -> bool {
@@ -206,32 +256,86 @@ impl ConnectionHandler {
if !time_threshold {
warn!(
"{}: time failure - {}",
event = "replay_detection.deferral_exceeded",
threshold_type = "time",
deferred_count = self.pending_packets.total_count(),
deferral_ms = now.saturating_duration_since(self.pending_packets.last_acquired_mutex).as_millis() as u64,
remote_addr = %self.remote_address,
"{}: time deferral threshold exceeded with {} pending packets",
self.remote_address,
self.pending_packets.packets.len()
self.pending_packets.total_count()
)
}
if !count_threshold {
warn!("{}, count failure", self.remote_address)
warn!(
event = "replay_detection.deferral_exceeded",
threshold_type = "count",
deferred_count = self.pending_packets.total_count(),
remote_addr = %self.remote_address,
"{}: count deferral threshold exceeded",
self.remote_address
)
}
time_threshold && count_threshold
}
/// Resolve the sphinx key for the given rotation, recording the rotation
/// label on the current tracing span. Returns `ExpiredKey` if the requested
/// odd/even key has already been rotated out.
fn resolve_rotation_key(
&self,
rotation: SphinxKeyRotation,
) -> Result<SphinxKeyGuard, PacketProcessingError> {
let rotation_label = match rotation {
SphinxKeyRotation::Unknown => "unknown",
SphinxKeyRotation::OddRotation => "odd",
SphinxKeyRotation::EvenRotation => "even",
};
Span::current().record("key_rotation", rotation_label);
match rotation {
SphinxKeyRotation::Unknown => Ok(self.shared.sphinx_keys.primary()),
SphinxKeyRotation::OddRotation => self.shared.sphinx_keys.odd().ok_or_else(|| {
warn!(
event = "packet.dropped.expired_key",
key_rotation = "odd",
remote_addr = %self.remote_address,
"dropping packet: odd key rotation expired"
);
PacketProcessingError::ExpiredKey
}),
SphinxKeyRotation::EvenRotation => self.shared.sphinx_keys.even().ok_or_else(|| {
warn!(
event = "packet.dropped.expired_key",
key_rotation = "even",
remote_addr = %self.remote_address,
"dropping packet: even key rotation expired"
);
PacketProcessingError::ExpiredKey
}),
}
}
#[instrument(
name = "mixnode.sphinx_partial_unwrap",
skip(self, packet),
level = "debug",
fields(key_rotation, unwrap_result,)
)]
fn try_partially_unwrap_packet(
&self,
packet: FramedNymPacket,
) -> Result<PartialyUnwrappedPacketWithKeyRotation, PacketProcessingError> {
// based on the received sphinx key rotation information,
// attempt to choose appropriate key for processing the packet
match packet.header().key_rotation {
let rotation = packet.header().key_rotation;
let result = match rotation {
SphinxKeyRotation::Unknown => {
let primary = self.shared.sphinx_keys.primary();
// Unknown rotation: try primary, fallback to secondary
let primary = self.resolve_rotation_key(rotation)?;
let primary_rotation = primary.rotation_id();
// we have to try both keys, start with the primary as it has higher likelihood of being correct
// if let Ok(partially_unwrapped) = PartiallyUnwrappedPacket::new()
match PartiallyUnwrappedPacket::new(packet, primary.inner().as_ref()) {
Ok(unwrapped_packet) => {
Ok(unwrapped_packet.with_key_rotation(primary_rotation))
@@ -248,25 +352,17 @@ impl ConnectionHandler {
}
}
}
SphinxKeyRotation::OddRotation => {
let Some(odd_key) = self.shared.sphinx_keys.odd() else {
return Err(PacketProcessingError::ExpiredKey);
};
let odd_rotation = odd_key.rotation_id();
PartiallyUnwrappedPacket::new(packet, odd_key.inner().as_ref())
_ => {
let key = self.resolve_rotation_key(rotation)?;
let rotation_id = key.rotation_id();
PartiallyUnwrappedPacket::new(packet, key.inner().as_ref())
.map_err(|(_, err)| err)
.map(|p| p.with_key_rotation(odd_rotation))
.map(|p| p.with_key_rotation(rotation_id))
}
SphinxKeyRotation::EvenRotation => {
let Some(even_key) = self.shared.sphinx_keys.even() else {
return Err(PacketProcessingError::ExpiredKey);
};
let even_rotation = even_key.rotation_id();
PartiallyUnwrappedPacket::new(packet, even_key.inner().as_ref())
.map_err(|(_, err)| err)
.map(|p| p.with_key_rotation(even_rotation))
}
}
};
Span::current().record("unwrap_result", if result.is_ok() { "ok" } else { "err" });
result
}
async fn handle_received_packet_with_replay_detection(
@@ -280,6 +376,12 @@ impl ConnectionHandler {
Ok(unwrapped) => unwrapped,
Err(err) => {
trace!("failed to process received mix packet: {err}");
warn!(
event = "packet.dropped.malformed",
error = %err,
remote_addr = %self.remote_address,
"dropping malformed packet"
);
self.shared
.metrics
.mixnet
@@ -316,7 +418,9 @@ impl ConnectionHandler {
// 3. forward the packet to the relevant sink (if enabled)
match unwrapped_packet {
Err(err) => trace!("failed to process received mix packet: {err}"),
Err(err) => {
trace!("failed to process received mix packet: {err}");
}
Ok(processed_packet) => match processed_packet.processing_data {
MixProcessingResultData::ForwardHop { packet, delay } => {
self.handle_forward_packet(now, packet, delay);
@@ -334,6 +438,7 @@ impl ConnectionHandler {
packets: HashMap<u32, Vec<PartiallyUnwrappedPacket>>,
replay_check_results: HashMap<u32, Vec<bool>>,
) {
let mut replays_detected: u64 = 0;
for (rotation_id, packets) in packets {
let Some(replay_checks) = replay_check_results.get(&rotation_id) else {
// this should never happen, but if we messed up, and it does, don't panic, just drop the packets
@@ -342,6 +447,13 @@ impl ConnectionHandler {
};
for (packet, &replayed) in packets.into_iter().zip(replay_checks) {
let unwrapped_packet = if replayed {
replays_detected += 1;
warn!(
event = "packet.dropped.replay",
remote_addr = %self.remote_address,
rotation_id,
"dropping replayed packet"
);
Err(PacketProcessingError::PacketReplay)
} else {
packet.finalise_unwrapping()
@@ -350,6 +462,13 @@ impl ConnectionHandler {
self.handle_unwrapped_packet(now, unwrapped_packet).await;
}
}
if replays_detected > 0 {
debug!(
replays_detected,
remote_addr = %self.remote_address,
"replay detection batch completed with replays"
);
}
}
async fn handle_pending_packets_batch_no_locking(&mut self, now: Instant) -> bool {
@@ -379,13 +498,22 @@ impl ConnectionHandler {
true
}
#[instrument(
name = "mixnode.replay_check_batch",
skip(self),
level = "debug",
fields(batch_size, mutex_wait_ms,)
)]
async fn handle_pending_packets_batch(&mut self, now: Instant) {
let batch = self.pending_packets.reset(now);
let replay_tags = self.pending_packets.replay_tags();
if replay_tags.is_empty() {
return;
}
let batch_size = self.pending_packets.total_count();
Span::current().record("batch_size", batch_size as u64);
let mutex_start = Instant::now();
let Ok(replay_check_results) = self
.shared
.replay_protection_filter
@@ -396,37 +524,25 @@ impl ConnectionHandler {
self.shared.shutdown_token.cancel();
return;
};
Span::current().record("mutex_wait_ms", mutex_start.elapsed().as_millis() as u64);
let batch = self.pending_packets.reset(now);
self.handle_post_replay_detection_packets(now, batch, replay_check_results)
.await;
}
#[instrument(
name = "mixnode.sphinx_full_unwrap",
skip(self, packet),
level = "debug",
fields(key_rotation)
)]
fn try_full_unwrap_packet(
&self,
packet: FramedNymPacket,
) -> Result<MixProcessingResult, PacketProcessingError> {
// based on the received sphinx key rotation information,
// attempt to choose appropriate key for processing the packet
// NOTE: due to the function signatures, outfox packets will **only** attempt primary key
// if no rotation information is available (but that's fine given outfox is not really in use,
// and by the time we need it, the rotation info should be present)
match packet.header().key_rotation {
SphinxKeyRotation::Unknown => {
process_framed_packet(packet, self.shared.sphinx_keys.primary().inner().as_ref())
}
SphinxKeyRotation::OddRotation => {
let Some(odd_key) = self.shared.sphinx_keys.odd() else {
return Err(PacketProcessingError::ExpiredKey);
};
process_framed_packet(packet, odd_key.inner().as_ref())
}
SphinxKeyRotation::EvenRotation => {
let Some(even_key) = self.shared.sphinx_keys.even() else {
return Err(PacketProcessingError::ExpiredKey);
};
process_framed_packet(packet, even_key.inner().as_ref())
}
}
let key = self.resolve_rotation_key(packet.header().key_rotation)?;
process_framed_packet(packet, key.inner().as_ref())
}
async fn handle_received_packet_with_no_replay_detection(
@@ -456,23 +572,36 @@ impl ConnectionHandler {
}
#[instrument(
skip(self),
name = "mixnode.connection",
skip(self, socket),
level = "debug",
fields(
remote = %self.remote_address
remote = %self.remote_address,
noise_handshake_ms = tracing::field::Empty,
)
)]
pub(crate) async fn handle_connection(&mut self, socket: TcpStream) {
let handshake_start = Instant::now();
let noise_stream = match upgrade_noise_responder(socket, &self.shared.noise_config).await {
Ok(noise_stream) => noise_stream,
Err(err) => {
error!(
"Failed to perform Noise handshake with {:?} - {err}",
self.remote_address
Span::current().record(
"noise_handshake_ms",
handshake_start.elapsed().as_millis() as u64,
);
warn!(
event = "connection.failed.noise",
remote_addr = %self.remote_address,
error = %err,
"Noise responder handshake failed"
);
return;
}
};
Span::current().record(
"noise_handshake_ms",
handshake_start.elapsed().as_millis() as u64,
);
debug!(
"Noise responder handshake completed for {:?}",
self.remote_address
@@ -481,26 +610,58 @@ impl ConnectionHandler {
.await
}
#[instrument(
name = "mixnode.stream",
skip(self, mixnet_connection),
level = "debug",
fields(
remote = %self.remote_address,
packets_processed = 0u64,
exit_reason,
)
)]
pub(crate) async fn handle_stream(
&mut self,
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
) {
let mut packets_processed: u64 = 0;
loop {
tokio::select! {
biased;
_ = self.shared.shutdown_token.cancelled() => {
trace!("connection handler: received shutdown");
Span::current().record("exit_reason", "shutdown");
break
}
maybe_framed_nym_packet = mixnet_connection.next() => {
match maybe_framed_nym_packet {
Some(Ok(packet)) => self.handle_received_nym_packet(packet).await,
Some(Ok(packet)) => {
self.handle_received_nym_packet(packet).await;
packets_processed += 1;
if packets_processed.is_multiple_of(SPAN_UPDATE_INTERVAL) {
Span::current().record("packets_processed", packets_processed);
}
}
Some(Err(err)) => {
debug!("connection got corrupted with: {err}");
warn!(
event = "connection.corrupted",
remote_addr = %self.remote_address,
error = %err,
packets_processed,
"connection stream corrupted"
);
Span::current().record("exit_reason", "corrupted");
Span::current().record("packets_processed", packets_processed);
return
}
None => {
debug!("connection got closed by the remote");
debug!(
remote_addr = %self.remote_address,
packets_processed,
"connection closed by remote"
);
Span::current().record("exit_reason", "closed_by_remote");
Span::current().record("packets_processed", packets_processed);
return
}
}
@@ -508,6 +669,7 @@ impl ConnectionHandler {
}
}
Span::current().record("packets_processed", packets_processed);
debug!("exiting and closing connection");
}
}
@@ -56,11 +56,17 @@ impl<C, F> PacketForwarder<C, F> {
if let Err(err) = self.mixnet_client.send_without_response(packet) {
if err.kind() == io::ErrorKind::WouldBlock {
// we only know for sure if we dropped a packet if our sending queue was full
// in any other case the connection might still be re-established (or created for the first time)
// and the packet might get sent, but we won't know about it
warn!(
event = "packet.dropped.buffer_full",
next_hop = %next_hop,
"dropping packet: egress connection buffer full (WouldBlock)"
);
self.metrics.mixnet.egress_dropped_forward_packet(next_hop)
} else if err.kind() == io::ErrorKind::NotConnected {
// let's give the benefit of the doubt and assume we manage to establish connection
debug!(
next_hop = %next_hop,
"packet queued for not-yet-connected peer"
);
self.metrics.mixnet.egress_sent_forward_packet(next_hop)
}
} else {
@@ -86,7 +92,11 @@ impl<C, F> PacketForwarder<C, F> {
let next_hop = new_packet.packet.next_hop();
if !self.routing_filter.should_route(next_hop.as_ref().ip()) {
debug!("dropping packet as the egress address does not belong to any known node");
warn!(
event = "packet.dropped.routing_filter",
next_hop = %next_hop,
"dropping packet: egress address does not belong to any known node"
);
self.metrics
.mixnet
.egress_dropped_forward_packet(next_hop.into());
@@ -125,7 +135,7 @@ impl<C, F> PacketForwarder<C, F> {
C: SendWithoutResponse,
F: RoutingFilter,
{
let mut processed = 0;
let mut processed: u64 = 0;
trace!("starting PacketForwarder");
loop {
tokio::select! {
@@ -145,11 +155,29 @@ impl<C, F> PacketForwarder<C, F> {
#[allow(clippy::unwrap_used)]
self.handle_new_packet(new_packet.unwrap());
let channel_len = self.packet_sender.len();
if processed % 1000 == 0 {
let delay_queue_len = self.delay_queue.len();
if processed.is_multiple_of(1000) {
match channel_len {
n if n > 1000 => error!("there are currently {n} mix packets waiting to get forwarded - the node seems to be significantly overloaded!"),
n if n > 500 => warn!("there are currently {n} mix packets waiting to get forwarded - is the node overloaded?"),
n => trace!("there are currently {n} mix packets waiting to get forwarded"),
n if n > 1000 => error!(
event = "forwarder.queue_overload",
channel_depth = n,
delay_queue_depth = delay_queue_len,
packets_processed = processed,
"there are currently {n} mix packets waiting to get forwarded - the node seems to be significantly overloaded!"
),
n if n > 500 => warn!(
event = "forwarder.queue_high",
channel_depth = n,
delay_queue_depth = delay_queue_len,
packets_processed = processed,
"there are currently {n} mix packets waiting to get forwarded - is the node overloaded?"
),
n => trace!(
channel_depth = n,
delay_queue_depth = delay_queue_len,
packets_processed = processed,
"forwarder queue status"
),
}
}
self.update_channel_size_metric(channel_len);
+41 -4
View File
@@ -5,7 +5,8 @@ use nym_gateway::node::{
ActiveClientsStore, GatewayStorage, GatewayStorageError, InboxGatewayStorage,
};
use nym_sphinx_types::DestinationAddressBytes;
use tracing::debug;
use tokio::time::Instant;
use tracing::{debug, warn};
#[derive(Clone)]
pub(crate) struct SharedFinalHopData {
@@ -27,14 +28,37 @@ impl SharedFinalHopData {
message: Vec<u8>,
) -> Result<(), Vec<u8>> {
match self.active_clients.get_sender(client_address) {
None => Err(message),
None => {
debug!(
event = "gateway.push_to_client",
client_found = false,
send_result = "client_not_found",
"client {client_address} not found in active clients"
);
Err(message)
}
Some(sender_channel) => {
let send_start = Instant::now();
if let Err(unsent) = sender_channel.unbounded_send(vec![message]) {
warn!(
event = "gateway.push_to_client",
client_found = true,
send_result = "channel_closed",
send_us = send_start.elapsed().as_micros() as u64,
"client {client_address} channel closed, message not delivered"
);
// the unwrap here is fine as the original message got returned;
// plus we're only ever sending 1 message at the time (for now)
#[allow(clippy::unwrap_used)]
Err(unsent.into_inner().pop().unwrap())
} else {
debug!(
event = "gateway.push_to_client",
client_found = true,
send_result = "ok",
send_us = send_start.elapsed().as_micros() as u64,
"pushed message to client {client_address}"
);
Ok(())
}
}
@@ -46,8 +70,21 @@ impl SharedFinalHopData {
client_address: DestinationAddressBytes,
message: Vec<u8>,
) -> Result<(), GatewayStorageError> {
let start = Instant::now();
debug!("Storing received message for {client_address} on the disk...",);
self.storage.store_message(client_address, message).await
let result = self.storage.store_message(client_address, message).await;
let store_us = start.elapsed().as_micros() as u64;
if result.is_ok() {
debug!(
event = "gateway.disk_store",
store_us, "stored message for {client_address} on disk in {store_us}us"
);
} else {
warn!(
event = "gateway.disk_store_failed",
store_us, "failed to store message for {client_address} on disk after {store_us}us"
);
}
result
}
}
+3
View File
@@ -185,6 +185,7 @@ impl SharedData {
}
pub(super) fn forward_mix_packet(&self, packet: MixPacket, delay_until: Option<Instant>) {
let has_delay = delay_until.is_some();
if self
.mixnet_forwarder
.forward_packet(PacketToForward::new(packet, delay_until))
@@ -192,6 +193,8 @@ impl SharedData {
&& !self.shutdown_token.is_cancelled()
{
error!(
event = "forwarder.channel_send_failed",
has_delay,
"failed to forward sphinx packet on the channel while the process is not going through the shutdown!"
);
self.shutdown_token.cancel();