Compare commits

...

16 Commits

Author SHA1 Message Date
Sachin Kamath 1bdcf9c3cf fix review comments 2024-11-06 14:16:13 +05:30
Sachin Kamath 4ebb9cd239 clippy 2024-11-05 16:01:18 +05:30
Sachin Kamath 620d68ea2f nyxd-scraper: add config to make pre-commit storage optional 2024-11-05 15:49:30 +05:30
Dinko Zdravac b747308f74 Add subcommand to image (#5056) 2024-10-29 10:52:33 +01:00
Dinko Zdravac afdd721cc3 Ns agent workflow (#5055)
* feat: add dockerfile

* add github workflow for node status agent

---------

Co-authored-by: Fran Arbanas <arbanasfran@gmail.com>
2024-10-29 10:39:58 +01:00
Dinko Zdravac 9f5c4c5968 Merge pull request #5050 from nymtech/dz-node-status-api
Node Status API
2024-10-29 00:43:33 +01:00
dynco-nym 9583a5c6c8 Fix build script 2024-10-29 00:24:18 +01:00
Tommy Verrall da60fc0ade Merge pull request #5052 from nymtech/feat/add-node-status-agent-workflow
feat: add simple node-status-agent
2024-10-28 19:30:52 +00:00
Fran Arbanas 96b54c455e feat: add simple node-status-agent 2024-10-28 19:16:46 +01:00
Dinko Zdravac cc983963d4 Fully functional network scores (#5048)
* Compile & copy wg probe

* Node status agent WIP

* Enable debug logging

* Agent submits results
- add clap to agent
- agent runs network probe
- /submit endpoint on NS API

* Build clients with timeouts

* Update logging and dev scripts

* Replace /blaclisted endpoint

* Testruns fully functional
- task that queues testruns periodically
- testruns read/write in DB

* Probe scores fully working
- testruns are assigned on API
- submit updates testruns correctly on NS API side
- agent registers with API
- agent submits results correctly

* Clippy fixes

* PR feedback

* Clippy again

* PR feedback

* Run clippy earlier in CI

* Make refresh delay configurable in server & agent
2024-10-28 17:31:43 +01:00
Fran Arbanas 40d9321aec Node status API dockerfile and env vars (#4986)
* feat: add dockerfile and env variables

* Added workflow for pushing node status api on harbor

* Misc changes to pathing and using yq instead of jq

* fix: change the way we read env vars for nyxd, nym api and explorer

* fix: docker build workflow

* Remove config in favor of clap args

* Added naming and tags

* change from value to result

---------

Co-authored-by: Lawrence Stalder <lawrence@nymtech.net>
Co-authored-by: dynco-nym <173912580+dynco-nym@users.noreply.github.com>
2024-10-28 17:07:38 +01:00
dynco-nym e5a29cc76e Work with directory pre-v2.1
Rebase + point to earlier network client code

Adjust to new Nym API types

Refer to earlier client code

Revert "Rebase + point to earlier network client code"

This reverts commit dd75e7dc0695c25b0883e2f5dd15b7d70165e9e8.

Point to earlier commit
2024-10-28 17:04:22 +01:00
Dinko Zdravac 56c55f6b95 Working HTTP server (#4941)
* Server file structure

* Create HTTP server
- graceful shutdown
- routes
- logging, CORS

* gateways WIP

* gateways API + swagger docs complete

* Mixnodes API + swagger docs complete

* Services API + swagger docs complete

* Commit summary insert

* Make troubleshooting DB easier

* Summary API + swagger docs

* Client log changes

* QOL improvements

- remove implicit panics via `as`
- safer DTO conversions
- add logging
- new config
2024-10-28 16:59:12 +01:00
Dinko Zdravac 2f051fd943 Node Status API background task (#4854)
* Setup new package

* Setup DB

* Fetch & store mixnodes/GWs
- refactor db package structure
- finally solve DATABASE_URL: absolute path works best

* Additional query functionality
- missing only daily summary, which requires type refactoring

* Replace type alias tuples with structs

* Insert summary

* Add github job to build package

* Build script for sqlx

* Remove data dir
- useless now that sqlx DB sits in OUT_DIR

* PR feedback
2024-10-28 16:54:26 +01:00
Bogdan-Ștefan Neacşu c03cf86000 Authenticator CLI client mode (#5044) 2024-10-28 16:42:05 +02:00
Simon Wicky ab11508235 [Product Data] Introduce data persistence on gateways (#5022)
* add stats storage to gateways

* config fix

* add stats storage model and logic

* adapt stats collection to new storage

* stats cleanup on start

* change to linux only code

* tweaks

* modified stats cleanup + change session started

* change wrong table name

* store crashed session as 0 duration

* adapt for sqlx 0.7

* remove unused dependencies

* revert changes from gateway config, as it is broken anyway

* copyright and misc stuff

---------

Co-authored-by: Simon Wicky <simon@linode2-2.net>
2024-10-28 09:25:37 +01:00
96 changed files with 7525 additions and 590 deletions
+6 -6
View File
@@ -57,6 +57,12 @@ jobs:
command: fmt
args: --all -- --check
- name: Clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: --workspace --all-targets -- -D warnings
- name: Build all binaries
uses: actions-rs/cargo@v1
with:
@@ -82,9 +88,3 @@ jobs:
with:
command: test
args: --workspace -- --ignored
- name: Clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: --workspace --all-targets -- -D warnings
@@ -0,0 +1,56 @@
name: Build and upload Node Status agent container to harbor.nymte.ch
on:
workflow_dispatch:
env:
WORKING_DIRECTORY: "nym-node-status-agent"
CONTAINER_NAME: "node-status-agent"
jobs:
build-container:
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
- name: Checkout repo
uses: actions/checkout@v4
- name: Configure git identity
run: |
git config --global user.email "lawrence@nymtech.net"
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.44.3
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: Check if tag exists
run: |
if git rev-parse ${{ steps.get_version.outputs.value }} >/dev/null 2>&1; then
echo "Tag ${{ steps.get_version.outputs.value }} already exists"
fi
- name: Remove existing tag if exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
fi
- name: Create tag
run: |
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
- name: BuildAndPushImageOnHarbor
run: |
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
+49 -5
View File
@@ -1,11 +1,55 @@
name: Build and upload Node Status API container to harbor.nymte.ch
on:
workflow_dispatch:
env:
WORKING_DIRECTORY: "nym-node-status-api"
CONTAINER_NAME: "node-status-api"
jobs:
my-job:
runs-on: arc-ubuntu-22.04
build-container:
runs-on: arc-ubuntu-22.04-dind
steps:
- name: my-step
run: echo "Hello World!"
- name: Login to Harbor
uses: docker/login-action@v3
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
- name: Checkout repo
uses: actions/checkout@v4
- name: Configure git identity
run: |
git config --global user.email "lawrence@nymtech.net"
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.44.3
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: Check if tag exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
echo "Tag ${{ steps.get_version.outputs.result }} already exists"
fi
- name: Remove existing tag if exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
fi
- name: Create tag
run: |
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
- name: BuildAndPushImageOnHarbor
run: |
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
Generated
+950 -320
View File
File diff suppressed because it is too large Load Diff
+13
View File
@@ -54,12 +54,14 @@ members = [
"common/exit-policy",
"common/gateway-requests",
"common/gateway-storage",
"common/gateway-stats-storage",
"common/http-api-client",
"common/http-api-common",
"common/inclusion-probability",
"common/ip-packet-requests",
"common/ledger",
"common/mixnode-common",
"common/models",
"common/network-defaults",
"common/node-tester-utils",
"common/nonexhaustive-delayqueue",
@@ -119,6 +121,8 @@ members = [
"nym-node",
"nym-node/nym-node-http-api",
"nym-node/nym-node-requests",
"nym-node-status-api",
"nym-node-status-agent",
"nym-outfox",
"nym-validator-rewarder",
"tools/echo-server",
@@ -146,13 +150,16 @@ members = [
default-members = [
"clients/native",
"clients/socks5",
"common/models",
"explorer-api",
"gateway",
"mixnode",
"nym-api",
"nym-data-observatory",
"nym-node",
"nym-node-status-api",
"nym-validator-rewarder",
"nym-node-status-api",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
@@ -233,10 +240,12 @@ dotenvy = "0.15.6"
ecdsa = "0.16"
ed25519-dalek = "2.1"
etherparse = "0.13.0"
envy = "0.4"
eyre = "0.6.9"
fastrand = "2.1.1"
flate2 = "1.0.34"
futures = "0.3.28"
futures-util = "0.3"
generic-array = "0.14.7"
getrandom = "0.2.10"
getset = "0.1.3"
@@ -266,6 +275,7 @@ ledger-transport-hid = "0.10.0"
log = "0.4"
maxminddb = "0.23.0"
mime = "0.3.17"
moka = { version = "0.12", features = ["future"] }
nix = "0.27.1"
notify = "5.1.0"
okapi = "0.7.0"
@@ -299,6 +309,7 @@ serde = "1.0.211"
serde_bytes = "0.11.15"
serde_derive = "1.0"
serde_json = "1.0.132"
serde_json_path = "0.6.7"
serde_repr = "0.1"
serde_with = "3.9.0"
serde_yaml = "0.9.25"
@@ -307,6 +318,7 @@ si-scale = "0.2.3"
sphinx-packet = "0.1.1"
sqlx = "0.7.4"
strum = "0.26"
strum_macros = "0.26"
subtle-encoding = "0.5"
syn = "1"
sysinfo = "0.30.13"
@@ -328,6 +340,7 @@ tracing = "0.1.37"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = "0.3.16"
tracing-tree = "0.2.2"
tracing-log = "0.2"
ts-rs = "10.0.0"
tungstenite = { version = "0.20.1", default-features = false }
url = "2.5"
+1
View File
@@ -45,3 +45,4 @@ tracing = [
"opentelemetry",
]
clap = [ "dep:clap", "dep:clap_complete", "dep:clap_complete_fig" ]
models = []
@@ -25,7 +25,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
nym-http-api-client = { path = "../../../common/http-api-client" }
thiserror = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
url = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = ["sync", "time"] }
time = { workspace = true, features = ["formatting"] }
@@ -265,6 +265,13 @@ impl NymApiClient {
NymApiClient { nym_api }
}
#[cfg(not(target_arch = "wasm32"))]
pub fn new_with_timeout(api_url: Url, timeout: std::time::Duration) -> Self {
let nym_api = nym_api::Client::new(api_url, Some(timeout));
NymApiClient { nym_api }
}
pub fn new_with_user_agent(api_url: Url, user_agent: UserAgent) -> Self {
let nym_api = nym_api::Client::builder::<_, ValidatorClientError>(api_url)
.expect("invalid api url")
@@ -121,36 +121,36 @@ async fn test_nyxd_connection(
{
Ok(Err(NyxdError::TendermintErrorRpc(e))) => {
// If we get a tendermint-rpc error, we classify the node as not contactable
log::warn!("Checking: nyxd url: {url}: {}: {}", "failed".red(), e);
tracing::warn!("Checking: nyxd url: {url}: {}: {}", "failed".red(), e);
false
}
Ok(Err(NyxdError::AbciError { code, log, .. })) => {
// We accept the mixnet contract not found as ok from a connection standpoint. This happens
// for example on a pre-launch network.
log::debug!(
tracing::debug!(
"Checking: nyxd url: {url}: {}, but with abci error: {code}: {log}",
"success".green()
);
code == 18
}
Ok(Err(error @ NyxdError::NoContractAddressAvailable(_))) => {
log::warn!("Checking: nyxd url: {url}: {}: {error}", "failed".red());
tracing::warn!("Checking: nyxd url: {url}: {}: {error}", "failed".red());
false
}
Ok(Err(e)) => {
// For any other error, we're optimistic and just try anyway.
log::warn!(
tracing::warn!(
"Checking: nyxd_url: {url}: {}, but with error: {e}",
"success".green()
);
true
}
Ok(Ok(_)) => {
log::debug!("Checking: nyxd_url: {url}: {}", "success".green());
tracing::debug!("Checking: nyxd_url: {url}: {}", "success".green());
true
}
Err(e) => {
log::warn!("Checking: nyxd_url: {url}: {}: {e}", "failed".red());
tracing::warn!("Checking: nyxd_url: {url}: {}: {e}", "failed".red());
false
}
};
@@ -169,15 +169,15 @@ async fn test_nym_api_connection(
.await
{
Ok(Ok(_)) => {
log::debug!("Checking: api_url: {url}: {}", "success".green());
tracing::debug!("Checking: api_url: {url}: {}", "success".green());
true
}
Ok(Err(e)) => {
log::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
tracing::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
false
}
Err(e) => {
log::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
tracing::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
false
}
};
@@ -41,6 +41,7 @@ use nym_mixnet_contract_common::mixnode::MixNodeDetails;
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, NodeId};
use time::format_description::BorrowedFormatItem;
use time::Date;
use tracing::instrument;
pub mod error;
pub mod routes;
@@ -52,11 +53,13 @@ pub fn rfc_3339_date() -> Vec<BorrowedFormatItem<'static>> {
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait NymApiClientExt: ApiClient {
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(&[routes::API_VERSION, routes::MIXNODES], NO_PARAMS)
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
self.get_json(
&[
@@ -70,6 +73,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateways_detailed(&self) -> Result<Vec<GatewayBondAnnotated>, NymAPIError> {
self.get_json(
&[
@@ -83,6 +87,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_detailed_unfiltered(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
@@ -98,11 +103,13 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateways(&self) -> Result<Vec<GatewayBond>, NymAPIError> {
self.get_json(&[routes::API_VERSION, routes::GATEWAYS], NO_PARAMS)
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateways_described(&self) -> Result<Vec<LegacyDescribedGateway>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::GATEWAYS, routes::DESCRIBED],
@@ -111,6 +118,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_described(&self) -> Result<Vec<LegacyDescribedMixNode>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::MIXNODES, routes::DESCRIBED],
@@ -119,6 +127,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[tracing::instrument(level = "debug", skip_all)]
async fn get_basic_mixnodes(
&self,
semver_compatibility: Option<String>,
@@ -142,6 +151,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_basic_gateways(
&self,
semver_compatibility: Option<String>,
@@ -167,6 +177,7 @@ pub trait NymApiClientExt: ApiClient {
/// retrieve basic information for nodes are capable of operating as an entry gateway
/// this includes legacy gateways and nym-nodes
#[instrument(level = "debug", skip(self))]
async fn get_all_basic_entry_assigned_nodes(
&self,
semver_compatibility: Option<String>,
@@ -208,6 +219,7 @@ pub trait NymApiClientExt: ApiClient {
/// retrieve basic information for nodes that got assigned 'mixing' node in this epoch
/// this includes legacy mixnodes and nym-nodes
#[instrument(level = "debug", skip(self))]
async fn get_basic_active_mixing_assigned_nodes(
&self,
semver_compatibility: Option<String>,
@@ -247,6 +259,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_active_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::MIXNODES, routes::ACTIVE],
@@ -255,6 +268,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_active_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
self.get_json(
&[
@@ -269,6 +283,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::MIXNODES, routes::REWARDED],
@@ -277,6 +292,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_report(
&self,
mix_id: NodeId,
@@ -294,6 +310,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateway_report(
&self,
identity: IdentityKeyRef<'_>,
@@ -311,6 +328,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_history(
&self,
mix_id: NodeId,
@@ -328,6 +346,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateway_history(
&self,
identity: IdentityKeyRef<'_>,
@@ -345,6 +364,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_rewarded_mixnodes_detailed(
&self,
) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
@@ -361,6 +381,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateway_core_status_count(
&self,
identity: IdentityKeyRef<'_>,
@@ -392,6 +413,7 @@ pub trait NymApiClientExt: ApiClient {
}
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_core_status_count(
&self,
mix_id: NodeId,
@@ -424,6 +446,7 @@ pub trait NymApiClientExt: ApiClient {
}
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_status(
&self,
mix_id: NodeId,
@@ -441,6 +464,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_reward_estimation(
&self,
mix_id: NodeId,
@@ -458,6 +482,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn compute_mixnode_reward_estimation(
&self,
mix_id: NodeId,
@@ -477,6 +502,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_stake_saturation(
&self,
mix_id: NodeId,
@@ -494,6 +520,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnode_inclusion_probability(
&self,
mix_id: NodeId,
@@ -511,6 +538,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_current_node_performance(
&self,
node_id: NodeId,
@@ -541,6 +569,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_mixnodes_blacklisted(&self) -> Result<Vec<NodeId>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::MIXNODES, routes::BLACKLISTED],
@@ -549,6 +578,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn get_gateways_blacklisted(&self) -> Result<Vec<IdentityKey>, NymAPIError> {
self.get_json(
&[routes::API_VERSION, routes::GATEWAYS, routes::BLACKLISTED],
@@ -557,6 +587,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self, request_body))]
async fn blind_sign(
&self,
request_body: &BlindSignRequestBody,
@@ -573,6 +604,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self, request_body))]
async fn verify_ecash_ticket(
&self,
request_body: &VerifyEcashTicketBody,
@@ -589,6 +621,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self, request_body))]
async fn batch_redeem_ecash_tickets(
&self,
request_body: &BatchRedeemTicketsBody,
@@ -605,6 +638,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn double_spending_filter_v1(&self) -> Result<SpentCredentialsResponse, NymAPIError> {
self.get_json(
&[
@@ -617,6 +651,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn partial_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
@@ -640,6 +675,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn partial_coin_indices_signatures(
&self,
epoch_id: Option<EpochId>,
@@ -660,6 +696,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn global_expiration_date_signatures(
&self,
expiration_date: Option<Date>,
@@ -683,6 +720,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn global_coin_indices_signatures(
&self,
epoch_id: Option<EpochId>,
@@ -703,6 +741,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn master_verification_key(
&self,
epoch_id: Option<EpochId>,
@@ -722,6 +761,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn epoch_credentials(
&self,
dkg_epoch: EpochId,
@@ -738,6 +778,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_credential(
&self,
credential_id: i64,
@@ -754,6 +795,7 @@ pub trait NymApiClientExt: ApiClient {
.await
}
#[instrument(level = "debug", skip(self))]
async fn issued_credentials(
&self,
credential_ids: Vec<i64>,
@@ -8,9 +8,9 @@ use crate::nyxd::CosmWasmClient;
use async_trait::async_trait;
use cosmrs::AccountId;
use cosmwasm_std::Addr;
use log::trace;
use nym_coconut_dkg_common::types::{ChunkIndex, NodeIndex, StateAdvanceResponse};
use serde::Deserialize;
use tracing::trace;
use nym_coconut_dkg_common::dealer::RegisteredDealerDetails;
pub use nym_coconut_dkg_common::{
@@ -29,7 +29,6 @@ use cosmrs::proto::cosmwasm::wasm::v1::{
};
use cosmrs::tendermint::{block, chain, Hash};
use cosmrs::{AccountId, Coin as CosmosCoin, Tx};
use log::trace;
use prost::Message;
use serde::{Deserialize, Serialize};
@@ -68,7 +67,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
Res: Message + Default,
{
if let Some(ref abci_path) = path {
trace!("performing query on abci path {abci_path}")
tracing::trace!("performing query on abci path {abci_path}")
}
let mut buf = Vec::with_capacity(req.encoded_len());
req.encode(&mut buf)?;
@@ -297,7 +296,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
let start = Instant::now();
loop {
log::debug!(
tracing::debug!(
"Polling for result of including {} in a block...",
broadcasted.hash
);
@@ -522,7 +521,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
.make_abci_query::<_, QuerySmartContractStateResponse>(path, req)
.await?;
trace!("raw query response: {}", String::from_utf8_lossy(&res.data));
tracing::trace!("raw query response: {}", String::from_utf8_lossy(&res.data));
Ok(serde_json::from_slice(&res.data)?)
}
@@ -25,12 +25,12 @@ use cosmrs::proto::cosmos::tx::signing::v1beta1::SignMode;
use cosmrs::staking::{MsgDelegate, MsgUndelegate};
use cosmrs::tx::{self, Msg};
use cosmrs::{cosmwasm, AccountId, Any, Tx};
use log::debug;
use serde::Serialize;
use sha2::Digest;
use sha2::Sha256;
use std::time::SystemTime;
use tendermint_rpc::endpoint::broadcast;
use tracing::debug;
fn empty_fee() -> tx::Fee {
tx::Fee {
@@ -7,9 +7,9 @@ use base64::Engine;
use cosmrs::abci::TxMsgData;
use cosmrs::cosmwasm::MsgExecuteContractResponse;
use cosmrs::proto::cosmos::base::query::v1beta1::{PageRequest, PageResponse};
use log::error;
use prost::bytes::Bytes;
use tendermint_rpc::endpoint::broadcast;
use tracing::error;
pub use cosmrs::abci::MsgResponse;
+34
View File
@@ -0,0 +1,34 @@
[package]
name = "nym-gateway-stats-storage"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
"time",
] }
time = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
nym-sphinx = { path = "../nymsphinx" }
nym-credentials-interface = { path = "../credentials-interface" }
[build-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
] }
+28
View File
@@ -0,0 +1,28 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use sqlx::{Connection, SqliteConnection};
use std::env;
#[tokio::main]
async fn main() {
let out_dir = env::var("OUT_DIR").unwrap();
let database_path = format!("{}/gateway-stats-example.sqlite", out_dir);
let mut conn = SqliteConnection::connect(&format!("sqlite://{}?mode=rwc", database_path))
.await
.expect("Failed to create SQLx database connection");
sqlx::migrate!("./migrations")
.run(&mut conn)
.await
.expect("Failed to perform SQLx migrations");
#[cfg(target_family = "unix")]
println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path);
#[cfg(target_family = "windows")]
// for some strange reason we need to add a leading `/` to the windows path even though it's
// not a valid windows path... but hey, it works...
println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
}
@@ -0,0 +1,26 @@
/*
* Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: GPL-3.0-only
*/
CREATE TABLE sessions_active
(
client_address TEXT NOT NULL PRIMARY KEY UNIQUE,
start_time TIMESTAMP WITHOUT TIME ZONE NOT NULL,
typ TEXT NOT NULL
);
CREATE TABLE sessions_finished
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
day DATE NOT NULL,
duration_ms INTEGER NOT NULL,
typ TEXT NOT NULL
);
CREATE TABLE sessions_unique_users
(
day DATE NOT NULL,
client_address TEXT NOT NULL,
PRIMARY KEY (day, client_address)
);
+13
View File
@@ -0,0 +1,13 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use thiserror::Error;
#[derive(Error, Debug)]
pub enum StatsStorageError {
#[error("Database experienced an internal error: {0}")]
InternalDatabaseError(#[from] sqlx::Error),
#[error("Failed to perform database migration: {0}")]
MigrationError(#[from] sqlx::migrate::MigrateError),
}
+195
View File
@@ -0,0 +1,195 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use error::StatsStorageError;
use models::{ActiveSession, FinishedSession, SessionType, StoredFinishedSession};
use nym_sphinx::DestinationAddressBytes;
use sessions::SessionManager;
use sqlx::ConnectOptions;
use std::path::Path;
use time::Date;
use tracing::{debug, error};
pub mod error;
pub mod models;
mod sessions;
// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
pub struct PersistentStatsStorage {
session_manager: SessionManager,
}
impl PersistentStatsStorage {
/// Initialises `PersistentStatsStorage` using the provided path.
///
/// # Arguments
///
/// * `database_path`: path to the database.
pub async fn init<P: AsRef<Path> + Send>(database_path: P) -> Result<Self, StatsStorageError> {
debug!(
"Attempting to connect to database {:?}",
database_path.as_ref().as_os_str()
);
// TODO: we can inject here more stuff based on our gateway global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
// TODO: do we want auto_vacuum ?
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};
if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
error!("Failed to perform migration on the SQLx database: {err}");
return Err(err.into());
}
// the cloning here are cheap as connection pool is stored behind an Arc
Ok(PersistentStatsStorage {
session_manager: sessions::SessionManager::new(connection_pool),
})
}
//Sessions fn
pub async fn insert_finished_session(
&self,
date: Date,
session: FinishedSession,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.insert_finished_session(
date,
session.duration.whole_milliseconds() as i64,
session.typ.to_string().into(),
)
.await?)
}
pub async fn get_finished_sessions(
&self,
date: Date,
) -> Result<Vec<StoredFinishedSession>, StatsStorageError> {
Ok(self.session_manager.get_finished_sessions(date).await?)
}
pub async fn delete_finished_sessions(
&self,
before_date: Date,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_finished_sessions(before_date)
.await?)
}
pub async fn insert_unique_user(
&self,
date: Date,
client_address_bs58: String,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.insert_unique_user(date, client_address_bs58)
.await?)
}
pub async fn get_unique_users_count(&self, date: Date) -> Result<i32, StatsStorageError> {
Ok(self.session_manager.get_unique_users_count(date).await?)
}
pub async fn delete_unique_users(&self, before_date: Date) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_unique_users(before_date)
.await?)
}
pub async fn insert_active_session(
&self,
client_address: DestinationAddressBytes,
session: ActiveSession,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.insert_active_session(
client_address.as_base58_string(),
session.start,
session.typ.to_string().into(),
)
.await?)
}
pub async fn update_active_session_type(
&self,
client_address: DestinationAddressBytes,
session_type: SessionType,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.update_active_session_type(
client_address.as_base58_string(),
session_type.to_string().into(),
)
.await?)
}
pub async fn get_active_session(
&self,
client_address: DestinationAddressBytes,
) -> Result<Option<ActiveSession>, StatsStorageError> {
Ok(self
.session_manager
.get_active_session(client_address.as_base58_string())
.await?
.map(Into::into))
}
pub async fn get_all_active_sessions(&self) -> Result<Vec<ActiveSession>, StatsStorageError> {
Ok(self
.session_manager
.get_all_active_sessions()
.await?
.into_iter()
.map(Into::into)
.collect())
}
pub async fn get_started_sessions_count(
&self,
start_date: Date,
) -> Result<i32, StatsStorageError> {
Ok(self
.session_manager
.get_started_sessions_count(start_date)
.await?)
}
pub async fn get_active_users(&self) -> Result<Vec<String>, StatsStorageError> {
Ok(self.session_manager.get_active_users().await?)
}
pub async fn delete_active_session(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), StatsStorageError> {
Ok(self
.session_manager
.delete_active_session(client_address.as_base58_string())
.await?)
}
pub async fn cleanup_active_sessions(&self) -> Result<(), StatsStorageError> {
Ok(self.session_manager.cleanup_active_sessions().await?)
}
}
+109
View File
@@ -0,0 +1,109 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_credentials_interface::TicketType;
use sqlx::prelude::FromRow;
use time::{Duration, OffsetDateTime};
#[derive(FromRow)]
pub struct StoredFinishedSession {
duration_ms: i64,
typ: String,
}
impl StoredFinishedSession {
pub fn serialize(&self) -> (u64, String) {
(
self.duration_ms as u64, //we are sure that it fits in a u64, see `fn end_at`
self.typ.clone(),
)
}
}
pub struct FinishedSession {
pub duration: Duration,
pub typ: SessionType,
}
#[derive(PartialEq)]
pub enum SessionType {
Vpn,
Mixnet,
Unknown,
}
impl SessionType {
pub fn to_string(&self) -> &str {
match self {
Self::Vpn => "vpn",
Self::Mixnet => "mixnet",
Self::Unknown => "unknown",
}
}
pub fn from_string(s: &str) -> Self {
match s {
"vpn" => Self::Vpn,
"mixnet" => Self::Mixnet,
_ => Self::Unknown,
}
}
}
impl From<TicketType> for SessionType {
fn from(value: TicketType) -> Self {
match value {
TicketType::V1MixnetEntry => Self::Mixnet,
TicketType::V1MixnetExit => Self::Mixnet,
TicketType::V1WireguardEntry => Self::Vpn,
TicketType::V1WireguardExit => Self::Vpn,
}
}
}
#[derive(FromRow)]
pub(crate) struct StoredActiveSession {
start_time: OffsetDateTime,
typ: String,
}
pub struct ActiveSession {
pub start: OffsetDateTime,
pub typ: SessionType,
}
impl ActiveSession {
pub fn new(start_time: OffsetDateTime) -> Self {
ActiveSession {
start: start_time,
typ: SessionType::Unknown,
}
}
pub fn set_type(&mut self, ticket_type: TicketType) {
self.typ = ticket_type.into();
}
pub fn end_at(self, stop_time: OffsetDateTime) -> Option<FinishedSession> {
let session_duration = stop_time - self.start;
//ensure duration is positive to fit in a u64
//u64::max milliseconds is 500k millenia so no overflow issue
if session_duration > Duration::ZERO {
Some(FinishedSession {
duration: session_duration,
typ: self.typ,
})
} else {
None
}
}
}
impl From<StoredActiveSession> for ActiveSession {
fn from(value: StoredActiveSession) -> Self {
ActiveSession {
start: value.start_time,
typ: SessionType::from_string(&value.typ),
}
}
}
@@ -0,0 +1,177 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use time::{Date, OffsetDateTime};
use crate::models::{StoredActiveSession, StoredFinishedSession};
pub(crate) type Result<T> = std::result::Result<T, sqlx::Error>;
#[derive(Clone)]
pub(crate) struct SessionManager {
connection_pool: sqlx::SqlitePool,
}
impl SessionManager {
/// Creates new instance of the `SessionsManager` with the provided sqlite connection pool.
///
/// # Arguments
///
/// * `connection_pool`: database connection pool to use.
pub(crate) fn new(connection_pool: sqlx::SqlitePool) -> Self {
SessionManager { connection_pool }
}
pub(crate) async fn insert_finished_session(
&self,
date: Date,
duration_ms: i64,
typ: String,
) -> Result<()> {
sqlx::query!(
"INSERT INTO sessions_finished (day, duration_ms, typ) VALUES (?, ?, ?)",
date,
duration_ms,
typ
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn get_finished_sessions(
&self,
date: Date,
) -> Result<Vec<StoredFinishedSession>> {
sqlx::query_as("SELECT duration_ms, typ FROM sessions_finished WHERE day = ?")
.bind(date)
.fetch_all(&self.connection_pool)
.await
}
pub(crate) async fn delete_finished_sessions(&self, before_date: Date) -> Result<()> {
sqlx::query!("DELETE FROM sessions_finished WHERE day <= ? ", before_date)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn insert_unique_user(
&self,
date: Date,
client_address_b58: String,
) -> Result<()> {
sqlx::query!(
"INSERT OR IGNORE INTO sessions_unique_users (day, client_address) VALUES (?, ?)",
date,
client_address_b58,
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn get_unique_users_count(&self, date: Date) -> Result<i32> {
Ok(sqlx::query!(
"SELECT COUNT(*) as count FROM sessions_unique_users WHERE day = ?",
date
)
.fetch_one(&self.connection_pool)
.await?
.count)
}
pub(crate) async fn delete_unique_users(&self, before_date: Date) -> Result<()> {
sqlx::query!(
"DELETE FROM sessions_unique_users WHERE day <= ? ",
before_date
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn insert_active_session(
&self,
client_address_b58: String,
start_time: OffsetDateTime,
typ: String,
) -> Result<()> {
sqlx::query!(
"INSERT INTO sessions_active (client_address, start_time, typ) VALUES (?, ?, ?)",
client_address_b58,
start_time,
typ
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn update_active_session_type(
&self,
client_address_b58: String,
typ: String,
) -> Result<()> {
sqlx::query!(
"UPDATE sessions_active SET typ = ? WHERE client_address = ?",
typ,
client_address_b58,
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn get_active_session(
&self,
client_address_b58: String,
) -> Result<Option<StoredActiveSession>> {
sqlx::query_as("SELECT start_time, typ FROM sessions_active WHERE client_address = ?")
.bind(client_address_b58)
.fetch_optional(&self.connection_pool)
.await
}
pub(crate) async fn get_all_active_sessions(&self) -> Result<Vec<StoredActiveSession>> {
sqlx::query_as("SELECT start_time, typ FROM sessions_active")
.fetch_all(&self.connection_pool)
.await
}
pub(crate) async fn get_started_sessions_count(&self, start_date: Date) -> Result<i32> {
Ok(sqlx::query!(
"SELECT COUNT(*) as count FROM sessions_active WHERE date(start_time) = ?",
start_date
)
.fetch_one(&self.connection_pool)
.await?
.count)
}
pub(crate) async fn get_active_users(&self) -> Result<Vec<String>> {
Ok(sqlx::query!("SELECT client_address from sessions_active")
.fetch_all(&self.connection_pool)
.await?
.into_iter()
.map(|record| record.client_address)
.collect())
}
pub(crate) async fn delete_active_session(&self, client_address_b58: String) -> Result<()> {
sqlx::query!(
"DELETE FROM sessions_active WHERE client_address = ?",
client_address_b58
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn cleanup_active_sessions(&self) -> Result<()> {
sqlx::query!("DELETE FROM sessions_active")
.execute(&self.connection_pool)
.await?;
Ok(())
}
}
+18 -1
View File
@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::time::Duration;
use thiserror::Error;
use tracing::warn;
use tracing::{instrument, warn};
use url::Url;
pub use reqwest::IntoUrl;
@@ -202,6 +202,7 @@ impl Client {
self.reqwest_client.get(url)
}
#[instrument(level = "debug", skip_all, fields(path=?path))]
async fn send_get_request<K, V, E>(
&self,
path: PathSegments<'_>,
@@ -212,6 +213,7 @@ impl Client {
V: AsRef<str>,
E: Display,
{
tracing::trace!("Sending GET request");
let url = sanitize_url(&self.base_url, path, params);
#[cfg(target_arch = "wasm32")]
@@ -277,6 +279,7 @@ impl Client {
}
}
#[instrument(level = "debug", skip_all)]
pub async fn get_json<T, K, V, E>(
&self,
path: PathSegments<'_>,
@@ -512,12 +515,14 @@ pub fn sanitize_url<K: AsRef<str>, V: AsRef<str>>(
url
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn parse_response<T, E>(res: Response, allow_empty: bool) -> Result<T, HttpClientError<E>>
where
T: DeserializeOwned,
E: DeserializeOwned + Display,
{
let status = res.status();
tracing::debug!("Status: {} (success: {})", &status, status.is_success());
if !allow_empty {
if let Some(0) = res.content_length() {
@@ -526,6 +531,18 @@ where
}
if res.status().is_success() {
#[cfg(debug_assertions)]
{
let text = res.text().await.inspect_err(|err| {
tracing::error!("Couldn't even get response text: {err}");
})?;
tracing::trace!("Result:\n{:#?}", text);
serde_json::from_str(&text)
.map_err(|err| HttpClientError::GenericRequestFailure(err.to_string()))
}
#[cfg(not(debug_assertions))]
Ok(res.json().await?)
} else if res.status() == StatusCode::NOT_FOUND {
Err(HttpClientError::NotFound)
+14
View File
@@ -0,0 +1,14 @@
[package]
name = "nym-common-models"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
serde = { workspace = true, features = ["derive"] }
+1
View File
@@ -0,0 +1 @@
pub mod ns_api;
+8
View File
@@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TestrunAssignment {
/// has nothing to do with GW identity key. This is PK from `gateways` table
pub testrun_id: i64,
pub gateway_pk_id: i64,
}
+34 -9
View File
@@ -42,8 +42,32 @@ impl PendingSync {
}
}
#[derive(Debug, Clone)]
pub struct BlockProcessorConfig {
pub pruning_options: PruningOptions,
pub store_precommits: bool,
}
impl Default for BlockProcessorConfig {
fn default() -> Self {
Self {
pruning_options: PruningOptions::nothing(),
store_precommits: true,
}
}
}
impl BlockProcessorConfig {
pub fn new(pruning_options: PruningOptions, store_precommits: bool) -> Self {
Self {
pruning_options,
store_precommits,
}
}
}
pub struct BlockProcessor {
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
last_processed_height: u32,
@@ -65,9 +89,10 @@ pub struct BlockProcessor {
msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
#[allow(clippy::too_many_arguments)]
impl BlockProcessor {
pub async fn new(
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
@@ -82,7 +107,7 @@ impl BlockProcessor {
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
Ok(BlockProcessor {
pruning_options,
config,
cancel,
synced,
last_processed_height,
@@ -101,7 +126,7 @@ impl BlockProcessor {
}
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.pruning_options = pruning_options;
self.config.pruning_options = pruning_options;
self
}
@@ -128,7 +153,7 @@ impl BlockProcessor {
// we won't end up with a corrupted storage.
let mut tx = self.storage.begin_processing_tx().await?;
persist_block(&full_info, &mut tx).await?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
// let the modules do whatever they want
// the ones wanting the full block:
@@ -241,7 +266,7 @@ impl BlockProcessor {
#[instrument(skip(self))]
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = self.last_processed_height - keep_recent;
info!(
@@ -282,12 +307,12 @@ impl BlockProcessor {
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
debug!("checking for storage pruning");
if self.pruning_options.strategy.is_nothing() {
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}
let interval = self.pruning_options.strategy_interval();
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= self.last_processed_height {
self.prune_storage().await?;
}
@@ -371,7 +396,7 @@ impl BlockProcessor {
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
// in case we were offline for a while,
// make sure we don't request blocks we'd have to prune anyway
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = latest_block - keep_recent;
self.last_processed_height = max(self.last_processed_height, last_to_keep);
+14 -3
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::BlockToProcess;
use crate::block_processor::BlockProcessor;
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
use crate::block_requester::{BlockRequest, BlockRequester};
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
@@ -34,6 +34,8 @@ pub struct Config {
pub database_path: PathBuf,
pub pruning_options: PruningOptions,
pub store_precommits: bool,
}
pub struct NyxdScraperBuilder {
@@ -60,8 +62,14 @@ impl NyxdScraperBuilder {
req_rx,
processing_tx.clone(),
);
let mut block_processor = BlockProcessor::new(
let block_processor_config = BlockProcessorConfig::new(
scraper.config.pruning_options,
scraper.config.store_precommits,
);
let mut block_processor = BlockProcessor::new(
block_processor_config,
scraper.cancel_token.clone(),
scraper.startup_sync.clone(),
processing_rx,
@@ -275,8 +283,11 @@ impl NyxdScraper {
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> Result<BlockProcessor, ScraperError> {
let block_processor_config =
BlockProcessorConfig::new(self.config.pruning_options, self.config.store_precommits);
BlockProcessor::new(
self.config.pruning_options,
block_processor_config,
self.cancel_token.clone(),
self.startup_sync.clone(),
processing_rx,
+7 -5
View File
@@ -212,6 +212,7 @@ impl ScraperStorage {
pub async fn persist_block(
block: &FullBlockInformation,
tx: &mut StorageTransaction,
store_precommits: bool,
) -> Result<(), ScraperError> {
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
@@ -224,11 +225,12 @@ pub async fn persist_block(
// persist block data
persist_block_data(&block.block, total_gas, tx).await?;
// persist commits
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
if store_precommits {
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
// persist txs
+2 -2
View File
@@ -19,5 +19,5 @@ MULTISIG_CONTRACT_ADDRESS=n1zwv6feuzhy6a9wekh96cd57lsarmqlwxdypdsplw6zhfncqw6ftq
COCONUT_DKG_CONTRACT_ADDRESS=n1aakfpghcanxtc45gpqlx8j3rq0zcpyf49qmhm9mdjrfx036h4z5sy2vfh9
EXPLORER_API=https://canary-explorer.performance.nymte.ch/api
NYXD="https://canary-validator.performance.nymte.ch"
NYM_API="https://canary-api.performance.nymte.ch/api"
NYXD=https://canary-validator.performance.nymte.ch
NYM_API=https://canary-api.performance.nymte.ch/api
+3 -3
View File
@@ -21,8 +21,8 @@ COCONUT_DKG_CONTRACT_ADDRESS=n19604yflqggs9mk2z26mqygq43q2kr3n932egxx630svywd5mp
REWARDING_VALIDATOR_ADDRESS=n10yyd98e2tuwu0f7ypz9dy3hhjw7v772q6287gy
STATISTICS_SERVICE_DOMAIN_ADDRESS="https://mainnet-stats.nymte.ch:8090"
NYXD="https://rpc.nymtech.net"
NYM_API="https://validator.nymtech.net/api/"
NYXD=https://rpc.nymtech.net
NYM_API=https://validator.nymtech.net/api/
NYXD_WS="wss://rpc.nymtech.net/websocket"
EXPLORER_API="https://explorer.nymtech.net/api/"
EXPLORER_API=https://explorer.nymtech.net/api/
NYM_VPN_API="https://nymvpn.com/api"
+2 -2
View File
@@ -19,5 +19,5 @@ VESTING_CONTRACT_ADDRESS=n1jlzdxnyces4hrhqz68dqk28mrw5jgwtcfq0c2funcwrmw0dx9l9s8
REWARDING_VALIDATOR_ADDRESS=n1rfvpsynktze6wvn6ldskj8xgwfzzk5v6pnff39
EXPLORER_API=https://qa-network-explorer.qa.nymte.ch/api
NYXD="https://qa-validator.qa.nymte.ch"
NYM_API="https://qa-nym-api.qa.nymte.ch/api"
NYXD=https://qa-validator.qa.nymte.ch
NYM_API=https://qa-nym-api.qa.nymte.ch/api
+3 -3
View File
@@ -20,6 +20,6 @@ ECASH_CONTRACT_ADDRESS=n1v3vydvs2ued84yv3khqwtgldmgwn0elljsdh08dr5s2j9x4rc5fs9jl
STATISTICS_SERVICE_DOMAIN_ADDRESS="http://0.0.0.0"
EXPLORER_API=https://sandbox-explorer.nymtech.net/api
NYXD="https://rpc.sandbox.nymtech.net"
NYXD_WS="wss://rpc.sandbox.nymtech.net/websocket"
NYM_API="https://sandbox-nym-api1.nymtech.net/api"
NYXD=https://rpc.sandbox.nymtech.net
NYXD_WS=wss://rpc.sandbox.nymtech.net/websocket
NYM_API=https://sandbox-nym-api1.nymtech.net/api
+1 -1
View File
@@ -7,12 +7,12 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
log.workspace = true
nym-explorer-api-requests = { path = "../explorer-api-requests" }
reqwest = { workspace = true, features = ["json"] }
serde.workspace = true
thiserror.workspace = true
url.workspace = true
tracing = {workspace = true, features = ["attributes"]}
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
+13 -3
View File
@@ -3,6 +3,7 @@ use std::time::Duration;
use reqwest::StatusCode;
use thiserror::Error;
use tracing::instrument;
use url::Url;
// Re-export request types
@@ -47,6 +48,12 @@ impl ExplorerClient {
Ok(Self { client, url })
}
#[cfg(not(target_arch = "wasm32"))]
pub fn new_with_timeout(url: url::Url, timeout: Duration) -> Result<Self, ExplorerApiError> {
let client = reqwest::Client::builder().timeout(timeout).build()?;
Ok(Self { client, url })
}
#[cfg(target_arch = "wasm32")]
pub fn new(url: url::Url) -> Result<Self, ExplorerApiError> {
let client = reqwest::Client::builder().build()?;
@@ -58,10 +65,11 @@ impl ExplorerClient {
paths: &[&str],
) -> Result<reqwest::Response, ExplorerApiError> {
let url = combine_url(self.url.clone(), paths)?;
log::trace!("Sending GET request {url:?}");
tracing::debug!("Sending GET request");
Ok(self.client.get(url).send().await?)
}
#[instrument(level = "trace", skip_all, fields(paths=?paths))]
async fn query_explorer_api<T>(&self, paths: &[&str]) -> Result<T, ExplorerApiError>
where
T: std::fmt::Debug,
@@ -70,12 +78,14 @@ impl ExplorerClient {
let response = self.send_get_request(paths).await?;
if response.status().is_success() {
let res = response.json::<T>().await?;
log::trace!("Got response: {res:?}");
tracing::trace!("Got response: {res:?}");
Ok(res)
} else if response.status() == StatusCode::NOT_FOUND {
Err(ExplorerApiError::NotFound)
} else {
Err(ExplorerApiError::RequestFailure(response.text().await?))
let status = response.status();
let err_msg = format!("{}: {}", response.text().await?, status);
Err(ExplorerApiError::RequestFailure(err_msg))
}
}
+1
View File
@@ -69,6 +69,7 @@ nym-credentials-interface = { path = "../common/credentials-interface" }
nym-credential-verification = { path = "../common/credential-verification" }
nym-crypto = { path = "../common/crypto" }
nym-gateway-storage = { path = "../common/gateway-storage" }
nym-gateway-stats-storage = { path = "../common/gateway-stats-storage" }
nym-gateway-requests = { path = "../common/gateway-requests" }
nym-mixnet-client = { path = "../common/client-libs/mixnet-client" }
nym-mixnode-common = { path = "../common/mixnode-common" }
+8 -1
View File
@@ -12,6 +12,7 @@ pub const DEFAULT_PRIVATE_SPHINX_KEY_FILENAME: &str = "private_sphinx.pem";
pub const DEFAULT_PUBLIC_SPHINX_KEY_FILENAME: &str = "public_sphinx.pem";
pub const DEFAULT_CLIENTS_STORAGE_FILENAME: &str = "db.sqlite";
pub const DEFAULT_STATS_STORAGE_FILENAME: &str = "stats.sqlite";
pub const DEFAULT_NETWORK_REQUESTER_CONFIG_FILENAME: &str = "network_requester_config.toml";
pub const DEFAULT_NETWORK_REQUESTER_DATA_DIR: &str = "network-requester-data";
@@ -39,6 +40,9 @@ pub struct GatewayPaths {
#[serde(alias = "persistent_storage")]
pub clients_storage: PathBuf,
/// Path to sqlite database containing all persistent stats data.
pub stats_storage: PathBuf,
/// Path to the configuration of the embedded network requester.
#[serde(deserialize_with = "de_maybe_stringified")]
pub network_requester_config: Option<PathBuf>,
@@ -54,7 +58,9 @@ impl GatewayPaths {
pub fn new_default<P: AsRef<Path>>(id: P) -> Self {
GatewayPaths {
keys: KeysPaths::new_default(id.as_ref()),
clients_storage: default_data_directory(id).join(DEFAULT_CLIENTS_STORAGE_FILENAME),
clients_storage: default_data_directory(id.as_ref())
.join(DEFAULT_CLIENTS_STORAGE_FILENAME),
stats_storage: default_data_directory(id).join(DEFAULT_STATS_STORAGE_FILENAME),
// node_description: default_config_filepath(id).join(DEFAULT_DESCRIPTION_FILENAME),
network_requester_config: None,
ip_packet_router_config: None,
@@ -70,6 +76,7 @@ impl GatewayPaths {
public_sphinx_key_file: Default::default(),
},
clients_storage: Default::default(),
stats_storage: Default::default(),
network_requester_config: None,
ip_packet_router_config: None,
}
+7
View File
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: GPL-3.0-only
use nym_authenticator::error::AuthenticatorError;
use nym_gateway_stats_storage::error::StatsStorageError;
use nym_gateway_storage::error::StorageError;
use nym_ip_packet_router::error::IpPacketRouterError;
use nym_network_requester::error::{ClientCoreError, NetworkRequesterError};
@@ -115,6 +116,12 @@ pub enum GatewayError {
source: StorageError,
},
#[error("stats storage failure: {source}")]
StatsStorageError {
#[from]
source: StatsStorageError,
},
#[error("Path to network requester configuration file hasn't been specified. Perhaps try to run `setup-network-requester`?")]
UnspecifiedNetworkRequesterConfig,
+9
View File
@@ -5,6 +5,7 @@ use crate::config::Config;
use crate::error::GatewayError;
use nym_crypto::asymmetric::encryption;
use nym_gateway_stats_storage::PersistentStatsStorage;
use nym_gateway_storage::PersistentStorage;
use nym_pemstore::traits::PemStorableKeyPair;
use nym_pemstore::KeyPairPath;
@@ -74,6 +75,14 @@ pub(crate) async fn initialise_main_storage(
Ok(PersistentStorage::init(path, retrieval_limit).await?)
}
pub(crate) async fn initialise_stats_storage(
config: &Config,
) -> Result<PersistentStatsStorage, GatewayError> {
let path = &config.storage_paths.stats_storage;
Ok(PersistentStatsStorage::init(path).await?)
}
pub fn load_keypair<T: PemStorableKeyPair>(
paths: KeyPairPath,
name: impl Into<String>,
+25 -14
View File
@@ -12,7 +12,9 @@ use crate::http::HttpApiBuilder;
use crate::node::client_handling::active_clients::ActiveClientsStore;
use crate::node::client_handling::embedded_clients::{LocalEmbeddedClientHandle, MessageRouter};
use crate::node::client_handling::websocket;
use crate::node::helpers::{initialise_main_storage, load_network_requester_config};
use crate::node::helpers::{
initialise_main_storage, initialise_stats_storage, load_network_requester_config,
};
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
use futures::channel::{mpsc, oneshot};
use nym_credential_verification::ecash::{
@@ -41,6 +43,7 @@ pub(crate) mod helpers;
pub(crate) mod mixnet_handling;
pub(crate) mod statistics;
pub use nym_gateway_stats_storage::PersistentStatsStorage;
pub use nym_gateway_storage::{PersistentStorage, Storage};
// TODO: should this struct live here?
@@ -96,6 +99,8 @@ pub async fn create_gateway(
let storage = initialise_main_storage(&config).await?;
let stats_storage = initialise_stats_storage(&config).await?;
let nr_opts = network_requester_config.map(|config| LocalNetworkRequesterOpts {
config: config.clone(),
custom_mixnet_path: custom_mixnet.clone(),
@@ -106,7 +111,7 @@ pub async fn create_gateway(
custom_mixnet_path: custom_mixnet.clone(),
});
Gateway::new(config, nr_opts, ip_opts, storage)
Gateway::new(config, nr_opts, ip_opts, storage, stats_storage)
}
#[derive(Debug, Clone)]
@@ -147,7 +152,9 @@ pub struct Gateway<St = PersistentStorage> {
/// x25519 keypair used for Diffie-Hellman. Currently only used for sphinx key derivation.
sphinx_keypair: Arc<encryption::KeyPair>,
storage: St,
client_storage: St,
stats_storage: PersistentStatsStorage,
wireguard_data: Option<nym_wireguard::WireguardData>,
@@ -163,10 +170,12 @@ impl<St> Gateway<St> {
config: Config,
network_requester_opts: Option<LocalNetworkRequesterOpts>,
ip_packet_router_opts: Option<LocalIpPacketRouterOpts>,
storage: St,
client_storage: St,
stats_storage: PersistentStatsStorage,
) -> Result<Self, GatewayError> {
Ok(Gateway {
storage,
client_storage,
stats_storage,
identity_keypair: Arc::new(load_identity_keys(&config)?),
sphinx_keypair: Arc::new(helpers::load_sphinx_keys(&config)?),
config,
@@ -179,7 +188,7 @@ impl<St> Gateway<St> {
task_client: None,
})
}
#[allow(clippy::too_many_arguments)]
pub fn new_loaded(
config: Config,
network_requester_opts: Option<LocalNetworkRequesterOpts>,
@@ -187,7 +196,8 @@ impl<St> Gateway<St> {
authenticator_opts: Option<LocalAuthenticatorOpts>,
identity_keypair: Arc<identity::KeyPair>,
sphinx_keypair: Arc<encryption::KeyPair>,
storage: St,
client_storage: St,
stats_storage: PersistentStatsStorage,
) -> Self {
Gateway {
config,
@@ -196,7 +206,8 @@ impl<St> Gateway<St> {
authenticator_opts,
identity_keypair,
sphinx_keypair,
storage,
client_storage,
stats_storage,
wireguard_data: None,
session_stats: None,
run_http_server: true,
@@ -240,7 +251,7 @@ impl<St> Gateway<St> {
let connection_handler = ConnectionHandler::new(
packet_processor,
self.storage.clone(),
self.client_storage.clone(),
ack_sender,
active_clients_store,
);
@@ -275,7 +286,7 @@ impl<St> Gateway<St> {
forwarding_channel,
router_tx,
);
let all_peers = self.storage.get_all_wireguard_peers().await?;
let all_peers = self.client_storage.get_all_wireguard_peers().await?;
let used_private_network_ips = all_peers
.iter()
.cloned()
@@ -330,7 +341,7 @@ impl<St> Gateway<St> {
.start_with_shutdown(router_shutdown);
let wg_api = nym_wireguard::start_wireguard(
self.storage.clone(),
self.client_storage.clone(),
all_peers,
shutdown,
wireguard_data,
@@ -377,7 +388,7 @@ impl<St> Gateway<St> {
let shared_state = websocket::CommonHandlerState {
ecash_verifier,
storage: self.storage.clone(),
storage: self.client_storage.clone(),
local_identity: Arc::clone(&self.identity_keypair),
only_coconut_credentials: self.config.gateway.only_coconut_credentials,
bandwidth_cfg: (&self.config).into(),
@@ -415,7 +426,7 @@ impl<St> Gateway<St> {
info!("Starting gateway stats collector...");
let (mut stats_collector, stats_event_sender) =
GatewayStatisticsCollector::new(shared_session_stats);
GatewayStatisticsCollector::new(shared_session_stats, self.stats_storage.clone());
tokio::spawn(async move { stats_collector.run(shutdown).await });
stats_event_sender
}
@@ -654,7 +665,7 @@ impl<St> Gateway<St> {
nyxd_client,
self.identity_keypair.public_key().to_bytes(),
shutdown.fork("EcashVerifier"),
self.storage.clone(),
self.client_storage.clone(),
)
.await?,
);
+25 -4
View File
@@ -2,13 +2,14 @@
// SPDX-License-Identifier: GPL-3.0-only
use futures::{channel::mpsc, StreamExt};
use nym_gateway_stats_storage::PersistentStatsStorage;
use nym_node_http_api::state::metrics::SharedSessionStats;
use nym_statistics_common::events::{StatsEvent, StatsEventReceiver, StatsEventSender};
use nym_task::TaskClient;
use sessions::SessionStatsHandler;
use std::time::Duration;
use time::OffsetDateTime;
use tracing::trace;
use tracing::{error, trace, warn};
pub mod sessions;
@@ -23,21 +24,38 @@ pub(crate) struct GatewayStatisticsCollector {
impl GatewayStatisticsCollector {
pub fn new(
shared_session_stats: SharedSessionStats,
stats_storage: PersistentStatsStorage,
) -> (GatewayStatisticsCollector, StatsEventSender) {
let (stats_event_tx, stats_event_rx) = mpsc::unbounded();
let session_stats = SessionStatsHandler::new(shared_session_stats, stats_storage);
let collector = GatewayStatisticsCollector {
stats_event_rx,
session_stats: SessionStatsHandler::new(shared_session_stats),
session_stats,
};
(collector, stats_event_tx)
}
async fn update_shared_state(&mut self, update_time: OffsetDateTime) {
self.session_stats.update_shared_state(update_time).await;
if let Err(e) = self
.session_stats
.maybe_update_shared_state(update_time)
.await
{
error!("Failed to update session stats - {e}");
}
//here goes additionnal stats handler update
}
async fn on_start(&mut self) {
if let Err(e) = self.session_stats.on_start().await {
error!("Failed to cleanup session stats handler - {e}");
}
//here goes additionnal stats handler start cleanup
}
pub async fn run(&mut self, mut shutdown: TaskClient) {
self.on_start().await;
let mut update_interval = tokio::time::interval(STATISTICS_UPDATE_TIMER_INTERVAL);
while !shutdown.is_shutdown() {
tokio::select! {
@@ -53,7 +71,10 @@ impl GatewayStatisticsCollector {
Some(stat_event) = self.stats_event_rx.next() => {
//dispatching event to proper handler
match stat_event {
StatsEvent::SessionStatsEvent(event) => self.session_stats.handle_event(event),
StatsEvent::SessionStatsEvent(event) => {
if let Err(e) = self.session_stats.handle_event(event).await{
warn!("Session event handling error - {e}");
}},
}
},
+113 -131
View File
@@ -2,176 +2,158 @@
// SPDX-License-Identifier: GPL-3.0-only
use nym_credentials_interface::TicketType;
use nym_gateway_stats_storage::models::FinishedSession;
use nym_gateway_stats_storage::PersistentStatsStorage;
use nym_gateway_stats_storage::{error::StatsStorageError, models::ActiveSession};
use nym_node_http_api::state::metrics::SharedSessionStats;
use nym_sphinx::DestinationAddressBytes;
use std::collections::{HashMap, HashSet};
use time::{Date, Duration, OffsetDateTime};
use nym_statistics_common::events::SessionEvent;
const FINISHED_SESSIONS_CAP: usize = 1_000_000; //to be on the safe side of memory blowups until persistent storage
#[derive(PartialEq)]
enum SessionType {
Vpn,
Mixnet,
Unknown,
}
impl SessionType {
fn to_string(&self) -> &str {
match self {
Self::Vpn => "vpn",
Self::Mixnet => "mixnet",
Self::Unknown => "unknown",
}
}
}
impl From<TicketType> for SessionType {
fn from(value: TicketType) -> Self {
match value {
TicketType::V1MixnetEntry => Self::Mixnet,
TicketType::V1MixnetExit => Self::Mixnet,
TicketType::V1WireguardEntry => Self::Vpn,
TicketType::V1WireguardExit => Self::Vpn,
}
}
}
struct FinishedSession {
duration: Duration,
typ: SessionType,
}
impl FinishedSession {
fn serialize(&self) -> (u64, String) {
(
self.duration.whole_milliseconds() as u64, //we are sure that it fits in a u64, see `fn end_at`
self.typ.to_string().into(),
)
}
}
struct ActiveSession {
start: OffsetDateTime,
typ: SessionType,
}
impl ActiveSession {
fn new(start_time: OffsetDateTime) -> Self {
ActiveSession {
start: start_time,
typ: SessionType::Unknown,
}
}
fn set_type(&mut self, ticket_type: TicketType) {
self.typ = ticket_type.into();
}
fn end_at(self, stop_time: OffsetDateTime) -> Option<FinishedSession> {
let session_duration = stop_time - self.start;
//ensure duration is positive to fit in a u64
//u64::max milliseconds is 500k millenia so no overflow issue
if session_duration > Duration::ZERO {
Some(FinishedSession {
duration: session_duration,
typ: self.typ,
})
} else {
None
}
}
}
pub(crate) struct SessionStatsHandler {
last_update_day: Date,
storage: PersistentStatsStorage,
current_day: Date,
shared_session_stats: SharedSessionStats,
active_sessions: HashMap<DestinationAddressBytes, ActiveSession>,
unique_users: HashSet<DestinationAddressBytes>,
sessions_started: u32,
finished_sessions: Vec<FinishedSession>,
}
impl SessionStatsHandler {
pub fn new(shared_session_stats: SharedSessionStats) -> Self {
pub fn new(shared_session_stats: SharedSessionStats, storage: PersistentStatsStorage) -> Self {
SessionStatsHandler {
last_update_day: OffsetDateTime::now_utc().date(),
storage,
current_day: OffsetDateTime::now_utc().date(),
shared_session_stats,
active_sessions: Default::default(),
unique_users: Default::default(),
sessions_started: 0,
finished_sessions: Default::default(),
}
}
pub(crate) fn handle_event(&mut self, event: SessionEvent) {
pub(crate) async fn handle_event(
&mut self,
event: SessionEvent,
) -> Result<(), StatsStorageError> {
match event {
SessionEvent::SessionStart { start_time, client } => {
self.handle_session_start(start_time, client);
self.handle_session_start(start_time, client).await
}
SessionEvent::SessionStop { stop_time, client } => {
self.handle_session_stop(stop_time, client);
self.handle_session_stop(stop_time, client).await
}
SessionEvent::EcashTicket {
ticket_type,
client,
} => self.handle_ecash_ticket(ticket_type, client),
} => self.handle_ecash_ticket(ticket_type, client).await,
}
}
fn handle_session_start(
async fn handle_session_start(
&mut self,
start_time: OffsetDateTime,
client: DestinationAddressBytes,
) {
self.sessions_started += 1;
self.unique_users.insert(client);
self.active_sessions
.insert(client, ActiveSession::new(start_time));
}
fn handle_session_stop(&mut self, stop_time: OffsetDateTime, client: DestinationAddressBytes) {
if let Some(session) = self.active_sessions.remove(&client) {
if let Some(finished_session) = session.end_at(stop_time) {
if self.finished_sessions.len() < FINISHED_SESSIONS_CAP {
self.finished_sessions.push(finished_session);
}
}
}
) -> Result<(), StatsStorageError> {
self.storage
.insert_unique_user(self.current_day, client.as_base58_string())
.await?;
self.storage
.insert_active_session(client, ActiveSession::new(start_time))
.await?;
Ok(())
}
fn handle_ecash_ticket(&mut self, ticket_type: TicketType, client: DestinationAddressBytes) {
if let Some(active_session) = self.active_sessions.get_mut(&client) {
if active_session.typ == SessionType::Unknown {
active_session.set_type(ticket_type);
async fn handle_session_stop(
&mut self,
stop_time: OffsetDateTime,
client: DestinationAddressBytes,
) -> Result<(), StatsStorageError> {
if let Some(session) = self.storage.get_active_session(client).await? {
if let Some(finished_session) = session.end_at(stop_time) {
self.storage
.insert_finished_session(self.current_day, finished_session)
.await?;
self.storage.delete_active_session(client).await?;
}
}
Ok(())
}
async fn handle_ecash_ticket(
&mut self,
ticket_type: TicketType,
client: DestinationAddressBytes,
) -> Result<(), StatsStorageError> {
self.storage
.update_active_session_type(client, ticket_type.into())
.await?;
Ok(())
}
pub(crate) async fn on_start(&mut self) -> Result<(), StatsStorageError> {
let yesterday = OffsetDateTime::now_utc().date() - Duration::DAY;
//publish yesterday's data if any
self.publish_stats(yesterday).await?;
//store "active" sessions as duration 0
for active_session in self.storage.get_all_active_sessions().await? {
self.storage
.insert_finished_session(
self.current_day,
FinishedSession {
duration: Duration::ZERO,
typ: active_session.typ,
},
)
.await?
}
//cleanup active sessions
self.storage.cleanup_active_sessions().await?;
//delete old entries
self.delete_old_stats(yesterday - Duration::DAY).await?;
Ok(())
}
//update shared state once a day has passed, with data from the previous day
pub(crate) async fn update_shared_state(&mut self, update_time: OffsetDateTime) {
let update_date = update_time.date();
if update_date != self.last_update_day {
{
let mut shared_state = self.shared_session_stats.write().await;
shared_state.update_time = self.last_update_day;
shared_state.unique_active_users = self.unique_users.len() as u32;
shared_state.session_started = self.sessions_started;
shared_state.sessions = self
.finished_sessions
.iter()
.map(|s| s.serialize())
.collect();
}
self.reset_stats(update_date);
async fn publish_stats(&mut self, stats_date: Date) -> Result<(), StatsStorageError> {
let finished_sessions = self.storage.get_finished_sessions(stats_date).await?;
let user_count = self.storage.get_unique_users_count(stats_date).await?;
let session_started = self.storage.get_started_sessions_count(stats_date).await? as u32;
{
let mut shared_state = self.shared_session_stats.write().await;
shared_state.update_time = stats_date;
shared_state.unique_active_users = user_count as u32;
shared_state.session_started = session_started;
shared_state.sessions = finished_sessions.iter().map(|s| s.serialize()).collect();
}
Ok(())
}
pub(crate) async fn maybe_update_shared_state(
&mut self,
update_time: OffsetDateTime,
) -> Result<(), StatsStorageError> {
let update_date = update_time.date();
if update_date != self.current_day {
self.publish_stats(self.current_day).await?;
self.delete_old_stats(self.current_day - Duration::DAY)
.await?;
self.reset_stats(update_date).await?;
self.current_day = update_date;
}
Ok(())
}
fn reset_stats(&mut self, reset_day: Date) {
self.last_update_day = reset_day;
self.unique_users = self.active_sessions.keys().copied().collect();
self.finished_sessions = Default::default();
self.sessions_started = 0;
async fn reset_stats(&mut self, reset_day: Date) -> Result<(), StatsStorageError> {
//active users reset
let new_active_users = self.storage.get_active_users().await?;
for user in new_active_users {
self.storage.insert_unique_user(reset_day, user).await?;
}
Ok(())
}
async fn delete_old_stats(&mut self, delete_before: Date) -> Result<(), StatsStorageError> {
self.storage.delete_finished_sessions(delete_before).await?;
self.storage.delete_unique_users(delete_before).await?;
Ok(())
}
}
+1
View File
@@ -68,6 +68,7 @@ warning: no queries found; do you have the `offline` feature enabled
### Possible solutions
- does your `sqlx-cli` version match `sqlx` version from `Cargo.toml`?
+ `cargo install -f sqlx-cli --version <specific version>`
```
cargo install sqlx-cli --version <exact semver version as sqlx> --force
```
+1
View File
@@ -0,0 +1 @@
nym-gateway-probe
+27
View File
@@ -0,0 +1,27 @@
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
# SPDX-License-Identifier: Apache-2.0
[package]
name = "nym-node-status-agent"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
anyhow = { workspace = true}
clap = { workspace = true, features = ["derive", "env"] }
nym-bin-common = { path = "../common/bin-common", features = ["models"]}
nym-common-models = { path = "../common/models" }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
reqwest = { workspace = true, features = ["json"] }
serde_json = { workspace = true }
+28
View File
@@ -0,0 +1,28 @@
FROM rust:latest AS builder
RUN apt update && apt install -yy libdbus-1-dev pkg-config libclang-dev
# Install go
RUN wget https://go.dev/dl/go1.22.5.linux-amd64.tar.gz -O go.tar.gz
RUN tar -xzvf go.tar.gz -C /usr/local
RUN git clone https://github.com/nymtech/nym-vpn-client /usr/src/nym-vpn-client
ENV PATH=/go/bin:/usr/local/go/bin:$PATH
WORKDIR /usr/src/nym-vpn-client/nym-vpn-core
RUN cargo build --release --package nym-gateway-probe
COPY ./ /usr/src/nym
WORKDIR /usr/src/nym/nym-node-status-agent
RUN cargo build --release
FROM ubuntu:24.04
RUN apt-get update && apt-get install -y ca-certificates
WORKDIR /nym
COPY --from=builder /usr/src/nym/target/release/nym-node-status-agent ./
COPY --from=builder /usr/src/nym-vpn-client/nym-vpn-core/target/release/nym-gateway-probe ./
ENV NODE_STATUS_AGENT_PROBE_PATH=/nym/nym-gateway-probe
ENTRYPOINT [ "/nym/nym-node-status-agent", "run-probe" ]
+49
View File
@@ -0,0 +1,49 @@
#!/bin/bash
set -eu
export RUST_LOG=${RUST_LOG:-debug}
crate_root=$(dirname $(realpath "$0"))
gateway_probe_src=$(dirname $(dirname "$crate_root"))/nym-vpn-client/nym-vpn-core
echo "gateway_probe_src=$gateway_probe_src"
echo "crate_root=$crate_root"
export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe"
# build & copy over GW probe
function copy_gw_probe() {
pushd $gateway_probe_src
cargo build --release --package nym-gateway-probe
cp target/release/nym-gateway-probe "$crate_root"
$crate_root/nym-gateway-probe --version
popd
}
function build_agent() {
cargo build --package nym-node-status-agent --release
}
function swarm() {
local workers=$1
echo "Running $workers in parallel"
build_agent
for ((i=1; i<=$workers; i++)); do
../target/release/nym-node-status-agent run-probe &
done
wait
echo "All agents completed"
}
export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1"
export NODE_STATUS_AGENT_SERVER_PORT="8000"
copy_gw_probe
swarm 30
# cargo run -- run-probe
+109
View File
@@ -0,0 +1,109 @@
use clap::{Parser, Subcommand};
use nym_bin_common::bin_info;
use nym_common_models::ns_api::TestrunAssignment;
use std::sync::OnceLock;
use tracing::instrument;
use crate::probe::GwProbe;
// Helper for passing LONG_VERSION to clap
fn pretty_build_info_static() -> &'static str {
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
}
#[derive(Parser, Debug)]
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
pub(crate) struct Args {
#[command(subcommand)]
pub(crate) command: Command,
#[arg(short, long, env = "NODE_STATUS_AGENT_SERVER_ADDRESS")]
pub(crate) server_address: String,
#[arg(short = 'p', long, env = "NODE_STATUS_AGENT_SERVER_PORT")]
pub(crate) server_port: u16,
// TODO dz accept keypair for identification / auth
}
#[derive(Subcommand, Debug)]
pub(crate) enum Command {
RunProbe {
/// path of binary to run
#[arg(long, env = "NODE_STATUS_AGENT_PROBE_PATH")]
probe_path: String,
#[arg(short, long, env = "NODE_STATUS_AGENT_GATEWAY_ID")]
gateway_id: Option<String>,
},
}
impl Args {
pub(crate) async fn execute(&self) -> anyhow::Result<()> {
match &self.command {
Command::RunProbe {
probe_path,
gateway_id,
} => self.run_probe(probe_path, gateway_id).await?,
}
Ok(())
}
async fn run_probe(&self, probe_path: &str, gateway_id: &Option<String>) -> anyhow::Result<()> {
let server_address = format!("{}:{}", &self.server_address, self.server_port);
let probe = GwProbe::new(probe_path.to_string());
let version = probe.version().await;
tracing::info!("Probe version:\n{}", version);
let testrun = request_testrun(&server_address).await?;
let log = probe.run_and_get_log(gateway_id);
submit_results(&server_address, testrun.testrun_id, log).await?;
Ok(())
}
}
const URL_BASE: &str = "internal/testruns";
#[instrument(level = "debug", skip_all)]
async fn request_testrun(server_addr: &str) -> anyhow::Result<TestrunAssignment> {
let target_url = format!("{}/{}", server_addr, URL_BASE);
let client = reqwest::Client::new();
let res = client
.get(target_url)
.send()
.await
.and_then(|response| response.error_for_status())?;
res.json()
.await
.map(|testrun| {
tracing::info!("Received testrun assignment: {:?}", testrun);
testrun
})
.map_err(|err| {
tracing::error!("err");
err.into()
})
}
#[instrument(level = "debug", skip(probe_outcome))]
async fn submit_results(
server_addr: &str,
testrun_id: i64,
probe_outcome: String,
) -> anyhow::Result<()> {
let target_url = format!("{}/{}/{}", server_addr, URL_BASE, testrun_id);
let client = reqwest::Client::new();
let res = client
.post(target_url)
.body(probe_outcome)
.send()
.await
.and_then(|response| response.error_for_status())?;
tracing::debug!("Submitted results: {})", res.status());
Ok(())
}
+78
View File
@@ -0,0 +1,78 @@
use crate::cli::Args;
use clap::Parser;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{filter::Directive, EnvFilter};
mod cli;
mod probe;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
setup_tracing();
let args = Args::parse();
let server_addr = format!("{}:{}", args.server_address, args.server_port);
test_ns_api_conn(&server_addr).await?;
args.execute().await?;
Ok(())
}
async fn test_ns_api_conn(server_addr: &str) -> anyhow::Result<()> {
reqwest::get(server_addr)
.await
.map(|res| {
tracing::info!(
"Testing connection to NS API at {server_addr}: {}",
res.status()
);
})
.map_err(|err| anyhow::anyhow!("Couldn't connect to server on {}: {}", server_addr, err))
}
pub(crate) fn setup_tracing() {
fn directive_checked(directive: impl Into<String>) -> Directive {
directive
.into()
.parse()
.expect("Failed to parse log directive")
}
let log_builder = tracing_subscriber::fmt()
// Use a more compact, abbreviated log format
.compact()
// Display source code file paths
.with_file(true)
// Display source code line numbers
.with_line_number(true)
.with_thread_ids(true)
// Don't display the event's target (module path)
.with_target(false);
let mut filter = EnvFilter::builder()
// if RUST_LOG isn't set, set default level
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
// these crates are more granularly filtered
let filter_crates = [
"reqwest",
"rustls",
"hyper",
"sqlx",
"h2",
"tendermint_rpc",
"tower_http",
"axum",
];
for crate_name in filter_crates {
filter = filter.add_directive(directive_checked(format!("{}=warn", crate_name)));
}
filter = filter.add_directive(directive_checked("nym_bin_common=debug"));
filter = filter.add_directive(directive_checked("nym_explorer_client=debug"));
filter = filter.add_directive(directive_checked("nym_network_defaults=debug"));
filter = filter.add_directive(directive_checked("nym_validator_client=debug"));
log_builder.with_env_filter(filter).init();
}
+54
View File
@@ -0,0 +1,54 @@
use tracing::error;
pub(crate) struct GwProbe {
path: String,
}
impl GwProbe {
pub(crate) fn new(probe_path: String) -> Self {
Self { path: probe_path }
}
pub(crate) async fn version(&self) -> String {
let mut command = tokio::process::Command::new(&self.path);
command.stdout(std::process::Stdio::piped());
command.arg("--version");
match command.spawn() {
Ok(child) => {
if let Ok(output) = child.wait_with_output().await {
return String::from_utf8(output.stdout)
.unwrap_or("Unable to get log from test run".to_string());
}
"Unable to get probe version".to_string()
}
Err(e) => {
error!("Failed to get probe version: {}", e);
"Failed to get probe version".to_string()
}
}
}
pub(crate) fn run_and_get_log(&self, gateway_key: &Option<String>) -> String {
let mut command = std::process::Command::new(&self.path);
command.stdout(std::process::Stdio::piped());
if let Some(gateway_id) = gateway_key {
command.arg("--gateway").arg(gateway_id);
}
match command.spawn() {
Ok(child) => {
if let Ok(output) = child.wait_with_output() {
return String::from_utf8(output.stdout)
.unwrap_or("Unable to get log from test run".to_string());
}
"Unable to get log from test run".to_string()
}
Err(e) => {
error!("Failed to spawn test: {}", e);
"Failed to spawn test run task".to_string()
}
}
}
}
+6
View File
@@ -0,0 +1,6 @@
data/
enter_db.sh
nym-gateway-probe
nym-node-status-api
*.sqlite
*.sqlite-journal
+66
View File
@@ -0,0 +1,66 @@
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
# SPDX-License-Identifier: Apache-2.0
[package]
name = "nym-node-status-api"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
[dependencies]
anyhow = { workspace = true }
axum = { workspace = true, features = ["tokio", "macros"] }
chrono = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive", "env", "string"] }
cosmwasm-std = { workspace = true }
envy = { workspace = true }
futures-util = { workspace = true }
moka = { workspace = true, features = ["future"] }
nym-bin-common = { path = "../common/bin-common", features = ["models"]}
nym-common-models = { path = "../common/models" }
nym-explorer-client = { path = "../explorer-api/explorer-client" }
# TODO dz: before Nym API client breaking changes. Update to latest develop once new Nym API is live
nym-network-defaults = { git = "https://github.com/nymtech/nym", branch = "pre-dir-v2-fork" }
nym-validator-client = { git = "https://github.com/nymtech/nym", branch = "pre-dir-v2-fork" }
# nym-network-defaults = { path = "../common/network-defaults" }
# nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-task = { path = "../common/task" }
nym-node-requests = { git = "https://github.com/nymtech/nym", branch = "pre-dir-v2-fork" }
# nym-node-requests = { path = "../nym-node/nym-node-requests", features = ["openapi"] }
regex = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_json_path = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing-log = { workspace = true }
tower-http = { workspace = true, features = ["cors", "trace"] }
utoipa = { workspace = true, features = ["axum_extras", "time"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
# TODO dz `cargo update async-trait`
# for automatic schema detection, which was merged, but not released yet
# https://github.com/ProbablyClem/utoipauto/pull/38
# utoipauto = { git = "https://github.com/ProbablyClem/utoipauto", rev = "eb04cba" }
utoipauto = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["macros"] }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
] }
+15
View File
@@ -0,0 +1,15 @@
FROM rust:latest AS builder
COPY ./ /usr/src/nym
WORKDIR /usr/src/nym/nym-node-status-api
RUN cargo build --release
FROM ubuntu:24.04
RUN apt-get update && apt-get install -y ca-certificates
WORKDIR /nym
COPY --from=builder /usr/src/nym/target/release/nym-node-status-api ./
ENTRYPOINT [ "/nym/nym-node-status-api" ]
+8
View File
@@ -0,0 +1,8 @@
FROM ubuntu:22.04
RUN apt-get update && apt-get install -y ca-certificates
WORKDIR /nym
COPY nym-node-status-api/nym-node-status-api ./
ENTRYPOINT [ "/nym/nym-node-status-api" ]
+51
View File
@@ -0,0 +1,51 @@
use anyhow::{anyhow, Result};
use sqlx::{Connection, SqliteConnection};
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
use tokio::{fs::File, io::AsyncWriteExt};
const SQLITE_DB_FILENAME: &str = "nym-node-status-api.sqlite";
/// If you need to re-run migrations or reset the db, just run
/// cargo clean -p nym-node-status-api
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
let out_dir = read_env_var("OUT_DIR")?;
let database_path = format!("sqlite://{}/{}?mode=rwc", out_dir, SQLITE_DB_FILENAME);
write_db_path_to_file(&out_dir, SQLITE_DB_FILENAME).await?;
let mut conn = SqliteConnection::connect(&database_path).await?;
sqlx::migrate!("./migrations").run(&mut conn).await?;
#[cfg(target_family = "unix")]
println!("cargo::rustc-env=DATABASE_URL=sqlite://{}", &database_path);
#[cfg(target_family = "windows")]
// for some strange reason we need to add a leading `/` to the windows path even though it's
// not a valid windows path... but hey, it works...
println!("cargo::rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
rerun_if_changed();
Ok(())
}
fn read_env_var(var: &str) -> Result<String> {
std::env::var(var).map_err(|_| anyhow!("You need to set {} env var", var))
}
fn rerun_if_changed() {
println!("cargo::rerun-if-changed=migrations");
println!("cargo::rerun-if-changed=src/db/queries");
}
/// use `./enter_db.sh` to inspect DB
async fn write_db_path_to_file(out_dir: &str, db_filename: &str) -> anyhow::Result<()> {
let mut file = File::create("enter_db.sh").await?;
let _ = file.write(b"#!/bin/bash\n").await?;
file.write_all(format!("sqlite3 {}/{}", out_dir, db_filename).as_bytes())
.await?;
file.set_permissions(Permissions::from_mode(0o755))
.await
.map_err(From::from)
}
+39
View File
@@ -0,0 +1,39 @@
#!/bin/bash
set -e
export RUST_LOG=${RUST_LOG:-debug}
export NYM_API_CLIENT_TIMEOUT=60
export EXPLORER_CLIENT_TIMEOUT=60
export ENVIRONMENT="mainnet.env"
function run_bare() {
# export necessary env vars
set -a
source ../envs/$ENVIRONMENT
set +a
export RUST_LOG=debug
# --conection-url is provided in build.rs
cargo run --package nym-node-status-api
}
function run_docker() {
cargo build --package nym-node-status-api --release
cp ../target/release/nym-node-status-api .
cd ..
docker build -t node-status-api -f nym-node-status-api/Dockerfile.dev .
docker run --env-file envs/${ENVIRONMENT} \
-e EXPLORER_CLIENT_TIMEOUT=$EXPLORER_CLIENT_TIMEOUT \
-e NYM_API_CLIENT_TIMEOUT=$NYM_API_CLIENT_TIMEOUT \
-e DATABASE_URL="sqlite://node-status-api.sqlite?mode=rwc" \
-e RUST_LOG=${RUST_LOG} node-status-api
}
run_bare
# run_docker
+112
View File
@@ -0,0 +1,112 @@
CREATE TABLE gateways
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
gateway_identity_key VARCHAR NOT NULL UNIQUE,
self_described VARCHAR,
explorer_pretty_bond VARCHAR,
last_probe_result VARCHAR,
last_probe_log VARCHAR,
config_score INTEGER NOT NULL DEFAULT (0),
config_score_successes REAL NOT NULL DEFAULT (0),
config_score_samples REAL NOT NULL DEFAULT (0),
routing_score INTEGER NOT NULL DEFAULT (0),
routing_score_successes REAL NOT NULL DEFAULT (0),
routing_score_samples REAL NOT NULL DEFAULT (0),
test_run_samples REAL NOT NULL DEFAULT (0),
last_testrun_utc INTEGER,
last_updated_utc INTEGER NOT NULL,
bonded INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
blacklisted INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
performance INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX idx_gateway_description_gateway_identity_key ON gateways (gateway_identity_key);
CREATE TABLE mixnodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
identity_key VARCHAR NOT NULL UNIQUE,
mix_id INTEGER NOT NULL UNIQUE,
bonded INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
total_stake INTEGER NOT NULL,
host VARCHAR NOT NULL,
http_api_port INTEGER NOT NULL,
blacklisted INTEGER CHECK (blacklisted in (0, 1)) NOT NULL DEFAULT 0,
full_details VARCHAR,
self_described VARCHAR,
last_updated_utc INTEGER NOT NULL
, is_dp_delegatee INTEGER CHECK (is_dp_delegatee IN (0, 1)) NOT NULL DEFAULT 0);
CREATE INDEX idx_mixnodes_mix_id ON mixnodes (mix_id);
CREATE INDEX idx_mixnodes_identity_key ON mixnodes (identity_key);
CREATE TABLE
mixnode_description (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mix_id INTEGER UNIQUE NOT NULL,
moniker VARCHAR,
website VARCHAR,
security_contact VARCHAR,
details VARCHAR,
last_updated_utc INTEGER NOT NULL,
FOREIGN KEY (mix_id) REFERENCES mixnodes (mix_id)
);
-- Indexes for description table
CREATE INDEX idx_mixnode_description_mix_id ON mixnode_description (mix_id);
CREATE TABLE summary
(
key VARCHAR PRIMARY KEY,
value_json VARCHAR,
last_updated_utc INTEGER NOT NULL
);
CREATE TABLE summary_history
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
date VARCHAR UNIQUE NOT NULL,
timestamp_utc INTEGER NOT NULL,
value_json VARCHAR
);
CREATE INDEX idx_summary_history_timestamp_utc ON summary_history (timestamp_utc);
CREATE INDEX idx_summary_history_date ON summary_history (date);
CREATE TABLE gateway_description (
id INTEGER PRIMARY KEY AUTOINCREMENT,
gateway_identity_key VARCHAR UNIQUE NOT NULL,
moniker VARCHAR,
website VARCHAR,
security_contact VARCHAR,
details VARCHAR,
last_updated_utc INTEGER NOT NULL,
FOREIGN KEY (gateway_identity_key) REFERENCES gateways (gateway_identity_key)
);
CREATE TABLE
mixnode_daily_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mix_id INTEGER NOT NULL,
total_stake BIGINT NOT NULL,
date_utc VARCHAR NOT NULL,
packets_received INTEGER DEFAULT 0,
packets_sent INTEGER DEFAULT 0,
packets_dropped INTEGER DEFAULT 0,
FOREIGN KEY (mix_id) REFERENCES mixnodes (mix_id),
UNIQUE (mix_id, date_utc) -- This constraint automatically creates an index
);
CREATE TABLE testruns
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
gateway_id INTEGER,
status INTEGER NOT NULL, -- 0=pending, 1=in-progress, 2=complete
timestamp_utc INTEGER NOT NULL,
ip_address VARCHAR NOT NULL,
log VARCHAR NOT NULL,
FOREIGN KEY (gateway_id) REFERENCES gateways (id)
);
+77
View File
@@ -0,0 +1,77 @@
use clap::Parser;
use nym_bin_common::bin_info;
use reqwest::Url;
use std::{sync::OnceLock, time::Duration};
// Helper for passing LONG_VERSION to clap
fn pretty_build_info_static() -> &'static str {
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
}
#[derive(Clone, Debug, Parser)]
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
pub(crate) struct Cli {
/// Network name for the network to which we're connecting.
#[clap(long, env = "NETWORK_NAME")]
pub(crate) network_name: String,
/// Explorer api url.
#[clap(short, long, env = "EXPLORER_API")]
pub(crate) explorer_api: String,
/// Nym api url.
#[clap(short, long, env = "NYM_API")]
pub(crate) nym_api: String,
/// TTL for the http cache.
#[clap(
long,
default_value_t = 30,
env = "NYM_NODE_STATUS_API_NYM_HTTP_CACHE_TTL"
)]
pub(crate) nym_http_cache_ttl: u64,
/// HTTP port on which to run node status api.
#[clap(long, default_value_t = 8000, env = "NYM_NODE_STATUS_API_HTTP_PORT")]
pub(crate) http_port: u16,
/// Nyxd address.
#[clap(long, env = "NYXD")]
pub(crate) nyxd_addr: Url,
/// Nym api client timeout.
#[clap(long, default_value = "15", env = "NYM_API_CLIENT_TIMEOUT")]
#[arg(value_parser = parse_duration)]
pub(crate) nym_api_client_timeout: Duration,
/// Explorer api client timeout.
#[clap(long, default_value = "15", env = "EXPLORER_CLIENT_TIMEOUT")]
#[arg(value_parser = parse_duration)]
pub(crate) explorer_client_timeout: Duration,
/// Connection url for the database.
#[clap(long, env = "DATABASE_URL")]
pub(crate) database_url: String,
#[clap(
long,
default_value = "600",
env = "NODE_STATUS_API_MONITOR_REFRESH_INTERVAL"
)]
#[arg(value_parser = parse_duration)]
pub(crate) monitor_refresh_interval: Duration,
#[clap(
long,
default_value = "600",
env = "NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL"
)]
#[arg(value_parser = parse_duration)]
pub(crate) testruns_refresh_interval: Duration,
}
fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
let seconds = arg.parse()?;
Ok(std::time::Duration::from_secs(seconds))
}
+35
View File
@@ -0,0 +1,35 @@
use anyhow::{anyhow, Result};
use sqlx::{migrate::Migrator, sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool};
use std::str::FromStr;
pub(crate) mod models;
pub(crate) mod queries;
static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
pub(crate) type DbPool = SqlitePool;
pub(crate) struct Storage {
pool: DbPool,
}
impl Storage {
pub async fn init(connection_url: String) -> Result<Self> {
let connect_options = SqliteConnectOptions::from_str(&connection_url)?
.create_if_missing(true)
.disable_statement_logging();
let pool = sqlx::SqlitePool::connect_with(connect_options)
.await
.map_err(|err| anyhow!("Failed to connect to {}: {}", &connection_url, err))?;
MIGRATOR.run(&pool).await?;
Ok(Storage { pool })
}
/// Cloning pool is cheap, it's the same underlying set of connections
pub fn pool_owned(&self) -> DbPool {
self.pool.clone()
}
}
+334
View File
@@ -0,0 +1,334 @@
use crate::{
http::{self, models::SummaryHistory},
monitor::NumericalCheckedCast,
};
use nym_node_requests::api::v1::node::models::NodeDescription;
use serde::{Deserialize, Serialize};
use strum_macros::{EnumString, FromRepr};
use utoipa::ToSchema;
pub(crate) struct GatewayRecord {
pub(crate) identity_key: String,
pub(crate) bonded: bool,
pub(crate) blacklisted: bool,
pub(crate) self_described: Option<String>,
pub(crate) explorer_pretty_bond: Option<String>,
pub(crate) last_updated_utc: i64,
pub(crate) performance: u8,
}
#[derive(Debug, Clone)]
pub(crate) struct GatewayDto {
pub(crate) gateway_identity_key: String,
pub(crate) bonded: bool,
pub(crate) blacklisted: bool,
pub(crate) performance: i64,
pub(crate) self_described: Option<String>,
pub(crate) explorer_pretty_bond: Option<String>,
pub(crate) last_probe_result: Option<String>,
pub(crate) last_probe_log: Option<String>,
pub(crate) last_testrun_utc: Option<i64>,
pub(crate) last_updated_utc: i64,
pub(crate) moniker: String,
pub(crate) security_contact: String,
pub(crate) details: String,
pub(crate) website: String,
}
impl TryFrom<GatewayDto> for http::models::Gateway {
type Error = anyhow::Error;
fn try_from(value: GatewayDto) -> Result<Self, Self::Error> {
// Instead of using routing_score_successes / routing_score_samples, we use the
// number of successful testruns in the last 24h.
let routing_score = 0f32;
let config_score = 0u32;
let last_updated_utc =
timestamp_as_utc(value.last_updated_utc.cast_checked()?).to_rfc3339();
let last_testrun_utc = value
.last_testrun_utc
.and_then(|i| i.cast_checked().ok())
.map(|t| timestamp_as_utc(t).to_rfc3339());
let self_described = value.self_described.clone().unwrap_or("null".to_string());
let explorer_pretty_bond = value
.explorer_pretty_bond
.clone()
.unwrap_or("null".to_string());
let last_probe_result = value
.last_probe_result
.clone()
.unwrap_or("null".to_string());
let last_probe_log = value.last_probe_log.clone();
let self_described = serde_json::from_str(&self_described).unwrap_or(None);
let explorer_pretty_bond = serde_json::from_str(&explorer_pretty_bond).unwrap_or(None);
let last_probe_result = serde_json::from_str(&last_probe_result).unwrap_or(None);
let bonded = value.bonded;
let blacklisted = value.blacklisted;
let performance = value.performance as u8;
let description = NodeDescription {
moniker: value.moniker.clone(),
website: value.website.clone(),
security_contact: value.security_contact.clone(),
details: value.details.clone(),
};
Ok(http::models::Gateway {
gateway_identity_key: value.gateway_identity_key.clone(),
bonded,
blacklisted,
performance,
self_described,
explorer_pretty_bond,
description,
last_probe_result,
last_probe_log,
routing_score,
config_score,
last_testrun_utc,
last_updated_utc,
})
}
}
fn timestamp_as_utc(unix_timestamp: u64) -> chrono::DateTime<chrono::Utc> {
let d = std::time::UNIX_EPOCH + std::time::Duration::from_secs(unix_timestamp);
d.into()
}
pub(crate) struct MixnodeRecord {
pub(crate) mix_id: u32,
pub(crate) identity_key: String,
pub(crate) bonded: bool,
pub(crate) total_stake: i64,
pub(crate) host: String,
pub(crate) http_port: u16,
pub(crate) blacklisted: bool,
pub(crate) full_details: String,
pub(crate) self_described: Option<String>,
pub(crate) last_updated_utc: i64,
pub(crate) is_dp_delegatee: bool,
}
#[derive(Debug, Clone)]
pub(crate) struct MixnodeDto {
pub(crate) mix_id: i64,
pub(crate) bonded: bool,
pub(crate) blacklisted: bool,
pub(crate) is_dp_delegatee: bool,
pub(crate) total_stake: i64,
pub(crate) full_details: String,
pub(crate) self_described: Option<String>,
pub(crate) last_updated_utc: i64,
pub(crate) moniker: String,
pub(crate) website: String,
pub(crate) security_contact: String,
pub(crate) details: String,
}
impl TryFrom<MixnodeDto> for http::models::Mixnode {
type Error = anyhow::Error;
fn try_from(value: MixnodeDto) -> Result<Self, Self::Error> {
let mix_id = value.mix_id.cast_checked()?;
let full_details = value.full_details.clone();
let full_details = serde_json::from_str(&full_details).unwrap_or(None);
let self_described = value
.self_described
.clone()
.map(|v| serde_json::from_str(&v).unwrap_or(serde_json::Value::Null));
let last_updated_utc =
timestamp_as_utc(value.last_updated_utc.cast_checked()?).to_rfc3339();
let blacklisted = value.blacklisted;
let is_dp_delegatee = value.is_dp_delegatee;
let moniker = value.moniker.clone();
let website = value.website.clone();
let security_contact = value.security_contact.clone();
let details = value.details.clone();
Ok(http::models::Mixnode {
mix_id,
bonded: value.bonded,
blacklisted,
is_dp_delegatee,
total_stake: value.total_stake,
full_details,
description: NodeDescription {
moniker,
website,
security_contact,
details,
},
self_described,
last_updated_utc,
})
}
}
#[allow(unused)]
#[derive(Debug, Clone)]
pub(crate) struct BondedStatusDto {
pub(crate) id: i64,
pub(crate) identity_key: String,
pub(crate) bonded: bool,
}
#[allow(unused)]
#[derive(Debug, Clone, Default)]
pub(crate) struct SummaryDto {
pub(crate) key: String,
pub(crate) value_json: String,
pub(crate) last_updated_utc: i64,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct SummaryHistoryDto {
#[allow(dead_code)]
pub id: i64,
pub date: String,
pub value_json: String,
pub timestamp_utc: i64,
}
impl TryFrom<SummaryHistoryDto> for SummaryHistory {
type Error = anyhow::Error;
fn try_from(value: SummaryHistoryDto) -> Result<Self, Self::Error> {
let value_json = serde_json::from_str(&value.value_json).unwrap_or_default();
Ok(SummaryHistory {
value_json,
date: value.date.clone(),
timestamp_utc: timestamp_as_utc(value.timestamp_utc.cast_checked()?).to_rfc3339(),
})
}
}
pub(crate) const MIXNODES_BONDED_COUNT: &str = "mixnodes.bonded.count";
pub(crate) const MIXNODES_BONDED_ACTIVE: &str = "mixnodes.bonded.active";
pub(crate) const MIXNODES_BONDED_INACTIVE: &str = "mixnodes.bonded.inactive";
pub(crate) const MIXNODES_BONDED_RESERVE: &str = "mixnodes.bonded.reserve";
pub(crate) const MIXNODES_BLACKLISTED_COUNT: &str = "mixnodes.blacklisted.count";
pub(crate) const GATEWAYS_BONDED_COUNT: &str = "gateways.bonded.count";
pub(crate) const GATEWAYS_EXPLORER_COUNT: &str = "gateways.explorer.count";
pub(crate) const GATEWAYS_BLACKLISTED_COUNT: &str = "gateways.blacklisted.count";
pub(crate) const MIXNODES_HISTORICAL_COUNT: &str = "mixnodes.historical.count";
pub(crate) const GATEWAYS_HISTORICAL_COUNT: &str = "gateways.historical.count";
// `utoipa`` goes crazy if you use module-qualified prefix as field type so we
// have to import it
use gateway::GatewaySummary;
use mixnode::MixnodeSummary;
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct NetworkSummary {
pub(crate) mixnodes: MixnodeSummary,
pub(crate) gateways: GatewaySummary,
}
pub(crate) mod mixnode {
use super::*;
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct MixnodeSummary {
pub(crate) bonded: MixnodeSummaryBonded,
pub(crate) blacklisted: MixnodeSummaryBlacklisted,
pub(crate) historical: MixnodeSummaryHistorical,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct MixnodeSummaryBonded {
pub(crate) count: i32,
pub(crate) active: i32,
pub(crate) inactive: i32,
pub(crate) reserve: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct MixnodeSummaryBlacklisted {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct MixnodeSummaryHistorical {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
}
pub(crate) mod gateway {
use super::*;
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummary {
pub(crate) bonded: GatewaySummaryBonded,
pub(crate) blacklisted: GatewaySummaryBlacklisted,
pub(crate) historical: GatewaySummaryHistorical,
pub(crate) explorer: GatewaySummaryExplorer,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummaryExplorer {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummaryBonded {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummaryHistorical {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct GatewaySummaryBlacklisted {
pub(crate) count: i32,
pub(crate) last_updated_utc: String,
}
}
#[derive(Debug, Clone)]
pub struct TestRunDto {
pub id: i64,
pub gateway_id: i64,
pub status: i64,
pub timestamp_utc: i64,
pub ip_address: String,
pub log: String,
}
#[derive(Debug, Clone, strum_macros::Display, EnumString, FromRepr, PartialEq)]
#[repr(u8)]
pub(crate) enum TestRunStatus {
Complete = 2,
InProgress = 1,
Pending = 0,
}
#[derive(Debug, Clone)]
pub struct GatewayIdentityDto {
pub gateway_identity_key: String,
pub bonded: bool,
}
#[allow(dead_code)] // it's not dead code but clippy doesn't detect usage in sqlx macros
#[derive(Debug, Clone)]
pub struct GatewayInfoDto {
pub id: i64,
pub gateway_identity_key: String,
pub self_described: Option<String>,
pub explorer_pretty_bond: Option<String>,
}
@@ -0,0 +1,160 @@
use crate::{
db::{
models::{BondedStatusDto, GatewayDto, GatewayRecord},
DbPool,
},
http::models::Gateway,
};
use futures_util::TryStreamExt;
use nym_validator_client::models::DescribedGateway;
use tracing::error;
pub(crate) async fn insert_gateways(
pool: &DbPool,
gateways: Vec<GatewayRecord>,
) -> anyhow::Result<()> {
let mut db = pool.acquire().await?;
for record in gateways {
sqlx::query!(
"INSERT INTO gateways
(gateway_identity_key, bonded, blacklisted,
self_described, explorer_pretty_bond,
last_updated_utc, performance)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(gateway_identity_key) DO UPDATE SET
bonded=excluded.bonded,
blacklisted=excluded.blacklisted,
self_described=excluded.self_described,
explorer_pretty_bond=excluded.explorer_pretty_bond,
last_updated_utc=excluded.last_updated_utc,
performance = excluded.performance;",
record.identity_key,
record.bonded,
record.blacklisted,
record.self_described,
record.explorer_pretty_bond,
record.last_updated_utc,
record.performance
)
.execute(&mut *db)
.await?;
}
Ok(())
}
pub(crate) async fn write_blacklisted_gateways_to_db<'a, I>(
pool: &DbPool,
gateways: I,
) -> anyhow::Result<()>
where
I: Iterator<Item = &'a String>,
{
let mut conn = pool.acquire().await?;
for gateway_identity_key in gateways {
sqlx::query!(
"UPDATE gateways
SET blacklisted = true
WHERE gateway_identity_key = ?;",
gateway_identity_key,
)
.execute(&mut *conn)
.await?;
}
Ok(())
}
/// Ensure all gateways that are set as bonded, are still bonded
pub(crate) async fn ensure_gateways_still_bonded(
pool: &DbPool,
gateways: &[DescribedGateway],
) -> anyhow::Result<usize> {
let bonded_gateways_rows = get_all_bonded_gateways_row_ids_by_status(pool, true).await?;
let unbonded_gateways_rows = bonded_gateways_rows.iter().filter(|v| {
!gateways
.iter()
.any(|bonded| *bonded.bond.identity() == v.identity_key)
});
let recently_unbonded_gateways = unbonded_gateways_rows.to_owned().count();
let last_updated_utc = chrono::offset::Utc::now().timestamp();
let mut transaction = pool.begin().await?;
for row in unbonded_gateways_rows {
sqlx::query!(
"UPDATE gateways
SET bonded = ?, last_updated_utc = ?
WHERE id = ?;",
false,
last_updated_utc,
row.id,
)
.execute(&mut *transaction)
.await?;
}
transaction.commit().await?;
Ok(recently_unbonded_gateways)
}
async fn get_all_bonded_gateways_row_ids_by_status(
pool: &DbPool,
status: bool,
) -> anyhow::Result<Vec<BondedStatusDto>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
BondedStatusDto,
r#"SELECT
id as "id!",
gateway_identity_key as "identity_key!",
bonded as "bonded: bool"
FROM gateways
WHERE bonded = ?"#,
status,
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?;
Ok(items)
}
pub(crate) async fn get_all_gateways(pool: &DbPool) -> anyhow::Result<Vec<Gateway>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
GatewayDto,
r#"SELECT
gw.gateway_identity_key as "gateway_identity_key!",
gw.bonded as "bonded: bool",
gw.blacklisted as "blacklisted: bool",
gw.performance as "performance!",
gw.self_described as "self_described?",
gw.explorer_pretty_bond as "explorer_pretty_bond?",
gw.last_probe_result as "last_probe_result?",
gw.last_probe_log as "last_probe_log?",
gw.last_testrun_utc as "last_testrun_utc?",
gw.last_updated_utc as "last_updated_utc!",
COALESCE(gd.moniker, "NA") as "moniker!",
COALESCE(gd.website, "NA") as "website!",
COALESCE(gd.security_contact, "NA") as "security_contact!",
COALESCE(gd.details, "NA") as "details!"
FROM gateways gw
LEFT JOIN gateway_description gd
ON gw.gateway_identity_key = gd.gateway_identity_key
ORDER BY gw.gateway_identity_key"#,
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?;
let items: Vec<Gateway> = items
.into_iter()
.map(|item| item.try_into())
.collect::<anyhow::Result<Vec<_>>>()
.map_err(|e| {
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
e
})?;
tracing::trace!("Fetched {} gateways from DB", items.len());
Ok(items)
}
@@ -0,0 +1,88 @@
use crate::db::{models::NetworkSummary, DbPool};
use chrono::{DateTime, Utc};
/// take `last_updated` instead of calculating it so that `summary` matches
/// `daily_summary`
pub(crate) async fn insert_summaries(
pool: &DbPool,
summaries: &[(&str, &usize)],
summary: &NetworkSummary,
last_updated: DateTime<Utc>,
) -> anyhow::Result<()> {
insert_summary(pool, summaries, last_updated).await?;
insert_summary_history(pool, summary, last_updated).await?;
Ok(())
}
async fn insert_summary(
pool: &DbPool,
summaries: &[(&str, &usize)],
last_updated: DateTime<Utc>,
) -> anyhow::Result<()> {
let timestamp = last_updated.timestamp();
let mut tx = pool.begin().await?;
for (kind, value) in summaries {
let value = value.to_string();
sqlx::query!(
"INSERT INTO summary
(key, value_json, last_updated_utc)
VALUES (?, ?, ?)
ON CONFLICT(key) DO UPDATE SET
value_json=excluded.value_json,
last_updated_utc=excluded.last_updated_utc;",
kind,
value,
timestamp
)
.execute(&mut *tx)
.await
.map_err(|err| {
tracing::error!("Failed to insert data for {kind}: {err}, aborting transaction",);
err
})?;
}
tx.commit().await?;
Ok(())
}
/// For `<date_N>`, `summary_history` is updated with fresh data on every
/// iteration.
///
/// After UTC midnight, summary is inserted for `<date_N+1>` and last entry for
/// `<date_N>` stays there forever.
///
/// This is not aggregate data, it's a set of latest data points
async fn insert_summary_history(
pool: &DbPool,
summary: &NetworkSummary,
last_updated: DateTime<Utc>,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
let value_json = serde_json::to_string(&summary)?;
let timestamp = last_updated.timestamp();
let now_rfc3339 = last_updated.to_rfc3339();
// YYYY-MM-DD, without time
let date = &now_rfc3339[..10];
sqlx::query!(
"INSERT INTO summary_history
(date, timestamp_utc, value_json)
VALUES (?, ?, ?)
ON CONFLICT(date) DO UPDATE SET
timestamp_utc=excluded.timestamp_utc,
value_json=excluded.value_json;",
date,
timestamp,
value_json
)
.execute(&mut *conn)
.await?;
Ok(())
}
@@ -0,0 +1,177 @@
use futures_util::TryStreamExt;
use nym_validator_client::models::MixNodeBondAnnotated;
use tracing::error;
use crate::{
db::{
models::{BondedStatusDto, MixnodeDto, MixnodeRecord},
DbPool,
},
http::models::{DailyStats, Mixnode},
};
pub(crate) async fn insert_mixnodes(
pool: &DbPool,
mixnodes: Vec<MixnodeRecord>,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
for record in mixnodes.iter() {
// https://www.sqlite.org/lang_upsert.html
sqlx::query!(
"INSERT INTO mixnodes
(mix_id, identity_key, bonded, total_stake,
host, http_api_port, blacklisted, full_details,
self_described, last_updated_utc, is_dp_delegatee)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(mix_id) DO UPDATE SET
bonded=excluded.bonded,
total_stake=excluded.total_stake, host=excluded.host,
http_api_port=excluded.http_api_port,blacklisted=excluded.blacklisted,
full_details=excluded.full_details,self_described=excluded.self_described,
last_updated_utc=excluded.last_updated_utc,
is_dp_delegatee = excluded.is_dp_delegatee;",
record.mix_id,
record.identity_key,
record.bonded,
record.total_stake,
record.host,
record.http_port,
record.blacklisted,
record.full_details,
record.self_described,
record.last_updated_utc,
record.is_dp_delegatee
)
.execute(&mut *conn)
.await?;
}
Ok(())
}
pub(crate) async fn get_all_mixnodes(pool: &DbPool) -> anyhow::Result<Vec<Mixnode>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
MixnodeDto,
r#"SELECT
mn.mix_id as "mix_id!",
mn.bonded as "bonded: bool",
mn.blacklisted as "blacklisted: bool",
mn.is_dp_delegatee as "is_dp_delegatee: bool",
mn.total_stake as "total_stake!",
mn.full_details as "full_details!",
mn.self_described as "self_described",
mn.last_updated_utc as "last_updated_utc!",
COALESCE(md.moniker, "NA") as "moniker!",
COALESCE(md.website, "NA") as "website!",
COALESCE(md.security_contact, "NA") as "security_contact!",
COALESCE(md.details, "NA") as "details!"
FROM mixnodes mn
LEFT JOIN mixnode_description md ON mn.mix_id = md.mix_id
ORDER BY mn.mix_id"#
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?;
let items = items
.into_iter()
.map(|item| item.try_into())
.collect::<anyhow::Result<Vec<_>>>()
.map_err(|e| {
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
e
})?;
Ok(items)
}
/// We fetch the latest 30 days of data as a subquery and then
/// return it in ascending order, so we don't break existing UI
pub(crate) async fn get_daily_stats(pool: &DbPool) -> anyhow::Result<Vec<DailyStats>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
DailyStats,
r#"
SELECT
date_utc as "date_utc!",
packets_received as "total_packets_received!: i64",
packets_sent as "total_packets_sent!: i64",
packets_dropped as "total_packets_dropped!: i64",
total_stake as "total_stake!: i64"
FROM (
SELECT
date_utc,
SUM(packets_received) as packets_received,
SUM(packets_sent) as packets_sent,
SUM(packets_dropped) as packets_dropped,
SUM(total_stake) as total_stake
FROM mixnode_daily_stats
GROUP BY date_utc
ORDER BY date_utc DESC
LIMIT 30
)
GROUP BY date_utc
ORDER BY date_utc
"#
)
.fetch(&mut *conn)
.try_collect::<Vec<DailyStats>>()
.await?;
Ok(items)
}
/// Ensure all mixnodes that are set as bonded, are still bonded
pub(crate) async fn ensure_mixnodes_still_bonded(
pool: &DbPool,
mixnodes: &[MixNodeBondAnnotated],
) -> anyhow::Result<usize> {
let bonded_mixnodes_rows = get_all_bonded_mixnodes_row_ids_by_status(pool, true).await?;
let unbonded_mixnodes_rows = bonded_mixnodes_rows.iter().filter(|v| {
!mixnodes
.iter()
.any(|bonded| *bonded.mixnode_details.bond_information.identity() == v.identity_key)
});
let recently_unbonded_mixnodes = unbonded_mixnodes_rows.to_owned().count();
let last_updated_utc = chrono::offset::Utc::now().timestamp();
let mut transaction = pool.begin().await?;
for row in unbonded_mixnodes_rows {
sqlx::query!(
"UPDATE mixnodes
SET bonded = ?, last_updated_utc = ?
WHERE id = ?;",
false,
last_updated_utc,
row.id,
)
.execute(&mut *transaction)
.await?;
}
transaction.commit().await?;
Ok(recently_unbonded_mixnodes)
}
async fn get_all_bonded_mixnodes_row_ids_by_status(
pool: &DbPool,
status: bool,
) -> anyhow::Result<Vec<BondedStatusDto>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
BondedStatusDto,
r#"SELECT
id as "id!",
identity_key as "identity_key!",
bonded as "bonded: bool"
FROM mixnodes
WHERE bonded = ?"#,
status,
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?;
Ok(items)
}
+15
View File
@@ -0,0 +1,15 @@
mod gateways;
mod misc;
mod mixnodes;
mod summary;
pub(crate) mod testruns;
pub(crate) use gateways::{
ensure_gateways_still_bonded, get_all_gateways, insert_gateways,
write_blacklisted_gateways_to_db,
};
pub(crate) use misc::insert_summaries;
pub(crate) use mixnodes::{
ensure_mixnodes_still_bonded, get_all_mixnodes, get_daily_stats, insert_mixnodes,
};
pub(crate) use summary::{get_summary, get_summary_history};
@@ -0,0 +1,209 @@
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use std::collections::HashMap;
use tracing::error;
use crate::{
db::{
models::{
gateway::{
GatewaySummary, GatewaySummaryBlacklisted, GatewaySummaryBonded,
GatewaySummaryExplorer, GatewaySummaryHistorical,
},
mixnode::{
MixnodeSummary, MixnodeSummaryBlacklisted, MixnodeSummaryBonded,
MixnodeSummaryHistorical,
},
NetworkSummary, SummaryDto, SummaryHistoryDto,
},
DbPool,
},
http::{
error::{HttpError, HttpResult},
models::SummaryHistory,
},
};
pub(crate) async fn get_summary_history(pool: &DbPool) -> anyhow::Result<Vec<SummaryHistory>> {
let mut conn = pool.acquire().await?;
let items = sqlx::query_as!(
SummaryHistoryDto,
r#"SELECT
id as "id!",
date as "date!",
timestamp_utc as "timestamp_utc!",
value_json as "value_json!"
FROM summary_history
ORDER BY date DESC
LIMIT 30"#,
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?;
let items = items
.into_iter()
.map(|item| item.try_into())
.collect::<anyhow::Result<Vec<_>>>()
.map_err(|e| {
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
e
})?;
Ok(items)
}
async fn get_summary_dto(pool: &DbPool) -> anyhow::Result<Vec<SummaryDto>> {
let mut conn = pool.acquire().await?;
Ok(sqlx::query_as!(
SummaryDto,
r#"SELECT
key as "key!",
value_json as "value_json!",
last_updated_utc as "last_updated_utc!"
FROM summary"#
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?)
}
pub(crate) async fn get_summary(pool: &DbPool) -> HttpResult<NetworkSummary> {
let items = get_summary_dto(pool).await.map_err(|err| {
tracing::error!("Couldn't get Summary from DB: {err}");
HttpError::internal()
})?;
from_summary_dto(items).await
}
async fn from_summary_dto(items: Vec<SummaryDto>) -> HttpResult<NetworkSummary> {
const MIXNODES_BONDED_COUNT: &str = "mixnodes.bonded.count";
const MIXNODES_BONDED_ACTIVE: &str = "mixnodes.bonded.active";
const MIXNODES_BONDED_INACTIVE: &str = "mixnodes.bonded.inactive";
const MIXNODES_BONDED_RESERVE: &str = "mixnodes.bonded.reserve";
const MIXNODES_BLACKLISTED_COUNT: &str = "mixnodes.blacklisted.count";
const GATEWAYS_BONDED_COUNT: &str = "gateways.bonded.count";
const GATEWAYS_EXPLORER_COUNT: &str = "gateways.explorer.count";
const GATEWAYS_BLACKLISTED_COUNT: &str = "gateways.blacklisted.count";
const MIXNODES_HISTORICAL_COUNT: &str = "mixnodes.historical.count";
const GATEWAYS_HISTORICAL_COUNT: &str = "gateways.historical.count";
// convert database rows into a map by key
let mut map = HashMap::new();
for item in items {
map.insert(item.key.clone(), item);
}
// check we have all the keys we are expecting, and build up a map of errors for missing one
let keys = [
GATEWAYS_BONDED_COUNT,
GATEWAYS_EXPLORER_COUNT,
GATEWAYS_HISTORICAL_COUNT,
GATEWAYS_BLACKLISTED_COUNT,
MIXNODES_BLACKLISTED_COUNT,
MIXNODES_BONDED_ACTIVE,
MIXNODES_BONDED_COUNT,
MIXNODES_BONDED_INACTIVE,
MIXNODES_BONDED_RESERVE,
MIXNODES_HISTORICAL_COUNT,
];
let mut errors: Vec<&str> = vec![];
for key in keys {
if !map.contains_key(key) {
errors.push(key);
}
}
// return an error if anything is missing, with a nice list
if !errors.is_empty() {
tracing::error!("Summary value missing: {}", errors.join(", "));
return Err(HttpError::internal());
}
// strip the options and use default values (anything missing is trapped above)
let mixnodes_bonded_count: SummaryDto =
map.get(MIXNODES_BONDED_COUNT).cloned().unwrap_or_default();
let mixnodes_bonded_active: SummaryDto =
map.get(MIXNODES_BONDED_ACTIVE).cloned().unwrap_or_default();
let mixnodes_bonded_inactive: SummaryDto = map
.get(MIXNODES_BONDED_INACTIVE)
.cloned()
.unwrap_or_default();
let mixnodes_bonded_reserve: SummaryDto = map
.get(MIXNODES_BONDED_RESERVE)
.cloned()
.unwrap_or_default();
let mixnodes_blacklisted_count: SummaryDto = map
.get(MIXNODES_BLACKLISTED_COUNT)
.cloned()
.unwrap_or_default();
let gateways_bonded_count: SummaryDto =
map.get(GATEWAYS_BONDED_COUNT).cloned().unwrap_or_default();
let gateways_explorer_count: SummaryDto = map
.get(GATEWAYS_EXPLORER_COUNT)
.cloned()
.unwrap_or_default();
let mixnodes_historical_count: SummaryDto = map
.get(MIXNODES_HISTORICAL_COUNT)
.cloned()
.unwrap_or_default();
let gateways_historical_count: SummaryDto = map
.get(GATEWAYS_HISTORICAL_COUNT)
.cloned()
.unwrap_or_default();
let gateways_blacklisted_count: SummaryDto = map
.get(GATEWAYS_BLACKLISTED_COUNT)
.cloned()
.unwrap_or_default();
Ok(NetworkSummary {
mixnodes: MixnodeSummary {
bonded: MixnodeSummaryBonded {
count: to_count_i32(&mixnodes_bonded_count),
active: to_count_i32(&mixnodes_bonded_active),
reserve: to_count_i32(&mixnodes_bonded_reserve),
inactive: to_count_i32(&mixnodes_bonded_inactive),
last_updated_utc: to_timestamp(&mixnodes_bonded_count),
},
blacklisted: MixnodeSummaryBlacklisted {
count: to_count_i32(&mixnodes_blacklisted_count),
last_updated_utc: to_timestamp(&mixnodes_blacklisted_count),
},
historical: MixnodeSummaryHistorical {
count: to_count_i32(&mixnodes_historical_count),
last_updated_utc: to_timestamp(&mixnodes_historical_count),
},
},
gateways: GatewaySummary {
bonded: GatewaySummaryBonded {
count: to_count_i32(&gateways_bonded_count),
last_updated_utc: to_timestamp(&gateways_bonded_count),
},
blacklisted: GatewaySummaryBlacklisted {
count: to_count_i32(&gateways_blacklisted_count),
last_updated_utc: to_timestamp(&gateways_blacklisted_count),
},
historical: GatewaySummaryHistorical {
count: to_count_i32(&gateways_historical_count),
last_updated_utc: to_timestamp(&gateways_historical_count),
},
explorer: GatewaySummaryExplorer {
count: to_count_i32(&gateways_explorer_count),
last_updated_utc: to_timestamp(&gateways_explorer_count),
},
},
})
}
fn to_count_i32(value: &SummaryDto) -> i32 {
value.value_json.parse::<i32>().unwrap_or_default()
}
fn to_timestamp(value: &SummaryDto) -> String {
timestamp_as_utc(value.last_updated_utc as u64).to_rfc3339()
}
fn timestamp_as_utc(unix_timestamp: u64) -> DateTime<Utc> {
let d = std::time::UNIX_EPOCH + std::time::Duration::from_secs(unix_timestamp);
d.into()
}
@@ -0,0 +1,126 @@
use crate::http::models::TestrunAssignment;
use crate::{
db::models::{TestRunDto, TestRunStatus},
testruns::now_utc,
};
use anyhow::Context;
use sqlx::{pool::PoolConnection, Sqlite};
pub(crate) async fn get_testrun_by_id(
conn: &mut PoolConnection<Sqlite>,
testrun_id: i64,
) -> anyhow::Result<TestRunDto> {
sqlx::query_as!(
TestRunDto,
r#"SELECT
id as "id!",
gateway_id as "gateway_id!",
status as "status!",
timestamp_utc as "timestamp_utc!",
ip_address as "ip_address!",
log as "log!"
FROM testruns
WHERE id = ?
ORDER BY timestamp_utc"#,
testrun_id
)
.fetch_one(conn.as_mut())
.await
.context(format!("Couldn't retrieve testrun {testrun_id}"))
}
pub(crate) async fn get_oldest_testrun_and_make_it_pending(
// TODO dz accept mut reference, repeat in all similar functions
conn: PoolConnection<Sqlite>,
) -> anyhow::Result<Option<TestrunAssignment>> {
let mut conn = conn;
let assignment = sqlx::query_as!(
TestrunAssignment,
r#"UPDATE testruns
SET status = ?
WHERE rowid =
(
SELECT rowid
FROM testruns
WHERE status = ?
ORDER BY timestamp_utc asc
LIMIT 1
)
RETURNING
id as "testrun_id!",
gateway_id as "gateway_pk_id!"
"#,
TestRunStatus::InProgress as i64,
TestRunStatus::Pending as i64,
)
.fetch_optional(&mut *conn)
.await?;
Ok(assignment)
}
pub(crate) async fn update_testrun_status(
conn: &mut PoolConnection<Sqlite>,
testrun_id: i64,
status: TestRunStatus,
) -> anyhow::Result<()> {
let status = status as u32;
sqlx::query!(
"UPDATE testruns SET status = ? WHERE id = ?",
status,
testrun_id
)
.execute(conn.as_mut())
.await?;
Ok(())
}
pub(crate) async fn update_gateway_last_probe_log(
conn: &mut PoolConnection<Sqlite>,
gateway_pk: i64,
log: &str,
) -> anyhow::Result<()> {
sqlx::query!(
"UPDATE gateways SET last_probe_log = ? WHERE id = ?",
log,
gateway_pk
)
.execute(conn.as_mut())
.await
.map(drop)
.map_err(From::from)
}
pub(crate) async fn update_gateway_last_probe_result(
conn: &mut PoolConnection<Sqlite>,
gateway_pk: i64,
result: &str,
) -> anyhow::Result<()> {
sqlx::query!(
"UPDATE gateways SET last_probe_result = ? WHERE id = ?",
result,
gateway_pk
)
.execute(conn.as_mut())
.await
.map(drop)
.map_err(From::from)
}
pub(crate) async fn update_gateway_score(
conn: &mut PoolConnection<Sqlite>,
gateway_pk: i64,
) -> anyhow::Result<()> {
let now = now_utc().timestamp();
sqlx::query!(
"UPDATE gateways SET last_testrun_utc = ?, last_updated_utc = ? WHERE id = ?",
now,
now,
gateway_pk
)
.execute(conn.as_mut())
.await
.map(drop)
.map_err(From::from)
}
@@ -0,0 +1,110 @@
use axum::{
extract::{Path, Query, State},
Json, Router,
};
use serde::Deserialize;
use utoipa::IntoParams;
use crate::http::{
error::{HttpError, HttpResult},
models::{Gateway, GatewaySkinny},
state::AppState,
PagedResult, Pagination,
};
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(gateways))
.route("/skinny", axum::routing::get(gateways_skinny))
.route("/:identity_key", axum::routing::get(get_gateway))
}
#[utoipa::path(
tag = "Gateways",
get,
params(
Pagination
),
path = "/v2/gateways",
responses(
(status = 200, body = PagedGateway)
)
)]
async fn gateways(
Query(pagination): Query<Pagination>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<Gateway>>> {
let db = state.db_pool();
let res = state.cache().get_gateway_list(db).await;
Ok(Json(PagedResult::paginate(pagination, res)))
}
#[utoipa::path(
tag = "Gateways",
get,
params(
Pagination
),
path = "/v2/gateways/skinny",
responses(
(status = 200, body = PagedGatewaySkinny)
)
)]
async fn gateways_skinny(
Query(pagination): Query<Pagination>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<GatewaySkinny>>> {
let db = state.db_pool();
let res = state.cache().get_gateway_list(db).await;
let res: Vec<GatewaySkinny> = res
.iter()
.filter(|g| g.bonded)
.map(|g| GatewaySkinny {
gateway_identity_key: g.gateway_identity_key.clone(),
self_described: g.self_described.clone(),
performance: g.performance,
explorer_pretty_bond: g.explorer_pretty_bond.clone(),
last_probe_result: g.last_probe_result.clone(),
last_testrun_utc: g.last_testrun_utc.clone(),
last_updated_utc: g.last_updated_utc.clone(),
routing_score: g.routing_score,
config_score: g.config_score,
})
.collect();
Ok(Json(PagedResult::paginate(pagination, res)))
}
#[derive(Deserialize, IntoParams)]
#[into_params(parameter_in = Path)]
struct IdentityKeyParam {
identity_key: String,
}
#[utoipa::path(
tag = "Gateways",
get,
params(
IdentityKeyParam
),
path = "/v2/gateways/{identity_key}",
responses(
(status = 200, body = Gateway)
)
)]
async fn get_gateway(
Path(IdentityKeyParam { identity_key }): Path<IdentityKeyParam>,
State(state): State<AppState>,
) -> HttpResult<Json<Gateway>> {
let db = state.db_pool();
let res = state.cache().get_gateway_list(db).await;
match res
.iter()
.find(|item| item.gateway_identity_key == identity_key)
{
Some(res) => Ok(Json(res.clone())),
None => Err(HttpError::invalid_input(identity_key)),
}
}
@@ -0,0 +1,91 @@
use axum::{
extract::{Path, Query, State},
Json, Router,
};
use serde::Deserialize;
use tracing::instrument;
use utoipa::IntoParams;
use crate::http::{
error::{HttpError, HttpResult},
models::{DailyStats, Mixnode},
state::AppState,
PagedResult, Pagination,
};
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(mixnodes))
.route("/:mix_id", axum::routing::get(get_mixnodes))
.route("/stats", axum::routing::get(get_stats))
}
#[utoipa::path(
tag = "Mixnodes",
get,
params(
Pagination
),
path = "/v2/mixnodes",
responses(
(status = 200, body = PagedMixnode)
)
)]
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(page=pagination.page, size=pagination.size))]
async fn mixnodes(
Query(pagination): Query<Pagination>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<Mixnode>>> {
let db = state.db_pool();
let res = state.cache().get_mixnodes_list(db).await;
Ok(Json(PagedResult::paginate(pagination, res)))
}
#[derive(Deserialize, IntoParams)]
#[into_params(parameter_in = Path)]
struct MixIdParam {
mix_id: String,
}
#[utoipa::path(
tag = "Mixnodes",
get,
params(
MixIdParam
),
path = "/v2/mixnodes/{mix_id}",
responses(
(status = 200, body = Mixnode)
)
)]
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(mix_id = mix_id))]
async fn get_mixnodes(
Path(MixIdParam { mix_id }): Path<MixIdParam>,
State(state): State<AppState>,
) -> HttpResult<Json<Mixnode>> {
match mix_id.parse::<u32>() {
Ok(parsed_mix_id) => {
let res = state.cache().get_mixnodes_list(state.db_pool()).await;
match res.iter().find(|item| item.mix_id == parsed_mix_id) {
Some(res) => Ok(Json(res.clone())),
None => Err(HttpError::invalid_input(mix_id)),
}
}
Err(_e) => Err(HttpError::invalid_input(mix_id)),
}
}
#[utoipa::path(
tag = "Mixnodes",
get,
path = "/v2/mixnodes/stats",
responses(
(status = 200, body = Vec<DailyStats>)
)
)]
async fn get_stats(State(state): State<AppState>) -> HttpResult<Json<Vec<DailyStats>>> {
let stats = state.cache().get_mixnode_stats(state.db_pool()).await;
Ok(Json(stats))
}
+88
View File
@@ -0,0 +1,88 @@
use anyhow::anyhow;
use axum::{response::Redirect, Router};
use tokio::net::ToSocketAddrs;
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::http::{server::HttpServer, state::AppState};
pub(crate) mod gateways;
pub(crate) mod mixnodes;
pub(crate) mod services;
pub(crate) mod summary;
pub(crate) mod testruns;
pub(crate) struct RouterBuilder {
unfinished_router: Router<AppState>,
}
impl RouterBuilder {
pub(crate) fn with_default_routes() -> Self {
let router = Router::new()
.merge(
SwaggerUi::new("/swagger")
.url("/api-docs/openapi.json", super::api_docs::ApiDoc::openapi()),
)
.route(
"/",
axum::routing::get(|| async { Redirect::permanent("/swagger") }),
)
.nest(
"/v2",
Router::new()
.nest("/gateways", gateways::routes())
.nest("/mixnodes", mixnodes::routes())
.nest("/services", services::routes())
.nest("/summary", summary::routes()),
)
.nest(
"/internal",
Router::new().nest("/testruns", testruns::routes()),
);
Self {
unfinished_router: router,
}
}
pub(crate) fn with_state(self, state: AppState) -> RouterWithState {
RouterWithState {
router: self.finalize_routes().with_state(state),
}
}
fn finalize_routes(self) -> Router<AppState> {
// layers added later wrap earlier layers
self.unfinished_router
// CORS layer needs to wrap other API layers
.layer(setup_cors())
// logger should be outermost layer
.layer(TraceLayer::new_for_http())
}
}
pub(crate) struct RouterWithState {
router: Router,
}
impl RouterWithState {
pub(crate) async fn build_server<A: ToSocketAddrs>(
self,
bind_address: A,
) -> anyhow::Result<HttpServer> {
tokio::net::TcpListener::bind(bind_address)
.await
.map(|listener| HttpServer::new(self.router, listener))
.map_err(|err| anyhow!("Couldn't bind to address due to {}", err))
}
}
fn setup_cors() -> CorsLayer {
use axum::http::Method;
CorsLayer::new()
.allow_origin(tower_http::cors::Any)
.allow_methods([Method::POST, Method::GET, Method::PATCH, Method::OPTIONS])
.allow_headers(tower_http::cors::Any)
.allow_credentials(false)
}
@@ -0,0 +1,58 @@
use serde_json_path::JsonPath;
use crate::http::models::Gateway;
pub(super) struct ParseJsonPaths {
pub(super) path_ip_address: JsonPath,
pub(super) path_hostname: JsonPath,
pub(super) path_service_provider_client_id: JsonPath,
}
impl ParseJsonPaths {
pub fn new() -> Result<Self, serde_json_path::ParseError> {
Ok(ParseJsonPaths {
path_ip_address: JsonPath::parse("$.host_information.ip_address[0]")?,
path_hostname: JsonPath::parse("$.host_information.hostname")?,
path_service_provider_client_id: JsonPath::parse("$.network_requester.address")?,
})
}
}
pub(super) struct ParsedDetails {
pub(super) ip_address: Option<String>,
pub(super) hostname: Option<String>,
pub(super) service_provider_client_id: Option<String>,
}
impl ParsedDetails {
fn get_string_from_json_path(
value: &Option<serde_json::Value>,
path: &JsonPath,
) -> Option<String> {
match value {
Some(value) => path
.query(value)
.exactly_one()
.map(|v2| v2.as_str().map(|v3| v3.to_string()))
.ok()
.flatten(),
None => None,
}
}
pub fn new(paths: &ParseJsonPaths, g: &Gateway) -> ParsedDetails {
ParsedDetails {
hostname: ParsedDetails::get_string_from_json_path(
&g.self_described,
&paths.path_hostname,
),
ip_address: ParsedDetails::get_string_from_json_path(
&g.self_described,
&paths.path_ip_address,
),
service_provider_client_id: ParsedDetails::get_string_from_json_path(
&g.self_described,
&paths.path_service_provider_client_id,
),
}
}
}
@@ -0,0 +1,134 @@
use crate::http::{
error::{HttpError, HttpResult},
models::Service,
state::AppState,
PagedResult, Pagination,
};
use axum::{
extract::{Query, State},
Json, Router,
};
use json_path::{ParseJsonPaths, ParsedDetails};
use tracing::instrument;
mod json_path;
pub(crate) fn routes() -> Router<AppState> {
Router::new().route("/", axum::routing::get(mixnodes))
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::IntoParams)]
#[into_params(parameter_in = Query)]
pub(crate) struct ServicesQueryParams {
size: Option<usize>,
page: Option<usize>,
wss: Option<bool>,
hostname: Option<bool>,
entry: Option<bool>,
}
#[utoipa::path(
tag = "Services",
get,
params(
ServicesQueryParams,
),
path = "/v2/services",
responses(
(status = 200, body = PagedService)
)
)]
#[instrument(level = tracing::Level::DEBUG, skip(state))]
async fn mixnodes(
Query(ServicesQueryParams {
size,
page,
wss,
hostname,
entry,
}): Query<ServicesQueryParams>,
State(state): State<AppState>,
) -> HttpResult<Json<PagedResult<Service>>> {
let db = state.db_pool();
let cache = state.cache();
let show_only_wss = wss.unwrap_or(false);
let show_only_with_hostname = hostname.unwrap_or(false);
let show_entry_gateways_only = entry.unwrap_or(false);
let paths = ParseJsonPaths::new().map_err(|e| {
tracing::error!("Invalidly configured ParseJsonPaths: {e}");
HttpError::internal()
})?;
let res = cache.get_gateway_list(db).await;
let res: Vec<Service> = res
.iter()
.map(|g| {
let details = ParsedDetails::new(&paths, g);
let s = Service {
gateway_identity_key: g.gateway_identity_key.clone(),
ip_address: details.ip_address,
service_provider_client_id: details.service_provider_client_id,
hostname: details.hostname,
last_successful_ping_utc: g.last_testrun_utc.clone(),
last_updated_utc: g.last_updated_utc.clone(),
// routing_score: g.routing_score,
routing_score: 1f32,
mixnet_websockets: g
.self_described
.clone()
.and_then(|s| s.get("mixnet_websockets").cloned()),
};
let f = ServiceFilter::new(&s);
(s, f)
})
.filter(|(_, f)| {
let mut keep = f.has_network_requester_sp;
if show_entry_gateways_only {
keep = true;
}
if show_only_wss {
keep &= f.has_wss;
}
if show_only_with_hostname {
keep &= f.has_hostname;
}
keep
})
.map(|(s, _)| s)
.collect();
Ok(Json(PagedResult::paginate(Pagination { size, page }, res)))
}
struct ServiceFilter {
has_wss: bool,
has_network_requester_sp: bool,
has_hostname: bool,
}
impl ServiceFilter {
fn new(s: &Service) -> Self {
let has_wss = match &s.mixnet_websockets {
Some(v) => v.get("wss_port").map(|v2| !v2.is_null()).unwrap_or(false),
None => false,
};
let has_hostname = s.hostname.is_some();
let has_network_requester_sp = match &s.service_provider_client_id {
Some(v) => !v.is_empty(),
None => false,
};
ServiceFilter {
has_wss,
has_hostname,
has_network_requester_sp,
}
}
}
@@ -0,0 +1,43 @@
use axum::{extract::State, Json, Router};
use tracing::instrument;
use crate::{
db::models::NetworkSummary,
http::{error::HttpResult, models::SummaryHistory, state::AppState},
};
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(summary))
.route("/history", axum::routing::get(summary_history))
}
#[utoipa::path(
tag = "Summary",
get,
path = "/v2/summary",
responses(
(status = 200, body = NetworkSummary)
)
)]
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn summary(State(state): State<AppState>) -> HttpResult<Json<NetworkSummary>> {
crate::db::queries::get_summary(state.db_pool())
.await
.map(Json)
}
#[utoipa::path(
tag = "Summary",
get,
path = "/v2/summary/history",
responses(
(status = 200, body = Vec<SummaryHistory>)
)
)]
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn summary_history(State(state): State<AppState>) -> HttpResult<Json<Vec<SummaryHistory>>> {
Ok(Json(
state.cache().get_summary_history(state.db_pool()).await,
))
}
@@ -0,0 +1,116 @@
use axum::Json;
use axum::{
extract::{Path, State},
Router,
};
use reqwest::StatusCode;
use crate::db::models::TestRunStatus;
use crate::db::queries;
use crate::{
db,
http::{
error::{HttpError, HttpResult},
models::TestrunAssignment,
state::AppState,
},
};
// TODO dz consider adding endpoint to trigger testrun scan for a given gateway_id
// like in H< src/http/testruns.rs
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(request_testrun))
.route("/:testrun_id", axum::routing::post(submit_testrun))
}
#[tracing::instrument(level = "debug", skip_all)]
async fn request_testrun(State(state): State<AppState>) -> HttpResult<Json<TestrunAssignment>> {
// TODO dz log agent's key
// TODO dz log agent's network probe version
tracing::debug!("Agent X requested testrun");
let db = state.db_pool();
let conn = db
.acquire()
.await
.map_err(HttpError::internal_with_logging)?;
return match db::queries::testruns::get_oldest_testrun_and_make_it_pending(conn).await {
Ok(res) => {
if let Some(testrun) = res {
// TODO dz consider adding a column to testruns table with agent's public key
tracing::debug!(
"🏃‍ Assigned testrun row_id {} to agent X",
&testrun.testrun_id
);
Ok(Json(testrun))
} else {
Err(HttpError::not_found("No testruns available"))
}
}
Err(err) => Err(HttpError::internal_with_logging(err)),
};
}
// TODO dz accept testrun_id as query parameter
#[tracing::instrument(level = "debug", skip_all)]
async fn submit_testrun(
Path(testrun_id): Path<i64>,
State(state): State<AppState>,
body: String,
) -> HttpResult<StatusCode> {
tracing::debug!(
"Agent submitted testrun {}. Total length: {}",
testrun_id,
body.len(),
);
// TODO dz store testrun results
let db = state.db_pool();
let mut conn = db
.acquire()
.await
.map_err(HttpError::internal_with_logging)?;
let testrun = queries::testruns::get_testrun_by_id(&mut conn, testrun_id)
.await
.map_err(|e| {
tracing::error!("{e}");
HttpError::not_found(testrun_id)
})?;
// TODO dz this should be part of a single transaction: commit after everything is done
queries::testruns::update_testrun_status(&mut conn, testrun_id, TestRunStatus::Complete)
.await
.map_err(HttpError::internal_with_logging)?;
queries::testruns::update_gateway_last_probe_log(&mut conn, testrun.gateway_id, &body)
.await
.map_err(HttpError::internal_with_logging)?;
let result = get_result_from_log(&body);
queries::testruns::update_gateway_last_probe_result(&mut conn, testrun.gateway_id, &result)
.await
.map_err(HttpError::internal_with_logging)?;
queries::testruns::update_gateway_score(&mut conn, testrun.gateway_id)
.await
.map_err(HttpError::internal_with_logging)?;
// TODO dz log gw identity key
tracing::info!(
"✅ Testrun row_id {} for gateway {} complete",
testrun.id,
testrun.gateway_id
);
Ok(StatusCode::CREATED)
}
fn get_result_from_log(log: &str) -> String {
let re = regex::Regex::new(r"\n\{\s").unwrap();
let result: Vec<_> = re.splitn(log, 2).collect();
if result.len() == 2 {
let res = format!("{} {}", "{", result[1]).to_string();
return res;
}
"".to_string()
}
+15
View File
@@ -0,0 +1,15 @@
use crate::http::{Gateway, GatewaySkinny, Mixnode, Service};
use utoipa::OpenApi;
use utoipauto::utoipauto;
// manually import external structs which are behind feature flags because they
// can't be automatically discovered
// https://github.com/ProbablyClem/utoipauto/issues/13#issuecomment-1974911829
#[utoipauto(paths = "./nym-node-status-api/src")]
#[derive(OpenApi)]
#[openapi(
info(title = "Node Status API"),
tags(),
components(schemas(nym_node_requests::api::v1::node::models::NodeDescription,))
)]
pub(super) struct ApiDoc;
+42
View File
@@ -0,0 +1,42 @@
use std::fmt::Display;
pub(crate) type HttpResult<T> = Result<T, HttpError>;
pub(crate) struct HttpError {
message: String,
status: axum::http::StatusCode,
}
impl HttpError {
pub(crate) fn invalid_input(message: String) -> Self {
Self {
message,
status: axum::http::StatusCode::BAD_REQUEST,
}
}
pub(crate) fn internal_with_logging(msg: impl Display) -> Self {
tracing::error!("{}", msg.to_string());
Self::internal()
}
pub(crate) fn internal() -> Self {
Self {
message: serde_json::json!({"message": "Internal server error"}).to_string(),
status: axum::http::StatusCode::INTERNAL_SERVER_ERROR,
}
}
pub(crate) fn not_found(msg: impl Display) -> Self {
Self {
message: serde_json::json!({"message": msg.to_string()}).to_string(),
status: axum::http::StatusCode::NOT_FOUND,
}
}
}
impl axum::response::IntoResponse for HttpError {
fn into_response(self) -> axum::response::Response {
(self.status, self.message).into_response()
}
}
+71
View File
@@ -0,0 +1,71 @@
use models::{Gateway, GatewaySkinny, Mixnode, Service};
pub(crate) mod api;
pub(crate) mod api_docs;
pub(crate) mod error;
pub(crate) mod models;
pub(crate) mod server;
pub(crate) mod state;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
// exclude generic from auto-discovery
#[utoipauto::utoipa_ignore]
// https://docs.rs/utoipa/latest/utoipa/derive.ToSchema.html#generic-schemas-with-aliases
// Generic structs can only be included via aliases, not directly, because they
// it would cause an error in generated Swagger docs.
// Instead, you have to manually monomorphize each generic struct that
// you wish to document
#[aliases(
PagedGateway = PagedResult<Gateway>,
PagedGatewaySkinny = PagedResult<GatewaySkinny>,
PagedMixnode = PagedResult<Mixnode>,
PagedService = PagedResult<Service>,
)]
pub struct PagedResult<T> {
pub page: usize,
pub size: usize,
pub total: usize,
pub items: Vec<T>,
}
impl<T: Clone> PagedResult<T> {
pub fn paginate(pagination: Pagination, res: Vec<T>) -> Self {
let total = res.len();
let (size, mut page) = pagination.intoto_inner_values();
if page * size > total {
page = total / size;
}
let chunks: Vec<&[T]> = res.chunks(size).collect();
PagedResult {
page,
size,
total,
items: chunks.get(page).cloned().unwrap_or_default().into(),
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::IntoParams)]
#[into_params(parameter_in = Query)]
pub(crate) struct Pagination {
size: Option<usize>,
page: Option<usize>,
}
impl Pagination {
// unwrap stored values or use predefined defaults
pub(crate) fn intoto_inner_values(self) -> (usize, usize) {
const SIZE_DEFAULT: usize = 10;
const SIZE_MAX: usize = 200;
const PAGE_DEFAULT: usize = 0;
(
self.size.unwrap_or(SIZE_DEFAULT).min(SIZE_MAX),
self.page.unwrap_or(PAGE_DEFAULT),
)
}
}
+86
View File
@@ -0,0 +1,86 @@
use crate::db::models::TestRunDto;
use nym_node_requests::api::v1::node::models::NodeDescription;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
pub(crate) use nym_common_models::ns_api::TestrunAssignment;
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct Gateway {
pub gateway_identity_key: String,
pub bonded: bool,
pub blacklisted: bool,
pub performance: u8,
pub self_described: Option<serde_json::Value>,
pub explorer_pretty_bond: Option<serde_json::Value>,
pub description: NodeDescription,
pub last_probe_result: Option<serde_json::Value>,
pub last_probe_log: Option<String>,
pub last_testrun_utc: Option<String>,
pub last_updated_utc: String,
pub routing_score: f32,
pub config_score: u32,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct GatewaySkinny {
pub gateway_identity_key: String,
pub self_described: Option<serde_json::Value>,
pub explorer_pretty_bond: Option<serde_json::Value>,
pub last_probe_result: Option<serde_json::Value>,
pub last_testrun_utc: Option<String>,
pub last_updated_utc: String,
pub routing_score: f32,
pub config_score: u32,
pub performance: u8,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct Mixnode {
pub mix_id: u32,
pub bonded: bool,
pub blacklisted: bool,
pub is_dp_delegatee: bool,
pub total_stake: i64,
pub full_details: Option<serde_json::Value>,
pub self_described: Option<serde_json::Value>,
pub description: NodeDescription,
pub last_updated_utc: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct DailyStats {
pub date_utc: String,
pub total_packets_received: i64,
pub total_packets_sent: i64,
pub total_packets_dropped: i64,
pub total_stake: i64,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub struct Service {
pub gateway_identity_key: String,
pub last_updated_utc: String,
pub routing_score: f32,
pub service_provider_client_id: Option<String>,
pub ip_address: Option<String>,
pub hostname: Option<String>,
pub mixnet_websockets: Option<serde_json::Value>,
pub last_successful_ping_utc: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
pub(crate) struct SummaryHistory {
pub date: String,
pub value_json: serde_json::Value,
pub timestamp_utc: String,
}
impl From<TestRunDto> for TestrunAssignment {
fn from(value: TestRunDto) -> Self {
Self {
gateway_pk_id: value.gateway_id,
testrun_id: value.id,
}
}
}
+93
View File
@@ -0,0 +1,93 @@
use axum::Router;
use core::net::SocketAddr;
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
use crate::{
db::DbPool,
http::{api::RouterBuilder, state::AppState},
};
/// Return handles that allow for graceful shutdown of server + awaiting its
/// background tokio task
pub(crate) async fn start_http_api(
db_pool: DbPool,
http_port: u16,
nym_http_cache_ttl: u64,
) -> anyhow::Result<ShutdownHandles> {
let router_builder = RouterBuilder::with_default_routes();
let state = AppState::new(db_pool, nym_http_cache_ttl);
let router = router_builder.with_state(state);
// TODO dz do we need this to be configurable?
let bind_addr = format!("0.0.0.0:{}", http_port);
tracing::info!("Binding server to {bind_addr}");
let server = router.build_server(bind_addr).await?;
Ok(start_server(server))
}
fn start_server(server: HttpServer) -> ShutdownHandles {
// one copy is stored to trigger a graceful shutdown later
let shutdown_button = CancellationToken::new();
// other copy is given to server to listen for a shutdown
let shutdown_receiver = shutdown_button.clone();
let shutdown_receiver = shutdown_receiver.cancelled_owned();
let server_handle = tokio::spawn(async move { server.run(shutdown_receiver).await });
ShutdownHandles {
server_handle,
shutdown_button,
}
}
pub(crate) struct ShutdownHandles {
server_handle: JoinHandle<std::io::Result<()>>,
shutdown_button: CancellationToken,
}
impl ShutdownHandles {
/// Send graceful shutdown signal to server and wait for server task to complete
pub(crate) async fn shutdown(self) -> anyhow::Result<()> {
self.shutdown_button.cancel();
match self.server_handle.await {
Ok(Ok(_)) => {
tracing::info!("HTTP server shut down without errors");
}
Ok(Err(err)) => {
tracing::error!("HTTP server terminated with: {err}");
anyhow::bail!(err)
}
Err(err) => {
tracing::error!("Server task panicked: {err}");
}
};
Ok(())
}
}
pub(crate) struct HttpServer {
router: Router,
listener: TcpListener,
}
impl HttpServer {
pub(crate) fn new(router: Router, listener: TcpListener) -> Self {
Self { router, listener }
}
pub(crate) async fn run(self, receiver: WaitForCancellationFutureOwned) -> std::io::Result<()> {
// into_make_service_with_connect_info allows us to see client ip address
axum::serve(
self.listener,
self.router
.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(receiver)
.await
}
}
+223
View File
@@ -0,0 +1,223 @@
use std::{sync::Arc, time::Duration};
use moka::{future::Cache, Entry};
use tokio::sync::RwLock;
use crate::{
db::DbPool,
http::models::{DailyStats, Gateway, Mixnode, SummaryHistory},
};
#[derive(Debug, Clone)]
pub(crate) struct AppState {
db_pool: DbPool,
cache: HttpCache,
}
impl AppState {
pub(crate) fn new(db_pool: DbPool, cache_ttl: u64) -> Self {
Self {
db_pool,
cache: HttpCache::new(cache_ttl),
}
}
pub(crate) fn db_pool(&self) -> &DbPool {
&self.db_pool
}
pub(crate) fn cache(&self) -> &HttpCache {
&self.cache
}
}
static GATEWAYS_LIST_KEY: &str = "gateways";
static MIXNODES_LIST_KEY: &str = "mixnodes";
static MIXSTATS_LIST_KEY: &str = "mixstats";
static SUMMARY_HISTORY_LIST_KEY: &str = "summary-history";
#[derive(Debug, Clone)]
pub(crate) struct HttpCache {
gateways: Cache<String, Arc<RwLock<Vec<Gateway>>>>,
mixnodes: Cache<String, Arc<RwLock<Vec<Mixnode>>>>,
mixstats: Cache<String, Arc<RwLock<Vec<DailyStats>>>>,
history: Cache<String, Arc<RwLock<Vec<SummaryHistory>>>>,
}
impl HttpCache {
pub fn new(ttl_seconds: u64) -> Self {
HttpCache {
gateways: Cache::builder()
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
mixnodes: Cache::builder()
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
mixstats: Cache::builder()
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
history: Cache::builder()
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
}
}
pub async fn upsert_gateway_list(
&self,
new_gateway_list: Vec<Gateway>,
) -> Entry<String, Arc<RwLock<Vec<Gateway>>>> {
self.gateways
.entry_by_ref(GATEWAYS_LIST_KEY)
.and_upsert_with(|maybe_entry| async {
if let Some(entry) = maybe_entry {
let v = entry.into_value();
let mut guard = v.write().await;
*guard = new_gateway_list;
v.clone()
} else {
Arc::new(RwLock::new(new_gateway_list))
}
})
.await
}
pub async fn get_gateway_list(&self, db: &DbPool) -> Vec<Gateway> {
match self.gateways.get(GATEWAYS_LIST_KEY).await {
Some(guard) => {
let read_lock = guard.read().await;
read_lock.clone()
}
None => {
// the key is missing so populate it
tracing::warn!("No gateways in cache, refreshing cache from DB...");
let gateways = crate::db::queries::get_all_gateways(db)
.await
.unwrap_or_default();
self.upsert_gateway_list(gateways.clone()).await;
if gateways.is_empty() {
tracing::warn!("Database contains 0 gateways");
}
gateways
}
}
}
pub async fn upsert_mixnode_list(
&self,
new_mixnode_list: Vec<Mixnode>,
) -> Entry<String, Arc<RwLock<Vec<Mixnode>>>> {
self.mixnodes
.entry_by_ref(MIXNODES_LIST_KEY)
.and_upsert_with(|maybe_entry| async {
if let Some(entry) = maybe_entry {
let v = entry.into_value();
let mut guard = v.write().await;
*guard = new_mixnode_list;
v.clone()
} else {
Arc::new(RwLock::new(new_mixnode_list))
}
})
.await
}
pub async fn get_mixnodes_list(&self, db: &DbPool) -> Vec<Mixnode> {
match self.mixnodes.get(MIXNODES_LIST_KEY).await {
Some(guard) => {
let read_lock = guard.read().await;
read_lock.clone()
}
None => {
tracing::warn!("No mixnodes in cache, refreshing cache from DB...");
let mixnodes = crate::db::queries::get_all_mixnodes(db)
.await
.unwrap_or_default();
self.upsert_mixnode_list(mixnodes.clone()).await;
if mixnodes.is_empty() {
tracing::warn!("Database contains 0 mixnodes");
}
mixnodes
}
}
}
pub async fn upsert_mixnode_stats(
&self,
mixnode_stats: Vec<DailyStats>,
) -> Entry<String, Arc<RwLock<Vec<DailyStats>>>> {
self.mixstats
.entry_by_ref(MIXSTATS_LIST_KEY)
.and_upsert_with(|maybe_entry| async {
if let Some(entry) = maybe_entry {
let v = entry.into_value();
let mut guard = v.write().await;
*guard = mixnode_stats;
v.clone()
} else {
Arc::new(RwLock::new(mixnode_stats))
}
})
.await
}
pub async fn get_mixnode_stats(&self, db: &DbPool) -> Vec<DailyStats> {
match self.mixstats.get(MIXSTATS_LIST_KEY).await {
Some(guard) => {
let read_lock = guard.read().await;
read_lock.to_vec()
}
None => {
let mixnode_stats = crate::db::queries::get_daily_stats(db)
.await
.unwrap_or_default();
self.upsert_mixnode_stats(mixnode_stats.clone()).await;
mixnode_stats
}
}
}
pub async fn get_summary_history(&self, db: &DbPool) -> Vec<SummaryHistory> {
match self.history.get(SUMMARY_HISTORY_LIST_KEY).await {
Some(guard) => {
let read_lock = guard.read().await;
read_lock.to_vec()
}
None => {
let summary_history = crate::db::queries::get_summary_history(db)
.await
.unwrap_or(vec![]);
self.upsert_summary_history(summary_history.clone()).await;
summary_history
}
}
}
pub async fn upsert_summary_history(
&self,
summary_history: Vec<SummaryHistory>,
) -> Entry<String, Arc<RwLock<Vec<SummaryHistory>>>> {
self.history
.entry_by_ref(SUMMARY_HISTORY_LIST_KEY)
.and_upsert_with(|maybe_entry| async {
if let Some(entry) = maybe_entry {
let v = entry.into_value();
let mut guard = v.write().await;
*guard = summary_history;
v.clone()
} else {
Arc::new(RwLock::new(summary_history))
}
})
.await
}
}
+69
View File
@@ -0,0 +1,69 @@
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{filter::Directive, EnvFilter};
// TODO dz you can get the tracing-subscriber via basic-tracing feature on nym-bin-common
pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
fn directive_checked(directive: impl Into<String>) -> anyhow::Result<Directive> {
directive.into().parse().map_err(From::from)
}
let log_builder = tracing_subscriber::fmt()
// Use a more compact, abbreviated log format
.compact()
// Display source code file paths
.with_file(true)
// Display source code line numbers
.with_line_number(true)
.with_thread_ids(true)
// Don't display the event's target (module path)
.with_target(false);
let mut filter = EnvFilter::builder()
// if RUST_LOG isn't set, set default level
.with_default_directive(LevelFilter::DEBUG.into())
.from_env_lossy();
// these crates are more granularly filtered
let warn_crates = [
"reqwest",
"rustls",
"hyper",
"sqlx",
"h2",
"tendermint_rpc",
"tower_http",
"axum",
];
for crate_name in warn_crates {
filter = filter.add_directive(directive_checked(format!("{}=warn", crate_name))?);
}
let log_level_hint = filter.max_level_hint();
// debug or higher granularity (e.g. trace)
let debug_or_higher = std::cmp::max(
log_level_hint.unwrap_or(LevelFilter::DEBUG),
LevelFilter::DEBUG,
);
filter = filter.add_directive(directive_checked(format!(
"nym_bin_common={}",
debug_or_higher
))?);
filter = filter.add_directive(directive_checked(format!(
"nym_explorer_client={}",
debug_or_higher
))?);
filter = filter.add_directive(directive_checked(format!(
"nym_network_defaults={}",
debug_or_higher
))?);
filter = filter.add_directive(directive_checked(format!(
"nym_validator_client={}",
debug_or_higher
))?);
log_builder.with_env_filter(filter).init();
tracing::info!("Log level: {:?}", log_level_hint);
Ok(())
}
+53
View File
@@ -0,0 +1,53 @@
use clap::Parser;
use nym_task::signal::wait_for_signal;
mod cli;
mod db;
mod http;
mod logging;
mod monitor;
mod testruns;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::setup_tracing_logger()?;
let args = cli::Cli::parse();
let connection_url = args.database_url.clone();
tracing::debug!("Using config:\n{:#?}", args);
let storage = db::Storage::init(connection_url).await?;
let db_pool = storage.pool_owned();
let args_clone = args.clone();
tokio::spawn(async move {
monitor::spawn_in_background(
db_pool,
args_clone.explorer_client_timeout,
args_clone.nym_api_client_timeout,
&args_clone.nyxd_addr,
args_clone.monitor_refresh_interval,
)
.await;
tracing::info!("Started monitor task");
});
testruns::spawn(storage.pool_owned(), args.testruns_refresh_interval).await;
let shutdown_handles = http::server::start_http_api(
storage.pool_owned(),
args.http_port,
args.nym_http_cache_ttl,
)
.await
.expect("Failed to start server");
tracing::info!("Started HTTP server on port {}", args.http_port);
wait_for_signal().await;
if let Err(err) = shutdown_handles.shutdown().await {
tracing::error!("{err}");
};
Ok(())
}
+466
View File
@@ -0,0 +1,466 @@
use crate::db::models::{
gateway, mixnode, GatewayRecord, MixnodeRecord, NetworkSummary, GATEWAYS_BLACKLISTED_COUNT,
GATEWAYS_BONDED_COUNT, GATEWAYS_EXPLORER_COUNT, GATEWAYS_HISTORICAL_COUNT,
MIXNODES_BLACKLISTED_COUNT, MIXNODES_BONDED_ACTIVE, MIXNODES_BONDED_COUNT,
MIXNODES_BONDED_INACTIVE, MIXNODES_BONDED_RESERVE, MIXNODES_HISTORICAL_COUNT,
};
use crate::db::{queries, DbPool};
use anyhow::anyhow;
use cosmwasm_std::Decimal;
use nym_explorer_client::{ExplorerClient, PrettyDetailedGatewayBond};
use nym_network_defaults::NymNetworkDetails;
use nym_validator_client::client::NymApiClientExt;
use nym_validator_client::models::{DescribedGateway, DescribedMixNode, MixNodeBondAnnotated};
use nym_validator_client::nym_nodes::SkimmedNode;
use nym_validator_client::nyxd::contract_traits::PagedMixnetQueryClient;
use nym_validator_client::nyxd::{AccountId, NyxdClient};
use nym_validator_client::NymApiClient;
use reqwest::Url;
use std::collections::HashSet;
use std::str::FromStr;
use tokio::time::Duration;
use tracing::instrument;
// TODO dz should be configurable
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60);
static DELEGATION_PROGRAM_WALLET: &str = "n1rnxpdpx3kldygsklfft0gech7fhfcux4zst5lw";
// TODO dz: query many NYM APIs:
// multiple instances running directory cache, ask sachin
#[instrument(level = "debug", name = "data_monitor", skip_all)]
pub(crate) async fn spawn_in_background(
db_pool: DbPool,
explorer_client_timeout: Duration,
nym_api_client_timeout: Duration,
nyxd_addr: &Url,
refresh_interval: Duration,
) {
let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env();
loop {
tracing::info!("Refreshing node info...");
if let Err(e) = run(
&db_pool,
&network_defaults,
explorer_client_timeout,
nym_api_client_timeout,
nyxd_addr,
)
.await
{
tracing::error!(
"Monitor run failed: {e}, retrying in {}s...",
FAILURE_RETRY_DELAY.as_secs()
);
// TODO dz implement some sort of backoff
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
} else {
tracing::info!(
"Info successfully collected, sleeping for {}s...",
refresh_interval.as_secs()
);
tokio::time::sleep(refresh_interval).await;
}
}
}
async fn run(
pool: &DbPool,
network_details: &NymNetworkDetails,
explorer_client_timeout: Duration,
nym_api_client_timeout: Duration,
nyxd_addr: &Url,
) -> anyhow::Result<()> {
let default_api_url = network_details
.endpoints
.first()
.expect("rust sdk mainnet default incorrectly configured")
.api_url()
.clone()
.expect("rust sdk mainnet default missing api_url");
let default_explorer_url = network_details.explorer_api.clone().map(|url| {
url.parse()
.expect("rust sdk mainnet default explorer url not parseable")
});
// TODO dz replace explorer api with ipinfo.io
let default_explorer_url =
default_explorer_url.expect("explorer url missing in network config");
let explorer_client =
ExplorerClient::new_with_timeout(default_explorer_url, explorer_client_timeout)?;
let explorer_gateways = explorer_client
.get_gateways()
.await
.log_error("get_gateways")?;
let api_client = NymApiClient::new_with_timeout(default_api_url, nym_api_client_timeout);
let gateways = api_client
.get_cached_described_gateways()
.await
.log_error("get_described_gateways")?;
tracing::debug!("Fetched {} gateways", gateways.len());
let skimmed_gateways = api_client
.get_basic_gateways(None)
.await
.log_error("get_basic_gateways")?;
let mixnodes = api_client
.get_cached_mixnodes()
.await
.log_error("get_cached_mixnodes")?;
tracing::debug!("Fetched {} mixnodes", mixnodes.len());
let gateways_blacklisted = skimmed_gateways
.iter()
.filter_map(|gw| {
if gw.performance.round_to_integer() <= 50 {
Some(gw.ed25519_identity_pubkey.to_owned())
} else {
None
}
})
.collect::<HashSet<_>>();
// Cached mixnodes don't include blacklisted nodes
// We need that to calculate the total locked tokens later
let mixnodes = api_client
.nym_api
.get_mixnodes_detailed_unfiltered()
.await
.log_error("get_mixnodes_detailed_unfiltered")?;
let mixnodes_described = api_client
.nym_api
.get_mixnodes_described()
.await
.log_error("get_mixnodes_described")?;
let mixnodes_active = api_client
.nym_api
.get_active_mixnodes()
.await
.log_error("get_active_mixnodes")?;
let delegation_program_members =
get_delegation_program_details(network_details, nyxd_addr).await?;
// keep stats for later
let count_bonded_mixnodes = mixnodes.len();
let count_bonded_gateways = gateways.len();
let count_explorer_gateways = explorer_gateways.len();
let count_bonded_mixnodes_active = mixnodes_active.len();
let gateway_records = prepare_gateway_data(
&gateways,
&gateways_blacklisted,
explorer_gateways,
skimmed_gateways,
)?;
queries::insert_gateways(pool, gateway_records)
.await
.map(|_| {
tracing::debug!("Gateway info written to DB!");
})?;
// instead of counting blacklisted GWs returned from API cache, count from the active set
let count_gateways_blacklisted = gateways
.iter()
.filter(|gw| {
let gw_identity = gw.bond.identity();
gateways_blacklisted.contains(gw_identity)
})
.count();
if count_gateways_blacklisted > 0 {
queries::write_blacklisted_gateways_to_db(pool, gateways_blacklisted.iter())
.await
.map(|_| {
tracing::debug!(
"Gateway blacklist info written to DB! {} blacklisted by Nym API",
count_gateways_blacklisted
)
})?;
}
let mixnode_records =
prepare_mixnode_data(&mixnodes, mixnodes_described, delegation_program_members)?;
queries::insert_mixnodes(pool, mixnode_records)
.await
.map(|_| {
tracing::debug!("Mixnode info written to DB!");
})?;
let count_mixnodes_blacklisted = mixnodes.iter().filter(|elem| elem.blacklisted).count();
let recently_unbonded_gateways = queries::ensure_gateways_still_bonded(pool, &gateways).await?;
let recently_unbonded_mixnodes = queries::ensure_mixnodes_still_bonded(pool, &mixnodes).await?;
let count_bonded_mixnodes_reserve = 0; // TODO: NymAPI doesn't report the reserve set size
let count_bonded_mixnodes_inactive = count_bonded_mixnodes - count_bonded_mixnodes_active;
let (all_historical_gateways, all_historical_mixnodes) = calculate_stats(pool).await?;
//
// write summary keys and values to table
//
let nodes_summary = vec![
(MIXNODES_BONDED_COUNT, &count_bonded_mixnodes),
(MIXNODES_BONDED_ACTIVE, &count_bonded_mixnodes_active),
(MIXNODES_BONDED_INACTIVE, &count_bonded_mixnodes_inactive),
(MIXNODES_BONDED_RESERVE, &count_bonded_mixnodes_reserve),
(MIXNODES_BLACKLISTED_COUNT, &count_mixnodes_blacklisted),
(GATEWAYS_BONDED_COUNT, &count_bonded_gateways),
(GATEWAYS_EXPLORER_COUNT, &count_explorer_gateways),
(MIXNODES_HISTORICAL_COUNT, &all_historical_mixnodes),
(GATEWAYS_HISTORICAL_COUNT, &all_historical_gateways),
(GATEWAYS_BLACKLISTED_COUNT, &count_gateways_blacklisted),
];
let last_updated = chrono::offset::Utc::now();
let last_updated_utc = last_updated.timestamp().to_string();
let network_summary = NetworkSummary {
mixnodes: mixnode::MixnodeSummary {
bonded: mixnode::MixnodeSummaryBonded {
count: count_bonded_mixnodes.cast_checked()?,
active: count_bonded_mixnodes_active.cast_checked()?,
inactive: count_bonded_mixnodes_inactive.cast_checked()?,
reserve: count_bonded_mixnodes_reserve.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
blacklisted: mixnode::MixnodeSummaryBlacklisted {
count: count_mixnodes_blacklisted.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
historical: mixnode::MixnodeSummaryHistorical {
count: all_historical_mixnodes.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
},
gateways: gateway::GatewaySummary {
bonded: gateway::GatewaySummaryBonded {
count: count_bonded_gateways.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
blacklisted: gateway::GatewaySummaryBlacklisted {
count: count_gateways_blacklisted.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
historical: gateway::GatewaySummaryHistorical {
count: all_historical_gateways.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
explorer: gateway::GatewaySummaryExplorer {
count: count_explorer_gateways.cast_checked()?,
last_updated_utc: last_updated_utc.to_owned(),
},
},
};
queries::insert_summaries(pool, &nodes_summary, &network_summary, last_updated).await?;
let mut log_lines: Vec<String> = vec![];
for (key, value) in nodes_summary.iter() {
log_lines.push(format!("{} = {}", key, value));
}
log_lines.push(format!(
"recently_unbonded_mixnodes = {}",
recently_unbonded_mixnodes
));
log_lines.push(format!(
"recently_unbonded_gateways = {}",
recently_unbonded_gateways
));
tracing::info!("Directory summary: \n{}", log_lines.join("\n"));
Ok(())
}
fn prepare_gateway_data(
gateways: &[DescribedGateway],
gateways_blacklisted: &HashSet<String>,
explorer_gateways: Vec<PrettyDetailedGatewayBond>,
skimmed_gateways: Vec<SkimmedNode>,
) -> anyhow::Result<Vec<GatewayRecord>> {
let mut gateway_records = Vec::new();
for gateway in gateways {
let identity_key = gateway.bond.identity();
let bonded = true;
let last_updated_utc = chrono::offset::Utc::now().timestamp();
let blacklisted = gateways_blacklisted.contains(identity_key);
let self_described = gateway
.self_described
.as_ref()
.and_then(|v| serde_json::to_string(&v).ok());
let explorer_pretty_bond = explorer_gateways
.iter()
.find(|g| g.gateway.identity_key.eq(identity_key));
let explorer_pretty_bond = explorer_pretty_bond.and_then(|g| serde_json::to_string(g).ok());
let performance = skimmed_gateways
.iter()
.find(|g| g.ed25519_identity_pubkey.eq(identity_key))
.map(|g| g.performance)
.unwrap_or_default()
.round_to_integer();
gateway_records.push(GatewayRecord {
identity_key: identity_key.to_owned(),
bonded,
blacklisted,
self_described,
explorer_pretty_bond,
last_updated_utc,
performance,
});
}
Ok(gateway_records)
}
fn prepare_mixnode_data(
mixnodes: &[MixNodeBondAnnotated],
mixnodes_described: Vec<DescribedMixNode>,
delegation_program_members: Vec<u32>,
) -> anyhow::Result<Vec<MixnodeRecord>> {
let mut mixnode_records = Vec::new();
for mixnode in mixnodes {
let mix_id = mixnode.mix_id();
let identity_key = mixnode.identity_key();
let bonded = true;
let total_stake = decimal_to_i64(mixnode.mixnode_details.total_stake());
let blacklisted = mixnode.blacklisted;
let node_info = mixnode.mix_node();
let host = node_info.host.clone();
let http_port = node_info.http_api_port;
// Contains all the information including what's above
let full_details = serde_json::to_string(&mixnode)?;
let mixnode_described = mixnodes_described.iter().find(|m| m.bond.mix_id == mix_id);
let self_described = mixnode_described.and_then(|v| serde_json::to_string(v).ok());
let is_dp_delegatee = delegation_program_members.contains(&mix_id);
let last_updated_utc = chrono::offset::Utc::now().timestamp();
mixnode_records.push(MixnodeRecord {
mix_id,
identity_key: identity_key.to_owned(),
bonded,
total_stake,
host,
http_port,
blacklisted,
full_details,
self_described,
last_updated_utc,
is_dp_delegatee,
});
}
Ok(mixnode_records)
}
// TODO dz is there a common monorepo place this can be put?
pub trait NumericalCheckedCast<T>
where
T: TryFrom<Self>,
<T as TryFrom<Self>>::Error: std::error::Error,
Self: std::fmt::Display + Copy,
{
fn cast_checked(self) -> anyhow::Result<T> {
T::try_from(self).map_err(|e| {
anyhow::anyhow!(
"Couldn't cast {} to {}: {}",
self,
std::any::type_name::<T>(),
e
)
})
}
}
impl<T, U> NumericalCheckedCast<U> for T
where
U: TryFrom<T>,
<U as TryFrom<T>>::Error: std::error::Error,
T: std::fmt::Display + Copy,
{
}
async fn calculate_stats(pool: &DbPool) -> anyhow::Result<(usize, usize)> {
let mut conn = pool.acquire().await?;
let all_historical_gateways = sqlx::query_scalar!(r#"SELECT count(id) FROM gateways"#)
.fetch_one(&mut *conn)
.await?
.cast_checked()?;
let all_historical_mixnodes = sqlx::query_scalar!(r#"SELECT count(id) FROM mixnodes"#)
.fetch_one(&mut *conn)
.await?
.cast_checked()?;
Ok((all_historical_gateways, all_historical_mixnodes))
}
async fn get_delegation_program_details(
network_details: &NymNetworkDetails,
nyxd_addr: &Url,
) -> anyhow::Result<Vec<u32>> {
let config = nym_validator_client::nyxd::Config::try_from_nym_network_details(network_details)?;
let client = NyxdClient::connect(config, nyxd_addr.as_str())
.map_err(|err| anyhow::anyhow!("Couldn't connect: {}", err))?;
let account_id = AccountId::from_str(DELEGATION_PROGRAM_WALLET)
.map_err(|e| anyhow!("Invalid bech32 address: {}", e))?;
let delegations = client.get_all_delegator_delegations(&account_id).await?;
let mix_ids: Vec<u32> = delegations
.iter()
.map(|delegation| delegation.mix_id)
.collect();
Ok(mix_ids)
}
fn decimal_to_i64(decimal: Decimal) -> i64 {
// Convert the underlying Uint128 to a u128
let atomics = decimal.atomics().u128();
let precision = 1_000_000_000_000_000_000u128;
// Get the fractional part
let fractional = atomics % precision;
// Get the integer part
let integer = atomics / precision;
// Combine them into a float
let float_value = integer as f64 + (fractional as f64 / 1_000_000_000_000_000_000_f64);
// Limit to 6 decimal places
let rounded_value = (float_value * 1_000_000.0).round() / 1_000_000.0;
rounded_value as i64
}
trait LogError<T, E> {
fn log_error(self, msg: &str) -> Result<T, E>;
}
impl<T, E> LogError<T, E> for anyhow::Result<T, E>
where
E: std::error::Error,
{
fn log_error(self, msg: &str) -> Result<T, E> {
if let Err(e) = &self {
tracing::error!("[{msg}]:\t{e}");
}
self
}
}
+76
View File
@@ -0,0 +1,76 @@
use crate::db::models::GatewayIdentityDto;
use crate::db::DbPool;
use futures_util::TryStreamExt;
use std::time::Duration;
use tracing::instrument;
pub(crate) mod models;
mod queue;
pub(crate) use queue::now_utc;
pub(crate) async fn spawn(pool: DbPool, refresh_interval: Duration) {
tokio::spawn(async move {
loop {
tracing::info!("Spawning testruns...");
if let Err(e) = run(&pool).await {
tracing::error!("Cron job failed: {}", e);
}
tracing::debug!("Sleeping for {}s...", refresh_interval.as_secs());
tokio::time::sleep(refresh_interval).await;
}
});
}
// TODO dz make number of max agents configurable
// TODO dz periodically clean up stale pending testruns
#[instrument(level = "debug", name = "testrun_queue", skip_all)]
async fn run(pool: &DbPool) -> anyhow::Result<()> {
if pool.is_closed() {
tracing::debug!("DB pool closed, returning early");
return Ok(());
}
let mut conn = pool.acquire().await?;
let gateways = sqlx::query_as!(
GatewayIdentityDto,
r#"SELECT
gateway_identity_key as "gateway_identity_key!",
bonded as "bonded: bool"
FROM gateways
ORDER BY last_testrun_utc"#,
)
.fetch(&mut *conn)
.try_collect::<Vec<_>>()
.await?;
// TODO dz this filtering could be done in SQL
let gateways: Vec<GatewayIdentityDto> = gateways.into_iter().filter(|g| g.bonded).collect();
tracing::debug!("Trying to queue {} testruns", gateways.len());
let mut testruns_created = 0;
for gateway in gateways {
if let Err(e) = queue::try_queue_testrun(
&mut conn,
gateway.gateway_identity_key.clone(),
// TODO dz read from config
"127.0.0.1".to_string(),
)
.await
// TODO dz measure how many were actually inserted and how many were skipped
{
tracing::debug!(
"Skipping test for identity {} with error {}",
&gateway.gateway_identity_key,
e
);
} else {
testruns_created += 1;
}
}
tracing::debug!("{} testruns queued in total", testruns_created);
Ok(())
}
@@ -0,0 +1,16 @@
use serde::{Deserialize, Serialize};
#[allow(dead_code)] // it's not dead code but clippy doesn't detect usage in sqlx macros
#[derive(Debug, Clone)]
pub struct GatewayIdentityDto {
pub gateway_identity_key: String,
pub bonded: bool,
}
#[derive(Debug, Clone, Deserialize, Serialize, utoipa::ToSchema)]
pub struct TestRun {
pub id: u32,
pub identity_key: String,
pub status: String,
pub log: String,
}
+118
View File
@@ -0,0 +1,118 @@
use crate::db::models::{GatewayInfoDto, TestRunDto, TestRunStatus};
use crate::testruns::models::TestRun;
use anyhow::anyhow;
use chrono::DateTime;
use futures_util::TryStreamExt;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::time::SystemTime;
pub(crate) async fn try_queue_testrun(
conn: &mut PoolConnection<Sqlite>,
identity_key: String,
ip_address: String,
) -> anyhow::Result<TestRun> {
let timestamp = now_utc().timestamp();
let timestamp_pretty = now_utc_as_rfc3339();
let items = sqlx::query_as!(
GatewayInfoDto,
r#"SELECT
id as "id!",
gateway_identity_key as "gateway_identity_key!",
self_described as "self_described?",
explorer_pretty_bond as "explorer_pretty_bond?"
FROM gateways
WHERE gateway_identity_key = ?
ORDER BY gateway_identity_key
LIMIT 1"#,
identity_key,
)
// TODO dz shoudl call .fetch_one
// TODO dz replace this in other queries as well
.fetch(conn.as_mut())
.try_collect::<Vec<_>>()
.await?;
let gateway = items
.iter()
.find(|g| g.gateway_identity_key == identity_key);
// TODO dz if let Some() = gateway.first() ...
if gateway.is_none() {
return Err(anyhow!("Unknown gateway {identity_key}"));
}
let gateway_id = gateway.unwrap().id;
//
// check if there is already a test run for this gateway
//
let items = sqlx::query_as!(
TestRunDto,
r#"SELECT
id as "id!",
gateway_id as "gateway_id!",
status as "status!",
timestamp_utc as "timestamp_utc!",
ip_address as "ip_address!",
log as "log!"
FROM testruns
WHERE gateway_id = ? AND status != 2
ORDER BY id DESC
LIMIT 1"#,
gateway_id,
)
.fetch(conn.as_mut())
.try_collect::<Vec<_>>()
.await?;
if !items.is_empty() {
let testrun = items.first().unwrap();
return Ok(TestRun {
id: testrun.id as u32,
identity_key,
status: format!(
"{}",
TestRunStatus::from_repr(testrun.status as u8).unwrap()
),
log: testrun.log.clone(),
});
}
//
// save test run
//
let status = TestRunStatus::Pending as u32;
let log = format!(
"Test for {identity_key} requested at {} UTC\n\n",
timestamp_pretty
);
let id = sqlx::query!(
"INSERT INTO testruns (gateway_id, status, ip_address, timestamp_utc, log) VALUES (?, ?, ?, ?, ?)",
gateway_id,
status,
ip_address,
timestamp,
log,
)
.execute(conn.as_mut())
.await?
.last_insert_rowid();
Ok(TestRun {
id: id as u32,
identity_key,
status: format!("{}", TestRunStatus::Pending),
log,
})
}
// TODO dz do we need these?
pub fn now_utc() -> DateTime<chrono::Utc> {
SystemTime::now().into()
}
pub fn now_utc_as_rfc3339() -> String {
now_utc().to_rfc3339()
}
+2
View File
@@ -4,7 +4,9 @@
mod old_config_v1;
mod old_config_v2;
mod old_config_v3;
mod old_config_v4;
pub use old_config_v1::try_upgrade_config_v1;
pub use old_config_v2::try_upgrade_config_v2;
pub use old_config_v3::try_upgrade_config_v3;
pub use old_config_v4::try_upgrade_config_v4;
@@ -4,13 +4,7 @@
#![allow(dead_code)]
use crate::{config::*, error::KeyIOFailure};
use entry_gateway::Debug as EntryGatewayConfigDebug;
use exit_gateway::{IpPacketRouter, IpPacketRouterDebug, NetworkRequester, NetworkRequesterDebug};
use mixnode::{Verloc, VerlocDebug};
use nym_client_core_config_types::{
disk_persistence::{ClientKeysPaths, CommonClientPaths},
DebugConfig as ClientDebugConfig,
};
use nym_client_core_config_types::DebugConfig as ClientDebugConfig;
use nym_config::serde_helpers::de_maybe_port;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_network_requester::{
@@ -19,6 +13,7 @@ use nym_network_requester::{
};
use nym_pemstore::{store_key, store_keypair};
use nym_sphinx_acknowledgements::AckKey;
use old_configs::old_config_v4::*;
use persistence::*;
use rand::rngs::OsRng;
use serde::{Deserialize, Serialize};
@@ -90,12 +85,12 @@ pub enum NodeModeV3 {
ExitGateway,
}
impl From<NodeModeV3> for NodeMode {
impl From<NodeModeV3> for NodeModeV4 {
fn from(config: NodeModeV3) -> Self {
match config {
NodeModeV3::Mixnode => NodeMode::Mixnode,
NodeModeV3::EntryGateway => NodeMode::EntryGateway,
NodeModeV3::ExitGateway => NodeMode::ExitGateway,
NodeModeV3::Mixnode => NodeModeV4::Mixnode,
NodeModeV3::EntryGateway => NodeModeV4::EntryGateway,
NodeModeV3::ExitGateway => NodeModeV4::ExitGateway,
}
}
}
@@ -601,23 +596,6 @@ impl AuthenticatorPathsV3 {
}
}
pub fn to_common_client_paths(&self) -> CommonClientPaths {
CommonClientPaths {
keys: ClientKeysPaths {
private_identity_key_file: self.private_ed25519_identity_key_file.clone(),
public_identity_key_file: self.public_ed25519_identity_key_file.clone(),
private_encryption_key_file: self.private_x25519_diffie_hellman_key_file.clone(),
public_encryption_key_file: self.public_x25519_diffie_hellman_key_file.clone(),
ack_key_file: self.ack_key_file.clone(),
},
gateway_registrations: self.gateway_registrations.clone(),
// not needed for embedded providers
credentials_database: Default::default(),
reply_surb_database: self.reply_surb_database.clone(),
}
}
pub fn ed25519_identity_storage_paths(&self) -> nym_pemstore::KeyPairPath {
nym_pemstore::KeyPairPath::new(
&self.private_ed25519_identity_key_file,
@@ -963,7 +941,7 @@ pub async fn initialise(
pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
path: P,
prev_config: Option<ConfigV3>,
) -> Result<Config, NymNodeError> {
) -> Result<ConfigV4, NymNodeError> {
tracing::debug!("Updating from 1.1.4");
let old_cfg = if let Some(prev_config) = prev_config {
prev_config
@@ -981,21 +959,21 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
.ok_or(NymNodeError::DataDirDerivationFailure)?,
);
let cfg = Config {
let cfg = ConfigV4 {
save_path: old_cfg.save_path,
id: old_cfg.id,
mode: old_cfg.mode.into(),
host: Host {
host: HostV4 {
public_ips: old_cfg.host.public_ips,
hostname: old_cfg.host.hostname,
location: old_cfg.host.location,
},
mixnet: Mixnet {
mixnet: MixnetV4 {
bind_address: old_cfg.mixnet.bind_address,
announce_port: None,
nym_api_urls: old_cfg.mixnet.nym_api_urls,
nyxd_urls: old_cfg.mixnet.nyxd_urls,
debug: MixnetDebug {
debug: MixnetDebugV4 {
packet_forwarding_initial_backoff: old_cfg
.mixnet
.debug
@@ -1009,8 +987,8 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
unsafe_disable_noise: old_cfg.mixnet.debug.unsafe_disable_noise,
},
},
storage_paths: NymNodePaths {
keys: KeysPaths {
storage_paths: NymNodePathsV4 {
keys: KeysPathsV4 {
private_ed25519_identity_key_file: old_cfg
.storage_paths
.keys
@@ -1038,7 +1016,7 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
},
description: old_cfg.storage_paths.description,
},
http: Http {
http: HttpV4 {
bind_address: old_cfg.http.bind_address,
landing_page_assets_path: old_cfg.http.landing_page_assets_path,
access_token: old_cfg.http.access_token,
@@ -1046,13 +1024,13 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
expose_system_hardware: old_cfg.http.expose_system_hardware,
expose_crypto_hardware: old_cfg.http.expose_crypto_hardware,
},
wireguard: Wireguard {
wireguard: WireguardV4 {
enabled: old_cfg.wireguard.enabled,
bind_address: old_cfg.wireguard.bind_address,
private_ip: old_cfg.wireguard.private_ip,
announced_port: old_cfg.wireguard.announced_port,
private_network_prefix: old_cfg.wireguard.private_network_prefix,
storage_paths: WireguardPaths {
storage_paths: WireguardPathsV4 {
private_diffie_hellman_key_file: old_cfg
.wireguard
.storage_paths
@@ -1063,12 +1041,12 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
.public_diffie_hellman_key_file,
},
},
mixnode: MixnodeConfig {
storage_paths: MixnodePaths {},
verloc: Verloc {
mixnode: MixnodeConfigV4 {
storage_paths: MixnodePathsV4 {},
verloc: VerlocV4 {
bind_address: old_cfg.mixnode.verloc.bind_address,
announce_port: None,
debug: VerlocDebug {
debug: VerlocDebugV4 {
packets_per_node: old_cfg.mixnode.verloc.debug.packets_per_node,
connection_timeout: old_cfg.mixnode.verloc.debug.connection_timeout,
packet_timeout: old_cfg.mixnode.verloc.debug.packet_timeout,
@@ -1078,16 +1056,16 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
retry_timeout: old_cfg.mixnode.verloc.debug.retry_timeout,
},
},
debug: mixnode::Debug {
debug: DebugV4 {
node_stats_logging_delay: old_cfg.mixnode.debug.node_stats_logging_delay,
node_stats_updating_delay: old_cfg.mixnode.debug.node_stats_updating_delay,
},
},
entry_gateway: EntryGatewayConfig {
storage_paths: EntryGatewayPaths {
entry_gateway: EntryGatewayConfigV4 {
storage_paths: EntryGatewayPathsV4 {
clients_storage: old_cfg.entry_gateway.storage_paths.clients_storage,
cosmos_mnemonic: old_cfg.entry_gateway.storage_paths.cosmos_mnemonic,
authenticator: AuthenticatorPaths {
authenticator: AuthenticatorPathsV4 {
private_ed25519_identity_key_file: old_cfg
.entry_gateway
.storage_paths
@@ -1129,16 +1107,16 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
bind_address: old_cfg.entry_gateway.bind_address,
announce_ws_port: old_cfg.entry_gateway.announce_ws_port,
announce_wss_port: old_cfg.entry_gateway.announce_wss_port,
debug: EntryGatewayConfigDebug {
debug: EntryGatewayConfigDebugV4 {
message_retrieval_limit: old_cfg.entry_gateway.debug.message_retrieval_limit,
// \/ ADDED
zk_nym_tickets: Default::default(),
},
},
exit_gateway: ExitGatewayConfig {
storage_paths: ExitGatewayPaths {
exit_gateway: ExitGatewayConfigV4 {
storage_paths: ExitGatewayPathsV4 {
clients_storage: exit_gateway_paths.clients_storage,
network_requester: NetworkRequesterPaths {
network_requester: NetworkRequesterPathsV4 {
private_ed25519_identity_key_file: old_cfg
.exit_gateway
.storage_paths
@@ -1175,7 +1153,7 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
.network_requester
.gateway_registrations,
},
ip_packet_router: IpPacketRouterPaths {
ip_packet_router: IpPacketRouterPathsV4 {
private_ed25519_identity_key_file: old_cfg
.exit_gateway
.storage_paths
@@ -1212,7 +1190,7 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
.ip_packet_router
.gateway_registrations,
},
authenticator: AuthenticatorPaths {
authenticator: AuthenticatorPathsV4 {
private_ed25519_identity_key_file: old_cfg
.exit_gateway
.storage_paths
@@ -1252,8 +1230,8 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
},
open_proxy: old_cfg.exit_gateway.open_proxy,
upstream_exit_policy_url: old_cfg.exit_gateway.upstream_exit_policy_url,
network_requester: NetworkRequester {
debug: NetworkRequesterDebug {
network_requester: NetworkRequesterV4 {
debug: NetworkRequesterDebugV4 {
enabled: old_cfg.exit_gateway.network_requester.debug.enabled,
disable_poisson_rate: old_cfg
.exit_gateway
@@ -1263,8 +1241,8 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
client_debug: old_cfg.exit_gateway.network_requester.debug.client_debug,
},
},
ip_packet_router: IpPacketRouter {
debug: IpPacketRouterDebug {
ip_packet_router: IpPacketRouterV4 {
debug: IpPacketRouterDebugV4 {
enabled: old_cfg.exit_gateway.ip_packet_router.debug.enabled,
disable_poisson_rate: old_cfg
.exit_gateway
@@ -1277,7 +1255,7 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
debug: Default::default(),
},
authenticator: Default::default(),
logging: LoggingSettings {},
logging: LoggingSettingsV4 {},
};
Ok(cfg)
File diff suppressed because it is too large Load Diff
+9
View File
@@ -24,6 +24,7 @@ pub const DEFAULT_NYMNODE_DESCRIPTION_FILENAME: &str = "description.toml";
// Entry Gateway:
pub const DEFAULT_CLIENTS_STORAGE_FILENAME: &str = "clients.sqlite";
pub const DEFAULT_STATS_STORAGE_FILENAME: &str = "stats.sqlite";
pub const DEFAULT_MNEMONIC_FILENAME: &str = "cosmos_mnemonic";
// Exit Gateway:
@@ -147,6 +148,9 @@ pub struct EntryGatewayPaths {
/// derived shared keys, available client bandwidths and wireguard peers.
pub clients_storage: PathBuf,
/// Path to sqlite database containing all persistent stats data.
pub stats_storage: PathBuf,
/// Path to file containing cosmos account mnemonic used for zk-nym redemption.
pub cosmos_mnemonic: PathBuf,
@@ -157,6 +161,7 @@ impl EntryGatewayPaths {
pub fn new<P: AsRef<Path>>(data_dir: P) -> Self {
EntryGatewayPaths {
clients_storage: data_dir.as_ref().join(DEFAULT_CLIENTS_STORAGE_FILENAME),
stats_storage: data_dir.as_ref().join(DEFAULT_STATS_STORAGE_FILENAME),
cosmos_mnemonic: data_dir.as_ref().join(DEFAULT_MNEMONIC_FILENAME),
authenticator: AuthenticatorPaths::new(data_dir),
}
@@ -207,6 +212,9 @@ pub struct ExitGatewayPaths {
/// derived shared keys, available client bandwidths and wireguard peers.
pub clients_storage: PathBuf,
/// Path to sqlite database containing all persistent stats data.
pub stats_storage: PathBuf,
pub network_requester: NetworkRequesterPaths,
pub ip_packet_router: IpPacketRouterPaths,
@@ -459,6 +467,7 @@ impl ExitGatewayPaths {
let data_dir = data_dir.as_ref();
ExitGatewayPaths {
clients_storage: data_dir.join(DEFAULT_CLIENTS_STORAGE_FILENAME),
stats_storage: data_dir.join(DEFAULT_STATS_STORAGE_FILENAME),
network_requester: NetworkRequesterPaths::new(data_dir),
ip_packet_router: IpPacketRouterPaths::new(data_dir),
authenticator: AuthenticatorPaths::new(data_dir),
+7
View File
@@ -185,6 +185,9 @@ announce_wss_port = {{#if entry_gateway.announce_wss_port }} {{ entry_gateway.an
# derived shared keys, available client bandwidths and wireguard peers.
clients_storage = '{{ entry_gateway.storage_paths.clients_storage }}'
# Path to sqlite database containing all persistent stats data.
stats_storage = '{{ entry_gateway.storage_paths.stats_storage }}'
# Path to file containing cosmos account mnemonic used for zk-nym redemption.
cosmos_mnemonic = '{{ entry_gateway.storage_paths.cosmos_mnemonic }}'
@@ -237,6 +240,10 @@ upstream_exit_policy_url = '{{ exit_gateway.upstream_exit_policy_url }}'
# derived shared keys, available client bandwidths and wireguard peers.
clients_storage = '{{ exit_gateway.storage_paths.clients_storage }}'
# Path to sqlite database containing all persistent stats data.
stats_storage = '{{ exit_gateway.storage_paths.stats_storage }}'
[exit_gateway.storage_paths.network_requester]
# Path to file containing network requester ed25519 identity private key.
private_ed25519_identity_key_file = '{{ exit_gateway.storage_paths.network_requester.private_ed25519_identity_key_file }}'
+2 -1
View File
@@ -10,7 +10,8 @@ use std::path::Path;
async fn try_upgrade_config(path: &Path) -> Result<(), NymNodeError> {
let cfg = try_upgrade_config_v1(path, None).await.ok();
let cfg = try_upgrade_config_v2(path, cfg).await.ok();
match try_upgrade_config_v3(path, cfg).await {
let cfg = try_upgrade_config_v3(path, cfg).await.ok();
match try_upgrade_config_v4(path, cfg).await {
Ok(cfg) => cfg.save(),
Err(e) => {
tracing::error!("Failed to finish upgrade - {e}");
+15
View File
@@ -67,6 +67,7 @@ impl MixnodeData {
pub struct EntryGatewayData {
mnemonic: Zeroizing<bip39::Mnemonic>,
client_storage: nym_gateway::node::PersistentStorage,
stats_storage: nym_gateway::node::PersistentStatsStorage,
sessions_stats: SharedSessionStats,
}
@@ -94,6 +95,11 @@ impl EntryGatewayData {
)
.await
.map_err(nym_gateway::GatewayError::from)?,
stats_storage: nym_gateway::node::PersistentStatsStorage::init(
&config.storage_paths.stats_storage,
)
.await
.map_err(nym_gateway::GatewayError::from)?,
sessions_stats: SharedSessionStats::new(),
})
}
@@ -114,6 +120,7 @@ pub struct ExitGatewayData {
auth_x25519: x25519::PublicKey,
client_storage: nym_gateway::node::PersistentStorage,
stats_storage: nym_gateway::node::PersistentStatsStorage,
}
impl ExitGatewayData {
@@ -262,6 +269,11 @@ impl ExitGatewayData {
.await
.map_err(nym_gateway::GatewayError::from)?;
let stats_storage =
nym_gateway::node::PersistentStatsStorage::init(&config.storage_paths.stats_storage)
.await
.map_err(nym_gateway::GatewayError::from)?;
Ok(ExitGatewayData {
nr_ed25519,
nr_x25519,
@@ -270,6 +282,7 @@ impl ExitGatewayData {
auth_ed25519,
auth_x25519,
client_storage,
stats_storage,
})
}
}
@@ -580,6 +593,7 @@ impl NymNode {
self.ed25519_identity_keys.clone(),
self.x25519_sphinx_keys.clone(),
self.entry_gateway.client_storage.clone(),
self.entry_gateway.stats_storage.clone(),
);
entry_gateway.disable_http_server();
entry_gateway.set_task_client(task_client);
@@ -610,6 +624,7 @@ impl NymNode {
self.ed25519_identity_keys.clone(),
self.x25519_sphinx_keys.clone(),
self.exit_gateway.client_storage.clone(),
self.exit_gateway.stats_storage.clone(),
);
exit_gateway.disable_http_server();
exit_gateway.set_task_client(task_client);
+10 -1
View File
@@ -107,6 +107,7 @@ impl Config {
nyxd_scraper: NyxdScraper {
websocket_url,
pruning: Default::default(),
store_precommits: true,
},
base: Base {
upstream_nyxd: nyxd_url,
@@ -122,6 +123,7 @@ impl Config {
rpc_url: self.base.upstream_nyxd.clone(),
database_path: self.storage_paths.nyxd_scraper.clone(),
pruning_options: self.nyxd_scraper.pruning,
store_precommits: self.nyxd_scraper.store_precommits,
}
}
@@ -249,7 +251,14 @@ pub struct NyxdScraper {
// if the value is missing, use `nothing` pruning as this was the past behaviour
#[serde(default = "PruningOptions::nothing")]
pub pruning: PruningOptions,
// TODO: debug with everything that's currently hardcoded in the scraper
/// Specifies whether to store pre-commits within the database.
#[serde(default = "default_store_precommits")]
pub store_precommits: bool,
}
fn default_store_precommits() -> bool {
true
}
impl NyxdScraper {
@@ -19,6 +19,7 @@ pub mod ecash;
mod init;
mod list_gateways;
mod peer_handler;
mod request;
mod run;
mod sign;
mod switch_gateway;
@@ -69,6 +70,9 @@ pub(crate) enum Commands {
/// parameters.
Run(run::Run),
/// Make a dummy request to a running authenticator
Request(request::Request),
/// Ecash-related functionalities
Ecash(Ecash),
@@ -127,6 +131,7 @@ pub(crate) async fn execute(args: Cli) -> Result<(), AuthenticatorError> {
match args.command {
Commands::Init(m) => init::execute(m).await?,
Commands::Run(m) => run::execute(&m).await?,
Commands::Request(r) => request::execute(&r).await?,
Commands::Ecash(ecash) => ecash.execute().await?,
Commands::ListGateways(args) => list_gateways::execute(args).await?,
Commands::AddGateway(args) => add_gateway::execute(args).await?,
@@ -0,0 +1,142 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::AuthenticatorError;
use crate::cli::{override_config, OverrideConfig};
use crate::cli::{try_load_current_config, version_check};
use clap::{Args, Subcommand};
use nym_authenticator_requests::latest::{
registration::{ClientMac, FinalMessage, GatewayClient, InitMessage},
request::{AuthenticatorRequest, AuthenticatorRequestData},
};
use nym_client_core::cli_helpers::client_run::CommonClientRunArgs;
use nym_sdk::mixnet::{MixnetMessageSender, Recipient, TransmissionLane};
use nym_task::TaskHandle;
use nym_wireguard_types::PeerPublicKey;
use std::net::IpAddr;
use std::str::FromStr;
use std::time::Duration;
use tokio::time::sleep;
#[allow(clippy::struct_excessive_bools)]
#[derive(Args, Clone)]
pub(crate) struct Request {
#[command(flatten)]
common_args: CommonClientRunArgs,
#[command(subcommand)]
request: RequestType,
authenticator_recipient: String,
}
impl From<Request> for OverrideConfig {
fn from(request_config: Request) -> Self {
OverrideConfig {
nym_apis: None,
nyxd_urls: request_config.common_args.nyxd_urls,
enabled_credentials_mode: request_config.common_args.enabled_credentials_mode,
}
}
}
#[derive(Clone, Subcommand)]
pub(crate) enum RequestType {
Initial(Initial),
Final(Final),
QueryBandwidth(QueryBandwidth),
}
#[derive(Args, Clone, Debug)]
pub(crate) struct Initial {
pub_key: String,
}
#[derive(Args, Clone, Debug)]
pub(crate) struct Final {
pub_key: String,
private_ip: String,
mac: String,
}
#[derive(Args, Clone, Debug)]
pub(crate) struct QueryBandwidth {
pub_key: String,
}
impl TryFrom<RequestType> for AuthenticatorRequestData {
type Error = AuthenticatorError;
fn try_from(value: RequestType) -> Result<Self, Self::Error> {
let ret = match value {
RequestType::Initial(req) => AuthenticatorRequestData::Initial(InitMessage::new(
PeerPublicKey::from_str(&req.pub_key)?,
)),
RequestType::Final(req) => AuthenticatorRequestData::Final(Box::new(FinalMessage {
gateway_client: GatewayClient {
pub_key: PeerPublicKey::from_str(&req.pub_key)?,
private_ip: IpAddr::from_str(&req.private_ip)?,
mac: ClientMac::from_str(&req.mac)?,
},
credential: None,
})),
RequestType::QueryBandwidth(req) => {
AuthenticatorRequestData::QueryBandwidth(PeerPublicKey::from_str(&req.pub_key)?)
}
};
Ok(ret)
}
}
pub(crate) async fn execute(args: &Request) -> Result<(), AuthenticatorError> {
let mut config = try_load_current_config(&args.common_args.id).await?;
config = override_config(config, OverrideConfig::from(args.clone()));
if !version_check(&config) {
log::error!("failed the local version check");
return Err(AuthenticatorError::FailedLocalVersionCheck);
}
let shutdown = TaskHandle::default();
let mixnet_client = nym_authenticator::mixnet_client::create_mixnet_client(
&config.base,
shutdown.get_handle().named("nym_sdk::MixnetClient"),
None,
None,
false,
&config.storage_paths.common_paths,
)
.await?;
let request_data = AuthenticatorRequestData::try_from(args.request.clone())?;
let authenticator_recipient = Recipient::from_str(&args.authenticator_recipient)?;
let (request, _) = match request_data {
AuthenticatorRequestData::Initial(init_message) => {
AuthenticatorRequest::new_initial_request(init_message, *mixnet_client.nym_address())
}
AuthenticatorRequestData::Final(final_message) => {
AuthenticatorRequest::new_final_request(*final_message, *mixnet_client.nym_address())
}
AuthenticatorRequestData::QueryBandwidth(query_message) => {
AuthenticatorRequest::new_query_request(query_message, *mixnet_client.nym_address())
}
AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
AuthenticatorRequest::new_topup_request(*top_up_message, *mixnet_client.nym_address())
}
};
mixnet_client
.split_sender()
.send(nym_sdk::mixnet::InputMessage::new_regular(
authenticator_recipient,
request.to_bytes().unwrap(),
TransmissionLane::General,
None,
))
.await
.map_err(|source| AuthenticatorError::FailedToSendPacketToMixnet { source })?;
log::info!("Sent request, sleeping 60 seconds or until killed");
sleep(Duration::from_secs(60)).await;
Ok(())
}
@@ -88,6 +88,18 @@ pub enum AuthenticatorError {
#[error("storage should have the requested bandwidht entry")]
MissingClientBandwidthEntry,
#[error("{0}")]
PublicKey(#[from] nym_wireguard_types::Error),
#[error("{0}")]
IpAddr(#[from] std::net::AddrParseError),
#[error("{0}")]
AuthenticatorRequests(#[from] nym_authenticator_requests::Error),
#[error("{0}")]
RecipientFormatting(#[from] nym_sdk::mixnet::RecipientFormattingError),
}
pub type Result<T> = std::result::Result<T, AuthenticatorError>;
@@ -11,7 +11,7 @@ use crate::{config::BaseClientConfig, error::AuthenticatorError};
// This is NOT in the SDK since we don't want to expose any of the client-core config types.
// We could however consider moving it to a crate in common in the future.
// TODO: refactor this function and its arguments
pub(crate) async fn create_mixnet_client(
pub async fn create_mixnet_client(
config: &BaseClientConfig,
shutdown: TaskClient,
custom_transceiver: Option<Box<dyn GatewayTransceiver + Send + Sync>>,