Batch SQL writes for packet stats (#5874)
* Move stuff around * Batch SQL operations * Clippy * Bump version * Remove shared queue which was always re-initialized * Make max_concurrent_tasks configurable * fixed typo * clippy --------- Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>
This commit is contained in:
Generated
+1
-1
@@ -6682,7 +6682,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-node-status-api"
|
||||
version = "3.1.1"
|
||||
version = "3.1.2"
|
||||
dependencies = [
|
||||
"ammonia",
|
||||
"anyhow",
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
set -eu
|
||||
export ENVIRONMENT=${ENVIRONMENT:-"mainnet"}
|
||||
|
||||
probe_git_ref="nym-vpn-core-v1.10.0"
|
||||
probe_git_ref="nym-vpn-core-v1.4.0"
|
||||
|
||||
crate_root=$(dirname $(realpath "$0"))
|
||||
monorepo_root=$(realpath "${crate_root}/../..")
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
[package]
|
||||
name = "nym-node-status-api"
|
||||
version = "3.1.1"
|
||||
version = "3.1.2"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
|
||||
@@ -73,6 +73,13 @@ pub(crate) struct Cli {
|
||||
#[arg(value_delimiter = ',')]
|
||||
pub(crate) agent_key_list: Vec<String>,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
default_value_t = 10,
|
||||
env = "NYM_NODE_STATUS_API_PACKET_STATS_MAX_CONCURRENT_TASKS"
|
||||
)]
|
||||
pub(crate) packet_stats_max_concurrent_tasks: usize,
|
||||
|
||||
/// https://github.com/ipinfo/rust
|
||||
#[clap(long, env = "IPINFO_API_TOKEN")]
|
||||
pub(crate) ipinfo_api_token: String,
|
||||
|
||||
@@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
|
||||
use sqlx::FromRow;
|
||||
use std::str::FromStr;
|
||||
use strum_macros::{EnumString, FromRepr};
|
||||
use time::{Date, OffsetDateTime};
|
||||
use time::{Date, OffsetDateTime, UtcDateTime};
|
||||
use utoipa::ToSchema;
|
||||
|
||||
macro_rules! serialize_opt_to_value {
|
||||
@@ -362,7 +362,7 @@ impl TryFrom<GatewaySessionsRecord> for http::models::SessionStats {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(strum_macros::Display)]
|
||||
#[derive(strum_macros::Display, Clone)]
|
||||
pub(crate) enum ScrapeNodeKind {
|
||||
LegacyMixnode { mix_id: i64 },
|
||||
MixingNymNode { node_id: i64 },
|
||||
@@ -520,3 +520,10 @@ pub struct NodeStats {
|
||||
pub packets_sent: i64,
|
||||
pub packets_dropped: i64,
|
||||
}
|
||||
|
||||
pub struct InsertStatsRecord {
|
||||
pub node_kind: ScrapeNodeKind,
|
||||
pub timestamp_utc: UtcDateTime,
|
||||
pub unix_timestamp: i64,
|
||||
pub stats: NodeStats,
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::{
|
||||
DbPool,
|
||||
},
|
||||
http::models::Gateway,
|
||||
mixnet_scraper::helpers::NodeDescriptionResponse,
|
||||
node_scraper::helpers::NodeDescriptionResponse,
|
||||
};
|
||||
use futures_util::TryStreamExt;
|
||||
use sqlx::{pool::PoolConnection, Sqlite};
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::{
|
||||
DbPool,
|
||||
},
|
||||
http::models::{DailyStats, Mixnode},
|
||||
mixnet_scraper::helpers::NodeDescriptionResponse,
|
||||
node_scraper::helpers::NodeDescriptionResponse,
|
||||
};
|
||||
|
||||
pub(crate) async fn update_mixnodes(
|
||||
|
||||
@@ -19,7 +19,7 @@ pub(crate) use nym_nodes::{
|
||||
get_described_node_bond_info, get_node_self_description, update_nym_nodes,
|
||||
};
|
||||
pub(crate) use packet_stats::{
|
||||
get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats,
|
||||
batch_store_packet_stats, get_raw_node_stats, insert_daily_node_stats_uncommitted,
|
||||
};
|
||||
pub(crate) use scraper::{get_nodes_for_scraping, insert_scraped_node_description};
|
||||
pub(crate) use summary::{get_summary, get_summary_history};
|
||||
|
||||
@@ -13,7 +13,7 @@ use crate::{
|
||||
models::{NymNodeDto, NymNodeInsertRecord},
|
||||
DbPool,
|
||||
},
|
||||
mixnet_scraper::helpers::NodeDescriptionResponse,
|
||||
node_scraper::helpers::NodeDescriptionResponse,
|
||||
};
|
||||
|
||||
pub(crate) async fn get_all_nym_nodes(pool: &DbPool) -> anyhow::Result<Vec<NymNodeDto>> {
|
||||
|
||||
@@ -1,17 +1,70 @@
|
||||
use crate::db::{
|
||||
models::{NodeStats, ScrapeNodeKind, ScraperNodeInfo},
|
||||
DbPool,
|
||||
use crate::{
|
||||
db::{
|
||||
models::{InsertStatsRecord, NodeStats, ScrapeNodeKind},
|
||||
DbPool,
|
||||
},
|
||||
node_scraper::helpers::update_daily_stats_uncommitted,
|
||||
utils::now_utc,
|
||||
};
|
||||
use anyhow::Result;
|
||||
use sqlx::Transaction;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{info, instrument};
|
||||
|
||||
pub(crate) async fn insert_node_packet_stats(
|
||||
#[instrument(level = "info", skip_all)]
|
||||
pub(crate) async fn batch_store_packet_stats(
|
||||
pool: &DbPool,
|
||||
results: Arc<Mutex<Vec<InsertStatsRecord>>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let results_iter = results.lock().await;
|
||||
info!(
|
||||
"📊 ⏳ Storing {} packet stats into the DB",
|
||||
results_iter.len()
|
||||
);
|
||||
let started_at = now_utc();
|
||||
|
||||
let mut tx = pool
|
||||
.begin()
|
||||
.await
|
||||
.map_err(|err| anyhow::anyhow!("Failed to begin transaction: {err}"))?;
|
||||
|
||||
for stats_record in &(*results_iter) {
|
||||
insert_node_packet_stats_uncommitted(
|
||||
&mut tx,
|
||||
&stats_record.node_kind,
|
||||
&stats_record.stats,
|
||||
stats_record.unix_timestamp,
|
||||
)
|
||||
.await?;
|
||||
|
||||
update_daily_stats_uncommitted(
|
||||
&mut tx,
|
||||
&stats_record.node_kind,
|
||||
stats_record.timestamp_utc,
|
||||
&stats_record.stats,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
tx.commit()
|
||||
.await
|
||||
.inspect(|_| {
|
||||
let elapsed = now_utc() - started_at;
|
||||
info!(
|
||||
"📊 ☑️ Packet stats successfully committed to DB (took {}s)",
|
||||
elapsed.as_seconds_f32()
|
||||
);
|
||||
})
|
||||
.map_err(|err| anyhow::anyhow!("Failed to commit: {err}"))
|
||||
}
|
||||
|
||||
async fn insert_node_packet_stats_uncommitted(
|
||||
tx: &mut Transaction<'static, sqlx::Sqlite>,
|
||||
node_kind: &ScrapeNodeKind,
|
||||
stats: &NodeStats,
|
||||
timestamp_utc: i64,
|
||||
) -> Result<()> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
match node_kind {
|
||||
ScrapeNodeKind::LegacyMixnode { mix_id } => {
|
||||
sqlx::query!(
|
||||
@@ -26,7 +79,7 @@ pub(crate) async fn insert_node_packet_stats(
|
||||
stats.packets_sent,
|
||||
stats.packets_dropped,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.execute(tx.as_mut())
|
||||
.await?;
|
||||
}
|
||||
ScrapeNodeKind::MixingNymNode { node_id }
|
||||
@@ -43,7 +96,7 @@ pub(crate) async fn insert_node_packet_stats(
|
||||
stats.packets_sent,
|
||||
stats.packets_dropped,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.execute(tx.as_mut())
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
@@ -52,12 +105,10 @@ pub(crate) async fn insert_node_packet_stats(
|
||||
}
|
||||
|
||||
pub(crate) async fn get_raw_node_stats(
|
||||
pool: &DbPool,
|
||||
node: &ScraperNodeInfo,
|
||||
tx: &mut Transaction<'static, sqlx::Sqlite>,
|
||||
node_kind: &ScrapeNodeKind,
|
||||
) -> Result<Option<NodeStats>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
let packets = match node.node_kind {
|
||||
let packets = match node_kind {
|
||||
// if no packets are found, it's fine to assume 0 because that's also
|
||||
// SQL default value if none provided
|
||||
ScrapeNodeKind::LegacyMixnode { mix_id } => {
|
||||
@@ -75,7 +126,7 @@ pub(crate) async fn get_raw_node_stats(
|
||||
"#,
|
||||
mix_id
|
||||
)
|
||||
.fetch_optional(&mut *conn)
|
||||
.fetch_optional(tx.as_mut())
|
||||
.await?
|
||||
}
|
||||
ScrapeNodeKind::MixingNymNode { node_id }
|
||||
@@ -94,7 +145,7 @@ pub(crate) async fn get_raw_node_stats(
|
||||
"#,
|
||||
node_id
|
||||
)
|
||||
.fetch_optional(&mut *conn)
|
||||
.fetch_optional(tx.as_mut())
|
||||
.await?
|
||||
}
|
||||
};
|
||||
@@ -102,15 +153,13 @@ pub(crate) async fn get_raw_node_stats(
|
||||
Ok(packets)
|
||||
}
|
||||
|
||||
pub(crate) async fn insert_daily_node_stats(
|
||||
pool: &DbPool,
|
||||
node: &ScraperNodeInfo,
|
||||
pub(crate) async fn insert_daily_node_stats_uncommitted(
|
||||
tx: &mut Transaction<'static, sqlx::Sqlite>,
|
||||
node_kind: &ScrapeNodeKind,
|
||||
date_utc: &str,
|
||||
packets: NodeStats,
|
||||
) -> Result<()> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
match node.node_kind {
|
||||
match node_kind {
|
||||
ScrapeNodeKind::LegacyMixnode { mix_id } => {
|
||||
let total_stake = sqlx::query_scalar!(
|
||||
r#"
|
||||
@@ -121,7 +170,7 @@ pub(crate) async fn insert_daily_node_stats(
|
||||
"#,
|
||||
mix_id
|
||||
)
|
||||
.fetch_one(&mut *conn)
|
||||
.fetch_one(tx.as_mut())
|
||||
.await?;
|
||||
|
||||
sqlx::query!(
|
||||
@@ -144,7 +193,7 @@ pub(crate) async fn insert_daily_node_stats(
|
||||
packets.packets_sent,
|
||||
packets.packets_dropped,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.execute(tx.as_mut())
|
||||
.await?;
|
||||
}
|
||||
ScrapeNodeKind::MixingNymNode { node_id }
|
||||
@@ -158,7 +207,7 @@ pub(crate) async fn insert_daily_node_stats(
|
||||
"#,
|
||||
node_id
|
||||
)
|
||||
.fetch_one(&mut *conn)
|
||||
.fetch_one(tx.as_mut())
|
||||
.await?;
|
||||
|
||||
sqlx::query!(
|
||||
@@ -181,7 +230,7 @@ pub(crate) async fn insert_daily_node_stats(
|
||||
packets.packets_sent,
|
||||
packets.packets_dropped,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.execute(tx.as_mut())
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::{
|
||||
},
|
||||
DbPool,
|
||||
},
|
||||
mixnet_scraper::helpers::NodeDescriptionResponse,
|
||||
node_scraper::helpers::NodeDescriptionResponse,
|
||||
utils::now_utc,
|
||||
};
|
||||
use anyhow::Result;
|
||||
|
||||
@@ -160,11 +160,11 @@ async fn submit_testrun(
|
||||
.map(unix_timestamp_to_utc_rfc3339)
|
||||
.unwrap_or_else(|| String::from("never"));
|
||||
tracing::info!(
|
||||
"✅ Testrun row_id {} for gateway {} complete (last assigned {}, created at {})",
|
||||
gateway_id = gw_identity,
|
||||
last_assigned = last_assigned,
|
||||
created_at = created_at,
|
||||
"✅ Testrun row_id {} for gateway complete",
|
||||
assigned_testrun.id,
|
||||
gw_identity,
|
||||
last_assigned,
|
||||
created_at
|
||||
);
|
||||
|
||||
Ok(StatusCode::CREATED)
|
||||
|
||||
@@ -9,7 +9,7 @@ mod cli;
|
||||
mod db;
|
||||
mod http;
|
||||
mod logging;
|
||||
mod mixnet_scraper;
|
||||
mod metrics_scraper;
|
||||
mod monitor;
|
||||
mod node_scraper;
|
||||
mod testruns;
|
||||
@@ -35,7 +35,14 @@ async fn main() -> anyhow::Result<()> {
|
||||
let db_pool = storage.pool_owned();
|
||||
|
||||
// Start the node scraper
|
||||
let scraper = mixnet_scraper::Scraper::new(storage.pool_owned());
|
||||
let scraper = node_scraper::DescriptionScraper::new(storage.pool_owned());
|
||||
tokio::spawn(async move {
|
||||
scraper.start().await;
|
||||
});
|
||||
let scraper = node_scraper::PacketScraper::new(
|
||||
storage.pool_owned(),
|
||||
args.packet_stats_max_concurrent_tasks,
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
scraper.start().await;
|
||||
});
|
||||
@@ -74,7 +81,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let db_pool_scraper = storage.pool_owned();
|
||||
tokio::spawn(async move {
|
||||
node_scraper::spawn_in_background(db_pool_scraper, args_clone.nym_api_client_timeout).await;
|
||||
metrics_scraper::spawn_in_background(db_pool_scraper, args_clone.nym_api_client_timeout)
|
||||
.await;
|
||||
tracing::info!("Started metrics scraper task");
|
||||
});
|
||||
|
||||
|
||||
@@ -0,0 +1,284 @@
|
||||
use crate::db::{models::GatewaySessionsRecord, queries, DbPool};
|
||||
use error::NodeScraperError;
|
||||
use nym_network_defaults::{NymNetworkDetails, DEFAULT_NYM_NODE_HTTP_PORT};
|
||||
use nym_node_requests::api::{client::NymNodeApiClientExt, v1::metrics::models::SessionStats};
|
||||
use nym_validator_client::{
|
||||
client::{NodeId, NymNodeDetails},
|
||||
models::{DescribedNodeType, NymNodeDescription},
|
||||
NymApiClient,
|
||||
};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use nym_statistics_common::types::SessionType;
|
||||
use std::collections::HashMap;
|
||||
use tokio::time::Duration;
|
||||
use tracing::instrument;
|
||||
|
||||
mod error;
|
||||
|
||||
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60);
|
||||
const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 6);
|
||||
const STALE_DURATION: Duration = Duration::from_secs(86400 * 365); //one year
|
||||
|
||||
#[instrument(level = "info", name = "metrics_scraper", skip_all)]
|
||||
pub(crate) async fn spawn_in_background(db_pool: DbPool, nym_api_client_timeout: Duration) {
|
||||
let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env();
|
||||
|
||||
loop {
|
||||
tracing::info!("Refreshing node self-described metrics...");
|
||||
|
||||
if let Err(e) = run(&db_pool, &network_defaults, nym_api_client_timeout).await {
|
||||
tracing::error!(
|
||||
"Metrics collection failed: {e}, retrying in {}s...",
|
||||
FAILURE_RETRY_DELAY.as_secs()
|
||||
);
|
||||
|
||||
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
|
||||
} else {
|
||||
tracing::info!(
|
||||
"Metrics successfully collected, sleeping for {}s...",
|
||||
REFRESH_INTERVAL.as_secs()
|
||||
);
|
||||
tokio::time::sleep(REFRESH_INTERVAL).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(
|
||||
pool: &DbPool,
|
||||
network_details: &NymNetworkDetails,
|
||||
nym_api_client_timeout: Duration,
|
||||
) -> anyhow::Result<()> {
|
||||
let default_api_url = network_details
|
||||
.endpoints
|
||||
.first()
|
||||
.expect("rust sdk mainnet default incorrectly configured")
|
||||
.api_url()
|
||||
.clone()
|
||||
.expect("rust sdk mainnet default missing api_url");
|
||||
|
||||
let nym_api = nym_http_api_client::ClientBuilder::new_with_url(default_api_url)
|
||||
.no_hickory_dns()
|
||||
.with_timeout(nym_api_client_timeout)
|
||||
.build::<&str>()?;
|
||||
|
||||
let api_client = NymApiClient::from(nym_api);
|
||||
|
||||
//SW TBC what nodes exactly need to be scraped, the skimmed node endpoint seems to return more nodes
|
||||
let bonded_nodes = api_client.get_all_bonded_nym_nodes().await?;
|
||||
let all_nodes = api_client.get_all_described_nodes().await?; //legacy node that did not upgrade the contract bond yet
|
||||
tracing::debug!("Fetched {} total nodes", all_nodes.len());
|
||||
|
||||
let mut nodes_to_scrape: HashMap<NodeId, MetricsScrapingData> = bonded_nodes
|
||||
.into_iter()
|
||||
.map(|n| (n.node_id(), n.into()))
|
||||
.collect();
|
||||
|
||||
all_nodes
|
||||
.into_iter()
|
||||
.filter(|n| n.contract_node_type != DescribedNodeType::LegacyMixnode)
|
||||
.for_each(|n| {
|
||||
nodes_to_scrape.entry(n.node_id).or_insert_with(|| n.into());
|
||||
});
|
||||
tracing::debug!("Will try to scrape {} nodes", nodes_to_scrape.len());
|
||||
|
||||
let mut session_records = Vec::new();
|
||||
for n in nodes_to_scrape.into_values() {
|
||||
if let Some(stat) = n.try_scrape_metrics().await {
|
||||
session_records.push(prepare_session_data(stat, &n));
|
||||
}
|
||||
}
|
||||
|
||||
queries::insert_session_records(pool, session_records)
|
||||
.await
|
||||
.map(|_| {
|
||||
tracing::debug!("Session info written to DB!");
|
||||
})?;
|
||||
let cut_off_date = (OffsetDateTime::now_utc() - STALE_DURATION).date();
|
||||
queries::delete_old_records(pool, cut_off_date)
|
||||
.await
|
||||
.map(|_| {
|
||||
tracing::debug!("Cleared old data before {}", cut_off_date);
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MetricsScrapingData {
|
||||
host: String,
|
||||
node_id: NodeId,
|
||||
id_key: String,
|
||||
port: Option<u16>,
|
||||
}
|
||||
|
||||
impl MetricsScrapingData {
|
||||
pub fn new(
|
||||
host: impl Into<String>,
|
||||
node_id: NodeId,
|
||||
id_key: String,
|
||||
port: Option<u16>,
|
||||
) -> Self {
|
||||
MetricsScrapingData {
|
||||
host: host.into(),
|
||||
node_id,
|
||||
id_key,
|
||||
port,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "info", name = "metrics_scraper", skip_all)]
|
||||
async fn try_scrape_metrics(&self) -> Option<SessionStats> {
|
||||
match self.try_get_client().await {
|
||||
Ok(client) => {
|
||||
match client.get_sessions_metrics().await {
|
||||
Ok(session_stats) => {
|
||||
if session_stats.update_time != OffsetDateTime::UNIX_EPOCH {
|
||||
Some(session_stats)
|
||||
} else {
|
||||
//means no data
|
||||
None
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("{e}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("{e}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn try_get_client(&self) -> Result<nym_node_requests::api::Client, NodeScraperError> {
|
||||
// first try the standard port in case the operator didn't put the node behind the proxy,
|
||||
// then default https (443)
|
||||
// finally default http (80)
|
||||
let mut addresses_to_try = vec![
|
||||
format!("http://{0}:{DEFAULT_NYM_NODE_HTTP_PORT}", self.host), // 'standard' nym-node
|
||||
format!("https://{0}", self.host), // node behind https proxy (443)
|
||||
format!("http://{0}", self.host), // node behind http proxy (80)
|
||||
];
|
||||
|
||||
// note: I removed 'standard' legacy mixnode port because it should now be automatically pulled via
|
||||
// the 'custom_port' since it should have been present in the contract.
|
||||
|
||||
if let Some(port) = self.port {
|
||||
addresses_to_try.insert(0, format!("http://{0}:{port}", self.host));
|
||||
}
|
||||
|
||||
for address in addresses_to_try {
|
||||
// if provided host was malformed, no point in continuing
|
||||
let client = match nym_node_requests::api::Client::builder(address).and_then(|b| {
|
||||
b.with_timeout(Duration::from_secs(5))
|
||||
.with_user_agent("node-status-api-metrics-scraper")
|
||||
.no_hickory_dns()
|
||||
.build()
|
||||
}) {
|
||||
Ok(client) => client,
|
||||
Err(err) => {
|
||||
return Err(NodeScraperError::MalformedHost {
|
||||
host: self.host.to_string(),
|
||||
node_id: self.node_id,
|
||||
source: err,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
if let Ok(health) = client.get_health().await {
|
||||
if health.status.is_up() {
|
||||
return Ok(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(NodeScraperError::NoHttpPortsAvailable {
|
||||
host: self.host.to_string(),
|
||||
node_id: self.node_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NymNodeDetails> for MetricsScrapingData {
|
||||
fn from(value: NymNodeDetails) -> Self {
|
||||
MetricsScrapingData::new(
|
||||
value.bond_information.node.host.clone(),
|
||||
value.node_id(),
|
||||
value.bond_information.node.identity_key,
|
||||
value.bond_information.node.custom_http_port,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NymNodeDescription> for MetricsScrapingData {
|
||||
fn from(value: NymNodeDescription) -> Self {
|
||||
MetricsScrapingData::new(
|
||||
value.description.host_information.ip_address[0].to_string(),
|
||||
value.node_id,
|
||||
value.ed25519_identity_key().to_base58_string(),
|
||||
None,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn prepare_session_data(
|
||||
stat: SessionStats,
|
||||
node_data: &MetricsScrapingData,
|
||||
) -> GatewaySessionsRecord {
|
||||
let users_hashes = if !stat.unique_active_users_hashes.is_empty() {
|
||||
Some(serde_json::to_string(&stat.unique_active_users_hashes).unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let vpn_durations = stat
|
||||
.sessions
|
||||
.iter()
|
||||
.filter(|s| SessionType::from_string(&s.typ) == SessionType::Vpn)
|
||||
.map(|s| s.duration_ms)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mixnet_durations = stat
|
||||
.sessions
|
||||
.iter()
|
||||
.filter(|s| SessionType::from_string(&s.typ) == SessionType::Mixnet)
|
||||
.map(|s| s.duration_ms)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let unknown_durations = stat
|
||||
.sessions
|
||||
.iter()
|
||||
.filter(|s| SessionType::from_string(&s.typ) == SessionType::Unknown)
|
||||
.map(|s| s.duration_ms)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let vpn_sessions = if !vpn_durations.is_empty() {
|
||||
Some(serde_json::to_string(&vpn_durations).unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mixnet_sessions = if !mixnet_durations.is_empty() {
|
||||
Some(serde_json::to_string(&mixnet_durations).unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let unknown_sessions = if !unknown_durations.is_empty() {
|
||||
Some(serde_json::to_string(&unknown_durations).unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
GatewaySessionsRecord {
|
||||
gateway_identity_key: node_data.id_key.clone(),
|
||||
node_id: node_data.node_id as i64,
|
||||
day: stat.update_time.date(),
|
||||
unique_active_clients: stat.unique_active_users as i64,
|
||||
session_started: stat.sessions_started as i64,
|
||||
users_hashes,
|
||||
vpn_sessions,
|
||||
mixnet_sessions,
|
||||
unknown_sessions,
|
||||
}
|
||||
}
|
||||
+6
-88
@@ -1,41 +1,36 @@
|
||||
use super::helpers::scrape_and_store_description;
|
||||
use anyhow::Result;
|
||||
use sqlx::SqlitePool;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
pub mod helpers;
|
||||
use anyhow::Result;
|
||||
use helpers::{scrape_and_store_description, scrape_and_store_packet_stats};
|
||||
use sqlx::SqlitePool;
|
||||
use tracing::{debug, error, instrument, warn};
|
||||
|
||||
use crate::db::models::ScraperNodeInfo;
|
||||
use crate::db::queries::get_nodes_for_scraping;
|
||||
|
||||
const DESCRIPTION_SCRAPE_INTERVAL: Duration = Duration::from_secs(60 * 60 * 4);
|
||||
const PACKET_SCRAPE_INTERVAL: Duration = Duration::from_secs(60 * 60);
|
||||
const QUEUE_CHECK_INTERVAL: Duration = Duration::from_millis(250);
|
||||
const MAX_CONCURRENT_TASKS: usize = 5;
|
||||
|
||||
static TASK_COUNTER: AtomicUsize = AtomicUsize::new(0);
|
||||
static TASK_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
pub struct Scraper {
|
||||
pub struct DescriptionScraper {
|
||||
pool: SqlitePool,
|
||||
description_queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
|
||||
packet_queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
|
||||
}
|
||||
|
||||
impl Scraper {
|
||||
impl DescriptionScraper {
|
||||
pub fn new(pool: SqlitePool) -> Self {
|
||||
Self {
|
||||
pool,
|
||||
description_queue: Arc::new(Mutex::new(Vec::new())),
|
||||
packet_queue: Arc::new(Mutex::new(Vec::new())),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(&self) {
|
||||
self.spawn_description_scraper().await;
|
||||
self.spawn_packet_scraper().await;
|
||||
}
|
||||
|
||||
async fn spawn_description_scraper(&self) {
|
||||
@@ -53,22 +48,6 @@ impl Scraper {
|
||||
});
|
||||
}
|
||||
|
||||
async fn spawn_packet_scraper(&self) {
|
||||
let pool = self.pool.clone();
|
||||
let queue = self.packet_queue.clone();
|
||||
tracing::info!("Starting packet scraper");
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if let Err(e) = Self::run_packet_scraper(&pool, queue.clone()).await {
|
||||
error!(name: "packet_scraper", "Packet scraper failed: {}", e);
|
||||
}
|
||||
debug!(name: "packet_scraper", "Sleeping for {}s", PACKET_SCRAPE_INTERVAL.as_secs());
|
||||
tokio::time::sleep(PACKET_SCRAPE_INTERVAL).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[instrument(level = "info", name = "description_scraper", skip_all)]
|
||||
async fn run_description_scraper(
|
||||
pool: &SqlitePool,
|
||||
@@ -86,24 +65,6 @@ impl Scraper {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(level = "info", name = "packet_scraper", skip_all)]
|
||||
async fn run_packet_scraper(
|
||||
pool: &SqlitePool,
|
||||
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
|
||||
) -> Result<()> {
|
||||
let nodes = get_nodes_for_scraping(pool).await?;
|
||||
tracing::info!("Querying {} mixing nodes", nodes.len());
|
||||
if let Ok(mut queue_lock) = queue.lock() {
|
||||
queue_lock.extend(nodes);
|
||||
} else {
|
||||
warn!("Failed to acquire packet queue lock");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Self::process_packet_queue(pool, queue).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_description_queue(pool: &SqlitePool, queue: Arc<Mutex<Vec<ScraperNodeInfo>>>) {
|
||||
loop {
|
||||
let running_tasks = TASK_COUNTER.load(Ordering::Relaxed);
|
||||
@@ -147,50 +108,7 @@ impl Scraper {
|
||||
tokio::time::sleep(QUEUE_CHECK_INTERVAL).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_packet_queue(pool: &SqlitePool, queue: Arc<Mutex<Vec<ScraperNodeInfo>>>) {
|
||||
loop {
|
||||
let running_tasks = TASK_COUNTER.load(Ordering::Relaxed);
|
||||
|
||||
if running_tasks < MAX_CONCURRENT_TASKS {
|
||||
let node = {
|
||||
if let Ok(mut queue_lock) = queue.lock() {
|
||||
if queue_lock.is_empty() {
|
||||
TASK_ID_COUNTER.store(0, Ordering::Relaxed);
|
||||
break;
|
||||
}
|
||||
queue_lock.remove(0)
|
||||
} else {
|
||||
warn!("Failed to acquire packet queue lock");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
TASK_COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
let task_id = TASK_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
let pool = pool.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
match scrape_and_store_packet_stats(&pool, &node).await {
|
||||
Ok(_) => debug!(
|
||||
"📊 ✅ Packet stats task #{} for node {} complete",
|
||||
task_id,
|
||||
node.node_id()
|
||||
),
|
||||
Err(e) => debug!(
|
||||
"📊 ❌ Packet stats task #{} for {} {} failed: {}",
|
||||
task_id,
|
||||
node.node_kind,
|
||||
node.node_id(),
|
||||
e
|
||||
),
|
||||
}
|
||||
TASK_COUNTER.fetch_sub(1, Ordering::Relaxed);
|
||||
});
|
||||
} else {
|
||||
tokio::time::sleep(QUEUE_CHECK_INTERVAL).await;
|
||||
}
|
||||
}
|
||||
// TODO After all tasks complete, write results to the DB
|
||||
}
|
||||
}
|
||||
+20
-22
@@ -1,8 +1,8 @@
|
||||
use crate::{
|
||||
db::{
|
||||
models::{NodeStats, ScraperNodeInfo},
|
||||
models::{InsertStatsRecord, NodeStats, ScrapeNodeKind, ScraperNodeInfo},
|
||||
queries::{
|
||||
get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats,
|
||||
get_raw_node_stats, insert_daily_node_stats_uncommitted,
|
||||
insert_scraped_node_description,
|
||||
},
|
||||
},
|
||||
@@ -10,9 +10,8 @@ use crate::{
|
||||
};
|
||||
use ammonia::Builder;
|
||||
use anyhow::{anyhow, Result};
|
||||
use reqwest;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::SqlitePool;
|
||||
use sqlx::{SqlitePool, Transaction};
|
||||
use std::time::Duration;
|
||||
use time::UtcDateTime;
|
||||
|
||||
@@ -156,10 +155,7 @@ pub async fn scrape_and_store_description(pool: &SqlitePool, node: &ScraperNodeI
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn scrape_and_store_packet_stats(
|
||||
pool: &SqlitePool,
|
||||
node: &ScraperNodeInfo,
|
||||
) -> Result<()> {
|
||||
pub async fn scrape_packet_stats(node: &ScraperNodeInfo) -> Result<InsertStatsRecord> {
|
||||
let client = build_client()?;
|
||||
let urls = node.contact_addresses();
|
||||
|
||||
@@ -187,19 +183,21 @@ pub async fn scrape_and_store_packet_stats(
|
||||
anyhow::anyhow!("Failed to fetch description from any URL: {}", err_msg)
|
||||
})?;
|
||||
|
||||
let timestamp = now_utc();
|
||||
let timestamp_utc = timestamp.unix_timestamp();
|
||||
insert_node_packet_stats(pool, &node.node_kind, &stats, timestamp_utc).await?;
|
||||
let timestamp_utc = now_utc();
|
||||
let unix_timestamp = timestamp_utc.unix_timestamp();
|
||||
let result = InsertStatsRecord {
|
||||
node_kind: node.node_kind.to_owned(),
|
||||
timestamp_utc,
|
||||
unix_timestamp,
|
||||
stats,
|
||||
};
|
||||
|
||||
// Update daily stats
|
||||
update_daily_stats(pool, node, timestamp, &stats).await?;
|
||||
|
||||
Ok(())
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn update_daily_stats(
|
||||
pool: &SqlitePool,
|
||||
node: &ScraperNodeInfo,
|
||||
pub async fn update_daily_stats_uncommitted(
|
||||
tx: &mut Transaction<'static, sqlx::Sqlite>,
|
||||
node_kind: &ScrapeNodeKind,
|
||||
timestamp: UtcDateTime,
|
||||
current_stats: &NodeStats,
|
||||
) -> Result<()> {
|
||||
@@ -211,7 +209,7 @@ pub async fn update_daily_stats(
|
||||
);
|
||||
|
||||
// Get previous stats
|
||||
let previous_stats = get_raw_node_stats(pool, node).await?;
|
||||
let previous_stats = get_raw_node_stats(tx, node_kind).await?;
|
||||
|
||||
let (diff_received, diff_sent, diff_dropped) = if let Some(prev) = previous_stats {
|
||||
(
|
||||
@@ -223,9 +221,9 @@ pub async fn update_daily_stats(
|
||||
(0, 0, 0) // No previous stats available
|
||||
};
|
||||
|
||||
insert_daily_node_stats(
|
||||
pool,
|
||||
node,
|
||||
insert_daily_node_stats_uncommitted(
|
||||
tx,
|
||||
node_kind,
|
||||
&date_utc,
|
||||
NodeStats {
|
||||
packets_received: diff_received,
|
||||
@@ -1,284 +1,6 @@
|
||||
use crate::db::{models::GatewaySessionsRecord, queries, DbPool};
|
||||
use error::NodeScraperError;
|
||||
use nym_network_defaults::{NymNetworkDetails, DEFAULT_NYM_NODE_HTTP_PORT};
|
||||
use nym_node_requests::api::{client::NymNodeApiClientExt, v1::metrics::models::SessionStats};
|
||||
use nym_validator_client::{
|
||||
client::{NodeId, NymNodeDetails},
|
||||
models::{DescribedNodeType, NymNodeDescription},
|
||||
NymApiClient,
|
||||
};
|
||||
use time::OffsetDateTime;
|
||||
pub(crate) mod description;
|
||||
pub(crate) mod helpers;
|
||||
pub(crate) mod packet_stats;
|
||||
|
||||
use nym_statistics_common::types::SessionType;
|
||||
use std::collections::HashMap;
|
||||
use tokio::time::Duration;
|
||||
use tracing::instrument;
|
||||
|
||||
mod error;
|
||||
|
||||
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60);
|
||||
const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 6);
|
||||
const STALE_DURATION: Duration = Duration::from_secs(86400 * 365); //one year
|
||||
|
||||
#[instrument(level = "info", name = "metrics_scraper", skip_all)]
|
||||
pub(crate) async fn spawn_in_background(db_pool: DbPool, nym_api_client_timeout: Duration) {
|
||||
let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env();
|
||||
|
||||
loop {
|
||||
tracing::info!("Refreshing node self-described metrics...");
|
||||
|
||||
if let Err(e) = run(&db_pool, &network_defaults, nym_api_client_timeout).await {
|
||||
tracing::error!(
|
||||
"Metrics collection failed: {e}, retrying in {}s...",
|
||||
FAILURE_RETRY_DELAY.as_secs()
|
||||
);
|
||||
|
||||
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
|
||||
} else {
|
||||
tracing::info!(
|
||||
"Metrics successfully collected, sleeping for {}s...",
|
||||
REFRESH_INTERVAL.as_secs()
|
||||
);
|
||||
tokio::time::sleep(REFRESH_INTERVAL).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(
|
||||
pool: &DbPool,
|
||||
network_details: &NymNetworkDetails,
|
||||
nym_api_client_timeout: Duration,
|
||||
) -> anyhow::Result<()> {
|
||||
let default_api_url = network_details
|
||||
.endpoints
|
||||
.first()
|
||||
.expect("rust sdk mainnet default incorrectly configured")
|
||||
.api_url()
|
||||
.clone()
|
||||
.expect("rust sdk mainnet default missing api_url");
|
||||
|
||||
let nym_api = nym_http_api_client::ClientBuilder::new_with_url(default_api_url)
|
||||
.no_hickory_dns()
|
||||
.with_timeout(nym_api_client_timeout)
|
||||
.build::<&str>()?;
|
||||
|
||||
let api_client = NymApiClient::from(nym_api);
|
||||
|
||||
//SW TBC what nodes exactly need to be scraped, the skimmed node endpoint seems to return more nodes
|
||||
let bonded_nodes = api_client.get_all_bonded_nym_nodes().await?;
|
||||
let all_nodes = api_client.get_all_described_nodes().await?; //legacy node that did not upgrade the contract bond yet
|
||||
tracing::debug!("Fetched {} total nodes", all_nodes.len());
|
||||
|
||||
let mut nodes_to_scrape: HashMap<NodeId, MetricsScrapingData> = bonded_nodes
|
||||
.into_iter()
|
||||
.map(|n| (n.node_id(), n.into()))
|
||||
.collect();
|
||||
|
||||
all_nodes
|
||||
.into_iter()
|
||||
.filter(|n| n.contract_node_type != DescribedNodeType::LegacyMixnode)
|
||||
.for_each(|n| {
|
||||
nodes_to_scrape.entry(n.node_id).or_insert_with(|| n.into());
|
||||
});
|
||||
tracing::debug!("Will try to scrape {} nodes", nodes_to_scrape.len());
|
||||
|
||||
let mut session_records = Vec::new();
|
||||
for n in nodes_to_scrape.into_values() {
|
||||
if let Some(stat) = n.try_scrape_metrics().await {
|
||||
session_records.push(prepare_session_data(stat, &n));
|
||||
}
|
||||
}
|
||||
|
||||
queries::insert_session_records(pool, session_records)
|
||||
.await
|
||||
.map(|_| {
|
||||
tracing::debug!("Session info written to DB!");
|
||||
})?;
|
||||
let cut_off_date = (OffsetDateTime::now_utc() - STALE_DURATION).date();
|
||||
queries::delete_old_records(pool, cut_off_date)
|
||||
.await
|
||||
.map(|_| {
|
||||
tracing::debug!("Cleared old data before {}", cut_off_date);
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MetricsScrapingData {
|
||||
host: String,
|
||||
node_id: NodeId,
|
||||
id_key: String,
|
||||
port: Option<u16>,
|
||||
}
|
||||
|
||||
impl MetricsScrapingData {
|
||||
pub fn new(
|
||||
host: impl Into<String>,
|
||||
node_id: NodeId,
|
||||
id_key: String,
|
||||
port: Option<u16>,
|
||||
) -> Self {
|
||||
MetricsScrapingData {
|
||||
host: host.into(),
|
||||
node_id,
|
||||
id_key,
|
||||
port,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "info", name = "metrics_scraper", skip_all)]
|
||||
async fn try_scrape_metrics(&self) -> Option<SessionStats> {
|
||||
match self.try_get_client().await {
|
||||
Ok(client) => {
|
||||
match client.get_sessions_metrics().await {
|
||||
Ok(session_stats) => {
|
||||
if session_stats.update_time != OffsetDateTime::UNIX_EPOCH {
|
||||
Some(session_stats)
|
||||
} else {
|
||||
//means no data
|
||||
None
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("{e}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("{e}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn try_get_client(&self) -> Result<nym_node_requests::api::Client, NodeScraperError> {
|
||||
// first try the standard port in case the operator didn't put the node behind the proxy,
|
||||
// then default https (443)
|
||||
// finally default http (80)
|
||||
let mut addresses_to_try = vec![
|
||||
format!("http://{0}:{DEFAULT_NYM_NODE_HTTP_PORT}", self.host), // 'standard' nym-node
|
||||
format!("https://{0}", self.host), // node behind https proxy (443)
|
||||
format!("http://{0}", self.host), // node behind http proxy (80)
|
||||
];
|
||||
|
||||
// note: I removed 'standard' legacy mixnode port because it should now be automatically pulled via
|
||||
// the 'custom_port' since it should have been present in the contract.
|
||||
|
||||
if let Some(port) = self.port {
|
||||
addresses_to_try.insert(0, format!("http://{0}:{port}", self.host));
|
||||
}
|
||||
|
||||
for address in addresses_to_try {
|
||||
// if provided host was malformed, no point in continuing
|
||||
let client = match nym_node_requests::api::Client::builder(address).and_then(|b| {
|
||||
b.with_timeout(Duration::from_secs(5))
|
||||
.with_user_agent("node-status-api-metrics-scraper")
|
||||
.no_hickory_dns()
|
||||
.build()
|
||||
}) {
|
||||
Ok(client) => client,
|
||||
Err(err) => {
|
||||
return Err(NodeScraperError::MalformedHost {
|
||||
host: self.host.to_string(),
|
||||
node_id: self.node_id,
|
||||
source: err,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
if let Ok(health) = client.get_health().await {
|
||||
if health.status.is_up() {
|
||||
return Ok(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(NodeScraperError::NoHttpPortsAvailable {
|
||||
host: self.host.to_string(),
|
||||
node_id: self.node_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NymNodeDetails> for MetricsScrapingData {
|
||||
fn from(value: NymNodeDetails) -> Self {
|
||||
MetricsScrapingData::new(
|
||||
value.bond_information.node.host.clone(),
|
||||
value.node_id(),
|
||||
value.bond_information.node.identity_key,
|
||||
value.bond_information.node.custom_http_port,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NymNodeDescription> for MetricsScrapingData {
|
||||
fn from(value: NymNodeDescription) -> Self {
|
||||
MetricsScrapingData::new(
|
||||
value.description.host_information.ip_address[0].to_string(),
|
||||
value.node_id,
|
||||
value.ed25519_identity_key().to_base58_string(),
|
||||
None,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn prepare_session_data(
|
||||
stat: SessionStats,
|
||||
node_data: &MetricsScrapingData,
|
||||
) -> GatewaySessionsRecord {
|
||||
let users_hashes = if !stat.unique_active_users_hashes.is_empty() {
|
||||
Some(serde_json::to_string(&stat.unique_active_users_hashes).unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let vpn_durations = stat
|
||||
.sessions
|
||||
.iter()
|
||||
.filter(|s| SessionType::from_string(&s.typ) == SessionType::Vpn)
|
||||
.map(|s| s.duration_ms)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mixnet_durations = stat
|
||||
.sessions
|
||||
.iter()
|
||||
.filter(|s| SessionType::from_string(&s.typ) == SessionType::Mixnet)
|
||||
.map(|s| s.duration_ms)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let unkown_durations = stat
|
||||
.sessions
|
||||
.iter()
|
||||
.filter(|s| SessionType::from_string(&s.typ) == SessionType::Unknown)
|
||||
.map(|s| s.duration_ms)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let vpn_sessions = if !vpn_durations.is_empty() {
|
||||
Some(serde_json::to_string(&vpn_durations).unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mixnet_sessions = if !mixnet_durations.is_empty() {
|
||||
Some(serde_json::to_string(&mixnet_durations).unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let unknown_sessions = if !unkown_durations.is_empty() {
|
||||
Some(serde_json::to_string(&unkown_durations).unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
GatewaySessionsRecord {
|
||||
gateway_identity_key: node_data.id_key.clone(),
|
||||
node_id: node_data.node_id as i64,
|
||||
day: stat.update_time.date(),
|
||||
unique_active_clients: stat.unique_active_users as i64,
|
||||
session_started: stat.sessions_started as i64,
|
||||
users_hashes,
|
||||
vpn_sessions,
|
||||
mixnet_sessions,
|
||||
unknown_sessions,
|
||||
}
|
||||
}
|
||||
pub(crate) use description::DescriptionScraper;
|
||||
pub(crate) use packet_stats::PacketScraper;
|
||||
|
||||
@@ -0,0 +1,135 @@
|
||||
use super::helpers::scrape_packet_stats;
|
||||
use sqlx::SqlitePool;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
|
||||
use crate::db::models::{InsertStatsRecord, ScraperNodeInfo};
|
||||
use crate::db::queries;
|
||||
|
||||
const PACKET_SCRAPE_INTERVAL: Duration = Duration::from_secs(60 * 60);
|
||||
const QUEUE_CHECK_INTERVAL: Duration = Duration::from_millis(250);
|
||||
|
||||
static TASK_COUNTER: AtomicUsize = AtomicUsize::new(0);
|
||||
static TASK_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
pub struct PacketScraper {
|
||||
pool: SqlitePool,
|
||||
max_concurrent_tasks: usize,
|
||||
}
|
||||
|
||||
impl PacketScraper {
|
||||
pub fn new(pool: SqlitePool, max_concurrent_tasks: usize) -> Self {
|
||||
Self {
|
||||
pool,
|
||||
max_concurrent_tasks,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(&self) {
|
||||
self.spawn_packet_scraper().await;
|
||||
}
|
||||
|
||||
async fn spawn_packet_scraper(&self) {
|
||||
let pool = self.pool.clone();
|
||||
tracing::info!("Starting packet scraper");
|
||||
let max_concurrent_tasks = self.max_concurrent_tasks;
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if let Err(e) = Self::run_packet_scraper(&pool, max_concurrent_tasks).await {
|
||||
error!(name: "packet_scraper", "Packet scraper failed: {}", e);
|
||||
}
|
||||
debug!(name: "packet_scraper", "Sleeping for {}s", PACKET_SCRAPE_INTERVAL.as_secs());
|
||||
tokio::time::sleep(PACKET_SCRAPE_INTERVAL).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[instrument(level = "info", name = "packet_scraper", skip_all)]
|
||||
async fn run_packet_scraper(
|
||||
pool: &SqlitePool,
|
||||
max_concurrent_tasks: usize,
|
||||
) -> anyhow::Result<()> {
|
||||
let queue = queries::get_nodes_for_scraping(pool).await?;
|
||||
tracing::info!("Adding {} nodes to the queue", queue.len(),);
|
||||
|
||||
let results = Self::process_packet_queue(queue, max_concurrent_tasks).await;
|
||||
queries::batch_store_packet_stats(pool, results)
|
||||
.await
|
||||
.map_err(|err| anyhow::anyhow!("Failed to store packet stats to DB: {err}"))
|
||||
}
|
||||
|
||||
async fn process_packet_queue(
|
||||
queue: Vec<ScraperNodeInfo>,
|
||||
max_concurrent_tasks: usize,
|
||||
) -> Arc<Mutex<Vec<InsertStatsRecord>>> {
|
||||
let mut queue = queue;
|
||||
let results = Arc::new(Mutex::new(Vec::new()));
|
||||
let mut task_set = JoinSet::new();
|
||||
|
||||
loop {
|
||||
let running_tasks = TASK_COUNTER.load(Ordering::Relaxed);
|
||||
|
||||
if running_tasks < max_concurrent_tasks {
|
||||
let node = {
|
||||
if queue.is_empty() {
|
||||
TASK_ID_COUNTER.store(0, Ordering::Relaxed);
|
||||
break;
|
||||
}
|
||||
queue.remove(0)
|
||||
};
|
||||
|
||||
TASK_COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
let task_id = TASK_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
let results_clone = Arc::clone(&results);
|
||||
|
||||
task_set.spawn(async move {
|
||||
match scrape_packet_stats(&node).await {
|
||||
Ok(result) => {
|
||||
// each task contributes their result to a shared vec
|
||||
results_clone.lock().await.push(result);
|
||||
debug!(
|
||||
"📊 ✅ Packet stats task #{} for node {} complete",
|
||||
task_id,
|
||||
node.node_id()
|
||||
)
|
||||
}
|
||||
Err(e) => debug!(
|
||||
"📊 ❌ Packet stats task #{} for {} {} failed: {}",
|
||||
task_id,
|
||||
node.node_kind,
|
||||
node.node_id(),
|
||||
e
|
||||
),
|
||||
}
|
||||
TASK_COUNTER.fetch_sub(1, Ordering::Relaxed);
|
||||
});
|
||||
} else {
|
||||
tokio::time::sleep(QUEUE_CHECK_INTERVAL).await;
|
||||
}
|
||||
}
|
||||
|
||||
// wait for all the tasks to complete before returning their results
|
||||
let total_count = task_set.len();
|
||||
let mut success_count = 0;
|
||||
while let Some(res) = task_set.join_next().await {
|
||||
if let Err(err) = res {
|
||||
warn!("Packet stats task panicked: {err}");
|
||||
} else {
|
||||
success_count += 1;
|
||||
}
|
||||
}
|
||||
let msg = format!("Successfully completed {success_count}/{total_count} tasks ",);
|
||||
if success_count != total_count {
|
||||
warn!(msg);
|
||||
} else {
|
||||
info!(msg);
|
||||
}
|
||||
|
||||
results
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user