Compare commits

...

12 Commits

Author SHA1 Message Date
Lawrence Stalder ef6d2fd7dd change from value to result 2024-10-18 14:50:26 +02:00
Lawrence Stalder 887ae8c789 Added naming and tags 2024-10-18 14:43:46 +02:00
dynco-nym 12759a8460 Remove config in favor of clap args 2024-10-18 14:43:04 +02:00
Fran Arbanas 00b737d362 fix: docker build workflow 2024-10-18 14:14:10 +02:00
Fran Arbanas bd7b5db68f Merge branch 'feat/node-status-api-dockerfile' of github.com:nymtech/nym into feat/node-status-api-dockerfile 2024-10-18 14:08:36 +02:00
Fran Arbanas 80184c3a51 fix: change the way we read env vars for nyxd, nym api and explorer 2024-10-18 14:08:29 +02:00
Lawrence Stalder a13e9d8cc3 Misc changes to pathing and using yq instead of jq 2024-10-18 13:51:02 +02:00
Lawrence Stalder 003a835ee6 Added workflow for pushing node status api on harbor 2024-10-18 13:34:24 +02:00
Fran Arbanas a60f37ae88 feat: add dockerfile and env variables 2024-10-18 12:23:25 +02:00
dynco-nym 8d400ed4e0 Work with directory pre-v2.1
Rebase + point to earlier network client code

Adjust to new Nym API types

Refer to earlier client code

Revert "Rebase + point to earlier network client code"

This reverts commit dd75e7dc0695c25b0883e2f5dd15b7d70165e9e8.

Point to earlier commit
2024-10-16 14:33:23 +02:00
Dinko Zdravac e4ba6c815e Working HTTP server (#4941)
* Server file structure

* Create HTTP server
- graceful shutdown
- routes
- logging, CORS

* gateways WIP

* gateways API + swagger docs complete

* Mixnodes API + swagger docs complete

* Services API + swagger docs complete

* Commit summary insert

* Make troubleshooting DB easier

* Summary API + swagger docs

* Client log changes

* QOL improvements

- remove implicit panics via `as`
- safer DTO conversions
- add logging
- new config
2024-10-14 16:33:21 +02:00
Dinko Zdravac 1ee18d9efe Node Status API background task (#4854)
* Setup new package

* Setup DB

* Fetch & store mixnodes/GWs
- refactor db package structure
- finally solve DATABASE_URL: absolute path works best

* Additional query functionality
- missing only daily summary, which requires type refactoring

* Replace type alias tuples with structs

* Insert summary

* Add github job to build package

* Build script for sqlx

* Remove data dir
- useless now that sqlx DB sits in OUT_DIR

* PR feedback
2024-10-14 16:33:21 +02:00
45 changed files with 3910 additions and 341 deletions
@@ -0,0 +1,55 @@
name: Build and upload Node Status API container to harbor.nymte.ch
on:
workflow_dispatch:
env:
WORKING_DIRECTORY: "nym-node-status-api"
CONTAINER_NAME: "node-status-api"
jobs:
build-container:
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
- name: Checkout repo
uses: actions/checkout@v4
- name: Configure git identity
run: |
git config --global user.email "lawrence@nymtech.net"
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.44.3
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: Check if tag exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
echo "Tag ${{ steps.get_version.outputs.result }} already exists"
fi
- name: Remove existing tag if exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
fi
- name: Create tag
run: |
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
- name: BuildAndPushImageOnHarbor
run: |
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
Generated
+894 -317
View File
File diff suppressed because it is too large Load Diff
+7
View File
@@ -118,6 +118,7 @@ members = [
"nym-node",
"nym-node/nym-node-http-api",
"nym-node/nym-node-requests",
"nym-node-status-api",
"nym-outfox",
"nym-validator-rewarder",
"tools/echo-server",
@@ -152,6 +153,7 @@ default-members = [
"nym-data-observatory",
"nym-node",
"nym-validator-rewarder",
"nym-node-status-api",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
@@ -232,10 +234,12 @@ dotenvy = "0.15.6"
ecdsa = "0.16"
ed25519-dalek = "2.1"
etherparse = "0.13.0"
envy = "0.4"
eyre = "0.6.9"
fastrand = "2.1.1"
flate2 = "1.0.34"
futures = "0.3.28"
futures-util = "0.3"
generic-array = "0.14.7"
getrandom = "0.2.10"
getset = "0.1.3"
@@ -265,6 +269,7 @@ ledger-transport-hid = "0.10.0"
log = "0.4"
maxminddb = "0.23.0"
mime = "0.3.17"
moka = { version = "0.12", features = ["future"] }
nix = "0.27.1"
notify = "5.1.0"
okapi = "0.7.0"
@@ -298,6 +303,7 @@ serde = "1.0.210"
serde_bytes = "0.11.15"
serde_derive = "1.0"
serde_json = "1.0.128"
serde_json_path = "0.6.7"
serde_repr = "0.1"
serde_with = "3.9.0"
serde_yaml = "0.9.25"
@@ -327,6 +333,7 @@ tracing = "0.1.37"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = "0.3.16"
tracing-tree = "0.2.2"
tracing-log = "0.2"
ts-rs = "10.0.0"
tungstenite = { version = "0.20.1", default-features = false }
url = "2.5"
@@ -25,7 +25,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
nym-http-api-client = { path = "../../../common/http-api-client" }
thiserror = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
url = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = ["sync", "time"] }
time = { workspace = true, features = ["formatting"] }
@@ -265,6 +265,13 @@ impl NymApiClient {
NymApiClient { nym_api }
}
#[cfg(not(target_arch = "wasm32"))]
pub fn new_with_timeout(api_url: Url, timeout: std::time::Duration) -> Self {
let nym_api = nym_api::Client::new(api_url, Some(timeout));
NymApiClient { nym_api }
}
pub fn new_with_user_agent(api_url: Url, user_agent: UserAgent) -> Self {
let nym_api = nym_api::Client::builder::<_, ValidatorClientError>(api_url)
.expect("invalid api url")
@@ -121,36 +121,36 @@ async fn test_nyxd_connection(
{
Ok(Err(NyxdError::TendermintErrorRpc(e))) => {
// If we get a tendermint-rpc error, we classify the node as not contactable
log::warn!("Checking: nyxd url: {url}: {}: {}", "failed".red(), e);
tracing::warn!("Checking: nyxd url: {url}: {}: {}", "failed".red(), e);
false
}
Ok(Err(NyxdError::AbciError { code, log, .. })) => {
// We accept the mixnet contract not found as ok from a connection standpoint. This happens
// for example on a pre-launch network.
log::debug!(
tracing::debug!(
"Checking: nyxd url: {url}: {}, but with abci error: {code}: {log}",
"success".green()
);
code == 18
}
Ok(Err(error @ NyxdError::NoContractAddressAvailable(_))) => {
log::warn!("Checking: nyxd url: {url}: {}: {error}", "failed".red());
tracing::warn!("Checking: nyxd url: {url}: {}: {error}", "failed".red());
false
}
Ok(Err(e)) => {
// For any other error, we're optimistic and just try anyway.
log::warn!(
tracing::warn!(
"Checking: nyxd_url: {url}: {}, but with error: {e}",
"success".green()
);
true
}
Ok(Ok(_)) => {
log::debug!("Checking: nyxd_url: {url}: {}", "success".green());
tracing::debug!("Checking: nyxd_url: {url}: {}", "success".green());
true
}
Err(e) => {
log::warn!("Checking: nyxd_url: {url}: {}: {e}", "failed".red());
tracing::warn!("Checking: nyxd_url: {url}: {}: {e}", "failed".red());
false
}
};
@@ -169,15 +169,15 @@ async fn test_nym_api_connection(
.await
{
Ok(Ok(_)) => {
log::debug!("Checking: api_url: {url}: {}", "success".green());
tracing::debug!("Checking: api_url: {url}: {}", "success".green());
true
}
Ok(Err(e)) => {
log::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
tracing::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
false
}
Err(e) => {
log::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
tracing::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
false
}
};
@@ -40,6 +40,7 @@ use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, NodeId};
use time::format_description::BorrowedFormatItem;
use time::Date;
use tracing::instrument;
pub mod error;
pub mod routes;
@@ -51,11 +52,13 @@ pub fn rfc_3339_date() -> Vec<BorrowedFormatItem<'static>> {
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait NymApiClientExt: ApiClient {
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(&[routes::API_VERSION, routes::MIXNODES], NO_PARAMS)
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
self.get_json(
&[
@@ -69,6 +72,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateways_detailed(&self) -> Result<Vec<GatewayBondAnnotated>, NymAPIError> {
self.get_json(
&[
@@ -82,6 +86,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_detailed_unfiltered(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
@@ -97,11 +102,13 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateways(&self) -> Result<Vec<GatewayBond>, NymAPIError> {
self.get_json(&[routes::API_VERSION, routes::GATEWAYS], NO_PARAMS)
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateways_described(&self) -> Result<Vec<LegacyDescribedGateway>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::GATEWAYS, routes::DESCRIBED],
@@ -110,6 +117,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_described(&self) -> Result<Vec<LegacyDescribedMixNode>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::MIXNODES, routes::DESCRIBED],
@@ -118,6 +126,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[tracing::instrument(level = "debug", skip_all)]
async fn get_basic_mixnodes(
&self,
semver_compatibility: Option<String>,
@@ -141,6 +150,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_basic_gateways(
&self,
semver_compatibility: Option<String>,
@@ -164,6 +174,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_active_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::MIXNODES, routes::ACTIVE],
@@ -172,6 +183,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_active_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
self.get_json(
&[
@@ -186,6 +198,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::MIXNODES, routes::REWARDED],
@@ -194,6 +207,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_report(
&self,
mix_id: NodeId,
@@ -211,6 +225,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateway_report(
&self,
identity: IdentityKeyRef<'_>,
@@ -228,6 +243,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_history(
&self,
mix_id: NodeId,
@@ -245,6 +261,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateway_history(
&self,
identity: IdentityKeyRef<'_>,
@@ -262,6 +279,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_mixnodes_detailed(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
@@ -278,6 +296,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateway_core_status_count(
&self,
identity: IdentityKeyRef<'_>,
@@ -309,6 +328,7 @@ pub trait NymApiClientExt: ApiClient {
}
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_core_status_count(
&self,
mix_id: NodeId,
@@ -341,6 +361,7 @@ pub trait NymApiClientExt: ApiClient {
}
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_status(
&self,
mix_id: NodeId,
@@ -358,6 +379,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_reward_estimation(
&self,
mix_id: NodeId,
@@ -375,6 +397,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn compute_mixnode_reward_estimation(
&self,
mix_id: NodeId,
@@ -394,6 +417,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_stake_saturation(
&self,
mix_id: NodeId,
@@ -411,6 +435,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_inclusion_probability(
&self,
mix_id: NodeId,
@@ -428,6 +453,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_current_node_performance(
&self,
node_id: NodeId,
@@ -458,6 +484,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_blacklisted(&self) -> Result<Vec<NodeId>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::MIXNODES, routes::BLACKLISTED],
@@ -466,6 +493,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateways_blacklisted(&self) -> Result<Vec<IdentityKey>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::GATEWAYS, routes::BLACKLISTED],
@@ -474,6 +502,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self, request_body))]
async fn blind_sign(
&self,
request_body: &BlindSignRequestBody,
@@ -490,6 +519,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self, request_body))]
async fn verify_ecash_ticket(
&self,
request_body: &VerifyEcashTicketBody,
@@ -506,6 +536,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self, request_body))]
async fn batch_redeem_ecash_tickets(
&self,
request_body: &BatchRedeemTicketsBody,
@@ -522,6 +553,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn double_spending_filter_v1(&self) -> Result<SpentCredentialsResponse, NymAPIError> {
self.get_json(
&[
@@ -534,6 +566,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn partial_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
@@ -557,6 +590,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn partial_coin_indices_signatures(
&self,
epoch_id: Option<EpochId>,
@@ -577,6 +611,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn global_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
@@ -600,6 +635,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn global_coin_indices_signatures(
&self,
epoch_id: Option<EpochId>,
@@ -620,6 +656,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn master_verification_key(
&self,
epoch_id: Option<EpochId>,
@@ -639,6 +676,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn epoch_credentials(
&self,
dkg_epoch: EpochId,
@@ -655,6 +693,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_credential(
&self,
credential_id: i64,
@@ -671,6 +710,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_credentials(
&self,
credential_ids: Vec<i64>,
@@ -8,9 +8,9 @@ use crate::nyxd::CosmWasmClient;
use async_trait::async_trait;
use cosmrs::AccountId;
use cosmwasm_std::Addr;
use log::trace;
use nym_coconut_dkg_common::types::{ChunkIndex, NodeIndex, StateAdvanceResponse};
use serde::Deserialize;
use tracing::trace;
use nym_coconut_dkg_common::dealer::RegisteredDealerDetails;
pub use nym_coconut_dkg_common::{
@@ -29,7 +29,6 @@ use cosmrs::proto::cosmwasm::wasm::v1::{
};
use cosmrs::tendermint::{block, chain, Hash};
use cosmrs::{AccountId, Coin as CosmosCoin, Tx};
use log::trace;
use prost::Message;
use serde::{Deserialize, Serialize};
@@ -68,7 +67,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
Res: Message + Default,
{
if let Some(ref abci_path) = path {
trace!("performing query on abci path {abci_path}")
tracing::trace!("performing query on abci path {abci_path}")
}
let mut buf = Vec::with_capacity(req.encoded_len());
req.encode(&mut buf)?;
@@ -297,7 +296,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
let start = Instant::now();
loop {
log::debug!(
tracing::debug!(
"Polling for result of including {} in a block...",
broadcasted.hash
);
@@ -522,7 +521,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QuerySmartContractStateResponse>(path, req)
.await?;
trace!("raw query response: {}", String::from_utf8_lossy(&res.data));
tracing::trace!("raw query response: {}", String::from_utf8_lossy(&res.data));
Ok(serde_json::from_slice(&res.data)?)
}
@@ -25,12 +25,12 @@ use cosmrs::proto::cosmos::tx::signing::v1beta1::SignMode;
use cosmrs::staking::{MsgDelegate, MsgUndelegate};
use cosmrs::tx::{self, Msg};
use cosmrs::{cosmwasm, AccountId, Any, Tx};
use log::debug;
use serde::Serialize;
use sha2::Digest;
use sha2::Sha256;
use std::time::SystemTime;
use tendermint_rpc::endpoint::broadcast;
use tracing::debug;
fn empty_fee() -> tx::Fee {
tx::Fee {
@@ -7,9 +7,9 @@ use base64::Engine;
use cosmrs::abci::TxMsgData;
use cosmrs::cosmwasm::MsgExecuteContractResponse;
use cosmrs::proto::cosmos::base::query::v1beta1::{PageRequest, PageResponse};
use log::error;
use prost::bytes::Bytes;
use tendermint_rpc::endpoint::broadcast;
use tracing::error;
pub use cosmrs::abci::MsgResponse;
+6 -1
View File
@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::time::Duration;
use thiserror::Error;
use tracing::warn;
use tracing::{instrument, warn};
use url::Url;
pub use reqwest::IntoUrl;
@@ -202,6 +202,7 @@ impl Client {
self.reqwest_client.get(url)
}
#[instrument(level = "debug", skip_all, fields(path=?path))]
async fn send_get_request<K, V, E>(
&self,
path: PathSegments<'_>,
@@ -212,6 +213,7 @@ impl Client {
V: AsRef<str>,
E: Display,
{
tracing::trace!("Sending GET request");
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
@@ -277,6 +279,7 @@ impl Client {
}
}
#[instrument(level = "debug", skip_all)]
pub async fn get_json<T, K, V, E>(
&self,
path: PathSegments<'_>,
@@ -512,12 +515,14 @@ pub fn sanitize_url<K: AsRef<str>, V: AsRef<str>>(
url
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn parse_response<T, E>(res: Response, allow_empty: bool) -> Result<T, HttpClientError<E>>
where
T: DeserializeOwned,
E: DeserializeOwned + Display,
{
let status = res.status();
tracing::debug!("Status: {} (success: {})", &status, status.is_success());
if !allow_empty {
if let Some(0) = res.content_length() {
+3 -3
View File
@@ -21,8 +21,8 @@ COCONUT_DKG_CONTRACT_ADDRESS=n19604yflqggs9mk2z26mqygq43q2kr3n932egxx630svywd5mp
REWARDING_VALIDATOR_ADDRESS=n10yyd98e2tuwu0f7ypz9dy3hhjw7v772q6287gy
STATISTICS_SERVICE_DOMAIN_ADDRESS="https://mainnet-stats.nymte.ch:8090"
NYXD="https://rpc.nymtech.net"
NYM_API="https://validator.nymtech.net/api/"
NYXD=https://rpc.nymtech.net
NYM_API=https://validator.nymtech.net/api/
NYXD_WS="wss://rpc.nymtech.net/websocket"
EXPLORER_API="https://explorer.nymtech.net/api/"
EXPLORER_API=https://explorer.nymtech.net/api/
NYM_VPN_API="https://nymvpn.com/api"
+1 -1
View File
@@ -7,12 +7,12 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
log.workspace = true
nym-explorer-api-requests = { path = "../explorer-api-requests" }
reqwest = { workspace = true, features = ["json"] }
serde.workspace = true
thiserror.workspace = true
url.workspace = true
tracing = {workspace = true, features = ["attributes"]}
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
+10 -2
View File
@@ -3,6 +3,7 @@ use std::time::Duration;
use reqwest::StatusCode;
use thiserror::Error;
use tracing::instrument;
use url::Url;
// Re-export request types
@@ -47,6 +48,12 @@ impl ExplorerClient {
Ok(Self { client, url })
}
#[cfg(not(target_arch = "wasm32"))]
pub fn new_with_timeout(url: url::Url, timeout: Duration) -> Result<Self, ExplorerApiError> {
let client = reqwest::Client::builder().timeout(timeout).build()?;
Ok(Self { client, url })
}
#[cfg(target_arch = "wasm32")]
pub fn new(url: url::Url) -> Result<Self, ExplorerApiError> {
let client = reqwest::Client::builder().build()?;
@@ -58,10 +65,11 @@ impl ExplorerClient {
paths: &[&str],
) -> Result<reqwest::Response, ExplorerApiError> {
let url = combine_url(self.url.clone(), paths)?;
log::trace!("Sending GET request {url:?}");
tracing::debug!("Sending GET request");
Ok(self.client.get(url).send().await?)
}
#[instrument(level = "trace", skip_all, fields(paths=?paths))]
async fn query_explorer_api<T>(&self, paths: &[&str]) -> Result<T, ExplorerApiError>
where
T: std::fmt::Debug,
@@ -70,7 +78,7 @@ impl ExplorerClient {
let response = self.send_get_request(paths).await?;
if response.status().is_success() {
let res = response.json::<T>().await?;
log::trace!("Got response: {res:?}");
tracing::trace!("Got response: {res:?}");
Ok(res)
} else if response.status() == StatusCode::NOT_FOUND {
Err(ExplorerApiError::NotFound)
+2
View File
@@ -0,0 +1,2 @@
data/
enter_db.sh
+62
View File
@@ -0,0 +1,62 @@
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
# SPDX-License-Identifier: Apache-2.0
[package]
name = "nym-node-status-api"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
[dependencies]
anyhow = { workspace = true }
axum = { workspace = true, features = ["tokio"] }
chrono = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive", "env", "string"] }
cosmwasm-std = { workspace = true }
envy = { workspace = true }
futures-util = { workspace = true }
moka = { workspace = true, features = ["future"] }
nym-bin-common = { path = "../common/bin-common" }
nym-explorer-client = { path = "../explorer-api/explorer-client" }
# TODO dz: ref before Nym API client changes. Update to latest develop once new Nym API is live
nym-network-defaults = { git = "https://github.com/nymtech/nym", rev = "f86e08866" }
nym-validator-client = { git = "https://github.com/nymtech/nym", rev = "f86e08866" }
# nym-network-defaults = { path = "../common/network-defaults" }
# nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-task = { path = "../common/task" }
nym-node-requests = { git = "https://github.com/nymtech/nym", rev = "f86e08866" }
# nym-node-requests = { path = "../nym-node/nym-node-requests", features = ["openapi"] }
reqwest = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_json_path = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing-log = { workspace = true }
tower-http = { workspace = true, features = ["cors", "trace"] }
utoipa = { workspace = true, features = ["axum_extras", "time"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
# TODO dz `cargo update async-trait`
# for automatic schema detection, which was merged, but not released yet
# https://github.com/ProbablyClem/utoipauto/pull/38
# utoipauto = { git = "https://github.com/ProbablyClem/utoipauto", rev = "eb04cba" }
utoipauto = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["macros"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
] }
+15
View File
@@ -0,0 +1,15 @@
FROM rust:latest AS builder
COPY ./ /usr/src/nym
WORKDIR /usr/src/nym/nym-node-status-api
RUN cargo build --release
FROM ubuntu:24.04
RUN apt-get update && apt-get install -y ca-certificates
WORKDIR /nym
COPY --from=builder /usr/src/nym/target/release/nym-node-status-api ./
ENTRYPOINT [ "/nym/nym-node-status-api" ]
+46
View File
@@ -0,0 +1,46 @@
use anyhow::{anyhow, Result};
use sqlx::{Connection, SqliteConnection};
use tokio::{fs::File, io::AsyncWriteExt};
const SQLITE_DB_FILENAME: &str = "nym-node-status-api.sqlite";
/// If you need to re-run migrations or reset the db, just run
/// cargo clean -p nym-node-status-api
#[tokio::main]
async fn main() -> Result<()> {
let out_dir = read_env_var("OUT_DIR")?;
let database_path = format!("sqlite://{}/{}?mode=rwc", out_dir, SQLITE_DB_FILENAME);
write_db_path_to_file(&out_dir, SQLITE_DB_FILENAME).await?;
let mut conn = SqliteConnection::connect(&database_path).await?;
sqlx::migrate!("./migrations").run(&mut conn).await?;
#[cfg(target_family = "unix")]
println!("cargo::rustc-env=DATABASE_URL=sqlite://{}", &database_path);
#[cfg(target_family = "windows")]
// for some strange reason we need to add a leading `/` to the windows path even though it's
// not a valid windows path... but hey, it works...
println!("cargo::rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
rerun_if_changed();
Ok(())
}
fn read_env_var(var: &str) -> Result<String> {
std::env::var(var).map_err(|_| anyhow!("You need to set {} env var", var))
}
fn rerun_if_changed() {
println!("cargo::rerun-if-changed=migrations");
println!("cargo::rerun-if-changed=src/db/queries");
}
/// use `./enter_db.sh` to inspect DB
async fn write_db_path_to_file(out_dir: &str, db_filename: &str) -> anyhow::Result<()> {
let mut file = File::create("enter_db.sh").await?;
let _ = file.write(b"#!/bin/bash\n").await?;
file.write_all(format!("sqlite3 {}/{}", out_dir, db_filename).as_bytes())
.await
.map_err(From::from)
}
+18
View File
@@ -0,0 +1,18 @@
#!/bin/bash
set -e
export RUST_LOG=${RUST_LOG:-debug}
export NYM_API_CLIENT_TIMEOUT=60
export EXPLORER_CLIENT_TIMEOUT=60
#export NYXD=https://rpc.nymtech.net
#export NYM_API=https://validator.nymtech.net/api/
#export EXPLORER_API=https://explorer.nymtech.net/api/
#export NETWORK_NAME=mainnet
#cargo run --package nym-node-status-api --release -- --connection-url "sqlite://node-status-api.sqlite?mode=rwc"
cd ..
docker build -t node-status-api -f nym-node-status-api/Dockerfile .
docker run --env-file envs/mainnet.env -e NYM_NODE_STATUS_API_CONNECTION_URL="sqlite://node-status-api.sqlite?mode=rwc" node-status-api
+100
View File
@@ -0,0 +1,100 @@
CREATE TABLE gateways
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
gateway_identity_key VARCHAR NOT NULL UNIQUE,
self_described VARCHAR,
explorer_pretty_bond VARCHAR,
last_probe_result VARCHAR,
last_probe_log VARCHAR,
config_score INTEGER NOT NULL DEFAULT (0),
config_score_successes REAL NOT NULL DEFAULT (0),
config_score_samples REAL NOT NULL DEFAULT (0),
routing_score INTEGER NOT NULL DEFAULT (0),
routing_score_successes REAL NOT NULL DEFAULT (0),
routing_score_samples REAL NOT NULL DEFAULT (0),
test_run_samples REAL NOT NULL DEFAULT (0),
last_testrun_utc INTEGER,
last_updated_utc INTEGER NOT NULL,
bonded INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
blacklisted INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
performance INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX idx_gateway_description_gateway_identity_key ON gateways (gateway_identity_key);
CREATE TABLE mixnodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
identity_key VARCHAR NOT NULL UNIQUE,
mix_id INTEGER NOT NULL UNIQUE,
bonded INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
total_stake INTEGER NOT NULL,
host VARCHAR NOT NULL,
http_api_port INTEGER NOT NULL,
blacklisted INTEGER CHECK (blacklisted in (0, 1)) NOT NULL DEFAULT 0,
full_details VARCHAR,
self_described VARCHAR,
last_updated_utc INTEGER NOT NULL
, is_dp_delegatee INTEGER CHECK (is_dp_delegatee IN (0, 1)) NOT NULL DEFAULT 0);
CREATE INDEX idx_mixnodes_mix_id ON mixnodes (mix_id);
CREATE INDEX idx_mixnodes_identity_key ON mixnodes (identity_key);
CREATE TABLE
mixnode_description (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mix_id INTEGER UNIQUE NOT NULL,
moniker VARCHAR,
website VARCHAR,
security_contact VARCHAR,
details VARCHAR,
last_updated_utc INTEGER NOT NULL,
FOREIGN KEY (mix_id) REFERENCES mixnodes (mix_id)
);
-- Indexes for description table
CREATE INDEX idx_mixnode_description_mix_id ON mixnode_description (mix_id);
CREATE TABLE summary
(
key VARCHAR PRIMARY KEY,
value_json VARCHAR,
last_updated_utc INTEGER NOT NULL
);
CREATE TABLE summary_history
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
date VARCHAR UNIQUE NOT NULL,
timestamp_utc INTEGER NOT NULL,
value_json VARCHAR
);
CREATE INDEX idx_summary_history_timestamp_utc ON summary_history (timestamp_utc);
CREATE INDEX idx_summary_history_date ON summary_history (date);
CREATE TABLE gateway_description (
id INTEGER PRIMARY KEY AUTOINCREMENT,
gateway_identity_key VARCHAR UNIQUE NOT NULL,
moniker VARCHAR,
website VARCHAR,
security_contact VARCHAR,
details VARCHAR,
last_updated_utc INTEGER NOT NULL,
FOREIGN KEY (gateway_identity_key) REFERENCES gateways (gateway_identity_key)
);
CREATE TABLE
mixnode_daily_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mix_id INTEGER NOT NULL,
total_stake BIGINT NOT NULL,
date_utc VARCHAR NOT NULL,
packets_received INTEGER DEFAULT 0,
packets_sent INTEGER DEFAULT 0,
packets_dropped INTEGER DEFAULT 0,
FOREIGN KEY (mix_id) REFERENCES mixnodes (mix_id),
UNIQUE (mix_id, date_utc) -- This constraint automatically creates an index
);
+69
View File
@@ -0,0 +1,69 @@
use clap::Parser;
use nym_bin_common::bin_info;
use reqwest::Url;
use std::{sync::OnceLock, time::Duration};
// Helper for passing LONG_VERSION to clap
fn pretty_build_info_static() -> &'static str {
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
}
#[derive(Clone, Debug, Parser)]
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
pub(crate) struct Cli {
/// Network name for the network to which we're connecting.
#[clap(long, env = "NETWORK_NAME")]
pub(crate) network_name: String,
/// Explorer api url.
#[clap(short, long, env = "EXPLORER_API")]
pub(crate) explorer_api: String,
/// Nym api url.
#[clap(short, long, env = "NYM_API")]
pub(crate) nym_api: String,
/// TTL for the http cache.
#[clap(
long,
default_value_t = 30,
env = "NYM_NODE_STATUS_API_NYM_HTTP_CACHE_TTL"
)]
pub(crate) nym_http_cache_ttl: u64,
/// HTTP port on which to run node status api.
#[clap(long, default_value_t = 8000, env = "NYM_NODE_STATUS_API_HTTP_PORT")]
pub(crate) http_port: u16,
/// Nyxd address.
#[clap(long, env = "NYXD")]
pub(crate) nyxd_addr: Url,
/// Nym api client timeout.
#[clap(
long,
default_value = "15",
env = "NYM_NODE_STATUS_API_NYM_API_CLIENT_TIMEOUT"
)]
#[arg(value_parser = parse_duration)]
pub(crate) nym_api_client_timeout: Duration,
/// Explorer api client timeout.
#[clap(
long,
default_value = "15",
env = "NYM_NODE_STATUS_API_EXPLORER_CLIENT_TIMEOUT"
)]
#[arg(value_parser = parse_duration)]
pub(crate) explorer_client_timeout: Duration,
/// Connection url for the database.
#[clap(long, env = "NYM_NODE_STATUS_API_CONNECTION_URL")]
pub(crate) connection_url: String,
}
fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
let seconds = arg.parse()?;
Ok(std::time::Duration::from_secs(seconds))
}
+39
View File
@@ -0,0 +1,39 @@
use std::str::FromStr;
use anyhow::{anyhow, Result};
use sqlx::{migrate::Migrator, sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool};
pub(crate) mod models;
pub(crate) mod queries;
static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
pub(crate) type DbPool = SqlitePool;
pub(crate) struct Storage {
pool: DbPool,
}
impl Storage {
pub async fn init(connection_url: String) -> Result<Self> {
let connect_options = {
let connect_options = SqliteConnectOptions::from_str(&connection_url)?;
let mut connect_options = connect_options.create_if_missing(true);
let connect_options = connect_options.disable_statement_logging();
(*connect_options).clone()
};
let pool = sqlx::SqlitePool::connect_with(connect_options)
.await
.map_err(|err| anyhow!("Failed to connect to {}: {}", &connection_url, err))?;
MIGRATOR.run(&pool).await?;
Ok(Storage { pool })
}
/// Cloning pool is cheap, it's the same underlying set of connections
pub async fn pool_owned(&self) -> DbPool {
self.pool.clone()
}
}
+300
View File
@@ -0,0 +1,300 @@
use crate::{
http::{self, models::SummaryHistory},
monitor::NumericalCheckedCast,
};
use nym_node_requests::api::v1::node::models::NodeDescription;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
pub(crate) struct GatewayRecord {
pub(crate) identity_key: String,
pub(crate) bonded: bool,
pub(crate) blacklisted: bool,
pub(crate) self_described: Option<String>,
pub(crate) explorer_pretty_bond: Option<String>,
pub(crate) last_updated_utc: i64,
pub(crate) performance: u8,
}
#[derive(Debug, Clone)]
pub(crate) struct GatewayDto {
pub(crate) gateway_identity_key: String,
pub(crate) bonded: bool,
pub(crate) blacklisted: bool,
pub(crate) performance: i64,
pub(crate) self_described: Option<String>,
pub(crate) explorer_pretty_bond: Option<String>,
pub(crate) last_probe_result: Option<String>,
pub(crate) last_probe_log: Option<String>,
pub(crate) last_testrun_utc: Option<i64>,
pub(crate) last_updated_utc: i64,
pub(crate) moniker: String,
pub(crate) security_contact: String,
pub(crate) details: String,
pub(crate) website: String,
}
impl TryFrom<GatewayDto> for http::models::Gateway {
type Error = anyhow::Error;
fn try_from(value: GatewayDto) -> Result<Self, Self::Error> {
// Instead of using routing_score_successes / routing_score_samples, we use the
// number of successful testruns in the last 24h.
let routing_score = 0f32;
let config_score = 0u32;
let last_updated_utc =
timestamp_as_utc(value.last_updated_utc.cast_checked()?).to_rfc3339();
let last_testrun_utc = value
.last_testrun_utc
.and_then(|i| i.cast_checked().ok())
.map(|t| timestamp_as_utc(t).to_rfc3339());
let self_described = value.self_described.clone().unwrap_or("null".to_string());
let explorer_pretty_bond = value
.explorer_pretty_bond
.clone()
.unwrap_or("null".to_string());
let last_probe_result = value
.last_probe_result
.clone()
.unwrap_or("null".to_string());
let last_probe_log = value.last_probe_log.clone();
let self_described = serde_json::from_str(&self_described).unwrap_or(None);
let explorer_pretty_bond = serde_json::from_str(&explorer_pretty_bond).unwrap_or(None);
let last_probe_result = serde_json::from_str(&last_probe_result).unwrap_or(None);
let bonded = value.bonded;
let blacklisted = value.blacklisted;
let performance = value.performance as u8;
let description = NodeDescription {
moniker: value.moniker.clone(),
website: value.website.clone(),
security_contact: value.security_contact.clone(),
details: value.details.clone(),
};
Ok(http::models::Gateway {
gateway_identity_key: value.gateway_identity_key.clone(),
bonded,
blacklisted,
performance,
self_described,
explorer_pretty_bond,
description,
last_probe_result,
last_probe_log,
routing_score,
config_score,
last_testrun_utc,
last_updated_utc,
})
}
}
fn timestamp_as_utc(unix_timestamp: u64) -> chrono::DateTime<chrono::Utc> {
let d = std::time::UNIX_EPOCH + std::time::Duration::from_secs(unix_timestamp);
d.into()
}
pub(crate) struct MixnodeRecord {
pub(crate) mix_id: u32,
pub(crate) identity_key: String,
pub(crate) bonded: bool,
pub(crate) total_stake: i64,
pub(crate) host: String,
pub(crate) http_port: u16,
pub(crate) blacklisted: bool,
pub(crate) full_details: String,
pub(crate) self_described: Option<String>,
pub(crate) last_updated_utc: i64,
pub(crate) is_dp_delegatee: bool,
}
#[derive(Debug, Clone)]
pub(crate) struct MixnodeDto {
pub(crate) mix_id: i64,
pub(crate) bonded: bool,
pub(crate) blacklisted: bool,
pub(crate) is_dp_delegatee: bool,
pub(crate) total_stake: i64,
pub(crate) full_details: String,
pub(crate) self_described: Option<String>,
pub(crate) last_updated_utc: i64,
pub(crate) moniker: String,
pub(crate) website: String,
pub(crate) security_contact: String,
pub(crate) details: String,
}
impl TryFrom<MixnodeDto> for http::models::Mixnode {
type Error = anyhow::Error;
fn try_from(value: MixnodeDto) -> Result<Self, Self::Error> {
let mix_id = value.mix_id.cast_checked()?;
let full_details = value.full_details.clone();
let full_details = serde_json::from_str(&full_details).unwrap_or(None);
let self_described = value
.self_described
.clone()
.map(|v| serde_json::from_str(&v).unwrap_or(serde_json::Value::Null));
let last_updated_utc =
timestamp_as_utc(value.last_updated_utc.cast_checked()?).to_rfc3339();
let blacklisted = value.blacklisted;
let is_dp_delegatee = value.is_dp_delegatee;
let moniker = value.moniker.clone();
let website = value.website.clone();
let security_contact = value.security_contact.clone();
let details = value.details.clone();
Ok(http::models::Mixnode {
mix_id,
bonded: value.bonded,
blacklisted,
is_dp_delegatee,
total_stake: value.total_stake,
full_details,
description: NodeDescription {
moniker,
website,
security_contact,
details,
},
self_described,
last_updated_utc,
})
}
}
#[allow(unused)]
#[derive(Debug, Clone)]
pub(crate) struct BondedStatusDto {
pub(crate) id: i64,
pub(crate) identity_key: String,
pub(crate) bonded: bool,
}
#[allow(unused)]
#[derive(Debug, Clone, Default)]
pub(crate) struct SummaryDto {
pub(crate) key: String,
pub(crate) value_json: String,
pub(crate) last_updated_utc: i64,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct SummaryHistoryDto {
#[allow(dead_code)]
pub id: i64,
pub date: String,
pub value_json: String,
pub timestamp_utc: i64,
}
impl TryFrom<SummaryHistoryDto> for SummaryHistory {
type Error = anyhow::Error;
fn try_from(value: SummaryHistoryDto) -> Result<Self, Self::Error> {
let value_json = serde_json::from_str(&value.value_json).unwrap_or_default();
Ok(SummaryHistory {
value_json,
date: value.date.clone(),
timestamp_utc: timestamp_as_utc(value.timestamp_utc.cast_checked()?).to_rfc3339(),
})
}
}
pub(crate) const MIXNODES_BONDED_COUNT: &str = "mixnodes.bonded.count";
pub(crate) const MIXNODES_BONDED_ACTIVE: &str = "mixnodes.bonded.active";
pub(crate) const MIXNODES_BONDED_INACTIVE: &str = "mixnodes.bonded.inactive";
pub(crate) const MIXNODES_BONDED_RESERVE: &str = "mixnodes.bonded.reserve";
pub(crate) const MIXNODES_BLACKLISTED_COUNT: &str = "mixnodes.blacklisted.count";
pub(crate) const GATEWAYS_BONDED_COUNT: &str = "gateways.bonded.count";
pub(crate) const GATEWAYS_EXPLORER_COUNT: &str = "gateways.explorer.count";
pub(crate) const GATEWAYS_BLACKLISTED_COUNT: &str = "gateways.blacklisted.count";
pub(crate) const MIXNODES_HISTORICAL_COUNT: &str = "mixnodes.historical.count";
pub(crate) const GATEWAYS_HISTORICAL_COUNT: &str = "gateways.historical.count";
// `utoipa`` goes crazy if you use module-qualified prefix as field type so we
// have to import it
use gateway::GatewaySummary;
use mixnode::MixnodeSummary;
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct NetworkSummary {
pub(crate) mixnodes: MixnodeSummary,
pub(crate) gateways: GatewaySummary,
}
pub(crate) mod mixnode {
use super::*;
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct MixnodeSummary {
pub(crate) bonded: MixnodeSummaryBonded,
pub(crate) blacklisted: MixnodeSummaryBlacklisted,
pub(crate) historical: MixnodeSummaryHistorical,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct MixnodeSummaryBonded {
pub(crate) count: i32,
pub(crate) active: i32,
pub(crate) inactive: i32,
pub(crate) reserve: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct MixnodeSummaryBlacklisted {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct MixnodeSummaryHistorical {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
}
pub(crate) mod gateway {
use super::*;
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummary {
pub(crate) bonded: GatewaySummaryBonded,
pub(crate) blacklisted: GatewaySummaryBlacklisted,
pub(crate) historical: GatewaySummaryHistorical,
pub(crate) explorer: GatewaySummaryExplorer,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummaryExplorer {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummaryBonded {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummaryHistorical {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummaryBlacklisted {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
}
@@ -0,0 +1,160 @@
use crate::{
db::{
models::{BondedStatusDto, GatewayDto, GatewayRecord},
DbPool,
},
http::models::Gateway,
};
use futures_util::TryStreamExt;
use nym_validator_client::models::DescribedGateway;
use tracing::error;
pub(crate) async fn insert_gateways(
pool: &DbPool,
gateways: Vec<GatewayRecord>,
) -> anyhow::Result<()> {
let mut db = pool.acquire().await?;
for record in gateways {
sqlx::query!(
"INSERT INTO gateways
(gateway_identity_key, bonded, blacklisted,
self_described, explorer_pretty_bond,
last_updated_utc, performance)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(gateway_identity_key) DO UPDATE SET
bonded=excluded.bonded,
blacklisted=excluded.blacklisted,
self_described=excluded.self_described,
explorer_pretty_bond=excluded.explorer_pretty_bond,
last_updated_utc=excluded.last_updated_utc,
performance = excluded.performance;",
record.identity_key,
record.bonded,
record.blacklisted,
record.self_described,
record.explorer_pretty_bond,
record.last_updated_utc,
record.performance
)
.execute(&mut *db)
.await?;
}
Ok(())
}
pub(crate) async fn write_blacklisted_gateways_to_db<'a, I>(
pool: &DbPool,
gateways: I,
) -> anyhow::Result<()>
where
I: Iterator<Item = &'a String>,
{
let mut conn = pool.acquire().await?;
for gateway_identity_key in gateways {
sqlx::query!(
"UPDATE gateways
SET blacklisted = true
WHERE gateway_identity_key = ?;",
gateway_identity_key,
)
.execute(&mut *conn)
.await?;
}
Ok(())
}
/// Ensure all gateways that are set as bonded, are still bonded
pub(crate) async fn ensure_gateways_still_bonded(
pool: &DbPool,
gateways: &[DescribedGateway],
) -> anyhow::Result<usize> {
let bonded_gateways_rows = get_all_bonded_gateways_row_ids_by_status(pool, true).await?;
let unbonded_gateways_rows = bonded_gateways_rows.iter().filter(|v| {
!gateways
.iter()
.any(|bonded| *bonded.bond.identity() == v.identity_key)
});
let recently_unbonded_gateways = unbonded_gateways_rows.to_owned().count();
let last_updated_utc = chrono::offset::Utc::now().timestamp();
let mut transaction = pool.begin().await?;
for row in unbonded_gateways_rows {
sqlx::query!(
"UPDATE gateways
SET bonded = ?, last_updated_utc = ?
WHERE id = ?;",
false,
last_updated_utc,
row.id,
)
.execute(&mut *transaction)
.await?;
}
transaction.commit().await?;
Ok(recently_unbonded_gateways)
}
async fn get_all_bonded_gateways_row_ids_by_status(
pool: &DbPool,
status: bool,
) -> anyhow::Result<Vec<BondedStatusDto>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
BondedStatusDto,
r#"SELECT
id as "id!",
gateway_identity_key as "identity_key!",
bonded as "bonded: bool"
FROM gateways
WHERE bonded = ?"#,
status,
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?;
Ok(items)
}
pub(crate) async fn get_all_gateways(pool: &DbPool) -> anyhow::Result<Vec<Gateway>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
GatewayDto,
r#"SELECT
gw.gateway_identity_key as "gateway_identity_key!",
gw.bonded as "bonded: bool",
gw.blacklisted as "blacklisted: bool",
gw.performance as "performance!",
gw.self_described as "self_described?",
gw.explorer_pretty_bond as "explorer_pretty_bond?",
gw.last_probe_result as "last_probe_result?",
gw.last_probe_log as "last_probe_log?",
gw.last_testrun_utc as "last_testrun_utc?",
gw.last_updated_utc as "last_updated_utc!",
COALESCE(gd.moniker, "NA") as "moniker!",
COALESCE(gd.website, "NA") as "website!",
COALESCE(gd.security_contact, "NA") as "security_contact!",
COALESCE(gd.details, "NA") as "details!"
FROM gateways gw
LEFT JOIN gateway_description gd
ON gw.gateway_identity_key = gd.gateway_identity_key
ORDER BY gw.gateway_identity_key"#,
)
.fetch(&mut conn)
.try_collect::<Vec<_>>()
.await?;
let items: Vec<Gateway> = items
.into_iter()
.map(|item| item.try_into())
.collect::<anyhow::Result<Vec<_>>>()
.map_err(|e| {
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
e
})?;
tracing::trace!("Fetched {} gateways from DB", items.len());
Ok(items)
}
@@ -0,0 +1,88 @@
use crate::db::{models::NetworkSummary, DbPool};
use chrono::{DateTime, Utc};
/// take `last_updated` instead of calculating it so that `summary` matches
/// `daily_summary`
pub(crate) async fn insert_summaries(
pool: &DbPool,
summaries: &[(&str, &usize)],
summary: &NetworkSummary,
last_updated: DateTime<Utc>,
) -> anyhow::Result<()> {
insert_summary(pool, summaries, last_updated).await?;
insert_summary_history(pool, summary, last_updated).await?;
Ok(())
}
async fn insert_summary(
pool: &DbPool,
summaries: &[(&str, &usize)],
last_updated: DateTime<Utc>,
) -> anyhow::Result<()> {
let timestamp = last_updated.timestamp();
let mut tx = pool.begin().await?;
for (kind, value) in summaries {
let value = value.to_string();
sqlx::query!(
"INSERT INTO summary
(key, value_json, last_updated_utc)
VALUES (?, ?, ?)
ON CONFLICT(key) DO UPDATE SET
value_json=excluded.value_json,
last_updated_utc=excluded.last_updated_utc;",
kind,
value,
timestamp
)
.execute(&mut tx)
.await
.map_err(|err| {
tracing::error!("Failed to insert data for {kind}: {err}, aborting transaction",);
err
})?;
}
tx.commit().await?;
Ok(())
}
/// For `<date_N>`, `summary_history` is updated with fresh data on every
/// iteration.
///
/// After UTC midnight, summary is inserted for `<date_N+1>` and last entry for
/// `<date_N>` stays there forever.
///
/// This is not aggregate data, it's a set of latest data points
async fn insert_summary_history(
pool: &DbPool,
summary: &NetworkSummary,
last_updated: DateTime<Utc>,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
let value_json = serde_json::to_string(&summary)?;
let timestamp = last_updated.timestamp();
let now_rfc3339 = last_updated.to_rfc3339();
// YYYY-MM-DD, without time
let date = &now_rfc3339[..10];
sqlx::query!(
"INSERT INTO summary_history
(date, timestamp_utc, value_json)
VALUES (?, ?, ?)
ON CONFLICT(date) DO UPDATE SET
timestamp_utc=excluded.timestamp_utc,
value_json=excluded.value_json;",
date,
timestamp,
value_json
)
.execute(&mut *conn)
.await?;
Ok(())
}
@@ -0,0 +1,177 @@
use futures_util::TryStreamExt;
use nym_validator_client::models::MixNodeBondAnnotated;
use tracing::error;
use crate::{
db::{
models::{BondedStatusDto, MixnodeDto, MixnodeRecord},
DbPool,
},
http::models::{DailyStats, Mixnode},
};
pub(crate) async fn insert_mixnodes(
pool: &DbPool,
mixnodes: Vec<MixnodeRecord>,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
for record in mixnodes.iter() {
// https://www.sqlite.org/lang_upsert.html
sqlx::query!(
"INSERT INTO mixnodes
(mix_id, identity_key, bonded, total_stake,
host, http_api_port, blacklisted, full_details,
self_described, last_updated_utc, is_dp_delegatee)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(mix_id) DO UPDATE SET
bonded=excluded.bonded,
total_stake=excluded.total_stake, host=excluded.host,
http_api_port=excluded.http_api_port,blacklisted=excluded.blacklisted,
full_details=excluded.full_details,self_described=excluded.self_described,
last_updated_utc=excluded.last_updated_utc,
is_dp_delegatee = excluded.is_dp_delegatee;",
record.mix_id,
record.identity_key,
record.bonded,
record.total_stake,
record.host,
record.http_port,
record.blacklisted,
record.full_details,
record.self_described,
record.last_updated_utc,
record.is_dp_delegatee
)
.execute(&mut *conn)
.await?;
}
Ok(())
}
pub(crate) async fn get_all_mixnodes(pool: &DbPool) -> anyhow::Result<Vec<Mixnode>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
MixnodeDto,
r#"SELECT
mn.mix_id as "mix_id!",
mn.bonded as "bonded: bool",
mn.blacklisted as "blacklisted: bool",
mn.is_dp_delegatee as "is_dp_delegatee: bool",
mn.total_stake as "total_stake!",
mn.full_details as "full_details!",
mn.self_described as "self_described",
mn.last_updated_utc as "last_updated_utc!",
COALESCE(md.moniker, "NA") as "moniker!",
COALESCE(md.website, "NA") as "website!",
COALESCE(md.security_contact, "NA") as "security_contact!",
COALESCE(md.details, "NA") as "details!"
FROM mixnodes mn
LEFT JOIN mixnode_description md ON mn.mix_id = md.mix_id
ORDER BY mn.mix_id"#
)
.fetch(&mut conn)
.try_collect::<Vec<_>>()
.await?;
let items = items
.into_iter()
.map(|item| item.try_into())
.collect::<anyhow::Result<Vec<_>>>()
.map_err(|e| {
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
e
})?;
Ok(items)
}
/// We fetch the latest 30 days of data as a subquery and then
/// return it in ascending order, so we don't break existing UI
pub(crate) async fn get_daily_stats(pool: &DbPool) -> anyhow::Result<Vec<DailyStats>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
DailyStats,
r#"
SELECT
date_utc as "date_utc!",
packets_received as "total_packets_received!: i64",
packets_sent as "total_packets_sent!: i64",
packets_dropped as "total_packets_dropped!: i64",
total_stake as "total_stake!: i64"
FROM (
SELECT
date_utc,
SUM(packets_received) as packets_received,
SUM(packets_sent) as packets_sent,
SUM(packets_dropped) as packets_dropped,
SUM(total_stake) as total_stake
FROM mixnode_daily_stats
GROUP BY date_utc
ORDER BY date_utc DESC
LIMIT 30
)
GROUP BY date_utc
ORDER BY date_utc
"#
)
.fetch(&mut conn)
.try_collect::<Vec<DailyStats>>()
.await?;
Ok(items)
}
/// Ensure all mixnodes that are set as bonded, are still bonded
pub(crate) async fn ensure_mixnodes_still_bonded(
pool: &DbPool,
mixnodes: &[MixNodeBondAnnotated],
) -> anyhow::Result<usize> {
let bonded_mixnodes_rows = get_all_bonded_mixnodes_row_ids_by_status(pool, true).await?;
let unbonded_mixnodes_rows = bonded_mixnodes_rows.iter().filter(|v| {
!mixnodes
.iter()
.any(|bonded| *bonded.mixnode_details.bond_information.identity() == v.identity_key)
});
let recently_unbonded_mixnodes = unbonded_mixnodes_rows.to_owned().count();
let last_updated_utc = chrono::offset::Utc::now().timestamp();
let mut transaction = pool.begin().await?;
for row in unbonded_mixnodes_rows {
sqlx::query!(
"UPDATE mixnodes
SET bonded = ?, last_updated_utc = ?
WHERE id = ?;",
false,
last_updated_utc,
row.id,
)
.execute(&mut *transaction)
.await?;
}
transaction.commit().await?;
Ok(recently_unbonded_mixnodes)
}
async fn get_all_bonded_mixnodes_row_ids_by_status(
pool: &DbPool,
status: bool,
) -> anyhow::Result<Vec<BondedStatusDto>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
BondedStatusDto,
r#"SELECT
id as "id!",
identity_key as "identity_key!",
bonded as "bonded: bool"
FROM mixnodes
WHERE bonded = ?"#,
status,
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?;
Ok(items)
}
+14
View File
@@ -0,0 +1,14 @@
mod gateways;
mod misc;
mod mixnodes;
mod summary;
pub(crate) use gateways::{
ensure_gateways_still_bonded, get_all_gateways, insert_gateways,
write_blacklisted_gateways_to_db,
};
pub(crate) use misc::insert_summaries;
pub(crate) use mixnodes::{
ensure_mixnodes_still_bonded, get_all_mixnodes, get_daily_stats, insert_mixnodes,
};
pub(crate) use summary::{get_summary, get_summary_history};
@@ -0,0 +1,209 @@
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use std::collections::HashMap;
use tracing::error;
use crate::{
db::{
models::{
gateway::{
GatewaySummary, GatewaySummaryBlacklisted, GatewaySummaryBonded,
GatewaySummaryExplorer, GatewaySummaryHistorical,
},
mixnode::{
MixnodeSummary, MixnodeSummaryBlacklisted, MixnodeSummaryBonded,
MixnodeSummaryHistorical,
},
NetworkSummary, SummaryDto, SummaryHistoryDto,
},
DbPool,
},
http::{
error::{HttpError, HttpResult},
models::SummaryHistory,
},
};
pub(crate) async fn get_summary_history(pool: &DbPool) -> anyhow::Result<Vec<SummaryHistory>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
SummaryHistoryDto,
r#"SELECT
id as "id!",
date as "date!",
timestamp_utc as "timestamp_utc!",
value_json as "value_json!"
FROM summary_history
ORDER BY date DESC
LIMIT 30"#,
)
.fetch(&mut conn)
.try_collect::<Vec<_>>()
.await?;
let items = items
.into_iter()
.map(|item| item.try_into())
.collect::<anyhow::Result<Vec<_>>>()
.map_err(|e| {
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
e
})?;
Ok(items)
}
async fn get_summary_dto(pool: &DbPool) -> anyhow::Result<Vec<SummaryDto>> {
let mut conn = pool.acquire().await?;
Ok(sqlx::query_as!(
SummaryDto,
r#"SELECT
key as "key!",
value_json as "value_json!",
last_updated_utc as "last_updated_utc!"
FROM summary"#
)
.fetch(&mut conn)
.try_collect::<Vec<_>>()
.await?)
}
pub(crate) async fn get_summary(pool: &DbPool) -> HttpResult<NetworkSummary> {
let items = get_summary_dto(pool).await.map_err(|err| {
tracing::error!("Couldn't get Summary from DB: {err}");
HttpError::internal()
})?;
from_summary_dto(items).await
}
async fn from_summary_dto(items: Vec<SummaryDto>) -> HttpResult<NetworkSummary> {
const MIXNODES_BONDED_COUNT: &str = "mixnodes.bonded.count";
const MIXNODES_BONDED_ACTIVE: &str = "mixnodes.bonded.active";
const MIXNODES_BONDED_INACTIVE: &str = "mixnodes.bonded.inactive";
const MIXNODES_BONDED_RESERVE: &str = "mixnodes.bonded.reserve";
const MIXNODES_BLACKLISTED_COUNT: &str = "mixnodes.blacklisted.count";
const GATEWAYS_BONDED_COUNT: &str = "gateways.bonded.count";
const GATEWAYS_EXPLORER_COUNT: &str = "gateways.explorer.count";
const GATEWAYS_BLACKLISTED_COUNT: &str = "gateways.blacklisted.count";
const MIXNODES_HISTORICAL_COUNT: &str = "mixnodes.historical.count";
const GATEWAYS_HISTORICAL_COUNT: &str = "gateways.historical.count";
// convert database rows into a map by key
let mut map = HashMap::new();
for item in items {
map.insert(item.key.clone(), item);
}
// check we have all the keys we are expecting, and build up a map of errors for missing one
let keys = [
GATEWAYS_BONDED_COUNT,
GATEWAYS_EXPLORER_COUNT,
GATEWAYS_HISTORICAL_COUNT,
GATEWAYS_BLACKLISTED_COUNT,
MIXNODES_BLACKLISTED_COUNT,
MIXNODES_BONDED_ACTIVE,
MIXNODES_BONDED_COUNT,
MIXNODES_BONDED_INACTIVE,
MIXNODES_BONDED_RESERVE,
MIXNODES_HISTORICAL_COUNT,
];
let mut errors: Vec<&str> = vec![];
for key in keys {
if !map.contains_key(key) {
errors.push(key);
}
}
// return an error if anything is missing, with a nice list
if !errors.is_empty() {
tracing::error!("Summary value missing: {}", errors.join(", "));
return Err(HttpError::internal());
}
// strip the options and use default values (anything missing is trapped above)
let mixnodes_bonded_count: SummaryDto =
map.get(MIXNODES_BONDED_COUNT).cloned().unwrap_or_default();
let mixnodes_bonded_active: SummaryDto =
map.get(MIXNODES_BONDED_ACTIVE).cloned().unwrap_or_default();
let mixnodes_bonded_inactive: SummaryDto = map
.get(MIXNODES_BONDED_INACTIVE)
.cloned()
.unwrap_or_default();
let mixnodes_bonded_reserve: SummaryDto = map
.get(MIXNODES_BONDED_RESERVE)
.cloned()
.unwrap_or_default();
let mixnodes_blacklisted_count: SummaryDto = map
.get(MIXNODES_BLACKLISTED_COUNT)
.cloned()
.unwrap_or_default();
let gateways_bonded_count: SummaryDto =
map.get(GATEWAYS_BONDED_COUNT).cloned().unwrap_or_default();
let gateways_explorer_count: SummaryDto = map
.get(GATEWAYS_EXPLORER_COUNT)
.cloned()
.unwrap_or_default();
let mixnodes_historical_count: SummaryDto = map
.get(MIXNODES_HISTORICAL_COUNT)
.cloned()
.unwrap_or_default();
let gateways_historical_count: SummaryDto = map
.get(GATEWAYS_HISTORICAL_COUNT)
.cloned()
.unwrap_or_default();
let gateways_blacklisted_count: SummaryDto = map
.get(GATEWAYS_BLACKLISTED_COUNT)
.cloned()
.unwrap_or_default();
Ok(NetworkSummary {
mixnodes: MixnodeSummary {
bonded: MixnodeSummaryBonded {
count: to_count_i32(&mixnodes_bonded_count),
active: to_count_i32(&mixnodes_bonded_active),
reserve: to_count_i32(&mixnodes_bonded_reserve),
inactive: to_count_i32(&mixnodes_bonded_inactive),
last_updated_utc: to_timestamp(&mixnodes_bonded_count),
},
blacklisted: MixnodeSummaryBlacklisted {
count: to_count_i32(&mixnodes_blacklisted_count),
last_updated_utc: to_timestamp(&mixnodes_blacklisted_count),
},
historical: MixnodeSummaryHistorical {
count: to_count_i32(&mixnodes_historical_count),
last_updated_utc: to_timestamp(&mixnodes_historical_count),
},
},
gateways: GatewaySummary {
bonded: GatewaySummaryBonded {
count: to_count_i32(&gateways_bonded_count),
last_updated_utc: to_timestamp(&gateways_bonded_count),
},
blacklisted: GatewaySummaryBlacklisted {
count: to_count_i32(&gateways_blacklisted_count),
last_updated_utc: to_timestamp(&gateways_blacklisted_count),
},
historical: GatewaySummaryHistorical {
count: to_count_i32(&gateways_historical_count),
last_updated_utc: to_timestamp(&gateways_historical_count),
},
explorer: GatewaySummaryExplorer {
count: to_count_i32(&gateways_explorer_count),
last_updated_utc: to_timestamp(&gateways_explorer_count),
},
},
})
}
fn to_count_i32(value: &SummaryDto) -> i32 {
value.value_json.parse::<i32>().unwrap_or_default()
}
fn to_timestamp(value: &SummaryDto) -> String {
timestamp_as_utc(value.last_updated_utc as u64).to_rfc3339()
}
fn timestamp_as_utc(unix_timestamp: u64) -> DateTime<Utc> {
let d = std::time::UNIX_EPOCH + std::time::Duration::from_secs(unix_timestamp);
d.into()
}
@@ -0,0 +1,110 @@
use axum::{
extract::{Path, Query, State},
Json, Router,
};
use serde::Deserialize;
use utoipa::IntoParams;
use crate::http::{
error::{HttpError, HttpResult},
models::{Gateway, GatewaySkinny},
state::AppState,
PagedResult, Pagination,
};
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(gateways))
.route("/skinny", axum::routing::get(gateways_skinny))
.route("/skinny/:identity_key", axum::routing::get(get_gateway))
}
#[utoipa::path(
tag = "Gateways",
get,
params(
Pagination
),
path = "/v2/gateways",
responses(
(status = 200, body = PagedGateway)
)
)]
async fn gateways(
Query(pagination): Query<Pagination>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<Gateway>>> {
let db = state.db_pool();
let res = state.cache().get_gateway_list(db).await;
Ok(Json(PagedResult::paginate(pagination, res)))
}
#[utoipa::path(
tag = "Gateways",
get,
params(
Pagination
),
path = "/v2/gateways/skinny",
responses(
(status = 200, body = PagedGatewaySkinny)
)
)]
async fn gateways_skinny(
Query(pagination): Query<Pagination>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<GatewaySkinny>>> {
let db = state.db_pool();
let res = state.cache().get_gateway_list(db).await;
let res: Vec<GatewaySkinny> = res
.iter()
.filter(|g| g.bonded)
.map(|g| GatewaySkinny {
gateway_identity_key: g.gateway_identity_key.clone(),
self_described: g.self_described.clone(),
performance: g.performance,
explorer_pretty_bond: g.explorer_pretty_bond.clone(),
last_probe_result: g.last_probe_result.clone(),
last_testrun_utc: g.last_testrun_utc.clone(),
last_updated_utc: g.last_updated_utc.clone(),
routing_score: g.routing_score,
config_score: g.config_score,
})
.collect();
Ok(Json(PagedResult::paginate(pagination, res)))
}
#[derive(Deserialize, IntoParams)]
#[into_params(parameter_in = Path)]
struct IdentityKeyParam {
identity_key: String,
}
#[utoipa::path(
tag = "Gateways",
get,
params(
IdentityKeyParam
),
path = "/v2/gateways/{identity_key}",
responses(
(status = 200, body = Gateway)
)
)]
async fn get_gateway(
Path(IdentityKeyParam { identity_key }): Path<IdentityKeyParam>,
State(state): State<AppState>,
) -> HttpResult<Json<Gateway>> {
let db = state.db_pool();
let res = state.cache().get_gateway_list(db).await;
match res
.iter()
.find(|item| item.gateway_identity_key == identity_key)
{
Some(res) => Ok(Json(res.clone())),
None => Err(HttpError::invalid_input(identity_key)),
}
}
@@ -0,0 +1,91 @@
use axum::{
extract::{Path, Query, State},
Json, Router,
};
use serde::Deserialize;
use tracing::instrument;
use utoipa::IntoParams;
use crate::http::{
error::{HttpError, HttpResult},
models::{DailyStats, Mixnode},
state::AppState,
PagedResult, Pagination,
};
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(mixnodes))
.route("/:mix_id", axum::routing::get(get_mixnodes))
.route("/stats", axum::routing::get(get_stats))
}
#[utoipa::path(
tag = "Mixnodes",
get,
params(
Pagination
),
path = "/v2/mixnodes",
responses(
(status = 200, body = PagedMixnode)
)
)]
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(page=pagination.page, size=pagination.size))]
async fn mixnodes(
Query(pagination): Query<Pagination>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<Mixnode>>> {
let db = state.db_pool();
let res = state.cache().get_mixnodes_list(db).await;
Ok(Json(PagedResult::paginate(pagination, res)))
}
#[derive(Deserialize, IntoParams)]
#[into_params(parameter_in = Path)]
struct MixIdParam {
mix_id: String,
}
#[utoipa::path(
tag = "Mixnodes",
get,
params(
MixIdParam
),
path = "/v2/mixnodes/{mix_id}",
responses(
(status = 200, body = Mixnode)
)
)]
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(mix_id = mix_id))]
async fn get_mixnodes(
Path(MixIdParam { mix_id }): Path<MixIdParam>,
State(state): State<AppState>,
) -> HttpResult<Json<Mixnode>> {
match mix_id.parse::<u32>() {
Ok(parsed_mix_id) => {
let res = state.cache().get_mixnodes_list(state.db_pool()).await;
match res.iter().find(|item| item.mix_id == parsed_mix_id) {
Some(res) => Ok(Json(res.clone())),
None => Err(HttpError::invalid_input(mix_id)),
}
}
Err(_e) => Err(HttpError::invalid_input(mix_id)),
}
}
#[utoipa::path(
tag = "Mixnodes",
get,
path = "/v2/mixnodes/stats",
responses(
(status = 200, body = Vec<DailyStats>)
)
)]
async fn get_stats(State(state): State<AppState>) -> HttpResult<Json<Vec<DailyStats>>> {
let stats = state.cache().get_mixnode_stats(state.db_pool()).await;
Ok(Json(stats))
}
+85
View File
@@ -0,0 +1,85 @@
use anyhow::anyhow;
use axum::{response::Redirect, Router};
use tokio::net::ToSocketAddrs;
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::http::{server::HttpServer, state::AppState};
pub(crate) mod gateways;
pub(crate) mod mixnodes;
pub(crate) mod services;
pub(crate) mod summary;
pub(crate) mod testruns;
pub(crate) struct RouterBuilder {
unfinished_router: Router<AppState>,
}
impl RouterBuilder {
pub(crate) fn with_default_routes() -> Self {
let router = Router::new()
.merge(
SwaggerUi::new("/swagger")
.url("/api-docs/openapi.json", super::api_docs::ApiDoc::openapi()),
)
.route(
"/",
axum::routing::get(|| async { Redirect::permanent("/swagger") }),
)
.nest(
"/v2",
Router::new()
.nest("/gateways", gateways::routes())
.nest("/mixnodes", mixnodes::routes())
.nest("/services", services::routes())
.nest("/summary", summary::routes()),
// .nest("/testruns", testruns::_routes()),
);
Self {
unfinished_router: router,
}
}
pub(crate) fn with_state(self, state: AppState) -> RouterWithState {
RouterWithState {
router: self.finalize_routes().with_state(state),
}
}
fn finalize_routes(self) -> Router<AppState> {
// layers added later wrap earlier layers
self.unfinished_router
// CORS layer needs to wrap other API layers
.layer(setup_cors())
// logger should be outermost layer
.layer(TraceLayer::new_for_http())
}
}
pub(crate) struct RouterWithState {
router: Router,
}
impl RouterWithState {
pub(crate) async fn build_server<A: ToSocketAddrs>(
self,
bind_address: A,
) -> anyhow::Result<HttpServer> {
tokio::net::TcpListener::bind(bind_address)
.await
.map(|listener| HttpServer::new(self.router, listener))
.map_err(|err| anyhow!("Couldn't bind to address due to {}", err))
}
}
fn setup_cors() -> CorsLayer {
use axum::http::Method;
CorsLayer::new()
.allow_origin(tower_http::cors::Any)
.allow_methods([Method::POST, Method::GET, Method::PATCH, Method::OPTIONS])
.allow_headers(tower_http::cors::Any)
.allow_credentials(false)
}
@@ -0,0 +1,58 @@
use serde_json_path::JsonPath;
use crate::http::models::Gateway;
pub(super) struct ParseJsonPaths {
pub(super) path_ip_address: JsonPath,
pub(super) path_hostname: JsonPath,
pub(super) path_service_provider_client_id: JsonPath,
}
impl ParseJsonPaths {
pub fn new() -> Result<Self, serde_json_path::ParseError> {
Ok(ParseJsonPaths {
path_ip_address: JsonPath::parse("$.host_information.ip_address[0]")?,
path_hostname: JsonPath::parse("$.host_information.hostname")?,
path_service_provider_client_id: JsonPath::parse("$.network_requester.address")?,
})
}
}
pub(super) struct ParsedDetails {
pub(super) ip_address: Option<String>,
pub(super) hostname: Option<String>,
pub(super) service_provider_client_id: Option<String>,
}
impl ParsedDetails {
fn get_string_from_json_path(
value: &Option<serde_json::Value>,
path: &JsonPath,
) -> Option<String> {
match value {
Some(value) => path
.query(value)
.exactly_one()
.map(|v2| v2.as_str().map(|v3| v3.to_string()))
.ok()
.flatten(),
None => None,
}
}
pub fn new(paths: &ParseJsonPaths, g: &Gateway) -> ParsedDetails {
ParsedDetails {
hostname: ParsedDetails::get_string_from_json_path(
&g.self_described,
&paths.path_hostname,
),
ip_address: ParsedDetails::get_string_from_json_path(
&g.self_described,
&paths.path_ip_address,
),
service_provider_client_id: ParsedDetails::get_string_from_json_path(
&g.self_described,
&paths.path_service_provider_client_id,
),
}
}
}
@@ -0,0 +1,134 @@
use crate::http::{
error::{HttpError, HttpResult},
models::Service,
state::AppState,
PagedResult, Pagination,
};
use axum::{
extract::{Query, State},
Json, Router,
};
use json_path::{ParseJsonPaths, ParsedDetails};
use tracing::instrument;
mod json_path;
pub(crate) fn routes() -> Router<AppState> {
Router::new().route("/", axum::routing::get(mixnodes))
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::IntoParams)]
#[into_params(parameter_in = Query)]
pub(crate) struct ServicesQueryParams {
size: Option<usize>,
page: Option<usize>,
wss: Option<bool>,
hostname: Option<bool>,
entry: Option<bool>,
}
#[utoipa::path(
tag = "Services",
get,
params(
ServicesQueryParams,
),
path = "/v2/services",
responses(
(status = 200, body = PagedService)
)
)]
#[instrument(level = tracing::Level::DEBUG, skip(state))]
async fn mixnodes(
Query(ServicesQueryParams {
size,
page,
wss,
hostname,
entry,
}): Query<ServicesQueryParams>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<Service>>> {
let db = state.db_pool();
let cache = state.cache();
let show_only_wss = wss.unwrap_or(false);
let show_only_with_hostname = hostname.unwrap_or(false);
let show_entry_gateways_only = entry.unwrap_or(false);
let paths = ParseJsonPaths::new().map_err(|e| {
tracing::error!("Invalidly configured ParseJsonPaths: {e}");
HttpError::internal()
})?;
let res = cache.get_gateway_list(db).await;
let res: Vec<Service> = res
.iter()
.map(|g| {
let details = ParsedDetails::new(&paths, g);
let s = Service {
gateway_identity_key: g.gateway_identity_key.clone(),
ip_address: details.ip_address,
service_provider_client_id: details.service_provider_client_id,
hostname: details.hostname,
last_successful_ping_utc: g.last_testrun_utc.clone(),
last_updated_utc: g.last_updated_utc.clone(),
// routing_score: g.routing_score,
routing_score: 1f32,
mixnet_websockets: g
.self_described
.clone()
.and_then(|s| s.get("mixnet_websockets").cloned()),
};
let f = ServiceFilter::new(&s);
(s, f)
})
.filter(|(_, f)| {
let mut keep = f.has_network_requester_sp;
if show_entry_gateways_only {
keep = true;
}
if show_only_wss {
keep &= f.has_wss;
}
if show_only_with_hostname {
keep &= f.has_hostname;
}
keep
})
.map(|(s, _)| s)
.collect();
Ok(Json(PagedResult::paginate(Pagination { size, page }, res)))
}
struct ServiceFilter {
has_wss: bool,
has_network_requester_sp: bool,
has_hostname: bool,
}
impl ServiceFilter {
fn new(s: &Service) -> Self {
let has_wss = match &s.mixnet_websockets {
Some(v) => v.get("wss_port").map(|v2| !v2.is_null()).unwrap_or(false),
None => false,
};
let has_hostname = s.hostname.is_some();
let has_network_requester_sp = match &s.service_provider_client_id {
Some(v) => !v.is_empty(),
None => false,
};
ServiceFilter {
has_wss,
has_hostname,
has_network_requester_sp,
}
}
}
@@ -0,0 +1,43 @@
use axum::{extract::State, Json, Router};
use tracing::instrument;
use crate::{
db::models::NetworkSummary,
http::{error::HttpResult, models::SummaryHistory, state::AppState},
};
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(summary))
.route("/history", axum::routing::get(summary_history))
}
#[utoipa::path(
tag = "Summary",
get,
path = "/v2/summary",
responses(
(status = 200, body = NetworkSummary)
)
)]
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn summary(State(state): State<AppState>) -> HttpResult<Json<NetworkSummary>> {
crate::db::queries::get_summary(state.db_pool())
.await
.map(Json)
}
#[utoipa::path(
tag = "Summary",
get,
path = "/v2/summary/history",
responses(
(status = 200, body = Vec<SummaryHistory>)
)
)]
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn summary_history(State(state): State<AppState>) -> HttpResult<Json<Vec<SummaryHistory>>> {
Ok(Json(
state.cache().get_summary_history(state.db_pool()).await,
))
}
@@ -0,0 +1,7 @@
use axum::Router;
use crate::http::state::AppState;
pub(crate) fn _routes() -> Router<AppState> {
unimplemented!()
}
+15
View File
@@ -0,0 +1,15 @@
use crate::http::{Gateway, GatewaySkinny, Mixnode, Service};
use utoipa::OpenApi;
use utoipauto::utoipauto;
// manually import external structs which are behind feature flags because they
// can't be automatically discovered
// https://github.com/ProbablyClem/utoipauto/issues/13#issuecomment-1974911829
#[utoipauto(paths = "./nym-node-status-api/src")]
#[derive(OpenApi)]
#[openapi(
info(title = "Nym API"),
tags(),
components(schemas(nym_node_requests::api::v1::node::models::NodeDescription,))
)]
pub(super) struct ApiDoc;
+28
View File
@@ -0,0 +1,28 @@
pub(crate) type HttpResult<T> = Result<T, HttpError>;
pub(crate) struct HttpError {
message: String,
status: axum::http::StatusCode,
}
impl HttpError {
pub(crate) fn invalid_input(message: String) -> Self {
Self {
message,
status: axum::http::StatusCode::BAD_REQUEST,
}
}
pub(crate) fn internal() -> Self {
Self {
message: serde_json::json!({"message": "Internal server error"}).to_string(),
status: axum::http::StatusCode::INTERNAL_SERVER_ERROR,
}
}
}
impl axum::response::IntoResponse for HttpError {
fn into_response(self) -> axum::response::Response {
(self.status, self.message).into_response()
}
}
+71
View File
@@ -0,0 +1,71 @@
use models::{Gateway, GatewaySkinny, Mixnode, Service};
pub(crate) mod api;
pub(crate) mod api_docs;
pub(crate) mod error;
pub(crate) mod models;
pub(crate) mod server;
pub(crate) mod state;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
// exclude generic from auto-discovery
#[utoipauto::utoipa_ignore]
// https://docs.rs/utoipa/latest/utoipa/derive.ToSchema.html#generic-schemas-with-aliases
// Generic structs can only be included via aliases, not directly, because they
// it would cause an error in generated Swagger docs.
// Instead, you have to manually monomorphize each generic struct that
// you wish to document
#[aliases(
PagedGateway = PagedResult<Gateway>,
PagedGatewaySkinny = PagedResult<GatewaySkinny>,
PagedMixnode = PagedResult<Mixnode>,
PagedService = PagedResult<Service>,
)]
pub struct PagedResult<T> {
pub page: usize,
pub size: usize,
pub total: usize,
pub items: Vec<T>,
}
impl<T: Clone> PagedResult<T> {
pub fn paginate(pagination: Pagination, res: Vec<T>) -> Self {
let total = res.len();
let (size, mut page) = pagination.intoto_inner_values();
if page * size > total {
page = total / size;
}
let chunks: Vec<&[T]> = res.chunks(size).collect();
PagedResult {
page,
size,
total,
items: chunks.get(page).cloned().unwrap_or_default().into(),
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::IntoParams)]
#[into_params(parameter_in = Query)]
pub(crate) struct Pagination {
size: Option<usize>,
page: Option<usize>,
}
impl Pagination {
// unwrap stored values or use predefined defaults
pub(crate) fn intoto_inner_values(self) -> (usize, usize) {
const SIZE_DEFAULT: usize = 10;
const SIZE_MAX: usize = 200;
const PAGE_DEFAULT: usize = 0;
(
self.size.unwrap_or(SIZE_DEFAULT).min(SIZE_MAX),
self.page.unwrap_or(PAGE_DEFAULT),
)
}
}
+74
View File
@@ -0,0 +1,74 @@
use nym_node_requests::api::v1::node::models::NodeDescription;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct Gateway {
pub gateway_identity_key: String,
pub bonded: bool,
pub blacklisted: bool,
pub performance: u8,
pub self_described: Option<serde_json::Value>,
pub explorer_pretty_bond: Option<serde_json::Value>,
pub description: NodeDescription,
pub last_probe_result: Option<serde_json::Value>,
pub last_probe_log: Option<String>,
pub last_testrun_utc: Option<String>,
pub last_updated_utc: String,
pub routing_score: f32,
pub config_score: u32,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct GatewaySkinny {
pub gateway_identity_key: String,
pub self_described: Option<serde_json::Value>,
pub explorer_pretty_bond: Option<serde_json::Value>,
pub last_probe_result: Option<serde_json::Value>,
pub last_testrun_utc: Option<String>,
pub last_updated_utc: String,
pub routing_score: f32,
pub config_score: u32,
pub performance: u8,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct Mixnode {
pub mix_id: u32,
pub bonded: bool,
pub blacklisted: bool,
pub is_dp_delegatee: bool,
pub total_stake: i64,
pub full_details: Option<serde_json::Value>,
pub self_described: Option<serde_json::Value>,
pub description: NodeDescription,
pub last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct DailyStats {
pub date_utc: String,
pub total_packets_received: i64,
pub total_packets_sent: i64,
pub total_packets_dropped: i64,
pub total_stake: i64,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct Service {
pub gateway_identity_key: String,
pub last_updated_utc: String,
pub routing_score: f32,
pub service_provider_client_id: Option<String>,
pub ip_address: Option<String>,
pub hostname: Option<String>,
pub mixnet_websockets: Option<serde_json::Value>,
pub last_successful_ping_utc: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct SummaryHistory {
pub date: String,
pub value_json: serde_json::Value,
pub timestamp_utc: String,
}
+92
View File
@@ -0,0 +1,92 @@
use axum::Router;
use core::net::SocketAddr;
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
use crate::{
db::DbPool,
http::{api::RouterBuilder, state::AppState},
};
/// Return handles that allow for graceful shutdown of server + awaiting its
/// background tokio task
pub(crate) async fn start_http_api(
db_pool: DbPool,
http_port: u16,
nym_http_cache_ttl: u64,
) -> anyhow::Result<ShutdownHandles> {
let router_builder = RouterBuilder::with_default_routes();
let state = AppState::new(db_pool, nym_http_cache_ttl);
let router = router_builder.with_state(state);
// TODO dz do we need this to be configurable?
let bind_addr = format!("0.0.0.0:{}", http_port);
let server = router.build_server(bind_addr).await?;
Ok(start_server(server))
}
fn start_server(server: HttpServer) -> ShutdownHandles {
// one copy is stored to trigger a graceful shutdown later
let shutdown_button = CancellationToken::new();
// other copy is given to server to listen for a shutdown
let shutdown_receiver = shutdown_button.clone();
let shutdown_receiver = shutdown_receiver.cancelled_owned();
let server_handle = tokio::spawn(async move { server.run(shutdown_receiver).await });
ShutdownHandles {
server_handle,
shutdown_button,
}
}
pub(crate) struct ShutdownHandles {
server_handle: JoinHandle<std::io::Result<()>>,
shutdown_button: CancellationToken,
}
impl ShutdownHandles {
/// Send graceful shutdown signal to server and wait for server task to complete
pub(crate) async fn shutdown(self) -> anyhow::Result<()> {
self.shutdown_button.cancel();
match self.server_handle.await {
Ok(Ok(_)) => {
tracing::info!("HTTP server shut down without errors");
}
Ok(Err(err)) => {
tracing::error!("HTTP server terminated with: {err}");
anyhow::bail!(err)
}
Err(err) => {
tracing::error!("Server task panicked: {err}");
}
};
Ok(())
}
}
pub(crate) struct HttpServer {
router: Router,
listener: TcpListener,
}
impl HttpServer {
pub(crate) fn new(router: Router, listener: TcpListener) -> Self {
Self { router, listener }
}
pub(crate) async fn run(self, receiver: WaitForCancellationFutureOwned) -> std::io::Result<()> {
// into_make_service_with_connect_info allows us to see client ip address
axum::serve(
self.listener,
self.router
.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(receiver)
.await
}
}
+223
View File
@@ -0,0 +1,223 @@
use std::{sync::Arc, time::Duration};
use moka::{future::Cache, Entry};
use tokio::sync::RwLock;
use crate::{
db::DbPool,
http::models::{DailyStats, Gateway, Mixnode, SummaryHistory},
};
#[derive(Debug, Clone)]
pub(crate) struct AppState {
db_pool: DbPool,
cache: HttpCache,
}
impl AppState {
pub(crate) fn new(db_pool: DbPool, cache_ttl: u64) -> Self {
Self {
db_pool,
cache: HttpCache::new(cache_ttl),
}
}
pub(crate) fn db_pool(&self) -> &DbPool {
&self.db_pool
}
pub(crate) fn cache(&self) -> &HttpCache {
&self.cache
}
}
static GATEWAYS_LIST_KEY: &str = "gateways";
static MIXNODES_LIST_KEY: &str = "mixnodes";
static MIXSTATS_LIST_KEY: &str = "mixstats";
static SUMMARY_HISTORY_LIST_KEY: &str = "summary-history";
#[derive(Debug, Clone)]
pub(crate) struct HttpCache {
gateways: Cache<String, Arc<RwLock<Vec<Gateway>>>>,
mixnodes: Cache<String, Arc<RwLock<Vec<Mixnode>>>>,
mixstats: Cache<String, Arc<RwLock<Vec<DailyStats>>>>,
history: Cache<String, Arc<RwLock<Vec<SummaryHistory>>>>,
}
impl HttpCache {
pub fn new(ttl_seconds: u64) -> Self {
HttpCache {
gateways: Cache::builder()
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
mixnodes: Cache::builder()
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
mixstats: Cache::builder()
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
history: Cache::builder()
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
}
}
pub async fn upsert_gateway_list(
&self,
new_gateway_list: Vec<Gateway>,
) -> Entry<String, Arc<RwLock<Vec<Gateway>>>> {
self.gateways
.entry_by_ref(GATEWAYS_LIST_KEY)
.and_upsert_with(|maybe_entry| async {
if let Some(entry) = maybe_entry {
let v = entry.into_value();
let mut guard = v.write().await;
*guard = new_gateway_list;
v.clone()
} else {
Arc::new(RwLock::new(new_gateway_list))
}
})
.await
}
pub async fn get_gateway_list(&self, db: &DbPool) -> Vec<Gateway> {
match self.gateways.get(GATEWAYS_LIST_KEY).await {
Some(guard) => {
let read_lock = guard.read().await;
read_lock.clone()
}
None => {
// the key is missing so populate it
tracing::warn!("No gateways in cache, refreshing cache from DB...");
let gateways = crate::db::queries::get_all_gateways(db)
.await
.unwrap_or_default();
self.upsert_gateway_list(gateways.clone()).await;
if gateways.is_empty() {
tracing::warn!("Database contains 0 gateways");
}
gateways
}
}
}
pub async fn upsert_mixnode_list(
&self,
new_mixnode_list: Vec<Mixnode>,
) -> Entry<String, Arc<RwLock<Vec<Mixnode>>>> {
self.mixnodes
.entry_by_ref(MIXNODES_LIST_KEY)
.and_upsert_with(|maybe_entry| async {
if let Some(entry) = maybe_entry {
let v = entry.into_value();
let mut guard = v.write().await;
*guard = new_mixnode_list;
v.clone()
} else {
Arc::new(RwLock::new(new_mixnode_list))
}
})
.await
}
pub async fn get_mixnodes_list(&self, db: &DbPool) -> Vec<Mixnode> {
match self.mixnodes.get(MIXNODES_LIST_KEY).await {
Some(guard) => {
let read_lock = guard.read().await;
read_lock.clone()
}
None => {
tracing::warn!("No mixnodes in cache, refreshing cache from DB...");
let mixnodes = crate::db::queries::get_all_mixnodes(db)
.await
.unwrap_or_default();
self.upsert_mixnode_list(mixnodes.clone()).await;
if mixnodes.is_empty() {
tracing::warn!("Database contains 0 mixnodes");
}
mixnodes
}
}
}
pub async fn upsert_mixnode_stats(
&self,
mixnode_stats: Vec<DailyStats>,
) -> Entry<String, Arc<RwLock<Vec<DailyStats>>>> {
self.mixstats
.entry_by_ref(MIXSTATS_LIST_KEY)
.and_upsert_with(|maybe_entry| async {
if let Some(entry) = maybe_entry {
let v = entry.into_value();
let mut guard = v.write().await;
*guard = mixnode_stats;
v.clone()
} else {
Arc::new(RwLock::new(mixnode_stats))
}
})
.await
}
pub async fn get_mixnode_stats(&self, db: &DbPool) -> Vec<DailyStats> {
match self.mixstats.get(MIXSTATS_LIST_KEY).await {
Some(guard) => {
let read_lock = guard.read().await;
read_lock.to_vec()
}
None => {
let mixnode_stats = crate::db::queries::get_daily_stats(db)
.await
.unwrap_or_default();
self.upsert_mixnode_stats(mixnode_stats.clone()).await;
mixnode_stats
}
}
}
pub async fn get_summary_history(&self, db: &DbPool) -> Vec<SummaryHistory> {
match self.history.get(SUMMARY_HISTORY_LIST_KEY).await {
Some(guard) => {
let read_lock = guard.read().await;
read_lock.to_vec()
}
None => {
let summary_history = crate::db::queries::get_summary_history(db)
.await
.unwrap_or(vec![]);
self.upsert_summary_history(summary_history.clone()).await;
summary_history
}
}
}
pub async fn upsert_summary_history(
&self,
summary_history: Vec<SummaryHistory>,
) -> Entry<String, Arc<RwLock<Vec<SummaryHistory>>>> {
self.history
.entry_by_ref(SUMMARY_HISTORY_LIST_KEY)
.and_upsert_with(|maybe_entry| async {
if let Some(entry) = maybe_entry {
let v = entry.into_value();
let mut guard = v.write().await;
*guard = summary_history;
v.clone()
} else {
Arc::new(RwLock::new(summary_history))
}
})
.await
}
}
+48
View File
@@ -0,0 +1,48 @@
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{filter::Directive, EnvFilter};
pub(crate) fn setup_tracing_logger() {
fn directive_checked(directive: impl Into<String>) -> Directive {
directive
.into()
.parse()
.expect("Failed to parse log directive")
}
let log_builder = tracing_subscriber::fmt()
// Use a more compact, abbreviated log format
.compact()
// Display source code file paths
.with_file(true)
// Display source code line numbers
.with_line_number(true)
.with_thread_ids(true)
// Don't display the event's target (module path)
.with_target(false);
let mut filter = EnvFilter::builder()
// if RUST_LOG isn't set, set default level
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
// these crates are more granularly filtered
let filter_crates = [
"reqwest",
"rustls",
"hyper",
"sqlx",
"h2",
"tendermint_rpc",
"tower_http",
"axum",
];
for crate_name in filter_crates {
filter = filter.add_directive(directive_checked(format!("{}=warn", crate_name)));
}
filter = filter.add_directive(directive_checked("nym_bin_common=debug"));
filter = filter.add_directive(directive_checked("nym_explorer_client=debug"));
filter = filter.add_directive(directive_checked("nym_network_defaults=debug"));
filter = filter.add_directive(directive_checked("nym_validator_client=debug"));
log_builder.with_env_filter(filter).init();
}
+44
View File
@@ -0,0 +1,44 @@
use clap::Parser;
use nym_task::signal::wait_for_signal;
mod cli;
mod db;
mod http;
mod logging;
mod monitor;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::setup_tracing_logger();
let args = cli::Cli::parse();
let connection_url = args.connection_url.clone();
tracing::debug!("Using config:\n{:#?}", args);
let storage = db::Storage::init(connection_url).await?;
let db_pool = storage.pool_owned().await;
let args_clone = args.clone();
tokio::spawn(async move {
monitor::spawn_in_background(db_pool, args_clone).await;
});
tracing::info!("Started monitor task");
let shutdown_handles = http::server::start_http_api(
storage.pool_owned().await,
args.http_port,
args.nym_http_cache_ttl,
)
.await
.expect("Failed to start server");
tracing::info!("Started HTTP server on port {}", args.http_port);
wait_for_signal().await;
if let Err(err) = shutdown_handles.shutdown().await {
tracing::error!("{err}");
};
Ok(())
}
+449
View File
@@ -0,0 +1,449 @@
use crate::cli::Cli;
use crate::db::models::{
gateway, mixnode, GatewayRecord, MixnodeRecord, NetworkSummary, GATEWAYS_BLACKLISTED_COUNT,
GATEWAYS_BONDED_COUNT, GATEWAYS_EXPLORER_COUNT, GATEWAYS_HISTORICAL_COUNT,
MIXNODES_BLACKLISTED_COUNT, MIXNODES_BONDED_ACTIVE, MIXNODES_BONDED_COUNT,
MIXNODES_BONDED_INACTIVE, MIXNODES_BONDED_RESERVE, MIXNODES_HISTORICAL_COUNT,
};
use crate::db::{queries, DbPool};
use anyhow::anyhow;
use cosmwasm_std::Decimal;
use nym_explorer_client::{ExplorerClient, PrettyDetailedGatewayBond};
use nym_network_defaults::NymNetworkDetails;
use nym_validator_client::client::NymApiClientExt;
use nym_validator_client::models::{DescribedGateway, DescribedMixNode, MixNodeBondAnnotated};
use nym_validator_client::nym_nodes::SkimmedNode;
use nym_validator_client::nyxd::contract_traits::PagedMixnetQueryClient;
use nym_validator_client::nyxd::{AccountId, NyxdClient};
use nym_validator_client::NymApiClient;
use reqwest::Url;
use std::collections::HashSet;
use std::str::FromStr;
use tokio::task::JoinHandle;
use tokio::time::Duration;
const REFRESH_DELAY: Duration = Duration::from_secs(60 * 5);
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(15);
static DELEGATION_PROGRAM_WALLET: &str = "n1rnxpdpx3kldygsklfft0gech7fhfcux4zst5lw";
// TODO dz: query many NYM APIs:
// multiple instances running directory cache, ask sachin
pub(crate) async fn spawn_in_background(db_pool: DbPool, config: Cli) -> JoinHandle<()> {
let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env();
loop {
tracing::info!("Refreshing node info...");
if let Err(e) = run(&db_pool, &network_defaults, &config).await {
tracing::error!(
"Monitor run failed: {e}, retrying in {}s...",
FAILURE_RETRY_DELAY.as_secs()
);
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
} else {
tracing::info!(
"Info successfully collected, sleeping for {}s...",
REFRESH_DELAY.as_secs()
);
tokio::time::sleep(REFRESH_DELAY).await;
}
}
}
async fn run(
pool: &DbPool,
network_details: &NymNetworkDetails,
config: &Cli,
) -> anyhow::Result<()> {
let default_api_url = network_details
.endpoints
.first()
.expect("rust sdk mainnet default incorrectly configured")
.api_url()
.clone()
.expect("rust sdk mainnet default missing api_url");
let default_explorer_url = network_details.explorer_api.clone().map(|url| {
url.parse()
.expect("rust sdk mainnet default explorer url not parseable")
});
let default_explorer_url =
default_explorer_url.expect("explorer url missing in network config");
let explorer_client =
ExplorerClient::new_with_timeout(default_explorer_url, config.explorer_client_timeout)?;
let explorer_gateways = explorer_client
.get_gateways()
.await
.log_error("get_gateways")?;
tracing::debug!("6");
let api_client =
// TODO dz introduce timeout ?
NymApiClient::new(default_api_url);
let gateways = api_client
.get_cached_described_gateways()
.await
.log_error("get_described_gateways")?;
tracing::debug!("Fetched {} gateways", gateways.len());
let skimmed_gateways = api_client
.get_basic_gateways(None)
.await
.log_error("get_basic_gateways")?;
let mixnodes = api_client
.get_cached_mixnodes()
.await
.log_error("get_cached_mixnodes")?;
tracing::debug!("Fetched {} mixnodes", mixnodes.len());
// TODO dz can we calculate blacklisted GWs from their performance?
// where do we get their performance?
let gateways_blacklisted = api_client
.nym_api
.get_gateways_blacklisted()
.await
.map(|vec| vec.into_iter().collect::<HashSet<_>>())
.log_error("get_gateways_blacklisted")?;
// Cached mixnodes don't include blacklisted nodes
// We need that to calculate the total locked tokens later
let mixnodes = api_client
.nym_api
.get_mixnodes_detailed_unfiltered()
.await
.log_error("get_mixnodes_detailed_unfiltered")?;
let mixnodes_described = api_client
.nym_api
.get_mixnodes_described()
.await
.log_error("get_mixnodes_described")?;
let mixnodes_active = api_client
.nym_api
.get_active_mixnodes()
.await
.log_error("get_active_mixnodes")?;
let delegation_program_members =
get_delegation_program_details(network_details, &config.nyxd_addr).await?;
// keep stats for later
let count_bonded_mixnodes = mixnodes.len();
let count_bonded_gateways = gateways.len();
let count_explorer_gateways = explorer_gateways.len();
let count_bonded_mixnodes_active = mixnodes_active.len();
let gateway_records = prepare_gateway_data(
&gateways,
&gateways_blacklisted,
explorer_gateways,
skimmed_gateways,
)?;
queries::insert_gateways(pool, gateway_records)
.await
.map(|_| {
tracing::debug!("Gateway info written to DB!");
})?;
// instead of counting blacklisted GWs returned from API cache, count from the active set
let count_gateways_blacklisted = gateways
.iter()
.filter(|gw| {
let gw_identity = gw.bond.identity();
gateways_blacklisted.contains(gw_identity)
})
.count();
if count_gateways_blacklisted > 0 {
queries::write_blacklisted_gateways_to_db(pool, gateways_blacklisted.iter())
.await
.map(|_| {
tracing::debug!(
"Gateway blacklist info written to DB! {} blacklisted by Nym API",
count_gateways_blacklisted
)
})?;
}
let mixnode_records =
prepare_mixnode_data(&mixnodes, mixnodes_described, delegation_program_members)?;
queries::insert_mixnodes(pool, mixnode_records)
.await
.map(|_| {
tracing::debug!("Mixnode info written to DB!");
})?;
let count_mixnodes_blacklisted = mixnodes.iter().filter(|elem| elem.blacklisted).count();
let recently_unbonded_gateways = queries::ensure_gateways_still_bonded(pool, &gateways).await?;
let recently_unbonded_mixnodes = queries::ensure_mixnodes_still_bonded(pool, &mixnodes).await?;
let count_bonded_mixnodes_reserve = 0; // TODO: NymAPI doesn't report the reserve set size
let count_bonded_mixnodes_inactive = count_bonded_mixnodes - count_bonded_mixnodes_active;
let (all_historical_gateways, all_historical_mixnodes) = calculate_stats(pool).await?;
//
// write summary keys and values to table
//
let nodes_summary = vec![
(MIXNODES_BONDED_COUNT, &count_bonded_mixnodes),
(MIXNODES_BONDED_ACTIVE, &count_bonded_mixnodes_active),
(MIXNODES_BONDED_INACTIVE, &count_bonded_mixnodes_inactive),
(MIXNODES_BONDED_RESERVE, &count_bonded_mixnodes_reserve),
(MIXNODES_BLACKLISTED_COUNT, &count_mixnodes_blacklisted),
(GATEWAYS_BONDED_COUNT, &count_bonded_gateways),
(GATEWAYS_EXPLORER_COUNT, &count_explorer_gateways),
(MIXNODES_HISTORICAL_COUNT, &all_historical_mixnodes),
(GATEWAYS_HISTORICAL_COUNT, &all_historical_gateways),
(GATEWAYS_BLACKLISTED_COUNT, &count_gateways_blacklisted),
];
let last_updated = chrono::offset::Utc::now();
let last_updated_utc = last_updated.timestamp().to_string();
let network_summary = NetworkSummary {
mixnodes: mixnode::MixnodeSummary {
bonded: mixnode::MixnodeSummaryBonded {
count: count_bonded_mixnodes.cast_checked()?,
active: count_bonded_mixnodes_active.cast_checked()?,
inactive: count_bonded_mixnodes_inactive.cast_checked()?,
reserve: count_bonded_mixnodes_reserve.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
blacklisted: mixnode::MixnodeSummaryBlacklisted {
count: count_mixnodes_blacklisted.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
historical: mixnode::MixnodeSummaryHistorical {
count: all_historical_mixnodes.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
},
gateways: gateway::GatewaySummary {
bonded: gateway::GatewaySummaryBonded {
count: count_bonded_gateways.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
blacklisted: gateway::GatewaySummaryBlacklisted {
count: count_gateways_blacklisted.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
historical: gateway::GatewaySummaryHistorical {
count: all_historical_gateways.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
explorer: gateway::GatewaySummaryExplorer {
count: count_explorer_gateways.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
},
};
queries::insert_summaries(pool, &nodes_summary, &network_summary, last_updated).await?;
let mut log_lines: Vec<String> = vec![];
for (key, value) in nodes_summary.iter() {
log_lines.push(format!("{} = {}", key, value));
}
log_lines.push(format!(
"recently_unbonded_mixnodes = {}",
recently_unbonded_mixnodes
));
log_lines.push(format!(
"recently_unbonded_gateways = {}",
recently_unbonded_gateways
));
tracing::info!("Directory summary: \n{}", log_lines.join("\n"));
Ok(())
}
fn prepare_gateway_data(
gateways: &[DescribedGateway],
gateways_blacklisted: &HashSet<String>,
explorer_gateways: Vec<PrettyDetailedGatewayBond>,
skimmed_gateways: Vec<SkimmedNode>,
) -> anyhow::Result<Vec<GatewayRecord>> {
let mut gateway_records = Vec::new();
for gateway in gateways {
let identity_key = gateway.bond.identity();
let bonded = true;
let last_updated_utc = chrono::offset::Utc::now().timestamp();
let blacklisted = gateways_blacklisted.contains(identity_key);
let self_described = gateway
.self_described
.as_ref()
.and_then(|v| serde_json::to_string(&v).ok());
let explorer_pretty_bond = explorer_gateways
.iter()
.find(|g| g.gateway.identity_key.eq(identity_key));
let explorer_pretty_bond = explorer_pretty_bond.and_then(|g| serde_json::to_string(g).ok());
let performance = skimmed_gateways
.iter()
.find(|g| g.ed25519_identity_pubkey.eq(identity_key))
.map(|g| g.performance)
.unwrap_or_default()
.round_to_integer();
gateway_records.push(GatewayRecord {
identity_key: identity_key.to_owned(),
bonded,
blacklisted,
self_described,
explorer_pretty_bond,
last_updated_utc,
performance,
});
}
Ok(gateway_records)
}
fn prepare_mixnode_data(
mixnodes: &[MixNodeBondAnnotated],
mixnodes_described: Vec<DescribedMixNode>,
delegation_program_members: Vec<u32>,
) -> anyhow::Result<Vec<MixnodeRecord>> {
let mut mixnode_records = Vec::new();
for mixnode in mixnodes {
let mix_id = mixnode.mix_id();
let identity_key = mixnode.identity_key();
let bonded = true;
let total_stake = decimal_to_i64(mixnode.mixnode_details.total_stake());
let blacklisted = mixnode.blacklisted;
let node_info = mixnode.mix_node();
let host = node_info.host.clone();
let http_port = node_info.http_api_port;
// Contains all the information including what's above
let full_details = serde_json::to_string(&mixnode)?;
let mixnode_described = mixnodes_described.iter().find(|m| m.bond.mix_id == mix_id);
let self_described = mixnode_described.and_then(|v| serde_json::to_string(v).ok());
let is_dp_delegatee = delegation_program_members.contains(&mix_id);
let last_updated_utc = chrono::offset::Utc::now().timestamp();
mixnode_records.push(MixnodeRecord {
mix_id,
identity_key: identity_key.to_owned(),
bonded,
total_stake,
host,
http_port,
blacklisted,
full_details,
self_described,
last_updated_utc,
is_dp_delegatee,
});
}
Ok(mixnode_records)
}
// TODO dz is there a common monorepo place this can be put?
pub trait NumericalCheckedCast<T>
where
T: TryFrom<Self>,
<T as TryFrom<Self>>::Error: std::error::Error,
Self: std::fmt::Display + Copy,
{
fn cast_checked(self) -> anyhow::Result<T> {
T::try_from(self).map_err(|e| {
anyhow::anyhow!(
"Couldn't cast {} to {}: {}",
self,
std::any::type_name::<T>(),
e
)
})
}
}
impl<T, U> NumericalCheckedCast<U> for T
where
U: TryFrom<T>,
<U as TryFrom<T>>::Error: std::error::Error,
T: std::fmt::Display + Copy,
{
}
async fn calculate_stats(pool: &DbPool) -> anyhow::Result<(usize, usize)> {
let mut conn = pool.acquire().await?;
let all_historical_gateways = sqlx::query_scalar!(r#"SELECT count(id) FROM gateways"#)
.fetch_one(&mut *conn)
.await?
.cast_checked()?;
let all_historical_mixnodes = sqlx::query_scalar!(r#"SELECT count(id) FROM mixnodes"#)
.fetch_one(&mut *conn)
.await?
.cast_checked()?;
Ok((all_historical_gateways, all_historical_mixnodes))
}
async fn get_delegation_program_details(
network_details: &NymNetworkDetails,
nyxd_addr: &Url,
) -> anyhow::Result<Vec<u32>> {
let config = nym_validator_client::nyxd::Config::try_from_nym_network_details(network_details)?;
let client = NyxdClient::connect(config, nyxd_addr.as_str())
.map_err(|err| anyhow::anyhow!("Couldn't connect: {}", err))?;
let account_id = AccountId::from_str(DELEGATION_PROGRAM_WALLET)
.map_err(|e| anyhow!("Invalid bech32 address: {}", e))?;
let delegations = client.get_all_delegator_delegations(&account_id).await?;
let mix_ids: Vec<u32> = delegations
.iter()
.map(|delegation| delegation.mix_id)
.collect();
Ok(mix_ids)
}
fn decimal_to_i64(decimal: Decimal) -> i64 {
// Convert the underlying Uint128 to a u128
let atomics = decimal.atomics().u128();
let precision = 1_000_000_000_000_000_000u128;
// Get the fractional part
let fractional = atomics % precision;
// Get the integer part
let integer = atomics / precision;
// Combine them into a float
let float_value = integer as f64 + (fractional as f64 / 1_000_000_000_000_000_000_f64);
// Limit to 6 decimal places
let rounded_value = (float_value * 1_000_000.0).round() / 1_000_000.0;
rounded_value as i64
}
trait LogError<T, E> {
fn log_error(self, msg: &str) -> Result<T, E>;
}
impl<T, E> LogError<T, E> for anyhow::Result<T, E>
where
E: std::error::Error,
{
fn log_error(self, msg: &str) -> Result<T, E> {
if let Err(e) = &self {
tracing::error!("[{msg}]:\t{e}");
}
self
}
}