pr feedback
- Moved OTel CLI options into a separate OtelArgs - Otel is built behind the feature flag otel - Store timing is in microseconds - Restore comments to existing files
This commit is contained in:
@@ -312,10 +312,13 @@ impl SendWithoutResponse for Client {
|
||||
let address = packet.next_hop_address();
|
||||
trace!("Sending packet to {address}");
|
||||
|
||||
// TODO: optimisation for the future: rather than constantly using legacy encoding,
|
||||
// use the mix packet type / flags to pick encoding per packet
|
||||
let framed_packet =
|
||||
FramedNymPacket::from_mix_packet(packet, self.config.use_legacy_packet_encoding);
|
||||
|
||||
let Some(sender) = self.active_connections.get_mut(&address) else {
|
||||
// there was never a connection to begin with
|
||||
debug!(
|
||||
event = "mixclient.try_send",
|
||||
peer = %address,
|
||||
|
||||
+52
-38
@@ -21,26 +21,11 @@ fn pretty_build_info_static() -> &'static str {
|
||||
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
|
||||
pub(crate) struct Cli {
|
||||
/// Path pointing to an env file that configures the nym-node and overrides any preconfigured values.
|
||||
#[clap(
|
||||
short,
|
||||
long,
|
||||
env = NYMNODE_CONFIG_ENV_FILE_ARG
|
||||
)]
|
||||
pub(crate) config_env_file: Option<std::path::PathBuf>,
|
||||
|
||||
/// Flag used for disabling the printed banner in tty.
|
||||
#[clap(
|
||||
long,
|
||||
env = NYMNODE_NO_BANNER_ARG
|
||||
)]
|
||||
pub(crate) no_banner: bool,
|
||||
|
||||
/// OpenTelemetry-related CLI arguments. Only present when built with the `otel` feature.
|
||||
#[cfg(feature = "otel")]
|
||||
#[derive(Args, Debug, Clone)]
|
||||
pub(crate) struct OtelArgs {
|
||||
/// Enable OpenTelemetry tracing export via OTLP/gRPC.
|
||||
/// Requires the binary to be built with the `otel` feature.
|
||||
#[clap(long, env = "NYMNODE_OTEL_ENABLE")]
|
||||
pub(crate) otel: bool,
|
||||
|
||||
@@ -71,6 +56,29 @@ pub(crate) struct Cli {
|
||||
/// Timeout in seconds for each OTLP export batch. Prevents unbounded blocking.
|
||||
#[clap(long, env = "NYMNODE_OTEL_EXPORT_TIMEOUT", default_value = "10")]
|
||||
pub(crate) otel_export_timeout: u64,
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
|
||||
pub(crate) struct Cli {
|
||||
/// Path pointing to an env file that configures the nym-node and overrides any preconfigured values.
|
||||
#[clap(
|
||||
short,
|
||||
long,
|
||||
env = NYMNODE_CONFIG_ENV_FILE_ARG
|
||||
)]
|
||||
pub(crate) config_env_file: Option<std::path::PathBuf>,
|
||||
|
||||
/// Flag used for disabling the printed banner in tty.
|
||||
#[clap(
|
||||
long,
|
||||
env = NYMNODE_NO_BANNER_ARG
|
||||
)]
|
||||
pub(crate) no_banner: bool,
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
#[clap(flatten)]
|
||||
pub(crate) otel: OtelArgs,
|
||||
|
||||
#[clap(subcommand)]
|
||||
command: Commands,
|
||||
@@ -78,13 +86,19 @@ pub(crate) struct Cli {
|
||||
|
||||
impl Cli {
|
||||
pub(crate) fn execute(self) -> anyhow::Result<()> {
|
||||
// test_throughput sets up its own logger and builds a runtime internally.
|
||||
if let Commands::TestThroughput(args) = self.command {
|
||||
return test_throughput::execute(args);
|
||||
}
|
||||
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
// Set up tracing inside the runtime so the OTel batch exporter
|
||||
// Set up tracing inside the runtime so the OTel batch exporter (when enabled)
|
||||
// can spawn its background tasks on the tokio reactor.
|
||||
let _otel_guard = runtime.block_on(async { self.setup_logging() })?;
|
||||
let use_otel = matches!(self.command, Commands::Run(..));
|
||||
let _otel_guard = runtime.block_on(async { self.setup_logging(use_otel) })?;
|
||||
|
||||
// `_otel_guard` is dropped at function exit, flushing pending spans via its Drop impl
|
||||
runtime.block_on(async {
|
||||
@@ -95,7 +109,7 @@ impl Cli {
|
||||
Commands::Run(args) => run::execute(*args).await?,
|
||||
Commands::Migrate(args) => migrate::execute(*args)?,
|
||||
Commands::Sign(args) => sign::execute(args).await?,
|
||||
Commands::TestThroughput(args) => test_throughput::execute(args)?,
|
||||
Commands::TestThroughput(..) => unreachable!(),
|
||||
Commands::UnsafeResetSphinxKeys(args) => reset_sphinx_keys::execute(args).await?,
|
||||
Commands::Debug(debug) => match debug.command {
|
||||
DebugCommands::ResetProvidersGatewayDbs(args) => {
|
||||
@@ -107,15 +121,16 @@ impl Cli {
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
fn build_otel_config(&self) -> Option<crate::logging::OtelConfig> {
|
||||
if self.otel {
|
||||
if self.otel.otel {
|
||||
Some(crate::logging::OtelConfig {
|
||||
endpoint: self.otel_endpoint.clone(),
|
||||
endpoint: self.otel.otel_endpoint.clone(),
|
||||
service_name: "nym-node".to_string(),
|
||||
ingestion_key: self.otel_key.clone(),
|
||||
environment: self.otel_env.clone(),
|
||||
sample_ratio: self.otel_sample_ratio,
|
||||
export_timeout_secs: self.otel_export_timeout,
|
||||
ingestion_key: self.otel.otel_key.clone(),
|
||||
environment: self.otel.otel_env.clone(),
|
||||
sample_ratio: self.otel.otel_sample_ratio,
|
||||
export_timeout_secs: self.otel.otel_export_timeout,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
@@ -123,19 +138,18 @@ impl Cli {
|
||||
}
|
||||
|
||||
#[cfg(feature = "otel")]
|
||||
fn setup_logging(&self) -> anyhow::Result<Option<crate::logging::OtelGuard>> {
|
||||
if matches!(self.command, Commands::TestThroughput(..)) {
|
||||
return Ok(None);
|
||||
}
|
||||
crate::logging::setup_tracing_logger(self.build_otel_config())
|
||||
fn setup_logging(&self, use_otel: bool) -> anyhow::Result<Option<crate::logging::OtelGuard>> {
|
||||
let otel_config = if use_otel {
|
||||
self.build_otel_config()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
crate::logging::setup_tracing_logger(otel_config)
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "otel"))]
|
||||
fn setup_logging(&self) -> anyhow::Result<Option<()>> {
|
||||
if matches!(self.command, Commands::TestThroughput(..)) {
|
||||
return Ok(None);
|
||||
}
|
||||
crate::logging::setup_tracing_logger(self.build_otel_config())?;
|
||||
fn setup_logging(&self, _use_otel: bool) -> anyhow::Result<Option<()>> {
|
||||
crate::logging::setup_tracing_logger()?;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,8 +8,7 @@ use tracing_subscriber::util::SubscriberInitExt;
|
||||
use tracing_subscriber::{EnvFilter, Layer};
|
||||
|
||||
/// Configuration for OpenTelemetry OTLP export.
|
||||
/// Fields are only read when the `otel` feature is enabled.
|
||||
#[allow(dead_code)]
|
||||
#[cfg(feature = "otel")]
|
||||
pub(crate) struct OtelConfig {
|
||||
/// OTLP/gRPC collector endpoint, e.g. `http://localhost:4317`
|
||||
/// or `https://ingest.eu.signoz.cloud:443` for SigNoz Cloud.
|
||||
@@ -134,8 +133,7 @@ pub(crate) fn setup_tracing_logger(otel: Option<OtelConfig>) -> anyhow::Result<O
|
||||
|
||||
/// Non-OTel variant -- identical subscriber stack without the OTLP layer.
|
||||
#[cfg(not(feature = "otel"))]
|
||||
pub(crate) fn setup_tracing_logger(otel: Option<OtelConfig>) -> anyhow::Result<()> {
|
||||
let _ = otel;
|
||||
pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
|
||||
let stderr_layer =
|
||||
default_tracing_fmt_layer(std::io::stderr).with_filter(granual_filtered_env()?);
|
||||
|
||||
|
||||
@@ -195,8 +195,10 @@ impl ConnectionHandler {
|
||||
let message = final_hop_data.message;
|
||||
let has_ack = final_hop_data.forward_ack.is_some();
|
||||
|
||||
// if possible attempt to push message directly to the client
|
||||
match self.shared.try_push_message_to_client(client, message) {
|
||||
Err(unsent_plaintext) => {
|
||||
// if that failed, store it on disk
|
||||
Span::current().record("client_online", false);
|
||||
match self
|
||||
.shared
|
||||
@@ -221,6 +223,8 @@ impl ConnectionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
// if we managed to either push message directly to the [online] client or store it at
|
||||
// disk, forward the ack
|
||||
self.shared.forward_ack_packet(final_hop_data.forward_ack);
|
||||
if has_ack {
|
||||
Span::current().record("ack_forwarded", true);
|
||||
|
||||
@@ -55,6 +55,7 @@ impl<C, F> PacketForwarder<C, F> {
|
||||
|
||||
if let Err(err) = self.mixnet_client.send_without_response(packet) {
|
||||
if err.kind() == io::ErrorKind::WouldBlock {
|
||||
// we only know for sure if we dropped a packet if our sending queue was full
|
||||
warn!(
|
||||
event = "packet.dropped.buffer_full",
|
||||
next_hop = %next_hop,
|
||||
|
||||
@@ -73,16 +73,16 @@ impl SharedFinalHopData {
|
||||
let start = Instant::now();
|
||||
debug!("Storing received message for {client_address} on the disk...",);
|
||||
let result = self.storage.store_message(client_address, message).await;
|
||||
let store_ms = start.elapsed().as_millis() as u64;
|
||||
let store_us = start.elapsed().as_micros() as u64;
|
||||
if result.is_ok() {
|
||||
debug!(
|
||||
event = "gateway.disk_store",
|
||||
store_ms, "stored message for {client_address} on disk in {store_ms}ms"
|
||||
store_us, "stored message for {client_address} on disk in {store_us}us"
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
event = "gateway.disk_store_failed",
|
||||
store_ms, "failed to store message for {client_address} on disk after {store_ms}ms"
|
||||
store_us, "failed to store message for {client_address} on disk after {store_us}us"
|
||||
);
|
||||
}
|
||||
result
|
||||
|
||||
Reference in New Issue
Block a user