Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 887ae8c789 | |||
| 12759a8460 | |||
| 00b737d362 | |||
| bd7b5db68f | |||
| 80184c3a51 | |||
| a13e9d8cc3 | |||
| 003a835ee6 | |||
| a60f37ae88 | |||
| 8d400ed4e0 | |||
| e4ba6c815e | |||
| 1ee18d9efe |
@@ -0,0 +1,55 @@
|
||||
name: Build and upload Node Status API container to harbor.nymte.ch
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
env:
|
||||
WORKING_DIRECTORY: "nym-node-status-api"
|
||||
CONTAINER_NAME: "node-status-api"
|
||||
|
||||
jobs:
|
||||
build-container:
|
||||
runs-on: arc-ubuntu-22.04-dind
|
||||
steps:
|
||||
- name: Login to Harbor
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: harbor.nymte.ch
|
||||
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
|
||||
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
|
||||
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Configure git identity
|
||||
run: |
|
||||
git config --global user.email "lawrence@nymtech.net"
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.3
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
- name: Check if tag exists
|
||||
run: |
|
||||
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.value }} >/dev/null 2>&1; then
|
||||
echo "Tag ${{ steps.get_version.outputs.value }} already exists"
|
||||
fi
|
||||
|
||||
- name: Remove existing tag if exists
|
||||
run: |
|
||||
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.value }} >/dev/null 2>&1; then
|
||||
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.value }}
|
||||
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.value }}
|
||||
fi
|
||||
|
||||
- name: Create tag
|
||||
run: |
|
||||
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.value }} -m "Version ${{ steps.get_version.outputs.value }}"
|
||||
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.value }}
|
||||
|
||||
- name: BuildAndPushImageOnHarbor
|
||||
run: |
|
||||
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.value }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
|
||||
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
|
||||
Generated
+894
-317
File diff suppressed because it is too large
Load Diff
@@ -118,6 +118,7 @@ members = [
|
||||
"nym-node",
|
||||
"nym-node/nym-node-http-api",
|
||||
"nym-node/nym-node-requests",
|
||||
"nym-node-status-api",
|
||||
"nym-outfox",
|
||||
"nym-validator-rewarder",
|
||||
"tools/echo-server",
|
||||
@@ -152,6 +153,7 @@ default-members = [
|
||||
"nym-data-observatory",
|
||||
"nym-node",
|
||||
"nym-validator-rewarder",
|
||||
"nym-node-status-api",
|
||||
"service-providers/authenticator",
|
||||
"service-providers/ip-packet-router",
|
||||
"service-providers/network-requester",
|
||||
@@ -232,10 +234,12 @@ dotenvy = "0.15.6"
|
||||
ecdsa = "0.16"
|
||||
ed25519-dalek = "2.1"
|
||||
etherparse = "0.13.0"
|
||||
envy = "0.4"
|
||||
eyre = "0.6.9"
|
||||
fastrand = "2.1.1"
|
||||
flate2 = "1.0.34"
|
||||
futures = "0.3.28"
|
||||
futures-util = "0.3"
|
||||
generic-array = "0.14.7"
|
||||
getrandom = "0.2.10"
|
||||
getset = "0.1.3"
|
||||
@@ -265,6 +269,7 @@ ledger-transport-hid = "0.10.0"
|
||||
log = "0.4"
|
||||
maxminddb = "0.23.0"
|
||||
mime = "0.3.17"
|
||||
moka = { version = "0.12", features = ["future"] }
|
||||
nix = "0.27.1"
|
||||
notify = "5.1.0"
|
||||
okapi = "0.7.0"
|
||||
@@ -298,6 +303,7 @@ serde = "1.0.210"
|
||||
serde_bytes = "0.11.15"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0.128"
|
||||
serde_json_path = "0.6.7"
|
||||
serde_repr = "0.1"
|
||||
serde_with = "3.9.0"
|
||||
serde_yaml = "0.9.25"
|
||||
@@ -327,6 +333,7 @@ tracing = "0.1.37"
|
||||
tracing-opentelemetry = "0.19.0"
|
||||
tracing-subscriber = "0.3.16"
|
||||
tracing-tree = "0.2.2"
|
||||
tracing-log = "0.2"
|
||||
ts-rs = "10.0.0"
|
||||
tungstenite = { version = "0.20.1", default-features = false }
|
||||
url = "2.5"
|
||||
|
||||
@@ -25,7 +25,7 @@ serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
nym-http-api-client = { path = "../../../common/http-api-client" }
|
||||
thiserror = { workspace = true }
|
||||
log = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
url = { workspace = true, features = ["serde"] }
|
||||
tokio = { workspace = true, features = ["sync", "time"] }
|
||||
time = { workspace = true, features = ["formatting"] }
|
||||
|
||||
@@ -265,6 +265,13 @@ impl NymApiClient {
|
||||
NymApiClient { nym_api }
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub fn new_with_timeout(api_url: Url, timeout: std::time::Duration) -> Self {
|
||||
let nym_api = nym_api::Client::new(api_url, Some(timeout));
|
||||
|
||||
NymApiClient { nym_api }
|
||||
}
|
||||
|
||||
pub fn new_with_user_agent(api_url: Url, user_agent: UserAgent) -> Self {
|
||||
let nym_api = nym_api::Client::builder::<_, ValidatorClientError>(api_url)
|
||||
.expect("invalid api url")
|
||||
|
||||
@@ -121,36 +121,36 @@ async fn test_nyxd_connection(
|
||||
{
|
||||
Ok(Err(NyxdError::TendermintErrorRpc(e))) => {
|
||||
// If we get a tendermint-rpc error, we classify the node as not contactable
|
||||
log::warn!("Checking: nyxd url: {url}: {}: {}", "failed".red(), e);
|
||||
tracing::warn!("Checking: nyxd url: {url}: {}: {}", "failed".red(), e);
|
||||
false
|
||||
}
|
||||
Ok(Err(NyxdError::AbciError { code, log, .. })) => {
|
||||
// We accept the mixnet contract not found as ok from a connection standpoint. This happens
|
||||
// for example on a pre-launch network.
|
||||
log::debug!(
|
||||
tracing::debug!(
|
||||
"Checking: nyxd url: {url}: {}, but with abci error: {code}: {log}",
|
||||
"success".green()
|
||||
);
|
||||
code == 18
|
||||
}
|
||||
Ok(Err(error @ NyxdError::NoContractAddressAvailable(_))) => {
|
||||
log::warn!("Checking: nyxd url: {url}: {}: {error}", "failed".red());
|
||||
tracing::warn!("Checking: nyxd url: {url}: {}: {error}", "failed".red());
|
||||
false
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
// For any other error, we're optimistic and just try anyway.
|
||||
log::warn!(
|
||||
tracing::warn!(
|
||||
"Checking: nyxd_url: {url}: {}, but with error: {e}",
|
||||
"success".green()
|
||||
);
|
||||
true
|
||||
}
|
||||
Ok(Ok(_)) => {
|
||||
log::debug!("Checking: nyxd_url: {url}: {}", "success".green());
|
||||
tracing::debug!("Checking: nyxd_url: {url}: {}", "success".green());
|
||||
true
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Checking: nyxd_url: {url}: {}: {e}", "failed".red());
|
||||
tracing::warn!("Checking: nyxd_url: {url}: {}: {e}", "failed".red());
|
||||
false
|
||||
}
|
||||
};
|
||||
@@ -169,15 +169,15 @@ async fn test_nym_api_connection(
|
||||
.await
|
||||
{
|
||||
Ok(Ok(_)) => {
|
||||
log::debug!("Checking: api_url: {url}: {}", "success".green());
|
||||
tracing::debug!("Checking: api_url: {url}: {}", "success".green());
|
||||
true
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
log::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
|
||||
tracing::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
|
||||
false
|
||||
}
|
||||
Err(e) => {
|
||||
log::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
|
||||
tracing::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
@@ -40,6 +40,7 @@ use nym_mixnet_contract_common::mixnode::MixNodeDetails;
|
||||
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, NodeId};
|
||||
use time::format_description::BorrowedFormatItem;
|
||||
use time::Date;
|
||||
use tracing::instrument;
|
||||
|
||||
pub mod error;
|
||||
pub mod routes;
|
||||
@@ -51,11 +52,13 @@ pub fn rfc_3339_date() -> Vec<BorrowedFormatItem<'static>> {
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
|
||||
pub trait NymApiClientExt: ApiClient {
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
|
||||
self.get_json(&[routes::API_VERSION, routes::MIXNODES], NO_PARAMS)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
@@ -69,6 +72,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateways_detailed(&self) -> Result<Vec<GatewayBondAnnotated>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
@@ -82,6 +86,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes_detailed_unfiltered(
|
||||
&self,
|
||||
) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
|
||||
@@ -97,11 +102,13 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateways(&self) -> Result<Vec<GatewayBond>, NymAPIError> {
|
||||
self.get_json(&[routes::API_VERSION, routes::GATEWAYS], NO_PARAMS)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateways_described(&self) -> Result<Vec<LegacyDescribedGateway>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::GATEWAYS, routes::DESCRIBED],
|
||||
@@ -110,6 +117,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes_described(&self) -> Result<Vec<LegacyDescribedMixNode>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::MIXNODES, routes::DESCRIBED],
|
||||
@@ -118,6 +126,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn get_basic_mixnodes(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
@@ -141,6 +150,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_basic_gateways(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
@@ -164,6 +174,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_active_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::MIXNODES, routes::ACTIVE],
|
||||
@@ -172,6 +183,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_active_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
@@ -186,6 +198,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_rewarded_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::MIXNODES, routes::REWARDED],
|
||||
@@ -194,6 +207,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_report(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -211,6 +225,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateway_report(
|
||||
&self,
|
||||
identity: IdentityKeyRef<'_>,
|
||||
@@ -228,6 +243,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_history(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -245,6 +261,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateway_history(
|
||||
&self,
|
||||
identity: IdentityKeyRef<'_>,
|
||||
@@ -262,6 +279,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_rewarded_mixnodes_detailed(
|
||||
&self,
|
||||
) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
|
||||
@@ -278,6 +296,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateway_core_status_count(
|
||||
&self,
|
||||
identity: IdentityKeyRef<'_>,
|
||||
@@ -309,6 +328,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_core_status_count(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -341,6 +361,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_status(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -358,6 +379,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_reward_estimation(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -375,6 +397,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn compute_mixnode_reward_estimation(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -394,6 +417,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_stake_saturation(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -411,6 +435,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_inclusion_probability(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -428,6 +453,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_current_node_performance(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
@@ -458,6 +484,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes_blacklisted(&self) -> Result<Vec<NodeId>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::MIXNODES, routes::BLACKLISTED],
|
||||
@@ -466,6 +493,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateways_blacklisted(&self) -> Result<Vec<IdentityKey>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::GATEWAYS, routes::BLACKLISTED],
|
||||
@@ -474,6 +502,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self, request_body))]
|
||||
async fn blind_sign(
|
||||
&self,
|
||||
request_body: &BlindSignRequestBody,
|
||||
@@ -490,6 +519,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self, request_body))]
|
||||
async fn verify_ecash_ticket(
|
||||
&self,
|
||||
request_body: &VerifyEcashTicketBody,
|
||||
@@ -506,6 +536,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self, request_body))]
|
||||
async fn batch_redeem_ecash_tickets(
|
||||
&self,
|
||||
request_body: &BatchRedeemTicketsBody,
|
||||
@@ -522,6 +553,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn double_spending_filter_v1(&self) -> Result<SpentCredentialsResponse, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
@@ -534,6 +566,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn partial_expiration_date_signatures(
|
||||
&self,
|
||||
expiration_date: Option<Date>,
|
||||
@@ -557,6 +590,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn partial_coin_indices_signatures(
|
||||
&self,
|
||||
epoch_id: Option<EpochId>,
|
||||
@@ -577,6 +611,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn global_expiration_date_signatures(
|
||||
&self,
|
||||
expiration_date: Option<Date>,
|
||||
@@ -600,6 +635,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn global_coin_indices_signatures(
|
||||
&self,
|
||||
epoch_id: Option<EpochId>,
|
||||
@@ -620,6 +656,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn master_verification_key(
|
||||
&self,
|
||||
epoch_id: Option<EpochId>,
|
||||
@@ -639,6 +676,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn epoch_credentials(
|
||||
&self,
|
||||
dkg_epoch: EpochId,
|
||||
@@ -655,6 +693,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn issued_credential(
|
||||
&self,
|
||||
credential_id: i64,
|
||||
@@ -671,6 +710,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn issued_credentials(
|
||||
&self,
|
||||
credential_ids: Vec<i64>,
|
||||
|
||||
@@ -8,9 +8,9 @@ use crate::nyxd::CosmWasmClient;
|
||||
use async_trait::async_trait;
|
||||
use cosmrs::AccountId;
|
||||
use cosmwasm_std::Addr;
|
||||
use log::trace;
|
||||
use nym_coconut_dkg_common::types::{ChunkIndex, NodeIndex, StateAdvanceResponse};
|
||||
use serde::Deserialize;
|
||||
use tracing::trace;
|
||||
|
||||
use nym_coconut_dkg_common::dealer::RegisteredDealerDetails;
|
||||
pub use nym_coconut_dkg_common::{
|
||||
|
||||
+3
-4
@@ -29,7 +29,6 @@ use cosmrs::proto::cosmwasm::wasm::v1::{
|
||||
};
|
||||
use cosmrs::tendermint::{block, chain, Hash};
|
||||
use cosmrs::{AccountId, Coin as CosmosCoin, Tx};
|
||||
use log::trace;
|
||||
use prost::Message;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -68,7 +67,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
|
||||
Res: Message + Default,
|
||||
{
|
||||
if let Some(ref abci_path) = path {
|
||||
trace!("performing query on abci path {abci_path}")
|
||||
tracing::trace!("performing query on abci path {abci_path}")
|
||||
}
|
||||
let mut buf = Vec::with_capacity(req.encoded_len());
|
||||
req.encode(&mut buf)?;
|
||||
@@ -297,7 +296,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
|
||||
|
||||
let start = Instant::now();
|
||||
loop {
|
||||
log::debug!(
|
||||
tracing::debug!(
|
||||
"Polling for result of including {} in a block...",
|
||||
broadcasted.hash
|
||||
);
|
||||
@@ -522,7 +521,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
|
||||
.make_abci_query::<_, QuerySmartContractStateResponse>(path, req)
|
||||
.await?;
|
||||
|
||||
trace!("raw query response: {}", String::from_utf8_lossy(&res.data));
|
||||
tracing::trace!("raw query response: {}", String::from_utf8_lossy(&res.data));
|
||||
Ok(serde_json::from_slice(&res.data)?)
|
||||
}
|
||||
|
||||
|
||||
+1
-1
@@ -25,12 +25,12 @@ use cosmrs::proto::cosmos::tx::signing::v1beta1::SignMode;
|
||||
use cosmrs::staking::{MsgDelegate, MsgUndelegate};
|
||||
use cosmrs::tx::{self, Msg};
|
||||
use cosmrs::{cosmwasm, AccountId, Any, Tx};
|
||||
use log::debug;
|
||||
use serde::Serialize;
|
||||
use sha2::Digest;
|
||||
use sha2::Sha256;
|
||||
use std::time::SystemTime;
|
||||
use tendermint_rpc::endpoint::broadcast;
|
||||
use tracing::debug;
|
||||
|
||||
fn empty_fee() -> tx::Fee {
|
||||
tx::Fee {
|
||||
|
||||
@@ -7,9 +7,9 @@ use base64::Engine;
|
||||
use cosmrs::abci::TxMsgData;
|
||||
use cosmrs::cosmwasm::MsgExecuteContractResponse;
|
||||
use cosmrs::proto::cosmos::base::query::v1beta1::{PageRequest, PageResponse};
|
||||
use log::error;
|
||||
use prost::bytes::Bytes;
|
||||
use tendermint_rpc::endpoint::broadcast;
|
||||
use tracing::error;
|
||||
|
||||
pub use cosmrs::abci::MsgResponse;
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Display;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tracing::warn;
|
||||
use tracing::{instrument, warn};
|
||||
use url::Url;
|
||||
|
||||
pub use reqwest::IntoUrl;
|
||||
@@ -202,6 +202,7 @@ impl Client {
|
||||
self.reqwest_client.get(url)
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all, fields(path=?path))]
|
||||
async fn send_get_request<K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
@@ -212,6 +213,7 @@ impl Client {
|
||||
V: AsRef<str>,
|
||||
E: Display,
|
||||
{
|
||||
tracing::trace!("Sending GET request");
|
||||
let url = sanitize_url(&self.base_url, path, params);
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
@@ -277,6 +279,7 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
pub async fn get_json<T, K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
@@ -512,12 +515,14 @@ pub fn sanitize_url<K: AsRef<str>, V: AsRef<str>>(
|
||||
url
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
pub async fn parse_response<T, E>(res: Response, allow_empty: bool) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
T: DeserializeOwned,
|
||||
E: DeserializeOwned + Display,
|
||||
{
|
||||
let status = res.status();
|
||||
tracing::debug!("Status: {} (success: {})", &status, status.is_success());
|
||||
|
||||
if !allow_empty {
|
||||
if let Some(0) = res.content_length() {
|
||||
|
||||
+3
-3
@@ -21,8 +21,8 @@ COCONUT_DKG_CONTRACT_ADDRESS=n19604yflqggs9mk2z26mqygq43q2kr3n932egxx630svywd5mp
|
||||
|
||||
REWARDING_VALIDATOR_ADDRESS=n10yyd98e2tuwu0f7ypz9dy3hhjw7v772q6287gy
|
||||
STATISTICS_SERVICE_DOMAIN_ADDRESS="https://mainnet-stats.nymte.ch:8090"
|
||||
NYXD="https://rpc.nymtech.net"
|
||||
NYM_API="https://validator.nymtech.net/api/"
|
||||
NYXD=https://rpc.nymtech.net
|
||||
NYM_API=https://validator.nymtech.net/api/
|
||||
NYXD_WS="wss://rpc.nymtech.net/websocket"
|
||||
EXPLORER_API="https://explorer.nymtech.net/api/"
|
||||
EXPLORER_API=https://explorer.nymtech.net/api/
|
||||
NYM_VPN_API="https://nymvpn.com/api"
|
||||
|
||||
@@ -7,12 +7,12 @@ license.workspace = true
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
log.workspace = true
|
||||
nym-explorer-api-requests = { path = "../explorer-api-requests" }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde.workspace = true
|
||||
thiserror.workspace = true
|
||||
url.workspace = true
|
||||
tracing = {workspace = true, features = ["attributes"]}
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::time::Duration;
|
||||
|
||||
use reqwest::StatusCode;
|
||||
use thiserror::Error;
|
||||
use tracing::instrument;
|
||||
use url::Url;
|
||||
|
||||
// Re-export request types
|
||||
@@ -47,6 +48,12 @@ impl ExplorerClient {
|
||||
Ok(Self { client, url })
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub fn new_with_timeout(url: url::Url, timeout: Duration) -> Result<Self, ExplorerApiError> {
|
||||
let client = reqwest::Client::builder().timeout(timeout).build()?;
|
||||
Ok(Self { client, url })
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
pub fn new(url: url::Url) -> Result<Self, ExplorerApiError> {
|
||||
let client = reqwest::Client::builder().build()?;
|
||||
@@ -58,10 +65,11 @@ impl ExplorerClient {
|
||||
paths: &[&str],
|
||||
) -> Result<reqwest::Response, ExplorerApiError> {
|
||||
let url = combine_url(self.url.clone(), paths)?;
|
||||
log::trace!("Sending GET request {url:?}");
|
||||
tracing::debug!("Sending GET request");
|
||||
Ok(self.client.get(url).send().await?)
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all, fields(paths=?paths))]
|
||||
async fn query_explorer_api<T>(&self, paths: &[&str]) -> Result<T, ExplorerApiError>
|
||||
where
|
||||
T: std::fmt::Debug,
|
||||
@@ -70,7 +78,7 @@ impl ExplorerClient {
|
||||
let response = self.send_get_request(paths).await?;
|
||||
if response.status().is_success() {
|
||||
let res = response.json::<T>().await?;
|
||||
log::trace!("Got response: {res:?}");
|
||||
tracing::trace!("Got response: {res:?}");
|
||||
Ok(res)
|
||||
} else if response.status() == StatusCode::NOT_FOUND {
|
||||
Err(ExplorerApiError::NotFound)
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
data/
|
||||
enter_db.sh
|
||||
@@ -0,0 +1,62 @@
|
||||
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
[package]
|
||||
name = "nym-node-status-api"
|
||||
version = "0.1.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
documentation.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
rust-version.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
axum = { workspace = true, features = ["tokio"] }
|
||||
chrono = { workspace = true }
|
||||
clap = { workspace = true, features = ["cargo", "derive", "env", "string"] }
|
||||
cosmwasm-std = { workspace = true }
|
||||
envy = { workspace = true }
|
||||
futures-util = { workspace = true }
|
||||
moka = { workspace = true, features = ["future"] }
|
||||
nym-bin-common = { path = "../common/bin-common" }
|
||||
nym-explorer-client = { path = "../explorer-api/explorer-client" }
|
||||
# TODO dz: ref before Nym API client changes. Update to latest develop once new Nym API is live
|
||||
nym-network-defaults = { git = "https://github.com/nymtech/nym", rev = "f86e08866" }
|
||||
nym-validator-client = { git = "https://github.com/nymtech/nym", rev = "f86e08866" }
|
||||
# nym-network-defaults = { path = "../common/network-defaults" }
|
||||
# nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
nym-task = { path = "../common/task" }
|
||||
nym-node-requests = { git = "https://github.com/nymtech/nym", rev = "f86e08866" }
|
||||
# nym-node-requests = { path = "../nym-node/nym-node-requests", features = ["openapi"] }
|
||||
reqwest = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
serde_json_path = { workspace = true }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread"] }
|
||||
tokio-util = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter"] }
|
||||
tracing-log = { workspace = true }
|
||||
tower-http = { workspace = true, features = ["cors", "trace"] }
|
||||
utoipa = { workspace = true, features = ["axum_extras", "time"] }
|
||||
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
|
||||
# TODO dz `cargo update async-trait`
|
||||
# for automatic schema detection, which was merged, but not released yet
|
||||
# https://github.com/ProbablyClem/utoipauto/pull/38
|
||||
# utoipauto = { git = "https://github.com/ProbablyClem/utoipauto", rev = "eb04cba" }
|
||||
utoipauto = { workspace = true }
|
||||
|
||||
[build-dependencies]
|
||||
anyhow = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros"] }
|
||||
sqlx = { workspace = true, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"sqlite",
|
||||
"macros",
|
||||
"migrate",
|
||||
] }
|
||||
@@ -0,0 +1,15 @@
|
||||
FROM rust:latest AS builder
|
||||
|
||||
COPY ./ /usr/src/nym
|
||||
WORKDIR /usr/src/nym/nym-node-status-api
|
||||
|
||||
RUN cargo build --release
|
||||
|
||||
FROM ubuntu:24.04
|
||||
|
||||
RUN apt-get update && apt-get install -y ca-certificates
|
||||
|
||||
WORKDIR /nym
|
||||
|
||||
COPY --from=builder /usr/src/nym/target/release/nym-node-status-api ./
|
||||
ENTRYPOINT [ "/nym/nym-node-status-api" ]
|
||||
@@ -0,0 +1,46 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use sqlx::{Connection, SqliteConnection};
|
||||
use tokio::{fs::File, io::AsyncWriteExt};
|
||||
|
||||
const SQLITE_DB_FILENAME: &str = "nym-node-status-api.sqlite";
|
||||
|
||||
/// If you need to re-run migrations or reset the db, just run
|
||||
/// cargo clean -p nym-node-status-api
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let out_dir = read_env_var("OUT_DIR")?;
|
||||
let database_path = format!("sqlite://{}/{}?mode=rwc", out_dir, SQLITE_DB_FILENAME);
|
||||
|
||||
write_db_path_to_file(&out_dir, SQLITE_DB_FILENAME).await?;
|
||||
let mut conn = SqliteConnection::connect(&database_path).await?;
|
||||
sqlx::migrate!("./migrations").run(&mut conn).await?;
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
println!("cargo::rustc-env=DATABASE_URL=sqlite://{}", &database_path);
|
||||
|
||||
#[cfg(target_family = "windows")]
|
||||
// for some strange reason we need to add a leading `/` to the windows path even though it's
|
||||
// not a valid windows path... but hey, it works...
|
||||
println!("cargo::rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
|
||||
|
||||
rerun_if_changed();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_env_var(var: &str) -> Result<String> {
|
||||
std::env::var(var).map_err(|_| anyhow!("You need to set {} env var", var))
|
||||
}
|
||||
|
||||
fn rerun_if_changed() {
|
||||
println!("cargo::rerun-if-changed=migrations");
|
||||
println!("cargo::rerun-if-changed=src/db/queries");
|
||||
}
|
||||
|
||||
/// use `./enter_db.sh` to inspect DB
|
||||
async fn write_db_path_to_file(out_dir: &str, db_filename: &str) -> anyhow::Result<()> {
|
||||
let mut file = File::create("enter_db.sh").await?;
|
||||
let _ = file.write(b"#!/bin/bash\n").await?;
|
||||
file.write_all(format!("sqlite3 {}/{}", out_dir, db_filename).as_bytes())
|
||||
.await
|
||||
.map_err(From::from)
|
||||
}
|
||||
+18
@@ -0,0 +1,18 @@
|
||||
#!/bin/bash
|
||||
|
||||
set -e
|
||||
|
||||
export RUST_LOG=${RUST_LOG:-debug}
|
||||
|
||||
export NYM_API_CLIENT_TIMEOUT=60
|
||||
export EXPLORER_CLIENT_TIMEOUT=60
|
||||
#export NYXD=https://rpc.nymtech.net
|
||||
#export NYM_API=https://validator.nymtech.net/api/
|
||||
#export EXPLORER_API=https://explorer.nymtech.net/api/
|
||||
#export NETWORK_NAME=mainnet
|
||||
|
||||
#cargo run --package nym-node-status-api --release -- --connection-url "sqlite://node-status-api.sqlite?mode=rwc"
|
||||
|
||||
cd ..
|
||||
docker build -t node-status-api -f nym-node-status-api/Dockerfile .
|
||||
docker run --env-file envs/mainnet.env -e NYM_NODE_STATUS_API_CONNECTION_URL="sqlite://node-status-api.sqlite?mode=rwc" node-status-api
|
||||
@@ -0,0 +1,100 @@
|
||||
CREATE TABLE gateways
|
||||
(
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
gateway_identity_key VARCHAR NOT NULL UNIQUE,
|
||||
self_described VARCHAR,
|
||||
explorer_pretty_bond VARCHAR,
|
||||
last_probe_result VARCHAR,
|
||||
last_probe_log VARCHAR,
|
||||
config_score INTEGER NOT NULL DEFAULT (0),
|
||||
config_score_successes REAL NOT NULL DEFAULT (0),
|
||||
config_score_samples REAL NOT NULL DEFAULT (0),
|
||||
routing_score INTEGER NOT NULL DEFAULT (0),
|
||||
routing_score_successes REAL NOT NULL DEFAULT (0),
|
||||
routing_score_samples REAL NOT NULL DEFAULT (0),
|
||||
test_run_samples REAL NOT NULL DEFAULT (0),
|
||||
last_testrun_utc INTEGER,
|
||||
last_updated_utc INTEGER NOT NULL,
|
||||
bonded INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
|
||||
blacklisted INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
|
||||
performance INTEGER NOT NULL DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE INDEX idx_gateway_description_gateway_identity_key ON gateways (gateway_identity_key);
|
||||
|
||||
|
||||
CREATE TABLE mixnodes (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
identity_key VARCHAR NOT NULL UNIQUE,
|
||||
mix_id INTEGER NOT NULL UNIQUE,
|
||||
bonded INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
|
||||
total_stake INTEGER NOT NULL,
|
||||
host VARCHAR NOT NULL,
|
||||
http_api_port INTEGER NOT NULL,
|
||||
blacklisted INTEGER CHECK (blacklisted in (0, 1)) NOT NULL DEFAULT 0,
|
||||
full_details VARCHAR,
|
||||
self_described VARCHAR,
|
||||
last_updated_utc INTEGER NOT NULL
|
||||
, is_dp_delegatee INTEGER CHECK (is_dp_delegatee IN (0, 1)) NOT NULL DEFAULT 0);
|
||||
CREATE INDEX idx_mixnodes_mix_id ON mixnodes (mix_id);
|
||||
CREATE INDEX idx_mixnodes_identity_key ON mixnodes (identity_key);
|
||||
|
||||
CREATE TABLE
|
||||
mixnode_description (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
mix_id INTEGER UNIQUE NOT NULL,
|
||||
moniker VARCHAR,
|
||||
website VARCHAR,
|
||||
security_contact VARCHAR,
|
||||
details VARCHAR,
|
||||
last_updated_utc INTEGER NOT NULL,
|
||||
FOREIGN KEY (mix_id) REFERENCES mixnodes (mix_id)
|
||||
);
|
||||
|
||||
-- Indexes for description table
|
||||
CREATE INDEX idx_mixnode_description_mix_id ON mixnode_description (mix_id);
|
||||
|
||||
|
||||
CREATE TABLE summary
|
||||
(
|
||||
key VARCHAR PRIMARY KEY,
|
||||
value_json VARCHAR,
|
||||
last_updated_utc INTEGER NOT NULL
|
||||
);
|
||||
|
||||
|
||||
CREATE TABLE summary_history
|
||||
(
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
date VARCHAR UNIQUE NOT NULL,
|
||||
timestamp_utc INTEGER NOT NULL,
|
||||
value_json VARCHAR
|
||||
);
|
||||
CREATE INDEX idx_summary_history_timestamp_utc ON summary_history (timestamp_utc);
|
||||
CREATE INDEX idx_summary_history_date ON summary_history (date);
|
||||
|
||||
|
||||
CREATE TABLE gateway_description (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
gateway_identity_key VARCHAR UNIQUE NOT NULL,
|
||||
moniker VARCHAR,
|
||||
website VARCHAR,
|
||||
security_contact VARCHAR,
|
||||
details VARCHAR,
|
||||
last_updated_utc INTEGER NOT NULL,
|
||||
FOREIGN KEY (gateway_identity_key) REFERENCES gateways (gateway_identity_key)
|
||||
);
|
||||
|
||||
|
||||
CREATE TABLE
|
||||
mixnode_daily_stats (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
mix_id INTEGER NOT NULL,
|
||||
total_stake BIGINT NOT NULL,
|
||||
date_utc VARCHAR NOT NULL,
|
||||
packets_received INTEGER DEFAULT 0,
|
||||
packets_sent INTEGER DEFAULT 0,
|
||||
packets_dropped INTEGER DEFAULT 0,
|
||||
FOREIGN KEY (mix_id) REFERENCES mixnodes (mix_id),
|
||||
UNIQUE (mix_id, date_utc) -- This constraint automatically creates an index
|
||||
);
|
||||
@@ -0,0 +1,69 @@
|
||||
use clap::Parser;
|
||||
use nym_bin_common::bin_info;
|
||||
use reqwest::Url;
|
||||
use std::{sync::OnceLock, time::Duration};
|
||||
|
||||
// Helper for passing LONG_VERSION to clap
|
||||
fn pretty_build_info_static() -> &'static str {
|
||||
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
|
||||
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Parser)]
|
||||
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
|
||||
pub(crate) struct Cli {
|
||||
/// Network name for the network to which we're connecting.
|
||||
#[clap(long, env = "NETWORK_NAME")]
|
||||
pub(crate) network_name: String,
|
||||
|
||||
/// Explorer api url.
|
||||
#[clap(short, long, env = "EXPLORER_API")]
|
||||
pub(crate) explorer_api: String,
|
||||
|
||||
/// Nym api url.
|
||||
#[clap(short, long, env = "NYM_API")]
|
||||
pub(crate) nym_api: String,
|
||||
|
||||
/// TTL for the http cache.
|
||||
#[clap(
|
||||
long,
|
||||
default_value_t = 30,
|
||||
env = "NYM_NODE_STATUS_API_NYM_HTTP_CACHE_TTL"
|
||||
)]
|
||||
pub(crate) nym_http_cache_ttl: u64,
|
||||
|
||||
/// HTTP port on which to run node status api.
|
||||
#[clap(long, default_value_t = 8000, env = "NYM_NODE_STATUS_API_HTTP_PORT")]
|
||||
pub(crate) http_port: u16,
|
||||
|
||||
/// Nyxd address.
|
||||
#[clap(long, env = "NYXD")]
|
||||
pub(crate) nyxd_addr: Url,
|
||||
|
||||
/// Nym api client timeout.
|
||||
#[clap(
|
||||
long,
|
||||
default_value = "15",
|
||||
env = "NYM_NODE_STATUS_API_NYM_API_CLIENT_TIMEOUT"
|
||||
)]
|
||||
#[arg(value_parser = parse_duration)]
|
||||
pub(crate) nym_api_client_timeout: Duration,
|
||||
|
||||
/// Explorer api client timeout.
|
||||
#[clap(
|
||||
long,
|
||||
default_value = "15",
|
||||
env = "NYM_NODE_STATUS_API_EXPLORER_CLIENT_TIMEOUT"
|
||||
)]
|
||||
#[arg(value_parser = parse_duration)]
|
||||
pub(crate) explorer_client_timeout: Duration,
|
||||
|
||||
/// Connection url for the database.
|
||||
#[clap(long, env = "NYM_NODE_STATUS_API_CONNECTION_URL")]
|
||||
pub(crate) connection_url: String,
|
||||
}
|
||||
|
||||
fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
|
||||
let seconds = arg.parse()?;
|
||||
Ok(std::time::Duration::from_secs(seconds))
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use sqlx::{migrate::Migrator, sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool};
|
||||
|
||||
pub(crate) mod models;
|
||||
pub(crate) mod queries;
|
||||
|
||||
static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
|
||||
|
||||
pub(crate) type DbPool = SqlitePool;
|
||||
|
||||
pub(crate) struct Storage {
|
||||
pool: DbPool,
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
pub async fn init(connection_url: String) -> Result<Self> {
|
||||
let connect_options = {
|
||||
let connect_options = SqliteConnectOptions::from_str(&connection_url)?;
|
||||
let mut connect_options = connect_options.create_if_missing(true);
|
||||
let connect_options = connect_options.disable_statement_logging();
|
||||
(*connect_options).clone()
|
||||
};
|
||||
|
||||
let pool = sqlx::SqlitePool::connect_with(connect_options)
|
||||
.await
|
||||
.map_err(|err| anyhow!("Failed to connect to {}: {}", &connection_url, err))?;
|
||||
|
||||
MIGRATOR.run(&pool).await?;
|
||||
|
||||
Ok(Storage { pool })
|
||||
}
|
||||
|
||||
/// Cloning pool is cheap, it's the same underlying set of connections
|
||||
pub async fn pool_owned(&self) -> DbPool {
|
||||
self.pool.clone()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,300 @@
|
||||
use crate::{
|
||||
http::{self, models::SummaryHistory},
|
||||
monitor::NumericalCheckedCast,
|
||||
};
|
||||
use nym_node_requests::api::v1::node::models::NodeDescription;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utoipa::ToSchema;
|
||||
|
||||
pub(crate) struct GatewayRecord {
|
||||
pub(crate) identity_key: String,
|
||||
pub(crate) bonded: bool,
|
||||
pub(crate) blacklisted: bool,
|
||||
pub(crate) self_described: Option<String>,
|
||||
pub(crate) explorer_pretty_bond: Option<String>,
|
||||
pub(crate) last_updated_utc: i64,
|
||||
pub(crate) performance: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct GatewayDto {
|
||||
pub(crate) gateway_identity_key: String,
|
||||
pub(crate) bonded: bool,
|
||||
pub(crate) blacklisted: bool,
|
||||
pub(crate) performance: i64,
|
||||
pub(crate) self_described: Option<String>,
|
||||
pub(crate) explorer_pretty_bond: Option<String>,
|
||||
pub(crate) last_probe_result: Option<String>,
|
||||
pub(crate) last_probe_log: Option<String>,
|
||||
pub(crate) last_testrun_utc: Option<i64>,
|
||||
pub(crate) last_updated_utc: i64,
|
||||
pub(crate) moniker: String,
|
||||
pub(crate) security_contact: String,
|
||||
pub(crate) details: String,
|
||||
pub(crate) website: String,
|
||||
}
|
||||
|
||||
impl TryFrom<GatewayDto> for http::models::Gateway {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: GatewayDto) -> Result<Self, Self::Error> {
|
||||
// Instead of using routing_score_successes / routing_score_samples, we use the
|
||||
// number of successful testruns in the last 24h.
|
||||
let routing_score = 0f32;
|
||||
let config_score = 0u32;
|
||||
let last_updated_utc =
|
||||
timestamp_as_utc(value.last_updated_utc.cast_checked()?).to_rfc3339();
|
||||
let last_testrun_utc = value
|
||||
.last_testrun_utc
|
||||
.and_then(|i| i.cast_checked().ok())
|
||||
.map(|t| timestamp_as_utc(t).to_rfc3339());
|
||||
|
||||
let self_described = value.self_described.clone().unwrap_or("null".to_string());
|
||||
let explorer_pretty_bond = value
|
||||
.explorer_pretty_bond
|
||||
.clone()
|
||||
.unwrap_or("null".to_string());
|
||||
let last_probe_result = value
|
||||
.last_probe_result
|
||||
.clone()
|
||||
.unwrap_or("null".to_string());
|
||||
let last_probe_log = value.last_probe_log.clone();
|
||||
|
||||
let self_described = serde_json::from_str(&self_described).unwrap_or(None);
|
||||
let explorer_pretty_bond = serde_json::from_str(&explorer_pretty_bond).unwrap_or(None);
|
||||
let last_probe_result = serde_json::from_str(&last_probe_result).unwrap_or(None);
|
||||
|
||||
let bonded = value.bonded;
|
||||
let blacklisted = value.blacklisted;
|
||||
let performance = value.performance as u8;
|
||||
|
||||
let description = NodeDescription {
|
||||
moniker: value.moniker.clone(),
|
||||
website: value.website.clone(),
|
||||
security_contact: value.security_contact.clone(),
|
||||
details: value.details.clone(),
|
||||
};
|
||||
|
||||
Ok(http::models::Gateway {
|
||||
gateway_identity_key: value.gateway_identity_key.clone(),
|
||||
bonded,
|
||||
blacklisted,
|
||||
performance,
|
||||
self_described,
|
||||
explorer_pretty_bond,
|
||||
description,
|
||||
last_probe_result,
|
||||
last_probe_log,
|
||||
routing_score,
|
||||
config_score,
|
||||
last_testrun_utc,
|
||||
last_updated_utc,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn timestamp_as_utc(unix_timestamp: u64) -> chrono::DateTime<chrono::Utc> {
|
||||
let d = std::time::UNIX_EPOCH + std::time::Duration::from_secs(unix_timestamp);
|
||||
d.into()
|
||||
}
|
||||
|
||||
pub(crate) struct MixnodeRecord {
|
||||
pub(crate) mix_id: u32,
|
||||
pub(crate) identity_key: String,
|
||||
pub(crate) bonded: bool,
|
||||
pub(crate) total_stake: i64,
|
||||
pub(crate) host: String,
|
||||
pub(crate) http_port: u16,
|
||||
pub(crate) blacklisted: bool,
|
||||
pub(crate) full_details: String,
|
||||
pub(crate) self_described: Option<String>,
|
||||
pub(crate) last_updated_utc: i64,
|
||||
pub(crate) is_dp_delegatee: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct MixnodeDto {
|
||||
pub(crate) mix_id: i64,
|
||||
pub(crate) bonded: bool,
|
||||
pub(crate) blacklisted: bool,
|
||||
pub(crate) is_dp_delegatee: bool,
|
||||
pub(crate) total_stake: i64,
|
||||
pub(crate) full_details: String,
|
||||
pub(crate) self_described: Option<String>,
|
||||
pub(crate) last_updated_utc: i64,
|
||||
pub(crate) moniker: String,
|
||||
pub(crate) website: String,
|
||||
pub(crate) security_contact: String,
|
||||
pub(crate) details: String,
|
||||
}
|
||||
|
||||
impl TryFrom<MixnodeDto> for http::models::Mixnode {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: MixnodeDto) -> Result<Self, Self::Error> {
|
||||
let mix_id = value.mix_id.cast_checked()?;
|
||||
let full_details = value.full_details.clone();
|
||||
let full_details = serde_json::from_str(&full_details).unwrap_or(None);
|
||||
|
||||
let self_described = value
|
||||
.self_described
|
||||
.clone()
|
||||
.map(|v| serde_json::from_str(&v).unwrap_or(serde_json::Value::Null));
|
||||
|
||||
let last_updated_utc =
|
||||
timestamp_as_utc(value.last_updated_utc.cast_checked()?).to_rfc3339();
|
||||
let blacklisted = value.blacklisted;
|
||||
let is_dp_delegatee = value.is_dp_delegatee;
|
||||
let moniker = value.moniker.clone();
|
||||
let website = value.website.clone();
|
||||
let security_contact = value.security_contact.clone();
|
||||
let details = value.details.clone();
|
||||
|
||||
Ok(http::models::Mixnode {
|
||||
mix_id,
|
||||
bonded: value.bonded,
|
||||
blacklisted,
|
||||
is_dp_delegatee,
|
||||
total_stake: value.total_stake,
|
||||
full_details,
|
||||
description: NodeDescription {
|
||||
moniker,
|
||||
website,
|
||||
security_contact,
|
||||
details,
|
||||
},
|
||||
self_described,
|
||||
last_updated_utc,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct BondedStatusDto {
|
||||
pub(crate) id: i64,
|
||||
pub(crate) identity_key: String,
|
||||
pub(crate) bonded: bool,
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct SummaryDto {
|
||||
pub(crate) key: String,
|
||||
pub(crate) value_json: String,
|
||||
pub(crate) last_updated_utc: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct SummaryHistoryDto {
|
||||
#[allow(dead_code)]
|
||||
pub id: i64,
|
||||
pub date: String,
|
||||
pub value_json: String,
|
||||
pub timestamp_utc: i64,
|
||||
}
|
||||
|
||||
impl TryFrom<SummaryHistoryDto> for SummaryHistory {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: SummaryHistoryDto) -> Result<Self, Self::Error> {
|
||||
let value_json = serde_json::from_str(&value.value_json).unwrap_or_default();
|
||||
Ok(SummaryHistory {
|
||||
value_json,
|
||||
date: value.date.clone(),
|
||||
timestamp_utc: timestamp_as_utc(value.timestamp_utc.cast_checked()?).to_rfc3339(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) const MIXNODES_BONDED_COUNT: &str = "mixnodes.bonded.count";
|
||||
pub(crate) const MIXNODES_BONDED_ACTIVE: &str = "mixnodes.bonded.active";
|
||||
pub(crate) const MIXNODES_BONDED_INACTIVE: &str = "mixnodes.bonded.inactive";
|
||||
pub(crate) const MIXNODES_BONDED_RESERVE: &str = "mixnodes.bonded.reserve";
|
||||
pub(crate) const MIXNODES_BLACKLISTED_COUNT: &str = "mixnodes.blacklisted.count";
|
||||
|
||||
pub(crate) const GATEWAYS_BONDED_COUNT: &str = "gateways.bonded.count";
|
||||
pub(crate) const GATEWAYS_EXPLORER_COUNT: &str = "gateways.explorer.count";
|
||||
pub(crate) const GATEWAYS_BLACKLISTED_COUNT: &str = "gateways.blacklisted.count";
|
||||
|
||||
pub(crate) const MIXNODES_HISTORICAL_COUNT: &str = "mixnodes.historical.count";
|
||||
pub(crate) const GATEWAYS_HISTORICAL_COUNT: &str = "gateways.historical.count";
|
||||
|
||||
// `utoipa`` goes crazy if you use module-qualified prefix as field type so we
|
||||
// have to import it
|
||||
use gateway::GatewaySummary;
|
||||
use mixnode::MixnodeSummary;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct NetworkSummary {
|
||||
pub(crate) mixnodes: MixnodeSummary,
|
||||
pub(crate) gateways: GatewaySummary,
|
||||
}
|
||||
|
||||
pub(crate) mod mixnode {
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct MixnodeSummary {
|
||||
pub(crate) bonded: MixnodeSummaryBonded,
|
||||
pub(crate) blacklisted: MixnodeSummaryBlacklisted,
|
||||
pub(crate) historical: MixnodeSummaryHistorical,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct MixnodeSummaryBonded {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) active: i32,
|
||||
pub(crate) inactive: i32,
|
||||
pub(crate) reserve: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct MixnodeSummaryBlacklisted {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct MixnodeSummaryHistorical {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) mod gateway {
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct GatewaySummary {
|
||||
pub(crate) bonded: GatewaySummaryBonded,
|
||||
pub(crate) blacklisted: GatewaySummaryBlacklisted,
|
||||
pub(crate) historical: GatewaySummaryHistorical,
|
||||
pub(crate) explorer: GatewaySummaryExplorer,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct GatewaySummaryExplorer {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct GatewaySummaryBonded {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct GatewaySummaryHistorical {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct GatewaySummaryBlacklisted {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,160 @@
|
||||
use crate::{
|
||||
db::{
|
||||
models::{BondedStatusDto, GatewayDto, GatewayRecord},
|
||||
DbPool,
|
||||
},
|
||||
http::models::Gateway,
|
||||
};
|
||||
use futures_util::TryStreamExt;
|
||||
use nym_validator_client::models::DescribedGateway;
|
||||
use tracing::error;
|
||||
|
||||
pub(crate) async fn insert_gateways(
|
||||
pool: &DbPool,
|
||||
gateways: Vec<GatewayRecord>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut db = pool.acquire().await?;
|
||||
for record in gateways {
|
||||
sqlx::query!(
|
||||
"INSERT INTO gateways
|
||||
(gateway_identity_key, bonded, blacklisted,
|
||||
self_described, explorer_pretty_bond,
|
||||
last_updated_utc, performance)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(gateway_identity_key) DO UPDATE SET
|
||||
bonded=excluded.bonded,
|
||||
blacklisted=excluded.blacklisted,
|
||||
self_described=excluded.self_described,
|
||||
explorer_pretty_bond=excluded.explorer_pretty_bond,
|
||||
last_updated_utc=excluded.last_updated_utc,
|
||||
performance = excluded.performance;",
|
||||
record.identity_key,
|
||||
record.bonded,
|
||||
record.blacklisted,
|
||||
record.self_described,
|
||||
record.explorer_pretty_bond,
|
||||
record.last_updated_utc,
|
||||
record.performance
|
||||
)
|
||||
.execute(&mut *db)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn write_blacklisted_gateways_to_db<'a, I>(
|
||||
pool: &DbPool,
|
||||
gateways: I,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
I: Iterator<Item = &'a String>,
|
||||
{
|
||||
let mut conn = pool.acquire().await?;
|
||||
for gateway_identity_key in gateways {
|
||||
sqlx::query!(
|
||||
"UPDATE gateways
|
||||
SET blacklisted = true
|
||||
WHERE gateway_identity_key = ?;",
|
||||
gateway_identity_key,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Ensure all gateways that are set as bonded, are still bonded
|
||||
pub(crate) async fn ensure_gateways_still_bonded(
|
||||
pool: &DbPool,
|
||||
gateways: &[DescribedGateway],
|
||||
) -> anyhow::Result<usize> {
|
||||
let bonded_gateways_rows = get_all_bonded_gateways_row_ids_by_status(pool, true).await?;
|
||||
let unbonded_gateways_rows = bonded_gateways_rows.iter().filter(|v| {
|
||||
!gateways
|
||||
.iter()
|
||||
.any(|bonded| *bonded.bond.identity() == v.identity_key)
|
||||
});
|
||||
|
||||
let recently_unbonded_gateways = unbonded_gateways_rows.to_owned().count();
|
||||
let last_updated_utc = chrono::offset::Utc::now().timestamp();
|
||||
let mut transaction = pool.begin().await?;
|
||||
for row in unbonded_gateways_rows {
|
||||
sqlx::query!(
|
||||
"UPDATE gateways
|
||||
SET bonded = ?, last_updated_utc = ?
|
||||
WHERE id = ?;",
|
||||
false,
|
||||
last_updated_utc,
|
||||
row.id,
|
||||
)
|
||||
.execute(&mut *transaction)
|
||||
.await?;
|
||||
}
|
||||
transaction.commit().await?;
|
||||
|
||||
Ok(recently_unbonded_gateways)
|
||||
}
|
||||
|
||||
async fn get_all_bonded_gateways_row_ids_by_status(
|
||||
pool: &DbPool,
|
||||
status: bool,
|
||||
) -> anyhow::Result<Vec<BondedStatusDto>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
BondedStatusDto,
|
||||
r#"SELECT
|
||||
id as "id!",
|
||||
gateway_identity_key as "identity_key!",
|
||||
bonded as "bonded: bool"
|
||||
FROM gateways
|
||||
WHERE bonded = ?"#,
|
||||
status,
|
||||
)
|
||||
.fetch(&mut *conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_gateways(pool: &DbPool) -> anyhow::Result<Vec<Gateway>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
GatewayDto,
|
||||
r#"SELECT
|
||||
gw.gateway_identity_key as "gateway_identity_key!",
|
||||
gw.bonded as "bonded: bool",
|
||||
gw.blacklisted as "blacklisted: bool",
|
||||
gw.performance as "performance!",
|
||||
gw.self_described as "self_described?",
|
||||
gw.explorer_pretty_bond as "explorer_pretty_bond?",
|
||||
gw.last_probe_result as "last_probe_result?",
|
||||
gw.last_probe_log as "last_probe_log?",
|
||||
gw.last_testrun_utc as "last_testrun_utc?",
|
||||
gw.last_updated_utc as "last_updated_utc!",
|
||||
COALESCE(gd.moniker, "NA") as "moniker!",
|
||||
COALESCE(gd.website, "NA") as "website!",
|
||||
COALESCE(gd.security_contact, "NA") as "security_contact!",
|
||||
COALESCE(gd.details, "NA") as "details!"
|
||||
FROM gateways gw
|
||||
LEFT JOIN gateway_description gd
|
||||
ON gw.gateway_identity_key = gd.gateway_identity_key
|
||||
ORDER BY gw.gateway_identity_key"#,
|
||||
)
|
||||
.fetch(&mut conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
let items: Vec<Gateway> = items
|
||||
.into_iter()
|
||||
.map(|item| item.try_into())
|
||||
.collect::<anyhow::Result<Vec<_>>>()
|
||||
.map_err(|e| {
|
||||
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
|
||||
e
|
||||
})?;
|
||||
tracing::trace!("Fetched {} gateways from DB", items.len());
|
||||
Ok(items)
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
use crate::db::{models::NetworkSummary, DbPool};
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
/// take `last_updated` instead of calculating it so that `summary` matches
|
||||
/// `daily_summary`
|
||||
pub(crate) async fn insert_summaries(
|
||||
pool: &DbPool,
|
||||
summaries: &[(&str, &usize)],
|
||||
summary: &NetworkSummary,
|
||||
last_updated: DateTime<Utc>,
|
||||
) -> anyhow::Result<()> {
|
||||
insert_summary(pool, summaries, last_updated).await?;
|
||||
|
||||
insert_summary_history(pool, summary, last_updated).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert_summary(
|
||||
pool: &DbPool,
|
||||
summaries: &[(&str, &usize)],
|
||||
last_updated: DateTime<Utc>,
|
||||
) -> anyhow::Result<()> {
|
||||
let timestamp = last_updated.timestamp();
|
||||
let mut tx = pool.begin().await?;
|
||||
|
||||
for (kind, value) in summaries {
|
||||
let value = value.to_string();
|
||||
sqlx::query!(
|
||||
"INSERT INTO summary
|
||||
(key, value_json, last_updated_utc)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT(key) DO UPDATE SET
|
||||
value_json=excluded.value_json,
|
||||
last_updated_utc=excluded.last_updated_utc;",
|
||||
kind,
|
||||
value,
|
||||
timestamp
|
||||
)
|
||||
.execute(&mut tx)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
tracing::error!("Failed to insert data for {kind}: {err}, aborting transaction",);
|
||||
err
|
||||
})?;
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// For `<date_N>`, `summary_history` is updated with fresh data on every
|
||||
/// iteration.
|
||||
///
|
||||
/// After UTC midnight, summary is inserted for `<date_N+1>` and last entry for
|
||||
/// `<date_N>` stays there forever.
|
||||
///
|
||||
/// This is not aggregate data, it's a set of latest data points
|
||||
async fn insert_summary_history(
|
||||
pool: &DbPool,
|
||||
summary: &NetworkSummary,
|
||||
last_updated: DateTime<Utc>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
let value_json = serde_json::to_string(&summary)?;
|
||||
let timestamp = last_updated.timestamp();
|
||||
let now_rfc3339 = last_updated.to_rfc3339();
|
||||
// YYYY-MM-DD, without time
|
||||
let date = &now_rfc3339[..10];
|
||||
|
||||
sqlx::query!(
|
||||
"INSERT INTO summary_history
|
||||
(date, timestamp_utc, value_json)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT(date) DO UPDATE SET
|
||||
timestamp_utc=excluded.timestamp_utc,
|
||||
value_json=excluded.value_json;",
|
||||
date,
|
||||
timestamp,
|
||||
value_json
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,177 @@
|
||||
use futures_util::TryStreamExt;
|
||||
use nym_validator_client::models::MixNodeBondAnnotated;
|
||||
use tracing::error;
|
||||
|
||||
use crate::{
|
||||
db::{
|
||||
models::{BondedStatusDto, MixnodeDto, MixnodeRecord},
|
||||
DbPool,
|
||||
},
|
||||
http::models::{DailyStats, Mixnode},
|
||||
};
|
||||
|
||||
pub(crate) async fn insert_mixnodes(
|
||||
pool: &DbPool,
|
||||
mixnodes: Vec<MixnodeRecord>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
for record in mixnodes.iter() {
|
||||
// https://www.sqlite.org/lang_upsert.html
|
||||
sqlx::query!(
|
||||
"INSERT INTO mixnodes
|
||||
(mix_id, identity_key, bonded, total_stake,
|
||||
host, http_api_port, blacklisted, full_details,
|
||||
self_described, last_updated_utc, is_dp_delegatee)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(mix_id) DO UPDATE SET
|
||||
bonded=excluded.bonded,
|
||||
total_stake=excluded.total_stake, host=excluded.host,
|
||||
http_api_port=excluded.http_api_port,blacklisted=excluded.blacklisted,
|
||||
full_details=excluded.full_details,self_described=excluded.self_described,
|
||||
last_updated_utc=excluded.last_updated_utc,
|
||||
is_dp_delegatee = excluded.is_dp_delegatee;",
|
||||
record.mix_id,
|
||||
record.identity_key,
|
||||
record.bonded,
|
||||
record.total_stake,
|
||||
record.host,
|
||||
record.http_port,
|
||||
record.blacklisted,
|
||||
record.full_details,
|
||||
record.self_described,
|
||||
record.last_updated_utc,
|
||||
record.is_dp_delegatee
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_mixnodes(pool: &DbPool) -> anyhow::Result<Vec<Mixnode>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
MixnodeDto,
|
||||
r#"SELECT
|
||||
mn.mix_id as "mix_id!",
|
||||
mn.bonded as "bonded: bool",
|
||||
mn.blacklisted as "blacklisted: bool",
|
||||
mn.is_dp_delegatee as "is_dp_delegatee: bool",
|
||||
mn.total_stake as "total_stake!",
|
||||
mn.full_details as "full_details!",
|
||||
mn.self_described as "self_described",
|
||||
mn.last_updated_utc as "last_updated_utc!",
|
||||
COALESCE(md.moniker, "NA") as "moniker!",
|
||||
COALESCE(md.website, "NA") as "website!",
|
||||
COALESCE(md.security_contact, "NA") as "security_contact!",
|
||||
COALESCE(md.details, "NA") as "details!"
|
||||
FROM mixnodes mn
|
||||
LEFT JOIN mixnode_description md ON mn.mix_id = md.mix_id
|
||||
ORDER BY mn.mix_id"#
|
||||
)
|
||||
.fetch(&mut conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
let items = items
|
||||
.into_iter()
|
||||
.map(|item| item.try_into())
|
||||
.collect::<anyhow::Result<Vec<_>>>()
|
||||
.map_err(|e| {
|
||||
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
|
||||
e
|
||||
})?;
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
/// We fetch the latest 30 days of data as a subquery and then
|
||||
/// return it in ascending order, so we don't break existing UI
|
||||
pub(crate) async fn get_daily_stats(pool: &DbPool) -> anyhow::Result<Vec<DailyStats>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
DailyStats,
|
||||
r#"
|
||||
SELECT
|
||||
date_utc as "date_utc!",
|
||||
packets_received as "total_packets_received!: i64",
|
||||
packets_sent as "total_packets_sent!: i64",
|
||||
packets_dropped as "total_packets_dropped!: i64",
|
||||
total_stake as "total_stake!: i64"
|
||||
FROM (
|
||||
SELECT
|
||||
date_utc,
|
||||
SUM(packets_received) as packets_received,
|
||||
SUM(packets_sent) as packets_sent,
|
||||
SUM(packets_dropped) as packets_dropped,
|
||||
SUM(total_stake) as total_stake
|
||||
FROM mixnode_daily_stats
|
||||
GROUP BY date_utc
|
||||
ORDER BY date_utc DESC
|
||||
LIMIT 30
|
||||
)
|
||||
GROUP BY date_utc
|
||||
ORDER BY date_utc
|
||||
"#
|
||||
)
|
||||
.fetch(&mut conn)
|
||||
.try_collect::<Vec<DailyStats>>()
|
||||
.await?;
|
||||
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
/// Ensure all mixnodes that are set as bonded, are still bonded
|
||||
pub(crate) async fn ensure_mixnodes_still_bonded(
|
||||
pool: &DbPool,
|
||||
mixnodes: &[MixNodeBondAnnotated],
|
||||
) -> anyhow::Result<usize> {
|
||||
let bonded_mixnodes_rows = get_all_bonded_mixnodes_row_ids_by_status(pool, true).await?;
|
||||
let unbonded_mixnodes_rows = bonded_mixnodes_rows.iter().filter(|v| {
|
||||
!mixnodes
|
||||
.iter()
|
||||
.any(|bonded| *bonded.mixnode_details.bond_information.identity() == v.identity_key)
|
||||
});
|
||||
|
||||
let recently_unbonded_mixnodes = unbonded_mixnodes_rows.to_owned().count();
|
||||
let last_updated_utc = chrono::offset::Utc::now().timestamp();
|
||||
let mut transaction = pool.begin().await?;
|
||||
for row in unbonded_mixnodes_rows {
|
||||
sqlx::query!(
|
||||
"UPDATE mixnodes
|
||||
SET bonded = ?, last_updated_utc = ?
|
||||
WHERE id = ?;",
|
||||
false,
|
||||
last_updated_utc,
|
||||
row.id,
|
||||
)
|
||||
.execute(&mut *transaction)
|
||||
.await?;
|
||||
}
|
||||
transaction.commit().await?;
|
||||
|
||||
Ok(recently_unbonded_mixnodes)
|
||||
}
|
||||
|
||||
async fn get_all_bonded_mixnodes_row_ids_by_status(
|
||||
pool: &DbPool,
|
||||
status: bool,
|
||||
) -> anyhow::Result<Vec<BondedStatusDto>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
BondedStatusDto,
|
||||
r#"SELECT
|
||||
id as "id!",
|
||||
identity_key as "identity_key!",
|
||||
bonded as "bonded: bool"
|
||||
FROM mixnodes
|
||||
WHERE bonded = ?"#,
|
||||
status,
|
||||
)
|
||||
.fetch(&mut *conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
Ok(items)
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
mod gateways;
|
||||
mod misc;
|
||||
mod mixnodes;
|
||||
mod summary;
|
||||
|
||||
pub(crate) use gateways::{
|
||||
ensure_gateways_still_bonded, get_all_gateways, insert_gateways,
|
||||
write_blacklisted_gateways_to_db,
|
||||
};
|
||||
pub(crate) use misc::insert_summaries;
|
||||
pub(crate) use mixnodes::{
|
||||
ensure_mixnodes_still_bonded, get_all_mixnodes, get_daily_stats, insert_mixnodes,
|
||||
};
|
||||
pub(crate) use summary::{get_summary, get_summary_history};
|
||||
@@ -0,0 +1,209 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures_util::TryStreamExt;
|
||||
use std::collections::HashMap;
|
||||
use tracing::error;
|
||||
|
||||
use crate::{
|
||||
db::{
|
||||
models::{
|
||||
gateway::{
|
||||
GatewaySummary, GatewaySummaryBlacklisted, GatewaySummaryBonded,
|
||||
GatewaySummaryExplorer, GatewaySummaryHistorical,
|
||||
},
|
||||
mixnode::{
|
||||
MixnodeSummary, MixnodeSummaryBlacklisted, MixnodeSummaryBonded,
|
||||
MixnodeSummaryHistorical,
|
||||
},
|
||||
NetworkSummary, SummaryDto, SummaryHistoryDto,
|
||||
},
|
||||
DbPool,
|
||||
},
|
||||
http::{
|
||||
error::{HttpError, HttpResult},
|
||||
models::SummaryHistory,
|
||||
},
|
||||
};
|
||||
|
||||
pub(crate) async fn get_summary_history(pool: &DbPool) -> anyhow::Result<Vec<SummaryHistory>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
SummaryHistoryDto,
|
||||
r#"SELECT
|
||||
id as "id!",
|
||||
date as "date!",
|
||||
timestamp_utc as "timestamp_utc!",
|
||||
value_json as "value_json!"
|
||||
FROM summary_history
|
||||
ORDER BY date DESC
|
||||
LIMIT 30"#,
|
||||
)
|
||||
.fetch(&mut conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
let items = items
|
||||
.into_iter()
|
||||
.map(|item| item.try_into())
|
||||
.collect::<anyhow::Result<Vec<_>>>()
|
||||
.map_err(|e| {
|
||||
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
|
||||
e
|
||||
})?;
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
async fn get_summary_dto(pool: &DbPool) -> anyhow::Result<Vec<SummaryDto>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
Ok(sqlx::query_as!(
|
||||
SummaryDto,
|
||||
r#"SELECT
|
||||
key as "key!",
|
||||
value_json as "value_json!",
|
||||
last_updated_utc as "last_updated_utc!"
|
||||
FROM summary"#
|
||||
)
|
||||
.fetch(&mut conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_summary(pool: &DbPool) -> HttpResult<NetworkSummary> {
|
||||
let items = get_summary_dto(pool).await.map_err(|err| {
|
||||
tracing::error!("Couldn't get Summary from DB: {err}");
|
||||
HttpError::internal()
|
||||
})?;
|
||||
from_summary_dto(items).await
|
||||
}
|
||||
|
||||
async fn from_summary_dto(items: Vec<SummaryDto>) -> HttpResult<NetworkSummary> {
|
||||
const MIXNODES_BONDED_COUNT: &str = "mixnodes.bonded.count";
|
||||
const MIXNODES_BONDED_ACTIVE: &str = "mixnodes.bonded.active";
|
||||
const MIXNODES_BONDED_INACTIVE: &str = "mixnodes.bonded.inactive";
|
||||
const MIXNODES_BONDED_RESERVE: &str = "mixnodes.bonded.reserve";
|
||||
const MIXNODES_BLACKLISTED_COUNT: &str = "mixnodes.blacklisted.count";
|
||||
const GATEWAYS_BONDED_COUNT: &str = "gateways.bonded.count";
|
||||
const GATEWAYS_EXPLORER_COUNT: &str = "gateways.explorer.count";
|
||||
const GATEWAYS_BLACKLISTED_COUNT: &str = "gateways.blacklisted.count";
|
||||
const MIXNODES_HISTORICAL_COUNT: &str = "mixnodes.historical.count";
|
||||
const GATEWAYS_HISTORICAL_COUNT: &str = "gateways.historical.count";
|
||||
|
||||
// convert database rows into a map by key
|
||||
let mut map = HashMap::new();
|
||||
for item in items {
|
||||
map.insert(item.key.clone(), item);
|
||||
}
|
||||
|
||||
// check we have all the keys we are expecting, and build up a map of errors for missing one
|
||||
let keys = [
|
||||
GATEWAYS_BONDED_COUNT,
|
||||
GATEWAYS_EXPLORER_COUNT,
|
||||
GATEWAYS_HISTORICAL_COUNT,
|
||||
GATEWAYS_BLACKLISTED_COUNT,
|
||||
MIXNODES_BLACKLISTED_COUNT,
|
||||
MIXNODES_BONDED_ACTIVE,
|
||||
MIXNODES_BONDED_COUNT,
|
||||
MIXNODES_BONDED_INACTIVE,
|
||||
MIXNODES_BONDED_RESERVE,
|
||||
MIXNODES_HISTORICAL_COUNT,
|
||||
];
|
||||
|
||||
let mut errors: Vec<&str> = vec![];
|
||||
for key in keys {
|
||||
if !map.contains_key(key) {
|
||||
errors.push(key);
|
||||
}
|
||||
}
|
||||
|
||||
// return an error if anything is missing, with a nice list
|
||||
if !errors.is_empty() {
|
||||
tracing::error!("Summary value missing: {}", errors.join(", "));
|
||||
return Err(HttpError::internal());
|
||||
}
|
||||
|
||||
// strip the options and use default values (anything missing is trapped above)
|
||||
let mixnodes_bonded_count: SummaryDto =
|
||||
map.get(MIXNODES_BONDED_COUNT).cloned().unwrap_or_default();
|
||||
let mixnodes_bonded_active: SummaryDto =
|
||||
map.get(MIXNODES_BONDED_ACTIVE).cloned().unwrap_or_default();
|
||||
let mixnodes_bonded_inactive: SummaryDto = map
|
||||
.get(MIXNODES_BONDED_INACTIVE)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let mixnodes_bonded_reserve: SummaryDto = map
|
||||
.get(MIXNODES_BONDED_RESERVE)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let mixnodes_blacklisted_count: SummaryDto = map
|
||||
.get(MIXNODES_BLACKLISTED_COUNT)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let gateways_bonded_count: SummaryDto =
|
||||
map.get(GATEWAYS_BONDED_COUNT).cloned().unwrap_or_default();
|
||||
let gateways_explorer_count: SummaryDto = map
|
||||
.get(GATEWAYS_EXPLORER_COUNT)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let mixnodes_historical_count: SummaryDto = map
|
||||
.get(MIXNODES_HISTORICAL_COUNT)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let gateways_historical_count: SummaryDto = map
|
||||
.get(GATEWAYS_HISTORICAL_COUNT)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let gateways_blacklisted_count: SummaryDto = map
|
||||
.get(GATEWAYS_BLACKLISTED_COUNT)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
Ok(NetworkSummary {
|
||||
mixnodes: MixnodeSummary {
|
||||
bonded: MixnodeSummaryBonded {
|
||||
count: to_count_i32(&mixnodes_bonded_count),
|
||||
active: to_count_i32(&mixnodes_bonded_active),
|
||||
reserve: to_count_i32(&mixnodes_bonded_reserve),
|
||||
inactive: to_count_i32(&mixnodes_bonded_inactive),
|
||||
last_updated_utc: to_timestamp(&mixnodes_bonded_count),
|
||||
},
|
||||
blacklisted: MixnodeSummaryBlacklisted {
|
||||
count: to_count_i32(&mixnodes_blacklisted_count),
|
||||
last_updated_utc: to_timestamp(&mixnodes_blacklisted_count),
|
||||
},
|
||||
historical: MixnodeSummaryHistorical {
|
||||
count: to_count_i32(&mixnodes_historical_count),
|
||||
last_updated_utc: to_timestamp(&mixnodes_historical_count),
|
||||
},
|
||||
},
|
||||
gateways: GatewaySummary {
|
||||
bonded: GatewaySummaryBonded {
|
||||
count: to_count_i32(&gateways_bonded_count),
|
||||
last_updated_utc: to_timestamp(&gateways_bonded_count),
|
||||
},
|
||||
blacklisted: GatewaySummaryBlacklisted {
|
||||
count: to_count_i32(&gateways_blacklisted_count),
|
||||
last_updated_utc: to_timestamp(&gateways_blacklisted_count),
|
||||
},
|
||||
historical: GatewaySummaryHistorical {
|
||||
count: to_count_i32(&gateways_historical_count),
|
||||
last_updated_utc: to_timestamp(&gateways_historical_count),
|
||||
},
|
||||
explorer: GatewaySummaryExplorer {
|
||||
count: to_count_i32(&gateways_explorer_count),
|
||||
last_updated_utc: to_timestamp(&gateways_explorer_count),
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn to_count_i32(value: &SummaryDto) -> i32 {
|
||||
value.value_json.parse::<i32>().unwrap_or_default()
|
||||
}
|
||||
|
||||
fn to_timestamp(value: &SummaryDto) -> String {
|
||||
timestamp_as_utc(value.last_updated_utc as u64).to_rfc3339()
|
||||
}
|
||||
|
||||
fn timestamp_as_utc(unix_timestamp: u64) -> DateTime<Utc> {
|
||||
let d = std::time::UNIX_EPOCH + std::time::Duration::from_secs(unix_timestamp);
|
||||
d.into()
|
||||
}
|
||||
@@ -0,0 +1,110 @@
|
||||
use axum::{
|
||||
extract::{Path, Query, State},
|
||||
Json, Router,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use utoipa::IntoParams;
|
||||
|
||||
use crate::http::{
|
||||
error::{HttpError, HttpResult},
|
||||
models::{Gateway, GatewaySkinny},
|
||||
state::AppState,
|
||||
PagedResult, Pagination,
|
||||
};
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/", axum::routing::get(gateways))
|
||||
.route("/skinny", axum::routing::get(gateways_skinny))
|
||||
.route("/skinny/:identity_key", axum::routing::get(get_gateway))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Gateways",
|
||||
get,
|
||||
params(
|
||||
Pagination
|
||||
),
|
||||
path = "/v2/gateways",
|
||||
responses(
|
||||
(status = 200, body = PagedGateway)
|
||||
)
|
||||
)]
|
||||
async fn gateways(
|
||||
Query(pagination): Query<Pagination>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<PagedResult<Gateway>>> {
|
||||
let db = state.db_pool();
|
||||
let res = state.cache().get_gateway_list(db).await;
|
||||
|
||||
Ok(Json(PagedResult::paginate(pagination, res)))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Gateways",
|
||||
get,
|
||||
params(
|
||||
Pagination
|
||||
),
|
||||
path = "/v2/gateways/skinny",
|
||||
responses(
|
||||
(status = 200, body = PagedGatewaySkinny)
|
||||
)
|
||||
)]
|
||||
async fn gateways_skinny(
|
||||
Query(pagination): Query<Pagination>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<PagedResult<GatewaySkinny>>> {
|
||||
let db = state.db_pool();
|
||||
let res = state.cache().get_gateway_list(db).await;
|
||||
let res: Vec<GatewaySkinny> = res
|
||||
.iter()
|
||||
.filter(|g| g.bonded)
|
||||
.map(|g| GatewaySkinny {
|
||||
gateway_identity_key: g.gateway_identity_key.clone(),
|
||||
self_described: g.self_described.clone(),
|
||||
performance: g.performance,
|
||||
explorer_pretty_bond: g.explorer_pretty_bond.clone(),
|
||||
last_probe_result: g.last_probe_result.clone(),
|
||||
last_testrun_utc: g.last_testrun_utc.clone(),
|
||||
last_updated_utc: g.last_updated_utc.clone(),
|
||||
routing_score: g.routing_score,
|
||||
config_score: g.config_score,
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Json(PagedResult::paginate(pagination, res)))
|
||||
}
|
||||
|
||||
#[derive(Deserialize, IntoParams)]
|
||||
#[into_params(parameter_in = Path)]
|
||||
struct IdentityKeyParam {
|
||||
identity_key: String,
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Gateways",
|
||||
get,
|
||||
params(
|
||||
IdentityKeyParam
|
||||
),
|
||||
path = "/v2/gateways/{identity_key}",
|
||||
responses(
|
||||
(status = 200, body = Gateway)
|
||||
)
|
||||
)]
|
||||
async fn get_gateway(
|
||||
Path(IdentityKeyParam { identity_key }): Path<IdentityKeyParam>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<Gateway>> {
|
||||
let db = state.db_pool();
|
||||
let res = state.cache().get_gateway_list(db).await;
|
||||
|
||||
match res
|
||||
.iter()
|
||||
.find(|item| item.gateway_identity_key == identity_key)
|
||||
{
|
||||
Some(res) => Ok(Json(res.clone())),
|
||||
None => Err(HttpError::invalid_input(identity_key)),
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,91 @@
|
||||
use axum::{
|
||||
extract::{Path, Query, State},
|
||||
Json, Router,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use tracing::instrument;
|
||||
use utoipa::IntoParams;
|
||||
|
||||
use crate::http::{
|
||||
error::{HttpError, HttpResult},
|
||||
models::{DailyStats, Mixnode},
|
||||
state::AppState,
|
||||
PagedResult, Pagination,
|
||||
};
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/", axum::routing::get(mixnodes))
|
||||
.route("/:mix_id", axum::routing::get(get_mixnodes))
|
||||
.route("/stats", axum::routing::get(get_stats))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Mixnodes",
|
||||
get,
|
||||
params(
|
||||
Pagination
|
||||
),
|
||||
path = "/v2/mixnodes",
|
||||
responses(
|
||||
(status = 200, body = PagedMixnode)
|
||||
)
|
||||
)]
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(page=pagination.page, size=pagination.size))]
|
||||
async fn mixnodes(
|
||||
Query(pagination): Query<Pagination>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<PagedResult<Mixnode>>> {
|
||||
let db = state.db_pool();
|
||||
let res = state.cache().get_mixnodes_list(db).await;
|
||||
|
||||
Ok(Json(PagedResult::paginate(pagination, res)))
|
||||
}
|
||||
|
||||
#[derive(Deserialize, IntoParams)]
|
||||
#[into_params(parameter_in = Path)]
|
||||
struct MixIdParam {
|
||||
mix_id: String,
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Mixnodes",
|
||||
get,
|
||||
params(
|
||||
MixIdParam
|
||||
),
|
||||
path = "/v2/mixnodes/{mix_id}",
|
||||
responses(
|
||||
(status = 200, body = Mixnode)
|
||||
)
|
||||
)]
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(mix_id = mix_id))]
|
||||
async fn get_mixnodes(
|
||||
Path(MixIdParam { mix_id }): Path<MixIdParam>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<Mixnode>> {
|
||||
match mix_id.parse::<u32>() {
|
||||
Ok(parsed_mix_id) => {
|
||||
let res = state.cache().get_mixnodes_list(state.db_pool()).await;
|
||||
|
||||
match res.iter().find(|item| item.mix_id == parsed_mix_id) {
|
||||
Some(res) => Ok(Json(res.clone())),
|
||||
None => Err(HttpError::invalid_input(mix_id)),
|
||||
}
|
||||
}
|
||||
Err(_e) => Err(HttpError::invalid_input(mix_id)),
|
||||
}
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Mixnodes",
|
||||
get,
|
||||
path = "/v2/mixnodes/stats",
|
||||
responses(
|
||||
(status = 200, body = Vec<DailyStats>)
|
||||
)
|
||||
)]
|
||||
async fn get_stats(State(state): State<AppState>) -> HttpResult<Json<Vec<DailyStats>>> {
|
||||
let stats = state.cache().get_mixnode_stats(state.db_pool()).await;
|
||||
Ok(Json(stats))
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
use anyhow::anyhow;
|
||||
use axum::{response::Redirect, Router};
|
||||
use tokio::net::ToSocketAddrs;
|
||||
use tower_http::{cors::CorsLayer, trace::TraceLayer};
|
||||
use utoipa::OpenApi;
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
|
||||
use crate::http::{server::HttpServer, state::AppState};
|
||||
|
||||
pub(crate) mod gateways;
|
||||
pub(crate) mod mixnodes;
|
||||
pub(crate) mod services;
|
||||
pub(crate) mod summary;
|
||||
pub(crate) mod testruns;
|
||||
|
||||
pub(crate) struct RouterBuilder {
|
||||
unfinished_router: Router<AppState>,
|
||||
}
|
||||
|
||||
impl RouterBuilder {
|
||||
pub(crate) fn with_default_routes() -> Self {
|
||||
let router = Router::new()
|
||||
.merge(
|
||||
SwaggerUi::new("/swagger")
|
||||
.url("/api-docs/openapi.json", super::api_docs::ApiDoc::openapi()),
|
||||
)
|
||||
.route(
|
||||
"/",
|
||||
axum::routing::get(|| async { Redirect::permanent("/swagger") }),
|
||||
)
|
||||
.nest(
|
||||
"/v2",
|
||||
Router::new()
|
||||
.nest("/gateways", gateways::routes())
|
||||
.nest("/mixnodes", mixnodes::routes())
|
||||
.nest("/services", services::routes())
|
||||
.nest("/summary", summary::routes()),
|
||||
// .nest("/testruns", testruns::_routes()),
|
||||
);
|
||||
|
||||
Self {
|
||||
unfinished_router: router,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn with_state(self, state: AppState) -> RouterWithState {
|
||||
RouterWithState {
|
||||
router: self.finalize_routes().with_state(state),
|
||||
}
|
||||
}
|
||||
|
||||
fn finalize_routes(self) -> Router<AppState> {
|
||||
// layers added later wrap earlier layers
|
||||
self.unfinished_router
|
||||
// CORS layer needs to wrap other API layers
|
||||
.layer(setup_cors())
|
||||
// logger should be outermost layer
|
||||
.layer(TraceLayer::new_for_http())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RouterWithState {
|
||||
router: Router,
|
||||
}
|
||||
|
||||
impl RouterWithState {
|
||||
pub(crate) async fn build_server<A: ToSocketAddrs>(
|
||||
self,
|
||||
bind_address: A,
|
||||
) -> anyhow::Result<HttpServer> {
|
||||
tokio::net::TcpListener::bind(bind_address)
|
||||
.await
|
||||
.map(|listener| HttpServer::new(self.router, listener))
|
||||
.map_err(|err| anyhow!("Couldn't bind to address due to {}", err))
|
||||
}
|
||||
}
|
||||
|
||||
fn setup_cors() -> CorsLayer {
|
||||
use axum::http::Method;
|
||||
CorsLayer::new()
|
||||
.allow_origin(tower_http::cors::Any)
|
||||
.allow_methods([Method::POST, Method::GET, Method::PATCH, Method::OPTIONS])
|
||||
.allow_headers(tower_http::cors::Any)
|
||||
.allow_credentials(false)
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
use serde_json_path::JsonPath;
|
||||
|
||||
use crate::http::models::Gateway;
|
||||
|
||||
pub(super) struct ParseJsonPaths {
|
||||
pub(super) path_ip_address: JsonPath,
|
||||
pub(super) path_hostname: JsonPath,
|
||||
pub(super) path_service_provider_client_id: JsonPath,
|
||||
}
|
||||
|
||||
impl ParseJsonPaths {
|
||||
pub fn new() -> Result<Self, serde_json_path::ParseError> {
|
||||
Ok(ParseJsonPaths {
|
||||
path_ip_address: JsonPath::parse("$.host_information.ip_address[0]")?,
|
||||
path_hostname: JsonPath::parse("$.host_information.hostname")?,
|
||||
path_service_provider_client_id: JsonPath::parse("$.network_requester.address")?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) struct ParsedDetails {
|
||||
pub(super) ip_address: Option<String>,
|
||||
pub(super) hostname: Option<String>,
|
||||
pub(super) service_provider_client_id: Option<String>,
|
||||
}
|
||||
|
||||
impl ParsedDetails {
|
||||
fn get_string_from_json_path(
|
||||
value: &Option<serde_json::Value>,
|
||||
path: &JsonPath,
|
||||
) -> Option<String> {
|
||||
match value {
|
||||
Some(value) => path
|
||||
.query(value)
|
||||
.exactly_one()
|
||||
.map(|v2| v2.as_str().map(|v3| v3.to_string()))
|
||||
.ok()
|
||||
.flatten(),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
pub fn new(paths: &ParseJsonPaths, g: &Gateway) -> ParsedDetails {
|
||||
ParsedDetails {
|
||||
hostname: ParsedDetails::get_string_from_json_path(
|
||||
&g.self_described,
|
||||
&paths.path_hostname,
|
||||
),
|
||||
ip_address: ParsedDetails::get_string_from_json_path(
|
||||
&g.self_described,
|
||||
&paths.path_ip_address,
|
||||
),
|
||||
service_provider_client_id: ParsedDetails::get_string_from_json_path(
|
||||
&g.self_described,
|
||||
&paths.path_service_provider_client_id,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,134 @@
|
||||
use crate::http::{
|
||||
error::{HttpError, HttpResult},
|
||||
models::Service,
|
||||
state::AppState,
|
||||
PagedResult, Pagination,
|
||||
};
|
||||
use axum::{
|
||||
extract::{Query, State},
|
||||
Json, Router,
|
||||
};
|
||||
use json_path::{ParseJsonPaths, ParsedDetails};
|
||||
use tracing::instrument;
|
||||
|
||||
mod json_path;
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new().route("/", axum::routing::get(mixnodes))
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::IntoParams)]
|
||||
#[into_params(parameter_in = Query)]
|
||||
pub(crate) struct ServicesQueryParams {
|
||||
size: Option<usize>,
|
||||
page: Option<usize>,
|
||||
wss: Option<bool>,
|
||||
hostname: Option<bool>,
|
||||
entry: Option<bool>,
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Services",
|
||||
get,
|
||||
params(
|
||||
ServicesQueryParams,
|
||||
),
|
||||
path = "/v2/services",
|
||||
responses(
|
||||
(status = 200, body = PagedService)
|
||||
)
|
||||
)]
|
||||
#[instrument(level = tracing::Level::DEBUG, skip(state))]
|
||||
async fn mixnodes(
|
||||
Query(ServicesQueryParams {
|
||||
size,
|
||||
page,
|
||||
wss,
|
||||
hostname,
|
||||
entry,
|
||||
}): Query<ServicesQueryParams>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<PagedResult<Service>>> {
|
||||
let db = state.db_pool();
|
||||
let cache = state.cache();
|
||||
|
||||
let show_only_wss = wss.unwrap_or(false);
|
||||
let show_only_with_hostname = hostname.unwrap_or(false);
|
||||
let show_entry_gateways_only = entry.unwrap_or(false);
|
||||
|
||||
let paths = ParseJsonPaths::new().map_err(|e| {
|
||||
tracing::error!("Invalidly configured ParseJsonPaths: {e}");
|
||||
HttpError::internal()
|
||||
})?;
|
||||
let res = cache.get_gateway_list(db).await;
|
||||
let res: Vec<Service> = res
|
||||
.iter()
|
||||
.map(|g| {
|
||||
let details = ParsedDetails::new(&paths, g);
|
||||
|
||||
let s = Service {
|
||||
gateway_identity_key: g.gateway_identity_key.clone(),
|
||||
ip_address: details.ip_address,
|
||||
service_provider_client_id: details.service_provider_client_id,
|
||||
hostname: details.hostname,
|
||||
last_successful_ping_utc: g.last_testrun_utc.clone(),
|
||||
last_updated_utc: g.last_updated_utc.clone(),
|
||||
// routing_score: g.routing_score,
|
||||
routing_score: 1f32,
|
||||
mixnet_websockets: g
|
||||
.self_described
|
||||
.clone()
|
||||
.and_then(|s| s.get("mixnet_websockets").cloned()),
|
||||
};
|
||||
|
||||
let f = ServiceFilter::new(&s);
|
||||
|
||||
(s, f)
|
||||
})
|
||||
.filter(|(_, f)| {
|
||||
let mut keep = f.has_network_requester_sp;
|
||||
|
||||
if show_entry_gateways_only {
|
||||
keep = true;
|
||||
}
|
||||
|
||||
if show_only_wss {
|
||||
keep &= f.has_wss;
|
||||
}
|
||||
if show_only_with_hostname {
|
||||
keep &= f.has_hostname;
|
||||
}
|
||||
|
||||
keep
|
||||
})
|
||||
.map(|(s, _)| s)
|
||||
.collect();
|
||||
|
||||
Ok(Json(PagedResult::paginate(Pagination { size, page }, res)))
|
||||
}
|
||||
|
||||
struct ServiceFilter {
|
||||
has_wss: bool,
|
||||
has_network_requester_sp: bool,
|
||||
has_hostname: bool,
|
||||
}
|
||||
|
||||
impl ServiceFilter {
|
||||
fn new(s: &Service) -> Self {
|
||||
let has_wss = match &s.mixnet_websockets {
|
||||
Some(v) => v.get("wss_port").map(|v2| !v2.is_null()).unwrap_or(false),
|
||||
None => false,
|
||||
};
|
||||
let has_hostname = s.hostname.is_some();
|
||||
let has_network_requester_sp = match &s.service_provider_client_id {
|
||||
Some(v) => !v.is_empty(),
|
||||
None => false,
|
||||
};
|
||||
|
||||
ServiceFilter {
|
||||
has_wss,
|
||||
has_hostname,
|
||||
has_network_requester_sp,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
use axum::{extract::State, Json, Router};
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::{
|
||||
db::models::NetworkSummary,
|
||||
http::{error::HttpResult, models::SummaryHistory, state::AppState},
|
||||
};
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/", axum::routing::get(summary))
|
||||
.route("/history", axum::routing::get(summary_history))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Summary",
|
||||
get,
|
||||
path = "/v2/summary",
|
||||
responses(
|
||||
(status = 200, body = NetworkSummary)
|
||||
)
|
||||
)]
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
async fn summary(State(state): State<AppState>) -> HttpResult<Json<NetworkSummary>> {
|
||||
crate::db::queries::get_summary(state.db_pool())
|
||||
.await
|
||||
.map(Json)
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Summary",
|
||||
get,
|
||||
path = "/v2/summary/history",
|
||||
responses(
|
||||
(status = 200, body = Vec<SummaryHistory>)
|
||||
)
|
||||
)]
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
async fn summary_history(State(state): State<AppState>) -> HttpResult<Json<Vec<SummaryHistory>>> {
|
||||
Ok(Json(
|
||||
state.cache().get_summary_history(state.db_pool()).await,
|
||||
))
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
use axum::Router;
|
||||
|
||||
use crate::http::state::AppState;
|
||||
|
||||
pub(crate) fn _routes() -> Router<AppState> {
|
||||
unimplemented!()
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
use crate::http::{Gateway, GatewaySkinny, Mixnode, Service};
|
||||
use utoipa::OpenApi;
|
||||
use utoipauto::utoipauto;
|
||||
|
||||
// manually import external structs which are behind feature flags because they
|
||||
// can't be automatically discovered
|
||||
// https://github.com/ProbablyClem/utoipauto/issues/13#issuecomment-1974911829
|
||||
#[utoipauto(paths = "./nym-node-status-api/src")]
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
info(title = "Nym API"),
|
||||
tags(),
|
||||
components(schemas(nym_node_requests::api::v1::node::models::NodeDescription,))
|
||||
)]
|
||||
pub(super) struct ApiDoc;
|
||||
@@ -0,0 +1,28 @@
|
||||
pub(crate) type HttpResult<T> = Result<T, HttpError>;
|
||||
|
||||
pub(crate) struct HttpError {
|
||||
message: String,
|
||||
status: axum::http::StatusCode,
|
||||
}
|
||||
|
||||
impl HttpError {
|
||||
pub(crate) fn invalid_input(message: String) -> Self {
|
||||
Self {
|
||||
message,
|
||||
status: axum::http::StatusCode::BAD_REQUEST,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn internal() -> Self {
|
||||
Self {
|
||||
message: serde_json::json!({"message": "Internal server error"}).to_string(),
|
||||
status: axum::http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl axum::response::IntoResponse for HttpError {
|
||||
fn into_response(self) -> axum::response::Response {
|
||||
(self.status, self.message).into_response()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
use models::{Gateway, GatewaySkinny, Mixnode, Service};
|
||||
|
||||
pub(crate) mod api;
|
||||
pub(crate) mod api_docs;
|
||||
pub(crate) mod error;
|
||||
pub(crate) mod models;
|
||||
pub(crate) mod server;
|
||||
pub(crate) mod state;
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
|
||||
// exclude generic from auto-discovery
|
||||
#[utoipauto::utoipa_ignore]
|
||||
// https://docs.rs/utoipa/latest/utoipa/derive.ToSchema.html#generic-schemas-with-aliases
|
||||
// Generic structs can only be included via aliases, not directly, because they
|
||||
// it would cause an error in generated Swagger docs.
|
||||
// Instead, you have to manually monomorphize each generic struct that
|
||||
// you wish to document
|
||||
#[aliases(
|
||||
PagedGateway = PagedResult<Gateway>,
|
||||
PagedGatewaySkinny = PagedResult<GatewaySkinny>,
|
||||
PagedMixnode = PagedResult<Mixnode>,
|
||||
PagedService = PagedResult<Service>,
|
||||
)]
|
||||
pub struct PagedResult<T> {
|
||||
pub page: usize,
|
||||
pub size: usize,
|
||||
pub total: usize,
|
||||
pub items: Vec<T>,
|
||||
}
|
||||
|
||||
impl<T: Clone> PagedResult<T> {
|
||||
pub fn paginate(pagination: Pagination, res: Vec<T>) -> Self {
|
||||
let total = res.len();
|
||||
let (size, mut page) = pagination.intoto_inner_values();
|
||||
|
||||
if page * size > total {
|
||||
page = total / size;
|
||||
}
|
||||
|
||||
let chunks: Vec<&[T]> = res.chunks(size).collect();
|
||||
|
||||
PagedResult {
|
||||
page,
|
||||
size,
|
||||
total,
|
||||
items: chunks.get(page).cloned().unwrap_or_default().into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::IntoParams)]
|
||||
#[into_params(parameter_in = Query)]
|
||||
pub(crate) struct Pagination {
|
||||
size: Option<usize>,
|
||||
page: Option<usize>,
|
||||
}
|
||||
|
||||
impl Pagination {
|
||||
// unwrap stored values or use predefined defaults
|
||||
pub(crate) fn intoto_inner_values(self) -> (usize, usize) {
|
||||
const SIZE_DEFAULT: usize = 10;
|
||||
const SIZE_MAX: usize = 200;
|
||||
|
||||
const PAGE_DEFAULT: usize = 0;
|
||||
|
||||
(
|
||||
self.size.unwrap_or(SIZE_DEFAULT).min(SIZE_MAX),
|
||||
self.page.unwrap_or(PAGE_DEFAULT),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
use nym_node_requests::api::v1::node::models::NodeDescription;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utoipa::ToSchema;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub struct Gateway {
|
||||
pub gateway_identity_key: String,
|
||||
pub bonded: bool,
|
||||
pub blacklisted: bool,
|
||||
pub performance: u8,
|
||||
pub self_described: Option<serde_json::Value>,
|
||||
pub explorer_pretty_bond: Option<serde_json::Value>,
|
||||
pub description: NodeDescription,
|
||||
pub last_probe_result: Option<serde_json::Value>,
|
||||
pub last_probe_log: Option<String>,
|
||||
pub last_testrun_utc: Option<String>,
|
||||
pub last_updated_utc: String,
|
||||
pub routing_score: f32,
|
||||
pub config_score: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub struct GatewaySkinny {
|
||||
pub gateway_identity_key: String,
|
||||
pub self_described: Option<serde_json::Value>,
|
||||
pub explorer_pretty_bond: Option<serde_json::Value>,
|
||||
pub last_probe_result: Option<serde_json::Value>,
|
||||
pub last_testrun_utc: Option<String>,
|
||||
pub last_updated_utc: String,
|
||||
pub routing_score: f32,
|
||||
pub config_score: u32,
|
||||
pub performance: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub struct Mixnode {
|
||||
pub mix_id: u32,
|
||||
pub bonded: bool,
|
||||
pub blacklisted: bool,
|
||||
pub is_dp_delegatee: bool,
|
||||
pub total_stake: i64,
|
||||
pub full_details: Option<serde_json::Value>,
|
||||
pub self_described: Option<serde_json::Value>,
|
||||
pub description: NodeDescription,
|
||||
pub last_updated_utc: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub struct DailyStats {
|
||||
pub date_utc: String,
|
||||
pub total_packets_received: i64,
|
||||
pub total_packets_sent: i64,
|
||||
pub total_packets_dropped: i64,
|
||||
pub total_stake: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub struct Service {
|
||||
pub gateway_identity_key: String,
|
||||
pub last_updated_utc: String,
|
||||
pub routing_score: f32,
|
||||
pub service_provider_client_id: Option<String>,
|
||||
pub ip_address: Option<String>,
|
||||
pub hostname: Option<String>,
|
||||
pub mixnet_websockets: Option<serde_json::Value>,
|
||||
pub last_successful_ping_utc: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct SummaryHistory {
|
||||
pub date: String,
|
||||
pub value_json: serde_json::Value,
|
||||
pub timestamp_utc: String,
|
||||
}
|
||||
@@ -0,0 +1,92 @@
|
||||
use axum::Router;
|
||||
use core::net::SocketAddr;
|
||||
use tokio::{net::TcpListener, task::JoinHandle};
|
||||
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
|
||||
|
||||
use crate::{
|
||||
db::DbPool,
|
||||
http::{api::RouterBuilder, state::AppState},
|
||||
};
|
||||
|
||||
/// Return handles that allow for graceful shutdown of server + awaiting its
|
||||
/// background tokio task
|
||||
pub(crate) async fn start_http_api(
|
||||
db_pool: DbPool,
|
||||
http_port: u16,
|
||||
nym_http_cache_ttl: u64,
|
||||
) -> anyhow::Result<ShutdownHandles> {
|
||||
let router_builder = RouterBuilder::with_default_routes();
|
||||
|
||||
let state = AppState::new(db_pool, nym_http_cache_ttl);
|
||||
let router = router_builder.with_state(state);
|
||||
|
||||
// TODO dz do we need this to be configurable?
|
||||
let bind_addr = format!("0.0.0.0:{}", http_port);
|
||||
let server = router.build_server(bind_addr).await?;
|
||||
|
||||
Ok(start_server(server))
|
||||
}
|
||||
|
||||
fn start_server(server: HttpServer) -> ShutdownHandles {
|
||||
// one copy is stored to trigger a graceful shutdown later
|
||||
let shutdown_button = CancellationToken::new();
|
||||
// other copy is given to server to listen for a shutdown
|
||||
let shutdown_receiver = shutdown_button.clone();
|
||||
let shutdown_receiver = shutdown_receiver.cancelled_owned();
|
||||
|
||||
let server_handle = tokio::spawn(async move { server.run(shutdown_receiver).await });
|
||||
|
||||
ShutdownHandles {
|
||||
server_handle,
|
||||
shutdown_button,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct ShutdownHandles {
|
||||
server_handle: JoinHandle<std::io::Result<()>>,
|
||||
shutdown_button: CancellationToken,
|
||||
}
|
||||
|
||||
impl ShutdownHandles {
|
||||
/// Send graceful shutdown signal to server and wait for server task to complete
|
||||
pub(crate) async fn shutdown(self) -> anyhow::Result<()> {
|
||||
self.shutdown_button.cancel();
|
||||
|
||||
match self.server_handle.await {
|
||||
Ok(Ok(_)) => {
|
||||
tracing::info!("HTTP server shut down without errors");
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
tracing::error!("HTTP server terminated with: {err}");
|
||||
anyhow::bail!(err)
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Server task panicked: {err}");
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct HttpServer {
|
||||
router: Router,
|
||||
listener: TcpListener,
|
||||
}
|
||||
|
||||
impl HttpServer {
|
||||
pub(crate) fn new(router: Router, listener: TcpListener) -> Self {
|
||||
Self { router, listener }
|
||||
}
|
||||
|
||||
pub(crate) async fn run(self, receiver: WaitForCancellationFutureOwned) -> std::io::Result<()> {
|
||||
// into_make_service_with_connect_info allows us to see client ip address
|
||||
axum::serve(
|
||||
self.listener,
|
||||
self.router
|
||||
.into_make_service_with_connect_info::<SocketAddr>(),
|
||||
)
|
||||
.with_graceful_shutdown(receiver)
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,223 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use moka::{future::Cache, Entry};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::{
|
||||
db::DbPool,
|
||||
http::models::{DailyStats, Gateway, Mixnode, SummaryHistory},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct AppState {
|
||||
db_pool: DbPool,
|
||||
cache: HttpCache,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub(crate) fn new(db_pool: DbPool, cache_ttl: u64) -> Self {
|
||||
Self {
|
||||
db_pool,
|
||||
cache: HttpCache::new(cache_ttl),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn db_pool(&self) -> &DbPool {
|
||||
&self.db_pool
|
||||
}
|
||||
|
||||
pub(crate) fn cache(&self) -> &HttpCache {
|
||||
&self.cache
|
||||
}
|
||||
}
|
||||
|
||||
static GATEWAYS_LIST_KEY: &str = "gateways";
|
||||
static MIXNODES_LIST_KEY: &str = "mixnodes";
|
||||
static MIXSTATS_LIST_KEY: &str = "mixstats";
|
||||
static SUMMARY_HISTORY_LIST_KEY: &str = "summary-history";
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct HttpCache {
|
||||
gateways: Cache<String, Arc<RwLock<Vec<Gateway>>>>,
|
||||
mixnodes: Cache<String, Arc<RwLock<Vec<Mixnode>>>>,
|
||||
mixstats: Cache<String, Arc<RwLock<Vec<DailyStats>>>>,
|
||||
history: Cache<String, Arc<RwLock<Vec<SummaryHistory>>>>,
|
||||
}
|
||||
|
||||
impl HttpCache {
|
||||
pub fn new(ttl_seconds: u64) -> Self {
|
||||
HttpCache {
|
||||
gateways: Cache::builder()
|
||||
.max_capacity(2)
|
||||
.time_to_live(Duration::from_secs(ttl_seconds))
|
||||
.build(),
|
||||
mixnodes: Cache::builder()
|
||||
.max_capacity(2)
|
||||
.time_to_live(Duration::from_secs(ttl_seconds))
|
||||
.build(),
|
||||
mixstats: Cache::builder()
|
||||
.max_capacity(2)
|
||||
.time_to_live(Duration::from_secs(ttl_seconds))
|
||||
.build(),
|
||||
history: Cache::builder()
|
||||
.max_capacity(2)
|
||||
.time_to_live(Duration::from_secs(ttl_seconds))
|
||||
.build(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upsert_gateway_list(
|
||||
&self,
|
||||
new_gateway_list: Vec<Gateway>,
|
||||
) -> Entry<String, Arc<RwLock<Vec<Gateway>>>> {
|
||||
self.gateways
|
||||
.entry_by_ref(GATEWAYS_LIST_KEY)
|
||||
.and_upsert_with(|maybe_entry| async {
|
||||
if let Some(entry) = maybe_entry {
|
||||
let v = entry.into_value();
|
||||
let mut guard = v.write().await;
|
||||
*guard = new_gateway_list;
|
||||
v.clone()
|
||||
} else {
|
||||
Arc::new(RwLock::new(new_gateway_list))
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_gateway_list(&self, db: &DbPool) -> Vec<Gateway> {
|
||||
match self.gateways.get(GATEWAYS_LIST_KEY).await {
|
||||
Some(guard) => {
|
||||
let read_lock = guard.read().await;
|
||||
read_lock.clone()
|
||||
}
|
||||
None => {
|
||||
// the key is missing so populate it
|
||||
tracing::warn!("No gateways in cache, refreshing cache from DB...");
|
||||
|
||||
let gateways = crate::db::queries::get_all_gateways(db)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
self.upsert_gateway_list(gateways.clone()).await;
|
||||
|
||||
if gateways.is_empty() {
|
||||
tracing::warn!("Database contains 0 gateways");
|
||||
}
|
||||
|
||||
gateways
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upsert_mixnode_list(
|
||||
&self,
|
||||
new_mixnode_list: Vec<Mixnode>,
|
||||
) -> Entry<String, Arc<RwLock<Vec<Mixnode>>>> {
|
||||
self.mixnodes
|
||||
.entry_by_ref(MIXNODES_LIST_KEY)
|
||||
.and_upsert_with(|maybe_entry| async {
|
||||
if let Some(entry) = maybe_entry {
|
||||
let v = entry.into_value();
|
||||
let mut guard = v.write().await;
|
||||
*guard = new_mixnode_list;
|
||||
v.clone()
|
||||
} else {
|
||||
Arc::new(RwLock::new(new_mixnode_list))
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_mixnodes_list(&self, db: &DbPool) -> Vec<Mixnode> {
|
||||
match self.mixnodes.get(MIXNODES_LIST_KEY).await {
|
||||
Some(guard) => {
|
||||
let read_lock = guard.read().await;
|
||||
read_lock.clone()
|
||||
}
|
||||
None => {
|
||||
tracing::warn!("No mixnodes in cache, refreshing cache from DB...");
|
||||
|
||||
let mixnodes = crate::db::queries::get_all_mixnodes(db)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
self.upsert_mixnode_list(mixnodes.clone()).await;
|
||||
|
||||
if mixnodes.is_empty() {
|
||||
tracing::warn!("Database contains 0 mixnodes");
|
||||
}
|
||||
|
||||
mixnodes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upsert_mixnode_stats(
|
||||
&self,
|
||||
mixnode_stats: Vec<DailyStats>,
|
||||
) -> Entry<String, Arc<RwLock<Vec<DailyStats>>>> {
|
||||
self.mixstats
|
||||
.entry_by_ref(MIXSTATS_LIST_KEY)
|
||||
.and_upsert_with(|maybe_entry| async {
|
||||
if let Some(entry) = maybe_entry {
|
||||
let v = entry.into_value();
|
||||
let mut guard = v.write().await;
|
||||
*guard = mixnode_stats;
|
||||
v.clone()
|
||||
} else {
|
||||
Arc::new(RwLock::new(mixnode_stats))
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_mixnode_stats(&self, db: &DbPool) -> Vec<DailyStats> {
|
||||
match self.mixstats.get(MIXSTATS_LIST_KEY).await {
|
||||
Some(guard) => {
|
||||
let read_lock = guard.read().await;
|
||||
read_lock.to_vec()
|
||||
}
|
||||
None => {
|
||||
let mixnode_stats = crate::db::queries::get_daily_stats(db)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
self.upsert_mixnode_stats(mixnode_stats.clone()).await;
|
||||
mixnode_stats
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_summary_history(&self, db: &DbPool) -> Vec<SummaryHistory> {
|
||||
match self.history.get(SUMMARY_HISTORY_LIST_KEY).await {
|
||||
Some(guard) => {
|
||||
let read_lock = guard.read().await;
|
||||
read_lock.to_vec()
|
||||
}
|
||||
None => {
|
||||
let summary_history = crate::db::queries::get_summary_history(db)
|
||||
.await
|
||||
.unwrap_or(vec![]);
|
||||
self.upsert_summary_history(summary_history.clone()).await;
|
||||
summary_history
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upsert_summary_history(
|
||||
&self,
|
||||
summary_history: Vec<SummaryHistory>,
|
||||
) -> Entry<String, Arc<RwLock<Vec<SummaryHistory>>>> {
|
||||
self.history
|
||||
.entry_by_ref(SUMMARY_HISTORY_LIST_KEY)
|
||||
.and_upsert_with(|maybe_entry| async {
|
||||
if let Some(entry) = maybe_entry {
|
||||
let v = entry.into_value();
|
||||
let mut guard = v.write().await;
|
||||
*guard = summary_history;
|
||||
v.clone()
|
||||
} else {
|
||||
Arc::new(RwLock::new(summary_history))
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
use tracing::level_filters::LevelFilter;
|
||||
use tracing_subscriber::{filter::Directive, EnvFilter};
|
||||
|
||||
pub(crate) fn setup_tracing_logger() {
|
||||
fn directive_checked(directive: impl Into<String>) -> Directive {
|
||||
directive
|
||||
.into()
|
||||
.parse()
|
||||
.expect("Failed to parse log directive")
|
||||
}
|
||||
|
||||
let log_builder = tracing_subscriber::fmt()
|
||||
// Use a more compact, abbreviated log format
|
||||
.compact()
|
||||
// Display source code file paths
|
||||
.with_file(true)
|
||||
// Display source code line numbers
|
||||
.with_line_number(true)
|
||||
.with_thread_ids(true)
|
||||
// Don't display the event's target (module path)
|
||||
.with_target(false);
|
||||
|
||||
let mut filter = EnvFilter::builder()
|
||||
// if RUST_LOG isn't set, set default level
|
||||
.with_default_directive(LevelFilter::INFO.into())
|
||||
.from_env_lossy();
|
||||
// these crates are more granularly filtered
|
||||
let filter_crates = [
|
||||
"reqwest",
|
||||
"rustls",
|
||||
"hyper",
|
||||
"sqlx",
|
||||
"h2",
|
||||
"tendermint_rpc",
|
||||
"tower_http",
|
||||
"axum",
|
||||
];
|
||||
for crate_name in filter_crates {
|
||||
filter = filter.add_directive(directive_checked(format!("{}=warn", crate_name)));
|
||||
}
|
||||
|
||||
filter = filter.add_directive(directive_checked("nym_bin_common=debug"));
|
||||
filter = filter.add_directive(directive_checked("nym_explorer_client=debug"));
|
||||
filter = filter.add_directive(directive_checked("nym_network_defaults=debug"));
|
||||
filter = filter.add_directive(directive_checked("nym_validator_client=debug"));
|
||||
|
||||
log_builder.with_env_filter(filter).init();
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
use clap::Parser;
|
||||
use nym_task::signal::wait_for_signal;
|
||||
|
||||
mod cli;
|
||||
mod db;
|
||||
mod http;
|
||||
mod logging;
|
||||
mod monitor;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
logging::setup_tracing_logger();
|
||||
|
||||
let args = cli::Cli::parse();
|
||||
|
||||
let connection_url = args.connection_url.clone();
|
||||
tracing::debug!("Using config:\n{:#?}", args);
|
||||
|
||||
let storage = db::Storage::init(connection_url).await?;
|
||||
let db_pool = storage.pool_owned().await;
|
||||
let args_clone = args.clone();
|
||||
tokio::spawn(async move {
|
||||
monitor::spawn_in_background(db_pool, args_clone).await;
|
||||
});
|
||||
tracing::info!("Started monitor task");
|
||||
|
||||
let shutdown_handles = http::server::start_http_api(
|
||||
storage.pool_owned().await,
|
||||
args.http_port,
|
||||
args.nym_http_cache_ttl,
|
||||
)
|
||||
.await
|
||||
.expect("Failed to start server");
|
||||
|
||||
tracing::info!("Started HTTP server on port {}", args.http_port);
|
||||
|
||||
wait_for_signal().await;
|
||||
|
||||
if let Err(err) = shutdown_handles.shutdown().await {
|
||||
tracing::error!("{err}");
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,449 @@
|
||||
use crate::cli::Cli;
|
||||
use crate::db::models::{
|
||||
gateway, mixnode, GatewayRecord, MixnodeRecord, NetworkSummary, GATEWAYS_BLACKLISTED_COUNT,
|
||||
GATEWAYS_BONDED_COUNT, GATEWAYS_EXPLORER_COUNT, GATEWAYS_HISTORICAL_COUNT,
|
||||
MIXNODES_BLACKLISTED_COUNT, MIXNODES_BONDED_ACTIVE, MIXNODES_BONDED_COUNT,
|
||||
MIXNODES_BONDED_INACTIVE, MIXNODES_BONDED_RESERVE, MIXNODES_HISTORICAL_COUNT,
|
||||
};
|
||||
use crate::db::{queries, DbPool};
|
||||
use anyhow::anyhow;
|
||||
use cosmwasm_std::Decimal;
|
||||
use nym_explorer_client::{ExplorerClient, PrettyDetailedGatewayBond};
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
use nym_validator_client::client::NymApiClientExt;
|
||||
use nym_validator_client::models::{DescribedGateway, DescribedMixNode, MixNodeBondAnnotated};
|
||||
use nym_validator_client::nym_nodes::SkimmedNode;
|
||||
use nym_validator_client::nyxd::contract_traits::PagedMixnetQueryClient;
|
||||
use nym_validator_client::nyxd::{AccountId, NyxdClient};
|
||||
use nym_validator_client::NymApiClient;
|
||||
use reqwest::Url;
|
||||
use std::collections::HashSet;
|
||||
use std::str::FromStr;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
|
||||
const REFRESH_DELAY: Duration = Duration::from_secs(60 * 5);
|
||||
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(15);
|
||||
|
||||
static DELEGATION_PROGRAM_WALLET: &str = "n1rnxpdpx3kldygsklfft0gech7fhfcux4zst5lw";
|
||||
|
||||
// TODO dz: query many NYM APIs:
|
||||
// multiple instances running directory cache, ask sachin
|
||||
pub(crate) async fn spawn_in_background(db_pool: DbPool, config: Cli) -> JoinHandle<()> {
|
||||
let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env();
|
||||
|
||||
loop {
|
||||
tracing::info!("Refreshing node info...");
|
||||
|
||||
if let Err(e) = run(&db_pool, &network_defaults, &config).await {
|
||||
tracing::error!(
|
||||
"Monitor run failed: {e}, retrying in {}s...",
|
||||
FAILURE_RETRY_DELAY.as_secs()
|
||||
);
|
||||
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
|
||||
} else {
|
||||
tracing::info!(
|
||||
"Info successfully collected, sleeping for {}s...",
|
||||
REFRESH_DELAY.as_secs()
|
||||
);
|
||||
tokio::time::sleep(REFRESH_DELAY).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(
|
||||
pool: &DbPool,
|
||||
network_details: &NymNetworkDetails,
|
||||
config: &Cli,
|
||||
) -> anyhow::Result<()> {
|
||||
let default_api_url = network_details
|
||||
.endpoints
|
||||
.first()
|
||||
.expect("rust sdk mainnet default incorrectly configured")
|
||||
.api_url()
|
||||
.clone()
|
||||
.expect("rust sdk mainnet default missing api_url");
|
||||
let default_explorer_url = network_details.explorer_api.clone().map(|url| {
|
||||
url.parse()
|
||||
.expect("rust sdk mainnet default explorer url not parseable")
|
||||
});
|
||||
|
||||
let default_explorer_url =
|
||||
default_explorer_url.expect("explorer url missing in network config");
|
||||
let explorer_client =
|
||||
ExplorerClient::new_with_timeout(default_explorer_url, config.explorer_client_timeout)?;
|
||||
let explorer_gateways = explorer_client
|
||||
.get_gateways()
|
||||
.await
|
||||
.log_error("get_gateways")?;
|
||||
tracing::debug!("6");
|
||||
|
||||
let api_client =
|
||||
// TODO dz introduce timeout ?
|
||||
NymApiClient::new(default_api_url);
|
||||
let gateways = api_client
|
||||
.get_cached_described_gateways()
|
||||
.await
|
||||
.log_error("get_described_gateways")?;
|
||||
tracing::debug!("Fetched {} gateways", gateways.len());
|
||||
let skimmed_gateways = api_client
|
||||
.get_basic_gateways(None)
|
||||
.await
|
||||
.log_error("get_basic_gateways")?;
|
||||
|
||||
let mixnodes = api_client
|
||||
.get_cached_mixnodes()
|
||||
.await
|
||||
.log_error("get_cached_mixnodes")?;
|
||||
tracing::debug!("Fetched {} mixnodes", mixnodes.len());
|
||||
|
||||
// TODO dz can we calculate blacklisted GWs from their performance?
|
||||
// where do we get their performance?
|
||||
let gateways_blacklisted = api_client
|
||||
.nym_api
|
||||
.get_gateways_blacklisted()
|
||||
.await
|
||||
.map(|vec| vec.into_iter().collect::<HashSet<_>>())
|
||||
.log_error("get_gateways_blacklisted")?;
|
||||
|
||||
// Cached mixnodes don't include blacklisted nodes
|
||||
// We need that to calculate the total locked tokens later
|
||||
let mixnodes = api_client
|
||||
.nym_api
|
||||
.get_mixnodes_detailed_unfiltered()
|
||||
.await
|
||||
.log_error("get_mixnodes_detailed_unfiltered")?;
|
||||
let mixnodes_described = api_client
|
||||
.nym_api
|
||||
.get_mixnodes_described()
|
||||
.await
|
||||
.log_error("get_mixnodes_described")?;
|
||||
let mixnodes_active = api_client
|
||||
.nym_api
|
||||
.get_active_mixnodes()
|
||||
.await
|
||||
.log_error("get_active_mixnodes")?;
|
||||
let delegation_program_members =
|
||||
get_delegation_program_details(network_details, &config.nyxd_addr).await?;
|
||||
|
||||
// keep stats for later
|
||||
let count_bonded_mixnodes = mixnodes.len();
|
||||
let count_bonded_gateways = gateways.len();
|
||||
let count_explorer_gateways = explorer_gateways.len();
|
||||
let count_bonded_mixnodes_active = mixnodes_active.len();
|
||||
|
||||
let gateway_records = prepare_gateway_data(
|
||||
&gateways,
|
||||
&gateways_blacklisted,
|
||||
explorer_gateways,
|
||||
skimmed_gateways,
|
||||
)?;
|
||||
queries::insert_gateways(pool, gateway_records)
|
||||
.await
|
||||
.map(|_| {
|
||||
tracing::debug!("Gateway info written to DB!");
|
||||
})?;
|
||||
|
||||
// instead of counting blacklisted GWs returned from API cache, count from the active set
|
||||
let count_gateways_blacklisted = gateways
|
||||
.iter()
|
||||
.filter(|gw| {
|
||||
let gw_identity = gw.bond.identity();
|
||||
gateways_blacklisted.contains(gw_identity)
|
||||
})
|
||||
.count();
|
||||
|
||||
if count_gateways_blacklisted > 0 {
|
||||
queries::write_blacklisted_gateways_to_db(pool, gateways_blacklisted.iter())
|
||||
.await
|
||||
.map(|_| {
|
||||
tracing::debug!(
|
||||
"Gateway blacklist info written to DB! {} blacklisted by Nym API",
|
||||
count_gateways_blacklisted
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
let mixnode_records =
|
||||
prepare_mixnode_data(&mixnodes, mixnodes_described, delegation_program_members)?;
|
||||
queries::insert_mixnodes(pool, mixnode_records)
|
||||
.await
|
||||
.map(|_| {
|
||||
tracing::debug!("Mixnode info written to DB!");
|
||||
})?;
|
||||
|
||||
let count_mixnodes_blacklisted = mixnodes.iter().filter(|elem| elem.blacklisted).count();
|
||||
|
||||
let recently_unbonded_gateways = queries::ensure_gateways_still_bonded(pool, &gateways).await?;
|
||||
let recently_unbonded_mixnodes = queries::ensure_mixnodes_still_bonded(pool, &mixnodes).await?;
|
||||
|
||||
let count_bonded_mixnodes_reserve = 0; // TODO: NymAPI doesn't report the reserve set size
|
||||
let count_bonded_mixnodes_inactive = count_bonded_mixnodes - count_bonded_mixnodes_active;
|
||||
|
||||
let (all_historical_gateways, all_historical_mixnodes) = calculate_stats(pool).await?;
|
||||
|
||||
//
|
||||
// write summary keys and values to table
|
||||
//
|
||||
|
||||
let nodes_summary = vec![
|
||||
(MIXNODES_BONDED_COUNT, &count_bonded_mixnodes),
|
||||
(MIXNODES_BONDED_ACTIVE, &count_bonded_mixnodes_active),
|
||||
(MIXNODES_BONDED_INACTIVE, &count_bonded_mixnodes_inactive),
|
||||
(MIXNODES_BONDED_RESERVE, &count_bonded_mixnodes_reserve),
|
||||
(MIXNODES_BLACKLISTED_COUNT, &count_mixnodes_blacklisted),
|
||||
(GATEWAYS_BONDED_COUNT, &count_bonded_gateways),
|
||||
(GATEWAYS_EXPLORER_COUNT, &count_explorer_gateways),
|
||||
(MIXNODES_HISTORICAL_COUNT, &all_historical_mixnodes),
|
||||
(GATEWAYS_HISTORICAL_COUNT, &all_historical_gateways),
|
||||
(GATEWAYS_BLACKLISTED_COUNT, &count_gateways_blacklisted),
|
||||
];
|
||||
|
||||
let last_updated = chrono::offset::Utc::now();
|
||||
let last_updated_utc = last_updated.timestamp().to_string();
|
||||
let network_summary = NetworkSummary {
|
||||
mixnodes: mixnode::MixnodeSummary {
|
||||
bonded: mixnode::MixnodeSummaryBonded {
|
||||
count: count_bonded_mixnodes.cast_checked()?,
|
||||
active: count_bonded_mixnodes_active.cast_checked()?,
|
||||
inactive: count_bonded_mixnodes_inactive.cast_checked()?,
|
||||
reserve: count_bonded_mixnodes_reserve.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
blacklisted: mixnode::MixnodeSummaryBlacklisted {
|
||||
count: count_mixnodes_blacklisted.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
historical: mixnode::MixnodeSummaryHistorical {
|
||||
count: all_historical_mixnodes.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
},
|
||||
gateways: gateway::GatewaySummary {
|
||||
bonded: gateway::GatewaySummaryBonded {
|
||||
count: count_bonded_gateways.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
blacklisted: gateway::GatewaySummaryBlacklisted {
|
||||
count: count_gateways_blacklisted.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
historical: gateway::GatewaySummaryHistorical {
|
||||
count: all_historical_gateways.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
explorer: gateway::GatewaySummaryExplorer {
|
||||
count: count_explorer_gateways.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
queries::insert_summaries(pool, &nodes_summary, &network_summary, last_updated).await?;
|
||||
|
||||
let mut log_lines: Vec<String> = vec![];
|
||||
for (key, value) in nodes_summary.iter() {
|
||||
log_lines.push(format!("{} = {}", key, value));
|
||||
}
|
||||
log_lines.push(format!(
|
||||
"recently_unbonded_mixnodes = {}",
|
||||
recently_unbonded_mixnodes
|
||||
));
|
||||
log_lines.push(format!(
|
||||
"recently_unbonded_gateways = {}",
|
||||
recently_unbonded_gateways
|
||||
));
|
||||
|
||||
tracing::info!("Directory summary: \n{}", log_lines.join("\n"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare_gateway_data(
|
||||
gateways: &[DescribedGateway],
|
||||
gateways_blacklisted: &HashSet<String>,
|
||||
explorer_gateways: Vec<PrettyDetailedGatewayBond>,
|
||||
skimmed_gateways: Vec<SkimmedNode>,
|
||||
) -> anyhow::Result<Vec<GatewayRecord>> {
|
||||
let mut gateway_records = Vec::new();
|
||||
|
||||
for gateway in gateways {
|
||||
let identity_key = gateway.bond.identity();
|
||||
let bonded = true;
|
||||
let last_updated_utc = chrono::offset::Utc::now().timestamp();
|
||||
let blacklisted = gateways_blacklisted.contains(identity_key);
|
||||
|
||||
let self_described = gateway
|
||||
.self_described
|
||||
.as_ref()
|
||||
.and_then(|v| serde_json::to_string(&v).ok());
|
||||
|
||||
let explorer_pretty_bond = explorer_gateways
|
||||
.iter()
|
||||
.find(|g| g.gateway.identity_key.eq(identity_key));
|
||||
let explorer_pretty_bond = explorer_pretty_bond.and_then(|g| serde_json::to_string(g).ok());
|
||||
|
||||
let performance = skimmed_gateways
|
||||
.iter()
|
||||
.find(|g| g.ed25519_identity_pubkey.eq(identity_key))
|
||||
.map(|g| g.performance)
|
||||
.unwrap_or_default()
|
||||
.round_to_integer();
|
||||
|
||||
gateway_records.push(GatewayRecord {
|
||||
identity_key: identity_key.to_owned(),
|
||||
bonded,
|
||||
blacklisted,
|
||||
self_described,
|
||||
explorer_pretty_bond,
|
||||
last_updated_utc,
|
||||
performance,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(gateway_records)
|
||||
}
|
||||
|
||||
fn prepare_mixnode_data(
|
||||
mixnodes: &[MixNodeBondAnnotated],
|
||||
mixnodes_described: Vec<DescribedMixNode>,
|
||||
delegation_program_members: Vec<u32>,
|
||||
) -> anyhow::Result<Vec<MixnodeRecord>> {
|
||||
let mut mixnode_records = Vec::new();
|
||||
|
||||
for mixnode in mixnodes {
|
||||
let mix_id = mixnode.mix_id();
|
||||
let identity_key = mixnode.identity_key();
|
||||
let bonded = true;
|
||||
let total_stake = decimal_to_i64(mixnode.mixnode_details.total_stake());
|
||||
let blacklisted = mixnode.blacklisted;
|
||||
let node_info = mixnode.mix_node();
|
||||
let host = node_info.host.clone();
|
||||
let http_port = node_info.http_api_port;
|
||||
// Contains all the information including what's above
|
||||
let full_details = serde_json::to_string(&mixnode)?;
|
||||
|
||||
let mixnode_described = mixnodes_described.iter().find(|m| m.bond.mix_id == mix_id);
|
||||
let self_described = mixnode_described.and_then(|v| serde_json::to_string(v).ok());
|
||||
let is_dp_delegatee = delegation_program_members.contains(&mix_id);
|
||||
|
||||
let last_updated_utc = chrono::offset::Utc::now().timestamp();
|
||||
|
||||
mixnode_records.push(MixnodeRecord {
|
||||
mix_id,
|
||||
identity_key: identity_key.to_owned(),
|
||||
bonded,
|
||||
total_stake,
|
||||
host,
|
||||
http_port,
|
||||
blacklisted,
|
||||
full_details,
|
||||
self_described,
|
||||
last_updated_utc,
|
||||
is_dp_delegatee,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(mixnode_records)
|
||||
}
|
||||
|
||||
// TODO dz is there a common monorepo place this can be put?
|
||||
pub trait NumericalCheckedCast<T>
|
||||
where
|
||||
T: TryFrom<Self>,
|
||||
<T as TryFrom<Self>>::Error: std::error::Error,
|
||||
Self: std::fmt::Display + Copy,
|
||||
{
|
||||
fn cast_checked(self) -> anyhow::Result<T> {
|
||||
T::try_from(self).map_err(|e| {
|
||||
anyhow::anyhow!(
|
||||
"Couldn't cast {} to {}: {}",
|
||||
self,
|
||||
std::any::type_name::<T>(),
|
||||
e
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> NumericalCheckedCast<U> for T
|
||||
where
|
||||
U: TryFrom<T>,
|
||||
<U as TryFrom<T>>::Error: std::error::Error,
|
||||
T: std::fmt::Display + Copy,
|
||||
{
|
||||
}
|
||||
|
||||
async fn calculate_stats(pool: &DbPool) -> anyhow::Result<(usize, usize)> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
let all_historical_gateways = sqlx::query_scalar!(r#"SELECT count(id) FROM gateways"#)
|
||||
.fetch_one(&mut *conn)
|
||||
.await?
|
||||
.cast_checked()?;
|
||||
|
||||
let all_historical_mixnodes = sqlx::query_scalar!(r#"SELECT count(id) FROM mixnodes"#)
|
||||
.fetch_one(&mut *conn)
|
||||
.await?
|
||||
.cast_checked()?;
|
||||
|
||||
Ok((all_historical_gateways, all_historical_mixnodes))
|
||||
}
|
||||
|
||||
async fn get_delegation_program_details(
|
||||
network_details: &NymNetworkDetails,
|
||||
nyxd_addr: &Url,
|
||||
) -> anyhow::Result<Vec<u32>> {
|
||||
let config = nym_validator_client::nyxd::Config::try_from_nym_network_details(network_details)?;
|
||||
|
||||
let client = NyxdClient::connect(config, nyxd_addr.as_str())
|
||||
.map_err(|err| anyhow::anyhow!("Couldn't connect: {}", err))?;
|
||||
|
||||
let account_id = AccountId::from_str(DELEGATION_PROGRAM_WALLET)
|
||||
.map_err(|e| anyhow!("Invalid bech32 address: {}", e))?;
|
||||
|
||||
let delegations = client.get_all_delegator_delegations(&account_id).await?;
|
||||
|
||||
let mix_ids: Vec<u32> = delegations
|
||||
.iter()
|
||||
.map(|delegation| delegation.mix_id)
|
||||
.collect();
|
||||
|
||||
Ok(mix_ids)
|
||||
}
|
||||
|
||||
fn decimal_to_i64(decimal: Decimal) -> i64 {
|
||||
// Convert the underlying Uint128 to a u128
|
||||
let atomics = decimal.atomics().u128();
|
||||
let precision = 1_000_000_000_000_000_000u128;
|
||||
|
||||
// Get the fractional part
|
||||
let fractional = atomics % precision;
|
||||
|
||||
// Get the integer part
|
||||
let integer = atomics / precision;
|
||||
|
||||
// Combine them into a float
|
||||
let float_value = integer as f64 + (fractional as f64 / 1_000_000_000_000_000_000_f64);
|
||||
|
||||
// Limit to 6 decimal places
|
||||
let rounded_value = (float_value * 1_000_000.0).round() / 1_000_000.0;
|
||||
|
||||
rounded_value as i64
|
||||
}
|
||||
|
||||
trait LogError<T, E> {
|
||||
fn log_error(self, msg: &str) -> Result<T, E>;
|
||||
}
|
||||
|
||||
impl<T, E> LogError<T, E> for anyhow::Result<T, E>
|
||||
where
|
||||
E: std::error::Error,
|
||||
{
|
||||
fn log_error(self, msg: &str) -> Result<T, E> {
|
||||
if let Err(e) = &self {
|
||||
tracing::error!("[{msg}]:\t{e}");
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user