Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6131082cee | |||
| 8bd6a1006b | |||
| 1678fb6b94 | |||
| 94a3599b4d | |||
| a6bc54461a | |||
| 4f0c40dab7 | |||
| 3eff6e5e3b | |||
| a519f4ccb8 | |||
| a3ba3bfc5a | |||
| 988df7cff7 | |||
| 260f8e9714 | |||
| d28d0ac39e | |||
| dce4d6b34b | |||
| bc47e9a1b2 | |||
| 3b693741b2 | |||
| cb277fe487 | |||
| 8bb29f4d07 | |||
| e753f24ed1 | |||
| c7cd962627 | |||
| 00467e4440 | |||
| f3d1000472 | |||
| 597aae1a20 | |||
| 40a3cd28b7 |
@@ -3,4 +3,5 @@
|
||||
.gitignore
|
||||
**/node_modules
|
||||
**/target
|
||||
target-otel
|
||||
dist
|
||||
|
||||
@@ -76,3 +76,4 @@ CLAUDE.md
|
||||
.claude/settings.json
|
||||
|
||||
/notes
|
||||
/target-otel
|
||||
|
||||
Generated
+1627
-1626
File diff suppressed because it is too large
Load Diff
+5
-4
@@ -309,8 +309,10 @@ nix = "0.30.1"
|
||||
notify = "5.1.0"
|
||||
num_enum = "0.7.5"
|
||||
once_cell = "1.21.3"
|
||||
opentelemetry = "0.19.0"
|
||||
opentelemetry-jaeger = "0.18.0"
|
||||
opentelemetry = "0.31.0"
|
||||
opentelemetry_sdk = "0.31.0"
|
||||
opentelemetry-otlp = "0.31.0"
|
||||
tonic = "0.14.4"
|
||||
parking_lot = "0.12.3"
|
||||
pem = "0.8"
|
||||
petgraph = "0.6.5"
|
||||
@@ -368,9 +370,8 @@ tower = "0.5.2"
|
||||
tower-http = "0.6.6"
|
||||
tracing = "0.1.41"
|
||||
tracing-log = "0.2"
|
||||
tracing-opentelemetry = "0.19.0"
|
||||
tracing-opentelemetry = "0.32.1"
|
||||
tracing-subscriber = "0.3.20"
|
||||
tracing-tree = "0.2.2"
|
||||
tracing-indicatif = "0.3.9"
|
||||
tracing-test = "0.2.5"
|
||||
ts-rs = "10.1.0"
|
||||
|
||||
@@ -19,12 +19,15 @@ serde_json = { workspace = true, optional = true }
|
||||
|
||||
## tracing
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter"], optional = true }
|
||||
tracing-tree = { workspace = true, optional = true }
|
||||
tracing = { workspace = true, optional = true }
|
||||
opentelemetry-jaeger = { workspace = true, features = ["rt-tokio", "collector_client", "isahc_collector_client"], optional = true }
|
||||
tracing-opentelemetry = { workspace = true, optional = true }
|
||||
utoipa = { workspace = true, optional = true }
|
||||
opentelemetry = { workspace = true, features = ["rt-tokio"], optional = true }
|
||||
opentelemetry = { workspace = true, features = ["trace"], optional = true }
|
||||
|
||||
## otel-otlp (modern OTLP export to SigNoz/any OTLP collector)
|
||||
opentelemetry_sdk = { workspace = true, features = ["trace"], optional = true }
|
||||
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "trace", "tls-roots"], optional = true }
|
||||
tonic = { workspace = true, optional = true }
|
||||
|
||||
|
||||
[build-dependencies]
|
||||
@@ -35,13 +38,14 @@ default = []
|
||||
openapi = ["utoipa"]
|
||||
output_format = ["serde_json", "dep:clap"]
|
||||
bin_info_schema = ["schemars"]
|
||||
basic_tracing = ["dep:tracing", "tracing-subscriber"]
|
||||
tracing = [
|
||||
basic_tracing = ["dep:tracing", "dep:tracing-subscriber"]
|
||||
otel-otlp = [
|
||||
"basic_tracing",
|
||||
"tracing-tree",
|
||||
"opentelemetry-jaeger",
|
||||
"tracing-opentelemetry",
|
||||
"opentelemetry",
|
||||
"dep:opentelemetry",
|
||||
"dep:opentelemetry_sdk",
|
||||
"dep:opentelemetry-otlp",
|
||||
"dep:tracing-opentelemetry",
|
||||
"dep:tonic",
|
||||
]
|
||||
clap = ["dep:clap", "dep:clap_complete", "dep:clap_complete_fig"]
|
||||
models = []
|
||||
|
||||
@@ -4,16 +4,9 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io::IsTerminal;
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use opentelemetry;
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use opentelemetry_jaeger;
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use tracing_opentelemetry;
|
||||
// Re-export tracing_subscriber for consumers that need to compose layers
|
||||
#[cfg(feature = "basic_tracing")]
|
||||
pub use tracing_subscriber;
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use tracing_tree;
|
||||
|
||||
#[derive(Debug, Default, Copy, Clone, Deserialize, PartialEq, Eq, Serialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
@@ -69,40 +62,106 @@ pub fn setup_tracing_logger() {
|
||||
build_tracing_logger().init()
|
||||
}
|
||||
|
||||
// TODO: This has to be a macro, running it as a function does not work for the file_appender for some reason
|
||||
#[cfg(feature = "tracing")]
|
||||
#[macro_export]
|
||||
macro_rules! setup_tracing {
|
||||
($service_name: expr) => {
|
||||
use nym_bin_common::logging::tracing_subscriber::layer::SubscriberExt;
|
||||
use nym_bin_common::logging::tracing_subscriber::util::SubscriberInitExt;
|
||||
/// Initialize an OpenTelemetry tracing layer that exports spans via OTLP/gRPC.
|
||||
///
|
||||
/// This produces a layer compatible with `tracing_subscriber::registry()` that
|
||||
/// sends traces to any OTLP-compatible collector (SigNoz, Grafana Tempo, etc).
|
||||
///
|
||||
/// Returns both the tracing layer and the [`SdkTracerProvider`] so the caller
|
||||
/// can invoke [`SdkTracerProvider::shutdown`] for graceful flush on exit.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `service_name` - The service name reported to the collector (e.g. "nym-node")
|
||||
/// * `endpoint` - The OTLP/gRPC collector endpoint (e.g. "http://localhost:4317"
|
||||
/// or "https://ingest.eu.signoz.cloud:443" for SigNoz Cloud)
|
||||
/// * `ingestion_key` - Optional SigNoz Cloud ingestion key. When provided, it is
|
||||
/// sent as the `signoz-ingestion-key` gRPC metadata header on every export.
|
||||
/// * `environment` - Deployment environment label (e.g. "sandbox", "mainnet", "canary").
|
||||
/// Attached as the `deployment.environment` OTel resource attribute.
|
||||
/// * `sample_ratio` - Trace sampling ratio in 0.0..=1.0 (e.g. 0.1 = 10% of traces).
|
||||
/// Used to limit cost when exporting from many nodes; clamped to [0.0, 1.0].
|
||||
/// * `export_timeout_secs` - Timeout in seconds for each OTLP export batch. Prevents
|
||||
/// unbounded blocking if the collector is slow or unreachable.
|
||||
#[cfg(feature = "otel-otlp")]
|
||||
pub fn init_otel_layer<S>(
|
||||
service_name: &str,
|
||||
endpoint: &str,
|
||||
ingestion_key: Option<&str>,
|
||||
environment: &str,
|
||||
sample_ratio: f64,
|
||||
export_timeout_secs: u64,
|
||||
) -> Result<
|
||||
(
|
||||
tracing_opentelemetry::OpenTelemetryLayer<S, opentelemetry_sdk::trace::SdkTracer>,
|
||||
opentelemetry_sdk::trace::SdkTracerProvider,
|
||||
),
|
||||
Box<dyn std::error::Error + Send + Sync>,
|
||||
>
|
||||
where
|
||||
S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
|
||||
{
|
||||
use opentelemetry::trace::TracerProvider as _;
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
use opentelemetry_otlp::WithTonicConfig;
|
||||
use opentelemetry_sdk::trace::Sampler;
|
||||
use std::time::Duration;
|
||||
|
||||
let registry = nym_bin_common::logging::tracing_subscriber::Registry::default()
|
||||
.with(nym_bin_common::logging::tracing_subscriber::EnvFilter::from_default_env())
|
||||
.with(
|
||||
nym_bin_common::logging::tracing_tree::HierarchicalLayer::new(4)
|
||||
.with_targets(true)
|
||||
.with_bracketed_fields(true),
|
||||
);
|
||||
// Validate endpoint URI early to fail with a clear message
|
||||
if !endpoint.starts_with("http://") && !endpoint.starts_with("https://") {
|
||||
return Err(format!(
|
||||
"invalid OTLP endpoint URI: {endpoint} (must start with http:// or https://)"
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let tracer = nym_bin_common::logging::opentelemetry_jaeger::new_collector_pipeline()
|
||||
.with_endpoint("http://44.199.230.10:14268/api/traces")
|
||||
.with_service_name($service_name)
|
||||
.with_isahc()
|
||||
.with_trace_config(
|
||||
nym_bin_common::logging::opentelemetry::sdk::trace::config().with_sampler(
|
||||
nym_bin_common::logging::opentelemetry::sdk::trace::Sampler::TraceIdRatioBased(
|
||||
0.1,
|
||||
),
|
||||
),
|
||||
)
|
||||
.install_batch(nym_bin_common::logging::opentelemetry::runtime::Tokio)
|
||||
.expect("Could not init tracer");
|
||||
let sample_ratio_clamped = sample_ratio.clamp(0.0, 1.0);
|
||||
|
||||
let telemetry = nym_bin_common::logging::tracing_opentelemetry::layer().with_tracer(tracer);
|
||||
let mut builder = opentelemetry_otlp::SpanExporter::builder()
|
||||
.with_tonic()
|
||||
.with_endpoint(endpoint)
|
||||
.with_timeout(Duration::from_secs(export_timeout_secs));
|
||||
|
||||
registry.with(telemetry).init();
|
||||
};
|
||||
// Explicitly configure TLS when the endpoint uses HTTPS
|
||||
if endpoint.starts_with("https://") {
|
||||
builder =
|
||||
builder.with_tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots());
|
||||
}
|
||||
|
||||
if let Some(key) = ingestion_key {
|
||||
let mut metadata = tonic::metadata::MetadataMap::new();
|
||||
metadata.insert(
|
||||
"signoz-ingestion-key",
|
||||
key.parse()
|
||||
.map_err(|_| "invalid ingestion key format (value redacted)")?,
|
||||
);
|
||||
builder = builder.with_metadata(metadata);
|
||||
}
|
||||
|
||||
let exporter = builder
|
||||
.build()
|
||||
.map_err(|e| format!("failed to build OTLP exporter for endpoint {endpoint}: {e}"))?;
|
||||
|
||||
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
|
||||
.with_sampler(Sampler::TraceIdRatioBased(sample_ratio_clamped))
|
||||
.with_batch_exporter(exporter)
|
||||
.with_resource(
|
||||
opentelemetry_sdk::Resource::builder()
|
||||
.with_service_name(service_name.to_owned())
|
||||
.with_attribute(opentelemetry::KeyValue::new(
|
||||
"deployment.environment",
|
||||
environment.to_owned(),
|
||||
))
|
||||
.build(),
|
||||
)
|
||||
.build();
|
||||
|
||||
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
|
||||
let tracer = tracer_provider.tracer(service_name.to_owned());
|
||||
|
||||
Ok((
|
||||
tracing_opentelemetry::layer().with_tracer(tracer),
|
||||
tracer_provider,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn banner(crate_name: &str, crate_version: &str) -> String {
|
||||
|
||||
@@ -128,54 +128,95 @@ impl ManagedConnection {
|
||||
|
||||
async fn run(self) {
|
||||
let address = self.address;
|
||||
let reconnection_attempt = self.current_reconnection.load(Ordering::Acquire);
|
||||
let connect_start = tokio::time::Instant::now();
|
||||
let connection_fut = TcpStream::connect(address);
|
||||
|
||||
let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await {
|
||||
Ok(stream_res) => match stream_res {
|
||||
Ok(stream) => {
|
||||
debug!("Managed to establish connection to {}", self.address);
|
||||
let connect_ms = connect_start.elapsed().as_millis() as u64;
|
||||
debug!(
|
||||
peer = %address,
|
||||
connect_ms,
|
||||
"Managed to establish connection to {}", self.address
|
||||
);
|
||||
|
||||
let noise_start = tokio::time::Instant::now();
|
||||
let noise_stream =
|
||||
match upgrade_noise_initiator(stream, &self.noise_config).await {
|
||||
Ok(noise_stream) => noise_stream,
|
||||
Err(err) => {
|
||||
error!("Failed to perform Noise handshake with {address} - {err}");
|
||||
// we failed to finish the noise handshake - increase reconnection attempt
|
||||
let noise_handshake_ms = noise_start.elapsed().as_millis() as u64;
|
||||
warn!(
|
||||
event = "connection.failed.noise",
|
||||
peer = %address,
|
||||
error = %err,
|
||||
connect_ms,
|
||||
noise_handshake_ms,
|
||||
reconnection_attempt,
|
||||
exit_reason = "noise_error",
|
||||
"Failed to perform Noise initiator handshake with {address}"
|
||||
);
|
||||
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
|
||||
return;
|
||||
}
|
||||
};
|
||||
// if we managed to connect AND do the noise handshake, reset the reconnection count (whatever it might have been)
|
||||
let noise_handshake_ms = noise_start.elapsed().as_millis() as u64;
|
||||
self.current_reconnection.store(0, Ordering::Release);
|
||||
debug!("Noise initiator handshake completed for {:?}", address);
|
||||
debug!(
|
||||
peer = %address,
|
||||
connect_ms,
|
||||
noise_handshake_ms,
|
||||
"Noise initiator handshake completed for {:?}", address
|
||||
);
|
||||
Framed::new(noise_stream, NymCodec)
|
||||
}
|
||||
Err(err) => {
|
||||
debug!("failed to establish connection to {address} (err: {err})",);
|
||||
let connect_ms = connect_start.elapsed().as_millis() as u64;
|
||||
warn!(
|
||||
event = "connection.failed.connect",
|
||||
peer = %address,
|
||||
error = %err,
|
||||
connect_ms,
|
||||
reconnection_attempt,
|
||||
exit_reason = "connect_error",
|
||||
"failed to establish connection to {address}"
|
||||
);
|
||||
return;
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
debug!(
|
||||
let connect_ms = connect_start.elapsed().as_millis() as u64;
|
||||
warn!(
|
||||
event = "connection.failed.timeout",
|
||||
peer = %address,
|
||||
timeout_ms = self.connection_timeout.as_millis() as u64,
|
||||
connect_ms,
|
||||
reconnection_attempt,
|
||||
exit_reason = "timeout",
|
||||
"failed to connect to {address} within {:?}",
|
||||
self.connection_timeout
|
||||
);
|
||||
|
||||
// we failed to connect - increase reconnection attempt
|
||||
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Take whatever the receiver channel produces and put it on the connection.
|
||||
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
|
||||
// about neither receiver nor the connection, it doesn't matter which one gets consumed
|
||||
if let Err(err) = self.message_receiver.map(Ok).forward(conn).await {
|
||||
warn!("Failed to forward packets to {address}: {err}");
|
||||
warn!(
|
||||
event = "connection.forward_error",
|
||||
peer = %address,
|
||||
error = %err,
|
||||
exit_reason = "forward_error",
|
||||
"Failed to forward packets to {address}: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
debug!(
|
||||
"connection manager to {address} is finished. Either the connection failed or mixnet client got dropped",
|
||||
peer = %address,
|
||||
exit_reason = "sender_dropped",
|
||||
"connection manager to {address} finished"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -272,16 +313,18 @@ impl SendWithoutResponse for Client {
|
||||
trace!("Sending packet to {address}");
|
||||
|
||||
// TODO: optimisation for the future: rather than constantly using legacy encoding,
|
||||
// once we're addressing by node_id (and thus have full node info here),
|
||||
// we could simply infer supported encoding based on their version
|
||||
// use the mix packet type / flags to pick encoding per packet
|
||||
let framed_packet =
|
||||
FramedNymPacket::from_mix_packet(packet, self.config.use_legacy_packet_encoding);
|
||||
|
||||
let Some(sender) = self.active_connections.get_mut(&address) else {
|
||||
// there was never a connection to begin with
|
||||
debug!("establishing initial connection to {address}");
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
|
||||
// for sending for as soon as the connection is created
|
||||
debug!(
|
||||
event = "mixclient.try_send",
|
||||
peer = %address,
|
||||
result = "not_connected",
|
||||
"establishing initial connection to {address}"
|
||||
);
|
||||
self.make_connection(address, framed_packet);
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::NotConnected,
|
||||
@@ -289,15 +332,24 @@ impl SendWithoutResponse for Client {
|
||||
));
|
||||
};
|
||||
|
||||
let channel_capacity = sender.channel.max_capacity();
|
||||
let channel_available = sender.channel.capacity();
|
||||
let channel_used = channel_capacity - channel_available;
|
||||
|
||||
let sending_res = sender.channel.try_send(framed_packet);
|
||||
drop(sender);
|
||||
|
||||
sending_res.map_err(|err| {
|
||||
match err {
|
||||
TrySendError::Full(_) => {
|
||||
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
|
||||
// it's not a 'big' error, but we did not manage to send the packet
|
||||
// if the queue is full, we can't really do anything but to drop the packet
|
||||
warn!(
|
||||
event = "mixclient.try_send",
|
||||
peer = %address,
|
||||
result = "full_dropped",
|
||||
channel_capacity,
|
||||
channel_used,
|
||||
"dropping packet: connection buffer to {address} is full ({channel_used}/{channel_capacity})"
|
||||
);
|
||||
io::Error::new(
|
||||
io::ErrorKind::WouldBlock,
|
||||
"connection queue is full",
|
||||
@@ -305,11 +357,13 @@ impl SendWithoutResponse for Client {
|
||||
}
|
||||
TrySendError::Closed(dropped) => {
|
||||
debug!(
|
||||
"Connection to {address} seems to be dead. attempting to re-establish it...",
|
||||
event = "mixclient.try_send",
|
||||
peer = %address,
|
||||
result = "closed_reconnecting",
|
||||
channel_capacity,
|
||||
channel_used,
|
||||
"connection to {address} dead, attempting re-establishment"
|
||||
);
|
||||
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue
|
||||
// it up to send it as soon as the connection is re-established
|
||||
self.make_connection(address, dropped);
|
||||
io::Error::new(
|
||||
io::ErrorKind::ConnectionAborted,
|
||||
|
||||
+1
-1
@@ -1 +1 @@
|
||||
0.87%
|
||||
0.90%
|
||||
|
||||
+1
-1
@@ -1 +1 @@
|
||||
33.087
|
||||
31.932
|
||||
|
||||
@@ -1 +1 @@
|
||||
Wednesday, February 11th 2026, 11:35:05 UTC
|
||||
Thursday, February 19th 2026, 13:59:34 UTC
|
||||
|
||||
@@ -11,7 +11,7 @@ options:
|
||||
--no_routing_history Display node stats without routing history
|
||||
--no_verloc_metrics Display node stats without verloc metrics
|
||||
-m, --markdown Display results in markdown format
|
||||
-o, --output [OUTPUT]
|
||||
-o [OUTPUT], --output [OUTPUT]
|
||||
Save results to file (in current dir or supply with
|
||||
path without filename)
|
||||
```
|
||||
|
||||
@@ -12,7 +12,8 @@ usage: nym-node-cli install [-h] [-V] [-d BRANCH] [-v]
|
||||
options:
|
||||
-h, --help show this help message and exit
|
||||
-V, --version show program's version number and exit
|
||||
-d, --dev BRANCH Define github branch (default: develop)
|
||||
-d BRANCH, --dev BRANCH
|
||||
Define github branch (default: develop)
|
||||
-v, --verbose Show full error tracebacks
|
||||
--mode {mixnode,entry-gateway,exit-gateway}
|
||||
Node mode: 'mixnode', 'entry-gateway', or 'exit-
|
||||
|
||||
@@ -62,8 +62,6 @@ Options:
|
||||
Tunnel port announced to external clients wishing to connect to the wireguard interface. Useful in the instances where the node is behind a proxy [env: NYMNODE_WG_ANNOUNCED_PORT=]
|
||||
--wireguard-private-network-prefix <WIREGUARD_PRIVATE_NETWORK_PREFIX>
|
||||
The prefix denoting the maximum number of the clients that can be connected via Wireguard. The maximum value for IPv4 is 32 and for IPv6 is 128 [env: NYMNODE_WG_PRIVATE_NETWORK_PREFIX=]
|
||||
--wireguard-userspace <WIREGUARD_USERSPACE>
|
||||
Use userspace implementation of WireGuard (wireguard-go) instead of kernel module. Useful in containerized environments without kernel WireGuard support [env: NYMNODE_WG_USERSPACE=] [possible values: true, false]
|
||||
--verloc-bind-address <VERLOC_BIND_ADDRESS>
|
||||
Socket address this node will use for binding its verloc API. default: `[::]:1790` [env: NYMNODE_VERLOC_BIND_ADDRESS=]
|
||||
--verloc-announce-port <VERLOC_ANNOUNCE_PORT>
|
||||
@@ -82,8 +80,6 @@ Options:
|
||||
Endpoint to query to retrieve current upgrade mode attestation. This argument should never be set outside testnets and local networks [env: NYMNODE_UPGRADE_MODE_ATTESTATION_URL=]
|
||||
--upgrade-mode-attester-public-key <UPGRADE_MODE_ATTESTER_PUBLIC_KEY>
|
||||
Expected public key of the entity signing the published attestation. This argument should never be set outside testnets and local networks [env: NYMNODE_UPGRADE_MODE_ATTESTER_PUBKEY=]
|
||||
--lp-use-mock-ecash <LP_USE_MOCK_ECASH>
|
||||
Use mock ecash manager for LP testing. WARNING: Only use this for local testing! Never enable in production. When enabled, the LP listener will accept any credential without blockchain verification [env: NYMNODE_LP_USE_MOCK_ECASH=] [possible values: true, false]
|
||||
--upstream-exit-policy-url <UPSTREAM_EXIT_POLICY_URL>
|
||||
Specifies the url for an upstream source of the exit policy used by this node [env: NYMNODE_UPSTREAM_EXIT_POLICY=]
|
||||
--open-proxy <OPEN_PROXY>
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
{
|
||||
"nym-cli": "Nym-cli",
|
||||
"diagnostic-tool": "Diagnostic Tool",
|
||||
"echo-server": "Echo Server",
|
||||
"standalone-tcpproxy": "TcpProxy Binaries (Standalone)"
|
||||
}
|
||||
|
||||
@@ -0,0 +1,134 @@
|
||||
import { Steps } from 'nextra/components';
|
||||
|
||||
# Diagnostic Tool
|
||||
|
||||
The Diagnostic Tool is a standalone binary designed to perform various network tests, including DNS, HTTP, and gateway connectivity tests. This tool helps diagnose connectivity issues and provides insights into network performance.
|
||||
|
||||
It’s also possible to run it within the daemon with the same CLI interface.
|
||||
|
||||
## Download Binary
|
||||
|
||||
To get `nym-diagnostic` follow these steps:
|
||||
<Steps>
|
||||
###### 1. Download `nym-vpn-core`
|
||||
- Navigate to [github.com/nymtech/nym-vpn-client/releases](https://github.com/nymtech/nym-vpn-client/releases)
|
||||
- Find latest `nym-vpn-core-<VERSION>`
|
||||
- Download version for your system
|
||||
|
||||
###### 2. Install or extract and make executable
|
||||
|
||||
- If you downloaded `.deb` installer, install it with this command:
|
||||
```sh
|
||||
sudo dpkg -i <FILE_NAME>
|
||||
```
|
||||
|
||||
- If you downloaded `.tar.gz`, in terminal you can extract the file with
|
||||
```sh
|
||||
tar -xvf <FILE_NAME>
|
||||
```
|
||||
|
||||
- Navigate inside the directory and make executable:
|
||||
```sh
|
||||
cd nym-vpn-core-<VERSION>
|
||||
chmod +x ./*
|
||||
```
|
||||
</ Steps>
|
||||
|
||||
## CLI Usage
|
||||
|
||||
The Diagnostic Tool can be executed from the command line interface (CLI). Below are the usage instructions and options available. Read in the chapter [*Tests Performed*](#tests-performed) about the purpose and outcome of these commands.
|
||||
|
||||
### Command Syntax
|
||||
|
||||
```sh
|
||||
./nym-diagnostic [command] [options]
|
||||
./nym-vpnc diagnostic [command] [options]
|
||||
```
|
||||
|
||||
#### `run` command arguments
|
||||
|
||||
The most useful command is `run`, here are the options for that command:
|
||||
|
||||
```sh
|
||||
-h, --help Display help information and exit.
|
||||
--skip-dns Skip the DNS tests
|
||||
--skip-http Skip the HTTP tests
|
||||
--gateway <ID_KEY> Run the gateway connectivity test on the given gateway. Skip those tests if not provided
|
||||
-v, --verbose Enable verbose output for detailed logging.
|
||||
```
|
||||
|
||||
#### `register` command arguments
|
||||
|
||||
Command `register` requires valid credential. Here are the options for that command:
|
||||
|
||||
```sh
|
||||
--gateway <ID_KEY> Register to the given gateway
|
||||
--storage-path Path to the directory containing the credentials database. If it is not valid registration will be skipped.
|
||||
--skip-wireguard Skip Wireguard tests
|
||||
```
|
||||
|
||||
### Command Examples
|
||||
|
||||
|
||||
- Run all tests on a gateway:
|
||||
```sh
|
||||
./nym-diagnostic run --gateway <ID_KEY>
|
||||
```
|
||||
|
||||
- Run the DNS tests only:
|
||||
```sh
|
||||
./nym-diagnostic run --skip-http
|
||||
```
|
||||
|
||||
- Register to a gateway:
|
||||
```sh
|
||||
sudo ./nym-diagnostic register --gateway <ID_KEY> --storage-path /var/lib/nym-vpnd/mainnet
|
||||
# sudo is required to read the database
|
||||
```
|
||||
|
||||
- You can also run DNS and HTTP tests from `nym-vpnc` (installation [here](/developers/nymvpncli)):
|
||||
```sh
|
||||
./nym-vpnc diagnostic run
|
||||
```
|
||||
|
||||
|
||||
## Tests Performed
|
||||
|
||||
The Diagnostic Tool runs the following tests:
|
||||
|
||||
|
||||
### 1. DNS Test
|
||||
|
||||
- **Purpose**: To check the resolution DNS availability.
|
||||
- **Process**: We try to resolve all the domain names present in a given nym network environment with different DNS configurations
|
||||
- **Output**: Displays the resolved IP address and the time taken for the resolution.
|
||||
|
||||
|
||||
### 2. HTTP Test
|
||||
|
||||
- **Purpose**: To verify the accessibility of the NymVPN API.
|
||||
- **Process**: The tool query the `health` endpoint as well as the `nodes/described` endpoint.
|
||||
- **Output**: Displays the response of the `health` endpoint, the time skew and the number of nodes in the network (sanity check)
|
||||
|
||||
### 3. Gateway Test
|
||||
|
||||
- **Purpose**: To check the connectivity to a given gateway.
|
||||
- **Process**: The tool fetches information about the gateway, then establishes a TCP connection, upgrades it to WS and sends a request
|
||||
- **Output**: Display the gateway reported information, the status of the connections and the WS response.
|
||||
|
||||
### 4. Registration Test
|
||||
|
||||
- **Purpose:** To check the correctness of the registration process.
|
||||
- **Process:** The tool tries to build a mixnet client to the provided gateway and then tries to register to the entry authenticator
|
||||
- **Output:** Display the status of the different steps
|
||||
- **Caveat:** This test requires a credential to be spent, which is why it is available as a separate command only
|
||||
|
||||
### 5. Wireguard Test
|
||||
|
||||
- **Purpose:** To check the soundness of a wireguard connection
|
||||
- **Process:** The tool uses the registration data from the previous step to establish a wireguard connection and ping an IP.
|
||||
- **Output:** Display the ping RTTs and any error that might have happened
|
||||
|
||||
## Reports
|
||||
|
||||
Reports are logged in a JSON format and also returned by the commands for a future use
|
||||
@@ -62,12 +62,13 @@ There are multiple ways to monitor performance of nodes and the machines on whic
|
||||
|
||||
### Guides to Setup Own Metrics
|
||||
|
||||
A list of different scripts, templates and guides for easier navigation:
|
||||
A list of different tools, templates and guides for easier navigation:
|
||||
|
||||
* [`nym-gateway-probe`](performance-and-testing/gateway-probe.mdx): a useful tool used under the hood of [Node Status Observatory](https://harbourmaster.nymtech.net)
|
||||
|
||||
* [Diagnostic Tool](/developers/tools/diagnostic-tool): diagnose connectivity issues and provides insights into network performance
|
||||
|
||||
* [`nym-gateway-probe`](performance-and-testing/gateway-probe.mdx) - a useful tool used under the hood of [Node Status Observatory](https://harbourmaster.nymtech.net)
|
||||
* [Prometheus and Grafana](performance-and-testing/prometheus-grafana.mdx) self-hosted setup
|
||||
* [Nym-node CPU cron service](https://gist.github.com/tommyv1987/97e939a7adf491333d686a8eaa68d4bd) - an easy bash script by Nym core developer [@tommy1987](https://gist.github.com/tommyv1987), designed to monitor a CPU usage of your node, running locally
|
||||
* Nym's script [`prom_targets.py`](https://github.com/nymtech/nym/blob/develop/scripts/prom_targets.py) - a useful python program to request data from API and can be run on its own or plugged to more sophisticated flows
|
||||
|
||||
### Collecting Testing Metrics
|
||||
|
||||
|
||||
@@ -202,3 +202,13 @@ PING_RETRIES=10 PING_TIMEOUT=5 CONCURRENCY=16 ./test-nodes-pings.sh
|
||||
You can look up the IPs from `ping_not_working.csv`, using some online database, like [ipinfo.io](https://ipinfo.io).
|
||||
|
||||
Feel invited to share the outcome with Nym team, mentors and the rest of the operators in our [Matrix Node Operators channel](https://matrix.to/#/#operators:nymtech.chat).
|
||||
|
||||
## Guides to Setup Own Metrics
|
||||
|
||||
A list of different tools, templates and guides for easier navigation:
|
||||
|
||||
* [`nym-gateway-probe`](performance-and-testing/gateway-probe.mdx): a useful tool used under the hood of [Node Status Observatory](https://harbourmaster.nymtech.net)
|
||||
|
||||
* [Diagnostic Tool](/developers/tools/diagnostic-tool): diagnose connectivity issues and provides insights into network performance
|
||||
|
||||
* [Prometheus and Grafana](performance-and-testing/prometheus-grafana.mdx) self-hosted setup
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
[package]
|
||||
name = "nym-data-observatory"
|
||||
version = "1.0.1"
|
||||
version = "1.0.4"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use anyhow::{Result, anyhow};
|
||||
use sqlx::{Postgres, migrate::Migrator, postgres::PgConnectOptions};
|
||||
use std::env;
|
||||
use std::str::FromStr;
|
||||
use tracing::info;
|
||||
|
||||
@@ -19,7 +20,15 @@ pub(crate) struct Storage {
|
||||
|
||||
impl Storage {
|
||||
pub async fn init(connection_url: String) -> Result<Self> {
|
||||
let connect_options = PgConnectOptions::from_str(&connection_url)?;
|
||||
let mut connect_options = PgConnectOptions::from_str(&connection_url)?;
|
||||
|
||||
let ssl_cert_path = env::var("PG_CERT").ok();
|
||||
|
||||
if let Some(ssl_cert) = ssl_cert_path {
|
||||
connect_options = connect_options
|
||||
.ssl_mode(sqlx::postgres::PgSslMode::Require)
|
||||
.ssl_root_cert(ssl_cert);
|
||||
}
|
||||
|
||||
let pool = DbPool::connect_with(connect_options)
|
||||
.await
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::db::{
|
||||
queries::price::insert_nym_prices,
|
||||
};
|
||||
use core::str;
|
||||
use std::env;
|
||||
use tokio::time::Duration;
|
||||
|
||||
use crate::db::DbPool;
|
||||
@@ -29,10 +30,24 @@ impl PriceScraper {
|
||||
async fn get_coingecko_prices(&self) -> anyhow::Result<CoingeckoPriceResponse> {
|
||||
tracing::info!("💰 Fetching CoinGecko prices from {COINGECKO_API_URL}");
|
||||
|
||||
let response = reqwest::get(COINGECKO_API_URL)
|
||||
.await?
|
||||
.json::<CoingeckoPriceResponse>()
|
||||
.await;
|
||||
let mut url = COINGECKO_API_URL.to_string();
|
||||
|
||||
let coin_gecko_api_key = env::var("COINGECKO_API_KEY").ok();
|
||||
|
||||
if let Some(api_key) = coin_gecko_api_key {
|
||||
url = format!("{url}&x_cg_demo_api_key={api_key}")
|
||||
}
|
||||
|
||||
let response = reqwest::get(url).await?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
tracing::error!(
|
||||
"CoinGecko price query returned error: {}",
|
||||
response.status()
|
||||
);
|
||||
}
|
||||
|
||||
let response = response.json::<CoingeckoPriceResponse>().await;
|
||||
|
||||
tracing::info!("Got response {:?}", response);
|
||||
match response {
|
||||
|
||||
@@ -40,6 +40,8 @@ thiserror.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-indicatif = { workspace = true }
|
||||
tracing-subscriber.workspace = true
|
||||
opentelemetry = { workspace = true, features = ["trace"], optional = true }
|
||||
opentelemetry_sdk = { workspace = true, features = ["trace"], optional = true }
|
||||
tokio = { workspace = true, features = ["macros", "sync", "rt-multi-thread"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
tokio-stream = { workspace = true }
|
||||
@@ -135,6 +137,7 @@ rand_chacha = { workspace = true }
|
||||
|
||||
[features]
|
||||
tokio-console = ["console-subscriber", "nym-task/tokio-tracing"]
|
||||
otel = ["nym-bin-common/otel-otlp", "dep:opentelemetry", "dep:opentelemetry_sdk"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
+102
-28
@@ -8,7 +8,6 @@ use crate::cli::commands::{
|
||||
use crate::env::vars::{NYMNODE_CONFIG_ENV_FILE_ARG, NYMNODE_NO_BANNER_ARG};
|
||||
use clap::{Args, Parser, Subcommand};
|
||||
use nym_bin_common::bin_info;
|
||||
use std::future::Future;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
pub(crate) mod commands;
|
||||
@@ -22,6 +21,43 @@ fn pretty_build_info_static() -> &'static str {
|
||||
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
|
||||
}
|
||||
|
||||
/// OpenTelemetry-related CLI arguments. Only present when built with the `otel` feature.
|
||||
#[cfg(feature = "otel")]
|
||||
#[derive(Args, Debug, Clone)]
|
||||
pub(crate) struct OtelArgs {
|
||||
/// Enable OpenTelemetry tracing export via OTLP/gRPC.
|
||||
#[clap(long, env = "NYMNODE_OTEL_ENABLE")]
|
||||
pub(crate) otel: bool,
|
||||
|
||||
/// OpenTelemetry OTLP collector endpoint (gRPC).
|
||||
/// Only used when --otel is enabled.
|
||||
/// For SigNoz Cloud use https://ingest.<region>.signoz.cloud:443
|
||||
#[clap(
|
||||
long,
|
||||
env = "NYMNODE_OTEL_ENDPOINT",
|
||||
default_value = "http://localhost:4317"
|
||||
)]
|
||||
pub(crate) otel_endpoint: String,
|
||||
|
||||
/// SigNoz Cloud ingestion key for authenticated OTLP export.
|
||||
/// Only needed for SigNoz Cloud (not self-hosted).
|
||||
#[clap(long, env = "NYMNODE_OTEL_KEY")]
|
||||
pub(crate) otel_key: Option<String>,
|
||||
|
||||
/// Deployment environment label attached to all exported traces.
|
||||
/// Used to distinguish sandbox / mainnet / canary in the OTel backend.
|
||||
#[clap(long, env = "NYMNODE_OTEL_ENV", default_value = "mainnet")]
|
||||
pub(crate) otel_env: String,
|
||||
|
||||
/// Trace sampling ratio (0.0 to 1.0). e.g. 0.1 = 10%% of traces exported. Reduces cost.
|
||||
#[clap(long, env = "NYMNODE_OTEL_SAMPLE_RATIO", default_value = "0.1")]
|
||||
pub(crate) otel_sample_ratio: f64,
|
||||
|
||||
/// Timeout in seconds for each OTLP export batch. Prevents unbounded blocking.
|
||||
#[clap(long, env = "NYMNODE_OTEL_EXPORT_TIMEOUT", default_value = "10")]
|
||||
pub(crate) otel_export_timeout: u64,
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
|
||||
pub(crate) struct Cli {
|
||||
@@ -40,44 +76,82 @@ pub(crate) struct Cli {
|
||||
)]
|
||||
pub(crate) no_banner: bool,
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
#[clap(flatten)]
|
||||
pub(crate) otel: OtelArgs,
|
||||
|
||||
#[clap(subcommand)]
|
||||
command: Commands,
|
||||
}
|
||||
|
||||
impl Cli {
|
||||
fn execute_async<F: Future>(fut: F) -> anyhow::Result<F::Output> {
|
||||
Ok(tokio::runtime::Builder::new_multi_thread()
|
||||
pub(crate) fn execute(self) -> anyhow::Result<()> {
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()?
|
||||
.block_on(fut))
|
||||
.build()?;
|
||||
|
||||
// Set up tracing inside the runtime so the OTel batch exporter (when enabled)
|
||||
// can spawn its background tasks on the tokio reactor.
|
||||
let use_otel = matches!(self.command, Commands::Run(..));
|
||||
let _otel_guard = runtime.block_on(async { self.setup_logging(use_otel) })?;
|
||||
|
||||
// `_otel_guard` is dropped at function exit, flushing pending spans via its Drop impl
|
||||
runtime.block_on(async {
|
||||
match self.command {
|
||||
Commands::BuildInfo(args) => build_info::execute(args)?,
|
||||
Commands::BondingInformation(args) => bonding_information::execute(args).await?,
|
||||
Commands::NodeDetails(args) => node_details::execute(args).await?,
|
||||
Commands::Run(args) => run::execute(*args).await?,
|
||||
Commands::Migrate(args) => migrate::execute(*args)?,
|
||||
Commands::Sign(args) => sign::execute(args).await?,
|
||||
Commands::TestThroughput(args) => test_throughput::execute(args)?,
|
||||
Commands::UnsafeResetSphinxKeys(args) => reset_sphinx_keys::execute(args).await?,
|
||||
Commands::Debug(debug) => match debug.command {
|
||||
DebugCommands::ResetProvidersGatewayDbs(args) => {
|
||||
debug::reset_providers_dbs::execute(args).await?
|
||||
}
|
||||
},
|
||||
}
|
||||
Ok::<(), anyhow::Error>(())
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn execute(self) -> anyhow::Result<()> {
|
||||
// NOTE: `test_throughput` sets up its own logger as it has to include additional layers
|
||||
if !matches!(self.command, Commands::TestThroughput(..)) {
|
||||
crate::logging::setup_tracing_logger()?;
|
||||
#[cfg(feature = "otel")]
|
||||
fn build_otel_config(&self) -> Option<crate::logging::OtelConfig> {
|
||||
if self.otel.otel {
|
||||
Some(crate::logging::OtelConfig {
|
||||
endpoint: self.otel.otel_endpoint.clone(),
|
||||
service_name: "nym-node".to_string(),
|
||||
ingestion_key: self.otel.otel_key.clone(),
|
||||
environment: self.otel.otel_env.clone(),
|
||||
sample_ratio: self.otel.otel_sample_ratio,
|
||||
export_timeout_secs: self.otel.otel_export_timeout,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
match self.command {
|
||||
Commands::BuildInfo(args) => build_info::execute(args)?,
|
||||
Commands::BondingInformation(args) => {
|
||||
{ Self::execute_async(bonding_information::execute(args))? }?
|
||||
}
|
||||
Commands::NodeDetails(args) => { Self::execute_async(node_details::execute(args))? }?,
|
||||
Commands::Run(args) => { Self::execute_async(run::execute(*args))? }?,
|
||||
Commands::Migrate(args) => migrate::execute(*args)?,
|
||||
Commands::Sign(args) => { Self::execute_async(sign::execute(args))? }?,
|
||||
Commands::TestThroughput(args) => test_throughput::execute(args)?,
|
||||
Commands::UnsafeResetSphinxKeys(args) => {
|
||||
{ Self::execute_async(reset_sphinx_keys::execute(args))? }?
|
||||
}
|
||||
Commands::Debug(debug) => match debug.command {
|
||||
DebugCommands::ResetProvidersGatewayDbs(args) => {
|
||||
{ Self::execute_async(debug::reset_providers_dbs::execute(args))? }?
|
||||
}
|
||||
},
|
||||
#[cfg(feature = "otel")]
|
||||
fn setup_logging(&self, use_otel: bool) -> anyhow::Result<Option<crate::logging::OtelGuard>> {
|
||||
if matches!(self.command, Commands::TestThroughput(..)) {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(())
|
||||
let otel_config = if use_otel {
|
||||
self.build_otel_config()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
crate::logging::setup_tracing_logger(otel_config)
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "otel"))]
|
||||
fn setup_logging(&self, _use_otel: bool) -> anyhow::Result<Option<()>> {
|
||||
if matches!(self.command, Commands::TestThroughput(..)) {
|
||||
return Ok(None);
|
||||
}
|
||||
crate::logging::setup_tracing_logger()?;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+111
-1
@@ -7,6 +7,42 @@ use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
use tracing_subscriber::{EnvFilter, Layer};
|
||||
|
||||
/// Configuration for OpenTelemetry OTLP export.
|
||||
#[cfg(feature = "otel")]
|
||||
pub(crate) struct OtelConfig {
|
||||
/// OTLP/gRPC collector endpoint, e.g. `http://localhost:4317`
|
||||
/// or `https://ingest.eu.signoz.cloud:443` for SigNoz Cloud.
|
||||
pub endpoint: String,
|
||||
/// Service name reported to the collector (appears in SigNoz "Services" view).
|
||||
pub service_name: String,
|
||||
/// Optional SigNoz Cloud ingestion key for authenticated export.
|
||||
/// Sent as the `signoz-ingestion-key` gRPC metadata header.
|
||||
pub ingestion_key: Option<String>,
|
||||
/// Deployment environment label, e.g. `mainnet`, `sandbox`, `canary`.
|
||||
/// Attached as the `deployment.environment` OTel resource attribute.
|
||||
pub environment: String,
|
||||
/// Trace sampling ratio in 0.0..=1.0 (e.g. 0.1 = 10% of traces). Used to limit cost.
|
||||
pub sample_ratio: f64,
|
||||
/// Timeout in seconds for each OTLP export batch. Prevents unbounded blocking.
|
||||
pub export_timeout_secs: u64,
|
||||
}
|
||||
|
||||
/// Handle returned when OTel is active. Flushes pending spans on drop
|
||||
/// to prevent telemetry loss during panics or early exits.
|
||||
#[cfg(feature = "otel")]
|
||||
pub(crate) struct OtelGuard {
|
||||
pub provider: opentelemetry_sdk::trace::SdkTracerProvider,
|
||||
}
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
impl Drop for OtelGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = self.provider.shutdown() {
|
||||
eprintln!("OpenTelemetry shutdown error in Drop: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
|
||||
fn directive_checked(directive: impl Into<String>) -> anyhow::Result<Directive> {
|
||||
directive.into().parse().map_err(From::from)
|
||||
@@ -22,12 +58,86 @@ pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
|
||||
Ok(filter)
|
||||
}
|
||||
|
||||
/// Initialise the tracing subscriber stack.
|
||||
///
|
||||
/// When the `otel` feature is enabled **and** an `OtelConfig` is supplied, an
|
||||
/// OTLP exporter layer is added and the returned `OtelGuard` must be used to
|
||||
/// flush pending spans on shutdown.
|
||||
#[cfg(feature = "otel")]
|
||||
pub(crate) fn setup_tracing_logger(otel: Option<OtelConfig>) -> anyhow::Result<Option<OtelGuard>> {
|
||||
let stderr_layer =
|
||||
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
|
||||
|
||||
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
|
||||
let console_layer = console_subscriber::spawn();
|
||||
|
||||
if let Some(otel_config) = otel {
|
||||
let (otel_layer, provider) = nym_bin_common::logging::init_otel_layer(
|
||||
&otel_config.service_name,
|
||||
&otel_config.endpoint,
|
||||
otel_config.ingestion_key.as_deref(),
|
||||
&otel_config.environment,
|
||||
otel_config.sample_ratio,
|
||||
otel_config.export_timeout_secs,
|
||||
).map_err(|e| anyhow::anyhow!(
|
||||
"failed to initialise OpenTelemetry exporter (endpoint: {}, service: {}): {e}",
|
||||
otel_config.endpoint,
|
||||
otel_config.service_name,
|
||||
))?;
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(console_layer)
|
||||
.with(stderr_layer)
|
||||
.with(otel_layer)
|
||||
.init();
|
||||
|
||||
Ok(Some(OtelGuard { provider }))
|
||||
} else {
|
||||
tracing_subscriber::registry()
|
||||
.with(console_layer)
|
||||
.with(stderr_layer)
|
||||
.init();
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
} else {
|
||||
if let Some(otel_config) = otel {
|
||||
let (otel_layer, provider) = nym_bin_common::logging::init_otel_layer(
|
||||
&otel_config.service_name,
|
||||
&otel_config.endpoint,
|
||||
otel_config.ingestion_key.as_deref(),
|
||||
&otel_config.environment,
|
||||
otel_config.sample_ratio,
|
||||
otel_config.export_timeout_secs,
|
||||
).map_err(|e| anyhow::anyhow!(
|
||||
"failed to initialise OpenTelemetry exporter (endpoint: {}, service: {}): {e}",
|
||||
otel_config.endpoint,
|
||||
otel_config.service_name,
|
||||
))?;
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(stderr_layer)
|
||||
.with(otel_layer)
|
||||
.init();
|
||||
|
||||
Ok(Some(OtelGuard { provider }))
|
||||
} else {
|
||||
tracing_subscriber::registry()
|
||||
.with(stderr_layer)
|
||||
.init();
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}}
|
||||
}
|
||||
|
||||
/// Non-OTel variant -- identical subscriber stack without the OTLP layer.
|
||||
#[cfg(not(feature = "otel"))]
|
||||
pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
|
||||
let stderr_layer =
|
||||
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
|
||||
|
||||
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
|
||||
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
|
||||
let console_layer = console_subscriber::spawn();
|
||||
|
||||
tracing_subscriber::registry()
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::node::key_rotation::active_keys::SphinxKeyGuard;
|
||||
use crate::node::mixnet::shared::SharedData;
|
||||
use futures::StreamExt;
|
||||
use nym_noise::connection::Connection;
|
||||
@@ -20,7 +21,10 @@ use std::net::SocketAddr;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::codec::Framed;
|
||||
use tracing::{debug, error, instrument, trace, warn};
|
||||
use tracing::{Span, debug, error, instrument, trace, warn};
|
||||
|
||||
/// How often (in packets) the stream-level span updates its packet count.
|
||||
const SPAN_UPDATE_INTERVAL: u64 = 10_000;
|
||||
|
||||
struct PendingReplayCheckPackets {
|
||||
// map of rotation id used for packet creation to the packets
|
||||
@@ -51,6 +55,10 @@ impl PendingReplayCheckPackets {
|
||||
.push(packet.packet)
|
||||
}
|
||||
|
||||
fn total_count(&self) -> usize {
|
||||
self.packets.values().map(|v| v.len()).sum()
|
||||
}
|
||||
|
||||
fn replay_tags(&self) -> HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>> {
|
||||
let mut replay_tags = HashMap::with_capacity(self.packets.len());
|
||||
'outer: for (rotation_id, packets) in &self.packets {
|
||||
@@ -130,20 +138,54 @@ impl ConnectionHandler {
|
||||
Some(now + delay)
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.forward_packet",
|
||||
skip(self, mix_packet, delay),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote_addr = %self.remote_address,
|
||||
delay_ms = tracing::field::Empty,
|
||||
)
|
||||
)]
|
||||
fn handle_forward_packet(&self, now: Instant, mix_packet: MixPacket, delay: Option<Delay>) {
|
||||
if !self.shared.processing_config.forward_hop_processing_enabled {
|
||||
trace!("this nym-node does not support forward hop packets");
|
||||
warn!(
|
||||
event = "packet.dropped.forward_disabled",
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping packet: forward hop processing disabled"
|
||||
);
|
||||
self.shared.dropped_forward_packet(self.remote_address.ip());
|
||||
return;
|
||||
}
|
||||
|
||||
let forward_instant = self.create_delay_target(now, delay);
|
||||
if let Some(target) = forward_instant {
|
||||
Span::current().record(
|
||||
"delay_ms",
|
||||
target.saturating_duration_since(now).as_millis() as u64,
|
||||
);
|
||||
}
|
||||
self.shared.forward_mix_packet(mix_packet, forward_instant);
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.final_hop",
|
||||
skip(self, final_hop_data),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote_addr = %self.remote_address,
|
||||
client_online,
|
||||
disk_fallback = false,
|
||||
ack_forwarded = false,
|
||||
)
|
||||
)]
|
||||
async fn handle_final_hop(&self, final_hop_data: ProcessedFinalHop) {
|
||||
if !self.shared.processing_config.final_hop_processing_enabled {
|
||||
trace!("this nym-node does not support final hop packets");
|
||||
warn!(
|
||||
event = "packet.dropped.final_hop_disabled",
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping packet: final hop processing disabled"
|
||||
);
|
||||
self.shared
|
||||
.dropped_final_hop_packet(self.remote_address.ip());
|
||||
return;
|
||||
@@ -151,11 +193,13 @@ impl ConnectionHandler {
|
||||
|
||||
let client = final_hop_data.destination;
|
||||
let message = final_hop_data.message;
|
||||
let has_ack = final_hop_data.forward_ack.is_some();
|
||||
|
||||
// if possible attempt to push message directly to the client
|
||||
match self.shared.try_push_message_to_client(client, message) {
|
||||
Err(unsent_plaintext) => {
|
||||
// if that failed, store it on disk (to be 🔥 soon...)
|
||||
// if that failed, store it on disk
|
||||
Span::current().record("client_online", false);
|
||||
match self
|
||||
.shared
|
||||
.store_processed_packet_payload(client, unsent_plaintext)
|
||||
@@ -163,6 +207,7 @@ impl ConnectionHandler {
|
||||
{
|
||||
Err(err) => error!("Failed to store client data - {err}"),
|
||||
Ok(_) => {
|
||||
Span::current().record("disk_fallback", true);
|
||||
self.shared
|
||||
.metrics
|
||||
.mixnet
|
||||
@@ -172,13 +217,18 @@ impl ConnectionHandler {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => trace!("Pushed received packet to {client}"),
|
||||
Ok(_) => {
|
||||
Span::current().record("client_online", true);
|
||||
trace!("Pushed received packet to {client}");
|
||||
}
|
||||
}
|
||||
|
||||
// if we managed to either push message directly to the [online] client or store it at
|
||||
// its inbox, it means that it must exist at this gateway, hence we can send the
|
||||
// received ack back into the network
|
||||
// disk, forward the ack
|
||||
self.shared.forward_ack_packet(final_hop_data.forward_ack);
|
||||
if has_ack {
|
||||
Span::current().record("ack_forwarded", true);
|
||||
}
|
||||
}
|
||||
|
||||
fn within_deferral_threshold(&self, now: Instant) -> bool {
|
||||
@@ -206,32 +256,86 @@ impl ConnectionHandler {
|
||||
|
||||
if !time_threshold {
|
||||
warn!(
|
||||
"{}: time failure - {}",
|
||||
event = "replay_detection.deferral_exceeded",
|
||||
threshold_type = "time",
|
||||
deferred_count = self.pending_packets.total_count(),
|
||||
deferral_ms = now.saturating_duration_since(self.pending_packets.last_acquired_mutex).as_millis() as u64,
|
||||
remote_addr = %self.remote_address,
|
||||
"{}: time deferral threshold exceeded with {} pending packets",
|
||||
self.remote_address,
|
||||
self.pending_packets.packets.len()
|
||||
self.pending_packets.total_count()
|
||||
)
|
||||
}
|
||||
|
||||
if !count_threshold {
|
||||
warn!("{}, count failure", self.remote_address)
|
||||
warn!(
|
||||
event = "replay_detection.deferral_exceeded",
|
||||
threshold_type = "count",
|
||||
deferred_count = self.pending_packets.total_count(),
|
||||
remote_addr = %self.remote_address,
|
||||
"{}: count deferral threshold exceeded",
|
||||
self.remote_address
|
||||
)
|
||||
}
|
||||
|
||||
time_threshold && count_threshold
|
||||
}
|
||||
|
||||
/// Resolve the sphinx key for the given rotation, recording the rotation
|
||||
/// label on the current tracing span. Returns `ExpiredKey` if the requested
|
||||
/// odd/even key has already been rotated out.
|
||||
fn resolve_rotation_key(
|
||||
&self,
|
||||
rotation: SphinxKeyRotation,
|
||||
) -> Result<SphinxKeyGuard, PacketProcessingError> {
|
||||
let rotation_label = match rotation {
|
||||
SphinxKeyRotation::Unknown => "unknown",
|
||||
SphinxKeyRotation::OddRotation => "odd",
|
||||
SphinxKeyRotation::EvenRotation => "even",
|
||||
};
|
||||
Span::current().record("key_rotation", rotation_label);
|
||||
|
||||
match rotation {
|
||||
SphinxKeyRotation::Unknown => Ok(self.shared.sphinx_keys.primary()),
|
||||
SphinxKeyRotation::OddRotation => self.shared.sphinx_keys.odd().ok_or_else(|| {
|
||||
warn!(
|
||||
event = "packet.dropped.expired_key",
|
||||
key_rotation = "odd",
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping packet: odd key rotation expired"
|
||||
);
|
||||
PacketProcessingError::ExpiredKey
|
||||
}),
|
||||
SphinxKeyRotation::EvenRotation => self.shared.sphinx_keys.even().ok_or_else(|| {
|
||||
warn!(
|
||||
event = "packet.dropped.expired_key",
|
||||
key_rotation = "even",
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping packet: even key rotation expired"
|
||||
);
|
||||
PacketProcessingError::ExpiredKey
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.sphinx_partial_unwrap",
|
||||
skip(self, packet),
|
||||
level = "debug",
|
||||
fields(key_rotation, unwrap_result,)
|
||||
)]
|
||||
fn try_partially_unwrap_packet(
|
||||
&self,
|
||||
packet: FramedNymPacket,
|
||||
) -> Result<PartialyUnwrappedPacketWithKeyRotation, PacketProcessingError> {
|
||||
// based on the received sphinx key rotation information,
|
||||
// attempt to choose appropriate key for processing the packet
|
||||
match packet.header().key_rotation {
|
||||
let rotation = packet.header().key_rotation;
|
||||
|
||||
let result = match rotation {
|
||||
SphinxKeyRotation::Unknown => {
|
||||
let primary = self.shared.sphinx_keys.primary();
|
||||
// Unknown rotation: try primary, fallback to secondary
|
||||
let primary = self.resolve_rotation_key(rotation)?;
|
||||
let primary_rotation = primary.rotation_id();
|
||||
|
||||
// we have to try both keys, start with the primary as it has higher likelihood of being correct
|
||||
// if let Ok(partially_unwrapped) = PartiallyUnwrappedPacket::new()
|
||||
match PartiallyUnwrappedPacket::new(packet, primary.inner().as_ref()) {
|
||||
Ok(unwrapped_packet) => {
|
||||
Ok(unwrapped_packet.with_key_rotation(primary_rotation))
|
||||
@@ -248,25 +352,17 @@ impl ConnectionHandler {
|
||||
}
|
||||
}
|
||||
}
|
||||
SphinxKeyRotation::OddRotation => {
|
||||
let Some(odd_key) = self.shared.sphinx_keys.odd() else {
|
||||
return Err(PacketProcessingError::ExpiredKey);
|
||||
};
|
||||
let odd_rotation = odd_key.rotation_id();
|
||||
PartiallyUnwrappedPacket::new(packet, odd_key.inner().as_ref())
|
||||
_ => {
|
||||
let key = self.resolve_rotation_key(rotation)?;
|
||||
let rotation_id = key.rotation_id();
|
||||
PartiallyUnwrappedPacket::new(packet, key.inner().as_ref())
|
||||
.map_err(|(_, err)| err)
|
||||
.map(|p| p.with_key_rotation(odd_rotation))
|
||||
.map(|p| p.with_key_rotation(rotation_id))
|
||||
}
|
||||
SphinxKeyRotation::EvenRotation => {
|
||||
let Some(even_key) = self.shared.sphinx_keys.even() else {
|
||||
return Err(PacketProcessingError::ExpiredKey);
|
||||
};
|
||||
let even_rotation = even_key.rotation_id();
|
||||
PartiallyUnwrappedPacket::new(packet, even_key.inner().as_ref())
|
||||
.map_err(|(_, err)| err)
|
||||
.map(|p| p.with_key_rotation(even_rotation))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Span::current().record("unwrap_result", if result.is_ok() { "ok" } else { "err" });
|
||||
result
|
||||
}
|
||||
|
||||
async fn handle_received_packet_with_replay_detection(
|
||||
@@ -280,6 +376,12 @@ impl ConnectionHandler {
|
||||
Ok(unwrapped) => unwrapped,
|
||||
Err(err) => {
|
||||
trace!("failed to process received mix packet: {err}");
|
||||
warn!(
|
||||
event = "packet.dropped.malformed",
|
||||
error = %err,
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping malformed packet"
|
||||
);
|
||||
self.shared
|
||||
.metrics
|
||||
.mixnet
|
||||
@@ -316,7 +418,9 @@ impl ConnectionHandler {
|
||||
|
||||
// 3. forward the packet to the relevant sink (if enabled)
|
||||
match unwrapped_packet {
|
||||
Err(err) => trace!("failed to process received mix packet: {err}"),
|
||||
Err(err) => {
|
||||
trace!("failed to process received mix packet: {err}");
|
||||
}
|
||||
Ok(processed_packet) => match processed_packet.processing_data {
|
||||
MixProcessingResultData::ForwardHop { packet, delay } => {
|
||||
self.handle_forward_packet(now, packet, delay);
|
||||
@@ -334,6 +438,7 @@ impl ConnectionHandler {
|
||||
packets: HashMap<u32, Vec<PartiallyUnwrappedPacket>>,
|
||||
replay_check_results: HashMap<u32, Vec<bool>>,
|
||||
) {
|
||||
let mut replays_detected: u64 = 0;
|
||||
for (rotation_id, packets) in packets {
|
||||
let Some(replay_checks) = replay_check_results.get(&rotation_id) else {
|
||||
// this should never happen, but if we messed up, and it does, don't panic, just drop the packets
|
||||
@@ -342,6 +447,13 @@ impl ConnectionHandler {
|
||||
};
|
||||
for (packet, &replayed) in packets.into_iter().zip(replay_checks) {
|
||||
let unwrapped_packet = if replayed {
|
||||
replays_detected += 1;
|
||||
warn!(
|
||||
event = "packet.dropped.replay",
|
||||
remote_addr = %self.remote_address,
|
||||
rotation_id,
|
||||
"dropping replayed packet"
|
||||
);
|
||||
Err(PacketProcessingError::PacketReplay)
|
||||
} else {
|
||||
packet.finalise_unwrapping()
|
||||
@@ -350,6 +462,13 @@ impl ConnectionHandler {
|
||||
self.handle_unwrapped_packet(now, unwrapped_packet).await;
|
||||
}
|
||||
}
|
||||
if replays_detected > 0 {
|
||||
debug!(
|
||||
replays_detected,
|
||||
remote_addr = %self.remote_address,
|
||||
"replay detection batch completed with replays"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_pending_packets_batch_no_locking(&mut self, now: Instant) -> bool {
|
||||
@@ -379,13 +498,22 @@ impl ConnectionHandler {
|
||||
true
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.replay_check_batch",
|
||||
skip(self),
|
||||
level = "debug",
|
||||
fields(batch_size, mutex_wait_ms,)
|
||||
)]
|
||||
async fn handle_pending_packets_batch(&mut self, now: Instant) {
|
||||
let batch = self.pending_packets.reset(now);
|
||||
let replay_tags = self.pending_packets.replay_tags();
|
||||
if replay_tags.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let batch_size = self.pending_packets.total_count();
|
||||
Span::current().record("batch_size", batch_size as u64);
|
||||
|
||||
let mutex_start = Instant::now();
|
||||
let Ok(replay_check_results) = self
|
||||
.shared
|
||||
.replay_protection_filter
|
||||
@@ -396,37 +524,25 @@ impl ConnectionHandler {
|
||||
self.shared.shutdown_token.cancel();
|
||||
return;
|
||||
};
|
||||
Span::current().record("mutex_wait_ms", mutex_start.elapsed().as_millis() as u64);
|
||||
|
||||
let batch = self.pending_packets.reset(now);
|
||||
self.handle_post_replay_detection_packets(now, batch, replay_check_results)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.sphinx_full_unwrap",
|
||||
skip(self, packet),
|
||||
level = "debug",
|
||||
fields(key_rotation)
|
||||
)]
|
||||
fn try_full_unwrap_packet(
|
||||
&self,
|
||||
packet: FramedNymPacket,
|
||||
) -> Result<MixProcessingResult, PacketProcessingError> {
|
||||
// based on the received sphinx key rotation information,
|
||||
// attempt to choose appropriate key for processing the packet
|
||||
// NOTE: due to the function signatures, outfox packets will **only** attempt primary key
|
||||
// if no rotation information is available (but that's fine given outfox is not really in use,
|
||||
// and by the time we need it, the rotation info should be present)
|
||||
match packet.header().key_rotation {
|
||||
SphinxKeyRotation::Unknown => {
|
||||
process_framed_packet(packet, self.shared.sphinx_keys.primary().inner().as_ref())
|
||||
}
|
||||
SphinxKeyRotation::OddRotation => {
|
||||
let Some(odd_key) = self.shared.sphinx_keys.odd() else {
|
||||
return Err(PacketProcessingError::ExpiredKey);
|
||||
};
|
||||
process_framed_packet(packet, odd_key.inner().as_ref())
|
||||
}
|
||||
SphinxKeyRotation::EvenRotation => {
|
||||
let Some(even_key) = self.shared.sphinx_keys.even() else {
|
||||
return Err(PacketProcessingError::ExpiredKey);
|
||||
};
|
||||
process_framed_packet(packet, even_key.inner().as_ref())
|
||||
}
|
||||
}
|
||||
let key = self.resolve_rotation_key(packet.header().key_rotation)?;
|
||||
process_framed_packet(packet, key.inner().as_ref())
|
||||
}
|
||||
|
||||
async fn handle_received_packet_with_no_replay_detection(
|
||||
@@ -456,23 +572,36 @@ impl ConnectionHandler {
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
skip(self),
|
||||
name = "mixnode.connection",
|
||||
skip(self, socket),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote = %self.remote_address
|
||||
remote = %self.remote_address,
|
||||
noise_handshake_ms = tracing::field::Empty,
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn handle_connection(&mut self, socket: TcpStream) {
|
||||
let handshake_start = Instant::now();
|
||||
let noise_stream = match upgrade_noise_responder(socket, &self.shared.noise_config).await {
|
||||
Ok(noise_stream) => noise_stream,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to perform Noise handshake with {:?} - {err}",
|
||||
self.remote_address
|
||||
Span::current().record(
|
||||
"noise_handshake_ms",
|
||||
handshake_start.elapsed().as_millis() as u64,
|
||||
);
|
||||
warn!(
|
||||
event = "connection.failed.noise",
|
||||
remote_addr = %self.remote_address,
|
||||
error = %err,
|
||||
"Noise responder handshake failed"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
Span::current().record(
|
||||
"noise_handshake_ms",
|
||||
handshake_start.elapsed().as_millis() as u64,
|
||||
);
|
||||
debug!(
|
||||
"Noise responder handshake completed for {:?}",
|
||||
self.remote_address
|
||||
@@ -481,26 +610,58 @@ impl ConnectionHandler {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.stream",
|
||||
skip(self, mixnet_connection),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote = %self.remote_address,
|
||||
packets_processed = 0u64,
|
||||
exit_reason,
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn handle_stream(
|
||||
&mut self,
|
||||
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
|
||||
) {
|
||||
let mut packets_processed: u64 = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.shared.shutdown_token.cancelled() => {
|
||||
trace!("connection handler: received shutdown");
|
||||
Span::current().record("exit_reason", "shutdown");
|
||||
break
|
||||
}
|
||||
maybe_framed_nym_packet = mixnet_connection.next() => {
|
||||
match maybe_framed_nym_packet {
|
||||
Some(Ok(packet)) => self.handle_received_nym_packet(packet).await,
|
||||
Some(Ok(packet)) => {
|
||||
self.handle_received_nym_packet(packet).await;
|
||||
packets_processed += 1;
|
||||
if packets_processed.is_multiple_of(SPAN_UPDATE_INTERVAL) {
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
}
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
debug!("connection got corrupted with: {err}");
|
||||
warn!(
|
||||
event = "connection.corrupted",
|
||||
remote_addr = %self.remote_address,
|
||||
error = %err,
|
||||
packets_processed,
|
||||
"connection stream corrupted"
|
||||
);
|
||||
Span::current().record("exit_reason", "corrupted");
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
return
|
||||
}
|
||||
None => {
|
||||
debug!("connection got closed by the remote");
|
||||
debug!(
|
||||
remote_addr = %self.remote_address,
|
||||
packets_processed,
|
||||
"connection closed by remote"
|
||||
);
|
||||
Span::current().record("exit_reason", "closed_by_remote");
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -508,6 +669,7 @@ impl ConnectionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
debug!("exiting and closing connection");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,11 +56,17 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
if let Err(err) = self.mixnet_client.send_without_response(packet) {
|
||||
if err.kind() == io::ErrorKind::WouldBlock {
|
||||
// we only know for sure if we dropped a packet if our sending queue was full
|
||||
// in any other case the connection might still be re-established (or created for the first time)
|
||||
// and the packet might get sent, but we won't know about it
|
||||
warn!(
|
||||
event = "packet.dropped.buffer_full",
|
||||
next_hop = %next_hop,
|
||||
"dropping packet: egress connection buffer full (WouldBlock)"
|
||||
);
|
||||
self.metrics.mixnet.egress_dropped_forward_packet(next_hop)
|
||||
} else if err.kind() == io::ErrorKind::NotConnected {
|
||||
// let's give the benefit of the doubt and assume we manage to establish connection
|
||||
debug!(
|
||||
next_hop = %next_hop,
|
||||
"packet queued for not-yet-connected peer"
|
||||
);
|
||||
self.metrics.mixnet.egress_sent_forward_packet(next_hop)
|
||||
}
|
||||
} else {
|
||||
@@ -86,7 +92,11 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
let next_hop = new_packet.packet.next_hop();
|
||||
|
||||
if !self.routing_filter.should_route(next_hop.as_ref().ip()) {
|
||||
debug!("dropping packet as the egress address does not belong to any known node");
|
||||
warn!(
|
||||
event = "packet.dropped.routing_filter",
|
||||
next_hop = %next_hop,
|
||||
"dropping packet: egress address does not belong to any known node"
|
||||
);
|
||||
self.metrics
|
||||
.mixnet
|
||||
.egress_dropped_forward_packet(next_hop.into());
|
||||
@@ -125,7 +135,7 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
C: SendWithoutResponse,
|
||||
F: RoutingFilter,
|
||||
{
|
||||
let mut processed = 0;
|
||||
let mut processed: u64 = 0;
|
||||
trace!("starting PacketForwarder");
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -145,11 +155,29 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
#[allow(clippy::unwrap_used)]
|
||||
self.handle_new_packet(new_packet.unwrap());
|
||||
let channel_len = self.packet_sender.len();
|
||||
if processed % 1000 == 0 {
|
||||
let delay_queue_len = self.delay_queue.len();
|
||||
if processed.is_multiple_of(1000) {
|
||||
match channel_len {
|
||||
n if n > 1000 => error!("there are currently {n} mix packets waiting to get forwarded - the node seems to be significantly overloaded!"),
|
||||
n if n > 500 => warn!("there are currently {n} mix packets waiting to get forwarded - is the node overloaded?"),
|
||||
n => trace!("there are currently {n} mix packets waiting to get forwarded"),
|
||||
n if n > 1000 => error!(
|
||||
event = "forwarder.queue_overload",
|
||||
channel_depth = n,
|
||||
delay_queue_depth = delay_queue_len,
|
||||
packets_processed = processed,
|
||||
"there are currently {n} mix packets waiting to get forwarded - the node seems to be significantly overloaded!"
|
||||
),
|
||||
n if n > 500 => warn!(
|
||||
event = "forwarder.queue_high",
|
||||
channel_depth = n,
|
||||
delay_queue_depth = delay_queue_len,
|
||||
packets_processed = processed,
|
||||
"there are currently {n} mix packets waiting to get forwarded - is the node overloaded?"
|
||||
),
|
||||
n => trace!(
|
||||
channel_depth = n,
|
||||
delay_queue_depth = delay_queue_len,
|
||||
packets_processed = processed,
|
||||
"forwarder queue status"
|
||||
),
|
||||
}
|
||||
}
|
||||
self.update_channel_size_metric(channel_len);
|
||||
|
||||
@@ -5,7 +5,8 @@ use nym_gateway::node::{
|
||||
ActiveClientsStore, GatewayStorage, GatewayStorageError, InboxGatewayStorage,
|
||||
};
|
||||
use nym_sphinx_types::DestinationAddressBytes;
|
||||
use tracing::debug;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct SharedFinalHopData {
|
||||
@@ -27,14 +28,37 @@ impl SharedFinalHopData {
|
||||
message: Vec<u8>,
|
||||
) -> Result<(), Vec<u8>> {
|
||||
match self.active_clients.get_sender(client_address) {
|
||||
None => Err(message),
|
||||
None => {
|
||||
debug!(
|
||||
event = "gateway.push_to_client",
|
||||
client_found = false,
|
||||
send_result = "client_not_found",
|
||||
"client {client_address} not found in active clients"
|
||||
);
|
||||
Err(message)
|
||||
}
|
||||
Some(sender_channel) => {
|
||||
let send_start = Instant::now();
|
||||
if let Err(unsent) = sender_channel.unbounded_send(vec![message]) {
|
||||
warn!(
|
||||
event = "gateway.push_to_client",
|
||||
client_found = true,
|
||||
send_result = "channel_closed",
|
||||
send_us = send_start.elapsed().as_micros() as u64,
|
||||
"client {client_address} channel closed, message not delivered"
|
||||
);
|
||||
// the unwrap here is fine as the original message got returned;
|
||||
// plus we're only ever sending 1 message at the time (for now)
|
||||
#[allow(clippy::unwrap_used)]
|
||||
Err(unsent.into_inner().pop().unwrap())
|
||||
} else {
|
||||
debug!(
|
||||
event = "gateway.push_to_client",
|
||||
client_found = true,
|
||||
send_result = "ok",
|
||||
send_us = send_start.elapsed().as_micros() as u64,
|
||||
"pushed message to client {client_address}"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -46,8 +70,21 @@ impl SharedFinalHopData {
|
||||
client_address: DestinationAddressBytes,
|
||||
message: Vec<u8>,
|
||||
) -> Result<(), GatewayStorageError> {
|
||||
let start = Instant::now();
|
||||
debug!("Storing received message for {client_address} on the disk...",);
|
||||
|
||||
self.storage.store_message(client_address, message).await
|
||||
let result = self.storage.store_message(client_address, message).await;
|
||||
let store_us = start.elapsed().as_micros() as u64;
|
||||
if result.is_ok() {
|
||||
debug!(
|
||||
event = "gateway.disk_store",
|
||||
store_us, "stored message for {client_address} on disk in {store_us}us"
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
event = "gateway.disk_store_failed",
|
||||
store_us, "failed to store message for {client_address} on disk after {store_us}us"
|
||||
);
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -185,6 +185,7 @@ impl SharedData {
|
||||
}
|
||||
|
||||
pub(super) fn forward_mix_packet(&self, packet: MixPacket, delay_until: Option<Instant>) {
|
||||
let has_delay = delay_until.is_some();
|
||||
if self
|
||||
.mixnet_forwarder
|
||||
.forward_packet(PacketToForward::new(packet, delay_until))
|
||||
@@ -192,6 +193,8 @@ impl SharedData {
|
||||
&& !self.shutdown_token.is_cancelled()
|
||||
{
|
||||
error!(
|
||||
event = "forwarder.channel_send_failed",
|
||||
has_delay,
|
||||
"failed to forward sphinx packet on the channel while the process is not going through the shutdown!"
|
||||
);
|
||||
self.shutdown_token.cancel();
|
||||
|
||||
Reference in New Issue
Block a user