Compare commits
70 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8aa5e61ba6 | |||
| 3bda29aed7 | |||
| 69bba68060 | |||
| c2a06298d1 | |||
| 2831f4ae55 | |||
| 2344a43f62 | |||
| 8427b09ed7 | |||
| ec09c7e807 | |||
| 6eab87fe05 | |||
| 9c13fe534b | |||
| ad431e8b09 | |||
| b292c4a624 | |||
| 9a96be3d41 | |||
| 8e771c3530 | |||
| 47e1e912f5 | |||
| 4116bb321a | |||
| 60cde66451 | |||
| be37715552 | |||
| f05c5d2eae | |||
| e9cbd0aff5 | |||
| 36b07b1bcf | |||
| 827591631d | |||
| 682feac75e | |||
| 28058944f2 | |||
| ca9b1d656c | |||
| 2955e506de | |||
| 254a414c49 | |||
| 7d087996b4 | |||
| c5e66687b6 | |||
| faaafbeae4 | |||
| 32d7328c7b | |||
| 6ba20e7f54 | |||
| 74c064135b | |||
| 1fef24a6ba | |||
| 2ae73b7f31 | |||
| b93779850b | |||
| 651d0bb578 | |||
| 4af1835dbd | |||
| f1bf9861cd | |||
| 15f56bccb1 | |||
| 26da5da44f | |||
| b9a2efe0c9 | |||
| 8492c7161c | |||
| 76770fa30c | |||
| 9d774860e5 | |||
| 7d7bebacd2 | |||
| cfe8c08ca6 | |||
| 9d8c328ce1 | |||
| c4543c83d5 | |||
| 1fa615991b | |||
| 9f3061a2eb | |||
| 953f930c4b | |||
| c2b620de92 | |||
| d55fce1917 | |||
| bda45b405c | |||
| bb9f0540e8 | |||
| e3569ccc16 | |||
| 5c9d58bad3 | |||
| 6c03ab69fb | |||
| 89329ab3ee | |||
| fee2ae5964 | |||
| f56edf63b5 | |||
| dacd35470d | |||
| acc11e8776 | |||
| a11de8eb74 | |||
| fa603be538 | |||
| 2447520010 | |||
| 5888102359 | |||
| 7d7acc2691 | |||
| 20e6243738 |
Generated
+1106
-1188
File diff suppressed because it is too large
Load Diff
+8
-3
@@ -300,8 +300,11 @@ nix = "0.27.1"
|
||||
notify = "5.1.0"
|
||||
okapi = "0.7.0"
|
||||
once_cell = "1.21.3"
|
||||
opentelemetry = "0.19.0"
|
||||
opentelemetry-jaeger = "0.18.0"
|
||||
opentelemetry = "0.30.0"
|
||||
opentelemetry-otlp = "0.30.0"
|
||||
opentelemetry-semantic-conventions = "0.30.0"
|
||||
opentelemetry_sdk = "0.30.0"
|
||||
opentelemetry-stdout = "0.30.0"
|
||||
parking_lot = "0.12.3"
|
||||
pem = "0.8"
|
||||
petgraph = "0.6.5"
|
||||
@@ -360,8 +363,10 @@ toml = "0.8.22"
|
||||
tower = "0.5.2"
|
||||
tower-http = "0.5.2"
|
||||
tracing = "0.1.41"
|
||||
tracing-core = "0.1.33"
|
||||
tracing-log = "0.2"
|
||||
tracing-opentelemetry = "0.19.0"
|
||||
tracing-opentelemetry = "0.31.0"
|
||||
tracing-serde = "0.2.0"
|
||||
tracing-subscriber = "0.3.19"
|
||||
tracing-tree = "0.2.2"
|
||||
tracing-indicatif = "0.3.9"
|
||||
|
||||
@@ -46,7 +46,6 @@ nym-bandwidth-controller = { path = "../../common/bandwidth-controller" }
|
||||
nym-bin-common = { path = "../../common/bin-common", features = [
|
||||
"output_format",
|
||||
"clap",
|
||||
"basic_tracing",
|
||||
] }
|
||||
nym-client-core = { path = "../../common/client-core", features = [
|
||||
"fs-credentials-storage",
|
||||
@@ -71,3 +70,11 @@ nym-client-websocket-requests = { path = "websocket-requests" }
|
||||
nym-id = { path = "../../common/nym-id" }
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
[features]
|
||||
default = []
|
||||
otel = [
|
||||
"nym-client-core/otel",
|
||||
"nym-bin-common/otel",
|
||||
"nym-gateway-requests/otel",
|
||||
]
|
||||
@@ -4,7 +4,7 @@
|
||||
use std::error::Error;
|
||||
|
||||
use clap::{crate_name, crate_version, Parser};
|
||||
use nym_bin_common::logging::{maybe_print_banner, setup_tracing_logger};
|
||||
use nym_bin_common::logging::{maybe_print_banner, setup_no_otel_logger};
|
||||
use nym_network_defaults::setup_env;
|
||||
|
||||
pub mod client;
|
||||
@@ -20,7 +20,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
if !args.no_banner {
|
||||
maybe_print_banner(crate_name!(), crate_version!());
|
||||
}
|
||||
setup_tracing_logger();
|
||||
setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
if let Err(err) = commands::execute(args).await {
|
||||
log::error!("{err}");
|
||||
|
||||
@@ -184,7 +184,14 @@ impl Handler {
|
||||
});
|
||||
|
||||
// the ack control is now responsible for chunking, etc.
|
||||
let input_msg = InputMessage::new_regular(recipient, message, lane, self.packet_type);
|
||||
let input_msg = InputMessage::new_regular(
|
||||
recipient,
|
||||
message,
|
||||
lane,
|
||||
self.packet_type,
|
||||
#[cfg(feature = "otel")]
|
||||
None,
|
||||
);
|
||||
if let Err(err) = self.msg_input.send(input_msg).await {
|
||||
if !self.shutdown_token.is_cancelled() {
|
||||
error!("Failed to send message to the input buffer: {err}");
|
||||
@@ -216,8 +223,15 @@ impl Handler {
|
||||
TransmissionLane::ConnectionId(id)
|
||||
});
|
||||
|
||||
let input_msg =
|
||||
InputMessage::new_anonymous(recipient, message, reply_surbs, lane, self.packet_type);
|
||||
let input_msg = InputMessage::new_anonymous(
|
||||
recipient,
|
||||
message,
|
||||
reply_surbs,
|
||||
lane,
|
||||
self.packet_type,
|
||||
#[cfg(feature = "otel")]
|
||||
None,
|
||||
);
|
||||
if let Err(err) = self.msg_input.send(input_msg).await {
|
||||
if !self.shutdown_token.is_cancelled() {
|
||||
error!("Failed to send anonymous message to the input buffer: {err}");
|
||||
|
||||
@@ -27,7 +27,6 @@ zeroize = { workspace = true }
|
||||
nym-bin-common = { path = "../../common/bin-common", features = [
|
||||
"output_format",
|
||||
"clap",
|
||||
"basic_tracing",
|
||||
] }
|
||||
nym-client-core = { path = "../../common/client-core", features = [
|
||||
"fs-credentials-storage",
|
||||
@@ -54,3 +53,7 @@ nym-validator-client = { path = "../../common/client-libs/validator-client", fea
|
||||
[features]
|
||||
default = []
|
||||
eth = []
|
||||
otel = [
|
||||
"nym-socks5-client-core/otel",
|
||||
"nym-bin-common/otel",
|
||||
]
|
||||
@@ -4,7 +4,7 @@
|
||||
use std::error::Error;
|
||||
|
||||
use clap::{crate_name, crate_version, Parser};
|
||||
use nym_bin_common::logging::{maybe_print_banner, setup_tracing_logger};
|
||||
use nym_bin_common::logging::{maybe_print_banner, setup_no_otel_logger};
|
||||
use nym_network_defaults::setup_env;
|
||||
|
||||
mod commands;
|
||||
@@ -19,7 +19,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
if !args.no_banner {
|
||||
maybe_print_banner(crate_name!(), crate_version!());
|
||||
}
|
||||
setup_tracing_logger();
|
||||
setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
if let Err(err) = commands::execute(args).await {
|
||||
log::error!("{err}");
|
||||
|
||||
@@ -8,24 +8,30 @@ license = { workspace = true }
|
||||
repository = { workspace = true }
|
||||
|
||||
[dependencies]
|
||||
chrono = { workspace = true, optional = true }
|
||||
cfg-if = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"], optional = true }
|
||||
clap_complete = { workspace = true, optional = true }
|
||||
clap_complete_fig = { workspace = true, optional = true }
|
||||
const-str = { workspace = true }
|
||||
log = { workspace = true }
|
||||
opentelemetry = { workspace = true, optional = true }
|
||||
opentelemetry-otlp = { workspace = true,features=["metrics", "grpc-tonic", "tls",
|
||||
"tls-webpki-roots"], optional = true }
|
||||
opentelemetry-semantic-conventions = { workspace = true, features = ["semconv_experimental"], optional = true }
|
||||
opentelemetry-stdout = { workspace = true, features = ["trace", "metrics"], optional = true }
|
||||
opentelemetry_sdk = { workspace = true, optional = true }
|
||||
rand = { workspace = true, optional = true }
|
||||
schemars = { workspace = true, features = ["preserve_order"], optional = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true, optional = true }
|
||||
|
||||
## tracing
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter"], optional = true }
|
||||
tracing-tree = { workspace = true, optional = true }
|
||||
tracing = { workspace = true, optional = true }
|
||||
opentelemetry-jaeger = { workspace = true, features = ["rt-tokio", "collector_client", "isahc_collector_client"], optional = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-core = { workspace = true }
|
||||
tracing-opentelemetry = { workspace = true, optional = true }
|
||||
tracing-serde = { workspace = true }
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
|
||||
tracing-tree = { workspace = true }
|
||||
utoipa = { workspace = true, optional = true }
|
||||
opentelemetry = { workspace = true, features = ["rt-tokio"], optional = true }
|
||||
|
||||
|
||||
[build-dependencies]
|
||||
vergen = { workspace = true, features = ["build", "git", "gitcl", "rustc", "cargo"] }
|
||||
@@ -35,13 +41,17 @@ default = []
|
||||
openapi = ["utoipa"]
|
||||
output_format = ["serde_json", "dep:clap"]
|
||||
bin_info_schema = ["schemars"]
|
||||
basic_tracing = ["dep:tracing", "tracing-subscriber"]
|
||||
tracing = [
|
||||
"basic_tracing",
|
||||
"tracing-tree",
|
||||
"opentelemetry-jaeger",
|
||||
tokio-console = ["otel"]
|
||||
otel = [
|
||||
"chrono",
|
||||
"tracing-opentelemetry",
|
||||
"opentelemetry",
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"opentelemetry-stdout",
|
||||
"opentelemetry_sdk",
|
||||
"serde_json",
|
||||
"rand",
|
||||
]
|
||||
clap = ["dep:clap", "dep:clap_complete", "dep:clap_complete_fig"]
|
||||
models = []
|
||||
|
||||
@@ -4,6 +4,9 @@
|
||||
pub mod build_information;
|
||||
pub mod logging;
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
pub mod opentelemetry;
|
||||
|
||||
#[cfg(feature = "clap")]
|
||||
pub mod completions;
|
||||
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
#[cfg(feature = "otel")]
|
||||
use opentelemetry_otlp::ExporterBuildError;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum TracingError {
|
||||
#[error("tracing logger already initialised")]
|
||||
TracingLoggerAlreadyInitialised,
|
||||
|
||||
#[error("Logging error: {0}")]
|
||||
TracingTryInitError(tracing_subscriber::util::TryInitError),
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
#[error("{0}")]
|
||||
TracingExporterBuildError(#[from] ExporterBuildError),
|
||||
|
||||
#[error("{0}")]
|
||||
TracingFilterParseError(#[from] tracing_subscriber::filter::ParseError),
|
||||
}
|
||||
@@ -1,19 +1,12 @@
|
||||
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
pub mod error;
|
||||
|
||||
use error::TracingError;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::io::IsTerminal;
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use opentelemetry;
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use opentelemetry_jaeger;
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use tracing_opentelemetry;
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use tracing_subscriber;
|
||||
#[cfg(feature = "tracing")]
|
||||
pub use tracing_tree;
|
||||
use tracing_subscriber::{filter::Directive, layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
#[derive(Debug, Default, Copy, Clone, Deserialize, PartialEq, Eq, Serialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
@@ -22,7 +15,6 @@ pub struct LoggingSettings {
|
||||
}
|
||||
|
||||
// don't call init so that we could attach additional layers
|
||||
#[cfg(feature = "basic_tracing")]
|
||||
pub fn build_tracing_logger() -> impl tracing_subscriber::layer::SubscriberExt {
|
||||
use tracing_subscriber::prelude::*;
|
||||
|
||||
@@ -31,7 +23,6 @@ pub fn build_tracing_logger() -> impl tracing_subscriber::layer::SubscriberExt {
|
||||
.with(default_tracing_env_filter())
|
||||
}
|
||||
|
||||
#[cfg(feature = "basic_tracing")]
|
||||
pub fn default_tracing_env_filter() -> tracing_subscriber::filter::EnvFilter {
|
||||
if ::std::env::var("RUST_LOG").is_ok() {
|
||||
tracing_subscriber::filter::EnvFilter::from_default_env()
|
||||
@@ -43,7 +34,6 @@ pub fn default_tracing_env_filter() -> tracing_subscriber::filter::EnvFilter {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "basic_tracing")]
|
||||
pub fn default_tracing_fmt_layer<S, W>(
|
||||
writer: W,
|
||||
) -> impl tracing_subscriber::Layer<S> + Sync + Send + 'static
|
||||
@@ -63,45 +53,47 @@ where
|
||||
.with_target(false)
|
||||
}
|
||||
|
||||
#[cfg(feature = "basic_tracing")]
|
||||
pub fn setup_tracing_logger() {
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
build_tracing_logger().init()
|
||||
/// Creates a tracing filter that sets more granular log levels for specific crates.
|
||||
/// This allows for finer control over logging verbosity.
|
||||
pub(crate) fn granual_filtered_env() -> Result<tracing_subscriber::filter::EnvFilter, TracingError>
|
||||
{
|
||||
fn directive_checked(directive: impl Into<String>) -> Result<Directive, TracingError> {
|
||||
directive.into().parse().map_err(From::from)
|
||||
}
|
||||
|
||||
let mut filter = default_tracing_env_filter();
|
||||
|
||||
// these crates are more granularly filtered
|
||||
let filter_crates = ["defguard_wireguard_rs"];
|
||||
for crate_name in filter_crates {
|
||||
filter = filter.add_directive(directive_checked(format!("{crate_name}=warn"))?);
|
||||
}
|
||||
Ok(filter)
|
||||
}
|
||||
|
||||
pub fn setup_no_otel_logger() -> Result<(), TracingError> {
|
||||
// Only set up if not already initialized
|
||||
if tracing::dispatcher::has_been_set() {
|
||||
// It shouldn't be - this is really checking that it is torn down between async command executions
|
||||
return Err(TracingError::TracingLoggerAlreadyInitialised);
|
||||
}
|
||||
|
||||
let registry = tracing_subscriber::registry()
|
||||
.with(default_tracing_fmt_layer(std::io::stderr))
|
||||
.with(granual_filtered_env()?);
|
||||
|
||||
registry
|
||||
.try_init()
|
||||
.map_err(|e| TracingError::TracingTryInitError(e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO: This has to be a macro, running it as a function does not work for the file_appender for some reason
|
||||
#[cfg(feature = "tracing")]
|
||||
#[macro_export]
|
||||
macro_rules! setup_tracing {
|
||||
($service_name: expr) => {
|
||||
use nym_bin_common::logging::tracing_subscriber::layer::SubscriberExt;
|
||||
use nym_bin_common::logging::tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
let registry = nym_bin_common::logging::tracing_subscriber::Registry::default()
|
||||
.with(nym_bin_common::logging::tracing_subscriber::EnvFilter::from_default_env())
|
||||
.with(
|
||||
nym_bin_common::logging::tracing_tree::HierarchicalLayer::new(4)
|
||||
.with_targets(true)
|
||||
.with_bracketed_fields(true),
|
||||
);
|
||||
|
||||
let tracer = nym_bin_common::logging::opentelemetry_jaeger::new_collector_pipeline()
|
||||
.with_endpoint("http://44.199.230.10:14268/api/traces")
|
||||
.with_service_name($service_name)
|
||||
.with_isahc()
|
||||
.with_trace_config(
|
||||
nym_bin_common::logging::opentelemetry::sdk::trace::config().with_sampler(
|
||||
nym_bin_common::logging::opentelemetry::sdk::trace::Sampler::TraceIdRatioBased(
|
||||
0.1,
|
||||
),
|
||||
),
|
||||
)
|
||||
.install_batch(nym_bin_common::logging::opentelemetry::runtime::Tokio)
|
||||
.expect("Could not init tracer");
|
||||
|
||||
let telemetry = nym_bin_common::logging::tracing_opentelemetry::layer().with_tracer(tracer);
|
||||
|
||||
registry.with(telemetry).init();
|
||||
setup_no_otel_logger()
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
use opentelemetry::trace::{SpanId, TraceId};
|
||||
use opentelemetry_sdk::trace::IdGenerator;
|
||||
use rand::RngCore;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Compact13BytesIdGenerator;
|
||||
|
||||
impl IdGenerator for Compact13BytesIdGenerator {
|
||||
fn new_trace_id(&self) -> TraceId {
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut bytes = [0u8; 16];
|
||||
|
||||
// Fill the first 13 bytes with random data
|
||||
rng.fill_bytes(&mut bytes[0..12]);
|
||||
// Set the last 4 bytes to zero
|
||||
bytes[12] = 0;
|
||||
bytes[13] = 0;
|
||||
bytes[14] = 0;
|
||||
bytes[15] = 0;
|
||||
|
||||
TraceId::from_bytes(bytes)
|
||||
}
|
||||
|
||||
fn new_span_id(&self) -> SpanId {
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut bytes = [0u8; 8];
|
||||
rng.fill_bytes(&mut bytes);
|
||||
|
||||
SpanId::from_bytes(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn compress_trace_id(trace_id: &TraceId) -> [u8; 12] {
|
||||
let bytes = trace_id.to_bytes();
|
||||
|
||||
let mut compressed = [0u8; 12];
|
||||
compressed.copy_from_slice(&bytes[0..12]);
|
||||
|
||||
compressed
|
||||
}
|
||||
|
||||
pub fn decompress_trace_id(compressed: &[u8; 12]) -> [u8; 16] {
|
||||
let mut bytes = [0u8; 16];
|
||||
bytes[0..12].copy_from_slice(compressed);
|
||||
bytes[12..].copy_from_slice(&[0u8; 4]);
|
||||
bytes
|
||||
}
|
||||
@@ -0,0 +1,155 @@
|
||||
use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
|
||||
use opentelemetry::trace::{SpanContext, TraceContextExt, TraceId};
|
||||
use opentelemetry::{Context, TraceFlags};
|
||||
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::IdGenerator};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Display;
|
||||
use tracing::instrument;
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
|
||||
/// Make a Carrier for context propagation
|
||||
pub struct ContextCarrier {
|
||||
data: HashMap<String, String>,
|
||||
}
|
||||
|
||||
impl ContextCarrier {
|
||||
pub fn new_empty() -> Self {
|
||||
ContextCarrier {
|
||||
data: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_data(data: HashMap<String, String>) -> Self {
|
||||
if data.is_empty() {
|
||||
return ContextCarrier::new_empty();
|
||||
}
|
||||
|
||||
ContextCarrier { data }
|
||||
}
|
||||
|
||||
pub fn new_with_current_context(context: Context) -> Self {
|
||||
let mut carrier = ContextCarrier::new_empty();
|
||||
let propagator = TraceContextPropagator::new();
|
||||
propagator.inject_context(&context, &mut carrier);
|
||||
carrier
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = (&String, &String)> {
|
||||
self.data.iter()
|
||||
}
|
||||
|
||||
pub fn from_map(data: HashMap<String, String>) -> Self {
|
||||
ContextCarrier { data }
|
||||
}
|
||||
|
||||
pub fn into_map(self) -> HashMap<String, String> {
|
||||
self.data
|
||||
}
|
||||
|
||||
pub fn extract_trace_id(&self) -> Option<TraceId> {
|
||||
self.get("traceparent").and_then(|tp| {
|
||||
let parts: Vec<&str> = tp.split('-').collect();
|
||||
if parts.len() == 4 {
|
||||
TraceId::from_hex(parts[1]).ok()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn extract_trace_id_into_bytes(&self) -> Option<[u8; 16]> {
|
||||
self.extract_trace_id().map(|id| id.to_bytes())
|
||||
}
|
||||
|
||||
pub fn extract_traceparent(&self) -> Option<String> {
|
||||
self.get("traceparent").map(|s| s.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl Injector for ContextCarrier {
|
||||
fn set(&mut self, key: &str, value: String) {
|
||||
self.data.insert(key.to_string(), value);
|
||||
}
|
||||
}
|
||||
|
||||
impl Extractor for ContextCarrier {
|
||||
fn get(&self, key: &str) -> Option<&str> {
|
||||
self.data.get(key).map(|s| s.as_str())
|
||||
}
|
||||
|
||||
fn keys(&self) -> Vec<&str> {
|
||||
self.data.keys().map(|k| k.as_str()).collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ContextCarrier {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self.data)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ManualContextPropagator {
|
||||
pub root_span: tracing::Span,
|
||||
pub trace_id: TraceId,
|
||||
}
|
||||
|
||||
impl ManualContextPropagator {
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
pub fn new(name: &str, context: HashMap<String, String>) -> Self {
|
||||
let carrier = ContextCarrier::new_with_data(context);
|
||||
let trace_id = match carrier.extract_trace_id() {
|
||||
Some(id) => id,
|
||||
None => Context::current().span().span_context().trace_id(),
|
||||
};
|
||||
|
||||
let root_span_builder = new_span_context_with_id(trace_id.clone());
|
||||
|
||||
let root_span = tracing::info_span!("trace_root", name = %name, trace_id = %trace_id);
|
||||
root_span.set_parent(root_span_builder);
|
||||
|
||||
ManualContextPropagator {
|
||||
root_span,
|
||||
trace_id,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
pub fn new_from_tid(name: &str, trace_id: TraceId) -> Self {
|
||||
let root_span_builder = new_span_context_with_id(trace_id.clone());
|
||||
|
||||
let root_span = tracing::info_span!("trace_root", name = %name, trace_id = %trace_id);
|
||||
root_span.set_parent(root_span_builder);
|
||||
|
||||
ManualContextPropagator {
|
||||
root_span,
|
||||
trace_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn root_span(&self) -> &tracing::Span {
|
||||
&self.root_span
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
pub fn new_span_context_with_id(trace_id: TraceId) -> Context {
|
||||
let id_gen = opentelemetry_sdk::trace::RandomIdGenerator::default();
|
||||
let span_id = id_gen.new_span_id();
|
||||
let span_context = SpanContext::new(
|
||||
trace_id,
|
||||
span_id,
|
||||
TraceFlags::SAMPLED,
|
||||
true,
|
||||
Default::default(),
|
||||
);
|
||||
|
||||
Context::current().with_remote_span_context(span_context)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
pub fn extract_trace_id_from_tracing_cx() -> TraceId {
|
||||
let cx = tracing::Span::current().context();
|
||||
let binding = cx.span();
|
||||
let trace_id = binding.span_context().trace_id();
|
||||
trace_id
|
||||
}
|
||||
@@ -0,0 +1,186 @@
|
||||
pub mod compact_id_generator;
|
||||
pub mod context;
|
||||
mod trace_id_format;
|
||||
|
||||
use tracing::{Level, info};
|
||||
use tracing_subscriber::filter::Directive;
|
||||
use tracing_subscriber::fmt;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
use crate::logging::default_tracing_env_filter;
|
||||
use crate::logging::error::TracingError;
|
||||
use crate::opentelemetry::compact_id_generator::Compact13BytesIdGenerator;
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
use opentelemetry::{KeyValue, global};
|
||||
use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
|
||||
use opentelemetry_otlp::tonic_types::transport::ClientTlsConfig;
|
||||
use opentelemetry_otlp::{WithExportConfig, WithTonicConfig};
|
||||
use opentelemetry_sdk::metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider};
|
||||
use opentelemetry_sdk::trace::SdkTracerProvider;
|
||||
use opentelemetry_sdk::{Resource, trace::Sampler};
|
||||
use opentelemetry_semantic_conventions::SCHEMA_URL;
|
||||
use opentelemetry_semantic_conventions::resource::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_VERSION};
|
||||
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
|
||||
use tracing_subscriber::fmt::format::FmtSpan;
|
||||
|
||||
pub struct TracerProviderGuard(Option<SdkTracerProvider>);
|
||||
|
||||
impl Drop for TracerProviderGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Some(tracer_provider) = self.0.take() {
|
||||
// Ensure all spans are flushed before exit
|
||||
if let Err(e) = tracer_provider.shutdown() {
|
||||
eprintln!("Error shutting down tracer provider: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn granual_filtered_env() -> Result<tracing_subscriber::filter::EnvFilter, TracingError>
|
||||
{
|
||||
fn directive_checked(directive: impl Into<String>) -> Result<Directive, TracingError> {
|
||||
directive.into().parse().map_err(From::from)
|
||||
}
|
||||
|
||||
let mut filter = default_tracing_env_filter();
|
||||
|
||||
// these crates are more granularly filtered
|
||||
let filter_crates = ["defguard_wireguard_rs"];
|
||||
for crate_name in filter_crates {
|
||||
filter = filter.add_directive(directive_checked(format!("{crate_name}=warn"))?);
|
||||
}
|
||||
Ok(filter)
|
||||
}
|
||||
|
||||
pub fn setup_tracing_logger(service_name: String) -> Result<TracerProviderGuard, TracingError> {
|
||||
if tracing::dispatcher::has_been_set() {
|
||||
// It shouldn't be - this is really checking that it is torn down between async command executions
|
||||
return Err(TracingError::TracingLoggerAlreadyInitialised);
|
||||
}
|
||||
|
||||
// define ingestion points
|
||||
let endpoint = std::env::var("SIGNOZ_ENDPOINT").expect("SIGNOZ_ENDPOINT not set");
|
||||
let key = std::env::var("SIGNOZ_INGESTION_KEY").expect("SIGNOZ_INGESTION_KEY not set");
|
||||
let mut metadata = MetadataMap::new();
|
||||
metadata.insert(
|
||||
"signoz-ingestion-key",
|
||||
key.parse().expect("Could not parse signoz ingestion key"),
|
||||
);
|
||||
|
||||
// Build resources
|
||||
let resource = build_resource(&service_name);
|
||||
|
||||
// Initialize tracer and meter providers
|
||||
let tracer_provider = init_tracer_provider(&endpoint, metadata.clone(), resource.clone())?;
|
||||
let meter_provider = init_meter_provider(&endpoint, metadata.clone(), resource.clone())?;
|
||||
|
||||
// Bridge tracing and opentelemetry
|
||||
let tracer = tracer_provider.tracer("otel-subscriber");
|
||||
let fmt_layer = fmt::layer()
|
||||
.json()
|
||||
.with_writer(std::io::stderr)
|
||||
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
|
||||
.with_span_list(false)
|
||||
.with_current_span(true)
|
||||
.event_format(trace_id_format::TraceIdFormat);
|
||||
|
||||
let registry = tracing_subscriber::registry()
|
||||
.with(fmt_layer)
|
||||
.with(granual_filtered_env()?)
|
||||
.with(tracing_subscriber::filter::LevelFilter::from_level(
|
||||
Level::INFO,
|
||||
))
|
||||
.with(MetricsLayer::new(meter_provider.clone()))
|
||||
.with(OpenTelemetryLayer::new(tracer));
|
||||
|
||||
registry
|
||||
.try_init()
|
||||
.map_err(TracingError::TracingTryInitError)?;
|
||||
|
||||
global::set_tracer_provider(tracer_provider.clone());
|
||||
global::set_meter_provider(meter_provider.clone());
|
||||
|
||||
info!("Tracing initialized with service name: {}", service_name);
|
||||
|
||||
Ok(TracerProviderGuard(Some(tracer_provider)))
|
||||
}
|
||||
|
||||
fn build_resource(service_name: &str) -> Resource {
|
||||
Resource::builder()
|
||||
.with_service_name(service_name.to_string())
|
||||
.with_schema_url(
|
||||
[
|
||||
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
|
||||
KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"),
|
||||
],
|
||||
SCHEMA_URL,
|
||||
)
|
||||
.build()
|
||||
}
|
||||
|
||||
fn init_tracer_provider(
|
||||
endpoint: &str,
|
||||
metadata: MetadataMap,
|
||||
resource: Resource,
|
||||
) -> Result<SdkTracerProvider, TracingError> {
|
||||
let mut exporter_builder = opentelemetry_otlp::SpanExporter::builder()
|
||||
.with_tonic()
|
||||
.with_metadata(metadata)
|
||||
.with_endpoint(endpoint);
|
||||
|
||||
if endpoint.starts_with("https://") {
|
||||
exporter_builder =
|
||||
exporter_builder.with_tls_config(ClientTlsConfig::new().with_enabled_roots());
|
||||
}
|
||||
|
||||
let exporter = exporter_builder.build()?;
|
||||
|
||||
let tracer = SdkTracerProvider::builder()
|
||||
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
|
||||
1.0,
|
||||
))))
|
||||
.with_id_generator(Compact13BytesIdGenerator)
|
||||
.with_resource(resource)
|
||||
.with_batch_exporter(exporter)
|
||||
.build();
|
||||
|
||||
global::set_tracer_provider(tracer.clone());
|
||||
Ok(tracer)
|
||||
}
|
||||
|
||||
fn init_meter_provider(
|
||||
endpoint: &str,
|
||||
metadata: MetadataMap,
|
||||
resource: Resource,
|
||||
) -> Result<SdkMeterProvider, TracingError> {
|
||||
let mut exporter_builder = opentelemetry_otlp::MetricExporter::builder()
|
||||
.with_tonic()
|
||||
.with_metadata(metadata)
|
||||
.with_endpoint(endpoint)
|
||||
.with_temporality(opentelemetry_sdk::metrics::Temporality::default());
|
||||
|
||||
if endpoint.starts_with("https://") {
|
||||
exporter_builder =
|
||||
exporter_builder.with_tls_config(ClientTlsConfig::new().with_enabled_roots());
|
||||
}
|
||||
|
||||
let exporter = exporter_builder.build()?;
|
||||
|
||||
let reader = PeriodicReader::builder(exporter)
|
||||
.with_interval(std::time::Duration::from_secs(30))
|
||||
.build();
|
||||
|
||||
let stdout_reader =
|
||||
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()).build();
|
||||
|
||||
let meter_provider = MeterProviderBuilder::default()
|
||||
.with_resource(resource)
|
||||
.with_reader(reader)
|
||||
.with_reader(stdout_reader)
|
||||
.build();
|
||||
|
||||
global::set_meter_provider(meter_provider.clone());
|
||||
|
||||
Ok(meter_provider)
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
use chrono::Utc;
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use opentelemetry::{SpanId, TraceId};
|
||||
use serde::ser::{SerializeMap, Serializer as _};
|
||||
use std::io;
|
||||
use tracing::{Event, Subscriber};
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
use tracing_serde::AsSerde;
|
||||
use tracing_serde::fields::AsMap;
|
||||
use tracing_subscriber::fmt::format::Writer;
|
||||
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields};
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
|
||||
pub struct WriteAdaptor<'a> {
|
||||
fmt_write: &'a mut dyn std::fmt::Write,
|
||||
}
|
||||
|
||||
impl<'a> WriteAdaptor<'a> {
|
||||
pub fn new(fmt_write: &'a mut dyn std::fmt::Write) -> Self {
|
||||
Self { fmt_write }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> io::Write for WriteAdaptor<'a> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
let s =
|
||||
std::str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
|
||||
|
||||
self.fmt_write
|
||||
.write_str(s)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
|
||||
Ok(s.as_bytes().len())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TraceIdFormat;
|
||||
|
||||
impl<S, N> FormatEvent<S, N> for TraceIdFormat
|
||||
where
|
||||
S: Subscriber + for<'lookup> LookupSpan<'lookup>,
|
||||
N: for<'writer> FormatFields<'writer> + 'static,
|
||||
{
|
||||
fn format_event(
|
||||
&self,
|
||||
_ctx: &FmtContext<'_, S, N>,
|
||||
mut writer: Writer<'_>,
|
||||
event: &Event<'_>,
|
||||
) -> std::fmt::Result
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
let meta = event.metadata();
|
||||
|
||||
let mut visit = || {
|
||||
let mut serializer = serde_json::Serializer::new(WriteAdaptor::new(&mut writer));
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
serializer.serialize_entry("timestamp", &Utc::now().to_rfc3339())?;
|
||||
serializer.serialize_entry("level", &meta.level().as_serde())?;
|
||||
serializer.serialize_entry("fields", &event.field_map())?;
|
||||
serializer.serialize_entry("target", meta.target())?;
|
||||
|
||||
let current_span = tracing::Span::current();
|
||||
let context = current_span.context();
|
||||
let span_ref = context.span();
|
||||
let span_context = span_ref.span_context();
|
||||
|
||||
let trace_id = span_context.trace_id();
|
||||
if trace_id != TraceId::INVALID {
|
||||
serializer.serialize_entry("trace_id", &trace_id.to_string())?;
|
||||
|
||||
let span_id = span_context.span_id();
|
||||
if span_id != SpanId::INVALID {
|
||||
serializer.serialize_entry("span_id", &span_id.to_string())?;
|
||||
}
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
};
|
||||
|
||||
visit().map_err(|_| std::fmt::Error)?;
|
||||
writeln!(writer)
|
||||
}
|
||||
}
|
||||
@@ -126,6 +126,7 @@ cli = ["clap", "comfy-table"]
|
||||
fs-credentials-storage = ["nym-credential-storage/persistent-storage"]
|
||||
fs-surb-storage = ["nym-client-core-surb-storage/fs-surb-storage"]
|
||||
fs-gateways-storage = ["nym-client-core-gateways-storage/fs-gateways-storage"]
|
||||
otel = ["nym-sphinx/otel"]
|
||||
wasm = ["nym-gateway-client/wasm"]
|
||||
metrics-server = []
|
||||
|
||||
|
||||
@@ -67,6 +67,7 @@ use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tracing::instrument;
|
||||
use url::Url;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
@@ -121,6 +122,7 @@ pub struct ClientOutput {
|
||||
}
|
||||
|
||||
impl ClientOutput {
|
||||
#[instrument(name = "ClientOutput::register_receiver", skip_all)]
|
||||
pub fn register_receiver(
|
||||
&mut self,
|
||||
) -> Result<mpsc::UnboundedReceiver<Vec<ReconstructedMessage>>, ClientCoreError> {
|
||||
@@ -384,6 +386,7 @@ where
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all)]
|
||||
fn start_real_traffic_controller(
|
||||
controller_config: real_messages_control::Config,
|
||||
key_rotation_config: KeyRotationConfig,
|
||||
@@ -475,6 +478,7 @@ where
|
||||
|
||||
// buffer controlling all messages fetched from provider
|
||||
// required so that other components would be able to use them (say the websocket)
|
||||
#[instrument(skip_all)]
|
||||
fn start_received_messages_buffer_controller(
|
||||
local_encryption_keypair: Arc<x25519::KeyPair>,
|
||||
query_receiver: ReceivedBufferRequestReceiver,
|
||||
@@ -507,6 +511,7 @@ where
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all)]
|
||||
async fn start_gateway_client(
|
||||
config: &Config,
|
||||
initialisation_result: InitialisationResult,
|
||||
@@ -613,6 +618,7 @@ where
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all)]
|
||||
async fn setup_gateway_transceiver(
|
||||
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
|
||||
config: &Config,
|
||||
@@ -770,7 +776,7 @@ where
|
||||
shutdown_tracker,
|
||||
)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
fn start_mix_traffic_controller(
|
||||
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
|
||||
shutdown_tracker: &ShutdownTracker,
|
||||
@@ -887,6 +893,7 @@ where
|
||||
Ok(client.get_key_rotation_info().await?.into())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn start_base(mut self) -> Result<BaseClient, ClientCoreError>
|
||||
where
|
||||
S::ReplyStore: Send + Sync,
|
||||
|
||||
@@ -29,6 +29,8 @@ pub enum InputMessage {
|
||||
data: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
max_retransmissions: Option<u32>,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>,
|
||||
},
|
||||
|
||||
/// Creates a message used for a duplex anonymous communication where the recipient
|
||||
@@ -45,6 +47,8 @@ pub enum InputMessage {
|
||||
reply_surbs: u32,
|
||||
lane: TransmissionLane,
|
||||
max_retransmissions: Option<u32>,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>,
|
||||
},
|
||||
|
||||
/// Attempt to use our internally received and stored `ReplySurb` to send the message back
|
||||
@@ -90,12 +94,16 @@ impl InputMessage {
|
||||
data: Vec<u8>,
|
||||
lane: TransmissionLane,
|
||||
packet_type: Option<PacketType>,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>,
|
||||
) -> Self {
|
||||
let message = InputMessage::Regular {
|
||||
recipient,
|
||||
data,
|
||||
lane,
|
||||
max_retransmissions: None,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
};
|
||||
if let Some(packet_type) = packet_type {
|
||||
InputMessage::new_wrapper(message, packet_type)
|
||||
@@ -110,6 +118,8 @@ impl InputMessage {
|
||||
reply_surbs: u32,
|
||||
lane: TransmissionLane,
|
||||
packet_type: Option<PacketType>,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>,
|
||||
) -> Self {
|
||||
let message = InputMessage::Anonymous {
|
||||
recipient,
|
||||
@@ -117,6 +127,8 @@ impl InputMessage {
|
||||
reply_surbs,
|
||||
lane,
|
||||
max_retransmissions: None,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
};
|
||||
if let Some(packet_type) = packet_type {
|
||||
InputMessage::new_wrapper(message, packet_type)
|
||||
@@ -185,4 +197,14 @@ impl InputMessage {
|
||||
self.set_max_retransmissions(max_retransmissions);
|
||||
self
|
||||
}
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
pub fn trace_id(&self) -> Option<[u8; 12]> {
|
||||
match self {
|
||||
InputMessage::Regular { trace_id, .. } => *trace_id,
|
||||
InputMessage::Anonymous { trace_id, .. } => *trace_id,
|
||||
InputMessage::Premade { .. } | InputMessage::Reply { .. } => None,
|
||||
InputMessage::MessageWrapper { message, .. } => message.trace_id(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+55
-1
@@ -70,6 +70,7 @@ where
|
||||
.send_reply(recipient_tag, data, lane, max_retransmissions);
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_plain_message(
|
||||
&mut self,
|
||||
recipient: Recipient,
|
||||
@@ -77,16 +78,27 @@ where
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
max_retransmissions: Option<u32>,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>,
|
||||
) {
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
.try_send_plain_message(recipient, content, lane, packet_type, max_retransmissions)
|
||||
.try_send_plain_message(
|
||||
recipient,
|
||||
content,
|
||||
lane,
|
||||
packet_type,
|
||||
max_retransmissions,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("failed to send a plain message - {err}")
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_repliable_message(
|
||||
&mut self,
|
||||
recipient: Recipient,
|
||||
@@ -95,6 +107,8 @@ where
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
max_retransmissions: Option<u32>,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>,
|
||||
) {
|
||||
if let Err(err) = self
|
||||
.message_handler
|
||||
@@ -105,6 +119,8 @@ where
|
||||
lane,
|
||||
packet_type,
|
||||
max_retransmissions,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -113,20 +129,36 @@ where
|
||||
}
|
||||
|
||||
#[allow(clippy::panic)]
|
||||
#[instrument(skip_all)]
|
||||
async fn on_input_message(&mut self, msg: InputMessage) {
|
||||
#[cfg(feature = "otel")]
|
||||
let trace_id = msg.trace_id();
|
||||
#[cfg(feature = "otel")]
|
||||
if let Some(tid) = trace_id {
|
||||
tracing::info!("Processing input message with trace_id: {:?}", tid);
|
||||
}
|
||||
|
||||
match msg {
|
||||
InputMessage::Regular {
|
||||
recipient,
|
||||
data,
|
||||
lane,
|
||||
max_retransmissions,
|
||||
..
|
||||
} => {
|
||||
#[cfg(feature = "otel")]
|
||||
info!(
|
||||
"Handling regular input message with trace_id: {:?}",
|
||||
trace_id
|
||||
);
|
||||
self.handle_plain_message(
|
||||
recipient,
|
||||
data,
|
||||
lane,
|
||||
PacketType::Mix,
|
||||
max_retransmissions,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -136,7 +168,13 @@ where
|
||||
reply_surbs,
|
||||
lane,
|
||||
max_retransmissions,
|
||||
..
|
||||
} => {
|
||||
#[cfg(feature = "otel")]
|
||||
tracing::info!(
|
||||
"Handling anonymous input message with trace_id: {:?}",
|
||||
trace_id
|
||||
);
|
||||
self.handle_repliable_message(
|
||||
recipient,
|
||||
data,
|
||||
@@ -144,6 +182,8 @@ where
|
||||
lane,
|
||||
PacketType::Mix,
|
||||
max_retransmissions,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -153,6 +193,8 @@ where
|
||||
lane,
|
||||
max_retransmissions,
|
||||
} => {
|
||||
#[cfg(feature = "otel")]
|
||||
info!("Handling reply input message with trace_id: {:?}", trace_id);
|
||||
self.handle_reply(recipient_tag, data, lane, max_retransmissions)
|
||||
.await;
|
||||
}
|
||||
@@ -166,13 +208,21 @@ where
|
||||
data,
|
||||
lane,
|
||||
max_retransmissions,
|
||||
..
|
||||
} => {
|
||||
#[cfg(feature = "otel")]
|
||||
tracing::info!(
|
||||
"Handling regular input message with trace_id: {:?}",
|
||||
trace_id
|
||||
);
|
||||
self.handle_plain_message(
|
||||
recipient,
|
||||
data,
|
||||
lane,
|
||||
packet_type,
|
||||
max_retransmissions,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -182,6 +232,7 @@ where
|
||||
reply_surbs,
|
||||
lane,
|
||||
max_retransmissions,
|
||||
..
|
||||
} => {
|
||||
self.handle_repliable_message(
|
||||
recipient,
|
||||
@@ -190,6 +241,8 @@ where
|
||||
lane,
|
||||
packet_type,
|
||||
max_retransmissions,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -213,6 +266,7 @@ where
|
||||
};
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
|
||||
debug!("Started InputMessageListener with graceful shutdown support");
|
||||
|
||||
|
||||
+7
-1
@@ -60,7 +60,13 @@ where
|
||||
|
||||
// TODO: Figure out retransmission packet type signaling
|
||||
self.message_handler
|
||||
.try_prepare_single_chunk_for_sending(packet_recipient, chunk_data, packet_type)
|
||||
.try_prepare_single_chunk_for_sending(
|
||||
packet_recipient,
|
||||
chunk_data,
|
||||
packet_type,
|
||||
#[cfg(feature = "otel")]
|
||||
None
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use tracing::{debug, error, info, instrument, trace, warn};
|
||||
|
||||
// TODO: move that error elsewhere since it seems to be contaminating different files
|
||||
#[derive(Debug, Error)]
|
||||
@@ -476,6 +476,7 @@ where
|
||||
self.forward_messages(msgs, lane).await;
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn try_send_plain_message(
|
||||
&mut self,
|
||||
recipient: Recipient,
|
||||
@@ -483,6 +484,8 @@ where
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
max_retransmissions: Option<u32>,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>,
|
||||
) -> Result<(), PreparationError> {
|
||||
let message = NymMessage::new_plain(message);
|
||||
self.try_split_and_send_non_reply_message(
|
||||
@@ -491,10 +494,13 @@ where
|
||||
lane,
|
||||
packet_type,
|
||||
max_retransmissions,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn try_split_and_send_non_reply_message(
|
||||
&mut self,
|
||||
message: NymMessage,
|
||||
@@ -502,6 +508,8 @@ where
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
max_retransmissions: Option<u32>,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>,
|
||||
) -> Result<(), PreparationError> {
|
||||
debug!("Sending non-reply message with packet type {packet_type}");
|
||||
// TODO: I really dislike existence of this assertion, it implies code has to be re-organised
|
||||
@@ -534,6 +542,8 @@ where
|
||||
&self.config.ack_key,
|
||||
&recipient,
|
||||
packet_type,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
)?;
|
||||
|
||||
let real_message = RealMessage::new(
|
||||
@@ -585,6 +595,8 @@ where
|
||||
TransmissionLane::AdditionalReplySurbs,
|
||||
packet_type,
|
||||
max_retransmissions,
|
||||
#[cfg(feature = "otel")]
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -602,6 +614,8 @@ where
|
||||
lane: TransmissionLane,
|
||||
packet_type: PacketType,
|
||||
max_retransmissions: Option<u32>,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>,
|
||||
) -> Result<(), SurbWrappedPreparationError> {
|
||||
debug!("Sending message with reply SURBs with packet type {packet_type}");
|
||||
let sender_tag = self.get_or_create_sender_tag(&recipient);
|
||||
@@ -625,6 +639,8 @@ where
|
||||
lane,
|
||||
packet_type,
|
||||
max_retransmissions,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -639,6 +655,8 @@ where
|
||||
recipient: Recipient,
|
||||
chunk: Fragment,
|
||||
packet_type: PacketType,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>,
|
||||
) -> Result<PreparedFragment, PreparationError> {
|
||||
debug!("Sending single chunk with packet type {packet_type}");
|
||||
let topology_permit = self.topology_access.get_read_permit().await;
|
||||
@@ -650,6 +668,8 @@ where
|
||||
&self.config.ack_key,
|
||||
&recipient,
|
||||
packet_type,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
)?;
|
||||
|
||||
Ok(prepared_fragment)
|
||||
|
||||
@@ -80,6 +80,8 @@ impl StatisticsControl {
|
||||
stats_report.into(),
|
||||
TransmissionLane::General,
|
||||
None,
|
||||
#[cfg(feature = "otel")]
|
||||
None,
|
||||
);
|
||||
if let Err(err) = self.report_tx.send(report_message).await {
|
||||
tracing::error!("Failed to report client stats: {err:?}");
|
||||
|
||||
@@ -1043,6 +1043,12 @@ impl<C, St> GatewayClient<C, St> {
|
||||
}
|
||||
|
||||
// Note: this requires prior authentication
|
||||
#[instrument(skip_all,
|
||||
fields(
|
||||
gateway = %self.gateway_identity,
|
||||
gateway_address = %self.gateway_address
|
||||
)
|
||||
)]
|
||||
pub fn start_listening_for_mixnet_messages(&mut self) -> Result<(), GatewayClientError> {
|
||||
if !self.authenticated {
|
||||
return Err(GatewayClientError::NotAuthenticated);
|
||||
|
||||
@@ -267,6 +267,7 @@ impl Client {
|
||||
}
|
||||
|
||||
impl SendWithoutResponse for Client {
|
||||
#[instrument(skip(self, packet))]
|
||||
fn send_without_response(&self, packet: MixPacket) -> io::Result<()> {
|
||||
let address = packet.next_hop_address();
|
||||
trace!("Sending packet to {address}");
|
||||
|
||||
@@ -19,11 +19,11 @@ serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
strum = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true, features = ["log"] }
|
||||
time = { workspace = true }
|
||||
subtle = { workspace = true }
|
||||
zeroize = { workspace = true }
|
||||
|
||||
nym-bin-common = { path = "../bin-common" }
|
||||
nym-crypto = { path = "../crypto", features = ["aead", "hashing"] }
|
||||
nym-pemstore = { path = "../pemstore" }
|
||||
nym-sphinx = { path = "../nymsphinx" }
|
||||
@@ -34,6 +34,11 @@ nym-task = { path = "../task" }
|
||||
nym-credentials = { path = "../credentials" }
|
||||
nym-credentials-interface = { path = "../credentials-interface" }
|
||||
|
||||
opentelemetry = { workspace = true, features = ["trace"], optional = true }
|
||||
opentelemetry_sdk = { workspace = true, optional = true }
|
||||
tracing-opentelemetry = { workspace = true, optional = true }
|
||||
tracing = { workspace = true, features = ["std", "attributes", "tracing-attributes"] }
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies.tokio]
|
||||
workspace = true
|
||||
features = ["time"]
|
||||
@@ -51,3 +56,12 @@ anyhow = { workspace = true }
|
||||
nym-compact-ecash = { path = "../nym_offline_compact_ecash" } # we need specific imports in tests
|
||||
nym-test-utils = { path = "../test-utils" }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
otel = [
|
||||
"nym-bin-common/otel",
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"tracing-opentelemetry",
|
||||
]
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use crate::{AuthenticationFailure, GatewayRequestsError, SharedGatewayKey};
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::iter;
|
||||
use std::time::Duration;
|
||||
use subtle::ConstantTimeEq;
|
||||
@@ -16,6 +17,9 @@ pub struct AuthenticateRequest {
|
||||
pub content: AuthenticateRequestContent,
|
||||
|
||||
pub request_signature: ed25519::Signature,
|
||||
|
||||
#[serde(default)]
|
||||
pub otel_context: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
impl AuthenticateRequest {
|
||||
@@ -23,6 +27,7 @@ impl AuthenticateRequest {
|
||||
protocol_version: u8,
|
||||
shared_key: &SharedGatewayKey,
|
||||
identity_keys: &ed25519::KeyPair,
|
||||
otel_context: Option<HashMap<String, String>>,
|
||||
) -> Result<AuthenticateRequest, GatewayRequestsError> {
|
||||
let content = AuthenticateRequestContent::new(
|
||||
protocol_version,
|
||||
@@ -35,6 +40,7 @@ impl AuthenticateRequest {
|
||||
Ok(AuthenticateRequest {
|
||||
content,
|
||||
request_signature,
|
||||
otel_context,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -8,12 +8,16 @@ use crate::{
|
||||
AUTHENTICATE_V2_PROTOCOL_VERSION, CREDENTIAL_UPDATE_V2_PROTOCOL_VERSION,
|
||||
INITIAL_PROTOCOL_VERSION,
|
||||
};
|
||||
#[cfg(feature = "otel")]
|
||||
use nym_bin_common::opentelemetry::context::ContextCarrier;
|
||||
use nym_credentials_interface::CredentialSpendingData;
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use nym_statistics_common::types::SessionType;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use tracing::{instrument, warn};
|
||||
use tungstenite::Message;
|
||||
|
||||
pub mod authenticate;
|
||||
@@ -76,6 +80,10 @@ pub enum ClientControlRequest {
|
||||
address: String,
|
||||
enc_address: String,
|
||||
iv: String,
|
||||
/// this is a trace id that is used in testing and performance verification
|
||||
/// in mainnet, this will always be set to None
|
||||
#[serde(default)]
|
||||
otel_context: Option<HashMap<String, String>>,
|
||||
},
|
||||
|
||||
AuthenticateV2(Box<AuthenticateRequest>),
|
||||
@@ -127,14 +135,25 @@ impl ClientControlRequest {
|
||||
let nonce = shared_key.random_nonce_or_iv();
|
||||
let ciphertext = shared_key.encrypt_naive(address.as_bytes_ref(), Some(&nonce))?;
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
let context_carrier = {
|
||||
let context = opentelemetry::Context::current();
|
||||
ContextCarrier::new_with_current_context(context).into_map()
|
||||
};
|
||||
|
||||
Ok(ClientControlRequest::Authenticate {
|
||||
protocol_version,
|
||||
address: address.as_base58_string(),
|
||||
enc_address: bs58::encode(&ciphertext).into_string(),
|
||||
iv: bs58::encode(&nonce).into_string(),
|
||||
#[cfg(feature = "otel")]
|
||||
otel_context: Some(context_carrier),
|
||||
#[cfg(not(feature = "otel"))]
|
||||
otel_context: None,
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn new_authenticate_v2(
|
||||
shared_key: &SharedGatewayKey,
|
||||
identity_keys: &ed25519::KeyPair,
|
||||
@@ -142,8 +161,25 @@ impl ClientControlRequest {
|
||||
// if we're using v2 authentication, we must announce at least that protocol version
|
||||
let protocol_version = AUTHENTICATE_V2_PROTOCOL_VERSION;
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
let context_carrier = {
|
||||
use nym_bin_common::opentelemetry::context::extract_trace_id_from_tracing_cx;
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
|
||||
let current_span = tracing::Span::current();
|
||||
let otel_context = current_span.context();
|
||||
ContextCarrier::new_with_current_context(otel_context).into_map()
|
||||
};
|
||||
#[cfg(not(feature = "otel"))]
|
||||
let context_carrier: HashMap<String, String> = HashMap::new();
|
||||
|
||||
Ok(ClientControlRequest::AuthenticateV2(Box::new(
|
||||
AuthenticateRequest::new(protocol_version, shared_key, identity_keys)?,
|
||||
AuthenticateRequest::new(
|
||||
protocol_version,
|
||||
shared_key,
|
||||
identity_keys,
|
||||
Some(context_carrier),
|
||||
)?,
|
||||
)))
|
||||
}
|
||||
|
||||
|
||||
@@ -11,9 +11,11 @@ license.workspace = true
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[features]
|
||||
# TODO: Remove otel from default before release
|
||||
default=["tunneling"]
|
||||
tunneling=[]
|
||||
network-defaults = ["dep:nym-network-defaults"]
|
||||
otel = ["nym-bin-common/otel", "opentelemetry", "opentelemetry_sdk"]
|
||||
debug-inventory = ["nym-http-api-client-macro/debug-inventory"]
|
||||
|
||||
[dependencies]
|
||||
@@ -24,6 +26,8 @@ reqwest = { workspace = true, features = ["json", "gzip", "deflate", "brotli", "
|
||||
http.workspace = true
|
||||
url = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
opentelemetry = { workspace = true, optional = true }
|
||||
opentelemetry_sdk = { workspace = true, optional = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
serde_yaml = { workspace = true}
|
||||
@@ -53,4 +57,3 @@ features = ["tokio"]
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["rt", "macros"] }
|
||||
|
||||
|
||||
@@ -986,6 +986,21 @@ impl ApiClientCore for Client {
|
||||
|
||||
self.apply_hosts_to_req(&mut req);
|
||||
|
||||
// if opentelemetry is activated add the current trace context to the request
|
||||
#[cfg(feature = "otel")]
|
||||
{
|
||||
use nym_bin_common::opentelemetry::context::ContextCarrier;
|
||||
use opentelemetry::Context;
|
||||
|
||||
let carrier = ContextCarrier::new_with_current_context(Context::current());
|
||||
|
||||
if let Some(traceparent) = carrier.extract_traceparent() {
|
||||
if let Ok(header_value) = HeaderValue::from_str(&traceparent) {
|
||||
req.headers_mut().insert("traceparent", header_value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut rb = RequestBuilder::from_parts(self.reqwest_client.clone(), req);
|
||||
|
||||
rb = rb
|
||||
|
||||
@@ -18,6 +18,7 @@ bytes = { workspace = true, optional = true }
|
||||
colored = { workspace = true, optional = true }
|
||||
futures = { workspace = true, optional = true }
|
||||
mime = { workspace = true, optional = true }
|
||||
nym-bin-common = { path = "../bin-common" }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
serde_yaml = { workspace = true, optional = true }
|
||||
@@ -49,6 +50,8 @@ middleware = [
|
||||
"zeroize"
|
||||
]
|
||||
|
||||
otel = ["nym-bin-common/otel"]
|
||||
|
||||
utoipa = ["dep:utoipa"]
|
||||
|
||||
[lints]
|
||||
|
||||
@@ -56,6 +56,16 @@ async fn log_request(
|
||||
|
||||
let host = header_map(request.headers().get(HOST), "Unknown Host".to_string());
|
||||
|
||||
// Extract traceparent from headers if it exists
|
||||
#[cfg(feature = "otel")]
|
||||
let traceparent = request
|
||||
.headers()
|
||||
.get("traceparent")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
#[cfg(not(feature = "otel"))]
|
||||
let traceparent: Option<String> = None;
|
||||
|
||||
let start = Instant::now();
|
||||
// run request through all middleware, incl. extractors
|
||||
let res = next.run(request).await;
|
||||
@@ -82,10 +92,10 @@ async fn log_request(
|
||||
|
||||
match level {
|
||||
LogLevel::Debug => debug!(
|
||||
"[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent}"
|
||||
"[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent} traceparent: {traceparent:?}",
|
||||
),
|
||||
LogLevel::Info => info!(
|
||||
"[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent}"
|
||||
"[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent} traceparent: {traceparent:?}"
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
@@ -30,3 +30,7 @@ workspace = true
|
||||
## wasm-only dependencies
|
||||
[target."cfg(target_arch = \"wasm32\")".dependencies.wasm-utils]
|
||||
path = "../wasm/utils"
|
||||
|
||||
[ features ]
|
||||
default = []
|
||||
otel = ["nym-sphinx/otel"]
|
||||
@@ -231,6 +231,8 @@ where
|
||||
&address,
|
||||
&address,
|
||||
PacketType::Mix,
|
||||
#[cfg(feature = "otel")]
|
||||
None,
|
||||
)?)
|
||||
}
|
||||
|
||||
|
||||
@@ -8,12 +8,14 @@ license = { workspace = true }
|
||||
repository = { workspace = true }
|
||||
|
||||
[dependencies]
|
||||
sphinx-packet = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
rand_distr = { workspace = true }
|
||||
rand_chacha = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
nym-bin-common = { path = "../bin-common" }
|
||||
nym-sphinx-acknowledgements = { path = "acknowledgements" }
|
||||
nym-sphinx-addressing = { path = "addressing" }
|
||||
nym-sphinx-anonymous-replies = { path = "anonymous-replies" }
|
||||
@@ -55,3 +57,11 @@ outfox = [
|
||||
"nym-sphinx-params/outfox",
|
||||
"nym-sphinx-types/outfox",
|
||||
]
|
||||
|
||||
otel = [
|
||||
"nym-bin-common/otel",
|
||||
"nym-sphinx-acknowledgements/otel",
|
||||
"nym-sphinx-addressing/otel",
|
||||
"nym-sphinx-cover/otel",
|
||||
"nym-sphinx-anonymous-replies/otel",
|
||||
]
|
||||
@@ -24,3 +24,4 @@ nym-topology = { path = "../../topology" }
|
||||
|
||||
[features]
|
||||
serde = ["dep:serde", "generic-array"]
|
||||
otel = ["nym-sphinx-addressing/otel"]
|
||||
|
||||
@@ -61,7 +61,10 @@ impl SurbAck {
|
||||
};
|
||||
|
||||
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
|
||||
#[cfg(not(feature = "otel"))]
|
||||
let destination = recipient.as_sphinx_destination();
|
||||
#[cfg(feature = "otel")]
|
||||
let destination = recipient.as_sphinx_destination(None);
|
||||
|
||||
let surb_ack_payload = prepare_identifier(rng, ack_key, marshaled_fragment_id);
|
||||
let packet_size = match packet_type {
|
||||
|
||||
@@ -8,14 +8,20 @@ license = { workspace = true }
|
||||
repository = { workspace = true }
|
||||
|
||||
[dependencies]
|
||||
nym-bin-common = { path = "../../bin-common", features = ["opentelemetry"] } # for trace id compression/decompression
|
||||
nym-crypto = { path = "../../crypto", features = ["asymmetric", "sphinx"] } # all addresses are expressed in terms on their crypto keys
|
||||
nym-sphinx-types = { path = "../types", features = ["sphinx"] } # we need to be able to refer to some types defined inside sphinx crate
|
||||
serde = { workspace = true } # implementing serialization/deserialization for some types, like `Recipient`
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
rand = { workspace = true }
|
||||
nym-crypto = { path = "../../crypto", features = ["rand"] }
|
||||
bincode = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
otel = ["nym-bin-common/otel"]
|
||||
@@ -153,12 +153,28 @@ impl Recipient {
|
||||
// TODO: Currently the `DestinationAddress` is equivalent to `ClientIdentity`, but perhaps
|
||||
// it shouldn't be? Maybe it should be (for example) H(`ClientIdentity || ClientEncryptionKey`)
|
||||
// instead? That is an open question.
|
||||
pub fn as_sphinx_destination(&self) -> Destination {
|
||||
pub fn as_sphinx_destination(
|
||||
&self,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>
|
||||
) -> Destination {
|
||||
#[cfg(feature = "otel")]
|
||||
use nym_bin_common::opentelemetry::compact_id_generator::decompress_trace_id;
|
||||
#[cfg(feature = "otel")]
|
||||
let trace_id_16 = if let Some(trace_id) = trace_id {
|
||||
decompress_trace_id(&trace_id)
|
||||
} else {
|
||||
decompress_trace_id(&[0u8; 12])
|
||||
};
|
||||
|
||||
// since the nym mix network differs slightly in design from loopix, we do not care
|
||||
// about "surb_id" field at all and just use the default value.
|
||||
Destination::new(
|
||||
self.client_identity.derive_destination_address(),
|
||||
Default::default(),
|
||||
#[cfg(not(feature = "otel"))]
|
||||
[0u8; 16],
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id_16
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -25,3 +25,7 @@ workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
rand_chacha = { workspace = true }
|
||||
|
||||
[ features ]
|
||||
default = []
|
||||
otel = ["nym-sphinx-addressing/otel"]
|
||||
@@ -82,6 +82,9 @@ impl ReplySurb {
|
||||
topology.random_route_to_egress(rng, recipient.gateway())?
|
||||
};
|
||||
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
|
||||
#[cfg(feature = "otel")]
|
||||
let destination = recipient.as_sphinx_destination(None);
|
||||
#[cfg(not(feature = "otel"))]
|
||||
let destination = recipient.as_sphinx_destination();
|
||||
|
||||
let mut surb_material = SURBMaterial::new(route, delays, destination);
|
||||
|
||||
@@ -20,3 +20,6 @@ nym-sphinx-params = { path = "../params" }
|
||||
nym-sphinx-routing = { path = "../routing" }
|
||||
nym-sphinx-types = { path = "../types" }
|
||||
nym-topology = { path = "../../topology" }
|
||||
|
||||
[features]
|
||||
otel = ["nym-sphinx-addressing/otel"]
|
||||
@@ -125,7 +125,10 @@ where
|
||||
|
||||
let route = topology.random_route_to_egress(rng, full_address.gateway())?;
|
||||
let delays = nym_sphinx_routing::generate_hop_delays(average_packet_delay, route.len());
|
||||
#[cfg(not(feature = "otel"))]
|
||||
let destination = full_address.as_sphinx_destination();
|
||||
#[cfg(feature = "otel")]
|
||||
let destination = full_address.as_sphinx_destination(None);
|
||||
|
||||
let rotation_id = topology.current_key_rotation();
|
||||
let sphinx_key_rotation = SphinxKeyRotation::from(rotation_id);
|
||||
|
||||
@@ -11,8 +11,10 @@ repository = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
thiserror = { workspace = true }
|
||||
opentelemetry = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
nym-bin-common = { path = "../../bin-common" }
|
||||
nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] }
|
||||
nym-sphinx-params = { path = "../params", features = ["sphinx", "outfox"] }
|
||||
nym-sphinx-forwarding = { path = "../forwarding" }
|
||||
@@ -21,3 +23,7 @@ nym-sphinx-acknowledgements = { path = "../acknowledgements" }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
otel = ["nym-bin-common/otel"]
|
||||
@@ -2,6 +2,11 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::packet::FramedNymPacket;
|
||||
#[cfg(feature = "otel")]
|
||||
use nym_bin_common::opentelemetry::{
|
||||
compact_id_generator::decompress_trace_id, context::ManualContextPropagator,
|
||||
};
|
||||
|
||||
use nym_sphinx_acknowledgements::surb_ack::{SurbAck, SurbAckRecoveryError};
|
||||
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressError};
|
||||
use nym_sphinx_forwarding::packet::MixPacket;
|
||||
@@ -14,7 +19,9 @@ use nym_sphinx_types::{
|
||||
};
|
||||
use std::fmt::Display;
|
||||
use thiserror::Error;
|
||||
use tracing::{debug, error, info, trace};
|
||||
#[cfg(feature = "otel")]
|
||||
use tracing::warn_span;
|
||||
use tracing::{debug, error, info, instrument, trace, warn};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MixProcessingResultData {
|
||||
@@ -154,6 +161,7 @@ impl PartiallyUnwrappedPacket {
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn finalise_unwrapping(self) -> Result<MixProcessingResult, PacketProcessingError> {
|
||||
let packet_size = self.received_data.packet_size();
|
||||
let packet_type = self.received_data.packet_type();
|
||||
@@ -236,6 +244,7 @@ fn perform_framed_packet_processing(
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
fn wrap_processed_sphinx_packet(
|
||||
packet: nym_sphinx_types::ProcessedPacket,
|
||||
packet_size: PacketSize,
|
||||
@@ -258,15 +267,38 @@ fn wrap_processed_sphinx_packet(
|
||||
// sphinx all together?
|
||||
ProcessedPacketData::FinalHop {
|
||||
destination,
|
||||
identifier: _,
|
||||
#[cfg(feature = "otel")]
|
||||
identifier,
|
||||
#[cfg(not(feature = "otel"))]
|
||||
identifier: _,
|
||||
payload,
|
||||
} => process_final_hop(
|
||||
destination,
|
||||
payload.recover_plaintext()?,
|
||||
packet_size,
|
||||
packet_type,
|
||||
key_rotation,
|
||||
),
|
||||
} => {
|
||||
// if we have a trace id in the destination, we log it for easier correlation later on
|
||||
#[cfg(feature = "otel")]
|
||||
let span = match identifier[0..12].try_into().map(|b: [u8; 12]| b) {
|
||||
Ok(trace_bytes) if !trace_bytes.iter().all(|b| *b == 0) => {
|
||||
let full_trace_id_bytes = decompress_trace_id(&trace_bytes);
|
||||
let full_trace_id =
|
||||
opentelemetry::trace::TraceId::from_bytes(full_trace_id_bytes);
|
||||
let context_propagator =
|
||||
ManualContextPropagator::new_from_tid("final_hop", full_trace_id);
|
||||
tracing::info_span!(parent: &context_propagator.root_span, "final_hop_processing", trace_id=%full_trace_id)
|
||||
}
|
||||
_ => {
|
||||
tracing::debug_span!("final_hop_processing")
|
||||
}
|
||||
};
|
||||
#[cfg(feature = "otel")]
|
||||
let _entered_span = span.enter();
|
||||
|
||||
process_final_hop(
|
||||
destination,
|
||||
payload.recover_plaintext()?,
|
||||
packet_size,
|
||||
packet_type,
|
||||
key_rotation,
|
||||
)
|
||||
}
|
||||
}?;
|
||||
|
||||
Ok(MixProcessingResult {
|
||||
@@ -312,6 +344,7 @@ fn wrap_processed_outfox_packet(
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
fn perform_final_processing(
|
||||
packet: NymProcessedPacket,
|
||||
packet_size: PacketSize,
|
||||
|
||||
@@ -163,19 +163,6 @@ pub trait FragmentPreparer {
|
||||
})
|
||||
}
|
||||
|
||||
/// Tries to convert this [`Fragment`] into a [`SphinxPacket`] that can be sent through the Nym mix-network,
|
||||
/// such that it contains required SURB-ACK and public component of the ephemeral key used to
|
||||
/// derive the shared key.
|
||||
/// Also all the data, apart from the said public component, is encrypted with an ephemeral shared key.
|
||||
/// This method can fail if the provided network topology is invalid.
|
||||
/// It returns total expected delay as well as the [`SphinxPacket`] (including first hop address)
|
||||
/// to be sent through the network.
|
||||
///
|
||||
/// The procedure is as follows:
|
||||
/// For each fragment:
|
||||
/// - compute SURB_ACK
|
||||
/// - generate (x, g^x)
|
||||
/// - compute k = KDF(remote encryption key ^ x) this is equivalent to KDF( dh(remote, x) )
|
||||
/// - compute v_b = AES-128-CTR(k, serialized_fragment)
|
||||
/// - compute vk_b = g^x || v_b
|
||||
/// - compute sphinx_plaintext = SURB_ACK || g^x || v_b
|
||||
@@ -189,6 +176,8 @@ pub trait FragmentPreparer {
|
||||
packet_sender: &Recipient,
|
||||
packet_recipient: &Recipient,
|
||||
packet_type: PacketType,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>,
|
||||
) -> Result<PreparedFragment, NymTopologyError> {
|
||||
debug!("Preparing chunk for sending");
|
||||
// each plain or repliable packet (i.e. not a reply) attaches an ephemeral public key so that the recipient
|
||||
@@ -249,6 +238,16 @@ pub trait FragmentPreparer {
|
||||
topology.random_route_to_egress(&mut rng, destination)?
|
||||
};
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
let destination = packet_recipient.as_sphinx_destination(trace_id);
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
tracing::info!(
|
||||
"Packet destination with trace id: {:?}",
|
||||
&destination.identifier
|
||||
);
|
||||
|
||||
#[cfg(not(feature = "otel"))]
|
||||
let destination = packet_recipient.as_sphinx_destination();
|
||||
|
||||
// including set of delays
|
||||
@@ -274,6 +273,7 @@ pub trait FragmentPreparer {
|
||||
)?,
|
||||
};
|
||||
|
||||
// - compute k = KDF(remote encryption key ^ x) this is equivalent to KDF( dh(remote, x) )
|
||||
// from the previously constructed route extract the first hop
|
||||
let first_hop_address =
|
||||
NymNodeRoutingAddress::try_from(route.first().unwrap().address).unwrap();
|
||||
@@ -428,6 +428,8 @@ where
|
||||
ack_key: &AckKey,
|
||||
packet_recipient: &Recipient,
|
||||
packet_type: PacketType,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id: Option<[u8; 12]>,
|
||||
) -> Result<PreparedFragment, NymTopologyError> {
|
||||
let sender = self.sender_address;
|
||||
|
||||
@@ -439,6 +441,8 @@ where
|
||||
&sender,
|
||||
packet_recipient,
|
||||
packet_type,
|
||||
#[cfg(feature = "otel")]
|
||||
trace_id,
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,8 @@ repository = { workspace = true }
|
||||
[dependencies]
|
||||
sphinx-packet = { workspace = true, optional = true }
|
||||
nym-outfox = { path = "../../../nym-outfox", optional = true }
|
||||
# TODO add optional
|
||||
nym-bin-common = { path = "../../../common/bin-common" }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
[features]
|
||||
|
||||
@@ -32,8 +32,6 @@ tracing.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
|
||||
# TEMP
|
||||
#nym-bin-common = { path = "../bin-common", features = ["basic_tracing"]}
|
||||
|
||||
|
||||
[build-dependencies]
|
||||
|
||||
@@ -37,3 +37,4 @@ nym-validator-client = { path = "../client-libs/validator-client" }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
otel = ["nym-sphinx/otel"]
|
||||
|
||||
@@ -350,6 +350,8 @@ impl SocksClient {
|
||||
self.config.connection_start_surbs,
|
||||
TransmissionLane::ConnectionId(self.connection_id),
|
||||
self.packet_type,
|
||||
#[cfg(feature = "otel")]
|
||||
None,
|
||||
);
|
||||
self.input_sender
|
||||
.send(input_message)
|
||||
@@ -373,6 +375,8 @@ impl SocksClient {
|
||||
msg.into_bytes(),
|
||||
TransmissionLane::ConnectionId(self.connection_id),
|
||||
self.packet_type,
|
||||
#[cfg(feature = "otel")]
|
||||
None,
|
||||
);
|
||||
self.input_sender
|
||||
.send(input_message)
|
||||
@@ -439,6 +443,8 @@ impl SocksClient {
|
||||
per_request_surbs,
|
||||
lane,
|
||||
packet_type,
|
||||
#[cfg(feature = "otel")]
|
||||
None,
|
||||
)
|
||||
} else {
|
||||
InputMessage::new_regular(
|
||||
@@ -446,6 +452,8 @@ impl SocksClient {
|
||||
provider_message.into_bytes(),
|
||||
lane,
|
||||
packet_type,
|
||||
#[cfg(feature = "otel")]
|
||||
None,
|
||||
)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -29,6 +29,7 @@ x25519-dalek = { workspace = true, features = ["static_secrets"] }
|
||||
cosmwasm-std = { workspace = true }
|
||||
cosmrs = { workspace = true }
|
||||
|
||||
nym-bin-common = { path = "../../common/bin-common" }
|
||||
nym-validator-client = { path = "../../common/client-libs/validator-client" }
|
||||
nym-mixnet-contract-common = { path = "../../common/cosmwasm-smart-contracts/mixnet-contract" }
|
||||
nym-vesting-contract-common = { path = "../../common/cosmwasm-smart-contracts/vesting-contract" }
|
||||
|
||||
@@ -50,6 +50,7 @@ zeroize = { workspace = true }
|
||||
|
||||
# internal
|
||||
nym-api-requests = { path = "../nym-api/nym-api-requests" }
|
||||
nym-bin-common = { path = "../common/bin-common" }
|
||||
nym-credentials = { path = "../common/credentials" }
|
||||
nym-credentials-interface = { path = "../common/credentials-interface" }
|
||||
nym-credential-verification = { path = "../common/credential-verification" }
|
||||
@@ -82,9 +83,25 @@ nym-service-provider-requests-common = { path = "../common/service-provider-requ
|
||||
|
||||
|
||||
defguard_wireguard_rs = { workspace = true }
|
||||
opentelemetry = { workspace = true, optional = true }
|
||||
opentelemetry_sdk = { workspace = true, optional = true }
|
||||
tracing-opentelemetry = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
nym-gateway-storage = { path = "../common/gateway-storage", features = ["mock"] }
|
||||
nym-wireguard = { path = "../common/wireguard", features = ["mock"] }
|
||||
mock_instant = "0.6.0"
|
||||
time = { workspace = true }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
otel = [
|
||||
"nym-bin-common/otel",
|
||||
"nym-client-core/otel",
|
||||
"nym-gateway-requests/otel",
|
||||
"nym-sphinx/otel",
|
||||
"nym-sdk/otel",
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"tracing-opentelemetry",
|
||||
]
|
||||
|
||||
@@ -14,6 +14,8 @@ use futures::{
|
||||
future::{FusedFuture, OptionFuture},
|
||||
FutureExt, StreamExt,
|
||||
};
|
||||
#[cfg(feature = "otel")]
|
||||
use nym_bin_common::opentelemetry::context::ManualContextPropagator;
|
||||
use nym_credential_verification::CredentialVerifier;
|
||||
use nym_credential_verification::{
|
||||
bandwidth_storage_manager::BandwidthStorageManager, ClientBandwidth,
|
||||
@@ -31,6 +33,8 @@ use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_statistics_common::{gateways::GatewaySessionEvent, types::SessionType};
|
||||
use nym_validator_client::coconut::EcashApiError;
|
||||
use rand::{random, CryptoRng, Rng};
|
||||
#[cfg(feature = "otel")]
|
||||
use std::collections::HashMap;
|
||||
use std::{process, time::Duration};
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
@@ -147,6 +151,8 @@ pub(crate) struct AuthenticatedHandler<R, S> {
|
||||
// senders that are used to return the result of the ping to the handler requesting the ping.
|
||||
is_active_request_receiver: IsActiveRequestReceiver,
|
||||
is_active_ping_pending_reply: Option<(u64, IsActiveResultSender)>,
|
||||
#[cfg(feature = "otel")]
|
||||
pub otel_propagator: Option<ManualContextPropagator>,
|
||||
}
|
||||
|
||||
// explicitly remove handle from the global store upon being dropped
|
||||
@@ -189,6 +195,24 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
client_address: client.address.as_base58_string(),
|
||||
})?;
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
let manual_ctx_propagator = {
|
||||
let context = match client.otel_context {
|
||||
Some(ref ctx) => ctx.clone(),
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
let manual_ctx_propagator = if !context.is_empty() {
|
||||
Some(ManualContextPropagator::new(
|
||||
"upgrading_fresh_to_authenticated",
|
||||
context,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
manual_ctx_propagator
|
||||
};
|
||||
|
||||
let handler = AuthenticatedHandler {
|
||||
bandwidth_storage_manager: BandwidthStorageManager::new(
|
||||
Box::new(fresh.shared_state.storage.clone()),
|
||||
@@ -202,6 +226,8 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
mix_receiver,
|
||||
is_active_request_receiver,
|
||||
is_active_ping_pending_reply: None,
|
||||
#[cfg(feature = "otel")]
|
||||
otel_propagator: manual_ctx_propagator,
|
||||
};
|
||||
handler.send_metrics(GatewaySessionEvent::new_session_start(
|
||||
handler.client.address,
|
||||
@@ -227,7 +253,19 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `mix_packet`: packet received from the client that should get forwarded into the network.
|
||||
#[instrument(skip_all)]
|
||||
fn forward_packet(&self, mix_packet: MixPacket) {
|
||||
#[cfg(feature = "otel")]
|
||||
{
|
||||
let span = match &self.otel_propagator {
|
||||
Some(propagator) => {
|
||||
info_span!(parent: &propagator.root_span, "forwarding_mix_packet")
|
||||
}
|
||||
None => info_span!("forwarding_mix_packet_no_otel"),
|
||||
};
|
||||
let _enter = span.enter();
|
||||
}
|
||||
|
||||
if let Err(err) = self
|
||||
.inner
|
||||
.shared_state
|
||||
@@ -287,11 +325,26 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
&mut self,
|
||||
mix_packet: MixPacket,
|
||||
) -> Result<ServerResponse, RequestHandlingError> {
|
||||
trace!("forwarding sphinx packet");
|
||||
#[cfg(feature = "otel")]
|
||||
let span = {
|
||||
let span = match &self.otel_propagator {
|
||||
Some(propagator) => {
|
||||
info_span!(parent: &propagator.root_span, "handling_forward_sphinx")
|
||||
}
|
||||
None => debug_span!("handling_forward_sphinx_no_otel"),
|
||||
};
|
||||
span
|
||||
};
|
||||
#[cfg(feature = "otel")]
|
||||
let _enter = span.enter();
|
||||
|
||||
let required_bandwidth = mix_packet.packet().len() as i64;
|
||||
|
||||
let remaining_bandwidth = self
|
||||
.bandwidth_storage_manager
|
||||
.try_use_bandwidth(required_bandwidth)
|
||||
.in_current_span()
|
||||
.await?;
|
||||
self.forward_packet(mix_packet);
|
||||
|
||||
@@ -305,8 +358,22 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `bin_msg`: raw message to handle.
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_binary(&mut self, bin_msg: Vec<u8>) -> Message {
|
||||
trace!("binary request");
|
||||
#[cfg(feature = "otel")]
|
||||
let span = {
|
||||
let span = match &self.otel_propagator {
|
||||
Some(propagator) => {
|
||||
info_span!(parent: &propagator.root_span, "handling_binary_request")
|
||||
}
|
||||
None => info_span!("handling_binary_request_no_otel"),
|
||||
};
|
||||
span
|
||||
};
|
||||
#[cfg(feature = "otel")]
|
||||
let _enter = span.enter();
|
||||
|
||||
// this function decrypts the request and checks the MAC
|
||||
match BinaryRequest::try_from_encrypted_tagged_bytes(bin_msg, &self.client.shared_keys) {
|
||||
Err(e) => {
|
||||
@@ -316,9 +383,11 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
Ok(request) => match request {
|
||||
// currently only a single type exists
|
||||
BinaryRequest::ForwardSphinx { packet }
|
||||
| BinaryRequest::ForwardSphinxV2 { packet } => {
|
||||
self.handle_forward_sphinx(packet).await.into_ws_message()
|
||||
}
|
||||
| BinaryRequest::ForwardSphinxV2 { packet } => self
|
||||
.handle_forward_sphinx(packet)
|
||||
.in_current_span()
|
||||
.await
|
||||
.into_ws_message(),
|
||||
_ => RequestHandlingError::UnknownBinaryRequest.into_error_message(),
|
||||
},
|
||||
}
|
||||
@@ -379,6 +448,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
Ok(SensitiveServerResponse::KeyUpgradeAck {}.encrypt(&self.client.shared_keys)?)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_encrypted_text_request(
|
||||
&mut self,
|
||||
ciphertext: Vec<u8>,
|
||||
@@ -408,6 +478,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `raw_request`: raw message to handle.
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_text(&mut self, raw_request: String) -> Message
|
||||
where
|
||||
R: Rng + CryptoRng,
|
||||
@@ -552,6 +623,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_is_active_request(
|
||||
&mut self,
|
||||
reply_tx: IsActiveResultSender,
|
||||
@@ -582,20 +654,37 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
/// Simultaneously listens for incoming client requests, which realistically should only be
|
||||
/// binary requests to forward sphinx packets or increase bandwidth
|
||||
/// and for sphinx packets received from the mix network that should be sent back to the client.
|
||||
#[instrument(level = "debug", skip_all,
|
||||
fields(
|
||||
client = %self.client.address.as_base58_string()
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn listen_for_requests(mut self)
|
||||
where
|
||||
R: Rng + CryptoRng,
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
trace!("Started listening for ALL incoming requests...");
|
||||
|
||||
// Ping timeout future used to check if the client responded to our ping request
|
||||
let mut ping_timeout: OptionFuture<_> = None.into();
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
let from_client_span = {
|
||||
let span = match &self.otel_propagator {
|
||||
Some(propagator) => {
|
||||
info_span!(parent: &propagator.root_span, "authenticated_client_handler_listen")
|
||||
}
|
||||
None => tracing::debug_span!("authenticated_client_handler_listen_no_otel"),
|
||||
};
|
||||
span
|
||||
};
|
||||
#[cfg(feature = "otel")]
|
||||
let _enter = from_client_span.enter();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Received a request to ping the client to check if it's still active
|
||||
tx = self.is_active_request_receiver.next() => {
|
||||
tx = self.is_active_request_receiver.next().in_current_span() => {
|
||||
match tx {
|
||||
None => break,
|
||||
Some(reply_tx) => {
|
||||
@@ -609,10 +698,10 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
},
|
||||
// The ping timeout expired, meaning the client didn't respond to our ping request
|
||||
_ = &mut ping_timeout, if !ping_timeout.is_terminated() => {
|
||||
ping_timeout = None.into();
|
||||
self.handle_ping_timeout().await;
|
||||
ping_timeout = None.into();
|
||||
self.handle_ping_timeout().await;
|
||||
},
|
||||
socket_msg = self.inner.read_websocket_message() => {
|
||||
socket_msg = self.inner.read_websocket_message().in_current_span() => {
|
||||
let socket_msg = match socket_msg {
|
||||
None => break,
|
||||
Some(Ok(socket_msg)) => socket_msg,
|
||||
@@ -627,7 +716,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
}
|
||||
|
||||
if let Some(response) = self.handle_request(socket_msg).await {
|
||||
if let Err(err) = self.inner.send_websocket_message(response).await {
|
||||
if let Err(err) = self.inner.send_websocket_message(response).in_current_span().await {
|
||||
debug!(
|
||||
"Failed to send message over websocket: {err}. Assuming the connection is dead.",
|
||||
);
|
||||
@@ -635,7 +724,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
|
||||
}
|
||||
}
|
||||
},
|
||||
mix_messages = self.mix_receiver.next() => {
|
||||
mix_messages = self.mix_receiver.next().in_current_span() => {
|
||||
let mix_messages = match mix_messages {
|
||||
None => {
|
||||
debug!("mix receiver was closed! Assuming the connection is dead.");
|
||||
|
||||
@@ -13,6 +13,8 @@ use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
SinkExt, StreamExt,
|
||||
};
|
||||
#[cfg(feature = "otel")]
|
||||
use nym_bin_common::opentelemetry::context::ManualContextPropagator;
|
||||
use nym_credentials_interface::AvailableBandwidth;
|
||||
use nym_crypto::aes::cipher::crypto_common::rand_core::RngCore;
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
@@ -34,6 +36,8 @@ use nym_node_metrics::events::MetricsEvent;
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use nym_task::ShutdownToken;
|
||||
use rand::CryptoRng;
|
||||
#[cfg(feature = "otel")]
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
@@ -41,7 +45,9 @@ use time::OffsetDateTime;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::tungstenite::{protocol::Message, Error as WsError};
|
||||
use tracing::*;
|
||||
#[cfg(feature = "otel")]
|
||||
use tracing::info_span;
|
||||
use tracing::{debug, error, info, instrument, warn, Instrument};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum InitialAuthenticationError {
|
||||
@@ -163,6 +169,7 @@ impl<R, S> FreshHandler<R, S> {
|
||||
|
||||
/// Attempts to perform websocket handshake with the remote and upgrades the raw TCP socket
|
||||
/// to the framed WebSocket.
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn perform_websocket_handshake(&mut self) -> Result<(), WsError>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
@@ -186,6 +193,7 @@ impl<R, S> FreshHandler<R, S> {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `init_msg`: a client handshake init message which should contain its identity public key as well as an ephemeral key.
|
||||
#[instrument(skip_all)]
|
||||
async fn perform_registration_handshake(
|
||||
&mut self,
|
||||
init_msg: Vec<u8>,
|
||||
@@ -211,6 +219,7 @@ impl<R, S> FreshHandler<R, S> {
|
||||
}
|
||||
|
||||
/// Attempts to read websocket message from the associated socket.
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn read_websocket_message(&mut self) -> Option<Result<Message, WsError>>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
@@ -226,6 +235,7 @@ impl<R, S> FreshHandler<R, S> {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `msg`: WebSocket message to write back to the client.
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn send_websocket_message(
|
||||
&mut self,
|
||||
msg: impl Into<Message>,
|
||||
@@ -242,6 +252,7 @@ impl<R, S> FreshHandler<R, S> {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn send_error_response(
|
||||
&mut self,
|
||||
err: impl std::error::Error,
|
||||
@@ -269,6 +280,7 @@ impl<R, S> FreshHandler<R, S> {
|
||||
///
|
||||
/// * `shared_keys`: keys derived between the client and gateway.
|
||||
/// * `packets`: unwrapped packets that are to be pushed back to the client.
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn push_packets_to_client(
|
||||
&mut self,
|
||||
shared_keys: &SharedGatewayKey,
|
||||
@@ -326,6 +338,7 @@ impl<R, S> FreshHandler<R, S> {
|
||||
///
|
||||
/// * `client_address`: address of the client that is going to receive the messages.
|
||||
/// * `shared_keys`: shared keys derived between the client and the gateway used to encrypt and tag the messages.
|
||||
#[instrument(skip_all)]
|
||||
async fn push_stored_messages_to_client(
|
||||
&mut self,
|
||||
client_address: DestinationAddressBytes,
|
||||
@@ -632,6 +645,8 @@ impl<R, S> FreshHandler<R, S> {
|
||||
address,
|
||||
shared_keys.key,
|
||||
session_request_start,
|
||||
#[cfg(feature = "otel")]
|
||||
None,
|
||||
)),
|
||||
ServerResponse::Authenticate {
|
||||
protocol_version: Some(negotiated_protocol),
|
||||
@@ -641,9 +656,13 @@ impl<R, S> FreshHandler<R, S> {
|
||||
))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(
|
||||
address = %request.content.client_identity.derive_destination_address(),
|
||||
))]
|
||||
async fn handle_authenticate_v2(
|
||||
&mut self,
|
||||
request: Box<AuthenticateRequest>,
|
||||
#[cfg(feature = "otel")] otel_context: Option<HashMap<String, String>>,
|
||||
) -> Result<InitialAuthResult, InitialAuthenticationError>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
@@ -718,6 +737,8 @@ impl<R, S> FreshHandler<R, S> {
|
||||
address,
|
||||
shared_key.key,
|
||||
session_request_start,
|
||||
#[cfg(feature = "otel")]
|
||||
otel_context,
|
||||
)),
|
||||
ServerResponse::Authenticate {
|
||||
protocol_version: Some(negotiated_protocol),
|
||||
@@ -816,6 +837,8 @@ impl<R, S> FreshHandler<R, S> {
|
||||
remote_address,
|
||||
shared_keys,
|
||||
OffsetDateTime::now_utc(),
|
||||
#[cfg(feature = "otel")]
|
||||
None,
|
||||
);
|
||||
|
||||
Ok(InitialAuthResult::new(
|
||||
@@ -846,6 +869,7 @@ impl<R, S> FreshHandler<R, S> {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn handle_initial_client_request(
|
||||
&mut self,
|
||||
request: ClientControlRequest,
|
||||
@@ -854,17 +878,58 @@ impl<R, S> FreshHandler<R, S> {
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send,
|
||||
R: CryptoRng + RngCore + Send,
|
||||
{
|
||||
// we can handle stateless client requests without prior authentication, like `ClientControlRequest::SupportedProtocol`
|
||||
// extract and set up opentelemetry context if provided
|
||||
#[cfg(feature = "otel")]
|
||||
let (context_propagator, otel_ctx) =
|
||||
if let ClientControlRequest::AuthenticateV2(ref auth_req) = request {
|
||||
if let Some(otel_context) = &auth_req.otel_context {
|
||||
info!(
|
||||
"=== OpenTelemetry context provided in the request: {otel_context:?} ==="
|
||||
);
|
||||
(
|
||||
Some(ManualContextPropagator::new(
|
||||
"handling_initial_client_request_with_otel",
|
||||
otel_context.clone(),
|
||||
)),
|
||||
Some(otel_context.clone()),
|
||||
)
|
||||
} else {
|
||||
debug!("No OpenTelemetry context provided in the request");
|
||||
(None, None)
|
||||
}
|
||||
} else {
|
||||
debug!("No OpenTelemetry context provided in the request");
|
||||
(None, None)
|
||||
};
|
||||
#[cfg(feature = "otel")]
|
||||
let child_span = match context_propagator {
|
||||
Some(ref propagator) => {
|
||||
let span = info_span!(parent: &propagator.root_span, "=== Handling initial client request with otel context ===");
|
||||
span
|
||||
}
|
||||
None => {
|
||||
tracing::debug_span!("=== Handling initial client request without otel context ===")
|
||||
}
|
||||
};
|
||||
#[cfg(feature = "otel")]
|
||||
let _enter = child_span.enter();
|
||||
|
||||
let auth_result = match request {
|
||||
ClientControlRequest::Authenticate {
|
||||
protocol_version,
|
||||
address,
|
||||
enc_address,
|
||||
iv,
|
||||
otel_context: _,
|
||||
} => {
|
||||
self.handle_legacy_authenticate(protocol_version, address, enc_address, iv)
|
||||
.await
|
||||
}
|
||||
#[cfg(feature = "otel")]
|
||||
ClientControlRequest::AuthenticateV2(req) => {
|
||||
self.handle_authenticate_v2(req, otel_ctx).await
|
||||
}
|
||||
#[cfg(not(feature = "otel"))]
|
||||
ClientControlRequest::AuthenticateV2(req) => self.handle_authenticate_v2(req).await,
|
||||
ClientControlRequest::RegisterHandshakeInitRequest {
|
||||
protocol_version,
|
||||
@@ -889,7 +954,6 @@ impl<R, S> FreshHandler<R, S> {
|
||||
}
|
||||
other => debug!("authentication failure: {other}"),
|
||||
}
|
||||
|
||||
self.send_and_forget_error_response(&err).await;
|
||||
return Err(err);
|
||||
}
|
||||
@@ -912,9 +976,11 @@ impl<R, S> FreshHandler<R, S> {
|
||||
warn!("could not establish client details");
|
||||
return Err(InitialAuthenticationError::EmptyClientDetails);
|
||||
};
|
||||
|
||||
Ok(Some(client_details))
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn handle_until_authenticated_or_failure(
|
||||
mut self,
|
||||
) -> Option<AuthenticatedHandler<R, S>>
|
||||
@@ -953,7 +1019,7 @@ impl<R, S> FreshHandler<R, S> {
|
||||
registration_details.session_request_timestamp,
|
||||
);
|
||||
|
||||
return AuthenticatedHandler::upgrade(
|
||||
let auth_handle = AuthenticatedHandler::upgrade(
|
||||
self,
|
||||
registration_details,
|
||||
mix_receiver,
|
||||
@@ -962,10 +1028,12 @@ impl<R, S> FreshHandler<R, S> {
|
||||
.await
|
||||
.inspect_err(|err| error!("failed to upgrade client handler: {err}"))
|
||||
.ok();
|
||||
return auth_handle;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn wait_for_initial_message(
|
||||
&mut self,
|
||||
) -> Result<ClientControlRequest, InitialAuthenticationError>
|
||||
@@ -1016,6 +1084,7 @@ impl<R, S> FreshHandler<R, S> {
|
||||
.map_err(|_| InitialAuthenticationError::InvalidRequest)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn start_handling(self)
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send,
|
||||
@@ -1025,10 +1094,10 @@ impl<R, S> FreshHandler<R, S> {
|
||||
let shutdown = self.shutdown.clone();
|
||||
tokio::select! {
|
||||
_ = shutdown.cancelled() => {
|
||||
trace!("received cancellation")
|
||||
tracing::trace!("received cancellation")
|
||||
}
|
||||
_ = super::handle_connection(self) => {
|
||||
debug!("finished connection handler for {remote}")
|
||||
_ = super::handle_connection(self).instrument(tracing::debug_span!("connection_handler", remote = %remote)) => {
|
||||
tracing::debug!("finished connection handler for {remote}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,10 +7,13 @@ use nym_gateway_requests::shared_key::SharedGatewayKey;
|
||||
use nym_gateway_requests::ServerResponse;
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use rand::{CryptoRng, Rng};
|
||||
#[cfg(feature = "otel")]
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tracing::Instrument;
|
||||
use tracing::{debug, instrument, trace, warn};
|
||||
|
||||
pub(crate) use self::authenticated::AuthenticatedHandler;
|
||||
@@ -48,6 +51,8 @@ pub(crate) struct ClientDetails {
|
||||
// note, this does **NOT ALWAYS** indicate timestamp of when client connected
|
||||
// it is (for v2 auth) timestamp the client **signed** when it created the request
|
||||
pub(crate) session_request_timestamp: OffsetDateTime,
|
||||
#[cfg(feature = "otel")]
|
||||
pub(crate) otel_context: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
impl ClientDetails {
|
||||
@@ -56,12 +61,15 @@ impl ClientDetails {
|
||||
address: DestinationAddressBytes,
|
||||
shared_keys: SharedGatewayKey,
|
||||
session_request_timestamp: OffsetDateTime,
|
||||
#[cfg(feature = "otel")] otel_context: Option<HashMap<String, String>>,
|
||||
) -> Self {
|
||||
ClientDetails {
|
||||
address,
|
||||
id,
|
||||
shared_keys,
|
||||
session_request_timestamp,
|
||||
#[cfg(feature = "otel")]
|
||||
otel_context,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -92,7 +100,7 @@ impl InitialAuthResult {
|
||||
}
|
||||
|
||||
// imo there's no point in including the peer address in anything higher than debug
|
||||
#[instrument(level = "debug", skip_all, fields(peer = %handle.peer_address))]
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn handle_connection<R, S>(mut handle: FreshHandler<R, S>)
|
||||
where
|
||||
R: Rng + CryptoRng + Send,
|
||||
@@ -117,8 +125,25 @@ where
|
||||
|
||||
trace!("managed to perform websocket handshake!");
|
||||
|
||||
if let Some(auth_handle) = handle.handle_until_authenticated_or_failure().await {
|
||||
auth_handle.listen_for_requests().await
|
||||
if let Some(auth_handle) = handle.handle_until_authenticated_or_failure().in_current_span().await {
|
||||
#[cfg(feature = "otel")]
|
||||
{
|
||||
let from_client_span = {
|
||||
let parent = match auth_handle.otel_propagator.as_ref() {
|
||||
Some(propagator) => propagator.root_span(),
|
||||
None => &tracing::Span::current(), // fallback to current span if no propagator
|
||||
};
|
||||
tracing::info_span!(parent: parent, "listening for requests")
|
||||
};
|
||||
auth_handle
|
||||
.listen_for_requests()
|
||||
.instrument(from_client_span)
|
||||
.await
|
||||
}
|
||||
#[cfg(not(feature = "otel"))]
|
||||
{
|
||||
auth_handle.listen_for_requests().await;
|
||||
}
|
||||
}
|
||||
|
||||
trace!("the handler is done!");
|
||||
|
||||
@@ -53,6 +53,7 @@ impl Listener {
|
||||
)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
fn try_handle_accepted_connection(&self, accepted: io::Result<(TcpStream, SocketAddr)>) {
|
||||
match accepted {
|
||||
Ok((socket, remote_address)) => {
|
||||
@@ -90,7 +91,7 @@ impl Listener {
|
||||
let metrics_ref = handle.shared_state.metrics.clone();
|
||||
|
||||
// 4.1. handle all client requests until connection gets terminated
|
||||
handle.start_handling().await;
|
||||
handle.start_handling().in_current_span().await;
|
||||
|
||||
// 4.2. decrement the connection counter
|
||||
metrics_ref.network.disconnected_ingress_websocket_client();
|
||||
@@ -104,6 +105,7 @@ impl Listener {
|
||||
|
||||
// TODO: change the signature to pub(crate) async fn run(&self, handler: Handler)
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn run(&mut self) {
|
||||
info!("Starting websocket listener at {}", self.address);
|
||||
let tcp_listener = match tokio::net::TcpListener::bind(self.address).await {
|
||||
@@ -122,7 +124,7 @@ impl Listener {
|
||||
trace!("client_handling::Listener: received shutdown");
|
||||
break
|
||||
}
|
||||
connection = tcp_listener.accept() => {
|
||||
connection = tcp_listener.accept().in_current_span() => {
|
||||
self.try_handle_accepted_connection(connection)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -977,6 +977,8 @@ fn create_input_message(
|
||||
response_packet,
|
||||
lane,
|
||||
packet_type,
|
||||
#[cfg(feature = "otel")]
|
||||
None,
|
||||
))
|
||||
} else {
|
||||
tracing::error!("No nym-address or sender tag provided");
|
||||
|
||||
+5
-1
@@ -95,7 +95,7 @@ nym-topology = { path = "../common/topology" }
|
||||
nym-api-requests = { path = "nym-api-requests" }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
nym-http-api-client = { path = "../common/http-api-client" }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["output_format", "openapi", "basic_tracing"] }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["output_format", "openapi"] }
|
||||
nym-node-tester-utils = { path = "../common/node-tester-utils" }
|
||||
nym-node-requests = { path = "../nym-node/nym-node-requests" }
|
||||
nym-types = { path = "../common/types" }
|
||||
@@ -110,6 +110,10 @@ nym-ecash-signer-check = { path = "../common/ecash-signer-check" }
|
||||
no-reward = []
|
||||
v2-performance = []
|
||||
generate-ts = ["ts-rs"]
|
||||
otel = [
|
||||
"nym-bin-common/otel",
|
||||
"nym-node-tester-utils/otel",
|
||||
]
|
||||
|
||||
[build-dependencies]
|
||||
anyhow = { workspace = true }
|
||||
|
||||
+1
-1
@@ -31,7 +31,7 @@ async fn main() -> Result<(), anyhow::Error> {
|
||||
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
|
||||
console_subscriber::init();
|
||||
} else {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
}}
|
||||
|
||||
info!("Starting nym api...");
|
||||
|
||||
@@ -45,9 +45,7 @@ utoipa = { workspace = true, features = ["axum_extras", "time"] }
|
||||
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
|
||||
zeroize.workspace = true
|
||||
|
||||
nym-bin-common = { path = "../../common/bin-common", features = [
|
||||
"basic_tracing",
|
||||
] }
|
||||
nym-bin-common = { path = "../../common/bin-common" }
|
||||
nym-compact-ecash = { path = "../../common/nym_offline_compact_ecash" }
|
||||
nym-config = { path = "../../common/config" }
|
||||
nym-crypto = { path = "../../common/crypto", features = [
|
||||
|
||||
@@ -6,7 +6,7 @@ cfg_if::cfg_if! {
|
||||
use crate::cli::Cli;
|
||||
use clap::Parser;
|
||||
use nym_bin_common::bin_info_owned;
|
||||
use nym_bin_common::logging::setup_tracing_logger;
|
||||
use nym_bin_common::logging::setup_no_otel_logger;
|
||||
use nym_network_defaults::setup_env;
|
||||
use tracing::{info, trace};
|
||||
|
||||
@@ -29,7 +29,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
trace!("args: {cli:#?}");
|
||||
|
||||
setup_env(cli.config_env_file.as_ref());
|
||||
setup_tracing_logger();
|
||||
setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
let bin_info = bin_info_owned!();
|
||||
info!("using the following version: {bin_info}");
|
||||
|
||||
@@ -73,3 +73,7 @@ vergen-gitcl = { workspace = true, default-features = false, features = [
|
||||
|
||||
[package.metadata.cargo-machete]
|
||||
ignored = ["vergen", "nym-http-api-client"]
|
||||
|
||||
[features]
|
||||
default = []
|
||||
otel = ["nym-sdk/otel", "nym-http-api-client/otel", "nym-bin-common/otel"]
|
||||
@@ -84,6 +84,8 @@ fn create_input_message(
|
||||
surbs,
|
||||
lane,
|
||||
packet_type,
|
||||
#[cfg(feature = "otel")]
|
||||
None
|
||||
))
|
||||
}
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ utoipa-swagger-ui = { workspace = true, features = ["axum"] }
|
||||
tokio-postgres = { workspace = true }
|
||||
|
||||
# internal
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["basic_tracing"] }
|
||||
nym-bin-common = { path = "../common/bin-common" }
|
||||
nym-client-core = { path = "../common/client-core" }
|
||||
nym-crypto = { path = "../common/crypto" }
|
||||
nym-network-defaults = { path = "../common/network-defaults" }
|
||||
|
||||
@@ -189,7 +189,7 @@ async fn nym_topology_from_env() -> anyhow::Result<NymTopology> {
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
let args = Args::parse();
|
||||
|
||||
|
||||
+12
-2
@@ -31,6 +31,7 @@ humantime-serde = { workspace = true }
|
||||
human-repr = { workspace = true }
|
||||
ipnetwork = { workspace = true }
|
||||
indicatif = { workspace = true }
|
||||
opentelemetry = { workspace = true, optional = true } # make it optional later
|
||||
rand = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json.workspace = true
|
||||
@@ -50,7 +51,6 @@ cupid = { workspace = true }
|
||||
sysinfo = { workspace = true }
|
||||
|
||||
nym-bin-common = { path = "../common/bin-common", features = [
|
||||
"basic_tracing",
|
||||
"output_format",
|
||||
] }
|
||||
nym-client-core-config-types = { path = "../common/client-core/config-types", features = [
|
||||
@@ -130,7 +130,17 @@ criterion = { workspace = true, features = ["async_tokio"] }
|
||||
rand_chacha = { workspace = true }
|
||||
|
||||
[features]
|
||||
tokio-console = ["console-subscriber", "nym-task/tokio-tracing"]
|
||||
default = []
|
||||
tokio-console = ["console-subscriber"]
|
||||
otel = [
|
||||
"nym-bin-common/otel",
|
||||
"nym-gateway/otel",
|
||||
"nym-http-api-common/otel",
|
||||
"nym-sphinx-framing/otel",
|
||||
"nym-sphinx-addressing/otel",
|
||||
"opentelemetry",
|
||||
]
|
||||
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
@@ -8,7 +8,7 @@ use crate::node::bonding_information::BondingInformation;
|
||||
use crate::node::mixnet::packet_forwarding::global::is_global_ip;
|
||||
use std::fs;
|
||||
use std::net::IpAddr;
|
||||
use tracing::{debug, info, trace, warn};
|
||||
use tracing::{debug, info, instrument, trace, warn};
|
||||
|
||||
mod args;
|
||||
|
||||
@@ -39,6 +39,7 @@ fn check_public_ips(ips: &[IpAddr], local: bool) -> Result<(), NymNodeError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn execute(mut args: Args) -> Result<(), NymNodeError> {
|
||||
trace!("passed arguments: {args:#?}");
|
||||
|
||||
|
||||
+127
-15
@@ -6,10 +6,18 @@ use crate::cli::commands::{
|
||||
test_throughput,
|
||||
};
|
||||
use crate::env::vars::{NYMNODE_CONFIG_ENV_FILE_ARG, NYMNODE_NO_BANNER_ARG};
|
||||
// use crate::error::NymNodeError;
|
||||
use clap::{Args, Parser, Subcommand};
|
||||
use nym_bin_common::bin_info;
|
||||
#[cfg(feature = "otel")]
|
||||
use nym_bin_common::logging::error::TracingError;
|
||||
#[cfg(feature = "otel")]
|
||||
use nym_bin_common::opentelemetry::setup_tracing_logger;
|
||||
use nym_bin_common::{bin_info, logging::setup_no_otel_logger};
|
||||
use std::future::Future;
|
||||
use std::sync::OnceLock;
|
||||
#[cfg(feature = "otel")]
|
||||
use tracing::Instrument;
|
||||
use tracing::instrument;
|
||||
|
||||
pub(crate) mod commands;
|
||||
mod helpers;
|
||||
@@ -52,30 +60,134 @@ impl Cli {
|
||||
.block_on(fut))
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
pub(crate) fn execute(self) -> anyhow::Result<()> {
|
||||
// NOTE: `test_throughput` sets up its own logger as it has to include additional layers
|
||||
if !matches!(self.command, Commands::TestThroughput(..)) {
|
||||
crate::logging::setup_tracing_logger()?;
|
||||
}
|
||||
// if !matches!(self.command, Commands::TestThroughput(..)) {
|
||||
// crate::logging::setup_tracing_logger()?;
|
||||
// }
|
||||
|
||||
match self.command {
|
||||
Commands::BuildInfo(args) => build_info::execute(args)?,
|
||||
Commands::BondingInformation(args) => {
|
||||
{ Self::execute_async(bonding_information::execute(args))? }?
|
||||
// Sync commands get logger w. no OTEL
|
||||
Commands::BuildInfo(args) => {
|
||||
setup_no_otel_logger()?;
|
||||
build_info::execute(args)?
|
||||
}
|
||||
Commands::NodeDetails(args) => { Self::execute_async(node_details::execute(args))? }?,
|
||||
Commands::Run(args) => { Self::execute_async(run::execute(*args))? }?,
|
||||
Commands::Migrate(args) => migrate::execute(*args)?,
|
||||
Commands::Sign(args) => { Self::execute_async(sign::execute(args))? }?,
|
||||
Commands::TestThroughput(args) => test_throughput::execute(args)?,
|
||||
Commands::UnsafeResetSphinxKeys(args) => {
|
||||
{ Self::execute_async(reset_sphinx_keys::execute(args))? }?
|
||||
Commands::Migrate(args) => {
|
||||
setup_no_otel_logger()?;
|
||||
migrate::execute(*args)?
|
||||
}
|
||||
Commands::Debug(debug) => match debug.command {
|
||||
DebugCommands::ResetProvidersGatewayDbs(args) => {
|
||||
{ Self::execute_async(debug::reset_providers_dbs::execute(args))? }?
|
||||
let _ = Self::execute_async(debug::reset_providers_dbs::execute(args))?;
|
||||
}
|
||||
},
|
||||
Commands::TestThroughput(args) => {
|
||||
// Has its own logging setup
|
||||
test_throughput::execute(args)?
|
||||
}
|
||||
// SigNoz/OTEL run in async context
|
||||
Commands::BondingInformation(args) => Self::execute_async(async move {
|
||||
#[cfg(feature = "otel")]
|
||||
{
|
||||
let _guard =
|
||||
setup_tracing_logger("nym-node".to_string()).map_err(TracingError::from)?;
|
||||
let main_span = tracing::info_span!("startup", service = "nym-node");
|
||||
async {
|
||||
bonding_information::execute(args).await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
.instrument(main_span)
|
||||
.await
|
||||
}
|
||||
#[cfg(not(feature = "otel"))]
|
||||
{
|
||||
setup_no_otel_logger().expect("failed to initialize logging");
|
||||
bonding_information::execute(args).await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
})??,
|
||||
Commands::NodeDetails(args) => Self::execute_async(async move {
|
||||
#[cfg(feature = "otel")]
|
||||
{
|
||||
let _guard =
|
||||
setup_tracing_logger("nym-node".to_string()).map_err(TracingError::from)?;
|
||||
let main_span = tracing::info_span!("startup", service = "nym-node");
|
||||
async {
|
||||
node_details::execute(args).await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
.instrument(main_span)
|
||||
.await
|
||||
}
|
||||
#[cfg(not(feature = "otel"))]
|
||||
{
|
||||
setup_no_otel_logger().expect("failed to initialize logging");
|
||||
node_details::execute(args).await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
})??,
|
||||
Commands::Run(args) => Self::execute_async(async move {
|
||||
#[cfg(feature = "otel")]
|
||||
{
|
||||
let _guard =
|
||||
setup_tracing_logger("nym-node".to_string()).map_err(TracingError::from)?;
|
||||
tracing::warn!("OpenTelemetry is enabled for this nym-node instance.");
|
||||
let main_span = tracing::info_span!("startup", service = "nym-node");
|
||||
async {
|
||||
run::execute(*args).await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
.instrument(main_span)
|
||||
.await
|
||||
}
|
||||
#[cfg(not(feature = "otel"))]
|
||||
{
|
||||
setup_no_otel_logger().expect("failed to initialize logging");
|
||||
run::execute(*args).await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
})??,
|
||||
Commands::Sign(args) => Self::execute_async(async move {
|
||||
#[cfg(feature = "otel")]
|
||||
{
|
||||
let _guard =
|
||||
setup_tracing_logger("nym-node".to_string()).map_err(TracingError::from)?;
|
||||
let main_span = tracing::info_span!("startup", service = "nym-node");
|
||||
async {
|
||||
sign::execute(args).await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
.instrument(main_span)
|
||||
.await
|
||||
}
|
||||
#[cfg(not(feature = "otel"))]
|
||||
{
|
||||
setup_no_otel_logger().expect("failed to initialize logging");
|
||||
sign::execute(args).await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
})??,
|
||||
Commands::UnsafeResetSphinxKeys(args) => Self::execute_async(async move {
|
||||
#[cfg(feature = "otel")]
|
||||
{
|
||||
let _guard =
|
||||
setup_tracing_logger("nym-node".to_string()).map_err(TracingError::from)?;
|
||||
let main_span = tracing::info_span!("startup", service = "nym-node");
|
||||
async {
|
||||
reset_sphinx_keys::execute(args).await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
.instrument(main_span)
|
||||
.await
|
||||
}
|
||||
#[cfg(not(feature = "otel"))]
|
||||
{
|
||||
setup_no_otel_logger().expect("failed to initialize logging");
|
||||
reset_sphinx_keys::execute(args).await?;
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
})??,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
+14
-14
@@ -428,20 +428,20 @@ impl Config {
|
||||
pub fn validate(&self) -> Result<(), NymNodeError> {
|
||||
self.mixnet.validate()?;
|
||||
|
||||
// it's not allowed to run mixnode mode alongside entry mode
|
||||
if self.modes.mixnode && self.modes.entry {
|
||||
return Err(NymNodeError::config_validation_failure(
|
||||
"illegal modes configuration - node cannot run as a mixnode and an entry gateway",
|
||||
));
|
||||
}
|
||||
|
||||
// nor it's allowed to run mixnode mode alongside exit mode
|
||||
// (use two separate checks for better error messages)
|
||||
if self.modes.mixnode && self.modes.exit {
|
||||
return Err(NymNodeError::config_validation_failure(
|
||||
"illegal modes configuration - node cannot run as a mixnode and an exit gateway",
|
||||
));
|
||||
}
|
||||
// // it's not allowed to run mixnode mode alongside entry mode
|
||||
// if self.modes.mixnode && self.modes.entry {
|
||||
// return Err(NymNodeError::config_validation_failure(
|
||||
// "illegal modes configuration - node cannot run as a mixnode and an entry gateway",
|
||||
// ));
|
||||
// }
|
||||
//
|
||||
// // nor it's allowed to run mixnode mode alongside exit mode
|
||||
// // (use two separate checks for better error messages)
|
||||
// if self.modes.mixnode && self.modes.exit {
|
||||
// return Err(NymNodeError::config_validation_failure(
|
||||
// "illegal modes configuration - node cannot run as a mixnode and an exit gateway",
|
||||
// ));
|
||||
// }
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -76,6 +76,8 @@ pub enum KeyIOFailure {
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum NymNodeError {
|
||||
// #[error("Failed to setup tracing logger")]
|
||||
// TracingSetupFailure(#[source] anyhow::Error),
|
||||
#[error("this binary version no longer supports migration from legacy mixnodes and gateways")]
|
||||
UnsupportedMigration,
|
||||
|
||||
|
||||
+4
-25
@@ -1,11 +1,11 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use nym_bin_common::logging::{default_tracing_env_filter, default_tracing_fmt_layer};
|
||||
use nym_bin_common::logging::default_tracing_env_filter;
|
||||
use tracing_subscriber::filter::Directive;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
use tracing_subscriber::{EnvFilter, Layer};
|
||||
// use tracing_subscriber::layer::SubscriberExt;
|
||||
// use tracing_subscriber::util::SubscriberInitExt;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
|
||||
fn directive_checked(directive: impl Into<String>) -> anyhow::Result<Directive> {
|
||||
@@ -21,24 +21,3 @@ pub(crate) fn granual_filtered_env() -> anyhow::Result<EnvFilter> {
|
||||
}
|
||||
Ok(filter)
|
||||
}
|
||||
|
||||
pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
|
||||
let stderr_layer =
|
||||
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
|
||||
|
||||
cfg_if::cfg_if! {if #[cfg(feature = "tokio-console")] {
|
||||
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
|
||||
let console_layer = console_subscriber::spawn();
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(console_layer)
|
||||
.with(stderr_layer)
|
||||
.init();
|
||||
} else {
|
||||
tracing_subscriber::registry()
|
||||
.with(stderr_layer)
|
||||
.init();
|
||||
}}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ use std::net::SocketAddr;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::codec::Framed;
|
||||
use tracing::{debug, error, instrument, trace, warn};
|
||||
use tracing::{debug, error, instrument, Instrument, trace, warn};
|
||||
|
||||
struct PendingReplayCheckPackets {
|
||||
// map of rotation id used for packet creation to the packets
|
||||
@@ -130,6 +130,7 @@ impl ConnectionHandler {
|
||||
Some(now + delay)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
fn handle_forward_packet(&self, now: Instant, mix_packet: MixPacket, delay: Option<Delay>) {
|
||||
if !self.shared.processing_config.forward_hop_processing_enabled {
|
||||
trace!("this nym-node does not support forward hop packets");
|
||||
@@ -141,6 +142,7 @@ impl ConnectionHandler {
|
||||
self.shared.forward_mix_packet(mix_packet, forward_instant);
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
async fn handle_final_hop(&self, final_hop_data: ProcessedFinalHop) {
|
||||
if !self.shared.processing_config.final_hop_processing_enabled {
|
||||
trace!("this nym-node does not support final hop packets");
|
||||
@@ -269,6 +271,7 @@ impl ConnectionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
async fn handle_received_packet_with_replay_detection(
|
||||
&mut self,
|
||||
now: Instant,
|
||||
@@ -379,6 +382,7 @@ impl ConnectionHandler {
|
||||
true
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_pending_packets_batch(&mut self, now: Instant) {
|
||||
let batch = self.pending_packets.reset(now);
|
||||
let replay_tags = self.pending_packets.replay_tags();
|
||||
@@ -481,6 +485,7 @@ impl ConnectionHandler {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn handle_stream(
|
||||
&mut self,
|
||||
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
|
||||
@@ -492,9 +497,9 @@ impl ConnectionHandler {
|
||||
trace!("connection handler: received shutdown");
|
||||
break
|
||||
}
|
||||
maybe_framed_nym_packet = mixnet_connection.next() => {
|
||||
maybe_framed_nym_packet = mixnet_connection.next().in_current_span() => {
|
||||
match maybe_framed_nym_packet {
|
||||
Some(Ok(packet)) => self.handle_received_nym_packet(packet).await,
|
||||
Some(Ok(packet)) => self.handle_received_nym_packet(packet).in_current_span().await,
|
||||
Some(Err(err)) => {
|
||||
debug!("connection got corrupted with: {err}");
|
||||
return
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
use crate::node::mixnet::SharedData;
|
||||
use nym_task::ShutdownToken;
|
||||
use std::net::SocketAddr;
|
||||
use tracing::{debug, error, info, trace};
|
||||
use tracing::{Instrument, debug, error, info, instrument, trace};
|
||||
|
||||
pub(crate) struct Listener {
|
||||
bind_address: SocketAddr,
|
||||
@@ -18,7 +18,7 @@ impl Listener {
|
||||
shared_data,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
pub(crate) async fn run(&mut self, shutdown: ShutdownToken) {
|
||||
info!("attempting to run mixnet listener on {}", self.bind_address);
|
||||
|
||||
@@ -41,7 +41,7 @@ impl Listener {
|
||||
trace!("mixnet listener: received shutdown");
|
||||
break
|
||||
}
|
||||
connection = tcp_listener.accept() => {
|
||||
connection = tcp_listener.accept().in_current_span() => {
|
||||
self.shared_data.try_handle_connection(connection);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ use nym_sphinx_forwarding::packet::MixPacket;
|
||||
use nym_task::ShutdownToken;
|
||||
use std::io;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{debug, error, trace, warn};
|
||||
use tracing::{debug, error, instrument, Instrument, trace, warn};
|
||||
|
||||
pub(crate) mod global;
|
||||
|
||||
@@ -46,6 +46,7 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
self.packet_sender.clone()
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
fn forward_packet(&mut self, packet: MixPacket)
|
||||
where
|
||||
C: SendWithoutResponse,
|
||||
@@ -78,6 +79,7 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
self.forward_packet(delayed_packet);
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
fn handle_new_packet(&mut self, new_packet: PacketToForward)
|
||||
where
|
||||
C: SendWithoutResponse,
|
||||
@@ -120,6 +122,7 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
.update_packet_forwarder_queue_size(channel_size)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn run(&mut self, shutdown_token: ShutdownToken)
|
||||
where
|
||||
C: SendWithoutResponse,
|
||||
@@ -130,16 +133,16 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown_token.cancelled() => {
|
||||
_ = shutdown_token.cancelled().in_current_span() => {
|
||||
debug!("PacketForwarder: Received shutdown");
|
||||
break;
|
||||
}
|
||||
delayed = self.delay_queue.next() => {
|
||||
delayed = self.delay_queue.next().in_current_span() => {
|
||||
// SAFETY: `stream` implementation of `NonExhaustiveDelayQueue` never returns `None`
|
||||
#[allow(clippy::unwrap_used)]
|
||||
self.handle_done_delaying(delayed.unwrap());
|
||||
}
|
||||
new_packet = self.packet_receiver.next() => {
|
||||
new_packet = self.packet_receiver.next().in_current_span() => {
|
||||
// this one is impossible to ever panic - the struct itself contains a sender
|
||||
// and hence it can't happen that ALL senders are dropped
|
||||
#[allow(clippy::unwrap_used)]
|
||||
|
||||
@@ -23,7 +23,7 @@ use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{debug, error};
|
||||
use tracing::{Instrument, debug, error, instrument};
|
||||
|
||||
pub(crate) mod final_hop;
|
||||
|
||||
@@ -164,6 +164,7 @@ impl SharedData {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(super) fn try_handle_connection(
|
||||
&self,
|
||||
accepted: io::Result<(TcpStream, SocketAddr)>,
|
||||
@@ -172,8 +173,9 @@ impl SharedData {
|
||||
Ok((socket, remote_addr)) => {
|
||||
debug!("accepted incoming mixnet connection from: {remote_addr}");
|
||||
let mut handler = ConnectionHandler::new(self, remote_addr);
|
||||
let join_handle =
|
||||
tokio::spawn(async move { handler.handle_connection(socket).await });
|
||||
let join_handle = tokio::spawn(async move {
|
||||
handler.handle_connection(socket).in_current_span().await
|
||||
});
|
||||
self.log_connected_clients();
|
||||
Some(join_handle)
|
||||
}
|
||||
@@ -184,6 +186,7 @@ impl SharedData {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(super) fn forward_mix_packet(&self, packet: MixPacket, delay_until: Option<Instant>) {
|
||||
if self
|
||||
.mixnet_forwarder
|
||||
|
||||
@@ -66,7 +66,7 @@ use std::ops::Deref;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{debug, info, trace};
|
||||
use tracing::{Instrument, debug, info, instrument, trace};
|
||||
use zeroize::Zeroizing;
|
||||
|
||||
pub mod bonding_information;
|
||||
@@ -601,6 +601,7 @@ impl NymNode {
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn start_gateway_tasks(
|
||||
&mut self,
|
||||
cached_network: CachedNetwork,
|
||||
@@ -636,8 +637,10 @@ impl NymNode {
|
||||
let mut websocket = gateway_tasks_builder
|
||||
.build_websocket_listener(active_clients_store.clone())
|
||||
.await?;
|
||||
self.shutdown_tracker()
|
||||
.try_spawn_named(async move { websocket.run().await }, "EntryWebsocket");
|
||||
self.shutdown_tracker().try_spawn_named(
|
||||
async move { websocket.run().in_current_span().await },
|
||||
"EntryWebsocket",
|
||||
);
|
||||
} else {
|
||||
info!("node not running in entry mode: the websocket will remain closed");
|
||||
}
|
||||
@@ -1056,6 +1059,7 @@ impl NymNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn start_mixnet_listener<F>(
|
||||
&self,
|
||||
active_clients_store: &ActiveClientsStore,
|
||||
@@ -1099,7 +1103,7 @@ impl NymNode {
|
||||
let shutdown_token = self.shutdown_token();
|
||||
|
||||
self.shutdown_tracker().try_spawn_named(
|
||||
async move { packet_forwarder.run(shutdown_token).await },
|
||||
async move { packet_forwarder.run(shutdown_token).in_current_span().await },
|
||||
"PacketForwarder",
|
||||
);
|
||||
|
||||
@@ -1123,7 +1127,7 @@ impl NymNode {
|
||||
|
||||
let shutdown_token = self.shutdown_token();
|
||||
self.shutdown_tracker().try_spawn_named(
|
||||
async move { mixnet_listener.run(shutdown_token).await },
|
||||
async move { mixnet_listener.run(shutdown_token).in_current_span().await },
|
||||
"MixnetListener",
|
||||
);
|
||||
|
||||
@@ -1152,6 +1156,7 @@ impl NymNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn start_nym_node_tasks(mut self) -> Result<ShutdownManager, NymNodeError> {
|
||||
info!(
|
||||
"starting Nym Node {} with the following modes: mixnode: {}, entry: {}, exit: {}, wireguard: {}",
|
||||
@@ -1226,6 +1231,7 @@ impl NymNode {
|
||||
Ok(self.shutdown_manager)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn run(mut self) -> Result<(), NymNodeError> {
|
||||
let mut shutdown_signals = self.shutdown_manager.detach_shutdown_signals();
|
||||
|
||||
@@ -1236,7 +1242,7 @@ impl NymNode {
|
||||
// ideally we'd also do some cleanup here, but currently there's no easy way to access the handles
|
||||
return Ok(())
|
||||
}
|
||||
startup_result = self.start_nym_node_tasks() => {
|
||||
startup_result = self.start_nym_node_tasks().in_current_span() => {
|
||||
let mut shutdown_manager = startup_result?;
|
||||
shutdown_manager.replace_shutdown_signals(shutdown_signals);
|
||||
shutdown_manager.run_until_shutdown().await;
|
||||
|
||||
@@ -20,7 +20,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
|
||||
tracing = { workspace = true }
|
||||
url = { workspace = true }
|
||||
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["output_format", "basic_tracing"] }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
|
||||
nym-ecash-signer-check = { path = "../common/ecash-signer-check" }
|
||||
nym-network-defaults = { path = "../common/network-defaults" }
|
||||
nym-task = { path = "../common/task" }
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
use crate::cli::Cli;
|
||||
use clap::Parser;
|
||||
use nym_bin_common::bin_info_owned;
|
||||
use nym_bin_common::logging::setup_tracing_logger;
|
||||
use nym_bin_common::logging::setup_no_otel_logger;
|
||||
use tracing::{info, trace};
|
||||
|
||||
mod cli;
|
||||
@@ -13,7 +13,7 @@ pub(crate) mod test_result;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
setup_tracing_logger();
|
||||
setup_no_otel_logger().expect("failed to initialize logging");
|
||||
let cli = Cli::parse();
|
||||
trace!("args: {cli:#?}");
|
||||
|
||||
|
||||
@@ -32,8 +32,8 @@ humantime = { workspace = true }
|
||||
humantime-serde.workspace = true
|
||||
|
||||
# internal
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["output_format", "basic_tracing"] }
|
||||
nym-config = { path = "../common/config" }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
|
||||
nym-config = { path = "../common/config" }
|
||||
nym-ecash-time = { path = "../common/ecash-time" }
|
||||
nym-contracts-common = { path = "../common/cosmwasm-smart-contracts/contracts-common" }
|
||||
nym-compact-ecash = { path = "../common/nym_offline_compact_ecash" }
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
use crate::cli::Cli;
|
||||
use clap::{Parser, crate_name, crate_version};
|
||||
use nym_bin_common::logging::{maybe_print_banner, setup_tracing_logger};
|
||||
use nym_bin_common::logging::{maybe_print_banner, setup_no_otel_logger};
|
||||
use nym_network_defaults::setup_env;
|
||||
|
||||
pub mod cli;
|
||||
@@ -25,7 +25,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let args = Cli::parse();
|
||||
setup_env(args.config_env_file.as_ref());
|
||||
setup_tracing_logger();
|
||||
setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
if !args.no_banner {
|
||||
maybe_print_banner(crate_name!(), crate_version!());
|
||||
|
||||
Generated
-9395
File diff suppressed because it is too large
Load Diff
@@ -13,5 +13,5 @@ clap = { version = "4.0", features = ["derive"] }
|
||||
log = "0.4"
|
||||
serde_json = "1.0.0"
|
||||
|
||||
nym-bin-common = { path = "../../common/bin-common", features = ["basic_tracing"] }
|
||||
nym-bin-common = { path = "../../common/bin-common" }
|
||||
nym-store-cipher = { path = "../../common/store-cipher" }
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use clap::Parser;
|
||||
use nym_bin_common::logging::setup_tracing_logger;
|
||||
use nym_bin_common::logging::setup_no_otel_logger;
|
||||
use nym_store_cipher::{
|
||||
Aes256Gcm, Algorithm, EncryptedData, KdfInfo, Params, StoreCipher, Version, ARGON2_SALT_SIZE,
|
||||
CURRENT_VERSION,
|
||||
@@ -52,7 +52,7 @@ enum ParseMode {
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
setup_tracing_logger();
|
||||
setup_no_otel_logger().expect("failed to initialize logging");
|
||||
let args = Args::parse();
|
||||
let file = File::open(args.file)?;
|
||||
let parse = if args.raw {
|
||||
|
||||
@@ -13,7 +13,7 @@ crate-type = ["cdylib"]
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
# Nym clients, addressing, packet format, common tools (logging), ffi shared
|
||||
nym-sdk = { path = "../../rust/nym-sdk/" }
|
||||
nym-bin-common = { path = "../../../common/bin-common", features = ["basic_tracing"] }
|
||||
nym-bin-common = { path = "../../../common/bin-common" }
|
||||
nym-sphinx-anonymous-replies = { path = "../../../common/nymsphinx/anonymous-replies" }
|
||||
nym-ffi-shared = { path = "../shared" }
|
||||
lazy_static = { workspace = true }
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::types::types::{CMessageCallback, CStringCallback, ReceivedMessage, St
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn init_logging() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
|
||||
@@ -14,7 +14,7 @@ uniffi = { workspace = true, features = ["cli"] }
|
||||
# Nym clients, addressing, packet format, common tools (logging), ffi shared
|
||||
nym-sdk = { path = "../../rust/nym-sdk/" }
|
||||
nym-crypto = { path = "../../../common/crypto" }
|
||||
nym-bin-common = { path = "../../../common/bin-common", features = ["basic_tracing"] }
|
||||
nym-bin-common = { path = "../../../common/bin-common" }
|
||||
nym-sphinx-anonymous-replies = { path = "../../../common/nymsphinx/anonymous-replies" }
|
||||
nym-ffi-shared = { path = "../shared" }
|
||||
# Async runtime
|
||||
|
||||
@@ -36,7 +36,7 @@ enum GoWrapError {
|
||||
|
||||
#[no_mangle]
|
||||
fn init_logging() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
|
||||
@@ -43,9 +43,7 @@ nym-socks5-requests = { path = "../../../common/socks5/requests" }
|
||||
nym-ordered-buffer = { path = "../../../common/socks5/ordered-buffer" }
|
||||
nym-service-providers-common = { path = "../../../service-providers/common" }
|
||||
nym-sphinx-addressing = { path = "../../../common/nymsphinx/addressing" }
|
||||
nym-bin-common = { path = "../../../common/bin-common", features = [
|
||||
"basic_tracing",
|
||||
] }
|
||||
nym-bin-common = { path = "../../../common/bin-common" }
|
||||
bytecodec = { workspace = true }
|
||||
httpcodec = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
@@ -71,10 +69,20 @@ tokio-util.workspace = true
|
||||
uuid = { workspace = true, features = ["v4", "serde"] }
|
||||
bincode = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
tracing.workspace = true
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter"] }
|
||||
# tracing.workspace = true
|
||||
# tracing-subscriber = { workspace = true, features = ["env-filter"] }
|
||||
dirs.workspace = true
|
||||
|
||||
opentelemetry = { workspace = true, optional = true}
|
||||
opentelemetry_sdk = { workspace = true, optional = true }
|
||||
|
||||
tracing = { workspace = true, features = ["std"] }
|
||||
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
|
||||
tracing-core = { workspace = true }
|
||||
tracing-opentelemetry = { workspace = true, optional = true }
|
||||
opentelemetry-otlp = { workspace = true, optional = true }
|
||||
opentelemetry-semantic-conventions = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
anyhow = { workspace = true }
|
||||
dotenvy = { workspace = true }
|
||||
@@ -82,7 +90,7 @@ reqwest = { workspace = true, features = ["json", "socks"] }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
time = { workspace = true }
|
||||
nym-bin-common = { path = "../../../common/bin-common", features = ["basic_tracing"] }
|
||||
nym-bin-common = { path = "../../../common/bin-common" }
|
||||
|
||||
# extra dependencies for libp2p examples
|
||||
#libp2p = { git = "https://github.com/ChainSafe/rust-libp2p.git", rev = "e3440d25681df380c9f0f8cfdcfd5ecc0a4f2fb6", features = [ "identify", "macros", "ping", "tokio", "tcp", "dns", "websocket", "noise", "mplex", "yamux", "gossipsub" ]}
|
||||
@@ -93,3 +101,14 @@ hex = { workspace = true }
|
||||
|
||||
[features]
|
||||
libp2p-vanilla = []
|
||||
otel = [
|
||||
"nym-bin-common/otel",
|
||||
"nym-gateway-requests/otel",
|
||||
"nym-socks5-client-core/otel",
|
||||
"nym-client-core/otel",
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"opentelemetry-otlp",
|
||||
"opentelemetry-semantic-conventions",
|
||||
"tracing-opentelemetry",
|
||||
]
|
||||
|
||||
@@ -6,7 +6,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
// right now, only sandbox has coconut setup
|
||||
// this should be run from the `sdk/rust/nym-sdk` directory
|
||||
setup_env(Some("../../../envs/sandbox.env"));
|
||||
|
||||
@@ -3,7 +3,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
// Create client builder, including ephemeral keys. The builder can be usable in the context
|
||||
// where you don't want to connect just yet.
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::path::PathBuf;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
// Specify some config options
|
||||
let config_dir = PathBuf::from("/tmp/mixnet-client");
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
todo!()
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use tokio::signal::ctrl_c;
|
||||
// Run with: cargo run --example client_pool -- ../../../envs/<NETWORK>.env
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to setup logging");
|
||||
setup_env(std::env::args().nth(1));
|
||||
|
||||
let conn_pool = ClientPool::new(2); // Start the Client Pool with 2 Clients always being kept in reserve
|
||||
|
||||
@@ -1,102 +1,100 @@
|
||||
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use futures::StreamExt;
|
||||
use nym_sdk::mixnet;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
use nym_topology::provider_trait::{async_trait, ToTopologyMetadata, TopologyProvider};
|
||||
use nym_topology::{EpochRewardedSet, NymTopology};
|
||||
use nym_validator_client::nym_api::NymApiClientExt;
|
||||
use url::Url;
|
||||
|
||||
struct MyTopologyProvider {
|
||||
validator_client: nym_http_api_client::Client,
|
||||
}
|
||||
|
||||
impl MyTopologyProvider {
|
||||
fn new(nym_api_url: Url) -> MyTopologyProvider {
|
||||
let validator_client = nym_http_api_client::Client::builder(nym_api_url)
|
||||
.expect("Failed to create API client builder")
|
||||
.build()
|
||||
.expect("Failed to build API client");
|
||||
|
||||
MyTopologyProvider { validator_client }
|
||||
}
|
||||
|
||||
async fn get_topology(&self) -> NymTopology {
|
||||
let rewarded_set = self
|
||||
.validator_client
|
||||
.get_current_rewarded_set()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mixnodes_response = self
|
||||
.validator_client
|
||||
.get_all_basic_active_mixing_assigned_nodes_with_metadata()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let metadata = mixnodes_response.metadata.to_topology_metadata();
|
||||
|
||||
let epoch_rewarded_set: EpochRewardedSet = rewarded_set.into();
|
||||
let mut base_topology = NymTopology::new(metadata, epoch_rewarded_set, Vec::new());
|
||||
|
||||
// in our topology provider only use mixnodes that have node_id divisible by 3
|
||||
// and has exactly 100 performance score
|
||||
// why? because this is just an example to showcase arbitrary uses and capabilities of this trait
|
||||
let filtered_mixnodes = mixnodes_response
|
||||
.nodes
|
||||
.into_iter()
|
||||
.filter(|mix| mix.node_id % 3 == 0 && mix.performance.is_hundred())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let gateways = self
|
||||
.validator_client
|
||||
.get_all_basic_entry_assigned_nodes_with_metadata()
|
||||
.await
|
||||
.unwrap()
|
||||
.nodes;
|
||||
|
||||
base_topology.add_skimmed_nodes(&filtered_mixnodes);
|
||||
base_topology.add_skimmed_nodes(&gateways);
|
||||
base_topology
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TopologyProvider for MyTopologyProvider {
|
||||
// this will be manually refreshed on a timer specified inside mixnet client config
|
||||
async fn get_new_topology(&mut self) -> Option<NymTopology> {
|
||||
Some(self.get_topology().await)
|
||||
}
|
||||
}
|
||||
use nym_topology::{
|
||||
CachedEpochRewardedSet, EntryDetails, HardcodedTopologyProvider, NymTopology,
|
||||
NymTopologyMetadata, RoutingNode, SupportedRoles,
|
||||
};
|
||||
use std::time::Duration;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::time::sleep;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to setup logging");
|
||||
|
||||
let nym_api = "https://validator.nymtech.net/api/".parse().unwrap();
|
||||
let my_topology_provider = MyTopologyProvider::new(nym_api);
|
||||
// let nym_api = "https://validator.nymtech.net/api/".parse().unwrap();
|
||||
// let my_topology_provider = MyTopologyProvider::new(nym_api);
|
||||
|
||||
let topology_metadata = NymTopologyMetadata::new(0, 0, OffsetDateTime::now_utc());
|
||||
let mut rewarded_set = CachedEpochRewardedSet::default();
|
||||
rewarded_set.entry_gateways.insert(1);
|
||||
rewarded_set.layer1.insert(1);
|
||||
rewarded_set.layer2.insert(1);
|
||||
rewarded_set.layer3.insert(1);
|
||||
|
||||
let nodes = vec![RoutingNode {
|
||||
node_id: 1,
|
||||
mix_host: "127.0.0.1:1789".parse().unwrap(),
|
||||
entry: Some(EntryDetails {
|
||||
ip_addresses: vec!["127.0.0.1".parse().unwrap()],
|
||||
clients_ws_port: 9000,
|
||||
hostname: None,
|
||||
clients_wss_port: None,
|
||||
}),
|
||||
identity_key: "PUT IDENTITY KEY HERE".parse().unwrap(),
|
||||
sphinx_key: "PUT SPHINX KEY HERE".parse().unwrap(),
|
||||
supported_roles: SupportedRoles {
|
||||
mixnode: true,
|
||||
mixnet_entry: true,
|
||||
mixnet_exit: true,
|
||||
},
|
||||
}];
|
||||
|
||||
let topology_provider =
|
||||
HardcodedTopologyProvider::new(NymTopology::new(topology_metadata, rewarded_set, nodes));
|
||||
|
||||
// Passing no config makes the client fire up an ephemeral session and figure things out on its own
|
||||
let mut client = mixnet::MixnetClientBuilder::new_ephemeral()
|
||||
.custom_topology_provider(Box::new(my_topology_provider))
|
||||
.custom_topology_provider(Box::new(topology_provider))
|
||||
.build()
|
||||
.unwrap()
|
||||
.connect_to_mixnet()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let our_address = client.nym_address();
|
||||
let our_address = *client.nym_address();
|
||||
println!("Our client nym address is: {our_address}");
|
||||
|
||||
// Send a message through the mixnet to ourselves
|
||||
client
|
||||
.send_plain_message(*our_address, "hello there")
|
||||
.await
|
||||
.unwrap();
|
||||
let sender = client.split_sender();
|
||||
|
||||
println!("Waiting for message (ctrl-c to exit)");
|
||||
client
|
||||
.on_messages(|msg| println!("Received: {}", String::from_utf8_lossy(&msg.message)))
|
||||
.await;
|
||||
const MAX_MESSAGES: usize = 100;
|
||||
|
||||
// receiving task
|
||||
let receiving_task_handle = tokio::spawn(async move {
|
||||
let mut received_count = 0;
|
||||
while let Some(received) = client.next().await {
|
||||
received_count += 1;
|
||||
println!(
|
||||
"{received_count}: received: {}",
|
||||
String::from_utf8_lossy(&received.message)
|
||||
);
|
||||
if received_count >= MAX_MESSAGES {
|
||||
break;
|
||||
}
|
||||
}
|
||||
client.disconnect().await;
|
||||
});
|
||||
|
||||
// sending task
|
||||
let sending_task_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
if sender
|
||||
.send_plain_message(our_address, "hello there")
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
});
|
||||
|
||||
// wait for both tasks to be done
|
||||
println!("waiting for shutdown");
|
||||
sending_task_handle.await.unwrap();
|
||||
receiving_task_handle.await.unwrap();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,118 @@
|
||||
use nym_sdk::mixnet::{
|
||||
AnonymousSenderTag, MixnetClientBuilder, MixnetMessageSender, ReconstructedMessage,
|
||||
};
|
||||
use nym_topology::{
|
||||
CachedEpochRewardedSet, EntryDetails, HardcodedTopologyProvider, NymTopology,
|
||||
NymTopologyMetadata, RoutingNode, SupportedRoles,
|
||||
};
|
||||
#[cfg(feature = "otel")]
|
||||
use opentelemetry::trace::{TraceContextExt, Tracer};
|
||||
#[cfg(feature = "otel")]
|
||||
use opentelemetry::{global, Context};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::instrument;
|
||||
use tracing::warn;
|
||||
|
||||
#[tokio::main]
|
||||
#[instrument(name = "sdk-example-surb-reply", skip_all)]
|
||||
async fn main() {
|
||||
#[cfg(feature = "otel")]
|
||||
{
|
||||
nym_bin_common::opentelemetry::setup_tracing_logger(
|
||||
"local-sdk-example-surb-reply".to_string(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let tracer = global::tracer("local-sdk-example-surb-reply");
|
||||
let span = tracer.start("client-root-span");
|
||||
let cx = Context::current_with_span(span);
|
||||
let _guard = cx.clone().attach();
|
||||
|
||||
let trace_id = cx.span().span_context().trace_id();
|
||||
warn!("Main TRACE_ID: {:?}", trace_id);
|
||||
}
|
||||
#[cfg(not(feature = "otel"))]
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
// Create a mixnet client which connect to a local node
|
||||
let topology_metadata = NymTopologyMetadata::new(0, 0, OffsetDateTime::now_utc());
|
||||
let mut rewarded_set = CachedEpochRewardedSet::default();
|
||||
rewarded_set.entry_gateways.insert(1);
|
||||
rewarded_set.layer1.insert(1);
|
||||
rewarded_set.layer2.insert(1);
|
||||
rewarded_set.layer3.insert(1);
|
||||
|
||||
let nodes = vec![RoutingNode {
|
||||
node_id: 1,
|
||||
mix_host: "127.0.0.1:1789".parse().unwrap(),
|
||||
entry: Some(EntryDetails {
|
||||
ip_addresses: vec!["127.0.0.1".parse().unwrap()],
|
||||
clients_ws_port: 9000,
|
||||
hostname: None,
|
||||
clients_wss_port: None,
|
||||
}),
|
||||
identity_key: "put here identity_key".parse().unwrap(),
|
||||
sphinx_key: "put here sphinx_key".parse().unwrap(),
|
||||
supported_roles: SupportedRoles {
|
||||
mixnode: true,
|
||||
mixnet_entry: true,
|
||||
mixnet_exit: true,
|
||||
},
|
||||
}];
|
||||
|
||||
let topology_provider =
|
||||
HardcodedTopologyProvider::new(NymTopology::new(topology_metadata, rewarded_set, nodes));
|
||||
|
||||
let mut client = MixnetClientBuilder::new_ephemeral()
|
||||
.custom_topology_provider(Box::new(topology_provider))
|
||||
.build()
|
||||
.unwrap()
|
||||
.connect_to_mixnet()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let our_address = client.nym_address();
|
||||
println!("\nOur client nym address is: {our_address}");
|
||||
|
||||
// Send a message through the mixnet to ourselves using our nym address
|
||||
client
|
||||
.send_plain_message(*our_address, "hello there")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// we're going to parse the sender_tag (AnonymousSenderTag) from the incoming message and use it to 'reply' to ourselves instead of our Nym address.
|
||||
// we know there will be a sender_tag since the sdk sends SURBs along with messages by default.
|
||||
println!("Waiting for message\n");
|
||||
|
||||
// get the actual message - discard the empty vec sent along with a potential SURB topup request
|
||||
let mut message: Vec<ReconstructedMessage> = Vec::new();
|
||||
while let Some(new_message) = client.wait_for_messages().await {
|
||||
if new_message.is_empty() {
|
||||
continue;
|
||||
}
|
||||
message = new_message;
|
||||
break;
|
||||
}
|
||||
|
||||
let mut parsed = String::new();
|
||||
if let Some(r) = message.first() {
|
||||
parsed = String::from_utf8(r.message.clone()).unwrap();
|
||||
}
|
||||
// parse sender_tag: we will use this to reply to sender without needing their Nym address
|
||||
let return_recipient: AnonymousSenderTag = message[0].sender_tag.unwrap();
|
||||
println!(
|
||||
"\nReceived the following message: {parsed} \nfrom sender with surb bucket {return_recipient}"
|
||||
);
|
||||
|
||||
// reply to self with it: note we use `send_str_reply` instead of `send_str`
|
||||
println!("Replying with using SURBs");
|
||||
client
|
||||
.send_reply(return_recipient, "hi an0n!")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("Waiting for message (once you see it, ctrl-c to exit)\n");
|
||||
client
|
||||
.on_messages(|msg| println!("\nReceived: {}", String::from_utf8_lossy(&msg.message)))
|
||||
.await;
|
||||
}
|
||||
@@ -8,7 +8,7 @@ use nym_topology::provider_trait::async_trait;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
// Just some plain data to pretend we have some external storage that the application
|
||||
// implementer is using.
|
||||
|
||||
@@ -7,7 +7,7 @@ use nym_topology::{NymTopology, NymTopologyMetadata, RoutingNode, SupportedRoles
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
// Passing no config makes the client fire up an ephemeral session and figure shit out on its own
|
||||
let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
|
||||
@@ -7,7 +7,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
// Passing no config makes the client fire up an ephemeral session and figure stuff out on its own
|
||||
let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
|
||||
@@ -6,7 +6,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
|
||||
// An example of creating a client relying on a testnet, in this case Sandbox.
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
// relative root is `sdk/rust/nym-sdk/` for fallback file path
|
||||
let env_path =
|
||||
std::env::var("NYM_ENV_PATH").unwrap_or_else(|_| "../../../envs/sandbox.env".to_string());
|
||||
|
||||
@@ -3,7 +3,7 @@ use nym_sdk::mixnet::MixnetMessageSender;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
// Passing no config makes the client fire up an ephemeral session and figure shit out on its own
|
||||
// let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
|
||||
@@ -2,7 +2,7 @@ use nym_sdk::mixnet;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
nym_bin_common::logging::setup_tracing_logger();
|
||||
nym_bin_common::logging::setup_no_otel_logger().expect("failed to initialize logging");
|
||||
|
||||
println!("Connecting receiver");
|
||||
let mut receiving_client = mixnet::MixnetClient::connect_new().await.unwrap();
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user