Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1bdcf9c3cf | |||
| 4ebb9cd239 | |||
| 620d68ea2f | |||
| b747308f74 | |||
| afdd721cc3 | |||
| 9f5c4c5968 | |||
| 9583a5c6c8 | |||
| da60fc0ade | |||
| 96b54c455e | |||
| cc983963d4 | |||
| 40d9321aec | |||
| e5a29cc76e | |||
| 56c55f6b95 | |||
| 2f051fd943 | |||
| c03cf86000 | |||
| ab11508235 | |||
| e65bfaeb31 |
@@ -57,6 +57,12 @@ jobs:
|
||||
command: fmt
|
||||
args: --all -- --check
|
||||
|
||||
- name: Clippy
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: clippy
|
||||
args: --workspace --all-targets -- -D warnings
|
||||
|
||||
- name: Build all binaries
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
@@ -82,9 +88,3 @@ jobs:
|
||||
with:
|
||||
command: test
|
||||
args: --workspace -- --ignored
|
||||
|
||||
- name: Clippy
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: clippy
|
||||
args: --workspace --all-targets -- -D warnings
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
name: Build and upload Data observatory container to harbor.nymte.ch
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
env:
|
||||
WORKING_DIRECTORY: "nym-data-observatory"
|
||||
CONTAINER_NAME: "data-observatory"
|
||||
|
||||
jobs:
|
||||
build-container:
|
||||
runs-on: arc-ubuntu-22.04-dind
|
||||
steps:
|
||||
- name: Login to Harbor
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: harbor.nymte.ch
|
||||
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
|
||||
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
|
||||
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Configure git identity
|
||||
run: |
|
||||
git config --global user.email "lawrence@nymtech.net"
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.3
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
- name: Check if tag exists
|
||||
run: |
|
||||
if git rev-parse ${{ steps.get_version.outputs.value }} >/dev/null 2>&1; then
|
||||
echo "Tag ${{ steps.get_version.outputs.value }} already exists"
|
||||
fi
|
||||
|
||||
- name: Remove existing tag if exists
|
||||
run: |
|
||||
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
|
||||
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
fi
|
||||
|
||||
- name: Create tag
|
||||
run: |
|
||||
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
|
||||
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
|
||||
- name: BuildAndPushImageOnHarbor
|
||||
run: |
|
||||
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
|
||||
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
|
||||
@@ -0,0 +1,56 @@
|
||||
name: Build and upload Node Status agent container to harbor.nymte.ch
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
env:
|
||||
WORKING_DIRECTORY: "nym-node-status-agent"
|
||||
CONTAINER_NAME: "node-status-agent"
|
||||
|
||||
jobs:
|
||||
build-container:
|
||||
runs-on: arc-ubuntu-22.04-dind
|
||||
steps:
|
||||
- name: Login to Harbor
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: harbor.nymte.ch
|
||||
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
|
||||
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
|
||||
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Configure git identity
|
||||
run: |
|
||||
git config --global user.email "lawrence@nymtech.net"
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.3
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
- name: Check if tag exists
|
||||
run: |
|
||||
if git rev-parse ${{ steps.get_version.outputs.value }} >/dev/null 2>&1; then
|
||||
echo "Tag ${{ steps.get_version.outputs.value }} already exists"
|
||||
fi
|
||||
|
||||
- name: Remove existing tag if exists
|
||||
run: |
|
||||
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
|
||||
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
fi
|
||||
|
||||
- name: Create tag
|
||||
run: |
|
||||
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
|
||||
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
|
||||
- name: BuildAndPushImageOnHarbor
|
||||
run: |
|
||||
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
|
||||
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
|
||||
@@ -1,11 +1,55 @@
|
||||
name: Build and upload Node Status API container to harbor.nymte.ch
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
env:
|
||||
WORKING_DIRECTORY: "nym-node-status-api"
|
||||
CONTAINER_NAME: "node-status-api"
|
||||
|
||||
jobs:
|
||||
my-job:
|
||||
runs-on: arc-ubuntu-22.04
|
||||
build-container:
|
||||
runs-on: arc-ubuntu-22.04-dind
|
||||
steps:
|
||||
- name: my-step
|
||||
run: echo "Hello World!"
|
||||
- name: Login to Harbor
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: harbor.nymte.ch
|
||||
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
|
||||
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
|
||||
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Configure git identity
|
||||
run: |
|
||||
git config --global user.email "lawrence@nymtech.net"
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.3
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
- name: Check if tag exists
|
||||
run: |
|
||||
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
|
||||
echo "Tag ${{ steps.get_version.outputs.result }} already exists"
|
||||
fi
|
||||
|
||||
- name: Remove existing tag if exists
|
||||
run: |
|
||||
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
|
||||
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
fi
|
||||
|
||||
- name: Create tag
|
||||
run: |
|
||||
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
|
||||
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
|
||||
- name: BuildAndPushImageOnHarbor
|
||||
run: |
|
||||
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
|
||||
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
|
||||
|
||||
Generated
+950
-320
File diff suppressed because it is too large
Load Diff
+13
@@ -54,12 +54,14 @@ members = [
|
||||
"common/exit-policy",
|
||||
"common/gateway-requests",
|
||||
"common/gateway-storage",
|
||||
"common/gateway-stats-storage",
|
||||
"common/http-api-client",
|
||||
"common/http-api-common",
|
||||
"common/inclusion-probability",
|
||||
"common/ip-packet-requests",
|
||||
"common/ledger",
|
||||
"common/mixnode-common",
|
||||
"common/models",
|
||||
"common/network-defaults",
|
||||
"common/node-tester-utils",
|
||||
"common/nonexhaustive-delayqueue",
|
||||
@@ -119,6 +121,8 @@ members = [
|
||||
"nym-node",
|
||||
"nym-node/nym-node-http-api",
|
||||
"nym-node/nym-node-requests",
|
||||
"nym-node-status-api",
|
||||
"nym-node-status-agent",
|
||||
"nym-outfox",
|
||||
"nym-validator-rewarder",
|
||||
"tools/echo-server",
|
||||
@@ -146,13 +150,16 @@ members = [
|
||||
default-members = [
|
||||
"clients/native",
|
||||
"clients/socks5",
|
||||
"common/models",
|
||||
"explorer-api",
|
||||
"gateway",
|
||||
"mixnode",
|
||||
"nym-api",
|
||||
"nym-data-observatory",
|
||||
"nym-node",
|
||||
"nym-node-status-api",
|
||||
"nym-validator-rewarder",
|
||||
"nym-node-status-api",
|
||||
"service-providers/authenticator",
|
||||
"service-providers/ip-packet-router",
|
||||
"service-providers/network-requester",
|
||||
@@ -233,10 +240,12 @@ dotenvy = "0.15.6"
|
||||
ecdsa = "0.16"
|
||||
ed25519-dalek = "2.1"
|
||||
etherparse = "0.13.0"
|
||||
envy = "0.4"
|
||||
eyre = "0.6.9"
|
||||
fastrand = "2.1.1"
|
||||
flate2 = "1.0.34"
|
||||
futures = "0.3.28"
|
||||
futures-util = "0.3"
|
||||
generic-array = "0.14.7"
|
||||
getrandom = "0.2.10"
|
||||
getset = "0.1.3"
|
||||
@@ -266,6 +275,7 @@ ledger-transport-hid = "0.10.0"
|
||||
log = "0.4"
|
||||
maxminddb = "0.23.0"
|
||||
mime = "0.3.17"
|
||||
moka = { version = "0.12", features = ["future"] }
|
||||
nix = "0.27.1"
|
||||
notify = "5.1.0"
|
||||
okapi = "0.7.0"
|
||||
@@ -299,6 +309,7 @@ serde = "1.0.211"
|
||||
serde_bytes = "0.11.15"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0.132"
|
||||
serde_json_path = "0.6.7"
|
||||
serde_repr = "0.1"
|
||||
serde_with = "3.9.0"
|
||||
serde_yaml = "0.9.25"
|
||||
@@ -307,6 +318,7 @@ si-scale = "0.2.3"
|
||||
sphinx-packet = "0.1.1"
|
||||
sqlx = "0.7.4"
|
||||
strum = "0.26"
|
||||
strum_macros = "0.26"
|
||||
subtle-encoding = "0.5"
|
||||
syn = "1"
|
||||
sysinfo = "0.30.13"
|
||||
@@ -328,6 +340,7 @@ tracing = "0.1.37"
|
||||
tracing-opentelemetry = "0.19.0"
|
||||
tracing-subscriber = "0.3.16"
|
||||
tracing-tree = "0.2.2"
|
||||
tracing-log = "0.2"
|
||||
ts-rs = "10.0.0"
|
||||
tungstenite = { version = "0.20.1", default-features = false }
|
||||
url = "2.5"
|
||||
|
||||
@@ -45,3 +45,4 @@ tracing = [
|
||||
"opentelemetry",
|
||||
]
|
||||
clap = [ "dep:clap", "dep:clap_complete", "dep:clap_complete_fig" ]
|
||||
models = []
|
||||
|
||||
@@ -25,7 +25,7 @@ serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
nym-http-api-client = { path = "../../../common/http-api-client" }
|
||||
thiserror = { workspace = true }
|
||||
log = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
url = { workspace = true, features = ["serde"] }
|
||||
tokio = { workspace = true, features = ["sync", "time"] }
|
||||
time = { workspace = true, features = ["formatting"] }
|
||||
|
||||
@@ -265,6 +265,13 @@ impl NymApiClient {
|
||||
NymApiClient { nym_api }
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub fn new_with_timeout(api_url: Url, timeout: std::time::Duration) -> Self {
|
||||
let nym_api = nym_api::Client::new(api_url, Some(timeout));
|
||||
|
||||
NymApiClient { nym_api }
|
||||
}
|
||||
|
||||
pub fn new_with_user_agent(api_url: Url, user_agent: UserAgent) -> Self {
|
||||
let nym_api = nym_api::Client::builder::<_, ValidatorClientError>(api_url)
|
||||
.expect("invalid api url")
|
||||
|
||||
@@ -121,36 +121,36 @@ async fn test_nyxd_connection(
|
||||
{
|
||||
Ok(Err(NyxdError::TendermintErrorRpc(e))) => {
|
||||
// If we get a tendermint-rpc error, we classify the node as not contactable
|
||||
log::warn!("Checking: nyxd url: {url}: {}: {}", "failed".red(), e);
|
||||
tracing::warn!("Checking: nyxd url: {url}: {}: {}", "failed".red(), e);
|
||||
false
|
||||
}
|
||||
Ok(Err(NyxdError::AbciError { code, log, .. })) => {
|
||||
// We accept the mixnet contract not found as ok from a connection standpoint. This happens
|
||||
// for example on a pre-launch network.
|
||||
log::debug!(
|
||||
tracing::debug!(
|
||||
"Checking: nyxd url: {url}: {}, but with abci error: {code}: {log}",
|
||||
"success".green()
|
||||
);
|
||||
code == 18
|
||||
}
|
||||
Ok(Err(error @ NyxdError::NoContractAddressAvailable(_))) => {
|
||||
log::warn!("Checking: nyxd url: {url}: {}: {error}", "failed".red());
|
||||
tracing::warn!("Checking: nyxd url: {url}: {}: {error}", "failed".red());
|
||||
false
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
// For any other error, we're optimistic and just try anyway.
|
||||
log::warn!(
|
||||
tracing::warn!(
|
||||
"Checking: nyxd_url: {url}: {}, but with error: {e}",
|
||||
"success".green()
|
||||
);
|
||||
true
|
||||
}
|
||||
Ok(Ok(_)) => {
|
||||
log::debug!("Checking: nyxd_url: {url}: {}", "success".green());
|
||||
tracing::debug!("Checking: nyxd_url: {url}: {}", "success".green());
|
||||
true
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Checking: nyxd_url: {url}: {}: {e}", "failed".red());
|
||||
tracing::warn!("Checking: nyxd_url: {url}: {}: {e}", "failed".red());
|
||||
false
|
||||
}
|
||||
};
|
||||
@@ -169,15 +169,15 @@ async fn test_nym_api_connection(
|
||||
.await
|
||||
{
|
||||
Ok(Ok(_)) => {
|
||||
log::debug!("Checking: api_url: {url}: {}", "success".green());
|
||||
tracing::debug!("Checking: api_url: {url}: {}", "success".green());
|
||||
true
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
log::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
|
||||
tracing::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
|
||||
false
|
||||
}
|
||||
Err(e) => {
|
||||
log::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
|
||||
tracing::debug!("Checking: api_url: {url}: {}: {e}", "failed".red());
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
@@ -41,6 +41,7 @@ use nym_mixnet_contract_common::mixnode::MixNodeDetails;
|
||||
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, NodeId};
|
||||
use time::format_description::BorrowedFormatItem;
|
||||
use time::Date;
|
||||
use tracing::instrument;
|
||||
|
||||
pub mod error;
|
||||
pub mod routes;
|
||||
@@ -52,11 +53,13 @@ pub fn rfc_3339_date() -> Vec<BorrowedFormatItem<'static>> {
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
|
||||
pub trait NymApiClientExt: ApiClient {
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
|
||||
self.get_json(&[routes::API_VERSION, routes::MIXNODES], NO_PARAMS)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
@@ -70,6 +73,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateways_detailed(&self) -> Result<Vec<GatewayBondAnnotated>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
@@ -83,6 +87,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes_detailed_unfiltered(
|
||||
&self,
|
||||
) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
|
||||
@@ -98,11 +103,13 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateways(&self) -> Result<Vec<GatewayBond>, NymAPIError> {
|
||||
self.get_json(&[routes::API_VERSION, routes::GATEWAYS], NO_PARAMS)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateways_described(&self) -> Result<Vec<LegacyDescribedGateway>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::GATEWAYS, routes::DESCRIBED],
|
||||
@@ -111,6 +118,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes_described(&self) -> Result<Vec<LegacyDescribedMixNode>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::MIXNODES, routes::DESCRIBED],
|
||||
@@ -119,6 +127,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn get_basic_mixnodes(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
@@ -142,6 +151,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_basic_gateways(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
@@ -167,6 +177,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
|
||||
/// retrieve basic information for nodes are capable of operating as an entry gateway
|
||||
/// this includes legacy gateways and nym-nodes
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_all_basic_entry_assigned_nodes(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
@@ -208,6 +219,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
|
||||
/// retrieve basic information for nodes that got assigned 'mixing' node in this epoch
|
||||
/// this includes legacy mixnodes and nym-nodes
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_basic_active_mixing_assigned_nodes(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
@@ -247,6 +259,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_active_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::MIXNODES, routes::ACTIVE],
|
||||
@@ -255,6 +268,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_active_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
@@ -269,6 +283,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_rewarded_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::MIXNODES, routes::REWARDED],
|
||||
@@ -277,6 +292,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_report(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -294,6 +310,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateway_report(
|
||||
&self,
|
||||
identity: IdentityKeyRef<'_>,
|
||||
@@ -311,6 +328,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_history(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -328,6 +346,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateway_history(
|
||||
&self,
|
||||
identity: IdentityKeyRef<'_>,
|
||||
@@ -345,6 +364,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_rewarded_mixnodes_detailed(
|
||||
&self,
|
||||
) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
|
||||
@@ -361,6 +381,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateway_core_status_count(
|
||||
&self,
|
||||
identity: IdentityKeyRef<'_>,
|
||||
@@ -392,6 +413,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_core_status_count(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -424,6 +446,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_status(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -441,6 +464,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_reward_estimation(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -458,6 +482,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn compute_mixnode_reward_estimation(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -477,6 +502,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_stake_saturation(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -494,6 +520,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_inclusion_probability(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -511,6 +538,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_current_node_performance(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
@@ -541,6 +569,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes_blacklisted(&self) -> Result<Vec<NodeId>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::MIXNODES, routes::BLACKLISTED],
|
||||
@@ -549,6 +578,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateways_blacklisted(&self) -> Result<Vec<IdentityKey>, NymAPIError> {
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, routes::GATEWAYS, routes::BLACKLISTED],
|
||||
@@ -557,6 +587,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self, request_body))]
|
||||
async fn blind_sign(
|
||||
&self,
|
||||
request_body: &BlindSignRequestBody,
|
||||
@@ -573,6 +604,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self, request_body))]
|
||||
async fn verify_ecash_ticket(
|
||||
&self,
|
||||
request_body: &VerifyEcashTicketBody,
|
||||
@@ -589,6 +621,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self, request_body))]
|
||||
async fn batch_redeem_ecash_tickets(
|
||||
&self,
|
||||
request_body: &BatchRedeemTicketsBody,
|
||||
@@ -605,6 +638,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn double_spending_filter_v1(&self) -> Result<SpentCredentialsResponse, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
@@ -617,6 +651,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn partial_expiration_date_signatures(
|
||||
&self,
|
||||
expiration_date: Option<Date>,
|
||||
@@ -640,6 +675,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn partial_coin_indices_signatures(
|
||||
&self,
|
||||
epoch_id: Option<EpochId>,
|
||||
@@ -660,6 +696,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn global_expiration_date_signatures(
|
||||
&self,
|
||||
expiration_date: Option<Date>,
|
||||
@@ -683,6 +720,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn global_coin_indices_signatures(
|
||||
&self,
|
||||
epoch_id: Option<EpochId>,
|
||||
@@ -703,6 +741,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn master_verification_key(
|
||||
&self,
|
||||
epoch_id: Option<EpochId>,
|
||||
@@ -722,6 +761,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn epoch_credentials(
|
||||
&self,
|
||||
dkg_epoch: EpochId,
|
||||
@@ -738,6 +778,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn issued_credential(
|
||||
&self,
|
||||
credential_id: i64,
|
||||
@@ -754,6 +795,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn issued_credentials(
|
||||
&self,
|
||||
credential_ids: Vec<i64>,
|
||||
|
||||
@@ -8,9 +8,9 @@ use crate::nyxd::CosmWasmClient;
|
||||
use async_trait::async_trait;
|
||||
use cosmrs::AccountId;
|
||||
use cosmwasm_std::Addr;
|
||||
use log::trace;
|
||||
use nym_coconut_dkg_common::types::{ChunkIndex, NodeIndex, StateAdvanceResponse};
|
||||
use serde::Deserialize;
|
||||
use tracing::trace;
|
||||
|
||||
use nym_coconut_dkg_common::dealer::RegisteredDealerDetails;
|
||||
pub use nym_coconut_dkg_common::{
|
||||
|
||||
+3
-4
@@ -29,7 +29,6 @@ use cosmrs::proto::cosmwasm::wasm::v1::{
|
||||
};
|
||||
use cosmrs::tendermint::{block, chain, Hash};
|
||||
use cosmrs::{AccountId, Coin as CosmosCoin, Tx};
|
||||
use log::trace;
|
||||
use prost::Message;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -68,7 +67,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
|
||||
Res: Message + Default,
|
||||
{
|
||||
if let Some(ref abci_path) = path {
|
||||
trace!("performing query on abci path {abci_path}")
|
||||
tracing::trace!("performing query on abci path {abci_path}")
|
||||
}
|
||||
let mut buf = Vec::with_capacity(req.encoded_len());
|
||||
req.encode(&mut buf)?;
|
||||
@@ -297,7 +296,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
|
||||
|
||||
let start = Instant::now();
|
||||
loop {
|
||||
log::debug!(
|
||||
tracing::debug!(
|
||||
"Polling for result of including {} in a block...",
|
||||
broadcasted.hash
|
||||
);
|
||||
@@ -522,7 +521,7 @@ pub trait CosmWasmClient: TendermintRpcClient {
|
||||
.make_abci_query::<_, QuerySmartContractStateResponse>(path, req)
|
||||
.await?;
|
||||
|
||||
trace!("raw query response: {}", String::from_utf8_lossy(&res.data));
|
||||
tracing::trace!("raw query response: {}", String::from_utf8_lossy(&res.data));
|
||||
Ok(serde_json::from_slice(&res.data)?)
|
||||
}
|
||||
|
||||
|
||||
+1
-1
@@ -25,12 +25,12 @@ use cosmrs::proto::cosmos::tx::signing::v1beta1::SignMode;
|
||||
use cosmrs::staking::{MsgDelegate, MsgUndelegate};
|
||||
use cosmrs::tx::{self, Msg};
|
||||
use cosmrs::{cosmwasm, AccountId, Any, Tx};
|
||||
use log::debug;
|
||||
use serde::Serialize;
|
||||
use sha2::Digest;
|
||||
use sha2::Sha256;
|
||||
use std::time::SystemTime;
|
||||
use tendermint_rpc::endpoint::broadcast;
|
||||
use tracing::debug;
|
||||
|
||||
fn empty_fee() -> tx::Fee {
|
||||
tx::Fee {
|
||||
|
||||
@@ -7,9 +7,9 @@ use base64::Engine;
|
||||
use cosmrs::abci::TxMsgData;
|
||||
use cosmrs::cosmwasm::MsgExecuteContractResponse;
|
||||
use cosmrs::proto::cosmos::base::query::v1beta1::{PageRequest, PageResponse};
|
||||
use log::error;
|
||||
use prost::bytes::Bytes;
|
||||
use tendermint_rpc::endpoint::broadcast;
|
||||
use tracing::error;
|
||||
|
||||
pub use cosmrs::abci::MsgResponse;
|
||||
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
[package]
|
||||
name = "nym-gateway-stats-storage"
|
||||
version = "0.1.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
documentation.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
sqlx = { workspace = true, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"sqlite",
|
||||
"macros",
|
||||
"migrate",
|
||||
"time",
|
||||
] }
|
||||
time = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
nym-sphinx = { path = "../nymsphinx" }
|
||||
nym-credentials-interface = { path = "../credentials-interface" }
|
||||
|
||||
|
||||
[build-dependencies]
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
|
||||
sqlx = { workspace = true, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"sqlite",
|
||||
"macros",
|
||||
"migrate",
|
||||
] }
|
||||
@@ -0,0 +1,28 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use sqlx::{Connection, SqliteConnection};
|
||||
use std::env;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let out_dir = env::var("OUT_DIR").unwrap();
|
||||
let database_path = format!("{}/gateway-stats-example.sqlite", out_dir);
|
||||
|
||||
let mut conn = SqliteConnection::connect(&format!("sqlite://{}?mode=rwc", database_path))
|
||||
.await
|
||||
.expect("Failed to create SQLx database connection");
|
||||
|
||||
sqlx::migrate!("./migrations")
|
||||
.run(&mut conn)
|
||||
.await
|
||||
.expect("Failed to perform SQLx migrations");
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path);
|
||||
|
||||
#[cfg(target_family = "windows")]
|
||||
// for some strange reason we need to add a leading `/` to the windows path even though it's
|
||||
// not a valid windows path... but hey, it works...
|
||||
println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
/*
|
||||
* Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
* SPDX-License-Identifier: GPL-3.0-only
|
||||
*/
|
||||
|
||||
CREATE TABLE sessions_active
|
||||
(
|
||||
client_address TEXT NOT NULL PRIMARY KEY UNIQUE,
|
||||
start_time TIMESTAMP WITHOUT TIME ZONE NOT NULL,
|
||||
typ TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE sessions_finished
|
||||
(
|
||||
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
day DATE NOT NULL,
|
||||
duration_ms INTEGER NOT NULL,
|
||||
typ TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE sessions_unique_users
|
||||
(
|
||||
day DATE NOT NULL,
|
||||
client_address TEXT NOT NULL,
|
||||
PRIMARY KEY (day, client_address)
|
||||
);
|
||||
@@ -0,0 +1,13 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum StatsStorageError {
|
||||
#[error("Database experienced an internal error: {0}")]
|
||||
InternalDatabaseError(#[from] sqlx::Error),
|
||||
|
||||
#[error("Failed to perform database migration: {0}")]
|
||||
MigrationError(#[from] sqlx::migrate::MigrateError),
|
||||
}
|
||||
@@ -0,0 +1,195 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use error::StatsStorageError;
|
||||
use models::{ActiveSession, FinishedSession, SessionType, StoredFinishedSession};
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use sessions::SessionManager;
|
||||
use sqlx::ConnectOptions;
|
||||
use std::path::Path;
|
||||
use time::Date;
|
||||
use tracing::{debug, error};
|
||||
|
||||
pub mod error;
|
||||
pub mod models;
|
||||
mod sessions;
|
||||
|
||||
// note that clone here is fine as upon cloning the same underlying pool will be used
|
||||
#[derive(Clone)]
|
||||
pub struct PersistentStatsStorage {
|
||||
session_manager: SessionManager,
|
||||
}
|
||||
|
||||
impl PersistentStatsStorage {
|
||||
/// Initialises `PersistentStatsStorage` using the provided path.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `database_path`: path to the database.
|
||||
pub async fn init<P: AsRef<Path> + Send>(database_path: P) -> Result<Self, StatsStorageError> {
|
||||
debug!(
|
||||
"Attempting to connect to database {:?}",
|
||||
database_path.as_ref().as_os_str()
|
||||
);
|
||||
|
||||
// TODO: we can inject here more stuff based on our gateway global config
|
||||
// struct. Maybe different pool size or timeout intervals?
|
||||
let opts = sqlx::sqlite::SqliteConnectOptions::new()
|
||||
.filename(database_path)
|
||||
.create_if_missing(true)
|
||||
.disable_statement_logging();
|
||||
|
||||
// TODO: do we want auto_vacuum ?
|
||||
|
||||
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
|
||||
Ok(db) => db,
|
||||
Err(err) => {
|
||||
error!("Failed to connect to SQLx database: {err}");
|
||||
return Err(err.into());
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
|
||||
error!("Failed to perform migration on the SQLx database: {err}");
|
||||
return Err(err.into());
|
||||
}
|
||||
|
||||
// the cloning here are cheap as connection pool is stored behind an Arc
|
||||
Ok(PersistentStatsStorage {
|
||||
session_manager: sessions::SessionManager::new(connection_pool),
|
||||
})
|
||||
}
|
||||
|
||||
//Sessions fn
|
||||
pub async fn insert_finished_session(
|
||||
&self,
|
||||
date: Date,
|
||||
session: FinishedSession,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.insert_finished_session(
|
||||
date,
|
||||
session.duration.whole_milliseconds() as i64,
|
||||
session.typ.to_string().into(),
|
||||
)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn get_finished_sessions(
|
||||
&self,
|
||||
date: Date,
|
||||
) -> Result<Vec<StoredFinishedSession>, StatsStorageError> {
|
||||
Ok(self.session_manager.get_finished_sessions(date).await?)
|
||||
}
|
||||
|
||||
pub async fn delete_finished_sessions(
|
||||
&self,
|
||||
before_date: Date,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.delete_finished_sessions(before_date)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn insert_unique_user(
|
||||
&self,
|
||||
date: Date,
|
||||
client_address_bs58: String,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.insert_unique_user(date, client_address_bs58)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn get_unique_users_count(&self, date: Date) -> Result<i32, StatsStorageError> {
|
||||
Ok(self.session_manager.get_unique_users_count(date).await?)
|
||||
}
|
||||
|
||||
pub async fn delete_unique_users(&self, before_date: Date) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.delete_unique_users(before_date)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn insert_active_session(
|
||||
&self,
|
||||
client_address: DestinationAddressBytes,
|
||||
session: ActiveSession,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.insert_active_session(
|
||||
client_address.as_base58_string(),
|
||||
session.start,
|
||||
session.typ.to_string().into(),
|
||||
)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn update_active_session_type(
|
||||
&self,
|
||||
client_address: DestinationAddressBytes,
|
||||
session_type: SessionType,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.update_active_session_type(
|
||||
client_address.as_base58_string(),
|
||||
session_type.to_string().into(),
|
||||
)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn get_active_session(
|
||||
&self,
|
||||
client_address: DestinationAddressBytes,
|
||||
) -> Result<Option<ActiveSession>, StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.get_active_session(client_address.as_base58_string())
|
||||
.await?
|
||||
.map(Into::into))
|
||||
}
|
||||
|
||||
pub async fn get_all_active_sessions(&self) -> Result<Vec<ActiveSession>, StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.get_all_active_sessions()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn get_started_sessions_count(
|
||||
&self,
|
||||
start_date: Date,
|
||||
) -> Result<i32, StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.get_started_sessions_count(start_date)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn get_active_users(&self) -> Result<Vec<String>, StatsStorageError> {
|
||||
Ok(self.session_manager.get_active_users().await?)
|
||||
}
|
||||
|
||||
pub async fn delete_active_session(
|
||||
&self,
|
||||
client_address: DestinationAddressBytes,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.delete_active_session(client_address.as_base58_string())
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn cleanup_active_sessions(&self) -> Result<(), StatsStorageError> {
|
||||
Ok(self.session_manager.cleanup_active_sessions().await?)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use nym_credentials_interface::TicketType;
|
||||
use sqlx::prelude::FromRow;
|
||||
use time::{Duration, OffsetDateTime};
|
||||
|
||||
#[derive(FromRow)]
|
||||
pub struct StoredFinishedSession {
|
||||
duration_ms: i64,
|
||||
typ: String,
|
||||
}
|
||||
|
||||
impl StoredFinishedSession {
|
||||
pub fn serialize(&self) -> (u64, String) {
|
||||
(
|
||||
self.duration_ms as u64, //we are sure that it fits in a u64, see `fn end_at`
|
||||
self.typ.clone(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FinishedSession {
|
||||
pub duration: Duration,
|
||||
pub typ: SessionType,
|
||||
}
|
||||
|
||||
#[derive(PartialEq)]
|
||||
pub enum SessionType {
|
||||
Vpn,
|
||||
Mixnet,
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl SessionType {
|
||||
pub fn to_string(&self) -> &str {
|
||||
match self {
|
||||
Self::Vpn => "vpn",
|
||||
Self::Mixnet => "mixnet",
|
||||
Self::Unknown => "unknown",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_string(s: &str) -> Self {
|
||||
match s {
|
||||
"vpn" => Self::Vpn,
|
||||
"mixnet" => Self::Mixnet,
|
||||
_ => Self::Unknown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TicketType> for SessionType {
|
||||
fn from(value: TicketType) -> Self {
|
||||
match value {
|
||||
TicketType::V1MixnetEntry => Self::Mixnet,
|
||||
TicketType::V1MixnetExit => Self::Mixnet,
|
||||
TicketType::V1WireguardEntry => Self::Vpn,
|
||||
TicketType::V1WireguardExit => Self::Vpn,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromRow)]
|
||||
pub(crate) struct StoredActiveSession {
|
||||
start_time: OffsetDateTime,
|
||||
typ: String,
|
||||
}
|
||||
|
||||
pub struct ActiveSession {
|
||||
pub start: OffsetDateTime,
|
||||
pub typ: SessionType,
|
||||
}
|
||||
|
||||
impl ActiveSession {
|
||||
pub fn new(start_time: OffsetDateTime) -> Self {
|
||||
ActiveSession {
|
||||
start: start_time,
|
||||
typ: SessionType::Unknown,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_type(&mut self, ticket_type: TicketType) {
|
||||
self.typ = ticket_type.into();
|
||||
}
|
||||
|
||||
pub fn end_at(self, stop_time: OffsetDateTime) -> Option<FinishedSession> {
|
||||
let session_duration = stop_time - self.start;
|
||||
//ensure duration is positive to fit in a u64
|
||||
//u64::max milliseconds is 500k millenia so no overflow issue
|
||||
if session_duration > Duration::ZERO {
|
||||
Some(FinishedSession {
|
||||
duration: session_duration,
|
||||
typ: self.typ,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<StoredActiveSession> for ActiveSession {
|
||||
fn from(value: StoredActiveSession) -> Self {
|
||||
ActiveSession {
|
||||
start: value.start_time,
|
||||
typ: SessionType::from_string(&value.typ),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,177 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use time::{Date, OffsetDateTime};
|
||||
|
||||
use crate::models::{StoredActiveSession, StoredFinishedSession};
|
||||
|
||||
pub(crate) type Result<T> = std::result::Result<T, sqlx::Error>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct SessionManager {
|
||||
connection_pool: sqlx::SqlitePool,
|
||||
}
|
||||
|
||||
impl SessionManager {
|
||||
/// Creates new instance of the `SessionsManager` with the provided sqlite connection pool.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `connection_pool`: database connection pool to use.
|
||||
pub(crate) fn new(connection_pool: sqlx::SqlitePool) -> Self {
|
||||
SessionManager { connection_pool }
|
||||
}
|
||||
|
||||
pub(crate) async fn insert_finished_session(
|
||||
&self,
|
||||
date: Date,
|
||||
duration_ms: i64,
|
||||
typ: String,
|
||||
) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"INSERT INTO sessions_finished (day, duration_ms, typ) VALUES (?, ?, ?)",
|
||||
date,
|
||||
duration_ms,
|
||||
typ
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_finished_sessions(
|
||||
&self,
|
||||
date: Date,
|
||||
) -> Result<Vec<StoredFinishedSession>> {
|
||||
sqlx::query_as("SELECT duration_ms, typ FROM sessions_finished WHERE day = ?")
|
||||
.bind(date)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_finished_sessions(&self, before_date: Date) -> Result<()> {
|
||||
sqlx::query!("DELETE FROM sessions_finished WHERE day <= ? ", before_date)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn insert_unique_user(
|
||||
&self,
|
||||
date: Date,
|
||||
client_address_b58: String,
|
||||
) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"INSERT OR IGNORE INTO sessions_unique_users (day, client_address) VALUES (?, ?)",
|
||||
date,
|
||||
client_address_b58,
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_unique_users_count(&self, date: Date) -> Result<i32> {
|
||||
Ok(sqlx::query!(
|
||||
"SELECT COUNT(*) as count FROM sessions_unique_users WHERE day = ?",
|
||||
date
|
||||
)
|
||||
.fetch_one(&self.connection_pool)
|
||||
.await?
|
||||
.count)
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_unique_users(&self, before_date: Date) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"DELETE FROM sessions_unique_users WHERE day <= ? ",
|
||||
before_date
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn insert_active_session(
|
||||
&self,
|
||||
client_address_b58: String,
|
||||
start_time: OffsetDateTime,
|
||||
typ: String,
|
||||
) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"INSERT INTO sessions_active (client_address, start_time, typ) VALUES (?, ?, ?)",
|
||||
client_address_b58,
|
||||
start_time,
|
||||
typ
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn update_active_session_type(
|
||||
&self,
|
||||
client_address_b58: String,
|
||||
typ: String,
|
||||
) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"UPDATE sessions_active SET typ = ? WHERE client_address = ?",
|
||||
typ,
|
||||
client_address_b58,
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_active_session(
|
||||
&self,
|
||||
client_address_b58: String,
|
||||
) -> Result<Option<StoredActiveSession>> {
|
||||
sqlx::query_as("SELECT start_time, typ FROM sessions_active WHERE client_address = ?")
|
||||
.bind(client_address_b58)
|
||||
.fetch_optional(&self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_active_sessions(&self) -> Result<Vec<StoredActiveSession>> {
|
||||
sqlx::query_as("SELECT start_time, typ FROM sessions_active")
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_started_sessions_count(&self, start_date: Date) -> Result<i32> {
|
||||
Ok(sqlx::query!(
|
||||
"SELECT COUNT(*) as count FROM sessions_active WHERE date(start_time) = ?",
|
||||
start_date
|
||||
)
|
||||
.fetch_one(&self.connection_pool)
|
||||
.await?
|
||||
.count)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_active_users(&self) -> Result<Vec<String>> {
|
||||
Ok(sqlx::query!("SELECT client_address from sessions_active")
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|record| record.client_address)
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_active_session(&self, client_address_b58: String) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"DELETE FROM sessions_active WHERE client_address = ?",
|
||||
client_address_b58
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn cleanup_active_sessions(&self) -> Result<()> {
|
||||
sqlx::query!("DELETE FROM sessions_active")
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Display;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tracing::warn;
|
||||
use tracing::{instrument, warn};
|
||||
use url::Url;
|
||||
|
||||
pub use reqwest::IntoUrl;
|
||||
@@ -202,6 +202,7 @@ impl Client {
|
||||
self.reqwest_client.get(url)
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all, fields(path=?path))]
|
||||
async fn send_get_request<K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
@@ -212,6 +213,7 @@ impl Client {
|
||||
V: AsRef<str>,
|
||||
E: Display,
|
||||
{
|
||||
tracing::trace!("Sending GET request");
|
||||
let url = sanitize_url(&self.base_url, path, params);
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
@@ -277,6 +279,7 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
pub async fn get_json<T, K, V, E>(
|
||||
&self,
|
||||
path: PathSegments<'_>,
|
||||
@@ -512,12 +515,14 @@ pub fn sanitize_url<K: AsRef<str>, V: AsRef<str>>(
|
||||
url
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
pub async fn parse_response<T, E>(res: Response, allow_empty: bool) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
T: DeserializeOwned,
|
||||
E: DeserializeOwned + Display,
|
||||
{
|
||||
let status = res.status();
|
||||
tracing::debug!("Status: {} (success: {})", &status, status.is_success());
|
||||
|
||||
if !allow_empty {
|
||||
if let Some(0) = res.content_length() {
|
||||
@@ -526,6 +531,18 @@ where
|
||||
}
|
||||
|
||||
if res.status().is_success() {
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
let text = res.text().await.inspect_err(|err| {
|
||||
tracing::error!("Couldn't even get response text: {err}");
|
||||
})?;
|
||||
tracing::trace!("Result:\n{:#?}", text);
|
||||
|
||||
serde_json::from_str(&text)
|
||||
.map_err(|err| HttpClientError::GenericRequestFailure(err.to_string()))
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
Ok(res.json().await?)
|
||||
} else if res.status() == StatusCode::NOT_FOUND {
|
||||
Err(HttpClientError::NotFound)
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
[package]
|
||||
name = "nym-common-models"
|
||||
version = "0.1.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
documentation.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
rust-version.workspace = true
|
||||
readme.workspace = true
|
||||
|
||||
[dependencies]
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
@@ -0,0 +1 @@
|
||||
pub mod ns_api;
|
||||
@@ -0,0 +1,8 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct TestrunAssignment {
|
||||
/// has nothing to do with GW identity key. This is PK from `gateways` table
|
||||
pub testrun_id: i64,
|
||||
pub gateway_pk_id: i64,
|
||||
}
|
||||
@@ -42,8 +42,32 @@ impl PendingSync {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BlockProcessorConfig {
|
||||
pub pruning_options: PruningOptions,
|
||||
pub store_precommits: bool,
|
||||
}
|
||||
|
||||
impl Default for BlockProcessorConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
pruning_options: PruningOptions::nothing(),
|
||||
store_precommits: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockProcessorConfig {
|
||||
pub fn new(pruning_options: PruningOptions, store_precommits: bool) -> Self {
|
||||
Self {
|
||||
pruning_options,
|
||||
store_precommits,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BlockProcessor {
|
||||
pruning_options: PruningOptions,
|
||||
config: BlockProcessorConfig,
|
||||
cancel: CancellationToken,
|
||||
synced: Arc<Notify>,
|
||||
last_processed_height: u32,
|
||||
@@ -65,9 +89,10 @@ pub struct BlockProcessor {
|
||||
msg_modules: Vec<Box<dyn MsgModule + Send>>,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
impl BlockProcessor {
|
||||
pub async fn new(
|
||||
pruning_options: PruningOptions,
|
||||
config: BlockProcessorConfig,
|
||||
cancel: CancellationToken,
|
||||
synced: Arc<Notify>,
|
||||
incoming: UnboundedReceiver<BlockToProcess>,
|
||||
@@ -82,7 +107,7 @@ impl BlockProcessor {
|
||||
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
|
||||
|
||||
Ok(BlockProcessor {
|
||||
pruning_options,
|
||||
config,
|
||||
cancel,
|
||||
synced,
|
||||
last_processed_height,
|
||||
@@ -101,7 +126,7 @@ impl BlockProcessor {
|
||||
}
|
||||
|
||||
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
|
||||
self.pruning_options = pruning_options;
|
||||
self.config.pruning_options = pruning_options;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -128,7 +153,7 @@ impl BlockProcessor {
|
||||
// we won't end up with a corrupted storage.
|
||||
let mut tx = self.storage.begin_processing_tx().await?;
|
||||
|
||||
persist_block(&full_info, &mut tx).await?;
|
||||
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
|
||||
|
||||
// let the modules do whatever they want
|
||||
// the ones wanting the full block:
|
||||
@@ -241,7 +266,7 @@ impl BlockProcessor {
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
|
||||
let keep_recent = self.pruning_options.strategy_keep_recent();
|
||||
let keep_recent = self.config.pruning_options.strategy_keep_recent();
|
||||
let last_to_keep = self.last_processed_height - keep_recent;
|
||||
|
||||
info!(
|
||||
@@ -282,12 +307,12 @@ impl BlockProcessor {
|
||||
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
|
||||
debug!("checking for storage pruning");
|
||||
|
||||
if self.pruning_options.strategy.is_nothing() {
|
||||
if self.config.pruning_options.strategy.is_nothing() {
|
||||
trace!("the current pruning strategy is 'nothing'");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let interval = self.pruning_options.strategy_interval();
|
||||
let interval = self.config.pruning_options.strategy_interval();
|
||||
if self.last_pruned_height + interval <= self.last_processed_height {
|
||||
self.prune_storage().await?;
|
||||
}
|
||||
@@ -371,7 +396,7 @@ impl BlockProcessor {
|
||||
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
|
||||
// in case we were offline for a while,
|
||||
// make sure we don't request blocks we'd have to prune anyway
|
||||
let keep_recent = self.pruning_options.strategy_keep_recent();
|
||||
let keep_recent = self.config.pruning_options.strategy_keep_recent();
|
||||
let last_to_keep = latest_block - keep_recent;
|
||||
self.last_processed_height = max(self.last_processed_height, last_to_keep);
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::block_processor::types::BlockToProcess;
|
||||
use crate::block_processor::BlockProcessor;
|
||||
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
|
||||
use crate::block_requester::{BlockRequest, BlockRequester};
|
||||
use crate::error::ScraperError;
|
||||
use crate::modules::{BlockModule, MsgModule, TxModule};
|
||||
@@ -34,6 +34,8 @@ pub struct Config {
|
||||
pub database_path: PathBuf,
|
||||
|
||||
pub pruning_options: PruningOptions,
|
||||
|
||||
pub store_precommits: bool,
|
||||
}
|
||||
|
||||
pub struct NyxdScraperBuilder {
|
||||
@@ -60,8 +62,14 @@ impl NyxdScraperBuilder {
|
||||
req_rx,
|
||||
processing_tx.clone(),
|
||||
);
|
||||
let mut block_processor = BlockProcessor::new(
|
||||
|
||||
let block_processor_config = BlockProcessorConfig::new(
|
||||
scraper.config.pruning_options,
|
||||
scraper.config.store_precommits,
|
||||
);
|
||||
|
||||
let mut block_processor = BlockProcessor::new(
|
||||
block_processor_config,
|
||||
scraper.cancel_token.clone(),
|
||||
scraper.startup_sync.clone(),
|
||||
processing_rx,
|
||||
@@ -275,8 +283,11 @@ impl NyxdScraper {
|
||||
req_tx: Sender<BlockRequest>,
|
||||
processing_rx: UnboundedReceiver<BlockToProcess>,
|
||||
) -> Result<BlockProcessor, ScraperError> {
|
||||
let block_processor_config =
|
||||
BlockProcessorConfig::new(self.config.pruning_options, self.config.store_precommits);
|
||||
|
||||
BlockProcessor::new(
|
||||
self.config.pruning_options,
|
||||
block_processor_config,
|
||||
self.cancel_token.clone(),
|
||||
self.startup_sync.clone(),
|
||||
processing_rx,
|
||||
|
||||
@@ -212,6 +212,7 @@ impl ScraperStorage {
|
||||
pub async fn persist_block(
|
||||
block: &FullBlockInformation,
|
||||
tx: &mut StorageTransaction,
|
||||
store_precommits: bool,
|
||||
) -> Result<(), ScraperError> {
|
||||
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
|
||||
|
||||
@@ -224,11 +225,12 @@ pub async fn persist_block(
|
||||
// persist block data
|
||||
persist_block_data(&block.block, total_gas, tx).await?;
|
||||
|
||||
// persist commits
|
||||
if let Some(commit) = &block.block.last_commit {
|
||||
persist_commits(commit, &block.validators, tx).await?;
|
||||
} else {
|
||||
warn!("no commits for block {}", block.block.header.height)
|
||||
if store_precommits {
|
||||
if let Some(commit) = &block.block.last_commit {
|
||||
persist_commits(commit, &block.validators, tx).await?;
|
||||
} else {
|
||||
warn!("no commits for block {}", block.block.header.height)
|
||||
}
|
||||
}
|
||||
|
||||
// persist txs
|
||||
|
||||
+2
-2
@@ -19,5 +19,5 @@ MULTISIG_CONTRACT_ADDRESS=n1zwv6feuzhy6a9wekh96cd57lsarmqlwxdypdsplw6zhfncqw6ftq
|
||||
COCONUT_DKG_CONTRACT_ADDRESS=n1aakfpghcanxtc45gpqlx8j3rq0zcpyf49qmhm9mdjrfx036h4z5sy2vfh9
|
||||
|
||||
EXPLORER_API=https://canary-explorer.performance.nymte.ch/api
|
||||
NYXD="https://canary-validator.performance.nymte.ch"
|
||||
NYM_API="https://canary-api.performance.nymte.ch/api"
|
||||
NYXD=https://canary-validator.performance.nymte.ch
|
||||
NYM_API=https://canary-api.performance.nymte.ch/api
|
||||
|
||||
+3
-3
@@ -21,8 +21,8 @@ COCONUT_DKG_CONTRACT_ADDRESS=n19604yflqggs9mk2z26mqygq43q2kr3n932egxx630svywd5mp
|
||||
|
||||
REWARDING_VALIDATOR_ADDRESS=n10yyd98e2tuwu0f7ypz9dy3hhjw7v772q6287gy
|
||||
STATISTICS_SERVICE_DOMAIN_ADDRESS="https://mainnet-stats.nymte.ch:8090"
|
||||
NYXD="https://rpc.nymtech.net"
|
||||
NYM_API="https://validator.nymtech.net/api/"
|
||||
NYXD=https://rpc.nymtech.net
|
||||
NYM_API=https://validator.nymtech.net/api/
|
||||
NYXD_WS="wss://rpc.nymtech.net/websocket"
|
||||
EXPLORER_API="https://explorer.nymtech.net/api/"
|
||||
EXPLORER_API=https://explorer.nymtech.net/api/
|
||||
NYM_VPN_API="https://nymvpn.com/api"
|
||||
|
||||
+2
-2
@@ -19,5 +19,5 @@ VESTING_CONTRACT_ADDRESS=n1jlzdxnyces4hrhqz68dqk28mrw5jgwtcfq0c2funcwrmw0dx9l9s8
|
||||
REWARDING_VALIDATOR_ADDRESS=n1rfvpsynktze6wvn6ldskj8xgwfzzk5v6pnff39
|
||||
|
||||
EXPLORER_API=https://qa-network-explorer.qa.nymte.ch/api
|
||||
NYXD="https://qa-validator.qa.nymte.ch"
|
||||
NYM_API="https://qa-nym-api.qa.nymte.ch/api"
|
||||
NYXD=https://qa-validator.qa.nymte.ch
|
||||
NYM_API=https://qa-nym-api.qa.nymte.ch/api
|
||||
|
||||
+3
-3
@@ -20,6 +20,6 @@ ECASH_CONTRACT_ADDRESS=n1v3vydvs2ued84yv3khqwtgldmgwn0elljsdh08dr5s2j9x4rc5fs9jl
|
||||
|
||||
STATISTICS_SERVICE_DOMAIN_ADDRESS="http://0.0.0.0"
|
||||
EXPLORER_API=https://sandbox-explorer.nymtech.net/api
|
||||
NYXD="https://rpc.sandbox.nymtech.net"
|
||||
NYXD_WS="wss://rpc.sandbox.nymtech.net/websocket"
|
||||
NYM_API="https://sandbox-nym-api1.nymtech.net/api"
|
||||
NYXD=https://rpc.sandbox.nymtech.net
|
||||
NYXD_WS=wss://rpc.sandbox.nymtech.net/websocket
|
||||
NYM_API=https://sandbox-nym-api1.nymtech.net/api
|
||||
|
||||
@@ -7,12 +7,12 @@ license.workspace = true
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
log.workspace = true
|
||||
nym-explorer-api-requests = { path = "../explorer-api-requests" }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde.workspace = true
|
||||
thiserror.workspace = true
|
||||
url.workspace = true
|
||||
tracing = {workspace = true, features = ["attributes"]}
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::time::Duration;
|
||||
|
||||
use reqwest::StatusCode;
|
||||
use thiserror::Error;
|
||||
use tracing::instrument;
|
||||
use url::Url;
|
||||
|
||||
// Re-export request types
|
||||
@@ -47,6 +48,12 @@ impl ExplorerClient {
|
||||
Ok(Self { client, url })
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub fn new_with_timeout(url: url::Url, timeout: Duration) -> Result<Self, ExplorerApiError> {
|
||||
let client = reqwest::Client::builder().timeout(timeout).build()?;
|
||||
Ok(Self { client, url })
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
pub fn new(url: url::Url) -> Result<Self, ExplorerApiError> {
|
||||
let client = reqwest::Client::builder().build()?;
|
||||
@@ -58,10 +65,11 @@ impl ExplorerClient {
|
||||
paths: &[&str],
|
||||
) -> Result<reqwest::Response, ExplorerApiError> {
|
||||
let url = combine_url(self.url.clone(), paths)?;
|
||||
log::trace!("Sending GET request {url:?}");
|
||||
tracing::debug!("Sending GET request");
|
||||
Ok(self.client.get(url).send().await?)
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all, fields(paths=?paths))]
|
||||
async fn query_explorer_api<T>(&self, paths: &[&str]) -> Result<T, ExplorerApiError>
|
||||
where
|
||||
T: std::fmt::Debug,
|
||||
@@ -70,12 +78,14 @@ impl ExplorerClient {
|
||||
let response = self.send_get_request(paths).await?;
|
||||
if response.status().is_success() {
|
||||
let res = response.json::<T>().await?;
|
||||
log::trace!("Got response: {res:?}");
|
||||
tracing::trace!("Got response: {res:?}");
|
||||
Ok(res)
|
||||
} else if response.status() == StatusCode::NOT_FOUND {
|
||||
Err(ExplorerApiError::NotFound)
|
||||
} else {
|
||||
Err(ExplorerApiError::RequestFailure(response.text().await?))
|
||||
let status = response.status();
|
||||
let err_msg = format!("{}: {}", response.text().await?, status);
|
||||
Err(ExplorerApiError::RequestFailure(err_msg))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -69,6 +69,7 @@ nym-credentials-interface = { path = "../common/credentials-interface" }
|
||||
nym-credential-verification = { path = "../common/credential-verification" }
|
||||
nym-crypto = { path = "../common/crypto" }
|
||||
nym-gateway-storage = { path = "../common/gateway-storage" }
|
||||
nym-gateway-stats-storage = { path = "../common/gateway-stats-storage" }
|
||||
nym-gateway-requests = { path = "../common/gateway-requests" }
|
||||
nym-mixnet-client = { path = "../common/client-libs/mixnet-client" }
|
||||
nym-mixnode-common = { path = "../common/mixnode-common" }
|
||||
|
||||
@@ -12,6 +12,7 @@ pub const DEFAULT_PRIVATE_SPHINX_KEY_FILENAME: &str = "private_sphinx.pem";
|
||||
pub const DEFAULT_PUBLIC_SPHINX_KEY_FILENAME: &str = "public_sphinx.pem";
|
||||
|
||||
pub const DEFAULT_CLIENTS_STORAGE_FILENAME: &str = "db.sqlite";
|
||||
pub const DEFAULT_STATS_STORAGE_FILENAME: &str = "stats.sqlite";
|
||||
|
||||
pub const DEFAULT_NETWORK_REQUESTER_CONFIG_FILENAME: &str = "network_requester_config.toml";
|
||||
pub const DEFAULT_NETWORK_REQUESTER_DATA_DIR: &str = "network-requester-data";
|
||||
@@ -39,6 +40,9 @@ pub struct GatewayPaths {
|
||||
#[serde(alias = "persistent_storage")]
|
||||
pub clients_storage: PathBuf,
|
||||
|
||||
/// Path to sqlite database containing all persistent stats data.
|
||||
pub stats_storage: PathBuf,
|
||||
|
||||
/// Path to the configuration of the embedded network requester.
|
||||
#[serde(deserialize_with = "de_maybe_stringified")]
|
||||
pub network_requester_config: Option<PathBuf>,
|
||||
@@ -54,7 +58,9 @@ impl GatewayPaths {
|
||||
pub fn new_default<P: AsRef<Path>>(id: P) -> Self {
|
||||
GatewayPaths {
|
||||
keys: KeysPaths::new_default(id.as_ref()),
|
||||
clients_storage: default_data_directory(id).join(DEFAULT_CLIENTS_STORAGE_FILENAME),
|
||||
clients_storage: default_data_directory(id.as_ref())
|
||||
.join(DEFAULT_CLIENTS_STORAGE_FILENAME),
|
||||
stats_storage: default_data_directory(id).join(DEFAULT_STATS_STORAGE_FILENAME),
|
||||
// node_description: default_config_filepath(id).join(DEFAULT_DESCRIPTION_FILENAME),
|
||||
network_requester_config: None,
|
||||
ip_packet_router_config: None,
|
||||
@@ -70,6 +76,7 @@ impl GatewayPaths {
|
||||
public_sphinx_key_file: Default::default(),
|
||||
},
|
||||
clients_storage: Default::default(),
|
||||
stats_storage: Default::default(),
|
||||
network_requester_config: None,
|
||||
ip_packet_router_config: None,
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use nym_authenticator::error::AuthenticatorError;
|
||||
use nym_gateway_stats_storage::error::StatsStorageError;
|
||||
use nym_gateway_storage::error::StorageError;
|
||||
use nym_ip_packet_router::error::IpPacketRouterError;
|
||||
use nym_network_requester::error::{ClientCoreError, NetworkRequesterError};
|
||||
@@ -115,6 +116,12 @@ pub enum GatewayError {
|
||||
source: StorageError,
|
||||
},
|
||||
|
||||
#[error("stats storage failure: {source}")]
|
||||
StatsStorageError {
|
||||
#[from]
|
||||
source: StatsStorageError,
|
||||
},
|
||||
|
||||
#[error("Path to network requester configuration file hasn't been specified. Perhaps try to run `setup-network-requester`?")]
|
||||
UnspecifiedNetworkRequesterConfig,
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::config::Config;
|
||||
use crate::error::GatewayError;
|
||||
|
||||
use nym_crypto::asymmetric::encryption;
|
||||
use nym_gateway_stats_storage::PersistentStatsStorage;
|
||||
use nym_gateway_storage::PersistentStorage;
|
||||
use nym_pemstore::traits::PemStorableKeyPair;
|
||||
use nym_pemstore::KeyPairPath;
|
||||
@@ -74,6 +75,14 @@ pub(crate) async fn initialise_main_storage(
|
||||
Ok(PersistentStorage::init(path, retrieval_limit).await?)
|
||||
}
|
||||
|
||||
pub(crate) async fn initialise_stats_storage(
|
||||
config: &Config,
|
||||
) -> Result<PersistentStatsStorage, GatewayError> {
|
||||
let path = &config.storage_paths.stats_storage;
|
||||
|
||||
Ok(PersistentStatsStorage::init(path).await?)
|
||||
}
|
||||
|
||||
pub fn load_keypair<T: PemStorableKeyPair>(
|
||||
paths: KeyPairPath,
|
||||
name: impl Into<String>,
|
||||
|
||||
+25
-14
@@ -12,7 +12,9 @@ use crate::http::HttpApiBuilder;
|
||||
use crate::node::client_handling::active_clients::ActiveClientsStore;
|
||||
use crate::node::client_handling::embedded_clients::{LocalEmbeddedClientHandle, MessageRouter};
|
||||
use crate::node::client_handling::websocket;
|
||||
use crate::node::helpers::{initialise_main_storage, load_network_requester_config};
|
||||
use crate::node::helpers::{
|
||||
initialise_main_storage, initialise_stats_storage, load_network_requester_config,
|
||||
};
|
||||
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use nym_credential_verification::ecash::{
|
||||
@@ -41,6 +43,7 @@ pub(crate) mod helpers;
|
||||
pub(crate) mod mixnet_handling;
|
||||
pub(crate) mod statistics;
|
||||
|
||||
pub use nym_gateway_stats_storage::PersistentStatsStorage;
|
||||
pub use nym_gateway_storage::{PersistentStorage, Storage};
|
||||
|
||||
// TODO: should this struct live here?
|
||||
@@ -96,6 +99,8 @@ pub async fn create_gateway(
|
||||
|
||||
let storage = initialise_main_storage(&config).await?;
|
||||
|
||||
let stats_storage = initialise_stats_storage(&config).await?;
|
||||
|
||||
let nr_opts = network_requester_config.map(|config| LocalNetworkRequesterOpts {
|
||||
config: config.clone(),
|
||||
custom_mixnet_path: custom_mixnet.clone(),
|
||||
@@ -106,7 +111,7 @@ pub async fn create_gateway(
|
||||
custom_mixnet_path: custom_mixnet.clone(),
|
||||
});
|
||||
|
||||
Gateway::new(config, nr_opts, ip_opts, storage)
|
||||
Gateway::new(config, nr_opts, ip_opts, storage, stats_storage)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -147,7 +152,9 @@ pub struct Gateway<St = PersistentStorage> {
|
||||
/// x25519 keypair used for Diffie-Hellman. Currently only used for sphinx key derivation.
|
||||
sphinx_keypair: Arc<encryption::KeyPair>,
|
||||
|
||||
storage: St,
|
||||
client_storage: St,
|
||||
|
||||
stats_storage: PersistentStatsStorage,
|
||||
|
||||
wireguard_data: Option<nym_wireguard::WireguardData>,
|
||||
|
||||
@@ -163,10 +170,12 @@ impl<St> Gateway<St> {
|
||||
config: Config,
|
||||
network_requester_opts: Option<LocalNetworkRequesterOpts>,
|
||||
ip_packet_router_opts: Option<LocalIpPacketRouterOpts>,
|
||||
storage: St,
|
||||
client_storage: St,
|
||||
stats_storage: PersistentStatsStorage,
|
||||
) -> Result<Self, GatewayError> {
|
||||
Ok(Gateway {
|
||||
storage,
|
||||
client_storage,
|
||||
stats_storage,
|
||||
identity_keypair: Arc::new(load_identity_keys(&config)?),
|
||||
sphinx_keypair: Arc::new(helpers::load_sphinx_keys(&config)?),
|
||||
config,
|
||||
@@ -179,7 +188,7 @@ impl<St> Gateway<St> {
|
||||
task_client: None,
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_loaded(
|
||||
config: Config,
|
||||
network_requester_opts: Option<LocalNetworkRequesterOpts>,
|
||||
@@ -187,7 +196,8 @@ impl<St> Gateway<St> {
|
||||
authenticator_opts: Option<LocalAuthenticatorOpts>,
|
||||
identity_keypair: Arc<identity::KeyPair>,
|
||||
sphinx_keypair: Arc<encryption::KeyPair>,
|
||||
storage: St,
|
||||
client_storage: St,
|
||||
stats_storage: PersistentStatsStorage,
|
||||
) -> Self {
|
||||
Gateway {
|
||||
config,
|
||||
@@ -196,7 +206,8 @@ impl<St> Gateway<St> {
|
||||
authenticator_opts,
|
||||
identity_keypair,
|
||||
sphinx_keypair,
|
||||
storage,
|
||||
client_storage,
|
||||
stats_storage,
|
||||
wireguard_data: None,
|
||||
session_stats: None,
|
||||
run_http_server: true,
|
||||
@@ -240,7 +251,7 @@ impl<St> Gateway<St> {
|
||||
|
||||
let connection_handler = ConnectionHandler::new(
|
||||
packet_processor,
|
||||
self.storage.clone(),
|
||||
self.client_storage.clone(),
|
||||
ack_sender,
|
||||
active_clients_store,
|
||||
);
|
||||
@@ -275,7 +286,7 @@ impl<St> Gateway<St> {
|
||||
forwarding_channel,
|
||||
router_tx,
|
||||
);
|
||||
let all_peers = self.storage.get_all_wireguard_peers().await?;
|
||||
let all_peers = self.client_storage.get_all_wireguard_peers().await?;
|
||||
let used_private_network_ips = all_peers
|
||||
.iter()
|
||||
.cloned()
|
||||
@@ -330,7 +341,7 @@ impl<St> Gateway<St> {
|
||||
.start_with_shutdown(router_shutdown);
|
||||
|
||||
let wg_api = nym_wireguard::start_wireguard(
|
||||
self.storage.clone(),
|
||||
self.client_storage.clone(),
|
||||
all_peers,
|
||||
shutdown,
|
||||
wireguard_data,
|
||||
@@ -377,7 +388,7 @@ impl<St> Gateway<St> {
|
||||
|
||||
let shared_state = websocket::CommonHandlerState {
|
||||
ecash_verifier,
|
||||
storage: self.storage.clone(),
|
||||
storage: self.client_storage.clone(),
|
||||
local_identity: Arc::clone(&self.identity_keypair),
|
||||
only_coconut_credentials: self.config.gateway.only_coconut_credentials,
|
||||
bandwidth_cfg: (&self.config).into(),
|
||||
@@ -415,7 +426,7 @@ impl<St> Gateway<St> {
|
||||
info!("Starting gateway stats collector...");
|
||||
|
||||
let (mut stats_collector, stats_event_sender) =
|
||||
GatewayStatisticsCollector::new(shared_session_stats);
|
||||
GatewayStatisticsCollector::new(shared_session_stats, self.stats_storage.clone());
|
||||
tokio::spawn(async move { stats_collector.run(shutdown).await });
|
||||
stats_event_sender
|
||||
}
|
||||
@@ -654,7 +665,7 @@ impl<St> Gateway<St> {
|
||||
nyxd_client,
|
||||
self.identity_keypair.public_key().to_bytes(),
|
||||
shutdown.fork("EcashVerifier"),
|
||||
self.storage.clone(),
|
||||
self.client_storage.clone(),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
@@ -2,13 +2,14 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use futures::{channel::mpsc, StreamExt};
|
||||
use nym_gateway_stats_storage::PersistentStatsStorage;
|
||||
use nym_node_http_api::state::metrics::SharedSessionStats;
|
||||
use nym_statistics_common::events::{StatsEvent, StatsEventReceiver, StatsEventSender};
|
||||
use nym_task::TaskClient;
|
||||
use sessions::SessionStatsHandler;
|
||||
use std::time::Duration;
|
||||
use time::OffsetDateTime;
|
||||
use tracing::trace;
|
||||
use tracing::{error, trace, warn};
|
||||
|
||||
pub mod sessions;
|
||||
|
||||
@@ -23,21 +24,38 @@ pub(crate) struct GatewayStatisticsCollector {
|
||||
impl GatewayStatisticsCollector {
|
||||
pub fn new(
|
||||
shared_session_stats: SharedSessionStats,
|
||||
stats_storage: PersistentStatsStorage,
|
||||
) -> (GatewayStatisticsCollector, StatsEventSender) {
|
||||
let (stats_event_tx, stats_event_rx) = mpsc::unbounded();
|
||||
|
||||
let session_stats = SessionStatsHandler::new(shared_session_stats, stats_storage);
|
||||
let collector = GatewayStatisticsCollector {
|
||||
stats_event_rx,
|
||||
session_stats: SessionStatsHandler::new(shared_session_stats),
|
||||
session_stats,
|
||||
};
|
||||
(collector, stats_event_tx)
|
||||
}
|
||||
|
||||
async fn update_shared_state(&mut self, update_time: OffsetDateTime) {
|
||||
self.session_stats.update_shared_state(update_time).await;
|
||||
if let Err(e) = self
|
||||
.session_stats
|
||||
.maybe_update_shared_state(update_time)
|
||||
.await
|
||||
{
|
||||
error!("Failed to update session stats - {e}");
|
||||
}
|
||||
//here goes additionnal stats handler update
|
||||
}
|
||||
|
||||
async fn on_start(&mut self) {
|
||||
if let Err(e) = self.session_stats.on_start().await {
|
||||
error!("Failed to cleanup session stats handler - {e}");
|
||||
}
|
||||
//here goes additionnal stats handler start cleanup
|
||||
}
|
||||
|
||||
pub async fn run(&mut self, mut shutdown: TaskClient) {
|
||||
self.on_start().await;
|
||||
let mut update_interval = tokio::time::interval(STATISTICS_UPDATE_TIMER_INTERVAL);
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
@@ -53,7 +71,10 @@ impl GatewayStatisticsCollector {
|
||||
Some(stat_event) = self.stats_event_rx.next() => {
|
||||
//dispatching event to proper handler
|
||||
match stat_event {
|
||||
StatsEvent::SessionStatsEvent(event) => self.session_stats.handle_event(event),
|
||||
StatsEvent::SessionStatsEvent(event) => {
|
||||
if let Err(e) = self.session_stats.handle_event(event).await{
|
||||
warn!("Session event handling error - {e}");
|
||||
}},
|
||||
}
|
||||
},
|
||||
|
||||
|
||||
@@ -2,176 +2,158 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use nym_credentials_interface::TicketType;
|
||||
use nym_gateway_stats_storage::models::FinishedSession;
|
||||
use nym_gateway_stats_storage::PersistentStatsStorage;
|
||||
use nym_gateway_stats_storage::{error::StatsStorageError, models::ActiveSession};
|
||||
use nym_node_http_api::state::metrics::SharedSessionStats;
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use time::{Date, Duration, OffsetDateTime};
|
||||
|
||||
use nym_statistics_common::events::SessionEvent;
|
||||
|
||||
const FINISHED_SESSIONS_CAP: usize = 1_000_000; //to be on the safe side of memory blowups until persistent storage
|
||||
|
||||
#[derive(PartialEq)]
|
||||
enum SessionType {
|
||||
Vpn,
|
||||
Mixnet,
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl SessionType {
|
||||
fn to_string(&self) -> &str {
|
||||
match self {
|
||||
Self::Vpn => "vpn",
|
||||
Self::Mixnet => "mixnet",
|
||||
Self::Unknown => "unknown",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TicketType> for SessionType {
|
||||
fn from(value: TicketType) -> Self {
|
||||
match value {
|
||||
TicketType::V1MixnetEntry => Self::Mixnet,
|
||||
TicketType::V1MixnetExit => Self::Mixnet,
|
||||
TicketType::V1WireguardEntry => Self::Vpn,
|
||||
TicketType::V1WireguardExit => Self::Vpn,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct FinishedSession {
|
||||
duration: Duration,
|
||||
typ: SessionType,
|
||||
}
|
||||
|
||||
impl FinishedSession {
|
||||
fn serialize(&self) -> (u64, String) {
|
||||
(
|
||||
self.duration.whole_milliseconds() as u64, //we are sure that it fits in a u64, see `fn end_at`
|
||||
self.typ.to_string().into(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
struct ActiveSession {
|
||||
start: OffsetDateTime,
|
||||
typ: SessionType,
|
||||
}
|
||||
|
||||
impl ActiveSession {
|
||||
fn new(start_time: OffsetDateTime) -> Self {
|
||||
ActiveSession {
|
||||
start: start_time,
|
||||
typ: SessionType::Unknown,
|
||||
}
|
||||
}
|
||||
|
||||
fn set_type(&mut self, ticket_type: TicketType) {
|
||||
self.typ = ticket_type.into();
|
||||
}
|
||||
|
||||
fn end_at(self, stop_time: OffsetDateTime) -> Option<FinishedSession> {
|
||||
let session_duration = stop_time - self.start;
|
||||
//ensure duration is positive to fit in a u64
|
||||
//u64::max milliseconds is 500k millenia so no overflow issue
|
||||
if session_duration > Duration::ZERO {
|
||||
Some(FinishedSession {
|
||||
duration: session_duration,
|
||||
typ: self.typ,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct SessionStatsHandler {
|
||||
last_update_day: Date,
|
||||
storage: PersistentStatsStorage,
|
||||
current_day: Date,
|
||||
|
||||
shared_session_stats: SharedSessionStats,
|
||||
active_sessions: HashMap<DestinationAddressBytes, ActiveSession>,
|
||||
unique_users: HashSet<DestinationAddressBytes>,
|
||||
sessions_started: u32,
|
||||
finished_sessions: Vec<FinishedSession>,
|
||||
}
|
||||
|
||||
impl SessionStatsHandler {
|
||||
pub fn new(shared_session_stats: SharedSessionStats) -> Self {
|
||||
pub fn new(shared_session_stats: SharedSessionStats, storage: PersistentStatsStorage) -> Self {
|
||||
SessionStatsHandler {
|
||||
last_update_day: OffsetDateTime::now_utc().date(),
|
||||
storage,
|
||||
current_day: OffsetDateTime::now_utc().date(),
|
||||
shared_session_stats,
|
||||
active_sessions: Default::default(),
|
||||
unique_users: Default::default(),
|
||||
sessions_started: 0,
|
||||
finished_sessions: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn handle_event(&mut self, event: SessionEvent) {
|
||||
pub(crate) async fn handle_event(
|
||||
&mut self,
|
||||
event: SessionEvent,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
match event {
|
||||
SessionEvent::SessionStart { start_time, client } => {
|
||||
self.handle_session_start(start_time, client);
|
||||
self.handle_session_start(start_time, client).await
|
||||
}
|
||||
|
||||
SessionEvent::SessionStop { stop_time, client } => {
|
||||
self.handle_session_stop(stop_time, client);
|
||||
self.handle_session_stop(stop_time, client).await
|
||||
}
|
||||
|
||||
SessionEvent::EcashTicket {
|
||||
ticket_type,
|
||||
client,
|
||||
} => self.handle_ecash_ticket(ticket_type, client),
|
||||
} => self.handle_ecash_ticket(ticket_type, client).await,
|
||||
}
|
||||
}
|
||||
fn handle_session_start(
|
||||
async fn handle_session_start(
|
||||
&mut self,
|
||||
start_time: OffsetDateTime,
|
||||
client: DestinationAddressBytes,
|
||||
) {
|
||||
self.sessions_started += 1;
|
||||
self.unique_users.insert(client);
|
||||
self.active_sessions
|
||||
.insert(client, ActiveSession::new(start_time));
|
||||
}
|
||||
fn handle_session_stop(&mut self, stop_time: OffsetDateTime, client: DestinationAddressBytes) {
|
||||
if let Some(session) = self.active_sessions.remove(&client) {
|
||||
if let Some(finished_session) = session.end_at(stop_time) {
|
||||
if self.finished_sessions.len() < FINISHED_SESSIONS_CAP {
|
||||
self.finished_sessions.push(finished_session);
|
||||
}
|
||||
}
|
||||
}
|
||||
) -> Result<(), StatsStorageError> {
|
||||
self.storage
|
||||
.insert_unique_user(self.current_day, client.as_base58_string())
|
||||
.await?;
|
||||
self.storage
|
||||
.insert_active_session(client, ActiveSession::new(start_time))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_ecash_ticket(&mut self, ticket_type: TicketType, client: DestinationAddressBytes) {
|
||||
if let Some(active_session) = self.active_sessions.get_mut(&client) {
|
||||
if active_session.typ == SessionType::Unknown {
|
||||
active_session.set_type(ticket_type);
|
||||
async fn handle_session_stop(
|
||||
&mut self,
|
||||
stop_time: OffsetDateTime,
|
||||
client: DestinationAddressBytes,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
if let Some(session) = self.storage.get_active_session(client).await? {
|
||||
if let Some(finished_session) = session.end_at(stop_time) {
|
||||
self.storage
|
||||
.insert_finished_session(self.current_day, finished_session)
|
||||
.await?;
|
||||
self.storage.delete_active_session(client).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_ecash_ticket(
|
||||
&mut self,
|
||||
ticket_type: TicketType,
|
||||
client: DestinationAddressBytes,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
self.storage
|
||||
.update_active_session_type(client, ticket_type.into())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn on_start(&mut self) -> Result<(), StatsStorageError> {
|
||||
let yesterday = OffsetDateTime::now_utc().date() - Duration::DAY;
|
||||
//publish yesterday's data if any
|
||||
self.publish_stats(yesterday).await?;
|
||||
//store "active" sessions as duration 0
|
||||
for active_session in self.storage.get_all_active_sessions().await? {
|
||||
self.storage
|
||||
.insert_finished_session(
|
||||
self.current_day,
|
||||
FinishedSession {
|
||||
duration: Duration::ZERO,
|
||||
typ: active_session.typ,
|
||||
},
|
||||
)
|
||||
.await?
|
||||
}
|
||||
//cleanup active sessions
|
||||
self.storage.cleanup_active_sessions().await?;
|
||||
|
||||
//delete old entries
|
||||
self.delete_old_stats(yesterday - Duration::DAY).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//update shared state once a day has passed, with data from the previous day
|
||||
pub(crate) async fn update_shared_state(&mut self, update_time: OffsetDateTime) {
|
||||
let update_date = update_time.date();
|
||||
if update_date != self.last_update_day {
|
||||
{
|
||||
let mut shared_state = self.shared_session_stats.write().await;
|
||||
shared_state.update_time = self.last_update_day;
|
||||
shared_state.unique_active_users = self.unique_users.len() as u32;
|
||||
shared_state.session_started = self.sessions_started;
|
||||
shared_state.sessions = self
|
||||
.finished_sessions
|
||||
.iter()
|
||||
.map(|s| s.serialize())
|
||||
.collect();
|
||||
}
|
||||
self.reset_stats(update_date);
|
||||
async fn publish_stats(&mut self, stats_date: Date) -> Result<(), StatsStorageError> {
|
||||
let finished_sessions = self.storage.get_finished_sessions(stats_date).await?;
|
||||
let user_count = self.storage.get_unique_users_count(stats_date).await?;
|
||||
let session_started = self.storage.get_started_sessions_count(stats_date).await? as u32;
|
||||
{
|
||||
let mut shared_state = self.shared_session_stats.write().await;
|
||||
shared_state.update_time = stats_date;
|
||||
shared_state.unique_active_users = user_count as u32;
|
||||
shared_state.session_started = session_started;
|
||||
shared_state.sessions = finished_sessions.iter().map(|s| s.serialize()).collect();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
pub(crate) async fn maybe_update_shared_state(
|
||||
&mut self,
|
||||
update_time: OffsetDateTime,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
let update_date = update_time.date();
|
||||
if update_date != self.current_day {
|
||||
self.publish_stats(self.current_day).await?;
|
||||
self.delete_old_stats(self.current_day - Duration::DAY)
|
||||
.await?;
|
||||
self.reset_stats(update_date).await?;
|
||||
self.current_day = update_date;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn reset_stats(&mut self, reset_day: Date) {
|
||||
self.last_update_day = reset_day;
|
||||
self.unique_users = self.active_sessions.keys().copied().collect();
|
||||
self.finished_sessions = Default::default();
|
||||
self.sessions_started = 0;
|
||||
async fn reset_stats(&mut self, reset_day: Date) -> Result<(), StatsStorageError> {
|
||||
//active users reset
|
||||
let new_active_users = self.storage.get_active_users().await?;
|
||||
for user in new_active_users {
|
||||
self.storage.insert_unique_user(reset_day, user).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_old_stats(&mut self, delete_before: Date) -> Result<(), StatsStorageError> {
|
||||
self.storage.delete_finished_sessions(delete_before).await?;
|
||||
self.storage.delete_unique_users(delete_before).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,18 @@ WORKDIR /usr/src/nym/nym-data-observatory
|
||||
|
||||
RUN cargo build --release
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# The following environment variables are required at runtime:
|
||||
#
|
||||
# NYM_DATA_OBSERVATORY_CONNECTION_URL
|
||||
#
|
||||
# And optionally:
|
||||
#
|
||||
# NYM_DATA_OBSERVATORY_HTTP_PORT
|
||||
#
|
||||
# see https://github.com/nymtech/nym/blob/develop/nym-data-observatory/src/main.rs for details
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
FROM ubuntu:24.04
|
||||
|
||||
RUN apt update && apt install -yy curl ca-certificates
|
||||
|
||||
@@ -68,6 +68,7 @@ warning: no queries found; do you have the `offline` feature enabled
|
||||
### Possible solutions
|
||||
|
||||
- does your `sqlx-cli` version match `sqlx` version from `Cargo.toml`?
|
||||
+ `cargo install -f sqlx-cli --version <specific version>`
|
||||
```
|
||||
cargo install sqlx-cli --version <exact semver version as sqlx> --force
|
||||
```
|
||||
|
||||
@@ -18,8 +18,14 @@ services:
|
||||
dockerfile: nym-data-observatory/Dockerfile
|
||||
container_name: nym-data-observatory
|
||||
environment:
|
||||
NYM_DATA_OBSERVATORY_CONNECTION_URL: "postgres://postgres:password@postgres:5432"
|
||||
NYM_DATA_OBSERVATORY_CONNECTION_USERNAME: "postgres"
|
||||
NYM_DATA_OBSERVATORY_CONNECTION_PASSWORD: "password"
|
||||
NYM_DATA_OBSERVATORY_CONNECTION_HOST: "postgres"
|
||||
NYM_DATA_OBSERVATORY_CONNECTION_PORT: "5432"
|
||||
NYM_DATA_OBSERVATORY_CONNECTION_DB: ""
|
||||
NYM_DATA_OBSERVATORY_HTTP_PORT: 8000
|
||||
env_file:
|
||||
- ../envs/qa.env
|
||||
|
||||
volumes:
|
||||
pgdata:
|
||||
|
||||
@@ -14,9 +14,7 @@ pub(crate) struct Storage {
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
pub async fn init(connection_url: Option<String>) -> Result<Self> {
|
||||
let connection_url =
|
||||
connection_url.ok_or_else(|| anyhow!("Missing the connection url for database!"))?;
|
||||
pub async fn init(connection_url: String) -> Result<Self> {
|
||||
let connect_options =
|
||||
PgConnectOptions::from_str(&connection_url)?.disable_statement_logging();
|
||||
|
||||
|
||||
@@ -18,9 +18,25 @@ struct Args {
|
||||
#[arg(short, long, default_value = None, env = "NYM_DATA_OBSERVATORY_ENV_FILE")]
|
||||
env_file: Option<String>,
|
||||
|
||||
/// DB connection url
|
||||
#[arg(short, long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_URL")]
|
||||
connection_url: Option<String>,
|
||||
/// DB connection username
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_USERNAME")]
|
||||
connection_username: String,
|
||||
|
||||
/// DB connection password
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_PASSWORD")]
|
||||
connection_password: String,
|
||||
|
||||
/// DB connection host
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_HOST")]
|
||||
connection_host: String,
|
||||
|
||||
/// DB connection port
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_PORT")]
|
||||
connection_port: String,
|
||||
|
||||
/// DB connection database name
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_DB")]
|
||||
connection_db: String,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@@ -31,7 +47,16 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
setup_env(args.env_file); // Defaults to mainnet if empty
|
||||
|
||||
let storage = db::Storage::init(args.connection_url).await?;
|
||||
let connection_url = format!(
|
||||
"postgres://{}:{}@{}:{}/{}",
|
||||
args.connection_username,
|
||||
args.connection_password,
|
||||
args.connection_host,
|
||||
args.connection_port,
|
||||
args.connection_db
|
||||
);
|
||||
|
||||
let storage = db::Storage::init(connection_url).await?;
|
||||
let db_pool = storage.pool_owned().await;
|
||||
tokio::spawn(async move {
|
||||
background_task::spawn_in_background(db_pool).await;
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
nym-gateway-probe
|
||||
@@ -0,0 +1,27 @@
|
||||
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
|
||||
[package]
|
||||
name = "nym-node-status-agent"
|
||||
version = "0.1.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
documentation.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
rust-version.workspace = true
|
||||
readme.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true}
|
||||
clap = { workspace = true, features = ["derive", "env"] }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["models"]}
|
||||
nym-common-models = { path = "../common/models" }
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process"] }
|
||||
tokio-util = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter"] }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde_json = { workspace = true }
|
||||
@@ -0,0 +1,28 @@
|
||||
FROM rust:latest AS builder
|
||||
|
||||
RUN apt update && apt install -yy libdbus-1-dev pkg-config libclang-dev
|
||||
|
||||
# Install go
|
||||
RUN wget https://go.dev/dl/go1.22.5.linux-amd64.tar.gz -O go.tar.gz
|
||||
RUN tar -xzvf go.tar.gz -C /usr/local
|
||||
|
||||
RUN git clone https://github.com/nymtech/nym-vpn-client /usr/src/nym-vpn-client
|
||||
ENV PATH=/go/bin:/usr/local/go/bin:$PATH
|
||||
WORKDIR /usr/src/nym-vpn-client/nym-vpn-core
|
||||
RUN cargo build --release --package nym-gateway-probe
|
||||
|
||||
COPY ./ /usr/src/nym
|
||||
WORKDIR /usr/src/nym/nym-node-status-agent
|
||||
RUN cargo build --release
|
||||
|
||||
FROM ubuntu:24.04
|
||||
|
||||
RUN apt-get update && apt-get install -y ca-certificates
|
||||
|
||||
WORKDIR /nym
|
||||
|
||||
COPY --from=builder /usr/src/nym/target/release/nym-node-status-agent ./
|
||||
COPY --from=builder /usr/src/nym-vpn-client/nym-vpn-core/target/release/nym-gateway-probe ./
|
||||
|
||||
ENV NODE_STATUS_AGENT_PROBE_PATH=/nym/nym-gateway-probe
|
||||
ENTRYPOINT [ "/nym/nym-node-status-agent", "run-probe" ]
|
||||
Executable
+49
@@ -0,0 +1,49 @@
|
||||
#!/bin/bash
|
||||
|
||||
set -eu
|
||||
|
||||
export RUST_LOG=${RUST_LOG:-debug}
|
||||
|
||||
crate_root=$(dirname $(realpath "$0"))
|
||||
gateway_probe_src=$(dirname $(dirname "$crate_root"))/nym-vpn-client/nym-vpn-core
|
||||
echo "gateway_probe_src=$gateway_probe_src"
|
||||
echo "crate_root=$crate_root"
|
||||
|
||||
export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe"
|
||||
|
||||
# build & copy over GW probe
|
||||
function copy_gw_probe() {
|
||||
pushd $gateway_probe_src
|
||||
cargo build --release --package nym-gateway-probe
|
||||
cp target/release/nym-gateway-probe "$crate_root"
|
||||
$crate_root/nym-gateway-probe --version
|
||||
popd
|
||||
}
|
||||
|
||||
function build_agent() {
|
||||
cargo build --package nym-node-status-agent --release
|
||||
}
|
||||
|
||||
function swarm() {
|
||||
local workers=$1
|
||||
echo "Running $workers in parallel"
|
||||
|
||||
build_agent
|
||||
|
||||
for ((i=1; i<=$workers; i++)); do
|
||||
../target/release/nym-node-status-agent run-probe &
|
||||
done
|
||||
|
||||
wait
|
||||
|
||||
echo "All agents completed"
|
||||
}
|
||||
|
||||
export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1"
|
||||
export NODE_STATUS_AGENT_SERVER_PORT="8000"
|
||||
|
||||
copy_gw_probe
|
||||
|
||||
swarm 30
|
||||
|
||||
# cargo run -- run-probe
|
||||
@@ -0,0 +1,109 @@
|
||||
use clap::{Parser, Subcommand};
|
||||
use nym_bin_common::bin_info;
|
||||
use nym_common_models::ns_api::TestrunAssignment;
|
||||
use std::sync::OnceLock;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::probe::GwProbe;
|
||||
|
||||
// Helper for passing LONG_VERSION to clap
|
||||
fn pretty_build_info_static() -> &'static str {
|
||||
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
|
||||
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
|
||||
pub(crate) struct Args {
|
||||
#[command(subcommand)]
|
||||
pub(crate) command: Command,
|
||||
#[arg(short, long, env = "NODE_STATUS_AGENT_SERVER_ADDRESS")]
|
||||
pub(crate) server_address: String,
|
||||
|
||||
#[arg(short = 'p', long, env = "NODE_STATUS_AGENT_SERVER_PORT")]
|
||||
pub(crate) server_port: u16,
|
||||
// TODO dz accept keypair for identification / auth
|
||||
}
|
||||
|
||||
#[derive(Subcommand, Debug)]
|
||||
pub(crate) enum Command {
|
||||
RunProbe {
|
||||
/// path of binary to run
|
||||
#[arg(long, env = "NODE_STATUS_AGENT_PROBE_PATH")]
|
||||
probe_path: String,
|
||||
#[arg(short, long, env = "NODE_STATUS_AGENT_GATEWAY_ID")]
|
||||
gateway_id: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Args {
|
||||
pub(crate) async fn execute(&self) -> anyhow::Result<()> {
|
||||
match &self.command {
|
||||
Command::RunProbe {
|
||||
probe_path,
|
||||
gateway_id,
|
||||
} => self.run_probe(probe_path, gateway_id).await?,
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_probe(&self, probe_path: &str, gateway_id: &Option<String>) -> anyhow::Result<()> {
|
||||
let server_address = format!("{}:{}", &self.server_address, self.server_port);
|
||||
|
||||
let probe = GwProbe::new(probe_path.to_string());
|
||||
|
||||
let version = probe.version().await;
|
||||
tracing::info!("Probe version:\n{}", version);
|
||||
|
||||
let testrun = request_testrun(&server_address).await?;
|
||||
|
||||
let log = probe.run_and_get_log(gateway_id);
|
||||
|
||||
submit_results(&server_address, testrun.testrun_id, log).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
const URL_BASE: &str = "internal/testruns";
|
||||
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
async fn request_testrun(server_addr: &str) -> anyhow::Result<TestrunAssignment> {
|
||||
let target_url = format!("{}/{}", server_addr, URL_BASE);
|
||||
let client = reqwest::Client::new();
|
||||
let res = client
|
||||
.get(target_url)
|
||||
.send()
|
||||
.await
|
||||
.and_then(|response| response.error_for_status())?;
|
||||
res.json()
|
||||
.await
|
||||
.map(|testrun| {
|
||||
tracing::info!("Received testrun assignment: {:?}", testrun);
|
||||
testrun
|
||||
})
|
||||
.map_err(|err| {
|
||||
tracing::error!("err");
|
||||
err.into()
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(probe_outcome))]
|
||||
async fn submit_results(
|
||||
server_addr: &str,
|
||||
testrun_id: i64,
|
||||
probe_outcome: String,
|
||||
) -> anyhow::Result<()> {
|
||||
let target_url = format!("{}/{}/{}", server_addr, URL_BASE, testrun_id);
|
||||
let client = reqwest::Client::new();
|
||||
let res = client
|
||||
.post(target_url)
|
||||
.body(probe_outcome)
|
||||
.send()
|
||||
.await
|
||||
.and_then(|response| response.error_for_status())?;
|
||||
|
||||
tracing::debug!("Submitted results: {})", res.status());
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
use crate::cli::Args;
|
||||
use clap::Parser;
|
||||
use tracing::level_filters::LevelFilter;
|
||||
use tracing_subscriber::{filter::Directive, EnvFilter};
|
||||
|
||||
mod cli;
|
||||
mod probe;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
setup_tracing();
|
||||
let args = Args::parse();
|
||||
|
||||
let server_addr = format!("{}:{}", args.server_address, args.server_port);
|
||||
test_ns_api_conn(&server_addr).await?;
|
||||
|
||||
args.execute().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn test_ns_api_conn(server_addr: &str) -> anyhow::Result<()> {
|
||||
reqwest::get(server_addr)
|
||||
.await
|
||||
.map(|res| {
|
||||
tracing::info!(
|
||||
"Testing connection to NS API at {server_addr}: {}",
|
||||
res.status()
|
||||
);
|
||||
})
|
||||
.map_err(|err| anyhow::anyhow!("Couldn't connect to server on {}: {}", server_addr, err))
|
||||
}
|
||||
|
||||
pub(crate) fn setup_tracing() {
|
||||
fn directive_checked(directive: impl Into<String>) -> Directive {
|
||||
directive
|
||||
.into()
|
||||
.parse()
|
||||
.expect("Failed to parse log directive")
|
||||
}
|
||||
|
||||
let log_builder = tracing_subscriber::fmt()
|
||||
// Use a more compact, abbreviated log format
|
||||
.compact()
|
||||
// Display source code file paths
|
||||
.with_file(true)
|
||||
// Display source code line numbers
|
||||
.with_line_number(true)
|
||||
.with_thread_ids(true)
|
||||
// Don't display the event's target (module path)
|
||||
.with_target(false);
|
||||
|
||||
let mut filter = EnvFilter::builder()
|
||||
// if RUST_LOG isn't set, set default level
|
||||
.with_default_directive(LevelFilter::INFO.into())
|
||||
.from_env_lossy();
|
||||
// these crates are more granularly filtered
|
||||
let filter_crates = [
|
||||
"reqwest",
|
||||
"rustls",
|
||||
"hyper",
|
||||
"sqlx",
|
||||
"h2",
|
||||
"tendermint_rpc",
|
||||
"tower_http",
|
||||
"axum",
|
||||
];
|
||||
for crate_name in filter_crates {
|
||||
filter = filter.add_directive(directive_checked(format!("{}=warn", crate_name)));
|
||||
}
|
||||
|
||||
filter = filter.add_directive(directive_checked("nym_bin_common=debug"));
|
||||
filter = filter.add_directive(directive_checked("nym_explorer_client=debug"));
|
||||
filter = filter.add_directive(directive_checked("nym_network_defaults=debug"));
|
||||
filter = filter.add_directive(directive_checked("nym_validator_client=debug"));
|
||||
|
||||
log_builder.with_env_filter(filter).init();
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
use tracing::error;
|
||||
|
||||
pub(crate) struct GwProbe {
|
||||
path: String,
|
||||
}
|
||||
|
||||
impl GwProbe {
|
||||
pub(crate) fn new(probe_path: String) -> Self {
|
||||
Self { path: probe_path }
|
||||
}
|
||||
|
||||
pub(crate) async fn version(&self) -> String {
|
||||
let mut command = tokio::process::Command::new(&self.path);
|
||||
command.stdout(std::process::Stdio::piped());
|
||||
command.arg("--version");
|
||||
|
||||
match command.spawn() {
|
||||
Ok(child) => {
|
||||
if let Ok(output) = child.wait_with_output().await {
|
||||
return String::from_utf8(output.stdout)
|
||||
.unwrap_or("Unable to get log from test run".to_string());
|
||||
}
|
||||
"Unable to get probe version".to_string()
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to get probe version: {}", e);
|
||||
"Failed to get probe version".to_string()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn run_and_get_log(&self, gateway_key: &Option<String>) -> String {
|
||||
let mut command = std::process::Command::new(&self.path);
|
||||
command.stdout(std::process::Stdio::piped());
|
||||
|
||||
if let Some(gateway_id) = gateway_key {
|
||||
command.arg("--gateway").arg(gateway_id);
|
||||
}
|
||||
|
||||
match command.spawn() {
|
||||
Ok(child) => {
|
||||
if let Ok(output) = child.wait_with_output() {
|
||||
return String::from_utf8(output.stdout)
|
||||
.unwrap_or("Unable to get log from test run".to_string());
|
||||
}
|
||||
"Unable to get log from test run".to_string()
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to spawn test: {}", e);
|
||||
"Failed to spawn test run task".to_string()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
data/
|
||||
enter_db.sh
|
||||
nym-gateway-probe
|
||||
nym-node-status-api
|
||||
*.sqlite
|
||||
*.sqlite-journal
|
||||
@@ -0,0 +1,66 @@
|
||||
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
[package]
|
||||
name = "nym-node-status-api"
|
||||
version = "0.1.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
documentation.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
rust-version.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
axum = { workspace = true, features = ["tokio", "macros"] }
|
||||
chrono = { workspace = true }
|
||||
clap = { workspace = true, features = ["cargo", "derive", "env", "string"] }
|
||||
cosmwasm-std = { workspace = true }
|
||||
envy = { workspace = true }
|
||||
futures-util = { workspace = true }
|
||||
moka = { workspace = true, features = ["future"] }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["models"]}
|
||||
nym-common-models = { path = "../common/models" }
|
||||
nym-explorer-client = { path = "../explorer-api/explorer-client" }
|
||||
# TODO dz: before Nym API client breaking changes. Update to latest develop once new Nym API is live
|
||||
nym-network-defaults = { git = "https://github.com/nymtech/nym", branch = "pre-dir-v2-fork" }
|
||||
nym-validator-client = { git = "https://github.com/nymtech/nym", branch = "pre-dir-v2-fork" }
|
||||
# nym-network-defaults = { path = "../common/network-defaults" }
|
||||
# nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
nym-task = { path = "../common/task" }
|
||||
nym-node-requests = { git = "https://github.com/nymtech/nym", branch = "pre-dir-v2-fork" }
|
||||
# nym-node-requests = { path = "../nym-node/nym-node-requests", features = ["openapi"] }
|
||||
regex = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
serde_json_path = { workspace = true }
|
||||
strum = { workspace = true }
|
||||
strum_macros = { workspace = true }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread"] }
|
||||
tokio-util = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter"] }
|
||||
tracing-log = { workspace = true }
|
||||
tower-http = { workspace = true, features = ["cors", "trace"] }
|
||||
utoipa = { workspace = true, features = ["axum_extras", "time"] }
|
||||
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
|
||||
# TODO dz `cargo update async-trait`
|
||||
# for automatic schema detection, which was merged, but not released yet
|
||||
# https://github.com/ProbablyClem/utoipauto/pull/38
|
||||
# utoipauto = { git = "https://github.com/ProbablyClem/utoipauto", rev = "eb04cba" }
|
||||
utoipauto = { workspace = true }
|
||||
|
||||
[build-dependencies]
|
||||
anyhow = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros"] }
|
||||
sqlx = { workspace = true, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"sqlite",
|
||||
"macros",
|
||||
"migrate",
|
||||
] }
|
||||
@@ -0,0 +1,15 @@
|
||||
FROM rust:latest AS builder
|
||||
|
||||
COPY ./ /usr/src/nym
|
||||
WORKDIR /usr/src/nym/nym-node-status-api
|
||||
|
||||
RUN cargo build --release
|
||||
|
||||
FROM ubuntu:24.04
|
||||
|
||||
RUN apt-get update && apt-get install -y ca-certificates
|
||||
|
||||
WORKDIR /nym
|
||||
|
||||
COPY --from=builder /usr/src/nym/target/release/nym-node-status-api ./
|
||||
ENTRYPOINT [ "/nym/nym-node-status-api" ]
|
||||
@@ -0,0 +1,8 @@
|
||||
FROM ubuntu:22.04
|
||||
|
||||
RUN apt-get update && apt-get install -y ca-certificates
|
||||
|
||||
WORKDIR /nym
|
||||
|
||||
COPY nym-node-status-api/nym-node-status-api ./
|
||||
ENTRYPOINT [ "/nym/nym-node-status-api" ]
|
||||
@@ -0,0 +1,51 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use sqlx::{Connection, SqliteConnection};
|
||||
use std::fs::Permissions;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use tokio::{fs::File, io::AsyncWriteExt};
|
||||
|
||||
const SQLITE_DB_FILENAME: &str = "nym-node-status-api.sqlite";
|
||||
|
||||
/// If you need to re-run migrations or reset the db, just run
|
||||
/// cargo clean -p nym-node-status-api
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> Result<()> {
|
||||
let out_dir = read_env_var("OUT_DIR")?;
|
||||
let database_path = format!("sqlite://{}/{}?mode=rwc", out_dir, SQLITE_DB_FILENAME);
|
||||
|
||||
write_db_path_to_file(&out_dir, SQLITE_DB_FILENAME).await?;
|
||||
let mut conn = SqliteConnection::connect(&database_path).await?;
|
||||
sqlx::migrate!("./migrations").run(&mut conn).await?;
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
println!("cargo::rustc-env=DATABASE_URL=sqlite://{}", &database_path);
|
||||
|
||||
#[cfg(target_family = "windows")]
|
||||
// for some strange reason we need to add a leading `/` to the windows path even though it's
|
||||
// not a valid windows path... but hey, it works...
|
||||
println!("cargo::rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
|
||||
|
||||
rerun_if_changed();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_env_var(var: &str) -> Result<String> {
|
||||
std::env::var(var).map_err(|_| anyhow!("You need to set {} env var", var))
|
||||
}
|
||||
|
||||
fn rerun_if_changed() {
|
||||
println!("cargo::rerun-if-changed=migrations");
|
||||
println!("cargo::rerun-if-changed=src/db/queries");
|
||||
}
|
||||
|
||||
/// use `./enter_db.sh` to inspect DB
|
||||
async fn write_db_path_to_file(out_dir: &str, db_filename: &str) -> anyhow::Result<()> {
|
||||
let mut file = File::create("enter_db.sh").await?;
|
||||
let _ = file.write(b"#!/bin/bash\n").await?;
|
||||
file.write_all(format!("sqlite3 {}/{}", out_dir, db_filename).as_bytes())
|
||||
.await?;
|
||||
|
||||
file.set_permissions(Permissions::from_mode(0o755))
|
||||
.await
|
||||
.map_err(From::from)
|
||||
}
|
||||
+39
@@ -0,0 +1,39 @@
|
||||
#!/bin/bash
|
||||
|
||||
set -e
|
||||
|
||||
export RUST_LOG=${RUST_LOG:-debug}
|
||||
|
||||
export NYM_API_CLIENT_TIMEOUT=60
|
||||
export EXPLORER_CLIENT_TIMEOUT=60
|
||||
|
||||
export ENVIRONMENT="mainnet.env"
|
||||
|
||||
function run_bare() {
|
||||
# export necessary env vars
|
||||
set -a
|
||||
source ../envs/$ENVIRONMENT
|
||||
set +a
|
||||
export RUST_LOG=debug
|
||||
|
||||
# --conection-url is provided in build.rs
|
||||
cargo run --package nym-node-status-api
|
||||
}
|
||||
|
||||
function run_docker() {
|
||||
cargo build --package nym-node-status-api --release
|
||||
cp ../target/release/nym-node-status-api .
|
||||
|
||||
cd ..
|
||||
docker build -t node-status-api -f nym-node-status-api/Dockerfile.dev .
|
||||
docker run --env-file envs/${ENVIRONMENT} \
|
||||
-e EXPLORER_CLIENT_TIMEOUT=$EXPLORER_CLIENT_TIMEOUT \
|
||||
-e NYM_API_CLIENT_TIMEOUT=$NYM_API_CLIENT_TIMEOUT \
|
||||
-e DATABASE_URL="sqlite://node-status-api.sqlite?mode=rwc" \
|
||||
-e RUST_LOG=${RUST_LOG} node-status-api
|
||||
|
||||
}
|
||||
|
||||
run_bare
|
||||
|
||||
# run_docker
|
||||
@@ -0,0 +1,112 @@
|
||||
CREATE TABLE gateways
|
||||
(
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
gateway_identity_key VARCHAR NOT NULL UNIQUE,
|
||||
self_described VARCHAR,
|
||||
explorer_pretty_bond VARCHAR,
|
||||
last_probe_result VARCHAR,
|
||||
last_probe_log VARCHAR,
|
||||
config_score INTEGER NOT NULL DEFAULT (0),
|
||||
config_score_successes REAL NOT NULL DEFAULT (0),
|
||||
config_score_samples REAL NOT NULL DEFAULT (0),
|
||||
routing_score INTEGER NOT NULL DEFAULT (0),
|
||||
routing_score_successes REAL NOT NULL DEFAULT (0),
|
||||
routing_score_samples REAL NOT NULL DEFAULT (0),
|
||||
test_run_samples REAL NOT NULL DEFAULT (0),
|
||||
last_testrun_utc INTEGER,
|
||||
last_updated_utc INTEGER NOT NULL,
|
||||
bonded INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
|
||||
blacklisted INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
|
||||
performance INTEGER NOT NULL DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE INDEX idx_gateway_description_gateway_identity_key ON gateways (gateway_identity_key);
|
||||
|
||||
|
||||
CREATE TABLE mixnodes (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
identity_key VARCHAR NOT NULL UNIQUE,
|
||||
mix_id INTEGER NOT NULL UNIQUE,
|
||||
bonded INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0,
|
||||
total_stake INTEGER NOT NULL,
|
||||
host VARCHAR NOT NULL,
|
||||
http_api_port INTEGER NOT NULL,
|
||||
blacklisted INTEGER CHECK (blacklisted in (0, 1)) NOT NULL DEFAULT 0,
|
||||
full_details VARCHAR,
|
||||
self_described VARCHAR,
|
||||
last_updated_utc INTEGER NOT NULL
|
||||
, is_dp_delegatee INTEGER CHECK (is_dp_delegatee IN (0, 1)) NOT NULL DEFAULT 0);
|
||||
CREATE INDEX idx_mixnodes_mix_id ON mixnodes (mix_id);
|
||||
CREATE INDEX idx_mixnodes_identity_key ON mixnodes (identity_key);
|
||||
|
||||
CREATE TABLE
|
||||
mixnode_description (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
mix_id INTEGER UNIQUE NOT NULL,
|
||||
moniker VARCHAR,
|
||||
website VARCHAR,
|
||||
security_contact VARCHAR,
|
||||
details VARCHAR,
|
||||
last_updated_utc INTEGER NOT NULL,
|
||||
FOREIGN KEY (mix_id) REFERENCES mixnodes (mix_id)
|
||||
);
|
||||
|
||||
-- Indexes for description table
|
||||
CREATE INDEX idx_mixnode_description_mix_id ON mixnode_description (mix_id);
|
||||
|
||||
|
||||
CREATE TABLE summary
|
||||
(
|
||||
key VARCHAR PRIMARY KEY,
|
||||
value_json VARCHAR,
|
||||
last_updated_utc INTEGER NOT NULL
|
||||
);
|
||||
|
||||
|
||||
CREATE TABLE summary_history
|
||||
(
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
date VARCHAR UNIQUE NOT NULL,
|
||||
timestamp_utc INTEGER NOT NULL,
|
||||
value_json VARCHAR
|
||||
);
|
||||
CREATE INDEX idx_summary_history_timestamp_utc ON summary_history (timestamp_utc);
|
||||
CREATE INDEX idx_summary_history_date ON summary_history (date);
|
||||
|
||||
|
||||
CREATE TABLE gateway_description (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
gateway_identity_key VARCHAR UNIQUE NOT NULL,
|
||||
moniker VARCHAR,
|
||||
website VARCHAR,
|
||||
security_contact VARCHAR,
|
||||
details VARCHAR,
|
||||
last_updated_utc INTEGER NOT NULL,
|
||||
FOREIGN KEY (gateway_identity_key) REFERENCES gateways (gateway_identity_key)
|
||||
);
|
||||
|
||||
|
||||
CREATE TABLE
|
||||
mixnode_daily_stats (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
mix_id INTEGER NOT NULL,
|
||||
total_stake BIGINT NOT NULL,
|
||||
date_utc VARCHAR NOT NULL,
|
||||
packets_received INTEGER DEFAULT 0,
|
||||
packets_sent INTEGER DEFAULT 0,
|
||||
packets_dropped INTEGER DEFAULT 0,
|
||||
FOREIGN KEY (mix_id) REFERENCES mixnodes (mix_id),
|
||||
UNIQUE (mix_id, date_utc) -- This constraint automatically creates an index
|
||||
);
|
||||
|
||||
|
||||
CREATE TABLE testruns
|
||||
(
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
gateway_id INTEGER,
|
||||
status INTEGER NOT NULL, -- 0=pending, 1=in-progress, 2=complete
|
||||
timestamp_utc INTEGER NOT NULL,
|
||||
ip_address VARCHAR NOT NULL,
|
||||
log VARCHAR NOT NULL,
|
||||
FOREIGN KEY (gateway_id) REFERENCES gateways (id)
|
||||
);
|
||||
@@ -0,0 +1,77 @@
|
||||
use clap::Parser;
|
||||
use nym_bin_common::bin_info;
|
||||
use reqwest::Url;
|
||||
use std::{sync::OnceLock, time::Duration};
|
||||
|
||||
// Helper for passing LONG_VERSION to clap
|
||||
fn pretty_build_info_static() -> &'static str {
|
||||
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
|
||||
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Parser)]
|
||||
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
|
||||
pub(crate) struct Cli {
|
||||
/// Network name for the network to which we're connecting.
|
||||
#[clap(long, env = "NETWORK_NAME")]
|
||||
pub(crate) network_name: String,
|
||||
|
||||
/// Explorer api url.
|
||||
#[clap(short, long, env = "EXPLORER_API")]
|
||||
pub(crate) explorer_api: String,
|
||||
|
||||
/// Nym api url.
|
||||
#[clap(short, long, env = "NYM_API")]
|
||||
pub(crate) nym_api: String,
|
||||
|
||||
/// TTL for the http cache.
|
||||
#[clap(
|
||||
long,
|
||||
default_value_t = 30,
|
||||
env = "NYM_NODE_STATUS_API_NYM_HTTP_CACHE_TTL"
|
||||
)]
|
||||
pub(crate) nym_http_cache_ttl: u64,
|
||||
|
||||
/// HTTP port on which to run node status api.
|
||||
#[clap(long, default_value_t = 8000, env = "NYM_NODE_STATUS_API_HTTP_PORT")]
|
||||
pub(crate) http_port: u16,
|
||||
|
||||
/// Nyxd address.
|
||||
#[clap(long, env = "NYXD")]
|
||||
pub(crate) nyxd_addr: Url,
|
||||
|
||||
/// Nym api client timeout.
|
||||
#[clap(long, default_value = "15", env = "NYM_API_CLIENT_TIMEOUT")]
|
||||
#[arg(value_parser = parse_duration)]
|
||||
pub(crate) nym_api_client_timeout: Duration,
|
||||
|
||||
/// Explorer api client timeout.
|
||||
#[clap(long, default_value = "15", env = "EXPLORER_CLIENT_TIMEOUT")]
|
||||
#[arg(value_parser = parse_duration)]
|
||||
pub(crate) explorer_client_timeout: Duration,
|
||||
|
||||
/// Connection url for the database.
|
||||
#[clap(long, env = "DATABASE_URL")]
|
||||
pub(crate) database_url: String,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
default_value = "600",
|
||||
env = "NODE_STATUS_API_MONITOR_REFRESH_INTERVAL"
|
||||
)]
|
||||
#[arg(value_parser = parse_duration)]
|
||||
pub(crate) monitor_refresh_interval: Duration,
|
||||
|
||||
#[clap(
|
||||
long,
|
||||
default_value = "600",
|
||||
env = "NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL"
|
||||
)]
|
||||
#[arg(value_parser = parse_duration)]
|
||||
pub(crate) testruns_refresh_interval: Duration,
|
||||
}
|
||||
|
||||
fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
|
||||
let seconds = arg.parse()?;
|
||||
Ok(std::time::Duration::from_secs(seconds))
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use sqlx::{migrate::Migrator, sqlite::SqliteConnectOptions, ConnectOptions, SqlitePool};
|
||||
use std::str::FromStr;
|
||||
|
||||
pub(crate) mod models;
|
||||
pub(crate) mod queries;
|
||||
|
||||
static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
|
||||
|
||||
pub(crate) type DbPool = SqlitePool;
|
||||
|
||||
pub(crate) struct Storage {
|
||||
pool: DbPool,
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
pub async fn init(connection_url: String) -> Result<Self> {
|
||||
let connect_options = SqliteConnectOptions::from_str(&connection_url)?
|
||||
.create_if_missing(true)
|
||||
.disable_statement_logging();
|
||||
|
||||
let pool = sqlx::SqlitePool::connect_with(connect_options)
|
||||
.await
|
||||
.map_err(|err| anyhow!("Failed to connect to {}: {}", &connection_url, err))?;
|
||||
|
||||
MIGRATOR.run(&pool).await?;
|
||||
|
||||
Ok(Storage { pool })
|
||||
}
|
||||
|
||||
/// Cloning pool is cheap, it's the same underlying set of connections
|
||||
pub fn pool_owned(&self) -> DbPool {
|
||||
self.pool.clone()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,334 @@
|
||||
use crate::{
|
||||
http::{self, models::SummaryHistory},
|
||||
monitor::NumericalCheckedCast,
|
||||
};
|
||||
use nym_node_requests::api::v1::node::models::NodeDescription;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use strum_macros::{EnumString, FromRepr};
|
||||
use utoipa::ToSchema;
|
||||
|
||||
pub(crate) struct GatewayRecord {
|
||||
pub(crate) identity_key: String,
|
||||
pub(crate) bonded: bool,
|
||||
pub(crate) blacklisted: bool,
|
||||
pub(crate) self_described: Option<String>,
|
||||
pub(crate) explorer_pretty_bond: Option<String>,
|
||||
pub(crate) last_updated_utc: i64,
|
||||
pub(crate) performance: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct GatewayDto {
|
||||
pub(crate) gateway_identity_key: String,
|
||||
pub(crate) bonded: bool,
|
||||
pub(crate) blacklisted: bool,
|
||||
pub(crate) performance: i64,
|
||||
pub(crate) self_described: Option<String>,
|
||||
pub(crate) explorer_pretty_bond: Option<String>,
|
||||
pub(crate) last_probe_result: Option<String>,
|
||||
pub(crate) last_probe_log: Option<String>,
|
||||
pub(crate) last_testrun_utc: Option<i64>,
|
||||
pub(crate) last_updated_utc: i64,
|
||||
pub(crate) moniker: String,
|
||||
pub(crate) security_contact: String,
|
||||
pub(crate) details: String,
|
||||
pub(crate) website: String,
|
||||
}
|
||||
|
||||
impl TryFrom<GatewayDto> for http::models::Gateway {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: GatewayDto) -> Result<Self, Self::Error> {
|
||||
// Instead of using routing_score_successes / routing_score_samples, we use the
|
||||
// number of successful testruns in the last 24h.
|
||||
let routing_score = 0f32;
|
||||
let config_score = 0u32;
|
||||
let last_updated_utc =
|
||||
timestamp_as_utc(value.last_updated_utc.cast_checked()?).to_rfc3339();
|
||||
let last_testrun_utc = value
|
||||
.last_testrun_utc
|
||||
.and_then(|i| i.cast_checked().ok())
|
||||
.map(|t| timestamp_as_utc(t).to_rfc3339());
|
||||
|
||||
let self_described = value.self_described.clone().unwrap_or("null".to_string());
|
||||
let explorer_pretty_bond = value
|
||||
.explorer_pretty_bond
|
||||
.clone()
|
||||
.unwrap_or("null".to_string());
|
||||
let last_probe_result = value
|
||||
.last_probe_result
|
||||
.clone()
|
||||
.unwrap_or("null".to_string());
|
||||
let last_probe_log = value.last_probe_log.clone();
|
||||
|
||||
let self_described = serde_json::from_str(&self_described).unwrap_or(None);
|
||||
let explorer_pretty_bond = serde_json::from_str(&explorer_pretty_bond).unwrap_or(None);
|
||||
let last_probe_result = serde_json::from_str(&last_probe_result).unwrap_or(None);
|
||||
|
||||
let bonded = value.bonded;
|
||||
let blacklisted = value.blacklisted;
|
||||
let performance = value.performance as u8;
|
||||
|
||||
let description = NodeDescription {
|
||||
moniker: value.moniker.clone(),
|
||||
website: value.website.clone(),
|
||||
security_contact: value.security_contact.clone(),
|
||||
details: value.details.clone(),
|
||||
};
|
||||
|
||||
Ok(http::models::Gateway {
|
||||
gateway_identity_key: value.gateway_identity_key.clone(),
|
||||
bonded,
|
||||
blacklisted,
|
||||
performance,
|
||||
self_described,
|
||||
explorer_pretty_bond,
|
||||
description,
|
||||
last_probe_result,
|
||||
last_probe_log,
|
||||
routing_score,
|
||||
config_score,
|
||||
last_testrun_utc,
|
||||
last_updated_utc,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn timestamp_as_utc(unix_timestamp: u64) -> chrono::DateTime<chrono::Utc> {
|
||||
let d = std::time::UNIX_EPOCH + std::time::Duration::from_secs(unix_timestamp);
|
||||
d.into()
|
||||
}
|
||||
|
||||
pub(crate) struct MixnodeRecord {
|
||||
pub(crate) mix_id: u32,
|
||||
pub(crate) identity_key: String,
|
||||
pub(crate) bonded: bool,
|
||||
pub(crate) total_stake: i64,
|
||||
pub(crate) host: String,
|
||||
pub(crate) http_port: u16,
|
||||
pub(crate) blacklisted: bool,
|
||||
pub(crate) full_details: String,
|
||||
pub(crate) self_described: Option<String>,
|
||||
pub(crate) last_updated_utc: i64,
|
||||
pub(crate) is_dp_delegatee: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct MixnodeDto {
|
||||
pub(crate) mix_id: i64,
|
||||
pub(crate) bonded: bool,
|
||||
pub(crate) blacklisted: bool,
|
||||
pub(crate) is_dp_delegatee: bool,
|
||||
pub(crate) total_stake: i64,
|
||||
pub(crate) full_details: String,
|
||||
pub(crate) self_described: Option<String>,
|
||||
pub(crate) last_updated_utc: i64,
|
||||
pub(crate) moniker: String,
|
||||
pub(crate) website: String,
|
||||
pub(crate) security_contact: String,
|
||||
pub(crate) details: String,
|
||||
}
|
||||
|
||||
impl TryFrom<MixnodeDto> for http::models::Mixnode {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: MixnodeDto) -> Result<Self, Self::Error> {
|
||||
let mix_id = value.mix_id.cast_checked()?;
|
||||
let full_details = value.full_details.clone();
|
||||
let full_details = serde_json::from_str(&full_details).unwrap_or(None);
|
||||
|
||||
let self_described = value
|
||||
.self_described
|
||||
.clone()
|
||||
.map(|v| serde_json::from_str(&v).unwrap_or(serde_json::Value::Null));
|
||||
|
||||
let last_updated_utc =
|
||||
timestamp_as_utc(value.last_updated_utc.cast_checked()?).to_rfc3339();
|
||||
let blacklisted = value.blacklisted;
|
||||
let is_dp_delegatee = value.is_dp_delegatee;
|
||||
let moniker = value.moniker.clone();
|
||||
let website = value.website.clone();
|
||||
let security_contact = value.security_contact.clone();
|
||||
let details = value.details.clone();
|
||||
|
||||
Ok(http::models::Mixnode {
|
||||
mix_id,
|
||||
bonded: value.bonded,
|
||||
blacklisted,
|
||||
is_dp_delegatee,
|
||||
total_stake: value.total_stake,
|
||||
full_details,
|
||||
description: NodeDescription {
|
||||
moniker,
|
||||
website,
|
||||
security_contact,
|
||||
details,
|
||||
},
|
||||
self_described,
|
||||
last_updated_utc,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct BondedStatusDto {
|
||||
pub(crate) id: i64,
|
||||
pub(crate) identity_key: String,
|
||||
pub(crate) bonded: bool,
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct SummaryDto {
|
||||
pub(crate) key: String,
|
||||
pub(crate) value_json: String,
|
||||
pub(crate) last_updated_utc: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct SummaryHistoryDto {
|
||||
#[allow(dead_code)]
|
||||
pub id: i64,
|
||||
pub date: String,
|
||||
pub value_json: String,
|
||||
pub timestamp_utc: i64,
|
||||
}
|
||||
|
||||
impl TryFrom<SummaryHistoryDto> for SummaryHistory {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: SummaryHistoryDto) -> Result<Self, Self::Error> {
|
||||
let value_json = serde_json::from_str(&value.value_json).unwrap_or_default();
|
||||
Ok(SummaryHistory {
|
||||
value_json,
|
||||
date: value.date.clone(),
|
||||
timestamp_utc: timestamp_as_utc(value.timestamp_utc.cast_checked()?).to_rfc3339(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) const MIXNODES_BONDED_COUNT: &str = "mixnodes.bonded.count";
|
||||
pub(crate) const MIXNODES_BONDED_ACTIVE: &str = "mixnodes.bonded.active";
|
||||
pub(crate) const MIXNODES_BONDED_INACTIVE: &str = "mixnodes.bonded.inactive";
|
||||
pub(crate) const MIXNODES_BONDED_RESERVE: &str = "mixnodes.bonded.reserve";
|
||||
pub(crate) const MIXNODES_BLACKLISTED_COUNT: &str = "mixnodes.blacklisted.count";
|
||||
|
||||
pub(crate) const GATEWAYS_BONDED_COUNT: &str = "gateways.bonded.count";
|
||||
pub(crate) const GATEWAYS_EXPLORER_COUNT: &str = "gateways.explorer.count";
|
||||
pub(crate) const GATEWAYS_BLACKLISTED_COUNT: &str = "gateways.blacklisted.count";
|
||||
|
||||
pub(crate) const MIXNODES_HISTORICAL_COUNT: &str = "mixnodes.historical.count";
|
||||
pub(crate) const GATEWAYS_HISTORICAL_COUNT: &str = "gateways.historical.count";
|
||||
|
||||
// `utoipa`` goes crazy if you use module-qualified prefix as field type so we
|
||||
// have to import it
|
||||
use gateway::GatewaySummary;
|
||||
use mixnode::MixnodeSummary;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct NetworkSummary {
|
||||
pub(crate) mixnodes: MixnodeSummary,
|
||||
pub(crate) gateways: GatewaySummary,
|
||||
}
|
||||
|
||||
pub(crate) mod mixnode {
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct MixnodeSummary {
|
||||
pub(crate) bonded: MixnodeSummaryBonded,
|
||||
pub(crate) blacklisted: MixnodeSummaryBlacklisted,
|
||||
pub(crate) historical: MixnodeSummaryHistorical,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct MixnodeSummaryBonded {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) active: i32,
|
||||
pub(crate) inactive: i32,
|
||||
pub(crate) reserve: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct MixnodeSummaryBlacklisted {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct MixnodeSummaryHistorical {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) mod gateway {
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct GatewaySummary {
|
||||
pub(crate) bonded: GatewaySummaryBonded,
|
||||
pub(crate) blacklisted: GatewaySummaryBlacklisted,
|
||||
pub(crate) historical: GatewaySummaryHistorical,
|
||||
pub(crate) explorer: GatewaySummaryExplorer,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct GatewaySummaryExplorer {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct GatewaySummaryBonded {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct GatewaySummaryHistorical {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct GatewaySummaryBlacklisted {
|
||||
pub(crate) count: i32,
|
||||
pub(crate) last_updated_utc: String,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TestRunDto {
|
||||
pub id: i64,
|
||||
pub gateway_id: i64,
|
||||
pub status: i64,
|
||||
pub timestamp_utc: i64,
|
||||
pub ip_address: String,
|
||||
pub log: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, strum_macros::Display, EnumString, FromRepr, PartialEq)]
|
||||
#[repr(u8)]
|
||||
pub(crate) enum TestRunStatus {
|
||||
Complete = 2,
|
||||
InProgress = 1,
|
||||
Pending = 0,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GatewayIdentityDto {
|
||||
pub gateway_identity_key: String,
|
||||
pub bonded: bool,
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // it's not dead code but clippy doesn't detect usage in sqlx macros
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GatewayInfoDto {
|
||||
pub id: i64,
|
||||
pub gateway_identity_key: String,
|
||||
pub self_described: Option<String>,
|
||||
pub explorer_pretty_bond: Option<String>,
|
||||
}
|
||||
@@ -0,0 +1,160 @@
|
||||
use crate::{
|
||||
db::{
|
||||
models::{BondedStatusDto, GatewayDto, GatewayRecord},
|
||||
DbPool,
|
||||
},
|
||||
http::models::Gateway,
|
||||
};
|
||||
use futures_util::TryStreamExt;
|
||||
use nym_validator_client::models::DescribedGateway;
|
||||
use tracing::error;
|
||||
|
||||
pub(crate) async fn insert_gateways(
|
||||
pool: &DbPool,
|
||||
gateways: Vec<GatewayRecord>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut db = pool.acquire().await?;
|
||||
for record in gateways {
|
||||
sqlx::query!(
|
||||
"INSERT INTO gateways
|
||||
(gateway_identity_key, bonded, blacklisted,
|
||||
self_described, explorer_pretty_bond,
|
||||
last_updated_utc, performance)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(gateway_identity_key) DO UPDATE SET
|
||||
bonded=excluded.bonded,
|
||||
blacklisted=excluded.blacklisted,
|
||||
self_described=excluded.self_described,
|
||||
explorer_pretty_bond=excluded.explorer_pretty_bond,
|
||||
last_updated_utc=excluded.last_updated_utc,
|
||||
performance = excluded.performance;",
|
||||
record.identity_key,
|
||||
record.bonded,
|
||||
record.blacklisted,
|
||||
record.self_described,
|
||||
record.explorer_pretty_bond,
|
||||
record.last_updated_utc,
|
||||
record.performance
|
||||
)
|
||||
.execute(&mut *db)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn write_blacklisted_gateways_to_db<'a, I>(
|
||||
pool: &DbPool,
|
||||
gateways: I,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
I: Iterator<Item = &'a String>,
|
||||
{
|
||||
let mut conn = pool.acquire().await?;
|
||||
for gateway_identity_key in gateways {
|
||||
sqlx::query!(
|
||||
"UPDATE gateways
|
||||
SET blacklisted = true
|
||||
WHERE gateway_identity_key = ?;",
|
||||
gateway_identity_key,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Ensure all gateways that are set as bonded, are still bonded
|
||||
pub(crate) async fn ensure_gateways_still_bonded(
|
||||
pool: &DbPool,
|
||||
gateways: &[DescribedGateway],
|
||||
) -> anyhow::Result<usize> {
|
||||
let bonded_gateways_rows = get_all_bonded_gateways_row_ids_by_status(pool, true).await?;
|
||||
let unbonded_gateways_rows = bonded_gateways_rows.iter().filter(|v| {
|
||||
!gateways
|
||||
.iter()
|
||||
.any(|bonded| *bonded.bond.identity() == v.identity_key)
|
||||
});
|
||||
|
||||
let recently_unbonded_gateways = unbonded_gateways_rows.to_owned().count();
|
||||
let last_updated_utc = chrono::offset::Utc::now().timestamp();
|
||||
let mut transaction = pool.begin().await?;
|
||||
for row in unbonded_gateways_rows {
|
||||
sqlx::query!(
|
||||
"UPDATE gateways
|
||||
SET bonded = ?, last_updated_utc = ?
|
||||
WHERE id = ?;",
|
||||
false,
|
||||
last_updated_utc,
|
||||
row.id,
|
||||
)
|
||||
.execute(&mut *transaction)
|
||||
.await?;
|
||||
}
|
||||
transaction.commit().await?;
|
||||
|
||||
Ok(recently_unbonded_gateways)
|
||||
}
|
||||
|
||||
async fn get_all_bonded_gateways_row_ids_by_status(
|
||||
pool: &DbPool,
|
||||
status: bool,
|
||||
) -> anyhow::Result<Vec<BondedStatusDto>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
BondedStatusDto,
|
||||
r#"SELECT
|
||||
id as "id!",
|
||||
gateway_identity_key as "identity_key!",
|
||||
bonded as "bonded: bool"
|
||||
FROM gateways
|
||||
WHERE bonded = ?"#,
|
||||
status,
|
||||
)
|
||||
.fetch(&mut *conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_gateways(pool: &DbPool) -> anyhow::Result<Vec<Gateway>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
GatewayDto,
|
||||
r#"SELECT
|
||||
gw.gateway_identity_key as "gateway_identity_key!",
|
||||
gw.bonded as "bonded: bool",
|
||||
gw.blacklisted as "blacklisted: bool",
|
||||
gw.performance as "performance!",
|
||||
gw.self_described as "self_described?",
|
||||
gw.explorer_pretty_bond as "explorer_pretty_bond?",
|
||||
gw.last_probe_result as "last_probe_result?",
|
||||
gw.last_probe_log as "last_probe_log?",
|
||||
gw.last_testrun_utc as "last_testrun_utc?",
|
||||
gw.last_updated_utc as "last_updated_utc!",
|
||||
COALESCE(gd.moniker, "NA") as "moniker!",
|
||||
COALESCE(gd.website, "NA") as "website!",
|
||||
COALESCE(gd.security_contact, "NA") as "security_contact!",
|
||||
COALESCE(gd.details, "NA") as "details!"
|
||||
FROM gateways gw
|
||||
LEFT JOIN gateway_description gd
|
||||
ON gw.gateway_identity_key = gd.gateway_identity_key
|
||||
ORDER BY gw.gateway_identity_key"#,
|
||||
)
|
||||
.fetch(&mut *conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
let items: Vec<Gateway> = items
|
||||
.into_iter()
|
||||
.map(|item| item.try_into())
|
||||
.collect::<anyhow::Result<Vec<_>>>()
|
||||
.map_err(|e| {
|
||||
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
|
||||
e
|
||||
})?;
|
||||
tracing::trace!("Fetched {} gateways from DB", items.len());
|
||||
Ok(items)
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
use crate::db::{models::NetworkSummary, DbPool};
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
/// take `last_updated` instead of calculating it so that `summary` matches
|
||||
/// `daily_summary`
|
||||
pub(crate) async fn insert_summaries(
|
||||
pool: &DbPool,
|
||||
summaries: &[(&str, &usize)],
|
||||
summary: &NetworkSummary,
|
||||
last_updated: DateTime<Utc>,
|
||||
) -> anyhow::Result<()> {
|
||||
insert_summary(pool, summaries, last_updated).await?;
|
||||
|
||||
insert_summary_history(pool, summary, last_updated).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert_summary(
|
||||
pool: &DbPool,
|
||||
summaries: &[(&str, &usize)],
|
||||
last_updated: DateTime<Utc>,
|
||||
) -> anyhow::Result<()> {
|
||||
let timestamp = last_updated.timestamp();
|
||||
let mut tx = pool.begin().await?;
|
||||
|
||||
for (kind, value) in summaries {
|
||||
let value = value.to_string();
|
||||
sqlx::query!(
|
||||
"INSERT INTO summary
|
||||
(key, value_json, last_updated_utc)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT(key) DO UPDATE SET
|
||||
value_json=excluded.value_json,
|
||||
last_updated_utc=excluded.last_updated_utc;",
|
||||
kind,
|
||||
value,
|
||||
timestamp
|
||||
)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
tracing::error!("Failed to insert data for {kind}: {err}, aborting transaction",);
|
||||
err
|
||||
})?;
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// For `<date_N>`, `summary_history` is updated with fresh data on every
|
||||
/// iteration.
|
||||
///
|
||||
/// After UTC midnight, summary is inserted for `<date_N+1>` and last entry for
|
||||
/// `<date_N>` stays there forever.
|
||||
///
|
||||
/// This is not aggregate data, it's a set of latest data points
|
||||
async fn insert_summary_history(
|
||||
pool: &DbPool,
|
||||
summary: &NetworkSummary,
|
||||
last_updated: DateTime<Utc>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
let value_json = serde_json::to_string(&summary)?;
|
||||
let timestamp = last_updated.timestamp();
|
||||
let now_rfc3339 = last_updated.to_rfc3339();
|
||||
// YYYY-MM-DD, without time
|
||||
let date = &now_rfc3339[..10];
|
||||
|
||||
sqlx::query!(
|
||||
"INSERT INTO summary_history
|
||||
(date, timestamp_utc, value_json)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT(date) DO UPDATE SET
|
||||
timestamp_utc=excluded.timestamp_utc,
|
||||
value_json=excluded.value_json;",
|
||||
date,
|
||||
timestamp,
|
||||
value_json
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,177 @@
|
||||
use futures_util::TryStreamExt;
|
||||
use nym_validator_client::models::MixNodeBondAnnotated;
|
||||
use tracing::error;
|
||||
|
||||
use crate::{
|
||||
db::{
|
||||
models::{BondedStatusDto, MixnodeDto, MixnodeRecord},
|
||||
DbPool,
|
||||
},
|
||||
http::models::{DailyStats, Mixnode},
|
||||
};
|
||||
|
||||
pub(crate) async fn insert_mixnodes(
|
||||
pool: &DbPool,
|
||||
mixnodes: Vec<MixnodeRecord>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
for record in mixnodes.iter() {
|
||||
// https://www.sqlite.org/lang_upsert.html
|
||||
sqlx::query!(
|
||||
"INSERT INTO mixnodes
|
||||
(mix_id, identity_key, bonded, total_stake,
|
||||
host, http_api_port, blacklisted, full_details,
|
||||
self_described, last_updated_utc, is_dp_delegatee)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(mix_id) DO UPDATE SET
|
||||
bonded=excluded.bonded,
|
||||
total_stake=excluded.total_stake, host=excluded.host,
|
||||
http_api_port=excluded.http_api_port,blacklisted=excluded.blacklisted,
|
||||
full_details=excluded.full_details,self_described=excluded.self_described,
|
||||
last_updated_utc=excluded.last_updated_utc,
|
||||
is_dp_delegatee = excluded.is_dp_delegatee;",
|
||||
record.mix_id,
|
||||
record.identity_key,
|
||||
record.bonded,
|
||||
record.total_stake,
|
||||
record.host,
|
||||
record.http_port,
|
||||
record.blacklisted,
|
||||
record.full_details,
|
||||
record.self_described,
|
||||
record.last_updated_utc,
|
||||
record.is_dp_delegatee
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_mixnodes(pool: &DbPool) -> anyhow::Result<Vec<Mixnode>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
MixnodeDto,
|
||||
r#"SELECT
|
||||
mn.mix_id as "mix_id!",
|
||||
mn.bonded as "bonded: bool",
|
||||
mn.blacklisted as "blacklisted: bool",
|
||||
mn.is_dp_delegatee as "is_dp_delegatee: bool",
|
||||
mn.total_stake as "total_stake!",
|
||||
mn.full_details as "full_details!",
|
||||
mn.self_described as "self_described",
|
||||
mn.last_updated_utc as "last_updated_utc!",
|
||||
COALESCE(md.moniker, "NA") as "moniker!",
|
||||
COALESCE(md.website, "NA") as "website!",
|
||||
COALESCE(md.security_contact, "NA") as "security_contact!",
|
||||
COALESCE(md.details, "NA") as "details!"
|
||||
FROM mixnodes mn
|
||||
LEFT JOIN mixnode_description md ON mn.mix_id = md.mix_id
|
||||
ORDER BY mn.mix_id"#
|
||||
)
|
||||
.fetch(&mut *conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
let items = items
|
||||
.into_iter()
|
||||
.map(|item| item.try_into())
|
||||
.collect::<anyhow::Result<Vec<_>>>()
|
||||
.map_err(|e| {
|
||||
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
|
||||
e
|
||||
})?;
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
/// We fetch the latest 30 days of data as a subquery and then
|
||||
/// return it in ascending order, so we don't break existing UI
|
||||
pub(crate) async fn get_daily_stats(pool: &DbPool) -> anyhow::Result<Vec<DailyStats>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
DailyStats,
|
||||
r#"
|
||||
SELECT
|
||||
date_utc as "date_utc!",
|
||||
packets_received as "total_packets_received!: i64",
|
||||
packets_sent as "total_packets_sent!: i64",
|
||||
packets_dropped as "total_packets_dropped!: i64",
|
||||
total_stake as "total_stake!: i64"
|
||||
FROM (
|
||||
SELECT
|
||||
date_utc,
|
||||
SUM(packets_received) as packets_received,
|
||||
SUM(packets_sent) as packets_sent,
|
||||
SUM(packets_dropped) as packets_dropped,
|
||||
SUM(total_stake) as total_stake
|
||||
FROM mixnode_daily_stats
|
||||
GROUP BY date_utc
|
||||
ORDER BY date_utc DESC
|
||||
LIMIT 30
|
||||
)
|
||||
GROUP BY date_utc
|
||||
ORDER BY date_utc
|
||||
"#
|
||||
)
|
||||
.fetch(&mut *conn)
|
||||
.try_collect::<Vec<DailyStats>>()
|
||||
.await?;
|
||||
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
/// Ensure all mixnodes that are set as bonded, are still bonded
|
||||
pub(crate) async fn ensure_mixnodes_still_bonded(
|
||||
pool: &DbPool,
|
||||
mixnodes: &[MixNodeBondAnnotated],
|
||||
) -> anyhow::Result<usize> {
|
||||
let bonded_mixnodes_rows = get_all_bonded_mixnodes_row_ids_by_status(pool, true).await?;
|
||||
let unbonded_mixnodes_rows = bonded_mixnodes_rows.iter().filter(|v| {
|
||||
!mixnodes
|
||||
.iter()
|
||||
.any(|bonded| *bonded.mixnode_details.bond_information.identity() == v.identity_key)
|
||||
});
|
||||
|
||||
let recently_unbonded_mixnodes = unbonded_mixnodes_rows.to_owned().count();
|
||||
let last_updated_utc = chrono::offset::Utc::now().timestamp();
|
||||
let mut transaction = pool.begin().await?;
|
||||
for row in unbonded_mixnodes_rows {
|
||||
sqlx::query!(
|
||||
"UPDATE mixnodes
|
||||
SET bonded = ?, last_updated_utc = ?
|
||||
WHERE id = ?;",
|
||||
false,
|
||||
last_updated_utc,
|
||||
row.id,
|
||||
)
|
||||
.execute(&mut *transaction)
|
||||
.await?;
|
||||
}
|
||||
transaction.commit().await?;
|
||||
|
||||
Ok(recently_unbonded_mixnodes)
|
||||
}
|
||||
|
||||
async fn get_all_bonded_mixnodes_row_ids_by_status(
|
||||
pool: &DbPool,
|
||||
status: bool,
|
||||
) -> anyhow::Result<Vec<BondedStatusDto>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
BondedStatusDto,
|
||||
r#"SELECT
|
||||
id as "id!",
|
||||
identity_key as "identity_key!",
|
||||
bonded as "bonded: bool"
|
||||
FROM mixnodes
|
||||
WHERE bonded = ?"#,
|
||||
status,
|
||||
)
|
||||
.fetch(&mut *conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
Ok(items)
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
mod gateways;
|
||||
mod misc;
|
||||
mod mixnodes;
|
||||
mod summary;
|
||||
pub(crate) mod testruns;
|
||||
|
||||
pub(crate) use gateways::{
|
||||
ensure_gateways_still_bonded, get_all_gateways, insert_gateways,
|
||||
write_blacklisted_gateways_to_db,
|
||||
};
|
||||
pub(crate) use misc::insert_summaries;
|
||||
pub(crate) use mixnodes::{
|
||||
ensure_mixnodes_still_bonded, get_all_mixnodes, get_daily_stats, insert_mixnodes,
|
||||
};
|
||||
pub(crate) use summary::{get_summary, get_summary_history};
|
||||
@@ -0,0 +1,209 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures_util::TryStreamExt;
|
||||
use std::collections::HashMap;
|
||||
use tracing::error;
|
||||
|
||||
use crate::{
|
||||
db::{
|
||||
models::{
|
||||
gateway::{
|
||||
GatewaySummary, GatewaySummaryBlacklisted, GatewaySummaryBonded,
|
||||
GatewaySummaryExplorer, GatewaySummaryHistorical,
|
||||
},
|
||||
mixnode::{
|
||||
MixnodeSummary, MixnodeSummaryBlacklisted, MixnodeSummaryBonded,
|
||||
MixnodeSummaryHistorical,
|
||||
},
|
||||
NetworkSummary, SummaryDto, SummaryHistoryDto,
|
||||
},
|
||||
DbPool,
|
||||
},
|
||||
http::{
|
||||
error::{HttpError, HttpResult},
|
||||
models::SummaryHistory,
|
||||
},
|
||||
};
|
||||
|
||||
pub(crate) async fn get_summary_history(pool: &DbPool) -> anyhow::Result<Vec<SummaryHistory>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let items = sqlx::query_as!(
|
||||
SummaryHistoryDto,
|
||||
r#"SELECT
|
||||
id as "id!",
|
||||
date as "date!",
|
||||
timestamp_utc as "timestamp_utc!",
|
||||
value_json as "value_json!"
|
||||
FROM summary_history
|
||||
ORDER BY date DESC
|
||||
LIMIT 30"#,
|
||||
)
|
||||
.fetch(&mut *conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
let items = items
|
||||
.into_iter()
|
||||
.map(|item| item.try_into())
|
||||
.collect::<anyhow::Result<Vec<_>>>()
|
||||
.map_err(|e| {
|
||||
error!("Conversion from DTO failed: {e}. Invalidly stored data?");
|
||||
e
|
||||
})?;
|
||||
Ok(items)
|
||||
}
|
||||
|
||||
async fn get_summary_dto(pool: &DbPool) -> anyhow::Result<Vec<SummaryDto>> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
Ok(sqlx::query_as!(
|
||||
SummaryDto,
|
||||
r#"SELECT
|
||||
key as "key!",
|
||||
value_json as "value_json!",
|
||||
last_updated_utc as "last_updated_utc!"
|
||||
FROM summary"#
|
||||
)
|
||||
.fetch(&mut *conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_summary(pool: &DbPool) -> HttpResult<NetworkSummary> {
|
||||
let items = get_summary_dto(pool).await.map_err(|err| {
|
||||
tracing::error!("Couldn't get Summary from DB: {err}");
|
||||
HttpError::internal()
|
||||
})?;
|
||||
from_summary_dto(items).await
|
||||
}
|
||||
|
||||
async fn from_summary_dto(items: Vec<SummaryDto>) -> HttpResult<NetworkSummary> {
|
||||
const MIXNODES_BONDED_COUNT: &str = "mixnodes.bonded.count";
|
||||
const MIXNODES_BONDED_ACTIVE: &str = "mixnodes.bonded.active";
|
||||
const MIXNODES_BONDED_INACTIVE: &str = "mixnodes.bonded.inactive";
|
||||
const MIXNODES_BONDED_RESERVE: &str = "mixnodes.bonded.reserve";
|
||||
const MIXNODES_BLACKLISTED_COUNT: &str = "mixnodes.blacklisted.count";
|
||||
const GATEWAYS_BONDED_COUNT: &str = "gateways.bonded.count";
|
||||
const GATEWAYS_EXPLORER_COUNT: &str = "gateways.explorer.count";
|
||||
const GATEWAYS_BLACKLISTED_COUNT: &str = "gateways.blacklisted.count";
|
||||
const MIXNODES_HISTORICAL_COUNT: &str = "mixnodes.historical.count";
|
||||
const GATEWAYS_HISTORICAL_COUNT: &str = "gateways.historical.count";
|
||||
|
||||
// convert database rows into a map by key
|
||||
let mut map = HashMap::new();
|
||||
for item in items {
|
||||
map.insert(item.key.clone(), item);
|
||||
}
|
||||
|
||||
// check we have all the keys we are expecting, and build up a map of errors for missing one
|
||||
let keys = [
|
||||
GATEWAYS_BONDED_COUNT,
|
||||
GATEWAYS_EXPLORER_COUNT,
|
||||
GATEWAYS_HISTORICAL_COUNT,
|
||||
GATEWAYS_BLACKLISTED_COUNT,
|
||||
MIXNODES_BLACKLISTED_COUNT,
|
||||
MIXNODES_BONDED_ACTIVE,
|
||||
MIXNODES_BONDED_COUNT,
|
||||
MIXNODES_BONDED_INACTIVE,
|
||||
MIXNODES_BONDED_RESERVE,
|
||||
MIXNODES_HISTORICAL_COUNT,
|
||||
];
|
||||
|
||||
let mut errors: Vec<&str> = vec![];
|
||||
for key in keys {
|
||||
if !map.contains_key(key) {
|
||||
errors.push(key);
|
||||
}
|
||||
}
|
||||
|
||||
// return an error if anything is missing, with a nice list
|
||||
if !errors.is_empty() {
|
||||
tracing::error!("Summary value missing: {}", errors.join(", "));
|
||||
return Err(HttpError::internal());
|
||||
}
|
||||
|
||||
// strip the options and use default values (anything missing is trapped above)
|
||||
let mixnodes_bonded_count: SummaryDto =
|
||||
map.get(MIXNODES_BONDED_COUNT).cloned().unwrap_or_default();
|
||||
let mixnodes_bonded_active: SummaryDto =
|
||||
map.get(MIXNODES_BONDED_ACTIVE).cloned().unwrap_or_default();
|
||||
let mixnodes_bonded_inactive: SummaryDto = map
|
||||
.get(MIXNODES_BONDED_INACTIVE)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let mixnodes_bonded_reserve: SummaryDto = map
|
||||
.get(MIXNODES_BONDED_RESERVE)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let mixnodes_blacklisted_count: SummaryDto = map
|
||||
.get(MIXNODES_BLACKLISTED_COUNT)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let gateways_bonded_count: SummaryDto =
|
||||
map.get(GATEWAYS_BONDED_COUNT).cloned().unwrap_or_default();
|
||||
let gateways_explorer_count: SummaryDto = map
|
||||
.get(GATEWAYS_EXPLORER_COUNT)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let mixnodes_historical_count: SummaryDto = map
|
||||
.get(MIXNODES_HISTORICAL_COUNT)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let gateways_historical_count: SummaryDto = map
|
||||
.get(GATEWAYS_HISTORICAL_COUNT)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let gateways_blacklisted_count: SummaryDto = map
|
||||
.get(GATEWAYS_BLACKLISTED_COUNT)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
Ok(NetworkSummary {
|
||||
mixnodes: MixnodeSummary {
|
||||
bonded: MixnodeSummaryBonded {
|
||||
count: to_count_i32(&mixnodes_bonded_count),
|
||||
active: to_count_i32(&mixnodes_bonded_active),
|
||||
reserve: to_count_i32(&mixnodes_bonded_reserve),
|
||||
inactive: to_count_i32(&mixnodes_bonded_inactive),
|
||||
last_updated_utc: to_timestamp(&mixnodes_bonded_count),
|
||||
},
|
||||
blacklisted: MixnodeSummaryBlacklisted {
|
||||
count: to_count_i32(&mixnodes_blacklisted_count),
|
||||
last_updated_utc: to_timestamp(&mixnodes_blacklisted_count),
|
||||
},
|
||||
historical: MixnodeSummaryHistorical {
|
||||
count: to_count_i32(&mixnodes_historical_count),
|
||||
last_updated_utc: to_timestamp(&mixnodes_historical_count),
|
||||
},
|
||||
},
|
||||
gateways: GatewaySummary {
|
||||
bonded: GatewaySummaryBonded {
|
||||
count: to_count_i32(&gateways_bonded_count),
|
||||
last_updated_utc: to_timestamp(&gateways_bonded_count),
|
||||
},
|
||||
blacklisted: GatewaySummaryBlacklisted {
|
||||
count: to_count_i32(&gateways_blacklisted_count),
|
||||
last_updated_utc: to_timestamp(&gateways_blacklisted_count),
|
||||
},
|
||||
historical: GatewaySummaryHistorical {
|
||||
count: to_count_i32(&gateways_historical_count),
|
||||
last_updated_utc: to_timestamp(&gateways_historical_count),
|
||||
},
|
||||
explorer: GatewaySummaryExplorer {
|
||||
count: to_count_i32(&gateways_explorer_count),
|
||||
last_updated_utc: to_timestamp(&gateways_explorer_count),
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn to_count_i32(value: &SummaryDto) -> i32 {
|
||||
value.value_json.parse::<i32>().unwrap_or_default()
|
||||
}
|
||||
|
||||
fn to_timestamp(value: &SummaryDto) -> String {
|
||||
timestamp_as_utc(value.last_updated_utc as u64).to_rfc3339()
|
||||
}
|
||||
|
||||
fn timestamp_as_utc(unix_timestamp: u64) -> DateTime<Utc> {
|
||||
let d = std::time::UNIX_EPOCH + std::time::Duration::from_secs(unix_timestamp);
|
||||
d.into()
|
||||
}
|
||||
@@ -0,0 +1,126 @@
|
||||
use crate::http::models::TestrunAssignment;
|
||||
use crate::{
|
||||
db::models::{TestRunDto, TestRunStatus},
|
||||
testruns::now_utc,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use sqlx::{pool::PoolConnection, Sqlite};
|
||||
|
||||
pub(crate) async fn get_testrun_by_id(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
testrun_id: i64,
|
||||
) -> anyhow::Result<TestRunDto> {
|
||||
sqlx::query_as!(
|
||||
TestRunDto,
|
||||
r#"SELECT
|
||||
id as "id!",
|
||||
gateway_id as "gateway_id!",
|
||||
status as "status!",
|
||||
timestamp_utc as "timestamp_utc!",
|
||||
ip_address as "ip_address!",
|
||||
log as "log!"
|
||||
FROM testruns
|
||||
WHERE id = ?
|
||||
ORDER BY timestamp_utc"#,
|
||||
testrun_id
|
||||
)
|
||||
.fetch_one(conn.as_mut())
|
||||
.await
|
||||
.context(format!("Couldn't retrieve testrun {testrun_id}"))
|
||||
}
|
||||
|
||||
pub(crate) async fn get_oldest_testrun_and_make_it_pending(
|
||||
// TODO dz accept mut reference, repeat in all similar functions
|
||||
conn: PoolConnection<Sqlite>,
|
||||
) -> anyhow::Result<Option<TestrunAssignment>> {
|
||||
let mut conn = conn;
|
||||
let assignment = sqlx::query_as!(
|
||||
TestrunAssignment,
|
||||
r#"UPDATE testruns
|
||||
SET status = ?
|
||||
WHERE rowid =
|
||||
(
|
||||
SELECT rowid
|
||||
FROM testruns
|
||||
WHERE status = ?
|
||||
ORDER BY timestamp_utc asc
|
||||
LIMIT 1
|
||||
)
|
||||
RETURNING
|
||||
id as "testrun_id!",
|
||||
gateway_id as "gateway_pk_id!"
|
||||
"#,
|
||||
TestRunStatus::InProgress as i64,
|
||||
TestRunStatus::Pending as i64,
|
||||
)
|
||||
.fetch_optional(&mut *conn)
|
||||
.await?;
|
||||
|
||||
Ok(assignment)
|
||||
}
|
||||
|
||||
pub(crate) async fn update_testrun_status(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
testrun_id: i64,
|
||||
status: TestRunStatus,
|
||||
) -> anyhow::Result<()> {
|
||||
let status = status as u32;
|
||||
sqlx::query!(
|
||||
"UPDATE testruns SET status = ? WHERE id = ?",
|
||||
status,
|
||||
testrun_id
|
||||
)
|
||||
.execute(conn.as_mut())
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn update_gateway_last_probe_log(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
gateway_pk: i64,
|
||||
log: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
sqlx::query!(
|
||||
"UPDATE gateways SET last_probe_log = ? WHERE id = ?",
|
||||
log,
|
||||
gateway_pk
|
||||
)
|
||||
.execute(conn.as_mut())
|
||||
.await
|
||||
.map(drop)
|
||||
.map_err(From::from)
|
||||
}
|
||||
|
||||
pub(crate) async fn update_gateway_last_probe_result(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
gateway_pk: i64,
|
||||
result: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
sqlx::query!(
|
||||
"UPDATE gateways SET last_probe_result = ? WHERE id = ?",
|
||||
result,
|
||||
gateway_pk
|
||||
)
|
||||
.execute(conn.as_mut())
|
||||
.await
|
||||
.map(drop)
|
||||
.map_err(From::from)
|
||||
}
|
||||
|
||||
pub(crate) async fn update_gateway_score(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
gateway_pk: i64,
|
||||
) -> anyhow::Result<()> {
|
||||
let now = now_utc().timestamp();
|
||||
sqlx::query!(
|
||||
"UPDATE gateways SET last_testrun_utc = ?, last_updated_utc = ? WHERE id = ?",
|
||||
now,
|
||||
now,
|
||||
gateway_pk
|
||||
)
|
||||
.execute(conn.as_mut())
|
||||
.await
|
||||
.map(drop)
|
||||
.map_err(From::from)
|
||||
}
|
||||
@@ -0,0 +1,110 @@
|
||||
use axum::{
|
||||
extract::{Path, Query, State},
|
||||
Json, Router,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use utoipa::IntoParams;
|
||||
|
||||
use crate::http::{
|
||||
error::{HttpError, HttpResult},
|
||||
models::{Gateway, GatewaySkinny},
|
||||
state::AppState,
|
||||
PagedResult, Pagination,
|
||||
};
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/", axum::routing::get(gateways))
|
||||
.route("/skinny", axum::routing::get(gateways_skinny))
|
||||
.route("/:identity_key", axum::routing::get(get_gateway))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Gateways",
|
||||
get,
|
||||
params(
|
||||
Pagination
|
||||
),
|
||||
path = "/v2/gateways",
|
||||
responses(
|
||||
(status = 200, body = PagedGateway)
|
||||
)
|
||||
)]
|
||||
async fn gateways(
|
||||
Query(pagination): Query<Pagination>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<PagedResult<Gateway>>> {
|
||||
let db = state.db_pool();
|
||||
let res = state.cache().get_gateway_list(db).await;
|
||||
|
||||
Ok(Json(PagedResult::paginate(pagination, res)))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Gateways",
|
||||
get,
|
||||
params(
|
||||
Pagination
|
||||
),
|
||||
path = "/v2/gateways/skinny",
|
||||
responses(
|
||||
(status = 200, body = PagedGatewaySkinny)
|
||||
)
|
||||
)]
|
||||
async fn gateways_skinny(
|
||||
Query(pagination): Query<Pagination>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<PagedResult<GatewaySkinny>>> {
|
||||
let db = state.db_pool();
|
||||
let res = state.cache().get_gateway_list(db).await;
|
||||
let res: Vec<GatewaySkinny> = res
|
||||
.iter()
|
||||
.filter(|g| g.bonded)
|
||||
.map(|g| GatewaySkinny {
|
||||
gateway_identity_key: g.gateway_identity_key.clone(),
|
||||
self_described: g.self_described.clone(),
|
||||
performance: g.performance,
|
||||
explorer_pretty_bond: g.explorer_pretty_bond.clone(),
|
||||
last_probe_result: g.last_probe_result.clone(),
|
||||
last_testrun_utc: g.last_testrun_utc.clone(),
|
||||
last_updated_utc: g.last_updated_utc.clone(),
|
||||
routing_score: g.routing_score,
|
||||
config_score: g.config_score,
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Json(PagedResult::paginate(pagination, res)))
|
||||
}
|
||||
|
||||
#[derive(Deserialize, IntoParams)]
|
||||
#[into_params(parameter_in = Path)]
|
||||
struct IdentityKeyParam {
|
||||
identity_key: String,
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Gateways",
|
||||
get,
|
||||
params(
|
||||
IdentityKeyParam
|
||||
),
|
||||
path = "/v2/gateways/{identity_key}",
|
||||
responses(
|
||||
(status = 200, body = Gateway)
|
||||
)
|
||||
)]
|
||||
async fn get_gateway(
|
||||
Path(IdentityKeyParam { identity_key }): Path<IdentityKeyParam>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<Gateway>> {
|
||||
let db = state.db_pool();
|
||||
let res = state.cache().get_gateway_list(db).await;
|
||||
|
||||
match res
|
||||
.iter()
|
||||
.find(|item| item.gateway_identity_key == identity_key)
|
||||
{
|
||||
Some(res) => Ok(Json(res.clone())),
|
||||
None => Err(HttpError::invalid_input(identity_key)),
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,91 @@
|
||||
use axum::{
|
||||
extract::{Path, Query, State},
|
||||
Json, Router,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use tracing::instrument;
|
||||
use utoipa::IntoParams;
|
||||
|
||||
use crate::http::{
|
||||
error::{HttpError, HttpResult},
|
||||
models::{DailyStats, Mixnode},
|
||||
state::AppState,
|
||||
PagedResult, Pagination,
|
||||
};
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/", axum::routing::get(mixnodes))
|
||||
.route("/:mix_id", axum::routing::get(get_mixnodes))
|
||||
.route("/stats", axum::routing::get(get_stats))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Mixnodes",
|
||||
get,
|
||||
params(
|
||||
Pagination
|
||||
),
|
||||
path = "/v2/mixnodes",
|
||||
responses(
|
||||
(status = 200, body = PagedMixnode)
|
||||
)
|
||||
)]
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(page=pagination.page, size=pagination.size))]
|
||||
async fn mixnodes(
|
||||
Query(pagination): Query<Pagination>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<PagedResult<Mixnode>>> {
|
||||
let db = state.db_pool();
|
||||
let res = state.cache().get_mixnodes_list(db).await;
|
||||
|
||||
Ok(Json(PagedResult::paginate(pagination, res)))
|
||||
}
|
||||
|
||||
#[derive(Deserialize, IntoParams)]
|
||||
#[into_params(parameter_in = Path)]
|
||||
struct MixIdParam {
|
||||
mix_id: String,
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Mixnodes",
|
||||
get,
|
||||
params(
|
||||
MixIdParam
|
||||
),
|
||||
path = "/v2/mixnodes/{mix_id}",
|
||||
responses(
|
||||
(status = 200, body = Mixnode)
|
||||
)
|
||||
)]
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(mix_id = mix_id))]
|
||||
async fn get_mixnodes(
|
||||
Path(MixIdParam { mix_id }): Path<MixIdParam>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<Mixnode>> {
|
||||
match mix_id.parse::<u32>() {
|
||||
Ok(parsed_mix_id) => {
|
||||
let res = state.cache().get_mixnodes_list(state.db_pool()).await;
|
||||
|
||||
match res.iter().find(|item| item.mix_id == parsed_mix_id) {
|
||||
Some(res) => Ok(Json(res.clone())),
|
||||
None => Err(HttpError::invalid_input(mix_id)),
|
||||
}
|
||||
}
|
||||
Err(_e) => Err(HttpError::invalid_input(mix_id)),
|
||||
}
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Mixnodes",
|
||||
get,
|
||||
path = "/v2/mixnodes/stats",
|
||||
responses(
|
||||
(status = 200, body = Vec<DailyStats>)
|
||||
)
|
||||
)]
|
||||
async fn get_stats(State(state): State<AppState>) -> HttpResult<Json<Vec<DailyStats>>> {
|
||||
let stats = state.cache().get_mixnode_stats(state.db_pool()).await;
|
||||
Ok(Json(stats))
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
use anyhow::anyhow;
|
||||
use axum::{response::Redirect, Router};
|
||||
use tokio::net::ToSocketAddrs;
|
||||
use tower_http::{cors::CorsLayer, trace::TraceLayer};
|
||||
use utoipa::OpenApi;
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
|
||||
use crate::http::{server::HttpServer, state::AppState};
|
||||
|
||||
pub(crate) mod gateways;
|
||||
pub(crate) mod mixnodes;
|
||||
pub(crate) mod services;
|
||||
pub(crate) mod summary;
|
||||
pub(crate) mod testruns;
|
||||
|
||||
pub(crate) struct RouterBuilder {
|
||||
unfinished_router: Router<AppState>,
|
||||
}
|
||||
|
||||
impl RouterBuilder {
|
||||
pub(crate) fn with_default_routes() -> Self {
|
||||
let router = Router::new()
|
||||
.merge(
|
||||
SwaggerUi::new("/swagger")
|
||||
.url("/api-docs/openapi.json", super::api_docs::ApiDoc::openapi()),
|
||||
)
|
||||
.route(
|
||||
"/",
|
||||
axum::routing::get(|| async { Redirect::permanent("/swagger") }),
|
||||
)
|
||||
.nest(
|
||||
"/v2",
|
||||
Router::new()
|
||||
.nest("/gateways", gateways::routes())
|
||||
.nest("/mixnodes", mixnodes::routes())
|
||||
.nest("/services", services::routes())
|
||||
.nest("/summary", summary::routes()),
|
||||
)
|
||||
.nest(
|
||||
"/internal",
|
||||
Router::new().nest("/testruns", testruns::routes()),
|
||||
);
|
||||
|
||||
Self {
|
||||
unfinished_router: router,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn with_state(self, state: AppState) -> RouterWithState {
|
||||
RouterWithState {
|
||||
router: self.finalize_routes().with_state(state),
|
||||
}
|
||||
}
|
||||
|
||||
fn finalize_routes(self) -> Router<AppState> {
|
||||
// layers added later wrap earlier layers
|
||||
self.unfinished_router
|
||||
// CORS layer needs to wrap other API layers
|
||||
.layer(setup_cors())
|
||||
// logger should be outermost layer
|
||||
.layer(TraceLayer::new_for_http())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RouterWithState {
|
||||
router: Router,
|
||||
}
|
||||
|
||||
impl RouterWithState {
|
||||
pub(crate) async fn build_server<A: ToSocketAddrs>(
|
||||
self,
|
||||
bind_address: A,
|
||||
) -> anyhow::Result<HttpServer> {
|
||||
tokio::net::TcpListener::bind(bind_address)
|
||||
.await
|
||||
.map(|listener| HttpServer::new(self.router, listener))
|
||||
.map_err(|err| anyhow!("Couldn't bind to address due to {}", err))
|
||||
}
|
||||
}
|
||||
|
||||
fn setup_cors() -> CorsLayer {
|
||||
use axum::http::Method;
|
||||
CorsLayer::new()
|
||||
.allow_origin(tower_http::cors::Any)
|
||||
.allow_methods([Method::POST, Method::GET, Method::PATCH, Method::OPTIONS])
|
||||
.allow_headers(tower_http::cors::Any)
|
||||
.allow_credentials(false)
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
use serde_json_path::JsonPath;
|
||||
|
||||
use crate::http::models::Gateway;
|
||||
|
||||
pub(super) struct ParseJsonPaths {
|
||||
pub(super) path_ip_address: JsonPath,
|
||||
pub(super) path_hostname: JsonPath,
|
||||
pub(super) path_service_provider_client_id: JsonPath,
|
||||
}
|
||||
|
||||
impl ParseJsonPaths {
|
||||
pub fn new() -> Result<Self, serde_json_path::ParseError> {
|
||||
Ok(ParseJsonPaths {
|
||||
path_ip_address: JsonPath::parse("$.host_information.ip_address[0]")?,
|
||||
path_hostname: JsonPath::parse("$.host_information.hostname")?,
|
||||
path_service_provider_client_id: JsonPath::parse("$.network_requester.address")?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) struct ParsedDetails {
|
||||
pub(super) ip_address: Option<String>,
|
||||
pub(super) hostname: Option<String>,
|
||||
pub(super) service_provider_client_id: Option<String>,
|
||||
}
|
||||
|
||||
impl ParsedDetails {
|
||||
fn get_string_from_json_path(
|
||||
value: &Option<serde_json::Value>,
|
||||
path: &JsonPath,
|
||||
) -> Option<String> {
|
||||
match value {
|
||||
Some(value) => path
|
||||
.query(value)
|
||||
.exactly_one()
|
||||
.map(|v2| v2.as_str().map(|v3| v3.to_string()))
|
||||
.ok()
|
||||
.flatten(),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
pub fn new(paths: &ParseJsonPaths, g: &Gateway) -> ParsedDetails {
|
||||
ParsedDetails {
|
||||
hostname: ParsedDetails::get_string_from_json_path(
|
||||
&g.self_described,
|
||||
&paths.path_hostname,
|
||||
),
|
||||
ip_address: ParsedDetails::get_string_from_json_path(
|
||||
&g.self_described,
|
||||
&paths.path_ip_address,
|
||||
),
|
||||
service_provider_client_id: ParsedDetails::get_string_from_json_path(
|
||||
&g.self_described,
|
||||
&paths.path_service_provider_client_id,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,134 @@
|
||||
use crate::http::{
|
||||
error::{HttpError, HttpResult},
|
||||
models::Service,
|
||||
state::AppState,
|
||||
PagedResult, Pagination,
|
||||
};
|
||||
use axum::{
|
||||
extract::{Query, State},
|
||||
Json, Router,
|
||||
};
|
||||
use json_path::{ParseJsonPaths, ParsedDetails};
|
||||
use tracing::instrument;
|
||||
|
||||
mod json_path;
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new().route("/", axum::routing::get(mixnodes))
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::IntoParams)]
|
||||
#[into_params(parameter_in = Query)]
|
||||
pub(crate) struct ServicesQueryParams {
|
||||
size: Option<usize>,
|
||||
page: Option<usize>,
|
||||
wss: Option<bool>,
|
||||
hostname: Option<bool>,
|
||||
entry: Option<bool>,
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Services",
|
||||
get,
|
||||
params(
|
||||
ServicesQueryParams,
|
||||
),
|
||||
path = "/v2/services",
|
||||
responses(
|
||||
(status = 200, body = PagedService)
|
||||
)
|
||||
)]
|
||||
#[instrument(level = tracing::Level::DEBUG, skip(state))]
|
||||
async fn mixnodes(
|
||||
Query(ServicesQueryParams {
|
||||
size,
|
||||
page,
|
||||
wss,
|
||||
hostname,
|
||||
entry,
|
||||
}): Query<ServicesQueryParams>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<PagedResult<Service>>> {
|
||||
let db = state.db_pool();
|
||||
let cache = state.cache();
|
||||
|
||||
let show_only_wss = wss.unwrap_or(false);
|
||||
let show_only_with_hostname = hostname.unwrap_or(false);
|
||||
let show_entry_gateways_only = entry.unwrap_or(false);
|
||||
|
||||
let paths = ParseJsonPaths::new().map_err(|e| {
|
||||
tracing::error!("Invalidly configured ParseJsonPaths: {e}");
|
||||
HttpError::internal()
|
||||
})?;
|
||||
let res = cache.get_gateway_list(db).await;
|
||||
let res: Vec<Service> = res
|
||||
.iter()
|
||||
.map(|g| {
|
||||
let details = ParsedDetails::new(&paths, g);
|
||||
|
||||
let s = Service {
|
||||
gateway_identity_key: g.gateway_identity_key.clone(),
|
||||
ip_address: details.ip_address,
|
||||
service_provider_client_id: details.service_provider_client_id,
|
||||
hostname: details.hostname,
|
||||
last_successful_ping_utc: g.last_testrun_utc.clone(),
|
||||
last_updated_utc: g.last_updated_utc.clone(),
|
||||
// routing_score: g.routing_score,
|
||||
routing_score: 1f32,
|
||||
mixnet_websockets: g
|
||||
.self_described
|
||||
.clone()
|
||||
.and_then(|s| s.get("mixnet_websockets").cloned()),
|
||||
};
|
||||
|
||||
let f = ServiceFilter::new(&s);
|
||||
|
||||
(s, f)
|
||||
})
|
||||
.filter(|(_, f)| {
|
||||
let mut keep = f.has_network_requester_sp;
|
||||
|
||||
if show_entry_gateways_only {
|
||||
keep = true;
|
||||
}
|
||||
|
||||
if show_only_wss {
|
||||
keep &= f.has_wss;
|
||||
}
|
||||
if show_only_with_hostname {
|
||||
keep &= f.has_hostname;
|
||||
}
|
||||
|
||||
keep
|
||||
})
|
||||
.map(|(s, _)| s)
|
||||
.collect();
|
||||
|
||||
Ok(Json(PagedResult::paginate(Pagination { size, page }, res)))
|
||||
}
|
||||
|
||||
struct ServiceFilter {
|
||||
has_wss: bool,
|
||||
has_network_requester_sp: bool,
|
||||
has_hostname: bool,
|
||||
}
|
||||
|
||||
impl ServiceFilter {
|
||||
fn new(s: &Service) -> Self {
|
||||
let has_wss = match &s.mixnet_websockets {
|
||||
Some(v) => v.get("wss_port").map(|v2| !v2.is_null()).unwrap_or(false),
|
||||
None => false,
|
||||
};
|
||||
let has_hostname = s.hostname.is_some();
|
||||
let has_network_requester_sp = match &s.service_provider_client_id {
|
||||
Some(v) => !v.is_empty(),
|
||||
None => false,
|
||||
};
|
||||
|
||||
ServiceFilter {
|
||||
has_wss,
|
||||
has_hostname,
|
||||
has_network_requester_sp,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
use axum::{extract::State, Json, Router};
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::{
|
||||
db::models::NetworkSummary,
|
||||
http::{error::HttpResult, models::SummaryHistory, state::AppState},
|
||||
};
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/", axum::routing::get(summary))
|
||||
.route("/history", axum::routing::get(summary_history))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Summary",
|
||||
get,
|
||||
path = "/v2/summary",
|
||||
responses(
|
||||
(status = 200, body = NetworkSummary)
|
||||
)
|
||||
)]
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
async fn summary(State(state): State<AppState>) -> HttpResult<Json<NetworkSummary>> {
|
||||
crate::db::queries::get_summary(state.db_pool())
|
||||
.await
|
||||
.map(Json)
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Summary",
|
||||
get,
|
||||
path = "/v2/summary/history",
|
||||
responses(
|
||||
(status = 200, body = Vec<SummaryHistory>)
|
||||
)
|
||||
)]
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
async fn summary_history(State(state): State<AppState>) -> HttpResult<Json<Vec<SummaryHistory>>> {
|
||||
Ok(Json(
|
||||
state.cache().get_summary_history(state.db_pool()).await,
|
||||
))
|
||||
}
|
||||
@@ -0,0 +1,116 @@
|
||||
use axum::Json;
|
||||
use axum::{
|
||||
extract::{Path, State},
|
||||
Router,
|
||||
};
|
||||
use reqwest::StatusCode;
|
||||
|
||||
use crate::db::models::TestRunStatus;
|
||||
use crate::db::queries;
|
||||
use crate::{
|
||||
db,
|
||||
http::{
|
||||
error::{HttpError, HttpResult},
|
||||
models::TestrunAssignment,
|
||||
state::AppState,
|
||||
},
|
||||
};
|
||||
|
||||
// TODO dz consider adding endpoint to trigger testrun scan for a given gateway_id
|
||||
// like in H< src/http/testruns.rs
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/", axum::routing::get(request_testrun))
|
||||
.route("/:testrun_id", axum::routing::post(submit_testrun))
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn request_testrun(State(state): State<AppState>) -> HttpResult<Json<TestrunAssignment>> {
|
||||
// TODO dz log agent's key
|
||||
// TODO dz log agent's network probe version
|
||||
tracing::debug!("Agent X requested testrun");
|
||||
|
||||
let db = state.db_pool();
|
||||
let conn = db
|
||||
.acquire()
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
|
||||
return match db::queries::testruns::get_oldest_testrun_and_make_it_pending(conn).await {
|
||||
Ok(res) => {
|
||||
if let Some(testrun) = res {
|
||||
// TODO dz consider adding a column to testruns table with agent's public key
|
||||
tracing::debug!(
|
||||
"🏃 Assigned testrun row_id {} to agent X",
|
||||
&testrun.testrun_id
|
||||
);
|
||||
Ok(Json(testrun))
|
||||
} else {
|
||||
Err(HttpError::not_found("No testruns available"))
|
||||
}
|
||||
}
|
||||
Err(err) => Err(HttpError::internal_with_logging(err)),
|
||||
};
|
||||
}
|
||||
|
||||
// TODO dz accept testrun_id as query parameter
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn submit_testrun(
|
||||
Path(testrun_id): Path<i64>,
|
||||
State(state): State<AppState>,
|
||||
body: String,
|
||||
) -> HttpResult<StatusCode> {
|
||||
tracing::debug!(
|
||||
"Agent submitted testrun {}. Total length: {}",
|
||||
testrun_id,
|
||||
body.len(),
|
||||
);
|
||||
// TODO dz store testrun results
|
||||
|
||||
let db = state.db_pool();
|
||||
let mut conn = db
|
||||
.acquire()
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
|
||||
let testrun = queries::testruns::get_testrun_by_id(&mut conn, testrun_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("{e}");
|
||||
HttpError::not_found(testrun_id)
|
||||
})?;
|
||||
// TODO dz this should be part of a single transaction: commit after everything is done
|
||||
queries::testruns::update_testrun_status(&mut conn, testrun_id, TestRunStatus::Complete)
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
queries::testruns::update_gateway_last_probe_log(&mut conn, testrun.gateway_id, &body)
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
let result = get_result_from_log(&body);
|
||||
queries::testruns::update_gateway_last_probe_result(&mut conn, testrun.gateway_id, &result)
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
queries::testruns::update_gateway_score(&mut conn, testrun.gateway_id)
|
||||
.await
|
||||
.map_err(HttpError::internal_with_logging)?;
|
||||
// TODO dz log gw identity key
|
||||
|
||||
tracing::info!(
|
||||
"✅ Testrun row_id {} for gateway {} complete",
|
||||
testrun.id,
|
||||
testrun.gateway_id
|
||||
);
|
||||
|
||||
Ok(StatusCode::CREATED)
|
||||
}
|
||||
|
||||
fn get_result_from_log(log: &str) -> String {
|
||||
let re = regex::Regex::new(r"\n\{\s").unwrap();
|
||||
let result: Vec<_> = re.splitn(log, 2).collect();
|
||||
if result.len() == 2 {
|
||||
let res = format!("{} {}", "{", result[1]).to_string();
|
||||
return res;
|
||||
}
|
||||
"".to_string()
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
use crate::http::{Gateway, GatewaySkinny, Mixnode, Service};
|
||||
use utoipa::OpenApi;
|
||||
use utoipauto::utoipauto;
|
||||
|
||||
// manually import external structs which are behind feature flags because they
|
||||
// can't be automatically discovered
|
||||
// https://github.com/ProbablyClem/utoipauto/issues/13#issuecomment-1974911829
|
||||
#[utoipauto(paths = "./nym-node-status-api/src")]
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
info(title = "Node Status API"),
|
||||
tags(),
|
||||
components(schemas(nym_node_requests::api::v1::node::models::NodeDescription,))
|
||||
)]
|
||||
pub(super) struct ApiDoc;
|
||||
@@ -0,0 +1,42 @@
|
||||
use std::fmt::Display;
|
||||
|
||||
pub(crate) type HttpResult<T> = Result<T, HttpError>;
|
||||
|
||||
pub(crate) struct HttpError {
|
||||
message: String,
|
||||
status: axum::http::StatusCode,
|
||||
}
|
||||
|
||||
impl HttpError {
|
||||
pub(crate) fn invalid_input(message: String) -> Self {
|
||||
Self {
|
||||
message,
|
||||
status: axum::http::StatusCode::BAD_REQUEST,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn internal_with_logging(msg: impl Display) -> Self {
|
||||
tracing::error!("{}", msg.to_string());
|
||||
Self::internal()
|
||||
}
|
||||
|
||||
pub(crate) fn internal() -> Self {
|
||||
Self {
|
||||
message: serde_json::json!({"message": "Internal server error"}).to_string(),
|
||||
status: axum::http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn not_found(msg: impl Display) -> Self {
|
||||
Self {
|
||||
message: serde_json::json!({"message": msg.to_string()}).to_string(),
|
||||
status: axum::http::StatusCode::NOT_FOUND,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl axum::response::IntoResponse for HttpError {
|
||||
fn into_response(self) -> axum::response::Response {
|
||||
(self.status, self.message).into_response()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
use models::{Gateway, GatewaySkinny, Mixnode, Service};
|
||||
|
||||
pub(crate) mod api;
|
||||
pub(crate) mod api_docs;
|
||||
pub(crate) mod error;
|
||||
pub(crate) mod models;
|
||||
pub(crate) mod server;
|
||||
pub(crate) mod state;
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
|
||||
// exclude generic from auto-discovery
|
||||
#[utoipauto::utoipa_ignore]
|
||||
// https://docs.rs/utoipa/latest/utoipa/derive.ToSchema.html#generic-schemas-with-aliases
|
||||
// Generic structs can only be included via aliases, not directly, because they
|
||||
// it would cause an error in generated Swagger docs.
|
||||
// Instead, you have to manually monomorphize each generic struct that
|
||||
// you wish to document
|
||||
#[aliases(
|
||||
PagedGateway = PagedResult<Gateway>,
|
||||
PagedGatewaySkinny = PagedResult<GatewaySkinny>,
|
||||
PagedMixnode = PagedResult<Mixnode>,
|
||||
PagedService = PagedResult<Service>,
|
||||
)]
|
||||
pub struct PagedResult<T> {
|
||||
pub page: usize,
|
||||
pub size: usize,
|
||||
pub total: usize,
|
||||
pub items: Vec<T>,
|
||||
}
|
||||
|
||||
impl<T: Clone> PagedResult<T> {
|
||||
pub fn paginate(pagination: Pagination, res: Vec<T>) -> Self {
|
||||
let total = res.len();
|
||||
let (size, mut page) = pagination.intoto_inner_values();
|
||||
|
||||
if page * size > total {
|
||||
page = total / size;
|
||||
}
|
||||
|
||||
let chunks: Vec<&[T]> = res.chunks(size).collect();
|
||||
|
||||
PagedResult {
|
||||
page,
|
||||
size,
|
||||
total,
|
||||
items: chunks.get(page).cloned().unwrap_or_default().into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::IntoParams)]
|
||||
#[into_params(parameter_in = Query)]
|
||||
pub(crate) struct Pagination {
|
||||
size: Option<usize>,
|
||||
page: Option<usize>,
|
||||
}
|
||||
|
||||
impl Pagination {
|
||||
// unwrap stored values or use predefined defaults
|
||||
pub(crate) fn intoto_inner_values(self) -> (usize, usize) {
|
||||
const SIZE_DEFAULT: usize = 10;
|
||||
const SIZE_MAX: usize = 200;
|
||||
|
||||
const PAGE_DEFAULT: usize = 0;
|
||||
|
||||
(
|
||||
self.size.unwrap_or(SIZE_DEFAULT).min(SIZE_MAX),
|
||||
self.page.unwrap_or(PAGE_DEFAULT),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
use crate::db::models::TestRunDto;
|
||||
use nym_node_requests::api::v1::node::models::NodeDescription;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utoipa::ToSchema;
|
||||
|
||||
pub(crate) use nym_common_models::ns_api::TestrunAssignment;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub struct Gateway {
|
||||
pub gateway_identity_key: String,
|
||||
pub bonded: bool,
|
||||
pub blacklisted: bool,
|
||||
pub performance: u8,
|
||||
pub self_described: Option<serde_json::Value>,
|
||||
pub explorer_pretty_bond: Option<serde_json::Value>,
|
||||
pub description: NodeDescription,
|
||||
pub last_probe_result: Option<serde_json::Value>,
|
||||
pub last_probe_log: Option<String>,
|
||||
pub last_testrun_utc: Option<String>,
|
||||
pub last_updated_utc: String,
|
||||
pub routing_score: f32,
|
||||
pub config_score: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub struct GatewaySkinny {
|
||||
pub gateway_identity_key: String,
|
||||
pub self_described: Option<serde_json::Value>,
|
||||
pub explorer_pretty_bond: Option<serde_json::Value>,
|
||||
pub last_probe_result: Option<serde_json::Value>,
|
||||
pub last_testrun_utc: Option<String>,
|
||||
pub last_updated_utc: String,
|
||||
pub routing_score: f32,
|
||||
pub config_score: u32,
|
||||
pub performance: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub struct Mixnode {
|
||||
pub mix_id: u32,
|
||||
pub bonded: bool,
|
||||
pub blacklisted: bool,
|
||||
pub is_dp_delegatee: bool,
|
||||
pub total_stake: i64,
|
||||
pub full_details: Option<serde_json::Value>,
|
||||
pub self_described: Option<serde_json::Value>,
|
||||
pub description: NodeDescription,
|
||||
pub last_updated_utc: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub struct DailyStats {
|
||||
pub date_utc: String,
|
||||
pub total_packets_received: i64,
|
||||
pub total_packets_sent: i64,
|
||||
pub total_packets_dropped: i64,
|
||||
pub total_stake: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub struct Service {
|
||||
pub gateway_identity_key: String,
|
||||
pub last_updated_utc: String,
|
||||
pub routing_score: f32,
|
||||
pub service_provider_client_id: Option<String>,
|
||||
pub ip_address: Option<String>,
|
||||
pub hostname: Option<String>,
|
||||
pub mixnet_websockets: Option<serde_json::Value>,
|
||||
pub last_successful_ping_utc: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
||||
pub(crate) struct SummaryHistory {
|
||||
pub date: String,
|
||||
pub value_json: serde_json::Value,
|
||||
pub timestamp_utc: String,
|
||||
}
|
||||
|
||||
impl From<TestRunDto> for TestrunAssignment {
|
||||
fn from(value: TestRunDto) -> Self {
|
||||
Self {
|
||||
gateway_pk_id: value.gateway_id,
|
||||
testrun_id: value.id,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
use axum::Router;
|
||||
use core::net::SocketAddr;
|
||||
use tokio::{net::TcpListener, task::JoinHandle};
|
||||
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
|
||||
|
||||
use crate::{
|
||||
db::DbPool,
|
||||
http::{api::RouterBuilder, state::AppState},
|
||||
};
|
||||
|
||||
/// Return handles that allow for graceful shutdown of server + awaiting its
|
||||
/// background tokio task
|
||||
pub(crate) async fn start_http_api(
|
||||
db_pool: DbPool,
|
||||
http_port: u16,
|
||||
nym_http_cache_ttl: u64,
|
||||
) -> anyhow::Result<ShutdownHandles> {
|
||||
let router_builder = RouterBuilder::with_default_routes();
|
||||
|
||||
let state = AppState::new(db_pool, nym_http_cache_ttl);
|
||||
let router = router_builder.with_state(state);
|
||||
|
||||
// TODO dz do we need this to be configurable?
|
||||
let bind_addr = format!("0.0.0.0:{}", http_port);
|
||||
tracing::info!("Binding server to {bind_addr}");
|
||||
let server = router.build_server(bind_addr).await?;
|
||||
|
||||
Ok(start_server(server))
|
||||
}
|
||||
|
||||
fn start_server(server: HttpServer) -> ShutdownHandles {
|
||||
// one copy is stored to trigger a graceful shutdown later
|
||||
let shutdown_button = CancellationToken::new();
|
||||
// other copy is given to server to listen for a shutdown
|
||||
let shutdown_receiver = shutdown_button.clone();
|
||||
let shutdown_receiver = shutdown_receiver.cancelled_owned();
|
||||
|
||||
let server_handle = tokio::spawn(async move { server.run(shutdown_receiver).await });
|
||||
|
||||
ShutdownHandles {
|
||||
server_handle,
|
||||
shutdown_button,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct ShutdownHandles {
|
||||
server_handle: JoinHandle<std::io::Result<()>>,
|
||||
shutdown_button: CancellationToken,
|
||||
}
|
||||
|
||||
impl ShutdownHandles {
|
||||
/// Send graceful shutdown signal to server and wait for server task to complete
|
||||
pub(crate) async fn shutdown(self) -> anyhow::Result<()> {
|
||||
self.shutdown_button.cancel();
|
||||
|
||||
match self.server_handle.await {
|
||||
Ok(Ok(_)) => {
|
||||
tracing::info!("HTTP server shut down without errors");
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
tracing::error!("HTTP server terminated with: {err}");
|
||||
anyhow::bail!(err)
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Server task panicked: {err}");
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct HttpServer {
|
||||
router: Router,
|
||||
listener: TcpListener,
|
||||
}
|
||||
|
||||
impl HttpServer {
|
||||
pub(crate) fn new(router: Router, listener: TcpListener) -> Self {
|
||||
Self { router, listener }
|
||||
}
|
||||
|
||||
pub(crate) async fn run(self, receiver: WaitForCancellationFutureOwned) -> std::io::Result<()> {
|
||||
// into_make_service_with_connect_info allows us to see client ip address
|
||||
axum::serve(
|
||||
self.listener,
|
||||
self.router
|
||||
.into_make_service_with_connect_info::<SocketAddr>(),
|
||||
)
|
||||
.with_graceful_shutdown(receiver)
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,223 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use moka::{future::Cache, Entry};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::{
|
||||
db::DbPool,
|
||||
http::models::{DailyStats, Gateway, Mixnode, SummaryHistory},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct AppState {
|
||||
db_pool: DbPool,
|
||||
cache: HttpCache,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub(crate) fn new(db_pool: DbPool, cache_ttl: u64) -> Self {
|
||||
Self {
|
||||
db_pool,
|
||||
cache: HttpCache::new(cache_ttl),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn db_pool(&self) -> &DbPool {
|
||||
&self.db_pool
|
||||
}
|
||||
|
||||
pub(crate) fn cache(&self) -> &HttpCache {
|
||||
&self.cache
|
||||
}
|
||||
}
|
||||
|
||||
static GATEWAYS_LIST_KEY: &str = "gateways";
|
||||
static MIXNODES_LIST_KEY: &str = "mixnodes";
|
||||
static MIXSTATS_LIST_KEY: &str = "mixstats";
|
||||
static SUMMARY_HISTORY_LIST_KEY: &str = "summary-history";
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct HttpCache {
|
||||
gateways: Cache<String, Arc<RwLock<Vec<Gateway>>>>,
|
||||
mixnodes: Cache<String, Arc<RwLock<Vec<Mixnode>>>>,
|
||||
mixstats: Cache<String, Arc<RwLock<Vec<DailyStats>>>>,
|
||||
history: Cache<String, Arc<RwLock<Vec<SummaryHistory>>>>,
|
||||
}
|
||||
|
||||
impl HttpCache {
|
||||
pub fn new(ttl_seconds: u64) -> Self {
|
||||
HttpCache {
|
||||
gateways: Cache::builder()
|
||||
.max_capacity(2)
|
||||
.time_to_live(Duration::from_secs(ttl_seconds))
|
||||
.build(),
|
||||
mixnodes: Cache::builder()
|
||||
.max_capacity(2)
|
||||
.time_to_live(Duration::from_secs(ttl_seconds))
|
||||
.build(),
|
||||
mixstats: Cache::builder()
|
||||
.max_capacity(2)
|
||||
.time_to_live(Duration::from_secs(ttl_seconds))
|
||||
.build(),
|
||||
history: Cache::builder()
|
||||
.max_capacity(2)
|
||||
.time_to_live(Duration::from_secs(ttl_seconds))
|
||||
.build(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upsert_gateway_list(
|
||||
&self,
|
||||
new_gateway_list: Vec<Gateway>,
|
||||
) -> Entry<String, Arc<RwLock<Vec<Gateway>>>> {
|
||||
self.gateways
|
||||
.entry_by_ref(GATEWAYS_LIST_KEY)
|
||||
.and_upsert_with(|maybe_entry| async {
|
||||
if let Some(entry) = maybe_entry {
|
||||
let v = entry.into_value();
|
||||
let mut guard = v.write().await;
|
||||
*guard = new_gateway_list;
|
||||
v.clone()
|
||||
} else {
|
||||
Arc::new(RwLock::new(new_gateway_list))
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_gateway_list(&self, db: &DbPool) -> Vec<Gateway> {
|
||||
match self.gateways.get(GATEWAYS_LIST_KEY).await {
|
||||
Some(guard) => {
|
||||
let read_lock = guard.read().await;
|
||||
read_lock.clone()
|
||||
}
|
||||
None => {
|
||||
// the key is missing so populate it
|
||||
tracing::warn!("No gateways in cache, refreshing cache from DB...");
|
||||
|
||||
let gateways = crate::db::queries::get_all_gateways(db)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
self.upsert_gateway_list(gateways.clone()).await;
|
||||
|
||||
if gateways.is_empty() {
|
||||
tracing::warn!("Database contains 0 gateways");
|
||||
}
|
||||
|
||||
gateways
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upsert_mixnode_list(
|
||||
&self,
|
||||
new_mixnode_list: Vec<Mixnode>,
|
||||
) -> Entry<String, Arc<RwLock<Vec<Mixnode>>>> {
|
||||
self.mixnodes
|
||||
.entry_by_ref(MIXNODES_LIST_KEY)
|
||||
.and_upsert_with(|maybe_entry| async {
|
||||
if let Some(entry) = maybe_entry {
|
||||
let v = entry.into_value();
|
||||
let mut guard = v.write().await;
|
||||
*guard = new_mixnode_list;
|
||||
v.clone()
|
||||
} else {
|
||||
Arc::new(RwLock::new(new_mixnode_list))
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_mixnodes_list(&self, db: &DbPool) -> Vec<Mixnode> {
|
||||
match self.mixnodes.get(MIXNODES_LIST_KEY).await {
|
||||
Some(guard) => {
|
||||
let read_lock = guard.read().await;
|
||||
read_lock.clone()
|
||||
}
|
||||
None => {
|
||||
tracing::warn!("No mixnodes in cache, refreshing cache from DB...");
|
||||
|
||||
let mixnodes = crate::db::queries::get_all_mixnodes(db)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
self.upsert_mixnode_list(mixnodes.clone()).await;
|
||||
|
||||
if mixnodes.is_empty() {
|
||||
tracing::warn!("Database contains 0 mixnodes");
|
||||
}
|
||||
|
||||
mixnodes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upsert_mixnode_stats(
|
||||
&self,
|
||||
mixnode_stats: Vec<DailyStats>,
|
||||
) -> Entry<String, Arc<RwLock<Vec<DailyStats>>>> {
|
||||
self.mixstats
|
||||
.entry_by_ref(MIXSTATS_LIST_KEY)
|
||||
.and_upsert_with(|maybe_entry| async {
|
||||
if let Some(entry) = maybe_entry {
|
||||
let v = entry.into_value();
|
||||
let mut guard = v.write().await;
|
||||
*guard = mixnode_stats;
|
||||
v.clone()
|
||||
} else {
|
||||
Arc::new(RwLock::new(mixnode_stats))
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_mixnode_stats(&self, db: &DbPool) -> Vec<DailyStats> {
|
||||
match self.mixstats.get(MIXSTATS_LIST_KEY).await {
|
||||
Some(guard) => {
|
||||
let read_lock = guard.read().await;
|
||||
read_lock.to_vec()
|
||||
}
|
||||
None => {
|
||||
let mixnode_stats = crate::db::queries::get_daily_stats(db)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
self.upsert_mixnode_stats(mixnode_stats.clone()).await;
|
||||
mixnode_stats
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_summary_history(&self, db: &DbPool) -> Vec<SummaryHistory> {
|
||||
match self.history.get(SUMMARY_HISTORY_LIST_KEY).await {
|
||||
Some(guard) => {
|
||||
let read_lock = guard.read().await;
|
||||
read_lock.to_vec()
|
||||
}
|
||||
None => {
|
||||
let summary_history = crate::db::queries::get_summary_history(db)
|
||||
.await
|
||||
.unwrap_or(vec![]);
|
||||
self.upsert_summary_history(summary_history.clone()).await;
|
||||
summary_history
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upsert_summary_history(
|
||||
&self,
|
||||
summary_history: Vec<SummaryHistory>,
|
||||
) -> Entry<String, Arc<RwLock<Vec<SummaryHistory>>>> {
|
||||
self.history
|
||||
.entry_by_ref(SUMMARY_HISTORY_LIST_KEY)
|
||||
.and_upsert_with(|maybe_entry| async {
|
||||
if let Some(entry) = maybe_entry {
|
||||
let v = entry.into_value();
|
||||
let mut guard = v.write().await;
|
||||
*guard = summary_history;
|
||||
v.clone()
|
||||
} else {
|
||||
Arc::new(RwLock::new(summary_history))
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,69 @@
|
||||
use tracing::level_filters::LevelFilter;
|
||||
use tracing_subscriber::{filter::Directive, EnvFilter};
|
||||
|
||||
// TODO dz you can get the tracing-subscriber via basic-tracing feature on nym-bin-common
|
||||
pub(crate) fn setup_tracing_logger() -> anyhow::Result<()> {
|
||||
fn directive_checked(directive: impl Into<String>) -> anyhow::Result<Directive> {
|
||||
directive.into().parse().map_err(From::from)
|
||||
}
|
||||
|
||||
let log_builder = tracing_subscriber::fmt()
|
||||
// Use a more compact, abbreviated log format
|
||||
.compact()
|
||||
// Display source code file paths
|
||||
.with_file(true)
|
||||
// Display source code line numbers
|
||||
.with_line_number(true)
|
||||
.with_thread_ids(true)
|
||||
// Don't display the event's target (module path)
|
||||
.with_target(false);
|
||||
|
||||
let mut filter = EnvFilter::builder()
|
||||
// if RUST_LOG isn't set, set default level
|
||||
.with_default_directive(LevelFilter::DEBUG.into())
|
||||
.from_env_lossy();
|
||||
|
||||
// these crates are more granularly filtered
|
||||
let warn_crates = [
|
||||
"reqwest",
|
||||
"rustls",
|
||||
"hyper",
|
||||
"sqlx",
|
||||
"h2",
|
||||
"tendermint_rpc",
|
||||
"tower_http",
|
||||
"axum",
|
||||
];
|
||||
for crate_name in warn_crates {
|
||||
filter = filter.add_directive(directive_checked(format!("{}=warn", crate_name))?);
|
||||
}
|
||||
|
||||
let log_level_hint = filter.max_level_hint();
|
||||
|
||||
// debug or higher granularity (e.g. trace)
|
||||
let debug_or_higher = std::cmp::max(
|
||||
log_level_hint.unwrap_or(LevelFilter::DEBUG),
|
||||
LevelFilter::DEBUG,
|
||||
);
|
||||
filter = filter.add_directive(directive_checked(format!(
|
||||
"nym_bin_common={}",
|
||||
debug_or_higher
|
||||
))?);
|
||||
filter = filter.add_directive(directive_checked(format!(
|
||||
"nym_explorer_client={}",
|
||||
debug_or_higher
|
||||
))?);
|
||||
filter = filter.add_directive(directive_checked(format!(
|
||||
"nym_network_defaults={}",
|
||||
debug_or_higher
|
||||
))?);
|
||||
filter = filter.add_directive(directive_checked(format!(
|
||||
"nym_validator_client={}",
|
||||
debug_or_higher
|
||||
))?);
|
||||
|
||||
log_builder.with_env_filter(filter).init();
|
||||
tracing::info!("Log level: {:?}", log_level_hint);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
use clap::Parser;
|
||||
use nym_task::signal::wait_for_signal;
|
||||
|
||||
mod cli;
|
||||
mod db;
|
||||
mod http;
|
||||
mod logging;
|
||||
mod monitor;
|
||||
mod testruns;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
logging::setup_tracing_logger()?;
|
||||
|
||||
let args = cli::Cli::parse();
|
||||
|
||||
let connection_url = args.database_url.clone();
|
||||
tracing::debug!("Using config:\n{:#?}", args);
|
||||
|
||||
let storage = db::Storage::init(connection_url).await?;
|
||||
let db_pool = storage.pool_owned();
|
||||
let args_clone = args.clone();
|
||||
tokio::spawn(async move {
|
||||
monitor::spawn_in_background(
|
||||
db_pool,
|
||||
args_clone.explorer_client_timeout,
|
||||
args_clone.nym_api_client_timeout,
|
||||
&args_clone.nyxd_addr,
|
||||
args_clone.monitor_refresh_interval,
|
||||
)
|
||||
.await;
|
||||
tracing::info!("Started monitor task");
|
||||
});
|
||||
testruns::spawn(storage.pool_owned(), args.testruns_refresh_interval).await;
|
||||
|
||||
let shutdown_handles = http::server::start_http_api(
|
||||
storage.pool_owned(),
|
||||
args.http_port,
|
||||
args.nym_http_cache_ttl,
|
||||
)
|
||||
.await
|
||||
.expect("Failed to start server");
|
||||
|
||||
tracing::info!("Started HTTP server on port {}", args.http_port);
|
||||
|
||||
wait_for_signal().await;
|
||||
|
||||
if let Err(err) = shutdown_handles.shutdown().await {
|
||||
tracing::error!("{err}");
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,466 @@
|
||||
use crate::db::models::{
|
||||
gateway, mixnode, GatewayRecord, MixnodeRecord, NetworkSummary, GATEWAYS_BLACKLISTED_COUNT,
|
||||
GATEWAYS_BONDED_COUNT, GATEWAYS_EXPLORER_COUNT, GATEWAYS_HISTORICAL_COUNT,
|
||||
MIXNODES_BLACKLISTED_COUNT, MIXNODES_BONDED_ACTIVE, MIXNODES_BONDED_COUNT,
|
||||
MIXNODES_BONDED_INACTIVE, MIXNODES_BONDED_RESERVE, MIXNODES_HISTORICAL_COUNT,
|
||||
};
|
||||
use crate::db::{queries, DbPool};
|
||||
use anyhow::anyhow;
|
||||
use cosmwasm_std::Decimal;
|
||||
use nym_explorer_client::{ExplorerClient, PrettyDetailedGatewayBond};
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
use nym_validator_client::client::NymApiClientExt;
|
||||
use nym_validator_client::models::{DescribedGateway, DescribedMixNode, MixNodeBondAnnotated};
|
||||
use nym_validator_client::nym_nodes::SkimmedNode;
|
||||
use nym_validator_client::nyxd::contract_traits::PagedMixnetQueryClient;
|
||||
use nym_validator_client::nyxd::{AccountId, NyxdClient};
|
||||
use nym_validator_client::NymApiClient;
|
||||
use reqwest::Url;
|
||||
use std::collections::HashSet;
|
||||
use std::str::FromStr;
|
||||
use tokio::time::Duration;
|
||||
use tracing::instrument;
|
||||
|
||||
// TODO dz should be configurable
|
||||
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60);
|
||||
|
||||
static DELEGATION_PROGRAM_WALLET: &str = "n1rnxpdpx3kldygsklfft0gech7fhfcux4zst5lw";
|
||||
|
||||
// TODO dz: query many NYM APIs:
|
||||
// multiple instances running directory cache, ask sachin
|
||||
#[instrument(level = "debug", name = "data_monitor", skip_all)]
|
||||
pub(crate) async fn spawn_in_background(
|
||||
db_pool: DbPool,
|
||||
explorer_client_timeout: Duration,
|
||||
nym_api_client_timeout: Duration,
|
||||
nyxd_addr: &Url,
|
||||
refresh_interval: Duration,
|
||||
) {
|
||||
let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env();
|
||||
|
||||
loop {
|
||||
tracing::info!("Refreshing node info...");
|
||||
|
||||
if let Err(e) = run(
|
||||
&db_pool,
|
||||
&network_defaults,
|
||||
explorer_client_timeout,
|
||||
nym_api_client_timeout,
|
||||
nyxd_addr,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
"Monitor run failed: {e}, retrying in {}s...",
|
||||
FAILURE_RETRY_DELAY.as_secs()
|
||||
);
|
||||
// TODO dz implement some sort of backoff
|
||||
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
|
||||
} else {
|
||||
tracing::info!(
|
||||
"Info successfully collected, sleeping for {}s...",
|
||||
refresh_interval.as_secs()
|
||||
);
|
||||
tokio::time::sleep(refresh_interval).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(
|
||||
pool: &DbPool,
|
||||
network_details: &NymNetworkDetails,
|
||||
explorer_client_timeout: Duration,
|
||||
nym_api_client_timeout: Duration,
|
||||
nyxd_addr: &Url,
|
||||
) -> anyhow::Result<()> {
|
||||
let default_api_url = network_details
|
||||
.endpoints
|
||||
.first()
|
||||
.expect("rust sdk mainnet default incorrectly configured")
|
||||
.api_url()
|
||||
.clone()
|
||||
.expect("rust sdk mainnet default missing api_url");
|
||||
let default_explorer_url = network_details.explorer_api.clone().map(|url| {
|
||||
url.parse()
|
||||
.expect("rust sdk mainnet default explorer url not parseable")
|
||||
});
|
||||
|
||||
// TODO dz replace explorer api with ipinfo.io
|
||||
let default_explorer_url =
|
||||
default_explorer_url.expect("explorer url missing in network config");
|
||||
let explorer_client =
|
||||
ExplorerClient::new_with_timeout(default_explorer_url, explorer_client_timeout)?;
|
||||
let explorer_gateways = explorer_client
|
||||
.get_gateways()
|
||||
.await
|
||||
.log_error("get_gateways")?;
|
||||
|
||||
let api_client = NymApiClient::new_with_timeout(default_api_url, nym_api_client_timeout);
|
||||
let gateways = api_client
|
||||
.get_cached_described_gateways()
|
||||
.await
|
||||
.log_error("get_described_gateways")?;
|
||||
tracing::debug!("Fetched {} gateways", gateways.len());
|
||||
let skimmed_gateways = api_client
|
||||
.get_basic_gateways(None)
|
||||
.await
|
||||
.log_error("get_basic_gateways")?;
|
||||
|
||||
let mixnodes = api_client
|
||||
.get_cached_mixnodes()
|
||||
.await
|
||||
.log_error("get_cached_mixnodes")?;
|
||||
tracing::debug!("Fetched {} mixnodes", mixnodes.len());
|
||||
|
||||
let gateways_blacklisted = skimmed_gateways
|
||||
.iter()
|
||||
.filter_map(|gw| {
|
||||
if gw.performance.round_to_integer() <= 50 {
|
||||
Some(gw.ed25519_identity_pubkey.to_owned())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
// Cached mixnodes don't include blacklisted nodes
|
||||
// We need that to calculate the total locked tokens later
|
||||
let mixnodes = api_client
|
||||
.nym_api
|
||||
.get_mixnodes_detailed_unfiltered()
|
||||
.await
|
||||
.log_error("get_mixnodes_detailed_unfiltered")?;
|
||||
let mixnodes_described = api_client
|
||||
.nym_api
|
||||
.get_mixnodes_described()
|
||||
.await
|
||||
.log_error("get_mixnodes_described")?;
|
||||
let mixnodes_active = api_client
|
||||
.nym_api
|
||||
.get_active_mixnodes()
|
||||
.await
|
||||
.log_error("get_active_mixnodes")?;
|
||||
let delegation_program_members =
|
||||
get_delegation_program_details(network_details, nyxd_addr).await?;
|
||||
|
||||
// keep stats for later
|
||||
let count_bonded_mixnodes = mixnodes.len();
|
||||
let count_bonded_gateways = gateways.len();
|
||||
let count_explorer_gateways = explorer_gateways.len();
|
||||
let count_bonded_mixnodes_active = mixnodes_active.len();
|
||||
|
||||
let gateway_records = prepare_gateway_data(
|
||||
&gateways,
|
||||
&gateways_blacklisted,
|
||||
explorer_gateways,
|
||||
skimmed_gateways,
|
||||
)?;
|
||||
queries::insert_gateways(pool, gateway_records)
|
||||
.await
|
||||
.map(|_| {
|
||||
tracing::debug!("Gateway info written to DB!");
|
||||
})?;
|
||||
|
||||
// instead of counting blacklisted GWs returned from API cache, count from the active set
|
||||
let count_gateways_blacklisted = gateways
|
||||
.iter()
|
||||
.filter(|gw| {
|
||||
let gw_identity = gw.bond.identity();
|
||||
gateways_blacklisted.contains(gw_identity)
|
||||
})
|
||||
.count();
|
||||
|
||||
if count_gateways_blacklisted > 0 {
|
||||
queries::write_blacklisted_gateways_to_db(pool, gateways_blacklisted.iter())
|
||||
.await
|
||||
.map(|_| {
|
||||
tracing::debug!(
|
||||
"Gateway blacklist info written to DB! {} blacklisted by Nym API",
|
||||
count_gateways_blacklisted
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
let mixnode_records =
|
||||
prepare_mixnode_data(&mixnodes, mixnodes_described, delegation_program_members)?;
|
||||
queries::insert_mixnodes(pool, mixnode_records)
|
||||
.await
|
||||
.map(|_| {
|
||||
tracing::debug!("Mixnode info written to DB!");
|
||||
})?;
|
||||
|
||||
let count_mixnodes_blacklisted = mixnodes.iter().filter(|elem| elem.blacklisted).count();
|
||||
|
||||
let recently_unbonded_gateways = queries::ensure_gateways_still_bonded(pool, &gateways).await?;
|
||||
let recently_unbonded_mixnodes = queries::ensure_mixnodes_still_bonded(pool, &mixnodes).await?;
|
||||
|
||||
let count_bonded_mixnodes_reserve = 0; // TODO: NymAPI doesn't report the reserve set size
|
||||
let count_bonded_mixnodes_inactive = count_bonded_mixnodes - count_bonded_mixnodes_active;
|
||||
|
||||
let (all_historical_gateways, all_historical_mixnodes) = calculate_stats(pool).await?;
|
||||
|
||||
//
|
||||
// write summary keys and values to table
|
||||
//
|
||||
|
||||
let nodes_summary = vec![
|
||||
(MIXNODES_BONDED_COUNT, &count_bonded_mixnodes),
|
||||
(MIXNODES_BONDED_ACTIVE, &count_bonded_mixnodes_active),
|
||||
(MIXNODES_BONDED_INACTIVE, &count_bonded_mixnodes_inactive),
|
||||
(MIXNODES_BONDED_RESERVE, &count_bonded_mixnodes_reserve),
|
||||
(MIXNODES_BLACKLISTED_COUNT, &count_mixnodes_blacklisted),
|
||||
(GATEWAYS_BONDED_COUNT, &count_bonded_gateways),
|
||||
(GATEWAYS_EXPLORER_COUNT, &count_explorer_gateways),
|
||||
(MIXNODES_HISTORICAL_COUNT, &all_historical_mixnodes),
|
||||
(GATEWAYS_HISTORICAL_COUNT, &all_historical_gateways),
|
||||
(GATEWAYS_BLACKLISTED_COUNT, &count_gateways_blacklisted),
|
||||
];
|
||||
|
||||
let last_updated = chrono::offset::Utc::now();
|
||||
let last_updated_utc = last_updated.timestamp().to_string();
|
||||
let network_summary = NetworkSummary {
|
||||
mixnodes: mixnode::MixnodeSummary {
|
||||
bonded: mixnode::MixnodeSummaryBonded {
|
||||
count: count_bonded_mixnodes.cast_checked()?,
|
||||
active: count_bonded_mixnodes_active.cast_checked()?,
|
||||
inactive: count_bonded_mixnodes_inactive.cast_checked()?,
|
||||
reserve: count_bonded_mixnodes_reserve.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
blacklisted: mixnode::MixnodeSummaryBlacklisted {
|
||||
count: count_mixnodes_blacklisted.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
historical: mixnode::MixnodeSummaryHistorical {
|
||||
count: all_historical_mixnodes.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
},
|
||||
gateways: gateway::GatewaySummary {
|
||||
bonded: gateway::GatewaySummaryBonded {
|
||||
count: count_bonded_gateways.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
blacklisted: gateway::GatewaySummaryBlacklisted {
|
||||
count: count_gateways_blacklisted.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
historical: gateway::GatewaySummaryHistorical {
|
||||
count: all_historical_gateways.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
explorer: gateway::GatewaySummaryExplorer {
|
||||
count: count_explorer_gateways.cast_checked()?,
|
||||
last_updated_utc: last_updated_utc.to_owned(),
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
queries::insert_summaries(pool, &nodes_summary, &network_summary, last_updated).await?;
|
||||
|
||||
let mut log_lines: Vec<String> = vec![];
|
||||
for (key, value) in nodes_summary.iter() {
|
||||
log_lines.push(format!("{} = {}", key, value));
|
||||
}
|
||||
log_lines.push(format!(
|
||||
"recently_unbonded_mixnodes = {}",
|
||||
recently_unbonded_mixnodes
|
||||
));
|
||||
log_lines.push(format!(
|
||||
"recently_unbonded_gateways = {}",
|
||||
recently_unbonded_gateways
|
||||
));
|
||||
|
||||
tracing::info!("Directory summary: \n{}", log_lines.join("\n"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare_gateway_data(
|
||||
gateways: &[DescribedGateway],
|
||||
gateways_blacklisted: &HashSet<String>,
|
||||
explorer_gateways: Vec<PrettyDetailedGatewayBond>,
|
||||
skimmed_gateways: Vec<SkimmedNode>,
|
||||
) -> anyhow::Result<Vec<GatewayRecord>> {
|
||||
let mut gateway_records = Vec::new();
|
||||
|
||||
for gateway in gateways {
|
||||
let identity_key = gateway.bond.identity();
|
||||
let bonded = true;
|
||||
let last_updated_utc = chrono::offset::Utc::now().timestamp();
|
||||
let blacklisted = gateways_blacklisted.contains(identity_key);
|
||||
|
||||
let self_described = gateway
|
||||
.self_described
|
||||
.as_ref()
|
||||
.and_then(|v| serde_json::to_string(&v).ok());
|
||||
|
||||
let explorer_pretty_bond = explorer_gateways
|
||||
.iter()
|
||||
.find(|g| g.gateway.identity_key.eq(identity_key));
|
||||
let explorer_pretty_bond = explorer_pretty_bond.and_then(|g| serde_json::to_string(g).ok());
|
||||
|
||||
let performance = skimmed_gateways
|
||||
.iter()
|
||||
.find(|g| g.ed25519_identity_pubkey.eq(identity_key))
|
||||
.map(|g| g.performance)
|
||||
.unwrap_or_default()
|
||||
.round_to_integer();
|
||||
|
||||
gateway_records.push(GatewayRecord {
|
||||
identity_key: identity_key.to_owned(),
|
||||
bonded,
|
||||
blacklisted,
|
||||
self_described,
|
||||
explorer_pretty_bond,
|
||||
last_updated_utc,
|
||||
performance,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(gateway_records)
|
||||
}
|
||||
|
||||
fn prepare_mixnode_data(
|
||||
mixnodes: &[MixNodeBondAnnotated],
|
||||
mixnodes_described: Vec<DescribedMixNode>,
|
||||
delegation_program_members: Vec<u32>,
|
||||
) -> anyhow::Result<Vec<MixnodeRecord>> {
|
||||
let mut mixnode_records = Vec::new();
|
||||
|
||||
for mixnode in mixnodes {
|
||||
let mix_id = mixnode.mix_id();
|
||||
let identity_key = mixnode.identity_key();
|
||||
let bonded = true;
|
||||
let total_stake = decimal_to_i64(mixnode.mixnode_details.total_stake());
|
||||
let blacklisted = mixnode.blacklisted;
|
||||
let node_info = mixnode.mix_node();
|
||||
let host = node_info.host.clone();
|
||||
let http_port = node_info.http_api_port;
|
||||
// Contains all the information including what's above
|
||||
let full_details = serde_json::to_string(&mixnode)?;
|
||||
|
||||
let mixnode_described = mixnodes_described.iter().find(|m| m.bond.mix_id == mix_id);
|
||||
let self_described = mixnode_described.and_then(|v| serde_json::to_string(v).ok());
|
||||
let is_dp_delegatee = delegation_program_members.contains(&mix_id);
|
||||
|
||||
let last_updated_utc = chrono::offset::Utc::now().timestamp();
|
||||
|
||||
mixnode_records.push(MixnodeRecord {
|
||||
mix_id,
|
||||
identity_key: identity_key.to_owned(),
|
||||
bonded,
|
||||
total_stake,
|
||||
host,
|
||||
http_port,
|
||||
blacklisted,
|
||||
full_details,
|
||||
self_described,
|
||||
last_updated_utc,
|
||||
is_dp_delegatee,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(mixnode_records)
|
||||
}
|
||||
|
||||
// TODO dz is there a common monorepo place this can be put?
|
||||
pub trait NumericalCheckedCast<T>
|
||||
where
|
||||
T: TryFrom<Self>,
|
||||
<T as TryFrom<Self>>::Error: std::error::Error,
|
||||
Self: std::fmt::Display + Copy,
|
||||
{
|
||||
fn cast_checked(self) -> anyhow::Result<T> {
|
||||
T::try_from(self).map_err(|e| {
|
||||
anyhow::anyhow!(
|
||||
"Couldn't cast {} to {}: {}",
|
||||
self,
|
||||
std::any::type_name::<T>(),
|
||||
e
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> NumericalCheckedCast<U> for T
|
||||
where
|
||||
U: TryFrom<T>,
|
||||
<U as TryFrom<T>>::Error: std::error::Error,
|
||||
T: std::fmt::Display + Copy,
|
||||
{
|
||||
}
|
||||
|
||||
async fn calculate_stats(pool: &DbPool) -> anyhow::Result<(usize, usize)> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
let all_historical_gateways = sqlx::query_scalar!(r#"SELECT count(id) FROM gateways"#)
|
||||
.fetch_one(&mut *conn)
|
||||
.await?
|
||||
.cast_checked()?;
|
||||
|
||||
let all_historical_mixnodes = sqlx::query_scalar!(r#"SELECT count(id) FROM mixnodes"#)
|
||||
.fetch_one(&mut *conn)
|
||||
.await?
|
||||
.cast_checked()?;
|
||||
|
||||
Ok((all_historical_gateways, all_historical_mixnodes))
|
||||
}
|
||||
|
||||
async fn get_delegation_program_details(
|
||||
network_details: &NymNetworkDetails,
|
||||
nyxd_addr: &Url,
|
||||
) -> anyhow::Result<Vec<u32>> {
|
||||
let config = nym_validator_client::nyxd::Config::try_from_nym_network_details(network_details)?;
|
||||
|
||||
let client = NyxdClient::connect(config, nyxd_addr.as_str())
|
||||
.map_err(|err| anyhow::anyhow!("Couldn't connect: {}", err))?;
|
||||
|
||||
let account_id = AccountId::from_str(DELEGATION_PROGRAM_WALLET)
|
||||
.map_err(|e| anyhow!("Invalid bech32 address: {}", e))?;
|
||||
|
||||
let delegations = client.get_all_delegator_delegations(&account_id).await?;
|
||||
|
||||
let mix_ids: Vec<u32> = delegations
|
||||
.iter()
|
||||
.map(|delegation| delegation.mix_id)
|
||||
.collect();
|
||||
|
||||
Ok(mix_ids)
|
||||
}
|
||||
|
||||
fn decimal_to_i64(decimal: Decimal) -> i64 {
|
||||
// Convert the underlying Uint128 to a u128
|
||||
let atomics = decimal.atomics().u128();
|
||||
let precision = 1_000_000_000_000_000_000u128;
|
||||
|
||||
// Get the fractional part
|
||||
let fractional = atomics % precision;
|
||||
|
||||
// Get the integer part
|
||||
let integer = atomics / precision;
|
||||
|
||||
// Combine them into a float
|
||||
let float_value = integer as f64 + (fractional as f64 / 1_000_000_000_000_000_000_f64);
|
||||
|
||||
// Limit to 6 decimal places
|
||||
let rounded_value = (float_value * 1_000_000.0).round() / 1_000_000.0;
|
||||
|
||||
rounded_value as i64
|
||||
}
|
||||
|
||||
trait LogError<T, E> {
|
||||
fn log_error(self, msg: &str) -> Result<T, E>;
|
||||
}
|
||||
|
||||
impl<T, E> LogError<T, E> for anyhow::Result<T, E>
|
||||
where
|
||||
E: std::error::Error,
|
||||
{
|
||||
fn log_error(self, msg: &str) -> Result<T, E> {
|
||||
if let Err(e) = &self {
|
||||
tracing::error!("[{msg}]:\t{e}");
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
use crate::db::models::GatewayIdentityDto;
|
||||
use crate::db::DbPool;
|
||||
use futures_util::TryStreamExt;
|
||||
use std::time::Duration;
|
||||
use tracing::instrument;
|
||||
|
||||
pub(crate) mod models;
|
||||
mod queue;
|
||||
pub(crate) use queue::now_utc;
|
||||
|
||||
pub(crate) async fn spawn(pool: DbPool, refresh_interval: Duration) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tracing::info!("Spawning testruns...");
|
||||
|
||||
if let Err(e) = run(&pool).await {
|
||||
tracing::error!("Cron job failed: {}", e);
|
||||
}
|
||||
tracing::debug!("Sleeping for {}s...", refresh_interval.as_secs());
|
||||
tokio::time::sleep(refresh_interval).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// TODO dz make number of max agents configurable
|
||||
|
||||
// TODO dz periodically clean up stale pending testruns
|
||||
#[instrument(level = "debug", name = "testrun_queue", skip_all)]
|
||||
async fn run(pool: &DbPool) -> anyhow::Result<()> {
|
||||
if pool.is_closed() {
|
||||
tracing::debug!("DB pool closed, returning early");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut conn = pool.acquire().await?;
|
||||
|
||||
let gateways = sqlx::query_as!(
|
||||
GatewayIdentityDto,
|
||||
r#"SELECT
|
||||
gateway_identity_key as "gateway_identity_key!",
|
||||
bonded as "bonded: bool"
|
||||
FROM gateways
|
||||
ORDER BY last_testrun_utc"#,
|
||||
)
|
||||
.fetch(&mut *conn)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
// TODO dz this filtering could be done in SQL
|
||||
let gateways: Vec<GatewayIdentityDto> = gateways.into_iter().filter(|g| g.bonded).collect();
|
||||
|
||||
tracing::debug!("Trying to queue {} testruns", gateways.len());
|
||||
let mut testruns_created = 0;
|
||||
for gateway in gateways {
|
||||
if let Err(e) = queue::try_queue_testrun(
|
||||
&mut conn,
|
||||
gateway.gateway_identity_key.clone(),
|
||||
// TODO dz read from config
|
||||
"127.0.0.1".to_string(),
|
||||
)
|
||||
.await
|
||||
// TODO dz measure how many were actually inserted and how many were skipped
|
||||
{
|
||||
tracing::debug!(
|
||||
"Skipping test for identity {} with error {}",
|
||||
&gateway.gateway_identity_key,
|
||||
e
|
||||
);
|
||||
} else {
|
||||
testruns_created += 1;
|
||||
}
|
||||
}
|
||||
tracing::debug!("{} testruns queued in total", testruns_created);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[allow(dead_code)] // it's not dead code but clippy doesn't detect usage in sqlx macros
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GatewayIdentityDto {
|
||||
pub gateway_identity_key: String,
|
||||
pub bonded: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, utoipa::ToSchema)]
|
||||
pub struct TestRun {
|
||||
pub id: u32,
|
||||
pub identity_key: String,
|
||||
pub status: String,
|
||||
pub log: String,
|
||||
}
|
||||
@@ -0,0 +1,118 @@
|
||||
use crate::db::models::{GatewayInfoDto, TestRunDto, TestRunStatus};
|
||||
use crate::testruns::models::TestRun;
|
||||
use anyhow::anyhow;
|
||||
use chrono::DateTime;
|
||||
use futures_util::TryStreamExt;
|
||||
use sqlx::pool::PoolConnection;
|
||||
use sqlx::Sqlite;
|
||||
use std::time::SystemTime;
|
||||
|
||||
pub(crate) async fn try_queue_testrun(
|
||||
conn: &mut PoolConnection<Sqlite>,
|
||||
identity_key: String,
|
||||
ip_address: String,
|
||||
) -> anyhow::Result<TestRun> {
|
||||
let timestamp = now_utc().timestamp();
|
||||
let timestamp_pretty = now_utc_as_rfc3339();
|
||||
|
||||
let items = sqlx::query_as!(
|
||||
GatewayInfoDto,
|
||||
r#"SELECT
|
||||
id as "id!",
|
||||
gateway_identity_key as "gateway_identity_key!",
|
||||
self_described as "self_described?",
|
||||
explorer_pretty_bond as "explorer_pretty_bond?"
|
||||
FROM gateways
|
||||
WHERE gateway_identity_key = ?
|
||||
ORDER BY gateway_identity_key
|
||||
LIMIT 1"#,
|
||||
identity_key,
|
||||
)
|
||||
// TODO dz shoudl call .fetch_one
|
||||
// TODO dz replace this in other queries as well
|
||||
.fetch(conn.as_mut())
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
let gateway = items
|
||||
.iter()
|
||||
.find(|g| g.gateway_identity_key == identity_key);
|
||||
|
||||
// TODO dz if let Some() = gateway.first() ...
|
||||
if gateway.is_none() {
|
||||
return Err(anyhow!("Unknown gateway {identity_key}"));
|
||||
}
|
||||
let gateway_id = gateway.unwrap().id;
|
||||
|
||||
//
|
||||
// check if there is already a test run for this gateway
|
||||
//
|
||||
let items = sqlx::query_as!(
|
||||
TestRunDto,
|
||||
r#"SELECT
|
||||
id as "id!",
|
||||
gateway_id as "gateway_id!",
|
||||
status as "status!",
|
||||
timestamp_utc as "timestamp_utc!",
|
||||
ip_address as "ip_address!",
|
||||
log as "log!"
|
||||
FROM testruns
|
||||
WHERE gateway_id = ? AND status != 2
|
||||
ORDER BY id DESC
|
||||
LIMIT 1"#,
|
||||
gateway_id,
|
||||
)
|
||||
.fetch(conn.as_mut())
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
if !items.is_empty() {
|
||||
let testrun = items.first().unwrap();
|
||||
return Ok(TestRun {
|
||||
id: testrun.id as u32,
|
||||
identity_key,
|
||||
status: format!(
|
||||
"{}",
|
||||
TestRunStatus::from_repr(testrun.status as u8).unwrap()
|
||||
),
|
||||
log: testrun.log.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
//
|
||||
// save test run
|
||||
//
|
||||
let status = TestRunStatus::Pending as u32;
|
||||
let log = format!(
|
||||
"Test for {identity_key} requested at {} UTC\n\n",
|
||||
timestamp_pretty
|
||||
);
|
||||
|
||||
let id = sqlx::query!(
|
||||
"INSERT INTO testruns (gateway_id, status, ip_address, timestamp_utc, log) VALUES (?, ?, ?, ?, ?)",
|
||||
gateway_id,
|
||||
status,
|
||||
ip_address,
|
||||
timestamp,
|
||||
log,
|
||||
)
|
||||
.execute(conn.as_mut())
|
||||
.await?
|
||||
.last_insert_rowid();
|
||||
|
||||
Ok(TestRun {
|
||||
id: id as u32,
|
||||
identity_key,
|
||||
status: format!("{}", TestRunStatus::Pending),
|
||||
log,
|
||||
})
|
||||
}
|
||||
|
||||
// TODO dz do we need these?
|
||||
pub fn now_utc() -> DateTime<chrono::Utc> {
|
||||
SystemTime::now().into()
|
||||
}
|
||||
|
||||
pub fn now_utc_as_rfc3339() -> String {
|
||||
now_utc().to_rfc3339()
|
||||
}
|
||||
@@ -4,7 +4,9 @@
|
||||
mod old_config_v1;
|
||||
mod old_config_v2;
|
||||
mod old_config_v3;
|
||||
mod old_config_v4;
|
||||
|
||||
pub use old_config_v1::try_upgrade_config_v1;
|
||||
pub use old_config_v2::try_upgrade_config_v2;
|
||||
pub use old_config_v3::try_upgrade_config_v3;
|
||||
pub use old_config_v4::try_upgrade_config_v4;
|
||||
|
||||
@@ -4,13 +4,7 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
use crate::{config::*, error::KeyIOFailure};
|
||||
use entry_gateway::Debug as EntryGatewayConfigDebug;
|
||||
use exit_gateway::{IpPacketRouter, IpPacketRouterDebug, NetworkRequester, NetworkRequesterDebug};
|
||||
use mixnode::{Verloc, VerlocDebug};
|
||||
use nym_client_core_config_types::{
|
||||
disk_persistence::{ClientKeysPaths, CommonClientPaths},
|
||||
DebugConfig as ClientDebugConfig,
|
||||
};
|
||||
use nym_client_core_config_types::DebugConfig as ClientDebugConfig;
|
||||
use nym_config::serde_helpers::de_maybe_port;
|
||||
use nym_crypto::asymmetric::{ed25519, x25519};
|
||||
use nym_network_requester::{
|
||||
@@ -19,6 +13,7 @@ use nym_network_requester::{
|
||||
};
|
||||
use nym_pemstore::{store_key, store_keypair};
|
||||
use nym_sphinx_acknowledgements::AckKey;
|
||||
use old_configs::old_config_v4::*;
|
||||
use persistence::*;
|
||||
use rand::rngs::OsRng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -90,12 +85,12 @@ pub enum NodeModeV3 {
|
||||
ExitGateway,
|
||||
}
|
||||
|
||||
impl From<NodeModeV3> for NodeMode {
|
||||
impl From<NodeModeV3> for NodeModeV4 {
|
||||
fn from(config: NodeModeV3) -> Self {
|
||||
match config {
|
||||
NodeModeV3::Mixnode => NodeMode::Mixnode,
|
||||
NodeModeV3::EntryGateway => NodeMode::EntryGateway,
|
||||
NodeModeV3::ExitGateway => NodeMode::ExitGateway,
|
||||
NodeModeV3::Mixnode => NodeModeV4::Mixnode,
|
||||
NodeModeV3::EntryGateway => NodeModeV4::EntryGateway,
|
||||
NodeModeV3::ExitGateway => NodeModeV4::ExitGateway,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -601,23 +596,6 @@ impl AuthenticatorPathsV3 {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_common_client_paths(&self) -> CommonClientPaths {
|
||||
CommonClientPaths {
|
||||
keys: ClientKeysPaths {
|
||||
private_identity_key_file: self.private_ed25519_identity_key_file.clone(),
|
||||
public_identity_key_file: self.public_ed25519_identity_key_file.clone(),
|
||||
private_encryption_key_file: self.private_x25519_diffie_hellman_key_file.clone(),
|
||||
public_encryption_key_file: self.public_x25519_diffie_hellman_key_file.clone(),
|
||||
ack_key_file: self.ack_key_file.clone(),
|
||||
},
|
||||
gateway_registrations: self.gateway_registrations.clone(),
|
||||
|
||||
// not needed for embedded providers
|
||||
credentials_database: Default::default(),
|
||||
reply_surb_database: self.reply_surb_database.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ed25519_identity_storage_paths(&self) -> nym_pemstore::KeyPairPath {
|
||||
nym_pemstore::KeyPairPath::new(
|
||||
&self.private_ed25519_identity_key_file,
|
||||
@@ -963,7 +941,7 @@ pub async fn initialise(
|
||||
pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
path: P,
|
||||
prev_config: Option<ConfigV3>,
|
||||
) -> Result<Config, NymNodeError> {
|
||||
) -> Result<ConfigV4, NymNodeError> {
|
||||
tracing::debug!("Updating from 1.1.4");
|
||||
let old_cfg = if let Some(prev_config) = prev_config {
|
||||
prev_config
|
||||
@@ -981,21 +959,21 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
.ok_or(NymNodeError::DataDirDerivationFailure)?,
|
||||
);
|
||||
|
||||
let cfg = Config {
|
||||
let cfg = ConfigV4 {
|
||||
save_path: old_cfg.save_path,
|
||||
id: old_cfg.id,
|
||||
mode: old_cfg.mode.into(),
|
||||
host: Host {
|
||||
host: HostV4 {
|
||||
public_ips: old_cfg.host.public_ips,
|
||||
hostname: old_cfg.host.hostname,
|
||||
location: old_cfg.host.location,
|
||||
},
|
||||
mixnet: Mixnet {
|
||||
mixnet: MixnetV4 {
|
||||
bind_address: old_cfg.mixnet.bind_address,
|
||||
announce_port: None,
|
||||
nym_api_urls: old_cfg.mixnet.nym_api_urls,
|
||||
nyxd_urls: old_cfg.mixnet.nyxd_urls,
|
||||
debug: MixnetDebug {
|
||||
debug: MixnetDebugV4 {
|
||||
packet_forwarding_initial_backoff: old_cfg
|
||||
.mixnet
|
||||
.debug
|
||||
@@ -1009,8 +987,8 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
unsafe_disable_noise: old_cfg.mixnet.debug.unsafe_disable_noise,
|
||||
},
|
||||
},
|
||||
storage_paths: NymNodePaths {
|
||||
keys: KeysPaths {
|
||||
storage_paths: NymNodePathsV4 {
|
||||
keys: KeysPathsV4 {
|
||||
private_ed25519_identity_key_file: old_cfg
|
||||
.storage_paths
|
||||
.keys
|
||||
@@ -1038,7 +1016,7 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
},
|
||||
description: old_cfg.storage_paths.description,
|
||||
},
|
||||
http: Http {
|
||||
http: HttpV4 {
|
||||
bind_address: old_cfg.http.bind_address,
|
||||
landing_page_assets_path: old_cfg.http.landing_page_assets_path,
|
||||
access_token: old_cfg.http.access_token,
|
||||
@@ -1046,13 +1024,13 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
expose_system_hardware: old_cfg.http.expose_system_hardware,
|
||||
expose_crypto_hardware: old_cfg.http.expose_crypto_hardware,
|
||||
},
|
||||
wireguard: Wireguard {
|
||||
wireguard: WireguardV4 {
|
||||
enabled: old_cfg.wireguard.enabled,
|
||||
bind_address: old_cfg.wireguard.bind_address,
|
||||
private_ip: old_cfg.wireguard.private_ip,
|
||||
announced_port: old_cfg.wireguard.announced_port,
|
||||
private_network_prefix: old_cfg.wireguard.private_network_prefix,
|
||||
storage_paths: WireguardPaths {
|
||||
storage_paths: WireguardPathsV4 {
|
||||
private_diffie_hellman_key_file: old_cfg
|
||||
.wireguard
|
||||
.storage_paths
|
||||
@@ -1063,12 +1041,12 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
.public_diffie_hellman_key_file,
|
||||
},
|
||||
},
|
||||
mixnode: MixnodeConfig {
|
||||
storage_paths: MixnodePaths {},
|
||||
verloc: Verloc {
|
||||
mixnode: MixnodeConfigV4 {
|
||||
storage_paths: MixnodePathsV4 {},
|
||||
verloc: VerlocV4 {
|
||||
bind_address: old_cfg.mixnode.verloc.bind_address,
|
||||
announce_port: None,
|
||||
debug: VerlocDebug {
|
||||
debug: VerlocDebugV4 {
|
||||
packets_per_node: old_cfg.mixnode.verloc.debug.packets_per_node,
|
||||
connection_timeout: old_cfg.mixnode.verloc.debug.connection_timeout,
|
||||
packet_timeout: old_cfg.mixnode.verloc.debug.packet_timeout,
|
||||
@@ -1078,16 +1056,16 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
retry_timeout: old_cfg.mixnode.verloc.debug.retry_timeout,
|
||||
},
|
||||
},
|
||||
debug: mixnode::Debug {
|
||||
debug: DebugV4 {
|
||||
node_stats_logging_delay: old_cfg.mixnode.debug.node_stats_logging_delay,
|
||||
node_stats_updating_delay: old_cfg.mixnode.debug.node_stats_updating_delay,
|
||||
},
|
||||
},
|
||||
entry_gateway: EntryGatewayConfig {
|
||||
storage_paths: EntryGatewayPaths {
|
||||
entry_gateway: EntryGatewayConfigV4 {
|
||||
storage_paths: EntryGatewayPathsV4 {
|
||||
clients_storage: old_cfg.entry_gateway.storage_paths.clients_storage,
|
||||
cosmos_mnemonic: old_cfg.entry_gateway.storage_paths.cosmos_mnemonic,
|
||||
authenticator: AuthenticatorPaths {
|
||||
authenticator: AuthenticatorPathsV4 {
|
||||
private_ed25519_identity_key_file: old_cfg
|
||||
.entry_gateway
|
||||
.storage_paths
|
||||
@@ -1129,16 +1107,16 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
bind_address: old_cfg.entry_gateway.bind_address,
|
||||
announce_ws_port: old_cfg.entry_gateway.announce_ws_port,
|
||||
announce_wss_port: old_cfg.entry_gateway.announce_wss_port,
|
||||
debug: EntryGatewayConfigDebug {
|
||||
debug: EntryGatewayConfigDebugV4 {
|
||||
message_retrieval_limit: old_cfg.entry_gateway.debug.message_retrieval_limit,
|
||||
// \/ ADDED
|
||||
zk_nym_tickets: Default::default(),
|
||||
},
|
||||
},
|
||||
exit_gateway: ExitGatewayConfig {
|
||||
storage_paths: ExitGatewayPaths {
|
||||
exit_gateway: ExitGatewayConfigV4 {
|
||||
storage_paths: ExitGatewayPathsV4 {
|
||||
clients_storage: exit_gateway_paths.clients_storage,
|
||||
network_requester: NetworkRequesterPaths {
|
||||
network_requester: NetworkRequesterPathsV4 {
|
||||
private_ed25519_identity_key_file: old_cfg
|
||||
.exit_gateway
|
||||
.storage_paths
|
||||
@@ -1175,7 +1153,7 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
.network_requester
|
||||
.gateway_registrations,
|
||||
},
|
||||
ip_packet_router: IpPacketRouterPaths {
|
||||
ip_packet_router: IpPacketRouterPathsV4 {
|
||||
private_ed25519_identity_key_file: old_cfg
|
||||
.exit_gateway
|
||||
.storage_paths
|
||||
@@ -1212,7 +1190,7 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
.ip_packet_router
|
||||
.gateway_registrations,
|
||||
},
|
||||
authenticator: AuthenticatorPaths {
|
||||
authenticator: AuthenticatorPathsV4 {
|
||||
private_ed25519_identity_key_file: old_cfg
|
||||
.exit_gateway
|
||||
.storage_paths
|
||||
@@ -1252,8 +1230,8 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
},
|
||||
open_proxy: old_cfg.exit_gateway.open_proxy,
|
||||
upstream_exit_policy_url: old_cfg.exit_gateway.upstream_exit_policy_url,
|
||||
network_requester: NetworkRequester {
|
||||
debug: NetworkRequesterDebug {
|
||||
network_requester: NetworkRequesterV4 {
|
||||
debug: NetworkRequesterDebugV4 {
|
||||
enabled: old_cfg.exit_gateway.network_requester.debug.enabled,
|
||||
disable_poisson_rate: old_cfg
|
||||
.exit_gateway
|
||||
@@ -1263,8 +1241,8 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
client_debug: old_cfg.exit_gateway.network_requester.debug.client_debug,
|
||||
},
|
||||
},
|
||||
ip_packet_router: IpPacketRouter {
|
||||
debug: IpPacketRouterDebug {
|
||||
ip_packet_router: IpPacketRouterV4 {
|
||||
debug: IpPacketRouterDebugV4 {
|
||||
enabled: old_cfg.exit_gateway.ip_packet_router.debug.enabled,
|
||||
disable_poisson_rate: old_cfg
|
||||
.exit_gateway
|
||||
@@ -1277,7 +1255,7 @@ pub async fn try_upgrade_config_v3<P: AsRef<Path>>(
|
||||
debug: Default::default(),
|
||||
},
|
||||
authenticator: Default::default(),
|
||||
logging: LoggingSettings {},
|
||||
logging: LoggingSettingsV4 {},
|
||||
};
|
||||
|
||||
Ok(cfg)
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -24,6 +24,7 @@ pub const DEFAULT_NYMNODE_DESCRIPTION_FILENAME: &str = "description.toml";
|
||||
|
||||
// Entry Gateway:
|
||||
pub const DEFAULT_CLIENTS_STORAGE_FILENAME: &str = "clients.sqlite";
|
||||
pub const DEFAULT_STATS_STORAGE_FILENAME: &str = "stats.sqlite";
|
||||
pub const DEFAULT_MNEMONIC_FILENAME: &str = "cosmos_mnemonic";
|
||||
|
||||
// Exit Gateway:
|
||||
@@ -147,6 +148,9 @@ pub struct EntryGatewayPaths {
|
||||
/// derived shared keys, available client bandwidths and wireguard peers.
|
||||
pub clients_storage: PathBuf,
|
||||
|
||||
/// Path to sqlite database containing all persistent stats data.
|
||||
pub stats_storage: PathBuf,
|
||||
|
||||
/// Path to file containing cosmos account mnemonic used for zk-nym redemption.
|
||||
pub cosmos_mnemonic: PathBuf,
|
||||
|
||||
@@ -157,6 +161,7 @@ impl EntryGatewayPaths {
|
||||
pub fn new<P: AsRef<Path>>(data_dir: P) -> Self {
|
||||
EntryGatewayPaths {
|
||||
clients_storage: data_dir.as_ref().join(DEFAULT_CLIENTS_STORAGE_FILENAME),
|
||||
stats_storage: data_dir.as_ref().join(DEFAULT_STATS_STORAGE_FILENAME),
|
||||
cosmos_mnemonic: data_dir.as_ref().join(DEFAULT_MNEMONIC_FILENAME),
|
||||
authenticator: AuthenticatorPaths::new(data_dir),
|
||||
}
|
||||
@@ -207,6 +212,9 @@ pub struct ExitGatewayPaths {
|
||||
/// derived shared keys, available client bandwidths and wireguard peers.
|
||||
pub clients_storage: PathBuf,
|
||||
|
||||
/// Path to sqlite database containing all persistent stats data.
|
||||
pub stats_storage: PathBuf,
|
||||
|
||||
pub network_requester: NetworkRequesterPaths,
|
||||
|
||||
pub ip_packet_router: IpPacketRouterPaths,
|
||||
@@ -459,6 +467,7 @@ impl ExitGatewayPaths {
|
||||
let data_dir = data_dir.as_ref();
|
||||
ExitGatewayPaths {
|
||||
clients_storage: data_dir.join(DEFAULT_CLIENTS_STORAGE_FILENAME),
|
||||
stats_storage: data_dir.join(DEFAULT_STATS_STORAGE_FILENAME),
|
||||
network_requester: NetworkRequesterPaths::new(data_dir),
|
||||
ip_packet_router: IpPacketRouterPaths::new(data_dir),
|
||||
authenticator: AuthenticatorPaths::new(data_dir),
|
||||
|
||||
@@ -185,6 +185,9 @@ announce_wss_port = {{#if entry_gateway.announce_wss_port }} {{ entry_gateway.an
|
||||
# derived shared keys, available client bandwidths and wireguard peers.
|
||||
clients_storage = '{{ entry_gateway.storage_paths.clients_storage }}'
|
||||
|
||||
# Path to sqlite database containing all persistent stats data.
|
||||
stats_storage = '{{ entry_gateway.storage_paths.stats_storage }}'
|
||||
|
||||
# Path to file containing cosmos account mnemonic used for zk-nym redemption.
|
||||
cosmos_mnemonic = '{{ entry_gateway.storage_paths.cosmos_mnemonic }}'
|
||||
|
||||
@@ -237,6 +240,10 @@ upstream_exit_policy_url = '{{ exit_gateway.upstream_exit_policy_url }}'
|
||||
# derived shared keys, available client bandwidths and wireguard peers.
|
||||
clients_storage = '{{ exit_gateway.storage_paths.clients_storage }}'
|
||||
|
||||
# Path to sqlite database containing all persistent stats data.
|
||||
stats_storage = '{{ exit_gateway.storage_paths.stats_storage }}'
|
||||
|
||||
|
||||
[exit_gateway.storage_paths.network_requester]
|
||||
# Path to file containing network requester ed25519 identity private key.
|
||||
private_ed25519_identity_key_file = '{{ exit_gateway.storage_paths.network_requester.private_ed25519_identity_key_file }}'
|
||||
|
||||
@@ -10,7 +10,8 @@ use std::path::Path;
|
||||
async fn try_upgrade_config(path: &Path) -> Result<(), NymNodeError> {
|
||||
let cfg = try_upgrade_config_v1(path, None).await.ok();
|
||||
let cfg = try_upgrade_config_v2(path, cfg).await.ok();
|
||||
match try_upgrade_config_v3(path, cfg).await {
|
||||
let cfg = try_upgrade_config_v3(path, cfg).await.ok();
|
||||
match try_upgrade_config_v4(path, cfg).await {
|
||||
Ok(cfg) => cfg.save(),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to finish upgrade - {e}");
|
||||
|
||||
@@ -67,6 +67,7 @@ impl MixnodeData {
|
||||
pub struct EntryGatewayData {
|
||||
mnemonic: Zeroizing<bip39::Mnemonic>,
|
||||
client_storage: nym_gateway::node::PersistentStorage,
|
||||
stats_storage: nym_gateway::node::PersistentStatsStorage,
|
||||
sessions_stats: SharedSessionStats,
|
||||
}
|
||||
|
||||
@@ -94,6 +95,11 @@ impl EntryGatewayData {
|
||||
)
|
||||
.await
|
||||
.map_err(nym_gateway::GatewayError::from)?,
|
||||
stats_storage: nym_gateway::node::PersistentStatsStorage::init(
|
||||
&config.storage_paths.stats_storage,
|
||||
)
|
||||
.await
|
||||
.map_err(nym_gateway::GatewayError::from)?,
|
||||
sessions_stats: SharedSessionStats::new(),
|
||||
})
|
||||
}
|
||||
@@ -114,6 +120,7 @@ pub struct ExitGatewayData {
|
||||
auth_x25519: x25519::PublicKey,
|
||||
|
||||
client_storage: nym_gateway::node::PersistentStorage,
|
||||
stats_storage: nym_gateway::node::PersistentStatsStorage,
|
||||
}
|
||||
|
||||
impl ExitGatewayData {
|
||||
@@ -262,6 +269,11 @@ impl ExitGatewayData {
|
||||
.await
|
||||
.map_err(nym_gateway::GatewayError::from)?;
|
||||
|
||||
let stats_storage =
|
||||
nym_gateway::node::PersistentStatsStorage::init(&config.storage_paths.stats_storage)
|
||||
.await
|
||||
.map_err(nym_gateway::GatewayError::from)?;
|
||||
|
||||
Ok(ExitGatewayData {
|
||||
nr_ed25519,
|
||||
nr_x25519,
|
||||
@@ -270,6 +282,7 @@ impl ExitGatewayData {
|
||||
auth_ed25519,
|
||||
auth_x25519,
|
||||
client_storage,
|
||||
stats_storage,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -580,6 +593,7 @@ impl NymNode {
|
||||
self.ed25519_identity_keys.clone(),
|
||||
self.x25519_sphinx_keys.clone(),
|
||||
self.entry_gateway.client_storage.clone(),
|
||||
self.entry_gateway.stats_storage.clone(),
|
||||
);
|
||||
entry_gateway.disable_http_server();
|
||||
entry_gateway.set_task_client(task_client);
|
||||
@@ -610,6 +624,7 @@ impl NymNode {
|
||||
self.ed25519_identity_keys.clone(),
|
||||
self.x25519_sphinx_keys.clone(),
|
||||
self.exit_gateway.client_storage.clone(),
|
||||
self.exit_gateway.stats_storage.clone(),
|
||||
);
|
||||
exit_gateway.disable_http_server();
|
||||
exit_gateway.set_task_client(task_client);
|
||||
|
||||
@@ -107,6 +107,7 @@ impl Config {
|
||||
nyxd_scraper: NyxdScraper {
|
||||
websocket_url,
|
||||
pruning: Default::default(),
|
||||
store_precommits: true,
|
||||
},
|
||||
base: Base {
|
||||
upstream_nyxd: nyxd_url,
|
||||
@@ -122,6 +123,7 @@ impl Config {
|
||||
rpc_url: self.base.upstream_nyxd.clone(),
|
||||
database_path: self.storage_paths.nyxd_scraper.clone(),
|
||||
pruning_options: self.nyxd_scraper.pruning,
|
||||
store_precommits: self.nyxd_scraper.store_precommits,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -249,7 +251,14 @@ pub struct NyxdScraper {
|
||||
// if the value is missing, use `nothing` pruning as this was the past behaviour
|
||||
#[serde(default = "PruningOptions::nothing")]
|
||||
pub pruning: PruningOptions,
|
||||
// TODO: debug with everything that's currently hardcoded in the scraper
|
||||
|
||||
/// Specifies whether to store pre-commits within the database.
|
||||
#[serde(default = "default_store_precommits")]
|
||||
pub store_precommits: bool,
|
||||
}
|
||||
|
||||
fn default_store_precommits() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
impl NyxdScraper {
|
||||
|
||||
@@ -19,6 +19,7 @@ pub mod ecash;
|
||||
mod init;
|
||||
mod list_gateways;
|
||||
mod peer_handler;
|
||||
mod request;
|
||||
mod run;
|
||||
mod sign;
|
||||
mod switch_gateway;
|
||||
@@ -69,6 +70,9 @@ pub(crate) enum Commands {
|
||||
/// parameters.
|
||||
Run(run::Run),
|
||||
|
||||
/// Make a dummy request to a running authenticator
|
||||
Request(request::Request),
|
||||
|
||||
/// Ecash-related functionalities
|
||||
Ecash(Ecash),
|
||||
|
||||
@@ -127,6 +131,7 @@ pub(crate) async fn execute(args: Cli) -> Result<(), AuthenticatorError> {
|
||||
match args.command {
|
||||
Commands::Init(m) => init::execute(m).await?,
|
||||
Commands::Run(m) => run::execute(&m).await?,
|
||||
Commands::Request(r) => request::execute(&r).await?,
|
||||
Commands::Ecash(ecash) => ecash.execute().await?,
|
||||
Commands::ListGateways(args) => list_gateways::execute(args).await?,
|
||||
Commands::AddGateway(args) => add_gateway::execute(args).await?,
|
||||
|
||||
@@ -0,0 +1,142 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::cli::AuthenticatorError;
|
||||
use crate::cli::{override_config, OverrideConfig};
|
||||
use crate::cli::{try_load_current_config, version_check};
|
||||
use clap::{Args, Subcommand};
|
||||
use nym_authenticator_requests::latest::{
|
||||
registration::{ClientMac, FinalMessage, GatewayClient, InitMessage},
|
||||
request::{AuthenticatorRequest, AuthenticatorRequestData},
|
||||
};
|
||||
use nym_client_core::cli_helpers::client_run::CommonClientRunArgs;
|
||||
use nym_sdk::mixnet::{MixnetMessageSender, Recipient, TransmissionLane};
|
||||
use nym_task::TaskHandle;
|
||||
use nym_wireguard_types::PeerPublicKey;
|
||||
use std::net::IpAddr;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
|
||||
#[allow(clippy::struct_excessive_bools)]
|
||||
#[derive(Args, Clone)]
|
||||
pub(crate) struct Request {
|
||||
#[command(flatten)]
|
||||
common_args: CommonClientRunArgs,
|
||||
|
||||
#[command(subcommand)]
|
||||
request: RequestType,
|
||||
|
||||
authenticator_recipient: String,
|
||||
}
|
||||
|
||||
impl From<Request> for OverrideConfig {
|
||||
fn from(request_config: Request) -> Self {
|
||||
OverrideConfig {
|
||||
nym_apis: None,
|
||||
nyxd_urls: request_config.common_args.nyxd_urls,
|
||||
enabled_credentials_mode: request_config.common_args.enabled_credentials_mode,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Subcommand)]
|
||||
pub(crate) enum RequestType {
|
||||
Initial(Initial),
|
||||
Final(Final),
|
||||
QueryBandwidth(QueryBandwidth),
|
||||
}
|
||||
|
||||
#[derive(Args, Clone, Debug)]
|
||||
pub(crate) struct Initial {
|
||||
pub_key: String,
|
||||
}
|
||||
|
||||
#[derive(Args, Clone, Debug)]
|
||||
pub(crate) struct Final {
|
||||
pub_key: String,
|
||||
private_ip: String,
|
||||
mac: String,
|
||||
}
|
||||
|
||||
#[derive(Args, Clone, Debug)]
|
||||
pub(crate) struct QueryBandwidth {
|
||||
pub_key: String,
|
||||
}
|
||||
|
||||
impl TryFrom<RequestType> for AuthenticatorRequestData {
|
||||
type Error = AuthenticatorError;
|
||||
|
||||
fn try_from(value: RequestType) -> Result<Self, Self::Error> {
|
||||
let ret = match value {
|
||||
RequestType::Initial(req) => AuthenticatorRequestData::Initial(InitMessage::new(
|
||||
PeerPublicKey::from_str(&req.pub_key)?,
|
||||
)),
|
||||
RequestType::Final(req) => AuthenticatorRequestData::Final(Box::new(FinalMessage {
|
||||
gateway_client: GatewayClient {
|
||||
pub_key: PeerPublicKey::from_str(&req.pub_key)?,
|
||||
private_ip: IpAddr::from_str(&req.private_ip)?,
|
||||
mac: ClientMac::from_str(&req.mac)?,
|
||||
},
|
||||
credential: None,
|
||||
})),
|
||||
RequestType::QueryBandwidth(req) => {
|
||||
AuthenticatorRequestData::QueryBandwidth(PeerPublicKey::from_str(&req.pub_key)?)
|
||||
}
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn execute(args: &Request) -> Result<(), AuthenticatorError> {
|
||||
let mut config = try_load_current_config(&args.common_args.id).await?;
|
||||
config = override_config(config, OverrideConfig::from(args.clone()));
|
||||
|
||||
if !version_check(&config) {
|
||||
log::error!("failed the local version check");
|
||||
return Err(AuthenticatorError::FailedLocalVersionCheck);
|
||||
}
|
||||
|
||||
let shutdown = TaskHandle::default();
|
||||
let mixnet_client = nym_authenticator::mixnet_client::create_mixnet_client(
|
||||
&config.base,
|
||||
shutdown.get_handle().named("nym_sdk::MixnetClient"),
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
&config.storage_paths.common_paths,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let request_data = AuthenticatorRequestData::try_from(args.request.clone())?;
|
||||
let authenticator_recipient = Recipient::from_str(&args.authenticator_recipient)?;
|
||||
let (request, _) = match request_data {
|
||||
AuthenticatorRequestData::Initial(init_message) => {
|
||||
AuthenticatorRequest::new_initial_request(init_message, *mixnet_client.nym_address())
|
||||
}
|
||||
AuthenticatorRequestData::Final(final_message) => {
|
||||
AuthenticatorRequest::new_final_request(*final_message, *mixnet_client.nym_address())
|
||||
}
|
||||
AuthenticatorRequestData::QueryBandwidth(query_message) => {
|
||||
AuthenticatorRequest::new_query_request(query_message, *mixnet_client.nym_address())
|
||||
}
|
||||
AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
|
||||
AuthenticatorRequest::new_topup_request(*top_up_message, *mixnet_client.nym_address())
|
||||
}
|
||||
};
|
||||
mixnet_client
|
||||
.split_sender()
|
||||
.send(nym_sdk::mixnet::InputMessage::new_regular(
|
||||
authenticator_recipient,
|
||||
request.to_bytes().unwrap(),
|
||||
TransmissionLane::General,
|
||||
None,
|
||||
))
|
||||
.await
|
||||
.map_err(|source| AuthenticatorError::FailedToSendPacketToMixnet { source })?;
|
||||
|
||||
log::info!("Sent request, sleeping 60 seconds or until killed");
|
||||
sleep(Duration::from_secs(60)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -88,6 +88,18 @@ pub enum AuthenticatorError {
|
||||
|
||||
#[error("storage should have the requested bandwidht entry")]
|
||||
MissingClientBandwidthEntry,
|
||||
|
||||
#[error("{0}")]
|
||||
PublicKey(#[from] nym_wireguard_types::Error),
|
||||
|
||||
#[error("{0}")]
|
||||
IpAddr(#[from] std::net::AddrParseError),
|
||||
|
||||
#[error("{0}")]
|
||||
AuthenticatorRequests(#[from] nym_authenticator_requests::Error),
|
||||
|
||||
#[error("{0}")]
|
||||
RecipientFormatting(#[from] nym_sdk::mixnet::RecipientFormattingError),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, AuthenticatorError>;
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user