Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f7bbd0c93e | |||
| dd3dcfa7fe | |||
| 86ea2d23cb | |||
| 42a37442e8 |
Generated
+5
-3
@@ -6365,16 +6365,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-node-status-api"
|
||||
version = "1.0.0-rc.8"
|
||||
version = "1.0.0"
|
||||
dependencies = [
|
||||
"ammonia",
|
||||
"anyhow",
|
||||
"axum 0.7.9",
|
||||
"bip39",
|
||||
"chrono",
|
||||
"clap",
|
||||
"cosmwasm-std",
|
||||
"envy",
|
||||
"futures-util",
|
||||
"itertools 0.13.0",
|
||||
"moka",
|
||||
"nym-bin-common",
|
||||
"nym-contracts-common",
|
||||
@@ -6388,6 +6390,8 @@ dependencies = [
|
||||
"nym-statistics-common",
|
||||
"nym-task",
|
||||
"nym-validator-client",
|
||||
"rand 0.8.5",
|
||||
"rand_chacha 0.3.1",
|
||||
"regex",
|
||||
"reqwest 0.12.4",
|
||||
"serde",
|
||||
@@ -6778,7 +6782,6 @@ dependencies = [
|
||||
"rand 0.8.5",
|
||||
"rand_chacha 0.3.1",
|
||||
"serde",
|
||||
"sphinx-packet",
|
||||
"thiserror 2.0.11",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
@@ -6833,7 +6836,6 @@ name = "nym-sphinx-framing"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"cfg-if",
|
||||
"log",
|
||||
"nym-metrics",
|
||||
"nym-sphinx-acknowledgements",
|
||||
|
||||
@@ -497,6 +497,11 @@ impl NymApiClient {
|
||||
Ok(nodes)
|
||||
}
|
||||
|
||||
/// retrieve basic information for all bonded nodes on the network
|
||||
pub async fn retrieve_basic_nodes_batch(&self, node_ids: Vec<u32>) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
|
||||
Ok(self.nym_api.retrieve_basic_nodes_batch(&node_ids).await?.nodes)
|
||||
}
|
||||
|
||||
pub async fn health(&self) -> Result<ApiHealthResponse, ValidatorClientError> {
|
||||
Ok(self.nym_api.health().await?)
|
||||
}
|
||||
|
||||
@@ -253,10 +253,10 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"mixnodes",
|
||||
"skimmed",
|
||||
routes::SKIMMED,
|
||||
],
|
||||
NO_PARAMS,
|
||||
)
|
||||
@@ -269,10 +269,10 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"gateways",
|
||||
"skimmed",
|
||||
routes::SKIMMED,
|
||||
],
|
||||
NO_PARAMS,
|
||||
)
|
||||
@@ -318,9 +318,9 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
routes::SKIMMED,
|
||||
"entry-gateways",
|
||||
"all",
|
||||
],
|
||||
@@ -355,9 +355,9 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
routes::SKIMMED,
|
||||
"mixnodes",
|
||||
"active",
|
||||
],
|
||||
@@ -392,9 +392,9 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
routes::SKIMMED,
|
||||
"mixnodes",
|
||||
"all",
|
||||
],
|
||||
@@ -403,6 +403,31 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Send a Post request with a set of node ids. A successful response will contain descriptors
|
||||
/// for all nodes associated with those node IDs available in the current full topology.
|
||||
///
|
||||
/// If a provided node ID is not present there will be no descriptor for that node in the response.
|
||||
///
|
||||
/// If no node IDs are provided the response will contain no descriptors.
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn retrieve_basic_nodes_batch(
|
||||
&self,
|
||||
node_ids: &[NodeId],
|
||||
) -> Result<CachedNodesResponse<SkimmedNode>, NymAPIError> {
|
||||
self.post_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
routes::SKIMMED,
|
||||
routes::BATCH,
|
||||
],
|
||||
NO_PARAMS,
|
||||
node_ids,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_basic_nodes(
|
||||
&self,
|
||||
@@ -427,9 +452,9 @@ pub trait NymApiClientExt: ApiClient {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
routes::UNSTABLE,
|
||||
routes::NYM_NODES_ROUTES,
|
||||
"skimmed",
|
||||
routes::SKIMMED,
|
||||
],
|
||||
¶ms,
|
||||
)
|
||||
|
||||
@@ -70,3 +70,7 @@ pub const SERVICE_PROVIDERS: &str = "services";
|
||||
|
||||
pub const DETAILS: &str = "details";
|
||||
pub const NETWORK: &str = "network";
|
||||
|
||||
pub const UNSTABLE: &str = "unstable";
|
||||
pub const SKIMMED: &str = "skimmed";
|
||||
pub const BATCH: &str = "batch";
|
||||
|
||||
@@ -12,7 +12,6 @@ rand = { workspace = true }
|
||||
bs58 = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
sphinx-packet = { workspace = true }
|
||||
|
||||
nym-crypto = { path = "../../crypto", features = ["stream_cipher", "rand"] }
|
||||
nym-sphinx-addressing = { path = "../addressing" }
|
||||
|
||||
@@ -7,12 +7,11 @@ use nym_sphinx_addressing::clients::Recipient;
|
||||
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, MAX_NODE_ADDRESS_UNPADDED_LEN};
|
||||
use nym_sphinx_params::packet_sizes::PacketSize;
|
||||
use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm};
|
||||
use nym_sphinx_types::{Destination, NymPacket, SURBMaterial, SphinxError, SURB};
|
||||
use nym_sphinx_types::{NymPacket, SURBMaterial, SphinxError, SURB};
|
||||
use nym_topology::{NymRouteProvider, NymTopologyError};
|
||||
use rand::{CryptoRng, RngCore};
|
||||
use serde::de::{Error as SerdeError, Visitor};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use sphinx_packet::route::Node;
|
||||
|
||||
use std::fmt::{self, Formatter};
|
||||
use std::time;
|
||||
@@ -84,25 +83,6 @@ impl ReplySurb {
|
||||
packet_size.plaintext_size() - ack_overhead - ReplySurbKeyDigestAlgorithm::output_size() - 1
|
||||
}
|
||||
|
||||
pub fn construct_with_route<R>(
|
||||
rng: &mut R,
|
||||
destination: Destination,
|
||||
average_delay: time::Duration,
|
||||
route: &[Node],
|
||||
) -> Result<Self, NymTopologyError>
|
||||
where
|
||||
R: RngCore + CryptoRng,
|
||||
{
|
||||
let delays = nym_sphinx_routing::generate_hop_delays(average_delay, route.len());
|
||||
let surb_material = SURBMaterial::new(route.to_vec(), delays, destination);
|
||||
|
||||
// this can't fail as we know we have a valid route to gateway and have correct number of delays
|
||||
Ok(ReplySurb {
|
||||
surb: surb_material.construct_SURB().unwrap(),
|
||||
encryption_key: SurbEncryptionKey::new(rng),
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: should this return `ReplySURBError` for consistency sake
|
||||
// or keep `NymTopologyError` because it's the only error it can actually return?
|
||||
pub fn construct<R>(
|
||||
@@ -143,15 +123,11 @@ impl ReplySurb {
|
||||
|
||||
pub fn to_bytes(&self) -> Vec<u8> {
|
||||
// KEY || SURB_BYTES
|
||||
let bytes: Vec<u8> = self.encryption_key
|
||||
self.encryption_key
|
||||
.to_bytes()
|
||||
.into_iter()
|
||||
.chain(self.surb.to_bytes())
|
||||
.collect();
|
||||
|
||||
assert_eq!(bytes.len(), ReplySurb::serialized_len());
|
||||
|
||||
bytes
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn from_bytes(bytes: &[u8]) -> Result<Self, ReplySurbError> {
|
||||
|
||||
@@ -170,7 +170,6 @@ impl RepliableMessage {
|
||||
}
|
||||
|
||||
pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, InvalidReplyRequestError> {
|
||||
// println!("Trying to deserialize message: {} bytes", bytes.len());
|
||||
if bytes.len() < SENDER_TAG_SIZE + 1 {
|
||||
return Err(InvalidReplyRequestError::RequestTooShortToDeserialize);
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ repository = { workspace = true }
|
||||
|
||||
[dependencies]
|
||||
bytes = { workspace = true }
|
||||
cfg-if = { workspace = true }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
thiserror = { workspace = true }
|
||||
log = { workspace = true }
|
||||
@@ -21,8 +20,5 @@ nym-metrics = { path = "../../nym-metrics" }
|
||||
nym-sphinx-addressing = { path = "../addressing" }
|
||||
nym-sphinx-acknowledgements = { path = "../acknowledgements" }
|
||||
|
||||
[features]
|
||||
no-acks = []
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
|
||||
@@ -230,12 +230,8 @@ fn split_into_ack_and_message(
|
||||
| PacketSize::ExtendedPacket32
|
||||
| PacketSize::OutfoxRegularPacket => {
|
||||
trace!("received a normal packet!");
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(feature = "no-acks")] {
|
||||
return Ok((None, data));
|
||||
} else {
|
||||
let (ack_data, message) = split_hop_data_into_ack_and_message(data, packet_type)?;
|
||||
let (ack_first_hop, ack_packet) =
|
||||
let (ack_data, message) = split_hop_data_into_ack_and_message(data, packet_type)?;
|
||||
let (ack_first_hop, ack_packet) =
|
||||
match SurbAck::try_recover_first_hop_packet(&ack_data, packet_type) {
|
||||
Ok((first_hop, packet)) => (first_hop, packet),
|
||||
Err(err) => {
|
||||
@@ -243,10 +239,8 @@ fn split_into_ack_and_message(
|
||||
return Err(err.into());
|
||||
}
|
||||
};
|
||||
let forward_ack = MixPacket::new(ack_first_hop, ack_packet, packet_type);
|
||||
Ok((Some(forward_ack), message))
|
||||
}
|
||||
}
|
||||
let forward_ack = MixPacket::new(ack_first_hop, ack_packet, packet_type);
|
||||
Ok((Some(forward_ack), message))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ use utoipa::{IntoParams, ToSchema};
|
||||
|
||||
pub(crate) mod legacy;
|
||||
pub(crate) mod unstable;
|
||||
pub(crate) mod topology;
|
||||
|
||||
pub(crate) fn nym_node_routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
|
||||
@@ -0,0 +1,153 @@
|
||||
#![warn(missing_docs)]
|
||||
#![warn(rustdoc::missing_crate_level_docs)]
|
||||
|
||||
use crate::{
|
||||
node_status_api::models::{AxumErrorResponse, AxumResult},
|
||||
support::http::{helpers::NodeIdParam, topology_cache::{TopologyCache, PayloadFormat}},
|
||||
};
|
||||
|
||||
use axum::{
|
||||
extract::{Path, Query, State},
|
||||
routing::{get, post},
|
||||
Json, Router,
|
||||
};
|
||||
use nym_mixnet_contract_common::{Interval, NodeId};
|
||||
use nym_api_requests::models::RewardedSetResponse;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
type SkimmedNodes = AxumResult<Json<TopologyResponse>>;
|
||||
|
||||
#[derive(Debug, Deserialize, utoipa::IntoParams)]
|
||||
#[into_params(parameter_in = Query)]
|
||||
struct TopologyParams {
|
||||
#[allow(dead_code)]
|
||||
semver_compatibility: Option<String>,
|
||||
|
||||
// Identifier for the current epoch of the topology state. When sent by a client we can check if
|
||||
// the client already knows about the latest topology state, allowing a `no-updates` response
|
||||
// instead of wasting bandwidth serving an unchanged topology.
|
||||
epoch_id: Option<u32>,
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
pub(crate) fn topology_routes() -> Router<Arc<TopologyCache>> {
|
||||
Router::new()
|
||||
.route("layer-assignments", get(layer_assignments))
|
||||
.nest(
|
||||
"/skimmed",
|
||||
Router::new()
|
||||
.route("/", get(nodes_basic_all))
|
||||
.route("batch", post(nodes_basic_batch))
|
||||
.route("/:node_id", get(node_basic)),
|
||||
)
|
||||
// // NOT IMPLEMENTED
|
||||
// .nest(
|
||||
// "/semi-skimmed",
|
||||
// Router::new().route("/", get(nodes_expanded_all)),
|
||||
// )
|
||||
// .nest(
|
||||
// "/full-fat",
|
||||
// Router::new().route("/", get(nodes_detailed_all)),
|
||||
// )
|
||||
}
|
||||
|
||||
async fn nodes_basic_all(
|
||||
State(state): State<Arc<TopologyCache>>,
|
||||
Query(query_params): Query<TopologyParams>,
|
||||
) -> AxumResult<Json<TopologyResponse>> {
|
||||
Err(AxumErrorResponse::not_implemented())
|
||||
}
|
||||
|
||||
async fn nodes_basic_batch(
|
||||
State(state): State<Arc<TopologyCache>>,
|
||||
Query(query_params): Query<TopologyParams>,
|
||||
Json(node_ids): Json<Vec<NodeId>>,
|
||||
) -> AxumResult<Json<TopologyResponse>> {
|
||||
Err(AxumErrorResponse::not_implemented())
|
||||
}
|
||||
|
||||
async fn node_basic(
|
||||
Path(NodeIdParam { node_id }): Path<NodeIdParam>,
|
||||
State(state): State<Arc<TopologyCache>>,
|
||||
Query(query_params): Query<TopologyParams>,
|
||||
) -> AxumResult<Json<TopologyResponse>> {
|
||||
Err(AxumErrorResponse::not_implemented())
|
||||
}
|
||||
|
||||
async fn layer_assignments(
|
||||
State(state): State<Arc<TopologyCache>>,
|
||||
Query(query_params): Query<TopologyParams>,
|
||||
) -> AxumResult<Json<LayerAssignmentsResponse>> {
|
||||
Err(AxumErrorResponse::not_implemented())
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema, utoipa::ToSchema)]
|
||||
struct LayerAssignmentsResponse {
|
||||
pub status: Option<TopologyRequestStatus>,
|
||||
|
||||
pub assignments: RewardedSetResponse,
|
||||
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema, utoipa::ToSchema)]
|
||||
struct TopologyResponse {
|
||||
pub status: Option<TopologyRequestStatus>,
|
||||
|
||||
payload: Vec<u8>,
|
||||
payload_format: Option<PayloadFormat>,
|
||||
payload_signature: Option<Vec<u8>>,
|
||||
|
||||
current_topology_hash: Option<Vec<u8>>,
|
||||
topology_signature: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize, schemars::JsonSchema, utoipa::ToSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum TopologyRequestStatus {
|
||||
NoUpdates,
|
||||
Fresh(Interval),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::time::Duration;
|
||||
|
||||
use axum_test::TestServer;
|
||||
use nym_topology::NymTopology;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use crate::support::http::topology_cache::Epoch;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn build_test_topology_cache() -> TopologyCache {
|
||||
|
||||
let current_epoch = Epoch {
|
||||
id: 123,
|
||||
current_epoch_start: OffsetDateTime::now_utc(),
|
||||
epoch_length: Duration::from_secs(120),
|
||||
};
|
||||
|
||||
let topology = NymTopology::default();
|
||||
|
||||
TopologyCache::new(current_epoch, topology);
|
||||
todo!();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_topology_basic() -> Result<(), Box<dyn ::std::error::Error>> {
|
||||
let state = Arc::new(build_test_topology_cache());
|
||||
let app = topology_routes().with_state(state);
|
||||
|
||||
let server = TestServer::new(app)?;
|
||||
|
||||
let response = server
|
||||
.get(&"/layer_assignments")
|
||||
.await;
|
||||
|
||||
response.assert_text("hello!");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -25,11 +25,11 @@ use crate::nym_nodes::handlers::unstable::semi_skimmed::nodes_expanded;
|
||||
use crate::nym_nodes::handlers::unstable::skimmed::{
|
||||
entry_gateways_basic_active, entry_gateways_basic_all, exit_gateways_basic_active,
|
||||
exit_gateways_basic_all, mixnodes_basic_active, mixnodes_basic_all, nodes_basic_active,
|
||||
nodes_basic_all,
|
||||
nodes_basic_all, nodes_basic_batch,
|
||||
};
|
||||
use crate::support::http::helpers::PaginationRequest;
|
||||
use crate::support::http::state::AppState;
|
||||
use axum::routing::get;
|
||||
use axum::routing::{get, post};
|
||||
use axum::Router;
|
||||
use nym_api_requests::nym_nodes::NodeRoleQueryParam;
|
||||
use serde::Deserialize;
|
||||
@@ -47,6 +47,7 @@ pub(crate) fn nym_node_routes_unstable() -> Router<AppState> {
|
||||
"/skimmed",
|
||||
Router::new()
|
||||
.route("/", get(nodes_basic_all))
|
||||
.route("batch", post(nodes_basic_batch))
|
||||
.route("/active", get(nodes_basic_active))
|
||||
.nest(
|
||||
"/mixnodes",
|
||||
|
||||
@@ -25,6 +25,7 @@ use tracing::trace;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
pub type PaginatedSkimmedNodes = AxumResult<Json<PaginatedCachedNodesResponse<SkimmedNode>>>;
|
||||
type SkimmedNodes = AxumResult<Json<CachedNodesResponse<SkimmedNode>>>;
|
||||
|
||||
/// Given all relevant caches, build part of response for JUST Nym Nodes
|
||||
fn build_nym_nodes_response<'a, NI>(
|
||||
@@ -196,7 +197,7 @@ where
|
||||
pub(super) async fn deprecated_gateways_basic(
|
||||
state: State<AppState>,
|
||||
query_params: Query<NodesParams>,
|
||||
) -> AxumResult<Json<CachedNodesResponse<SkimmedNode>>> {
|
||||
) -> SkimmedNodes {
|
||||
// 1. call '/v1/unstable/skimmed/entry-gateways/all'
|
||||
let all_gateways = entry_gateways_basic_all(state, query_params).await?;
|
||||
|
||||
@@ -223,7 +224,7 @@ pub(super) async fn deprecated_gateways_basic(
|
||||
pub(super) async fn deprecated_mixnodes_basic(
|
||||
state: State<AppState>,
|
||||
query_params: Query<NodesParams>,
|
||||
) -> AxumResult<Json<CachedNodesResponse<SkimmedNode>>> {
|
||||
) -> SkimmedNodes {
|
||||
// 1. call '/v1/unstable/nym-nodes/skimmed/mixnodes/active'
|
||||
let active_mixnodes = mixnodes_basic_active(state, query_params).await?;
|
||||
|
||||
@@ -239,7 +240,7 @@ async fn nodes_basic(
|
||||
state: State<AppState>,
|
||||
Query(_query_params): Query<NodesParams>,
|
||||
active_only: bool,
|
||||
) -> PaginatedSkimmedNodes {
|
||||
) -> SkimmedNodes {
|
||||
// unfortunately we have to build the response semi-manually here as we need to add two sources of legacy nodes
|
||||
|
||||
// 1. grab all relevant described nym-nodes
|
||||
@@ -281,10 +282,10 @@ async fn nodes_basic(
|
||||
legacy_gateways.timestamp(),
|
||||
]);
|
||||
|
||||
Ok(Json(PaginatedCachedNodesResponse::new_full(
|
||||
Ok(Json(CachedNodesResponse {
|
||||
refreshed_at,
|
||||
nodes,
|
||||
)))
|
||||
}))
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // not dead, used in OpenAPI docs
|
||||
@@ -329,6 +330,31 @@ pub(super) async fn nodes_basic_all(
|
||||
nodes_basic(state, Query(query_params.into()), false).await
|
||||
}
|
||||
|
||||
/// Post request handler taking a json array of NodeId (u32) values and returning descriptors for
|
||||
/// the provided NodeId values. A successful response will contain descriptors for all nodes
|
||||
/// associated with those node IDs available in the current full topology.
|
||||
///
|
||||
/// If a provided node ID is not present in the current topology there will be no descriptor for
|
||||
/// that node in the response.
|
||||
///
|
||||
/// If no node IDs are provided the response will contain no descriptors.
|
||||
#[utoipa::path(
|
||||
tag = "Unstable Nym Nodes batch by Node ID",
|
||||
get,
|
||||
params(NodesParamsWithRole),
|
||||
path = "batch",
|
||||
context_path = "/v1/unstable/nym-nodes/skimmed",
|
||||
responses(
|
||||
(status = 200, body = PaginatedCachedNodesResponseSchema)
|
||||
)
|
||||
)]
|
||||
pub(super) async fn nodes_basic_batch(
|
||||
state: State<AppState>,
|
||||
Query(query_params): Query<NodesParamsWithRole>,
|
||||
) -> SkimmedNodes {
|
||||
nodes_basic(state, Query(query_params.into()), false).await
|
||||
}
|
||||
|
||||
/// Return Nym Nodes and optionally legacy mixnodes/gateways (if `no-legacy` flag is not used)
|
||||
/// that are currently bonded and are in the **active set**
|
||||
#[utoipa::path(
|
||||
|
||||
@@ -23,6 +23,7 @@ use crate::support::config::Config;
|
||||
use crate::support::http::state::{
|
||||
AppState, ForcedRefresh, ShutdownHandles, TASK_MANAGER_TIMEOUT_S,
|
||||
};
|
||||
use crate::support::http::topology_cache::{Epoch, TopologyCache};
|
||||
use crate::support::http::RouterBuilder;
|
||||
use crate::support::nyxd;
|
||||
use crate::support::storage::runtime_migrations::m001_directory_services_v2_1::migrate_to_directory_services_v2_1;
|
||||
@@ -35,6 +36,7 @@ use anyhow::{bail, Context};
|
||||
use nym_config::defaults::NymNetworkDetails;
|
||||
use nym_sphinx::receiver::SphinxMessageReceiver;
|
||||
use nym_task::TaskManager;
|
||||
use nym_topology::NymTopology;
|
||||
use nym_validator_client::nyxd::Coin;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
@@ -133,8 +135,6 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result<ShutdownHan
|
||||
let identity_keypair = config.base.storage_paths.load_identity()?;
|
||||
let identity_public_key = *identity_keypair.public_key();
|
||||
|
||||
let router = RouterBuilder::with_default_routes(config.network_monitor.enabled);
|
||||
|
||||
let nym_contract_cache_state = NymContractCache::new();
|
||||
let node_status_cache_state = NodeStatusCache::new();
|
||||
let mix_denom = network_details.network.chain_details.mix_denom.base.clone();
|
||||
@@ -190,7 +190,16 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result<ShutdownHan
|
||||
};
|
||||
|
||||
ecash_state.spawn_background_cleaner();
|
||||
let router = router.with_state(AppState {
|
||||
|
||||
|
||||
let epoch = Epoch::from(nym_contract_cache_state.current_interval().await.take());
|
||||
let topology = {
|
||||
todo!("cannot proceed without implementing this.");
|
||||
NymTopology::default()
|
||||
};
|
||||
let topology_cache = TopologyCache::new(epoch, topology);
|
||||
|
||||
let state = AppState {
|
||||
forced_refresh: ForcedRefresh::new(
|
||||
config.topology_cacher.debug.node_describe_allow_illegal_ips,
|
||||
),
|
||||
@@ -203,7 +212,11 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result<ShutdownHan
|
||||
node_info_cache,
|
||||
api_status: ApiStatusState::new(signer_information),
|
||||
ecash_state: Arc::new(ecash_state),
|
||||
});
|
||||
topology_cache: Arc::new(topology_cache)
|
||||
};
|
||||
|
||||
let router = RouterBuilder::with_default_routes(&state, config.network_monitor.enabled);
|
||||
let router = router.with_state(state);
|
||||
|
||||
// start note describe cache refresher
|
||||
// we should be doing the below, but can't due to our current startup structure
|
||||
|
||||
@@ -6,6 +6,7 @@ pub(crate) mod openapi;
|
||||
pub(crate) mod router;
|
||||
pub(crate) mod state;
|
||||
mod unstable_routes;
|
||||
pub(crate) mod topology_cache;
|
||||
|
||||
pub(crate) use router::RouterBuilder;
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::node_status_api::handlers::status_routes;
|
||||
use crate::nym_contract_cache::handlers::nym_contract_cache_routes;
|
||||
use crate::nym_nodes::handlers::legacy::legacy_nym_node_routes;
|
||||
use crate::nym_nodes::handlers::nym_node_routes;
|
||||
use crate::nym_nodes::handlers::topology::topology_routes;
|
||||
use crate::status;
|
||||
use crate::support::http::openapi::ApiDoc;
|
||||
use crate::support::http::state::AppState;
|
||||
@@ -38,7 +39,7 @@ pub(crate) struct RouterBuilder {
|
||||
impl RouterBuilder {
|
||||
/// All routes should be, if possible, added here. Exceptions are e.g.
|
||||
/// routes which are added conditionally in other places based on some `if`.
|
||||
pub(crate) fn with_default_routes(network_monitor: bool) -> Self {
|
||||
pub(crate) fn with_default_routes(state: &AppState, network_monitor: bool) -> Self {
|
||||
// https://docs.rs/tower-http/0.1.1/tower_http/trace/index.html
|
||||
// TODO rocket use tracing instead of env_logger
|
||||
// https://github.com/tokio-rs/axum/blob/main/examples/tracing-aka-logging/src/main.rs
|
||||
@@ -64,7 +65,9 @@ impl RouterBuilder {
|
||||
.nest("/api-status", status::handlers::api_status_routes())
|
||||
.nest("/nym-nodes", nym_node_routes())
|
||||
.nest("/ecash", ecash_routes())
|
||||
.nest("/unstable", unstable_routes()), // CORS layer needs to be "outside" of routes
|
||||
.nest("/unstable", unstable_routes())
|
||||
.nest("/topology", topology_routes().with_state(state.topology_cache.clone())),
|
||||
// CORS layer needs to be "outside" of routes
|
||||
);
|
||||
|
||||
Self {
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::nym_contract_cache::cache::NymContractCache;
|
||||
use crate::status::ApiStatusState;
|
||||
use crate::support::caching::cache::SharedCache;
|
||||
use crate::support::caching::Cache;
|
||||
use crate::support::http::topology_cache::TopologyCache;
|
||||
use crate::support::storage;
|
||||
use axum::extract::FromRef;
|
||||
use nym_api_requests::models::{GatewayBondAnnotated, MixNodeBondAnnotated, NodeAnnotation};
|
||||
@@ -87,6 +88,7 @@ pub(crate) struct AppState {
|
||||
pub(crate) api_status: ApiStatusState,
|
||||
// todo: refactor it into inner: Arc<EcashStateInner>
|
||||
pub(crate) ecash_state: Arc<EcashState>,
|
||||
pub(crate) topology_cache: Arc<TopologyCache>
|
||||
}
|
||||
|
||||
impl FromRef<AppState> for ApiStatusState {
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
#![warn(missing_docs)]
|
||||
#![warn(rustdoc::missing_crate_level_docs)]
|
||||
|
||||
|
||||
use std::collections::HashMap;
|
||||
use nym_topology::NymTopology;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use nym_mixnet_contract_common::{EpochId, Interval};
|
||||
use std::time::Duration;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
pub struct Epoch {
|
||||
pub id: EpochId,
|
||||
pub current_epoch_start: OffsetDateTime,
|
||||
pub epoch_length: Duration,
|
||||
}
|
||||
|
||||
impl From<Interval> for Epoch {
|
||||
fn from(value: Interval) -> Self {
|
||||
Self {
|
||||
id: value.current_epoch_id(),
|
||||
current_epoch_start: value.current_epoch_start(),
|
||||
epoch_length: value.epoch_length(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Format for
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize, schemars::JsonSchema, utoipa::ToSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub(crate) enum PayloadFormat {
|
||||
Json,
|
||||
BitCode,
|
||||
}
|
||||
|
||||
pub(crate) struct TopologyCache {
|
||||
current_epoch: Epoch,
|
||||
formats: HashMap<PayloadFormat, SerializedTopology>,
|
||||
cached: NymTopology,
|
||||
hash: Option<Vec<u8>>,
|
||||
signature: Option<Vec<u8>>
|
||||
}
|
||||
|
||||
pub(crate) struct SerializedTopology{
|
||||
bytes: Vec<u8>,
|
||||
signature: Vec<u8>,
|
||||
}
|
||||
|
||||
|
||||
impl TopologyCache {
|
||||
pub fn new(current_epoch: Epoch, initial: NymTopology) -> Self {
|
||||
Self {
|
||||
current_epoch,
|
||||
formats: HashMap::new(),
|
||||
cached: initial,
|
||||
hash: None,
|
||||
signature: None
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
[package]
|
||||
name = "nym-node-status-api"
|
||||
version = "1.0.0-rc.8"
|
||||
version = "1.0.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
@@ -16,11 +16,13 @@ rust-version.workspace = true
|
||||
ammonia = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
axum = { workspace = true, features = ["tokio", "macros"] }
|
||||
bip39 = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
clap = { workspace = true, features = ["cargo", "derive", "env", "string"] }
|
||||
cosmwasm-std = { workspace = true }
|
||||
envy = { workspace = true }
|
||||
futures-util = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
moka = { workspace = true, features = ["future"] }
|
||||
nym-contracts-common = { path = "../../common/cosmwasm-smart-contracts/contracts-common" }
|
||||
nym-bin-common = { path = "../../common/bin-common", features = ["models"] }
|
||||
@@ -33,6 +35,8 @@ nym-statistics-common = { path = "../../common/statistics" }
|
||||
nym-validator-client = { path = "../../common/client-libs/validator-client" }
|
||||
nym-task = { path = "../../common/task" }
|
||||
nym-node-requests = { path = "../../nym-node/nym-node-requests", features = ["openapi"] }
|
||||
rand = { workspace = true }
|
||||
rand_chacha = { workspace = true }
|
||||
regex = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
|
||||
@@ -83,9 +83,6 @@ pub(crate) struct Cli {
|
||||
env = "NYM_NODE_STATUS_API_MAX_AGENT_COUNT"
|
||||
)]
|
||||
pub(crate) max_agent_count: i64,
|
||||
|
||||
#[clap(long, default_value = "", env = "NYM_NODE_STATUS_API_HM_URL")]
|
||||
pub(crate) hm_url: String,
|
||||
}
|
||||
|
||||
fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::str::FromStr;
|
||||
|
||||
use crate::{
|
||||
http::{self, models::SummaryHistory},
|
||||
monitor::NumericalCheckedCast,
|
||||
utils::NumericalCheckedCast,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use nym_contracts_common::Percent;
|
||||
@@ -16,7 +16,7 @@ use strum_macros::{EnumString, FromRepr};
|
||||
use time::{Date, OffsetDateTime};
|
||||
use utoipa::ToSchema;
|
||||
|
||||
pub(crate) struct GatewayRecord {
|
||||
pub(crate) struct GatewayInsertRecord {
|
||||
pub(crate) identity_key: String,
|
||||
pub(crate) bonded: bool,
|
||||
pub(crate) self_described: String,
|
||||
@@ -360,14 +360,24 @@ impl TryFrom<GatewaySessionsRecord> for http::models::SessionStats {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum MixingNodeKind {
|
||||
LegacyMixnode,
|
||||
NymNode,
|
||||
pub(crate) enum ScrapeNodeKind {
|
||||
LegacyMixnode { mix_id: i64 },
|
||||
MixingNymNode { node_id: i64 },
|
||||
EntryExitNymNode { node_id: i64, identity_key: String },
|
||||
}
|
||||
|
||||
impl ScrapeNodeKind {
|
||||
pub(crate) fn node_id(&self) -> &i64 {
|
||||
match self {
|
||||
ScrapeNodeKind::LegacyMixnode { mix_id } => mix_id,
|
||||
ScrapeNodeKind::MixingNymNode { node_id } => node_id,
|
||||
ScrapeNodeKind::EntryExitNymNode { node_id, .. } => node_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct ScraperNodeInfo {
|
||||
pub node_id: i64,
|
||||
pub node_kind: MixingNodeKind,
|
||||
pub node_kind: ScrapeNodeKind,
|
||||
pub hosts: Vec<String>,
|
||||
pub http_api_port: i64,
|
||||
}
|
||||
@@ -390,6 +400,10 @@ impl ScraperNodeInfo {
|
||||
|
||||
urls
|
||||
}
|
||||
|
||||
pub(crate) fn node_id(&self) -> &i64 {
|
||||
self.node_kind.node_id()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(sqlx::Decode, Debug)]
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use crate::{
|
||||
db::{
|
||||
models::{GatewayDto, GatewayRecord},
|
||||
models::{GatewayDto, GatewayInsertRecord},
|
||||
DbPool,
|
||||
},
|
||||
http::models::Gateway,
|
||||
@@ -30,7 +32,7 @@ pub(crate) async fn select_gateway_identity(
|
||||
|
||||
pub(crate) async fn insert_gateways(
|
||||
pool: &DbPool,
|
||||
gateways: Vec<GatewayRecord>,
|
||||
gateways: Vec<GatewayInsertRecord>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut db = pool.acquire().await?;
|
||||
for record in gateways {
|
||||
@@ -98,3 +100,21 @@ pub(crate) async fn get_all_gateways(pool: &DbPool) -> anyhow::Result<Vec<Gatewa
|
||||
tracing::trace!("Fetched {} gateways from DB", items.len());
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_gateway_id_keys(pool: &DbPool) -> anyhow::Result<HashSet<String>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query!(
|
||||
r#"
|
||||
SELECT gateway_identity_key
|
||||
FROM gateways
|
||||
WHERE bonded = true
|
||||
"#
|
||||
)
|
||||
.fetch_all(&mut *conn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|record| record.gateway_identity_key)
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use futures_util::TryStreamExt;
|
||||
use tracing::error;
|
||||
|
||||
@@ -84,7 +86,7 @@ pub(crate) async fn get_all_mixnodes(pool: &DbPool) -> anyhow::Result<Vec<Mixnod
|
||||
}
|
||||
|
||||
/// `offset` = slides our fixed-day period further into the past by N days
|
||||
pub(crate) async fn get_daily_stats(pool: &DbPool, offset: i64) -> anyhow::Result<Vec<DailyStats>> {
|
||||
pub(crate) async fn get_daily_stats(pool: &DbPool) -> anyhow::Result<Vec<DailyStats>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
DailyStats,
|
||||
@@ -115,11 +117,8 @@ pub(crate) async fn get_daily_stats(pool: &DbPool, offset: i64) -> anyhow::Resul
|
||||
WHERE nym_node_daily_mixing_stats.node_id IS NULL
|
||||
)
|
||||
GROUP BY date_utc
|
||||
ORDER BY date_utc DESC
|
||||
LIMIT 30
|
||||
OFFSET ?
|
||||
ORDER BY date_utc ASC
|
||||
"#,
|
||||
offset
|
||||
)
|
||||
.fetch(&mut *conn)
|
||||
.try_collect::<Vec<DailyStats>>()
|
||||
@@ -127,3 +126,21 @@ pub(crate) async fn get_daily_stats(pool: &DbPool, offset: i64) -> anyhow::Resul
|
||||
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_mix_ids(pool: &DbPool) -> anyhow::Result<HashSet<i64>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query!(
|
||||
r#"
|
||||
SELECT mix_id
|
||||
FROM mixnodes
|
||||
WHERE bonded = true
|
||||
"#
|
||||
)
|
||||
.fetch_all(&mut *conn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|record| record.mix_id)
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
@@ -8,13 +8,15 @@ pub(crate) mod scraper;
|
||||
mod summary;
|
||||
pub(crate) mod testruns;
|
||||
|
||||
pub(crate) use gateways::{get_all_gateways, insert_gateways, select_gateway_identity};
|
||||
pub(crate) use gateways::{
|
||||
get_all_gateway_id_keys, get_all_gateways, insert_gateways, select_gateway_identity,
|
||||
};
|
||||
pub(crate) use gateways_stats::{delete_old_records, get_sessions_stats, insert_session_records};
|
||||
pub(crate) use misc::insert_summaries;
|
||||
pub(crate) use mixnodes::{get_all_mixnodes, get_daily_stats, insert_mixnodes};
|
||||
pub(crate) use mixnodes::{get_all_mix_ids, get_all_mixnodes, get_daily_stats, insert_mixnodes};
|
||||
pub(crate) use nym_nodes::{get_nym_nodes, insert_nym_nodes};
|
||||
pub(crate) use packet_stats::{
|
||||
get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats,
|
||||
};
|
||||
pub(crate) use scraper::{get_mixing_nodes_for_scraping, insert_scraped_node_description};
|
||||
pub(crate) use scraper::{get_nodes_for_scraping, insert_scraped_node_description};
|
||||
pub(crate) use summary::{get_summary, get_summary_history};
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use anyhow::Context;
|
||||
use futures_util::TryStreamExt;
|
||||
use nym_validator_client::{client::NymNodeDetails, nym_api::SkimmedNode};
|
||||
use tracing::instrument;
|
||||
@@ -9,7 +10,7 @@ use crate::{
|
||||
models::{NymNodeDto, NymNodeInsertRecord},
|
||||
DbPool,
|
||||
},
|
||||
monitor::decimal_to_i64,
|
||||
utils::decimal_to_i64,
|
||||
};
|
||||
|
||||
pub(crate) async fn get_nym_nodes(pool: &DbPool) -> anyhow::Result<Vec<SkimmedNode>> {
|
||||
@@ -100,7 +101,8 @@ pub(crate) async fn insert_nym_nodes(
|
||||
record.last_updated_utc,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
.await
|
||||
.with_context(|| format!("node_id={}", record.node_id))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -1,27 +1,26 @@
|
||||
use crate::db::{
|
||||
models::{MixingNodeKind, NodeStats, ScraperNodeInfo},
|
||||
models::{NodeStats, ScrapeNodeKind, ScraperNodeInfo},
|
||||
DbPool,
|
||||
};
|
||||
use anyhow::Result;
|
||||
|
||||
pub(crate) async fn insert_node_packet_stats(
|
||||
pool: &DbPool,
|
||||
node_id: i64,
|
||||
node_kind: &MixingNodeKind,
|
||||
node_kind: &ScrapeNodeKind,
|
||||
stats: &NodeStats,
|
||||
timestamp_utc: i64,
|
||||
) -> Result<()> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
match node_kind {
|
||||
MixingNodeKind::LegacyMixnode => {
|
||||
ScrapeNodeKind::LegacyMixnode { mix_id } => {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO mixnode_packet_stats_raw (
|
||||
mix_id, timestamp_utc, packets_received, packets_sent, packets_dropped
|
||||
) VALUES (?, ?, ?, ?, ?)
|
||||
"#,
|
||||
node_id,
|
||||
mix_id,
|
||||
timestamp_utc,
|
||||
stats.packets_received,
|
||||
stats.packets_sent,
|
||||
@@ -30,7 +29,8 @@ pub(crate) async fn insert_node_packet_stats(
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
}
|
||||
MixingNodeKind::NymNode => {
|
||||
ScrapeNodeKind::MixingNymNode { node_id }
|
||||
| ScrapeNodeKind::EntryExitNymNode { node_id, .. } => {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO nym_nodes_packet_stats_raw (
|
||||
@@ -60,7 +60,7 @@ pub(crate) async fn get_raw_node_stats(
|
||||
let packets = match node.node_kind {
|
||||
// if no packets are found, it's fine to assume 0 because that's also
|
||||
// SQL default value if none provided
|
||||
MixingNodeKind::LegacyMixnode => {
|
||||
ScrapeNodeKind::LegacyMixnode { mix_id } => {
|
||||
sqlx::query_as!(
|
||||
NodeStats,
|
||||
r#"
|
||||
@@ -73,12 +73,13 @@ pub(crate) async fn get_raw_node_stats(
|
||||
ORDER BY timestamp_utc DESC
|
||||
LIMIT 1 OFFSET 1
|
||||
"#,
|
||||
node.node_id
|
||||
mix_id
|
||||
)
|
||||
.fetch_optional(&mut *conn)
|
||||
.await?
|
||||
}
|
||||
MixingNodeKind::NymNode => {
|
||||
ScrapeNodeKind::MixingNymNode { node_id }
|
||||
| ScrapeNodeKind::EntryExitNymNode { node_id, .. } => {
|
||||
sqlx::query_as!(
|
||||
NodeStats,
|
||||
r#"
|
||||
@@ -91,7 +92,7 @@ pub(crate) async fn get_raw_node_stats(
|
||||
ORDER BY timestamp_utc DESC
|
||||
LIMIT 1 OFFSET 1
|
||||
"#,
|
||||
node.node_id
|
||||
node_id
|
||||
)
|
||||
.fetch_optional(&mut *conn)
|
||||
.await?
|
||||
@@ -110,7 +111,7 @@ pub(crate) async fn insert_daily_node_stats(
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
match node.node_kind {
|
||||
MixingNodeKind::LegacyMixnode => {
|
||||
ScrapeNodeKind::LegacyMixnode { mix_id } => {
|
||||
let total_stake = sqlx::query_scalar!(
|
||||
r#"
|
||||
SELECT
|
||||
@@ -118,7 +119,7 @@ pub(crate) async fn insert_daily_node_stats(
|
||||
FROM mixnodes
|
||||
WHERE mix_id = ?
|
||||
"#,
|
||||
node.node_id
|
||||
mix_id
|
||||
)
|
||||
.fetch_one(&mut *conn)
|
||||
.await?;
|
||||
@@ -136,7 +137,7 @@ pub(crate) async fn insert_daily_node_stats(
|
||||
packets_sent = mixnode_daily_stats.packets_sent + excluded.packets_sent,
|
||||
packets_dropped = mixnode_daily_stats.packets_dropped + excluded.packets_dropped
|
||||
"#,
|
||||
node.node_id,
|
||||
mix_id,
|
||||
date_utc,
|
||||
total_stake,
|
||||
packets.packets_received,
|
||||
@@ -146,7 +147,8 @@ pub(crate) async fn insert_daily_node_stats(
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
}
|
||||
MixingNodeKind::NymNode => {
|
||||
ScrapeNodeKind::MixingNymNode { node_id }
|
||||
| ScrapeNodeKind::EntryExitNymNode { node_id, .. } => {
|
||||
let total_stake = sqlx::query_scalar!(
|
||||
r#"
|
||||
SELECT
|
||||
@@ -154,7 +156,7 @@ pub(crate) async fn insert_daily_node_stats(
|
||||
FROM nym_nodes
|
||||
WHERE node_id = ?
|
||||
"#,
|
||||
node.node_id
|
||||
node_id
|
||||
)
|
||||
.fetch_one(&mut *conn)
|
||||
.await?;
|
||||
@@ -172,7 +174,7 @@ pub(crate) async fn insert_daily_node_stats(
|
||||
packets_sent = nym_node_daily_mixing_stats.packets_sent + excluded.packets_sent,
|
||||
packets_dropped = nym_node_daily_mixing_stats.packets_dropped + excluded.packets_dropped
|
||||
"#,
|
||||
node.node_id,
|
||||
node_id,
|
||||
date_utc,
|
||||
total_stake,
|
||||
packets.packets_received,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::{
|
||||
db::{
|
||||
models::{MixingNodeKind, ScraperNodeInfo},
|
||||
models::{ScrapeNodeKind, ScraperNodeInfo},
|
||||
queries, DbPool,
|
||||
},
|
||||
mixnet_scraper::helpers::NodeDescriptionResponse,
|
||||
@@ -8,16 +8,36 @@ use crate::{
|
||||
use anyhow::Result;
|
||||
use chrono::Utc;
|
||||
|
||||
pub(crate) async fn get_mixing_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperNodeInfo>> {
|
||||
pub(crate) async fn get_nodes_for_scraping(pool: &DbPool) -> Result<Vec<ScraperNodeInfo>> {
|
||||
let mut nodes_to_scrape = Vec::new();
|
||||
|
||||
let mixnode_ids = queries::get_all_mix_ids(pool).await?;
|
||||
let gateway_keys = queries::get_all_gateway_id_keys(pool).await?;
|
||||
|
||||
let mut entry_exit_nodes = 0;
|
||||
queries::get_nym_nodes(pool)
|
||||
.await?
|
||||
.into_iter()
|
||||
.for_each(|node| {
|
||||
// due to polyfilling, Nym nodes table might contain legacy mixnodes
|
||||
// as well. Mark them as such here.
|
||||
let node_kind = if mixnode_ids.contains(&node.node_id.into()) {
|
||||
ScrapeNodeKind::LegacyMixnode {
|
||||
mix_id: node.node_id.into(),
|
||||
}
|
||||
} else if gateway_keys.contains(&node.ed25519_identity_pubkey.to_base58_string()) {
|
||||
entry_exit_nodes += 1;
|
||||
ScrapeNodeKind::EntryExitNymNode {
|
||||
node_id: node.node_id.into(),
|
||||
identity_key: node.ed25519_identity_pubkey.to_base58_string(),
|
||||
}
|
||||
} else {
|
||||
ScrapeNodeKind::MixingNymNode {
|
||||
node_id: node.node_id.into(),
|
||||
}
|
||||
};
|
||||
nodes_to_scrape.push(ScraperNodeInfo {
|
||||
node_id: node.node_id.into(),
|
||||
node_kind: MixingNodeKind::NymNode,
|
||||
node_kind,
|
||||
hosts: node
|
||||
.ip_addresses
|
||||
.into_iter()
|
||||
@@ -27,7 +47,8 @@ pub(crate) async fn get_mixing_nodes_for_scraping(pool: &DbPool) -> Result<Vec<S
|
||||
})
|
||||
});
|
||||
|
||||
tracing::debug!("Fetched {} 🌟 nym nodes", nodes_to_scrape.len());
|
||||
tracing::debug!("Fetched {} 🌟 total nym nodes", nodes_to_scrape.len());
|
||||
tracing::debug!("Fetched {} 🚪 entry/exit nodes", entry_exit_nodes);
|
||||
|
||||
let mut conn = pool.acquire().await?;
|
||||
let mixnodes = sqlx::query!(
|
||||
@@ -41,7 +62,7 @@ pub(crate) async fn get_mixing_nodes_for_scraping(pool: &DbPool) -> Result<Vec<S
|
||||
.await?;
|
||||
drop(conn);
|
||||
|
||||
tracing::debug!("Fetched {} 🦖 mixnodes", nodes_to_scrape.len());
|
||||
tracing::debug!("Fetched {} 🦖 mixnodes", mixnodes.len());
|
||||
|
||||
let mut duplicates = 0;
|
||||
let mut legacy_not_in_nym_node_list = 0;
|
||||
@@ -49,26 +70,22 @@ pub(crate) async fn get_mixing_nodes_for_scraping(pool: &DbPool) -> Result<Vec<S
|
||||
for mixnode in mixnodes {
|
||||
if nodes_to_scrape
|
||||
.iter()
|
||||
.all(|node| node.node_id != mixnode.node_id)
|
||||
.all(|node| node.node_id() != &mixnode.node_id)
|
||||
{
|
||||
// in case polyfilling on Nym API gets removed, this part ensures
|
||||
// mixnodes are added to the final list of nodes to scrape
|
||||
nodes_to_scrape.push(ScraperNodeInfo {
|
||||
node_kind: ScrapeNodeKind::LegacyMixnode {
|
||||
mix_id: mixnode.node_id,
|
||||
},
|
||||
hosts: vec![mixnode.host],
|
||||
http_api_port: mixnode.http_api_port,
|
||||
});
|
||||
|
||||
legacy_not_in_nym_node_list += 1;
|
||||
} else {
|
||||
duplicates += 1;
|
||||
}
|
||||
|
||||
// technically, mixnodes shouldn't be in nym_nodes table, but it's
|
||||
// possible due to polyfilling on Nym API
|
||||
if nodes_to_scrape
|
||||
.iter()
|
||||
.all(|node| node.node_id != mixnode.node_id)
|
||||
{
|
||||
nodes_to_scrape.push(ScraperNodeInfo {
|
||||
node_id: mixnode.node_id,
|
||||
node_kind: MixingNodeKind::LegacyMixnode,
|
||||
hosts: vec![mixnode.host],
|
||||
http_api_port: mixnode.http_api_port,
|
||||
})
|
||||
}
|
||||
}
|
||||
tracing::debug!(
|
||||
"{}/{} legacy mixnodes already included in nym_node list",
|
||||
@@ -85,19 +102,16 @@ pub(crate) async fn get_mixing_nodes_for_scraping(pool: &DbPool) -> Result<Vec<S
|
||||
Ok(nodes_to_scrape)
|
||||
}
|
||||
|
||||
// TODO: add stuff for gateways
|
||||
|
||||
pub(crate) async fn insert_scraped_node_description(
|
||||
pool: &DbPool,
|
||||
node_kind: &MixingNodeKind,
|
||||
node_id: i64,
|
||||
node_kind: &ScrapeNodeKind,
|
||||
description: &NodeDescriptionResponse,
|
||||
) -> Result<()> {
|
||||
let timestamp = Utc::now().timestamp();
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
match node_kind {
|
||||
MixingNodeKind::LegacyMixnode => {
|
||||
ScrapeNodeKind::LegacyMixnode { mix_id } => {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO mixnode_description (
|
||||
@@ -110,7 +124,7 @@ pub(crate) async fn insert_scraped_node_description(
|
||||
details = excluded.details,
|
||||
last_updated_utc = excluded.last_updated_utc
|
||||
"#,
|
||||
node_id,
|
||||
mix_id,
|
||||
description.moniker,
|
||||
description.website,
|
||||
description.security_contact,
|
||||
@@ -120,7 +134,7 @@ pub(crate) async fn insert_scraped_node_description(
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
}
|
||||
MixingNodeKind::NymNode => {
|
||||
ScrapeNodeKind::MixingNymNode { node_id } => {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO nym_node_descriptions (
|
||||
@@ -143,6 +157,34 @@ pub(crate) async fn insert_scraped_node_description(
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
}
|
||||
ScrapeNodeKind::EntryExitNymNode { identity_key, .. } => {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO gateway_description (
|
||||
gateway_identity_key,
|
||||
moniker,
|
||||
website,
|
||||
security_contact,
|
||||
details,
|
||||
last_updated_utc
|
||||
) VALUES (?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT (gateway_identity_key) DO UPDATE SET
|
||||
moniker = excluded.moniker,
|
||||
website = excluded.website,
|
||||
security_contact = excluded.security_contact,
|
||||
details = excluded.details,
|
||||
last_updated_utc = excluded.last_updated_utc
|
||||
"#,
|
||||
identity_key,
|
||||
description.moniker,
|
||||
description.website,
|
||||
description.security_contact,
|
||||
description.details,
|
||||
timestamp,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -99,7 +99,10 @@ async fn get_stats(
|
||||
Query(MixStatsQueryParams { offset }): Query<MixStatsQueryParams>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<Vec<DailyStats>>> {
|
||||
let offset = offset.unwrap_or(0);
|
||||
let offset: usize = offset
|
||||
.unwrap_or(0)
|
||||
.try_into()
|
||||
.map_err(|_| HttpError::invalid_input("Offset must be non-negative"))?;
|
||||
let last_30_days = state
|
||||
.cache()
|
||||
.get_mixnode_stats(state.db_pool(), offset)
|
||||
|
||||
@@ -17,18 +17,10 @@ pub(crate) async fn start_http_api(
|
||||
nym_http_cache_ttl: u64,
|
||||
agent_key_list: Vec<PublicKey>,
|
||||
agent_max_count: i64,
|
||||
hm_url: String,
|
||||
) -> anyhow::Result<ShutdownHandles> {
|
||||
let router_builder = RouterBuilder::with_default_routes();
|
||||
|
||||
let state = AppState::new(
|
||||
db_pool,
|
||||
nym_http_cache_ttl,
|
||||
agent_key_list,
|
||||
agent_max_count,
|
||||
hm_url,
|
||||
)
|
||||
.await;
|
||||
let state = AppState::new(db_pool, nym_http_cache_ttl, agent_key_list, agent_max_count).await;
|
||||
let router = router_builder.with_state(state);
|
||||
|
||||
let bind_addr = format!("0.0.0.0:{}", http_port);
|
||||
|
||||
@@ -25,11 +25,10 @@ impl AppState {
|
||||
cache_ttl: u64,
|
||||
agent_key_list: Vec<PublicKey>,
|
||||
agent_max_count: i64,
|
||||
hm_url: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
db_pool,
|
||||
cache: HttpCache::new(cache_ttl, hm_url).await,
|
||||
cache: HttpCache::new(cache_ttl).await,
|
||||
agent_key_list,
|
||||
agent_max_count,
|
||||
}
|
||||
@@ -52,96 +51,14 @@ impl AppState {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct HistoricMixingStats {
|
||||
historic_stats: Vec<DailyStats>,
|
||||
}
|
||||
|
||||
impl HistoricMixingStats {
|
||||
/// Collect historic stats only on initialization. From this point onwards,
|
||||
/// service will collect its own stats
|
||||
async fn init(hm_url: String) -> Self {
|
||||
tracing::info!("Fetching historic mixnode stats from {}", hm_url);
|
||||
|
||||
let target_url = format!("{}/v2/mixnodes/stats", hm_url);
|
||||
if let Ok(response) = reqwest::get(&target_url)
|
||||
.await
|
||||
.and_then(|res| res.error_for_status())
|
||||
.inspect_err(|err| tracing::error!("Failed to fetch cache from HM: {}", err))
|
||||
{
|
||||
if let Ok(mut daily_stats) = response.json::<Vec<DailyStats>>().await {
|
||||
// sorting required for seamless comparison later (descending, newest first)
|
||||
daily_stats.sort_by(|left, right| right.date_utc.cmp(&left.date_utc));
|
||||
|
||||
tracing::info!(
|
||||
"Successfully fetched {} historic entries from {}",
|
||||
daily_stats.len(),
|
||||
hm_url
|
||||
);
|
||||
return Self {
|
||||
historic_stats: daily_stats,
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
tracing::warn!("Failed to get historic daily stats from {}", hm_url);
|
||||
Self {
|
||||
historic_stats: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// polyfill with historical data obtained from Harbour Master
|
||||
fn merge_with_historic_stats(&self, mut new_stats: Vec<DailyStats>) -> Vec<DailyStats> {
|
||||
// newest first
|
||||
new_stats.sort_by(|left, right| right.date_utc.cmp(&left.date_utc));
|
||||
|
||||
// historic stats are only used for dates when we don't have new data
|
||||
let oldest_date_in_new_stats = new_stats
|
||||
.last()
|
||||
.map(|day| day.date_utc.to_owned())
|
||||
.unwrap_or(String::from("1900-01-01"));
|
||||
|
||||
// given 2 arrays
|
||||
// index historic_stats new_stats
|
||||
// 0 30-01 31-01
|
||||
// 1 29-01 30-01
|
||||
// 2 28-01
|
||||
// ...
|
||||
// N 01-01
|
||||
// cutoff point would be at historic_stats[1]
|
||||
// (first date smaller than oldest we've already got)
|
||||
if let Some(cutoff) = self
|
||||
.historic_stats
|
||||
.iter()
|
||||
.position(|elem| elem.date_utc < oldest_date_in_new_stats)
|
||||
{
|
||||
// missing data = (all historic data) - (however many days we already have)
|
||||
let missing_data = self.historic_stats.iter().skip(cutoff).cloned();
|
||||
|
||||
// extend new data with missing days
|
||||
tracing::debug!(
|
||||
"Polyfilled with {} historic records from {:?} to {:?}",
|
||||
missing_data.len(),
|
||||
self.historic_stats.last(),
|
||||
self.historic_stats.get(cutoff)
|
||||
);
|
||||
new_stats.extend(missing_data);
|
||||
|
||||
// oldest first
|
||||
new_stats.into_iter().rev().collect::<Vec<_>>()
|
||||
} else {
|
||||
// if all historic data is older than what we've got, don't use it
|
||||
new_stats
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static GATEWAYS_LIST_KEY: &str = "gateways";
|
||||
static MIXNODES_LIST_KEY: &str = "mixnodes";
|
||||
static MIXSTATS_LIST_KEY: &str = "mixstats";
|
||||
static SUMMARY_HISTORY_LIST_KEY: &str = "summary-history";
|
||||
static SESSION_STATS_LIST_KEY: &str = "session-stats";
|
||||
|
||||
const MIXNODE_STATS_HISTORY_DAYS: usize = 30;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct HttpCache {
|
||||
gateways: Cache<String, Arc<RwLock<Vec<Gateway>>>>,
|
||||
@@ -149,11 +66,10 @@ pub(crate) struct HttpCache {
|
||||
mixstats: Cache<String, Arc<RwLock<Vec<DailyStats>>>>,
|
||||
history: Cache<String, Arc<RwLock<Vec<SummaryHistory>>>>,
|
||||
session_stats: Cache<String, Arc<RwLock<Vec<SessionStats>>>>,
|
||||
mixnode_historic_daily_stats: HistoricMixingStats,
|
||||
}
|
||||
|
||||
impl HttpCache {
|
||||
pub async fn new(ttl_seconds: u64, hm_url: String) -> Self {
|
||||
pub async fn new(ttl_seconds: u64) -> Self {
|
||||
HttpCache {
|
||||
gateways: Cache::builder()
|
||||
.max_capacity(2)
|
||||
@@ -175,7 +91,6 @@ impl HttpCache {
|
||||
.max_capacity(2)
|
||||
.time_to_live(Duration::from_secs(ttl_seconds))
|
||||
.build(),
|
||||
mixnode_historic_daily_stats: HistoricMixingStats::init(hm_url).await,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -285,26 +200,27 @@ impl HttpCache {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_mixnode_stats(&self, db: &DbPool, offset: i64) -> Vec<DailyStats> {
|
||||
match self.mixstats.get(MIXSTATS_LIST_KEY).await {
|
||||
pub async fn get_mixnode_stats(&self, db: &DbPool, offset: usize) -> Vec<DailyStats> {
|
||||
let mut stats = match self.mixstats.get(MIXSTATS_LIST_KEY).await {
|
||||
Some(guard) => {
|
||||
let read_lock = guard.read().await;
|
||||
read_lock.to_vec()
|
||||
}
|
||||
None => {
|
||||
let new_node_stats = crate::db::queries::get_daily_stats(db, offset)
|
||||
let new_node_stats = crate::db::queries::get_daily_stats(db)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
// for every day that's missing, fill it with cached historic data
|
||||
let mut mixnode_stats = self
|
||||
.mixnode_historic_daily_stats
|
||||
.merge_with_historic_stats(new_node_stats);
|
||||
mixnode_stats.truncate(30);
|
||||
|
||||
self.upsert_mixnode_stats(mixnode_stats.clone()).await;
|
||||
mixnode_stats
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.rev()
|
||||
.collect::<Vec<_>>();
|
||||
// cache result without offset
|
||||
self.upsert_mixnode_stats(new_node_stats.clone()).await;
|
||||
new_node_stats
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
stats.truncate(MIXNODE_STATS_HISTORY_DAYS + offset);
|
||||
stats.into_iter().skip(offset).rev().collect()
|
||||
}
|
||||
|
||||
pub async fn get_summary_history(&self, db: &DbPool) -> Vec<SummaryHistory> {
|
||||
|
||||
@@ -34,6 +34,8 @@ pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
|
||||
"tower_http",
|
||||
"axum",
|
||||
"html5ever",
|
||||
"hickory_proto",
|
||||
"hickory_resolver",
|
||||
];
|
||||
for crate_name in warn_crates {
|
||||
filter = filter.add_directive(directive_checked(format!("{}=warn", crate_name))?);
|
||||
|
||||
@@ -10,6 +10,7 @@ mod mixnet_scraper;
|
||||
mod monitor;
|
||||
mod node_scraper;
|
||||
mod testruns;
|
||||
mod utils;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
@@ -66,7 +67,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
args.nym_http_cache_ttl,
|
||||
agent_key_list.to_owned(),
|
||||
args.max_agent_count,
|
||||
args.hm_url,
|
||||
)
|
||||
.await
|
||||
.expect("Failed to start server");
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
use crate::db::{
|
||||
models::{NodeStats, ScraperNodeInfo},
|
||||
queries::{
|
||||
get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats,
|
||||
insert_scraped_node_description,
|
||||
use crate::{
|
||||
db::{
|
||||
models::{NodeStats, ScraperNodeInfo},
|
||||
queries::{
|
||||
get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats,
|
||||
insert_scraped_node_description,
|
||||
},
|
||||
},
|
||||
utils::generate_node_name,
|
||||
};
|
||||
use ammonia::Builder;
|
||||
use anyhow::Result;
|
||||
use anyhow::{anyhow, Result};
|
||||
use chrono::{DateTime, Datelike, Utc};
|
||||
use reqwest;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -80,22 +83,33 @@ pub fn build_client() -> Result<reqwest::Client> {
|
||||
.map_err(|e| anyhow::anyhow!("Failed to build HTTP client: {}", e))
|
||||
}
|
||||
|
||||
pub fn sanitize_description(description: NodeDescriptionResponse) -> NodeDescriptionResponse {
|
||||
pub fn sanitize_description(
|
||||
description: NodeDescriptionResponse,
|
||||
node_id: i64,
|
||||
) -> NodeDescriptionResponse {
|
||||
let mut sanitizer = Builder::new();
|
||||
sanitizer
|
||||
.tags(std::collections::HashSet::new())
|
||||
.generic_attributes(std::collections::HashSet::new())
|
||||
.url_schemes(std::collections::HashSet::new());
|
||||
|
||||
const UNKNOWN: &str = "N/A";
|
||||
let sanitize_field = |opt: Option<String>| -> Option<String> {
|
||||
Some(
|
||||
opt.filter(|s| !s.trim().is_empty())
|
||||
.map_or_else(|| "N/A".to_string(), |s| sanitizer.clean(&s).to_string()),
|
||||
.map_or_else(|| UNKNOWN.to_string(), |s| sanitizer.clean(&s).to_string()),
|
||||
)
|
||||
};
|
||||
|
||||
let mut moniker = sanitize_field(description.moniker);
|
||||
if let Some(sanitized) = &moniker {
|
||||
if sanitized == UNKNOWN {
|
||||
moniker = Some(generate_node_name(node_id));
|
||||
}
|
||||
};
|
||||
|
||||
NodeDescriptionResponse {
|
||||
moniker: sanitize_field(description.moniker),
|
||||
moniker,
|
||||
website: sanitize_field(description.website),
|
||||
security_contact: sanitize_field(description.security_contact),
|
||||
details: sanitize_field(description.details),
|
||||
@@ -108,18 +122,26 @@ pub async fn scrape_and_store_description(pool: &SqlitePool, node: &ScraperNodeI
|
||||
|
||||
let mut description = None;
|
||||
let mut error = None;
|
||||
let mut tried_url_list = Vec::new();
|
||||
|
||||
for mut url in urls {
|
||||
url = format!("{}{}", url.trim_end_matches('/'), DESCRIPTION_URL);
|
||||
tried_url_list.push(url.clone());
|
||||
|
||||
match client.get(&url).send().await {
|
||||
match client
|
||||
.get(&url)
|
||||
.send()
|
||||
.await
|
||||
// convert 404 and similar to error
|
||||
.and_then(|res| res.error_for_status())
|
||||
{
|
||||
Ok(response) => {
|
||||
if let Ok(desc) = response.json::<NodeDescriptionResponse>().await {
|
||||
description = Some(desc);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => error = Some(e),
|
||||
Err(e) => error = Some(anyhow!("{:?} ({})", tried_url_list, e)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,9 +150,8 @@ pub async fn scrape_and_store_description(pool: &SqlitePool, node: &ScraperNodeI
|
||||
anyhow::anyhow!("Failed to fetch description from any URL: {}", err_msg)
|
||||
})?;
|
||||
|
||||
let sanitized_description = sanitize_description(description);
|
||||
insert_scraped_node_description(pool, &node.node_kind, node.node_id, &sanitized_description)
|
||||
.await?;
|
||||
let sanitized_description = sanitize_description(description, *node.node_id());
|
||||
insert_scraped_node_description(pool, &node.node_kind, &sanitized_description).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -144,9 +165,11 @@ pub async fn scrape_and_store_packet_stats(
|
||||
|
||||
let mut stats = None;
|
||||
let mut error = None;
|
||||
let mut tried_url_list = Vec::new();
|
||||
|
||||
for mut url in urls {
|
||||
url = format!("{}{}", url.trim_end_matches('/'), PACKET_STATS_URL);
|
||||
tried_url_list.push(url.clone());
|
||||
|
||||
match client.get(&url).send().await {
|
||||
Ok(response) => {
|
||||
@@ -155,18 +178,18 @@ pub async fn scrape_and_store_packet_stats(
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => error = Some(e),
|
||||
Err(e) => error = Some(anyhow!("{:?} ({})", tried_url_list, e)),
|
||||
}
|
||||
}
|
||||
|
||||
let stats = stats.ok_or_else(|| {
|
||||
let err_msg = error.map_or_else(|| "Unknown error".to_string(), |e| e.to_string());
|
||||
anyhow::anyhow!("Failed to fetch stats from any URL: {}", err_msg)
|
||||
anyhow::anyhow!("Failed to fetch description from any URL: {}", err_msg)
|
||||
})?;
|
||||
|
||||
let timestamp = Utc::now();
|
||||
let timestamp_utc = timestamp.timestamp();
|
||||
insert_node_packet_stats(pool, node.node_id, &node.node_kind, &stats, timestamp_utc).await?;
|
||||
insert_node_packet_stats(pool, &node.node_kind, &stats, timestamp_utc).await?;
|
||||
|
||||
// Update daily stats
|
||||
update_daily_stats(pool, node, timestamp, &stats).await?;
|
||||
|
||||
@@ -8,7 +8,7 @@ use sqlx::SqlitePool;
|
||||
use tracing::{debug, error, instrument, warn};
|
||||
|
||||
use crate::db::models::ScraperNodeInfo;
|
||||
use crate::db::queries::get_mixing_nodes_for_scraping;
|
||||
use crate::db::queries::get_nodes_for_scraping;
|
||||
|
||||
const DESCRIPTION_SCRAPE_INTERVAL: Duration = Duration::from_secs(60 * 60 * 4);
|
||||
const PACKET_SCRAPE_INTERVAL: Duration = Duration::from_secs(60 * 60);
|
||||
@@ -74,7 +74,7 @@ impl Scraper {
|
||||
pool: &SqlitePool,
|
||||
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
|
||||
) -> Result<()> {
|
||||
let nodes = get_mixing_nodes_for_scraping(pool).await?;
|
||||
let nodes = get_nodes_for_scraping(pool).await?;
|
||||
if let Ok(mut queue_lock) = queue.lock() {
|
||||
queue_lock.extend(nodes);
|
||||
} else {
|
||||
@@ -82,7 +82,7 @@ impl Scraper {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Self::process_description_queue(pool, queue).await?;
|
||||
Self::process_description_queue(pool, queue).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -91,7 +91,7 @@ impl Scraper {
|
||||
pool: &SqlitePool,
|
||||
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
|
||||
) -> Result<()> {
|
||||
let nodes = get_mixing_nodes_for_scraping(pool).await?;
|
||||
let nodes = get_nodes_for_scraping(pool).await?;
|
||||
tracing::info!("Querying {} mixing nodes", nodes.len());
|
||||
if let Ok(mut queue_lock) = queue.lock() {
|
||||
queue_lock.extend(nodes);
|
||||
@@ -100,14 +100,11 @@ impl Scraper {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Self::process_packet_queue(pool, queue).await?;
|
||||
Self::process_packet_queue(pool, queue).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_description_queue(
|
||||
pool: &SqlitePool,
|
||||
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
|
||||
) -> Result<()> {
|
||||
async fn process_description_queue(pool: &SqlitePool, queue: Arc<Mutex<Vec<ScraperNodeInfo>>>) {
|
||||
loop {
|
||||
let running_tasks = TASK_COUNTER.load(Ordering::Relaxed);
|
||||
|
||||
@@ -132,12 +129,15 @@ impl Scraper {
|
||||
tokio::spawn(async move {
|
||||
match scrape_and_store_description(&pool, &node).await {
|
||||
Ok(_) => debug!(
|
||||
"✅ Description task #{} for node {} complete",
|
||||
task_id, node.node_id
|
||||
"📝 ✅ Description task #{} for node {} complete",
|
||||
task_id,
|
||||
node.node_id()
|
||||
),
|
||||
Err(e) => debug!(
|
||||
"❌ Description task #{} for node {} failed: {}",
|
||||
task_id, node.node_id, e
|
||||
"📝 ❌ Description task #{} for node {} failed: {}",
|
||||
task_id,
|
||||
node.node_id(),
|
||||
e
|
||||
),
|
||||
}
|
||||
TASK_COUNTER.fetch_sub(1, Ordering::Relaxed);
|
||||
@@ -146,13 +146,9 @@ impl Scraper {
|
||||
tokio::time::sleep(QUEUE_CHECK_INTERVAL).await;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_packet_queue(
|
||||
pool: &SqlitePool,
|
||||
queue: Arc<Mutex<Vec<ScraperNodeInfo>>>,
|
||||
) -> Result<()> {
|
||||
async fn process_packet_queue(pool: &SqlitePool, queue: Arc<Mutex<Vec<ScraperNodeInfo>>>) {
|
||||
loop {
|
||||
let running_tasks = TASK_COUNTER.load(Ordering::Relaxed);
|
||||
|
||||
@@ -177,12 +173,15 @@ impl Scraper {
|
||||
tokio::spawn(async move {
|
||||
match scrape_and_store_packet_stats(&pool, &node).await {
|
||||
Ok(_) => debug!(
|
||||
"✅ Packet stats task #{} for node {} complete",
|
||||
task_id, node.node_id
|
||||
"📊 ✅ Packet stats task #{} for node {} complete",
|
||||
task_id,
|
||||
node.node_id()
|
||||
),
|
||||
Err(e) => debug!(
|
||||
"❌ Packet stats task #{} for node {} failed: {}",
|
||||
task_id, node.node_id, e
|
||||
"📊 ❌ Packet stats task #{} for node {} failed: {}",
|
||||
task_id,
|
||||
node.node_id(),
|
||||
e
|
||||
),
|
||||
}
|
||||
TASK_COUNTER.fetch_sub(1, Ordering::Relaxed);
|
||||
@@ -191,6 +190,5 @@ impl Scraper {
|
||||
tokio::time::sleep(QUEUE_CHECK_INTERVAL).await;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
#![allow(deprecated)]
|
||||
|
||||
use crate::db::models::{
|
||||
gateway, mixnode, GatewayRecord, MixnodeRecord, NetworkSummary, ASSIGNED_ENTRY_COUNT,
|
||||
gateway, mixnode, GatewayInsertRecord, MixnodeRecord, NetworkSummary, ASSIGNED_ENTRY_COUNT,
|
||||
ASSIGNED_EXIT_COUNT, ASSIGNED_MIXING_COUNT, GATEWAYS_BONDED_COUNT, GATEWAYS_HISTORICAL_COUNT,
|
||||
MIXNODES_HISTORICAL_COUNT, MIXNODES_LEGACY_COUNT, NYMNODES_DESCRIBED_COUNT, NYMNODE_COUNT,
|
||||
};
|
||||
use crate::db::{queries, DbPool};
|
||||
use crate::monitor::geodata::{Location, NodeGeoData};
|
||||
use crate::utils::{decimal_to_i64, LogError, NumericalCheckedCast};
|
||||
use anyhow::anyhow;
|
||||
use cosmwasm_std::Decimal;
|
||||
use moka::future::Cache;
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
use nym_validator_client::client::{NodeId, NymApiClientExt};
|
||||
@@ -29,7 +29,6 @@ pub(crate) use geodata::IpInfoClient;
|
||||
|
||||
mod geodata;
|
||||
|
||||
// TODO dz should be configurable
|
||||
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60);
|
||||
|
||||
static DELEGATION_PROGRAM_WALLET: &str = "n1rnxpdpx3kldygsklfft0gech7fhfcux4zst5lw";
|
||||
@@ -109,7 +108,11 @@ impl Monitor {
|
||||
|
||||
let gateways = described_nodes
|
||||
.iter()
|
||||
.filter(|node| node.description.declared_role.entry)
|
||||
.filter(|node| {
|
||||
node.description.declared_role.entry
|
||||
|| node.description.declared_role.exit_ipr
|
||||
|| node.description.declared_role.exit_nr
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let bonded_node_info = api_client
|
||||
@@ -120,6 +123,8 @@ impl Monitor {
|
||||
// for faster reads
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
tracing::info!("🟣 bonded_nodes: {}", bonded_node_info.len());
|
||||
|
||||
let nym_nodes = api_client
|
||||
.get_all_basic_nodes()
|
||||
.await
|
||||
@@ -198,10 +203,11 @@ impl Monitor {
|
||||
let gateway_records = self.prepare_gateway_data(&gateways, gateway_geodata, &nym_nodes)?;
|
||||
|
||||
let pool = self.db_pool.clone();
|
||||
let gateways_count = gateway_records.len();
|
||||
queries::insert_gateways(&pool, gateway_records)
|
||||
.await
|
||||
.map(|_| {
|
||||
tracing::debug!("Gateway info written to DB!");
|
||||
tracing::debug!("{} gateway records written to DB!", gateways_count);
|
||||
})?;
|
||||
|
||||
let mixnode_records = self.prepare_mixnode_data(
|
||||
@@ -209,10 +215,11 @@ impl Monitor {
|
||||
mixnodes_described,
|
||||
delegation_program_members,
|
||||
)?;
|
||||
let mixnodes_count = mixnode_records.len();
|
||||
queries::insert_mixnodes(&pool, mixnode_records)
|
||||
.await
|
||||
.map(|_| {
|
||||
tracing::debug!("Mixnode info written to DB!");
|
||||
tracing::debug!("{} mixnode info written to DB!", mixnodes_count);
|
||||
})?;
|
||||
|
||||
let (all_historical_gateways, all_historical_mixnodes) = calculate_stats(&pool).await?;
|
||||
@@ -299,13 +306,13 @@ impl Monitor {
|
||||
|
||||
fn prepare_gateway_data(
|
||||
&self,
|
||||
gateways: &[&NymNodeDescription],
|
||||
described_gateways: &[&NymNodeDescription],
|
||||
gateway_geodata: Vec<NodeGeoData>,
|
||||
skimmed_gateways: &[SkimmedNode],
|
||||
) -> anyhow::Result<Vec<GatewayRecord>> {
|
||||
) -> anyhow::Result<Vec<GatewayInsertRecord>> {
|
||||
let mut gateway_records = Vec::new();
|
||||
|
||||
for gateway in gateways {
|
||||
for gateway in described_gateways {
|
||||
let identity_key = gateway.ed25519_identity_key().to_base58_string();
|
||||
let bonded = true;
|
||||
let last_updated_utc = chrono::offset::Utc::now().timestamp();
|
||||
@@ -329,7 +336,7 @@ impl Monitor {
|
||||
.unwrap_or_default()
|
||||
.round_to_integer();
|
||||
|
||||
gateway_records.push(GatewayRecord {
|
||||
gateway_records.push(GatewayInsertRecord {
|
||||
identity_key: identity_key.to_owned(),
|
||||
bonded,
|
||||
self_described,
|
||||
@@ -400,33 +407,6 @@ impl Monitor {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO dz is there a common monorepo place this can be put?
|
||||
pub trait NumericalCheckedCast<T>
|
||||
where
|
||||
T: TryFrom<Self>,
|
||||
<T as TryFrom<Self>>::Error: std::error::Error,
|
||||
Self: std::fmt::Display + Copy,
|
||||
{
|
||||
fn cast_checked(self) -> anyhow::Result<T> {
|
||||
T::try_from(self).map_err(|e| {
|
||||
anyhow::anyhow!(
|
||||
"Couldn't cast {} to {}: {}",
|
||||
self,
|
||||
std::any::type_name::<T>(),
|
||||
e
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> NumericalCheckedCast<U> for T
|
||||
where
|
||||
U: TryFrom<T>,
|
||||
<U as TryFrom<T>>::Error: std::error::Error,
|
||||
T: std::fmt::Display + Copy,
|
||||
{
|
||||
}
|
||||
|
||||
async fn calculate_stats(pool: &DbPool) -> anyhow::Result<(usize, usize)> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
@@ -464,39 +444,3 @@ async fn get_delegation_program_details(
|
||||
|
||||
Ok(mix_ids)
|
||||
}
|
||||
|
||||
pub(crate) fn decimal_to_i64(decimal: Decimal) -> i64 {
|
||||
// Convert the underlying Uint128 to a u128
|
||||
let atomics = decimal.atomics().u128();
|
||||
let precision = 1_000_000_000_000_000_000u128;
|
||||
|
||||
// Get the fractional part
|
||||
let fractional = atomics % precision;
|
||||
|
||||
// Get the integer part
|
||||
let integer = atomics / precision;
|
||||
|
||||
// Combine them into a float
|
||||
let float_value = integer as f64 + (fractional as f64 / 1_000_000_000_000_000_000_f64);
|
||||
|
||||
// Limit to 6 decimal places
|
||||
let rounded_value = (float_value * 1_000_000.0).round() / 1_000_000.0;
|
||||
|
||||
rounded_value as i64
|
||||
}
|
||||
|
||||
trait LogError<T, E> {
|
||||
fn log_error(self, msg: &str) -> Result<T, E>;
|
||||
}
|
||||
|
||||
impl<T, E> LogError<T, E> for anyhow::Result<T, E>
|
||||
where
|
||||
E: std::error::Error,
|
||||
{
|
||||
fn log_error(self, msg: &str) -> Result<T, E> {
|
||||
if let Err(e) = &self {
|
||||
tracing::error!("[{msg}]:\t{e}");
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,15 +17,14 @@ use tracing::instrument;
|
||||
mod error;
|
||||
|
||||
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60);
|
||||
const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 6); //6h, data only update once a day
|
||||
const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 6);
|
||||
const STALE_DURATION: Duration = Duration::from_secs(86400 * 365); //one year
|
||||
|
||||
#[instrument(level = "debug", name = "node_scraper", skip_all)]
|
||||
#[instrument(level = "info", name = "metrics_scraper", skip_all)]
|
||||
pub(crate) async fn spawn_in_background(db_pool: DbPool, nym_api_client_timeout: Duration) {
|
||||
let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env();
|
||||
|
||||
loop {
|
||||
//No graceful shutdown?
|
||||
tracing::info!("Refreshing node self-described metrics...");
|
||||
|
||||
if let Err(e) = run(&db_pool, &network_defaults, nym_api_client_timeout).await {
|
||||
@@ -123,7 +122,7 @@ impl MetricsScrapingData {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", name = "metrics_scraper", skip_all)]
|
||||
#[instrument(level = "info", name = "metrics_scraper", skip_all)]
|
||||
async fn try_scrape_metrics(&self) -> Option<SessionStats> {
|
||||
match self.try_get_client().await {
|
||||
Ok(client) => {
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
use cosmwasm_std::Decimal;
|
||||
use itertools::Itertools;
|
||||
use rand::prelude::SliceRandom;
|
||||
use rand::SeedableRng;
|
||||
|
||||
// pub(crate) fn generate_node_name(identity: ed25519::PublicKey) -> String {
|
||||
pub(crate) fn generate_node_name(node_id: i64) -> String {
|
||||
let seed = {
|
||||
let node_id_bytes = node_id.to_le_bytes();
|
||||
let mut seed = [0u8; 32];
|
||||
for i in 0..4 {
|
||||
seed[i * 8..(i + 1) * 8].copy_from_slice(&node_id_bytes);
|
||||
}
|
||||
seed
|
||||
};
|
||||
let mut rng = rand_chacha::ChaCha20Rng::from_seed(seed);
|
||||
let words = bip39::Language::English.word_list();
|
||||
words.choose_multiple(&mut rng, 3).join(" ")
|
||||
}
|
||||
|
||||
#[allow(clippy::items_after_test_module)]
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use rand::Rng;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn generate_node_name_should_be_deterministic() {
|
||||
let mut rng = rand::thread_rng();
|
||||
|
||||
let node_id: i64 = rng.gen();
|
||||
let different_node_id: i64 = rng.gen();
|
||||
|
||||
let node_name = generate_node_name(node_id);
|
||||
let node_name_different = generate_node_name(different_node_id);
|
||||
assert_ne!(node_name, node_name_different);
|
||||
|
||||
let node_name_same = generate_node_name(node_id);
|
||||
assert_eq!(node_name, node_name_same);
|
||||
}
|
||||
}
|
||||
|
||||
pub trait NumericalCheckedCast<T>
|
||||
where
|
||||
T: TryFrom<Self>,
|
||||
<T as TryFrom<Self>>::Error: std::error::Error,
|
||||
Self: std::fmt::Display + Copy,
|
||||
{
|
||||
fn cast_checked(self) -> anyhow::Result<T> {
|
||||
T::try_from(self).map_err(|e| {
|
||||
anyhow::anyhow!(
|
||||
"Couldn't cast {} to {}: {}",
|
||||
self,
|
||||
std::any::type_name::<T>(),
|
||||
e
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> NumericalCheckedCast<U> for T
|
||||
where
|
||||
U: TryFrom<T>,
|
||||
<U as TryFrom<T>>::Error: std::error::Error,
|
||||
T: std::fmt::Display + Copy,
|
||||
{
|
||||
}
|
||||
|
||||
pub(crate) fn decimal_to_i64(decimal: Decimal) -> i64 {
|
||||
// Convert the underlying Uint128 to a u128
|
||||
let atomics = decimal.atomics().u128();
|
||||
let precision = 1_000_000_000_000_000_000u128;
|
||||
|
||||
// Get the fractional part
|
||||
let fractional = atomics % precision;
|
||||
|
||||
// Get the integer part
|
||||
let integer = atomics / precision;
|
||||
|
||||
// Combine them into a float
|
||||
let float_value = integer as f64 + (fractional as f64 / 1_000_000_000_000_000_000_f64);
|
||||
|
||||
// Limit to 6 decimal places
|
||||
let rounded_value = (float_value * 1_000_000.0).round() / 1_000_000.0;
|
||||
|
||||
rounded_value as i64
|
||||
}
|
||||
|
||||
pub(crate) trait LogError<T, E> {
|
||||
fn log_error(self, msg: &str) -> Result<T, E>;
|
||||
}
|
||||
|
||||
impl<T, E> LogError<T, E> for anyhow::Result<T, E>
|
||||
where
|
||||
E: std::error::Error,
|
||||
{
|
||||
fn log_error(self, msg: &str) -> Result<T, E> {
|
||||
if let Err(e) = &self {
|
||||
tracing::error!("[{msg}]:\t{e}");
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -1,13 +1,12 @@
|
||||
use anyhow::Result;
|
||||
use sqlx::{sqlite::SqliteConnectOptions, Connection, SqliteConnection};
|
||||
use std::env::var;
|
||||
use std::io::Write;
|
||||
use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let db_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
||||
.join(".build")
|
||||
.join("nyx_chain_watcher.sqlite");
|
||||
let db_path = PathBuf::from(var("OUT_DIR").unwrap()).join("nyx_chain_watcher.sqlite");
|
||||
|
||||
// Create the database directory if it doesn't exist
|
||||
if let Some(parent) = db_path.parent() {
|
||||
|
||||
Reference in New Issue
Block a user