Compare commits

...

64 Commits

Author SHA1 Message Date
Floriane TUERNAL SABOTINOV a19a6e3348 cleanup final 2025-10-21 13:10:10 +02:00
Floriane TUERNAL SABOTINOV e9135d54fc cleanup and PR ready 2025-10-20 11:22:42 +02:00
Floriane TUERNAL SABOTINOV 32f1ebebf9 refactor init tracer and use of tracing_opentelemetry span for async tracing 2025-10-17 11:27:45 +02:00
Floriane TUERNAL SABOTINOV 12f7f7b669 featurized otel 2025-10-16 13:53:52 +02:00
Floriane TUERNAL SABOTINOV f3f75ae751 cleanup and add guard to keep the tracer provider alive 2025-10-14 09:33:54 +02:00
Floriane TUERNAL SABOTINOV 20c1717a65 continue trace)id propagation into sphinx 2025-10-08 16:14:05 +02:00
Floriane TUERNAL SABOTINOV 6ae78d9f4d add trace_id to sphinx packet 2025-10-07 16:57:02 +02:00
Floriane TUERNAL SABOTINOV afb2467afc otel adaptation for sphinx instrumentation 2025-10-02 15:54:46 +02:00
Floriane TUERNAL SABOTINOV 44a7df511a traceparent injection into headers 2025-10-01 10:35:21 +02:00
Floriane TUERNAL SABOTINOV c89ca8075f add traceparent to http headers 2025-09-30 12:30:00 +02:00
Floriane TUERNAL SABOTINOV 5f9f4fb1ab websocket otel trace handling correction 2025-09-29 16:31:16 +02:00
Floriane TUERNAL SABOTINOV 75e146c301 wip 2025-09-26 13:33:47 +02:00
Floriane TUERNAL SABOTINOV 2ed1a0000b wip 2025-09-26 13:33:47 +02:00
Floriane TUERNAL SABOTINOV ee138fe751 add instrument to sdk send message 2025-09-26 13:33:47 +02:00
Floriane TUERNAL SABOTINOV 6b988e591d wip 2025-09-26 13:33:47 +02:00
Floriane TUERNAL SABOTINOV 6cc12a4aad debug node 2025-09-26 13:33:10 +02:00
Floriane TUERNAL SABOTINOV e1398f4422 add manual spans to handle_request functions 2025-09-26 13:27:57 +02:00
Floriane TUERNAL SABOTINOV 20b0be351d cleanup 2025-09-26 13:25:56 +02:00
Floriane TUERNAL SABOTINOV af824b7043 use otel span to parent tracing span 2025-09-26 13:23:58 +02:00
Floriane TUERNAL SABOTINOV d32332a82e add context to message handling 2025-09-26 13:22:21 +02:00
Floriane TUERNAL SABOTINOV 5e8f402c5d change context propagation method to trace_id sharing to go accros async move barrier 2025-09-26 13:22:19 +02:00
Floriane TUERNAL SABOTINOV ad4ae0d00d wip 2025-09-26 13:18:13 +02:00
Floriane TUERNAL SABOTINOV aed16fcacd wip 2025-09-26 13:17:24 +02:00
Floriane TUERNAL SABOTINOV 14f983c481 wip 2025-09-26 13:17:22 +02:00
Floriane TUERNAL SABOTINOV 587e2f4b53 wip 2025-09-26 13:16:43 +02:00
Floriane TUERNAL SABOTINOV 4d8348b304 context extraction 2025-09-26 13:15:44 +02:00
Floriane TUERNAL SABOTINOV 750df1e0bd parent/chil propagation explicitation 2025-09-26 13:13:07 +02:00
Floriane TUERNAL SABOTINOV 94614e9d90 try debug panic when using parent span 2025-09-26 13:13:06 +02:00
Floriane TUERNAL SABOTINOV 002055cb9f rm clone because panic 2025-09-26 13:10:14 +02:00
Floriane TUERNAL SABOTINOV a117d9089b propagation to parent function 2025-09-26 13:10:12 +02:00
Floriane TUERNAL SABOTINOV ed8ca4e4ea wip 2025-09-26 13:09:19 +02:00
Floriane TUERNAL SABOTINOV 587f6bf8a8 add method to extract trace_id from ContextCarrier 2025-09-26 13:09:17 +02:00
Floriane TUERNAL SABOTINOV 55a7f67407 use attach() instead of creating new span 2025-09-26 13:08:19 +02:00
Floriane TUERNAL SABOTINOV e5c88e30b4 forget to enter span 2025-09-26 13:08:19 +02:00
Floriane TUERNAL SABOTINOV dd4945e269 verify existence of context 2025-09-26 13:08:19 +02:00
Floriane TUERNAL SABOTINOV 0adeb6e677 use authenticatev2 and not authenticate 2025-09-26 13:08:19 +02:00
Floriane TUERNAL SABOTINOV b7b399d71c manual propagators for context from client to node 2025-09-26 13:08:16 +02:00
Floriane TUERNAL SABOTINOV 65b5c258e8 wip 2025-09-26 13:05:33 +02:00
Floriane TUERNAL SABOTINOV da1463924f experiment do I preserve trace_id with with method 2025-09-26 13:05:32 +02:00
Floriane TUERNAL SABOTINOV 749ceec727 try debug with otel specific tools 2025-09-26 13:05:32 +02:00
Floriane TUERNAL SABOTINOV c639a7e1c1 print debug trace 2025-09-26 13:05:31 +02:00
Floriane TUERNAL SABOTINOV e0876a6238 establishing initial authentication common trace_id 2025-09-26 13:04:23 +02:00
Floriane TUERNAL SABOTINOV 89f5ab11a5 context propagation helpers 2025-09-26 13:01:43 +02:00
Floriane TUERNAL SABOTINOV 4063071e08 Revert "broken: context won't satisfy spawn async move"
This reverts commit 2b11479ad4801e1efa8ab0aca4ca577bd3f195fe.
2025-09-26 13:01:42 +02:00
Floriane TUERNAL SABOTINOV 99a0f4ea30 broken: context won't satisfy spawn async move 2025-09-26 12:59:24 +02:00
Floriane TUERNAL SABOTINOV dfa969d754 try debug fresh handler context 2025-09-26 12:50:06 +02:00
Floriane TUERNAL SABOTINOV 428dd0543f debug surb_reply client 2025-09-26 12:50:06 +02:00
Floriane TUERNAL SABOTINOV b90088148c try debug client gets stuck 2025-09-26 12:50:06 +02:00
Floriane TUERNAL SABOTINOV 08a45c188d try keep context accross async call 2025-09-26 12:50:05 +02:00
Floriane TUERNAL SABOTINOV 806e8629fe instrumentation of nym-node to nym-gateway workflow 2025-09-26 12:48:27 +02:00
Floriane TUERNAL SABOTINOV c22bb99da6 fixed trace_id distibuted from sdk to gateway 2025-09-26 12:45:08 +02:00
Floriane TUERNAL SABOTINOV 91ee54e5a7 fix trace_id propagation from sdk to mix-node 2025-09-26 12:45:08 +02:00
Floriane TUERNAL SABOTINOV 572fc331b0 try debug difference trace_id otel by use async for gateway and attach it to span in surb_reply 2025-09-26 12:45:07 +02:00
Floriane TUERNAL SABOTINOV c5590bddc5 add context propagation 2025-09-26 12:44:04 +02:00
Floriane TUERNAL SABOTINOV c9aed048d0 change surb_reply to test relation with nym-node + basic instrumentation 2025-09-26 12:43:08 +02:00
Floriane TUERNAL SABOTINOV b8f7d54f18 fix signoz ingestion 2025-09-26 12:39:12 +02:00
Floriane TUERNAL SABOTINOV c04e173ee6 configure format for sending logs to signoz 2025-09-26 12:39:12 +02:00
Floriane TUERNAL SABOTINOV 34143f1d58 setup otel for surb_reply 2025-09-26 12:39:12 +02:00
Floriane TUERNAL SABOTINOV 47b88737e6 fix otel panic 2025-09-26 12:39:12 +02:00
Floriane TUERNAL SABOTINOV 76ddef285c setup otel from run to forward_sphinx_packet 2025-09-26 12:39:08 +02:00
Floriane TUERNAL SABOTINOV 89fe17f19c revert changes from previous commit 2025-09-26 12:22:54 +02:00
Mark Sinclair 57d2824521 wip: sdk surb-reply example add otel tracing 2025-09-26 12:22:54 +02:00
Mark Sinclair b8058dd7ba wip: tracing in nym-node 2025-09-26 12:22:51 +02:00
Mark Sinclair 95d08c7f21 add opentelemetry for debugging and testing behind otel feature flag 2025-09-26 12:16:10 +02:00
126 changed files with 2814 additions and 10777 deletions
Generated
+888 -936
View File
File diff suppressed because it is too large Load Diff
+8 -3
View File
@@ -292,8 +292,11 @@ nix = "0.27.1"
notify = "5.1.0"
okapi = "0.7.0"
once_cell = "1.21.3"
opentelemetry = "0.19.0"
opentelemetry-jaeger = "0.18.0"
opentelemetry = "0.30.0"
opentelemetry-otlp = "0.30.0"
opentelemetry-semantic-conventions = "0.30.0"
opentelemetry_sdk = "0.30.0"
opentelemetry-stdout = "0.30.0"
parking_lot = "0.12.3"
pem = "0.8"
petgraph = "0.6.5"
@@ -351,8 +354,10 @@ toml = "0.8.22"
tower = "0.5.2"
tower-http = "0.5.2"
tracing = "0.1.41"
tracing-core = "0.1.33"
tracing-log = "0.2"
tracing-opentelemetry = "0.19.0"
tracing-opentelemetry = "0.31.0"
tracing-serde = "0.2.0"
tracing-subscriber = "0.3.19"
tracing-tree = "0.2.2"
tracing-indicatif = "0.3.9"
-1
View File
@@ -46,7 +46,6 @@ nym-bandwidth-controller = { path = "../../common/bandwidth-controller" }
nym-bin-common = { path = "../../common/bin-common", features = [
"output_format",
"clap",
"basic_tracing",
] }
nym-client-core = { path = "../../common/client-core", features = [
"fs-credentials-storage",
+2 -2
View File
@@ -4,7 +4,7 @@
use std::error::Error;
use clap::{crate_name, crate_version, Parser};
use nym_bin_common::logging::{maybe_print_banner, setup_tracing_logger};
use nym_bin_common::logging::{maybe_print_banner, setup_no_otel_logger};
use nym_network_defaults::setup_env;
pub mod client;
@@ -20,7 +20,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
if !args.no_banner {
maybe_print_banner(crate_name!(), crate_version!());
}
setup_tracing_logger();
setup_no_otel_logger().expect("failed to initialize logging");
if let Err(err) = commands::execute(args).await {
log::error!("{err}");
+2 -2
View File
@@ -184,7 +184,7 @@ impl Handler {
});
// the ack control is now responsible for chunking, etc.
let input_msg = InputMessage::new_regular(recipient, message, lane, self.packet_type);
let input_msg = InputMessage::new_regular(recipient, message, lane, self.packet_type, None);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send message to the input buffer: {err}");
@@ -217,7 +217,7 @@ impl Handler {
});
let input_msg =
InputMessage::new_anonymous(recipient, message, reply_surbs, lane, self.packet_type);
InputMessage::new_anonymous(recipient, message, reply_surbs, lane, self.packet_type, None);
if let Err(err) = self.msg_input.send(input_msg).await {
if !self.shutdown_token.is_cancelled() {
error!("Failed to send anonymous message to the input buffer: {err}");
-1
View File
@@ -27,7 +27,6 @@ zeroize = { workspace = true }
nym-bin-common = { path = "../../common/bin-common", features = [
"output_format",
"clap",
"basic_tracing",
] }
nym-client-core = { path = "../../common/client-core", features = [
"fs-credentials-storage",
+2 -2
View File
@@ -4,7 +4,7 @@
use std::error::Error;
use clap::{crate_name, crate_version, Parser};
use nym_bin_common::logging::{maybe_print_banner, setup_tracing_logger};
use nym_bin_common::logging::{maybe_print_banner, setup_no_otel_logger};
use nym_network_defaults::setup_env;
mod commands;
@@ -19,7 +19,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
if !args.no_banner {
maybe_print_banner(crate_name!(), crate_version!());
}
setup_tracing_logger();
setup_no_otel_logger().expect("failed to initialize logging");
if let Err(err) = commands::execute(args).await {
log::error!("{err}");
+24 -14
View File
@@ -8,24 +8,30 @@ license = { workspace = true }
repository = { workspace = true }
[dependencies]
chrono = { workspace = true, optional = true }
cfg-if = { workspace = true }
clap = { workspace = true, features = ["derive"], optional = true }
clap_complete = { workspace = true, optional = true }
clap_complete_fig = { workspace = true, optional = true }
const-str = { workspace = true }
log = { workspace = true }
opentelemetry = { workspace = true, optional = true }
opentelemetry-otlp = { workspace = true,features=["metrics", "grpc-tonic", "tls",
"tls-webpki-roots"], optional = true }
opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"], optional = true }
opentelemetry-stdout = { workspace = true, features = ["trace", "metrics"], optional = true }
opentelemetry_sdk = { workspace = true, optional = true }
rand = { workspace = true, optional = true }
schemars = { workspace = true, features = ["preserve_order"], optional = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, optional = true }
## tracing
tracing-subscriber = { workspace = true, features = ["env-filter"], optional = true }
tracing-tree = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
opentelemetry-jaeger = { workspace = true, features = ["rt-tokio", "collector_client", "isahc_collector_client"], optional = true }
thiserror = { workspace = true }
tracing = { workspace = true }
tracing-core = { workspace = true }
tracing-opentelemetry = { workspace = true, optional = true }
tracing-serde = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
tracing-tree = { workspace = true }
utoipa = { workspace = true, optional = true }
opentelemetry = { workspace = true, features = ["rt-tokio"], optional = true }
[build-dependencies]
vergen = { workspace = true, features = ["build", "git", "gitcl", "rustc", "cargo"] }
@@ -35,13 +41,17 @@ default = []
openapi = ["utoipa"]
output_format = ["serde_json", "dep:clap"]
bin_info_schema = ["schemars"]
basic_tracing = ["dep:tracing", "tracing-subscriber"]
tracing = [
"basic_tracing",
"tracing-tree",
"opentelemetry-jaeger",
tokio-console = ["otel"]
otel = [
"chrono",
"tracing-opentelemetry",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"opentelemetry-stdout",
"opentelemetry_sdk",
"serde_json",
"rand",
]
clap = ["dep:clap", "dep:clap_complete", "dep:clap_complete_fig"]
models = []
+3
View File
@@ -4,6 +4,9 @@
pub mod build_information;
pub mod logging;
#[cfg(feature = "otel")]
pub mod opentelemetry;
#[cfg(feature = "clap")]
pub mod completions;
+20
View File
@@ -0,0 +1,20 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#[cfg(feature = "otel")]
use opentelemetry_otlp::ExporterBuildError;
#[derive(thiserror::Error, Debug)]
pub enum TracingError {
#[error("tracing logger already initialised")]
TracingLoggerAlreadyInitialised,
#[error("Logging error: {0}")]
TracingTryInitError(tracing_subscriber::util::TryInitError),
#[cfg(feature = "otel")]
#[error("{0}")]
TracingExporterBuildError(#[from] ExporterBuildError),
#[error("{0}")]
TracingFilterParseError(#[from] tracing_subscriber::filter::ParseError),
}
+39 -47
View File
@@ -1,19 +1,12 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod error;
use error::TracingError;
use serde::{Deserialize, Serialize};
use std::io::IsTerminal;
#[cfg(feature = "tracing")]
pub use opentelemetry;
#[cfg(feature = "tracing")]
pub use opentelemetry_jaeger;
#[cfg(feature = "tracing")]
pub use tracing_opentelemetry;
#[cfg(feature = "tracing")]
pub use tracing_subscriber;
#[cfg(feature = "tracing")]
pub use tracing_tree;
use tracing_subscriber::{filter::Directive, layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Debug, Default, Copy, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
@@ -22,7 +15,6 @@ pub struct LoggingSettings {
}
// don't call init so that we could attach additional layers
#[cfg(feature = "basic_tracing")]
pub fn build_tracing_logger() -> impl tracing_subscriber::layer::SubscriberExt {
use tracing_subscriber::prelude::*;
@@ -31,7 +23,6 @@ pub fn build_tracing_logger() -> impl tracing_subscriber::layer::SubscriberExt {
.with(default_tracing_env_filter())
}
#[cfg(feature = "basic_tracing")]
pub fn default_tracing_env_filter() -> tracing_subscriber::filter::EnvFilter {
if ::std::env::var("RUST_LOG").is_ok() {
tracing_subscriber::filter::EnvFilter::from_default_env()
@@ -43,7 +34,6 @@ pub fn default_tracing_env_filter() -> tracing_subscriber::filter::EnvFilter {
}
}
#[cfg(feature = "basic_tracing")]
pub fn default_tracing_fmt_layer<S, W>(
writer: W,
) -> impl tracing_subscriber::Layer<S> + Sync + Send + 'static
@@ -63,45 +53,47 @@ where
.with_target(false)
}
#[cfg(feature = "basic_tracing")]
pub fn setup_tracing_logger() {
use tracing_subscriber::util::SubscriberInitExt;
build_tracing_logger().init()
/// Creates a tracing filter that sets more granular log levels for specific crates.
/// This allows for finer control over logging verbosity.
pub(crate) fn granual_filtered_env() -> Result<tracing_subscriber::filter::EnvFilter, TracingError>
{
fn directive_checked(directive: impl Into<String>) -> Result<Directive, TracingError> {
directive.into().parse().map_err(From::from)
}
let mut filter = default_tracing_env_filter();
// these crates are more granularly filtered
let filter_crates = ["defguard_wireguard_rs"];
for crate_name in filter_crates {
filter = filter.add_directive(directive_checked(format!("{crate_name}=warn"))?);
}
Ok(filter)
}
pub fn setup_no_otel_logger() -> Result<(), TracingError> {
// Only set up if not already initialized
if tracing::dispatcher::has_been_set() {
// It shouldn't be - this is really checking that it is torn down between async command executions
return Err(TracingError::TracingLoggerAlreadyInitialised);
}
let registry = tracing_subscriber::registry()
.with(default_tracing_fmt_layer(std::io::stderr))
.with(granual_filtered_env()?);
registry
.try_init()
.map_err(|e| TracingError::TracingTryInitError(e))?;
Ok(())
}
// TODO: This has to be a macro, running it as a function does not work for the file_appender for some reason
#[cfg(feature = "tracing")]
#[macro_export]
macro_rules! setup_tracing {
($service_name: expr) => {
use nym_bin_common::logging::tracing_subscriber::layer::SubscriberExt;
use nym_bin_common::logging::tracing_subscriber::util::SubscriberInitExt;
let registry = nym_bin_common::logging::tracing_subscriber::Registry::default()
.with(nym_bin_common::logging::tracing_subscriber::EnvFilter::from_default_env())
.with(
nym_bin_common::logging::tracing_tree::HierarchicalLayer::new(4)
.with_targets(true)
.with_bracketed_fields(true),
);
let tracer = nym_bin_common::logging::opentelemetry_jaeger::new_collector_pipeline()
.with_endpoint("http://44.199.230.10:14268/api/traces")
.with_service_name($service_name)
.with_isahc()
.with_trace_config(
nym_bin_common::logging::opentelemetry::sdk::trace::config().with_sampler(
nym_bin_common::logging::opentelemetry::sdk::trace::Sampler::TraceIdRatioBased(
0.1,
),
),
)
.install_batch(nym_bin_common::logging::opentelemetry::runtime::Tokio)
.expect("Could not init tracer");
let telemetry = nym_bin_common::logging::tracing_opentelemetry::layer().with_tracer(tracer);
registry.with(telemetry).init();
setup_no_otel_logger()
};
}
@@ -0,0 +1,47 @@
use opentelemetry_sdk::trace::IdGenerator;
use opentelemetry::trace::{TraceId, SpanId};
use rand::RngCore;
#[derive(Clone, Debug)]
pub struct Compact13BytesIdGenerator;
impl IdGenerator for Compact13BytesIdGenerator {
fn new_trace_id(&self) -> TraceId {
let mut rng = rand::thread_rng();
let mut bytes = [0u8; 16];
// Fill the first 13 bytes with random data
rng.fill_bytes(&mut bytes[0..12]);
// Set the last 4 bytes to zero
bytes[12] = 0;
bytes[13] = 0;
bytes[14] = 0;
bytes[15] = 0;
TraceId::from_bytes(bytes)
}
fn new_span_id(&self) -> SpanId {
let mut rng = rand::thread_rng();
let mut bytes = [0u8; 8];
rng.fill_bytes(&mut bytes);
SpanId::from_bytes(bytes)
}
}
pub fn compress_trace_id(trace_id: &TraceId) -> [u8; 12] {
let bytes = trace_id.to_bytes();
let mut compressed = [0u8; 12];
compressed.copy_from_slice(&bytes[0..12]);
compressed
}
pub fn decompress_trace_id(compressed: &[u8; 12]) -> [u8; 16] {
let mut bytes = [0u8; 16];
bytes[0..12].copy_from_slice(compressed);
bytes[12..].copy_from_slice(&[0u8; 4]);
bytes
}
@@ -0,0 +1,156 @@
use opentelemetry::{Context, TraceFlags};
use opentelemetry::propagation::{Injector, Extractor, TextMapPropagator};
use opentelemetry::trace::{SpanContext, TraceContextExt, TraceId};
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::IdGenerator};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use std::collections::HashMap;
use std::fmt::Display;
use tracing::instrument;
/// Make a Carrier for context propagation
pub struct ContextCarrier {
data: HashMap<String, String>,
}
impl ContextCarrier {
pub fn new_empty() -> Self {
ContextCarrier {
data: HashMap::new(),
}
}
pub fn new_with_data(data: HashMap<String, String>) -> Self {
if data.is_empty() {
return ContextCarrier::new_empty();
}
ContextCarrier { data }
}
pub fn new_with_current_context(context: Context) -> Self {
let mut carrier = ContextCarrier::new_empty();
let propagator = TraceContextPropagator::new();
propagator.inject_context(&context, &mut carrier);
carrier
}
pub fn iter(&self) -> impl Iterator<Item = (&String, &String)> {
self.data.iter()
}
pub fn from_map(data: HashMap<String, String>) -> Self {
ContextCarrier { data }
}
pub fn into_map(self) -> HashMap<String, String> {
self.data
}
pub fn extract_trace_id(&self) -> Option<TraceId> {
self.get("traceparent").and_then(|tp| {
let parts: Vec<&str> = tp.split('-').collect();
if parts.len() == 4 {
TraceId::from_hex(parts[1]).ok()
} else {
None
}
})
}
pub fn extract_trace_id_into_bytes(&self) -> Option<[u8; 16]> {
self.extract_trace_id().map(|id| id.to_bytes())
}
pub fn extract_traceparent(&self) -> Option<String> {
self.get("traceparent").map(|s| s.to_string())
}
}
impl Injector for ContextCarrier {
fn set(&mut self, key: &str, value: String) {
self.data.insert(key.to_string(), value);
}
}
impl Extractor for ContextCarrier {
fn get(&self, key: &str) -> Option<&str> {
self.data.get(key).map(|s| s.as_str())
}
fn keys(&self) -> Vec<&str> {
self.data.keys().map(|k| k.as_str()).collect()
}
}
impl Display for ContextCarrier {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.data)
}
}
pub struct ManualContextPropagator {
pub root_span: tracing::Span,
pub trace_id: TraceId,
}
impl ManualContextPropagator {
#[instrument(skip_all, level = "debug")]
pub fn new(name: &str, context: HashMap<String, String>) -> Self {
let carrier = ContextCarrier::new_with_data(context);
let trace_id = match carrier.extract_trace_id() {
Some(id) => id,
None => Context::current().span().span_context().trace_id(),
};
let root_span_builder = new_span_context_with_id(trace_id.clone());
let root_span = tracing::info_span!("trace_root", name = %name, trace_id = %trace_id);
root_span.set_parent(root_span_builder);
ManualContextPropagator {
root_span,
trace_id,
}
}
#[instrument(skip_all, level = "debug")]
pub fn new_from_tid(name: &str, trace_id: TraceId) -> Self {
let root_span_builder = new_span_context_with_id(trace_id.clone());
let root_span = tracing::info_span!("trace_root", name = %name, trace_id = %trace_id);
root_span.set_parent(root_span_builder);
ManualContextPropagator {
root_span,
trace_id,
}
}
pub fn root_span(&self) -> &tracing::Span {
&self.root_span
}
}
#[instrument(skip_all, level = "debug")]
pub fn new_span_context_with_id(trace_id: TraceId) -> Context {
let id_gen = opentelemetry_sdk::trace::RandomIdGenerator::default();
let span_id = id_gen.new_span_id();
let span_context = SpanContext::new(
trace_id,
span_id,
TraceFlags::SAMPLED,
true,
Default::default(),
);
Context::current().with_remote_span_context(span_context)
}
#[instrument(skip_all, level = "debug")]
pub fn extract_trace_id_from_tracing_cx() -> TraceId {
let cx = tracing::Span::current().context();
let binding = cx.span();
let trace_id = binding.span_context().trace_id();
trace_id
}
+308
View File
@@ -0,0 +1,308 @@
pub mod context;
pub mod compact_id_generator;
mod trace_id_format;
use tracing::{info, Level};
use tracing_subscriber::filter::Directive;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::fmt;
use crate::logging::default_tracing_env_filter;
use crate::logging::error::TracingError;
use crate::opentelemetry::compact_id_generator::Compact13BytesIdGenerator;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
use opentelemetry_otlp::tonic_types::transport::ClientTlsConfig;
use opentelemetry_otlp::{WithExportConfig, WithTonicConfig};
use opentelemetry_sdk::metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_sdk::{trace::Sampler, Resource};
use opentelemetry_semantic_conventions::resource::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_VERSION};
use opentelemetry_semantic_conventions::SCHEMA_URL;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::fmt::format::FmtSpan;
pub struct TracerProviderGuard(Option<SdkTracerProvider>);
impl Drop for TracerProviderGuard {
fn drop(&mut self) {
if let Some(tracer_provider) = self.0.take() {
// Ensure all spans are flushed before exit
if let Err(e) = tracer_provider.shutdown() {
eprintln!("Error shutting down tracer provider: {:?}", e);
}
}
}
}
pub(crate) fn granual_filtered_env() -> Result<tracing_subscriber::filter::EnvFilter, TracingError>
{
fn directive_checked(directive: impl Into<String>) -> Result<Directive, TracingError> {
directive.into().parse().map_err(From::from)
}
let mut filter = default_tracing_env_filter();
// these crates are more granularly filtered
let filter_crates = ["defguard_wireguard_rs"];
for crate_name in filter_crates {
filter = filter.add_directive(directive_checked(format!("{crate_name}=warn"))?);
}
Ok(filter)
}
pub fn setup_tracing_logger(service_name: String) -> Result<TracerProviderGuard, TracingError> {
if tracing::dispatcher::has_been_set() {
// It shouldn't be - this is really checking that it is torn down between async command executions
return Err(TracingError::TracingLoggerAlreadyInitialised);
}
// define ingestion points
let endpoint = std::env::var("SIGNOZ_ENDPOINT").expect("SIGNOZ_ENDPOINT not set");
let key = std::env::var("SIGNOZ_INGESTION_KEY").expect("SIGNOZ_INGESTION_KEY not set");
let mut metadata = MetadataMap::new();
metadata.insert(
"signoz-ingestion-key",
key.parse().expect("Could not parse signoz ingestion key"),
);
// Build resources
let resource = build_resource(&service_name);
// Initialize tracer and meter providers
let tracer_provider = init_tracer_provider(&endpoint, metadata.clone(), resource.clone())?;
let meter_provider = init_meter_provider(&endpoint, metadata.clone(), resource.clone())?;
// Bridge tracing and opentelemetry
let tracer = tracer_provider.tracer("otel-subscriber");
let fmt_layer = fmt::layer()
.json()
.with_writer(std::io::stderr)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_span_list(false)
.with_current_span(true)
.event_format(trace_id_format::TraceIdFormat);
let registry = tracing_subscriber::registry()
.with(fmt_layer)
.with(granual_filtered_env()?)
.with(tracing_subscriber::filter::LevelFilter::from_level(Level::INFO))
.with(MetricsLayer::new(meter_provider.clone()))
.with(OpenTelemetryLayer::new(tracer));
registry.try_init().map_err(TracingError::TracingTryInitError)?;
global::set_tracer_provider(tracer_provider.clone());
global::set_meter_provider(meter_provider.clone());
info!("Tracing initialized with service name: {}", service_name);
Ok(TracerProviderGuard(Some(tracer_provider)))
}
fn build_resource(service_name: &str) -> Resource {
Resource::builder()
.with_service_name(service_name.to_string())
.with_schema_url(
[
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"),
],
SCHEMA_URL,
)
.build()
}
fn init_tracer_provider(
endpoint: &str,
metadata: MetadataMap,
resource: Resource,
) -> Result<SdkTracerProvider, TracingError> {
let mut exporter_builder = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_metadata(metadata)
.with_endpoint(endpoint);
if endpoint.starts_with("https://") {
exporter_builder =
exporter_builder.with_tls_config(ClientTlsConfig::new().with_enabled_roots());
}
let exporter = exporter_builder.build()?;
let tracer = SdkTracerProvider::builder()
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
1.0,
))))
.with_id_generator(Compact13BytesIdGenerator)
.with_resource(resource)
.with_batch_exporter(exporter)
.build();
global::set_tracer_provider(tracer.clone());
Ok(tracer)
}
fn init_meter_provider(
endpoint: &str,
metadata: MetadataMap,
resource: Resource,
) -> Result<SdkMeterProvider, TracingError> {
let mut exporter_builder = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_metadata(metadata)
.with_endpoint(endpoint)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default());
if endpoint.starts_with("https://") {
exporter_builder = exporter_builder.with_tls_config(ClientTlsConfig::new().with_enabled_roots());
}
let exporter = exporter_builder.build()?;
let reader = PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_secs(30))
.build();
let stdout_reader =
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()).build();
let meter_provider = MeterProviderBuilder::default()
.with_resource(resource)
.with_reader(reader)
.with_reader(stdout_reader)
.build();
global::set_meter_provider(meter_provider.clone());
Ok(meter_provider)
}
// pub fn setup_tracing_logger(service_name: String) -> Result<(), TracingError> {
// if tracing::dispatcher::has_been_set() {
// // It shouldn't be - this is really checking that it is torn down between async command executions
// return Err(TracingError::TracingLoggerAlreadyInitialised);
// }
// let key =
// std::env::var("SIGNOZ_INGESTION_KEY".to_string()).expect("SIGNOZ_INGESTION_KEY not set");
// let mut metadata = MetadataMap::new();
// metadata.insert(
// "signoz-ingestion-key",
// key.parse().expect("Could not parse signoz ingestion key"),
// );
// let tracer_provider = init_tracer_provider(metadata.clone(), service_name.clone())?;
// let meter_provider = init_meter_provider(metadata.clone(), service_name.clone())?;
// let tracer = tracer_provider.tracer("tracing-otel-subscriber");
// let fmt_layer = fmt::layer()
// .json()
// .with_writer(std::io::stderr)
// .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
// .with_span_list(false)
// .with_current_span(true)
// .event_format(trace_id_format::TraceIdFormat);
// cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
// // instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
// let console_layer = console_subscriber::spawn();
// tracing_subscriber::registry()
// .with(console_layer)
// .with(fmt_layer)
// .with(granual_filtered_env()?)
// .with(tracing_subscriber::filter::LevelFilter::from_level(Level::INFO))
// .with(MetricsLayer::new(meter_provider))
// .with(OpenTelemetryLayer::new(tracer))
// .try_init()
// .map_err(|e| TracingError::TracingTryInitError(e))?;
// } else {
// tracing_subscriber::registry()
// .with(fmt_layer)
// .with(granual_filtered_env()?)
// .with(tracing_subscriber::filter::LevelFilter::from_level(Level::INFO))
// .with(MetricsLayer::new(meter_provider))
// .with(OpenTelemetryLayer::new(tracer))
// .try_init()
// .map_err(|e| TracingError::TracingTryInitError(e))?;
// }}
// Ok(())
// }
// fn resource(service_name: String) -> Resource {
// Resource::builder()
// .with_service_name(service_name)
// .with_schema_url(
// [
// KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
// KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"),
// ],
// SCHEMA_URL,
// )
// .build()
// }
// fn init_tracer_provider(metadata: MetadataMap, service_name: String) -> Result<SdkTracerProvider, TracingError> {
// let endpoint = std::env::var("SIGNOZ_ENDPOINT".to_string()).expect("SIGNOZ_ENDPOINT not set");
// info!("SIGNOZ_ENDPOINT = {}", endpoint);
// let mut exporter_builder = opentelemetry_otlp::SpanExporter::builder()
// .with_tonic()
// .with_metadata(metadata)
// .with_endpoint(&endpoint);
// if endpoint.starts_with("https://") {
// exporter_builder =
// exporter_builder.with_tls_config(ClientTlsConfig::new().with_enabled_roots());
// }
// let exporter = exporter_builder.build()?;
// let tracer = SdkTracerProvider::builder()
// .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
// 1.0,
// ))))
// .with_id_generator(Compact13BytesIdGenerator)
// .with_resource(resource(service_name))
// .with_batch_exporter(exporter)
// .build();
// global::set_tracer_provider(tracer.clone());
// Ok(tracer)
// }
// fn init_meter_provider(metadata: MetadataMap, service_name: String) -> Result<SdkMeterProvider, TracingError> {
// let endpoint = std::env::var("SIGNOZ_ENDPOINT".to_string()).expect("SIGNOZ_ENDPOINT not set");
// let mut exporter_builder = opentelemetry_otlp::MetricExporter::builder()
// .with_tonic()
// .with_metadata(metadata)
// .with_endpoint(&endpoint)
// .with_temporality(opentelemetry_sdk::metrics::Temporality::default());
// if endpoint.starts_with("https://") {
// exporter_builder = exporter_builder.with_tls_config(ClientTlsConfig::new().with_enabled_roots());
// }
// let exporter = exporter_builder.build()?;
// let reader = PeriodicReader::builder(exporter)
// .with_interval(std::time::Duration::from_secs(30))
// .build();
// let stdout_reader =
// PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()).build();
// let meter_provider = MeterProviderBuilder::default()
// .with_resource(resource(service_name))
// .with_reader(reader)
// .with_reader(stdout_reader)
// .build();
// global::set_meter_provider(meter_provider.clone());
// Ok(meter_provider)
// }
@@ -0,0 +1,88 @@
use chrono::Utc;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::{SpanId, TraceId};
use serde::ser::{SerializeMap, Serializer as _};
use std::io;
use tracing::{Event, Subscriber};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_serde::fields::AsMap;
use tracing_serde::AsSerde;
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields};
use tracing_subscriber::registry::LookupSpan;
pub struct WriteAdaptor<'a> {
fmt_write: &'a mut dyn std::fmt::Write,
}
impl<'a> WriteAdaptor<'a> {
pub fn new(fmt_write: &'a mut dyn std::fmt::Write) -> Self {
Self { fmt_write }
}
}
impl<'a> io::Write for WriteAdaptor<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let s =
std::str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
self.fmt_write
.write_str(s)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(s.as_bytes().len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
pub struct TraceIdFormat;
impl<S, N> FormatEvent<S, N> for TraceIdFormat
where
S: Subscriber + for<'lookup> LookupSpan<'lookup>,
N: for<'writer> FormatFields<'writer> + 'static,
{
fn format_event(
&self,
_ctx: &FmtContext<'_, S, N>,
mut writer: Writer<'_>,
event: &Event<'_>,
) -> std::fmt::Result
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
let meta = event.metadata();
let mut visit = || {
let mut serializer = serde_json::Serializer::new(WriteAdaptor::new(&mut writer));
let mut serializer = serializer.serialize_map(None)?;
serializer.serialize_entry("timestamp", &Utc::now().to_rfc3339())?;
serializer.serialize_entry("level", &meta.level().as_serde())?;
serializer.serialize_entry("fields", &event.field_map())?;
serializer.serialize_entry("target", meta.target())?;
let current_span = tracing::Span::current();
let context = current_span.context();
let span_ref = context.span();
let span_context = span_ref.span_context();
let trace_id = span_context.trace_id();
if trace_id != TraceId::INVALID {
serializer.serialize_entry("trace_id", &trace_id.to_string())?;
let span_id = span_context.span_id();
if span_id != SpanId::INVALID {
serializer.serialize_entry("span_id", &span_id.to_string())?;
}
}
serializer.end()
};
visit().map_err(|_| std::fmt::Error)?;
writeln!(writer)
}
}
+1
View File
@@ -123,6 +123,7 @@ cli = ["clap", "comfy-table"]
fs-credentials-storage = ["nym-credential-storage/persistent-storage"]
fs-surb-storage = ["nym-client-core-surb-storage/fs-surb-storage"]
fs-gateways-storage = ["nym-client-core-gateways-storage/fs-gateways-storage"]
otel = ["nym-sphinx/otel"]
wasm = ["nym-gateway-client/wasm"]
metrics-server = []
@@ -99,6 +99,7 @@ pub struct ClientOutput {
}
impl ClientOutput {
#[instrument(name = "ClientOutput::register_receiver", skip_all)]
pub fn register_receiver(
&mut self,
) -> Result<mpsc::UnboundedReceiver<Vec<ReconstructedMessage>>, ClientCoreError> {
@@ -433,6 +434,7 @@ where
// buffer controlling all messages fetched from provider
// required so that other components would be able to use them (say the websocket)
#[instrument(skip_all)]
fn start_received_messages_buffer_controller(
local_encryption_keypair: Arc<x25519::KeyPair>,
query_receiver: ReceivedBufferRequestReceiver,
@@ -465,6 +467,7 @@ where
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn start_gateway_client(
config: &Config,
initialisation_result: InitialisationResult,
@@ -571,6 +574,7 @@ where
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn setup_gateway_transceiver(
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
config: &Config,
@@ -728,7 +732,7 @@ where
shutdown_tracker,
)
}
#[instrument(skip_all)]
fn start_mix_traffic_controller(
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
shutdown_tracker: &ShutdownTracker,
@@ -838,6 +842,7 @@ where
Ok(client.get_key_rotation_info().await?.into())
}
#[instrument(skip_all)]
pub async fn start_base(mut self) -> Result<BaseClient, ClientCoreError>
where
S::ReplyStore: Send + Sync,
@@ -29,6 +29,8 @@ pub enum InputMessage {
data: Vec<u8>,
lane: TransmissionLane,
max_retransmissions: Option<u32>,
// add trace_id for optional tracing of individual messages in debug mode
trace_id: Option<[u8; 12]>,
},
/// Creates a message used for a duplex anonymous communication where the recipient
@@ -45,6 +47,7 @@ pub enum InputMessage {
reply_surbs: u32,
lane: TransmissionLane,
max_retransmissions: Option<u32>,
trace_id: Option<[u8; 12]>,
},
/// Attempt to use our internally received and stored `ReplySurb` to send the message back
@@ -90,12 +93,14 @@ impl InputMessage {
data: Vec<u8>,
lane: TransmissionLane,
packet_type: Option<PacketType>,
trace_id: Option<[u8; 12]>,
) -> Self {
let message = InputMessage::Regular {
recipient,
data,
lane,
max_retransmissions: None,
trace_id,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -110,6 +115,7 @@ impl InputMessage {
reply_surbs: u32,
lane: TransmissionLane,
packet_type: Option<PacketType>,
trace_id: Option<[u8; 12]>,
) -> Self {
let message = InputMessage::Anonymous {
recipient,
@@ -117,6 +123,7 @@ impl InputMessage {
reply_surbs,
lane,
max_retransmissions: None,
trace_id,
};
if let Some(packet_type) = packet_type {
InputMessage::new_wrapper(message, packet_type)
@@ -185,4 +192,13 @@ impl InputMessage {
self.set_max_retransmissions(max_retransmissions);
self
}
pub fn trace_id(&self) -> Option<[u8; 12]> {
match self {
InputMessage::Regular { trace_id, .. } => *trace_id,
InputMessage::Anonymous { trace_id, .. } => *trace_id,
InputMessage::Premade { .. } | InputMessage::Reply { .. } => None,
InputMessage::MessageWrapper { message, .. } => message.trace_id(),
}
}
}
@@ -70,6 +70,7 @@ where
.send_reply(recipient_tag, data, lane, max_retransmissions);
}
#[instrument(skip_all)]
async fn handle_plain_message(
&mut self,
recipient: Recipient,
@@ -77,16 +78,18 @@ where
lane: TransmissionLane,
packet_type: PacketType,
max_retransmissions: Option<u32>,
trace_id: Option<[u8; 12]>,
) {
if let Err(err) = self
.message_handler
.try_send_plain_message(recipient, content, lane, packet_type, max_retransmissions)
.try_send_plain_message(recipient, content, lane, packet_type, max_retransmissions, trace_id)
.await
{
warn!("failed to send a plain message - {err}")
}
}
#[instrument(skip_all)]
async fn handle_repliable_message(
&mut self,
recipient: Recipient,
@@ -95,6 +98,7 @@ where
lane: TransmissionLane,
packet_type: PacketType,
max_retransmissions: Option<u32>,
trace_id: Option<[u8; 12]>,
) {
if let Err(err) = self
.message_handler
@@ -105,6 +109,7 @@ where
lane,
packet_type,
max_retransmissions,
trace_id,
)
.await
{
@@ -113,20 +118,29 @@ where
}
#[allow(clippy::panic)]
#[instrument(skip_all)]
async fn on_input_message(&mut self, msg: InputMessage) {
let trace_id = msg.trace_id();
if let Some(tid) = trace_id {
tracing::warn!("Processing input message with trace_id: {:?}", tid);
}
match msg {
InputMessage::Regular {
recipient,
data,
lane,
max_retransmissions,
..
} => {
warn!("Handling regular input message with trace_id: {:?}", trace_id);
self.handle_plain_message(
recipient,
data,
lane,
PacketType::Mix,
max_retransmissions,
trace_id
)
.await
}
@@ -136,7 +150,9 @@ where
reply_surbs,
lane,
max_retransmissions,
..
} => {
warn!("Handling anonymous input message with trace_id: {:?}", trace_id);
self.handle_repliable_message(
recipient,
data,
@@ -144,6 +160,7 @@ where
lane,
PacketType::Mix,
max_retransmissions,
trace_id
)
.await
}
@@ -153,6 +170,7 @@ where
lane,
max_retransmissions,
} => {
warn!("Handling reply input message with trace_id: {:?}", trace_id);
self.handle_reply(recipient_tag, data, lane, max_retransmissions)
.await;
}
@@ -166,13 +184,16 @@ where
data,
lane,
max_retransmissions,
..
} => {
tracing::warn!("Handling regular input message with trace_id: {:?}", trace_id);
self.handle_plain_message(
recipient,
data,
lane,
packet_type,
max_retransmissions,
trace_id
)
.await
}
@@ -182,6 +203,7 @@ where
reply_surbs,
lane,
max_retransmissions,
..
} => {
self.handle_repliable_message(
recipient,
@@ -190,6 +212,7 @@ where
lane,
packet_type,
max_retransmissions,
trace_id
)
.await
}
@@ -213,6 +236,7 @@ where
};
}
#[instrument(skip_all)]
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started InputMessageListener with graceful shutdown support");
@@ -60,7 +60,7 @@ where
// TODO: Figure out retransmission packet type signaling
self.message_handler
.try_prepare_single_chunk_for_sending(packet_recipient, chunk_data, packet_type)
.try_prepare_single_chunk_for_sending(packet_recipient, chunk_data, packet_type, None)
.await
}
@@ -27,7 +27,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, instrument, trace, warn};
// TODO: move that error elsewhere since it seems to be contaminating different files
#[derive(Debug, Error)]
@@ -476,6 +476,7 @@ where
self.forward_messages(msgs, lane).await;
}
#[instrument(skip_all)]
pub(crate) async fn try_send_plain_message(
&mut self,
recipient: Recipient,
@@ -483,6 +484,7 @@ where
lane: TransmissionLane,
packet_type: PacketType,
max_retransmissions: Option<u32>,
trace_id: Option<[u8; 12]>,
) -> Result<(), PreparationError> {
let message = NymMessage::new_plain(message);
self.try_split_and_send_non_reply_message(
@@ -491,10 +493,12 @@ where
lane,
packet_type,
max_retransmissions,
trace_id,
)
.await
}
#[instrument(skip_all)]
pub(crate) async fn try_split_and_send_non_reply_message(
&mut self,
message: NymMessage,
@@ -502,6 +506,7 @@ where
lane: TransmissionLane,
packet_type: PacketType,
max_retransmissions: Option<u32>,
trace_id: Option<[u8; 12]>,
) -> Result<(), PreparationError> {
debug!("Sending non-reply message with packet type {packet_type}");
// TODO: I really dislike existence of this assertion, it implies code has to be re-organised
@@ -534,6 +539,7 @@ where
&self.config.ack_key,
&recipient,
packet_type,
trace_id
)?;
let real_message = RealMessage::new(
@@ -585,6 +591,7 @@ where
TransmissionLane::AdditionalReplySurbs,
packet_type,
max_retransmissions,
None,
)
.await?;
@@ -602,6 +609,7 @@ where
lane: TransmissionLane,
packet_type: PacketType,
max_retransmissions: Option<u32>,
trace_id: Option<[u8; 12]>,
) -> Result<(), SurbWrappedPreparationError> {
debug!("Sending message with reply SURBs with packet type {packet_type}");
let sender_tag = self.get_or_create_sender_tag(&recipient);
@@ -625,6 +633,7 @@ where
lane,
packet_type,
max_retransmissions,
trace_id,
)
.await?;
@@ -639,6 +648,7 @@ where
recipient: Recipient,
chunk: Fragment,
packet_type: PacketType,
trace_id: Option<[u8; 12]>,
) -> Result<PreparedFragment, PreparationError> {
debug!("Sending single chunk with packet type {packet_type}");
let topology_permit = self.topology_access.get_read_permit().await;
@@ -650,6 +660,7 @@ where
&self.config.ack_key,
&recipient,
packet_type,
trace_id,
)?;
Ok(prepared_fragment)
@@ -80,6 +80,7 @@ impl StatisticsControl {
stats_report.into(),
TransmissionLane::General,
None,
None,
);
if let Err(err) = self.report_tx.send(report_message).await {
tracing::error!("Failed to report client stats: {err:?}");
@@ -1043,6 +1043,12 @@ impl<C, St> GatewayClient<C, St> {
}
// Note: this requires prior authentication
#[instrument(skip_all,
fields(
gateway = %self.gateway_identity,
gateway_address = %self.gateway_address
)
)]
pub fn start_listening_for_mixnet_messages(&mut self) -> Result<(), GatewayClientError> {
if !self.authenticated {
return Err(GatewayClientError::NotAuthenticated);
+15 -1
View File
@@ -19,11 +19,11 @@ serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
strum = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true, features = ["log"] }
time = { workspace = true }
subtle = { workspace = true }
zeroize = { workspace = true }
nym-bin-common = { path = "../bin-common" }
nym-crypto = { path = "../crypto", features = ["aead", "hashing"] }
nym-pemstore = { path = "../pemstore" }
nym-sphinx = { path = "../nymsphinx" }
@@ -34,6 +34,11 @@ nym-task = { path = "../task" }
nym-credentials = { path = "../credentials" }
nym-credentials-interface = { path = "../credentials-interface" }
opentelemetry = { workspace = true, features = ["trace"], optional = true }
opentelemetry_sdk = { workspace = true, optional = true }
tracing-opentelemetry = { workspace = true, optional = true }
tracing = { workspace = true, features = ["std", "attributes", "tracing-attributes"] }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
workspace = true
features = ["time"]
@@ -51,3 +56,12 @@ anyhow = { workspace = true }
nym-compact-ecash = { path = "../nym_offline_compact_ecash" } # we need specific imports in tests
nym-test-utils = { path = "../test-utils" }
tokio = { workspace = true, features = ["full"] }
[features]
default = []
otel = [
"nym-bin-common/otel",
"opentelemetry",
"opentelemetry_sdk",
"tracing-opentelemetry",
]
@@ -4,6 +4,7 @@
use crate::{AuthenticationFailure, GatewayRequestsError, SharedGatewayKey};
use nym_crypto::asymmetric::ed25519;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::iter;
use std::time::Duration;
use subtle::ConstantTimeEq;
@@ -16,6 +17,9 @@ pub struct AuthenticateRequest {
pub content: AuthenticateRequestContent,
pub request_signature: ed25519::Signature,
#[serde(default)]
pub otel_context: Option<HashMap<String, String>>,
}
impl AuthenticateRequest {
@@ -23,6 +27,7 @@ impl AuthenticateRequest {
protocol_version: u8,
shared_key: &SharedGatewayKey,
identity_keys: &ed25519::KeyPair,
otel_context: Option<HashMap<String, String>>,
) -> Result<AuthenticateRequest, GatewayRequestsError> {
let content = AuthenticateRequestContent::new(
protocol_version,
@@ -35,6 +40,7 @@ impl AuthenticateRequest {
Ok(AuthenticateRequest {
content,
request_signature,
otel_context,
})
}
@@ -8,12 +8,16 @@ use crate::{
AUTHENTICATE_V2_PROTOCOL_VERSION, CREDENTIAL_UPDATE_V2_PROTOCOL_VERSION,
INITIAL_PROTOCOL_VERSION,
};
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::context::ContextCarrier;
use nym_credentials_interface::CredentialSpendingData;
use nym_crypto::asymmetric::ed25519;
use nym_sphinx::DestinationAddressBytes;
use nym_statistics_common::types::SessionType;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use tracing::{instrument, warn};
use tungstenite::Message;
pub mod authenticate;
@@ -76,8 +80,13 @@ pub enum ClientControlRequest {
address: String,
enc_address: String,
iv: String,
/// this is a trace id that is used in testing and performance verification
/// in mainnet, this will always be set to None
#[serde(default)]
otel_context: Option<HashMap<String, String>>,
},
AuthenticateV2(Box<AuthenticateRequest>),
#[serde(alias = "handshakePayload")]
@@ -127,14 +136,25 @@ impl ClientControlRequest {
let nonce = shared_key.random_nonce_or_iv();
let ciphertext = shared_key.encrypt_naive(address.as_bytes_ref(), Some(&nonce))?;
#[cfg(feature = "otel")]
let context_carrier = {
let context = opentelemetry::Context::current();
ContextCarrier::new_with_current_context(context).into_map()
};
Ok(ClientControlRequest::Authenticate {
protocol_version,
address: address.as_base58_string(),
enc_address: bs58::encode(&ciphertext).into_string(),
iv: bs58::encode(&nonce).into_string(),
#[cfg(feature = "otel")]
otel_context: Some(context_carrier),
#[cfg(not(feature = "otel"))]
otel_context: None,
})
}
#[instrument]
pub fn new_authenticate_v2(
shared_key: &SharedGatewayKey,
identity_keys: &ed25519::KeyPair,
@@ -142,8 +162,27 @@ impl ClientControlRequest {
// if we're using v2 authentication, we must announce at least that protocol version
let protocol_version = AUTHENTICATE_V2_PROTOCOL_VERSION;
#[cfg(feature = "otel")]
let context_carrier = {
use nym_bin_common::opentelemetry::context::extract_trace_id_from_tracing_cx;
let trace_id = extract_trace_id_from_tracing_cx();
use tracing_opentelemetry::OpenTelemetrySpanExt;
let current_span = tracing::Span::current();
let otel_context = current_span.context();
ContextCarrier::new_with_current_context(otel_context).into_map()
};
#[cfg(not(feature = "otel"))]
let context_carrier: HashMap<String, String> = HashMap::new();
Ok(ClientControlRequest::AuthenticateV2(Box::new(
AuthenticateRequest::new(protocol_version, shared_key, identity_keys)?,
AuthenticateRequest::new(
protocol_version,
shared_key,
identity_keys,
Some(context_carrier)
)?,
)))
}
+4 -1
View File
@@ -11,9 +11,11 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
# TODO: Remove otel from default before release
default=["tunneling"]
tunneling=[]
network-defaults = ["dep:nym-network-defaults"]
otel = ["nym-bin-common/otel", "opentelemetry", "opentelemetry_sdk"]
debug-inventory = ["nym-http-api-client-macro/debug-inventory"]
[dependencies]
@@ -24,6 +26,8 @@ reqwest = { workspace = true, features = ["json", "gzip", "deflate", "brotli", "
http.workspace = true
url = { workspace = true }
once_cell = { workspace = true }
opentelemetry = { workspace = true, optional = true }
opentelemetry_sdk = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true}
@@ -53,4 +57,3 @@ features = ["tokio"]
[dev-dependencies]
tokio = { workspace = true, features = ["rt", "macros"] }
+16
View File
@@ -976,6 +976,22 @@ impl ApiClientCore for Client {
self.apply_hosts_to_req(&mut req);
// if opentelemetry is activated add the current trace context to the request
#[cfg(feature = "otel")]
{
use opentelemetry::Context;
use nym_bin_common::opentelemetry::context::ContextCarrier;
let carrier = ContextCarrier::new_with_current_context(Context::current());
if let Some(traceparent) = carrier.extract_traceparent() {
if let Ok(header_value) = HeaderValue::from_str(&traceparent) {
req.headers_mut()
.insert("traceparent", header_value);
}
}
}
let mut rb = RequestBuilder::from_parts(self.reqwest_client.clone(), req);
rb = rb
+3
View File
@@ -18,6 +18,7 @@ bytes = { workspace = true, optional = true }
colored = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
mime = { workspace = true, optional = true }
nym-bin-common = { path = "../bin-common" }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_yaml = { workspace = true, optional = true }
@@ -49,6 +50,8 @@ middleware = [
"zeroize"
]
otel = ["nym-bin-common/otel"]
utoipa = ["dep:utoipa"]
[lints]
@@ -56,6 +56,16 @@ async fn log_request(
let host = header_map(request.headers().get(HOST), "Unknown Host".to_string());
// Extract traceparent from headers if it exists
#[cfg(feature = "otel")]
let traceparent = request
.headers()
.get("traceparent")
.and_then(|h| h.to_str().ok())
.map(|s| s.to_string());
#[cfg(not(feature = "otel"))]
let traceparent: Option<String> = None;
let start = Instant::now();
// run request through all middleware, incl. extractors
let res = next.run(request).await;
@@ -82,10 +92,10 @@ async fn log_request(
match level {
LogLevel::Debug => debug!(
"[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent}"
"[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent} traceparent: {traceparent:?}",
),
LogLevel::Info => info!(
"[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent}"
"[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent} traceparent: {traceparent:?}"
),
}
+1
View File
@@ -231,6 +231,7 @@ where
&address,
&address,
PacketType::Mix,
None
)?)
}
+6
View File
@@ -8,12 +8,14 @@ license = { workspace = true }
repository = { workspace = true }
[dependencies]
sphinx-packet = { workspace = true }
tracing = { workspace = true }
rand = { workspace = true }
rand_distr = { workspace = true }
rand_chacha = { workspace = true }
thiserror = { workspace = true }
nym-bin-common = { path = "../bin-common" }
nym-sphinx-acknowledgements = { path = "acknowledgements" }
nym-sphinx-addressing = { path = "addressing" }
nym-sphinx-anonymous-replies = { path = "anonymous-replies" }
@@ -55,3 +57,7 @@ outfox = [
"nym-sphinx-params/outfox",
"nym-sphinx-types/outfox",
]
otel = [
"nym-bin-common/otel",
]
@@ -59,7 +59,7 @@ impl SurbAck {
};
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
let destination = recipient.as_sphinx_destination();
let destination = recipient.as_sphinx_destination(None);
let surb_ack_payload = prepare_identifier(rng, ack_key, marshaled_fragment_id);
let packet_size = match packet_type {
+7 -1
View File
@@ -8,14 +8,20 @@ license = { workspace = true }
repository = { workspace = true }
[dependencies]
nym-bin-common = { path = "../../bin-common", features = ["opentelemetry"] } # for trace id compression/decompression
nym-crypto = { path = "../../crypto", features = ["asymmetric", "sphinx"] } # all addresses are expressed in terms on their crypto keys
nym-sphinx-types = { path = "../types", features = ["sphinx"] } # we need to be able to refer to some types defined inside sphinx crate
serde = { workspace = true } # implementing serialization/deserialization for some types, like `Recipient`
thiserror = { workspace = true }
tracing = { workspace = true }
[dev-dependencies]
rand = { workspace = true }
nym-crypto = { path = "../../crypto", features = ["rand"] }
bincode = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }
[features]
default = []
otel = ["nym-bin-common/otel"]
+16 -2
View File
@@ -150,12 +150,26 @@ impl Recipient {
// TODO: Currently the `DestinationAddress` is equivalent to `ClientIdentity`, but perhaps
// it shouldn't be? Maybe it should be (for example) H(`ClientIdentity || ClientEncryptionKey`)
// instead? That is an open question.
pub fn as_sphinx_destination(&self) -> Destination {
pub fn as_sphinx_destination(&self, trace_id: Option<[u8; 12]>) -> Destination {
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::compact_id_generator::decompress_trace_id;
#[cfg(feature = "otel")]
let trace_id_16 = if let Some(trace_id) = trace_id {
decompress_trace_id(&trace_id)
} else {
decompress_trace_id(&[0u8; 12])
};
#[cfg(not(feature = "otel"))]
let trace_id_16 = {
_ = trace_id;
[0u8; 16]
};
// since the nym mix network differs slightly in design from loopix, we do not care
// about "surb_id" field at all and just use the default value.
Destination::new(
self.client_identity.derive_destination_address(),
Default::default(),
trace_id_16,
)
}
@@ -82,7 +82,7 @@ impl ReplySurb {
topology.random_route_to_egress(rng, recipient.gateway())?
};
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
let destination = recipient.as_sphinx_destination();
let destination = recipient.as_sphinx_destination(None);
let mut surb_material = SURBMaterial::new(route, delays, destination);
if use_legacy_surb_format && !disable_mix_hops {
+1 -1
View File
@@ -125,7 +125,7 @@ where
let route = topology.random_route_to_egress(rng, full_address.gateway())?;
let delays = nym_sphinx_routing::generate_hop_delays(average_packet_delay, route.len());
let destination = full_address.as_sphinx_destination();
let destination = full_address.as_sphinx_destination(None);
let rotation_id = topology.current_key_rotation();
let sphinx_key_rotation = SphinxKeyRotation::from(rotation_id);
+6
View File
@@ -11,8 +11,10 @@ repository = { workspace = true }
bytes = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
thiserror = { workspace = true }
opentelemetry = { workspace = true }
tracing = { workspace = true }
nym-bin-common = { path = "../../bin-common" }
nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] }
nym-sphinx-params = { path = "../params", features = ["sphinx", "outfox"] }
nym-sphinx-forwarding = { path = "../forwarding" }
@@ -21,3 +23,7 @@ nym-sphinx-acknowledgements = { path = "../acknowledgements" }
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
[features]
default = []
otel = ["nym-bin-common/otel"]
+38 -8
View File
@@ -2,6 +2,12 @@
// SPDX-License-Identifier: Apache-2.0
use crate::packet::FramedNymPacket;
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::{
compact_id_generator::decompress_trace_id,
context::ManualContextPropagator,
};
use nym_sphinx_acknowledgements::surb_ack::{SurbAck, SurbAckRecoveryError};
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressError};
use nym_sphinx_forwarding::packet::MixPacket;
@@ -14,7 +20,9 @@ use nym_sphinx_types::{
};
use std::fmt::Display;
use thiserror::Error;
use tracing::{debug, error, info, trace};
use tracing::{debug, error, info, instrument, trace, warn};
#[cfg(feature = "otel")]
use tracing::warn_span;
#[derive(Debug)]
pub enum MixProcessingResultData {
@@ -236,6 +244,7 @@ fn perform_framed_packet_processing(
})
}
#[instrument(skip_all)]
fn wrap_processed_sphinx_packet(
packet: nym_sphinx_types::ProcessedPacket,
packet_size: PacketSize,
@@ -258,15 +267,36 @@ fn wrap_processed_sphinx_packet(
// sphinx all together?
ProcessedPacketData::FinalHop {
destination,
#[cfg(feature = "otel")]
identifier,
#[cfg(not(feature = "otel"))]
identifier: _,
payload,
} => process_final_hop(
destination,
payload.recover_plaintext()?,
packet_size,
packet_type,
key_rotation,
),
} => {
// if we have a trace id in the destination, we log it for easier correlation later on
#[cfg(feature = "otel")]
let span = match identifier[0..12].try_into().map(|b: [u8; 12]| b) {
Ok(trace_bytes) if !trace_bytes.iter().all(|b| *b == 0) => {
let full_trace_id_bytes = decompress_trace_id(&trace_bytes);
let full_trace_id = opentelemetry::trace::TraceId::from_bytes(full_trace_id_bytes);
let context_propagator = ManualContextPropagator::new_from_tid("final_hop", full_trace_id);
warn_span!(parent: &context_propagator.root_span, "final_hop_processing", trace_id=%full_trace_id)
}
_ => {
warn_span!("final_hop_processing")
}
};
#[cfg(feature = "otel")]
let _entered_span = span.enter();
process_final_hop(
destination,
payload.recover_plaintext()?,
packet_size,
packet_type,
key_rotation,
)
}
}?;
Ok(MixProcessingResult {
+9 -17
View File
@@ -163,19 +163,6 @@ pub trait FragmentPreparer {
})
}
/// Tries to convert this [`Fragment`] into a [`SphinxPacket`] that can be sent through the Nym mix-network,
/// such that it contains required SURB-ACK and public component of the ephemeral key used to
/// derive the shared key.
/// Also all the data, apart from the said public component, is encrypted with an ephemeral shared key.
/// This method can fail if the provided network topology is invalid.
/// It returns total expected delay as well as the [`SphinxPacket`] (including first hop address)
/// to be sent through the network.
///
/// The procedure is as follows:
/// For each fragment:
/// - compute SURB_ACK
/// - generate (x, g^x)
/// - compute k = KDF(remote encryption key ^ x) this is equivalent to KDF( dh(remote, x) )
/// - compute v_b = AES-128-CTR(k, serialized_fragment)
/// - compute vk_b = g^x || v_b
/// - compute sphinx_plaintext = SURB_ACK || g^x || v_b
@@ -189,6 +176,7 @@ pub trait FragmentPreparer {
packet_sender: &Recipient,
packet_recipient: &Recipient,
packet_type: PacketType,
trace_id: Option<[u8; 12]>,
) -> Result<PreparedFragment, NymTopologyError> {
debug!("Preparing chunk for sending");
// each plain or repliable packet (i.e. not a reply) attaches an ephemeral public key so that the recipient
@@ -249,7 +237,8 @@ pub trait FragmentPreparer {
topology.random_route_to_egress(&mut rng, destination)?
};
let destination = packet_recipient.as_sphinx_destination();
let destination = packet_recipient.as_sphinx_destination(trace_id);
tracing::warn!("Packet destination with trace id: {:?}", &destination.identifier);
// including set of delays
let delays =
@@ -274,9 +263,10 @@ pub trait FragmentPreparer {
)?,
};
// from the previously constructed route extract the first hop
let first_hop_address =
NymNodeRoutingAddress::try_from(route.first().unwrap().address).unwrap();
// - compute k = KDF(remote encryption key ^ x) this is equivalent to KDF( dh(remote, x) )
// from the previously constructed route extract the first hop
let first_hop_address =
NymNodeRoutingAddress::try_from(route.first().unwrap().address).unwrap();
Ok(PreparedFragment {
// the round-trip delay is the sum of delays of all hops on the forward route as
@@ -428,6 +418,7 @@ where
ack_key: &AckKey,
packet_recipient: &Recipient,
packet_type: PacketType,
trace_id: Option<[u8; 12]>,
) -> Result<PreparedFragment, NymTopologyError> {
let sender = self.sender_address;
@@ -439,6 +430,7 @@ where
&sender,
packet_recipient,
packet_type,
trace_id,
)
}
+2
View File
@@ -10,6 +10,8 @@ repository = { workspace = true }
[dependencies]
sphinx-packet = { workspace = true, optional = true }
nym-outfox = { path = "../../../nym-outfox", optional = true }
# TODO add optional
nym-bin-common = { path = "../../../common/bin-common" }
thiserror = { workspace = true }
[features]
-2
View File
@@ -32,8 +32,6 @@ tracing.workspace = true
url.workspace = true
# TEMP
#nym-bin-common = { path = "../bin-common", features = ["basic_tracing"]}
[build-dependencies]
@@ -350,6 +350,7 @@ impl SocksClient {
self.config.connection_start_surbs,
TransmissionLane::ConnectionId(self.connection_id),
self.packet_type,
None
);
self.input_sender
.send(input_message)
@@ -373,6 +374,7 @@ impl SocksClient {
msg.into_bytes(),
TransmissionLane::ConnectionId(self.connection_id),
self.packet_type,
None
);
self.input_sender
.send(input_message)
@@ -439,6 +441,7 @@ impl SocksClient {
per_request_surbs,
lane,
packet_type,
None
)
} else {
InputMessage::new_regular(
@@ -446,6 +449,7 @@ impl SocksClient {
provider_message.into_bytes(),
lane,
packet_type,
None
)
}
})
+1
View File
@@ -29,6 +29,7 @@ x25519-dalek = { workspace = true, features = ["static_secrets"] }
cosmwasm-std = { workspace = true }
cosmrs = { workspace = true }
nym-bin-common = { path = "../../common/bin-common" }
nym-validator-client = { path = "../../common/client-libs/validator-client" }
nym-mixnet-contract-common = { path = "../../common/cosmwasm-smart-contracts/mixnet-contract" }
nym-vesting-contract-common = { path = "../../common/cosmwasm-smart-contracts/vesting-contract" }
+16
View File
@@ -50,6 +50,7 @@ zeroize = { workspace = true }
# internal
nym-api-requests = { path = "../nym-api/nym-api-requests" }
nym-bin-common = { path = "../common/bin-common" }
nym-credentials = { path = "../common/credentials" }
nym-credentials-interface = { path = "../common/credentials-interface" }
nym-credential-verification = { path = "../common/credential-verification" }
@@ -82,9 +83,24 @@ nym-service-provider-requests-common = { path = "../common/service-provider-requ
defguard_wireguard_rs = { workspace = true }
opentelemetry = { workspace = true, optional = true }
opentelemetry_sdk = { workspace = true, optional = true }
tracing-opentelemetry = { workspace = true, optional = true }
[dev-dependencies]
nym-gateway-storage = { path = "../common/gateway-storage", features = ["mock"] }
nym-wireguard = { path = "../common/wireguard", features = ["mock"] }
mock_instant = "0.6.0"
time = { workspace = true }
[features]
default = []
otel = [
"nym-bin-common/otel",
"nym-client-core/otel",
"nym-gateway-requests/otel",
"nym-sphinx/otel",
"opentelemetry",
"opentelemetry_sdk",
"tracing-opentelemetry",
]
@@ -14,6 +14,8 @@ use futures::{
future::{FusedFuture, OptionFuture},
FutureExt, StreamExt,
};
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::context::ManualContextPropagator;
use nym_credential_verification::CredentialVerifier;
use nym_credential_verification::{
bandwidth_storage_manager::BandwidthStorageManager, ClientBandwidth,
@@ -31,6 +33,8 @@ use nym_sphinx::forwarding::packet::MixPacket;
use nym_statistics_common::{gateways::GatewaySessionEvent, types::SessionType};
use nym_validator_client::coconut::EcashApiError;
use rand::{random, CryptoRng, Rng};
#[cfg(feature = "otel")]
use std::collections::HashMap;
use std::{process, time::Duration};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -147,6 +151,8 @@ pub(crate) struct AuthenticatedHandler<R, S> {
// senders that are used to return the result of the ping to the handler requesting the ping.
is_active_request_receiver: IsActiveRequestReceiver,
is_active_ping_pending_reply: Option<(u64, IsActiveResultSender)>,
#[cfg(feature = "otel")]
pub otel_propagator: Option<ManualContextPropagator>,
}
// explicitly remove handle from the global store upon being dropped
@@ -189,6 +195,21 @@ impl<R, S> AuthenticatedHandler<R, S> {
client_address: client.address.as_base58_string(),
})?;
#[cfg(feature = "otel")]
let manual_ctx_propagator = {
let context = match client.otel_context {
Some(ref ctx) => ctx.clone(),
None => HashMap::new(),
};
let manual_ctx_propagator = if !context.is_empty() {
Some(ManualContextPropagator::new("upgrading_fresh_to_authenticated", context))
} else {
None
};
manual_ctx_propagator
};
let handler = AuthenticatedHandler {
bandwidth_storage_manager: BandwidthStorageManager::new(
Box::new(fresh.shared_state.storage.clone()),
@@ -202,6 +223,8 @@ impl<R, S> AuthenticatedHandler<R, S> {
mix_receiver,
is_active_request_receiver,
is_active_ping_pending_reply: None,
#[cfg(feature = "otel")]
otel_propagator: manual_ctx_propagator,
};
handler.send_metrics(GatewaySessionEvent::new_session_start(
handler.client.address,
@@ -227,7 +250,17 @@ impl<R, S> AuthenticatedHandler<R, S> {
/// # Arguments
///
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
#[instrument(skip_all)]
fn forward_packet(&self, mix_packet: MixPacket) {
#[cfg(feature = "otel")]
{
let span = match &self.otel_propagator {
Some(propagator) => info_span!(parent: &propagator.root_span, "forwarding_mix_packet"),
None => info_span!("forwarding_mix_packet_no_otel"),
};
let _enter = span.enter();
}
if let Err(err) = self
.inner
.shared_state
@@ -287,6 +320,18 @@ impl<R, S> AuthenticatedHandler<R, S> {
&mut self,
mix_packet: MixPacket,
) -> Result<ServerResponse, RequestHandlingError> {
trace!("forwarding sphinx packet");
#[cfg(feature = "otel")]
let span = {
let span = match &self.otel_propagator {
Some(propagator) => info_span!(parent: &propagator.root_span, "handling_forward_sphinx"),
None => debug_span!("handling_forward_sphinx_no_otel"),
};
span
};
#[cfg(feature = "otel")]
let _enter = span.enter();
let required_bandwidth = mix_packet.packet().len() as i64;
let remaining_bandwidth = self
@@ -305,8 +350,20 @@ impl<R, S> AuthenticatedHandler<R, S> {
/// # Arguments
///
/// * `bin_msg`: raw message to handle.
#[instrument(skip_all)]
async fn handle_binary(&mut self, bin_msg: Vec<u8>) -> Message {
trace!("binary request");
#[cfg(feature = "otel")]
let span = {
let span = match &self.otel_propagator {
Some(propagator) => info_span!(parent: &propagator.root_span, "handling_binary_request"),
None => info_span!("handling_binary_request_no_otel"),
};
span
};
#[cfg(feature = "otel")]
let _enter = span.enter();
// this function decrypts the request and checks the MAC
match BinaryRequest::try_from_encrypted_tagged_bytes(bin_msg, &self.client.shared_keys) {
Err(e) => {
@@ -379,6 +436,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
Ok(SensitiveServerResponse::KeyUpgradeAck {}.encrypt(&self.client.shared_keys)?)
}
#[instrument(skip_all)]
async fn handle_encrypted_text_request(
&mut self,
ciphertext: Vec<u8>,
@@ -408,6 +466,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
/// # Arguments
///
/// * `raw_request`: raw message to handle.
#[instrument(skip_all)]
async fn handle_text(&mut self, raw_request: String) -> Message
where
R: Rng + CryptoRng,
@@ -552,6 +611,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
}
}
#[instrument(skip_all)]
async fn handle_is_active_request(
&mut self,
reply_tx: IsActiveResultSender,
@@ -582,16 +642,31 @@ impl<R, S> AuthenticatedHandler<R, S> {
/// Simultaneously listens for incoming client requests, which realistically should only be
/// binary requests to forward sphinx packets or increase bandwidth
/// and for sphinx packets received from the mix network that should be sent back to the client.
#[instrument(level = "debug", skip_all,
fields(
client = %self.client.address.as_base58_string()
)
)]
pub(crate) async fn listen_for_requests(mut self)
where
R: Rng + CryptoRng,
S: AsyncRead + AsyncWrite + Unpin,
{
trace!("Started listening for ALL incoming requests...");
// Ping timeout future used to check if the client responded to our ping request
let mut ping_timeout: OptionFuture<_> = None.into();
#[cfg(feature = "otel")]
let from_client_span = {
let span = match &self.otel_propagator {
Some(propagator) => info_span!(parent: &propagator.root_span, "authenticated_client_handler_listen"),
None => tracing::debug_span!("authenticated_client_handler_listen_no_otel"),
};
span
};
#[cfg(feature = "otel")]
let _enter = from_client_span.enter();
loop {
tokio::select! {
// Received a request to ping the client to check if it's still active
@@ -609,10 +684,10 @@ impl<R, S> AuthenticatedHandler<R, S> {
},
// The ping timeout expired, meaning the client didn't respond to our ping request
_ = &mut ping_timeout, if !ping_timeout.is_terminated() => {
ping_timeout = None.into();
self.handle_ping_timeout().await;
ping_timeout = None.into();
self.handle_ping_timeout().await;
},
socket_msg = self.inner.read_websocket_message() => {
socket_msg = self.inner.read_websocket_message().in_current_span() => {
let socket_msg = match socket_msg {
None => break,
Some(Ok(socket_msg)) => socket_msg,
@@ -627,7 +702,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
}
if let Some(response) = self.handle_request(socket_msg).await {
if let Err(err) = self.inner.send_websocket_message(response).await {
if let Err(err) = self.inner.send_websocket_message(response).in_current_span().await {
debug!(
"Failed to send message over websocket: {err}. Assuming the connection is dead.",
);
@@ -635,7 +710,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
}
}
},
mix_messages = self.mix_receiver.next() => {
mix_messages = self.mix_receiver.next().in_current_span() => {
let mix_messages = match mix_messages {
None => {
debug!("mix receiver was closed! Assuming the connection is dead.");
@@ -13,6 +13,8 @@ use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt,
};
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::context::ManualContextPropagator;
use nym_credentials_interface::AvailableBandwidth;
use nym_crypto::aes::cipher::crypto_common::rand_core::RngCore;
use nym_crypto::asymmetric::ed25519;
@@ -34,6 +36,8 @@ use nym_node_metrics::events::MetricsEvent;
use nym_sphinx::DestinationAddressBytes;
use nym_task::ShutdownToken;
use rand::CryptoRng;
#[cfg(feature = "otel")]
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::Duration;
use thiserror::Error;
@@ -41,7 +45,11 @@ use time::OffsetDateTime;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::timeout;
use tokio_tungstenite::tungstenite::{protocol::Message, Error as WsError};
use tracing::*;
#[cfg(feature = "otel")]
use tracing::info_span;
use tracing::{debug, error, info, instrument, Instrument, warn};
#[derive(Debug, Error)]
pub(crate) enum InitialAuthenticationError {
@@ -163,6 +171,7 @@ impl<R, S> FreshHandler<R, S> {
/// Attempts to perform websocket handshake with the remote and upgrades the raw TCP socket
/// to the framed WebSocket.
#[instrument(skip_all)]
pub(crate) async fn perform_websocket_handshake(&mut self) -> Result<(), WsError>
where
S: AsyncRead + AsyncWrite + Unpin,
@@ -186,6 +195,7 @@ impl<R, S> FreshHandler<R, S> {
/// # Arguments
///
/// * `init_msg`: a client handshake init message which should contain its identity public key as well as an ephemeral key.
#[instrument(skip_all)]
async fn perform_registration_handshake(
&mut self,
init_msg: Vec<u8>,
@@ -211,6 +221,7 @@ impl<R, S> FreshHandler<R, S> {
}
/// Attempts to read websocket message from the associated socket.
#[instrument(skip_all)]
pub(crate) async fn read_websocket_message(&mut self) -> Option<Result<Message, WsError>>
where
S: AsyncRead + AsyncWrite + Unpin,
@@ -226,6 +237,7 @@ impl<R, S> FreshHandler<R, S> {
/// # Arguments
///
/// * `msg`: WebSocket message to write back to the client.
#[instrument(skip_all)]
pub(crate) async fn send_websocket_message(
&mut self,
msg: impl Into<Message>,
@@ -242,6 +254,7 @@ impl<R, S> FreshHandler<R, S> {
}
}
#[instrument(skip_all)]
pub(crate) async fn send_error_response(
&mut self,
err: impl std::error::Error,
@@ -269,6 +282,7 @@ impl<R, S> FreshHandler<R, S> {
///
/// * `shared_keys`: keys derived between the client and gateway.
/// * `packets`: unwrapped packets that are to be pushed back to the client.
#[instrument(skip_all)]
pub(crate) async fn push_packets_to_client(
&mut self,
shared_keys: &SharedGatewayKey,
@@ -632,6 +646,8 @@ impl<R, S> FreshHandler<R, S> {
address,
shared_keys.key,
session_request_start,
#[cfg(feature = "otel")]
None,
)),
ServerResponse::Authenticate {
protocol_version: Some(negotiated_protocol),
@@ -641,9 +657,14 @@ impl<R, S> FreshHandler<R, S> {
))
}
#[instrument(skip_all, fields(
address = %request.content.client_identity.derive_destination_address(),
))]
async fn handle_authenticate_v2(
&mut self,
request: Box<AuthenticateRequest>,
#[cfg(feature = "otel")]
otel_context: Option<HashMap<String, String>>,
) -> Result<InitialAuthResult, InitialAuthenticationError>
where
S: AsyncRead + AsyncWrite + Unpin,
@@ -718,6 +739,8 @@ impl<R, S> FreshHandler<R, S> {
address,
shared_key.key,
session_request_start,
#[cfg(feature = "otel")]
otel_context,
)),
ServerResponse::Authenticate {
protocol_version: Some(negotiated_protocol),
@@ -816,6 +839,8 @@ impl<R, S> FreshHandler<R, S> {
remote_address,
shared_keys,
OffsetDateTime::now_utc(),
#[cfg(feature = "otel")]
None
);
Ok(InitialAuthResult::new(
@@ -846,6 +871,7 @@ impl<R, S> FreshHandler<R, S> {
}
}
#[instrument(skip_all)]
pub(crate) async fn handle_initial_client_request(
&mut self,
request: ClientControlRequest,
@@ -854,17 +880,45 @@ impl<R, S> FreshHandler<R, S> {
S: AsyncRead + AsyncWrite + Unpin + Send,
R: CryptoRng + RngCore + Send,
{
// we can handle stateless client requests without prior authentication, like `ClientControlRequest::SupportedProtocol`
// extract and set up opentelemetry context if provided
#[cfg(feature = "otel")]
let (context_propagator, otel_ctx) = if let ClientControlRequest::AuthenticateV2(ref auth_req) = request {
if let Some(otel_context) = &auth_req.otel_context {
info!("=== OpenTelemetry context provided in the request: {otel_context:?} ===");
(Some(ManualContextPropagator::new("handling_initial_client_request_with_otel", otel_context.clone())), Some(otel_context.clone()))
} else {
debug!("No OpenTelemetry context provided in the request");
(None, None)
}
} else {
debug!("No OpenTelemetry context provided in the request");
(None, None)
};
#[cfg(feature = "otel")]
let child_span = match context_propagator {
Some(ref propagator) => {
let span = info_span!(parent: &propagator.root_span, "=== Handling initial client request with otel context ===");
span
}
None => tracing::debug_span!("=== Handling initial client request without otel context ==="),
};
#[cfg(feature = "otel")]
let _enter = child_span.enter();
let auth_result = match request {
ClientControlRequest::Authenticate {
protocol_version,
address,
enc_address,
iv,
otel_context: _,
} => {
self.handle_legacy_authenticate(protocol_version, address, enc_address, iv)
.await
}
#[cfg(feature = "otel")]
ClientControlRequest::AuthenticateV2(req) => self.handle_authenticate_v2(req, otel_ctx).await,
#[cfg(not(feature = "otel"))]
ClientControlRequest::AuthenticateV2(req) => self.handle_authenticate_v2(req).await,
ClientControlRequest::RegisterHandshakeInitRequest {
protocol_version,
@@ -889,13 +943,12 @@ impl<R, S> FreshHandler<R, S> {
}
other => debug!("authentication failure: {other}"),
}
self.send_and_forget_error_response(&err).await;
return Err(err);
}
};
// try to send auth response back to the client
// try to send auth response back to the client
if let Err(source) = self
.send_websocket_message(auth_result.server_response)
.await
@@ -912,9 +965,11 @@ impl<R, S> FreshHandler<R, S> {
warn!("could not establish client details");
return Err(InitialAuthenticationError::EmptyClientDetails);
};
Ok(Some(client_details))
}
#[instrument(skip_all)]
pub(crate) async fn handle_until_authenticated_or_failure(
mut self,
) -> Option<AuthenticatedHandler<R, S>>
@@ -953,7 +1008,7 @@ impl<R, S> FreshHandler<R, S> {
registration_details.session_request_timestamp,
);
return AuthenticatedHandler::upgrade(
let auth_handle = AuthenticatedHandler::upgrade(
self,
registration_details,
mix_receiver,
@@ -962,10 +1017,12 @@ impl<R, S> FreshHandler<R, S> {
.await
.inspect_err(|err| error!("failed to upgrade client handler: {err}"))
.ok();
return auth_handle;
}
}
}
#[instrument(skip_all)]
pub(crate) async fn wait_for_initial_message(
&mut self,
) -> Result<ClientControlRequest, InitialAuthenticationError>
@@ -1016,6 +1073,7 @@ impl<R, S> FreshHandler<R, S> {
.map_err(|_| InitialAuthenticationError::InvalidRequest)
}
#[instrument(skip_all)]
pub(crate) async fn start_handling(self)
where
S: AsyncRead + AsyncWrite + Unpin + Send,
@@ -1025,10 +1083,10 @@ impl<R, S> FreshHandler<R, S> {
let shutdown = self.shutdown.clone();
tokio::select! {
_ = shutdown.cancelled() => {
trace!("received cancellation")
tracing::trace!("received cancellation")
}
_ = super::handle_connection(self) => {
debug!("finished connection handler for {remote}")
_ = super::handle_connection(self).instrument(tracing::debug_span!("connection_handler", remote = %remote)) => {
tracing::debug!("finished connection handler for {remote}")
}
}
}
@@ -7,11 +7,15 @@ use nym_gateway_requests::shared_key::SharedGatewayKey;
use nym_gateway_requests::ServerResponse;
use nym_sphinx::DestinationAddressBytes;
use rand::{CryptoRng, Rng};
#[cfg(feature = "otel")]
use std::collections::HashMap;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tungstenite::WebSocketStream;
use tracing::{debug, instrument, trace, warn};
#[cfg(feature = "otel")]
use tracing::Instrument;
pub(crate) use self::authenticated::AuthenticatedHandler;
pub(crate) use self::fresh::FreshHandler;
@@ -48,6 +52,8 @@ pub(crate) struct ClientDetails {
// note, this does **NOT ALWAYS** indicate timestamp of when client connected
// it is (for v2 auth) timestamp the client **signed** when it created the request
pub(crate) session_request_timestamp: OffsetDateTime,
#[cfg(feature = "otel")]
pub(crate) otel_context: Option<HashMap<String, String>>,
}
impl ClientDetails {
@@ -56,12 +62,16 @@ impl ClientDetails {
address: DestinationAddressBytes,
shared_keys: SharedGatewayKey,
session_request_timestamp: OffsetDateTime,
#[cfg(feature = "otel")]
otel_context: Option<HashMap<String, String>>,
) -> Self {
ClientDetails {
address,
id,
shared_keys,
session_request_timestamp,
#[cfg(feature = "otel")]
otel_context,
}
}
}
@@ -92,7 +102,7 @@ impl InitialAuthResult {
}
// imo there's no point in including the peer address in anything higher than debug
#[instrument(level = "debug", skip_all, fields(peer = %handle.peer_address))]
#[instrument(skip_all)]
pub(crate) async fn handle_connection<R, S>(mut handle: FreshHandler<R, S>)
where
R: Rng + CryptoRng + Send,
@@ -118,7 +128,23 @@ where
trace!("managed to perform websocket handshake!");
if let Some(auth_handle) = handle.handle_until_authenticated_or_failure().await {
auth_handle.listen_for_requests().await
#[cfg(feature = "otel")]
{
let from_client_span = {
let parent = match auth_handle.otel_propagator.as_ref() {
Some(propagator) => propagator.root_span(),
None => &tracing::Span::current(), // fallback to current span if no propagator
};
tracing::info_span!(parent: parent, "listening for requests")
};
auth_handle.listen_for_requests()
.instrument(from_client_span)
.await
}
#[cfg(not(feature = "otel"))]
{
auth_handle.listen_for_requests().await;
}
}
trace!("the handler is done!");
@@ -53,6 +53,7 @@ impl Listener {
)
}
#[instrument(skip_all)]
fn try_handle_accepted_connection(&self, accepted: io::Result<(TcpStream, SocketAddr)>) {
match accepted {
Ok((socket, remote_address)) => {
@@ -83,6 +84,7 @@ impl Listener {
.network
.new_ingress_websocket_client();
// 4. spawn the task handling the client connection
self.shutdown.try_spawn_named(
async move {
@@ -90,7 +92,7 @@ impl Listener {
let metrics_ref = handle.shared_state.metrics.clone();
// 4.1. handle all client requests until connection gets terminated
handle.start_handling().await;
handle.start_handling().in_current_span().await;
// 4.2. decrement the connection counter
metrics_ref.network.disconnected_ingress_websocket_client();
@@ -104,6 +106,7 @@ impl Listener {
// TODO: change the signature to pub(crate) async fn run(&self, handler: Handler)
#[instrument(skip_all)]
pub async fn run(&mut self) {
info!("Starting websocket listener at {}", self.address);
let tcp_listener = match tokio::net::TcpListener::bind(self.address).await {
@@ -122,7 +125,7 @@ impl Listener {
trace!("client_handling::Listener: received shutdown");
break
}
connection = tcp_listener.accept() => {
connection = tcp_listener.accept().in_current_span() => {
self.try_handle_accepted_connection(connection)
}
}
@@ -979,6 +979,7 @@ fn create_input_message(
response_packet,
lane,
packet_type,
None,
))
} else {
tracing::error!("No nym-address or sender tag provided");
+1 -1
View File
@@ -94,7 +94,7 @@ nym-topology = { path = "../common/topology" }
nym-api-requests = { path = "nym-api-requests" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-http-api-client = { path = "../common/http-api-client" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format", "openapi", "basic_tracing"] }
nym-bin-common = { path = "../common/bin-common", features = ["output_format", "openapi"] }
nym-node-tester-utils = { path = "../common/node-tester-utils" }
nym-node-requests = { path = "../nym-node/nym-node-requests" }
nym-types = { path = "../common/types" }
+1 -1
View File
@@ -31,7 +31,7 @@ async fn main() -> Result<(), anyhow::Error> {
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
console_subscriber::init();
} else {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
}}
info!("Starting nym api...");
+2
View File
@@ -1221,12 +1221,14 @@ fn create_input_message(
surbs,
TransmissionLane::General,
None,
None
),
IncludedSurbs::ExposeSelfAddress => nym_sdk::mixnet::InputMessage::new_regular(
recipient,
data,
TransmissionLane::General,
None,
None
),
}
}
@@ -45,9 +45,7 @@ utoipa = { workspace = true, features = ["axum_extras", "time"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
zeroize.workspace = true
nym-bin-common = { path = "../../common/bin-common", features = [
"basic_tracing",
] }
nym-bin-common = { path = "../../common/bin-common" }
nym-compact-ecash = { path = "../../common/nym_offline_compact_ecash" }
nym-config = { path = "../../common/config" }
nym-crypto = { path = "../../common/crypto", features = [
@@ -6,7 +6,7 @@ cfg_if::cfg_if! {
use crate::cli::Cli;
use clap::Parser;
use nym_bin_common::bin_info_owned;
use nym_bin_common::logging::setup_tracing_logger;
use nym_bin_common::logging::setup_no_otel_logger;
use nym_network_defaults::setup_env;
use tracing::{info, trace};
@@ -29,7 +29,7 @@ async fn main() -> anyhow::Result<()> {
trace!("args: {cli:#?}");
setup_env(cli.config_env_file.as_ref());
setup_tracing_logger();
setup_no_otel_logger().expect("failed to initialize logging");
let bin_info = bin_info_owned!();
info!("using the following version: {bin_info}");
+1
View File
@@ -195,5 +195,6 @@ fn create_input_message(
surbs,
TransmissionLane::General,
None,
None,
)
}
+1 -1
View File
@@ -30,7 +30,7 @@ utoipa-swagger-ui = { workspace = true, features = ["axum"] }
tokio-postgres = { workspace = true }
# internal
nym-bin-common = { path = "../common/bin-common", features = ["basic_tracing"] }
nym-bin-common = { path = "../common/bin-common" }
nym-client-core = { path = "../common/client-core" }
nym-crypto = { path = "../common/crypto" }
nym-network-defaults = { path = "../common/network-defaults" }
+1 -1
View File
@@ -187,7 +187,7 @@ async fn nym_topology_from_env() -> anyhow::Result<NymTopology> {
#[tokio::main]
async fn main() -> Result<()> {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
let args = Args::parse();
+12 -2
View File
@@ -31,6 +31,7 @@ humantime-serde = { workspace = true }
human-repr = { workspace = true }
ipnetwork = { workspace = true }
indicatif = { workspace = true }
opentelemetry = { workspace = true, optional = true } # make it optional later
rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
@@ -50,7 +51,6 @@ cupid = { workspace = true }
sysinfo = { workspace = true }
nym-bin-common = { path = "../common/bin-common", features = [
"basic_tracing",
"output_format",
] }
nym-client-core-config-types = { path = "../common/client-core/config-types", features = [
@@ -130,7 +130,17 @@ criterion = { workspace = true, features = ["async_tokio"] }
rand_chacha = { workspace = true }
[features]
tokio-console = ["console-subscriber", "nym-task/tokio-tracing"]
default = []
tokio-console = ["console-subscriber"]
otel = [
"nym-bin-common/otel",
"nym-gateway/otel",
"nym-http-api-common/otel",
"nym-sphinx-framing/otel",
"nym-sphinx-addressing/otel",
"opentelemetry",
]
[lints]
workspace = true
+2 -1
View File
@@ -8,7 +8,7 @@ use crate::node::mixnet::packet_forwarding::global::is_global_ip;
use crate::node::NymNode;
use std::fs;
use std::net::IpAddr;
use tracing::{debug, info, trace, warn};
use tracing::{debug, info, instrument, trace, warn};
mod args;
@@ -37,6 +37,7 @@ fn check_public_ips(ips: &[IpAddr], local: bool) -> Result<(), NymNodeError> {
Ok(())
}
#[instrument(skip_all)]
pub(crate) async fn execute(mut args: Args) -> Result<(), NymNodeError> {
trace!("passed arguments: {args:#?}");
+132 -17
View File
@@ -6,10 +6,21 @@ use crate::cli::commands::{
test_throughput,
};
use crate::env::vars::{NYMNODE_CONFIG_ENV_FILE_ARG, NYMNODE_NO_BANNER_ARG};
// use crate::error::NymNodeError;
use clap::{Args, Parser, Subcommand};
use nym_bin_common::bin_info;
use nym_bin_common::{
bin_info,
logging::setup_no_otel_logger,
};
#[cfg(feature = "otel")]
use nym_bin_common::logging::error::TracingError;
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::setup_tracing_logger;
#[cfg(feature = "otel")]
use tracing::Instrument;
use std::future::Future;
use std::sync::OnceLock;
use tracing::instrument;
pub(crate) mod commands;
mod helpers;
@@ -52,30 +63,134 @@ impl Cli {
.block_on(fut))
}
#[instrument]
pub(crate) fn execute(self) -> anyhow::Result<()> {
// NOTE: `test_throughput` sets up its own logger as it has to include additional layers
if !matches!(self.command, Commands::TestThroughput(..)) {
crate::logging::setup_tracing_logger()?;
}
// if !matches!(self.command, Commands::TestThroughput(..)) {
// crate::logging::setup_tracing_logger()?;
// }
match self.command {
Commands::BuildInfo(args) => build_info::execute(args)?,
Commands::BondingInformation(args) => {
{ Self::execute_async(bonding_information::execute(args))? }?
}
Commands::NodeDetails(args) => { Self::execute_async(node_details::execute(args))? }?,
Commands::Run(args) => { Self::execute_async(run::execute(*args))? }?,
Commands::Migrate(args) => migrate::execute(*args)?,
Commands::Sign(args) => { Self::execute_async(sign::execute(args))? }?,
Commands::TestThroughput(args) => test_throughput::execute(args)?,
Commands::UnsafeResetSphinxKeys(args) => {
{ Self::execute_async(reset_sphinx_keys::execute(args))? }?
}
// Sync commands get logger w. no OTEL
Commands::BuildInfo(args) => {
setup_no_otel_logger()?;
build_info::execute(args)?
},
Commands::Migrate(args) => {
setup_no_otel_logger()?;
migrate::execute(*args)?
},
Commands::Debug(debug) => match debug.command {
DebugCommands::ResetProvidersGatewayDbs(args) => {
{ Self::execute_async(debug::reset_providers_dbs::execute(args))? }?
let _ = Self::execute_async(debug::reset_providers_dbs::execute(args))?;
}
},
Commands::TestThroughput(args) => {
// Has its own logging setup
test_throughput::execute(args)?
},
// SigNoz/OTEL run in async context
Commands::BondingInformation(args) => Self::execute_async(async move {
#[cfg(feature = "otel")]
{
let _guard = setup_tracing_logger("nym-node".to_string())
.map_err(TracingError::from)?;
let main_span = tracing::info_span!("startup", service = "nym-node");
async {
bonding_information::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
.instrument(main_span)
.await
}
#[cfg(not(feature = "otel"))]
{
setup_no_otel_logger().expect("failed to initialize logging");
bonding_information::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
})??,
Commands::NodeDetails(args) => Self::execute_async(async move {
#[cfg(feature = "otel")]
{
let _guard = setup_tracing_logger("nym-node".to_string())
.map_err(TracingError::from)?;
let main_span = tracing::info_span!("startup", service = "nym-node");
async {
node_details::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
.instrument(main_span)
.await
}
#[cfg(not(feature = "otel"))]
{
setup_no_otel_logger().expect("failed to initialize logging");
node_details::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
})??,
Commands::Run(args) => Self::execute_async(async move {
#[cfg(feature = "otel")]
{
let _guard = setup_tracing_logger("nym-node".to_string())
.map_err(TracingError::from)?;
tracing::warn!("OpenTelemetry is enabled for this nym-node instance.");
let main_span = tracing::info_span!("startup", service = "nym-node");
async {
run::execute(*args).await?;
Ok::<(), anyhow::Error>(())
}
.instrument(main_span)
.await
}
#[cfg(not(feature = "otel"))]
{
setup_no_otel_logger().expect("failed to initialize logging");
run::execute(*args).await?;
Ok::<(), anyhow::Error>(())
}
})??,
Commands::Sign(args) => Self::execute_async(async move {
#[cfg(feature = "otel")]
{
let _guard = setup_tracing_logger("nym-node".to_string())
.map_err(TracingError::from)?;
let main_span = tracing::info_span!("startup", service = "nym-node");
async {
sign::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
.instrument(main_span)
.await
}
#[cfg(not(feature = "otel"))]
{
setup_no_otel_logger().expect("failed to initialize logging");
sign::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
})??,
Commands::UnsafeResetSphinxKeys(args) => Self::execute_async(async move {
#[cfg(feature = "otel")]
{
let _guard = setup_tracing_logger("nym-node".to_string())
.map_err(TracingError::from)?;
let main_span = tracing::info_span!("startup", service = "nym-node");
async {
reset_sphinx_keys::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
.instrument(main_span)
.await
}
#[cfg(not(feature = "otel"))]
{
setup_no_otel_logger().expect("failed to initialize logging");
reset_sphinx_keys::execute(args).await?;
Ok::<(), anyhow::Error>(())
}
})??,
}
Ok(())
}
+14 -14
View File
@@ -427,20 +427,20 @@ impl Config {
pub fn validate(&self) -> Result<(), NymNodeError> {
self.mixnet.validate()?;
// it's not allowed to run mixnode mode alongside entry mode
if self.modes.mixnode && self.modes.entry {
return Err(NymNodeError::config_validation_failure(
"illegal modes configuration - node cannot run as a mixnode and an entry gateway",
));
}
// nor it's allowed to run mixnode mode alongside exit mode
// (use two separate checks for better error messages)
if self.modes.mixnode && self.modes.exit {
return Err(NymNodeError::config_validation_failure(
"illegal modes configuration - node cannot run as a mixnode and an exit gateway",
));
}
// // it's not allowed to run mixnode mode alongside entry mode
// if self.modes.mixnode && self.modes.entry {
// return Err(NymNodeError::config_validation_failure(
// "illegal modes configuration - node cannot run as a mixnode and an entry gateway",
// ));
// }
//
// // nor it's allowed to run mixnode mode alongside exit mode
// // (use two separate checks for better error messages)
// if self.modes.mixnode && self.modes.exit {
// return Err(NymNodeError::config_validation_failure(
// "illegal modes configuration - node cannot run as a mixnode and an exit gateway",
// ));
// }
Ok(())
}
+3
View File
@@ -76,6 +76,9 @@ pub enum KeyIOFailure {
#[derive(Debug, Error)]
pub enum NymNodeError {
// #[error("Failed to setup tracing logger")]
// TracingSetupFailure(#[source] anyhow::Error),
#[error("this binary version no longer supports migration from legacy mixnodes and gateways")]
UnsupportedMigration,
+5 -26
View File
@@ -1,11 +1,11 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_bin_common::logging::{default_tracing_env_filter, default_tracing_fmt_layer};
use nym_bin_common::logging::default_tracing_env_filter;
use tracing_subscriber::filter::Directive;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};
// use tracing_subscriber::layer::SubscriberExt;
// use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
fn directive_checked(directive: impl Into<String>) -> anyhow::Result<Directive> {
@@ -20,25 +20,4 @@ pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
filter = filter.add_directive(directive_checked(format!("{crate_name}=warn"))?);
}
Ok(filter)
}
pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
let stderr_layer =
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
let console_layer = console_subscriber::spawn();
tracing_subscriber::registry()
.with(console_layer)
.with(stderr_layer)
.init();
} else {
tracing_subscriber::registry()
.with(stderr_layer)
.init();
}}
Ok(())
}
}
+1
View File
@@ -481,6 +481,7 @@ impl ConnectionHandler {
.await
}
#[instrument(skip_all)]
pub(crate) async fn handle_stream(
&mut self,
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
+3 -3
View File
@@ -4,7 +4,7 @@
use crate::node::mixnet::SharedData;
use nym_task::ShutdownToken;
use std::net::SocketAddr;
use tracing::{debug, error, info, trace};
use tracing::{debug, error, info, instrument, Instrument, trace};
pub(crate) struct Listener {
bind_address: SocketAddr,
@@ -18,7 +18,7 @@ impl Listener {
shared_data,
}
}
#[instrument(skip_all, level = "debug")]
pub(crate) async fn run(&mut self, shutdown: ShutdownToken) {
info!("attempting to run mixnet listener on {}", self.bind_address);
@@ -38,7 +38,7 @@ impl Listener {
trace!("mixnet listener: received shutdown");
break
}
connection = tcp_listener.accept() => {
connection = tcp_listener.accept().in_current_span() => {
self.shared_data.try_handle_connection(connection);
}
}
@@ -13,7 +13,7 @@ use nym_sphinx_forwarding::packet::MixPacket;
use nym_task::ShutdownToken;
use std::io;
use tokio::time::Instant;
use tracing::{debug, error, trace, warn};
use tracing::{debug, error, instrument, trace, warn};
pub(crate) mod global;
@@ -46,6 +46,7 @@ impl<C, F> PacketForwarder<C, F> {
self.packet_sender.clone()
}
#[instrument(skip_all)]
fn forward_packet(&mut self, packet: MixPacket)
where
C: SendWithoutResponse,
@@ -78,6 +79,7 @@ impl<C, F> PacketForwarder<C, F> {
self.forward_packet(delayed_packet);
}
#[instrument(skip_all)]
fn handle_new_packet(&mut self, new_packet: PacketToForward)
where
C: SendWithoutResponse,
@@ -120,6 +122,7 @@ impl<C, F> PacketForwarder<C, F> {
.update_packet_forwarder_queue_size(channel_size)
}
#[instrument(skip_all)]
pub async fn run(&mut self, shutdown_token: ShutdownToken)
where
C: SendWithoutResponse,
+3 -2
View File
@@ -23,7 +23,7 @@ use std::time::Duration;
use tokio::net::TcpStream;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tracing::{debug, error};
use tracing::{debug, error, instrument, Instrument};
pub(crate) mod final_hop;
@@ -164,6 +164,7 @@ impl SharedData {
}
}
#[instrument(skip_all)]
pub(super) fn try_handle_connection(
&self,
accepted: io::Result<(TcpStream, SocketAddr)>,
@@ -173,7 +174,7 @@ impl SharedData {
debug!("accepted incoming mixnet connection from: {remote_addr}");
let mut handler = ConnectionHandler::new(self, remote_addr);
let join_handle =
tokio::spawn(async move { handler.handle_connection(socket).await });
tokio::spawn(async move { handler.handle_connection(socket).in_current_span().await });
self.log_connected_clients();
Some(join_handle)
}
+9 -4
View File
@@ -66,7 +66,7 @@ use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, info, trace};
use tracing::{debug, info, instrument, Instrument, trace};
use zeroize::Zeroizing;
pub mod bonding_information;
@@ -601,6 +601,7 @@ impl NymNode {
})
}
#[instrument(skip_all)]
async fn start_gateway_tasks(
&mut self,
cached_network: CachedNetwork,
@@ -637,7 +638,7 @@ impl NymNode {
.build_websocket_listener(active_clients_store.clone())
.await?;
self.shutdown_tracker()
.try_spawn_named(async move { websocket.run().await }, "EntryWebsocket");
.try_spawn_named(async move { websocket.run().in_current_span().await }, "EntryWebsocket");
} else {
info!("node not running in entry mode: the websocket will remain closed");
}
@@ -680,7 +681,8 @@ impl NymNode {
let authenticator = gateway_tasks_builder
.build_wireguard_authenticator(topology_provider)
.await?;
let started_authenticator = authenticator.start_service_provider().await?;
let started_authenticator = authenticator.start_service_provider()
.await?;
active_clients_store.insert_embedded(started_authenticator.handle);
info!(
@@ -1050,6 +1052,7 @@ impl NymNode {
Ok(())
}
#[instrument(skip_all)]
pub(crate) async fn start_mixnet_listener<F>(
&self,
active_clients_store: &ActiveClientsStore,
@@ -1117,7 +1120,7 @@ impl NymNode {
let shutdown_token = self.shutdown_token();
self.shutdown_tracker().try_spawn_named(
async move { mixnet_listener.run(shutdown_token).await },
async move { mixnet_listener.run(shutdown_token).in_current_span().await },
"MixnetListener",
);
@@ -1146,6 +1149,7 @@ impl NymNode {
Ok(())
}
#[instrument(skip_all)]
async fn start_nym_node_tasks(mut self) -> Result<ShutdownManager, NymNodeError> {
info!("starting Nym Node {} with the following modes: mixnode: {}, entry: {}, exit: {}, wireguard: {}",
self.ed25519_identity_key(),
@@ -1219,6 +1223,7 @@ impl NymNode {
Ok(self.shutdown_manager)
}
#[instrument(skip_all)]
pub(crate) async fn run(mut self) -> Result<(), NymNodeError> {
let mut shutdown_signals = self.shutdown_manager.detach_shutdown_signals();
+1 -1
View File
@@ -20,7 +20,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing = { workspace = true }
url = { workspace = true }
nym-bin-common = { path = "../common/bin-common", features = ["output_format", "basic_tracing"] }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
nym-ecash-signer-check = { path = "../common/ecash-signer-check" }
nym-network-defaults = { path = "../common/network-defaults" }
nym-task = { path = "../common/task" }
+2 -2
View File
@@ -4,7 +4,7 @@
use crate::cli::Cli;
use clap::Parser;
use nym_bin_common::bin_info_owned;
use nym_bin_common::logging::setup_tracing_logger;
use nym_bin_common::logging::setup_no_otel_logger;
use tracing::{info, trace};
mod cli;
@@ -13,7 +13,7 @@ pub(crate) mod test_result;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
setup_tracing_logger();
setup_no_otel_logger().expect("failed to initialize logging");
let cli = Cli::parse();
trace!("args: {cli:#?}");
+2 -2
View File
@@ -32,8 +32,8 @@ humantime = { workspace = true }
humantime-serde.workspace = true
# internal
nym-bin-common = { path = "../common/bin-common", features = ["output_format", "basic_tracing"] }
nym-config = { path = "../common/config" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
nym-config = { path = "../common/config" }
nym-ecash-time = { path = "../common/ecash-time" }
nym-contracts-common = { path = "../common/cosmwasm-smart-contracts/contracts-common" }
nym-compact-ecash = { path = "../common/nym_offline_compact_ecash" }
+2 -2
View File
@@ -8,7 +8,7 @@
use crate::cli::Cli;
use clap::{crate_name, crate_version, Parser};
use nym_bin_common::logging::{maybe_print_banner, setup_tracing_logger};
use nym_bin_common::logging::{maybe_print_banner, setup_no_otel_logger};
use nym_network_defaults::setup_env;
pub mod cli;
@@ -25,7 +25,7 @@ async fn main() -> anyhow::Result<()> {
let args = Cli::parse();
setup_env(args.config_env_file.as_ref());
setup_tracing_logger();
setup_no_otel_logger().expect("failed to initialize logging");
if !args.no_banner {
maybe_print_banner(crate_name!(), crate_version!());
-9416
View File
File diff suppressed because it is too large Load Diff
@@ -13,5 +13,5 @@ clap = { version = "4.0", features = ["derive"] }
log = "0.4"
serde_json = "1.0.0"
nym-bin-common = { path = "../../common/bin-common", features = ["basic_tracing"] }
nym-bin-common = { path = "../../common/bin-common" }
nym-store-cipher = { path = "../../common/store-cipher" }
@@ -8,7 +8,7 @@
use anyhow::{anyhow, Result};
use clap::Parser;
use nym_bin_common::logging::setup_tracing_logger;
use nym_bin_common::logging::setup_no_otel_logger;
use nym_store_cipher::{
Aes256Gcm, Algorithm, EncryptedData, KdfInfo, Params, StoreCipher, Version, ARGON2_SALT_SIZE,
CURRENT_VERSION,
@@ -52,7 +52,7 @@ enum ParseMode {
}
fn main() -> Result<()> {
setup_tracing_logger();
setup_no_otel_logger().expect("failed to initialize logging");
let args = Args::parse();
let file = File::open(args.file)?;
let parse = if args.raw {
+1 -1
View File
@@ -13,7 +13,7 @@ crate-type = ["cdylib"]
tokio = { workspace = true, features = ["full"] }
# Nym clients, addressing, packet format, common tools (logging), ffi shared
nym-sdk = { path = "../../rust/nym-sdk/" }
nym-bin-common = { path = "../../../common/bin-common", features = ["basic_tracing"] }
nym-bin-common = { path = "../../../common/bin-common" }
nym-sphinx-anonymous-replies = { path = "../../../common/nymsphinx/anonymous-replies" }
nym-ffi-shared = { path = "../shared" }
lazy_static = { workspace = true }
+1 -1
View File
@@ -12,7 +12,7 @@ use crate::types::types::{CMessageCallback, CStringCallback, ReceivedMessage, St
#[no_mangle]
pub extern "C" fn init_logging() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
}
#[no_mangle]
+1 -1
View File
@@ -14,7 +14,7 @@ uniffi = { workspace = true, features = ["cli"] }
# Nym clients, addressing, packet format, common tools (logging), ffi shared
nym-sdk = { path = "../../rust/nym-sdk/" }
nym-crypto = { path = "../../../common/crypto" }
nym-bin-common = { path = "../../../common/bin-common", features = ["basic_tracing"] }
nym-bin-common = { path = "../../../common/bin-common" }
nym-sphinx-anonymous-replies = { path = "../../../common/nymsphinx/anonymous-replies" }
nym-ffi-shared = { path = "../shared" }
# Async runtime
+1 -1
View File
@@ -36,7 +36,7 @@ enum GoWrapError {
#[no_mangle]
fn init_logging() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
}
#[no_mangle]
+23 -6
View File
@@ -43,9 +43,7 @@ nym-socks5-requests = { path = "../../../common/socks5/requests" }
nym-ordered-buffer = { path = "../../../common/socks5/ordered-buffer" }
nym-service-providers-common = { path = "../../../service-providers/common" }
nym-sphinx-addressing = { path = "../../../common/nymsphinx/addressing" }
nym-bin-common = { path = "../../../common/bin-common", features = [
"basic_tracing",
] }
nym-bin-common = { path = "../../../common/bin-common" }
bytecodec = { workspace = true }
httpcodec = { workspace = true }
bytes = { workspace = true }
@@ -71,10 +69,20 @@ tokio-util.workspace = true
uuid = { workspace = true, features = ["v4", "serde"] }
bincode = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter"] }
# tracing.workspace = true
# tracing-subscriber = { workspace = true, features = ["env-filter"] }
dirs.workspace = true
opentelemetry = { workspace = true, optional = true}
opentelemetry_sdk = { workspace = true, optional = true }
tracing = { workspace = true, features = ["std"] }
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
tracing-core = { workspace = true }
tracing-opentelemetry = { workspace = true, optional = true }
opentelemetry-otlp = { workspace = true, optional = true }
opentelemetry-semantic-conventions = { workspace = true, optional = true }
[dev-dependencies]
anyhow = { workspace = true }
dotenvy = { workspace = true }
@@ -82,7 +90,7 @@ reqwest = { workspace = true, features = ["json", "socks"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
time = { workspace = true }
nym-bin-common = { path = "../../../common/bin-common", features = ["basic_tracing"] }
nym-bin-common = { path = "../../../common/bin-common" }
# extra dependencies for libp2p examples
#libp2p = { git = "https://github.com/ChainSafe/rust-libp2p.git", rev = "e3440d25681df380c9f0f8cfdcfd5ecc0a4f2fb6", features = [ "identify", "macros", "ping", "tokio", "tcp", "dns", "websocket", "noise", "mplex", "yamux", "gossipsub" ]}
@@ -93,3 +101,12 @@ hex = { workspace = true }
[features]
libp2p-vanilla = []
otel = [
"nym-bin-common/otel",
"nym-gateway-requests/otel",
"opentelemetry",
"opentelemetry_sdk",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",
"tracing-opentelemetry",
]
+1 -1
View File
@@ -6,7 +6,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// right now, only sandbox has coconut setup
// this should be run from the `sdk/rust/nym-sdk` directory
setup_env(Some("../../../envs/sandbox.env"));
+1 -1
View File
@@ -3,7 +3,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Create client builder, including ephemeral keys. The builder can be usable in the context
// where you don't want to connect just yet.
@@ -4,7 +4,7 @@ use std::path::PathBuf;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Specify some config options
let config_dir = PathBuf::from("/tmp/mixnet-client");
+1 -1
View File
@@ -3,7 +3,7 @@
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
todo!()
}
+1 -1
View File
@@ -12,7 +12,7 @@ use tokio::signal::ctrl_c;
// Run with: cargo run --example client_pool -- ../../../envs/<NETWORK>.env
#[tokio::main]
async fn main() -> Result<()> {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to setup logging");
setup_env(std::env::args().nth(1));
let conn_pool = ClientPool::new(2); // Start the Client Pool with 2 Clients always being kept in reserve
@@ -1,102 +1,104 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_sdk::mixnet;
use futures::StreamExt;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_topology::provider_trait::{async_trait, ToTopologyMetadata, TopologyProvider};
use nym_topology::{EpochRewardedSet, NymTopology};
use nym_validator_client::nym_api::NymApiClientExt;
use url::Url;
struct MyTopologyProvider {
validator_client: nym_http_api_client::Client,
}
impl MyTopologyProvider {
fn new(nym_api_url: Url) -> MyTopologyProvider {
let validator_client = nym_http_api_client::Client::builder(nym_api_url)
.expect("Failed to create API client builder")
.build()
.expect("Failed to build API client");
MyTopologyProvider { validator_client }
}
async fn get_topology(&self) -> NymTopology {
let rewarded_set = self
.validator_client
.get_current_rewarded_set()
.await
.unwrap();
let mixnodes_response = self
.validator_client
.get_all_basic_active_mixing_assigned_nodes_with_metadata()
.await
.unwrap();
let metadata = mixnodes_response.metadata.to_topology_metadata();
let epoch_rewarded_set: EpochRewardedSet = rewarded_set.into();
let mut base_topology = NymTopology::new(metadata, epoch_rewarded_set, Vec::new());
// in our topology provider only use mixnodes that have node_id divisible by 3
// and has exactly 100 performance score
// why? because this is just an example to showcase arbitrary uses and capabilities of this trait
let filtered_mixnodes = mixnodes_response
.nodes
.into_iter()
.filter(|mix| mix.node_id % 3 == 0 && mix.performance.is_hundred())
.collect::<Vec<_>>();
let gateways = self
.validator_client
.get_all_basic_entry_assigned_nodes_with_metadata()
.await
.unwrap()
.nodes;
base_topology.add_skimmed_nodes(&filtered_mixnodes);
base_topology.add_skimmed_nodes(&gateways);
base_topology
}
}
#[async_trait]
impl TopologyProvider for MyTopologyProvider {
// this will be manually refreshed on a timer specified inside mixnet client config
async fn get_new_topology(&mut self) -> Option<NymTopology> {
Some(self.get_topology().await)
}
}
use nym_sdk::{mixnet};
use nym_topology::{
CachedEpochRewardedSet, EntryDetails, HardcodedTopologyProvider, NymTopology,
NymTopologyMetadata, RoutingNode, SupportedRoles,
};
use std::time::Duration;
use time::OffsetDateTime;
use tokio::time::sleep;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to setup logging");
let nym_api = "https://validator.nymtech.net/api/".parse().unwrap();
let my_topology_provider = MyTopologyProvider::new(nym_api);
// let nym_api = "https://validator.nymtech.net/api/".parse().unwrap();
// let my_topology_provider = MyTopologyProvider::new(nym_api);
let topology_metadata = NymTopologyMetadata::new(0, 0, OffsetDateTime::now_utc());
let mut rewarded_set = CachedEpochRewardedSet::default();
rewarded_set.entry_gateways.insert(1);
rewarded_set.layer1.insert(1);
rewarded_set.layer2.insert(1);
rewarded_set.layer3.insert(1);
let nodes = vec![RoutingNode {
node_id: 1,
mix_host: "127.0.0.1:1789".parse().unwrap(),
entry: Some(EntryDetails {
ip_addresses: vec!["127.0.0.1".parse().unwrap()],
clients_ws_port: 9000,
hostname: None,
clients_wss_port: None,
}),
identity_key: "PUT IDENTITY KEY HERE"
.parse()
.unwrap(),
sphinx_key: "PUT SPHINX KEY HERE"
.parse()
.unwrap(),
supported_roles: SupportedRoles {
mixnode: true,
mixnet_entry: true,
mixnet_exit: true,
},
}];
let topology_provider =
HardcodedTopologyProvider::new(NymTopology::new(topology_metadata, rewarded_set, nodes));
// Passing no config makes the client fire up an ephemeral session and figure things out on its own
let mut client = mixnet::MixnetClientBuilder::new_ephemeral()
.custom_topology_provider(Box::new(my_topology_provider))
.custom_topology_provider(Box::new(topology_provider))
.build()
.unwrap()
.connect_to_mixnet()
.await
.unwrap();
let our_address = client.nym_address();
let our_address = *client.nym_address();
println!("Our client nym address is: {our_address}");
// Send a message through the mixnet to ourselves
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
let sender = client.split_sender();
println!("Waiting for message (ctrl-c to exit)");
client
.on_messages(|msg| println!("Received: {}", String::from_utf8_lossy(&msg.message)))
.await;
const MAX_MESSAGES: usize = 100;
// receiving task
let receiving_task_handle = tokio::spawn(async move {
let mut received_count = 0;
while let Some(received) = client.next().await {
received_count += 1;
println!(
"{received_count}: received: {}",
String::from_utf8_lossy(&received.message)
);
if received_count >= MAX_MESSAGES {
break;
}
}
client.disconnect().await;
});
// sending task
let sending_task_handle = tokio::spawn(async move {
loop {
if sender
.send_plain_message(our_address, "hello there")
.await
.is_err()
{
break;
}
sleep(Duration::from_millis(100)).await;
}
});
// wait for both tasks to be done
println!("waiting for shutdown");
sending_task_handle.await.unwrap();
receiving_task_handle.await.unwrap();
}
@@ -0,0 +1,115 @@
use nym_sdk::mixnet::{
AnonymousSenderTag, MixnetClientBuilder, MixnetMessageSender, ReconstructedMessage,
};
use nym_topology::{CachedEpochRewardedSet, EntryDetails, HardcodedTopologyProvider, NymTopology, NymTopologyMetadata, RoutingNode, SupportedRoles};
#[cfg(feature = "otel")]
use opentelemetry::trace::{TraceContextExt, Tracer};
#[cfg(feature = "otel")]
use opentelemetry::{global, Context};
use time::OffsetDateTime;
use tracing::warn;
use tracing::instrument;
#[tokio::main]
#[instrument(name = "sdk-example-surb-reply", skip_all)]
async fn main() {
#[cfg(feature = "otel")]
{
nym_bin_common::opentelemetry::setup_tracing_logger("local-sdk-example-surb-reply".to_string()).unwrap();
let tracer = global::tracer("local-sdk-example-surb-reply");
let span = tracer.start("client-root-span");
let cx = Context::current_with_span(span);
let _guard = cx.clone().attach();
let trace_id = cx.span().span_context().trace_id();
warn!("Main TRACE_ID: {:?}", trace_id);
}
#[cfg(not(feature = "otel"))]
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Create a mixnet client which connect to a local node
let topology_metadata = NymTopologyMetadata::new(0, 0, OffsetDateTime::now_utc());
let mut rewarded_set = CachedEpochRewardedSet::default();
rewarded_set.entry_gateways.insert(1);
rewarded_set.layer1.insert(1);
rewarded_set.layer2.insert(1);
rewarded_set.layer3.insert(1);
let nodes = vec![RoutingNode {
node_id: 1,
mix_host: "127.0.0.1:1789".parse().unwrap(),
entry: Some(EntryDetails {
ip_addresses: vec!["127.0.0.1".parse().unwrap()],
clients_ws_port: 9000,
hostname: None,
clients_wss_port: None,
}),
identity_key: "put here identity_key"
.parse()
.unwrap(),
sphinx_key: "put here sphinx_key"
.parse()
.unwrap(),
supported_roles: SupportedRoles {
mixnode: true,
mixnet_entry: true,
mixnet_exit: true,
},
}];
let topology_provider =
HardcodedTopologyProvider::new(NymTopology::new(topology_metadata, rewarded_set, nodes));
let mut client = MixnetClientBuilder::new_ephemeral()
.custom_topology_provider(Box::new(topology_provider))
.build().unwrap()
.connect_to_mixnet()
.await
.unwrap();
let our_address = client.nym_address();
println!("\nOur client nym address is: {our_address}");
// Send a message through the mixnet to ourselves using our nym address
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
// we're going to parse the sender_tag (AnonymousSenderTag) from the incoming message and use it to 'reply' to ourselves instead of our Nym address.
// we know there will be a sender_tag since the sdk sends SURBs along with messages by default.
println!("Waiting for message\n");
// get the actual message - discard the empty vec sent along with a potential SURB topup request
let mut message: Vec<ReconstructedMessage> = Vec::new();
while let Some(new_message) = client.wait_for_messages().await {
if new_message.is_empty() {
continue;
}
message = new_message;
break;
}
let mut parsed = String::new();
if let Some(r) = message.first() {
parsed = String::from_utf8(r.message.clone()).unwrap();
}
// parse sender_tag: we will use this to reply to sender without needing their Nym address
let return_recipient: AnonymousSenderTag = message[0].sender_tag.unwrap();
println!(
"\nReceived the following message: {parsed} \nfrom sender with surb bucket {return_recipient}"
);
// reply to self with it: note we use `send_str_reply` instead of `send_str`
println!("Replying with using SURBs");
client
.send_reply(return_recipient, "hi an0n!")
.await
.unwrap();
println!("Waiting for message (once you see it, ctrl-c to exit)\n");
client
.on_messages(|msg| println!("\nReceived: {}", String::from_utf8_lossy(&msg.message)))
.await;
}
@@ -8,7 +8,7 @@ use nym_topology::provider_trait::async_trait;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Just some plain data to pretend we have some external storage that the application
// implementer is using.
@@ -7,7 +7,7 @@ use nym_topology::{NymTopology, NymTopologyMetadata, RoutingNode, SupportedRoles
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Passing no config makes the client fire up an ephemeral session and figure shit out on its own
let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
@@ -7,7 +7,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Passing no config makes the client fire up an ephemeral session and figure stuff out on its own
let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
+1 -1
View File
@@ -6,7 +6,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
// An example of creating a client relying on a testnet, in this case Sandbox.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// relative root is `sdk/rust/nym-sdk/` for fallback file path
let env_path =
std::env::var("NYM_ENV_PATH").unwrap_or_else(|_| "../../../envs/sandbox.env".to_string());
+1 -1
View File
@@ -3,7 +3,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// Passing no config makes the client fire up an ephemeral session and figure shit out on its own
// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
+1 -1
View File
@@ -2,7 +2,7 @@ use nym_sdk::mixnet;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
println!("Connecting receiver");
let mut receiving_client = mixnet::MixnetClient::connect_new().await.unwrap();
+241 -56
View File
@@ -1,72 +1,257 @@
use nym_sdk::mixnet::{
AnonymousSenderTag, MixnetClientBuilder, MixnetMessageSender, ReconstructedMessage,
StoragePaths,
};
use std::path::PathBuf;
use tempfile::TempDir;
use nym_sdk::DebugConfig;
#[cfg(feature = "otel")]
use opentelemetry::trace::TraceContextExt;
#[cfg(feature = "otel")]
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing::warn;
use tracing::instrument;
#[cfg(feature = "otel")]
use tracing::Instrument;
#[tokio::main]
#[instrument(name = "sdk-example-surb-reply", skip_all)]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Setup OpenTelemetry tracing
#[cfg(feature = "otel")]
{
let _guard = nym_bin_common::opentelemetry::setup_tracing_logger("sdk-example-surb-reply".to_string()).unwrap();
let main_span = tracing::info_span!("startup", service = "sdk-example-surb-reply");
async {
let tracing_context = tracing::Span::current();
tracing::info!("Current tracing context: {:?}", tracing_context);
let cx = tracing::Span::current().context();
let sc = cx.span();
let spcx = sc.span_context();
tracing::debug!("Current OTEL context: {:?}, trace_id: {:?}", cx, spcx.trace_id());
// Specify some config options
let config_dir: PathBuf = TempDir::new().unwrap().path().to_path_buf();
let storage_paths = StoragePaths::new_from_dir(&config_dir).unwrap();
// Ignore performance requirements for the sake of the example
let mut debug_config = DebugConfig::default();
debug_config.cover_traffic.disable_loop_cover_traffic_stream = true;
debug_config
.traffic
.disable_main_poisson_packet_distribution = true;
// Create the client with a storage backend, and enable it by giving it some paths. If keys
// exists at these paths, they will be loaded, otherwise they will be generated.
let client = MixnetClientBuilder::new_with_default_storage(storage_paths)
.await
.unwrap()
.build()
.unwrap();
debug_config.topology.minimum_mixnode_performance = 0;
debug_config.topology.minimum_gateway_performance = 0;
// Now we connect to the mixnet, using keys now stored in the paths provided.
let mut client = client.connect_to_mixnet().await.unwrap();
// Create a mixnet client which connect to a specific node
let client_builder = MixnetClientBuilder::new_ephemeral();
let mixnet_client = client_builder
.debug_config(debug_config)
.request_gateway("FtR9Mb9y9EViYU3at6Qf7MzNHaMw8gofMicwqoscMBMP".to_string())
.build()
.unwrap();
// Be able to get our client address
let our_address = client.nym_address();
println!("\nOur client nym address is: {our_address}");
// Now we connect to the mixnet, using keys now stored in the paths provided.
// let mut client = client.connect_to_mixnet().await.unwrap();
// Send a message through the mixnet to ourselves using our nym address
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
let mut client = mixnet_client.connect_to_mixnet().await.unwrap();
// we're going to parse the sender_tag (AnonymousSenderTag) from the incoming message and use it to 'reply' to ourselves instead of our Nym address.
// we know there will be a sender_tag since the sdk sends SURBs along with messages by default.
println!("Waiting for message\n");
// Be able to get our client address
let our_address = client.nym_address();
println!("\nOur client nym address is: {our_address}");
// get the actual message - discard the empty vec sent along with a potential SURB topup request
let mut message: Vec<ReconstructedMessage> = Vec::new();
while let Some(new_message) = client.wait_for_messages().await {
if new_message.is_empty() {
continue;
// Send a message through the mixnet to ourselves using our nym address
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
// we're going to parse the sender_tag (AnonymousSenderTag) from the incoming message and use it to 'reply' to ourselves instead of our Nym address.
// we know there will be a sender_tag since the sdk sends SURBs along with messages by default.
println!("Waiting for message\n");
// get the actual message - discard the empty vec sent along with a potential SURB topup request
let mut message: Vec<ReconstructedMessage> = Vec::new();
while let Some(new_message) = client.wait_for_messages().await {
if new_message.is_empty() {
continue;
}
message = new_message;
break;
}
let mut parsed = String::new();
if let Some(r) = message.first() {
parsed = String::from_utf8(r.message.clone()).unwrap();
}
// parse sender_tag: we will use this to reply to sender without needing their Nym address
let return_recipient: AnonymousSenderTag = message[0].sender_tag.unwrap();
println!(
"\nReceived the following message: {parsed} \nfrom sender with surb bucket {return_recipient}"
);
// reply to self with it: note we use `send_str_reply` instead of `send_str`
println!("Replying with using SURBs");
client
.send_reply(return_recipient, "hi an0n!")
.await
.unwrap();
println!("Waiting for message (once you see it, ctrl-c to exit)\n");
client
.on_messages(|msg| println!("\nReceived: {}", String::from_utf8_lossy(&msg.message)))
.await;
}.instrument(main_span).await;
}
// due to the way instrumentation works in async contexts, the totality of the async code we want to trace needs to be under the same block
// This is ugly and unfortunate but cannot think of another way around it right now.
#[cfg(not(feature = "otel"))]
{
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
let mut debug_config = DebugConfig::default();
debug_config.cover_traffic.disable_loop_cover_traffic_stream = true;
debug_config
.traffic
.disable_main_poisson_packet_distribution = true;
debug_config.topology.minimum_mixnode_performance = 0;
debug_config.topology.minimum_gateway_performance = 0;
let client_builder = MixnetClientBuilder::new_ephemeral();
let mixnet_client = client_builder
.debug_config(debug_config)
.request_gateway("FtR9Mb9y9EViYU3at6Qf7MzNHaMw8gofMicwqoscMBMP".to_string())
.build()
.unwrap();
let mut client = mixnet_client.connect_to_mixnet().await.unwrap();
let our_address = client.nym_address();
println!("\nOur client nym address is: {our_address}");
client
.send_plain_message(*our_address, "hello there")
.await
.unwrap();
println!("Waiting for message\n");
let mut message: Vec<ReconstructedMessage> = Vec::new();
while let Some(new_message) = client.wait_for_messages().await {
if new_message.is_empty() {
continue;
}
message = new_message;
break;
}
message = new_message;
break;
let mut parsed = String::new();
if let Some(r) = message.first() {
parsed = String::from_utf8(r.message.clone()).unwrap();
}
let return_recipient: AnonymousSenderTag = message[0].sender_tag.unwrap();
println!(
"\nReceived the following message: {parsed} \nfrom sender with surb bucket {return_recipient}"
);
println!("Replying with using SURBs");
client
.send_reply(return_recipient, "hi an0n!")
.await
.unwrap();
println!("Waiting for message (once you see it, ctrl-c to exit)\n");
client
.on_messages(|msg| println!("\nReceived: {}", String::from_utf8_lossy(&msg.message)))
.await;
}
let mut parsed = String::new();
if let Some(r) = message.first() {
parsed = String::from_utf8(r.message.clone()).unwrap();
}
// parse sender_tag: we will use this to reply to sender without needing their Nym address
let return_recipient: AnonymousSenderTag = message[0].sender_tag.unwrap();
println!(
"\nReceived the following message: {parsed} \nfrom sender with surb bucket {return_recipient}"
);
// reply to self with it: note we use `send_str_reply` instead of `send_str`
println!("Replying with using SURBs");
client
.send_reply(return_recipient, "hi an0n!")
.await
.unwrap();
println!("Waiting for message (once you see it, ctrl-c to exit)\n");
client
.on_messages(|msg| println!("\nReceived: {}", String::from_utf8_lossy(&msg.message)))
.await;
}
// #[tokio::main]
// #[instrument(name = "sdk-example-surb-reply", skip_all)]
// async fn main() {
// // Setup OpenTelemetry tracing
// #[cfg(feature = "otel")]
// let _guard = nym_bin_common::opentelemetry::setup_tracing_logger("sdk-example-surb-reply".to_string()).unwrap();
// #[cfg(feature = "otel")]
// let main_span = tracing::info_span!("startup", service = "sdk-example-surb-reply");
// #[cfg(feature = "otel")]
// let _main_span_guard = main_span.enter();
// #[cfg(feature = "otel")]
// let tracing_context = tracing::Span::current();
// #[cfg(feature = "otel")]
// tracing::info!("Current tracing context: {:?}", tracing_context);
// #[cfg(feature = "otel")]
// let otel_context = Context::current();
// #[cfg(feature = "otel")]
// tracing::info!("Current OTEL context: {:?}", otel_context);
// #[cfg(not(feature = "otel"))]
// nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
// // Ignore performance requirements for the sake of the example
// let mut debug_config = DebugConfig::default();
// debug_config.cover_traffic.disable_loop_cover_traffic_stream = true;
// debug_config
// .traffic
// .disable_main_poisson_packet_distribution = true;
// debug_config.topology.minimum_mixnode_performance = 0;
// debug_config.topology.minimum_gateway_performance = 0;
// // Create a mixnet client which connect to a specific node
// let client_builder = MixnetClientBuilder::new_ephemeral();
// let mixnet_client = client_builder
// .debug_config(debug_config)
// .request_gateway("FtR9Mb9y9EViYU3at6Qf7MzNHaMw8gofMicwqoscMBMP".to_string())
// .build()
// .unwrap();
// // Now we connect to the mixnet, using keys now stored in the paths provided.
// // let mut client = client.connect_to_mixnet().await.unwrap();
// let mut client = mixnet_client.connect_to_mixnet().await.unwrap();
// // Be able to get our client address
// let our_address = client.nym_address();
// println!("\nOur client nym address is: {our_address}");
// // Send a message through the mixnet to ourselves using our nym address
// client
// .send_plain_message(*our_address, "hello there")
// .await
// .unwrap();
// // we're going to parse the sender_tag (AnonymousSenderTag) from the incoming message and use it to 'reply' to ourselves instead of our Nym address.
// // we know there will be a sender_tag since the sdk sends SURBs along with messages by default.
// println!("Waiting for message\n");
// // get the actual message - discard the empty vec sent along with a potential SURB topup request
// let mut message: Vec<ReconstructedMessage> = Vec::new();
// while let Some(new_message) = client.wait_for_messages().await {
// if new_message.is_empty() {
// continue;
// }
// message = new_message;
// break;
// }
// let mut parsed = String::new();
// if let Some(r) = message.first() {
// parsed = String::from_utf8(r.message.clone()).unwrap();
// }
// // parse sender_tag: we will use this to reply to sender without needing their Nym address
// let return_recipient: AnonymousSenderTag = message[0].sender_tag.unwrap();
// println!(
// "\nReceived the following message: {parsed} \nfrom sender with surb bucket {return_recipient}"
// );
// // reply to self with it: note we use `send_str_reply` instead of `send_str`
// println!("Replying with using SURBs");
// client
// .send_reply(return_recipient, "hi an0n!")
// .await
// .unwrap();
// println!("Waiting for message (once you see it, ctrl-c to exit)\n");
// client
// .on_messages(|msg| println!("\nReceived: {}", String::from_utf8_lossy(&msg.message)))
// .await;
// }
+28 -12
View File
@@ -31,6 +31,7 @@ use nym_crypto::hkdf::DerivationMaterial;
use nym_socks5_client_core::config::Socks5;
use nym_task::ShutdownTracker;
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::RoutingNode;
use nym_validator_client::{nyxd, QueryHttpRpcNyxdClient, UserAgent};
use rand::rngs::OsRng;
use std::path::Path;
@@ -38,6 +39,7 @@ use std::path::PathBuf;
#[cfg(unix)]
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use url::Url;
use zeroize::Zeroizing;
@@ -71,6 +73,7 @@ pub struct MixnetClientBuilder<S: MixnetClientStorage = Ephemeral> {
impl MixnetClientBuilder<Ephemeral> {
/// Creates a client builder with ephemeral storage.
#[must_use]
#[instrument(name = "MixnetClientBuilder::new_ephemeral", skip_all)]
pub fn new_ephemeral() -> Self {
MixnetClientBuilder {
..Default::default()
@@ -79,6 +82,7 @@ impl MixnetClientBuilder<Ephemeral> {
/// Create a client builder with default values.
#[must_use]
#[instrument(name = "MixnetClientBuilder::new", skip_all)]
pub fn new() -> Self {
Self::new_ephemeral()
}
@@ -537,27 +541,37 @@ where
}
}
async fn new_gateway_setup(&self) -> Result<GatewaySetup, ClientCoreError> {
/// Attempt to retrieve list of all gateways available for registration
async fn available_gateways(&mut self) -> Result<Vec<RoutingNode>, ClientCoreError> {
if let Some(ref mut custom_provider) = self.custom_topology_provider {
if let Some(topology) = custom_provider.get_new_topology().await {
return Ok(topology.entry_gateways().cloned().collect());
}
}
let nym_api_endpoints = self.get_api_endpoints();
let selection_spec = GatewaySelectionSpecification::new(
self.config.user_chosen_gateway.clone(),
None,
self.force_tls,
);
let user_agent = self.user_agent.clone();
let topology_cfg = &self.config.debug_config.topology;
let user_agent = self.user_agent.clone();
let mut rng = OsRng;
let available_gateways = gateways_for_init(
gateways_for_init(
&mut rng,
&nym_api_endpoints,
user_agent,
topology_cfg.minimum_gateway_performance,
topology_cfg.ignore_ingress_epoch_role,
)
.await?;
.await
}
async fn new_gateway_setup(&mut self) -> Result<GatewaySetup, ClientCoreError> {
let selection_spec = GatewaySelectionSpecification::new(
self.config.user_chosen_gateway.clone(),
None,
self.force_tls,
);
let available_gateways = self.available_gateways().await?;
Ok(GatewaySetup::New {
specification: selection_spec,
@@ -665,6 +679,7 @@ where
)
}
#[instrument(skip_all)]
async fn connect_to_mixnet_common(mut self) -> Result<(BaseClient, Recipient)> {
self.setup_client_keys().await?;
self.setup_gateway().await?;
@@ -801,6 +816,7 @@ where
/// let client = client.connect_to_mixnet().await.unwrap();
/// }
/// ```
#[instrument(skip_all)]
pub async fn connect_to_mixnet(self) -> Result<MixnetClient> {
if self.socks5_config.is_some() {
return Err(Error::Socks5Config { set: true });
@@ -24,6 +24,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::RwLockReadGuard;
use tracing::instrument;
/// Client connected to the Nym mixnet.
pub struct MixnetClient {
@@ -64,6 +65,7 @@ pub struct MixnetClient {
impl MixnetClient {
#[allow(clippy::too_many_arguments)]
#[instrument(name = "MixnetClient::new", skip_all)]
pub(crate) fn new(
nym_address: Recipient,
identity_keys: Arc<ed25519::KeyPair>,
+2
View File
@@ -59,6 +59,7 @@ impl MixnetMessageSinkTranslator for DefaultMixnetMessageSinkTranslator {
bytes,
self.lane,
self.packet_type,
None
)),
IncludedSurbs::Amount(surbs) => Ok(InputMessage::new_anonymous(
**recipient,
@@ -66,6 +67,7 @@ impl MixnetMessageSinkTranslator for DefaultMixnetMessageSinkTranslator {
*surbs,
self.lane,
self.packet_type,
None
)),
},
SinkDestination::Reply(tag) => Ok(InputMessage::new_reply(
+22
View File
@@ -5,8 +5,14 @@ use crate::mixnet::{AnonymousSenderTag, IncludedSurbs, Recipient};
use crate::Result;
use async_trait::async_trait;
use nym_client_core::client::inbound_messages::InputMessage;
#[cfg(feature = "otel")]
use nym_bin_common::opentelemetry::{
compact_id_generator::compress_trace_id,
context::extract_trace_id_from_tracing_cx,
};
use nym_sphinx::params::PacketType;
use nym_task::connections::TransmissionLane;
use tracing::instrument;
// defined to guarantee common interface regardless of whether you're using the full client
// or just the sending handler
@@ -35,6 +41,7 @@ pub trait MixnetMessageSender {
/// client.send_plain_message(recipient, "hi").await.unwrap();
/// }
/// ```
#[instrument(skip_all)]
async fn send_plain_message<M>(&self, address: Recipient, message: M) -> Result<()>
where
M: AsRef<[u8]> + Send,
@@ -60,6 +67,7 @@ pub trait MixnetMessageSender {
/// client.send_message(recipient, "hi".to_owned().into_bytes(), surbs).await.unwrap();
/// }
/// ```
#[instrument(skip_all)]
async fn send_message<M>(
&self,
address: Recipient,
@@ -69,6 +77,17 @@ pub trait MixnetMessageSender {
where
M: AsRef<[u8]> + Send,
{
// In case of surb opentelemetry tracing, we extract the context and trace_id
// in the 12 bytes format
#[cfg(feature = "otel")]
let trace_id = {
let trace_id = extract_trace_id_from_tracing_cx();
tracing::info!("[DEBUG] Trace id in send_message: {:?}", trace_id);
Some(compress_trace_id(&trace_id))
};
#[cfg(not(feature = "otel"))]
let trace_id = None;
let lane = TransmissionLane::General;
let input_msg = match surbs {
IncludedSurbs::Amount(surbs) => InputMessage::new_anonymous(
@@ -77,12 +96,14 @@ pub trait MixnetMessageSender {
surbs,
lane,
self.packet_type(),
trace_id,
),
IncludedSurbs::ExposeSelfAddress => InputMessage::new_regular(
address,
message.as_ref().to_vec(),
lane,
self.packet_type(),
trace_id,
),
};
self.send(input_msg).await
@@ -103,6 +124,7 @@ pub trait MixnetMessageSender {
/// client.send_reply(tag, b"hi").await.unwrap();
/// }
/// ```
#[instrument(skip_all)]
async fn send_reply<M>(&self, recipient_tag: AnonymousSenderTag, message: M) -> Result<()>
where
M: AsRef<[u8]> + Send,
@@ -32,7 +32,7 @@ struct Args {
#[tokio::main]
async fn main() -> Result<()> {
nym_bin_common::logging::setup_tracing_logger();
nym_bin_common::logging::setup_no_otel_logger().expect("failed to setup logging");
let args = Args::parse();
let nym_addr: Recipient =

Some files were not shown because too many files have changed in this diff Show More