Compare commits

..

3 Commits

Author SHA1 Message Date
Drazen 5ed7da9bdb Fix orphan ! 2025-02-25 10:42:51 +01:00
Drazen 9bedda8a24 Remove prints 2025-02-25 10:39:20 +01:00
Drazen 7d601782fb Disable acks, friendlier SURB construction 2025-02-21 14:12:55 +01:00
43 changed files with 377 additions and 572 deletions
Generated
+9 -14
View File
@@ -702,7 +702,7 @@ checksum = "33415e24172c1b7d6066f6d999545375ab8e1d95421d6784bdfff9496f292387"
dependencies = [
"bitcoin_hashes",
"rand 0.8.5",
"rand_core 0.6.4",
"rand_core 0.5.1",
"serde",
"unicode-normalization",
"zeroize",
@@ -4190,9 +4190,9 @@ checksum = "9374ef4228402d4b7e403e5838cb880d9ee663314b0a900d5a6aabf0c213552e"
[[package]]
name = "log"
version = "0.4.26"
version = "0.4.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e"
checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
[[package]]
name = "loom"
@@ -5940,7 +5940,6 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml",
"subtle 2.6.1",
"tower 0.5.2",
"tracing",
"utoipa",
@@ -6366,18 +6365,16 @@ dependencies = [
[[package]]
name = "nym-node-status-api"
version = "1.0.2"
version = "1.0.0-rc.8"
dependencies = [
"ammonia",
"anyhow",
"axum 0.7.9",
"bip39",
"chrono",
"clap",
"cosmwasm-std",
"envy",
"futures-util",
"itertools 0.13.0",
"moka",
"nym-bin-common",
"nym-contracts-common",
@@ -6391,8 +6388,6 @@ dependencies = [
"nym-statistics-common",
"nym-task",
"nym-validator-client",
"rand 0.8.5",
"rand_chacha 0.3.1",
"regex",
"reqwest 0.12.4",
"serde",
@@ -6783,6 +6778,7 @@ dependencies = [
"rand 0.8.5",
"rand_chacha 0.3.1",
"serde",
"sphinx-packet",
"thiserror 2.0.11",
"wasm-bindgen",
]
@@ -6837,6 +6833,7 @@ name = "nym-sphinx-framing"
version = "0.1.0"
dependencies = [
"bytes",
"cfg-if",
"log",
"nym-metrics",
"nym-sphinx-acknowledgements",
@@ -7263,7 +7260,6 @@ dependencies = [
"axum 0.7.9",
"chrono",
"clap",
"lazy_static",
"nym-bin-common",
"nym-config",
"nym-network-defaults",
@@ -7271,7 +7267,6 @@ dependencies = [
"nym-task",
"nym-validator-client",
"nyxd-scraper",
"regex",
"reqwest 0.12.4",
"rocket",
"schemars",
@@ -8216,7 +8211,7 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf2890aaef0aa82719a50e808de264f9484b74b442e1a3a0e5ee38243ac40bdb"
dependencies = [
"rand_core 0.6.4",
"rand_core 0.5.1",
]
[[package]]
@@ -9974,9 +9969,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "tar"
version = "0.4.44"
version = "0.4.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a"
checksum = "c65998313f8e17d0d553d28f91a0df93e4dbbbf770279c7bc21ca0f09ea1a1f6"
dependencies = [
"filetime",
"libc",
+1 -1
View File
@@ -330,7 +330,7 @@ subtle-encoding = "0.5"
syn = "1"
sysinfo = "0.33.0"
tap = "1.0.1"
tar = "0.4.44"
tar = "0.4.43"
tempfile = "3.15"
thiserror = "2.0"
time = "0.3.37"
@@ -204,15 +204,15 @@ impl<C, St> GatewayClient<C, St> {
"Attemting to establish connection to gateway at: {}",
self.gateway_address
);
let (ws_stream, _) = connect_async(
&self.gateway_address,
#[cfg(unix)]
self.connection_fd_callback.clone(),
)
.await?;
let (ws_stream, _) = connect_async(&self.gateway_address).await?;
self.connection = SocketState::Available(Box::new(ws_stream));
#[cfg(unix)]
if let (Some(callback), Some(fd)) = (self.connection_fd_callback.as_ref(), self.ws_fd()) {
callback.as_ref()(fd);
}
Ok(())
}
@@ -1,11 +1,6 @@
use crate::error::GatewayClientError;
use nym_http_api_client::HickoryDnsResolver;
#[cfg(unix)]
use std::{
os::fd::{AsRawFd, RawFd},
sync::Arc,
};
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::handshake::client::Response;
@@ -16,10 +11,7 @@ use std::net::SocketAddr;
#[cfg(not(target_arch = "wasm32"))]
pub(crate) async fn connect_async(
endpoint: &str,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
use tokio::net::TcpSocket;
let resolver = HickoryDnsResolver::default();
let uri =
Url::parse(endpoint).map_err(|_| GatewayClientError::InvalidUrl(endpoint.to_owned()))?;
@@ -45,41 +37,14 @@ pub(crate) async fn connect_async(
}
};
let mut stream = Err(GatewayClientError::NoEndpointForConnection {
address: endpoint.to_owned(),
});
for sock_addr in sock_addrs {
let socket = if sock_addr.is_ipv4() {
TcpSocket::new_v4()
} else {
TcpSocket::new_v6()
}
.map_err(|err| GatewayClientError::NetworkConnectionFailed {
let stream = TcpStream::connect(&sock_addrs[..]).await.map_err(|error| {
GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: err.into(),
})?;
#[cfg(unix)]
if let Some(callback) = connection_fd_callback.as_ref() {
callback.as_ref()(socket.as_raw_fd());
source: error.into(),
}
})?;
match socket.connect(sock_addr).await {
Ok(s) => {
stream = Ok(s);
break;
}
Err(err) => {
stream = Err(GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
source: err.into(),
});
continue;
}
}
}
tokio_tungstenite::client_async_tls(endpoint, stream?)
tokio_tungstenite::client_async_tls(endpoint, stream)
.await
.map_err(|error| GatewayClientError::NetworkConnectionFailed {
address: endpoint.to_owned(),
@@ -43,9 +43,6 @@ pub enum GatewayClientError {
#[error("connection failed: {address}: {source}")]
NetworkConnectionFailed { address: String, source: WsError },
#[error("no socket address for endpoint: {address}")]
NoEndpointForConnection { address: String },
#[error("Invalid URL: {0}")]
InvalidUrl(String),
-1
View File
@@ -20,7 +20,6 @@ mime = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
serde_yaml = { workspace = true }
subtle.workspace = true
tower = { workspace = true }
tracing.workspace = true
utoipa = { workspace = true, optional = true }
@@ -7,7 +7,6 @@ use axum::{extract::Request, response::Response};
use futures::future::BoxFuture;
use std::sync::Arc;
use std::task::{Context, Poll};
use subtle::ConstantTimeEq;
use tower::{Layer, Service};
use tracing::{debug, instrument, trace};
use zeroize::Zeroizing;
@@ -77,7 +76,7 @@ impl<S> RequireAuth<S> {
return Err("`Authorization` header must contain non-empty `Bearer` token");
}
if bool::from(self.bearer_token.as_bytes().ct_ne(bearer_token.as_bytes())) {
if self.bearer_token.as_str() != bearer_token {
return Err("`Authorization` header does not contain the correct `Bearer` token");
}
@@ -12,6 +12,7 @@ rand = { workspace = true }
bs58 = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
sphinx-packet = { workspace = true }
nym-crypto = { path = "../../crypto", features = ["stream_cipher", "rand"] }
nym-sphinx-addressing = { path = "../addressing" }
@@ -7,11 +7,12 @@ use nym_sphinx_addressing::clients::Recipient;
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, MAX_NODE_ADDRESS_UNPADDED_LEN};
use nym_sphinx_params::packet_sizes::PacketSize;
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm};
use nym_sphinx_types::{NymPacket, SURBMaterial, SphinxError, SURB};
use nym_sphinx_types::{Destination, NymPacket, SURBMaterial, SphinxError, SURB};
use nym_topology::{NymRouteProvider, NymTopologyError};
use rand::{CryptoRng, RngCore};
use serde::de::{Error as SerdeError, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sphinx_packet::route::Node;
use std::fmt::{self, Formatter};
use std::time;
@@ -83,6 +84,25 @@ impl ReplySurb {
packet_size.plaintext_size() - ack_overhead - ReplySurbKeyDigestAlgorithm::output_size() - 1
}
pub fn construct_with_route<R>(
rng: &mut R,
destination: Destination,
average_delay: time::Duration,
route: &[Node],
) -> Result<Self, NymTopologyError>
where
R: RngCore + CryptoRng,
{
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
let surb_material = SURBMaterial::new(route.to_vec(), delays, destination);
// this can't fail as we know we have a valid route to gateway and have correct number of delays
Ok(ReplySurb {
surb: surb_material.construct_SURB().unwrap(),
encryption_key: SurbEncryptionKey::new(rng),
})
}
// TODO: should this return `ReplySURBError` for consistency sake
// or keep `NymTopologyError` because it's the only error it can actually return?
pub fn construct<R>(
@@ -123,11 +143,15 @@ impl ReplySurb {
pub fn to_bytes(&self) -> Vec<u8> {
// KEY || SURB_BYTES
self.encryption_key
let bytes: Vec<u8> = self.encryption_key
.to_bytes()
.into_iter()
.chain(self.surb.to_bytes())
.collect()
.collect();
assert_eq!(bytes.len(), ReplySurb::serialized_len());
bytes
}
pub fn from_bytes(bytes: &[u8]) -> Result<Self, ReplySurbError> {
@@ -170,6 +170,7 @@ impl RepliableMessage {
}
pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, InvalidReplyRequestError> {
// println!("Trying to deserialize message: {} bytes", bytes.len());
if bytes.len() < SENDER_TAG_SIZE + 1 {
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
}
+4
View File
@@ -9,6 +9,7 @@ repository = { workspace = true }
[dependencies]
bytes = { workspace = true }
cfg-if = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
thiserror = { workspace = true }
log = { workspace = true }
@@ -20,5 +21,8 @@ nym-metrics = { path = "../../nym-metrics" }
nym-sphinx-addressing = { path = "../addressing" }
nym-sphinx-acknowledgements = { path = "../acknowledgements" }
[features]
no-acks = []
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
+10 -4
View File
@@ -230,8 +230,12 @@ fn split_into_ack_and_message(
| PacketSize::ExtendedPacket32
| PacketSize::OutfoxRegularPacket => {
trace!("received a normal packet!");
let (ack_data, message) = split_hop_data_into_ack_and_message(data, packet_type)?;
let (ack_first_hop, ack_packet) =
cfg_if::cfg_if! {
if #[cfg(feature = "no-acks")] {
return Ok((None, data));
} else {
let (ack_data, message) = split_hop_data_into_ack_and_message(data, packet_type)?;
let (ack_first_hop, ack_packet) =
match SurbAck::try_recover_first_hop_packet(&ack_data, packet_type) {
Ok((first_hop, packet)) => (first_hop, packet),
Err(err) => {
@@ -239,8 +243,10 @@ fn split_into_ack_and_message(
return Err(err.into());
}
};
let forward_ack = MixPacket::new(ack_first_hop, ack_packet, packet_type);
Ok((Some(forward_ack), message))
let forward_ack = MixPacket::new(ack_first_hop, ack_packet, packet_type);
Ok((Some(forward_ack), message))
}
}
}
}
}
@@ -3,7 +3,7 @@
[package]
name = "nym-node-status-api"
version = "1.0.2"
version = "1.0.0-rc.8"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -16,13 +16,11 @@ rust-version.workspace = true
ammonia = { workspace = true }
anyhow = { workspace = true }
axum = { workspace = true, features = ["tokio", "macros"] }
bip39 = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive", "env", "string"] }
cosmwasm-std = { workspace = true }
envy = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
moka = { workspace = true, features = ["future"] }
nym-contracts-common = { path = "../../common/cosmwasm-smart-contracts/contracts-common" }
nym-bin-common = { path = "../../common/bin-common", features = ["models"] }
@@ -35,8 +33,6 @@ nym-statistics-common = { path = "../../common/statistics" }
nym-validator-client = { path = "../../common/client-libs/validator-client" }
nym-task = { path = "../../common/task" }
nym-node-requests = { path = "../../nym-node/nym-node-requests", features = ["openapi"] }
rand = { workspace = true }
rand_chacha = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true, features = ["derive"] }
@@ -83,6 +83,9 @@ pub(crate) struct Cli {
env = "NYM_NODE_STATUS_API_MAX_AGENT_COUNT"
)]
pub(crate) max_agent_count: i64,
#[clap(long, default_value = "", env = "NYM_NODE_STATUS_API_HM_URL")]
pub(crate) hm_url: String,
}
fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
@@ -2,7 +2,7 @@ use std::str::FromStr;
use crate::{
http::{self, models::SummaryHistory},
utils::NumericalCheckedCast,
monitor::NumericalCheckedCast,
};
use anyhow::Context;
use nym_contracts_common::Percent;
@@ -16,7 +16,7 @@ use strum_macros::{EnumString, FromRepr};
use time::{Date, OffsetDateTime};
use utoipa::ToSchema;
pub(crate) struct GatewayInsertRecord {
pub(crate) struct GatewayRecord {
pub(crate) identity_key: String,
pub(crate) bonded: bool,
pub(crate) self_described: String,
@@ -360,24 +360,14 @@ impl TryFrom<GatewaySessionsRecord> for http::models::SessionStats {
}
}
pub(crate) enum ScrapeNodeKind {
LegacyMixnode { mix_id: i64 },
MixingNymNode { node_id: i64 },
EntryExitNymNode { node_id: i64, identity_key: String },
}
impl ScrapeNodeKind {
pub(crate) fn node_id(&self) -> &i64 {
match self {
ScrapeNodeKind::LegacyMixnode { mix_id } => mix_id,
ScrapeNodeKind::MixingNymNode { node_id } => node_id,
ScrapeNodeKind::EntryExitNymNode { node_id, .. } => node_id,
}
}
pub(crate) enum MixingNodeKind {
LegacyMixnode,
NymNode,
}
pub(crate) struct ScraperNodeInfo {
pub node_kind: ScrapeNodeKind,
pub node_id: i64,
pub node_kind: MixingNodeKind,
pub hosts: Vec<String>,
pub http_api_port: i64,
}
@@ -400,10 +390,6 @@ impl ScraperNodeInfo {
urls
}
pub(crate) fn node_id(&self) -> &i64 {
self.node_kind.node_id()
}
}
#[derive(sqlx::Decode, Debug)]
@@ -1,8 +1,6 @@
use std::collections::HashSet;
use crate::{
db::{
models::{GatewayDto, GatewayInsertRecord},
models::{GatewayDto, GatewayRecord},
DbPool,
},
http::models::Gateway,
@@ -32,7 +30,7 @@ pub(crate) async fn select_gateway_identity(
pub(crate) async fn insert_gateways(
pool: &DbPool,
gateways: Vec<GatewayInsertRecord>,
gateways: Vec<GatewayRecord>,
) -> anyhow::Result<()> {
let mut db = pool.acquire().await?;
for record in gateways {
@@ -100,21 +98,3 @@ pub(crate) async fn get_all_gateways(pool: &DbPool) -> anyhow::Result<Vec<Gatewa
tracing::trace!("Fetched {} gateways from DB", items.len());
Ok(items)
}
pub(crate) async fn get_all_gateway_id_keys(pool: &DbPool) -> anyhow::Result<HashSet<String>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query!(
r#"
SELECT gateway_identity_key
FROM gateways
WHERE bonded = true
"#
)
.fetch_all(&mut *conn)
.await?
.into_iter()
.map(|record| record.gateway_identity_key)
.collect::<HashSet<_>>();
Ok(items)
}
@@ -1,5 +1,3 @@
use std::collections::HashSet;
use futures_util::TryStreamExt;
use tracing::error;
@@ -85,7 +83,8 @@ pub(crate) async fn get_all_mixnodes(pool: &DbPool) -> anyhow::Result<Vec<Mixnod
Ok(items)
}
pub(crate) async fn get_daily_stats(pool: &DbPool) -> anyhow::Result<Vec<DailyStats>> {
/// `offset` = slides our fixed-day period further into the past by N days
pub(crate) async fn get_daily_stats(pool: &DbPool, offset: i64) -> anyhow::Result<Vec<DailyStats>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
DailyStats,
@@ -116,8 +115,11 @@ pub(crate) async fn get_daily_stats(pool: &DbPool) -> anyhow::Result<Vec<DailySt
WHERE nym_node_daily_mixing_stats.node_id IS NULL
)
GROUP BY date_utc
ORDER BY date_utc ASC
ORDER BY date_utc DESC
LIMIT 30
OFFSET ?
"#,
offset
)
.fetch(&mut *conn)
.try_collect::<Vec<DailyStats>>()
@@ -125,21 +127,3 @@ pub(crate) async fn get_daily_stats(pool: &DbPool) -> anyhow::Result<Vec<DailySt
Ok(items)
}
pub(crate) async fn get_all_mix_ids(pool: &DbPool) -> anyhow::Result<HashSet<i64>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query!(
r#"
SELECT mix_id
FROM mixnodes
WHERE bonded = true
"#
)
.fetch_all(&mut *conn)
.await?
.into_iter()
.map(|record| record.mix_id)
.collect::<HashSet<_>>();
Ok(items)
}
@@ -8,15 +8,13 @@ pub(crate) mod scraper;
mod summary;
pub(crate) mod testruns;
pub(crate) use gateways::{
get_all_gateway_id_keys, get_all_gateways, insert_gateways, select_gateway_identity,
};
pub(crate) use gateways::{get_all_gateways, insert_gateways, select_gateway_identity};
pub(crate) use gateways_stats::{delete_old_records, get_sessions_stats, insert_session_records};
pub(crate) use misc::insert_summaries;
pub(crate) use mixnodes::{get_all_mix_ids, get_all_mixnodes, get_daily_stats, insert_mixnodes};
pub(crate) use mixnodes::{get_all_mixnodes, get_daily_stats, insert_mixnodes};
pub(crate) use nym_nodes::{get_nym_nodes, insert_nym_nodes};
pub(crate) use packet_stats::{
get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats,
};
pub(crate) use scraper::{get_nodes_for_scraping, insert_scraped_node_description};
pub(crate) use scraper::{get_mixing_nodes_for_scraping, insert_scraped_node_description};
pub(crate) use summary::{get_summary, get_summary_history};
@@ -1,6 +1,5 @@
use std::collections::HashMap;
use anyhow::Context;
use futures_util::TryStreamExt;
use nym_validator_client::{client::NymNodeDetails, nym_api::SkimmedNode};
use tracing::instrument;
@@ -10,7 +9,7 @@ use crate::{
models::{NymNodeDto, NymNodeInsertRecord},
DbPool,
},
utils::decimal_to_i64,
monitor::decimal_to_i64,
};
pub(crate) async fn get_nym_nodes(pool: &DbPool) -> anyhow::Result<Vec<SkimmedNode>> {
@@ -101,8 +100,7 @@ pub(crate) async fn insert_nym_nodes(
record.last_updated_utc,
)
.execute(&mut *conn)
.await
.with_context(|| format!("node_id={}", record.node_id))?;
.await?;
}
Ok(())
@@ -1,26 +1,27 @@
use crate::db::{
models::{NodeStats, ScrapeNodeKind, ScraperNodeInfo},
models::{MixingNodeKind, NodeStats, ScraperNodeInfo},
DbPool,
};
use anyhow::Result;
pub(crate) async fn insert_node_packet_stats(
pool: &DbPool,
node_kind: &ScrapeNodeKind,
node_id: i64,
node_kind: &MixingNodeKind,
stats: &NodeStats,
timestamp_utc: i64,
) -> Result<()> {
let mut conn = pool.acquire().await?;
match node_kind {
ScrapeNodeKind::LegacyMixnode { mix_id } => {
MixingNodeKind::LegacyMixnode => {
sqlx::query!(
r#"
INSERT INTO mixnode_packet_stats_raw (
mix_id, timestamp_utc, packets_received, packets_sent, packets_dropped
) VALUES (?, ?, ?, ?, ?)
"#,
mix_id,
node_id,
timestamp_utc,
stats.packets_received,
stats.packets_sent,
@@ -29,8 +30,7 @@ pub(crate) async fn insert_node_packet_stats(
.execute(&mut *conn)
.await?;
}
ScrapeNodeKind::MixingNymNode { node_id }
| ScrapeNodeKind::EntryExitNymNode { node_id, .. } => {
MixingNodeKind::NymNode => {
sqlx::query!(
r#"
INSERT INTO nym_nodes_packet_stats_raw (
@@ -60,7 +60,7 @@ pub(crate) async fn get_raw_node_stats(
let packets = match node.node_kind {
// if no packets are found, it's fine to assume 0 because that's also
// SQL default value if none provided
ScrapeNodeKind::LegacyMixnode { mix_id } => {
MixingNodeKind::LegacyMixnode => {
sqlx::query_as!(
NodeStats,
r#"
@@ -73,13 +73,12 @@ pub(crate) async fn get_raw_node_stats(
ORDER BY timestamp_utc DESC
LIMIT 1 OFFSET 1
"#,
mix_id
node.node_id
)
.fetch_optional(&mut *conn)
.await?
}
ScrapeNodeKind::MixingNymNode { node_id }
| ScrapeNodeKind::EntryExitNymNode { node_id, .. } => {
MixingNodeKind::NymNode => {
sqlx::query_as!(
NodeStats,
r#"
@@ -92,7 +91,7 @@ pub(crate) async fn get_raw_node_stats(
ORDER BY timestamp_utc DESC
LIMIT 1 OFFSET 1
"#,
node_id
node.node_id
)
.fetch_optional(&mut *conn)
.await?
@@ -111,7 +110,7 @@ pub(crate) async fn insert_daily_node_stats(
let mut conn = pool.acquire().await?;
match node.node_kind {
ScrapeNodeKind::LegacyMixnode { mix_id } => {
MixingNodeKind::LegacyMixnode => {
let total_stake = sqlx::query_scalar!(
r#"
SELECT
@@ -119,7 +118,7 @@ pub(crate) async fn insert_daily_node_stats(
FROM mixnodes
WHERE mix_id = ?
"#,
mix_id
node.node_id
)
.fetch_one(&mut *conn)
.await?;
@@ -137,7 +136,7 @@ pub(crate) async fn insert_daily_node_stats(
packets_sent = mixnode_daily_stats.packets_sent + excluded.packets_sent,
packets_dropped = mixnode_daily_stats.packets_dropped + excluded.packets_dropped
"#,
mix_id,
node.node_id,
date_utc,
total_stake,
packets.packets_received,
@@ -147,8 +146,7 @@ pub(crate) async fn insert_daily_node_stats(
.execute(&mut *conn)
.await?;
}
ScrapeNodeKind::MixingNymNode { node_id }
| ScrapeNodeKind::EntryExitNymNode { node_id, .. } => {
MixingNodeKind::NymNode => {
let total_stake = sqlx::query_scalar!(
r#"
SELECT
@@ -156,7 +154,7 @@ pub(crate) async fn insert_daily_node_stats(
FROM nym_nodes
WHERE node_id = ?
"#,
node_id
node.node_id
)
.fetch_one(&mut *conn)
.await?;
@@ -169,12 +167,12 @@ pub(crate) async fn insert_daily_node_stats(
packets_sent, packets_dropped
) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(node_id, date_utc) DO UPDATE SET
total_stake = excluded.total_stake,
total_stake = nym_node_daily_mixing_stats.total_stake + excluded.total_stake,
packets_received = nym_node_daily_mixing_stats.packets_received + excluded.packets_received,
packets_sent = nym_node_daily_mixing_stats.packets_sent + excluded.packets_sent,
packets_dropped = nym_node_daily_mixing_stats.packets_dropped + excluded.packets_dropped
"#,
node_id,
node.node_id,
date_utc,
total_stake,
packets.packets_received,
@@ -1,6 +1,6 @@
use crate::{
db::{
models::{ScrapeNodeKind, ScraperNodeInfo},
models::{MixingNodeKind, ScraperNodeInfo},
queries, DbPool,
},
mixnet_scraper::helpers::NodeDescriptionResponse,
@@ -8,36 +8,16 @@ use crate::{
use anyhow::Result;
use chrono::Utc;
pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperNodeInfo>> {
pub(crate) async fn get_mixing_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperNodeInfo>> {
let mut nodes_to_scrape = Vec::new();
let mixnode_ids = queries::get_all_mix_ids(pool).await?;
let gateway_keys = queries::get_all_gateway_id_keys(pool).await?;
let mut entry_exit_nodes = 0;
queries::get_nym_nodes(pool)
.await?
.into_iter()
.for_each(|node| {
// due to polyfilling, Nym nodes table might contain legacy mixnodes
// as well. Mark them as such here.
let node_kind = if mixnode_ids.contains(&node.node_id.into()) {
ScrapeNodeKind::LegacyMixnode {
mix_id: node.node_id.into(),
}
} else if gateway_keys.contains(&node.ed25519_identity_pubkey.to_base58_string()) {
entry_exit_nodes += 1;
ScrapeNodeKind::EntryExitNymNode {
node_id: node.node_id.into(),
identity_key: node.ed25519_identity_pubkey.to_base58_string(),
}
} else {
ScrapeNodeKind::MixingNymNode {
node_id: node.node_id.into(),
}
};
nodes_to_scrape.push(ScraperNodeInfo {
node_kind,
node_id: node.node_id.into(),
node_kind: MixingNodeKind::NymNode,
hosts: node
.ip_addresses
.into_iter()
@@ -47,8 +27,7 @@ pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperN
})
});
tracing::debug!("Fetched {} 🌟 total nym nodes", nodes_to_scrape.len());
tracing::debug!("Fetched {} 🚪 entry/exit nodes", entry_exit_nodes);
tracing::debug!("Fetched {} 🌟 nym nodes", nodes_to_scrape.len());
let mut conn = pool.acquire().await?;
let mixnodes = sqlx::query!(
@@ -62,7 +41,7 @@ pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperN
.await?;
drop(conn);
tracing::debug!("Fetched {} 🦖 mixnodes", mixnodes.len());
tracing::debug!("Fetched {} 🦖 mixnodes", nodes_to_scrape.len());
let mut duplicates = 0;
let mut legacy_not_in_nym_node_list = 0;
@@ -70,22 +49,26 @@ pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperN
for mixnode in mixnodes {
if nodes_to_scrape
.iter()
.all(|node| node.node_id() != &mixnode.node_id)
.all(|node| node.node_id != mixnode.node_id)
{
// in case polyfilling on Nym API gets removed, this part ensures
// mixnodes are added to the final list of nodes to scrape
nodes_to_scrape.push(ScraperNodeInfo {
node_kind: ScrapeNodeKind::LegacyMixnode {
mix_id: mixnode.node_id,
},
hosts: vec![mixnode.host],
http_api_port: mixnode.http_api_port,
});
legacy_not_in_nym_node_list += 1;
} else {
duplicates += 1;
}
// technically, mixnodes shouldn't be in nym_nodes table, but it's
// possible due to polyfilling on Nym API
if nodes_to_scrape
.iter()
.all(|node| node.node_id != mixnode.node_id)
{
nodes_to_scrape.push(ScraperNodeInfo {
node_id: mixnode.node_id,
node_kind: MixingNodeKind::LegacyMixnode,
hosts: vec![mixnode.host],
http_api_port: mixnode.http_api_port,
})
}
}
tracing::debug!(
"{}/{} legacy mixnodes already included in nym_node list",
@@ -102,16 +85,19 @@ pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperN
Ok(nodes_to_scrape)
}
// TODO: add stuff for gateways
pub(crate) async fn insert_scraped_node_description(
pool: &DbPool,
node_kind: &ScrapeNodeKind,
node_kind: &MixingNodeKind,
node_id: i64,
description: &NodeDescriptionResponse,
) -> Result<()> {
let timestamp = Utc::now().timestamp();
let mut conn = pool.acquire().await?;
match node_kind {
ScrapeNodeKind::LegacyMixnode { mix_id } => {
MixingNodeKind::LegacyMixnode => {
sqlx::query!(
r#"
INSERT INTO mixnode_description (
@@ -124,7 +110,7 @@ pub(crate) async fn insert_scraped_node_description(
details = excluded.details,
last_updated_utc = excluded.last_updated_utc
"#,
mix_id,
node_id,
description.moniker,
description.website,
description.security_contact,
@@ -134,7 +120,7 @@ pub(crate) async fn insert_scraped_node_description(
.execute(&mut *conn)
.await?;
}
ScrapeNodeKind::MixingNymNode { node_id } => {
MixingNodeKind::NymNode => {
sqlx::query!(
r#"
INSERT INTO nym_node_descriptions (
@@ -157,34 +143,6 @@ pub(crate) async fn insert_scraped_node_description(
.execute(&mut *conn)
.await?;
}
ScrapeNodeKind::EntryExitNymNode { identity_key, .. } => {
sqlx::query!(
r#"
INSERT INTO gateway_description (
gateway_identity_key,
moniker,
website,
security_contact,
details,
last_updated_utc
) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (gateway_identity_key) DO UPDATE SET
moniker = excluded.moniker,
website = excluded.website,
security_contact = excluded.security_contact,
details = excluded.details,
last_updated_utc = excluded.last_updated_utc
"#,
identity_key,
description.moniker,
description.website,
description.security_contact,
description.details,
timestamp,
)
.execute(&mut *conn)
.await?;
}
}
Ok(())
@@ -99,10 +99,7 @@ async fn get_stats(
Query(MixStatsQueryParams { offset }): Query<MixStatsQueryParams>,
State(state): State<AppState>,
) -> HttpResult<Json<Vec<DailyStats>>> {
let offset: usize = offset
.unwrap_or(0)
.try_into()
.map_err(|_| HttpError::invalid_input("Offset must be non-negative"))?;
let offset = offset.unwrap_or(0);
let last_30_days = state
.cache()
.get_mixnode_stats(state.db_pool(), offset)
@@ -17,10 +17,18 @@ pub(crate) async fn start_http_api(
nym_http_cache_ttl: u64,
agent_key_list: Vec<PublicKey>,
agent_max_count: i64,
hm_url: String,
) -> anyhow::Result<ShutdownHandles> {
let router_builder = RouterBuilder::with_default_routes();
let state = AppState::new(db_pool, nym_http_cache_ttl, agent_key_list, agent_max_count).await;
let state = AppState::new(
db_pool,
nym_http_cache_ttl,
agent_key_list,
agent_max_count,
hm_url,
)
.await;
let router = router_builder.with_state(state);
let bind_addr = format!("0.0.0.0:{}", http_port);
@@ -25,10 +25,11 @@ impl AppState {
cache_ttl: u64,
agent_key_list: Vec<PublicKey>,
agent_max_count: i64,
hm_url: String,
) -> Self {
Self {
db_pool,
cache: HttpCache::new(cache_ttl).await,
cache: HttpCache::new(cache_ttl, hm_url).await,
agent_key_list,
agent_max_count,
}
@@ -51,14 +52,96 @@ impl AppState {
}
}
#[derive(Debug, Clone)]
struct HistoricMixingStats {
historic_stats: Vec<DailyStats>,
}
impl HistoricMixingStats {
/// Collect historic stats only on initialization. From this point onwards,
/// service will collect its own stats
async fn init(hm_url: String) -> Self {
tracing::info!("Fetching historic mixnode stats from {}", hm_url);
let target_url = format!("{}/v2/mixnodes/stats", hm_url);
if let Ok(response) = reqwest::get(&target_url)
.await
.and_then(|res| res.error_for_status())
.inspect_err(|err| tracing::error!("Failed to fetch cache from HM: {}", err))
{
if let Ok(mut daily_stats) = response.json::<Vec<DailyStats>>().await {
// sorting required for seamless comparison later (descending, newest first)
daily_stats.sort_by(|left, right| right.date_utc.cmp(&left.date_utc));
tracing::info!(
"Successfully fetched {} historic entries from {}",
daily_stats.len(),
hm_url
);
return Self {
historic_stats: daily_stats,
};
}
};
tracing::warn!("Failed to get historic daily stats from {}", hm_url);
Self {
historic_stats: Vec::new(),
}
}
/// polyfill with historical data obtained from Harbour Master
fn merge_with_historic_stats(&self, mut new_stats: Vec<DailyStats>) -> Vec<DailyStats> {
// newest first
new_stats.sort_by(|left, right| right.date_utc.cmp(&left.date_utc));
// historic stats are only used for dates when we don't have new data
let oldest_date_in_new_stats = new_stats
.last()
.map(|day| day.date_utc.to_owned())
.unwrap_or(String::from("1900-01-01"));
// given 2 arrays
// index historic_stats new_stats
// 0 30-01 31-01
// 1 29-01 30-01
// 2 28-01
// ...
// N 01-01
// cutoff point would be at historic_stats[1]
// (first date smaller than oldest we've already got)
if let Some(cutoff) = self
.historic_stats
.iter()
.position(|elem| elem.date_utc < oldest_date_in_new_stats)
{
// missing data = (all historic data) - (however many days we already have)
let missing_data = self.historic_stats.iter().skip(cutoff).cloned();
// extend new data with missing days
tracing::debug!(
"Polyfilled with {} historic records from {:?} to {:?}",
missing_data.len(),
self.historic_stats.last(),
self.historic_stats.get(cutoff)
);
new_stats.extend(missing_data);
// oldest first
new_stats.into_iter().rev().collect::<Vec<_>>()
} else {
// if all historic data is older than what we've got, don't use it
new_stats
}
}
}
static GATEWAYS_LIST_KEY: &str = "gateways";
static MIXNODES_LIST_KEY: &str = "mixnodes";
static MIXSTATS_LIST_KEY: &str = "mixstats";
static SUMMARY_HISTORY_LIST_KEY: &str = "summary-history";
static SESSION_STATS_LIST_KEY: &str = "session-stats";
const MIXNODE_STATS_HISTORY_DAYS: usize = 30;
#[derive(Debug, Clone)]
pub(crate) struct HttpCache {
gateways: Cache<String, Arc<RwLock<Vec<Gateway>>>>,
@@ -66,10 +149,11 @@ pub(crate) struct HttpCache {
mixstats: Cache<String, Arc<RwLock<Vec<DailyStats>>>>,
history: Cache<String, Arc<RwLock<Vec<SummaryHistory>>>>,
session_stats: Cache<String, Arc<RwLock<Vec<SessionStats>>>>,
mixnode_historic_daily_stats: HistoricMixingStats,
}
impl HttpCache {
pub async fn new(ttl_seconds: u64) -> Self {
pub async fn new(ttl_seconds: u64, hm_url: String) -> Self {
HttpCache {
gateways: Cache::builder()
.max_capacity(2)
@@ -91,6 +175,7 @@ impl HttpCache {
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
mixnode_historic_daily_stats: HistoricMixingStats::init(hm_url).await,
}
}
@@ -200,27 +285,26 @@ impl HttpCache {
.await
}
pub async fn get_mixnode_stats(&self, db: &DbPool, offset: usize) -> Vec<DailyStats> {
let mut stats = match self.mixstats.get(MIXSTATS_LIST_KEY).await {
pub async fn get_mixnode_stats(&self, db: &DbPool, offset: i64) -> Vec<DailyStats> {
match self.mixstats.get(MIXSTATS_LIST_KEY).await {
Some(guard) => {
let read_lock = guard.read().await;
read_lock.to_vec()
}
None => {
let new_node_stats = crate::db::queries::get_daily_stats(db)
let new_node_stats = crate::db::queries::get_daily_stats(db, offset)
.await
.unwrap_or_default()
.into_iter()
.rev()
.collect::<Vec<_>>();
// cache result without offset
self.upsert_mixnode_stats(new_node_stats.clone()).await;
new_node_stats
}
};
.unwrap_or_default();
// for every day that's missing, fill it with cached historic data
let mut mixnode_stats = self
.mixnode_historic_daily_stats
.merge_with_historic_stats(new_node_stats);
mixnode_stats.truncate(30);
stats.truncate(MIXNODE_STATS_HISTORY_DAYS + offset);
stats.into_iter().skip(offset).rev().collect()
self.upsert_mixnode_stats(mixnode_stats.clone()).await;
mixnode_stats
}
}
}
pub async fn get_summary_history(&self, db: &DbPool) -> Vec<SummaryHistory> {
@@ -34,8 +34,6 @@ pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
"tower_http",
"axum",
"html5ever",
"hickory_proto",
"hickory_resolver",
];
for crate_name in warn_crates {
filter = filter.add_directive(directive_checked(format!("{}=warn", crate_name))?);
@@ -10,7 +10,6 @@ mod mixnet_scraper;
mod monitor;
mod node_scraper;
mod testruns;
mod utils;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@@ -67,6 +66,7 @@ async fn main() -> anyhow::Result<()> {
args.nym_http_cache_ttl,
agent_key_list.to_owned(),
args.max_agent_count,
args.hm_url,
)
.await
.expect("Failed to start server");
@@ -1,15 +1,12 @@
use crate::{
db::{
models::{NodeStats, ScraperNodeInfo},
queries::{
get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats,
insert_scraped_node_description,
},
use crate::db::{
models::{NodeStats, ScraperNodeInfo},
queries::{
get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats,
insert_scraped_node_description,
},
utils::generate_node_name,
};
use ammonia::Builder;
use anyhow::{anyhow, Result};
use anyhow::Result;
use chrono::{DateTime, Datelike, Utc};
use reqwest;
use serde::{Deserialize, Serialize};
@@ -83,33 +80,22 @@ pub fn build_client() -> Result<reqwest::Client> {
.map_err(|e| anyhow::anyhow!("Failed to build HTTP client: {}", e))
}
pub fn sanitize_description(
description: NodeDescriptionResponse,
node_id: i64,
) -> NodeDescriptionResponse {
pub fn sanitize_description(description: NodeDescriptionResponse) -> NodeDescriptionResponse {
let mut sanitizer = Builder::new();
sanitizer
.tags(std::collections::HashSet::new())
.generic_attributes(std::collections::HashSet::new())
.url_schemes(std::collections::HashSet::new());
const UNKNOWN: &str = "N/A";
let sanitize_field = |opt: Option<String>| -> Option<String> {
Some(
opt.filter(|s| !s.trim().is_empty())
.map_or_else(|| UNKNOWN.to_string(), |s| sanitizer.clean(&s).to_string()),
.map_or_else(|| "N/A".to_string(), |s| sanitizer.clean(&s).to_string()),
)
};
let mut moniker = sanitize_field(description.moniker);
if let Some(sanitized) = &moniker {
if sanitized == UNKNOWN {
moniker = Some(generate_node_name(node_id));
}
};
NodeDescriptionResponse {
moniker,
moniker: sanitize_field(description.moniker),
website: sanitize_field(description.website),
security_contact: sanitize_field(description.security_contact),
details: sanitize_field(description.details),
@@ -122,26 +108,18 @@ pub async fn scrape_and_store_description(pool: &SqlitePool, node: &ScraperNodeI
let mut description = None;
let mut error = None;
let mut tried_url_list = Vec::new();
for mut url in urls {
url = format!("{}{}", url.trim_end_matches('/'), DESCRIPTION_URL);
tried_url_list.push(url.clone());
match client
.get(&url)
.send()
.await
// convert 404 and similar to error
.and_then(|res| res.error_for_status())
{
match client.get(&url).send().await {
Ok(response) => {
if let Ok(desc) = response.json::<NodeDescriptionResponse>().await {
description = Some(desc);
break;
}
}
Err(e) => error = Some(anyhow!("{:?} ({})", tried_url_list, e)),
Err(e) => error = Some(e),
}
}
@@ -150,8 +128,9 @@ pub async fn scrape_and_store_description(pool: &SqlitePool, node: &ScraperNodeI
anyhow::anyhow!("Failed to fetch description from any URL: {}", err_msg)
})?;
let sanitized_description = sanitize_description(description, *node.node_id());
insert_scraped_node_description(pool, &node.node_kind, &sanitized_description).await?;
let sanitized_description = sanitize_description(description);
insert_scraped_node_description(pool, &node.node_kind, node.node_id, &sanitized_description)
.await?;
Ok(())
}
@@ -165,11 +144,9 @@ pub async fn scrape_and_store_packet_stats(
let mut stats = None;
let mut error = None;
let mut tried_url_list = Vec::new();
for mut url in urls {
url = format!("{}{}", url.trim_end_matches('/'), PACKET_STATS_URL);
tried_url_list.push(url.clone());
match client.get(&url).send().await {
Ok(response) => {
@@ -178,18 +155,18 @@ pub async fn scrape_and_store_packet_stats(
break;
}
}
Err(e) => error = Some(anyhow!("{:?} ({})", tried_url_list, e)),
Err(e) => error = Some(e),
}
}
let stats = stats.ok_or_else(|| {
let err_msg = error.map_or_else(|| "Unknown error".to_string(), |e| e.to_string());
anyhow::anyhow!("Failed to fetch description from any URL: {}", err_msg)
anyhow::anyhow!("Failed to fetch stats from any URL: {}", err_msg)
})?;
let timestamp = Utc::now();
let timestamp_utc = timestamp.timestamp();
insert_node_packet_stats(pool, &node.node_kind, &stats, timestamp_utc).await?;
insert_node_packet_stats(pool, node.node_id, &node.node_kind, &stats, timestamp_utc).await?;
// Update daily stats
update_daily_stats(pool, node, timestamp, &stats).await?;
@@ -8,7 +8,7 @@ use sqlx::SqlitePool;
use tracing::{debug, error, instrument, warn};
use crate::db::models::ScraperNodeInfo;
use crate::db::queries::get_nodes_for_scraping;
use crate::db::queries::get_mixing_nodes_for_scraping;
const DESCRIPTION_SCRAPE_INTERVAL: Duration = Duration::from_secs(60 * 60 * 4);
const PACKET_SCRAPE_INTERVAL: Duration = Duration::from_secs(60 * 60);
@@ -74,7 +74,7 @@ impl Scraper {
pool: &SqlitePool,
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
) -> Result<()> {
let nodes = get_nodes_for_scraping(pool).await?;
let nodes = get_mixing_nodes_for_scraping(pool).await?;
if let Ok(mut queue_lock) = queue.lock() {
queue_lock.extend(nodes);
} else {
@@ -82,7 +82,7 @@ impl Scraper {
return Ok(());
}
Self::process_description_queue(pool, queue).await;
Self::process_description_queue(pool, queue).await?;
Ok(())
}
@@ -91,7 +91,7 @@ impl Scraper {
pool: &SqlitePool,
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
) -> Result<()> {
let nodes = get_nodes_for_scraping(pool).await?;
let nodes = get_mixing_nodes_for_scraping(pool).await?;
tracing::info!("Querying {} mixing nodes", nodes.len());
if let Ok(mut queue_lock) = queue.lock() {
queue_lock.extend(nodes);
@@ -100,11 +100,14 @@ impl Scraper {
return Ok(());
}
Self::process_packet_queue(pool, queue).await;
Self::process_packet_queue(pool, queue).await?;
Ok(())
}
async fn process_description_queue(pool: &SqlitePool, queue: Arc<Mutex<Vec<ScraperNodeInfo>>>) {
async fn process_description_queue(
pool: &SqlitePool,
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
) -> Result<()> {
loop {
let running_tasks = TASK_COUNTER.load(Ordering::Relaxed);
@@ -129,15 +132,12 @@ impl Scraper {
tokio::spawn(async move {
match scrape_and_store_description(&pool, &node).await {
Ok(_) => debug!(
"📝 ✅ Description task #{} for node {} complete",
task_id,
node.node_id()
"✅ Description task #{} for node {} complete",
task_id, node.node_id
),
Err(e) => debug!(
"📝 ❌ Description task #{} for node {} failed: {}",
task_id,
node.node_id(),
e
"❌ Description task #{} for node {} failed: {}",
task_id, node.node_id, e
),
}
TASK_COUNTER.fetch_sub(1, Ordering::Relaxed);
@@ -146,9 +146,13 @@ impl Scraper {
tokio::time::sleep(QUEUE_CHECK_INTERVAL).await;
}
}
Ok(())
}
async fn process_packet_queue(pool: &SqlitePool, queue: Arc<Mutex<Vec<ScraperNodeInfo>>>) {
async fn process_packet_queue(
pool: &SqlitePool,
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
) -> Result<()> {
loop {
let running_tasks = TASK_COUNTER.load(Ordering::Relaxed);
@@ -173,15 +177,12 @@ impl Scraper {
tokio::spawn(async move {
match scrape_and_store_packet_stats(&pool, &node).await {
Ok(_) => debug!(
"📊 ✅ Packet stats task #{} for node {} complete",
task_id,
node.node_id()
"✅ Packet stats task #{} for node {} complete",
task_id, node.node_id
),
Err(e) => debug!(
"📊 ❌ Packet stats task #{} for node {} failed: {}",
task_id,
node.node_id(),
e
"❌ Packet stats task #{} for node {} failed: {}",
task_id, node.node_id, e
),
}
TASK_COUNTER.fetch_sub(1, Ordering::Relaxed);
@@ -190,5 +191,6 @@ impl Scraper {
tokio::time::sleep(QUEUE_CHECK_INTERVAL).await;
}
}
Ok(())
}
}
@@ -1,14 +1,14 @@
#![allow(deprecated)]
use crate::db::models::{
gateway, mixnode, GatewayInsertRecord, MixnodeRecord, NetworkSummary, ASSIGNED_ENTRY_COUNT,
gateway, mixnode, GatewayRecord, MixnodeRecord, NetworkSummary, ASSIGNED_ENTRY_COUNT,
ASSIGNED_EXIT_COUNT, ASSIGNED_MIXING_COUNT, GATEWAYS_BONDED_COUNT, GATEWAYS_HISTORICAL_COUNT,
MIXNODES_HISTORICAL_COUNT, MIXNODES_LEGACY_COUNT, NYMNODES_DESCRIBED_COUNT, NYMNODE_COUNT,
};
use crate::db::{queries, DbPool};
use crate::monitor::geodata::{Location, NodeGeoData};
use crate::utils::{decimal_to_i64, LogError, NumericalCheckedCast};
use anyhow::anyhow;
use cosmwasm_std::Decimal;
use moka::future::Cache;
use nym_network_defaults::NymNetworkDetails;
use nym_validator_client::client::{NodeId, NymApiClientExt};
@@ -29,6 +29,7 @@ pub(crate) use geodata::IpInfoClient;
mod geodata;
// TODO dz should be configurable
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60);
static DELEGATION_PROGRAM_WALLET: &str = "n1rnxpdpx3kldygsklfft0gech7fhfcux4zst5lw";
@@ -108,11 +109,7 @@ impl Monitor {
let gateways = described_nodes
.iter()
.filter(|node| {
node.description.declared_role.entry
|| node.description.declared_role.exit_ipr
|| node.description.declared_role.exit_nr
})
.filter(|node| node.description.declared_role.entry)
.collect::<Vec<_>>();
let bonded_node_info = api_client
@@ -123,18 +120,12 @@ impl Monitor {
// for faster reads
.collect::<HashMap<_, _>>();
tracing::info!("🟣 bonded_nodes: {}", bonded_node_info.len());
let nym_nodes = api_client
.get_all_basic_nodes()
.await
.log_error("get_all_basic_nodes")?;
queries::insert_nym_nodes(&self.db_pool, nym_nodes.clone(), &bonded_node_info)
.await
.map(|_| {
tracing::debug!("{} nym nodes written to DB!", nym_nodes.len());
})?;
queries::insert_nym_nodes(&self.db_pool, nym_nodes.clone(), &bonded_node_info).await?;
let mut gateway_geodata = Vec::new();
for gateway in gateways.iter() {
@@ -207,11 +198,10 @@ impl Monitor {
let gateway_records = self.prepare_gateway_data(&gateways, gateway_geodata, &nym_nodes)?;
let pool = self.db_pool.clone();
let gateways_count = gateway_records.len();
queries::insert_gateways(&pool, gateway_records)
.await
.map(|_| {
tracing::debug!("{} gateway records written to DB!", gateways_count);
tracing::debug!("Gateway info written to DB!");
})?;
let mixnode_records = self.prepare_mixnode_data(
@@ -219,11 +209,10 @@ impl Monitor {
mixnodes_described,
delegation_program_members,
)?;
let mixnodes_count = mixnode_records.len();
queries::insert_mixnodes(&pool, mixnode_records)
.await
.map(|_| {
tracing::debug!("{} mixnode info written to DB!", mixnodes_count);
tracing::debug!("Mixnode info written to DB!");
})?;
let (all_historical_gateways, all_historical_mixnodes) = calculate_stats(&pool).await?;
@@ -310,13 +299,13 @@ impl Monitor {
fn prepare_gateway_data(
&self,
described_gateways: &[&NymNodeDescription],
gateways: &[&NymNodeDescription],
gateway_geodata: Vec<NodeGeoData>,
skimmed_gateways: &[SkimmedNode],
) -> anyhow::Result<Vec<GatewayInsertRecord>> {
) -> anyhow::Result<Vec<GatewayRecord>> {
let mut gateway_records = Vec::new();
for gateway in described_gateways {
for gateway in gateways {
let identity_key = gateway.ed25519_identity_key().to_base58_string();
let bonded = true;
let last_updated_utc = chrono::offset::Utc::now().timestamp();
@@ -340,7 +329,7 @@ impl Monitor {
.unwrap_or_default()
.round_to_integer();
gateway_records.push(GatewayInsertRecord {
gateway_records.push(GatewayRecord {
identity_key: identity_key.to_owned(),
bonded,
self_described,
@@ -411,6 +400,33 @@ impl Monitor {
}
}
// TODO dz is there a common monorepo place this can be put?
pub trait NumericalCheckedCast<T>
where
T: TryFrom<Self>,
<T as TryFrom<Self>>::Error: std::error::Error,
Self: std::fmt::Display + Copy,
{
fn cast_checked(self) -> anyhow::Result<T> {
T::try_from(self).map_err(|e| {
anyhow::anyhow!(
"Couldn't cast {} to {}: {}",
self,
std::any::type_name::<T>(),
e
)
})
}
}
impl<T, U> NumericalCheckedCast<U> for T
where
U: TryFrom<T>,
<U as TryFrom<T>>::Error: std::error::Error,
T: std::fmt::Display + Copy,
{
}
async fn calculate_stats(pool: &DbPool) -> anyhow::Result<(usize, usize)> {
let mut conn = pool.acquire().await?;
@@ -448,3 +464,39 @@ async fn get_delegation_program_details(
Ok(mix_ids)
}
pub(crate) fn decimal_to_i64(decimal: Decimal) -> i64 {
// Convert the underlying Uint128 to a u128
let atomics = decimal.atomics().u128();
let precision = 1_000_000_000_000_000_000u128;
// Get the fractional part
let fractional = atomics % precision;
// Get the integer part
let integer = atomics / precision;
// Combine them into a float
let float_value = integer as f64 + (fractional as f64 / 1_000_000_000_000_000_000_f64);
// Limit to 6 decimal places
let rounded_value = (float_value * 1_000_000.0).round() / 1_000_000.0;
rounded_value as i64
}
trait LogError<T, E> {
fn log_error(self, msg: &str) -> Result<T, E>;
}
impl<T, E> LogError<T, E> for anyhow::Result<T, E>
where
E: std::error::Error,
{
fn log_error(self, msg: &str) -> Result<T, E> {
if let Err(e) = &self {
tracing::error!("[{msg}]:\t{e}");
}
self
}
}
@@ -17,14 +17,15 @@ use tracing::instrument;
mod error;
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60);
const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 6);
const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 6); //6h, data only update once a day
const STALE_DURATION: Duration = Duration::from_secs(86400 * 365); //one year
#[instrument(level = "info", name = "metrics_scraper", skip_all)]
#[instrument(level = "debug", name = "node_scraper", skip_all)]
pub(crate) async fn spawn_in_background(db_pool: DbPool, nym_api_client_timeout: Duration) {
let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env();
loop {
//No graceful shutdown?
tracing::info!("Refreshing node self-described metrics...");
if let Err(e) = run(&db_pool, &network_defaults, nym_api_client_timeout).await {
@@ -122,7 +123,7 @@ impl MetricsScrapingData {
}
}
#[instrument(level = "info", name = "metrics_scraper", skip_all)]
#[instrument(level = "debug", name = "metrics_scraper", skip_all)]
async fn try_scrape_metrics(&self) -> Option<SessionStats> {
match self.try_get_client().await {
Ok(client) => {
@@ -1,104 +0,0 @@
use cosmwasm_std::Decimal;
use itertools::Itertools;
use rand::prelude::SliceRandom;
use rand::SeedableRng;
// pub(crate) fn generate_node_name(identity: ed25519::PublicKey) -> String {
pub(crate) fn generate_node_name(node_id: i64) -> String {
let seed = {
let node_id_bytes = node_id.to_le_bytes();
let mut seed = [0u8; 32];
for i in 0..4 {
seed[i * 8..(i + 1) * 8].copy_from_slice(&node_id_bytes);
}
seed
};
let mut rng = rand_chacha::ChaCha20Rng::from_seed(seed);
let words = bip39::Language::English.word_list();
words.choose_multiple(&mut rng, 3).join(" ")
}
#[allow(clippy::items_after_test_module)]
#[cfg(test)]
mod test {
use rand::Rng;
use super::*;
#[test]
fn generate_node_name_should_be_deterministic() {
let mut rng = rand::thread_rng();
let node_id: i64 = rng.gen();
let different_node_id: i64 = rng.gen();
let node_name = generate_node_name(node_id);
let node_name_different = generate_node_name(different_node_id);
assert_ne!(node_name, node_name_different);
let node_name_same = generate_node_name(node_id);
assert_eq!(node_name, node_name_same);
}
}
pub trait NumericalCheckedCast<T>
where
T: TryFrom<Self>,
<T as TryFrom<Self>>::Error: std::error::Error,
Self: std::fmt::Display + Copy,
{
fn cast_checked(self) -> anyhow::Result<T> {
T::try_from(self).map_err(|e| {
anyhow::anyhow!(
"Couldn't cast {} to {}: {}",
self,
std::any::type_name::<T>(),
e
)
})
}
}
impl<T, U> NumericalCheckedCast<U> for T
where
U: TryFrom<T>,
<U as TryFrom<T>>::Error: std::error::Error,
T: std::fmt::Display + Copy,
{
}
pub(crate) fn decimal_to_i64(decimal: Decimal) -> i64 {
// Convert the underlying Uint128 to a u128
let atomics = decimal.atomics().u128();
let precision = 1_000_000_000_000_000_000u128;
// Get the fractional part
let fractional = atomics % precision;
// Get the integer part
let integer = atomics / precision;
// Combine them into a float
let float_value = integer as f64 + (fractional as f64 / 1_000_000_000_000_000_000_f64);
// Limit to 6 decimal places
let rounded_value = (float_value * 1_000_000.0).round() / 1_000_000.0;
rounded_value as i64
}
pub(crate) trait LogError<T, E> {
fn log_error(self, msg: &str) -> Result<T, E>;
}
impl<T, E> LogError<T, E> for anyhow::Result<T, E>
where
E: std::error::Error,
{
fn log_error(self, msg: &str) -> Result<T, E> {
if let Err(e) = &self {
tracing::error!("[{msg}]:\t{e}");
}
self
}
}
-2
View File
@@ -44,8 +44,6 @@ tower-http = { workspace = true, features = ["cors", "trace"] }
utoipa = { workspace = true, features = ["axum_extras", "time"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
utoipauto = { workspace = true }
regex = "1.11.1"
lazy_static = "1.5.0"
[build-dependencies]
+6 -6
View File
@@ -7,12 +7,12 @@ Look in [env.rs](./src/env.rs) for the names of environment variables that can b
## Running locally
```
export NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH=chain_history.sqlite \
export NYX_CHAIN_WATCHER_DATABASE_PATH=nyx_chain_watcher.sqlite \
export NYX_CHAIN_WATCHER_WATCH_ACCOUNTS=n1...,n1...,n1... \
export NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES="/cosmos.bank.v1beta1.MsgSend,/ibc.applications.transfer.v1.MsgTransfer"
export NYX_CHAIN_WATCHER_WEBHOOK_URL="https://webhook.site" \
export NYX_CHAIN_WATCHER_WEBHOOK_AUTH=1234 \
NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH=chain_history.sqlite \
NYX_CHAIN_WATCHER_DATABASE_PATH=nyx_chain_watcher.sqlite \
NYX_CHAIN_WATCHER_WATCH_ACCOUNTS=n1...,n1...,n1... \
NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES="/cosmos.bank.v1beta1.MsgSend,/ibc.applications.transfer.v1.MsgTransfer"
NYX_CHAIN_WATCHER_WEBHOOK_URL="https://webhook.site" \
NYX_CHAIN_WATCHER_WEBHOOK_AUTH=1234 \
cargo run -- run
```
+3 -2
View File
@@ -1,12 +1,13 @@
use anyhow::Result;
use sqlx::{sqlite::SqliteConnectOptions, Connection, SqliteConnection};
use std::env::var;
use std::io::Write;
use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr};
#[tokio::main]
async fn main() -> Result<()> {
let db_path = PathBuf::from(var("OUT_DIR").unwrap()).join("nyx_chain_watcher.sqlite");
let db_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join(".build")
.join("nyx_chain_watcher.sqlite");
// Create the database directory if it doesn't exist
if let Some(parent) = db_path.parent() {
@@ -22,15 +22,6 @@ pub(crate) struct Args {
)]
pub watch_for_transfer_recipient_accounts: Option<Vec<AccountId>>,
/// (Override) Watch for transfers to these recipient accounts
#[clap(
long,
value_delimiter = ',',
env = NYX_RECORD_BEARER_VALUE
)]
pub record_bearer_token: Option<String>,
/// (Override) Watch for chain messages of these types
#[clap(
long,
@@ -14,7 +14,6 @@ pub(crate) fn get_run_config(args: Args) -> Result<Config, NyxChainWatcherError>
webhook_auth,
ref chain_watcher_db_path,
ref chain_history_db_path,
ref record_bearer_token,
webhook_url,
} = args;
@@ -55,11 +54,6 @@ pub(crate) fn get_run_config(args: Args) -> Result<Config, NyxChainWatcherError>
builder = builder.with_chain_scraper_db_path(db_path.clone());
}
if let Some(token) = record_bearer_token {
info!("Setting bearer token for authentication");
builder = builder.with_record_bearer_token(token.clone());
}
if let Some(webhook_url) = webhook_url {
let authentication =
webhook_auth.map(|token| HttpAuthenticationOptions::AuthorizationBearerToken { token });
-12
View File
@@ -49,8 +49,6 @@ pub struct ConfigBuilder {
pub payment_watcher_config: Option<PaymentWatcherConfig>,
pub logging: Option<LoggingSettings>,
pub bearer_token: Option<String>,
}
impl ConfigBuilder {
@@ -62,15 +60,9 @@ impl ConfigBuilder {
logging: None,
db_path: None,
chain_scraper_db_path: None,
bearer_token: None,
}
}
pub fn with_record_bearer_token(mut self, token: String) -> Self {
self.bearer_token = Some(token);
self
}
pub fn with_db_path(mut self, db_path: String) -> Self {
self.db_path = Some(db_path);
self
@@ -104,7 +96,6 @@ impl ConfigBuilder {
data_dir: self.data_dir,
db_path: self.db_path,
chain_scraper_db_path: self.chain_scraper_db_path,
bearer_token: self.bearer_token,
}
}
}
@@ -127,11 +118,8 @@ pub struct Config {
pub payment_watcher_config: Option<PaymentWatcherConfig>,
pub bearer_token: Option<String>,
#[serde(default)]
pub logging: LoggingSettings,
}
impl NymConfigTemplate for Config {
+1 -3
View File
@@ -1,5 +1,4 @@
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use utoipa::ToSchema;
#[derive(Clone, Deserialize, Debug, ToSchema)]
@@ -33,7 +32,7 @@ pub(crate) struct PriceHistory {
pub(crate) btc: f64,
}
#[derive(Serialize, Deserialize, Debug, ToSchema, FromRow)]
#[derive(Serialize, Deserialize, Debug, ToSchema)]
pub(crate) struct PaymentRecord {
pub(crate) transaction_hash: String,
pub(crate) sender_address: String,
@@ -42,4 +41,3 @@ pub(crate) struct PaymentRecord {
pub(crate) timestamp: i64,
pub(crate) height: i64,
}
+1 -32
View File
@@ -1,7 +1,5 @@
use crate::db::DbPool;
use anyhow::{Context, Result};
use regex::Regex;
use crate::db::models::PaymentRecord;
use anyhow::Result;
pub async fn get_last_checked_height(pool: &DbPool) -> Result<i64> {
let result = sqlx::query_scalar!("SELECT MAX(height) FROM payments")
@@ -10,35 +8,6 @@ pub async fn get_last_checked_height(pool: &DbPool) -> Result<i64> {
Ok(result.unwrap_or(0))
}
lazy_static::lazy_static! {
static ref HEX_PATTERN: Regex = Regex::new(r"^[A-Fa-f0-9]{64}$").unwrap();
static ref BASE64_PATTERN: Regex = Regex::new(r"^[A-Za-z0-9+/=]+$").unwrap();
}
pub async fn get_transaction_record(pool: &DbPool, record_txs: &str) -> Result<Option<PaymentRecord>> {
let query = if HEX_PATTERN.is_match(record_txs) {
"SELECT transaction_hash, sender_address, receiver_address, amount, timestamp, height
FROM transactions WHERE tx_hash = $1"
} else if BASE64_PATTERN.is_match(record_txs) {
"SELECT transaction_hash, sender_address, receiver_address, amount, timestamp, height
FROM transactions WHERE memo LIKE $1"
} else {
return Ok(None);
};
let param = if BASE64_PATTERN.is_match(record_txs) {
format!("%{}%", record_txs)
} else {
record_txs.to_string()
};
sqlx::query_as::<_, PaymentRecord>(query)
.bind(param)
.fetch_optional(pool)
.await
.context("Database query failed")
}
pub async fn insert_payment(
pool: &DbPool,
transaction_hash: String,
-1
View File
@@ -14,7 +14,6 @@ pub mod vars {
pub const NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT: &str =
"NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT";
pub const NYX_RECORD_BEARER_VALUE: &str = "NYX_RECORD_BEARER_VALUE";
pub const NYXD_SCRAPER_UNSAFE_NUKE_DB: &str = "NYXD_SCRAPER_UNSAFE_NUKE_DB";
pub const NYX_CHAIN_WATCHER_ID_ARG: &str = "NYX_CHAIN_WATCHER_ID";
+1 -3
View File
@@ -9,7 +9,6 @@ use crate::http::{api_docs, server::HttpServer, state::AppState};
pub(crate) mod price;
pub(crate) mod watcher;
pub(crate) mod records;
pub(crate) struct RouterBuilder {
unfinished_router: Router<AppState>,
@@ -27,8 +26,7 @@ impl RouterBuilder {
axum::routing::get(|| async { Redirect::permanent("/swagger") }),
)
.nest("/v1", Router::new().nest("/price", price::routes()))
.nest("/v1", Router::new().nest("/watcher", watcher::routes()))
.nest("/v1", Router::new().nest("/records", records::routes()));
.nest("/v1", Router::new().nest("/watcher", watcher::routes()));
Self {
unfinished_router: router,
-30
View File
@@ -1,30 +0,0 @@
use crate::db::models::PaymentRecord;
use crate::db::queries::payments::get_transaction_record;
use crate::http::error::{Error, HttpResult};
use crate::http::state::AppState;
use axum::{extract::{State, Path}, Json, Router};
pub(crate) fn routes() -> Router<AppState> {
Router::new().route("/records/:record_txs", axum::routing::get(transaction_record))
}
#[utoipa::path(
tag = "Watcher Records",
get,
path = "/v1/records/{record_txs}",
responses(
(status = 200, body = PaymentRecord),
(status = 404, description = "Transaction record not found")
)
)]
/// Fetch a transaction record from the database
async fn transaction_record(
State(state): State<AppState>,
Path(record_txs): Path<String>,
) -> HttpResult<Json<PaymentRecord>> {
get_transaction_record(state.db_pool(), &record_txs)
.await
.map_err(|_| Error::internal())?
.map(Json::from)
.ok_or_else(|| Error::not_found(&record_txs))
}
-8
View File
@@ -1,4 +1,3 @@
use tracing::error;
pub(crate) type HttpResult<T> = Result<T, Error>;
pub(crate) struct Error {
@@ -13,13 +12,6 @@ impl Error {
status: axum::http::StatusCode::INTERNAL_SERVER_ERROR,
}
}
pub(crate) fn not_found(resource: &str) -> Self {
error!("Resource not found: {}", resource);
Self {
message: format!("{} not found", resource),
status: axum::http::StatusCode::NOT_FOUND,
}
}
}
impl axum::response::IntoResponse for Error {