Raw route submissions (#5756)

* Handle PG connection failures

* Readibility nit
This commit is contained in:
Drazen Urch
2025-05-12 17:36:10 +02:00
committed by GitHub
parent a3a234b41b
commit a830881ba5
2 changed files with 107 additions and 30 deletions
+104 -11
View File
@@ -5,7 +5,7 @@ use std::{
use anyhow::Result;
use futures::{pin_mut, stream::FuturesUnordered, StreamExt};
use log::{debug, info};
use log::{debug, error, info};
use nym_sphinx::chunking::{monitoring, SentFragment};
use nym_topology::{NymRouteProvider, RoutingNode};
use nym_types::monitoring::{MonitorMessage, NodeResult};
@@ -13,7 +13,8 @@ use nym_validator_client::nym_api::routes::{API_VERSION, STATUS, SUBMIT_GATEWAY,
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use serde::{Deserialize, Serialize};
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type, Client};
use tokio::task::JoinHandle;
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type, Client, NoTls};
use utoipa::ToSchema;
use crate::{NYM_API_URL, PRIVATE_KEY, TOPOLOGY};
@@ -23,6 +24,39 @@ struct HydratedRoute {
gateway_node: RoutingNode,
}
#[derive(Serialize, Deserialize, Debug, ToSchema)]
struct AccountingRoute {
mix_nodes: (u32, u32, u32),
gateway_node: u32,
success: bool,
}
impl AccountingRoute {
fn from_complete(route: &HydratedRoute) -> Self {
Self {
mix_nodes: (
route.mix_nodes[0].node_id,
route.mix_nodes[1].node_id,
route.mix_nodes[2].node_id,
),
gateway_node: route.gateway_node.node_id,
success: true,
}
}
fn from_incomplete(route: &HydratedRoute) -> Self {
Self {
mix_nodes: (
route.mix_nodes[0].node_id,
route.mix_nodes[1].node_id,
route.mix_nodes[2].node_id,
),
gateway_node: route.gateway_node.node_id,
success: false,
}
}
}
#[derive(Serialize, Deserialize, Debug, Default, ToSchema)]
struct GatewayStats(u32, u32);
@@ -67,6 +101,8 @@ pub struct NetworkAccount {
mix_details: HashMap<u32, RoutingNode>,
#[serde(skip)]
gateway_details: HashMap<String, RoutingNode>,
#[serde(skip)]
accounting_routes: Vec<AccountingRoute>,
}
impl NetworkAccount {
@@ -190,14 +226,18 @@ impl NetworkAccount {
.or_insert(GatewayStats::new(0, 0));
self.gateway_details.insert(
route.gateway_node.identity_key.to_base58_string(),
route.gateway_node,
route.gateway_node.clone(),
);
if self.complete_fragment_sets.contains(fragment_set_id) {
self.complete_routes.push(mix_ids);
gateway_stats_entry.incr_success();
self.accounting_routes
.push(AccountingRoute::from_complete(&route));
} else {
self.incomplete_routes.push(mix_ids);
gateway_stats_entry.incr_failure();
self.accounting_routes
.push(AccountingRoute::from_incomplete(&route));
}
}
}
@@ -384,17 +424,70 @@ async fn submit_gateway_stats_to_db(client: Arc<Client>) -> anyhow::Result<()> {
Ok(())
}
pub async fn submit_metrics_to_db(client: Arc<Client>) -> anyhow::Result<()> {
let client = Arc::clone(&client);
let client2 = Arc::clone(&client);
submit_node_stats_to_db(client).await?;
submit_gateway_stats_to_db(client2).await?;
async fn db_connection(database_url: Option<&String>) -> Result<Option<(Client, JoinHandle<()>)>> {
if let Some(database_url) = database_url {
let (client, connection) = tokio_postgres::connect(database_url, NoTls).await?;
let handle = tokio::spawn(async move {
if let Err(e) = connection.await {
error!("Postgres connection error: {}", e);
}
});
Ok(Some((client, handle)))
} else {
Ok(None)
}
}
pub async fn submit_metrics_to_db(database_url: Option<&String>) -> anyhow::Result<()> {
if let Some((client, handle)) = db_connection(database_url).await? {
let client = Arc::new(client);
let client2 = Arc::clone(&client);
let client3 = Arc::clone(&client);
submit_node_stats_to_db(client).await?;
submit_gateway_stats_to_db(client2).await?;
submit_accounting_routes_to_db(client3).await?;
handle.abort();
}
Ok(())
}
pub async fn submit_metrics(client: Option<Arc<Client>>) -> anyhow::Result<()> {
if let Some(client) = client {
submit_metrics_to_db(client).await?;
async fn submit_accounting_routes_to_db(client: Arc<Client>) -> anyhow::Result<()> {
let client = Arc::clone(&client);
let network_account = NetworkAccount::finalize()?;
let accounting_routes = network_account.accounting_routes;
let sink = client
.copy_in("COPY routes (layer1, layer2, layer3, gw, success) FROM STDIN BINARY")
.await?;
let writer = BinaryCopyInWriter::new(
sink,
&[Type::INT4, Type::INT4, Type::INT4, Type::INT4, Type::BOOL],
);
pin_mut!(writer);
for route in accounting_routes {
writer
.as_mut()
.write(&[
&(route.mix_nodes.0 as i32),
&(route.mix_nodes.1 as i32),
&(route.mix_nodes.2 as i32),
&(route.gateway_node as i32),
&route.success,
])
.await?;
}
writer.finish().await?;
Ok(())
}
pub async fn submit_metrics(database_url: Option<&String>) -> anyhow::Result<()> {
if let Err(e) = submit_metrics_to_db(database_url).await {
error!("Error submitting metrics to db: {}", e);
}
if let Some(private_key) = PRIVATE_KEY.get() {
+3 -19
View File
@@ -2,7 +2,7 @@ use crate::http::HttpServer;
use accounting::submit_metrics;
use anyhow::Result;
use clap::Parser;
use log::{error, info, warn};
use log::{info, warn};
use nym_bin_common::bin_info;
use nym_client_core::config::ForgetMe;
use nym_crypto::asymmetric::ed25519::PrivateKey;
@@ -23,7 +23,6 @@ use std::{
};
use tokio::sync::OnceCell;
use tokio::{signal::ctrl_c, sync::RwLock};
use tokio_postgres::NoTls;
use tokio_util::sync::CancellationToken;
static NYM_API_URL: LazyLock<String> = LazyLock::new(|| {
@@ -229,31 +228,16 @@ async fn main() -> Result<()> {
info!("Waiting for message (ctrl-c to exit)");
let client = if let Some(database_url) = args.database_url {
let (client, connection) = tokio_postgres::connect(&database_url, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("Postgres connection error: {}", e);
}
});
Some(Arc::new(client))
} else {
None
};
loop {
let client = client.as_ref().map(Arc::clone);
match tokio::time::timeout(Duration::from_secs(600), ctrl_c()).await {
Ok(_) => {
info!("Received kill signal, shutting down, submitting final batch of metrics");
submit_metrics(client).await?;
submit_metrics(args.database_url.as_ref()).await?;
break;
}
Err(_) => {
info!("Submitting metrics, cleaning metric buffers");
submit_metrics(client).await?;
submit_metrics(args.database_url.as_ref()).await?;
}
};
}