Compare commits

..

23 Commits

Author SHA1 Message Date
Mark Sinclair 6131082cee Data Observatory: add optional coingecko API key 2026-02-20 14:55:41 +00:00
Mark Sinclair 8bd6a1006b Data Observatory: add more logging for coingecko API failures 2026-02-20 14:38:59 +00:00
Mark Sinclair 1678fb6b94 Data Observatory: add env var PG_CERT and bump version 2026-02-19 17:08:06 +00:00
import this 94a3599b4d [DOCs]: Fix missing diagnostic tool in developers menu (#6470)
* bump up stats and run prebuild

* fix typos
2026-02-19 15:08:04 +00:00
import this a6bc54461a [DOCs]: Diagnostic tool (#6467)
* create diagnostic-tool page

* add to menu

* add to list of tools

* syntax fix

* syntax fix

* syntax fix

* syntax fix

* rm old
2026-02-18 16:57:55 +00:00
Tommy Verrall 4f0c40dab7 Merge pull request #6464 from nymtech/otel-minimal-v2
Otel minimal v2
2026-02-18 14:23:35 +01:00
Tommy Verrall 3eff6e5e3b fix testthroughput 2026-02-18 11:06:42 +01:00
Tommy Verrall a519f4ccb8 pr feedback
- Moved OTel CLI options into a separate OtelArgs
- Otel is built behind the feature flag otel
- Store timing is in microseconds
- Restore comments to existing files
2026-02-18 10:48:54 +01:00
Tommy Verrall a3ba3bfc5a remove non OTEL work here 2026-02-17 10:17:22 +01:00
Tommy Verrall 988df7cff7 sampling to avoid costs
- add otel timeouts
2026-02-17 09:10:52 +01:00
Tommy Verrall 260f8e9714 revert docker/localnet to develop; localnet work to follow in separate PR 2026-02-17 08:37:49 +01:00
Tommy Verrall d28d0ac39e fix replay batch drop, harden error handling and scripts 2026-02-16 19:42:24 +01:00
Tommy Verrall dce4d6b34b otel: refactor key selection, add environment label, fix clippy 2026-02-16 19:13:11 +01:00
Tommy Verrall bc47e9a1b2 otel: explicit TLS config for https endpoints 2026-02-16 18:11:28 +01:00
Tommy Verrall 3b693741b2 Merge branch 'develop' of https://github.com/nymtech/nym into otel-minimal-v2 2026-02-16 16:41:16 +01:00
Tommy Verrall cb277fe487 otel: support signoz cloud ingestion key and TLS 2026-02-16 16:11:31 +01:00
Tommy Verrall 8bb29f4d07 localnet: add loadtest script and signoz docs 2026-02-16 15:44:55 +01:00
Tommy Verrall e753f24ed1 localnet: fix runtime and gateway flags 2026-02-16 15:21:45 +01:00
Tommy Verrall c7cd962627 localnet: multi-stage dockerfile 2026-02-16 14:45:05 +01:00
Tommy Verrall 00467e4440 fix upstream build: update lockfile and stabilise nym-lp 2026-02-16 14:11:40 +01:00
Tommy Verrall f3d1000472 Add gitignore 2026-02-16 13:57:04 +01:00
Tommy Verrall 597aae1a20 localnet: wire otel 2026-02-16 13:54:15 +01:00
Tommy Verrall 40a3cd28b7 otel: add tracing 2026-02-16 13:46:17 +01:00
32 changed files with 2676 additions and 2656 deletions
+1
View File
@@ -76,3 +76,4 @@ CLAUDE.md
.claude/settings.json
/notes
/target-otel
Generated
+1627 -1626
View File
File diff suppressed because it is too large Load Diff
+5 -4
View File
@@ -309,8 +309,10 @@ nix = "0.30.1"
notify = "5.1.0"
num_enum = "0.7.5"
once_cell = "1.21.3"
opentelemetry = "0.19.0"
opentelemetry-jaeger = "0.18.0"
opentelemetry = "0.31.0"
opentelemetry_sdk = "0.31.0"
opentelemetry-otlp = "0.31.0"
tonic = "0.14.4"
parking_lot = "0.12.3"
pem = "0.8"
petgraph = "0.6.5"
@@ -368,9 +370,8 @@ tower = "0.5.2"
tower-http = "0.6.6"
tracing = "0.1.41"
tracing-log = "0.2"
tracing-opentelemetry = "0.19.0"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = "0.3.20"
tracing-tree = "0.2.2"
tracing-indicatif = "0.3.9"
tracing-test = "0.2.5"
ts-rs = "10.1.0"
+13 -9
View File
@@ -19,12 +19,15 @@ serde_json = { workspace = true, optional = true }
## tracing
tracing-subscriber = { workspace = true, features = ["env-filter"], optional = true }
tracing-tree = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
opentelemetry-jaeger = { workspace = true, features = ["rt-tokio", "collector_client", "isahc_collector_client"], optional = true }
tracing-opentelemetry = { workspace = true, optional = true }
utoipa = { workspace = true, optional = true }
opentelemetry = { workspace = true, features = ["rt-tokio"], optional = true }
opentelemetry = { workspace = true, features = ["trace"], optional = true }
## otel-otlp (modern OTLP export to SigNoz/any OTLP collector)
opentelemetry_sdk = { workspace = true, features = ["trace"], optional = true }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "trace", "tls-roots"], optional = true }
tonic = { workspace = true, optional = true }
[build-dependencies]
@@ -35,13 +38,14 @@ default = []
openapi = ["utoipa"]
output_format = ["serde_json", "dep:clap"]
bin_info_schema = ["schemars"]
basic_tracing = ["dep:tracing", "tracing-subscriber"]
tracing = [
basic_tracing = ["dep:tracing", "dep:tracing-subscriber"]
otel-otlp = [
"basic_tracing",
"tracing-tree",
"opentelemetry-jaeger",
"tracing-opentelemetry",
"opentelemetry",
"dep:opentelemetry",
"dep:opentelemetry_sdk",
"dep:opentelemetry-otlp",
"dep:tracing-opentelemetry",
"dep:tonic",
]
clap = ["dep:clap", "dep:clap_complete", "dep:clap_complete_fig"]
models = []
+97 -38
View File
@@ -4,16 +4,9 @@
use serde::{Deserialize, Serialize};
use std::io::IsTerminal;
#[cfg(feature = "tracing")]
pub use opentelemetry;
#[cfg(feature = "tracing")]
pub use opentelemetry_jaeger;
#[cfg(feature = "tracing")]
pub use tracing_opentelemetry;
// Re-export tracing_subscriber for consumers that need to compose layers
#[cfg(feature = "basic_tracing")]
pub use tracing_subscriber;
#[cfg(feature = "tracing")]
pub use tracing_tree;
#[derive(Debug, Default, Copy, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
@@ -69,40 +62,106 @@ pub fn setup_tracing_logger() {
build_tracing_logger().init()
}
// TODO: This has to be a macro, running it as a function does not work for the file_appender for some reason
#[cfg(feature = "tracing")]
#[macro_export]
macro_rules! setup_tracing {
($service_name: expr) => {
use nym_bin_common::logging::tracing_subscriber::layer::SubscriberExt;
use nym_bin_common::logging::tracing_subscriber::util::SubscriberInitExt;
/// Initialize an OpenTelemetry tracing layer that exports spans via OTLP/gRPC.
///
/// This produces a layer compatible with `tracing_subscriber::registry()` that
/// sends traces to any OTLP-compatible collector (SigNoz, Grafana Tempo, etc).
///
/// Returns both the tracing layer and the [`SdkTracerProvider`] so the caller
/// can invoke [`SdkTracerProvider::shutdown`] for graceful flush on exit.
///
/// # Arguments
/// * `service_name` - The service name reported to the collector (e.g. "nym-node")
/// * `endpoint` - The OTLP/gRPC collector endpoint (e.g. "http://localhost:4317"
/// or "https://ingest.eu.signoz.cloud:443" for SigNoz Cloud)
/// * `ingestion_key` - Optional SigNoz Cloud ingestion key. When provided, it is
/// sent as the `signoz-ingestion-key` gRPC metadata header on every export.
/// * `environment` - Deployment environment label (e.g. "sandbox", "mainnet", "canary").
/// Attached as the `deployment.environment` OTel resource attribute.
/// * `sample_ratio` - Trace sampling ratio in 0.0..=1.0 (e.g. 0.1 = 10% of traces).
/// Used to limit cost when exporting from many nodes; clamped to [0.0, 1.0].
/// * `export_timeout_secs` - Timeout in seconds for each OTLP export batch. Prevents
/// unbounded blocking if the collector is slow or unreachable.
#[cfg(feature = "otel-otlp")]
pub fn init_otel_layer<S>(
service_name: &str,
endpoint: &str,
ingestion_key: Option<&str>,
environment: &str,
sample_ratio: f64,
export_timeout_secs: u64,
) -> Result<
(
tracing_opentelemetry::OpenTelemetryLayer<S, opentelemetry_sdk::trace::SdkTracer>,
opentelemetry_sdk::trace::SdkTracerProvider,
),
Box<dyn std::error::Error + Send + Sync>,
>
where
S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::WithTonicConfig;
use opentelemetry_sdk::trace::Sampler;
use std::time::Duration;
let registry = nym_bin_common::logging::tracing_subscriber::Registry::default()
.with(nym_bin_common::logging::tracing_subscriber::EnvFilter::from_default_env())
.with(
nym_bin_common::logging::tracing_tree::HierarchicalLayer::new(4)
.with_targets(true)
.with_bracketed_fields(true),
);
// Validate endpoint URI early to fail with a clear message
if !endpoint.starts_with("http://") && !endpoint.starts_with("https://") {
return Err(format!(
"invalid OTLP endpoint URI: {endpoint} (must start with http:// or https://)"
)
.into());
}
let tracer = nym_bin_common::logging::opentelemetry_jaeger::new_collector_pipeline()
.with_endpoint("http://44.199.230.10:14268/api/traces")
.with_service_name($service_name)
.with_isahc()
.with_trace_config(
nym_bin_common::logging::opentelemetry::sdk::trace::config().with_sampler(
nym_bin_common::logging::opentelemetry::sdk::trace::Sampler::TraceIdRatioBased(
0.1,
),
),
)
.install_batch(nym_bin_common::logging::opentelemetry::runtime::Tokio)
.expect("Could not init tracer");
let sample_ratio_clamped = sample_ratio.clamp(0.0, 1.0);
let telemetry = nym_bin_common::logging::tracing_opentelemetry::layer().with_tracer(tracer);
let mut builder = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_timeout(Duration::from_secs(export_timeout_secs));
registry.with(telemetry).init();
};
// Explicitly configure TLS when the endpoint uses HTTPS
if endpoint.starts_with("https://") {
builder =
builder.with_tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots());
}
if let Some(key) = ingestion_key {
let mut metadata = tonic::metadata::MetadataMap::new();
metadata.insert(
"signoz-ingestion-key",
key.parse()
.map_err(|_| "invalid ingestion key format (value redacted)")?,
);
builder = builder.with_metadata(metadata);
}
let exporter = builder
.build()
.map_err(|e| format!("failed to build OTLP exporter for endpoint {endpoint}: {e}"))?;
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_sampler(Sampler::TraceIdRatioBased(sample_ratio_clamped))
.with_batch_exporter(exporter)
.with_resource(
opentelemetry_sdk::Resource::builder()
.with_service_name(service_name.to_owned())
.with_attribute(opentelemetry::KeyValue::new(
"deployment.environment",
environment.to_owned(),
))
.build(),
)
.build();
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
let tracer = tracer_provider.tracer(service_name.to_owned());
Ok((
tracing_opentelemetry::layer().with_tracer(tracer),
tracer_provider,
))
}
pub fn banner(crate_name: &str, crate_version: &str) -> String {
+80 -26
View File
@@ -128,54 +128,95 @@ impl ManagedConnection {
async fn run(self) {
let address = self.address;
let reconnection_attempt = self.current_reconnection.load(Ordering::Acquire);
let connect_start = tokio::time::Instant::now();
let connection_fut = TcpStream::connect(address);
let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to {}", self.address);
let connect_ms = connect_start.elapsed().as_millis() as u64;
debug!(
peer = %address,
connect_ms,
"Managed to establish connection to {}", self.address
);
let noise_start = tokio::time::Instant::now();
let noise_stream =
match upgrade_noise_initiator(stream, &self.noise_config).await {
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {address} - {err}");
// we failed to finish the noise handshake - increase reconnection attempt
let noise_handshake_ms = noise_start.elapsed().as_millis() as u64;
warn!(
event = "connection.failed.noise",
peer = %address,
error = %err,
connect_ms,
noise_handshake_ms,
reconnection_attempt,
exit_reason = "noise_error",
"Failed to perform Noise initiator handshake with {address}"
);
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
return;
}
};
// if we managed to connect AND do the noise handshake, reset the reconnection count (whatever it might have been)
let noise_handshake_ms = noise_start.elapsed().as_millis() as u64;
self.current_reconnection.store(0, Ordering::Release);
debug!("Noise initiator handshake completed for {:?}", address);
debug!(
peer = %address,
connect_ms,
noise_handshake_ms,
"Noise initiator handshake completed for {:?}", address
);
Framed::new(noise_stream, NymCodec)
}
Err(err) => {
debug!("failed to establish connection to {address} (err: {err})",);
let connect_ms = connect_start.elapsed().as_millis() as u64;
warn!(
event = "connection.failed.connect",
peer = %address,
error = %err,
connect_ms,
reconnection_attempt,
exit_reason = "connect_error",
"failed to establish connection to {address}"
);
return;
}
},
Err(_) => {
debug!(
let connect_ms = connect_start.elapsed().as_millis() as u64;
warn!(
event = "connection.failed.timeout",
peer = %address,
timeout_ms = self.connection_timeout.as_millis() as u64,
connect_ms,
reconnection_attempt,
exit_reason = "timeout",
"failed to connect to {address} within {:?}",
self.connection_timeout
);
// we failed to connect - increase reconnection attempt
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
return;
}
};
// Take whatever the receiver channel produces and put it on the connection.
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
// about neither receiver nor the connection, it doesn't matter which one gets consumed
if let Err(err) = self.message_receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {address}: {err}");
warn!(
event = "connection.forward_error",
peer = %address,
error = %err,
exit_reason = "forward_error",
"Failed to forward packets to {address}: {err}"
);
}
debug!(
"connection manager to {address} is finished. Either the connection failed or mixnet client got dropped",
peer = %address,
exit_reason = "sender_dropped",
"connection manager to {address} finished"
);
}
}
@@ -272,16 +313,18 @@ impl SendWithoutResponse for Client {
trace!("Sending packet to {address}");
// TODO: optimisation for the future: rather than constantly using legacy encoding,
// once we're addressing by node_id (and thus have full node info here),
// we could simply infer supported encoding based on their version
// use the mix packet type / flags to pick encoding per packet
let framed_packet =
FramedNymPacket::from_mix_packet(packet, self.config.use_legacy_packet_encoding);
let Some(sender) = self.active_connections.get_mut(&address) else {
// there was never a connection to begin with
debug!("establishing initial connection to {address}");
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
debug!(
event = "mixclient.try_send",
peer = %address,
result = "not_connected",
"establishing initial connection to {address}"
);
self.make_connection(address, framed_packet);
return Err(io::Error::new(
io::ErrorKind::NotConnected,
@@ -289,15 +332,24 @@ impl SendWithoutResponse for Client {
));
};
let channel_capacity = sender.channel.max_capacity();
let channel_available = sender.channel.capacity();
let channel_used = channel_capacity - channel_available;
let sending_res = sender.channel.try_send(framed_packet);
drop(sender);
sending_res.map_err(|err| {
match err {
TrySendError::Full(_) => {
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
warn!(
event = "mixclient.try_send",
peer = %address,
result = "full_dropped",
channel_capacity,
channel_used,
"dropping packet: connection buffer to {address} is full ({channel_used}/{channel_capacity})"
);
io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
@@ -305,11 +357,13 @@ impl SendWithoutResponse for Client {
}
TrySendError::Closed(dropped) => {
debug!(
"Connection to {address} seems to be dead. attempting to re-establish it...",
event = "mixclient.try_send",
peer = %address,
result = "closed_reconnecting",
channel_capacity,
channel_used,
"connection to {address} dead, attempting re-establishment"
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, dropped);
io::Error::new(
io::ErrorKind::ConnectionAborted,
+22 -26
View File
@@ -1,24 +1,22 @@
# Multi-stage Dockerfile for Nym localnet
# Stage 1: Build binaries
# Stage 2: Slim runtime with only the final binaries
# Single-stage Dockerfile for Nym localnet
# Builds: nym-node, nym-network-requester, nym-socks5-client
# Target: Apple Container Runtime with host networking
# --- Build stage ---
FROM rust:latest AS builder
FROM rust:latest
WORKDIR /usr/src/nym
COPY ./ ./
ENV CARGO_BUILD_JOBS=8
RUN cargo build --release --locked -p nym-node --features otel && \
cargo build --release --locked -p nym-network-requester -p nym-socks5-client
# Build all required binaries in release mode
RUN cargo build --release --locked \
-p nym-node \
-p nym-network-requester \
-p nym-socks5-client
# --- Runtime stage ---
FROM debian:trixie-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
build-essential \
# Install runtime dependencies including Go for wireguard-go
RUN apt update && apt install -y \
python3 \
python3-pip \
netcat-openbsd \
@@ -26,33 +24,31 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
iproute2 \
net-tools \
wireguard-tools \
golang-go \
git \
iptables \
curl \
&& rm -rf /var/lib/apt/lists/*
# Install Go and build wireguard-go, then clean up
ARG TARGETARCH
RUN curl -fsSL "https://go.dev/dl/go1.23.6.linux-${TARGETARCH}.tar.gz" \
| tar -C /usr/local -xz && \
export PATH="/usr/local/go/bin:$PATH" && \
git clone https://git.zx2c4.com/wireguard-go && \
# Install wireguard-go (userspace WireGuard implementation)
RUN git clone https://git.zx2c4.com/wireguard-go && \
cd wireguard-go && \
make && \
cp wireguard-go /usr/local/bin/ && \
cd .. && \
rm -rf wireguard-go /usr/local/go && \
apt-get purge -y --auto-remove build-essential curl
rm -rf wireguard-go
# Install Python dependencies for build_topology.py
RUN pip3 install --break-system-packages base58
# Copy only the compiled binaries from the builder stage
COPY --from=builder /usr/src/nym/target/release/nym-node /usr/local/bin/
COPY --from=builder /usr/src/nym/target/release/nym-network-requester /usr/local/bin/
COPY --from=builder /usr/src/nym/target/release/nym-socks5-client /usr/local/bin/
# Move binaries to /usr/local/bin for easy access
RUN cp target/release/nym-node /usr/local/bin/ && \
cp target/release/nym-network-requester /usr/local/bin/ && \
cp target/release/nym-socks5-client /usr/local/bin/
# Copy supporting scripts
COPY ./docker/localnet/build_topology.py /usr/local/bin/
WORKDIR /nym
# Default command
CMD ["nym-node", "--help"]
+37 -128
View File
@@ -1,71 +1,35 @@
# Nym Localnet
# Nym Localnet for Kata Container Runtimes
A complete Nym mixnet test environment with OpenTelemetry instrumentation.
Supports both Docker Desktop and Apple Container Runtime on macOS.
A complete Nym mixnet test environment running on Apple's container runtime for macOS (for now).
## Overview
This localnet setup provides a fully functional Nym mixnet for local development and testing:
- **3 mixnodes** (layer 1, 2, 3)
- **2 gateways** (entry + exit mode)
- **1 gateway** (entry + exit mode)
- **1 network-requester** (service provider)
- **1 SOCKS5 client**
- **OpenTelemetry tracing** via OTLP/gRPC to SigNoz (or any OTLP collector)
All components run in isolated containers with proper networking and dynamic IP resolution.
When the `otel` feature is enabled (default), every nym-node exports traces covering
the full packet lifecycle: ingress, Sphinx processing, forwarding, and final-hop delivery.
## Prerequisites
### Required
- **macOS** (tested on macOS Sequoia 15.0+)
- **Docker Desktop** (recommended) or **Apple Container Runtime**
- **Apple Container Runtime** - Built into macOS
- **Docker Desktop** (for building images only)
- **Python 3** with `base58` library
### SigNoz (for trace viewing)
SigNoz is an open-source APM that receives and visualises OpenTelemetry data.
Install it locally with Docker Compose -- this takes about 2 minutes:
```bash
# Clone the SigNoz repository
git clone -b main https://github.com/SigNoz/signoz.git ~/signoz
cd ~/signoz/deploy
# Start SigNoz (runs ClickHouse, otel-collector, query-service, frontend)
docker compose up -d
# Verify it is running
docker ps --filter "name=signoz" --format "table {{.Names}}\t{{.Status}}"
```
Once running:
- **SigNoz UI**: http://localhost:8080
- **OTLP gRPC collector**: localhost:4317 (used by nym-nodes)
- **OTLP HTTP collector**: localhost:4318
The localnet script auto-detects the SigNoz Docker network (`signoz-net`) and
routes OTel traffic directly to the collector container -- no manual endpoint
configuration needed.
To stop SigNoz later:
```bash
cd ~/signoz/deploy && docker compose down
```
### Installation
```bash
# Install Python dependencies
pip3 install --break-system-packages base58
# Verify Docker is installed
docker --version
```
If using Apple Container Runtime instead of Docker:
```bash
# Verify container runtime is available
container --version
# Verify Docker is installed (for building)
docker --version
```
## Quick Start
@@ -118,17 +82,7 @@ Ports published to host:
- 20001-20005 → Verloc ports
- 30001-30005 → HTTP APIs
- 41264/41265 → LP control ports (registration)
- 51822/51823 → WireGuard tunnel ports (gateway/gateway2; only used when WireGuard is enabled)
### WireGuard and privileges
By default, gateways run with **WireGuard disabled** (`--wireguard-enabled false`). No elevated capabilities are required: the script does not use `--cap-add=NET_ADMIN` or `--device /dev/net/tun`, so localnet runs without net admin privileges and is suitable for mixnet packet testing and SOCKS5 over the mixnet.
To enable WireGuard VPN routing in localnet (e.g. for two-hop VPN tests), set `WIREGUARD_ENABLED=1` before starting. The script will then add `--cap-add=NET_ADMIN` and `--device /dev/net/tun` to the gateway containers and configure IP forwarding and NAT. This may not work in all Docker environments (e.g. some hosted runners restrict capabilities).
```bash
WIREGUARD_ENABLED=1 ./localnet.sh start
```
- 51822/51823 → WireGuard tunnel ports (gateway/gateway2)
### Startup Flow
@@ -244,99 +198,54 @@ container logs nym-gateway --follow
### Status
```bash
# List all containers
docker ps --filter "name=nym-" --format "table {{.Names}}\t{{.Status}}"
container list
# Check specific container
docker logs nym-gateway
container logs nym-gateway
# Inspect network
docker network inspect nym-localnet-network
container network inspect nym-localnet-network
```
## Testing
### Basic SOCKS5 Test
```bash
# Simple HTTP request through the mixnet
curl -x socks5h://127.0.0.1:1080 https://httpbin.org/get
# Simple HTTP request with redirect following
curl -L --socks5 localhost:1080 http://example.com
# HTTPS request
curl -x socks5h://127.0.0.1:1080 https://nymtech.net
curl -L --socks5 localhost:1080 https://nymtech.net
# Download a file
curl -x socks5h://127.0.0.1:1080 \
curl -L --socks5 localhost:1080 \
https://test-download-files-nym.s3.amazonaws.com/download-files/1MB.zip \
--output /tmp/test.zip
```
### Load Testing
A load test script is included to generate sustained traffic and populate SigNoz
with meaningful trace data:
```bash
# Default: 10 concurrent workers, 60 seconds
./loadtest.sh
# Heavier load: 20 workers for 2 minutes
./loadtest.sh -c 20 -d 120
# Light single-threaded test
./loadtest.sh -c 1 -d 10
# Target a specific URL
./loadtest.sh -c 5 -d 30 -u https://httpbin.org/bytes/4096
```
The script reports live progress, then prints a summary with request counts,
throughput, and latency percentiles (p50/p95/p99).
### Verify Network Topology
```bash
# View the generated topology
docker exec nym-gateway cat /localnet/network.json | jq .
container exec nym-gateway cat /localnet/network.json | jq .
# Check container status
docker ps --filter "name=nym-" --format "table {{.Names}}\t{{.Status}}"
# Check container IPs
container list | grep nym-
# Verify all bonding files exist
docker exec nym-gateway ls -la /localnet/
container exec nym-gateway ls -la /localnet/
```
### Test Mixnet Routing
```bash
# All traffic flows through: client -> gateway -> mix1 -> mix2 -> mix3 -> gateway -> internet
# All traffic flows through: client mix1 mix2 mix3 gateway internet
# Watch logs to verify routing:
docker logs nym-mixnode1 --follow &
docker logs nym-mixnode2 --follow &
docker logs nym-mixnode3 --follow &
docker logs nym-gateway --follow &
container logs nym-mixnode1 --follow &
container logs nym-mixnode2 --follow &
container logs nym-mixnode3 --follow &
container logs nym-gateway --follow &
# Make a request
curl -x socks5h://127.0.0.1:1080 https://nymtech.net
```
## OpenTelemetry
OTel is enabled by default. Each nym-node exports traces via OTLP/gRPC covering
packet ingress, Sphinx processing, forwarding, and final-hop delivery.
### Viewing Traces
- **SigNoz UI**: http://localhost:8080 -- filter by `serviceName = nym-node`
- **Terminal report** (queries ClickHouse directly, no login needed):
```bash
./otel-report.sh # last 15 minutes
./otel-report.sh 60 # last 60 minutes
./otel-report.sh live # auto-refresh every 10s
```
### Disabling OTel
```bash
OTEL_ENABLE=0 ./localnet.sh start # disable
OTEL_ENDPOINT=http://my-collector:4317 ./localnet.sh start # custom collector
curl -L --socks5 localhost:1080 https://nymtech.com
```
### LP (Lewes Protocol) Testing
@@ -380,11 +289,8 @@ This makes localnet perfect for rapid LP protocol development and testing.
docker/localnet/
├── README.md # This file
├── localnet.sh # Main orchestration script
├── loadtest.sh # Load test / traffic generator
── otel-report.sh # Terminal-based OTel metrics report
├── Dockerfile.localnet # Multi-stage Docker image (builder + slim runtime)
├── build_topology.py # Topology generator
└── localnet-logs.sh # Tmux-based multi-container log viewer
├── Dockerfile.localnet # Docker image definition
── build_topology.py # Topology generator
```
## How It Works
@@ -674,11 +580,14 @@ start_mixnode 4 "$MIXNODE4_CONTAINER"
### Container Runtime
**Docker Desktop** is the default and recommended runtime; no extra setup is required for mixnet testing.
Apple's container runtime is a native macOS container system:
- Uses Virtualization.framework for isolation
- Lightweight VMs for each container
- Native macOS integration
- Separate image store from Docker
- Natively uses [Kata Containers](https://github.com/kata-containers/kata-containers) images
**Apple Container Runtime** is an optional alternative on macOS. It natively uses [Kata Containers](https://github.com/kata-containers/kata-containers) images and is only required if you use `container` instead of Docker (e.g. for consistency with other Apple tooling). Kata is also the path that provides a kernel with `CONFIG_TUN=y` if you need TUN/WireGuard inside containers under the Apple runtime.
### Initial setup for [Container Runtime](https://github.com/apple/container) (optional)
### Initial setup for [Container Runtime](https://github.com/apple/container)
- **MUST** have MacOS Tahoe for inter-container networking
- `brew install --cask container`
@@ -722,7 +631,7 @@ Both are ephemeral by default (cleaned up on stop).
- **No Docker Compose**: Uses custom orchestration script
- **Dynamic IPs**: Container IPs may change between restarts
- **Port conflicts**: Cannot run alongside services using same ports
- **TUN device**: Only required when `WIREGUARD_ENABLED=1`; otherwise gateways run without it
- **TUN device**: Gateway requires `ip` command for network interfaces
## Support
-297
View File
@@ -1,297 +0,0 @@
#!/bin/bash
# Nym Localnet Load Test
# Generates sustained traffic through the mixnet SOCKS5 proxy to produce
# OTel traces and exercise the packet pipeline end-to-end.
#
# Usage:
# ./loadtest.sh # defaults: 10 concurrent, 60s, mixed sizes
# ./loadtest.sh -c 20 -d 120 # 20 concurrent, 120s
# ./loadtest.sh -s 64k # fixed 64KB responses (many Sphinx fragments)
# ./loadtest.sh -s 1k -c 5 -d 30 # small payloads, 5 workers
#
# Payload sizes (-s flag) map to Sphinx packet fragmentation:
# 1k = ~1 Sphinx packet (sub-MTU, minimal fragmentation)
# 4k = ~2-3 packets (small payload)
# 16k = ~8-10 packets (medium payload)
# 64k = ~32-35 packets (large payload, stresses forwarding)
# 256k = ~128-130 packets (heavy payload, stresses queues)
# 1m = ~512 packets (very heavy, potential backpressure)
#
# Prerequisites:
# - Localnet running (./localnet.sh start)
# - SOCKS5 proxy available on localhost:1080
set -e
CONCURRENCY=10
DURATION=60
PROXY="socks5h://127.0.0.1:1080"
PAYLOAD_SIZE=""
CUSTOM_URL=""
STATS_INTERVAL=5
# Default targets: mixed sizes for general testing
TARGETS=(
"https://httpbin.org/get"
"https://httpbin.org/bytes/1024"
"https://httpbin.org/delay/1"
"https://example.com"
"https://nym.com"
)
# Convert human-readable size to bytes for httpbin
parse_size() {
local s
s=$(echo "$1" | tr '[:upper:]' '[:lower:]')
local num
num=$(echo "$s" | sed 's/[a-z]*$//')
case "$s" in
*m|*mb) echo $(( num * 1024 * 1024 )) ;;
*k|*kb) echo $(( num * 1024 )) ;;
*) echo "$num" ;;
esac
}
usage() {
echo "Usage: $0 [-c concurrency] [-d duration_secs] [-s payload_size] [-u url] [-p proxy]"
echo ""
echo "Options:"
echo " -c Number of concurrent workers (default: $CONCURRENCY)"
echo " -d Test duration in seconds (default: $DURATION)"
echo " -s Response payload size: 1k, 4k, 16k, 64k, 256k, 1m (default: mixed)"
echo " -u Custom target URL (overrides -s and default targets)"
echo " -p SOCKS5 proxy address (default: $PROXY)"
echo ""
echo "Examples:"
echo " $0 # 10 workers, 60s, mixed targets/sizes"
echo " $0 -s 1k # small payloads (~1 Sphinx packet each)"
echo " $0 -s 64k -c 5 # large payloads, 5 workers"
echo " $0 -s 256k -c 2 -d 30 # very large payloads, observe queue pressure"
echo " $0 -c 20 -d 120 # heavier concurrency, 2 minutes"
exit 0
}
while getopts "c:d:s:u:p:h" opt; do
case $opt in
c) CONCURRENCY=$OPTARG ;;
d) DURATION=$OPTARG ;;
s) PAYLOAD_SIZE=$OPTARG ;;
u) CUSTOM_URL=$OPTARG ;;
p) PROXY=$OPTARG ;;
h) usage ;;
*) usage ;;
esac
done
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'
log_info() { echo -e "${BLUE}[INFO]${NC} $*"; }
log_ok() { echo -e "${GREEN}[OK]${NC} $*"; }
log_warn() { echo -e "${YELLOW}[WARN]${NC} $*"; }
log_err() { echo -e "${RED}[ERROR]${NC} $*"; }
# Build sized URL if -s was specified
SIZED_URL=""
SIZE_LABEL="mixed"
if [ -n "$PAYLOAD_SIZE" ]; then
PAYLOAD_BYTES=$(parse_size "$PAYLOAD_SIZE")
SIZED_URL="https://httpbin.org/bytes/${PAYLOAD_BYTES}"
SIZE_LABEL="${PAYLOAD_SIZE} (~${PAYLOAD_BYTES} bytes)"
fi
# Preflight checks
if ! nc -z 127.0.0.1 1080 2>/dev/null; then
log_err "SOCKS5 proxy not reachable on localhost:1080. Is the localnet running?"
exit 1
fi
# Counters (written to temp files for cross-process aggregation)
STATS_DIR=$(mktemp -d)
cleanup() {
kill $(jobs -p) 2>/dev/null || true
rm -rf "$STATS_DIR"
}
trap cleanup INT TERM EXIT
pick_url() {
if [ -n "$CUSTOM_URL" ]; then
echo "$CUSTOM_URL"
elif [ -n "$PAYLOAD_SIZE" ]; then
echo "$SIZED_URL"
else
local idx=$((RANDOM % ${#TARGETS[@]}))
echo "${TARGETS[$idx]}"
fi
}
# Millisecond timestamp (works on both GNU and BSD/macOS date)
now_ms() {
python3 -c 'import time; print(int(time.time()*1000))'
}
# Worker function: runs requests in a loop until duration expires
worker() {
local id=$1
local end_time=$2
local ok=0
local fail=0
while [ "$(date +%s)" -lt "$end_time" ]; do
local url
url=$(pick_url)
local start_ms
start_ms=$(now_ms)
if curl -x "$PROXY" -m 15 -sf -o /dev/null -w "" "$url" 2>/dev/null; then
ok=$((ok + 1))
else
fail=$((fail + 1))
fi
local end_ms
end_ms=$(now_ms)
local latency=$((end_ms - start_ms))
echo "$latency" >> "$STATS_DIR/latencies_${id}.txt"
done
echo "$ok" > "$STATS_DIR/ok_${id}.txt"
echo "$fail" > "$STATS_DIR/fail_${id}.txt"
}
echo ""
log_info "=== Nym Localnet Load Test ==="
log_info "Concurrency: $CONCURRENCY workers"
log_info "Duration: ${DURATION}s"
log_info "Payload: $SIZE_LABEL"
if [ -n "$CUSTOM_URL" ]; then
log_info "Target: $CUSTOM_URL"
elif [ -n "$PAYLOAD_SIZE" ]; then
log_info "Target: $SIZED_URL"
else
log_info "Targets: ${#TARGETS[@]} rotating URLs"
fi
log_info "Proxy: $PROXY"
echo ""
# Quick connectivity check
log_info "Preflight: testing SOCKS5 proxy..."
if curl -x "$PROXY" -m 15 -sf -o /dev/null "https://httpbin.org/get"; then
log_ok "SOCKS5 proxy is working"
else
log_err "SOCKS5 proxy test failed. Check localnet status."
exit 1
fi
END_TIME=$(( $(date +%s) + DURATION ))
START_TIME=$(date +%s)
log_info "Starting $CONCURRENCY workers for ${DURATION}s..."
echo ""
for i in $(seq 1 "$CONCURRENCY"); do
worker "$i" "$END_TIME" &
done
# Progress reporter (counts completed latency entries as a proxy for request count)
while [ "$(date +%s)" -lt "$END_TIME" ]; do
sleep "$STATS_INTERVAL"
elapsed=$(( $(date +%s) - START_TIME ))
remaining=$(( END_TIME - $(date +%s) ))
if [ "$remaining" -lt 0 ]; then remaining=0; fi
total=0
for f in "$STATS_DIR"/latencies_*.txt; do
if [ -f "$f" ]; then
count=$(wc -l < "$f" 2>/dev/null || echo 0)
total=$((total + count))
fi
done
if [ "$elapsed" -gt 0 ]; then
rps=$(echo "scale=1; $total / $elapsed" | bc 2>/dev/null || echo "?")
else
rps="?"
fi
printf "\r [%3ds / %3ds] requests: %d | ~%s req/s | remaining: %ds " \
"$elapsed" "$DURATION" "$total" "$rps" "$remaining"
done
echo ""
log_info "Waiting for workers to finish..."
wait 2>/dev/null || true
# Final stats
echo ""
log_info "=== Results ==="
total_ok=0
total_fail=0
all_latencies=""
for f in "$STATS_DIR"/ok_*.txt; do
[ -f "$f" ] && total_ok=$((total_ok + $(cat "$f" 2>/dev/null || echo 0)))
done
for f in "$STATS_DIR"/fail_*.txt; do
[ -f "$f" ] && total_fail=$((total_fail + $(cat "$f" 2>/dev/null || echo 0)))
done
for f in "$STATS_DIR"/latencies_*.txt; do
[ -f "$f" ] && all_latencies="$all_latencies $(cat "$f" 2>/dev/null | tr '\n' ' ')"
done
total=$((total_ok + total_fail))
actual_duration=$(( $(date +%s) - START_TIME ))
echo ""
echo " Total requests: $total"
echo " Successful: $total_ok"
echo " Failed: $total_fail"
if [ "$actual_duration" -gt 0 ]; then
rps=$(echo "scale=2; $total / $actual_duration" | bc 2>/dev/null || echo "?")
echo " Duration: ${actual_duration}s"
echo " Throughput: ~${rps} req/s"
fi
if [ -n "$all_latencies" ]; then
sorted=$(echo "$all_latencies" | tr ' ' '\n' | sort -n | grep -v '^$')
count=$(echo "$sorted" | wc -l | tr -d ' ')
if [ "$count" -gt 0 ]; then
p50_idx=$(( count * 50 / 100 ))
p95_idx=$(( count * 95 / 100 ))
p99_idx=$(( count * 99 / 100 ))
[ "$p50_idx" -lt 1 ] && p50_idx=1
[ "$p95_idx" -lt 1 ] && p95_idx=1
[ "$p99_idx" -lt 1 ] && p99_idx=1
min_lat=$(echo "$sorted" | head -1)
max_lat=$(echo "$sorted" | tail -1)
p50=$(echo "$sorted" | sed -n "${p50_idx}p")
p95=$(echo "$sorted" | sed -n "${p95_idx}p")
p99=$(echo "$sorted" | sed -n "${p99_idx}p")
echo ""
echo " Latency (ms):"
echo " min: ${min_lat}ms"
echo " p50: ${p50}ms"
echo " p95: ${p95}ms"
echo " p99: ${p99}ms"
echo " max: ${max_lat}ms"
fi
fi
echo ""
if [ "$total_fail" -gt 0 ] && [ "$total" -gt 0 ]; then
fail_pct=$(echo "scale=1; $total_fail * 100 / $total" | bc 2>/dev/null || echo "?")
log_warn "Failure rate: ${fail_pct}% -- ${total_fail} of ${total} failed"
else
log_ok "All requests succeeded"
fi
echo ""
log_info "View traces in SigNoz: http://localhost:8080/traces"
log_info "Filter by service: nym-node"
echo ""
+7 -14
View File
@@ -5,13 +5,6 @@
SESSION_NAME="nym-localnet-logs"
# Detect runtime
if command -v container &> /dev/null; then
RUNTIME="container"
else
RUNTIME="docker"
fi
# Container names
CONTAINERS=(
"nym-mixnode1"
@@ -24,9 +17,9 @@ CONTAINERS=(
# Check if containers are running
running_containers=()
for ctr in "${CONTAINERS[@]}"; do
if $RUNTIME inspect "$ctr" &>/dev/null; then
running_containers+=("$ctr")
for container in "${CONTAINERS[@]}"; do
if container inspect "$container" &>/dev/null; then
running_containers+=("$container")
fi
done
@@ -39,11 +32,11 @@ fi
# Check if we're already in tmux
if [ -n "$TMUX" ]; then
# Inside tmux - create new window
tmux new-window -n "logs" "$RUNTIME logs -f ${running_containers[0]}"
tmux new-window -n "logs" "container logs -f ${running_containers[0]}"
# Split for remaining containers
for ((i=1; i<${#running_containers[@]}; i++)); do
tmux split-window -t logs "$RUNTIME logs -f ${running_containers[$i]}"
tmux split-window -t logs "container logs -f ${running_containers[$i]}"
tmux select-layout -t logs tiled
done
@@ -55,11 +48,11 @@ else
exec tmux attach-session -t "$SESSION_NAME"
else
# Create new session
tmux new-session -d -s "$SESSION_NAME" -n "logs" "$RUNTIME logs -f ${running_containers[0]}"
tmux new-session -d -s "$SESSION_NAME" -n "logs" "container logs -f ${running_containers[0]}"
# Split for remaining containers
for ((i=1; i<${#running_containers[@]}; i++)); do
tmux split-window -t "$SESSION_NAME:logs" "$RUNTIME logs -f ${running_containers[$i]}"
tmux split-window -t "$SESSION_NAME:logs" "container logs -f ${running_containers[$i]}"
tmux select-layout -t "$SESSION_NAME:logs" tiled
done
+77 -140
View File
@@ -2,8 +2,8 @@
set -ex
# Nym Localnet Orchestration Script
# Supports both Docker and Apple Container Runtime
# Nym Localnet Orchestration Script for Apple Container Runtime
# Emulates docker-compose functionality
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
@@ -14,54 +14,6 @@ NYM_VOLUME_PATH="/tmp/nym-localnet-home-$$"
SUFFIX=${NYM_NODE_SUFFIX:-localnet}
# Detect container runtime: prefer Apple 'container' if available, fall back to docker
if command -v container &> /dev/null; then
RUNTIME="container"
HOST_INTERNAL="host.containers.internal"
else
RUNTIME="docker"
HOST_INTERNAL="host.docker.internal"
fi
# WireGuard: set to 1 only if you need VPN routing in localnet (requires NET_ADMIN and /dev/net/tun).
# Default 0: mixnet-only, no elevated capabilities required.
WIREGUARD_ENABLED=${WIREGUARD_ENABLED:-0}
# OpenTelemetry configuration
# Set OTEL_ENABLE=1 to enable OTel tracing on all nym-node instances.
# OTEL_ENDPOINT should point to the OTLP gRPC collector reachable from containers.
# When SigNoz runs in Docker (signoz-net), we route to its collector directly.
OTEL_ENABLE=${OTEL_ENABLE:-1}
if [ -z "${OTEL_ENDPOINT:-}" ]; then
SIGNOZ_NET=$(docker network ls --filter name=signoz-net --format '{{.Name}}' 2>/dev/null || true)
if [ "$RUNTIME" = "docker" ] && [ -n "$SIGNOZ_NET" ]; then
OTEL_ENDPOINT="http://signoz-otel-collector:4317"
OTEL_SIGNOZ_NET="$SIGNOZ_NET"
else
OTEL_ENDPOINT="http://${HOST_INTERNAL}:4317"
OTEL_SIGNOZ_NET=""
fi
fi
# Build OTel flags for nym-node run commands
otel_flags() {
if [ "$OTEL_ENABLE" = "1" ]; then
echo "--otel --otel-endpoint $OTEL_ENDPOINT"
fi
}
# WireGuard capability flags for gateway containers (only when WIREGUARD_ENABLED=1)
wireguard_cap_args() {
if [ "$WIREGUARD_ENABLED" = "1" ]; then
echo "--cap-add=NET_ADMIN --device /dev/net/tun"
fi
}
# --wireguard-enabled value for nym-node
wireguard_flag() {
[ "$WIREGUARD_ENABLED" = "1" ] && echo "true" || echo "false"
}
# Container names
INIT_CONTAINER="nym-localnet-init"
MIXNODE1_CONTAINER="nym-mixnode1"
@@ -112,13 +64,13 @@ cleanup_host_state() {
done
}
# Check prerequisites
# Check if container command exists
check_prerequisites() {
if ! command -v docker &> /dev/null; then
log_error "Docker not found"
if ! command -v container &> /dev/null; then
log_error "Apple 'container' command not found"
log_error "Install from: https://github.com/apple/container"
exit 1
fi
log_info "Using runtime: $RUNTIME"
}
# Build the Docker image
@@ -128,6 +80,7 @@ build_image() {
cd "$PROJECT_ROOT"
# Build with Docker
log_info "Building with Docker..."
if ! docker build \
-f "$SCRIPT_DIR/Dockerfile.localnet" \
@@ -137,24 +90,30 @@ build_image() {
exit 1
fi
# If using Apple container runtime, transfer image from Docker
if [ "$RUNTIME" = "container" ]; then
log_info "Transferring image to Apple container runtime..."
TEMP_IMAGE="/tmp/nym-localnet-image-$$.tar"
if ! docker save -o "$TEMP_IMAGE" "$IMAGE_NAME"; then
log_error "Failed to save Docker image"
exit 1
fi
if ! container image load --input "$TEMP_IMAGE"; then
rm -f "$TEMP_IMAGE"
log_error "Failed to load image into container runtime"
exit 1
fi
# Transfer image to container runtime
log_info "Transferring image to container runtime..."
# Save to temporary file (container image load doesn't support stdin)
TEMP_IMAGE="/tmp/nym-localnet-image-$$.tar"
if ! docker save -o "$TEMP_IMAGE" "$IMAGE_NAME"; then
log_error "Failed to save Docker image"
exit 1
fi
# Load into container runtime from file
if ! container image load --input "$TEMP_IMAGE"; then
rm -f "$TEMP_IMAGE"
if ! container image inspect "$IMAGE_NAME" &>/dev/null; then
log_error "Image not found in container runtime after load"
exit 1
fi
log_error "Failed to load image into container runtime"
exit 1
fi
# Clean up temporary file
rm -f "$TEMP_IMAGE"
# Verify image is available
if ! container image inspect "$IMAGE_NAME" &>/dev/null; then
log_error "Image not found in container runtime after load"
exit 1
fi
log_success "Image built and loaded: $IMAGE_NAME"
@@ -196,7 +155,7 @@ NETWORK_NAME="nym-localnet-network"
# Create container network
create_network() {
log_info "Creating container network: $NETWORK_NAME"
if $RUNTIME network create "$NETWORK_NAME" 2>/dev/null; then
if container network create "$NETWORK_NAME" 2>/dev/null; then
log_success "Network created: $NETWORK_NAME"
else
log_info "Network $NETWORK_NAME already exists or creation failed"
@@ -205,9 +164,9 @@ create_network() {
# Remove container network
remove_network() {
if $RUNTIME network list | grep -q "$NETWORK_NAME"; then
if container network list | grep -q "$NETWORK_NAME"; then
log_info "Removing network: $NETWORK_NAME"
$RUNTIME network rm "$NETWORK_NAME" 2>/dev/null || true
container network rm "$NETWORK_NAME" 2>/dev/null || true
log_success "Network removed"
fi
}
@@ -224,10 +183,7 @@ start_mixnode() {
local verloc_port="2000${node_id}"
local http_port="3000${node_id}"
local otel_args
otel_args=$(otel_flags)
$RUNTIME run \
container run \
--name "$container_name" \
-m 2G \
--network "$NETWORK_NAME" \
@@ -259,7 +215,7 @@ start_mixnode() {
sleep 2;
done;
echo "Starting mix'"${node_id}"'...";
exec nym-node '"${otel_args}"' run --id mix'"${node_id}"'-localnet --unsafe-disable-replay-protection --local
exec nym-node run --id mix'"${node_id}"'-localnet --unsafe-disable-replay-protection --local
'
log_success "$container_name started"
@@ -268,14 +224,9 @@ start_mixnode() {
start_gateway() {
log_info "Starting $GATEWAY_CONTAINER..."
local otel_args wg_flag
otel_args=$(otel_flags)
wg_flag=$(wireguard_flag)
$RUNTIME run \
container run \
--name "$GATEWAY_CONTAINER" \
-m 2G \
$(wireguard_cap_args) \
--network "$NETWORK_NAME" \
-p 9000:9000 \
-p 10004:10004 \
@@ -304,9 +255,11 @@ start_gateway() {
--http-bind-address=0.0.0.0:30004 \
--http-access-token=lala \
--public-ips $CONTAINER_IP \
--enable-lp true \
--lp-use-mock-ecash true \
--output=json \
--wireguard-enabled '"$wg_flag"' \
--wireguard-enabled true \
--wireguard-userspace true \
--bonding-information-output="/localnet/gateway.json";
echo "Waiting for network.json...";
@@ -314,7 +267,7 @@ start_gateway() {
sleep 2;
done;
echo "Starting gateway with LP listener (mock ecash)...";
exec nym-node '"${otel_args}"' run --id gateway-localnet --unsafe-disable-replay-protection --local --wireguard-enabled '"$wg_flag"' --lp-use-mock-ecash true
exec nym-node run --id gateway-localnet --unsafe-disable-replay-protection --local --wireguard-enabled true --wireguard-userspace true --lp-use-mock-ecash true
'
log_success "$GATEWAY_CONTAINER started"
@@ -338,14 +291,9 @@ start_gateway() {
start_gateway2() {
log_info "Starting $GATEWAY2_CONTAINER..."
local otel_args wg_flag
otel_args=$(otel_flags)
wg_flag=$(wireguard_flag)
$RUNTIME run \
container run \
--name "$GATEWAY2_CONTAINER" \
-m 2G \
$(wireguard_cap_args) \
--network "$NETWORK_NAME" \
-p 9001:9001 \
-p 10005:10005 \
@@ -374,9 +322,11 @@ start_gateway2() {
--http-bind-address=0.0.0.0:30005 \
--http-access-token=lala \
--public-ips $CONTAINER_IP \
--enable-lp true \
--lp-use-mock-ecash true \
--output=json \
--wireguard-enabled '"$wg_flag"' \
--wireguard-enabled true \
--wireguard-userspace true \
--bonding-information-output="/localnet/gateway2.json";
echo "Waiting for network.json...";
@@ -384,7 +334,7 @@ start_gateway2() {
sleep 2;
done;
echo "Starting gateway2 with LP listener (mock ecash)...";
exec nym-node '"${otel_args}"' run --id gateway2-localnet --unsafe-disable-replay-protection --local --wireguard-enabled '"$wg_flag"' --lp-use-mock-ecash true
exec nym-node run --id gateway2-localnet --unsafe-disable-replay-protection --local --wireguard-enabled true --wireguard-userspace true --lp-use-mock-ecash true
'
log_success "$GATEWAY2_CONTAINER started"
@@ -408,12 +358,12 @@ start_gateway2() {
start_network_requester() {
log_info "Starting $REQUESTER_CONTAINER..."
# Get gateway IP address (first IP only, in case container has multiple networks)
# Get gateway IP address
log_info "Getting gateway IP address..."
GATEWAY_IP=$($RUNTIME exec "$GATEWAY_CONTAINER" hostname -i | awk '{print $1}')
GATEWAY_IP=$(container exec "$GATEWAY_CONTAINER" hostname -i)
log_info "Gateway IP: $GATEWAY_IP"
$RUNTIME run \
container run \
--name "$REQUESTER_CONTAINER" \
--network "$NETWORK_NAME" \
-v "$VOLUME_PATH:/localnet" \
@@ -448,7 +398,7 @@ start_network_requester() {
start_socks5_client() {
log_info "Starting $SOCKS5_CONTAINER..."
$RUNTIME run \
container run \
--name "$SOCKS5_CONTAINER" \
--network "$NETWORK_NAME" \
-p 1080:1080 \
@@ -501,15 +451,15 @@ stop_containers() {
log_info "Stopping all containers..."
for container_name in "${ALL_CONTAINERS[@]}"; do
if $RUNTIME inspect "$container_name" &>/dev/null; then
if container inspect "$container_name" &>/dev/null; then
log_info "Stopping $container_name"
$RUNTIME stop "$container_name" 2>/dev/null || true
$RUNTIME rm "$container_name" 2>/dev/null || true
container stop "$container_name" 2>/dev/null || true
container rm "$container_name" 2>/dev/null || true
fi
done
# Also clean up init container if it exists
$RUNTIME rm "$INIT_CONTAINER" 2>/dev/null || true
container rm "$INIT_CONTAINER" 2>/dev/null || true
log_success "All containers stopped"
@@ -517,7 +467,7 @@ stop_containers() {
remove_network
}
# Show $RUNTIME logs
# Show container logs
show_logs() {
local container_name=${1:-}
@@ -528,8 +478,8 @@ show_logs() {
fi
# Show logs for specific container
if $RUNTIME inspect "$container_name" &>/dev/null; then
$RUNTIME logs -f "$container_name"
if container inspect "$container_name" &>/dev/null; then
container logs -f "$container_name"
else
log_error "Container not found: $container_name"
log_info "Available containers:"
@@ -546,8 +496,8 @@ show_status() {
echo ""
for container_name in "${ALL_CONTAINERS[@]}"; do
if $RUNTIME inspect "$container_name" &>/dev/null; then
local status=$($RUNTIME inspect "$container_name" 2>/dev/null | grep -o '"Status":"[^"]*"' | cut -d'"' -f4 || echo "unknown")
if container inspect "$container_name" &>/dev/null; then
local status=$(container inspect "$container_name" 2>/dev/null | grep -o '"Status":"[^"]*"' | cut -d'"' -f4 || echo "unknown")
echo -e " ${GREEN}${NC} $container_name - $status"
else
echo -e " ${RED}${NC} $container_name - not running"
@@ -602,13 +552,13 @@ build_topology() {
log_success " $file created"
done
# Get container IPs (first IP only, containers may be on multiple networks)
# Get container IPs
log_info "Getting container IP addresses..."
MIX1_IP=$($RUNTIME exec "$MIXNODE1_CONTAINER" hostname -i | awk '{print $1}')
MIX2_IP=$($RUNTIME exec "$MIXNODE2_CONTAINER" hostname -i | awk '{print $1}')
MIX3_IP=$($RUNTIME exec "$MIXNODE3_CONTAINER" hostname -i | awk '{print $1}')
GATEWAY_IP=$($RUNTIME exec "$GATEWAY_CONTAINER" hostname -i | awk '{print $1}')
GATEWAY2_IP=$($RUNTIME exec "$GATEWAY2_CONTAINER" hostname -i | awk '{print $1}')
MIX1_IP=$(container exec "$MIXNODE1_CONTAINER" hostname -i)
MIX2_IP=$(container exec "$MIXNODE2_CONTAINER" hostname -i)
MIX3_IP=$(container exec "$MIXNODE3_CONTAINER" hostname -i)
GATEWAY_IP=$(container exec "$GATEWAY_CONTAINER" hostname -i)
GATEWAY2_IP=$(container exec "$GATEWAY2_CONTAINER" hostname -i)
log_info "Container IPs:"
echo " mix1: $MIX1_IP"
@@ -618,7 +568,7 @@ build_topology() {
echo " gateway2: $GATEWAY2_IP"
# Run build_topology.py in a container with access to the volumes
$RUNTIME run \
container run \
--name "nym-localnet-topology-builder" \
--network "$NETWORK_NAME" \
-v "$VOLUME_PATH:/localnet" \
@@ -657,33 +607,20 @@ start_all() {
start_mixnode 3 "$MIXNODE3_CONTAINER"
start_gateway
start_gateway2
# Connect nym containers to SigNoz network for direct OTLP routing
if [ -n "${OTEL_SIGNOZ_NET:-}" ]; then
log_info "Connecting containers to SigNoz network ($OTEL_SIGNOZ_NET)..."
for c in "$MIXNODE1_CONTAINER" "$MIXNODE2_CONTAINER" "$MIXNODE3_CONTAINER" \
"$GATEWAY_CONTAINER" "$GATEWAY2_CONTAINER"; do
docker network connect "$OTEL_SIGNOZ_NET" "$c" 2>/dev/null && \
log_success " $c connected to $OTEL_SIGNOZ_NET" || true
done
fi
build_topology
# Configure networking for WireGuard VPN routing only when WIREGUARD_ENABLED=1
if [ "$WIREGUARD_ENABLED" = "1" ]; then
log_info "Configuring gateway networking (IP forwarding, NAT) for WireGuard..."
for gw in "$GATEWAY_CONTAINER" "$GATEWAY2_CONTAINER"; do
if $RUNTIME exec "$gw" sh -c "
echo 1 > /proc/sys/net/ipv4/ip_forward 2>/dev/null
iptables-legacy -t nat -A POSTROUTING -o eth0 -j MASQUERADE 2>/dev/null
" 2>/dev/null; then
log_success "Configured $gw"
else
log_warn "Could not configure NAT on $gw. WireGuard VPN routing may not work."
fi
done
fi
# Configure networking for two-hop WireGuard routing on both gateways
# Note: Runs after build_topology to ensure gateways have finished WireGuard setup
log_info "Configuring gateway networking (IP forwarding, NAT)..."
for gw in "$GATEWAY_CONTAINER" "$GATEWAY2_CONTAINER"; do
container exec "$gw" sh -c "
# Enable IP forwarding
echo 1 > /proc/sys/net/ipv4/ip_forward
# Add NAT masquerade for outbound traffic
iptables-legacy -t nat -A POSTROUTING -o eth0 -j MASQUERADE
"
log_success "Configured $gw"
done
start_network_requester
start_socks5_client
-222
View File
@@ -1,222 +0,0 @@
#!/bin/bash
# Nym Localnet OTel Report
# Queries ClickHouse directly to produce a terminal-based summary of
# the core metrics captured by the OTel-instrumented nym-nodes.
#
# Usage:
# ./otel-report.sh # last 15 minutes
# ./otel-report.sh 60 # last 60 minutes
# ./otel-report.sh live # live mode: refresh every 10s
#
# Prerequisites: localnet + SigNoz running
set -e
CH_CONTAINER="signoz-clickhouse"
TRACES_TABLE="signoz_traces.distributed_signoz_index_v3"
LOOKBACK_MIN=${1:-15}
LIVE=false
if [ "$1" = "live" ]; then
LIVE=true
LOOKBACK_MIN=5
fi
BLUE='\033[0;34m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
BOLD='\033[1m'
DIM='\033[2m'
NC='\033[0m'
ch() {
docker exec "$CH_CONTAINER" clickhouse-client --query "$1" 2>/dev/null
}
divider() {
echo -e "${DIM}$(printf '%.0s-' {1..78})${NC}"
}
print_report() {
local window="$1"
echo ""
echo -e "${BOLD} Nym Localnet -- OTel Packet Pipeline Report${NC}"
echo -e " ${DIM}Window: last ${window} minutes | $(date '+%Y-%m-%d %H:%M:%S')${NC}"
divider
# 1. Throughput per operation
echo -e "\n${BOLD} [1] Packet Throughput (packets/sec by operation)${NC}\n"
ch "
SELECT
name AS operation,
count(*) AS total,
round(count(*) / (${window} * 60), 1) AS per_sec
FROM ${TRACES_TABLE}
WHERE timestamp >= now() - INTERVAL ${window} MINUTE
AND serviceName = 'nym-node'
AND name IN (
'handle_received_nym_packet',
'mixnode.sphinx_full_unwrap',
'mixnode.forward_packet',
'mixnode.final_hop'
)
GROUP BY name
ORDER BY total DESC
FORMAT PrettyCompactNoEscapes
"
divider
# 2. Latency per operation
echo -e "\n${BOLD} [2] Processing Latency (milliseconds)${NC}\n"
ch "
SELECT
name AS operation,
round(quantile(0.50)(duration_nano / 1e6), 3) AS p50_ms,
round(quantile(0.95)(duration_nano / 1e6), 3) AS p95_ms,
round(quantile(0.99)(duration_nano / 1e6), 3) AS p99_ms,
round(quantile(0.999)(duration_nano / 1e6), 3) AS p999_ms,
round(avg(duration_nano / 1e6), 3) AS avg_ms
FROM ${TRACES_TABLE}
WHERE timestamp >= now() - INTERVAL ${window} MINUTE
AND serviceName = 'nym-node'
AND name IN (
'handle_received_nym_packet',
'mixnode.sphinx_full_unwrap',
'mixnode.forward_packet',
'mixnode.final_hop'
)
AND duration_nano < 60000000000
GROUP BY name
ORDER BY p50_ms DESC
FORMAT PrettyCompactNoEscapes
"
divider
# 3. Error rate
echo -e "\n${BOLD} [3] Error Rate${NC}\n"
local errors
errors=$(ch "
SELECT
name,
countIf(has_error = true) AS errors,
count(*) AS total,
round(100.0 * countIf(has_error = true) / count(*), 3) AS error_pct
FROM ${TRACES_TABLE}
WHERE timestamp >= now() - INTERVAL ${window} MINUTE
AND serviceName = 'nym-node'
AND name IN (
'handle_received_nym_packet',
'mixnode.sphinx_full_unwrap',
'mixnode.forward_packet',
'mixnode.final_hop'
)
GROUP BY name
HAVING errors > 0
ORDER BY errors DESC
FORMAT PrettyCompactNoEscapes
")
if [ -z "$errors" ]; then
echo -e " ${GREEN}No errors detected across all operations${NC}"
else
echo "$errors"
fi
divider
# 4. Forwarding ratio (are packets being dropped between stages?)
echo -e "\n${BOLD} [4] Pipeline Funnel (packet drop detection)${NC}\n"
ch "
SELECT
name AS stage,
count(*) AS packets,
round(100.0 * count(*) / max(total_ingress), 1) AS pct_of_ingress
FROM ${TRACES_TABLE}
CROSS JOIN (
SELECT count(*) AS total_ingress
FROM ${TRACES_TABLE}
WHERE timestamp >= now() - INTERVAL ${window} MINUTE
AND serviceName = 'nym-node'
AND name = 'handle_received_nym_packet'
) AS t
WHERE timestamp >= now() - INTERVAL ${window} MINUTE
AND serviceName = 'nym-node'
AND name IN (
'handle_received_nym_packet',
'mixnode.sphinx_full_unwrap',
'mixnode.forward_packet',
'mixnode.final_hop'
)
GROUP BY name
ORDER BY packets DESC
FORMAT PrettyCompactNoEscapes
"
echo ""
echo -e " ${DIM}Expected ratios: sphinx_unwrap ~ 100%, forward ~ 75% (3 of 4 hops forward),${NC}"
echo -e " ${DIM}final_hop ~ 25% (1 of 4 hops is the last one). Significantly lower = drops.${NC}"
divider
# 5. Throughput over time (1-minute buckets)
echo -e "\n${BOLD} [5] Throughput Timeline (1-min buckets, ingress packets)${NC}\n"
ch "
SELECT
toStartOfMinute(timestamp) AS minute,
count(*) AS packets,
round(count(*) / 60, 1) AS per_sec
FROM ${TRACES_TABLE}
WHERE timestamp >= now() - INTERVAL ${window} MINUTE
AND serviceName = 'nym-node'
AND name = 'handle_received_nym_packet'
GROUP BY minute
ORDER BY minute
FORMAT PrettyCompactNoEscapes
"
divider
# 6. Latency spikes (potential TCP congestion / backpressure indicators)
echo -e "\n${BOLD} [6] Latency Spikes (sphinx_unwrap p99 per minute)${NC}\n"
ch "
SELECT
toStartOfMinute(timestamp) AS minute,
round(quantile(0.99)(duration_nano / 1e6), 3) AS p99_ms,
round(quantile(0.50)(duration_nano / 1e6), 3) AS p50_ms,
round(p99_ms / greatest(p50_ms, 0.001), 1) AS spike_ratio,
count(*) AS samples
FROM ${TRACES_TABLE}
WHERE timestamp >= now() - INTERVAL ${window} MINUTE
AND serviceName = 'nym-node'
AND name = 'mixnode.sphinx_full_unwrap'
GROUP BY minute
ORDER BY minute
FORMAT PrettyCompactNoEscapes
"
echo ""
echo -e " ${DIM}spike_ratio > 10x suggests backpressure or queue buildup.${NC}"
echo -e " ${DIM}Sustained high p99 across minutes may indicate TCP meltdown.${NC}"
divider
echo ""
echo -e " ${BLUE}SigNoz UI:${NC} http://localhost:8080"
echo -e " ${DIM}Traces tab -> Filter: serviceName = nym-node${NC}"
echo ""
}
if [ "$LIVE" = "true" ]; then
while true; do
clear
print_report "$LOOKBACK_MIN"
echo -e " ${DIM}Refreshing in 10s... (Ctrl+C to stop)${NC}"
sleep 10
done
else
print_report "$LOOKBACK_MIN"
fi
@@ -1 +1 @@
Wednesday, February 11th 2026, 11:35:05 UTC
Thursday, February 19th 2026, 13:59:34 UTC
@@ -11,7 +11,7 @@ options:
--no_routing_history Display node stats without routing history
--no_verloc_metrics Display node stats without verloc metrics
-m, --markdown Display results in markdown format
-o, --output [OUTPUT]
-o [OUTPUT], --output [OUTPUT]
Save results to file (in current dir or supply with
path without filename)
```
@@ -12,7 +12,8 @@ usage: nym-node-cli install [-h] [-V] [-d BRANCH] [-v]
options:
-h, --help show this help message and exit
-V, --version show program's version number and exit
-d, --dev BRANCH Define github branch (default: develop)
-d BRANCH, --dev BRANCH
Define github branch (default: develop)
-v, --verbose Show full error tracebacks
--mode {mixnode,entry-gateway,exit-gateway}
Node mode: 'mixnode', 'entry-gateway', or 'exit-
@@ -62,8 +62,6 @@ Options:
Tunnel port announced to external clients wishing to connect to the wireguard interface. Useful in the instances where the node is behind a proxy [env: NYMNODE_WG_ANNOUNCED_PORT=]
--wireguard-private-network-prefix <WIREGUARD_PRIVATE_NETWORK_PREFIX>
The prefix denoting the maximum number of the clients that can be connected via Wireguard. The maximum value for IPv4 is 32 and for IPv6 is 128 [env: NYMNODE_WG_PRIVATE_NETWORK_PREFIX=]
--wireguard-userspace <WIREGUARD_USERSPACE>
Use userspace implementation of WireGuard (wireguard-go) instead of kernel module. Useful in containerized environments without kernel WireGuard support [env: NYMNODE_WG_USERSPACE=] [possible values: true, false]
--verloc-bind-address <VERLOC_BIND_ADDRESS>
Socket address this node will use for binding its verloc API. default: `[::]:1790` [env: NYMNODE_VERLOC_BIND_ADDRESS=]
--verloc-announce-port <VERLOC_ANNOUNCE_PORT>
@@ -82,8 +80,6 @@ Options:
Endpoint to query to retrieve current upgrade mode attestation. This argument should never be set outside testnets and local networks [env: NYMNODE_UPGRADE_MODE_ATTESTATION_URL=]
--upgrade-mode-attester-public-key <UPGRADE_MODE_ATTESTER_PUBLIC_KEY>
Expected public key of the entity signing the published attestation. This argument should never be set outside testnets and local networks [env: NYMNODE_UPGRADE_MODE_ATTESTER_PUBKEY=]
--lp-use-mock-ecash <LP_USE_MOCK_ECASH>
Use mock ecash manager for LP testing. WARNING: Only use this for local testing! Never enable in production. When enabled, the LP listener will accept any credential without blockchain verification [env: NYMNODE_LP_USE_MOCK_ECASH=] [possible values: true, false]
--upstream-exit-policy-url <UPSTREAM_EXIT_POLICY_URL>
Specifies the url for an upstream source of the exit policy used by this node [env: NYMNODE_UPSTREAM_EXIT_POLICY=]
--open-proxy <OPEN_PROXY>
@@ -1,5 +1,6 @@
{
"nym-cli": "Nym-cli",
"diagnostic-tool": "Diagnostic Tool",
"echo-server": "Echo Server",
"standalone-tcpproxy": "TcpProxy Binaries (Standalone)"
}
@@ -0,0 +1,134 @@
import { Steps } from 'nextra/components';
# Diagnostic Tool
The Diagnostic Tool is a standalone binary designed to perform various network tests, including DNS, HTTP, and gateway connectivity tests. This tool helps diagnose connectivity issues and provides insights into network performance.
Its also possible to run it within the daemon with the same CLI interface.
## Download Binary
To get `nym-diagnostic` follow these steps:
<Steps>
###### 1. Download `nym-vpn-core`
- Navigate to [github.com/nymtech/nym-vpn-client/releases](https://github.com/nymtech/nym-vpn-client/releases)
- Find latest `nym-vpn-core-<VERSION>`
- Download version for your system
###### 2. Install or extract and make executable
- If you downloaded `.deb` installer, install it with this command:
```sh
sudo dpkg -i <FILE_NAME>
```
- If you downloaded `.tar.gz`, in terminal you can extract the file with
```sh
tar -xvf <FILE_NAME>
```
- Navigate inside the directory and make executable:
```sh
cd nym-vpn-core-<VERSION>
chmod +x ./*
```
</ Steps>
## CLI Usage
The Diagnostic Tool can be executed from the command line interface (CLI). Below are the usage instructions and options available. Read in the chapter [*Tests Performed*](#tests-performed) about the purpose and outcome of these commands.
### Command Syntax
```sh
./nym-diagnostic [command] [options]
./nym-vpnc diagnostic [command] [options]
```
#### `run` command arguments
The most useful command is `run`, here are the options for that command:
```sh
-h, --help Display help information and exit.
--skip-dns Skip the DNS tests
--skip-http Skip the HTTP tests
--gateway <ID_KEY> Run the gateway connectivity test on the given gateway. Skip those tests if not provided
-v, --verbose Enable verbose output for detailed logging.
```
#### `register` command arguments
Command `register` requires valid credential. Here are the options for that command:
```sh
--gateway <ID_KEY> Register to the given gateway
--storage-path Path to the directory containing the credentials database. If it is not valid registration will be skipped.
--skip-wireguard Skip Wireguard tests
```
### Command Examples
- Run all tests on a gateway:
```sh
./nym-diagnostic run --gateway <ID_KEY>
```
- Run the DNS tests only:
```sh
./nym-diagnostic run --skip-http
```
- Register to a gateway:
```sh
sudo ./nym-diagnostic register --gateway <ID_KEY> --storage-path /var/lib/nym-vpnd/mainnet
# sudo is required to read the database
```
- You can also run DNS and HTTP tests from `nym-vpnc` (installation [here](/developers/nymvpncli)):
```sh
./nym-vpnc diagnostic run
```
## Tests Performed
The Diagnostic Tool runs the following tests:
### 1. DNS Test
- **Purpose**: To check the resolution DNS availability.
- **Process**: We try to resolve all the domain names present in a given nym network environment with different DNS configurations
- **Output**: Displays the resolved IP address and the time taken for the resolution.
### 2. HTTP Test
- **Purpose**: To verify the accessibility of the NymVPN API.
- **Process**: The tool query the `health` endpoint as well as the `nodes/described` endpoint.
- **Output**: Displays the response of the `health` endpoint, the time skew and the number of nodes in the network (sanity check)
### 3. Gateway Test
- **Purpose**: To check the connectivity to a given gateway.
- **Process**: The tool fetches information about the gateway, then establishes a TCP connection, upgrades it to WS and sends a request
- **Output**: Display the gateway reported information, the status of the connections and the WS response.
### 4. Registration Test
- **Purpose:** To check the correctness of the registration process.
- **Process:** The tool tries to build a mixnet client to the provided gateway and then tries to register to the entry authenticator
- **Output:** Display the status of the different steps
- **Caveat:** This test requires a credential to be spent, which is why it is available as a separate command only
### 5. Wireguard Test
- **Purpose:** To check the soundness of a wireguard connection
- **Process:** The tool uses the registration data from the previous step to establish a wireguard connection and ping an IP.
- **Output:** Display the ping RTTs and any error that might have happened
## Reports
Reports are logged in a JSON format and also returned by the commands for a future use
@@ -62,12 +62,13 @@ There are multiple ways to monitor performance of nodes and the machines on whic
### Guides to Setup Own Metrics
A list of different scripts, templates and guides for easier navigation:
A list of different tools, templates and guides for easier navigation:
* [`nym-gateway-probe`](performance-and-testing/gateway-probe.mdx): a useful tool used under the hood of [Node Status Observatory](https://harbourmaster.nymtech.net)
* [Diagnostic Tool](/developers/tools/diagnostic-tool): diagnose connectivity issues and provides insights into network performance
* [`nym-gateway-probe`](performance-and-testing/gateway-probe.mdx) - a useful tool used under the hood of [Node Status Observatory](https://harbourmaster.nymtech.net)
* [Prometheus and Grafana](performance-and-testing/prometheus-grafana.mdx) self-hosted setup
* [Nym-node CPU cron service](https://gist.github.com/tommyv1987/97e939a7adf491333d686a8eaa68d4bd) - an easy bash script by Nym core developer [@tommy1987](https://gist.github.com/tommyv1987), designed to monitor a CPU usage of your node, running locally
* Nym's script [`prom_targets.py`](https://github.com/nymtech/nym/blob/develop/scripts/prom_targets.py) - a useful python program to request data from API and can be run on its own or plugged to more sophisticated flows
### Collecting Testing Metrics
@@ -202,3 +202,13 @@ PING_RETRIES=10 PING_TIMEOUT=5 CONCURRENCY=16 ./test-nodes-pings.sh
You can look up the IPs from `ping_not_working.csv`, using some online database, like [ipinfo.io](https://ipinfo.io).
Feel invited to share the outcome with Nym team, mentors and the rest of the operators in our [Matrix Node Operators channel](https://matrix.to/#/#operators:nymtech.chat).
## Guides to Setup Own Metrics
A list of different tools, templates and guides for easier navigation:
* [`nym-gateway-probe`](performance-and-testing/gateway-probe.mdx): a useful tool used under the hood of [Node Status Observatory](https://harbourmaster.nymtech.net)
* [Diagnostic Tool](/developers/tools/diagnostic-tool): diagnose connectivity issues and provides insights into network performance
* [Prometheus and Grafana](performance-and-testing/prometheus-grafana.mdx) self-hosted setup
+1 -1
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-data-observatory"
version = "1.0.1"
version = "1.0.4"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
+10 -1
View File
@@ -1,5 +1,6 @@
use anyhow::{Result, anyhow};
use sqlx::{Postgres, migrate::Migrator, postgres::PgConnectOptions};
use std::env;
use std::str::FromStr;
use tracing::info;
@@ -19,7 +20,15 @@ pub(crate) struct Storage {
impl Storage {
pub async fn init(connection_url: String) -> Result<Self> {
let connect_options = PgConnectOptions::from_str(&connection_url)?;
let mut connect_options = PgConnectOptions::from_str(&connection_url)?;
let ssl_cert_path = env::var("PG_CERT").ok();
if let Some(ssl_cert) = ssl_cert_path {
connect_options = connect_options
.ssl_mode(sqlx::postgres::PgSslMode::Require)
.ssl_root_cert(ssl_cert);
}
let pool = DbPool::connect_with(connect_options)
.await
+19 -4
View File
@@ -3,6 +3,7 @@ use crate::db::{
queries::price::insert_nym_prices,
};
use core::str;
use std::env;
use tokio::time::Duration;
use crate::db::DbPool;
@@ -29,10 +30,24 @@ impl PriceScraper {
async fn get_coingecko_prices(&self) -> anyhow::Result<CoingeckoPriceResponse> {
tracing::info!("💰 Fetching CoinGecko prices from {COINGECKO_API_URL}");
let response = reqwest::get(COINGECKO_API_URL)
.await?
.json::<CoingeckoPriceResponse>()
.await;
let mut url = COINGECKO_API_URL.to_string();
let coin_gecko_api_key = env::var("COINGECKO_API_KEY").ok();
if let Some(api_key) = coin_gecko_api_key {
url = format!("{url}&x_cg_demo_api_key={api_key}")
}
let response = reqwest::get(url).await?;
if !response.status().is_success() {
tracing::error!(
"CoinGecko price query returned error: {}",
response.status()
);
}
let response = response.json::<CoingeckoPriceResponse>().await;
tracing::info!("Got response {:?}", response);
match response {
+3
View File
@@ -40,6 +40,8 @@ thiserror.workspace = true
tracing.workspace = true
tracing-indicatif = { workspace = true }
tracing-subscriber.workspace = true
opentelemetry = { workspace = true, features = ["trace"], optional = true }
opentelemetry_sdk = { workspace = true, features = ["trace"], optional = true }
tokio = { workspace = true, features = ["macros", "sync", "rt-multi-thread"] }
tokio-util = { workspace = true, features = ["codec"] }
tokio-stream = { workspace = true }
@@ -135,6 +137,7 @@ rand_chacha = { workspace = true }
[features]
tokio-console = ["console-subscriber", "nym-task/tokio-tracing"]
otel = ["nym-bin-common/otel-otlp", "dep:opentelemetry", "dep:opentelemetry_sdk"]
[lints]
workspace = true
+102 -28
View File
@@ -8,7 +8,6 @@ use crate::cli::commands::{
use crate::env::vars::{NYMNODE_CONFIG_ENV_FILE_ARG, NYMNODE_NO_BANNER_ARG};
use clap::{Args, Parser, Subcommand};
use nym_bin_common::bin_info;
use std::future::Future;
use std::sync::OnceLock;
pub(crate) mod commands;
@@ -22,6 +21,43 @@ fn pretty_build_info_static() -> &'static str {
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
}
/// OpenTelemetry-related CLI arguments. Only present when built with the `otel` feature.
#[cfg(feature = "otel")]
#[derive(Args, Debug, Clone)]
pub(crate) struct OtelArgs {
/// Enable OpenTelemetry tracing export via OTLP/gRPC.
#[clap(long, env = "NYMNODE_OTEL_ENABLE")]
pub(crate) otel: bool,
/// OpenTelemetry OTLP collector endpoint (gRPC).
/// Only used when --otel is enabled.
/// For SigNoz Cloud use https://ingest.<region>.signoz.cloud:443
#[clap(
long,
env = "NYMNODE_OTEL_ENDPOINT",
default_value = "http://localhost:4317"
)]
pub(crate) otel_endpoint: String,
/// SigNoz Cloud ingestion key for authenticated OTLP export.
/// Only needed for SigNoz Cloud (not self-hosted).
#[clap(long, env = "NYMNODE_OTEL_KEY")]
pub(crate) otel_key: Option<String>,
/// Deployment environment label attached to all exported traces.
/// Used to distinguish sandbox / mainnet / canary in the OTel backend.
#[clap(long, env = "NYMNODE_OTEL_ENV", default_value = "mainnet")]
pub(crate) otel_env: String,
/// Trace sampling ratio (0.0 to 1.0). e.g. 0.1 = 10%% of traces exported. Reduces cost.
#[clap(long, env = "NYMNODE_OTEL_SAMPLE_RATIO", default_value = "0.1")]
pub(crate) otel_sample_ratio: f64,
/// Timeout in seconds for each OTLP export batch. Prevents unbounded blocking.
#[clap(long, env = "NYMNODE_OTEL_EXPORT_TIMEOUT", default_value = "10")]
pub(crate) otel_export_timeout: u64,
}
#[derive(Parser, Debug)]
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
pub(crate) struct Cli {
@@ -40,44 +76,82 @@ pub(crate) struct Cli {
)]
pub(crate) no_banner: bool,
#[cfg(feature = "otel")]
#[clap(flatten)]
pub(crate) otel: OtelArgs,
#[clap(subcommand)]
command: Commands,
}
impl Cli {
fn execute_async<F: Future>(fut: F) -> anyhow::Result<F::Output> {
Ok(tokio::runtime::Builder::new_multi_thread()
pub(crate) fn execute(self) -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(fut))
.build()?;
// Set up tracing inside the runtime so the OTel batch exporter (when enabled)
// can spawn its background tasks on the tokio reactor.
let use_otel = matches!(self.command, Commands::Run(..));
let _otel_guard = runtime.block_on(async { self.setup_logging(use_otel) })?;
// `_otel_guard` is dropped at function exit, flushing pending spans via its Drop impl
runtime.block_on(async {
match self.command {
Commands::BuildInfo(args) => build_info::execute(args)?,
Commands::BondingInformation(args) => bonding_information::execute(args).await?,
Commands::NodeDetails(args) => node_details::execute(args).await?,
Commands::Run(args) => run::execute(*args).await?,
Commands::Migrate(args) => migrate::execute(*args)?,
Commands::Sign(args) => sign::execute(args).await?,
Commands::TestThroughput(args) => test_throughput::execute(args)?,
Commands::UnsafeResetSphinxKeys(args) => reset_sphinx_keys::execute(args).await?,
Commands::Debug(debug) => match debug.command {
DebugCommands::ResetProvidersGatewayDbs(args) => {
debug::reset_providers_dbs::execute(args).await?
}
},
}
Ok::<(), anyhow::Error>(())
})
}
pub(crate) fn execute(self) -> anyhow::Result<()> {
// NOTE: `test_throughput` sets up its own logger as it has to include additional layers
if !matches!(self.command, Commands::TestThroughput(..)) {
crate::logging::setup_tracing_logger()?;
#[cfg(feature = "otel")]
fn build_otel_config(&self) -> Option<crate::logging::OtelConfig> {
if self.otel.otel {
Some(crate::logging::OtelConfig {
endpoint: self.otel.otel_endpoint.clone(),
service_name: "nym-node".to_string(),
ingestion_key: self.otel.otel_key.clone(),
environment: self.otel.otel_env.clone(),
sample_ratio: self.otel.otel_sample_ratio,
export_timeout_secs: self.otel.otel_export_timeout,
})
} else {
None
}
}
match self.command {
Commands::BuildInfo(args) => build_info::execute(args)?,
Commands::BondingInformation(args) => {
{ Self::execute_async(bonding_information::execute(args))? }?
}
Commands::NodeDetails(args) => { Self::execute_async(node_details::execute(args))? }?,
Commands::Run(args) => { Self::execute_async(run::execute(*args))? }?,
Commands::Migrate(args) => migrate::execute(*args)?,
Commands::Sign(args) => { Self::execute_async(sign::execute(args))? }?,
Commands::TestThroughput(args) => test_throughput::execute(args)?,
Commands::UnsafeResetSphinxKeys(args) => {
{ Self::execute_async(reset_sphinx_keys::execute(args))? }?
}
Commands::Debug(debug) => match debug.command {
DebugCommands::ResetProvidersGatewayDbs(args) => {
{ Self::execute_async(debug::reset_providers_dbs::execute(args))? }?
}
},
#[cfg(feature = "otel")]
fn setup_logging(&self, use_otel: bool) -> anyhow::Result<Option<crate::logging::OtelGuard>> {
if matches!(self.command, Commands::TestThroughput(..)) {
return Ok(None);
}
Ok(())
let otel_config = if use_otel {
self.build_otel_config()
} else {
None
};
crate::logging::setup_tracing_logger(otel_config)
}
#[cfg(not(feature = "otel"))]
fn setup_logging(&self, _use_otel: bool) -> anyhow::Result<Option<()>> {
if matches!(self.command, Commands::TestThroughput(..)) {
return Ok(None);
}
crate::logging::setup_tracing_logger()?;
Ok(None)
}
}
+111 -1
View File
@@ -7,6 +7,42 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};
/// Configuration for OpenTelemetry OTLP export.
#[cfg(feature = "otel")]
pub(crate) struct OtelConfig {
/// OTLP/gRPC collector endpoint, e.g. `http://localhost:4317`
/// or `https://ingest.eu.signoz.cloud:443` for SigNoz Cloud.
pub endpoint: String,
/// Service name reported to the collector (appears in SigNoz "Services" view).
pub service_name: String,
/// Optional SigNoz Cloud ingestion key for authenticated export.
/// Sent as the `signoz-ingestion-key` gRPC metadata header.
pub ingestion_key: Option<String>,
/// Deployment environment label, e.g. `mainnet`, `sandbox`, `canary`.
/// Attached as the `deployment.environment` OTel resource attribute.
pub environment: String,
/// Trace sampling ratio in 0.0..=1.0 (e.g. 0.1 = 10% of traces). Used to limit cost.
pub sample_ratio: f64,
/// Timeout in seconds for each OTLP export batch. Prevents unbounded blocking.
pub export_timeout_secs: u64,
}
/// Handle returned when OTel is active. Flushes pending spans on drop
/// to prevent telemetry loss during panics or early exits.
#[cfg(feature = "otel")]
pub(crate) struct OtelGuard {
pub provider: opentelemetry_sdk::trace::SdkTracerProvider,
}
#[cfg(feature = "otel")]
impl Drop for OtelGuard {
fn drop(&mut self) {
if let Err(e) = self.provider.shutdown() {
eprintln!("OpenTelemetry shutdown error in Drop: {e}");
}
}
}
pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
fn directive_checked(directive: impl Into<String>) -> anyhow::Result<Directive> {
directive.into().parse().map_err(From::from)
@@ -22,12 +58,86 @@ pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
Ok(filter)
}
/// Initialise the tracing subscriber stack.
///
/// When the `otel` feature is enabled **and** an `OtelConfig` is supplied, an
/// OTLP exporter layer is added and the returned `OtelGuard` must be used to
/// flush pending spans on shutdown.
#[cfg(feature = "otel")]
pub(crate) fn setup_tracing_logger(otel: Option<OtelConfig>) -> anyhow::Result<Option<OtelGuard>> {
let stderr_layer =
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
let console_layer = console_subscriber::spawn();
if let Some(otel_config) = otel {
let (otel_layer, provider) = nym_bin_common::logging::init_otel_layer(
&otel_config.service_name,
&otel_config.endpoint,
otel_config.ingestion_key.as_deref(),
&otel_config.environment,
otel_config.sample_ratio,
otel_config.export_timeout_secs,
).map_err(|e| anyhow::anyhow!(
"failed to initialise OpenTelemetry exporter (endpoint: {}, service: {}): {e}",
otel_config.endpoint,
otel_config.service_name,
))?;
tracing_subscriber::registry()
.with(console_layer)
.with(stderr_layer)
.with(otel_layer)
.init();
Ok(Some(OtelGuard { provider }))
} else {
tracing_subscriber::registry()
.with(console_layer)
.with(stderr_layer)
.init();
Ok(None)
}
} else {
if let Some(otel_config) = otel {
let (otel_layer, provider) = nym_bin_common::logging::init_otel_layer(
&otel_config.service_name,
&otel_config.endpoint,
otel_config.ingestion_key.as_deref(),
&otel_config.environment,
otel_config.sample_ratio,
otel_config.export_timeout_secs,
).map_err(|e| anyhow::anyhow!(
"failed to initialise OpenTelemetry exporter (endpoint: {}, service: {}): {e}",
otel_config.endpoint,
otel_config.service_name,
))?;
tracing_subscriber::registry()
.with(stderr_layer)
.with(otel_layer)
.init();
Ok(Some(OtelGuard { provider }))
} else {
tracing_subscriber::registry()
.with(stderr_layer)
.init();
Ok(None)
}
}}
}
/// Non-OTel variant -- identical subscriber stack without the OTLP layer.
#[cfg(not(feature = "otel"))]
pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
let stderr_layer =
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
let console_layer = console_subscriber::spawn();
tracing_subscriber::registry()
+227 -65
View File
@@ -1,6 +1,7 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::key_rotation::active_keys::SphinxKeyGuard;
use crate::node::mixnet::shared::SharedData;
use futures::StreamExt;
use nym_noise::connection::Connection;
@@ -20,7 +21,10 @@ use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::codec::Framed;
use tracing::{debug, error, instrument, trace, warn};
use tracing::{Span, debug, error, instrument, trace, warn};
/// How often (in packets) the stream-level span updates its packet count.
const SPAN_UPDATE_INTERVAL: u64 = 10_000;
struct PendingReplayCheckPackets {
// map of rotation id used for packet creation to the packets
@@ -51,6 +55,10 @@ impl PendingReplayCheckPackets {
.push(packet.packet)
}
fn total_count(&self) -> usize {
self.packets.values().map(|v| v.len()).sum()
}
fn replay_tags(&self) -> HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>> {
let mut replay_tags = HashMap::with_capacity(self.packets.len());
'outer: for (rotation_id, packets) in &self.packets {
@@ -130,20 +138,54 @@ impl ConnectionHandler {
Some(now + delay)
}
#[instrument(
name = "mixnode.forward_packet",
skip(self, mix_packet, delay),
level = "debug",
fields(
remote_addr = %self.remote_address,
delay_ms = tracing::field::Empty,
)
)]
fn handle_forward_packet(&self, now: Instant, mix_packet: MixPacket, delay: Option<Delay>) {
if !self.shared.processing_config.forward_hop_processing_enabled {
trace!("this nym-node does not support forward hop packets");
warn!(
event = "packet.dropped.forward_disabled",
remote_addr = %self.remote_address,
"dropping packet: forward hop processing disabled"
);
self.shared.dropped_forward_packet(self.remote_address.ip());
return;
}
let forward_instant = self.create_delay_target(now, delay);
if let Some(target) = forward_instant {
Span::current().record(
"delay_ms",
target.saturating_duration_since(now).as_millis() as u64,
);
}
self.shared.forward_mix_packet(mix_packet, forward_instant);
}
#[instrument(
name = "mixnode.final_hop",
skip(self, final_hop_data),
level = "debug",
fields(
remote_addr = %self.remote_address,
client_online,
disk_fallback = false,
ack_forwarded = false,
)
)]
async fn handle_final_hop(&self, final_hop_data: ProcessedFinalHop) {
if !self.shared.processing_config.final_hop_processing_enabled {
trace!("this nym-node does not support final hop packets");
warn!(
event = "packet.dropped.final_hop_disabled",
remote_addr = %self.remote_address,
"dropping packet: final hop processing disabled"
);
self.shared
.dropped_final_hop_packet(self.remote_address.ip());
return;
@@ -151,11 +193,13 @@ impl ConnectionHandler {
let client = final_hop_data.destination;
let message = final_hop_data.message;
let has_ack = final_hop_data.forward_ack.is_some();
// if possible attempt to push message directly to the client
match self.shared.try_push_message_to_client(client, message) {
Err(unsent_plaintext) => {
// if that failed, store it on disk (to be 🔥 soon...)
// if that failed, store it on disk
Span::current().record("client_online", false);
match self
.shared
.store_processed_packet_payload(client, unsent_plaintext)
@@ -163,6 +207,7 @@ impl ConnectionHandler {
{
Err(err) => error!("Failed to store client data - {err}"),
Ok(_) => {
Span::current().record("disk_fallback", true);
self.shared
.metrics
.mixnet
@@ -172,13 +217,18 @@ impl ConnectionHandler {
}
}
}
Ok(_) => trace!("Pushed received packet to {client}"),
Ok(_) => {
Span::current().record("client_online", true);
trace!("Pushed received packet to {client}");
}
}
// if we managed to either push message directly to the [online] client or store it at
// its inbox, it means that it must exist at this gateway, hence we can send the
// received ack back into the network
// disk, forward the ack
self.shared.forward_ack_packet(final_hop_data.forward_ack);
if has_ack {
Span::current().record("ack_forwarded", true);
}
}
fn within_deferral_threshold(&self, now: Instant) -> bool {
@@ -206,32 +256,86 @@ impl ConnectionHandler {
if !time_threshold {
warn!(
"{}: time failure - {}",
event = "replay_detection.deferral_exceeded",
threshold_type = "time",
deferred_count = self.pending_packets.total_count(),
deferral_ms = now.saturating_duration_since(self.pending_packets.last_acquired_mutex).as_millis() as u64,
remote_addr = %self.remote_address,
"{}: time deferral threshold exceeded with {} pending packets",
self.remote_address,
self.pending_packets.packets.len()
self.pending_packets.total_count()
)
}
if !count_threshold {
warn!("{}, count failure", self.remote_address)
warn!(
event = "replay_detection.deferral_exceeded",
threshold_type = "count",
deferred_count = self.pending_packets.total_count(),
remote_addr = %self.remote_address,
"{}: count deferral threshold exceeded",
self.remote_address
)
}
time_threshold && count_threshold
}
/// Resolve the sphinx key for the given rotation, recording the rotation
/// label on the current tracing span. Returns `ExpiredKey` if the requested
/// odd/even key has already been rotated out.
fn resolve_rotation_key(
&self,
rotation: SphinxKeyRotation,
) -> Result<SphinxKeyGuard, PacketProcessingError> {
let rotation_label = match rotation {
SphinxKeyRotation::Unknown => "unknown",
SphinxKeyRotation::OddRotation => "odd",
SphinxKeyRotation::EvenRotation => "even",
};
Span::current().record("key_rotation", rotation_label);
match rotation {
SphinxKeyRotation::Unknown => Ok(self.shared.sphinx_keys.primary()),
SphinxKeyRotation::OddRotation => self.shared.sphinx_keys.odd().ok_or_else(|| {
warn!(
event = "packet.dropped.expired_key",
key_rotation = "odd",
remote_addr = %self.remote_address,
"dropping packet: odd key rotation expired"
);
PacketProcessingError::ExpiredKey
}),
SphinxKeyRotation::EvenRotation => self.shared.sphinx_keys.even().ok_or_else(|| {
warn!(
event = "packet.dropped.expired_key",
key_rotation = "even",
remote_addr = %self.remote_address,
"dropping packet: even key rotation expired"
);
PacketProcessingError::ExpiredKey
}),
}
}
#[instrument(
name = "mixnode.sphinx_partial_unwrap",
skip(self, packet),
level = "debug",
fields(key_rotation, unwrap_result,)
)]
fn try_partially_unwrap_packet(
&self,
packet: FramedNymPacket,
) -> Result<PartialyUnwrappedPacketWithKeyRotation, PacketProcessingError> {
// based on the received sphinx key rotation information,
// attempt to choose appropriate key for processing the packet
match packet.header().key_rotation {
let rotation = packet.header().key_rotation;
let result = match rotation {
SphinxKeyRotation::Unknown => {
let primary = self.shared.sphinx_keys.primary();
// Unknown rotation: try primary, fallback to secondary
let primary = self.resolve_rotation_key(rotation)?;
let primary_rotation = primary.rotation_id();
// we have to try both keys, start with the primary as it has higher likelihood of being correct
// if let Ok(partially_unwrapped) = PartiallyUnwrappedPacket::new()
match PartiallyUnwrappedPacket::new(packet, primary.inner().as_ref()) {
Ok(unwrapped_packet) => {
Ok(unwrapped_packet.with_key_rotation(primary_rotation))
@@ -248,25 +352,17 @@ impl ConnectionHandler {
}
}
}
SphinxKeyRotation::OddRotation => {
let Some(odd_key) = self.shared.sphinx_keys.odd() else {
return Err(PacketProcessingError::ExpiredKey);
};
let odd_rotation = odd_key.rotation_id();
PartiallyUnwrappedPacket::new(packet, odd_key.inner().as_ref())
_ => {
let key = self.resolve_rotation_key(rotation)?;
let rotation_id = key.rotation_id();
PartiallyUnwrappedPacket::new(packet, key.inner().as_ref())
.map_err(|(_, err)| err)
.map(|p| p.with_key_rotation(odd_rotation))
.map(|p| p.with_key_rotation(rotation_id))
}
SphinxKeyRotation::EvenRotation => {
let Some(even_key) = self.shared.sphinx_keys.even() else {
return Err(PacketProcessingError::ExpiredKey);
};
let even_rotation = even_key.rotation_id();
PartiallyUnwrappedPacket::new(packet, even_key.inner().as_ref())
.map_err(|(_, err)| err)
.map(|p| p.with_key_rotation(even_rotation))
}
}
};
Span::current().record("unwrap_result", if result.is_ok() { "ok" } else { "err" });
result
}
async fn handle_received_packet_with_replay_detection(
@@ -280,6 +376,12 @@ impl ConnectionHandler {
Ok(unwrapped) => unwrapped,
Err(err) => {
trace!("failed to process received mix packet: {err}");
warn!(
event = "packet.dropped.malformed",
error = %err,
remote_addr = %self.remote_address,
"dropping malformed packet"
);
self.shared
.metrics
.mixnet
@@ -316,7 +418,9 @@ impl ConnectionHandler {
// 3. forward the packet to the relevant sink (if enabled)
match unwrapped_packet {
Err(err) => trace!("failed to process received mix packet: {err}"),
Err(err) => {
trace!("failed to process received mix packet: {err}");
}
Ok(processed_packet) => match processed_packet.processing_data {
MixProcessingResultData::ForwardHop { packet, delay } => {
self.handle_forward_packet(now, packet, delay);
@@ -334,6 +438,7 @@ impl ConnectionHandler {
packets: HashMap<u32, Vec<PartiallyUnwrappedPacket>>,
replay_check_results: HashMap<u32, Vec<bool>>,
) {
let mut replays_detected: u64 = 0;
for (rotation_id, packets) in packets {
let Some(replay_checks) = replay_check_results.get(&rotation_id) else {
// this should never happen, but if we messed up, and it does, don't panic, just drop the packets
@@ -342,6 +447,13 @@ impl ConnectionHandler {
};
for (packet, &replayed) in packets.into_iter().zip(replay_checks) {
let unwrapped_packet = if replayed {
replays_detected += 1;
warn!(
event = "packet.dropped.replay",
remote_addr = %self.remote_address,
rotation_id,
"dropping replayed packet"
);
Err(PacketProcessingError::PacketReplay)
} else {
packet.finalise_unwrapping()
@@ -350,6 +462,13 @@ impl ConnectionHandler {
self.handle_unwrapped_packet(now, unwrapped_packet).await;
}
}
if replays_detected > 0 {
debug!(
replays_detected,
remote_addr = %self.remote_address,
"replay detection batch completed with replays"
);
}
}
async fn handle_pending_packets_batch_no_locking(&mut self, now: Instant) -> bool {
@@ -379,13 +498,22 @@ impl ConnectionHandler {
true
}
#[instrument(
name = "mixnode.replay_check_batch",
skip(self),
level = "debug",
fields(batch_size, mutex_wait_ms,)
)]
async fn handle_pending_packets_batch(&mut self, now: Instant) {
let batch = self.pending_packets.reset(now);
let replay_tags = self.pending_packets.replay_tags();
if replay_tags.is_empty() {
return;
}
let batch_size = self.pending_packets.total_count();
Span::current().record("batch_size", batch_size as u64);
let mutex_start = Instant::now();
let Ok(replay_check_results) = self
.shared
.replay_protection_filter
@@ -396,37 +524,25 @@ impl ConnectionHandler {
self.shared.shutdown_token.cancel();
return;
};
Span::current().record("mutex_wait_ms", mutex_start.elapsed().as_millis() as u64);
let batch = self.pending_packets.reset(now);
self.handle_post_replay_detection_packets(now, batch, replay_check_results)
.await;
}
#[instrument(
name = "mixnode.sphinx_full_unwrap",
skip(self, packet),
level = "debug",
fields(key_rotation)
)]
fn try_full_unwrap_packet(
&self,
packet: FramedNymPacket,
) -> Result<MixProcessingResult, PacketProcessingError> {
// based on the received sphinx key rotation information,
// attempt to choose appropriate key for processing the packet
// NOTE: due to the function signatures, outfox packets will **only** attempt primary key
// if no rotation information is available (but that's fine given outfox is not really in use,
// and by the time we need it, the rotation info should be present)
match packet.header().key_rotation {
SphinxKeyRotation::Unknown => {
process_framed_packet(packet, self.shared.sphinx_keys.primary().inner().as_ref())
}
SphinxKeyRotation::OddRotation => {
let Some(odd_key) = self.shared.sphinx_keys.odd() else {
return Err(PacketProcessingError::ExpiredKey);
};
process_framed_packet(packet, odd_key.inner().as_ref())
}
SphinxKeyRotation::EvenRotation => {
let Some(even_key) = self.shared.sphinx_keys.even() else {
return Err(PacketProcessingError::ExpiredKey);
};
process_framed_packet(packet, even_key.inner().as_ref())
}
}
let key = self.resolve_rotation_key(packet.header().key_rotation)?;
process_framed_packet(packet, key.inner().as_ref())
}
async fn handle_received_packet_with_no_replay_detection(
@@ -456,23 +572,36 @@ impl ConnectionHandler {
}
#[instrument(
skip(self),
name = "mixnode.connection",
skip(self, socket),
level = "debug",
fields(
remote = %self.remote_address
remote = %self.remote_address,
noise_handshake_ms = tracing::field::Empty,
)
)]
pub(crate) async fn handle_connection(&mut self, socket: TcpStream) {
let handshake_start = Instant::now();
let noise_stream = match upgrade_noise_responder(socket, &self.shared.noise_config).await {
Ok(noise_stream) => noise_stream,
Err(err) => {
error!(
"Failed to perform Noise handshake with {:?} - {err}",
self.remote_address
Span::current().record(
"noise_handshake_ms",
handshake_start.elapsed().as_millis() as u64,
);
warn!(
event = "connection.failed.noise",
remote_addr = %self.remote_address,
error = %err,
"Noise responder handshake failed"
);
return;
}
};
Span::current().record(
"noise_handshake_ms",
handshake_start.elapsed().as_millis() as u64,
);
debug!(
"Noise responder handshake completed for {:?}",
self.remote_address
@@ -481,26 +610,58 @@ impl ConnectionHandler {
.await
}
#[instrument(
name = "mixnode.stream",
skip(self, mixnet_connection),
level = "debug",
fields(
remote = %self.remote_address,
packets_processed = 0u64,
exit_reason,
)
)]
pub(crate) async fn handle_stream(
&mut self,
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
) {
let mut packets_processed: u64 = 0;
loop {
tokio::select! {
biased;
_ = self.shared.shutdown_token.cancelled() => {
trace!("connection handler: received shutdown");
Span::current().record("exit_reason", "shutdown");
break
}
maybe_framed_nym_packet = mixnet_connection.next() => {
match maybe_framed_nym_packet {
Some(Ok(packet)) => self.handle_received_nym_packet(packet).await,
Some(Ok(packet)) => {
self.handle_received_nym_packet(packet).await;
packets_processed += 1;
if packets_processed.is_multiple_of(SPAN_UPDATE_INTERVAL) {
Span::current().record("packets_processed", packets_processed);
}
}
Some(Err(err)) => {
debug!("connection got corrupted with: {err}");
warn!(
event = "connection.corrupted",
remote_addr = %self.remote_address,
error = %err,
packets_processed,
"connection stream corrupted"
);
Span::current().record("exit_reason", "corrupted");
Span::current().record("packets_processed", packets_processed);
return
}
None => {
debug!("connection got closed by the remote");
debug!(
remote_addr = %self.remote_address,
packets_processed,
"connection closed by remote"
);
Span::current().record("exit_reason", "closed_by_remote");
Span::current().record("packets_processed", packets_processed);
return
}
}
@@ -508,6 +669,7 @@ impl ConnectionHandler {
}
}
Span::current().record("packets_processed", packets_processed);
debug!("exiting and closing connection");
}
}
@@ -56,11 +56,17 @@ impl<C, F> PacketForwarder<C, F> {
if let Err(err) = self.mixnet_client.send_without_response(packet) {
if err.kind() == io::ErrorKind::WouldBlock {
// we only know for sure if we dropped a packet if our sending queue was full
// in any other case the connection might still be re-established (or created for the first time)
// and the packet might get sent, but we won't know about it
warn!(
event = "packet.dropped.buffer_full",
next_hop = %next_hop,
"dropping packet: egress connection buffer full (WouldBlock)"
);
self.metrics.mixnet.egress_dropped_forward_packet(next_hop)
} else if err.kind() == io::ErrorKind::NotConnected {
// let's give the benefit of the doubt and assume we manage to establish connection
debug!(
next_hop = %next_hop,
"packet queued for not-yet-connected peer"
);
self.metrics.mixnet.egress_sent_forward_packet(next_hop)
}
} else {
@@ -86,7 +92,11 @@ impl<C, F> PacketForwarder<C, F> {
let next_hop = new_packet.packet.next_hop();
if !self.routing_filter.should_route(next_hop.as_ref().ip()) {
debug!("dropping packet as the egress address does not belong to any known node");
warn!(
event = "packet.dropped.routing_filter",
next_hop = %next_hop,
"dropping packet: egress address does not belong to any known node"
);
self.metrics
.mixnet
.egress_dropped_forward_packet(next_hop.into());
@@ -125,7 +135,7 @@ impl<C, F> PacketForwarder<C, F> {
C: SendWithoutResponse,
F: RoutingFilter,
{
let mut processed = 0;
let mut processed: u64 = 0;
trace!("starting PacketForwarder");
loop {
tokio::select! {
@@ -145,11 +155,29 @@ impl<C, F> PacketForwarder<C, F> {
#[allow(clippy::unwrap_used)]
self.handle_new_packet(new_packet.unwrap());
let channel_len = self.packet_sender.len();
if processed % 1000 == 0 {
let delay_queue_len = self.delay_queue.len();
if processed.is_multiple_of(1000) {
match channel_len {
n if n > 1000 => error!("there are currently {n} mix packets waiting to get forwarded - the node seems to be significantly overloaded!"),
n if n > 500 => warn!("there are currently {n} mix packets waiting to get forwarded - is the node overloaded?"),
n => trace!("there are currently {n} mix packets waiting to get forwarded"),
n if n > 1000 => error!(
event = "forwarder.queue_overload",
channel_depth = n,
delay_queue_depth = delay_queue_len,
packets_processed = processed,
"there are currently {n} mix packets waiting to get forwarded - the node seems to be significantly overloaded!"
),
n if n > 500 => warn!(
event = "forwarder.queue_high",
channel_depth = n,
delay_queue_depth = delay_queue_len,
packets_processed = processed,
"there are currently {n} mix packets waiting to get forwarded - is the node overloaded?"
),
n => trace!(
channel_depth = n,
delay_queue_depth = delay_queue_len,
packets_processed = processed,
"forwarder queue status"
),
}
}
self.update_channel_size_metric(channel_len);
+41 -4
View File
@@ -5,7 +5,8 @@ use nym_gateway::node::{
ActiveClientsStore, GatewayStorage, GatewayStorageError, InboxGatewayStorage,
};
use nym_sphinx_types::DestinationAddressBytes;
use tracing::debug;
use tokio::time::Instant;
use tracing::{debug, warn};
#[derive(Clone)]
pub(crate) struct SharedFinalHopData {
@@ -27,14 +28,37 @@ impl SharedFinalHopData {
message: Vec<u8>,
) -> Result<(), Vec<u8>> {
match self.active_clients.get_sender(client_address) {
None => Err(message),
None => {
debug!(
event = "gateway.push_to_client",
client_found = false,
send_result = "client_not_found",
"client {client_address} not found in active clients"
);
Err(message)
}
Some(sender_channel) => {
let send_start = Instant::now();
if let Err(unsent) = sender_channel.unbounded_send(vec![message]) {
warn!(
event = "gateway.push_to_client",
client_found = true,
send_result = "channel_closed",
send_us = send_start.elapsed().as_micros() as u64,
"client {client_address} channel closed, message not delivered"
);
// the unwrap here is fine as the original message got returned;
// plus we're only ever sending 1 message at the time (for now)
#[allow(clippy::unwrap_used)]
Err(unsent.into_inner().pop().unwrap())
} else {
debug!(
event = "gateway.push_to_client",
client_found = true,
send_result = "ok",
send_us = send_start.elapsed().as_micros() as u64,
"pushed message to client {client_address}"
);
Ok(())
}
}
@@ -46,8 +70,21 @@ impl SharedFinalHopData {
client_address: DestinationAddressBytes,
message: Vec<u8>,
) -> Result<(), GatewayStorageError> {
let start = Instant::now();
debug!("Storing received message for {client_address} on the disk...",);
self.storage.store_message(client_address, message).await
let result = self.storage.store_message(client_address, message).await;
let store_us = start.elapsed().as_micros() as u64;
if result.is_ok() {
debug!(
event = "gateway.disk_store",
store_us, "stored message for {client_address} on disk in {store_us}us"
);
} else {
warn!(
event = "gateway.disk_store_failed",
store_us, "failed to store message for {client_address} on disk after {store_us}us"
);
}
result
}
}
+3
View File
@@ -185,6 +185,7 @@ impl SharedData {
}
pub(super) fn forward_mix_packet(&self, packet: MixPacket, delay_until: Option<Instant>) {
let has_delay = delay_until.is_some();
if self
.mixnet_forwarder
.forward_packet(PacketToForward::new(packet, delay_until))
@@ -192,6 +193,8 @@ impl SharedData {
&& !self.shutdown_token.is_cancelled()
{
error!(
event = "forwarder.channel_send_failed",
has_delay,
"failed to forward sphinx packet on the channel while the process is not going through the shutdown!"
);
self.shutdown_token.cancel();