otel: add tracing

This commit is contained in:
Tommy Verrall
2026-02-16 13:46:17 +01:00
parent d93d25ebae
commit 40a3cd28b7
12 changed files with 2191 additions and 1763 deletions
Generated
+1596 -1613
View File
File diff suppressed because it is too large Load Diff
+4 -4
View File
@@ -309,8 +309,9 @@ nix = "0.30.1"
notify = "5.1.0"
num_enum = "0.7.5"
once_cell = "1.21.3"
opentelemetry = "0.19.0"
opentelemetry-jaeger = "0.18.0"
opentelemetry = "0.31.0"
opentelemetry_sdk = "0.31.0"
opentelemetry-otlp = "0.31.0"
parking_lot = "0.12.3"
pem = "0.8"
petgraph = "0.6.5"
@@ -368,9 +369,8 @@ tower = "0.5.2"
tower-http = "0.6.6"
tracing = "0.1.41"
tracing-log = "0.2"
tracing-opentelemetry = "0.19.0"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = "0.3.20"
tracing-tree = "0.2.2"
tracing-indicatif = "0.3.9"
tracing-test = "0.2.5"
ts-rs = "10.1.0"
+11 -9
View File
@@ -19,12 +19,14 @@ serde_json = { workspace = true, optional = true }
## tracing
tracing-subscriber = { workspace = true, features = ["env-filter"], optional = true }
tracing-tree = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
opentelemetry-jaeger = { workspace = true, features = ["rt-tokio", "collector_client", "isahc_collector_client"], optional = true }
tracing-opentelemetry = { workspace = true, optional = true }
utoipa = { workspace = true, optional = true }
opentelemetry = { workspace = true, features = ["rt-tokio"], optional = true }
opentelemetry = { workspace = true, features = ["trace"], optional = true }
## otel-otlp (modern OTLP export to SigNoz/any OTLP collector)
opentelemetry_sdk = { workspace = true, features = ["trace"], optional = true }
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "trace"], optional = true }
[build-dependencies]
@@ -35,13 +37,13 @@ default = []
openapi = ["utoipa"]
output_format = ["serde_json", "dep:clap"]
bin_info_schema = ["schemars"]
basic_tracing = ["dep:tracing", "tracing-subscriber"]
tracing = [
basic_tracing = ["dep:tracing", "dep:tracing-subscriber"]
otel-otlp = [
"basic_tracing",
"tracing-tree",
"opentelemetry-jaeger",
"tracing-opentelemetry",
"opentelemetry",
"dep:opentelemetry",
"dep:opentelemetry_sdk",
"dep:opentelemetry-otlp",
"dep:tracing-opentelemetry",
]
clap = ["dep:clap", "dep:clap_complete", "dep:clap_complete_fig"]
models = []
+46 -38
View File
@@ -4,16 +4,9 @@
use serde::{Deserialize, Serialize};
use std::io::IsTerminal;
#[cfg(feature = "tracing")]
pub use opentelemetry;
#[cfg(feature = "tracing")]
pub use opentelemetry_jaeger;
#[cfg(feature = "tracing")]
pub use tracing_opentelemetry;
// Re-export tracing_subscriber for consumers that need to compose layers
#[cfg(feature = "basic_tracing")]
pub use tracing_subscriber;
#[cfg(feature = "tracing")]
pub use tracing_tree;
#[derive(Debug, Default, Copy, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
@@ -69,40 +62,55 @@ pub fn setup_tracing_logger() {
build_tracing_logger().init()
}
// TODO: This has to be a macro, running it as a function does not work for the file_appender for some reason
#[cfg(feature = "tracing")]
#[macro_export]
macro_rules! setup_tracing {
($service_name: expr) => {
use nym_bin_common::logging::tracing_subscriber::layer::SubscriberExt;
use nym_bin_common::logging::tracing_subscriber::util::SubscriberInitExt;
/// Initialize an OpenTelemetry tracing layer that exports spans via OTLP/gRPC.
///
/// This produces a layer compatible with `tracing_subscriber::registry()` that
/// sends traces to any OTLP-compatible collector (SigNoz, Grafana Tempo, etc).
///
/// Returns both the tracing layer and the [`SdkTracerProvider`] so the caller
/// can invoke [`SdkTracerProvider::shutdown`] for graceful flush on exit.
///
/// # Arguments
/// * `service_name` - The service name reported to the collector (e.g. "nym-node")
/// * `endpoint` - The OTLP/gRPC collector endpoint (e.g. "http://localhost:4317")
#[cfg(feature = "otel-otlp")]
pub fn init_otel_layer<S>(
service_name: &str,
endpoint: &str,
) -> Result<
(
tracing_opentelemetry::OpenTelemetryLayer<S, opentelemetry_sdk::trace::SdkTracer>,
opentelemetry_sdk::trace::SdkTracerProvider,
),
Box<dyn std::error::Error + Send + Sync>,
>
where
S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig;
let registry = nym_bin_common::logging::tracing_subscriber::Registry::default()
.with(nym_bin_common::logging::tracing_subscriber::EnvFilter::from_default_env())
.with(
nym_bin_common::logging::tracing_tree::HierarchicalLayer::new(4)
.with_targets(true)
.with_bracketed_fields(true),
);
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()?;
let tracer = nym_bin_common::logging::opentelemetry_jaeger::new_collector_pipeline()
.with_endpoint("http://44.199.230.10:14268/api/traces")
.with_service_name($service_name)
.with_isahc()
.with_trace_config(
nym_bin_common::logging::opentelemetry::sdk::trace::config().with_sampler(
nym_bin_common::logging::opentelemetry::sdk::trace::Sampler::TraceIdRatioBased(
0.1,
),
),
)
.install_batch(nym_bin_common::logging::opentelemetry::runtime::Tokio)
.expect("Could not init tracer");
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_batch_exporter(exporter)
.with_resource(
opentelemetry_sdk::Resource::builder()
.with_service_name(service_name.to_owned())
.build(),
)
.build();
let telemetry = nym_bin_common::logging::tracing_opentelemetry::layer().with_tracer(tracer);
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
let tracer = tracer_provider.tracer(service_name.to_owned());
registry.with(telemetry).init();
};
Ok((
tracing_opentelemetry::layer().with_tracer(tracer),
tracer_provider,
))
}
pub fn banner(crate_name: &str, crate_version: &str) -> String {
+79 -28
View File
@@ -128,54 +128,95 @@ impl ManagedConnection {
async fn run(self) {
let address = self.address;
let reconnection_attempt = self.current_reconnection.load(Ordering::Acquire);
let connect_start = tokio::time::Instant::now();
let connection_fut = TcpStream::connect(address);
let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to {}", self.address);
let connect_ms = connect_start.elapsed().as_millis() as u64;
debug!(
peer = %address,
connect_ms,
"Managed to establish connection to {}", self.address
);
let noise_start = tokio::time::Instant::now();
let noise_stream =
match upgrade_noise_initiator(stream, &self.noise_config).await {
Ok(noise_stream) => noise_stream,
Err(err) => {
error!("Failed to perform Noise handshake with {address} - {err}");
// we failed to finish the noise handshake - increase reconnection attempt
let noise_handshake_ms = noise_start.elapsed().as_millis() as u64;
warn!(
event = "connection.failed.noise",
peer = %address,
error = %err,
connect_ms,
noise_handshake_ms,
reconnection_attempt,
exit_reason = "noise_error",
"Failed to perform Noise initiator handshake with {address}"
);
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
return;
}
};
// if we managed to connect AND do the noise handshake, reset the reconnection count (whatever it might have been)
let noise_handshake_ms = noise_start.elapsed().as_millis() as u64;
self.current_reconnection.store(0, Ordering::Release);
debug!("Noise initiator handshake completed for {:?}", address);
debug!(
peer = %address,
connect_ms,
noise_handshake_ms,
"Noise initiator handshake completed for {:?}", address
);
Framed::new(noise_stream, NymCodec)
}
Err(err) => {
debug!("failed to establish connection to {address} (err: {err})",);
let connect_ms = connect_start.elapsed().as_millis() as u64;
warn!(
event = "connection.failed.connect",
peer = %address,
error = %err,
connect_ms,
reconnection_attempt,
exit_reason = "connect_error",
"failed to establish connection to {address}"
);
return;
}
},
Err(_) => {
debug!(
let connect_ms = connect_start.elapsed().as_millis() as u64;
warn!(
event = "connection.failed.timeout",
peer = %address,
timeout_ms = self.connection_timeout.as_millis() as u64,
connect_ms,
reconnection_attempt,
exit_reason = "timeout",
"failed to connect to {address} within {:?}",
self.connection_timeout
);
// we failed to connect - increase reconnection attempt
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
return;
}
};
// Take whatever the receiver channel produces and put it on the connection.
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
// about neither receiver nor the connection, it doesn't matter which one gets consumed
if let Err(err) = self.message_receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {address}: {err}");
warn!(
event = "connection.forward_error",
peer = %address,
error = %err,
exit_reason = "forward_error",
"Failed to forward packets to {address}: {err}"
);
}
debug!(
"connection manager to {address} is finished. Either the connection failed or mixnet client got dropped",
peer = %address,
exit_reason = "sender_dropped",
"connection manager to {address} finished"
);
}
}
@@ -271,17 +312,16 @@ impl SendWithoutResponse for Client {
let address = packet.next_hop_address();
trace!("Sending packet to {address}");
// TODO: optimisation for the future: rather than constantly using legacy encoding,
// once we're addressing by node_id (and thus have full node info here),
// we could simply infer supported encoding based on their version
let framed_packet =
FramedNymPacket::from_mix_packet(packet, self.config.use_legacy_packet_encoding);
let Some(sender) = self.active_connections.get_mut(&address) else {
// there was never a connection to begin with
debug!("establishing initial connection to {address}");
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
debug!(
event = "mixclient.try_send",
peer = %address,
result = "not_connected",
"establishing initial connection to {address}"
);
self.make_connection(address, framed_packet);
return Err(io::Error::new(
io::ErrorKind::NotConnected,
@@ -289,15 +329,24 @@ impl SendWithoutResponse for Client {
));
};
let channel_capacity = sender.channel.max_capacity();
let channel_available = sender.channel.capacity();
let channel_used = channel_capacity - channel_available;
let sending_res = sender.channel.try_send(framed_packet);
drop(sender);
sending_res.map_err(|err| {
match err {
TrySendError::Full(_) => {
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
warn!(
event = "mixclient.try_send",
peer = %address,
result = "full_dropped",
channel_capacity,
channel_used,
"dropping packet: connection buffer to {address} is full ({channel_used}/{channel_capacity})"
);
io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
@@ -305,11 +354,13 @@ impl SendWithoutResponse for Client {
}
TrySendError::Closed(dropped) => {
debug!(
"Connection to {address} seems to be dead. attempting to re-establish it...",
event = "mixclient.try_send",
peer = %address,
result = "closed_reconnecting",
channel_capacity,
channel_used,
"connection to {address} dead, attempting re-establishment"
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, dropped);
io::Error::new(
io::ErrorKind::ConnectionAborted,
+3
View File
@@ -40,6 +40,8 @@ thiserror.workspace = true
tracing.workspace = true
tracing-indicatif = { workspace = true }
tracing-subscriber.workspace = true
opentelemetry = { workspace = true, features = ["trace"], optional = true }
opentelemetry_sdk = { workspace = true, features = ["trace"], optional = true }
tokio = { workspace = true, features = ["macros", "sync", "rt-multi-thread"] }
tokio-util = { workspace = true, features = ["codec"] }
tokio-stream = { workspace = true }
@@ -135,6 +137,7 @@ rand_chacha = { workspace = true }
[features]
tokio-console = ["console-subscriber", "nym-task/tokio-tracing"]
otel = ["nym-bin-common/otel-otlp", "dep:opentelemetry", "dep:opentelemetry_sdk"]
[lints]
workspace = true
+71 -19
View File
@@ -40,6 +40,20 @@ pub(crate) struct Cli {
)]
pub(crate) no_banner: bool,
/// Enable OpenTelemetry tracing export via OTLP/gRPC.
/// Requires the binary to be built with the `otel` feature.
#[clap(long, env = "NYMNODE_OTEL_ENABLE")]
pub(crate) otel: bool,
/// OpenTelemetry OTLP collector endpoint (gRPC).
/// Only used when --otel is enabled.
#[clap(
long,
env = "NYMNODE_OTEL_ENDPOINT",
default_value = "http://localhost:4317"
)]
pub(crate) otel_endpoint: String,
#[clap(subcommand)]
command: Commands,
}
@@ -54,30 +68,68 @@ impl Cli {
pub(crate) fn execute(self) -> anyhow::Result<()> {
// NOTE: `test_throughput` sets up its own logger as it has to include additional layers
#[cfg(feature = "otel")]
let _otel_guard = if !matches!(self.command, Commands::TestThroughput(..)) {
let otel_config = if self.otel {
Some(crate::logging::OtelConfig {
endpoint: self.otel_endpoint.clone(),
service_name: "nym-node".to_string(),
})
} else {
None
};
crate::logging::setup_tracing_logger(otel_config)?
} else {
None
};
#[cfg(not(feature = "otel"))]
if !matches!(self.command, Commands::TestThroughput(..)) {
crate::logging::setup_tracing_logger()?;
let otel_config = if self.otel {
Some(crate::logging::OtelConfig {
endpoint: self.otel_endpoint.clone(),
service_name: "nym-node".to_string(),
})
} else {
None
};
crate::logging::setup_tracing_logger(otel_config)?;
}
match self.command {
Commands::BuildInfo(args) => build_info::execute(args)?,
Commands::BondingInformation(args) => {
{ Self::execute_async(bonding_information::execute(args))? }?
}
Commands::NodeDetails(args) => { Self::execute_async(node_details::execute(args))? }?,
Commands::Run(args) => { Self::execute_async(run::execute(*args))? }?,
Commands::Migrate(args) => migrate::execute(*args)?,
Commands::Sign(args) => { Self::execute_async(sign::execute(args))? }?,
Commands::TestThroughput(args) => test_throughput::execute(args)?,
Commands::UnsafeResetSphinxKeys(args) => {
{ Self::execute_async(reset_sphinx_keys::execute(args))? }?
}
Commands::Debug(debug) => match debug.command {
DebugCommands::ResetProvidersGatewayDbs(args) => {
{ Self::execute_async(debug::reset_providers_dbs::execute(args))? }?
let result = (|| -> anyhow::Result<()> {
match self.command {
Commands::BuildInfo(args) => build_info::execute(args)?,
Commands::BondingInformation(args) => {
{ Self::execute_async(bonding_information::execute(args))? }?
}
},
Commands::NodeDetails(args) => {
{ Self::execute_async(node_details::execute(args))? }?
}
Commands::Run(args) => { Self::execute_async(run::execute(*args))? }?,
Commands::Migrate(args) => migrate::execute(*args)?,
Commands::Sign(args) => { Self::execute_async(sign::execute(args))? }?,
Commands::TestThroughput(args) => test_throughput::execute(args)?,
Commands::UnsafeResetSphinxKeys(args) => {
{ Self::execute_async(reset_sphinx_keys::execute(args))? }?
}
Commands::Debug(debug) => match debug.command {
DebugCommands::ResetProvidersGatewayDbs(args) => {
{ Self::execute_async(debug::reset_providers_dbs::execute(args))? }?
}
},
}
Ok(())
})();
// Flush any pending OTel spans before exit
#[cfg(feature = "otel")]
if let Some(guard) = _otel_guard {
if let Err(e) = guard.provider.shutdown() {
eprintln!("OpenTelemetry shutdown error: {e}");
}
}
Ok(())
result
}
}
+77 -2
View File
@@ -7,6 +7,20 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};
/// Configuration for OpenTelemetry OTLP export.
/// Fields are only read when the `otel` feature is enabled.
#[allow(dead_code)]
pub(crate) struct OtelConfig {
pub endpoint: String,
pub service_name: String,
}
/// Handle returned when OTel is active so the caller can trigger a graceful shutdown.
#[cfg(feature = "otel")]
pub(crate) struct OtelGuard {
pub provider: opentelemetry_sdk::trace::SdkTracerProvider,
}
pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
fn directive_checked(directive: impl Into<String>) -> anyhow::Result<Directive> {
directive.into().parse().map_err(From::from)
@@ -22,12 +36,73 @@ pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
Ok(filter)
}
pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
/// Initialise the tracing subscriber stack.
///
/// When the `otel` feature is enabled **and** an `OtelConfig` is supplied, an
/// OTLP exporter layer is added and the returned `OtelGuard` must be used to
/// flush pending spans on shutdown.
#[cfg(feature = "otel")]
pub(crate) fn setup_tracing_logger(
otel: Option<OtelConfig>,
) -> anyhow::Result<Option<OtelGuard>> {
let stderr_layer =
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
let console_layer = console_subscriber::spawn();
if let Some(otel_config) = otel {
let (otel_layer, provider) = nym_bin_common::logging::init_otel_layer(
&otel_config.service_name,
&otel_config.endpoint,
).map_err(|e| anyhow::anyhow!("failed to initialise OpenTelemetry: {e}"))?;
tracing_subscriber::registry()
.with(console_layer)
.with(stderr_layer)
.with(otel_layer)
.init();
Ok(Some(OtelGuard { provider }))
} else {
tracing_subscriber::registry()
.with(console_layer)
.with(stderr_layer)
.init();
Ok(None)
}
} else {
if let Some(otel_config) = otel {
let (otel_layer, provider) = nym_bin_common::logging::init_otel_layer(
&otel_config.service_name,
&otel_config.endpoint,
).map_err(|e| anyhow::anyhow!("failed to initialise OpenTelemetry: {e}"))?;
tracing_subscriber::registry()
.with(stderr_layer)
.with(otel_layer)
.init();
Ok(Some(OtelGuard { provider }))
} else {
tracing_subscriber::registry()
.with(stderr_layer)
.init();
Ok(None)
}
}}
}
/// Non-OTel variant -- identical subscriber stack without the OTLP layer.
#[cfg(not(feature = "otel"))]
pub(crate) fn setup_tracing_logger(otel: Option<OtelConfig>) -> anyhow::Result<()> {
let _ = otel;
let stderr_layer =
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
let console_layer = console_subscriber::spawn();
tracing_subscriber::registry()
+219 -32
View File
@@ -20,7 +20,7 @@ use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::codec::Framed;
use tracing::{debug, error, instrument, trace, warn};
use tracing::{debug, error, instrument, trace, warn, Span};
struct PendingReplayCheckPackets {
// map of rotation id used for packet creation to the packets
@@ -51,6 +51,10 @@ impl PendingReplayCheckPackets {
.push(packet.packet)
}
fn total_count(&self) -> usize {
self.packets.values().map(|v| v.len()).sum()
}
fn replay_tags(&self) -> HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>> {
let mut replay_tags = HashMap::with_capacity(self.packets.len());
'outer: for (rotation_id, packets) in &self.packets {
@@ -130,20 +134,54 @@ impl ConnectionHandler {
Some(now + delay)
}
#[instrument(
name = "mixnode.forward_packet",
skip(self, mix_packet, delay),
level = "debug",
fields(
remote_addr = %self.remote_address,
delay_ms,
)
)]
fn handle_forward_packet(&self, now: Instant, mix_packet: MixPacket, delay: Option<Delay>) {
if !self.shared.processing_config.forward_hop_processing_enabled {
trace!("this nym-node does not support forward hop packets");
warn!(
event = "packet.dropped.forward_disabled",
remote_addr = %self.remote_address,
"dropping packet: forward hop processing disabled"
);
self.shared.dropped_forward_packet(self.remote_address.ip());
return;
}
let forward_instant = self.create_delay_target(now, delay);
Span::current().record(
"delay_ms",
forward_instant
.map(|i| i.saturating_duration_since(now).as_millis() as u64)
.unwrap_or(0),
);
self.shared.forward_mix_packet(mix_packet, forward_instant);
}
#[instrument(
name = "mixnode.final_hop",
skip(self, final_hop_data),
level = "debug",
fields(
remote_addr = %self.remote_address,
client_online,
disk_fallback = false,
ack_forwarded = false,
)
)]
async fn handle_final_hop(&self, final_hop_data: ProcessedFinalHop) {
if !self.shared.processing_config.final_hop_processing_enabled {
trace!("this nym-node does not support final hop packets");
warn!(
event = "packet.dropped.final_hop_disabled",
remote_addr = %self.remote_address,
"dropping packet: final hop processing disabled"
);
self.shared
.dropped_final_hop_packet(self.remote_address.ip());
return;
@@ -151,11 +189,11 @@ impl ConnectionHandler {
let client = final_hop_data.destination;
let message = final_hop_data.message;
let has_ack = final_hop_data.forward_ack.is_some();
// if possible attempt to push message directly to the client
match self.shared.try_push_message_to_client(client, message) {
Err(unsent_plaintext) => {
// if that failed, store it on disk (to be 🔥 soon...)
Span::current().record("client_online", false);
match self
.shared
.store_processed_packet_payload(client, unsent_plaintext)
@@ -163,6 +201,7 @@ impl ConnectionHandler {
{
Err(err) => error!("Failed to store client data - {err}"),
Ok(_) => {
Span::current().record("disk_fallback", true);
self.shared
.metrics
.mixnet
@@ -172,13 +211,16 @@ impl ConnectionHandler {
}
}
}
Ok(_) => trace!("Pushed received packet to {client}"),
Ok(_) => {
Span::current().record("client_online", true);
trace!("Pushed received packet to {client}");
}
}
// if we managed to either push message directly to the [online] client or store it at
// its inbox, it means that it must exist at this gateway, hence we can send the
// received ack back into the network
self.shared.forward_ack_packet(final_hop_data.forward_ack);
if has_ack {
Span::current().record("ack_forwarded", true);
}
}
fn within_deferral_threshold(&self, now: Instant) -> bool {
@@ -206,32 +248,56 @@ impl ConnectionHandler {
if !time_threshold {
warn!(
"{}: time failure - {}",
event = "replay_detection.deferral_exceeded",
threshold_type = "time",
deferred_count = self.pending_packets.total_count(),
deferral_ms = now.saturating_duration_since(self.pending_packets.last_acquired_mutex).as_millis() as u64,
remote_addr = %self.remote_address,
"{}: time deferral threshold exceeded with {} pending packets",
self.remote_address,
self.pending_packets.packets.len()
self.pending_packets.total_count()
)
}
if !count_threshold {
warn!("{}, count failure", self.remote_address)
warn!(
event = "replay_detection.deferral_exceeded",
threshold_type = "count",
deferred_count = self.pending_packets.total_count(),
remote_addr = %self.remote_address,
"{}: count deferral threshold exceeded",
self.remote_address
)
}
time_threshold && count_threshold
}
#[instrument(
name = "mixnode.sphinx_partial_unwrap",
skip(self, packet),
level = "debug",
fields(
key_rotation,
unwrap_result,
)
)]
fn try_partially_unwrap_packet(
&self,
packet: FramedNymPacket,
) -> Result<PartialyUnwrappedPacketWithKeyRotation, PacketProcessingError> {
// based on the received sphinx key rotation information,
// attempt to choose appropriate key for processing the packet
match packet.header().key_rotation {
let rotation_label = match packet.header().key_rotation {
SphinxKeyRotation::Unknown => "unknown",
SphinxKeyRotation::OddRotation => "odd",
SphinxKeyRotation::EvenRotation => "even",
};
Span::current().record("key_rotation", rotation_label);
let result = match packet.header().key_rotation {
SphinxKeyRotation::Unknown => {
let primary = self.shared.sphinx_keys.primary();
let primary_rotation = primary.rotation_id();
// we have to try both keys, start with the primary as it has higher likelihood of being correct
// if let Ok(partially_unwrapped) = PartiallyUnwrappedPacket::new()
match PartiallyUnwrappedPacket::new(packet, primary.inner().as_ref()) {
Ok(unwrapped_packet) => {
Ok(unwrapped_packet.with_key_rotation(primary_rotation))
@@ -250,6 +316,12 @@ impl ConnectionHandler {
}
SphinxKeyRotation::OddRotation => {
let Some(odd_key) = self.shared.sphinx_keys.odd() else {
warn!(
event = "packet.dropped.expired_key",
key_rotation = "odd",
remote_addr = %self.remote_address,
"dropping packet: odd key rotation expired"
);
return Err(PacketProcessingError::ExpiredKey);
};
let odd_rotation = odd_key.rotation_id();
@@ -259,6 +331,12 @@ impl ConnectionHandler {
}
SphinxKeyRotation::EvenRotation => {
let Some(even_key) = self.shared.sphinx_keys.even() else {
warn!(
event = "packet.dropped.expired_key",
key_rotation = "even",
remote_addr = %self.remote_address,
"dropping packet: even key rotation expired"
);
return Err(PacketProcessingError::ExpiredKey);
};
let even_rotation = even_key.rotation_id();
@@ -266,7 +344,10 @@ impl ConnectionHandler {
.map_err(|(_, err)| err)
.map(|p| p.with_key_rotation(even_rotation))
}
}
};
Span::current().record("unwrap_result", if result.is_ok() { "ok" } else { "err" });
result
}
async fn handle_received_packet_with_replay_detection(
@@ -280,6 +361,12 @@ impl ConnectionHandler {
Ok(unwrapped) => unwrapped,
Err(err) => {
trace!("failed to process received mix packet: {err}");
warn!(
event = "packet.dropped.malformed",
error = %err,
remote_addr = %self.remote_address,
"dropping malformed packet"
);
self.shared
.metrics
.mixnet
@@ -316,7 +403,9 @@ impl ConnectionHandler {
// 3. forward the packet to the relevant sink (if enabled)
match unwrapped_packet {
Err(err) => trace!("failed to process received mix packet: {err}"),
Err(err) => {
trace!("failed to process received mix packet: {err}");
}
Ok(processed_packet) => match processed_packet.processing_data {
MixProcessingResultData::ForwardHop { packet, delay } => {
self.handle_forward_packet(now, packet, delay);
@@ -334,6 +423,7 @@ impl ConnectionHandler {
packets: HashMap<u32, Vec<PartiallyUnwrappedPacket>>,
replay_check_results: HashMap<u32, Vec<bool>>,
) {
let mut replays_detected: u64 = 0;
for (rotation_id, packets) in packets {
let Some(replay_checks) = replay_check_results.get(&rotation_id) else {
// this should never happen, but if we messed up, and it does, don't panic, just drop the packets
@@ -342,6 +432,13 @@ impl ConnectionHandler {
};
for (packet, &replayed) in packets.into_iter().zip(replay_checks) {
let unwrapped_packet = if replayed {
replays_detected += 1;
warn!(
event = "packet.dropped.replay",
remote_addr = %self.remote_address,
rotation_id,
"dropping replayed packet"
);
Err(PacketProcessingError::PacketReplay)
} else {
packet.finalise_unwrapping()
@@ -350,6 +447,13 @@ impl ConnectionHandler {
self.handle_unwrapped_packet(now, unwrapped_packet).await;
}
}
if replays_detected > 0 {
debug!(
replays_detected,
remote_addr = %self.remote_address,
"replay detection batch completed with replays"
);
}
}
async fn handle_pending_packets_batch_no_locking(&mut self, now: Instant) -> bool {
@@ -379,13 +483,26 @@ impl ConnectionHandler {
true
}
#[instrument(
name = "mixnode.replay_check_batch",
skip(self),
level = "debug",
fields(
batch_size,
mutex_wait_ms,
)
)]
async fn handle_pending_packets_batch(&mut self, now: Instant) {
let batch_size = self.pending_packets.total_count();
Span::current().record("batch_size", batch_size as u64);
let batch = self.pending_packets.reset(now);
let replay_tags = self.pending_packets.replay_tags();
if replay_tags.is_empty() {
return;
}
let mutex_start = Instant::now();
let Ok(replay_check_results) = self
.shared
.replay_protection_filter
@@ -396,32 +513,56 @@ impl ConnectionHandler {
self.shared.shutdown_token.cancel();
return;
};
Span::current().record(
"mutex_wait_ms",
mutex_start.elapsed().as_millis() as u64,
);
self.handle_post_replay_detection_packets(now, batch, replay_check_results)
.await;
}
#[instrument(
name = "mixnode.sphinx_full_unwrap",
skip(self, packet),
level = "debug",
fields(key_rotation)
)]
fn try_full_unwrap_packet(
&self,
packet: FramedNymPacket,
) -> Result<MixProcessingResult, PacketProcessingError> {
// based on the received sphinx key rotation information,
// attempt to choose appropriate key for processing the packet
// NOTE: due to the function signatures, outfox packets will **only** attempt primary key
// if no rotation information is available (but that's fine given outfox is not really in use,
// and by the time we need it, the rotation info should be present)
let rotation_label = match packet.header().key_rotation {
SphinxKeyRotation::Unknown => "unknown",
SphinxKeyRotation::OddRotation => "odd",
SphinxKeyRotation::EvenRotation => "even",
};
Span::current().record("key_rotation", rotation_label);
match packet.header().key_rotation {
SphinxKeyRotation::Unknown => {
process_framed_packet(packet, self.shared.sphinx_keys.primary().inner().as_ref())
}
SphinxKeyRotation::OddRotation => {
let Some(odd_key) = self.shared.sphinx_keys.odd() else {
warn!(
event = "packet.dropped.expired_key",
key_rotation = "odd",
remote_addr = %self.remote_address,
"dropping packet: odd key rotation expired"
);
return Err(PacketProcessingError::ExpiredKey);
};
process_framed_packet(packet, odd_key.inner().as_ref())
}
SphinxKeyRotation::EvenRotation => {
let Some(even_key) = self.shared.sphinx_keys.even() else {
warn!(
event = "packet.dropped.expired_key",
key_rotation = "even",
remote_addr = %self.remote_address,
"dropping packet: even key rotation expired"
);
return Err(PacketProcessingError::ExpiredKey);
};
process_framed_packet(packet, even_key.inner().as_ref())
@@ -456,23 +597,36 @@ impl ConnectionHandler {
}
#[instrument(
skip(self),
name = "mixnode.connection",
skip(self, socket),
level = "debug",
fields(
remote = %self.remote_address
remote = %self.remote_address,
noise_handshake_ms,
)
)]
pub(crate) async fn handle_connection(&mut self, socket: TcpStream) {
let handshake_start = Instant::now();
let noise_stream = match upgrade_noise_responder(socket, &self.shared.noise_config).await {
Ok(noise_stream) => noise_stream,
Err(err) => {
error!(
"Failed to perform Noise handshake with {:?} - {err}",
self.remote_address
Span::current().record(
"noise_handshake_ms",
handshake_start.elapsed().as_millis() as u64,
);
warn!(
event = "connection.failed.noise",
remote_addr = %self.remote_address,
error = %err,
"Noise responder handshake failed"
);
return;
}
};
Span::current().record(
"noise_handshake_ms",
handshake_start.elapsed().as_millis() as u64,
);
debug!(
"Noise responder handshake completed for {:?}",
self.remote_address
@@ -481,26 +635,58 @@ impl ConnectionHandler {
.await
}
#[instrument(
name = "mixnode.stream",
skip(self, mixnet_connection),
level = "debug",
fields(
remote = %self.remote_address,
packets_processed = 0u64,
exit_reason,
)
)]
pub(crate) async fn handle_stream(
&mut self,
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
) {
let mut packets_processed: u64 = 0;
loop {
tokio::select! {
biased;
_ = self.shared.shutdown_token.cancelled() => {
trace!("connection handler: received shutdown");
Span::current().record("exit_reason", "shutdown");
break
}
maybe_framed_nym_packet = mixnet_connection.next() => {
match maybe_framed_nym_packet {
Some(Ok(packet)) => self.handle_received_nym_packet(packet).await,
Some(Ok(packet)) => {
self.handle_received_nym_packet(packet).await;
packets_processed += 1;
if packets_processed % 10000 == 0 {
Span::current().record("packets_processed", packets_processed);
}
}
Some(Err(err)) => {
debug!("connection got corrupted with: {err}");
warn!(
event = "connection.corrupted",
remote_addr = %self.remote_address,
error = %err,
packets_processed,
"connection stream corrupted"
);
Span::current().record("exit_reason", "corrupted");
Span::current().record("packets_processed", packets_processed);
return
}
None => {
debug!("connection got closed by the remote");
debug!(
remote_addr = %self.remote_address,
packets_processed,
"connection closed by remote"
);
Span::current().record("exit_reason", "closed_by_remote");
Span::current().record("packets_processed", packets_processed);
return
}
}
@@ -508,6 +694,7 @@ impl ConnectionHandler {
}
}
Span::current().record("packets_processed", packets_processed);
debug!("exiting and closing connection");
}
}
@@ -55,12 +55,17 @@ impl<C, F> PacketForwarder<C, F> {
if let Err(err) = self.mixnet_client.send_without_response(packet) {
if err.kind() == io::ErrorKind::WouldBlock {
// we only know for sure if we dropped a packet if our sending queue was full
// in any other case the connection might still be re-established (or created for the first time)
// and the packet might get sent, but we won't know about it
warn!(
event = "packet.dropped.buffer_full",
next_hop = %next_hop,
"dropping packet: egress connection buffer full (WouldBlock)"
);
self.metrics.mixnet.egress_dropped_forward_packet(next_hop)
} else if err.kind() == io::ErrorKind::NotConnected {
// let's give the benefit of the doubt and assume we manage to establish connection
debug!(
next_hop = %next_hop,
"packet queued for not-yet-connected peer"
);
self.metrics.mixnet.egress_sent_forward_packet(next_hop)
}
} else {
@@ -86,7 +91,11 @@ impl<C, F> PacketForwarder<C, F> {
let next_hop = new_packet.packet.next_hop();
if !self.routing_filter.should_route(next_hop.as_ref().ip()) {
debug!("dropping packet as the egress address does not belong to any known node");
warn!(
event = "packet.dropped.routing_filter",
next_hop = %next_hop,
"dropping packet: egress address does not belong to any known node"
);
self.metrics
.mixnet
.egress_dropped_forward_packet(next_hop.into());
@@ -125,7 +134,7 @@ impl<C, F> PacketForwarder<C, F> {
C: SendWithoutResponse,
F: RoutingFilter,
{
let mut processed = 0;
let mut processed: u64 = 0;
trace!("starting PacketForwarder");
loop {
tokio::select! {
@@ -145,11 +154,29 @@ impl<C, F> PacketForwarder<C, F> {
#[allow(clippy::unwrap_used)]
self.handle_new_packet(new_packet.unwrap());
let channel_len = self.packet_sender.len();
let delay_queue_len = self.delay_queue.len();
if processed % 1000 == 0 {
match channel_len {
n if n > 1000 => error!("there are currently {n} mix packets waiting to get forwarded - the node seems to be significantly overloaded!"),
n if n > 500 => warn!("there are currently {n} mix packets waiting to get forwarded - is the node overloaded?"),
n => trace!("there are currently {n} mix packets waiting to get forwarded"),
n if n > 1000 => error!(
event = "forwarder.queue_overload",
channel_depth = n,
delay_queue_depth = delay_queue_len,
packets_processed = processed,
"there are currently {n} mix packets waiting to get forwarded - the node seems to be significantly overloaded!"
),
n if n > 500 => warn!(
event = "forwarder.queue_high",
channel_depth = n,
delay_queue_depth = delay_queue_len,
packets_processed = processed,
"there are currently {n} mix packets waiting to get forwarded - is the node overloaded?"
),
n => trace!(
channel_depth = n,
delay_queue_depth = delay_queue_len,
packets_processed = processed,
"forwarder queue status"
),
}
}
self.update_channel_size_metric(channel_len);
+40 -4
View File
@@ -5,7 +5,8 @@ use nym_gateway::node::{
ActiveClientsStore, GatewayStorage, GatewayStorageError, InboxGatewayStorage,
};
use nym_sphinx_types::DestinationAddressBytes;
use tracing::debug;
use tokio::time::Instant;
use tracing::{debug, warn};
#[derive(Clone)]
pub(crate) struct SharedFinalHopData {
@@ -27,14 +28,34 @@ impl SharedFinalHopData {
message: Vec<u8>,
) -> Result<(), Vec<u8>> {
match self.active_clients.get_sender(client_address) {
None => Err(message),
None => {
debug!(
event = "gateway.push_to_client",
client_found = false,
send_result = "client_not_found",
"client {client_address} not found in active clients"
);
Err(message)
}
Some(sender_channel) => {
if let Err(unsent) = sender_channel.unbounded_send(vec![message]) {
warn!(
event = "gateway.push_to_client",
client_found = true,
send_result = "channel_closed",
"client {client_address} channel closed, message not delivered"
);
// the unwrap here is fine as the original message got returned;
// plus we're only ever sending 1 message at the time (for now)
#[allow(clippy::unwrap_used)]
Err(unsent.into_inner().pop().unwrap())
} else {
debug!(
event = "gateway.push_to_client",
client_found = true,
send_result = "ok",
"pushed message to client {client_address}"
);
Ok(())
}
}
@@ -46,8 +67,23 @@ impl SharedFinalHopData {
client_address: DestinationAddressBytes,
message: Vec<u8>,
) -> Result<(), GatewayStorageError> {
let start = Instant::now();
debug!("Storing received message for {client_address} on the disk...",);
self.storage.store_message(client_address, message).await
let result = self.storage.store_message(client_address, message).await;
let store_ms = start.elapsed().as_millis() as u64;
if result.is_ok() {
debug!(
event = "gateway.disk_store",
store_ms,
"stored message for {client_address} on disk in {store_ms}ms"
);
} else {
warn!(
event = "gateway.disk_store_failed",
store_ms,
"failed to store message for {client_address} on disk after {store_ms}ms"
);
}
result
}
}
+9 -5
View File
@@ -185,16 +185,20 @@ impl SharedData {
}
pub(super) fn forward_mix_packet(&self, packet: MixPacket, delay_until: Option<Instant>) {
let has_delay = delay_until.is_some();
if self
.mixnet_forwarder
.forward_packet(PacketToForward::new(packet, delay_until))
.is_err()
&& !self.shutdown_token.is_cancelled()
{
error!(
"failed to forward sphinx packet on the channel while the process is not going through the shutdown!"
);
self.shutdown_token.cancel();
if !self.shutdown_token.is_cancelled() {
error!(
event = "forwarder.channel_send_failed",
has_delay,
"failed to forward sphinx packet on the channel while the process is not going through the shutdown!"
);
self.shutdown_token.cancel();
}
}
}