Shipping raw metrics to PG (#5216)

* Shipping raw metrics to PG

* Put cancel token back in its place

* fmt
This commit is contained in:
Drazen Urch
2024-12-16 16:19:37 +01:00
committed by GitHub
parent 84d7004cb2
commit a3f3d83c1b
6 changed files with 236 additions and 50 deletions
+1
View File
@@ -27,6 +27,7 @@ tokio = { workspace = true, features = ["macros", "time"] }
tokio-util = { workspace = true }
utoipa = { workspace = true, features = ["axum_extras"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
tokio-postgres = "0.7"
# internal
nym-bin-common = { path = "../common/bin-common" }
+3 -2
View File
@@ -9,13 +9,14 @@ network=${NYM_NETWORK:-mainnet}
timeout=${LOCUST_TIMEOUT:-600}
users=${LOCUST_USERS:-10}
processes=${LOCUST_PROCESSES:-4}
_database_url=${DATABASE_URL}
RUST_LOG=info nym-network-monitor --env envs/"${network}".env --private-key "${_private_key}" &
RUST_LOG=info nym-network-monitor --env envs/"${network}".env --private-key "${_private_key}" --database-url "${_database_url}" &
nnm_pid=$!
sleep 10
python -m locust -H http://${NYM_NETWORK_MONITOR_HOST}:${NYM_NETWORK_MONITOR_PORT} --processes "${processes}" --autostart --autoquit 60 -u "${users}" -t "${timeout}"s &
python -m locust -H http://"${NYM_NETWORK_MONITOR_HOST}":"${NYM_NETWORK_MONITOR_PORT}" --processes "${processes}" --autostart --autoquit 60 -u "${users}" -t "${timeout}"s &
locust_pid=$!
wait $locust_pid
+123 -42
View File
@@ -1,7 +1,10 @@
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use anyhow::Result;
use futures::{stream::FuturesUnordered, StreamExt};
use futures::{pin_mut, stream::FuturesUnordered, StreamExt};
use log::{debug, info};
use nym_sphinx::chunking::{monitoring, SentFragment};
use nym_topology::{gateway, mix, NymTopology};
@@ -10,6 +13,7 @@ use nym_validator_client::nym_api::routes::{API_VERSION, STATUS, SUBMIT_GATEWAY,
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use serde::{Deserialize, Serialize};
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type, Client};
use utoipa::ToSchema;
use crate::{NYM_API_URL, PRIVATE_KEY, TOPOLOGY};
@@ -23,20 +27,20 @@ struct HydratedRoute {
struct GatewayStats(u32, u32);
impl GatewayStats {
fn new(sent: u32, recv: u32) -> Self {
GatewayStats(sent, recv)
fn new(success: u32, failure: u32) -> Self {
GatewayStats(success, failure)
}
fn success(&self) -> u32 {
self.0
}
fn failed(&self) -> u32 {
fn failure(&self) -> u32 {
self.1
}
fn reliability(&self) -> f64 {
self.success() as f64 / (self.success() + self.failed()) as f64
self.success() as f64 / (self.success() + self.failure()) as f64
}
fn incr_success(&mut self) {
@@ -321,48 +325,125 @@ pub async fn monitor_mixnode_results() -> anyhow::Result<Vec<NodeResult>> {
.collect())
}
pub async fn submit_metrics() -> anyhow::Result<()> {
let node_stats = monitor_mixnode_results().await?;
let gateway_stats = monitor_gateway_results().await?;
async fn submit_node_stats_to_db(client: Arc<Client>) -> anyhow::Result<()> {
let client = Arc::clone(&client);
let node_stats = all_node_stats().await?;
info!("Submitting metrics to {}", *NYM_API_URL);
let client = reqwest::Client::new();
let sink = client
.copy_in("COPY node_stats (node_id, identity, reliability, complete_routes, incomplete_routes) FROM STDIN BINARY")
.await?;
let node_submit_url = format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_NODE}", &*NYM_API_URL);
let gateway_submit_url = format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_GATEWAY}", &*NYM_API_URL);
let writer = BinaryCopyInWriter::new(
sink,
&[Type::INT4, Type::TEXT, Type::FLOAT8, Type::INT8, Type::INT8],
);
pin_mut!(writer);
info!("Submitting {} mixnode measurements", node_stats.len());
for stat in node_stats {
writer
.as_mut()
.write(&[
&(stat.mix_id as i32),
&stat.identity,
&stat.reliability,
&(stat.complete_routes as i64),
&(stat.incomplete_routes as i64),
])
.await?;
}
node_stats
.chunks(10)
.map(|chunk| {
let monitor_message =
MonitorMessage::new(chunk.to_vec(), PRIVATE_KEY.get().expect("We've set this!"));
client.post(&node_submit_url).json(&monitor_message).send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
writer.finish().await?;
info!("Submitting {} gateway measurements", gateway_stats.len());
Ok(())
}
gateway_stats
.chunks(10)
.map(|chunk| {
let monitor_message =
MonitorMessage::new(chunk.to_vec(), PRIVATE_KEY.get().expect("We've set this!"));
client
.post(&gateway_submit_url)
.json(&monitor_message)
.send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
async fn submit_gateway_stats_to_db(client: Arc<Client>) -> anyhow::Result<()> {
let client = Arc::clone(&client);
let network_account = NetworkAccount::finalize()?;
let gateway_stats = network_account.gateway_stats;
let sink = client
.copy_in("COPY gateway_stats (identity, reliability, success, failure) FROM STDIN BINARY")
.await?;
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT, Type::FLOAT8, Type::INT8, Type::INT8]);
pin_mut!(writer);
for (key, stats) in gateway_stats {
writer
.as_mut()
.write(&[
&key,
&stats.reliability(),
&(stats.success() as i64),
&(stats.failure() as i64),
])
.await?;
}
writer.finish().await?;
Ok(())
}
pub async fn submit_metrics_to_db(client: Arc<Client>) -> anyhow::Result<()> {
let client = Arc::clone(&client);
let client2 = Arc::clone(&client);
submit_node_stats_to_db(client).await?;
submit_gateway_stats_to_db(client2).await?;
Ok(())
}
pub async fn submit_metrics(client: Option<Arc<Client>>) -> anyhow::Result<()> {
if let Some(client) = client {
submit_metrics_to_db(client).await?;
}
if let Some(private_key) = PRIVATE_KEY.get() {
let node_stats = monitor_mixnode_results().await?;
let gateway_stats = monitor_gateway_results().await?;
info!("Submitting metrics to {}", *NYM_API_URL);
let client = reqwest::Client::new();
let node_submit_url = format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_NODE}", &*NYM_API_URL);
let gateway_submit_url =
format!("{}/{API_VERSION}/{STATUS}/{SUBMIT_GATEWAY}", &*NYM_API_URL);
info!("Submitting {} mixnode measurements", node_stats.len());
node_stats
.chunks(10)
.map(|chunk| {
let monitor_message = MonitorMessage::new(chunk.to_vec(), private_key);
client.post(&node_submit_url).json(&monitor_message).send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
info!("Submitting {} gateway measurements", gateway_stats.len());
gateway_stats
.chunks(10)
.map(|chunk| {
let monitor_message = MonitorMessage::new(
chunk.to_vec(),
PRIVATE_KEY.get().expect("We've set this!"),
);
client
.post(&gateway_submit_url)
.json(&monitor_message)
.send()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
}
NetworkAccount::empty_buffers();
+27 -6
View File
@@ -2,7 +2,7 @@ use crate::http::HttpServer;
use accounting::submit_metrics;
use anyhow::Result;
use clap::Parser;
use log::{info, warn};
use log::{error, info, warn};
use nym_client_core::ForgetMe;
use nym_crypto::asymmetric::ed25519::PrivateKey;
use nym_network_defaults::setup_env;
@@ -22,6 +22,7 @@ use std::{
};
use tokio::sync::OnceCell;
use tokio::{signal::ctrl_c, sync::RwLock};
use tokio_postgres::NoTls;
use tokio_util::sync::CancellationToken;
static NYM_API_URL: LazyLock<String> = LazyLock::new(|| {
@@ -136,7 +137,10 @@ struct Args {
generate_key_pair: bool,
#[arg(long)]
private_key: String,
private_key: Option<String>,
#[arg(long, env = "DATABASE_URL")]
database_url: Option<String>,
}
fn generate_key_pair() -> Result<()> {
@@ -174,8 +178,10 @@ async fn main() -> Result<()> {
std::process::exit(0);
}
let pk = PrivateKey::from_base58_string(&args.private_key)?;
PRIVATE_KEY.set(pk).ok();
if let Some(private_key) = args.private_key {
let pk = PrivateKey::from_base58_string(&private_key)?;
PRIVATE_KEY.set(pk).ok();
}
TOPOLOGY
.set(if let Some(topology_file) = args.topology {
@@ -203,16 +209,31 @@ async fn main() -> Result<()> {
info!("Waiting for message (ctrl-c to exit)");
let client = if let Some(database_url) = args.database_url {
let (client, connection) = tokio_postgres::connect(&database_url, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("Postgres connection error: {}", e);
}
});
Some(Arc::new(client))
} else {
None
};
loop {
let client = client.as_ref().map(Arc::clone);
match tokio::time::timeout(Duration::from_secs(600), ctrl_c()).await {
Ok(_) => {
info!("Received kill signal, shutting down, submitting final batch of metrics");
submit_metrics().await?;
submit_metrics(client).await?;
break;
}
Err(_) => {
info!("Submitting metrics, cleaning metric buffers");
submit_metrics().await?;
submit_metrics(client).await?;
}
};
}