Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6131082cee | |||
| 8bd6a1006b | |||
| 1678fb6b94 | |||
| 94a3599b4d | |||
| a6bc54461a | |||
| 4f0c40dab7 | |||
| 3eff6e5e3b | |||
| a519f4ccb8 | |||
| a3ba3bfc5a | |||
| 988df7cff7 | |||
| 260f8e9714 | |||
| d28d0ac39e | |||
| dce4d6b34b | |||
| bc47e9a1b2 | |||
| 3b693741b2 | |||
| cb277fe487 | |||
| 8bb29f4d07 | |||
| e753f24ed1 | |||
| c7cd962627 | |||
| 00467e4440 | |||
| f3d1000472 | |||
| 597aae1a20 | |||
| 40a3cd28b7 |
@@ -76,3 +76,4 @@ CLAUDE.md
|
||||
.claude/settings.json
|
||||
|
||||
/notes
|
||||
/target-otel
|
||||
|
||||
Generated
+1627
-1626
File diff suppressed because it is too large
Load Diff
+5
-4
@@ -309,8 +309,10 @@ nix = "0.30.1"
|
||||
notify = "5.1.0"
|
||||
num_enum = "0.7.5"
|
||||
once_cell = "1.21.3"
|
||||
opentelemetry = "0.19.0"
|
||||
opentelemetry-jaeger = "0.18.0"
|
||||
opentelemetry = "0.31.0"
|
||||
opentelemetry_sdk = "0.31.0"
|
||||
opentelemetry-otlp = "0.31.0"
|
||||
tonic = "0.14.4"
|
||||
parking_lot = "0.12.3"
|
||||
pem = "0.8"
|
||||
petgraph = "0.6.5"
|
||||
@@ -368,9 +370,8 @@ tower = "0.5.2"
|
||||
tower-http = "0.6.6"
|
||||
tracing = "0.1.41"
|
||||
tracing-log = "0.2"
|
||||
tracing-opentelemetry = "0.19.0"
|
||||
tracing-opentelemetry = "0.32.1"
|
||||
tracing-subscriber = "0.3.20"
|
||||
tracing-tree = "0.2.2"
|
||||
tracing-indicatif = "0.3.9"
|
||||
tracing-test = "0.2.5"
|
||||
ts-rs = "10.1.0"
|
||||
|
||||
@@ -19,12 +19,15 @@ serde_json = { workspace = true, optional = true }
|
||||
|
||||
## tracing
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter"], optional = true }
|
||||
tracing-tree = { workspace = true, optional = true }
|
||||
tracing = { workspace = true, optional = true }
|
||||
opentelemetry-jaeger = { workspace = true, features = ["rt-tokio", "collector_client", "isahc_collector_client"], optional = true }
|
||||
tracing-opentelemetry = { workspace = true, optional = true }
|
||||
utoipa = { workspace = true, optional = true }
|
||||
opentelemetry = { workspace = true, features = ["rt-tokio"], optional = true }
|
||||
opentelemetry = { workspace = true, features = ["trace"], optional = true }
|
||||
|
||||
## otel-otlp (modern OTLP export to SigNoz/any OTLP collector)
|
||||
opentelemetry_sdk = { workspace = true, features = ["trace"], optional = true }
|
||||
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "trace", "tls-roots"], optional = true }
|
||||
tonic = { workspace = true, optional = true }
|
||||
|
||||
|
||||
[build-dependencies]
|
||||
@@ -35,13 +38,14 @@ default = []
|
||||
openapi = ["utoipa"]
|
||||
output_format = ["serde_json", "dep:clap"]
|
||||
bin_info_schema = ["schemars"]
|
||||
basic_tracing = ["dep:tracing", "tracing-subscriber"]
|
||||
tracing = [
|
||||
basic_tracing = ["dep:tracing", "dep:tracing-subscriber"]
|
||||
otel-otlp = [
|
||||
"basic_tracing",
|
||||
"tracing-tree",
|
||||
"opentelemetry-jaeger",
|
||||
"tracing-opentelemetry",
|
||||
"opentelemetry",
|
||||
"dep:opentelemetry",
|
||||
"dep:opentelemetry_sdk",
|
||||
"dep:opentelemetry-otlp",
|
||||
"dep:tracing-opentelemetry",
|
||||
"dep:tonic",
|
||||
]
|
||||
clap = ["dep:clap", "dep:clap_complete", "dep:clap_complete_fig"]
|
||||
models = []
|
||||
|
||||
@@ -4,16 +4,9 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io::IsTerminal;
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use opentelemetry;
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use opentelemetry_jaeger;
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use tracing_opentelemetry;
|
||||
// Re-export tracing_subscriber for consumers that need to compose layers
|
||||
#[cfg(feature = "basic_tracing")]
|
||||
pub use tracing_subscriber;
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use tracing_tree;
|
||||
|
||||
#[derive(Debug, Default, Copy, Clone, Deserialize, PartialEq, Eq, Serialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
@@ -69,40 +62,106 @@ pub fn setup_tracing_logger() {
|
||||
build_tracing_logger().init()
|
||||
}
|
||||
|
||||
// TODO: This has to be a macro, running it as a function does not work for the file_appender for some reason
|
||||
#[cfg(feature = "tracing")]
|
||||
#[macro_export]
|
||||
macro_rules! setup_tracing {
|
||||
($service_name: expr) => {
|
||||
use nym_bin_common::logging::tracing_subscriber::layer::SubscriberExt;
|
||||
use nym_bin_common::logging::tracing_subscriber::util::SubscriberInitExt;
|
||||
/// Initialize an OpenTelemetry tracing layer that exports spans via OTLP/gRPC.
|
||||
///
|
||||
/// This produces a layer compatible with `tracing_subscriber::registry()` that
|
||||
/// sends traces to any OTLP-compatible collector (SigNoz, Grafana Tempo, etc).
|
||||
///
|
||||
/// Returns both the tracing layer and the [`SdkTracerProvider`] so the caller
|
||||
/// can invoke [`SdkTracerProvider::shutdown`] for graceful flush on exit.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `service_name` - The service name reported to the collector (e.g. "nym-node")
|
||||
/// * `endpoint` - The OTLP/gRPC collector endpoint (e.g. "http://localhost:4317"
|
||||
/// or "https://ingest.eu.signoz.cloud:443" for SigNoz Cloud)
|
||||
/// * `ingestion_key` - Optional SigNoz Cloud ingestion key. When provided, it is
|
||||
/// sent as the `signoz-ingestion-key` gRPC metadata header on every export.
|
||||
/// * `environment` - Deployment environment label (e.g. "sandbox", "mainnet", "canary").
|
||||
/// Attached as the `deployment.environment` OTel resource attribute.
|
||||
/// * `sample_ratio` - Trace sampling ratio in 0.0..=1.0 (e.g. 0.1 = 10% of traces).
|
||||
/// Used to limit cost when exporting from many nodes; clamped to [0.0, 1.0].
|
||||
/// * `export_timeout_secs` - Timeout in seconds for each OTLP export batch. Prevents
|
||||
/// unbounded blocking if the collector is slow or unreachable.
|
||||
#[cfg(feature = "otel-otlp")]
|
||||
pub fn init_otel_layer<S>(
|
||||
service_name: &str,
|
||||
endpoint: &str,
|
||||
ingestion_key: Option<&str>,
|
||||
environment: &str,
|
||||
sample_ratio: f64,
|
||||
export_timeout_secs: u64,
|
||||
) -> Result<
|
||||
(
|
||||
tracing_opentelemetry::OpenTelemetryLayer<S, opentelemetry_sdk::trace::SdkTracer>,
|
||||
opentelemetry_sdk::trace::SdkTracerProvider,
|
||||
),
|
||||
Box<dyn std::error::Error + Send + Sync>,
|
||||
>
|
||||
where
|
||||
S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
|
||||
{
|
||||
use opentelemetry::trace::TracerProvider as _;
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
use opentelemetry_otlp::WithTonicConfig;
|
||||
use opentelemetry_sdk::trace::Sampler;
|
||||
use std::time::Duration;
|
||||
|
||||
let registry = nym_bin_common::logging::tracing_subscriber::Registry::default()
|
||||
.with(nym_bin_common::logging::tracing_subscriber::EnvFilter::from_default_env())
|
||||
.with(
|
||||
nym_bin_common::logging::tracing_tree::HierarchicalLayer::new(4)
|
||||
.with_targets(true)
|
||||
.with_bracketed_fields(true),
|
||||
);
|
||||
// Validate endpoint URI early to fail with a clear message
|
||||
if !endpoint.starts_with("http://") && !endpoint.starts_with("https://") {
|
||||
return Err(format!(
|
||||
"invalid OTLP endpoint URI: {endpoint} (must start with http:// or https://)"
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let tracer = nym_bin_common::logging::opentelemetry_jaeger::new_collector_pipeline()
|
||||
.with_endpoint("http://44.199.230.10:14268/api/traces")
|
||||
.with_service_name($service_name)
|
||||
.with_isahc()
|
||||
.with_trace_config(
|
||||
nym_bin_common::logging::opentelemetry::sdk::trace::config().with_sampler(
|
||||
nym_bin_common::logging::opentelemetry::sdk::trace::Sampler::TraceIdRatioBased(
|
||||
0.1,
|
||||
),
|
||||
),
|
||||
)
|
||||
.install_batch(nym_bin_common::logging::opentelemetry::runtime::Tokio)
|
||||
.expect("Could not init tracer");
|
||||
let sample_ratio_clamped = sample_ratio.clamp(0.0, 1.0);
|
||||
|
||||
let telemetry = nym_bin_common::logging::tracing_opentelemetry::layer().with_tracer(tracer);
|
||||
let mut builder = opentelemetry_otlp::SpanExporter::builder()
|
||||
.with_tonic()
|
||||
.with_endpoint(endpoint)
|
||||
.with_timeout(Duration::from_secs(export_timeout_secs));
|
||||
|
||||
registry.with(telemetry).init();
|
||||
};
|
||||
// Explicitly configure TLS when the endpoint uses HTTPS
|
||||
if endpoint.starts_with("https://") {
|
||||
builder =
|
||||
builder.with_tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots());
|
||||
}
|
||||
|
||||
if let Some(key) = ingestion_key {
|
||||
let mut metadata = tonic::metadata::MetadataMap::new();
|
||||
metadata.insert(
|
||||
"signoz-ingestion-key",
|
||||
key.parse()
|
||||
.map_err(|_| "invalid ingestion key format (value redacted)")?,
|
||||
);
|
||||
builder = builder.with_metadata(metadata);
|
||||
}
|
||||
|
||||
let exporter = builder
|
||||
.build()
|
||||
.map_err(|e| format!("failed to build OTLP exporter for endpoint {endpoint}: {e}"))?;
|
||||
|
||||
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
|
||||
.with_sampler(Sampler::TraceIdRatioBased(sample_ratio_clamped))
|
||||
.with_batch_exporter(exporter)
|
||||
.with_resource(
|
||||
opentelemetry_sdk::Resource::builder()
|
||||
.with_service_name(service_name.to_owned())
|
||||
.with_attribute(opentelemetry::KeyValue::new(
|
||||
"deployment.environment",
|
||||
environment.to_owned(),
|
||||
))
|
||||
.build(),
|
||||
)
|
||||
.build();
|
||||
|
||||
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
|
||||
let tracer = tracer_provider.tracer(service_name.to_owned());
|
||||
|
||||
Ok((
|
||||
tracing_opentelemetry::layer().with_tracer(tracer),
|
||||
tracer_provider,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn banner(crate_name: &str, crate_version: &str) -> String {
|
||||
|
||||
@@ -128,54 +128,95 @@ impl ManagedConnection {
|
||||
|
||||
async fn run(self) {
|
||||
let address = self.address;
|
||||
let reconnection_attempt = self.current_reconnection.load(Ordering::Acquire);
|
||||
let connect_start = tokio::time::Instant::now();
|
||||
let connection_fut = TcpStream::connect(address);
|
||||
|
||||
let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await {
|
||||
Ok(stream_res) => match stream_res {
|
||||
Ok(stream) => {
|
||||
debug!("Managed to establish connection to {}", self.address);
|
||||
let connect_ms = connect_start.elapsed().as_millis() as u64;
|
||||
debug!(
|
||||
peer = %address,
|
||||
connect_ms,
|
||||
"Managed to establish connection to {}", self.address
|
||||
);
|
||||
|
||||
let noise_start = tokio::time::Instant::now();
|
||||
let noise_stream =
|
||||
match upgrade_noise_initiator(stream, &self.noise_config).await {
|
||||
Ok(noise_stream) => noise_stream,
|
||||
Err(err) => {
|
||||
error!("Failed to perform Noise handshake with {address} - {err}");
|
||||
// we failed to finish the noise handshake - increase reconnection attempt
|
||||
let noise_handshake_ms = noise_start.elapsed().as_millis() as u64;
|
||||
warn!(
|
||||
event = "connection.failed.noise",
|
||||
peer = %address,
|
||||
error = %err,
|
||||
connect_ms,
|
||||
noise_handshake_ms,
|
||||
reconnection_attempt,
|
||||
exit_reason = "noise_error",
|
||||
"Failed to perform Noise initiator handshake with {address}"
|
||||
);
|
||||
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
|
||||
return;
|
||||
}
|
||||
};
|
||||
// if we managed to connect AND do the noise handshake, reset the reconnection count (whatever it might have been)
|
||||
let noise_handshake_ms = noise_start.elapsed().as_millis() as u64;
|
||||
self.current_reconnection.store(0, Ordering::Release);
|
||||
debug!("Noise initiator handshake completed for {:?}", address);
|
||||
debug!(
|
||||
peer = %address,
|
||||
connect_ms,
|
||||
noise_handshake_ms,
|
||||
"Noise initiator handshake completed for {:?}", address
|
||||
);
|
||||
Framed::new(noise_stream, NymCodec)
|
||||
}
|
||||
Err(err) => {
|
||||
debug!("failed to establish connection to {address} (err: {err})",);
|
||||
let connect_ms = connect_start.elapsed().as_millis() as u64;
|
||||
warn!(
|
||||
event = "connection.failed.connect",
|
||||
peer = %address,
|
||||
error = %err,
|
||||
connect_ms,
|
||||
reconnection_attempt,
|
||||
exit_reason = "connect_error",
|
||||
"failed to establish connection to {address}"
|
||||
);
|
||||
return;
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
debug!(
|
||||
let connect_ms = connect_start.elapsed().as_millis() as u64;
|
||||
warn!(
|
||||
event = "connection.failed.timeout",
|
||||
peer = %address,
|
||||
timeout_ms = self.connection_timeout.as_millis() as u64,
|
||||
connect_ms,
|
||||
reconnection_attempt,
|
||||
exit_reason = "timeout",
|
||||
"failed to connect to {address} within {:?}",
|
||||
self.connection_timeout
|
||||
);
|
||||
|
||||
// we failed to connect - increase reconnection attempt
|
||||
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Take whatever the receiver channel produces and put it on the connection.
|
||||
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
|
||||
// about either the receiver or the connection, it doesn't matter which one gets consumed
|
||||
if let Err(err) = self.message_receiver.map(Ok).forward(conn).await {
|
||||
warn!("Failed to forward packets to {address}: {err}");
|
||||
warn!(
|
||||
event = "connection.forward_error",
|
||||
peer = %address,
|
||||
error = %err,
|
||||
exit_reason = "forward_error",
|
||||
"Failed to forward packets to {address}: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
debug!(
|
||||
"connection manager to {address} is finished. Either the connection failed or mixnet client got dropped",
|
||||
peer = %address,
|
||||
exit_reason = "sender_dropped",
|
||||
"connection manager to {address} finished"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -272,16 +313,18 @@ impl SendWithoutResponse for Client {
|
||||
trace!("Sending packet to {address}");
|
||||
|
||||
// TODO: optimisation for the future: rather than constantly using legacy encoding,
|
||||
// once we're addressing by node_id (and thus have full node info here),
|
||||
// we could simply infer supported encoding based on their version
|
||||
// use the mix packet type / flags to pick encoding per packet
|
||||
let framed_packet =
|
||||
FramedNymPacket::from_mix_packet(packet, self.config.use_legacy_packet_encoding);
|
||||
|
||||
let Some(sender) = self.active_connections.get_mut(&address) else {
|
||||
// there was never a connection to begin with
|
||||
debug!("establishing initial connection to {address}");
|
||||
// it's not a 'big' error: we did not manage to send the packet, so queue it
|
||||
// for sending as soon as the connection is created
|
||||
debug!(
|
||||
event = "mixclient.try_send",
|
||||
peer = %address,
|
||||
result = "not_connected",
|
||||
"establishing initial connection to {address}"
|
||||
);
|
||||
self.make_connection(address, framed_packet);
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::NotConnected,
|
||||
@@ -289,15 +332,24 @@ impl SendWithoutResponse for Client {
|
||||
));
|
||||
};
|
||||
|
||||
let channel_capacity = sender.channel.max_capacity();
|
||||
let channel_available = sender.channel.capacity();
|
||||
let channel_used = channel_capacity - channel_available;
|
||||
|
||||
let sending_res = sender.channel.try_send(framed_packet);
|
||||
drop(sender);
|
||||
|
||||
sending_res.map_err(|err| {
|
||||
match err {
|
||||
TrySendError::Full(_) => {
|
||||
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
|
||||
// it's not a 'big' error, but we did not manage to send the packet
|
||||
// if the queue is full, we can't really do anything but drop the packet
|
||||
warn!(
|
||||
event = "mixclient.try_send",
|
||||
peer = %address,
|
||||
result = "full_dropped",
|
||||
channel_capacity,
|
||||
channel_used,
|
||||
"dropping packet: connection buffer to {address} is full ({channel_used}/{channel_capacity})"
|
||||
);
|
||||
io::Error::new(
|
||||
io::ErrorKind::WouldBlock,
|
||||
"connection queue is full",
|
||||
@@ -305,11 +357,13 @@ impl SendWithoutResponse for Client {
|
||||
}
|
||||
TrySendError::Closed(dropped) => {
|
||||
debug!(
|
||||
"Connection to {address} seems to be dead. attempting to re-establish it...",
|
||||
event = "mixclient.try_send",
|
||||
peer = %address,
|
||||
result = "closed_reconnecting",
|
||||
channel_capacity,
|
||||
channel_used,
|
||||
"connection to {address} dead, attempting re-establishment"
|
||||
);
|
||||
|
||||
// it's not a 'big' error: we did not manage to send the packet, so queue
|
||||
// it up to be sent as soon as the connection is re-established
|
||||
self.make_connection(address, dropped);
|
||||
io::Error::new(
|
||||
io::ErrorKind::ConnectionAborted,
|
||||
|
||||
@@ -1,24 +1,22 @@
|
||||
# Multi-stage Dockerfile for Nym localnet
|
||||
# Stage 1: Build binaries
|
||||
# Stage 2: Slim runtime with only the final binaries
|
||||
# Single-stage Dockerfile for Nym localnet
|
||||
# Builds: nym-node, nym-network-requester, nym-socks5-client
|
||||
# Target: Apple Container Runtime with host networking
|
||||
|
||||
# --- Build stage ---
|
||||
FROM rust:latest AS builder
|
||||
FROM rust:latest
|
||||
|
||||
WORKDIR /usr/src/nym
|
||||
COPY ./ ./
|
||||
|
||||
ENV CARGO_BUILD_JOBS=8
|
||||
|
||||
RUN cargo build --release --locked -p nym-node --features otel && \
|
||||
cargo build --release --locked -p nym-network-requester -p nym-socks5-client
|
||||
# Build all required binaries in release mode
|
||||
RUN cargo build --release --locked \
|
||||
-p nym-node \
|
||||
-p nym-network-requester \
|
||||
-p nym-socks5-client
|
||||
|
||||
# --- Runtime stage ---
|
||||
FROM debian:trixie-slim
|
||||
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
ca-certificates \
|
||||
build-essential \
|
||||
# Install runtime dependencies including Go for wireguard-go
|
||||
RUN apt update && apt install -y \
|
||||
python3 \
|
||||
python3-pip \
|
||||
netcat-openbsd \
|
||||
@@ -26,33 +24,31 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
iproute2 \
|
||||
net-tools \
|
||||
wireguard-tools \
|
||||
golang-go \
|
||||
git \
|
||||
iptables \
|
||||
curl \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Install Go and build wireguard-go, then clean up
|
||||
ARG TARGETARCH
|
||||
RUN curl -fsSL "https://go.dev/dl/go1.23.6.linux-${TARGETARCH}.tar.gz" \
|
||||
| tar -C /usr/local -xz && \
|
||||
export PATH="/usr/local/go/bin:$PATH" && \
|
||||
git clone https://git.zx2c4.com/wireguard-go && \
|
||||
# Install wireguard-go (userspace WireGuard implementation)
|
||||
RUN git clone https://git.zx2c4.com/wireguard-go && \
|
||||
cd wireguard-go && \
|
||||
make && \
|
||||
cp wireguard-go /usr/local/bin/ && \
|
||||
cd .. && \
|
||||
rm -rf wireguard-go /usr/local/go && \
|
||||
apt-get purge -y --auto-remove build-essential curl
|
||||
rm -rf wireguard-go
|
||||
|
||||
# Install Python dependencies for build_topology.py
|
||||
RUN pip3 install --break-system-packages base58
|
||||
|
||||
# Copy only the compiled binaries from the builder stage
|
||||
COPY --from=builder /usr/src/nym/target/release/nym-node /usr/local/bin/
|
||||
COPY --from=builder /usr/src/nym/target/release/nym-network-requester /usr/local/bin/
|
||||
COPY --from=builder /usr/src/nym/target/release/nym-socks5-client /usr/local/bin/
|
||||
# Move binaries to /usr/local/bin for easy access
|
||||
RUN cp target/release/nym-node /usr/local/bin/ && \
|
||||
cp target/release/nym-network-requester /usr/local/bin/ && \
|
||||
cp target/release/nym-socks5-client /usr/local/bin/
|
||||
|
||||
# Copy supporting scripts
|
||||
COPY ./docker/localnet/build_topology.py /usr/local/bin/
|
||||
|
||||
WORKDIR /nym
|
||||
|
||||
# Default command
|
||||
CMD ["nym-node", "--help"]
|
||||
|
||||
+37
-128
@@ -1,71 +1,35 @@
|
||||
# Nym Localnet
|
||||
# Nym Localnet for Kata Container Runtimes
|
||||
|
||||
A complete Nym mixnet test environment with OpenTelemetry instrumentation.
|
||||
Supports both Docker Desktop and Apple Container Runtime on macOS.
|
||||
A complete Nym mixnet test environment running on Apple's container runtime for macOS (for now).
|
||||
|
||||
## Overview
|
||||
|
||||
This localnet setup provides a fully functional Nym mixnet for local development and testing:
|
||||
- **3 mixnodes** (layer 1, 2, 3)
|
||||
- **2 gateways** (entry + exit mode)
|
||||
- **1 gateway** (entry + exit mode)
|
||||
- **1 network-requester** (service provider)
|
||||
- **1 SOCKS5 client**
|
||||
- **OpenTelemetry tracing** via OTLP/gRPC to SigNoz (or any OTLP collector)
|
||||
|
||||
All components run in isolated containers with proper networking and dynamic IP resolution.
|
||||
When the `otel` feature is enabled (default), every nym-node exports traces covering
|
||||
the full packet lifecycle: ingress, Sphinx processing, forwarding, and final-hop delivery.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
### Required
|
||||
- **macOS** (tested on macOS Sequoia 15.0+)
|
||||
- **Docker Desktop** (recommended) or **Apple Container Runtime**
|
||||
- **Apple Container Runtime** - Built into macOS
|
||||
- **Docker Desktop** (for building images only)
|
||||
- **Python 3** with `base58` library
|
||||
|
||||
### SigNoz (for trace viewing)
|
||||
|
||||
SigNoz is an open-source APM that receives and visualises OpenTelemetry data.
|
||||
Install it locally with Docker Compose -- this takes about 2 minutes:
|
||||
|
||||
```bash
|
||||
# Clone the SigNoz repository
|
||||
git clone -b main https://github.com/SigNoz/signoz.git ~/signoz
|
||||
cd ~/signoz/deploy
|
||||
|
||||
# Start SigNoz (runs ClickHouse, otel-collector, query-service, frontend)
|
||||
docker compose up -d
|
||||
|
||||
# Verify it is running
|
||||
docker ps --filter "name=signoz" --format "table {{.Names}}\t{{.Status}}"
|
||||
```
|
||||
|
||||
Once running:
|
||||
- **SigNoz UI**: http://localhost:8080
|
||||
- **OTLP gRPC collector**: localhost:4317 (used by nym-nodes)
|
||||
- **OTLP HTTP collector**: localhost:4318
|
||||
|
||||
The localnet script auto-detects the SigNoz Docker network (`signoz-net`) and
|
||||
routes OTel traffic directly to the collector container -- no manual endpoint
|
||||
configuration needed.
|
||||
|
||||
To stop SigNoz later:
|
||||
```bash
|
||||
cd ~/signoz/deploy && docker compose down
|
||||
```
|
||||
|
||||
### Installation
|
||||
```bash
|
||||
# Install Python dependencies
|
||||
pip3 install --break-system-packages base58
|
||||
|
||||
# Verify Docker is installed
|
||||
docker --version
|
||||
```
|
||||
|
||||
If using Apple Container Runtime instead of Docker:
|
||||
```bash
|
||||
# Verify container runtime is available
|
||||
container --version
|
||||
|
||||
# Verify Docker is installed (for building)
|
||||
docker --version
|
||||
```
|
||||
|
||||
## Quick Start
|
||||
@@ -118,17 +82,7 @@ Ports published to host:
|
||||
- 20001-20005 → Verloc ports
|
||||
- 30001-30005 → HTTP APIs
|
||||
- 41264/41265 → LP control ports (registration)
|
||||
- 51822/51823 → WireGuard tunnel ports (gateway/gateway2; only used when WireGuard is enabled)
|
||||
|
||||
### WireGuard and privileges
|
||||
|
||||
By default, gateways run with **WireGuard disabled** (`--wireguard-enabled false`). No elevated capabilities are required: the script does not use `--cap-add=NET_ADMIN` or `--device /dev/net/tun`, so localnet runs without net admin privileges and is suitable for mixnet packet testing and SOCKS5 over the mixnet.
|
||||
|
||||
To enable WireGuard VPN routing in localnet (e.g. for two-hop VPN tests), set `WIREGUARD_ENABLED=1` before starting. The script will then add `--cap-add=NET_ADMIN` and `--device /dev/net/tun` to the gateway containers and configure IP forwarding and NAT. This may not work in all Docker environments (e.g. some hosted runners restrict capabilities).
|
||||
|
||||
```bash
|
||||
WIREGUARD_ENABLED=1 ./localnet.sh start
|
||||
```
|
||||
- 51822/51823 → WireGuard tunnel ports (gateway/gateway2)
|
||||
|
||||
### Startup Flow
|
||||
|
||||
@@ -244,99 +198,54 @@ container logs nym-gateway --follow
|
||||
### Status
|
||||
```bash
|
||||
# List all containers
|
||||
docker ps --filter "name=nym-" --format "table {{.Names}}\t{{.Status}}"
|
||||
container list
|
||||
|
||||
# Check specific container
|
||||
docker logs nym-gateway
|
||||
container logs nym-gateway
|
||||
|
||||
# Inspect network
|
||||
docker network inspect nym-localnet-network
|
||||
container network inspect nym-localnet-network
|
||||
```
|
||||
|
||||
## Testing
|
||||
|
||||
### Basic SOCKS5 Test
|
||||
```bash
|
||||
# Simple HTTP request through the mixnet
|
||||
curl -x socks5h://127.0.0.1:1080 https://httpbin.org/get
|
||||
# Simple HTTP request with redirect following
|
||||
curl -L --socks5 localhost:1080 http://example.com
|
||||
|
||||
# HTTPS request
|
||||
curl -x socks5h://127.0.0.1:1080 https://nymtech.net
|
||||
curl -L --socks5 localhost:1080 https://nymtech.net
|
||||
|
||||
# Download a file
|
||||
curl -x socks5h://127.0.0.1:1080 \
|
||||
curl -L --socks5 localhost:1080 \
|
||||
https://test-download-files-nym.s3.amazonaws.com/download-files/1MB.zip \
|
||||
--output /tmp/test.zip
|
||||
```
|
||||
|
||||
### Load Testing
|
||||
|
||||
A load test script is included to generate sustained traffic and populate SigNoz
|
||||
with meaningful trace data:
|
||||
|
||||
```bash
|
||||
# Default: 10 concurrent workers, 60 seconds
|
||||
./loadtest.sh
|
||||
|
||||
# Heavier load: 20 workers for 2 minutes
|
||||
./loadtest.sh -c 20 -d 120
|
||||
|
||||
# Light single-threaded test
|
||||
./loadtest.sh -c 1 -d 10
|
||||
|
||||
# Target a specific URL
|
||||
./loadtest.sh -c 5 -d 30 -u https://httpbin.org/bytes/4096
|
||||
```
|
||||
|
||||
The script reports live progress, then prints a summary with request counts,
|
||||
throughput, and latency percentiles (p50/p95/p99).
|
||||
|
||||
### Verify Network Topology
|
||||
```bash
|
||||
# View the generated topology
|
||||
docker exec nym-gateway cat /localnet/network.json | jq .
|
||||
container exec nym-gateway cat /localnet/network.json | jq .
|
||||
|
||||
# Check container status
|
||||
docker ps --filter "name=nym-" --format "table {{.Names}}\t{{.Status}}"
|
||||
# Check container IPs
|
||||
container list | grep nym-
|
||||
|
||||
# Verify all bonding files exist
|
||||
docker exec nym-gateway ls -la /localnet/
|
||||
container exec nym-gateway ls -la /localnet/
|
||||
```
|
||||
|
||||
### Test Mixnet Routing
|
||||
```bash
|
||||
# All traffic flows through: client -> gateway -> mix1 -> mix2 -> mix3 -> gateway -> internet
|
||||
# All traffic flows through: client → mix1 → mix2 → mix3 → gateway → internet
|
||||
# Watch logs to verify routing:
|
||||
docker logs nym-mixnode1 --follow &
|
||||
docker logs nym-mixnode2 --follow &
|
||||
docker logs nym-mixnode3 --follow &
|
||||
docker logs nym-gateway --follow &
|
||||
container logs nym-mixnode1 --follow &
|
||||
container logs nym-mixnode2 --follow &
|
||||
container logs nym-mixnode3 --follow &
|
||||
container logs nym-gateway --follow &
|
||||
|
||||
# Make a request
|
||||
curl -x socks5h://127.0.0.1:1080 https://nymtech.net
|
||||
```
|
||||
|
||||
## OpenTelemetry
|
||||
|
||||
OTel is enabled by default. Each nym-node exports traces via OTLP/gRPC covering
|
||||
packet ingress, Sphinx processing, forwarding, and final-hop delivery.
|
||||
|
||||
### Viewing Traces
|
||||
|
||||
- **SigNoz UI**: http://localhost:8080 -- filter by `serviceName = nym-node`
|
||||
- **Terminal report** (queries ClickHouse directly, no login needed):
|
||||
|
||||
```bash
|
||||
./otel-report.sh # last 15 minutes
|
||||
./otel-report.sh 60 # last 60 minutes
|
||||
./otel-report.sh live # auto-refresh every 10s
|
||||
```
|
||||
|
||||
### Disabling OTel
|
||||
|
||||
```bash
|
||||
OTEL_ENABLE=0 ./localnet.sh start # disable
|
||||
OTEL_ENDPOINT=http://my-collector:4317 ./localnet.sh start # custom collector
|
||||
curl -L --socks5 localhost:1080 https://nymtech.com
|
||||
```
|
||||
|
||||
### LP (Lewes Protocol) Testing
|
||||
@@ -380,11 +289,8 @@ This makes localnet perfect for rapid LP protocol development and testing.
|
||||
docker/localnet/
|
||||
├── README.md # This file
|
||||
├── localnet.sh # Main orchestration script
|
||||
├── loadtest.sh # Load test / traffic generator
|
||||
├── otel-report.sh # Terminal-based OTel metrics report
|
||||
├── Dockerfile.localnet # Multi-stage Docker image (builder + slim runtime)
|
||||
├── build_topology.py # Topology generator
|
||||
└── localnet-logs.sh # Tmux-based multi-container log viewer
|
||||
├── Dockerfile.localnet # Docker image definition
|
||||
└── build_topology.py # Topology generator
|
||||
```
|
||||
|
||||
## How It Works
|
||||
@@ -674,11 +580,14 @@ start_mixnode 4 "$MIXNODE4_CONTAINER"
|
||||
|
||||
### Container Runtime
|
||||
|
||||
**Docker Desktop** is the default and recommended runtime; no extra setup is required for mixnet testing.
|
||||
Apple's container runtime is a native macOS container system:
|
||||
- Uses Virtualization.framework for isolation
|
||||
- Lightweight VMs for each container
|
||||
- Native macOS integration
|
||||
- Separate image store from Docker
|
||||
- Natively uses [Kata Containers](https://github.com/kata-containers/kata-containers) images
|
||||
|
||||
**Apple Container Runtime** is an optional alternative on macOS. It natively uses [Kata Containers](https://github.com/kata-containers/kata-containers) images and is only required if you use `container` instead of Docker (e.g. for consistency with other Apple tooling). Kata is also the path that provides a kernel with `CONFIG_TUN=y` if you need TUN/WireGuard inside containers under the Apple runtime.
|
||||
|
||||
### Initial setup for [Container Runtime](https://github.com/apple/container) (optional)
|
||||
### Initial setup for [Container Runtime](https://github.com/apple/container)
|
||||
|
||||
- **MUST** have macOS Tahoe for inter-container networking
|
||||
- `brew install --cask container`
|
||||
@@ -722,7 +631,7 @@ Both are ephemeral by default (cleaned up on stop).
|
||||
- **No Docker Compose**: Uses custom orchestration script
|
||||
- **Dynamic IPs**: Container IPs may change between restarts
|
||||
- **Port conflicts**: Cannot run alongside services using same ports
|
||||
- **TUN device**: Only required when `WIREGUARD_ENABLED=1`; otherwise gateways run without it
|
||||
- **TUN device**: Gateway requires `ip` command for network interfaces
|
||||
|
||||
## Support
|
||||
|
||||
|
||||
@@ -1,297 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Nym Localnet Load Test
|
||||
# Generates sustained traffic through the mixnet SOCKS5 proxy to produce
|
||||
# OTel traces and exercise the packet pipeline end-to-end.
|
||||
#
|
||||
# Usage:
|
||||
# ./loadtest.sh # defaults: 10 concurrent, 60s, mixed sizes
|
||||
# ./loadtest.sh -c 20 -d 120 # 20 concurrent, 120s
|
||||
# ./loadtest.sh -s 64k # fixed 64KB responses (many Sphinx fragments)
|
||||
# ./loadtest.sh -s 1k -c 5 -d 30 # small payloads, 5 workers
|
||||
#
|
||||
# Payload sizes (-s flag) map to Sphinx packet fragmentation:
|
||||
# 1k = ~1 Sphinx packet (sub-MTU, minimal fragmentation)
|
||||
# 4k = ~2-3 packets (small payload)
|
||||
# 16k = ~8-10 packets (medium payload)
|
||||
# 64k = ~32-35 packets (large payload, stresses forwarding)
|
||||
# 256k = ~128-130 packets (heavy payload, stresses queues)
|
||||
# 1m = ~512 packets (very heavy, potential backpressure)
|
||||
#
|
||||
# Prerequisites:
|
||||
# - Localnet running (./localnet.sh start)
|
||||
# - SOCKS5 proxy available on localhost:1080
|
||||
|
||||
set -e
|
||||
|
||||
CONCURRENCY=10
|
||||
DURATION=60
|
||||
PROXY="socks5h://127.0.0.1:1080"
|
||||
PAYLOAD_SIZE=""
|
||||
CUSTOM_URL=""
|
||||
STATS_INTERVAL=5
|
||||
|
||||
# Default targets: mixed sizes for general testing
|
||||
TARGETS=(
|
||||
"https://httpbin.org/get"
|
||||
"https://httpbin.org/bytes/1024"
|
||||
"https://httpbin.org/delay/1"
|
||||
"https://example.com"
|
||||
"https://nym.com"
|
||||
)
|
||||
|
||||
# Convert human-readable size to bytes for httpbin
|
||||
# Convert a human-readable size (e.g. 512, 4k, 64KB, 1m) to a byte count for httpbin.
#
# Arguments:
#   $1 - decimal digits optionally followed by b, k/kb or m/mb (case-insensitive)
# Outputs:
#   the size in bytes on stdout
# Returns:
#   0 on success; 1 (with a message on stderr) when the value is not a valid size
parse_size() {
    local s
    s=$(echo "$1" | tr '[:upper:]' '[:lower:]')

    # Split into the leading digits and whatever follows them.
    local num="${s%%[a-z]*}"
    local unit="${s#"$num"}"

    # Reject empty / non-numeric values up front; otherwise $(( )) fails with a
    # cryptic syntax error or silently evaluates the text as a variable name.
    case "$num" in
        ''|*[!0-9]*)
            echo "invalid size: '$1' (expected e.g. 512, 4k, 64kb, 1m)" >&2
            return 1
            ;;
    esac

    # Force base 10 so a value with leading zeros (e.g. 08k) is not parsed as octal.
    num=$((10#$num))

    case "$unit" in
        m|mb) echo $(( num * 1024 * 1024 )) ;;
        k|kb) echo $(( num * 1024 )) ;;
        ''|b) echo "$num" ;;
        *)
            # An unknown suffix used to be dropped silently (1g -> 1 byte).
            echo "invalid size suffix in '$1' (supported: b, k, kb, m, mb)" >&2
            return 1
            ;;
    esac
}
|
||||
|
||||
# Print the command-line help (showing the currently effective defaults) and exit 0.
usage() {
    cat <<EOF
Usage: $0 [-c concurrency] [-d duration_secs] [-s payload_size] [-u url] [-p proxy]

Options:
 -c Number of concurrent workers (default: $CONCURRENCY)
 -d Test duration in seconds (default: $DURATION)
 -s Response payload size: 1k, 4k, 16k, 64k, 256k, 1m (default: mixed)
 -u Custom target URL (overrides -s and default targets)
 -p SOCKS5 proxy address (default: $PROXY)

Examples:
 $0 # 10 workers, 60s, mixed targets/sizes
 $0 -s 1k # small payloads (~1 Sphinx packet each)
 $0 -s 64k -c 5 # large payloads, 5 workers
 $0 -s 256k -c 2 -d 30 # very large payloads, observe queue pressure
 $0 -c 20 -d 120 # heavier concurrency, 2 minutes
EOF
    exit 0
}
|
||||
|
||||
# Parse command-line options.
# -h prints the help and exits 0. An unknown option or a missing option argument
# prints the help to stderr and exits 1, so wrappers and CI can detect misuse.
while getopts "c:d:s:u:p:h" opt; do
    case $opt in
        c) CONCURRENCY=$OPTARG ;;
        d) DURATION=$OPTARG ;;
        s) PAYLOAD_SIZE=$OPTARG ;;
        u) CUSTOM_URL=$OPTARG ;;
        p) PROXY=$OPTARG ;;
        h) usage ;;
        # usage itself exits 0, so run it in a subshell and fail explicitly.
        *) (usage) >&2; exit 1 ;;
    esac
done

# -c and -d are later fed to seq and shell arithmetic; reject anything that is
# not a positive integer here instead of failing obscurely mid-run.
case "$CONCURRENCY" in
    ''|*[!0-9]*) echo "Error: -c expects a positive integer, got '$CONCURRENCY'" >&2; exit 1 ;;
esac
case "$DURATION" in
    ''|*[!0-9]*) echo "Error: -d expects a positive integer, got '$DURATION'" >&2; exit 1 ;;
esac
# Normalise to base 10 so leading zeros are not treated as octal later on.
CONCURRENCY=$((10#$CONCURRENCY))
DURATION=$((10#$DURATION))
if [ "$CONCURRENCY" -lt 1 ] || [ "$DURATION" -lt 1 ]; then
    echo "Error: -c and -d must be at least 1" >&2
    exit 1
fi
|
||||
|
||||
# ANSI colour escapes used by the log helpers below (NC resets the colour).
NC='\033[0m'
BLUE='\033[0;34m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'

# Each helper prints its arguments on one line behind a coloured severity tag.
# Backslash escapes in the message are interpreted, as with `echo -e`.
log_info() {
    printf '%b\n' "${BLUE}[INFO]${NC} $*"
}

log_ok() {
    printf '%b\n' "${GREEN}[OK]${NC} $*"
}

log_warn() {
    printf '%b\n' "${YELLOW}[WARN]${NC} $*"
}

log_err() {
    printf '%b\n' "${RED}[ERROR]${NC} $*"
}
|
||||
|
||||
# Build sized URL if -s was specified
SIZED_URL=""
SIZE_LABEL="mixed"
if [ -n "$PAYLOAD_SIZE" ]; then
    PAYLOAD_BYTES=$(parse_size "$PAYLOAD_SIZE")
    SIZED_URL="https://httpbin.org/bytes/${PAYLOAD_BYTES}"
    SIZE_LABEL="${PAYLOAD_SIZE} (~${PAYLOAD_BYTES} bytes)"
fi

# Preflight checks
# Probe the address actually configured in $PROXY (possibly overridden with -p)
# rather than a hard-coded 127.0.0.1:1080.
proxy_addr="${PROXY#*://}"      # strip the scheme, if any
proxy_addr="${proxy_addr##*@}"  # strip user:password@, if any
proxy_addr="${proxy_addr%%/*}"  # strip any trailing path
case "$proxy_addr" in
    \[*\]:*)  # bracketed IPv6 literal with port
        proxy_host="${proxy_addr%%\]*}"
        proxy_host="${proxy_host#\[}"
        proxy_port="${proxy_addr##*\]:}"
        ;;
    \[*\])    # bracketed IPv6 literal without port
        proxy_host="${proxy_addr#\[}"
        proxy_host="${proxy_host%\]}"
        proxy_port=1080
        ;;
    *:*)
        proxy_host="${proxy_addr%:*}"
        proxy_port="${proxy_addr##*:}"
        ;;
    *)        # no port given: fall back to the conventional SOCKS port
        proxy_host="$proxy_addr"
        proxy_port=1080
        ;;
esac

if ! nc -z "$proxy_host" "$proxy_port" 2>/dev/null; then
    log_err "SOCKS5 proxy not reachable on ${proxy_host}:${proxy_port}. Is the localnet running?"
    exit 1
fi
|
||||
|
||||
# Counters (written to temp files for cross-process aggregation)
STATS_DIR=$(mktemp -d)

# Stop any still-running background workers and remove the scratch directory.
cleanup() {
    local pids
    pids=$(jobs -p)
    if [ -n "$pids" ]; then
        # Word splitting of the PID list is intentional.
        # shellcheck disable=SC2086
        kill $pids 2>/dev/null || true
    fi
    rm -rf "$STATS_DIR"
}

# Clean up exactly once, when the shell exits. INT/TERM are turned into a real
# exit: a handler that merely returned would let the script carry on after
# Ctrl+C with its workers killed and its stats directory already deleted.
trap cleanup EXIT
trap 'exit 130' INT
trap 'exit 143' TERM
|
||||
|
||||
# Print the URL for the next request.
# Precedence: explicit custom URL, then the fixed-size URL built from -s,
# then a random entry of TARGETS.
pick_url() {
    if [ -n "$CUSTOM_URL" ]; then
        echo "$CUSTOM_URL"
        return
    fi

    if [ -n "$PAYLOAD_SIZE" ]; then
        echo "$SIZED_URL"
        return
    fi

    echo "${TARGETS[RANDOM % ${#TARGETS[@]}]}"
}
|
||||
|
||||
# Print the current Unix time in milliseconds.
# Uses bash's built-in EPOCHREALTIME (bash >= 5) when available, which avoids
# forking an interpreter twice per request; older shells (e.g. the bash 3.2
# shipped with macOS) fall back to python3.
now_ms() {
    if [ -n "${EPOCHREALTIME:-}" ]; then
        # EPOCHREALTIME is "seconds<sep>microseconds"; the separator depends on
        # the locale, so drop either '.' or ',' to get whole microseconds.
        local usec="${EPOCHREALTIME/[.,]/}"
        echo $(( usec / 1000 ))
    else
        python3 -c 'import time; print(int(time.time()*1000))'
    fi
}
|
||||
|
||||
# Worker: issue requests through the proxy back to back until the deadline passes.
#
# Arguments:
#   $1 - worker id, used to name this worker's files in STATS_DIR
#   $2 - deadline as a Unix timestamp in seconds
# Files written in STATS_DIR:
#   latencies_<id>.txt - one line per request with its wall-clock latency in ms
#   ok_<id>.txt / fail_<id>.txt - final success / failure counts, written once at the end
worker() {
    local worker_id=$1 deadline=$2
    local succeeded=0 failed=0
    local target began_ms finished_ms

    while [ "$(date +%s)" -lt "$deadline" ]; do
        target=$(pick_url)
        began_ms=$(now_ms)

        if curl -x "$PROXY" -m 15 -sf -o /dev/null -w "" "$target" 2>/dev/null; then
            succeeded=$((succeeded + 1))
        else
            failed=$((failed + 1))
        fi

        finished_ms=$(now_ms)
        echo "$((finished_ms - began_ms))" >> "$STATS_DIR/latencies_${worker_id}.txt"
    done

    echo "$succeeded" > "$STATS_DIR/ok_${worker_id}.txt"
    echo "$failed" > "$STATS_DIR/fail_${worker_id}.txt"
}
|
||||
|
||||
# Print the effective test configuration.
echo ""
log_info "=== Nym Localnet Load Test ==="
log_info "Concurrency: $CONCURRENCY workers"
log_info "Duration: ${DURATION}s"
log_info "Payload: $SIZE_LABEL"

# Same precedence as pick_url: custom URL, then sized URL, then the rotating list.
if [ -n "$CUSTOM_URL" ]; then
    target_summary="Target: $CUSTOM_URL"
elif [ -n "$PAYLOAD_SIZE" ]; then
    target_summary="Target: $SIZED_URL"
else
    target_summary="Targets: ${#TARGETS[@]} rotating URLs"
fi
log_info "$target_summary"

log_info "Proxy: $PROXY"
echo ""

# Quick connectivity check: one real request through the proxy before starting workers.
log_info "Preflight: testing SOCKS5 proxy..."
if ! curl -x "$PROXY" -m 15 -sf -o /dev/null "https://httpbin.org/get"; then
    log_err "SOCKS5 proxy test failed. Check localnet status."
    exit 1
fi
log_ok "SOCKS5 proxy is working"
|
||||
|
||||
# Test window: the workers and the progress loop below all stop at END_TIME.
END_TIME=$(( $(date +%s) + DURATION ))
START_TIME=$(date +%s)

log_info "Starting $CONCURRENCY workers for ${DURATION}s..."
echo ""

# Launch every worker as a background job.
for worker_id in $(seq 1 "$CONCURRENCY"); do
    worker "$worker_id" "$END_TIME" &
done

# Progress reporter (counts completed latency entries as a proxy for request count)
until [ "$(date +%s)" -ge "$END_TIME" ]; do
    sleep "$STATS_INTERVAL"
    elapsed=$(( $(date +%s) - START_TIME ))
    remaining=$(( END_TIME - $(date +%s) ))
    [ "$remaining" -ge 0 ] || remaining=0

    # One latency line is appended per finished request, so line count == requests so far.
    done_requests=0
    for stats_file in "$STATS_DIR"/latencies_*.txt; do
        [ -f "$stats_file" ] || continue
        lines=$(wc -l < "$stats_file" 2>/dev/null || echo 0)
        done_requests=$((done_requests + lines))
    done

    rate="?"
    if [ "$elapsed" -gt 0 ]; then
        rate=$(echo "scale=1; $done_requests / $elapsed" | bc 2>/dev/null || echo "?")
    fi

    printf "\r [%3ds / %3ds] requests: %d | ~%s req/s | remaining: %ds " \
        "$elapsed" "$DURATION" "$done_requests" "$rate" "$remaining"
done

echo ""
log_info "Waiting for workers to finish..."
wait 2>/dev/null || true
|
||||
|
||||
# Final stats
echo ""
log_info "=== Results ==="

# Print the sum of the integers stored in the given per-worker counter files.
# Paths that are not regular files (including an unmatched glob) are ignored.
sum_counter_files() {
    local sum=0 path
    for path in "$@"; do
        [ -f "$path" ] && sum=$((sum + $(cat "$path" 2>/dev/null || echo 0)))
    done
    echo "$sum"
}

total_ok=$(sum_counter_files "$STATS_DIR"/ok_*.txt)
total_fail=$(sum_counter_files "$STATS_DIR"/fail_*.txt)

# Collect every recorded latency into one space-separated string.
all_latencies=""
for f in "$STATS_DIR"/latencies_*.txt; do
    if [ -f "$f" ]; then
        all_latencies="$all_latencies $(cat "$f" 2>/dev/null | tr '\n' ' ')"
    fi
done

total=$((total_ok + total_fail))
actual_duration=$(( $(date +%s) - START_TIME ))

echo ""
echo " Total requests: $total"
echo " Successful: $total_ok"
echo " Failed: $total_fail"
if [ "$actual_duration" -gt 0 ]; then
    rps=$(echo "scale=2; $total / $actual_duration" | bc 2>/dev/null || echo "?")
    echo " Duration: ${actual_duration}s"
    echo " Throughput: ~${rps} req/s"
fi
|
||||
|
||||
# Print the 1-based nearest-rank position of a percentile in a sorted sample.
#
# Arguments:
#   $1 - number of samples (>= 1)
#   $2 - percentile, 1..100
# Uses ceil(n * pct / 100) clamped to [1, n]. Rounding down instead would pick
# too low a rank for small samples (with 2 samples, p99 would be the minimum).
_percentile_rank() {
    local n=$1 pct=$2
    local rank=$(( (n * pct + 99) / 100 ))
    if [ "$rank" -lt 1 ]; then rank=1; fi
    if [ "$rank" -gt "$n" ]; then rank=$n; fi
    echo "$rank"
}

# Latency summary over every recorded request (successful or not).
if [ -n "${all_latencies:-}" ]; then
    # Filter blanks before sorting: a grep at the end of the pipeline returns 1
    # when nothing is left, which would abort the script under `set -e`.
    sorted=$(echo "$all_latencies" | tr ' ' '\n' | grep -v '^$' | sort -n)
    # Count real samples; `wc -l` would report 1 for an empty list.
    count=$(echo "$sorted" | grep -c '[0-9]' || true)
    if [ "$count" -gt 0 ]; then
        p50_idx=$(_percentile_rank "$count" 50)
        p95_idx=$(_percentile_rank "$count" 95)
        p99_idx=$(_percentile_rank "$count" 99)

        min_lat=$(echo "$sorted" | head -1)
        max_lat=$(echo "$sorted" | tail -1)
        p50=$(echo "$sorted" | sed -n "${p50_idx}p")
        p95=$(echo "$sorted" | sed -n "${p95_idx}p")
        p99=$(echo "$sorted" | sed -n "${p99_idx}p")

        echo ""
        echo " Latency (ms):"
        echo " min: ${min_lat}ms"
        echo " p50: ${p50}ms"
        echo " p95: ${p95}ms"
        echo " p99: ${p99}ms"
        echo " max: ${max_lat}ms"
    fi
fi
|
||||
|
||||
# Closing verdict plus a pointer to the traces UI.
echo ""
if [ "$total" -eq 0 ]; then
    # Nothing completed at all; reporting success here would be misleading.
    log_warn "No requests completed -- nothing to report"
elif [ "$total_fail" -gt 0 ]; then
    fail_pct=$(echo "scale=1; $total_fail * 100 / $total" | bc 2>/dev/null || echo "?")
    log_warn "Failure rate: ${fail_pct}% -- ${total_fail} of ${total} failed"
else
    log_ok "All requests succeeded"
fi
echo ""
log_info "View traces in SigNoz: http://localhost:8080/traces"
log_info "Filter by service: nym-node"
echo ""
|
||||
@@ -5,13 +5,6 @@
|
||||
|
||||
SESSION_NAME="nym-localnet-logs"
|
||||
|
||||
# Detect runtime
|
||||
if command -v container &> /dev/null; then
|
||||
RUNTIME="container"
|
||||
else
|
||||
RUNTIME="docker"
|
||||
fi
|
||||
|
||||
# Container names
|
||||
CONTAINERS=(
|
||||
"nym-mixnode1"
|
||||
@@ -24,9 +17,9 @@ CONTAINERS=(
|
||||
|
||||
# Check if containers are running
|
||||
running_containers=()
|
||||
for ctr in "${CONTAINERS[@]}"; do
|
||||
if $RUNTIME inspect "$ctr" &>/dev/null; then
|
||||
running_containers+=("$ctr")
|
||||
for container in "${CONTAINERS[@]}"; do
|
||||
if container inspect "$container" &>/dev/null; then
|
||||
running_containers+=("$container")
|
||||
fi
|
||||
done
|
||||
|
||||
@@ -39,11 +32,11 @@ fi
|
||||
# Check if we're already in tmux
|
||||
if [ -n "$TMUX" ]; then
|
||||
# Inside tmux - create new window
|
||||
tmux new-window -n "logs" "$RUNTIME logs -f ${running_containers[0]}"
|
||||
tmux new-window -n "logs" "container logs -f ${running_containers[0]}"
|
||||
|
||||
# Split for remaining containers
|
||||
for ((i=1; i<${#running_containers[@]}; i++)); do
|
||||
tmux split-window -t logs "$RUNTIME logs -f ${running_containers[$i]}"
|
||||
tmux split-window -t logs "container logs -f ${running_containers[$i]}"
|
||||
tmux select-layout -t logs tiled
|
||||
done
|
||||
|
||||
@@ -55,11 +48,11 @@ else
|
||||
exec tmux attach-session -t "$SESSION_NAME"
|
||||
else
|
||||
# Create new session
|
||||
tmux new-session -d -s "$SESSION_NAME" -n "logs" "$RUNTIME logs -f ${running_containers[0]}"
|
||||
tmux new-session -d -s "$SESSION_NAME" -n "logs" "container logs -f ${running_containers[0]}"
|
||||
|
||||
# Split for remaining containers
|
||||
for ((i=1; i<${#running_containers[@]}; i++)); do
|
||||
tmux split-window -t "$SESSION_NAME:logs" "$RUNTIME logs -f ${running_containers[$i]}"
|
||||
tmux split-window -t "$SESSION_NAME:logs" "container logs -f ${running_containers[$i]}"
|
||||
tmux select-layout -t "$SESSION_NAME:logs" tiled
|
||||
done
|
||||
|
||||
|
||||
+77
-140
@@ -2,8 +2,8 @@
|
||||
|
||||
set -ex
|
||||
|
||||
# Nym Localnet Orchestration Script
|
||||
# Supports both Docker and Apple Container Runtime
|
||||
# Nym Localnet Orchestration Script for Apple Container Runtime
|
||||
# Emulates docker-compose functionality
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
|
||||
@@ -14,54 +14,6 @@ NYM_VOLUME_PATH="/tmp/nym-localnet-home-$$"
|
||||
|
||||
SUFFIX=${NYM_NODE_SUFFIX:-localnet}
|
||||
|
||||
# Detect container runtime: prefer Apple 'container' if available, fall back to docker
|
||||
if command -v container &> /dev/null; then
|
||||
RUNTIME="container"
|
||||
HOST_INTERNAL="host.containers.internal"
|
||||
else
|
||||
RUNTIME="docker"
|
||||
HOST_INTERNAL="host.docker.internal"
|
||||
fi
|
||||
|
||||
# WireGuard: set to 1 only if you need VPN routing in localnet (requires NET_ADMIN and /dev/net/tun).
|
||||
# Default 0: mixnet-only, no elevated capabilities required.
|
||||
WIREGUARD_ENABLED=${WIREGUARD_ENABLED:-0}
|
||||
|
||||
# OpenTelemetry configuration
|
||||
# Set OTEL_ENABLE=1 to enable OTel tracing on all nym-node instances.
|
||||
# OTEL_ENDPOINT should point to the OTLP gRPC collector reachable from containers.
|
||||
# When SigNoz runs in Docker (signoz-net), we route to its collector directly.
|
||||
OTEL_ENABLE=${OTEL_ENABLE:-1}
|
||||
if [ -z "${OTEL_ENDPOINT:-}" ]; then
|
||||
SIGNOZ_NET=$(docker network ls --filter name=signoz-net --format '{{.Name}}' 2>/dev/null || true)
|
||||
if [ "$RUNTIME" = "docker" ] && [ -n "$SIGNOZ_NET" ]; then
|
||||
OTEL_ENDPOINT="http://signoz-otel-collector:4317"
|
||||
OTEL_SIGNOZ_NET="$SIGNOZ_NET"
|
||||
else
|
||||
OTEL_ENDPOINT="http://${HOST_INTERNAL}:4317"
|
||||
OTEL_SIGNOZ_NET=""
|
||||
fi
|
||||
fi
|
||||
|
||||
# Print the OTel flags for a nym-node command line.
# Prints nothing unless OTEL_ENABLE is exactly "1".
otel_flags() {
    [ "$OTEL_ENABLE" = "1" ] || return 0
    echo "--otel --otel-endpoint $OTEL_ENDPOINT"
}
|
||||
|
||||
# Print the extra `run` arguments (NET_ADMIN capability and the TUN device) that
# gateway containers get for WireGuard; prints nothing unless WIREGUARD_ENABLED is "1".
wireguard_cap_args() {
    case "$WIREGUARD_ENABLED" in
        1) echo "--cap-add=NET_ADMIN --device /dev/net/tun" ;;
    esac
}
|
||||
|
||||
# Print the value for nym-node's --wireguard-enabled option:
# "true" when WIREGUARD_ENABLED is "1", otherwise "false".
wireguard_flag() {
    if [ "$WIREGUARD_ENABLED" = "1" ]; then
        echo "true"
    else
        echo "false"
    fi
}
|
||||
|
||||
# Container names
|
||||
INIT_CONTAINER="nym-localnet-init"
|
||||
MIXNODE1_CONTAINER="nym-mixnode1"
|
||||
@@ -112,13 +64,13 @@ cleanup_host_state() {
|
||||
done
|
||||
}
|
||||
|
||||
# Check prerequisites
|
||||
# Check if container command exists
|
||||
check_prerequisites() {
|
||||
if ! command -v docker &> /dev/null; then
|
||||
log_error "Docker not found"
|
||||
if ! command -v container &> /dev/null; then
|
||||
log_error "Apple 'container' command not found"
|
||||
log_error "Install from: https://github.com/apple/container"
|
||||
exit 1
|
||||
fi
|
||||
log_info "Using runtime: $RUNTIME"
|
||||
}
|
||||
|
||||
# Build the Docker image
|
||||
@@ -128,6 +80,7 @@ build_image() {
|
||||
|
||||
cd "$PROJECT_ROOT"
|
||||
|
||||
# Build with Docker
|
||||
log_info "Building with Docker..."
|
||||
if ! docker build \
|
||||
-f "$SCRIPT_DIR/Dockerfile.localnet" \
|
||||
@@ -137,24 +90,30 @@ build_image() {
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# If using Apple container runtime, transfer image from Docker
|
||||
if [ "$RUNTIME" = "container" ]; then
|
||||
log_info "Transferring image to Apple container runtime..."
|
||||
TEMP_IMAGE="/tmp/nym-localnet-image-$$.tar"
|
||||
if ! docker save -o "$TEMP_IMAGE" "$IMAGE_NAME"; then
|
||||
log_error "Failed to save Docker image"
|
||||
exit 1
|
||||
fi
|
||||
if ! container image load --input "$TEMP_IMAGE"; then
|
||||
rm -f "$TEMP_IMAGE"
|
||||
log_error "Failed to load image into container runtime"
|
||||
exit 1
|
||||
fi
|
||||
# Transfer image to container runtime
|
||||
log_info "Transferring image to container runtime..."
|
||||
|
||||
# Save to temporary file (container image load doesn't support stdin)
|
||||
TEMP_IMAGE="/tmp/nym-localnet-image-$$.tar"
|
||||
if ! docker save -o "$TEMP_IMAGE" "$IMAGE_NAME"; then
|
||||
log_error "Failed to save Docker image"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Load into container runtime from file
|
||||
if ! container image load --input "$TEMP_IMAGE"; then
|
||||
rm -f "$TEMP_IMAGE"
|
||||
if ! container image inspect "$IMAGE_NAME" &>/dev/null; then
|
||||
log_error "Image not found in container runtime after load"
|
||||
exit 1
|
||||
fi
|
||||
log_error "Failed to load image into container runtime"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Clean up temporary file
|
||||
rm -f "$TEMP_IMAGE"
|
||||
|
||||
# Verify image is available
|
||||
if ! container image inspect "$IMAGE_NAME" &>/dev/null; then
|
||||
log_error "Image not found in container runtime after load"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
log_success "Image built and loaded: $IMAGE_NAME"
|
||||
@@ -196,7 +155,7 @@ NETWORK_NAME="nym-localnet-network"
|
||||
# Create container network
|
||||
create_network() {
|
||||
log_info "Creating container network: $NETWORK_NAME"
|
||||
if $RUNTIME network create "$NETWORK_NAME" 2>/dev/null; then
|
||||
if container network create "$NETWORK_NAME" 2>/dev/null; then
|
||||
log_success "Network created: $NETWORK_NAME"
|
||||
else
|
||||
log_info "Network $NETWORK_NAME already exists or creation failed"
|
||||
@@ -205,9 +164,9 @@ create_network() {
|
||||
|
||||
# Remove container network
|
||||
remove_network() {
|
||||
if $RUNTIME network list | grep -q "$NETWORK_NAME"; then
|
||||
if container network list | grep -q "$NETWORK_NAME"; then
|
||||
log_info "Removing network: $NETWORK_NAME"
|
||||
$RUNTIME network rm "$NETWORK_NAME" 2>/dev/null || true
|
||||
container network rm "$NETWORK_NAME" 2>/dev/null || true
|
||||
log_success "Network removed"
|
||||
fi
|
||||
}
|
||||
@@ -224,10 +183,7 @@ start_mixnode() {
|
||||
local verloc_port="2000${node_id}"
|
||||
local http_port="3000${node_id}"
|
||||
|
||||
local otel_args
|
||||
otel_args=$(otel_flags)
|
||||
|
||||
$RUNTIME run \
|
||||
container run \
|
||||
--name "$container_name" \
|
||||
-m 2G \
|
||||
--network "$NETWORK_NAME" \
|
||||
@@ -259,7 +215,7 @@ start_mixnode() {
|
||||
sleep 2;
|
||||
done;
|
||||
echo "Starting mix'"${node_id}"'...";
|
||||
exec nym-node '"${otel_args}"' run --id mix'"${node_id}"'-localnet --unsafe-disable-replay-protection --local
|
||||
exec nym-node run --id mix'"${node_id}"'-localnet --unsafe-disable-replay-protection --local
|
||||
'
|
||||
|
||||
log_success "$container_name started"
|
||||
@@ -268,14 +224,9 @@ start_mixnode() {
|
||||
start_gateway() {
|
||||
log_info "Starting $GATEWAY_CONTAINER..."
|
||||
|
||||
local otel_args wg_flag
|
||||
otel_args=$(otel_flags)
|
||||
wg_flag=$(wireguard_flag)
|
||||
|
||||
$RUNTIME run \
|
||||
container run \
|
||||
--name "$GATEWAY_CONTAINER" \
|
||||
-m 2G \
|
||||
$(wireguard_cap_args) \
|
||||
--network "$NETWORK_NAME" \
|
||||
-p 9000:9000 \
|
||||
-p 10004:10004 \
|
||||
@@ -304,9 +255,11 @@ start_gateway() {
|
||||
--http-bind-address=0.0.0.0:30004 \
|
||||
--http-access-token=lala \
|
||||
--public-ips $CONTAINER_IP \
|
||||
--enable-lp true \
|
||||
--lp-use-mock-ecash true \
|
||||
--output=json \
|
||||
--wireguard-enabled '"$wg_flag"' \
|
||||
--wireguard-enabled true \
|
||||
--wireguard-userspace true \
|
||||
--bonding-information-output="/localnet/gateway.json";
|
||||
|
||||
echo "Waiting for network.json...";
|
||||
@@ -314,7 +267,7 @@ start_gateway() {
|
||||
sleep 2;
|
||||
done;
|
||||
echo "Starting gateway with LP listener (mock ecash)...";
|
||||
exec nym-node '"${otel_args}"' run --id gateway-localnet --unsafe-disable-replay-protection --local --wireguard-enabled '"$wg_flag"' --lp-use-mock-ecash true
|
||||
exec nym-node run --id gateway-localnet --unsafe-disable-replay-protection --local --wireguard-enabled true --wireguard-userspace true --lp-use-mock-ecash true
|
||||
'
|
||||
|
||||
log_success "$GATEWAY_CONTAINER started"
|
||||
@@ -338,14 +291,9 @@ start_gateway() {
|
||||
start_gateway2() {
|
||||
log_info "Starting $GATEWAY2_CONTAINER..."
|
||||
|
||||
local otel_args wg_flag
|
||||
otel_args=$(otel_flags)
|
||||
wg_flag=$(wireguard_flag)
|
||||
|
||||
$RUNTIME run \
|
||||
container run \
|
||||
--name "$GATEWAY2_CONTAINER" \
|
||||
-m 2G \
|
||||
$(wireguard_cap_args) \
|
||||
--network "$NETWORK_NAME" \
|
||||
-p 9001:9001 \
|
||||
-p 10005:10005 \
|
||||
@@ -374,9 +322,11 @@ start_gateway2() {
|
||||
--http-bind-address=0.0.0.0:30005 \
|
||||
--http-access-token=lala \
|
||||
--public-ips $CONTAINER_IP \
|
||||
--enable-lp true \
|
||||
--lp-use-mock-ecash true \
|
||||
--output=json \
|
||||
--wireguard-enabled '"$wg_flag"' \
|
||||
--wireguard-enabled true \
|
||||
--wireguard-userspace true \
|
||||
--bonding-information-output="/localnet/gateway2.json";
|
||||
|
||||
echo "Waiting for network.json...";
|
||||
@@ -384,7 +334,7 @@ start_gateway2() {
|
||||
sleep 2;
|
||||
done;
|
||||
echo "Starting gateway2 with LP listener (mock ecash)...";
|
||||
exec nym-node '"${otel_args}"' run --id gateway2-localnet --unsafe-disable-replay-protection --local --wireguard-enabled '"$wg_flag"' --lp-use-mock-ecash true
|
||||
exec nym-node run --id gateway2-localnet --unsafe-disable-replay-protection --local --wireguard-enabled true --wireguard-userspace true --lp-use-mock-ecash true
|
||||
'
|
||||
|
||||
log_success "$GATEWAY2_CONTAINER started"
|
||||
@@ -408,12 +358,12 @@ start_gateway2() {
|
||||
start_network_requester() {
|
||||
log_info "Starting $REQUESTER_CONTAINER..."
|
||||
|
||||
# Get gateway IP address (first IP only, in case container has multiple networks)
|
||||
# Get gateway IP address
|
||||
log_info "Getting gateway IP address..."
|
||||
GATEWAY_IP=$($RUNTIME exec "$GATEWAY_CONTAINER" hostname -i | awk '{print $1}')
|
||||
GATEWAY_IP=$(container exec "$GATEWAY_CONTAINER" hostname -i)
|
||||
log_info "Gateway IP: $GATEWAY_IP"
|
||||
|
||||
$RUNTIME run \
|
||||
container run \
|
||||
--name "$REQUESTER_CONTAINER" \
|
||||
--network "$NETWORK_NAME" \
|
||||
-v "$VOLUME_PATH:/localnet" \
|
||||
@@ -448,7 +398,7 @@ start_network_requester() {
|
||||
start_socks5_client() {
|
||||
log_info "Starting $SOCKS5_CONTAINER..."
|
||||
|
||||
$RUNTIME run \
|
||||
container run \
|
||||
--name "$SOCKS5_CONTAINER" \
|
||||
--network "$NETWORK_NAME" \
|
||||
-p 1080:1080 \
|
||||
@@ -501,15 +451,15 @@ stop_containers() {
|
||||
log_info "Stopping all containers..."
|
||||
|
||||
for container_name in "${ALL_CONTAINERS[@]}"; do
|
||||
if $RUNTIME inspect "$container_name" &>/dev/null; then
|
||||
if container inspect "$container_name" &>/dev/null; then
|
||||
log_info "Stopping $container_name"
|
||||
$RUNTIME stop "$container_name" 2>/dev/null || true
|
||||
$RUNTIME rm "$container_name" 2>/dev/null || true
|
||||
container stop "$container_name" 2>/dev/null || true
|
||||
container rm "$container_name" 2>/dev/null || true
|
||||
fi
|
||||
done
|
||||
|
||||
# Also clean up init container if it exists
|
||||
$RUNTIME rm "$INIT_CONTAINER" 2>/dev/null || true
|
||||
container rm "$INIT_CONTAINER" 2>/dev/null || true
|
||||
|
||||
log_success "All containers stopped"
|
||||
|
||||
@@ -517,7 +467,7 @@ stop_containers() {
|
||||
remove_network
|
||||
}
|
||||
|
||||
# Show $RUNTIME logs
|
||||
# Show container logs
|
||||
show_logs() {
|
||||
local container_name=${1:-}
|
||||
|
||||
@@ -528,8 +478,8 @@ show_logs() {
|
||||
fi
|
||||
|
||||
# Show logs for specific container
|
||||
if $RUNTIME inspect "$container_name" &>/dev/null; then
|
||||
$RUNTIME logs -f "$container_name"
|
||||
if container inspect "$container_name" &>/dev/null; then
|
||||
container logs -f "$container_name"
|
||||
else
|
||||
log_error "Container not found: $container_name"
|
||||
log_info "Available containers:"
|
||||
@@ -546,8 +496,8 @@ show_status() {
|
||||
echo ""
|
||||
|
||||
for container_name in "${ALL_CONTAINERS[@]}"; do
|
||||
if $RUNTIME inspect "$container_name" &>/dev/null; then
|
||||
local status=$($RUNTIME inspect "$container_name" 2>/dev/null | grep -o '"Status":"[^"]*"' | cut -d'"' -f4 || echo "unknown")
|
||||
if container inspect "$container_name" &>/dev/null; then
|
||||
local status=$(container inspect "$container_name" 2>/dev/null | grep -o '"Status":"[^"]*"' | cut -d'"' -f4 || echo "unknown")
|
||||
echo -e " ${GREEN}●${NC} $container_name - $status"
|
||||
else
|
||||
echo -e " ${RED}○${NC} $container_name - not running"
|
||||
@@ -602,13 +552,13 @@ build_topology() {
|
||||
log_success " $file created"
|
||||
done
|
||||
|
||||
# Get container IPs (first IP only, containers may be on multiple networks)
|
||||
# Get container IPs
|
||||
log_info "Getting container IP addresses..."
|
||||
MIX1_IP=$($RUNTIME exec "$MIXNODE1_CONTAINER" hostname -i | awk '{print $1}')
|
||||
MIX2_IP=$($RUNTIME exec "$MIXNODE2_CONTAINER" hostname -i | awk '{print $1}')
|
||||
MIX3_IP=$($RUNTIME exec "$MIXNODE3_CONTAINER" hostname -i | awk '{print $1}')
|
||||
GATEWAY_IP=$($RUNTIME exec "$GATEWAY_CONTAINER" hostname -i | awk '{print $1}')
|
||||
GATEWAY2_IP=$($RUNTIME exec "$GATEWAY2_CONTAINER" hostname -i | awk '{print $1}')
|
||||
MIX1_IP=$(container exec "$MIXNODE1_CONTAINER" hostname -i)
|
||||
MIX2_IP=$(container exec "$MIXNODE2_CONTAINER" hostname -i)
|
||||
MIX3_IP=$(container exec "$MIXNODE3_CONTAINER" hostname -i)
|
||||
GATEWAY_IP=$(container exec "$GATEWAY_CONTAINER" hostname -i)
|
||||
GATEWAY2_IP=$(container exec "$GATEWAY2_CONTAINER" hostname -i)
|
||||
|
||||
log_info "Container IPs:"
|
||||
echo " mix1: $MIX1_IP"
|
||||
@@ -618,7 +568,7 @@ build_topology() {
|
||||
echo " gateway2: $GATEWAY2_IP"
|
||||
|
||||
# Run build_topology.py in a container with access to the volumes
|
||||
$RUNTIME run \
|
||||
container run \
|
||||
--name "nym-localnet-topology-builder" \
|
||||
--network "$NETWORK_NAME" \
|
||||
-v "$VOLUME_PATH:/localnet" \
|
||||
@@ -657,33 +607,20 @@ start_all() {
|
||||
start_mixnode 3 "$MIXNODE3_CONTAINER"
|
||||
start_gateway
|
||||
start_gateway2
|
||||
|
||||
# Connect nym containers to SigNoz network for direct OTLP routing
|
||||
if [ -n "${OTEL_SIGNOZ_NET:-}" ]; then
|
||||
log_info "Connecting containers to SigNoz network ($OTEL_SIGNOZ_NET)..."
|
||||
for c in "$MIXNODE1_CONTAINER" "$MIXNODE2_CONTAINER" "$MIXNODE3_CONTAINER" \
|
||||
"$GATEWAY_CONTAINER" "$GATEWAY2_CONTAINER"; do
|
||||
docker network connect "$OTEL_SIGNOZ_NET" "$c" 2>/dev/null && \
|
||||
log_success " $c connected to $OTEL_SIGNOZ_NET" || true
|
||||
done
|
||||
fi
|
||||
|
||||
build_topology
|
||||
|
||||
# Configure networking for WireGuard VPN routing only when WIREGUARD_ENABLED=1
|
||||
if [ "$WIREGUARD_ENABLED" = "1" ]; then
|
||||
log_info "Configuring gateway networking (IP forwarding, NAT) for WireGuard..."
|
||||
for gw in "$GATEWAY_CONTAINER" "$GATEWAY2_CONTAINER"; do
|
||||
if $RUNTIME exec "$gw" sh -c "
|
||||
echo 1 > /proc/sys/net/ipv4/ip_forward 2>/dev/null
|
||||
iptables-legacy -t nat -A POSTROUTING -o eth0 -j MASQUERADE 2>/dev/null
|
||||
" 2>/dev/null; then
|
||||
log_success "Configured $gw"
|
||||
else
|
||||
log_warn "Could not configure NAT on $gw. WireGuard VPN routing may not work."
|
||||
fi
|
||||
done
|
||||
fi
|
||||
# Configure networking for two-hop WireGuard routing on both gateways
|
||||
# Note: Runs after build_topology to ensure gateways have finished WireGuard setup
|
||||
log_info "Configuring gateway networking (IP forwarding, NAT)..."
|
||||
for gw in "$GATEWAY_CONTAINER" "$GATEWAY2_CONTAINER"; do
|
||||
container exec "$gw" sh -c "
|
||||
# Enable IP forwarding
|
||||
echo 1 > /proc/sys/net/ipv4/ip_forward
|
||||
# Add NAT masquerade for outbound traffic
|
||||
iptables-legacy -t nat -A POSTROUTING -o eth0 -j MASQUERADE
|
||||
"
|
||||
log_success "Configured $gw"
|
||||
done
|
||||
|
||||
start_network_requester
|
||||
start_socks5_client
|
||||
|
||||
@@ -1,222 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Nym Localnet OTel Report
|
||||
# Queries ClickHouse directly to produce a terminal-based summary of
|
||||
# the core metrics captured by the OTel-instrumented nym-nodes.
|
||||
#
|
||||
# Usage:
|
||||
# ./otel-report.sh # last 15 minutes
|
||||
# ./otel-report.sh 60 # last 60 minutes
|
||||
# ./otel-report.sh live # live mode: refresh every 10s
|
||||
#
|
||||
# Prerequisites: localnet + SigNoz running
|
||||
|
||||
set -e
|
||||
|
||||
CH_CONTAINER="signoz-clickhouse"
|
||||
TRACES_TABLE="signoz_traces.distributed_signoz_index_v3"
|
||||
# Interpret the single optional argument:
#   (none)   - report on the last 15 minutes
#   live     - refresh continuously over a 5-minute window
#   <number> - report on the last <number> minutes
# The value is interpolated into SQL and used as a divisor, so anything that is
# not a positive integer is rejected here.
LIVE=false
case "${1:-}" in
    live)
        LIVE=true
        LOOKBACK_MIN=5
        ;;
    '')
        LOOKBACK_MIN=15
        ;;
    *[!0-9]*)
        echo "Usage: $0 [minutes|live] (got '$1')" >&2
        exit 1
        ;;
    *)
        # Base 10 so a value with leading zeros is not parsed as octal.
        LOOKBACK_MIN=$((10#$1))
        if [ "$LOOKBACK_MIN" -lt 1 ]; then
            echo "Usage: $0 [minutes|live] (minutes must be at least 1)" >&2
            exit 1
        fi
        ;;
esac
|
||||
|
||||
BLUE='\033[0;34m'
|
||||
GREEN='\033[0;32m'
|
||||
YELLOW='\033[1;33m'
|
||||
RED='\033[0;31m'
|
||||
BOLD='\033[1m'
|
||||
DIM='\033[2m'
|
||||
NC='\033[0m'
|
||||
|
||||
# Run one query with clickhouse-client inside the ClickHouse container.
#
# Arguments:
#   $1 - the SQL text
# Outputs:
#   the query result on stdout
# Returns:
#   the exit status of `docker exec`
# stderr stays hidden on success but is shown on failure. Discarding it
# unconditionally made the script die silently under `set -e` when the
# container was missing or a query was rejected.
ch() {
    local err_file status=0
    err_file=$(mktemp)
    docker exec "$CH_CONTAINER" clickhouse-client --query "$1" 2>"$err_file" || status=$?
    if [ "$status" -ne 0 ]; then
        echo "ClickHouse query failed (is the '$CH_CONTAINER' container running?):" >&2
        cat "$err_file" >&2
    fi
    rm -f "$err_file"
    return "$status"
}
|
||||
|
||||
# Print a dimmed horizontal rule of 78 dashes.
divider() {
    local rule
    rule=$(printf '%78s' '' | tr ' ' '-')
    echo -e "${DIM}${rule}${NC}"
}
|
||||
|
||||
# Print the full six-section report for the given look-back window.
#
# Arguments:
#   $1 - window length in minutes; interpolated into every query
# Each section runs one ClickHouse query through `ch` and prints its table.
# The time/service filter and the list of pipeline operations were repeated in
# every query; they are defined once here so the sections cannot drift apart.
print_report() {
    local window="$1"

    # Spans of the nym-node service inside the look-back window.
    local recent="timestamp >= now() - INTERVAL ${window} MINUTE AND serviceName = 'nym-node'"
    # Span names that make up the packet pipeline.
    local pipeline_ops="'handle_received_nym_packet', 'mixnode.sphinx_full_unwrap', 'mixnode.forward_packet', 'mixnode.final_hop'"

    echo ""
    echo -e "${BOLD} Nym Localnet -- OTel Packet Pipeline Report${NC}"
    echo -e " ${DIM}Window: last ${window} minutes | $(date '+%Y-%m-%d %H:%M:%S')${NC}"
    divider

    # 1. Throughput per operation
    echo -e "\n${BOLD} [1] Packet Throughput (packets/sec by operation)${NC}\n"
    ch "
        SELECT
            name AS operation,
            count(*) AS total,
            round(count(*) / (${window} * 60), 1) AS per_sec
        FROM ${TRACES_TABLE}
        WHERE ${recent}
          AND name IN (${pipeline_ops})
        GROUP BY name
        ORDER BY total DESC
        FORMAT PrettyCompactNoEscapes
    "

    divider

    # 2. Latency per operation (spans of 60s or longer are excluded by the query)
    echo -e "\n${BOLD} [2] Processing Latency (milliseconds)${NC}\n"
    ch "
        SELECT
            name AS operation,
            round(quantile(0.50)(duration_nano / 1e6), 3) AS p50_ms,
            round(quantile(0.95)(duration_nano / 1e6), 3) AS p95_ms,
            round(quantile(0.99)(duration_nano / 1e6), 3) AS p99_ms,
            round(quantile(0.999)(duration_nano / 1e6), 3) AS p999_ms,
            round(avg(duration_nano / 1e6), 3) AS avg_ms
        FROM ${TRACES_TABLE}
        WHERE ${recent}
          AND name IN (${pipeline_ops})
          AND duration_nano < 60000000000
        GROUP BY name
        ORDER BY p50_ms DESC
        FORMAT PrettyCompactNoEscapes
    "

    divider

    # 3. Error rate (only operations with at least one error are listed)
    echo -e "\n${BOLD} [3] Error Rate${NC}\n"
    local errors
    errors=$(ch "
        SELECT
            name,
            countIf(has_error = true) AS errors,
            count(*) AS total,
            round(100.0 * countIf(has_error = true) / count(*), 3) AS error_pct
        FROM ${TRACES_TABLE}
        WHERE ${recent}
          AND name IN (${pipeline_ops})
        GROUP BY name
        HAVING errors > 0
        ORDER BY errors DESC
        FORMAT PrettyCompactNoEscapes
    ")

    if [ -z "$errors" ]; then
        echo -e " ${GREEN}No errors detected across all operations${NC}"
    else
        echo "$errors"
    fi

    divider

    # 4. Forwarding ratio (are packets being dropped between stages?)
    echo -e "\n${BOLD} [4] Pipeline Funnel (packet drop detection)${NC}\n"
    ch "
        SELECT
            name AS stage,
            count(*) AS packets,
            round(100.0 * count(*) / max(total_ingress), 1) AS pct_of_ingress
        FROM ${TRACES_TABLE}
        CROSS JOIN (
            SELECT count(*) AS total_ingress
            FROM ${TRACES_TABLE}
            WHERE ${recent}
              AND name = 'handle_received_nym_packet'
        ) AS t
        WHERE ${recent}
          AND name IN (${pipeline_ops})
        GROUP BY name
        ORDER BY packets DESC
        FORMAT PrettyCompactNoEscapes
    "

    echo ""
    echo -e " ${DIM}Expected ratios: sphinx_unwrap ~ 100%, forward ~ 75% (3 of 4 hops forward),${NC}"
    echo -e " ${DIM}final_hop ~ 25% (1 of 4 hops is the last one). Significantly lower = drops.${NC}"

    divider

    # 5. Throughput over time (1-minute buckets)
    echo -e "\n${BOLD} [5] Throughput Timeline (1-min buckets, ingress packets)${NC}\n"
    ch "
        SELECT
            toStartOfMinute(timestamp) AS minute,
            count(*) AS packets,
            round(count(*) / 60, 1) AS per_sec
        FROM ${TRACES_TABLE}
        WHERE ${recent}
          AND name = 'handle_received_nym_packet'
        GROUP BY minute
        ORDER BY minute
        FORMAT PrettyCompactNoEscapes
    "

    divider

    # 6. Latency spikes (potential TCP congestion / backpressure indicators)
    echo -e "\n${BOLD} [6] Latency Spikes (sphinx_unwrap p99 per minute)${NC}\n"
    ch "
        SELECT
            toStartOfMinute(timestamp) AS minute,
            round(quantile(0.99)(duration_nano / 1e6), 3) AS p99_ms,
            round(quantile(0.50)(duration_nano / 1e6), 3) AS p50_ms,
            round(p99_ms / greatest(p50_ms, 0.001), 1) AS spike_ratio,
            count(*) AS samples
        FROM ${TRACES_TABLE}
        WHERE ${recent}
          AND name = 'mixnode.sphinx_full_unwrap'
        GROUP BY minute
        ORDER BY minute
        FORMAT PrettyCompactNoEscapes
    "

    echo ""
    echo -e " ${DIM}spike_ratio > 10x suggests backpressure or queue buildup.${NC}"
    echo -e " ${DIM}Sustained high p99 across minutes may indicate TCP meltdown.${NC}"

    divider
    echo ""
    echo -e " ${BLUE}SigNoz UI:${NC} http://localhost:8080"
    echo -e " ${DIM}Traces tab -> Filter: serviceName = nym-node${NC}"
    echo ""
}
|
||||
|
||||
# One-shot mode prints a single report; live mode clears the screen and
# redraws the report every 10 seconds until interrupted.
if [ "$LIVE" != "true" ]; then
    print_report "$LOOKBACK_MIN"
else
    while :; do
        clear
        print_report "$LOOKBACK_MIN"
        echo -e " ${DIM}Refreshing in 10s... (Ctrl+C to stop)${NC}"
        sleep 10
    done
fi
|
||||
+1
-1
@@ -1 +1 @@
|
||||
0.87%
|
||||
0.90%
|
||||
|
||||
+1
-1
@@ -1 +1 @@
|
||||
33.087
|
||||
31.932
|
||||
|
||||
@@ -1 +1 @@
|
||||
Wednesday, February 11th 2026, 11:35:05 UTC
|
||||
Thursday, February 19th 2026, 13:59:34 UTC
|
||||
|
||||
@@ -11,7 +11,7 @@ options:
|
||||
--no_routing_history Display node stats without routing history
|
||||
--no_verloc_metrics Display node stats without verloc metrics
|
||||
-m, --markdown Display results in markdown format
|
||||
-o, --output [OUTPUT]
|
||||
-o [OUTPUT], --output [OUTPUT]
|
||||
Save results to file (in current dir or supply with
|
||||
path without filename)
|
||||
```
|
||||
|
||||
@@ -12,7 +12,8 @@ usage: nym-node-cli install [-h] [-V] [-d BRANCH] [-v]
|
||||
options:
|
||||
-h, --help show this help message and exit
|
||||
-V, --version show program's version number and exit
|
||||
-d, --dev BRANCH Define github branch (default: develop)
|
||||
-d BRANCH, --dev BRANCH
|
||||
Define github branch (default: develop)
|
||||
-v, --verbose Show full error tracebacks
|
||||
--mode {mixnode,entry-gateway,exit-gateway}
|
||||
Node mode: 'mixnode', 'entry-gateway', or 'exit-
|
||||
|
||||
@@ -62,8 +62,6 @@ Options:
|
||||
Tunnel port announced to external clients wishing to connect to the wireguard interface. Useful in the instances where the node is behind a proxy [env: NYMNODE_WG_ANNOUNCED_PORT=]
|
||||
--wireguard-private-network-prefix <WIREGUARD_PRIVATE_NETWORK_PREFIX>
|
||||
The prefix denoting the maximum number of the clients that can be connected via Wireguard. The maximum value for IPv4 is 32 and for IPv6 is 128 [env: NYMNODE_WG_PRIVATE_NETWORK_PREFIX=]
|
||||
--wireguard-userspace <WIREGUARD_USERSPACE>
|
||||
Use userspace implementation of WireGuard (wireguard-go) instead of kernel module. Useful in containerized environments without kernel WireGuard support [env: NYMNODE_WG_USERSPACE=] [possible values: true, false]
|
||||
--verloc-bind-address <VERLOC_BIND_ADDRESS>
|
||||
Socket address this node will use for binding its verloc API. default: `[::]:1790` [env: NYMNODE_VERLOC_BIND_ADDRESS=]
|
||||
--verloc-announce-port <VERLOC_ANNOUNCE_PORT>
|
||||
@@ -82,8 +80,6 @@ Options:
|
||||
Endpoint to query to retrieve current upgrade mode attestation. This argument should never be set outside testnets and local networks [env: NYMNODE_UPGRADE_MODE_ATTESTATION_URL=]
|
||||
--upgrade-mode-attester-public-key <UPGRADE_MODE_ATTESTER_PUBLIC_KEY>
|
||||
Expected public key of the entity signing the published attestation. This argument should never be set outside testnets and local networks [env: NYMNODE_UPGRADE_MODE_ATTESTER_PUBKEY=]
|
||||
--lp-use-mock-ecash <LP_USE_MOCK_ECASH>
|
||||
Use mock ecash manager for LP testing. WARNING: Only use this for local testing! Never enable in production. When enabled, the LP listener will accept any credential without blockchain verification [env: NYMNODE_LP_USE_MOCK_ECASH=] [possible values: true, false]
|
||||
--upstream-exit-policy-url <UPSTREAM_EXIT_POLICY_URL>
|
||||
Specifies the url for an upstream source of the exit policy used by this node [env: NYMNODE_UPSTREAM_EXIT_POLICY=]
|
||||
--open-proxy <OPEN_PROXY>
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
{
|
||||
"nym-cli": "Nym-cli",
|
||||
"diagnostic-tool": "Diagnostic Tool",
|
||||
"echo-server": "Echo Server",
|
||||
"standalone-tcpproxy": "TcpProxy Binaries (Standalone)"
|
||||
}
|
||||
|
||||
@@ -0,0 +1,134 @@
|
||||
import { Steps } from 'nextra/components';
|
||||
|
||||
# Diagnostic Tool
|
||||
|
||||
The Diagnostic Tool is a standalone binary designed to perform various network tests, including DNS, HTTP, and gateway connectivity tests. This tool helps diagnose connectivity issues and provides insights into network performance.
|
||||
|
||||
It’s also possible to run it within the daemon with the same CLI interface.
|
||||
|
||||
## Download Binary
|
||||
|
||||
To get `nym-diagnostic` follow these steps:
|
||||
<Steps>
|
||||
###### 1. Download `nym-vpn-core`
|
||||
- Navigate to [github.com/nymtech/nym-vpn-client/releases](https://github.com/nymtech/nym-vpn-client/releases)
|
||||
- Find latest `nym-vpn-core-<VERSION>`
|
||||
- Download version for your system
|
||||
|
||||
###### 2. Install or extract and make executable
|
||||
|
||||
- If you downloaded `.deb` installer, install it with this command:
|
||||
```sh
|
||||
sudo dpkg -i <FILE_NAME>
|
||||
```
|
||||
|
||||
- If you downloaded `.tar.gz`, in terminal you can extract the file with
|
||||
```sh
|
||||
tar -xvf <FILE_NAME>
|
||||
```
|
||||
|
||||
- Navigate inside the directory and make executable:
|
||||
```sh
|
||||
cd nym-vpn-core-<VERSION>
|
||||
chmod +x ./*
|
||||
```
|
||||
</Steps>
|
||||
|
||||
## CLI Usage
|
||||
|
||||
The Diagnostic Tool can be executed from the command line interface (CLI). Below are the usage instructions and options available. See the [*Tests Performed*](#tests-performed) chapter for the purpose and outcome of these commands.
|
||||
|
||||
### Command Syntax
|
||||
|
||||
```sh
|
||||
./nym-diagnostic [command] [options]
|
||||
./nym-vpnc diagnostic [command] [options]
|
||||
```
|
||||
|
||||
#### `run` command arguments
|
||||
|
||||
The most useful command is `run`, here are the options for that command:
|
||||
|
||||
```sh
|
||||
-h, --help Display help information and exit.
|
||||
--skip-dns Skip the DNS tests
|
||||
--skip-http Skip the HTTP tests
|
||||
--gateway <ID_KEY> Run the gateway connectivity test on the given gateway. Skip those tests if not provided
|
||||
-v, --verbose Enable verbose output for detailed logging.
|
||||
```
|
||||
|
||||
#### `register` command arguments
|
||||
|
||||
The `register` command requires a valid credential. Here are the options for that command:
|
||||
|
||||
```sh
|
||||
--gateway <ID_KEY> Register to the given gateway
|
||||
--storage-path Path to the directory containing the credentials database. If it is not valid registration will be skipped.
|
||||
--skip-wireguard Skip Wireguard tests
|
||||
```
|
||||
|
||||
### Command Examples
|
||||
|
||||
|
||||
- Run all tests on a gateway:
|
||||
```sh
|
||||
./nym-diagnostic run --gateway <ID_KEY>
|
||||
```
|
||||
|
||||
- Run the DNS tests only:
|
||||
```sh
|
||||
./nym-diagnostic run --skip-http
|
||||
```
|
||||
|
||||
- Register to a gateway:
|
||||
```sh
|
||||
sudo ./nym-diagnostic register --gateway <ID_KEY> --storage-path /var/lib/nym-vpnd/mainnet
|
||||
# sudo is required to read the database
|
||||
```
|
||||
|
||||
- You can also run DNS and HTTP tests from `nym-vpnc` (installation [here](/developers/nymvpncli)):
|
||||
```sh
|
||||
./nym-vpnc diagnostic run
|
||||
```
|
||||
|
||||
|
||||
## Tests Performed
|
||||
|
||||
The Diagnostic Tool runs the following tests:
|
||||
|
||||
|
||||
### 1. DNS Test
|
||||
|
||||
- **Purpose**: To check the availability of DNS resolution.
|
||||
- **Process**: We try to resolve all the domain names present in a given nym network environment with different DNS configurations
|
||||
- **Output**: Displays the resolved IP address and the time taken for the resolution.
|
||||
|
||||
|
||||
### 2. HTTP Test
|
||||
|
||||
- **Purpose**: To verify the accessibility of the NymVPN API.
|
||||
- **Process**: The tool queries the `health` endpoint as well as the `nodes/described` endpoint.
|
||||
- **Output**: Displays the response of the `health` endpoint, the time skew and the number of nodes in the network (sanity check)
|
||||
|
||||
### 3. Gateway Test
|
||||
|
||||
- **Purpose**: To check the connectivity to a given gateway.
|
||||
- **Process**: The tool fetches information about the gateway, then establishes a TCP connection, upgrades it to WS and sends a request
|
||||
- **Output**: Displays the information reported by the gateway, the status of the connections and the WS response.
|
||||
|
||||
### 4. Registration Test
|
||||
|
||||
- **Purpose:** To check the correctness of the registration process.
|
||||
- **Process:** The tool tries to build a mixnet client to the provided gateway and then tries to register to the entry authenticator
|
||||
- **Output:** Displays the status of the different steps
|
||||
- **Caveat:** This test requires a credential to be spent, which is why it is available as a separate command only
|
||||
|
||||
### 5. Wireguard Test
|
||||
|
||||
- **Purpose:** To check the soundness of a wireguard connection
|
||||
- **Process:** The tool uses the registration data from the previous step to establish a wireguard connection and ping an IP.
|
||||
- **Output:** Displays the ping RTTs and any error that might have happened
|
||||
|
||||
## Reports
|
||||
|
||||
Reports are logged in JSON format and are also returned by the commands for future use.
|
||||
@@ -62,12 +62,13 @@ There are multiple ways to monitor performance of nodes and the machines on whic
|
||||
|
||||
### Guides to Setup Own Metrics
|
||||
|
||||
A list of different scripts, templates and guides for easier navigation:
|
||||
A list of different tools, templates and guides for easier navigation:
|
||||
|
||||
* [`nym-gateway-probe`](performance-and-testing/gateway-probe.mdx): a useful tool used under the hood of [Node Status Observatory](https://harbourmaster.nymtech.net)
|
||||
|
||||
* [Diagnostic Tool](/developers/tools/diagnostic-tool): diagnoses connectivity issues and provides insights into network performance
|
||||
|
||||
* [`nym-gateway-probe`](performance-and-testing/gateway-probe.mdx) - a useful tool used under the hood of [Node Status Observatory](https://harbourmaster.nymtech.net)
|
||||
* [Prometheus and Grafana](performance-and-testing/prometheus-grafana.mdx) self-hosted setup
|
||||
* [Nym-node CPU cron service](https://gist.github.com/tommyv1987/97e939a7adf491333d686a8eaa68d4bd) - an easy bash script by Nym core developer [@tommy1987](https://gist.github.com/tommyv1987), designed to monitor the CPU usage of your node, running locally
|
||||
* Nym's script [`prom_targets.py`](https://github.com/nymtech/nym/blob/develop/scripts/prom_targets.py) - a useful python program to request data from API and can be run on its own or plugged to more sophisticated flows
|
||||
|
||||
### Collecting Testing Metrics
|
||||
|
||||
|
||||
@@ -202,3 +202,13 @@ PING_RETRIES=10 PING_TIMEOUT=5 CONCURRENCY=16 ./test-nodes-pings.sh
|
||||
You can look up the IPs from `ping_not_working.csv`, using some online database, like [ipinfo.io](https://ipinfo.io).
|
||||
|
||||
Feel free to share the outcome with the Nym team, mentors and the rest of the operators in our [Matrix Node Operators channel](https://matrix.to/#/#operators:nymtech.chat).
|
||||
|
||||
## Guides to Setup Own Metrics
|
||||
|
||||
A list of different tools, templates and guides for easier navigation:
|
||||
|
||||
* [`nym-gateway-probe`](performance-and-testing/gateway-probe.mdx): a useful tool used under the hood of [Node Status Observatory](https://harbourmaster.nymtech.net)
|
||||
|
||||
* [Diagnostic Tool](/developers/tools/diagnostic-tool): diagnoses connectivity issues and provides insights into network performance
|
||||
|
||||
* [Prometheus and Grafana](performance-and-testing/prometheus-grafana.mdx) self-hosted setup
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
[package]
|
||||
name = "nym-data-observatory"
|
||||
version = "1.0.1"
|
||||
version = "1.0.4"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use anyhow::{Result, anyhow};
|
||||
use sqlx::{Postgres, migrate::Migrator, postgres::PgConnectOptions};
|
||||
use std::env;
|
||||
use std::str::FromStr;
|
||||
use tracing::info;
|
||||
|
||||
@@ -19,7 +20,15 @@ pub(crate) struct Storage {
|
||||
|
||||
impl Storage {
|
||||
pub async fn init(connection_url: String) -> Result<Self> {
|
||||
let connect_options = PgConnectOptions::from_str(&connection_url)?;
|
||||
let mut connect_options = PgConnectOptions::from_str(&connection_url)?;
|
||||
|
||||
let ssl_cert_path = env::var("PG_CERT").ok();
|
||||
|
||||
if let Some(ssl_cert) = ssl_cert_path {
|
||||
connect_options = connect_options
|
||||
.ssl_mode(sqlx::postgres::PgSslMode::Require)
|
||||
.ssl_root_cert(ssl_cert);
|
||||
}
|
||||
|
||||
let pool = DbPool::connect_with(connect_options)
|
||||
.await
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::db::{
|
||||
queries::price::insert_nym_prices,
|
||||
};
|
||||
use core::str;
|
||||
use std::env;
|
||||
use tokio::time::Duration;
|
||||
|
||||
use crate::db::DbPool;
|
||||
@@ -29,10 +30,24 @@ impl PriceScraper {
|
||||
async fn get_coingecko_prices(&self) -> anyhow::Result<CoingeckoPriceResponse> {
|
||||
tracing::info!("💰 Fetching CoinGecko prices from {COINGECKO_API_URL}");
|
||||
|
||||
let response = reqwest::get(COINGECKO_API_URL)
|
||||
.await?
|
||||
.json::<CoingeckoPriceResponse>()
|
||||
.await;
|
||||
let mut url = COINGECKO_API_URL.to_string();
|
||||
|
||||
let coin_gecko_api_key = env::var("COINGECKO_API_KEY").ok();
|
||||
|
||||
if let Some(api_key) = coin_gecko_api_key {
|
||||
url = format!("{url}&x_cg_demo_api_key={api_key}")
|
||||
}
|
||||
|
||||
let response = reqwest::get(url).await?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
tracing::error!(
|
||||
"CoinGecko price query returned error: {}",
|
||||
response.status()
|
||||
);
|
||||
}
|
||||
|
||||
let response = response.json::<CoingeckoPriceResponse>().await;
|
||||
|
||||
tracing::info!("Got response {:?}", response);
|
||||
match response {
|
||||
|
||||
@@ -40,6 +40,8 @@ thiserror.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-indicatif = { workspace = true }
|
||||
tracing-subscriber.workspace = true
|
||||
opentelemetry = { workspace = true, features = ["trace"], optional = true }
|
||||
opentelemetry_sdk = { workspace = true, features = ["trace"], optional = true }
|
||||
tokio = { workspace = true, features = ["macros", "sync", "rt-multi-thread"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
tokio-stream = { workspace = true }
|
||||
@@ -135,6 +137,7 @@ rand_chacha = { workspace = true }
|
||||
|
||||
[features]
|
||||
tokio-console = ["console-subscriber", "nym-task/tokio-tracing"]
|
||||
otel = ["nym-bin-common/otel-otlp", "dep:opentelemetry", "dep:opentelemetry_sdk"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
+102
-28
@@ -8,7 +8,6 @@ use crate::cli::commands::{
|
||||
use crate::env::vars::{NYMNODE_CONFIG_ENV_FILE_ARG, NYMNODE_NO_BANNER_ARG};
|
||||
use clap::{Args, Parser, Subcommand};
|
||||
use nym_bin_common::bin_info;
|
||||
use std::future::Future;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
pub(crate) mod commands;
|
||||
@@ -22,6 +21,43 @@ fn pretty_build_info_static() -> &'static str {
|
||||
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
|
||||
}
|
||||
|
||||
/// OpenTelemetry-related CLI arguments. Only present when built with the `otel` feature.
|
||||
#[cfg(feature = "otel")]
|
||||
#[derive(Args, Debug, Clone)]
|
||||
pub(crate) struct OtelArgs {
|
||||
/// Enable OpenTelemetry tracing export via OTLP/gRPC.
|
||||
#[clap(long, env = "NYMNODE_OTEL_ENABLE")]
|
||||
pub(crate) otel: bool,
|
||||
|
||||
/// OpenTelemetry OTLP collector endpoint (gRPC).
|
||||
/// Only used when --otel is enabled.
|
||||
/// For SigNoz Cloud use https://ingest.<region>.signoz.cloud:443
|
||||
#[clap(
|
||||
long,
|
||||
env = "NYMNODE_OTEL_ENDPOINT",
|
||||
default_value = "http://localhost:4317"
|
||||
)]
|
||||
pub(crate) otel_endpoint: String,
|
||||
|
||||
/// SigNoz Cloud ingestion key for authenticated OTLP export.
|
||||
/// Only needed for SigNoz Cloud (not self-hosted).
|
||||
#[clap(long, env = "NYMNODE_OTEL_KEY")]
|
||||
pub(crate) otel_key: Option<String>,
|
||||
|
||||
/// Deployment environment label attached to all exported traces.
|
||||
/// Used to distinguish sandbox / mainnet / canary in the OTel backend.
|
||||
#[clap(long, env = "NYMNODE_OTEL_ENV", default_value = "mainnet")]
|
||||
pub(crate) otel_env: String,
|
||||
|
||||
/// Trace sampling ratio (0.0 to 1.0), e.g. 0.1 = 10% of traces exported. Reduces cost.
|
||||
#[clap(long, env = "NYMNODE_OTEL_SAMPLE_RATIO", default_value = "0.1")]
|
||||
pub(crate) otel_sample_ratio: f64,
|
||||
|
||||
/// Timeout in seconds for each OTLP export batch. Prevents unbounded blocking.
|
||||
#[clap(long, env = "NYMNODE_OTEL_EXPORT_TIMEOUT", default_value = "10")]
|
||||
pub(crate) otel_export_timeout: u64,
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
|
||||
pub(crate) struct Cli {
|
||||
@@ -40,44 +76,82 @@ pub(crate) struct Cli {
|
||||
)]
|
||||
pub(crate) no_banner: bool,
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
#[clap(flatten)]
|
||||
pub(crate) otel: OtelArgs,
|
||||
|
||||
#[clap(subcommand)]
|
||||
command: Commands,
|
||||
}
|
||||
|
||||
impl Cli {
|
||||
fn execute_async<F: Future>(fut: F) -> anyhow::Result<F::Output> {
|
||||
Ok(tokio::runtime::Builder::new_multi_thread()
|
||||
pub(crate) fn execute(self) -> anyhow::Result<()> {
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()?
|
||||
.block_on(fut))
|
||||
.build()?;
|
||||
|
||||
// Set up tracing inside the runtime so the OTel batch exporter (when enabled)
|
||||
// can spawn its background tasks on the tokio reactor.
|
||||
let use_otel = matches!(self.command, Commands::Run(..));
|
||||
let _otel_guard = runtime.block_on(async { self.setup_logging(use_otel) })?;
|
||||
|
||||
// `_otel_guard` is dropped at function exit, flushing pending spans via its Drop impl
|
||||
runtime.block_on(async {
|
||||
match self.command {
|
||||
Commands::BuildInfo(args) => build_info::execute(args)?,
|
||||
Commands::BondingInformation(args) => bonding_information::execute(args).await?,
|
||||
Commands::NodeDetails(args) => node_details::execute(args).await?,
|
||||
Commands::Run(args) => run::execute(*args).await?,
|
||||
Commands::Migrate(args) => migrate::execute(*args)?,
|
||||
Commands::Sign(args) => sign::execute(args).await?,
|
||||
Commands::TestThroughput(args) => test_throughput::execute(args)?,
|
||||
Commands::UnsafeResetSphinxKeys(args) => reset_sphinx_keys::execute(args).await?,
|
||||
Commands::Debug(debug) => match debug.command {
|
||||
DebugCommands::ResetProvidersGatewayDbs(args) => {
|
||||
debug::reset_providers_dbs::execute(args).await?
|
||||
}
|
||||
},
|
||||
}
|
||||
Ok::<(), anyhow::Error>(())
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn execute(self) -> anyhow::Result<()> {
|
||||
// NOTE: `test_throughput` sets up its own logger as it has to include additional layers
|
||||
if !matches!(self.command, Commands::TestThroughput(..)) {
|
||||
crate::logging::setup_tracing_logger()?;
|
||||
#[cfg(feature = "otel")]
|
||||
fn build_otel_config(&self) -> Option<crate::logging::OtelConfig> {
|
||||
if self.otel.otel {
|
||||
Some(crate::logging::OtelConfig {
|
||||
endpoint: self.otel.otel_endpoint.clone(),
|
||||
service_name: "nym-node".to_string(),
|
||||
ingestion_key: self.otel.otel_key.clone(),
|
||||
environment: self.otel.otel_env.clone(),
|
||||
sample_ratio: self.otel.otel_sample_ratio,
|
||||
export_timeout_secs: self.otel.otel_export_timeout,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
match self.command {
|
||||
Commands::BuildInfo(args) => build_info::execute(args)?,
|
||||
Commands::BondingInformation(args) => {
|
||||
{ Self::execute_async(bonding_information::execute(args))? }?
|
||||
}
|
||||
Commands::NodeDetails(args) => { Self::execute_async(node_details::execute(args))? }?,
|
||||
Commands::Run(args) => { Self::execute_async(run::execute(*args))? }?,
|
||||
Commands::Migrate(args) => migrate::execute(*args)?,
|
||||
Commands::Sign(args) => { Self::execute_async(sign::execute(args))? }?,
|
||||
Commands::TestThroughput(args) => test_throughput::execute(args)?,
|
||||
Commands::UnsafeResetSphinxKeys(args) => {
|
||||
{ Self::execute_async(reset_sphinx_keys::execute(args))? }?
|
||||
}
|
||||
Commands::Debug(debug) => match debug.command {
|
||||
DebugCommands::ResetProvidersGatewayDbs(args) => {
|
||||
{ Self::execute_async(debug::reset_providers_dbs::execute(args))? }?
|
||||
}
|
||||
},
|
||||
#[cfg(feature = "otel")]
|
||||
fn setup_logging(&self, use_otel: bool) -> anyhow::Result<Option<crate::logging::OtelGuard>> {
|
||||
if matches!(self.command, Commands::TestThroughput(..)) {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(())
|
||||
let otel_config = if use_otel {
|
||||
self.build_otel_config()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
crate::logging::setup_tracing_logger(otel_config)
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "otel"))]
|
||||
fn setup_logging(&self, _use_otel: bool) -> anyhow::Result<Option<()>> {
|
||||
if matches!(self.command, Commands::TestThroughput(..)) {
|
||||
return Ok(None);
|
||||
}
|
||||
crate::logging::setup_tracing_logger()?;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+111
-1
@@ -7,6 +7,42 @@ use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
use tracing_subscriber::{EnvFilter, Layer};
|
||||
|
||||
/// Configuration for OpenTelemetry OTLP export.
|
||||
#[cfg(feature = "otel")]
|
||||
pub(crate) struct OtelConfig {
|
||||
/// OTLP/gRPC collector endpoint, e.g. `http://localhost:4317`
|
||||
/// or `https://ingest.eu.signoz.cloud:443` for SigNoz Cloud.
|
||||
pub endpoint: String,
|
||||
/// Service name reported to the collector (appears in SigNoz "Services" view).
|
||||
pub service_name: String,
|
||||
/// Optional SigNoz Cloud ingestion key for authenticated export.
|
||||
/// Sent as the `signoz-ingestion-key` gRPC metadata header.
|
||||
pub ingestion_key: Option<String>,
|
||||
/// Deployment environment label, e.g. `mainnet`, `sandbox`, `canary`.
|
||||
/// Attached as the `deployment.environment` OTel resource attribute.
|
||||
pub environment: String,
|
||||
/// Trace sampling ratio in 0.0..=1.0 (e.g. 0.1 = 10% of traces). Used to limit cost.
|
||||
pub sample_ratio: f64,
|
||||
/// Timeout in seconds for each OTLP export batch. Prevents unbounded blocking.
|
||||
pub export_timeout_secs: u64,
|
||||
}
|
||||
|
||||
/// Handle returned when OTel is active. Flushes pending spans on drop
|
||||
/// to prevent telemetry loss during panics or early exits.
|
||||
#[cfg(feature = "otel")]
|
||||
pub(crate) struct OtelGuard {
|
||||
pub provider: opentelemetry_sdk::trace::SdkTracerProvider,
|
||||
}
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
impl Drop for OtelGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = self.provider.shutdown() {
|
||||
eprintln!("OpenTelemetry shutdown error in Drop: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
|
||||
fn directive_checked(directive: impl Into<String>) -> anyhow::Result<Directive> {
|
||||
directive.into().parse().map_err(From::from)
|
||||
@@ -22,12 +58,86 @@ pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
|
||||
Ok(filter)
|
||||
}
|
||||
|
||||
/// Initialise the tracing subscriber stack.
|
||||
///
|
||||
/// When the `otel` feature is enabled **and** an `OtelConfig` is supplied, an
|
||||
/// OTLP exporter layer is added and the returned `OtelGuard` must be used to
|
||||
/// flush pending spans on shutdown.
|
||||
#[cfg(feature = "otel")]
|
||||
pub(crate) fn setup_tracing_logger(otel: Option<OtelConfig>) -> anyhow::Result<Option<OtelGuard>> {
|
||||
let stderr_layer =
|
||||
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
|
||||
|
||||
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
|
||||
let console_layer = console_subscriber::spawn();
|
||||
|
||||
if let Some(otel_config) = otel {
|
||||
let (otel_layer, provider) = nym_bin_common::logging::init_otel_layer(
|
||||
&otel_config.service_name,
|
||||
&otel_config.endpoint,
|
||||
otel_config.ingestion_key.as_deref(),
|
||||
&otel_config.environment,
|
||||
otel_config.sample_ratio,
|
||||
otel_config.export_timeout_secs,
|
||||
).map_err(|e| anyhow::anyhow!(
|
||||
"failed to initialise OpenTelemetry exporter (endpoint: {}, service: {}): {e}",
|
||||
otel_config.endpoint,
|
||||
otel_config.service_name,
|
||||
))?;
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(console_layer)
|
||||
.with(stderr_layer)
|
||||
.with(otel_layer)
|
||||
.init();
|
||||
|
||||
Ok(Some(OtelGuard { provider }))
|
||||
} else {
|
||||
tracing_subscriber::registry()
|
||||
.with(console_layer)
|
||||
.with(stderr_layer)
|
||||
.init();
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
} else {
|
||||
if let Some(otel_config) = otel {
|
||||
let (otel_layer, provider) = nym_bin_common::logging::init_otel_layer(
|
||||
&otel_config.service_name,
|
||||
&otel_config.endpoint,
|
||||
otel_config.ingestion_key.as_deref(),
|
||||
&otel_config.environment,
|
||||
otel_config.sample_ratio,
|
||||
otel_config.export_timeout_secs,
|
||||
).map_err(|e| anyhow::anyhow!(
|
||||
"failed to initialise OpenTelemetry exporter (endpoint: {}, service: {}): {e}",
|
||||
otel_config.endpoint,
|
||||
otel_config.service_name,
|
||||
))?;
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(stderr_layer)
|
||||
.with(otel_layer)
|
||||
.init();
|
||||
|
||||
Ok(Some(OtelGuard { provider }))
|
||||
} else {
|
||||
tracing_subscriber::registry()
|
||||
.with(stderr_layer)
|
||||
.init();
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}}
|
||||
}
|
||||
|
||||
/// Non-OTel variant -- identical subscriber stack without the OTLP layer.
|
||||
#[cfg(not(feature = "otel"))]
|
||||
pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
|
||||
let stderr_layer =
|
||||
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
|
||||
|
||||
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
|
||||
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
|
||||
let console_layer = console_subscriber::spawn();
|
||||
|
||||
tracing_subscriber::registry()
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::node::key_rotation::active_keys::SphinxKeyGuard;
|
||||
use crate::node::mixnet::shared::SharedData;
|
||||
use futures::StreamExt;
|
||||
use nym_noise::connection::Connection;
|
||||
@@ -20,7 +21,10 @@ use std::net::SocketAddr;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::codec::Framed;
|
||||
use tracing::{debug, error, instrument, trace, warn};
|
||||
use tracing::{Span, debug, error, instrument, trace, warn};
|
||||
|
||||
/// How often (in packets) the stream-level span updates its packet count.
|
||||
const SPAN_UPDATE_INTERVAL: u64 = 10_000;
|
||||
|
||||
struct PendingReplayCheckPackets {
|
||||
// map of rotation id used for packet creation to the packets
|
||||
@@ -51,6 +55,10 @@ impl PendingReplayCheckPackets {
|
||||
.push(packet.packet)
|
||||
}
|
||||
|
||||
fn total_count(&self) -> usize {
|
||||
self.packets.values().map(|v| v.len()).sum()
|
||||
}
|
||||
|
||||
fn replay_tags(&self) -> HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>> {
|
||||
let mut replay_tags = HashMap::with_capacity(self.packets.len());
|
||||
'outer: for (rotation_id, packets) in &self.packets {
|
||||
@@ -130,20 +138,54 @@ impl ConnectionHandler {
|
||||
Some(now + delay)
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.forward_packet",
|
||||
skip(self, mix_packet, delay),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote_addr = %self.remote_address,
|
||||
delay_ms = tracing::field::Empty,
|
||||
)
|
||||
)]
|
||||
fn handle_forward_packet(&self, now: Instant, mix_packet: MixPacket, delay: Option<Delay>) {
|
||||
if !self.shared.processing_config.forward_hop_processing_enabled {
|
||||
trace!("this nym-node does not support forward hop packets");
|
||||
warn!(
|
||||
event = "packet.dropped.forward_disabled",
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping packet: forward hop processing disabled"
|
||||
);
|
||||
self.shared.dropped_forward_packet(self.remote_address.ip());
|
||||
return;
|
||||
}
|
||||
|
||||
let forward_instant = self.create_delay_target(now, delay);
|
||||
if let Some(target) = forward_instant {
|
||||
Span::current().record(
|
||||
"delay_ms",
|
||||
target.saturating_duration_since(now).as_millis() as u64,
|
||||
);
|
||||
}
|
||||
self.shared.forward_mix_packet(mix_packet, forward_instant);
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.final_hop",
|
||||
skip(self, final_hop_data),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote_addr = %self.remote_address,
|
||||
client_online,
|
||||
disk_fallback = false,
|
||||
ack_forwarded = false,
|
||||
)
|
||||
)]
|
||||
async fn handle_final_hop(&self, final_hop_data: ProcessedFinalHop) {
|
||||
if !self.shared.processing_config.final_hop_processing_enabled {
|
||||
trace!("this nym-node does not support final hop packets");
|
||||
warn!(
|
||||
event = "packet.dropped.final_hop_disabled",
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping packet: final hop processing disabled"
|
||||
);
|
||||
self.shared
|
||||
.dropped_final_hop_packet(self.remote_address.ip());
|
||||
return;
|
||||
@@ -151,11 +193,13 @@ impl ConnectionHandler {
|
||||
|
||||
let client = final_hop_data.destination;
|
||||
let message = final_hop_data.message;
|
||||
let has_ack = final_hop_data.forward_ack.is_some();
|
||||
|
||||
// if possible attempt to push message directly to the client
|
||||
match self.shared.try_push_message_to_client(client, message) {
|
||||
Err(unsent_plaintext) => {
|
||||
// if that failed, store it on disk (to be 🔥 soon...)
|
||||
// if that failed, store it on disk
|
||||
Span::current().record("client_online", false);
|
||||
match self
|
||||
.shared
|
||||
.store_processed_packet_payload(client, unsent_plaintext)
|
||||
@@ -163,6 +207,7 @@ impl ConnectionHandler {
|
||||
{
|
||||
Err(err) => error!("Failed to store client data - {err}"),
|
||||
Ok(_) => {
|
||||
Span::current().record("disk_fallback", true);
|
||||
self.shared
|
||||
.metrics
|
||||
.mixnet
|
||||
@@ -172,13 +217,18 @@ impl ConnectionHandler {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => trace!("Pushed received packet to {client}"),
|
||||
Ok(_) => {
|
||||
Span::current().record("client_online", true);
|
||||
trace!("Pushed received packet to {client}");
|
||||
}
|
||||
}
|
||||
|
||||
// if we managed to either push message directly to the [online] client or store it at
|
||||
// its inbox, it means that it must exist at this gateway, hence we can send the
|
||||
// received ack back into the network
|
||||
// disk, forward the ack
|
||||
self.shared.forward_ack_packet(final_hop_data.forward_ack);
|
||||
if has_ack {
|
||||
Span::current().record("ack_forwarded", true);
|
||||
}
|
||||
}
|
||||
|
||||
fn within_deferral_threshold(&self, now: Instant) -> bool {
|
||||
@@ -206,32 +256,86 @@ impl ConnectionHandler {
|
||||
|
||||
if !time_threshold {
|
||||
warn!(
|
||||
"{}: time failure - {}",
|
||||
event = "replay_detection.deferral_exceeded",
|
||||
threshold_type = "time",
|
||||
deferred_count = self.pending_packets.total_count(),
|
||||
deferral_ms = now.saturating_duration_since(self.pending_packets.last_acquired_mutex).as_millis() as u64,
|
||||
remote_addr = %self.remote_address,
|
||||
"{}: time deferral threshold exceeded with {} pending packets",
|
||||
self.remote_address,
|
||||
self.pending_packets.packets.len()
|
||||
self.pending_packets.total_count()
|
||||
)
|
||||
}
|
||||
|
||||
if !count_threshold {
|
||||
warn!("{}, count failure", self.remote_address)
|
||||
warn!(
|
||||
event = "replay_detection.deferral_exceeded",
|
||||
threshold_type = "count",
|
||||
deferred_count = self.pending_packets.total_count(),
|
||||
remote_addr = %self.remote_address,
|
||||
"{}: count deferral threshold exceeded",
|
||||
self.remote_address
|
||||
)
|
||||
}
|
||||
|
||||
time_threshold && count_threshold
|
||||
}
|
||||
|
||||
/// Resolve the sphinx key for the given rotation, recording the rotation
|
||||
/// label on the current tracing span. Returns `ExpiredKey` if the requested
|
||||
/// odd/even key has already been rotated out.
|
||||
fn resolve_rotation_key(
|
||||
&self,
|
||||
rotation: SphinxKeyRotation,
|
||||
) -> Result<SphinxKeyGuard, PacketProcessingError> {
|
||||
let rotation_label = match rotation {
|
||||
SphinxKeyRotation::Unknown => "unknown",
|
||||
SphinxKeyRotation::OddRotation => "odd",
|
||||
SphinxKeyRotation::EvenRotation => "even",
|
||||
};
|
||||
Span::current().record("key_rotation", rotation_label);
|
||||
|
||||
match rotation {
|
||||
SphinxKeyRotation::Unknown => Ok(self.shared.sphinx_keys.primary()),
|
||||
SphinxKeyRotation::OddRotation => self.shared.sphinx_keys.odd().ok_or_else(|| {
|
||||
warn!(
|
||||
event = "packet.dropped.expired_key",
|
||||
key_rotation = "odd",
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping packet: odd key rotation expired"
|
||||
);
|
||||
PacketProcessingError::ExpiredKey
|
||||
}),
|
||||
SphinxKeyRotation::EvenRotation => self.shared.sphinx_keys.even().ok_or_else(|| {
|
||||
warn!(
|
||||
event = "packet.dropped.expired_key",
|
||||
key_rotation = "even",
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping packet: even key rotation expired"
|
||||
);
|
||||
PacketProcessingError::ExpiredKey
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.sphinx_partial_unwrap",
|
||||
skip(self, packet),
|
||||
level = "debug",
|
||||
fields(key_rotation, unwrap_result,)
|
||||
)]
|
||||
fn try_partially_unwrap_packet(
|
||||
&self,
|
||||
packet: FramedNymPacket,
|
||||
) -> Result<PartialyUnwrappedPacketWithKeyRotation, PacketProcessingError> {
|
||||
// based on the received sphinx key rotation information,
|
||||
// attempt to choose appropriate key for processing the packet
|
||||
match packet.header().key_rotation {
|
||||
let rotation = packet.header().key_rotation;
|
||||
|
||||
let result = match rotation {
|
||||
SphinxKeyRotation::Unknown => {
|
||||
let primary = self.shared.sphinx_keys.primary();
|
||||
// Unknown rotation: try primary, fallback to secondary
|
||||
let primary = self.resolve_rotation_key(rotation)?;
|
||||
let primary_rotation = primary.rotation_id();
|
||||
|
||||
// we have to try both keys, start with the primary as it has higher likelihood of being correct
|
||||
// if let Ok(partially_unwrapped) = PartiallyUnwrappedPacket::new()
|
||||
match PartiallyUnwrappedPacket::new(packet, primary.inner().as_ref()) {
|
||||
Ok(unwrapped_packet) => {
|
||||
Ok(unwrapped_packet.with_key_rotation(primary_rotation))
|
||||
@@ -248,25 +352,17 @@ impl ConnectionHandler {
|
||||
}
|
||||
}
|
||||
}
|
||||
SphinxKeyRotation::OddRotation => {
|
||||
let Some(odd_key) = self.shared.sphinx_keys.odd() else {
|
||||
return Err(PacketProcessingError::ExpiredKey);
|
||||
};
|
||||
let odd_rotation = odd_key.rotation_id();
|
||||
PartiallyUnwrappedPacket::new(packet, odd_key.inner().as_ref())
|
||||
_ => {
|
||||
let key = self.resolve_rotation_key(rotation)?;
|
||||
let rotation_id = key.rotation_id();
|
||||
PartiallyUnwrappedPacket::new(packet, key.inner().as_ref())
|
||||
.map_err(|(_, err)| err)
|
||||
.map(|p| p.with_key_rotation(odd_rotation))
|
||||
.map(|p| p.with_key_rotation(rotation_id))
|
||||
}
|
||||
SphinxKeyRotation::EvenRotation => {
|
||||
let Some(even_key) = self.shared.sphinx_keys.even() else {
|
||||
return Err(PacketProcessingError::ExpiredKey);
|
||||
};
|
||||
let even_rotation = even_key.rotation_id();
|
||||
PartiallyUnwrappedPacket::new(packet, even_key.inner().as_ref())
|
||||
.map_err(|(_, err)| err)
|
||||
.map(|p| p.with_key_rotation(even_rotation))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Span::current().record("unwrap_result", if result.is_ok() { "ok" } else { "err" });
|
||||
result
|
||||
}
|
||||
|
||||
async fn handle_received_packet_with_replay_detection(
|
||||
@@ -280,6 +376,12 @@ impl ConnectionHandler {
|
||||
Ok(unwrapped) => unwrapped,
|
||||
Err(err) => {
|
||||
trace!("failed to process received mix packet: {err}");
|
||||
warn!(
|
||||
event = "packet.dropped.malformed",
|
||||
error = %err,
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping malformed packet"
|
||||
);
|
||||
self.shared
|
||||
.metrics
|
||||
.mixnet
|
||||
@@ -316,7 +418,9 @@ impl ConnectionHandler {
|
||||
|
||||
// 3. forward the packet to the relevant sink (if enabled)
|
||||
match unwrapped_packet {
|
||||
Err(err) => trace!("failed to process received mix packet: {err}"),
|
||||
Err(err) => {
|
||||
trace!("failed to process received mix packet: {err}");
|
||||
}
|
||||
Ok(processed_packet) => match processed_packet.processing_data {
|
||||
MixProcessingResultData::ForwardHop { packet, delay } => {
|
||||
self.handle_forward_packet(now, packet, delay);
|
||||
@@ -334,6 +438,7 @@ impl ConnectionHandler {
|
||||
packets: HashMap<u32, Vec<PartiallyUnwrappedPacket>>,
|
||||
replay_check_results: HashMap<u32, Vec<bool>>,
|
||||
) {
|
||||
let mut replays_detected: u64 = 0;
|
||||
for (rotation_id, packets) in packets {
|
||||
let Some(replay_checks) = replay_check_results.get(&rotation_id) else {
|
||||
// this should never happen, but if we messed up, and it does, don't panic, just drop the packets
|
||||
@@ -342,6 +447,13 @@ impl ConnectionHandler {
|
||||
};
|
||||
for (packet, &replayed) in packets.into_iter().zip(replay_checks) {
|
||||
let unwrapped_packet = if replayed {
|
||||
replays_detected += 1;
|
||||
warn!(
|
||||
event = "packet.dropped.replay",
|
||||
remote_addr = %self.remote_address,
|
||||
rotation_id,
|
||||
"dropping replayed packet"
|
||||
);
|
||||
Err(PacketProcessingError::PacketReplay)
|
||||
} else {
|
||||
packet.finalise_unwrapping()
|
||||
@@ -350,6 +462,13 @@ impl ConnectionHandler {
|
||||
self.handle_unwrapped_packet(now, unwrapped_packet).await;
|
||||
}
|
||||
}
|
||||
if replays_detected > 0 {
|
||||
debug!(
|
||||
replays_detected,
|
||||
remote_addr = %self.remote_address,
|
||||
"replay detection batch completed with replays"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_pending_packets_batch_no_locking(&mut self, now: Instant) -> bool {
|
||||
@@ -379,13 +498,22 @@ impl ConnectionHandler {
|
||||
true
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.replay_check_batch",
|
||||
skip(self),
|
||||
level = "debug",
|
||||
fields(batch_size, mutex_wait_ms,)
|
||||
)]
|
||||
async fn handle_pending_packets_batch(&mut self, now: Instant) {
|
||||
let batch = self.pending_packets.reset(now);
|
||||
let replay_tags = self.pending_packets.replay_tags();
|
||||
if replay_tags.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let batch_size = self.pending_packets.total_count();
|
||||
Span::current().record("batch_size", batch_size as u64);
|
||||
|
||||
let mutex_start = Instant::now();
|
||||
let Ok(replay_check_results) = self
|
||||
.shared
|
||||
.replay_protection_filter
|
||||
@@ -396,37 +524,25 @@ impl ConnectionHandler {
|
||||
self.shared.shutdown_token.cancel();
|
||||
return;
|
||||
};
|
||||
Span::current().record("mutex_wait_ms", mutex_start.elapsed().as_millis() as u64);
|
||||
|
||||
let batch = self.pending_packets.reset(now);
|
||||
self.handle_post_replay_detection_packets(now, batch, replay_check_results)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.sphinx_full_unwrap",
|
||||
skip(self, packet),
|
||||
level = "debug",
|
||||
fields(key_rotation)
|
||||
)]
|
||||
fn try_full_unwrap_packet(
|
||||
&self,
|
||||
packet: FramedNymPacket,
|
||||
) -> Result<MixProcessingResult, PacketProcessingError> {
|
||||
// based on the received sphinx key rotation information,
|
||||
// attempt to choose appropriate key for processing the packet
|
||||
// NOTE: due to the function signatures, outfox packets will **only** attempt primary key
|
||||
// if no rotation information is available (but that's fine given outfox is not really in use,
|
||||
// and by the time we need it, the rotation info should be present)
|
||||
match packet.header().key_rotation {
|
||||
SphinxKeyRotation::Unknown => {
|
||||
process_framed_packet(packet, self.shared.sphinx_keys.primary().inner().as_ref())
|
||||
}
|
||||
SphinxKeyRotation::OddRotation => {
|
||||
let Some(odd_key) = self.shared.sphinx_keys.odd() else {
|
||||
return Err(PacketProcessingError::ExpiredKey);
|
||||
};
|
||||
process_framed_packet(packet, odd_key.inner().as_ref())
|
||||
}
|
||||
SphinxKeyRotation::EvenRotation => {
|
||||
let Some(even_key) = self.shared.sphinx_keys.even() else {
|
||||
return Err(PacketProcessingError::ExpiredKey);
|
||||
};
|
||||
process_framed_packet(packet, even_key.inner().as_ref())
|
||||
}
|
||||
}
|
||||
let key = self.resolve_rotation_key(packet.header().key_rotation)?;
|
||||
process_framed_packet(packet, key.inner().as_ref())
|
||||
}
|
||||
|
||||
async fn handle_received_packet_with_no_replay_detection(
|
||||
@@ -456,23 +572,36 @@ impl ConnectionHandler {
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
skip(self),
|
||||
name = "mixnode.connection",
|
||||
skip(self, socket),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote = %self.remote_address
|
||||
remote = %self.remote_address,
|
||||
noise_handshake_ms = tracing::field::Empty,
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn handle_connection(&mut self, socket: TcpStream) {
|
||||
let handshake_start = Instant::now();
|
||||
let noise_stream = match upgrade_noise_responder(socket, &self.shared.noise_config).await {
|
||||
Ok(noise_stream) => noise_stream,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to perform Noise handshake with {:?} - {err}",
|
||||
self.remote_address
|
||||
Span::current().record(
|
||||
"noise_handshake_ms",
|
||||
handshake_start.elapsed().as_millis() as u64,
|
||||
);
|
||||
warn!(
|
||||
event = "connection.failed.noise",
|
||||
remote_addr = %self.remote_address,
|
||||
error = %err,
|
||||
"Noise responder handshake failed"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
Span::current().record(
|
||||
"noise_handshake_ms",
|
||||
handshake_start.elapsed().as_millis() as u64,
|
||||
);
|
||||
debug!(
|
||||
"Noise responder handshake completed for {:?}",
|
||||
self.remote_address
|
||||
@@ -481,26 +610,58 @@ impl ConnectionHandler {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.stream",
|
||||
skip(self, mixnet_connection),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote = %self.remote_address,
|
||||
packets_processed = 0u64,
|
||||
exit_reason,
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn handle_stream(
|
||||
&mut self,
|
||||
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
|
||||
) {
|
||||
let mut packets_processed: u64 = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.shared.shutdown_token.cancelled() => {
|
||||
trace!("connection handler: received shutdown");
|
||||
Span::current().record("exit_reason", "shutdown");
|
||||
break
|
||||
}
|
||||
maybe_framed_nym_packet = mixnet_connection.next() => {
|
||||
match maybe_framed_nym_packet {
|
||||
Some(Ok(packet)) => self.handle_received_nym_packet(packet).await,
|
||||
Some(Ok(packet)) => {
|
||||
self.handle_received_nym_packet(packet).await;
|
||||
packets_processed += 1;
|
||||
if packets_processed.is_multiple_of(SPAN_UPDATE_INTERVAL) {
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
}
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
debug!("connection got corrupted with: {err}");
|
||||
warn!(
|
||||
event = "connection.corrupted",
|
||||
remote_addr = %self.remote_address,
|
||||
error = %err,
|
||||
packets_processed,
|
||||
"connection stream corrupted"
|
||||
);
|
||||
Span::current().record("exit_reason", "corrupted");
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
return
|
||||
}
|
||||
None => {
|
||||
debug!("connection got closed by the remote");
|
||||
debug!(
|
||||
remote_addr = %self.remote_address,
|
||||
packets_processed,
|
||||
"connection closed by remote"
|
||||
);
|
||||
Span::current().record("exit_reason", "closed_by_remote");
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -508,6 +669,7 @@ impl ConnectionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
debug!("exiting and closing connection");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,11 +56,17 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
if let Err(err) = self.mixnet_client.send_without_response(packet) {
|
||||
if err.kind() == io::ErrorKind::WouldBlock {
|
||||
// we only know for sure if we dropped a packet if our sending queue was full
|
||||
// in any other case the connection might still be re-established (or created for the first time)
|
||||
// and the packet might get sent, but we won't know about it
|
||||
warn!(
|
||||
event = "packet.dropped.buffer_full",
|
||||
next_hop = %next_hop,
|
||||
"dropping packet: egress connection buffer full (WouldBlock)"
|
||||
);
|
||||
self.metrics.mixnet.egress_dropped_forward_packet(next_hop)
|
||||
} else if err.kind() == io::ErrorKind::NotConnected {
|
||||
// let's give the benefit of the doubt and assume we manage to establish connection
|
||||
debug!(
|
||||
next_hop = %next_hop,
|
||||
"packet queued for not-yet-connected peer"
|
||||
);
|
||||
self.metrics.mixnet.egress_sent_forward_packet(next_hop)
|
||||
}
|
||||
} else {
|
||||
@@ -86,7 +92,11 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
let next_hop = new_packet.packet.next_hop();
|
||||
|
||||
if !self.routing_filter.should_route(next_hop.as_ref().ip()) {
|
||||
debug!("dropping packet as the egress address does not belong to any known node");
|
||||
warn!(
|
||||
event = "packet.dropped.routing_filter",
|
||||
next_hop = %next_hop,
|
||||
"dropping packet: egress address does not belong to any known node"
|
||||
);
|
||||
self.metrics
|
||||
.mixnet
|
||||
.egress_dropped_forward_packet(next_hop.into());
|
||||
@@ -125,7 +135,7 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
C: SendWithoutResponse,
|
||||
F: RoutingFilter,
|
||||
{
|
||||
let mut processed = 0;
|
||||
let mut processed: u64 = 0;
|
||||
trace!("starting PacketForwarder");
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -145,11 +155,29 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
#[allow(clippy::unwrap_used)]
|
||||
self.handle_new_packet(new_packet.unwrap());
|
||||
let channel_len = self.packet_sender.len();
|
||||
if processed % 1000 == 0 {
|
||||
let delay_queue_len = self.delay_queue.len();
|
||||
if processed.is_multiple_of(1000) {
|
||||
match channel_len {
|
||||
n if n > 1000 => error!("there are currently {n} mix packets waiting to get forwarded - the node seems to be significantly overloaded!"),
|
||||
n if n > 500 => warn!("there are currently {n} mix packets waiting to get forwarded - is the node overloaded?"),
|
||||
n => trace!("there are currently {n} mix packets waiting to get forwarded"),
|
||||
n if n > 1000 => error!(
|
||||
event = "forwarder.queue_overload",
|
||||
channel_depth = n,
|
||||
delay_queue_depth = delay_queue_len,
|
||||
packets_processed = processed,
|
||||
"there are currently {n} mix packets waiting to get forwarded - the node seems to be significantly overloaded!"
|
||||
),
|
||||
n if n > 500 => warn!(
|
||||
event = "forwarder.queue_high",
|
||||
channel_depth = n,
|
||||
delay_queue_depth = delay_queue_len,
|
||||
packets_processed = processed,
|
||||
"there are currently {n} mix packets waiting to get forwarded - is the node overloaded?"
|
||||
),
|
||||
n => trace!(
|
||||
channel_depth = n,
|
||||
delay_queue_depth = delay_queue_len,
|
||||
packets_processed = processed,
|
||||
"forwarder queue status"
|
||||
),
|
||||
}
|
||||
}
|
||||
self.update_channel_size_metric(channel_len);
|
||||
|
||||
@@ -5,7 +5,8 @@ use nym_gateway::node::{
|
||||
ActiveClientsStore, GatewayStorage, GatewayStorageError, InboxGatewayStorage,
|
||||
};
|
||||
use nym_sphinx_types::DestinationAddressBytes;
|
||||
use tracing::debug;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct SharedFinalHopData {
|
||||
@@ -27,14 +28,37 @@ impl SharedFinalHopData {
|
||||
message: Vec<u8>,
|
||||
) -> Result<(), Vec<u8>> {
|
||||
match self.active_clients.get_sender(client_address) {
|
||||
None => Err(message),
|
||||
None => {
|
||||
debug!(
|
||||
event = "gateway.push_to_client",
|
||||
client_found = false,
|
||||
send_result = "client_not_found",
|
||||
"client {client_address} not found in active clients"
|
||||
);
|
||||
Err(message)
|
||||
}
|
||||
Some(sender_channel) => {
|
||||
let send_start = Instant::now();
|
||||
if let Err(unsent) = sender_channel.unbounded_send(vec![message]) {
|
||||
warn!(
|
||||
event = "gateway.push_to_client",
|
||||
client_found = true,
|
||||
send_result = "channel_closed",
|
||||
send_us = send_start.elapsed().as_micros() as u64,
|
||||
"client {client_address} channel closed, message not delivered"
|
||||
);
|
||||
// the unwrap here is fine as the original message got returned;
|
||||
// plus we're only ever sending 1 message at the time (for now)
|
||||
#[allow(clippy::unwrap_used)]
|
||||
Err(unsent.into_inner().pop().unwrap())
|
||||
} else {
|
||||
debug!(
|
||||
event = "gateway.push_to_client",
|
||||
client_found = true,
|
||||
send_result = "ok",
|
||||
send_us = send_start.elapsed().as_micros() as u64,
|
||||
"pushed message to client {client_address}"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -46,8 +70,21 @@ impl SharedFinalHopData {
|
||||
client_address: DestinationAddressBytes,
|
||||
message: Vec<u8>,
|
||||
) -> Result<(), GatewayStorageError> {
|
||||
let start = Instant::now();
|
||||
debug!("Storing received message for {client_address} on the disk...",);
|
||||
|
||||
self.storage.store_message(client_address, message).await
|
||||
let result = self.storage.store_message(client_address, message).await;
|
||||
let store_us = start.elapsed().as_micros() as u64;
|
||||
if result.is_ok() {
|
||||
debug!(
|
||||
event = "gateway.disk_store",
|
||||
store_us, "stored message for {client_address} on disk in {store_us}us"
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
event = "gateway.disk_store_failed",
|
||||
store_us, "failed to store message for {client_address} on disk after {store_us}us"
|
||||
);
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -185,6 +185,7 @@ impl SharedData {
|
||||
}
|
||||
|
||||
pub(super) fn forward_mix_packet(&self, packet: MixPacket, delay_until: Option<Instant>) {
|
||||
let has_delay = delay_until.is_some();
|
||||
if self
|
||||
.mixnet_forwarder
|
||||
.forward_packet(PacketToForward::new(packet, delay_until))
|
||||
@@ -192,6 +193,8 @@ impl SharedData {
|
||||
&& !self.shutdown_token.is_cancelled()
|
||||
{
|
||||
error!(
|
||||
event = "forwarder.channel_send_failed",
|
||||
has_delay,
|
||||
"failed to forward sphinx packet on the channel while the process is not going through the shutdown!"
|
||||
);
|
||||
self.shutdown_token.cancel();
|
||||
|
||||
Reference in New Issue
Block a user