Compare commits

...

70 Commits

Author SHA1 Message Date
Floriane TUERNAL SABOTINOV 8aa5e61ba6 wip 2025-10-27 19:11:13 +01:00
Floriane TUERNAL SABOTINOV 3bda29aed7 featurized InputMessage transformation 2025-10-24 16:50:47 +02:00
Floriane TUERNAL SABOTINOV 69bba68060 wip 2025-10-22 12:14:44 +02:00
Floriane TUERNAL SABOTINOV c2a06298d1 fmt 2025-10-22 09:17:40 +02:00
Floriane TUERNAL SABOTINOV 2831f4ae55 wip 2025-10-22 09:07:07 +02:00
Floriane TUERNAL SABOTINOV 2344a43f62 rebase reconciliation 2025-10-21 15:21:55 +02:00
Floriane TUERNAL SABOTINOV 8427b09ed7 cleanup final 2025-10-21 14:18:57 +02:00
Floriane TUERNAL SABOTINOV ec09c7e807 cleanup and PR ready 2025-10-21 14:18:57 +02:00
Floriane TUERNAL SABOTINOV 6eab87fe05 refactor init tracer and use of tracing_opentelemetry span for async tracing 2025-10-21 14:18:57 +02:00
Floriane TUERNAL SABOTINOV 9c13fe534b featurized otel 2025-10-21 14:18:55 +02:00
Floriane TUERNAL SABOTINOV ad431e8b09 cleanup and add guard to keep the tracer provider alive 2025-10-21 14:16:21 +02:00
Floriane TUERNAL SABOTINOV b292c4a624 continue trace)id propagation into sphinx 2025-10-21 14:16:19 +02:00
Floriane TUERNAL SABOTINOV 9a96be3d41 add trace_id to sphinx packet 2025-10-21 14:13:38 +02:00
Floriane TUERNAL SABOTINOV 8e771c3530 otel adaptation for sphinx instrumentation 2025-10-21 14:09:04 +02:00
Floriane TUERNAL SABOTINOV 47e1e912f5 traceparent injection into headers 2025-10-21 14:09:04 +02:00
Floriane TUERNAL SABOTINOV 4116bb321a add traceparent to http headers 2025-10-21 14:09:02 +02:00
Floriane TUERNAL SABOTINOV 60cde66451 websocket otel trace handling correction 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV be37715552 wip 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV f05c5d2eae wip 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV e9cbd0aff5 add instrument to sdk send message 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 36b07b1bcf wip 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 827591631d debug node 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 682feac75e add manual spans to handle_request functions 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 28058944f2 cleanup 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV ca9b1d656c use otel span to parent tracing span 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 2955e506de add context to message handling 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 254a414c49 change context propagation method to trace_id sharing to go accros async move barrier 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 7d087996b4 wip 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV c5e66687b6 wip 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV faaafbeae4 wip 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 32d7328c7b wip 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 6ba20e7f54 context extraction 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 74c064135b parent/chil propagation explicitation 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 1fef24a6ba try debug panic when using parent span 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 2ae73b7f31 rm clone because panic 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV b93779850b propagation to parent function 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 651d0bb578 wip 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 4af1835dbd add method to extract trace_id from ContextCarrier 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV f1bf9861cd use attach() instead of creating new span 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 15f56bccb1 forget to enter span 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 26da5da44f verify existence of context 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV b9a2efe0c9 use authenticatev2 and not authenticate 2025-10-21 14:08:12 +02:00
Floriane TUERNAL SABOTINOV 8492c7161c manual propagators for context from client to node 2025-10-21 14:08:10 +02:00
Floriane TUERNAL SABOTINOV 76770fa30c wip 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV 9d774860e5 experiment do I preserve trace_id with with method 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV 7d7bebacd2 try debug with otel specific tools 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV cfe8c08ca6 print debug trace 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV 9d8c328ce1 establishing initial authentication common trace_id 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV c4543c83d5 context propagation helpers 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV 1fa615991b Revert "broken: context won't satisfy spawn async move"
This reverts commit 2b11479ad4801e1efa8ab0aca4ca577bd3f195fe.
2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV 9f3061a2eb broken: context won't satisfy spawn async move 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV 953f930c4b try debug fresh handler context 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV c2b620de92 debug surb_reply client 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV d55fce1917 try debug client gets stuck 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV bda45b405c try keep context accross async call 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV bb9f0540e8 instrumentation of nym-node to nym-gateway workflow 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV e3569ccc16 fixed trace_id distibuted from sdk to gateway 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV 5c9d58bad3 fix trace_id propagation from sdk to mix-node 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV 6c03ab69fb try debug difference trace_id otel by use async for gateway and attach it to span in surb_reply 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV 89329ab3ee add context propagation 2025-10-21 14:07:01 +02:00
Floriane TUERNAL SABOTINOV fee2ae5964 change surb_reply to test relation with nym-node + basic instrumentation 2025-10-21 14:06:59 +02:00
Floriane TUERNAL SABOTINOV f56edf63b5 fix signoz ingestion 2025-10-21 13:59:59 +02:00
Floriane TUERNAL SABOTINOV dacd35470d configure format for sending logs to signoz 2025-10-21 13:59:59 +02:00
Floriane TUERNAL SABOTINOV acc11e8776 setup otel for surb_reply 2025-10-21 13:59:59 +02:00
Floriane TUERNAL SABOTINOV a11de8eb74 fix otel panic 2025-10-21 13:59:59 +02:00
Floriane TUERNAL SABOTINOV fa603be538 setup otel from run to forward_sphinx_packet 2025-10-21 13:59:59 +02:00
Floriane TUERNAL SABOTINOV 2447520010 revert changes from previous commit 2025-10-21 13:59:59 +02:00
Mark Sinclair 5888102359 wip: sdk surb-reply example add otel tracing 2025-10-21 13:59:59 +02:00
Mark Sinclair 7d7acc2691 wip: tracing in nym-node 2025-10-21 13:59:59 +02:00
Mark Sinclair 20e6243738 add opentelemetry for debugging and testing behind otel feature flag 2025-10-21 13:59:55 +02:00
132 changed files with 2994 additions and 11003 deletions
Generated
+1106 -1188
View File
File diff suppressed because it is too large Load Diff
+8 -3
View File
@@ -300,8 +300,11 @@ nix = "0.27.1"
notify = "5.1.0"
okapi = "0.7.0"
once_cell = "1.21.3"
opentelemetry = "0.19.0"
opentelemetry-jaeger = "0.18.0"
opentelemetry = "0.30.0"
opentelemetry-otlp = "0.30.0"
opentelemetry-semantic-conventions = "0.30.0"
opentelemetry_sdk = "0.30.0"
opentelemetry-stdout = "0.30.0"
parking_lot = "0.12.3"
pem = "0.8"
petgraph = "0.6.5"
@@ -360,8 +363,10 @@ toml = "0.8.22"
tower = "0.5.2"
tower-http = "0.5.2"
tracing = "0.1.41"
tracing-core = "0.1.33"
tracing-log = "0.2"
tracing-opentelemetry = "0.19.0"
tracing-opentelemetry = "0.31.0"
tracing-serde = "0.2.0"
tracing-subscriber = "0.3.19"
tracing-tree = "0.2.2"
tracing-indicatif = "0.3.9"
+8 -1
View File
@@ -46,7 +46,6 @@ nym-bandwidth-controller = { path = "../../common/bandwidth-controller" }
nym-bin-common = { path = "../../common/bin-common", features = [
"output_format",
"clap",
"basic_tracing",
] }
nym-client-core = { path = "../../common/client-core", features = [
"fs-credentials-storage",
@@ -71,3 +70,11 @@ nym-client-websocket-requests = { path = "websocket-requests" }
nym-id = { path = "../../common/nym-id" }
[dev-dependencies]
[features]
default = []
otel = [
"nym-client-core/otel",
"nym-bin-common/otel",
"nym-gateway-requests/otel",
]
+2 -2
View File
@@ -4,7 +4,7 @@
use std::error::Error;
use clap::{crate_name, crate_version, Parser};
use nym_bin_common::logging::{maybe_print_banner, setup_tracing_logger};
use nym_bin_common::logging::{maybe_print_banner, setup_no_otel_logger};
use nym_network_defaults::setup_env;
pub mod client;
@@ -20,7 +20,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
if !args.no_banner {
maybe_print_banner(crate_name!(), crate_version!());
}
setup_tracing_logger();
setup_no_otel_logger().expect("failed to initialize logging");
if let Err(err) = commands::execute(args).await {
log::error!("{err}");
+17 -3
View File
@@ -184,7 +184,14 @@ impl Handler {
});
// the ack control is now responsible for chunking, etc.
let input_msg = InputMessage::new_regular(recipient, message, lane, self.packet_type);
let input_msg = InputMessage::new_regular(
recipient,
message,
lane,
self.packet_type,
#[cfg(feature = "otel")]
None,
);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send message to the input buffer: {err}");
@@ -216,8 +223,15 @@ impl Handler {
TransmissionLane::ConnectionId(id)
});
let input_msg =
InputMessage::new_anonymous(recipient, message, reply_surbs, lane, self.packet_type);
let input_msg = InputMessage::new_anonymous(
recipient,
message,
reply_surbs,
lane,
self.packet_type,
#[cfg(feature = "otel")]
None,
);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send anonymous message to the input buffer: {err}");
+4 -1
View File
@@ -27,7 +27,6 @@ zeroize = { workspace = true }
nym-bin-common = { path = "../../common/bin-common", features = [
"output_format",
"clap",
"basic_tracing",
] }
nym-client-core = { path = "../../common/client-core", features = [
"fs-credentials-storage",
@@ -54,3 +53,7 @@ nym-validator-client = { path = "../../common/client-libs/validator-client", fea
[features]
default = []
eth = []
otel = [
"nym-socks5-client-core/otel",
"nym-bin-common/otel",
]
+2 -2
View File
@@ -4,7 +4,7 @@
use std::error::Error;
use clap::{crate_name, crate_version, Parser};
use nym_bin_common::logging::{maybe_print_banner, setup_tracing_logger};
use nym_bin_common::logging::{maybe_print_banner, setup_no_otel_logger};
use nym_network_defaults::setup_env;
mod commands;
@@ -19,7 +19,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
if !args.no_banner {
maybe_print_banner(crate_name!(), crate_version!());
}
setup_tracing_logger();
setup_no_otel_logger().expect("failed to initialize logging");
if let Err(err) = commands::execute(args).await {
log::error!("{err}");
+24 -14
View File
@@ -8,24 +8,30 @@ license = { workspace = true }
repository = { workspace = true }
[dependencies]
chrono = { workspace = true, optional = true }
cfg-if = { workspace = true }
clap = { workspace = true, features = ["derive"], optional = true }
clap_complete = { workspace = true, optional = true }
clap_complete_fig = { workspace = true, optional = true }
const-str = { workspace = true }
log = { workspace = true }
opentelemetry = { workspace = true, optional = true }
opentelemetry-otlp = { workspace = true,features=["metrics", "grpc-tonic", "tls",
"tls-webpki-roots"], optional = true }
opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"], optional = true }
opentelemetry-stdout = { workspace = true, features = ["trace", "metrics"], optional = true }
opentelemetry_sdk = { workspace = true, optional = true }
rand = { workspace = true, optional = true }
schemars = { workspace = true, features = ["preserve_order"], optional = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, optional = true }
## tracing
tracing-subscriber = { workspace = true, features = ["env-filter"], optional = true }
tracing-tree = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
opentelemetry-jaeger = { workspace = true, features = ["rt-tokio", "collector_client", "isahc_collector_client"], optional = true }
thiserror = { workspace = true }
tracing = { workspace = true }
tracing-core = { workspace = true }
tracing-opentelemetry = { workspace = true, optional = true }
tracing-serde = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
tracing-tree = { workspace = true }
utoipa = { workspace = true, optional = true }
opentelemetry = { workspace = true, features = ["rt-tokio"], optional = true }
[build-dependencies]
vergen = { workspace = true, features = ["build", "git", "gitcl", "rustc", "cargo"] }
@@ -35,13 +41,17 @@ default = []
openapi = ["utoipa"]
output_format = ["serde_json", "dep:clap"]
bin_info_schema = ["schemars"]
basic_tracing = ["dep:tracing", "tracing-subscriber"]
tracing = [
"basic_tracing",
"tracing-tree",
"opentelemetry-jaeger",
tokio-console = ["otel"]
otel = [
"chrono",
"tracing-opentelemetry",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"opentelemetry-stdout",
"opentelemetry_sdk",
"serde_json",
"rand",
]
clap = ["dep:clap", "dep:clap_complete", "dep:clap_complete_fig"]
models = []
+3
View File
@@ -4,6 +4,9 @@
pub mod build_information;
pub mod logging;
#[cfg(feature = "otel")]
pub mod opentelemetry;
#[cfg(feature = "clap")]
pub mod completions;
+20
View File
@@ -0,0 +1,20 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#[cfg(feature = "otel")]
use opentelemetry_otlp::ExporterBuildError;
#[derive(thiserror::Error, Debug)]
pub enum TracingError {
#[error("tracing logger already initialised")]
TracingLoggerAlreadyInitialised,
#[error("Logging error: {0}")]
TracingTryInitError(tracing_subscriber::util::TryInitError),
#[cfg(feature = "otel")]
#[error("{0}")]
TracingExporterBuildError(#[from] ExporterBuildError),
#[error("{0}")]
TracingFilterParseError(#[from] tracing_subscriber::filter::ParseError),
}
+39 -47
View File
@@ -1,19 +1,12 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod error;
use error::TracingError;
use serde::{Deserialize, Serialize};
use std::io::IsTerminal;
#[cfg(feature = "tracing")]
pub use opentelemetry;
#[cfg(feature = "tracing")]
pub use opentelemetry_jaeger;
#[cfg(feature = "tracing")]
pub use tracing_opentelemetry;
#[cfg(feature = "tracing")]
pub use tracing_subscriber;
#[cfg(feature = "tracing")]
pub use tracing_tree;
use tracing_subscriber::{filter::Directive, layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Debug, Default, Copy, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
@@ -22,7 +15,6 @@ pub struct LoggingSettings {
}
// don't call init so that we could attach additional layers
#[cfg(feature = "basic_tracing")]
pub fn build_tracing_logger() -> impl tracing_subscriber::layer::SubscriberExt {
use tracing_subscriber::prelude::*;
@@ -31,7 +23,6 @@ pub fn build_tracing_logger() -> impl tracing_subscriber::layer::SubscriberExt {
.with(default_tracing_env_filter())
}
#[cfg(feature = "basic_tracing")]
pub fn default_tracing_env_filter() -> tracing_subscriber::filter::EnvFilter {
if ::std::env::var("RUST_LOG").is_ok() {
tracing_subscriber::filter::EnvFilter::from_default_env()
@@ -43,7 +34,6 @@ pub fn default_tracing_env_filter() -> tracing_subscriber::filter::EnvFilter {
}
}
#[cfg(feature = "basic_tracing")]
pub fn default_tracing_fmt_layer<S, W>(
writer: W,
) -> impl tracing_subscriber::Layer<S> + Sync + Send + 'static
@@ -63,45 +53,47 @@ where
.with_target(false)
}
#[cfg(feature = "basic_tracing")]
pub fn setup_tracing_logger() {
use tracing_subscriber::util::SubscriberInitExt;
build_tracing_logger().init()
/// Creates a tracing filter that sets more granular log levels for specific crates.
/// This allows for finer control over logging verbosity.
pub(crate) fn granual_filtered_env() -> Result<tracing_subscriber::filter::EnvFilter, TracingError>
{
fn directive_checked(directive: impl Into<String>) -> Result<Directive, TracingError> {
directive.into().parse().map_err(From::from)
}
let mut filter = default_tracing_env_filter();
// these crates are more granularly filtered
let filter_crates = ["defguard_wireguard_rs"];
for crate_name in filter_crates {
filter = filter.add_directive(directive_checked(format!("{crate_name}=warn"))?);
}
Ok(filter)
}
pub fn setup_no_otel_logger() -> Result<(), TracingError> {
// Only set up if not already initialized
if tracing::dispatcher::has_been_set() {
// It shouldn't be - this is really checking that it is torn down between async command executions
return Err(TracingError::TracingLoggerAlreadyInitialised);
}
let registry = tracing_subscriber::registry()
.with(default_tracing_fmt_layer(std::io::stderr))
.with(granual_filtered_env()?);
registry
.try_init()
.map_err(|e| TracingError::TracingTryInitError(e))?;
Ok(())
}
// TODO: This has to be a macro, running it as a function does not work for the file_appender for some reason
#[cfg(feature = "tracing")]
#[macro_export]
macro_rules! setup_tracing {
($service_name: expr) => {
use nym_bin_common::logging::tracing_subscriber::layer::SubscriberExt;
use nym_bin_common::logging::tracing_subscriber::util::SubscriberInitExt;
let registry = nym_bin_common::logging::tracing_subscriber::Registry::default()
.with(nym_bin_common::logging::tracing_subscriber::EnvFilter::from_default_env())
.with(
nym_bin_common::logging::tracing_tree::HierarchicalLayer::new(4)
.with_targets(true)
.with_bracketed_fields(true),
);
let tracer = nym_bin_common::logging::opentelemetry_jaeger::new_collector_pipeline()
.with_endpoint("http://44.199.230.10:14268/api/traces")
.with_service_name($service_name)
.with_isahc()
.with_trace_config(
nym_bin_common::logging::opentelemetry::sdk::trace::config().with_sampler(
nym_bin_common::logging::opentelemetry::sdk::trace::Sampler::TraceIdRatioBased(
0.1,
),
),
)
.install_batch(nym_bin_common::logging::opentelemetry::runtime::Tokio)
.expect("Could not init tracer");
let telemetry = nym_bin_common::logging::tracing_opentelemetry::layer().with_tracer(tracer);
registry.with(telemetry).init();
setup_no_otel_logger()
};
}
@@ -0,0 +1,47 @@
use opentelemetry::trace::{SpanId, TraceId};
use opentelemetry_sdk::trace::IdGenerator;
use rand::RngCore;
#[derive(Clone, Debug)]
pub struct Compact13BytesIdGenerator;
impl IdGenerator for Compact13BytesIdGenerator {
fn new_trace_id(&self) -> TraceId {
let mut rng = rand::thread_rng();
let mut bytes = [0u8; 16];
// Fill the first 13 bytes with random data
rng.fill_bytes(&mut bytes[0..12]);
// Set the last 4 bytes to zero
bytes[12] = 0;
bytes[13] = 0;
bytes[14] = 0;
bytes[15] = 0;
TraceId::from_bytes(bytes)
}
fn new_span_id(&self) -> SpanId {
let mut rng = rand::thread_rng();
let mut bytes = [0u8; 8];
rng.fill_bytes(&mut bytes);
SpanId::from_bytes(bytes)
}
}
pub fn compress_trace_id(trace_id: &TraceId) -> [u8; 12] {
let bytes = trace_id.to_bytes();
let mut compressed = [0u8; 12];
compressed.copy_from_slice(&bytes[0..12]);
compressed
}
pub fn decompress_trace_id(compressed: &[u8; 12]) -> [u8; 16] {
let mut bytes = [0u8; 16];
bytes[0..12].copy_from_slice(compressed);
bytes[12..].copy_from_slice(&[0u8; 4]);
bytes
}
@@ -0,0 +1,155 @@
use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
use opentelemetry::trace::{SpanContext, TraceContextExt, TraceId};
use opentelemetry::{Context, TraceFlags};
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::IdGenerator};
use std::collections::HashMap;
use std::fmt::Display;
use tracing::instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;
/// Make a Carrier for context propagation
pub struct ContextCarrier {
data: HashMap<String, String>,
}
impl ContextCarrier {
pub fn new_empty() -> Self {
ContextCarrier {
data: HashMap::new(),
}
}
pub fn new_with_data(data: HashMap<String, String>) -> Self {
if data.is_empty() {
return ContextCarrier::new_empty();
}
ContextCarrier { data }
}
pub fn new_with_current_context(context: Context) -> Self {
let mut carrier = ContextCarrier::new_empty();
let propagator = TraceContextPropagator::new();
propagator.inject_context(&context, &mut carrier);
carrier
}
pub fn iter(&self) -> impl Iterator<Item = (&String, &String)> {
self.data.iter()
}
pub fn from_map(data: HashMap<String, String>) -> Self {
ContextCarrier { data }
}
pub fn into_map(self) -> HashMap<String, String> {
self.data
}
pub fn extract_trace_id(&self) -> Option<TraceId> {
self.get("traceparent").and_then(|tp| {
let parts: Vec<&str> = tp.split('-').collect();
if parts.len() == 4 {
TraceId::from_hex(parts[1]).ok()
} else {
None
}
})
}
pub fn extract_trace_id_into_bytes(&self) -> Option<[u8; 16]> {
self.extract_trace_id().map(|id| id.to_bytes())
}
pub fn extract_traceparent(&self) -> Option<String> {
self.get("traceparent").map(|s| s.to_string())
}
}
impl Injector for ContextCarrier {
fn set(&mut self, key: &str, value: String) {
self.data.insert(key.to_string(), value);
}
}
impl Extractor for ContextCarrier {
fn get(&self, key: &str) -> Option<&str> {
self.data.get(key).map(|s| s.as_str())
}
fn keys(&self) -> Vec<&str> {
self.data.keys().map(|k| k.as_str()).collect()
}
}
impl Display for ContextCarrier {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.data)
}
}
pub struct ManualContextPropagator {
pub root_span: tracing::Span,
pub trace_id: TraceId,
}
impl ManualContextPropagator {
#[instrument(skip_all, level = "debug")]
pub fn new(name: &str, context: HashMap<String, String>) -> Self {
let carrier = ContextCarrier::new_with_data(context);
let trace_id = match carrier.extract_trace_id() {
Some(id) => id,
None => Context::current().span().span_context().trace_id(),
};
let root_span_builder = new_span_context_with_id(trace_id.clone());
let root_span = tracing::info_span!("trace_root", name = %name, trace_id = %trace_id);
root_span.set_parent(root_span_builder);
ManualContextPropagator {
root_span,
trace_id,
}
}
#[instrument(skip_all, level = "debug")]
pub fn new_from_tid(name: &str, trace_id: TraceId) -> Self {
let root_span_builder = new_span_context_with_id(trace_id.clone());
let root_span = tracing::info_span!("trace_root", name = %name, trace_id = %trace_id);
root_span.set_parent(root_span_builder);
ManualContextPropagator {
root_span,
trace_id,
}
}
pub fn root_span(&self) -> &tracing::Span {
&self.root_span
}
}
#[instrument(skip_all, level = "debug")]
pub fn new_span_context_with_id(trace_id: TraceId) -> Context {
let id_gen = opentelemetry_sdk::trace::RandomIdGenerator::default();
let span_id = id_gen.new_span_id();
let span_context = SpanContext::new(
trace_id,
span_id,
TraceFlags::SAMPLED,
true,
Default::default(),
);
Context::current().with_remote_span_context(span_context)
}
#[instrument(skip_all, level = "debug")]
pub fn extract_trace_id_from_tracing_cx() -> TraceId {
let cx = tracing::Span::current().context();
let binding = cx.span();
let trace_id = binding.span_context().trace_id();
trace_id
}
+186
View File
@@ -0,0 +1,186 @@
pub mod compact_id_generator;
pub mod context;
mod trace_id_format;
use tracing::{Level, info};
use tracing_subscriber::filter::Directive;
use tracing_subscriber::fmt;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use crate::logging::default_tracing_env_filter;
use crate::logging::error::TracingError;
use crate::opentelemetry::compact_id_generator::Compact13BytesIdGenerator;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{KeyValue, global};
use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
use opentelemetry_otlp::tonic_types::transport::ClientTlsConfig;
use opentelemetry_otlp::{WithExportConfig, WithTonicConfig};
use opentelemetry_sdk::metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_sdk::{Resource, trace::Sampler};
use opentelemetry_semantic_conventions::SCHEMA_URL;
use opentelemetry_semantic_conventions::resource::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_VERSION};
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::fmt::format::FmtSpan;
pub struct TracerProviderGuard(Option<SdkTracerProvider>);
impl Drop for TracerProviderGuard {
fn drop(&mut self) {
if let Some(tracer_provider) = self.0.take() {
// Ensure all spans are flushed before exit
if let Err(e) = tracer_provider.shutdown() {
eprintln!("Error shutting down tracer provider: {:?}", e);
}
}
}
}
pub(crate) fn granual_filtered_env() -> Result<tracing_subscriber::filter::EnvFilter, TracingError>
{
fn directive_checked(directive: impl Into<String>) -> Result<Directive, TracingError> {
directive.into().parse().map_err(From::from)
}
let mut filter = default_tracing_env_filter();
// these crates are more granularly filtered
let filter_crates = ["defguard_wireguard_rs"];
for crate_name in filter_crates {
filter = filter.add_directive(directive_checked(format!("{crate_name}=warn"))?);
}
Ok(filter)
}
pub fn setup_tracing_logger(service_name: String) -> Result<TracerProviderGuard, TracingError> {
if tracing::dispatcher::has_been_set() {
// It shouldn't be - this is really checking that it is torn down between async command executions
return Err(TracingError::TracingLoggerAlreadyInitialised);
}
// define ingestion points
let endpoint = std::env::var("SIGNOZ_ENDPOINT").expect("SIGNOZ_ENDPOINT not set");
let key = std::env::var("SIGNOZ_INGESTION_KEY").expect("SIGNOZ_INGESTION_KEY not set");
let mut metadata = MetadataMap::new();
metadata.insert(
"signoz-ingestion-key",
key.parse().expect("Could not parse signoz ingestion key"),
);
// Build resources
let resource = build_resource(&service_name);
// Initialize tracer and meter providers
let tracer_provider = init_tracer_provider(&endpoint, metadata.clone(), resource.clone())?;
let meter_provider = init_meter_provider(&endpoint, metadata.clone(), resource.clone())?;
// Bridge tracing and opentelemetry
let tracer = tracer_provider.tracer("otel-subscriber");
let fmt_layer = fmt::layer()
.json()
.with_writer(std::io::stderr)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_span_list(false)
.with_current_span(true)
.event_format(trace_id_format::TraceIdFormat);
let registry = tracing_subscriber::registry()
.with(fmt_layer)
.with(granual_filtered_env()?)
.with(tracing_subscriber::filter::LevelFilter::from_level(
Level::INFO,
))
.with(MetricsLayer::new(meter_provider.clone()))
.with(OpenTelemetryLayer::new(tracer));
registry
.try_init()
.map_err(TracingError::TracingTryInitError)?;
global::set_tracer_provider(tracer_provider.clone());
global::set_meter_provider(meter_provider.clone());
info!("Tracing initialized with service name: {}", service_name);
Ok(TracerProviderGuard(Some(tracer_provider)))
}
fn build_resource(service_name: &str) -> Resource {
Resource::builder()
.with_service_name(service_name.to_string())
.with_schema_url(
[
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"),
],
SCHEMA_URL,
)
.build()
}
fn init_tracer_provider(
endpoint: &str,
metadata: MetadataMap,
resource: Resource,
) -> Result<SdkTracerProvider, TracingError> {
let mut exporter_builder = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_metadata(metadata)
.with_endpoint(endpoint);
if endpoint.starts_with("https://") {
exporter_builder =
exporter_builder.with_tls_config(ClientTlsConfig::new().with_enabled_roots());
}
let exporter = exporter_builder.build()?;
let tracer = SdkTracerProvider::builder()
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
1.0,
))))
.with_id_generator(Compact13BytesIdGenerator)
.with_resource(resource)
.with_batch_exporter(exporter)
.build();
global::set_tracer_provider(tracer.clone());
Ok(tracer)
}
fn init_meter_provider(
endpoint: &str,
metadata: MetadataMap,
resource: Resource,
) -> Result<SdkMeterProvider, TracingError> {
let mut exporter_builder = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_metadata(metadata)
.with_endpoint(endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default());
if endpoint.starts_with("https://") {
exporter_builder =
exporter_builder.with_tls_config(ClientTlsConfig::new().with_enabled_roots());
}
let exporter = exporter_builder.build()?;
let reader = PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_secs(30))
.build();
let stdout_reader =
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()).build();
let meter_provider = MeterProviderBuilder::default()
.with_resource(resource)
.with_reader(reader)
.with_reader(stdout_reader)
.build();
global::set_meter_provider(meter_provider.clone());
Ok(meter_provider)
}
@@ -0,0 +1,88 @@
use chrono::Utc;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::{SpanId, TraceId};
use serde::ser::{SerializeMap, Serializer as _};
use std::io;
use tracing::{Event, Subscriber};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_serde::AsSerde;
use tracing_serde::fields::AsMap;
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields};
use tracing_subscriber::registry::LookupSpan;
pub struct WriteAdaptor<'a> {
fmt_write: &'a mut dyn std::fmt::Write,
}
impl<'a> WriteAdaptor<'a> {
pub fn new(fmt_write: &'a mut dyn std::fmt::Write) -> Self {
Self { fmt_write }
}
}
impl<'a> io::Write for WriteAdaptor<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let s =
std::str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
self.fmt_write
.write_str(s)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(s.as_bytes().len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
pub struct TraceIdFormat;
impl<S, N> FormatEvent<S, N> for TraceIdFormat
where
S: Subscriber + for<'lookup> LookupSpan<'lookup>,
N: for<'writer> FormatFields<'writer> + 'static,
{
fn format_event(
&self,
_ctx: &FmtContext<'_, S, N>,
mut writer: Writer<'_>,
event: &Event<'_>,
) -> std::fmt::Result
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
let meta = event.metadata();
let mut visit = || {
let mut serializer = serde_json::Serializer::new(WriteAdaptor::new(&mut writer));
let mut serializer = serializer.serialize_map(None)?;
serializer.serialize_entry("timestamp", &Utc::now().to_rfc3339())?;
serializer.serialize_entry("level", &meta.level().as_serde())?;
serializer.serialize_entry("fields", &event.field_map())?;
serializer.serialize_entry("target", meta.target())?;
let current_span = tracing::Span::current();
let context = current_span.context();
let span_ref = context.span();
let span_context = span_ref.span_context();
let trace_id = span_context.trace_id();
if trace_id != TraceId::INVALID {
serializer.serialize_entry("trace_id", &trace_id.to_string())?;
let span_id = span_context.span_id();
if span_id != SpanId::INVALID {
serializer.serialize_entry("span_id", &span_id.to_string())?;
}
}
serializer.end()
};
visit().map_err(|_| std::fmt::Error)?;
writeln!(writer)
}
}
+1
View File
@@ -126,6 +126,7 @@ cli = ["clap", "comfy-table"]
fs-credentials-storage = ["nym-credential-storage/persistent-storage"]
fs-surb-storage = ["nym-client-core-surb-storage/fs-surb-storage"]
fs-gateways-storage = ["nym-client-core-gateways-storage/fs-gateways-storage"]
otel = ["nym-sphinx/otel"]
wasm = ["nym-gateway-client/wasm"]
metrics-server = []
@@ -67,6 +67,7 @@ use std::path::Path;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use url::Url;
#[cfg(target_arch = "wasm32")]
@@ -121,6 +122,7 @@ pub struct ClientOutput {
}
impl ClientOutput {
#[instrument(name = "ClientOutput::register_receiver", skip_all)]
pub fn register_receiver(
&mut self,
) -> Result<mpsc::UnboundedReceiver<Vec<ReconstructedMessage>>, ClientCoreError> {
@@ -384,6 +386,7 @@ where
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
fn start_real_traffic_controller(
controller_config: real_messages_control::Config,
key_rotation_config: KeyRotationConfig,
@@ -475,6 +478,7 @@ where
// buffer controlling all messages fetched from provider
// required so that other components would be able to use them (say the websocket)
#[instrument(skip_all)]
fn start_received_messages_buffer_controller(
local_encryption_keypair: Arc<x25519::KeyPair>,
query_receiver: ReceivedBufferRequestReceiver,
@@ -507,6 +511,7 @@ where
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn start_gateway_client(
config: &Config,
initialisation_result: InitialisationResult,
@@ -613,6 +618,7 @@ where
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn setup_gateway_transceiver(
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
config: &Config,
@@ -770,7 +776,7 @@ where
shutdown_tracker,
)
}
#[instrument(skip_all)]
fn start_mix_traffic_controller(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown_tracker: &ShutdownTracker,
@@ -887,6 +893,7 @@ where
Ok(client.get_key_rotation_info().await?.into())
}
#[instrument(skip_all)]
pub async fn start_base(mut self) -> Result<BaseClient, ClientCoreError>
where
S::ReplyStore: Send + Sync,
@@ -29,6 +29,8 @@ pub enum InputMessage {
data: Vec<u8>,
lane: TransmissionLane,
max_retransmissions: Option<u32>,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>,
},
/// Creates a message used for a duplex anonymous communication where the recipient
@@ -45,6 +47,8 @@ pub enum InputMessage {
reply_surbs: u32,
lane: TransmissionLane,
max_retransmissions: Option<u32>,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>,
},
/// Attempt to use our internally received and stored `ReplySurb` to send the message back
@@ -90,12 +94,16 @@ impl InputMessage {
data: Vec<u8>,
lane: TransmissionLane,
packet_type: Option<PacketType>,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>,
) -> Self {
let message = InputMessage::Regular {
recipient,
data,
lane,
max_retransmissions: None,
#[cfg(feature = "otel")]
trace_id,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -110,6 +118,8 @@ impl InputMessage {
reply_surbs: u32,
lane: TransmissionLane,
packet_type: Option<PacketType>,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>,
) -> Self {
let message = InputMessage::Anonymous {
recipient,
@@ -117,6 +127,8 @@ impl InputMessage {
reply_surbs,
lane,
max_retransmissions: None,
#[cfg(feature = "otel")]
trace_id,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -185,4 +197,14 @@ impl InputMessage {
self.set_max_retransmissions(max_retransmissions);
self
}
#[cfg(feature = "otel")]
pub fn trace_id(&self) -> Option<[u8; 12]> {
match self {
InputMessage::Regular { trace_id, .. } => *trace_id,
InputMessage::Anonymous { trace_id, .. } => *trace_id,
InputMessage::Premade { .. } | InputMessage::Reply { .. } => None,
InputMessage::MessageWrapper { message, .. } => message.trace_id(),
}
}
}
@@ -70,6 +70,7 @@ where
.send_reply(recipient_tag, data, lane, max_retransmissions);
}
#[instrument(skip_all)]
async fn handle_plain_message(
&mut self,
recipient: Recipient,
@@ -77,16 +78,27 @@ where
lane: TransmissionLane,
packet_type: PacketType,
max_retransmissions: Option<u32>,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>,
) {
if let Err(err) = self
.message_handler
.try_send_plain_message(recipient, content, lane, packet_type, max_retransmissions)
.try_send_plain_message(
recipient,
content,
lane,
packet_type,
max_retransmissions,
#[cfg(feature = "otel")]
trace_id,
)
.await
{
warn!("failed to send a plain message - {err}")
}
}
#[instrument(skip_all)]
async fn handle_repliable_message(
&mut self,
recipient: Recipient,
@@ -95,6 +107,8 @@ where
lane: TransmissionLane,
packet_type: PacketType,
max_retransmissions: Option<u32>,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>,
) {
if let Err(err) = self
.message_handler
@@ -105,6 +119,8 @@ where
lane,
packet_type,
max_retransmissions,
#[cfg(feature = "otel")]
trace_id,
)
.await
{
@@ -113,20 +129,36 @@ where
}
#[allow(clippy::panic)]
#[instrument(skip_all)]
async fn on_input_message(&mut self, msg: InputMessage) {
#[cfg(feature = "otel")]
let trace_id = msg.trace_id();
#[cfg(feature = "otel")]
if let Some(tid) = trace_id {
tracing::info!("Processing input message with trace_id: {:?}", tid);
}
match msg {
InputMessage::Regular {
recipient,
data,
lane,
max_retransmissions,
..
} => {
#[cfg(feature = "otel")]
info!(
"Handling regular input message with trace_id: {:?}",
trace_id
);
self.handle_plain_message(
recipient,
data,
lane,
PacketType::Mix,
max_retransmissions,
#[cfg(feature = "otel")]
trace_id,
)
.await
}
@@ -136,7 +168,13 @@ where
reply_surbs,
lane,
max_retransmissions,
..
} => {
#[cfg(feature = "otel")]
tracing::info!(
"Handling anonymous input message with trace_id: {:?}",
trace_id
);
self.handle_repliable_message(
recipient,
data,
@@ -144,6 +182,8 @@ where
lane,
PacketType::Mix,
max_retransmissions,
#[cfg(feature = "otel")]
trace_id,
)
.await
}
@@ -153,6 +193,8 @@ where
lane,
max_retransmissions,
} => {
#[cfg(feature = "otel")]
info!("Handling reply input message with trace_id: {:?}", trace_id);
self.handle_reply(recipient_tag, data, lane, max_retransmissions)
.await;
}
@@ -166,13 +208,21 @@ where
data,
lane,
max_retransmissions,
..
} => {
#[cfg(feature = "otel")]
tracing::info!(
"Handling regular input message with trace_id: {:?}",
trace_id
);
self.handle_plain_message(
recipient,
data,
lane,
packet_type,
max_retransmissions,
#[cfg(feature = "otel")]
trace_id,
)
.await
}
@@ -182,6 +232,7 @@ where
reply_surbs,
lane,
max_retransmissions,
..
} => {
self.handle_repliable_message(
recipient,
@@ -190,6 +241,8 @@ where
lane,
packet_type,
max_retransmissions,
#[cfg(feature = "otel")]
trace_id,
)
.await
}
@@ -213,6 +266,7 @@ where
};
}
#[instrument(skip_all)]
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started InputMessageListener with graceful shutdown support");
@@ -60,7 +60,13 @@ where
// TODO: Figure out retransmission packet type signaling
self.message_handler
.try_prepare_single_chunk_for_sending(packet_recipient, chunk_data, packet_type)
.try_prepare_single_chunk_for_sending(
packet_recipient,
chunk_data,
packet_type,
#[cfg(feature = "otel")]
None
)
.await
}
@@ -27,7 +27,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, instrument, trace, warn};
// TODO: move that error elsewhere since it seems to be contaminating different files
#[derive(Debug, Error)]
@@ -476,6 +476,7 @@ where
self.forward_messages(msgs, lane).await;
}
#[instrument(skip_all)]
pub(crate) async fn try_send_plain_message(
&mut self,
recipient: Recipient,
@@ -483,6 +484,8 @@ where
lane: TransmissionLane,
packet_type: PacketType,
max_retransmissions: Option<u32>,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>,
) -> Result<(), PreparationError> {
let message = NymMessage::new_plain(message);
self.try_split_and_send_non_reply_message(
@@ -491,10 +494,13 @@ where
lane,
packet_type,
max_retransmissions,
#[cfg(feature = "otel")]
trace_id,
)
.await
}
#[instrument(skip_all)]
pub(crate) async fn try_split_and_send_non_reply_message(
&mut self,
message: NymMessage,
@@ -502,6 +508,8 @@ where
lane: TransmissionLane,
packet_type: PacketType,
max_retransmissions: Option<u32>,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>,
) -> Result<(), PreparationError> {
debug!("Sending non-reply message with packet type {packet_type}");
// TODO: I really dislike existence of this assertion, it implies code has to be re-organised
@@ -534,6 +542,8 @@ where
&self.config.ack_key,
&recipient,
packet_type,
#[cfg(feature = "otel")]
trace_id,
)?;
let real_message = RealMessage::new(
@@ -585,6 +595,8 @@ where
TransmissionLane::AdditionalReplySurbs,
packet_type,
max_retransmissions,
#[cfg(feature = "otel")]
None,
)
.await?;
@@ -602,6 +614,8 @@ where
lane: TransmissionLane,
packet_type: PacketType,
max_retransmissions: Option<u32>,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>,
) -> Result<(), SurbWrappedPreparationError> {
debug!("Sending message with reply SURBs with packet type {packet_type}");
let sender_tag = self.get_or_create_sender_tag(&recipient);
@@ -625,6 +639,8 @@ where
lane,
packet_type,
max_retransmissions,
#[cfg(feature = "otel")]
trace_id,
)
.await?;
@@ -639,6 +655,8 @@ where
recipient: Recipient,
chunk: Fragment,
packet_type: PacketType,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>,
) -> Result<PreparedFragment, PreparationError> {
debug!("Sending single chunk with packet type {packet_type}");
let topology_permit = self.topology_access.get_read_permit().await;
@@ -650,6 +668,8 @@ where
&self.config.ack_key,
&recipient,
packet_type,
#[cfg(feature = "otel")]
trace_id,
)?;
Ok(prepared_fragment)
@@ -80,6 +80,8 @@ impl StatisticsControl {
stats_report.into(),
TransmissionLane::General,
None,
#[cfg(feature = "otel")]
None,
);
if let Err(err) = self.report_tx.send(report_message).await {
tracing::error!("Failed to report client stats: {err:?}");
@@ -1043,6 +1043,12 @@ impl<C, St> GatewayClient<C, St> {
}
// Note: this requires prior authentication
#[instrument(skip_all,
fields(
gateway = %self.gateway_identity,
gateway_address = %self.gateway_address
)
)]
pub fn start_listening_for_mixnet_messages(&mut self) -> Result<(), GatewayClientError> {
if !self.authenticated {
return Err(GatewayClientError::NotAuthenticated);
@@ -267,6 +267,7 @@ impl Client {
}
impl SendWithoutResponse for Client {
#[instrument(skip(self, packet))]
fn send_without_response(&self, packet: MixPacket) -> io::Result<()> {
let address = packet.next_hop_address();
trace!("Sending packet to {address}");
+15 -1
View File
@@ -19,11 +19,11 @@ serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
strum = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true, features = ["log"] }
time = { workspace = true }
subtle = { workspace = true }
zeroize = { workspace = true }
nym-bin-common = { path = "../bin-common" }
nym-crypto = { path = "../crypto", features = ["aead", "hashing"] }
nym-pemstore = { path = "../pemstore" }
nym-sphinx = { path = "../nymsphinx" }
@@ -34,6 +34,11 @@ nym-task = { path = "../task" }
nym-credentials = { path = "../credentials" }
nym-credentials-interface = { path = "../credentials-interface" }
opentelemetry = { workspace = true, features = ["trace"], optional = true }
opentelemetry_sdk = { workspace = true, optional = true }
tracing-opentelemetry = { workspace = true, optional = true }
tracing = { workspace = true, features = ["std", "attributes", "tracing-attributes"] }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
workspace = true
features = ["time"]
@@ -51,3 +56,12 @@ anyhow = { workspace = true }
nym-compact-ecash = { path = "../nym_offline_compact_ecash" } # we need specific imports in tests
nym-test-utils = { path = "../test-utils" }
tokio = { workspace = true, features = ["full"] }
[features]
default = []
otel = [
"nym-bin-common/otel",
"opentelemetry",
"opentelemetry_sdk",
"tracing-opentelemetry",
]
@@ -4,6 +4,7 @@
use crate::{AuthenticationFailure, GatewayRequestsError, SharedGatewayKey};
use nym_crypto::asymmetric::ed25519;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::iter;
use std::time::Duration;
use subtle::ConstantTimeEq;
@@ -16,6 +17,9 @@ pub struct AuthenticateRequest {
pub content: AuthenticateRequestContent,
pub request_signature: ed25519::Signature,
#[serde(default)]
pub otel_context: Option<HashMap<String, String>>,
}
impl AuthenticateRequest {
@@ -23,6 +27,7 @@ impl AuthenticateRequest {
protocol_version: u8,
shared_key: &SharedGatewayKey,
identity_keys: &ed25519::KeyPair,
otel_context: Option<HashMap<String, String>>,
) -> Result<AuthenticateRequest, GatewayRequestsError> {
let content = AuthenticateRequestContent::new(
protocol_version,
@@ -35,6 +40,7 @@ impl AuthenticateRequest {
Ok(AuthenticateRequest {
content,
request_signature,
otel_context,
})
}
@@ -8,12 +8,16 @@ use crate::{
AUTHENTICATE_V2_PROTOCOL_VERSION, CREDENTIAL_UPDATE_V2_PROTOCOL_VERSION,
INITIAL_PROTOCOL_VERSION,
};
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::context::ContextCarrier;
use nym_credentials_interface::CredentialSpendingData;
use nym_crypto::asymmetric::ed25519;
use nym_sphinx::DestinationAddressBytes;
use nym_statistics_common::types::SessionType;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use tracing::{instrument, warn};
use tungstenite::Message;
pub mod authenticate;
@@ -76,6 +80,10 @@ pub enum ClientControlRequest {
address: String,
enc_address: String,
iv: String,
/// this is a trace id that is used in testing and performance verification
/// in mainnet, this will always be set to None
#[serde(default)]
otel_context: Option<HashMap<String, String>>,
},
AuthenticateV2(Box<AuthenticateRequest>),
@@ -127,14 +135,25 @@ impl ClientControlRequest {
let nonce = shared_key.random_nonce_or_iv();
let ciphertext = shared_key.encrypt_naive(address.as_bytes_ref(), Some(&nonce))?;
#[cfg(feature = "otel")]
let context_carrier = {
let context = opentelemetry::Context::current();
ContextCarrier::new_with_current_context(context).into_map()
};
Ok(ClientControlRequest::Authenticate {
protocol_version,
address: address.as_base58_string(),
enc_address: bs58::encode(&ciphertext).into_string(),
iv: bs58::encode(&nonce).into_string(),
#[cfg(feature = "otel")]
otel_context: Some(context_carrier),
#[cfg(not(feature = "otel"))]
otel_context: None,
})
}
#[instrument(skip_all)]
pub fn new_authenticate_v2(
shared_key: &SharedGatewayKey,
identity_keys: &ed25519::KeyPair,
@@ -142,8 +161,25 @@ impl ClientControlRequest {
// if we're using v2 authentication, we must announce at least that protocol version
let protocol_version = AUTHENTICATE_V2_PROTOCOL_VERSION;
#[cfg(feature = "otel")]
let context_carrier = {
use nym_bin_common::opentelemetry::context::extract_trace_id_from_tracing_cx;
use tracing_opentelemetry::OpenTelemetrySpanExt;
let current_span = tracing::Span::current();
let otel_context = current_span.context();
ContextCarrier::new_with_current_context(otel_context).into_map()
};
#[cfg(not(feature = "otel"))]
let context_carrier: HashMap<String, String> = HashMap::new();
Ok(ClientControlRequest::AuthenticateV2(Box::new(
AuthenticateRequest::new(protocol_version, shared_key, identity_keys)?,
AuthenticateRequest::new(
protocol_version,
shared_key,
identity_keys,
Some(context_carrier),
)?,
)))
}
+4 -1
View File
@@ -11,9 +11,11 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
# TODO: Remove otel from default before release
default=["tunneling"]
tunneling=[]
network-defaults = ["dep:nym-network-defaults"]
otel = ["nym-bin-common/otel", "opentelemetry", "opentelemetry_sdk"]
debug-inventory = ["nym-http-api-client-macro/debug-inventory"]
[dependencies]
@@ -24,6 +26,8 @@ reqwest = { workspace = true, features = ["json", "gzip", "deflate", "brotli", "
http.workspace = true
url = { workspace = true }
once_cell = { workspace = true }
opentelemetry = { workspace = true, optional = true }
opentelemetry_sdk = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true}
@@ -53,4 +57,3 @@ features = ["tokio"]
[dev-dependencies]
tokio = { workspace = true, features = ["rt", "macros"] }
+15
View File
@@ -986,6 +986,21 @@ impl ApiClientCore for Client {
self.apply_hosts_to_req(&mut req);
// if opentelemetry is activated add the current trace context to the request
#[cfg(feature = "otel")]
{
use nym_bin_common::opentelemetry::context::ContextCarrier;
use opentelemetry::Context;
let carrier = ContextCarrier::new_with_current_context(Context::current());
if let Some(traceparent) = carrier.extract_traceparent() {
if let Ok(header_value) = HeaderValue::from_str(&traceparent) {
req.headers_mut().insert("traceparent", header_value);
}
}
}
let mut rb = RequestBuilder::from_parts(self.reqwest_client.clone(), req);
rb = rb
+3
View File
@@ -18,6 +18,7 @@ bytes = { workspace = true, optional = true }
colored = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
mime = { workspace = true, optional = true }
nym-bin-common = { path = "../bin-common" }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_yaml = { workspace = true, optional = true }
@@ -49,6 +50,8 @@ middleware = [
"zeroize"
]
otel = ["nym-bin-common/otel"]
utoipa = ["dep:utoipa"]
[lints]
@@ -56,6 +56,16 @@ async fn log_request(
let host = header_map(request.headers().get(HOST), "Unknown Host".to_string());
// Extract traceparent from headers if it exists
#[cfg(feature = "otel")]
let traceparent = request
.headers()
.get("traceparent")
.and_then(|h| h.to_str().ok())
.map(|s| s.to_string());
#[cfg(not(feature = "otel"))]
let traceparent: Option<String> = None;
let start = Instant::now();
// run request through all middleware, incl. extractors
let res = next.run(request).await;
@@ -82,10 +92,10 @@ async fn log_request(
match level {
LogLevel::Debug => debug!(
"[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent}"
"[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent} traceparent: {traceparent:?}",
),
LogLevel::Info => info!(
"[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent}"
"[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent} traceparent: {traceparent:?}"
),
}
+4
View File
@@ -30,3 +30,7 @@ workspace = true
## wasm-only dependencies
[target."cfg(target_arch = \"wasm32\")".dependencies.wasm-utils]
path = "../wasm/utils"
[ features ]
default = []
otel = ["nym-sphinx/otel"]
+2
View File
@@ -231,6 +231,8 @@ where
&address,
&address,
PacketType::Mix,
#[cfg(feature = "otel")]
None,
)?)
}
+10
View File
@@ -8,12 +8,14 @@ license = { workspace = true }
repository = { workspace = true }
[dependencies]
sphinx-packet = { workspace = true }
tracing = { workspace = true }
rand = { workspace = true }
rand_distr = { workspace = true }
rand_chacha = { workspace = true }
thiserror = { workspace = true }
nym-bin-common = { path = "../bin-common" }
nym-sphinx-acknowledgements = { path = "acknowledgements" }
nym-sphinx-addressing = { path = "addressing" }
nym-sphinx-anonymous-replies = { path = "anonymous-replies" }
@@ -55,3 +57,11 @@ outfox = [
"nym-sphinx-params/outfox",
"nym-sphinx-types/outfox",
]
otel = [
"nym-bin-common/otel",
"nym-sphinx-acknowledgements/otel",
"nym-sphinx-addressing/otel",
"nym-sphinx-cover/otel",
"nym-sphinx-anonymous-replies/otel",
]
@@ -24,3 +24,4 @@ nym-topology = { path = "../../topology" }
[features]
serde = ["dep:serde", "generic-array"]
otel = ["nym-sphinx-addressing/otel"]
@@ -61,7 +61,10 @@ impl SurbAck {
};
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
#[cfg(not(feature = "otel"))]
let destination = recipient.as_sphinx_destination();
#[cfg(feature = "otel")]
let destination = recipient.as_sphinx_destination(None);
let surb_ack_payload = prepare_identifier(rng, ack_key, marshaled_fragment_id);
let packet_size = match packet_type {
+7 -1
View File
@@ -8,14 +8,20 @@ license = { workspace = true }
repository = { workspace = true }
[dependencies]
nym-bin-common = { path = "../../bin-common", features = ["opentelemetry"] } # for trace id compression/decompression
nym-crypto = { path = "../../crypto", features = ["asymmetric", "sphinx"] } # all addresses are expressed in terms on their crypto keys
nym-sphinx-types = { path = "../types", features = ["sphinx"] } # we need to be able to refer to some types defined inside sphinx crate
serde = { workspace = true } # implementing serialization/deserialization for some types, like `Recipient`
thiserror = { workspace = true }
tracing = { workspace = true }
[dev-dependencies]
rand = { workspace = true }
nym-crypto = { path = "../../crypto", features = ["rand"] }
bincode = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }
[features]
default = []
otel = ["nym-bin-common/otel"]
+18 -2
View File
@@ -153,12 +153,28 @@ impl Recipient {
// TODO: Currently the `DestinationAddress` is equivalent to `ClientIdentity`, but perhaps
// it shouldn't be? Maybe it should be (for example) H(`ClientIdentity || ClientEncryptionKey`)
// instead? That is an open question.
pub fn as_sphinx_destination(&self) -> Destination {
pub fn as_sphinx_destination(
&self,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>
) -> Destination {
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::compact_id_generator::decompress_trace_id;
#[cfg(feature = "otel")]
let trace_id_16 = if let Some(trace_id) = trace_id {
decompress_trace_id(&trace_id)
} else {
decompress_trace_id(&[0u8; 12])
};
// since the nym mix network differs slightly in design from loopix, we do not care
// about "surb_id" field at all and just use the default value.
Destination::new(
self.client_identity.derive_destination_address(),
Default::default(),
#[cfg(not(feature = "otel"))]
[0u8; 16],
#[cfg(feature = "otel")]
trace_id_16
)
}
@@ -25,3 +25,7 @@ workspace = true
[dev-dependencies]
rand_chacha = { workspace = true }
[ features ]
default = []
otel = ["nym-sphinx-addressing/otel"]
@@ -82,6 +82,9 @@ impl ReplySurb {
topology.random_route_to_egress(rng, recipient.gateway())?
};
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
#[cfg(feature = "otel")]
let destination = recipient.as_sphinx_destination(None);
#[cfg(not(feature = "otel"))]
let destination = recipient.as_sphinx_destination();
let mut surb_material = SURBMaterial::new(route, delays, destination);
+3
View File
@@ -20,3 +20,6 @@ nym-sphinx-params = { path = "../params" }
nym-sphinx-routing = { path = "../routing" }
nym-sphinx-types = { path = "../types" }
nym-topology = { path = "../../topology" }
[features]
otel = ["nym-sphinx-addressing/otel"]
+3
View File
@@ -125,7 +125,10 @@ where
let route = topology.random_route_to_egress(rng, full_address.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_packet_delay, route.len());
#[cfg(not(feature = "otel"))]
let destination = full_address.as_sphinx_destination();
#[cfg(feature = "otel")]
let destination = full_address.as_sphinx_destination(None);
let rotation_id = topology.current_key_rotation();
let sphinx_key_rotation = SphinxKeyRotation::from(rotation_id);
+6
View File
@@ -11,8 +11,10 @@ repository = { workspace = true }
bytes = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
thiserror = { workspace = true }
opentelemetry = { workspace = true }
tracing = { workspace = true }
nym-bin-common = { path = "../../bin-common" }
nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] }
nym-sphinx-params = { path = "../params", features = ["sphinx", "outfox"] }
nym-sphinx-forwarding = { path = "../forwarding" }
@@ -21,3 +23,7 @@ nym-sphinx-acknowledgements = { path = "../acknowledgements" }
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
[features]
default = []
otel = ["nym-bin-common/otel"]
+42 -9
View File
@@ -2,6 +2,11 @@
// SPDX-License-Identifier: Apache-2.0
use crate::packet::FramedNymPacket;
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::{
compact_id_generator::decompress_trace_id, context::ManualContextPropagator,
};
use nym_sphinx_acknowledgements::surb_ack::{SurbAck, SurbAckRecoveryError};
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressError};
use nym_sphinx_forwarding::packet::MixPacket;
@@ -14,7 +19,9 @@ use nym_sphinx_types::{
};
use std::fmt::Display;
use thiserror::Error;
use tracing::{debug, error, info, trace};
#[cfg(feature = "otel")]
use tracing::warn_span;
use tracing::{debug, error, info, instrument, trace, warn};
#[derive(Debug)]
pub enum MixProcessingResultData {
@@ -154,6 +161,7 @@ impl PartiallyUnwrappedPacket {
})
}
#[instrument(skip_all)]
pub fn finalise_unwrapping(self) -> Result<MixProcessingResult, PacketProcessingError> {
let packet_size = self.received_data.packet_size();
let packet_type = self.received_data.packet_type();
@@ -236,6 +244,7 @@ fn perform_framed_packet_processing(
})
}
#[instrument(skip_all)]
fn wrap_processed_sphinx_packet(
packet: nym_sphinx_types::ProcessedPacket,
packet_size: PacketSize,
@@ -258,15 +267,38 @@ fn wrap_processed_sphinx_packet(
// sphinx all together?
ProcessedPacketData::FinalHop {
destination,
identifier: _,
#[cfg(feature = "otel")]
identifier,
#[cfg(not(feature = "otel"))]
identifier: _,
payload,
} => process_final_hop(
destination,
payload.recover_plaintext()?,
packet_size,
packet_type,
key_rotation,
),
} => {
// if we have a trace id in the destination, we log it for easier correlation later on
#[cfg(feature = "otel")]
let span = match identifier[0..12].try_into().map(|b: [u8; 12]| b) {
Ok(trace_bytes) if !trace_bytes.iter().all(|b| *b == 0) => {
let full_trace_id_bytes = decompress_trace_id(&trace_bytes);
let full_trace_id =
opentelemetry::trace::TraceId::from_bytes(full_trace_id_bytes);
let context_propagator =
ManualContextPropagator::new_from_tid("final_hop", full_trace_id);
tracing::info_span!(parent: &context_propagator.root_span, "final_hop_processing", trace_id=%full_trace_id)
}
_ => {
tracing::debug_span!("final_hop_processing")
}
};
#[cfg(feature = "otel")]
let _entered_span = span.enter();
process_final_hop(
destination,
payload.recover_plaintext()?,
packet_size,
packet_type,
key_rotation,
)
}
}?;
Ok(MixProcessingResult {
@@ -312,6 +344,7 @@ fn wrap_processed_outfox_packet(
}
}
#[instrument(skip_all)]
fn perform_final_processing(
packet: NymProcessedPacket,
packet_size: PacketSize,
+17 -13
View File
@@ -163,19 +163,6 @@ pub trait FragmentPreparer {
})
}
/// Tries to convert this [`Fragment`] into a [`SphinxPacket`] that can be sent through the Nym mix-network,
/// such that it contains required SURB-ACK and public component of the ephemeral key used to
/// derive the shared key.
/// Also all the data, apart from the said public component, is encrypted with an ephemeral shared key.
/// This method can fail if the provided network topology is invalid.
/// It returns total expected delay as well as the [`SphinxPacket`] (including first hop address)
/// to be sent through the network.
///
/// The procedure is as follows:
/// For each fragment:
/// - compute SURB_ACK
/// - generate (x, g^x)
/// - compute k = KDF(remote encryption key ^ x) this is equivalent to KDF( dh(remote, x) )
/// - compute v_b = AES-128-CTR(k, serialized_fragment)
/// - compute vk_b = g^x || v_b
/// - compute sphinx_plaintext = SURB_ACK || g^x || v_b
@@ -189,6 +176,8 @@ pub trait FragmentPreparer {
packet_sender: &Recipient,
packet_recipient: &Recipient,
packet_type: PacketType,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>,
) -> Result<PreparedFragment, NymTopologyError> {
debug!("Preparing chunk for sending");
// each plain or repliable packet (i.e. not a reply) attaches an ephemeral public key so that the recipient
@@ -249,6 +238,16 @@ pub trait FragmentPreparer {
topology.random_route_to_egress(&mut rng, destination)?
};
#[cfg(feature = "otel")]
let destination = packet_recipient.as_sphinx_destination(trace_id);
#[cfg(feature = "otel")]
tracing::info!(
"Packet destination with trace id: {:?}",
&destination.identifier
);
#[cfg(not(feature = "otel"))]
let destination = packet_recipient.as_sphinx_destination();
// including set of delays
@@ -274,6 +273,7 @@ pub trait FragmentPreparer {
)?,
};
// - compute k = KDF(remote encryption key ^ x) this is equivalent to KDF( dh(remote, x) )
// from the previously constructed route extract the first hop
let first_hop_address =
NymNodeRoutingAddress::try_from(route.first().unwrap().address).unwrap();
@@ -428,6 +428,8 @@ where
ack_key: &AckKey,
packet_recipient: &Recipient,
packet_type: PacketType,
#[cfg(feature = "otel")]
trace_id: Option<[u8; 12]>,
) -> Result<PreparedFragment, NymTopologyError> {
let sender = self.sender_address;
@@ -439,6 +441,8 @@ where
&sender,
packet_recipient,
packet_type,
#[cfg(feature = "otel")]
trace_id,
)
}
+2
View File
@@ -10,6 +10,8 @@ repository = { workspace = true }
[dependencies]
sphinx-packet = { workspace = true, optional = true }
nym-outfox = { path = "../../../nym-outfox", optional = true }
# TODO add optional
nym-bin-common = { path = "../../../common/bin-common" }
thiserror = { workspace = true }
[features]
-2
View File
@@ -32,8 +32,6 @@ tracing.workspace = true
url.workspace = true
# TEMP
#nym-bin-common = { path = "../bin-common", features = ["basic_tracing"]}
[build-dependencies]
+1
View File
@@ -37,3 +37,4 @@ nym-validator-client = { path = "../client-libs/validator-client" }
[features]
default = []
otel = ["nym-sphinx/otel"]
@@ -350,6 +350,8 @@ impl SocksClient {
self.config.connection_start_surbs,
TransmissionLane::ConnectionId(self.connection_id),
self.packet_type,
#[cfg(feature = "otel")]
None,
);
self.input_sender
.send(input_message)
@@ -373,6 +375,8 @@ impl SocksClient {
msg.into_bytes(),
TransmissionLane::ConnectionId(self.connection_id),
self.packet_type,
#[cfg(feature = "otel")]
None,
);
self.input_sender
.send(input_message)
@@ -439,6 +443,8 @@ impl SocksClient {
per_request_surbs,
lane,
packet_type,
#[cfg(feature = "otel")]
None,
)
} else {
InputMessage::new_regular(
@@ -446,6 +452,8 @@ impl SocksClient {
provider_message.into_bytes(),
lane,
packet_type,
#[cfg(feature = "otel")]
None,
)
}
})
+1
View File
@@ -29,6 +29,7 @@ x25519-dalek = { workspace = true, features = ["static_secrets"] }
cosmwasm-std = { workspace = true }
cosmrs = { workspace = true }
nym-bin-common = { path = "../../common/bin-common" }
nym-validator-client = { path = "../../common/client-libs/validator-client" }
nym-mixnet-contract-common = { path = "../../common/cosmwasm-smart-contracts/mixnet-contract" }
nym-vesting-contract-common = { path = "../../common/cosmwasm-smart-contracts/vesting-contract" }
+17
View File
@@ -50,6 +50,7 @@ zeroize = { workspace = true }
# internal
nym-api-requests = { path = "../nym-api/nym-api-requests" }
nym-bin-common = { path = "../common/bin-common" }
nym-credentials = { path = "../common/credentials" }
nym-credentials-interface = { path = "../common/credentials-interface" }
nym-credential-verification = { path = "../common/credential-verification" }
@@ -82,9 +83,25 @@ nym-service-provider-requests-common = { path = "../common/service-provider-requ
defguard_wireguard_rs = { workspace = true }
opentelemetry = { workspace = true, optional = true }
opentelemetry_sdk = { workspace = true, optional = true }
tracing-opentelemetry = { workspace = true, optional = true }
[dev-dependencies]
nym-gateway-storage = { path = "../common/gateway-storage", features = ["mock"] }
nym-wireguard = { path = "../common/wireguard", features = ["mock"] }
mock_instant = "0.6.0"
time = { workspace = true }
[features]
default = []
otel = [
"nym-bin-common/otel",
"nym-client-core/otel",
"nym-gateway-requests/otel",
"nym-sphinx/otel",
"nym-sdk/otel",
"opentelemetry",
"opentelemetry_sdk",
"tracing-opentelemetry",
]
@@ -14,6 +14,8 @@ use futures::{
future::{FusedFuture, OptionFuture},
FutureExt, StreamExt,
};
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::context::ManualContextPropagator;
use nym_credential_verification::CredentialVerifier;
use nym_credential_verification::{
bandwidth_storage_manager::BandwidthStorageManager, ClientBandwidth,
@@ -31,6 +33,8 @@ use nym_sphinx::forwarding::packet::MixPacket;
use nym_statistics_common::{gateways::GatewaySessionEvent, types::SessionType};
use nym_validator_client::coconut::EcashApiError;
use rand::{random, CryptoRng, Rng};
#[cfg(feature = "otel")]
use std::collections::HashMap;
use std::{process, time::Duration};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -147,6 +151,8 @@ pub(crate) struct AuthenticatedHandler<R, S> {
// senders that are used to return the result of the ping to the handler requesting the ping.
is_active_request_receiver: IsActiveRequestReceiver,
is_active_ping_pending_reply: Option<(u64, IsActiveResultSender)>,
#[cfg(feature = "otel")]
pub otel_propagator: Option<ManualContextPropagator>,
}
// explicitly remove handle from the global store upon being dropped
@@ -189,6 +195,24 @@ impl<R, S> AuthenticatedHandler<R, S> {
client_address: client.address.as_base58_string(),
})?;
#[cfg(feature = "otel")]
let manual_ctx_propagator = {
let context = match client.otel_context {
Some(ref ctx) => ctx.clone(),
None => HashMap::new(),
};
let manual_ctx_propagator = if !context.is_empty() {
Some(ManualContextPropagator::new(
"upgrading_fresh_to_authenticated",
context,
))
} else {
None
};
manual_ctx_propagator
};
let handler = AuthenticatedHandler {
bandwidth_storage_manager: BandwidthStorageManager::new(
Box::new(fresh.shared_state.storage.clone()),
@@ -202,6 +226,8 @@ impl<R, S> AuthenticatedHandler<R, S> {
mix_receiver,
is_active_request_receiver,
is_active_ping_pending_reply: None,
#[cfg(feature = "otel")]
otel_propagator: manual_ctx_propagator,
};
handler.send_metrics(GatewaySessionEvent::new_session_start(
handler.client.address,
@@ -227,7 +253,19 @@ impl<R, S> AuthenticatedHandler<R, S> {
/// # Arguments
///
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
#[instrument(skip_all)]
fn forward_packet(&self, mix_packet: MixPacket) {
#[cfg(feature = "otel")]
{
let span = match &self.otel_propagator {
Some(propagator) => {
info_span!(parent: &propagator.root_span, "forwarding_mix_packet")
}
None => info_span!("forwarding_mix_packet_no_otel"),
};
let _enter = span.enter();
}
if let Err(err) = self
.inner
.shared_state
@@ -287,11 +325,26 @@ impl<R, S> AuthenticatedHandler<R, S> {
&mut self,
mix_packet: MixPacket,
) -> Result<ServerResponse, RequestHandlingError> {
trace!("forwarding sphinx packet");
#[cfg(feature = "otel")]
let span = {
let span = match &self.otel_propagator {
Some(propagator) => {
info_span!(parent: &propagator.root_span, "handling_forward_sphinx")
}
None => debug_span!("handling_forward_sphinx_no_otel"),
};
span
};
#[cfg(feature = "otel")]
let _enter = span.enter();
let required_bandwidth = mix_packet.packet().len() as i64;
let remaining_bandwidth = self
.bandwidth_storage_manager
.try_use_bandwidth(required_bandwidth)
.in_current_span()
.await?;
self.forward_packet(mix_packet);
@@ -305,8 +358,22 @@ impl<R, S> AuthenticatedHandler<R, S> {
/// # Arguments
///
/// * `bin_msg`: raw message to handle.
#[instrument(skip_all)]
async fn handle_binary(&mut self, bin_msg: Vec<u8>) -> Message {
trace!("binary request");
#[cfg(feature = "otel")]
let span = {
let span = match &self.otel_propagator {
Some(propagator) => {
info_span!(parent: &propagator.root_span, "handling_binary_request")
}
None => info_span!("handling_binary_request_no_otel"),
};
span
};
#[cfg(feature = "otel")]
let _enter = span.enter();
// this function decrypts the request and checks the MAC
match BinaryRequest::try_from_encrypted_tagged_bytes(bin_msg, &self.client.shared_keys) {
Err(e) => {
@@ -316,9 +383,11 @@ impl<R, S> AuthenticatedHandler<R, S> {
Ok(request) => match request {
// currently only a single type exists
BinaryRequest::ForwardSphinx { packet }
| BinaryRequest::ForwardSphinxV2 { packet } => {
self.handle_forward_sphinx(packet).await.into_ws_message()
}
| BinaryRequest::ForwardSphinxV2 { packet } => self
.handle_forward_sphinx(packet)
.in_current_span()
.await
.into_ws_message(),
_ => RequestHandlingError::UnknownBinaryRequest.into_error_message(),
},
}
@@ -379,6 +448,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
Ok(SensitiveServerResponse::KeyUpgradeAck {}.encrypt(&self.client.shared_keys)?)
}
#[instrument(skip_all)]
async fn handle_encrypted_text_request(
&mut self,
ciphertext: Vec<u8>,
@@ -408,6 +478,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
/// # Arguments
///
/// * `raw_request`: raw message to handle.
#[instrument(skip_all)]
async fn handle_text(&mut self, raw_request: String) -> Message
where
R: Rng + CryptoRng,
@@ -552,6 +623,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
}
}
#[instrument(skip_all)]
async fn handle_is_active_request(
&mut self,
reply_tx: IsActiveResultSender,
@@ -582,20 +654,37 @@ impl<R, S> AuthenticatedHandler<R, S> {
/// Simultaneously listens for incoming client requests, which realistically should only be
/// binary requests to forward sphinx packets or increase bandwidth
/// and for sphinx packets received from the mix network that should be sent back to the client.
#[instrument(level = "debug", skip_all,
fields(
client = %self.client.address.as_base58_string()
)
)]
pub(crate) async fn listen_for_requests(mut self)
where
R: Rng + CryptoRng,
S: AsyncRead + AsyncWrite + Unpin,
{
trace!("Started listening for ALL incoming requests...");
// Ping timeout future used to check if the client responded to our ping request
let mut ping_timeout: OptionFuture<_> = None.into();
#[cfg(feature = "otel")]
let from_client_span = {
let span = match &self.otel_propagator {
Some(propagator) => {
info_span!(parent: &propagator.root_span, "authenticated_client_handler_listen")
}
None => tracing::debug_span!("authenticated_client_handler_listen_no_otel"),
};
span
};
#[cfg(feature = "otel")]
let _enter = from_client_span.enter();
loop {
tokio::select! {
// Received a request to ping the client to check if it's still active
tx = self.is_active_request_receiver.next() => {
tx = self.is_active_request_receiver.next().in_current_span() => {
match tx {
None => break,
Some(reply_tx) => {
@@ -609,10 +698,10 @@ impl<R, S> AuthenticatedHandler<R, S> {
},
// The ping timeout expired, meaning the client didn't respond to our ping request
_ = &mut ping_timeout, if !ping_timeout.is_terminated() => {
ping_timeout = None.into();
self.handle_ping_timeout().await;
ping_timeout = None.into();
self.handle_ping_timeout().await;
},
socket_msg = self.inner.read_websocket_message() => {
socket_msg = self.inner.read_websocket_message().in_current_span() => {
let socket_msg = match socket_msg {
None => break,
Some(Ok(socket_msg)) => socket_msg,
@@ -627,7 +716,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
}
if let Some(response) = self.handle_request(socket_msg).await {
if let Err(err) = self.inner.send_websocket_message(response).await {
if let Err(err) = self.inner.send_websocket_message(response).in_current_span().await {
debug!(
"Failed to send message over websocket: {err}. Assuming the connection is dead.",
);
@@ -635,7 +724,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
}
}
},
mix_messages = self.mix_receiver.next() => {
mix_messages = self.mix_receiver.next().in_current_span() => {
let mix_messages = match mix_messages {
None => {
debug!("mix receiver was closed! Assuming the connection is dead.");
@@ -13,6 +13,8 @@ use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt,
};
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::context::ManualContextPropagator;
use nym_credentials_interface::AvailableBandwidth;
use nym_crypto::aes::cipher::crypto_common::rand_core::RngCore;
use nym_crypto::asymmetric::ed25519;
@@ -34,6 +36,8 @@ use nym_node_metrics::events::MetricsEvent;
use nym_sphinx::DestinationAddressBytes;
use nym_task::ShutdownToken;
use rand::CryptoRng;
#[cfg(feature = "otel")]
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::Duration;
use thiserror::Error;
@@ -41,7 +45,9 @@ use time::OffsetDateTime;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::timeout;
use tokio_tungstenite::tungstenite::{protocol::Message, Error as WsError};
use tracing::*;
#[cfg(feature = "otel")]
use tracing::info_span;
use tracing::{debug, error, info, instrument, warn, Instrument};
#[derive(Debug, Error)]
pub(crate) enum InitialAuthenticationError {
@@ -163,6 +169,7 @@ impl<R, S> FreshHandler<R, S> {
/// Attempts to perform websocket handshake with the remote and upgrades the raw TCP socket
/// to the framed WebSocket.
#[instrument(skip_all)]
pub(crate) async fn perform_websocket_handshake(&mut self) -> Result<(), WsError>
where
S: AsyncRead + AsyncWrite + Unpin,
@@ -186,6 +193,7 @@ impl<R, S> FreshHandler<R, S> {
/// # Arguments
///
/// * `init_msg`: a client handshake init message which should contain its identity public key as well as an ephemeral key.
#[instrument(skip_all)]
async fn perform_registration_handshake(
&mut self,
init_msg: Vec<u8>,
@@ -211,6 +219,7 @@ impl<R, S> FreshHandler<R, S> {
}
/// Attempts to read websocket message from the associated socket.
#[instrument(skip_all)]
pub(crate) async fn read_websocket_message(&mut self) -> Option<Result<Message, WsError>>
where
S: AsyncRead + AsyncWrite + Unpin,
@@ -226,6 +235,7 @@ impl<R, S> FreshHandler<R, S> {
/// # Arguments
///
/// * `msg`: WebSocket message to write back to the client.
#[instrument(skip_all)]
pub(crate) async fn send_websocket_message(
&mut self,
msg: impl Into<Message>,
@@ -242,6 +252,7 @@ impl<R, S> FreshHandler<R, S> {
}
}
#[instrument(skip_all)]
pub(crate) async fn send_error_response(
&mut self,
err: impl std::error::Error,
@@ -269,6 +280,7 @@ impl<R, S> FreshHandler<R, S> {
///
/// * `shared_keys`: keys derived between the client and gateway.
/// * `packets`: unwrapped packets that are to be pushed back to the client.
#[instrument(skip_all)]
pub(crate) async fn push_packets_to_client(
&mut self,
shared_keys: &SharedGatewayKey,
@@ -326,6 +338,7 @@ impl<R, S> FreshHandler<R, S> {
///
/// * `client_address`: address of the client that is going to receive the messages.
/// * `shared_keys`: shared keys derived between the client and the gateway used to encrypt and tag the messages.
#[instrument(skip_all)]
async fn push_stored_messages_to_client(
&mut self,
client_address: DestinationAddressBytes,
@@ -632,6 +645,8 @@ impl<R, S> FreshHandler<R, S> {
address,
shared_keys.key,
session_request_start,
#[cfg(feature = "otel")]
None,
)),
ServerResponse::Authenticate {
protocol_version: Some(negotiated_protocol),
@@ -641,9 +656,13 @@ impl<R, S> FreshHandler<R, S> {
))
}
#[instrument(skip_all, fields(
address = %request.content.client_identity.derive_destination_address(),
))]
async fn handle_authenticate_v2(
&mut self,
request: Box<AuthenticateRequest>,
#[cfg(feature = "otel")] otel_context: Option<HashMap<String, String>>,
) -> Result<InitialAuthResult, InitialAuthenticationError>
where
S: AsyncRead + AsyncWrite + Unpin,
@@ -718,6 +737,8 @@ impl<R, S> FreshHandler<R, S> {
address,
shared_key.key,
session_request_start,
#[cfg(feature = "otel")]
otel_context,
)),
ServerResponse::Authenticate {
protocol_version: Some(negotiated_protocol),
@@ -816,6 +837,8 @@ impl<R, S> FreshHandler<R, S> {
remote_address,
shared_keys,
OffsetDateTime::now_utc(),
#[cfg(feature = "otel")]
None,
);
Ok(InitialAuthResult::new(
@@ -846,6 +869,7 @@ impl<R, S> FreshHandler<R, S> {
}
}
#[instrument(skip_all)]
pub(crate) async fn handle_initial_client_request(
&mut self,
request: ClientControlRequest,
@@ -854,17 +878,58 @@ impl<R, S> FreshHandler<R, S> {
S: AsyncRead + AsyncWrite + Unpin + Send,
R: CryptoRng + RngCore + Send,
{
// we can handle stateless client requests without prior authentication, like `ClientControlRequest::SupportedProtocol`
// extract and set up opentelemetry context if provided
#[cfg(feature = "otel")]
let (context_propagator, otel_ctx) =
if let ClientControlRequest::AuthenticateV2(ref auth_req) = request {
if let Some(otel_context) = &auth_req.otel_context {
info!(
"=== OpenTelemetry context provided in the request: {otel_context:?} ==="
);
(
Some(ManualContextPropagator::new(
"handling_initial_client_request_with_otel",
otel_context.clone(),
)),
Some(otel_context.clone()),
)
} else {
debug!("No OpenTelemetry context provided in the request");
(None, None)
}
} else {
debug!("No OpenTelemetry context provided in the request");
(None, None)
};
#[cfg(feature = "otel")]
let child_span = match context_propagator {
Some(ref propagator) => {
let span = info_span!(parent: &propagator.root_span, "=== Handling initial client request with otel context ===");
span
}
None => {
tracing::debug_span!("=== Handling initial client request without otel context ===")
}
};
#[cfg(feature = "otel")]
let _enter = child_span.enter();
let auth_result = match request {
ClientControlRequest::Authenticate {
protocol_version,
address,
enc_address,
iv,
otel_context: _,
} => {
self.handle_legacy_authenticate(protocol_version, address, enc_address, iv)
.await
}
#[cfg(feature = "otel")]
ClientControlRequest::AuthenticateV2(req) => {
self.handle_authenticate_v2(req, otel_ctx).await
}
#[cfg(not(feature = "otel"))]
ClientControlRequest::AuthenticateV2(req) => self.handle_authenticate_v2(req).await,
ClientControlRequest::RegisterHandshakeInitRequest {
protocol_version,
@@ -889,7 +954,6 @@ impl<R, S> FreshHandler<R, S> {
}
other => debug!("authentication failure: {other}"),
}
self.send_and_forget_error_response(&err).await;
return Err(err);
}
@@ -912,9 +976,11 @@ impl<R, S> FreshHandler<R, S> {
warn!("could not establish client details");
return Err(InitialAuthenticationError::EmptyClientDetails);
};
Ok(Some(client_details))
}
#[instrument(skip_all)]
pub(crate) async fn handle_until_authenticated_or_failure(
mut self,
) -> Option<AuthenticatedHandler<R, S>>
@@ -953,7 +1019,7 @@ impl<R, S> FreshHandler<R, S> {
registration_details.session_request_timestamp,
);
return AuthenticatedHandler::upgrade(
let auth_handle = AuthenticatedHandler::upgrade(
self,
registration_details,
mix_receiver,
@@ -962,10 +1028,12 @@ impl<R, S> FreshHandler<R, S> {
.await
.inspect_err(|err| error!("failed to upgrade client handler: {err}"))
.ok();
return auth_handle;
}
}
}
#[instrument(skip_all)]
pub(crate) async fn wait_for_initial_message(
&mut self,
) -> Result<ClientControlRequest, InitialAuthenticationError>
@@ -1016,6 +1084,7 @@ impl<R, S> FreshHandler<R, S> {
.map_err(|_| InitialAuthenticationError::InvalidRequest)
}
#[instrument(skip_all)]
pub(crate) async fn start_handling(self)
where
S: AsyncRead + AsyncWrite + Unpin + Send,
@@ -1025,10 +1094,10 @@ impl<R, S> FreshHandler<R, S> {
let shutdown = self.shutdown.clone();
tokio::select! {
_ = shutdown.cancelled() => {
trace!("received cancellation")
tracing::trace!("received cancellation")
}
_ = super::handle_connection(self) => {
debug!("finished connection handler for {remote}")
_ = super::handle_connection(self).instrument(tracing::debug_span!("connection_handler", remote = %remote)) => {
tracing::debug!("finished connection handler for {remote}")
}
}
}
@@ -7,10 +7,13 @@ use nym_gateway_requests::shared_key::SharedGatewayKey;
use nym_gateway_requests::ServerResponse;
use nym_sphinx::DestinationAddressBytes;
use rand::{CryptoRng, Rng};
#[cfg(feature = "otel")]
use std::collections::HashMap;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tungstenite::WebSocketStream;
use tracing::Instrument;
use tracing::{debug, instrument, trace, warn};
pub(crate) use self::authenticated::AuthenticatedHandler;
@@ -48,6 +51,8 @@ pub(crate) struct ClientDetails {
// note, this does **NOT ALWAYS** indicate timestamp of when client connected
// it is (for v2 auth) timestamp the client **signed** when it created the request
pub(crate) session_request_timestamp: OffsetDateTime,
#[cfg(feature = "otel")]
pub(crate) otel_context: Option<HashMap<String, String>>,
}
impl ClientDetails {
@@ -56,12 +61,15 @@ impl ClientDetails {
address: DestinationAddressBytes,
shared_keys: SharedGatewayKey,
session_request_timestamp: OffsetDateTime,
#[cfg(feature = "otel")] otel_context: Option<HashMap<String, String>>,
) -> Self {
ClientDetails {
address,
id,
shared_keys,
session_request_timestamp,
#[cfg(feature = "otel")]
otel_context,
}
}
}
@@ -92,7 +100,7 @@ impl InitialAuthResult {
}
// imo there's no point in including the peer address in anything higher than debug
#[instrument(level = "debug", skip_all, fields(peer = %handle.peer_address))]
#[instrument(skip_all)]
pub(crate) async fn handle_connection<R, S>(mut handle: FreshHandler<R, S>)
where
R: Rng + CryptoRng + Send,
@@ -117,8 +125,25 @@ where
trace!("managed to perform websocket handshake!");
if let Some(auth_handle) = handle.handle_until_authenticated_or_failure().await {
auth_handle.listen_for_requests().await
if let Some(auth_handle) = handle.handle_until_authenticated_or_failure().in_current_span().await {
#[cfg(feature = "otel")]
{
let from_client_span = {
let parent = match auth_handle.otel_propagator.as_ref() {
Some(propagator) => propagator.root_span(),
None => &tracing::Span::current(), // fallback to current span if no propagator
};
tracing::info_span!(parent: parent, "listening for requests")
};
auth_handle
.listen_for_requests()
.instrument(from_client_span)
.await
}
#[cfg(not(feature = "otel"))]
{
auth_handle.listen_for_requests().await;
}
}
trace!("the handler is done!");
@@ -53,6 +53,7 @@ impl Listener {
)
}
#[instrument(skip_all)]
fn try_handle_accepted_connection(&self, accepted: io::Result<(TcpStream, SocketAddr)>) {
match accepted {
Ok((socket, remote_address)) => {
@@ -90,7 +91,7 @@ impl Listener {
let metrics_ref = handle.shared_state.metrics.clone();
// 4.1. handle all client requests until connection gets terminated
handle.start_handling().await;
handle.start_handling().in_current_span().await;
// 4.2. decrement the connection counter
metrics_ref.network.disconnected_ingress_websocket_client();
@@ -104,6 +105,7 @@ impl Listener {
// TODO: change the signature to pub(crate) async fn run(&self, handler: Handler)
#[instrument(skip_all)]
pub async fn run(&mut self) {
info!("Starting websocket listener at {}", self.address);
let tcp_listener = match tokio::net::TcpListener::bind(self.address).await {
@@ -122,7 +124,7 @@ impl Listener {
trace!("client_handling::Listener: received shutdown");
break
}
connection = tcp_listener.accept() => {
connection = tcp_listener.accept().in_current_span() => {
self.try_handle_accepted_connection(connection)
}
}
@@ -977,6 +977,8 @@ fn create_input_message(
response_packet,
lane,
packet_type,
#[cfg(feature = "otel")]
None,
))
} else {
tracing::error!("No nym-address or sender tag provided");
+5 -1
View File
@@ -95,7 +95,7 @@ nym-topology = { path = "../common/topology" }
nym-api-requests = { path = "nym-api-requests" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-http-api-client = { path = "../common/http-api-client" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format", "openapi", "basic_tracing"] }
nym-bin-common = { path = "../common/bin-common", features = ["output_format", "openapi"] }
nym-node-tester-utils = { path = "../common/node-tester-utils" }
nym-node-requests = { path = "../nym-node/nym-node-requests" }
nym-types = { path = "../common/types" }
@@ -110,6 +110,10 @@ nym-ecash-signer-check = { path = "../common/ecash-signer-check" }
no-reward = []
v2-performance = []
generate-ts = ["ts-rs"]
otel = [
"nym-bin-common/otel",
"nym-node-tester-utils/otel",
]
[build-dependencies]
anyhow = { workspace = true }
+1 -1
View File
@@ -31,7 +31,7 @@ async fn main() -> Result<(), anyhow::Error> {
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
console_subscriber::init();
} else {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
}}
info!("Starting nym api...");
@@ -45,9 +45,7 @@ utoipa = { workspace = true, features = ["axum_extras", "time"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
zeroize.workspace = true
nym-bin-common = { path = "../../common/bin-common", features = [
"basic_tracing",
] }
nym-bin-common = { path = "../../common/bin-common" }
nym-compact-ecash = { path = "../../common/nym_offline_compact_ecash" }
nym-config = { path = "../../common/config" }
nym-crypto = { path = "../../common/crypto", features = [
@@ -6,7 +6,7 @@ cfg_if::cfg_if! {
use crate::cli::Cli;
use clap::Parser;
use nym_bin_common::bin_info_owned;
use nym_bin_common::logging::setup_tracing_logger;
use nym_bin_common::logging::setup_no_otel_logger;
use nym_network_defaults::setup_env;
use tracing::{info, trace};
@@ -29,7 +29,7 @@ async fn main() -> anyhow::Result<()> {
trace!("args: {cli:#?}");
setup_env(cli.config_env_file.as_ref());
setup_tracing_logger();
setup_no_otel_logger().expect("failed to initialize logging");
let bin_info = bin_info_owned!();
info!("using the following version: {bin_info}");
+4
View File
@@ -73,3 +73,7 @@ vergen-gitcl = { workspace = true, default-features = false, features = [
[package.metadata.cargo-machete]
ignored = ["vergen", "nym-http-api-client"]
[features]
default = []
otel = ["nym-sdk/otel", "nym-http-api-client/otel", "nym-bin-common/otel"]
+2
View File
@@ -84,6 +84,8 @@ fn create_input_message(
surbs,
lane,
packet_type,
#[cfg(feature = "otel")]
None
))
}
+1 -1
View File
@@ -30,7 +30,7 @@ utoipa-swagger-ui = { workspace = true, features = ["axum"] }
tokio-postgres = { workspace = true }
# internal
nym-bin-common = { path = "../common/bin-common", features = ["basic_tracing"] }
nym-bin-common = { path = "../common/bin-common" }
nym-client-core = { path = "../common/client-core" }
nym-crypto = { path = "../common/crypto" }
nym-network-defaults = { path = "../common/network-defaults" }
+1 -1
View File
@@ -189,7 +189,7 @@ async fn nym_topology_from_env() -> anyhow::Result<NymTopology> {
#[tokio::main]
async fn main() -> Result<()> {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
let args = Args::parse();
+12 -2
View File
@@ -31,6 +31,7 @@ humantime-serde = { workspace = true }
human-repr = { workspace = true }
ipnetwork = { workspace = true }
indicatif = { workspace = true }
opentelemetry = { workspace = true, optional = true } # make it optional later
rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
@@ -50,7 +51,6 @@ cupid = { workspace = true }
sysinfo = { workspace = true }
nym-bin-common = { path = "../common/bin-common", features = [
"basic_tracing",
"output_format",
] }
nym-client-core-config-types = { path = "../common/client-core/config-types", features = [
@@ -130,7 +130,17 @@ criterion = { workspace = true, features = ["async_tokio"] }
rand_chacha = { workspace = true }
[features]
tokio-console = ["console-subscriber", "nym-task/tokio-tracing"]
default = []
tokio-console = ["console-subscriber"]
otel = [
"nym-bin-common/otel",
"nym-gateway/otel",
"nym-http-api-common/otel",
"nym-sphinx-framing/otel",
"nym-sphinx-addressing/otel",
"opentelemetry",
]
[lints]
workspace = true
+2 -1
View File
@@ -8,7 +8,7 @@ use crate::node::bonding_information::BondingInformation;
use crate::node::mixnet::packet_forwarding::global::is_global_ip;
use std::fs;
use std::net::IpAddr;
use tracing::{debug, info, trace, warn};
use tracing::{debug, info, instrument, trace, warn};
mod args;
@@ -39,6 +39,7 @@ fn check_public_ips(ips: &[IpAddr], local: bool) -> Result<(), NymNodeError> {
Ok(())
}
#[instrument(skip_all)]
pub(crate) async fn execute(mut args: Args) -> Result<(), NymNodeError> {
trace!("passed arguments: {args:#?}");
+127 -15
View File
@@ -6,10 +6,18 @@ use crate::cli::commands::{
test_throughput,
};
use crate::env::vars::{NYMNODE_CONFIG_ENV_FILE_ARG, NYMNODE_NO_BANNER_ARG};
// use crate::error::NymNodeError;
use clap::{Args, Parser, Subcommand};
use nym_bin_common::bin_info;
#[cfg(feature = "otel")]
use nym_bin_common::logging::error::TracingError;
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::setup_tracing_logger;
use nym_bin_common::{bin_info, logging::setup_no_otel_logger};
use std::future::Future;
use std::sync::OnceLock;
#[cfg(feature = "otel")]
use tracing::Instrument;
use tracing::instrument;
pub(crate) mod commands;
mod helpers;
@@ -52,30 +60,134 @@ impl Cli {
.block_on(fut))
}
#[instrument]
pub(crate) fn execute(self) -> anyhow::Result<()> {
// NOTE: `test_throughput` sets up its own logger as it has to include additional layers
if !matches!(self.command, Commands::TestThroughput(..)) {
crate::logging::setup_tracing_logger()?;
}
// if !matches!(self.command, Commands::TestThroughput(..)) {
// crate::logging::setup_tracing_logger()?;
// }
match self.command {
Commands::BuildInfo(args) => build_info::execute(args)?,
Commands::BondingInformation(args) => {
{ Self::execute_async(bonding_information::execute(args))? }?
// Sync commands get logger w. no OTEL
Commands::BuildInfo(args) => {
setup_no_otel_logger()?;
build_info::execute(args)?
}
Commands::NodeDetails(args) => { Self::execute_async(node_details::execute(args))? }?,
Commands::Run(args) => { Self::execute_async(run::execute(*args))? }?,
Commands::Migrate(args) => migrate::execute(*args)?,
Commands::Sign(args) => { Self::execute_async(sign::execute(args))? }?,
Commands::TestThroughput(args) => test_throughput::execute(args)?,
Commands::UnsafeResetSphinxKeys(args) => {
{ Self::execute_async(reset_sphinx_keys::execute(args))? }?
Commands::Migrate(args) => {
setup_no_otel_logger()?;
migrate::execute(*args)?
}
Commands::Debug(debug) => match debug.command {
DebugCommands::ResetProvidersGatewayDbs(args) => {
{ Self::execute_async(debug::reset_providers_dbs::execute(args))? }?
let _ = Self::execute_async(debug::reset_providers_dbs::execute(args))?;
}
},
Commands::TestThroughput(args) => {
// Has its own logging setup
test_throughput::execute(args)?
}
// SigNoz/OTEL run in async context
Commands::BondingInformation(args) => Self::execute_async(async move {
#[cfg(feature = "otel")]
{
let _guard =
setup_tracing_logger("nym-node".to_string()).map_err(TracingError::from)?;
let main_span = tracing::info_span!("startup", service = "nym-node");
async {
bonding_information::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
.instrument(main_span)
.await
}
#[cfg(not(feature = "otel"))]
{
setup_no_otel_logger().expect("failed to initialize logging");
bonding_information::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
})??,
Commands::NodeDetails(args) => Self::execute_async(async move {
#[cfg(feature = "otel")]
{
let _guard =
setup_tracing_logger("nym-node".to_string()).map_err(TracingError::from)?;
let main_span = tracing::info_span!("startup", service = "nym-node");
async {
node_details::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
.instrument(main_span)
.await
}
#[cfg(not(feature = "otel"))]
{
setup_no_otel_logger().expect("failed to initialize logging");
node_details::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
})??,
Commands::Run(args) => Self::execute_async(async move {
#[cfg(feature = "otel")]
{
let _guard =
setup_tracing_logger("nym-node".to_string()).map_err(TracingError::from)?;
tracing::warn!("OpenTelemetry is enabled for this nym-node instance.");
let main_span = tracing::info_span!("startup", service = "nym-node");
async {
run::execute(*args).await?;
Ok::<(), anyhow::Error>(())
}
.instrument(main_span)
.await
}
#[cfg(not(feature = "otel"))]
{
setup_no_otel_logger().expect("failed to initialize logging");
run::execute(*args).await?;
Ok::<(), anyhow::Error>(())
}
})??,
Commands::Sign(args) => Self::execute_async(async move {
#[cfg(feature = "otel")]
{
let _guard =
setup_tracing_logger("nym-node".to_string()).map_err(TracingError::from)?;
let main_span = tracing::info_span!("startup", service = "nym-node");
async {
sign::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
.instrument(main_span)
.await
}
#[cfg(not(feature = "otel"))]
{
setup_no_otel_logger().expect("failed to initialize logging");
sign::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
})??,
Commands::UnsafeResetSphinxKeys(args) => Self::execute_async(async move {
#[cfg(feature = "otel")]
{
let _guard =
setup_tracing_logger("nym-node".to_string()).map_err(TracingError::from)?;
let main_span = tracing::info_span!("startup", service = "nym-node");
async {
reset_sphinx_keys::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
.instrument(main_span)
.await
}
#[cfg(not(feature = "otel"))]
{
setup_no_otel_logger().expect("failed to initialize logging");
reset_sphinx_keys::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
})??,
}
Ok(())
}
+14 -14
View File
@@ -428,20 +428,20 @@ impl Config {
pub fn validate(&self) -> Result<(), NymNodeError> {
self.mixnet.validate()?;
// it's not allowed to run mixnode mode alongside entry mode
if self.modes.mixnode && self.modes.entry {
return Err(NymNodeError::config_validation_failure(
"illegal modes configuration - node cannot run as a mixnode and an entry gateway",
));
}
// nor it's allowed to run mixnode mode alongside exit mode
// (use two separate checks for better error messages)
if self.modes.mixnode && self.modes.exit {
return Err(NymNodeError::config_validation_failure(
"illegal modes configuration - node cannot run as a mixnode and an exit gateway",
));
}
// // it's not allowed to run mixnode mode alongside entry mode
// if self.modes.mixnode && self.modes.entry {
// return Err(NymNodeError::config_validation_failure(
// "illegal modes configuration - node cannot run as a mixnode and an entry gateway",
// ));
// }
//
// // nor it's allowed to run mixnode mode alongside exit mode
// // (use two separate checks for better error messages)
// if self.modes.mixnode && self.modes.exit {
// return Err(NymNodeError::config_validation_failure(
// "illegal modes configuration - node cannot run as a mixnode and an exit gateway",
// ));
// }
Ok(())
}
+2
View File
@@ -76,6 +76,8 @@ pub enum KeyIOFailure {
#[derive(Debug, Error)]
pub enum NymNodeError {
// #[error("Failed to setup tracing logger")]
// TracingSetupFailure(#[source] anyhow::Error),
#[error("this binary version no longer supports migration from legacy mixnodes and gateways")]
UnsupportedMigration,
+4 -25
View File
@@ -1,11 +1,11 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_bin_common::logging::{default_tracing_env_filter, default_tracing_fmt_layer};
use nym_bin_common::logging::default_tracing_env_filter;
use tracing_subscriber::filter::Directive;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};
// use tracing_subscriber::layer::SubscriberExt;
// use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
fn directive_checked(directive: impl Into<String>) -> anyhow::Result<Directive> {
@@ -21,24 +21,3 @@ pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
}
Ok(filter)
}
pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
let stderr_layer =
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
let console_layer = console_subscriber::spawn();
tracing_subscriber::registry()
.with(console_layer)
.with(stderr_layer)
.init();
} else {
tracing_subscriber::registry()
.with(stderr_layer)
.init();
}}
Ok(())
}
+8 -3
View File
@@ -20,7 +20,7 @@ use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::codec::Framed;
use tracing::{debug, error, instrument, trace, warn};
use tracing::{debug, error, instrument, Instrument, trace, warn};
struct PendingReplayCheckPackets {
// map of rotation id used for packet creation to the packets
@@ -130,6 +130,7 @@ impl ConnectionHandler {
Some(now + delay)
}
#[instrument(skip_all, level = "debug")]
fn handle_forward_packet(&self, now: Instant, mix_packet: MixPacket, delay: Option<Delay>) {
if !self.shared.processing_config.forward_hop_processing_enabled {
trace!("this nym-node does not support forward hop packets");
@@ -141,6 +142,7 @@ impl ConnectionHandler {
self.shared.forward_mix_packet(mix_packet, forward_instant);
}
#[instrument(skip_all, level = "debug")]
async fn handle_final_hop(&self, final_hop_data: ProcessedFinalHop) {
if !self.shared.processing_config.final_hop_processing_enabled {
trace!("this nym-node does not support final hop packets");
@@ -269,6 +271,7 @@ impl ConnectionHandler {
}
}
#[instrument(skip_all, level = "debug")]
async fn handle_received_packet_with_replay_detection(
&mut self,
now: Instant,
@@ -379,6 +382,7 @@ impl ConnectionHandler {
true
}
#[instrument(skip_all)]
async fn handle_pending_packets_batch(&mut self, now: Instant) {
let batch = self.pending_packets.reset(now);
let replay_tags = self.pending_packets.replay_tags();
@@ -481,6 +485,7 @@ impl ConnectionHandler {
.await
}
#[instrument(skip_all)]
pub(crate) async fn handle_stream(
&mut self,
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
@@ -492,9 +497,9 @@ impl ConnectionHandler {
trace!("connection handler: received shutdown");
break
}
maybe_framed_nym_packet = mixnet_connection.next() => {
maybe_framed_nym_packet = mixnet_connection.next().in_current_span() => {
match maybe_framed_nym_packet {
Some(Ok(packet)) => self.handle_received_nym_packet(packet).await,
Some(Ok(packet)) => self.handle_received_nym_packet(packet).in_current_span().await,
Some(Err(err)) => {
debug!("connection got corrupted with: {err}");
return
+3 -3
View File
@@ -4,7 +4,7 @@
use crate::node::mixnet::SharedData;
use nym_task::ShutdownToken;
use std::net::SocketAddr;
use tracing::{debug, error, info, trace};
use tracing::{Instrument, debug, error, info, instrument, trace};
pub(crate) struct Listener {
bind_address: SocketAddr,
@@ -18,7 +18,7 @@ impl Listener {
shared_data,
}
}
#[instrument(skip_all, level = "debug")]
pub(crate) async fn run(&mut self, shutdown: ShutdownToken) {
info!("attempting to run mixnet listener on {}", self.bind_address);
@@ -41,7 +41,7 @@ impl Listener {
trace!("mixnet listener: received shutdown");
break
}
connection = tcp_listener.accept() => {
connection = tcp_listener.accept().in_current_span() => {
self.shared_data.try_handle_connection(connection);
}
}
@@ -13,7 +13,7 @@ use nym_sphinx_forwarding::packet::MixPacket;
use nym_task::ShutdownToken;
use std::io;
use tokio::time::Instant;
use tracing::{debug, error, trace, warn};
use tracing::{debug, error, instrument, Instrument, trace, warn};
pub(crate) mod global;
@@ -46,6 +46,7 @@ impl<C, F> PacketForwarder<C, F> {
self.packet_sender.clone()
}
#[instrument(skip_all)]
fn forward_packet(&mut self, packet: MixPacket)
where
C: SendWithoutResponse,
@@ -78,6 +79,7 @@ impl<C, F> PacketForwarder<C, F> {
self.forward_packet(delayed_packet);
}
#[instrument(skip_all)]
fn handle_new_packet(&mut self, new_packet: PacketToForward)
where
C: SendWithoutResponse,
@@ -120,6 +122,7 @@ impl<C, F> PacketForwarder<C, F> {
.update_packet_forwarder_queue_size(channel_size)
}
#[instrument(skip_all)]
pub async fn run(&mut self, shutdown_token: ShutdownToken)
where
C: SendWithoutResponse,
@@ -130,16 +133,16 @@ impl<C, F> PacketForwarder<C, F> {
loop {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
_ = shutdown_token.cancelled().in_current_span() => {
debug!("PacketForwarder: Received shutdown");
break;
}
delayed = self.delay_queue.next() => {
delayed = self.delay_queue.next().in_current_span() => {
// SAFETY: `stream` implementation of `NonExhaustiveDelayQueue` never returns `None`
#[allow(clippy::unwrap_used)]
self.handle_done_delaying(delayed.unwrap());
}
new_packet = self.packet_receiver.next() => {
new_packet = self.packet_receiver.next().in_current_span() => {
// this one is impossible to ever panic - the struct itself contains a sender
// and hence it can't happen that ALL senders are dropped
#[allow(clippy::unwrap_used)]
+6 -3
View File
@@ -23,7 +23,7 @@ use std::time::Duration;
use tokio::net::TcpStream;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tracing::{debug, error};
use tracing::{Instrument, debug, error, instrument};
pub(crate) mod final_hop;
@@ -164,6 +164,7 @@ impl SharedData {
}
}
#[instrument(skip_all)]
pub(super) fn try_handle_connection(
&self,
accepted: io::Result<(TcpStream, SocketAddr)>,
@@ -172,8 +173,9 @@ impl SharedData {
Ok((socket, remote_addr)) => {
debug!("accepted incoming mixnet connection from: {remote_addr}");
let mut handler = ConnectionHandler::new(self, remote_addr);
let join_handle =
tokio::spawn(async move { handler.handle_connection(socket).await });
let join_handle = tokio::spawn(async move {
handler.handle_connection(socket).in_current_span().await
});
self.log_connected_clients();
Some(join_handle)
}
@@ -184,6 +186,7 @@ impl SharedData {
}
}
#[instrument(skip_all)]
pub(super) fn forward_mix_packet(&self, packet: MixPacket, delay_until: Option<Instant>) {
if self
.mixnet_forwarder
+12 -6
View File
@@ -66,7 +66,7 @@ use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, info, trace};
use tracing::{Instrument, debug, info, instrument, trace};
use zeroize::Zeroizing;
pub mod bonding_information;
@@ -601,6 +601,7 @@ impl NymNode {
})
}
#[instrument(skip_all)]
async fn start_gateway_tasks(
&mut self,
cached_network: CachedNetwork,
@@ -636,8 +637,10 @@ impl NymNode {
let mut websocket = gateway_tasks_builder
.build_websocket_listener(active_clients_store.clone())
.await?;
self.shutdown_tracker()
.try_spawn_named(async move { websocket.run().await }, "EntryWebsocket");
self.shutdown_tracker().try_spawn_named(
async move { websocket.run().in_current_span().await },
"EntryWebsocket",
);
} else {
info!("node not running in entry mode: the websocket will remain closed");
}
@@ -1056,6 +1059,7 @@ impl NymNode {
Ok(())
}
#[instrument(skip_all)]
pub(crate) async fn start_mixnet_listener<F>(
&self,
active_clients_store: &ActiveClientsStore,
@@ -1099,7 +1103,7 @@ impl NymNode {
let shutdown_token = self.shutdown_token();
self.shutdown_tracker().try_spawn_named(
async move { packet_forwarder.run(shutdown_token).await },
async move { packet_forwarder.run(shutdown_token).in_current_span().await },
"PacketForwarder",
);
@@ -1123,7 +1127,7 @@ impl NymNode {
let shutdown_token = self.shutdown_token();
self.shutdown_tracker().try_spawn_named(
async move { mixnet_listener.run(shutdown_token).await },
async move { mixnet_listener.run(shutdown_token).in_current_span().await },
"MixnetListener",
);
@@ -1152,6 +1156,7 @@ impl NymNode {
Ok(())
}
#[instrument(skip_all)]
async fn start_nym_node_tasks(mut self) -> Result<ShutdownManager, NymNodeError> {
info!(
"starting Nym Node {} with the following modes: mixnode: {}, entry: {}, exit: {}, wireguard: {}",
@@ -1226,6 +1231,7 @@ impl NymNode {
Ok(self.shutdown_manager)
}
#[instrument(skip_all)]
pub(crate) async fn run(mut self) -> Result<(), NymNodeError> {
let mut shutdown_signals = self.shutdown_manager.detach_shutdown_signals();
@@ -1236,7 +1242,7 @@ impl NymNode {
// ideally we'd also do some cleanup here, but currently there's no easy way to access the handles
return Ok(())
}
startup_result = self.start_nym_node_tasks() => {
startup_result = self.start_nym_node_tasks().in_current_span() => {
let mut shutdown_manager = startup_result?;
shutdown_manager.replace_shutdown_signals(shutdown_signals);
shutdown_manager.run_until_shutdown().await;
+1 -1
View File
@@ -20,7 +20,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing = { workspace = true }
url = { workspace = true }
nym-bin-common = { path = "../common/bin-common", features = ["output_format", "basic_tracing"] }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
nym-ecash-signer-check = { path = "../common/ecash-signer-check" }
nym-network-defaults = { path = "../common/network-defaults" }
nym-task = { path = "../common/task" }
+2 -2
View File
@@ -4,7 +4,7 @@
use crate::cli::Cli;
use clap::Parser;
use nym_bin_common::bin_info_owned;
use nym_bin_common::logging::setup_tracing_logger;
use nym_bin_common::logging::setup_no_otel_logger;
use tracing::{info, trace};
mod cli;
@@ -13,7 +13,7 @@ pub(crate) mod test_result;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
setup_tracing_logger();
setup_no_otel_logger().expect("failed to initialize logging");
let cli = Cli::parse();
trace!("args: {cli:#?}");
+2 -2
View File
@@ -32,8 +32,8 @@ humantime = { workspace = true }
humantime-serde.workspace = true
# internal
nym-bin-common = { path = "../common/bin-common", features = ["output_format", "basic_tracing"] }
nym-config = { path = "../common/config" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
nym-config = { path = "../common/config" }
nym-ecash-time = { path = "../common/ecash-time" }
nym-contracts-common = { path = "../common/cosmwasm-smart-contracts/contracts-common" }
nym-compact-ecash = { path = "../common/nym_offline_compact_ecash" }
+2 -2
View File
@@ -8,7 +8,7 @@
use crate::cli::Cli;
use clap::{Parser, crate_name, crate_version};
use nym_bin_common::logging::{maybe_print_banner, setup_tracing_logger};
use nym_bin_common::logging::{maybe_print_banner, setup_no_otel_logger};
use nym_network_defaults::setup_env;
pub mod cli;
@@ -25,7 +25,7 @@ async fn main() -> anyhow::Result<()> {
let args = Cli::parse();
setup_env(args.config_env_file.as_ref());
setup_tracing_logger();
setup_no_otel_logger().expect("failed to initialize logging");
if !args.no_banner {
maybe_print_banner(crate_name!(), crate_version!());
-9395
View File
File diff suppressed because it is too large Load Diff
@@ -13,5 +13,5 @@ clap = { version = "4.0", features = ["derive"] }
log = "0.4"
serde_json = "1.0.0"
nym-bin-common = { path = "../../common/bin-common", features = ["basic_tracing"] }
nym-bin-common = { path = "../../common/bin-common" }
nym-store-cipher = { path = "../../common/store-cipher" }
@@ -8,7 +8,7 @@
use anyhow::{anyhow, Result};
use clap::Parser;
use nym_bin_common::logging::setup_tracing_logger;
use nym_bin_common::logging::setup_no_otel_logger;
use nym_store_cipher::{
Aes256Gcm, Algorithm, EncryptedData, KdfInfo, Params, StoreCipher, Version, ARGON2_SALT_SIZE,
CURRENT_VERSION,
@@ -52,7 +52,7 @@ enum ParseMode {
}
fn main() -> Result<()> {
setup_tracing_logger();
setup_no_otel_logger().expect("failed to initialize logging");
let args = Args::parse();
let file = File::open(args.file)?;
let parse = if args.raw {
+1 -1
View File
@@ -13,7 +13,7 @@ crate-type = ["cdylib"]
tokio = { workspace = true, features = ["full"] }
# Nym clients, addressing, packet format, common tools (logging), ffi shared
nym-sdk = { path = "../../rust/nym-sdk/" }
nym-bin-common = { path = "../../../common/bin-common", features = ["basic_tracing"] }
nym-bin-common = { path = "../../../common/bin-common" }
nym-sphinx-anonymous-replies = { path = "../../../common/nymsphinx/anonymous-replies" }
nym-ffi-shared = { path = "../shared" }
lazy_static = { workspace = true }
+1 -1
View File
@@ -12,7 +12,7 @@ use crate::types::types::{CMessageCallback, CStringCallback, ReceivedMessage, St
#[no_mangle]
pub extern "C" fn init_logging() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
}
#[no_mangle]
+1 -1
View File
@@ -14,7 +14,7 @@ uniffi = { workspace = true, features = ["cli"] }
# Nym clients, addressing, packet format, common tools (logging), ffi shared
nym-sdk = { path = "../../rust/nym-sdk/" }
nym-crypto = { path = "../../../common/crypto" }
nym-bin-common = { path = "../../../common/bin-common", features = ["basic_tracing"] }
nym-bin-common = { path = "../../../common/bin-common" }
nym-sphinx-anonymous-replies = { path = "../../../common/nymsphinx/anonymous-replies" }
nym-ffi-shared = { path = "../shared" }
# Async runtime
+1 -1
View File
@@ -36,7 +36,7 @@ enum GoWrapError {
#[no_mangle]
fn init_logging() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
}
#[no_mangle]
+25 -6
View File
@@ -43,9 +43,7 @@ nym-socks5-requests = { path = "../../../common/socks5/requests" }
nym-ordered-buffer = { path = "../../../common/socks5/ordered-buffer" }
nym-service-providers-common = { path = "../../../service-providers/common" }
nym-sphinx-addressing = { path = "../../../common/nymsphinx/addressing" }
nym-bin-common = { path = "../../../common/bin-common", features = [
"basic_tracing",
] }
nym-bin-common = { path = "../../../common/bin-common" }
bytecodec = { workspace = true }
httpcodec = { workspace = true }
bytes = { workspace = true }
@@ -71,10 +69,20 @@ tokio-util.workspace = true
uuid = { workspace = true, features = ["v4", "serde"] }
bincode = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter"] }
# tracing.workspace = true
# tracing-subscriber = { workspace = true, features = ["env-filter"] }
dirs.workspace = true
opentelemetry = { workspace = true, optional = true}
opentelemetry_sdk = { workspace = true, optional = true }
tracing = { workspace = true, features = ["std"] }
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
tracing-core = { workspace = true }
tracing-opentelemetry = { workspace = true, optional = true }
opentelemetry-otlp = { workspace = true, optional = true }
opentelemetry-semantic-conventions = { workspace = true, optional = true }
[dev-dependencies]
anyhow = { workspace = true }
dotenvy = { workspace = true }
@@ -82,7 +90,7 @@ reqwest = { workspace = true, features = ["json", "socks"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
time = { workspace = true }
nym-bin-common = { path = "../../../common/bin-common", features = ["basic_tracing"] }
nym-bin-common = { path = "../../../common/bin-common" }
# extra dependencies for libp2p examples
#libp2p = { git = "https://github.com/ChainSafe/rust-libp2p.git", rev = "e3440d25681df380c9f0f8cfdcfd5ecc0a4f2fb6", features = [ "identify", "macros", "ping", "tokio", "tcp", "dns", "websocket", "noise", "mplex", "yamux", "gossipsub" ]}
@@ -93,3 +101,14 @@ hex = { workspace = true }
[features]
libp2p-vanilla = []
otel = [
"nym-bin-common/otel",
"nym-gateway-requests/otel",
"nym-socks5-client-core/otel",
"nym-client-core/otel",
"opentelemetry",
"opentelemetry_sdk",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"tracing-opentelemetry",
]
+1 -1
View File
@@ -6,7 +6,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// right now, only sandbox has coconut setup
// this should be run from the `sdk/rust/nym-sdk` directory
setup_env(Some("../../../envs/sandbox.env"));
+1 -1
View File
@@ -3,7 +3,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Create client builder, including ephemeral keys. The builder can be usable in the context
// where you don't want to connect just yet.
@@ -4,7 +4,7 @@ use std::path::PathBuf;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Specify some config options
let config_dir = PathBuf::from("/tmp/mixnet-client");
+1 -1
View File
@@ -3,7 +3,7 @@
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
todo!()
}
+1 -1
View File
@@ -12,7 +12,7 @@ use tokio::signal::ctrl_c;
// Run with: cargo run --example client_pool -- ../../../envs/<NETWORK>.env
#[tokio::main]
async fn main() -> Result<()> {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to setup logging");
setup_env(std::env::args().nth(1));
let conn_pool = ClientPool::new(2); // Start the Client Pool with 2 Clients always being kept in reserve
@@ -1,102 +1,100 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use futures::StreamExt;
use nym_sdk::mixnet;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_topology::provider_trait::{async_trait, ToTopologyMetadata, TopologyProvider};
use nym_topology::{EpochRewardedSet, NymTopology};
use nym_validator_client::nym_api::NymApiClientExt;
use url::Url;
struct MyTopologyProvider {
validator_client: nym_http_api_client::Client,
}
impl MyTopologyProvider {
fn new(nym_api_url: Url) -> MyTopologyProvider {
let validator_client = nym_http_api_client::Client::builder(nym_api_url)
.expect("Failed to create API client builder")
.build()
.expect("Failed to build API client");
MyTopologyProvider { validator_client }
}
async fn get_topology(&self) -> NymTopology {
let rewarded_set = self
.validator_client
.get_current_rewarded_set()
.await
.unwrap();
let mixnodes_response = self
.validator_client
.get_all_basic_active_mixing_assigned_nodes_with_metadata()
.await
.unwrap();
let metadata = mixnodes_response.metadata.to_topology_metadata();
let epoch_rewarded_set: EpochRewardedSet = rewarded_set.into();
let mut base_topology = NymTopology::new(metadata, epoch_rewarded_set, Vec::new());
// in our topology provider only use mixnodes that have node_id divisible by 3
// and has exactly 100 performance score
// why? because this is just an example to showcase arbitrary uses and capabilities of this trait
let filtered_mixnodes = mixnodes_response
.nodes
.into_iter()
.filter(|mix| mix.node_id % 3 == 0 && mix.performance.is_hundred())
.collect::<Vec<_>>();
let gateways = self
.validator_client
.get_all_basic_entry_assigned_nodes_with_metadata()
.await
.unwrap()
.nodes;
base_topology.add_skimmed_nodes(&filtered_mixnodes);
base_topology.add_skimmed_nodes(&gateways);
base_topology
}
}
#[async_trait]
impl TopologyProvider for MyTopologyProvider {
// this will be manually refreshed on a timer specified inside mixnet client config
async fn get_new_topology(&mut self) -> Option<NymTopology> {
Some(self.get_topology().await)
}
}
use nym_topology::{
CachedEpochRewardedSet, EntryDetails, HardcodedTopologyProvider, NymTopology,
NymTopologyMetadata, RoutingNode, SupportedRoles,
};
use std::time::Duration;
use time::OffsetDateTime;
use tokio::time::sleep;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to setup logging");
let nym_api = "https://validator.nymtech.net/api/".parse().unwrap();
let my_topology_provider = MyTopologyProvider::new(nym_api);
// let nym_api = "https://validator.nymtech.net/api/".parse().unwrap();
// let my_topology_provider = MyTopologyProvider::new(nym_api);
let topology_metadata = NymTopologyMetadata::new(0, 0, OffsetDateTime::now_utc());
let mut rewarded_set = CachedEpochRewardedSet::default();
rewarded_set.entry_gateways.insert(1);
rewarded_set.layer1.insert(1);
rewarded_set.layer2.insert(1);
rewarded_set.layer3.insert(1);
let nodes = vec![RoutingNode {
node_id: 1,
mix_host: "127.0.0.1:1789".parse().unwrap(),
entry: Some(EntryDetails {
ip_addresses: vec!["127.0.0.1".parse().unwrap()],
clients_ws_port: 9000,
hostname: None,
clients_wss_port: None,
}),
identity_key: "PUT IDENTITY KEY HERE".parse().unwrap(),
sphinx_key: "PUT SPHINX KEY HERE".parse().unwrap(),
supported_roles: SupportedRoles {
mixnode: true,
mixnet_entry: true,
mixnet_exit: true,
},
}];
let topology_provider =
HardcodedTopologyProvider::new(NymTopology::new(topology_metadata, rewarded_set, nodes));
// Passing no config makes the client fire up an ephemeral session and figure things out on its own
let mut client = mixnet::MixnetClientBuilder::new_ephemeral()
.custom_topology_provider(Box::new(my_topology_provider))
.custom_topology_provider(Box::new(topology_provider))
.build()
.unwrap()
.connect_to_mixnet()
.await
.unwrap();
let our_address = client.nym_address();
let our_address = *client.nym_address();
println!("Our client nym address is: {our_address}");
// Send a message through the mixnet to ourselves
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
let sender = client.split_sender();
println!("Waiting for message (ctrl-c to exit)");
client
.on_messages(|msg| println!("Received: {}", String::from_utf8_lossy(&msg.message)))
.await;
const MAX_MESSAGES: usize = 100;
// receiving task
let receiving_task_handle = tokio::spawn(async move {
let mut received_count = 0;
while let Some(received) = client.next().await {
received_count += 1;
println!(
"{received_count}: received: {}",
String::from_utf8_lossy(&received.message)
);
if received_count >= MAX_MESSAGES {
break;
}
}
client.disconnect().await;
});
// sending task
let sending_task_handle = tokio::spawn(async move {
loop {
if sender
.send_plain_message(our_address, "hello there")
.await
.is_err()
{
break;
}
sleep(Duration::from_millis(100)).await;
}
});
// wait for both tasks to be done
println!("waiting for shutdown");
sending_task_handle.await.unwrap();
receiving_task_handle.await.unwrap();
}
@@ -0,0 +1,118 @@
use nym_sdk::mixnet::{
AnonymousSenderTag, MixnetClientBuilder, MixnetMessageSender, ReconstructedMessage,
};
use nym_topology::{
CachedEpochRewardedSet, EntryDetails, HardcodedTopologyProvider, NymTopology,
NymTopologyMetadata, RoutingNode, SupportedRoles,
};
#[cfg(feature = "otel")]
use opentelemetry::trace::{TraceContextExt, Tracer};
#[cfg(feature = "otel")]
use opentelemetry::{global, Context};
use time::OffsetDateTime;
use tracing::instrument;
use tracing::warn;
#[tokio::main]
#[instrument(name = "sdk-example-surb-reply", skip_all)]
async fn main() {
#[cfg(feature = "otel")]
{
nym_bin_common::opentelemetry::setup_tracing_logger(
"local-sdk-example-surb-reply".to_string(),
)
.unwrap();
let tracer = global::tracer("local-sdk-example-surb-reply");
let span = tracer.start("client-root-span");
let cx = Context::current_with_span(span);
let _guard = cx.clone().attach();
let trace_id = cx.span().span_context().trace_id();
warn!("Main TRACE_ID: {:?}", trace_id);
}
#[cfg(not(feature = "otel"))]
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Create a mixnet client which connect to a local node
let topology_metadata = NymTopologyMetadata::new(0, 0, OffsetDateTime::now_utc());
let mut rewarded_set = CachedEpochRewardedSet::default();
rewarded_set.entry_gateways.insert(1);
rewarded_set.layer1.insert(1);
rewarded_set.layer2.insert(1);
rewarded_set.layer3.insert(1);
let nodes = vec![RoutingNode {
node_id: 1,
mix_host: "127.0.0.1:1789".parse().unwrap(),
entry: Some(EntryDetails {
ip_addresses: vec!["127.0.0.1".parse().unwrap()],
clients_ws_port: 9000,
hostname: None,
clients_wss_port: None,
}),
identity_key: "put here identity_key".parse().unwrap(),
sphinx_key: "put here sphinx_key".parse().unwrap(),
supported_roles: SupportedRoles {
mixnode: true,
mixnet_entry: true,
mixnet_exit: true,
},
}];
let topology_provider =
HardcodedTopologyProvider::new(NymTopology::new(topology_metadata, rewarded_set, nodes));
let mut client = MixnetClientBuilder::new_ephemeral()
.custom_topology_provider(Box::new(topology_provider))
.build()
.unwrap()
.connect_to_mixnet()
.await
.unwrap();
let our_address = client.nym_address();
println!("\nOur client nym address is: {our_address}");
// Send a message through the mixnet to ourselves using our nym address
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
// we're going to parse the sender_tag (AnonymousSenderTag) from the incoming message and use it to 'reply' to ourselves instead of our Nym address.
// we know there will be a sender_tag since the sdk sends SURBs along with messages by default.
println!("Waiting for message\n");
// get the actual message - discard the empty vec sent along with a potential SURB topup request
let mut message: Vec<ReconstructedMessage> = Vec::new();
while let Some(new_message) = client.wait_for_messages().await {
if new_message.is_empty() {
continue;
}
message = new_message;
break;
}
let mut parsed = String::new();
if let Some(r) = message.first() {
parsed = String::from_utf8(r.message.clone()).unwrap();
}
// parse sender_tag: we will use this to reply to sender without needing their Nym address
let return_recipient: AnonymousSenderTag = message[0].sender_tag.unwrap();
println!(
"\nReceived the following message: {parsed} \nfrom sender with surb bucket {return_recipient}"
);
// reply to self with it: note we use `send_str_reply` instead of `send_str`
println!("Replying with using SURBs");
client
.send_reply(return_recipient, "hi an0n!")
.await
.unwrap();
println!("Waiting for message (once you see it, ctrl-c to exit)\n");
client
.on_messages(|msg| println!("\nReceived: {}", String::from_utf8_lossy(&msg.message)))
.await;
}
@@ -8,7 +8,7 @@ use nym_topology::provider_trait::async_trait;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Just some plain data to pretend we have some external storage that the application
// implementer is using.
@@ -7,7 +7,7 @@ use nym_topology::{NymTopology, NymTopologyMetadata, RoutingNode, SupportedRoles
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Passing no config makes the client fire up an ephemeral session and figure shit out on its own
let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
@@ -7,7 +7,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Passing no config makes the client fire up an ephemeral session and figure stuff out on its own
let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
+1 -1
View File
@@ -6,7 +6,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
// An example of creating a client relying on a testnet, in this case Sandbox.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// relative root is `sdk/rust/nym-sdk/` for fallback file path
let env_path =
std::env::var("NYM_ENV_PATH").unwrap_or_else(|_| "../../../envs/sandbox.env".to_string());
+1 -1
View File
@@ -3,7 +3,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Passing no config makes the client fire up an ephemeral session and figure shit out on its own
// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
+1 -1
View File
@@ -2,7 +2,7 @@ use nym_sdk::mixnet;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
println!("Connecting receiver");
let mut receiving_client = mixnet::MixnetClient::connect_new().await.unwrap();

Some files were not shown because too many files have changed in this diff Show More