Promethus is our friend (#4408)
* Generic prom wrapper idea * Extend packet_statistics control with prom metrics * Replace counters with Counters * Add legacy mixnode api route * fmt * Sanitize metric names * Format metrics * Script to make prom targets * More metrics * Update script * Make sure we dont panic in the future * Remove fragile test * Add metrics endpoint auth * Remove per IP metrics * Update target script, node_exporter setup * Remove prom from client * Simplify node stat * Centralize metrice, break cpucycles temporarily * Remove prometheus from mixnode * Add cpu-cycles to Prom * Further centralize Registry * Cleanup old tracing * Remove spurious assignment * Move cpu-cycles to metrics * Add features * setup_logging before logging * Remove cpucycle measurement in favour of time * Cleanup, hygine
This commit is contained in:
Generated
+57
-37
@@ -1429,14 +1429,6 @@ dependencies = [
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cpu-cycles"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cpufeatures"
|
||||
version = "0.2.9"
|
||||
@@ -1916,12 +1908,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "darling"
|
||||
version = "0.20.3"
|
||||
version = "0.20.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e"
|
||||
checksum = "fc5d6b04b3fd0ba9926f945895de7d806260a2d7431ba82e7edaecb043c4c6b8"
|
||||
dependencies = [
|
||||
"darling_core 0.20.3",
|
||||
"darling_macro 0.20.3",
|
||||
"darling_core 0.20.5",
|
||||
"darling_macro 0.20.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1954,9 +1946,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "darling_core"
|
||||
version = "0.20.3"
|
||||
version = "0.20.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621"
|
||||
checksum = "04e48a959bcd5c761246f5d090ebc2fbf7b9cd527a492b07a67510c108f1e7e3"
|
||||
dependencies = [
|
||||
"fnv",
|
||||
"ident_case",
|
||||
@@ -1990,11 +1982,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "darling_macro"
|
||||
version = "0.20.3"
|
||||
version = "0.20.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5"
|
||||
checksum = "1d1545d67a2149e1d93b7e5c7752dce5a7426eb5d1357ddcfd89336b94444f77"
|
||||
dependencies = [
|
||||
"darling_core 0.20.3",
|
||||
"darling_core 0.20.5",
|
||||
"quote",
|
||||
"syn 2.0.38",
|
||||
]
|
||||
@@ -5717,6 +5709,17 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nym-metrics"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"dashmap",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"prometheus",
|
||||
"regex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nym-mixnet-client"
|
||||
version = "0.1.0"
|
||||
@@ -5757,19 +5760,19 @@ dependencies = [
|
||||
"anyhow",
|
||||
"axum",
|
||||
"bs58 0.5.0",
|
||||
"cfg-if",
|
||||
"clap 4.4.7",
|
||||
"colored",
|
||||
"cpu-cycles",
|
||||
"cupid",
|
||||
"dirs 4.0.0",
|
||||
"futures",
|
||||
"humantime-serde",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"nym-bin-common",
|
||||
"nym-config",
|
||||
"nym-contracts-common",
|
||||
"nym-crypto",
|
||||
"nym-metrics",
|
||||
"nym-mixnet-client",
|
||||
"nym-mixnode-common",
|
||||
"nym-node",
|
||||
@@ -5782,7 +5785,6 @@ dependencies = [
|
||||
"nym-topology",
|
||||
"nym-types",
|
||||
"nym-validator-client",
|
||||
"opentelemetry",
|
||||
"rand 0.7.3",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -5791,7 +5793,6 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"toml 0.5.11",
|
||||
"tracing",
|
||||
"url",
|
||||
]
|
||||
|
||||
@@ -5800,13 +5801,12 @@ name = "nym-mixnode-common"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"cfg-if",
|
||||
"cpu-cycles",
|
||||
"futures",
|
||||
"humantime-serde",
|
||||
"log",
|
||||
"nym-bin-common",
|
||||
"nym-crypto",
|
||||
"nym-metrics",
|
||||
"nym-network-defaults",
|
||||
"nym-sphinx-acknowledgements",
|
||||
"nym-sphinx-addressing",
|
||||
@@ -5821,7 +5821,6 @@ dependencies = [
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"url",
|
||||
]
|
||||
|
||||
@@ -7441,6 +7440,21 @@ dependencies = [
|
||||
"yansi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prometheus"
|
||||
version = "0.13.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"fnv",
|
||||
"lazy_static",
|
||||
"memchr",
|
||||
"parking_lot 0.12.1",
|
||||
"protobuf",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prometheus-client"
|
||||
version = "0.19.0"
|
||||
@@ -7563,10 +7577,16 @@ dependencies = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "psl"
|
||||
version = "2.1.14"
|
||||
name = "protobuf"
|
||||
version = "2.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "383703acfc34f7a00724846c14dc5ea4407c59e5aedcbbb18a1c0c1a23fe5013"
|
||||
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
|
||||
|
||||
[[package]]
|
||||
name = "psl"
|
||||
version = "2.1.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dc74a6e6a56708be1cf5c4c4d1a0dc21d33b2dcaa24e731b7fa9c287ce4f916f"
|
||||
dependencies = [
|
||||
"psl-types",
|
||||
]
|
||||
@@ -7962,13 +7982,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.10.2"
|
||||
version = "1.10.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
|
||||
checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
"regex-automata 0.4.3",
|
||||
"regex-automata 0.4.6",
|
||||
"regex-syntax 0.8.2",
|
||||
]
|
||||
|
||||
@@ -7983,9 +8003,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "regex-automata"
|
||||
version = "0.4.3"
|
||||
version = "0.4.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
|
||||
checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"memchr",
|
||||
@@ -8808,9 +8828,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_with"
|
||||
version = "3.4.0"
|
||||
version = "3.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "64cd236ccc1b7a29e7e2739f27c0b2dd199804abc4290e32f59f3b68d6405c23"
|
||||
checksum = "1b0ed1662c5a68664f45b76d18deb0e234aff37207086803165c961eb695e981"
|
||||
dependencies = [
|
||||
"base64 0.21.4",
|
||||
"chrono",
|
||||
@@ -8825,11 +8845,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_with_macros"
|
||||
version = "3.4.0"
|
||||
version = "3.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788"
|
||||
checksum = "568577ff0ef47b879f736cd66740e022f3672788cdf002a05a4e609ea5a6fb15"
|
||||
dependencies = [
|
||||
"darling 0.20.3",
|
||||
"darling 0.20.5",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.38",
|
||||
|
||||
+19
-6
@@ -32,7 +32,7 @@ members = [
|
||||
"common/cosmwasm-smart-contracts/coconut-bandwidth-contract",
|
||||
"common/cosmwasm-smart-contracts/coconut-dkg",
|
||||
"common/cosmwasm-smart-contracts/contracts-common",
|
||||
# "common/cosmwasm-smart-contracts/ephemera",
|
||||
# "common/cosmwasm-smart-contracts/ephemera",
|
||||
"common/cosmwasm-smart-contracts/group-contract",
|
||||
"common/cosmwasm-smart-contracts/mixnet-contract",
|
||||
"common/cosmwasm-smart-contracts/multisig-contract",
|
||||
@@ -57,6 +57,7 @@ members = [
|
||||
"common/nonexhaustive-delayqueue",
|
||||
"common/nymcoconut",
|
||||
"common/nym-id",
|
||||
"common/nym-metrics",
|
||||
"common/nymsphinx",
|
||||
"common/nymsphinx/acknowledgements",
|
||||
"common/nymsphinx/addressing",
|
||||
@@ -112,9 +113,10 @@ members = [
|
||||
"tools/nymvisor",
|
||||
"tools/ts-rs-cli",
|
||||
"wasm/client",
|
||||
# "wasm/full-nym-wasm",
|
||||
# "wasm/full-nym-wasm",
|
||||
"wasm/mix-fetch",
|
||||
"wasm/node-tester",
|
||||
"common/nym-metrics",
|
||||
]
|
||||
|
||||
default-members = [
|
||||
@@ -130,7 +132,16 @@ default-members = [
|
||||
"nym-validator-rewarder",
|
||||
]
|
||||
|
||||
exclude = ["explorer", "contracts", "nym-wallet", "nym-connect/mobile/src-tauri", "nym-connect/desktop", "nym-vpn/ui/src-tauri", "cpu-cycles", "sdk/ffi/cpp"]
|
||||
exclude = [
|
||||
"explorer",
|
||||
"contracts",
|
||||
"nym-wallet",
|
||||
"nym-connect/mobile/src-tauri",
|
||||
"nym-connect/desktop",
|
||||
"nym-vpn/ui/src-tauri",
|
||||
"cpu-cycles",
|
||||
"sdk/ffi/cpp",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
authors = ["Nym Technologies SA"]
|
||||
@@ -180,10 +191,12 @@ utoipa-swagger-ui = "3.1.5"
|
||||
url = "2.4"
|
||||
zeroize = "1.6.0"
|
||||
|
||||
prometheus = { version = "0.13.0" }
|
||||
|
||||
# coconut/DKG related
|
||||
# unfortunately until https://github.com/zkcrypto/bls12_381/issues/10 is resolved, we have to rely on the fork
|
||||
# as we need to be able to serialize Gt so that we could create the lookup table for baby-step-giant-step algorithm
|
||||
bls12_381 = { git = "https://github.com/jstuczyn/bls12_381", branch ="feature/gt-serialization-0.8.0" }
|
||||
bls12_381 = { git = "https://github.com/jstuczyn/bls12_381", branch = "feature/gt-serialization-0.8.0" }
|
||||
group = "0.13.0"
|
||||
ff = "0.13.0"
|
||||
|
||||
@@ -208,9 +221,9 @@ cw-controllers = { version = "=1.1.0" }
|
||||
bip32 = "0.5.1"
|
||||
|
||||
# temporarily using a fork again (yay.) because we need staking and slashing support
|
||||
cosmrs = { git = "https://github.com/jstuczyn/cosmos-rust", branch ="nym-temp/all-validator-features" }
|
||||
cosmrs = { git = "https://github.com/jstuczyn/cosmos-rust", branch = "nym-temp/all-validator-features" }
|
||||
#cosmrs = { git = "https://github.com/jstuczyn/cosmos-rust", branch = "nym-temp/all-validator-features" } # unfortuntely we need a fork by yours truly to get the staking support
|
||||
tendermint = "0.34" # same version as used by cosmrs
|
||||
tendermint = "0.34" # same version as used by cosmrs
|
||||
tendermint-rpc = "0.34" # same version as used by cosmrs
|
||||
prost = "0.12"
|
||||
|
||||
|
||||
@@ -25,9 +25,6 @@ tokio-util = { workspace = true, features = ["codec"] }
|
||||
url = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
## tracing
|
||||
tracing = { version = "0.1.37", optional = true }
|
||||
|
||||
nym-crypto = { path = "../crypto" }
|
||||
nym-network-defaults = { path = "../network-defaults" }
|
||||
nym-sphinx-acknowledgements = { path = "../nymsphinx/acknowledgements" }
|
||||
@@ -39,9 +36,4 @@ nym-sphinx-types = { path = "../nymsphinx/types" }
|
||||
nym-task = { path = "../task" }
|
||||
nym-validator-client = { path = "../client-libs/validator-client" }
|
||||
nym-bin-common = { path = "../bin-common" }
|
||||
|
||||
cfg-if = "1.0.0"
|
||||
cpu-cycles = { path = "../../cpu-cycles", optional = true }
|
||||
|
||||
[features]
|
||||
cpucycles = ["cpu-cycles", "tracing"]
|
||||
nym-metrics = { path = "../nym-metrics" }
|
||||
|
||||
@@ -2,40 +2,3 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
pub mod packet_processor;
|
||||
pub mod verloc;
|
||||
|
||||
pub fn cpu_cycles() -> Result<i64, Box<dyn std::error::Error>> {
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(feature = "cpucycles")] {
|
||||
Ok(cpu_cycles::cpucycles()?)
|
||||
} else {
|
||||
Err("`cpucycles` feature is not turned on!".into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! measure {
|
||||
( $x:expr ) => {{
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(feature = "cpucycles")] {
|
||||
let start_cycles = $crate::cpu_cycles();
|
||||
// if the block needs to return something, we can return it
|
||||
let r = $x;
|
||||
let end_cycles = $crate::cpu_cycles();
|
||||
let name = if let Some(meta) = tracing::Span::current().metadata() {
|
||||
meta.name()
|
||||
} else {
|
||||
"measure"
|
||||
};
|
||||
match (start_cycles, end_cycles) {
|
||||
(Ok(start), Ok(end)) => log::trace!("{} cpucycles: {}", name, end - start),
|
||||
(Err(e), _) => error!("{e}"),
|
||||
(_, Err(e)) => error!("{e}"),
|
||||
}
|
||||
r
|
||||
} else {
|
||||
$x
|
||||
}
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::measure;
|
||||
use crate::packet_processor::error::MixProcessingError;
|
||||
use log::*;
|
||||
use nym_metrics::nanos;
|
||||
use nym_sphinx_acknowledgements::surb_ack::SurbAck;
|
||||
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
|
||||
use nym_sphinx_forwarding::packet::MixPacket;
|
||||
@@ -15,8 +15,6 @@ use nym_sphinx_types::{
|
||||
};
|
||||
use std::convert::TryFrom;
|
||||
use std::sync::Arc;
|
||||
#[cfg(feature = "cpucycles")]
|
||||
use tracing::instrument;
|
||||
|
||||
type ForwardAck = MixPacket;
|
||||
|
||||
@@ -51,15 +49,11 @@ impl SphinxPacketProcessor {
|
||||
}
|
||||
|
||||
/// Performs a fresh sphinx unwrapping using no cache.
|
||||
#[cfg_attr(
|
||||
feature = "cpucycles",
|
||||
instrument(skip(self, packet), fields(cpucycles))
|
||||
)]
|
||||
fn perform_initial_packet_processing(
|
||||
&self,
|
||||
packet: NymPacket,
|
||||
) -> Result<NymProcessedPacket, MixProcessingError> {
|
||||
measure!({
|
||||
nanos!("perform_initial_packet_processing", {
|
||||
packet.process(&self.sphinx_key).map_err(|err| {
|
||||
debug!("Failed to unwrap NymPacket packet: {err}");
|
||||
MixProcessingError::NymPacketProcessingError(err)
|
||||
@@ -68,17 +62,12 @@ impl SphinxPacketProcessor {
|
||||
}
|
||||
|
||||
/// Takes the received framed packet and tries to unwrap it from the sphinx encryption.
|
||||
#[cfg_attr(
|
||||
feature = "cpucycles",
|
||||
instrument(skip(self, received), fields(cpucycles))
|
||||
)]
|
||||
fn perform_initial_unwrapping(
|
||||
&self,
|
||||
received: FramedNymPacket,
|
||||
) -> Result<NymProcessedPacket, MixProcessingError> {
|
||||
measure!({
|
||||
nanos!("perform_initial_unwrapping", {
|
||||
let packet = received.into_inner();
|
||||
|
||||
self.perform_initial_packet_processing(packet)
|
||||
})
|
||||
}
|
||||
@@ -223,16 +212,12 @@ impl SphinxPacketProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(
|
||||
feature = "cpucycles",
|
||||
instrument(skip(self, received), fields(cpucycles))
|
||||
)]
|
||||
pub fn process_received(
|
||||
&self,
|
||||
received: FramedNymPacket,
|
||||
) -> Result<MixProcessingResult, MixProcessingError> {
|
||||
// explicit packet size will help to correctly parse final hop
|
||||
measure!({
|
||||
nanos!("process_received", {
|
||||
let packet_size = received.packet_size();
|
||||
let packet_type = received.packet_type();
|
||||
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "nym-metrics"
|
||||
version = "0.1.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
documentation.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
prometheus = { workspace = true }
|
||||
log = { workspace = true }
|
||||
dashmap = { workspace = true }
|
||||
regex = "1.1"
|
||||
lazy_static = "1.4"
|
||||
@@ -0,0 +1,180 @@
|
||||
use dashmap::DashMap;
|
||||
pub use log::error;
|
||||
use log::{debug, warn};
|
||||
use regex::Regex;
|
||||
use std::fmt;
|
||||
pub use std::time::Instant;
|
||||
|
||||
use prometheus::{core::Collector, Counter, Encoder as _, Gauge, Registry, TextEncoder};
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! nanos {
|
||||
( $name:literal, $x:expr ) => {{
|
||||
let start = $crate::Instant::now();
|
||||
// if the block needs to return something, we can return it
|
||||
let r = $x;
|
||||
let duration = start.elapsed().as_nanos() as f64;
|
||||
$crate::REGISTRY.inc_by(&format!("{}_nanos", $name), duration);
|
||||
r
|
||||
}};
|
||||
}
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref RE: Regex = Regex::new(r"[^a-zA-Z0-9_]").unwrap();
|
||||
pub static ref REGISTRY: MetricsController = MetricsController::default();
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct MetricsController {
|
||||
registry: Registry,
|
||||
registry_index: DashMap<String, Metric>,
|
||||
}
|
||||
|
||||
enum Metric {
|
||||
C(Box<Counter>),
|
||||
G(Box<Gauge>),
|
||||
}
|
||||
|
||||
fn fq_name(c: &dyn Collector) -> String {
|
||||
c.desc()
|
||||
.first()
|
||||
.map(|d| d.fq_name.clone())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
impl Metric {
|
||||
fn fq_name(&self) -> String {
|
||||
match self {
|
||||
Metric::C(c) => fq_name(c.as_ref()),
|
||||
Metric::G(g) => fq_name(g.as_ref()),
|
||||
}
|
||||
}
|
||||
|
||||
fn inc_by(&self, value: f64) {
|
||||
match self {
|
||||
Metric::C(c) => c.inc_by(value),
|
||||
Metric::G(g) => g.add(value),
|
||||
}
|
||||
}
|
||||
|
||||
fn set(&self, value: f64) {
|
||||
match self {
|
||||
Metric::C(_c) => {
|
||||
warn!("Cannot set value for counter {:?}", self.fq_name());
|
||||
}
|
||||
Metric::G(g) => g.set(value),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for MetricsController {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let mut buffer = vec![];
|
||||
let encoder = TextEncoder::new();
|
||||
let metrics = self.registry.gather();
|
||||
match encoder.encode(&metrics, &mut buffer) {
|
||||
Ok(_) => {}
|
||||
Err(e) => return write!(f, "Error encoding metrics to buffer: {}", e),
|
||||
}
|
||||
let output = match String::from_utf8(buffer) {
|
||||
Ok(output) => output,
|
||||
Err(e) => return write!(f, "Error decoding metrics to String: {}", e),
|
||||
};
|
||||
write!(f, "{}", output)
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricsController {
|
||||
pub fn set(&self, name: &str, value: f64) {
|
||||
if let Some(metric) = self.registry_index.get(name) {
|
||||
metric.set(value);
|
||||
} else {
|
||||
let gauge = match Gauge::new(sanitize_metric_name(name), name) {
|
||||
Ok(g) => g,
|
||||
Err(e) => {
|
||||
debug!("Failed to create gauge {:?}:\n{}", name, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
self.register_gauge(Box::new(gauge));
|
||||
self.set(name, value)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn inc_by(&self, name: &str, value: f64) {
|
||||
if let Some(metric) = self.registry_index.get(name) {
|
||||
metric.inc_by(value);
|
||||
} else {
|
||||
let counter = match Counter::new(sanitize_metric_name(name), name) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
debug!("Failed to create counter {:?}:\n{}", name, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
self.register_counter(Box::new(counter));
|
||||
self.inc_by(name, value)
|
||||
}
|
||||
}
|
||||
|
||||
fn register_gauge(&self, metric: Box<Gauge>) {
|
||||
let fq_name = metric
|
||||
.desc()
|
||||
.first()
|
||||
.map(|d| d.fq_name.clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
if self.registry_index.contains_key(&fq_name) {
|
||||
return;
|
||||
}
|
||||
|
||||
match self.registry.register(metric.clone()) {
|
||||
Ok(_) => {
|
||||
self.registry_index
|
||||
.insert(fq_name, Metric::G(metric.clone()));
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to register {:?}:\n{}", fq_name, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn register_counter(&self, metric: Box<Counter>) {
|
||||
let fq_name = metric
|
||||
.desc()
|
||||
.first()
|
||||
.map(|d| d.fq_name.clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
if self.registry_index.contains_key(&fq_name) {
|
||||
return;
|
||||
}
|
||||
match self.registry.register(metric.clone()) {
|
||||
Ok(_) => {
|
||||
self.registry_index
|
||||
.insert(fq_name, Metric::C(metric.clone()));
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to register {:?}:\n{}", fq_name, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn sanitize_metric_name(name: &str) -> String {
|
||||
RE.replace_all(name, "_").to_string()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_sanitization() {
|
||||
assert_eq!(
|
||||
sanitize_metric_name("packets_sent_34.242.65.133:1789"),
|
||||
"packets_sent_34_242_65_133_1789"
|
||||
)
|
||||
}
|
||||
}
|
||||
+2
-15
@@ -26,6 +26,7 @@ cupid = "0.6.1"
|
||||
dirs = "4.0"
|
||||
futures = { workspace = true }
|
||||
humantime-serde = "1.0"
|
||||
lazy_static = "1.4"
|
||||
log = { workspace = true }
|
||||
rand = "0.7.3"
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
@@ -35,14 +36,8 @@ tokio = { workspace = true, features = ["rt-multi-thread", "net", "signal"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
toml = "0.5.8"
|
||||
url = { workspace = true, features = ["serde"] }
|
||||
cfg-if = "1.0.0"
|
||||
thiserror = { workspace = true }
|
||||
|
||||
## tracing
|
||||
tracing = { workspace = true, optional = true }
|
||||
opentelemetry = { version = "0.19.0", optional = true }
|
||||
|
||||
|
||||
# internal
|
||||
nym-node = { path = "../nym-node" }
|
||||
|
||||
@@ -51,6 +46,7 @@ nym-crypto = { path = "../common/crypto" }
|
||||
nym-contracts-common = { path = "../common/cosmwasm-smart-contracts/contracts-common" }
|
||||
nym-mixnet-client = { path = "../common/client-libs/mixnet-client" }
|
||||
nym-mixnode-common = { path = "../common/mixnode-common" }
|
||||
nym-metrics = { path = "../common/nym-metrics" }
|
||||
nym-nonexhaustive-delayqueue = { path = "../common/nonexhaustive-delayqueue" }
|
||||
nym-sphinx = { path = "../common/nymsphinx" }
|
||||
nym-sphinx-params = { path = "../common/nymsphinx/params" }
|
||||
@@ -60,7 +56,6 @@ nym-types = { path = "../common/types" }
|
||||
nym-topology = { path = "../common/topology" }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
|
||||
cpu-cycles = { path = "../cpu-cycles", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = [
|
||||
@@ -73,14 +68,6 @@ tokio = { workspace = true, features = [
|
||||
nym-sphinx-types = { path = "../common/nymsphinx/types" }
|
||||
nym-sphinx-params = { path = "../common/nymsphinx/params" }
|
||||
|
||||
[features]
|
||||
cpucycles = [
|
||||
"nym-mixnode-common/cpucycles",
|
||||
"tracing",
|
||||
"opentelemetry",
|
||||
"nym-bin-common/tracing",
|
||||
]
|
||||
|
||||
[package.metadata.deb]
|
||||
name = "nym-mixnode"
|
||||
maintainer-scripts = "debian"
|
||||
|
||||
@@ -42,6 +42,9 @@ pub(crate) struct Init {
|
||||
|
||||
#[clap(short, long, default_value_t = OutputFormat::default())]
|
||||
output: OutputFormat,
|
||||
|
||||
#[clap(long)]
|
||||
metrics_key: Option<String>,
|
||||
}
|
||||
|
||||
impl From<Init> for OverrideConfig {
|
||||
@@ -53,6 +56,7 @@ impl From<Init> for OverrideConfig {
|
||||
verloc_port: init_config.verloc_port,
|
||||
http_api_port: init_config.http_api_port,
|
||||
nym_apis: init_config.nym_apis,
|
||||
metrics_key: init_config.metrics_key,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,6 +59,7 @@ struct OverrideConfig {
|
||||
verloc_port: Option<u16>,
|
||||
http_api_port: Option<u16>,
|
||||
nym_apis: Option<Vec<url::Url>>,
|
||||
metrics_key: Option<String>,
|
||||
}
|
||||
|
||||
pub(crate) async fn execute(args: Cli) -> anyhow::Result<()> {
|
||||
@@ -83,6 +84,7 @@ fn override_config(config: Config, args: OverrideConfig) -> Config {
|
||||
.with_optional(Config::with_mix_port, args.mix_port)
|
||||
.with_optional(Config::with_verloc_port, args.verloc_port)
|
||||
.with_optional(Config::with_http_api_port, args.http_api_port)
|
||||
.with_optional(Config::with_metrics_key, args.metrics_key)
|
||||
.with_optional_custom_env(
|
||||
Config::with_custom_nym_apis,
|
||||
args.nym_apis,
|
||||
|
||||
@@ -44,6 +44,9 @@ pub(crate) struct Run {
|
||||
|
||||
#[clap(short, long, default_value_t = OutputFormat::default())]
|
||||
output: OutputFormat,
|
||||
|
||||
#[clap(long)]
|
||||
metrics_key: Option<String>,
|
||||
}
|
||||
|
||||
impl From<Run> for OverrideConfig {
|
||||
@@ -55,6 +58,7 @@ impl From<Run> for OverrideConfig {
|
||||
verloc_port: run_config.verloc_port,
|
||||
http_api_port: run_config.http_api_port,
|
||||
nym_apis: run_config.nym_apis,
|
||||
metrics_key: run_config.metrics_key,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,6 +80,7 @@ fn default_mixnode_http_config() -> config::Http {
|
||||
DEFAULT_HTTP_API_LISTENING_PORT,
|
||||
),
|
||||
landing_page_assets_path: None,
|
||||
metrics_key: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -208,6 +209,15 @@ impl Config {
|
||||
pub fn get_nym_api_endpoints(&self) -> Vec<Url> {
|
||||
self.mixnode.nym_api_urls.clone()
|
||||
}
|
||||
|
||||
pub fn with_metrics_key(mut self, metrics_key: String) -> Self {
|
||||
self.http.metrics_key = Some(metrics_key);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn metrics_key(&self) -> Option<&String> {
|
||||
self.http.metrics_key.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, PartialEq, Serialize)]
|
||||
|
||||
@@ -95,6 +95,7 @@ impl From<ConfigV1_1_32> for Config {
|
||||
value.mixnode.http_api_port,
|
||||
),
|
||||
landing_page_assets_path: None,
|
||||
metrics_key: None,
|
||||
},
|
||||
// /\ ADDED
|
||||
mixnode: MixNode {
|
||||
|
||||
@@ -57,6 +57,8 @@ bind_address = '{{ http.bind_address }}'
|
||||
# Path to assets directory of custom landing page of this node
|
||||
landing_page_assets_path = '{{ http.landing_page_assets_path }}'
|
||||
|
||||
metrics_key = '{{ http.metrics_key }}'
|
||||
|
||||
[storage_paths]
|
||||
|
||||
# Path to file containing private identity key.
|
||||
|
||||
+1
-27
@@ -3,18 +3,11 @@
|
||||
|
||||
use ::nym_config::defaults::setup_env;
|
||||
use clap::{crate_name, crate_version, Parser};
|
||||
use log::info;
|
||||
use nym_bin_common::bin_info;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
#[allow(unused_imports)]
|
||||
use nym_bin_common::logging::{maybe_print_banner, setup_logging};
|
||||
#[cfg(feature = "cpucycles")]
|
||||
use nym_bin_common::setup_tracing;
|
||||
#[cfg(feature = "cpucycles")]
|
||||
use nym_mixnode_common::measure;
|
||||
#[cfg(feature = "cpucycles")]
|
||||
use tracing::instrument;
|
||||
|
||||
mod commands;
|
||||
mod config;
|
||||
@@ -41,12 +34,6 @@ struct Cli {
|
||||
command: commands::Commands,
|
||||
}
|
||||
|
||||
#[cfg(feature = "cpucycles")]
|
||||
#[instrument(fields(cpucycles))]
|
||||
fn test_function() {
|
||||
measure!({})
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let args = Cli::parse();
|
||||
@@ -56,23 +43,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
maybe_print_banner(crate_name!(), crate_version!());
|
||||
}
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(feature = "cpucycles")] {
|
||||
setup_tracing!("mixnode");
|
||||
info!("CPU cycles measurement is ON")
|
||||
} else {
|
||||
setup_logging();
|
||||
info!("CPU cycles measurement is OFF")
|
||||
}
|
||||
}
|
||||
setup_logging();
|
||||
|
||||
commands::execute(args).await?;
|
||||
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(feature = "cpucycles")] {
|
||||
opentelemetry::global::shutdown_tracer_provider();
|
||||
}}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use crate::node::http::legacy::description::description;
|
||||
use crate::node::http::legacy::hardware::hardware;
|
||||
use crate::node::http::legacy::state::MixnodeAppState;
|
||||
use crate::node::http::legacy::stats::metrics;
|
||||
use crate::node::http::legacy::stats::stats;
|
||||
use crate::node::http::legacy::verloc::verloc;
|
||||
use crate::node::node_description::NodeDescription;
|
||||
@@ -29,6 +30,7 @@ pub(crate) mod api_routes {
|
||||
pub(crate) const VERLOC: &str = "/verloc";
|
||||
pub(crate) const DESCRIPTION: &str = "/description";
|
||||
pub(crate) const STATS: &str = "/stats";
|
||||
pub(crate) const METRICS: &str = "/metrics";
|
||||
pub(crate) const HARDWARE: &str = "/hardware";
|
||||
}
|
||||
|
||||
@@ -44,6 +46,7 @@ pub(crate) fn routes<S: Send + Sync + 'static + Clone>(
|
||||
)
|
||||
.route(api_routes::STATS, get(stats))
|
||||
.route(api_routes::HARDWARE, get(hardware))
|
||||
.route(api_routes::METRICS, get(metrics))
|
||||
.fallback(not_found)
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use axum::extract::FromRef;
|
||||
pub(crate) struct MixnodeAppState {
|
||||
pub(crate) verloc: VerlocState,
|
||||
pub(crate) stats: SharedNodeStats,
|
||||
pub(crate) metrics_key: Option<String>,
|
||||
}
|
||||
|
||||
impl FromRef<MixnodeAppState> for VerlocState {
|
||||
|
||||
@@ -1,33 +1,59 @@
|
||||
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::node::node_statistics::{NodeStats, NodeStatsSimple, SharedNodeStats};
|
||||
use axum::extract::{Query, State};
|
||||
use crate::node::node_statistics::{NodeStatsSimple, SharedNodeStats};
|
||||
|
||||
use axum::{
|
||||
extract::{Query, State},
|
||||
http::HeaderMap,
|
||||
};
|
||||
use nym_metrics::REGISTRY;
|
||||
use nym_node::http::api::{FormattedResponse, Output};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::state::MixnodeAppState;
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum NodeStatsResponse {
|
||||
Full(NodeStats),
|
||||
Full(String),
|
||||
Simple(NodeStatsSimple),
|
||||
}
|
||||
|
||||
pub(crate) async fn metrics(State(state): State<MixnodeAppState>, headers: HeaderMap) -> String {
|
||||
if let Some(metrics_key) = state.metrics_key {
|
||||
if let Some(auth) = headers.get("Authorization") {
|
||||
if auth.to_str().unwrap_or_default() == format!("Bearer {}", metrics_key) {
|
||||
REGISTRY.to_string()
|
||||
} else {
|
||||
"Unauthorized".to_string()
|
||||
}
|
||||
} else {
|
||||
"Unauthorized".to_string()
|
||||
}
|
||||
} else {
|
||||
"Set metrics_key in config to enable Prometheus metrics".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn stats(
|
||||
Query(params): Query<StatsQueryParams>,
|
||||
State(stats): State<SharedNodeStats>,
|
||||
) -> MixnodeStatsResponse {
|
||||
let output = params.output.unwrap_or_default();
|
||||
|
||||
let snapshot_data = stats.clone_data().await;
|
||||
|
||||
// there's no point in returning the entire hashmap of sending destinations in regular mode
|
||||
let response = if params.debug {
|
||||
NodeStatsResponse::Full(snapshot_data)
|
||||
let response = generate_stats(params.debug, stats).await;
|
||||
output.to_response(response)
|
||||
}
|
||||
|
||||
async fn generate_stats(full: bool, stats: SharedNodeStats) -> NodeStatsResponse {
|
||||
let snapshot_data = stats.clone_data().await;
|
||||
if full {
|
||||
NodeStatsResponse::Full(REGISTRY.to_string())
|
||||
} else {
|
||||
NodeStatsResponse::Simple(snapshot_data.simplify())
|
||||
};
|
||||
output.to_response(response)
|
||||
}
|
||||
}
|
||||
|
||||
pub type MixnodeStatsResponse = FormattedResponse<NodeStatsResponse>;
|
||||
|
||||
@@ -64,6 +64,12 @@ impl<'a> HttpApiBuilder<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub(crate) fn with_metrics_key(mut self, metrics_key: Option<&String>) -> Self {
|
||||
self.legacy_mixnode.metrics_key = metrics_key.map(|k| k.to_string());
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub(crate) fn with_verloc(mut self, verloc: VerlocState) -> Self {
|
||||
self.legacy_mixnode.verloc = verloc;
|
||||
|
||||
@@ -9,7 +9,7 @@ use crate::node::TaskClient;
|
||||
use futures::StreamExt;
|
||||
use log::debug;
|
||||
use log::{error, info, warn};
|
||||
use nym_mixnode_common::measure;
|
||||
use nym_metrics::nanos;
|
||||
use nym_sphinx::forwarding::packet::MixPacket;
|
||||
use nym_sphinx::framing::codec::NymCodec;
|
||||
use nym_sphinx::framing::packet::FramedNymPacket;
|
||||
@@ -19,9 +19,6 @@ use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::codec::Framed;
|
||||
|
||||
#[cfg(feature = "cpucycles")]
|
||||
use tracing::instrument;
|
||||
|
||||
pub(crate) mod packet_processing;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -53,10 +50,6 @@ impl ConnectionHandler {
|
||||
.expect("the delay-forwarder has died!");
|
||||
}
|
||||
|
||||
#[cfg_attr(
|
||||
feature = "cpucycles",
|
||||
instrument(skip(self, framed_sphinx_packet), fields(cpucycles))
|
||||
)]
|
||||
fn handle_received_packet(&self, framed_sphinx_packet: FramedNymPacket) {
|
||||
//
|
||||
// TODO: here be replay attack detection - it will require similar key cache to the one in
|
||||
@@ -66,7 +59,7 @@ impl ConnectionHandler {
|
||||
|
||||
// all processing such, key caching, etc. was done.
|
||||
// however, if it was a forward hop, we still need to delay it
|
||||
measure!({
|
||||
nanos!("handle_received_packet", {
|
||||
match self.packet_processor.process_received(framed_sphinx_packet) {
|
||||
Err(err) => debug!("We failed to process received sphinx packet - {err}"),
|
||||
Ok(res) => match res {
|
||||
|
||||
@@ -72,11 +72,13 @@ impl MixNode {
|
||||
&self,
|
||||
atomic_verloc_result: AtomicVerlocResult,
|
||||
node_stats_pointer: SharedNodeStats,
|
||||
metrics_key: Option<&String>,
|
||||
task_client: TaskClient,
|
||||
) -> Result<(), MixnodeError> {
|
||||
HttpApiBuilder::new(&self.config, &self.identity_keypair, &self.sphinx_keypair)
|
||||
.with_verloc(VerlocState::new(atomic_verloc_result))
|
||||
.with_mixing_stats(node_stats_pointer)
|
||||
.with_metrics_key(metrics_key)
|
||||
.with_descriptor(self.descriptor.clone())
|
||||
.start(task_client)
|
||||
}
|
||||
@@ -249,6 +251,7 @@ impl MixNode {
|
||||
self.start_http_api(
|
||||
atomic_verloc_results,
|
||||
node_stats_pointer,
|
||||
self.config.metrics_key(),
|
||||
shutdown.subscribe().named("http-api"),
|
||||
)?;
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use nym_metrics::REGISTRY;
|
||||
|
||||
use super::TaskClient;
|
||||
use futures::channel::mpsc;
|
||||
use futures::lock::Mutex;
|
||||
@@ -15,7 +17,7 @@ use std::time::{Duration, SystemTime};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard};
|
||||
|
||||
// convenience aliases
|
||||
type PacketsMap = HashMap<String, u64>;
|
||||
type PacketsMap = HashMap<String, f64>;
|
||||
type PacketDataReceiver = mpsc::UnboundedReceiver<PacketEvent>;
|
||||
type PacketDataSender = mpsc::UnboundedSender<PacketEvent>;
|
||||
|
||||
@@ -27,14 +29,15 @@ pub(crate) struct SharedNodeStats {
|
||||
impl SharedNodeStats {
|
||||
pub(crate) fn new() -> Self {
|
||||
let now = SystemTime::now();
|
||||
|
||||
SharedNodeStats {
|
||||
inner: Arc::new(RwLock::new(NodeStats {
|
||||
update_time: now,
|
||||
previous_update_time: now,
|
||||
packets_received_since_startup: 0,
|
||||
packets_sent_since_startup: HashMap::new(),
|
||||
packets_explicitly_dropped_since_startup: HashMap::new(),
|
||||
packets_received_since_last_update: 0,
|
||||
packets_received_since_startup: 0.,
|
||||
packets_sent_since_startup_all: 0.,
|
||||
packets_dropped_since_startup_all: 0.,
|
||||
packets_received_since_last_update: 0.,
|
||||
packets_sent_since_last_update: HashMap::new(),
|
||||
packets_explicitly_dropped_since_last_update: HashMap::new(),
|
||||
})),
|
||||
@@ -43,7 +46,7 @@ impl SharedNodeStats {
|
||||
|
||||
pub(crate) async fn update(
|
||||
&self,
|
||||
new_received: u64,
|
||||
new_received: f64,
|
||||
new_sent: PacketsMap,
|
||||
new_dropped: PacketsMap,
|
||||
) {
|
||||
@@ -54,20 +57,21 @@ impl SharedNodeStats {
|
||||
guard.update_time = snapshot_time;
|
||||
|
||||
guard.packets_received_since_startup += new_received;
|
||||
for (mix, count) in &new_sent {
|
||||
*guard
|
||||
.packets_sent_since_startup
|
||||
.entry(mix.clone())
|
||||
.or_insert(0) += *count;
|
||||
for count in new_sent.values() {
|
||||
guard.packets_sent_since_startup_all += count;
|
||||
}
|
||||
|
||||
for (mix, count) in &new_dropped {
|
||||
*guard
|
||||
.packets_explicitly_dropped_since_last_update
|
||||
.entry(mix.clone())
|
||||
.or_insert(0) += *count;
|
||||
for count in new_dropped.values() {
|
||||
guard.packets_dropped_since_startup_all += count;
|
||||
}
|
||||
|
||||
REGISTRY.inc_by("packets_received_since_startup", new_received);
|
||||
REGISTRY.inc_by("packets_sent_since_startup_all", new_sent.values().sum());
|
||||
REGISTRY.inc_by(
|
||||
"packets_dropped_since_startup_all",
|
||||
new_dropped.values().sum(),
|
||||
);
|
||||
|
||||
guard.packets_received_since_last_update = new_received;
|
||||
guard.packets_sent_since_last_update = new_sent;
|
||||
guard.packets_explicitly_dropped_since_last_update = new_dropped;
|
||||
@@ -82,27 +86,18 @@ impl SharedNodeStats {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
#[derive(Clone)]
|
||||
pub struct NodeStats {
|
||||
#[serde(serialize_with = "humantime_serde::serialize")]
|
||||
update_time: SystemTime,
|
||||
|
||||
#[serde(serialize_with = "humantime_serde::serialize")]
|
||||
previous_update_time: SystemTime,
|
||||
|
||||
packets_received_since_startup: u64,
|
||||
|
||||
// note: sent does not imply forwarded. We don't know if it was delivered successfully
|
||||
packets_sent_since_startup: PacketsMap,
|
||||
|
||||
// we know for sure we dropped packets to those destinations
|
||||
packets_explicitly_dropped_since_startup: PacketsMap,
|
||||
|
||||
packets_received_since_last_update: u64,
|
||||
|
||||
packets_received_since_startup: f64,
|
||||
packets_sent_since_startup_all: f64,
|
||||
packets_dropped_since_startup_all: f64,
|
||||
packets_received_since_last_update: f64,
|
||||
// note: sent does not imply forwarded. We don't know if it was delivered successfully
|
||||
packets_sent_since_last_update: PacketsMap,
|
||||
|
||||
// we know for sure we dropped packets to those destinations
|
||||
packets_explicitly_dropped_since_last_update: PacketsMap,
|
||||
}
|
||||
@@ -112,10 +107,10 @@ impl Default for NodeStats {
|
||||
NodeStats {
|
||||
update_time: SystemTime::UNIX_EPOCH,
|
||||
previous_update_time: SystemTime::UNIX_EPOCH,
|
||||
packets_received_since_startup: 0,
|
||||
packets_sent_since_startup: Default::default(),
|
||||
packets_explicitly_dropped_since_startup: Default::default(),
|
||||
packets_received_since_last_update: 0,
|
||||
packets_received_since_startup: 0.,
|
||||
packets_sent_since_startup_all: 0.,
|
||||
packets_dropped_since_startup_all: 0.,
|
||||
packets_received_since_last_update: 0.,
|
||||
packets_sent_since_last_update: Default::default(),
|
||||
packets_explicitly_dropped_since_last_update: Default::default(),
|
||||
}
|
||||
@@ -128,11 +123,8 @@ impl NodeStats {
|
||||
update_time: self.update_time,
|
||||
previous_update_time: self.previous_update_time,
|
||||
packets_received_since_startup: self.packets_received_since_startup,
|
||||
packets_sent_since_startup: self.packets_sent_since_startup.values().sum(),
|
||||
packets_explicitly_dropped_since_startup: self
|
||||
.packets_explicitly_dropped_since_startup
|
||||
.values()
|
||||
.sum(),
|
||||
packets_sent_since_startup: self.packets_sent_since_startup_all,
|
||||
packets_explicitly_dropped_since_startup: self.packets_dropped_since_startup_all,
|
||||
packets_received_since_last_update: self.packets_received_since_last_update,
|
||||
packets_sent_since_last_update: self.packets_sent_since_last_update.values().sum(),
|
||||
packets_explicitly_dropped_since_last_update: self
|
||||
@@ -151,21 +143,21 @@ pub struct NodeStatsSimple {
|
||||
#[serde(serialize_with = "humantime_serde::serialize")]
|
||||
previous_update_time: SystemTime,
|
||||
|
||||
packets_received_since_startup: u64,
|
||||
packets_received_since_startup: f64,
|
||||
|
||||
// note: sent does not imply forwarded. We don't know if it was delivered successfully
|
||||
packets_sent_since_startup: u64,
|
||||
packets_sent_since_startup: f64,
|
||||
|
||||
// we know for sure we dropped those packets
|
||||
packets_explicitly_dropped_since_startup: u64,
|
||||
packets_explicitly_dropped_since_startup: f64,
|
||||
|
||||
packets_received_since_last_update: u64,
|
||||
packets_received_since_last_update: f64,
|
||||
|
||||
// note: sent does not imply forwarded. We don't know if it was delivered successfully
|
||||
packets_sent_since_last_update: u64,
|
||||
packets_sent_since_last_update: f64,
|
||||
|
||||
// we know for sure we dropped those packets
|
||||
packets_explicitly_dropped_since_last_update: u64,
|
||||
packets_explicitly_dropped_since_last_update: f64,
|
||||
}
|
||||
|
||||
pub(crate) enum PacketEvent {
|
||||
@@ -203,14 +195,14 @@ impl CurrentPacketData {
|
||||
|
||||
async fn increment_sent(&self, destination: String) {
|
||||
let mut unlocked = self.inner.sent.lock().await;
|
||||
let receiver_count = unlocked.entry(destination).or_insert(0);
|
||||
*receiver_count += 1;
|
||||
let receiver_count = unlocked.entry(destination).or_insert(0.);
|
||||
*receiver_count += 1.;
|
||||
}
|
||||
|
||||
async fn increment_dropped(&self, destination: String) {
|
||||
let mut unlocked = self.inner.dropped.lock().await;
|
||||
let dropped_count = unlocked.entry(destination).or_insert(0);
|
||||
*dropped_count += 1;
|
||||
let dropped_count = unlocked.entry(destination).or_insert(0.);
|
||||
*dropped_count += 1.;
|
||||
}
|
||||
|
||||
async fn acquire_and_reset(&self) -> (u64, PacketsMap, PacketsMap) {
|
||||
@@ -332,7 +324,9 @@ impl StatsUpdater {
|
||||
async fn update_stats(&self) {
|
||||
// grab new data since last update
|
||||
let (received, sent, dropped) = self.current_packet_data.acquire_and_reset().await;
|
||||
self.current_stats.update(received, sent, dropped).await;
|
||||
self.current_stats
|
||||
.update(received as f64, sent, dropped)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn run(&mut self) {
|
||||
@@ -376,21 +370,18 @@ impl PacketStatsConsoleLogger {
|
||||
|
||||
info!(
|
||||
"Since startup mixed {} packets! ({} in last {} seconds)",
|
||||
stats.packets_sent_since_startup.values().sum::<u64>(),
|
||||
stats.packets_sent_since_last_update.values().sum::<u64>(),
|
||||
stats.packets_sent_since_startup_all,
|
||||
stats.packets_sent_since_last_update.values().sum::<f64>(),
|
||||
difference_secs,
|
||||
);
|
||||
if !stats.packets_explicitly_dropped_since_startup.is_empty() {
|
||||
if stats.packets_dropped_since_startup_all > 0. {
|
||||
info!(
|
||||
"Since startup dropped {} packets! ({} in last {} seconds)",
|
||||
stats
|
||||
.packets_explicitly_dropped_since_startup
|
||||
.values()
|
||||
.sum::<u64>(),
|
||||
stats.packets_dropped_since_startup_all,
|
||||
stats
|
||||
.packets_explicitly_dropped_since_last_update
|
||||
.values()
|
||||
.sum::<u64>(),
|
||||
.sum::<f64>(),
|
||||
difference_secs,
|
||||
);
|
||||
}
|
||||
@@ -403,22 +394,19 @@ impl PacketStatsConsoleLogger {
|
||||
);
|
||||
trace!(
|
||||
"Since startup sent packets to the following: \n{:#?} \n And in last {} seconds: {:#?})",
|
||||
stats.packets_sent_since_startup,
|
||||
stats.packets_sent_since_startup_all,
|
||||
difference_secs,
|
||||
stats.packets_sent_since_last_update
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
"Since startup mixed {} packets!",
|
||||
stats.packets_sent_since_startup.values().sum::<u64>(),
|
||||
stats.packets_sent_since_startup_all,
|
||||
);
|
||||
if !stats.packets_explicitly_dropped_since_startup.is_empty() {
|
||||
if stats.packets_dropped_since_startup_all > 0. {
|
||||
info!(
|
||||
"Since startup dropped {} packets!",
|
||||
stats
|
||||
.packets_explicitly_dropped_since_startup
|
||||
.values()
|
||||
.sum::<u64>(),
|
||||
stats.packets_dropped_since_startup_all,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -427,8 +415,8 @@ impl PacketStatsConsoleLogger {
|
||||
stats.packets_received_since_startup
|
||||
);
|
||||
trace!(
|
||||
"Since startup sent packets to the following: \n{:#?}",
|
||||
stats.packets_sent_since_startup
|
||||
"Since startup sent packets {}",
|
||||
stats.packets_sent_since_startup_all
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -545,14 +533,11 @@ mod tests {
|
||||
|
||||
// Get output (stats)
|
||||
let stats = node_stats_pointer.read().await;
|
||||
assert_eq!(&stats.packets_sent_since_startup.get("foo"), &Some(&2u64));
|
||||
assert_eq!(&stats.packets_sent_since_startup.len(), &1);
|
||||
assert_eq!(
|
||||
&stats.packets_sent_since_last_update.get("foo"),
|
||||
&Some(&2u64)
|
||||
);
|
||||
assert_eq!(&stats.packets_sent_since_startup_all, &2.);
|
||||
assert_eq!(&stats.packets_sent_since_last_update.get("foo"), &Some(&2.));
|
||||
assert_eq!(&stats.packets_sent_since_last_update.len(), &1);
|
||||
assert_eq!(&stats.packets_received_since_startup, &0u64);
|
||||
assert!(&stats.packets_explicitly_dropped_since_startup.is_empty());
|
||||
assert_eq!(&stats.packets_received_since_startup, &0.);
|
||||
assert_eq!(&stats.packets_dropped_since_startup_all, &0.);
|
||||
assert_eq!(REGISTRY.to_string(), "# HELP packets_dropped_since_startup_all packets_dropped_since_startup_all\n# TYPE packets_dropped_since_startup_all counter\npackets_dropped_since_startup_all 0\n# HELP packets_received_since_startup packets_received_since_startup\n# TYPE packets_received_since_startup counter\npackets_received_since_startup 0\n# HELP packets_sent_since_startup_all packets_sent_since_startup_all\n# TYPE packets_sent_since_startup_all counter\npackets_sent_since_startup_all 2\n")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,6 +49,9 @@ pub struct Http {
|
||||
/// Path to assets directory of custom landing page of this node.
|
||||
#[serde(deserialize_with = "de_maybe_path")]
|
||||
pub landing_page_assets_path: Option<PathBuf>,
|
||||
|
||||
#[serde(default)]
|
||||
pub metrics_key: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for Http {
|
||||
@@ -56,6 +59,7 @@ impl Default for Http {
|
||||
Http {
|
||||
bind_address: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), DEFAULT_HTTP_PORT),
|
||||
landing_page_assets_path: None,
|
||||
metrics_key: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,13 +20,13 @@ echo "Using $localnetdir for the localnet"
|
||||
|
||||
# initialise mixnet
|
||||
echo "initialising mixnode1..."
|
||||
cargo run --release --bin nym-mixnode -- init --id "mix1-$suffix" --host 127.0.0.1 --mix-port 10001 --verloc-port 20001 --http-api-port 30001 --output=json >>"$localnetdir/mix1.json"
|
||||
cargo run --release --bin nym-mixnode -- init --id "mix1-$suffix" --host 127.0.0.1 --mix-port 10001 --verloc-port 20001 --http-api-port 30001 --metrics-key=lala --output=json >>"$localnetdir/mix1.json"
|
||||
|
||||
echo "initialising mixnode2..."
|
||||
cargo run --release --bin nym-mixnode -- init --id "mix2-$suffix" --host 127.0.0.1 --mix-port 10002 --verloc-port 20002 --http-api-port 30002 --output=json >>"$localnetdir/mix2.json"
|
||||
cargo run --release --bin nym-mixnode -- init --id "mix2-$suffix" --host 127.0.0.1 --mix-port 10002 --verloc-port 20002 --http-api-port 30002 --metrics-key=lala --output=json >>"$localnetdir/mix2.json"
|
||||
|
||||
echo "initialising mixnode3..."
|
||||
cargo run --release --bin nym-mixnode -- init --id "mix3-$suffix" --host 127.0.0.1 --mix-port 10003 --verloc-port 20003 --http-api-port 30003 --output=json >>"$localnetdir/mix3.json"
|
||||
cargo run --release --bin nym-mixnode -- init --id "mix3-$suffix" --host 127.0.0.1 --mix-port 10003 --verloc-port 20003 --http-api-port 30003 --metrics-key=lala --output=json >>"$localnetdir/mix3.json"
|
||||
|
||||
echo "initialising gateway..."
|
||||
cargo run --release --bin nym-gateway -- init --id "gateway-$suffix" --host 127.0.0.1 --mix-port 10004 --clients-port 9000 --output=json >>"$localnetdir/gateway.json"
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Check if the script is run as root
|
||||
if [ "$(id -u)" -ne 0 ]; then
|
||||
echo "This script must be run as root. Please use sudo or log in as the root user."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Hardcoded node_exporter version
|
||||
node_exporter_version="1.7.0"
|
||||
|
||||
# Create a user for node_exporter without a home directory
|
||||
useradd --no-create-home --shell /bin/false node_exporter
|
||||
|
||||
# Download node_exporter
|
||||
echo "Downloading node_exporter version $node_exporter_version..."
|
||||
wget "https://github.com/prometheus/node_exporter/releases/download/v$node_exporter_version/node_exporter-$node_exporter_version.linux-amd64.tar.gz" -O /tmp/node_exporter-$node_exporter_version.linux-amd64.tar.gz
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to download node_exporter."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Unarchive node_exporter
|
||||
echo "Unarchiving node_exporter..."
|
||||
tar xvfz /tmp/node_exporter-$node_exporter_version.linux-amd64.tar.gz -C /tmp
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to unarchive node_exporter."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Move node_exporter to /usr/local/bin
|
||||
echo "Moving node_exporter to /usr/local/bin..."
|
||||
mv /tmp/node_exporter-$node_exporter_version.linux-amd64/node_exporter /usr/local/bin/node_exporter
|
||||
if [ $? -ne 0 ]; then
|
||||
echo "Failed to move node_exporter."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Set ownership and permissions
|
||||
chown node_exporter:node_exporter /usr/local/bin/node_exporter
|
||||
chmod 0755 /usr/local/bin/node_exporter
|
||||
|
||||
# Create node_exporter service file
|
||||
echo "Creating node_exporter service file..."
|
||||
cat <<EOF > /etc/systemd/system/node_exporter.service
|
||||
[Unit]
|
||||
Description=Node Exporter
|
||||
Wants=network-online.target
|
||||
After=network-online.target
|
||||
|
||||
[Service]
|
||||
User=node_exporter
|
||||
Group=node_exporter
|
||||
Type=simple
|
||||
ExecStart=/usr/local/bin/node_exporter --web.config.file /etc/prometheus_node_exporter/configuration.yml
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
EOF
|
||||
|
||||
|
||||
mkdir -p /etc/prometheus_node_exporter/
|
||||
|
||||
sudo cat << EOF > /etc/prometheus_node_exporter/configuration.yml
|
||||
basic_auth_users:
|
||||
prometheus: "\$2y\$10\$aB1RMr6ZGg2psbMOezmfluVzGcH/VHIqP4Lksx0DWuw/QSr9Iccwu"
|
||||
|
||||
EOF
|
||||
|
||||
ufw allow 9100
|
||||
|
||||
# Reload systemd, enable and start node_exporter service
|
||||
echo "Configuring systemd for node_exporter..."
|
||||
systemctl daemon-reload
|
||||
systemctl enable node_exporter.service
|
||||
systemctl start node_exporter.service
|
||||
|
||||
# Cleanup
|
||||
echo "Cleaning up..."
|
||||
rm -rf /tmp/node_exporter-${node_exporter_version}.linux-amd64.tar.gz
|
||||
rm -rf /tmp/node_exporter-${node_exporter_version}.linux-amd64
|
||||
|
||||
echo "node_exporter installation and configuration complete."
|
||||
@@ -0,0 +1,63 @@
|
||||
import argparse
|
||||
import os
|
||||
import requests
|
||||
import json
|
||||
from datetime import datetime
|
||||
from collections import namedtuple
|
||||
|
||||
Config = namedtuple("Config", ["port", "outfile", "outlink"])
|
||||
|
||||
|
||||
def config_to_targets(config, mixnodes):
|
||||
prom_targets = [make_prom_target(mixnode, config.port) for mixnode in mixnodes]
|
||||
with open(config.outfile, "w") as f:
|
||||
json.dump(prom_targets, f)
|
||||
|
||||
os.chmod(config.outfile, 0o777)
|
||||
os.rename(config.outfile, config.outlink)
|
||||
os.chmod(config.outlink, 0o777)
|
||||
|
||||
print(f"Prometheus -> {len(prom_targets)} targets written to {config.outlink}")
|
||||
|
||||
|
||||
def make_prom_target(mixnode, port=None):
|
||||
bond_info = mixnode.get("bond_information", {})
|
||||
mix_node = bond_info.get("mix_node", {})
|
||||
host = mix_node.get("host", None)
|
||||
port = port if port else mix_node.get("http_api_port", None)
|
||||
if host is None or port is None:
|
||||
return None
|
||||
|
||||
return {
|
||||
"targets": [f"{host}:{port}"],
|
||||
"labels": {
|
||||
"mix_node_host": host,
|
||||
"identity_key": mix_node.get("identity_key", None),
|
||||
"sphinx_key": mix_node.get("sphinx_key", None),
|
||||
"mix_node_version": mix_node.get("version", None),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Create prometheus targets for rewarded set mixnodes."
|
||||
)
|
||||
parser.add_argument(
|
||||
"apiurl",
|
||||
type=str,
|
||||
help="Nym Api url",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
nym_api = args.apiurl
|
||||
|
||||
targets = [
|
||||
Config(None, "/tmp/temp_targets_mix.json", "/tmp/prom_targets_mix.json"),
|
||||
Config(9100, "/tmp/temp_targets_node.json", "/tmp/prom_targets_node.json"),
|
||||
]
|
||||
|
||||
mixnodes = requests.get(f"{nym_api}/api/v1/mixnodes").json()
|
||||
|
||||
for config in targets:
|
||||
config_to_targets(config, mixnodes)
|
||||
@@ -0,0 +1 @@
|
||||
requests
|
||||
Reference in New Issue
Block a user