Compare commits
19 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1bdcf9c3cf | |||
| 4ebb9cd239 | |||
| 620d68ea2f | |||
| b747308f74 | |||
| afdd721cc3 | |||
| 9f5c4c5968 | |||
| 9583a5c6c8 | |||
| da60fc0ade | |||
| 96b54c455e | |||
| cc983963d4 | |||
| 40d9321aec | |||
| e5a29cc76e | |||
| 56c55f6b95 | |||
| 2f051fd943 | |||
| c03cf86000 | |||
| ab11508235 | |||
| e65bfaeb31 | |||
| 5a6982fd10 | |||
| 7abe1f505c |
@@ -0,0 +1,55 @@
|
||||
name: Build and upload Data observatory container to harbor.nymte.ch
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
env:
|
||||
WORKING_DIRECTORY: "nym-data-observatory"
|
||||
CONTAINER_NAME: "data-observatory"
|
||||
|
||||
jobs:
|
||||
build-container:
|
||||
runs-on: arc-ubuntu-22.04-dind
|
||||
steps:
|
||||
- name: Login to Harbor
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: harbor.nymte.ch
|
||||
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
|
||||
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
|
||||
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Configure git identity
|
||||
run: |
|
||||
git config --global user.email "lawrence@nymtech.net"
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.3
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
- name: Check if tag exists
|
||||
run: |
|
||||
if git rev-parse ${{ steps.get_version.outputs.value }} >/dev/null 2>&1; then
|
||||
echo "Tag ${{ steps.get_version.outputs.value }} already exists"
|
||||
fi
|
||||
|
||||
- name: Remove existing tag if exists
|
||||
run: |
|
||||
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
|
||||
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
fi
|
||||
|
||||
- name: Create tag
|
||||
run: |
|
||||
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
|
||||
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
|
||||
- name: BuildAndPushImageOnHarbor
|
||||
run: |
|
||||
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
|
||||
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
|
||||
Generated
+841
-470
File diff suppressed because it is too large
Load Diff
+8
-8
@@ -54,6 +54,7 @@ members = [
|
||||
"common/exit-policy",
|
||||
"common/gateway-requests",
|
||||
"common/gateway-storage",
|
||||
"common/gateway-stats-storage",
|
||||
"common/http-api-client",
|
||||
"common/http-api-common",
|
||||
"common/inclusion-probability",
|
||||
@@ -189,10 +190,9 @@ aes = "0.8.1"
|
||||
aes-gcm = "0.10.1"
|
||||
aes-gcm-siv = "0.11.1"
|
||||
aead = "0.5.2"
|
||||
anyhow = "1.0.89"
|
||||
anyhow = "1.0.90"
|
||||
argon2 = "0.5.0"
|
||||
async-trait = "0.1.83"
|
||||
axum-client-ip = "0.6.1"
|
||||
axum = "0.7.5"
|
||||
axum-extra = "0.9.4"
|
||||
base64 = "0.22.1"
|
||||
@@ -215,7 +215,7 @@ chacha20 = "0.9.0"
|
||||
chacha20poly1305 = "0.10.1"
|
||||
chrono = "0.4.31"
|
||||
cipher = "0.4.3"
|
||||
clap = "4.5.18"
|
||||
clap = "4.5.20"
|
||||
clap_complete = "4.5"
|
||||
clap_complete_fig = "4.5"
|
||||
colored = "2.0"
|
||||
@@ -285,7 +285,7 @@ opentelemetry-jaeger = "0.18.0"
|
||||
parking_lot = "0.12.3"
|
||||
pem = "0.8"
|
||||
petgraph = "0.6.5"
|
||||
pin-project = "1.0"
|
||||
pin-project = "1.1"
|
||||
pin-project-lite = "0.2.14"
|
||||
pretty_env_logger = "0.4.0"
|
||||
publicsuffix = "2.2.3"
|
||||
@@ -305,7 +305,7 @@ rocket_okapi = "0.8.0"
|
||||
safer-ffi = "0.1.13"
|
||||
schemars = "0.8.21"
|
||||
semver = "1.0.23"
|
||||
serde = "1.0.210"
|
||||
serde = "1.0.211"
|
||||
serde_bytes = "0.11.15"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0.132"
|
||||
@@ -402,10 +402,10 @@ indexed_db_futures = { git = "https://github.com/TiemenSch/rust-indexed-db", bra
|
||||
js-sys = "0.3.70"
|
||||
serde-wasm-bindgen = "0.6.5"
|
||||
tsify = "0.4.5"
|
||||
wasm-bindgen = "0.2.93"
|
||||
wasm-bindgen-futures = "0.4.43"
|
||||
wasm-bindgen = "0.2.95"
|
||||
wasm-bindgen-futures = "0.4.45"
|
||||
wasmtimer = "0.2.0"
|
||||
web-sys = "0.3.70"
|
||||
web-sys = "0.3.72"
|
||||
|
||||
|
||||
# Profile settings for individual crates
|
||||
|
||||
@@ -27,7 +27,7 @@ pub type HmacSha256 = Hmac<Sha256>;
|
||||
pub type Nonce = u64;
|
||||
pub type Taken = Option<SystemTime>;
|
||||
|
||||
pub const BANDWIDTH_CAP_PER_DAY: u64 = 250 * 1024 * 1024 * 1024; // 250 GB
|
||||
pub const BANDWIDTH_CAP_PER_DAY: u64 = 1024 * 1024 * 1024; // 1 GB
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct InitMessage {
|
||||
|
||||
@@ -98,16 +98,6 @@ impl TryFrom<v3::response::AuthenticatorResponse> for v2::response::Authenticato
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v2::response::AuthenticatorResponse> for v3::response::AuthenticatorResponse {
|
||||
fn from(value: v2::response::AuthenticatorResponse) -> Self {
|
||||
Self {
|
||||
protocol: value.protocol,
|
||||
data: value.data.into(),
|
||||
reply_to: value.reply_to,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<v3::response::AuthenticatorResponseData> for v2::response::AuthenticatorResponseData {
|
||||
type Error = crate::Error;
|
||||
|
||||
@@ -139,22 +129,6 @@ impl TryFrom<v3::response::AuthenticatorResponseData> for v2::response::Authenti
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v2::response::AuthenticatorResponseData> for v3::response::AuthenticatorResponseData {
|
||||
fn from(value: v2::response::AuthenticatorResponseData) -> Self {
|
||||
match value {
|
||||
v2::response::AuthenticatorResponseData::PendingRegistration(
|
||||
pending_registration_response,
|
||||
) => Self::PendingRegistration(pending_registration_response.into()),
|
||||
v2::response::AuthenticatorResponseData::Registered(registered_response) => {
|
||||
Self::Registered(registered_response.into())
|
||||
}
|
||||
v2::response::AuthenticatorResponseData::RemainingBandwidth(
|
||||
remaining_bandwidth_response,
|
||||
) => Self::RemainingBandwidth(remaining_bandwidth_response.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v3::response::PendingRegistrationResponse> for v2::response::PendingRegistrationResponse {
|
||||
fn from(value: v3::response::PendingRegistrationResponse) -> Self {
|
||||
Self {
|
||||
@@ -165,16 +139,6 @@ impl From<v3::response::PendingRegistrationResponse> for v2::response::PendingRe
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v2::response::PendingRegistrationResponse> for v3::response::PendingRegistrationResponse {
|
||||
fn from(value: v2::response::PendingRegistrationResponse) -> Self {
|
||||
Self {
|
||||
request_id: value.request_id,
|
||||
reply_to: value.reply_to,
|
||||
reply: value.reply.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v3::response::RegisteredResponse> for v2::response::RegisteredResponse {
|
||||
fn from(value: v3::response::RegisteredResponse) -> Self {
|
||||
Self {
|
||||
@@ -185,16 +149,6 @@ impl From<v3::response::RegisteredResponse> for v2::response::RegisteredResponse
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v2::response::RegisteredResponse> for v3::response::RegisteredResponse {
|
||||
fn from(value: v2::response::RegisteredResponse) -> Self {
|
||||
Self {
|
||||
request_id: value.request_id,
|
||||
reply_to: value.reply_to,
|
||||
reply: value.reply.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v3::response::RemainingBandwidthResponse> for v2::response::RemainingBandwidthResponse {
|
||||
fn from(value: v3::response::RemainingBandwidthResponse) -> Self {
|
||||
Self {
|
||||
@@ -205,16 +159,6 @@ impl From<v3::response::RemainingBandwidthResponse> for v2::response::RemainingB
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v2::response::RemainingBandwidthResponse> for v3::response::RemainingBandwidthResponse {
|
||||
fn from(value: v2::response::RemainingBandwidthResponse) -> Self {
|
||||
Self {
|
||||
request_id: value.request_id,
|
||||
reply_to: value.reply_to,
|
||||
reply: value.reply.map(Into::into),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v3::registration::RegistrationData> for v2::registration::RegistrationData {
|
||||
fn from(value: v3::registration::RegistrationData) -> Self {
|
||||
Self {
|
||||
@@ -225,16 +169,6 @@ impl From<v3::registration::RegistrationData> for v2::registration::Registration
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v2::registration::RegistrationData> for v3::registration::RegistrationData {
|
||||
fn from(value: v2::registration::RegistrationData) -> Self {
|
||||
Self {
|
||||
nonce: value.nonce,
|
||||
gateway_data: value.gateway_data.into(),
|
||||
wg_port: value.wg_port,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v3::registration::RegistredData> for v2::registration::RegistredData {
|
||||
fn from(value: v3::registration::RegistredData) -> Self {
|
||||
Self {
|
||||
@@ -245,16 +179,6 @@ impl From<v3::registration::RegistredData> for v2::registration::RegistredData {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v2::registration::RegistredData> for v3::registration::RegistredData {
|
||||
fn from(value: v2::registration::RegistredData) -> Self {
|
||||
Self {
|
||||
pub_key: value.pub_key,
|
||||
private_ip: value.private_ip,
|
||||
wg_port: value.wg_port,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v3::registration::RemainingBandwidthData> for v2::registration::RemainingBandwidthData {
|
||||
fn from(value: v3::registration::RemainingBandwidthData) -> Self {
|
||||
Self {
|
||||
@@ -262,11 +186,3 @@ impl From<v3::registration::RemainingBandwidthData> for v2::registration::Remain
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<v2::registration::RemainingBandwidthData> for v3::registration::RemainingBandwidthData {
|
||||
fn from(value: v2::registration::RemainingBandwidthData) -> Self {
|
||||
Self {
|
||||
available_bandwidth: value.available_bandwidth,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ pub type HmacSha256 = Hmac<Sha256>;
|
||||
pub type Nonce = u64;
|
||||
pub type Taken = Option<SystemTime>;
|
||||
|
||||
pub const BANDWIDTH_CAP_PER_DAY: u64 = 250 * 1024 * 1024 * 1024; // 250 GB
|
||||
pub const BANDWIDTH_CAP_PER_DAY: u64 = 1024 * 1024 * 1024; // 1 GB
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct InitMessage {
|
||||
|
||||
@@ -112,7 +112,7 @@ impl GeoAwareTopologyProvider {
|
||||
async fn get_topology(&self) -> Option<NymTopology> {
|
||||
let mixnodes = match self
|
||||
.validator_client
|
||||
.get_all_basic_active_mixing_assigned_nodes(Some(self.client_version.clone()))
|
||||
.get_basic_active_mixing_assigned_nodes(Some(self.client_version.clone()))
|
||||
.await
|
||||
{
|
||||
Err(err) => {
|
||||
|
||||
@@ -6,6 +6,7 @@ pub(crate) use accessor::{TopologyAccessor, TopologyReadPermit};
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_sphinx::addressing::nodes::NodeIdentity;
|
||||
use nym_topology::provider_trait::TopologyProvider;
|
||||
use nym_topology::NymTopologyError;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -17,11 +18,7 @@ use wasmtimer::tokio::sleep;
|
||||
|
||||
mod accessor;
|
||||
pub mod geo_aware_provider;
|
||||
pub mod nym_api_provider;
|
||||
|
||||
pub use geo_aware_provider::GeoAwareTopologyProvider;
|
||||
pub use nym_api_provider::{Config as NymApiTopologyProviderConfig, NymApiTopologyProvider};
|
||||
pub use nym_topology::provider_trait::TopologyProvider;
|
||||
pub(crate) mod nym_api_provider;
|
||||
|
||||
// TODO: move it to config later
|
||||
const MAX_FAILURE_COUNT: usize = 10;
|
||||
|
||||
@@ -14,10 +14,9 @@ use url::Url;
|
||||
pub const DEFAULT_MIN_MIXNODE_PERFORMANCE: u8 = 50;
|
||||
pub const DEFAULT_MIN_GATEWAY_PERFORMANCE: u8 = 50;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Config {
|
||||
pub min_mixnode_performance: u8,
|
||||
pub min_gateway_performance: u8,
|
||||
pub(crate) struct Config {
|
||||
pub(crate) min_mixnode_performance: u8,
|
||||
pub(crate) min_gateway_performance: u8,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
@@ -30,7 +29,7 @@ impl Default for Config {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NymApiTopologyProvider {
|
||||
pub(crate) struct NymApiTopologyProvider {
|
||||
config: Config,
|
||||
|
||||
validator_client: nym_validator_client::client::NymApiClient,
|
||||
@@ -41,7 +40,7 @@ pub struct NymApiTopologyProvider {
|
||||
}
|
||||
|
||||
impl NymApiTopologyProvider {
|
||||
pub fn new(
|
||||
pub(crate) fn new(
|
||||
config: Config,
|
||||
mut nym_api_urls: Vec<Url>,
|
||||
client_version: String,
|
||||
@@ -99,7 +98,7 @@ impl NymApiTopologyProvider {
|
||||
async fn get_current_compatible_topology(&mut self) -> Option<NymTopology> {
|
||||
let mixnodes = match self
|
||||
.validator_client
|
||||
.get_all_basic_active_mixing_assigned_nodes(Some(self.client_version.clone()))
|
||||
.get_basic_active_mixing_assigned_nodes(Some(self.client_version.clone()))
|
||||
.await
|
||||
{
|
||||
Err(err) => {
|
||||
|
||||
@@ -121,9 +121,7 @@ pub async fn current_mixnodes<R: Rng>(
|
||||
|
||||
log::trace!("Fetching list of mixnodes from: {nym_api}");
|
||||
|
||||
let mixnodes = client
|
||||
.get_all_basic_active_mixing_assigned_nodes(None)
|
||||
.await?;
|
||||
let mixnodes = client.get_basic_active_mixing_assigned_nodes(None).await?;
|
||||
let valid_mixnodes = mixnodes
|
||||
.iter()
|
||||
.filter_map(|mixnode| mixnode.try_into().ok())
|
||||
|
||||
@@ -18,14 +18,13 @@ use nym_api_requests::ecash::{
|
||||
PartialExpirationDateSignatureResponse, VerificationKeyResponse,
|
||||
};
|
||||
use nym_api_requests::models::{
|
||||
ApiHealthResponse, GatewayCoreStatusResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse,
|
||||
NymNodeDescription, RewardEstimationResponse, StakeSaturationResponse,
|
||||
GatewayCoreStatusResponse, MixnodeCoreStatusResponse, MixnodeStatusResponse,
|
||||
RewardEstimationResponse, StakeSaturationResponse,
|
||||
};
|
||||
use nym_api_requests::models::{LegacyDescribedGateway, MixNodeBondAnnotated};
|
||||
use nym_api_requests::nym_nodes::SkimmedNode;
|
||||
use nym_coconut_dkg_common::types::EpochId;
|
||||
use nym_http_api_client::UserAgent;
|
||||
use nym_mixnet_contract_common::NymNodeDetails;
|
||||
use nym_network_defaults::NymNetworkDetails;
|
||||
use time::Date;
|
||||
use url::Url;
|
||||
@@ -34,6 +33,7 @@ pub use crate::nym_api::NymApiClientExt;
|
||||
pub use nym_mixnet_contract_common::{
|
||||
mixnode::MixNodeDetails, GatewayBond, IdentityKey, IdentityKeyRef, NodeId,
|
||||
};
|
||||
|
||||
// re-export the type to not break existing imports
|
||||
pub use crate::coconut::EcashApiClient;
|
||||
|
||||
@@ -106,9 +106,7 @@ impl Config {
|
||||
|
||||
pub struct Client<C, S = NoSigner> {
|
||||
// ideally they would have been read-only, but unfortunately rust doesn't have such features
|
||||
// #[deprecated(note = "please use `nym_api_client` instead")]
|
||||
pub nym_api: nym_api::Client,
|
||||
// pub nym_api_client: NymApiClient,
|
||||
pub nyxd: NyxdClient<C, S>,
|
||||
}
|
||||
|
||||
@@ -192,8 +190,6 @@ impl<C, S> Client<C, S> {
|
||||
}
|
||||
|
||||
// validator-api wrappers
|
||||
// we have to allow the use of deprecated method here as they're calling the deprecated trait methods
|
||||
#[allow(deprecated)]
|
||||
impl<C, S> Client<C, S> {
|
||||
pub fn api_url(&self) -> &Url {
|
||||
self.nym_api.current_url()
|
||||
@@ -203,102 +199,50 @@ impl<C, S> Client<C, S> {
|
||||
self.nym_api.change_base_url(new_endpoint)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_mixnodes(&self) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_mixnodes().await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_mixnodes_detailed(
|
||||
&self,
|
||||
) -> Result<Vec<MixNodeBondAnnotated>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_mixnodes_detailed().await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_mixnodes_detailed_unfiltered(
|
||||
&self,
|
||||
) -> Result<Vec<MixNodeBondAnnotated>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_mixnodes_detailed_unfiltered().await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_rewarded_mixnodes(
|
||||
&self,
|
||||
) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_rewarded_mixnodes().await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_rewarded_mixnodes_detailed(
|
||||
&self,
|
||||
) -> Result<Vec<MixNodeBondAnnotated>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_rewarded_mixnodes_detailed().await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_active_mixnodes(
|
||||
&self,
|
||||
) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_active_mixnodes().await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_active_mixnodes_detailed(
|
||||
&self,
|
||||
) -> Result<Vec<MixNodeBondAnnotated>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_active_mixnodes_detailed().await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_gateways(&self) -> Result<Vec<GatewayBond>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_gateways().await?)
|
||||
}
|
||||
|
||||
// TODO: combine with NymApiClient...
|
||||
pub async fn get_all_cached_described_nodes(
|
||||
&self,
|
||||
) -> Result<Vec<NymNodeDescription>, ValidatorClientError> {
|
||||
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
|
||||
let mut page = 0;
|
||||
let mut descriptions = Vec::new();
|
||||
|
||||
loop {
|
||||
let mut res = self.nym_api.get_nodes_described(Some(page), None).await?;
|
||||
|
||||
descriptions.append(&mut res.data);
|
||||
if descriptions.len() < res.pagination.total {
|
||||
page += 1
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(descriptions)
|
||||
}
|
||||
|
||||
// TODO: combine with NymApiClient...
|
||||
pub async fn get_all_cached_bonded_nym_nodes(
|
||||
&self,
|
||||
) -> Result<Vec<NymNodeDetails>, ValidatorClientError> {
|
||||
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
|
||||
let mut page = 0;
|
||||
let mut bonds = Vec::new();
|
||||
|
||||
loop {
|
||||
let mut res = self.nym_api.get_nym_nodes(Some(page), None).await?;
|
||||
|
||||
bonds.append(&mut res.data);
|
||||
if bonds.len() < res.pagination.total {
|
||||
page += 1
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(bonds)
|
||||
}
|
||||
|
||||
pub async fn blind_sign(
|
||||
&self,
|
||||
request_body: &BlindSignRequestBody,
|
||||
@@ -314,8 +258,6 @@ pub struct NymApiClient {
|
||||
// we could re-implement the communication with the REST API on port 1317
|
||||
}
|
||||
|
||||
// we have to allow the use of deprecated method here as they're calling the deprecated trait methods
|
||||
#[allow(deprecated)]
|
||||
impl NymApiClient {
|
||||
pub fn new(api_url: Url) -> Self {
|
||||
let nym_api = nym_api::Client::new(api_url, None);
|
||||
@@ -330,10 +272,10 @@ impl NymApiClient {
|
||||
NymApiClient { nym_api }
|
||||
}
|
||||
|
||||
pub fn new_with_user_agent(api_url: Url, user_agent: impl Into<UserAgent>) -> Self {
|
||||
pub fn new_with_user_agent(api_url: Url, user_agent: UserAgent) -> Self {
|
||||
let nym_api = nym_api::Client::builder::<_, ValidatorClientError>(api_url)
|
||||
.expect("invalid api url")
|
||||
.with_user_agent(user_agent.into())
|
||||
.with_user_agent(user_agent)
|
||||
.build::<ValidatorClientError>()
|
||||
.expect("failed to build nym api client");
|
||||
|
||||
@@ -348,7 +290,7 @@ impl NymApiClient {
|
||||
self.nym_api.change_base_url(new_endpoint);
|
||||
}
|
||||
|
||||
#[deprecated(note = "use get_all_basic_active_mixing_assigned_nodes instead")]
|
||||
#[deprecated(note = "use get_basic_active_mixing_assigned_nodes instead")]
|
||||
pub async fn get_basic_mixnodes(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
@@ -385,7 +327,7 @@ impl NymApiClient {
|
||||
loop {
|
||||
let mut res = self
|
||||
.nym_api
|
||||
.get_basic_entry_assigned_nodes(
|
||||
.get_all_basic_entry_assigned_nodes(
|
||||
semver_compatibility.clone(),
|
||||
false,
|
||||
Some(page),
|
||||
@@ -406,7 +348,7 @@ impl NymApiClient {
|
||||
|
||||
/// retrieve basic information for nodes that got assigned 'mixing' node in this epoch
|
||||
/// this includes legacy mixnodes and nym-nodes
|
||||
pub async fn get_all_basic_active_mixing_assigned_nodes(
|
||||
pub async fn get_basic_active_mixing_assigned_nodes(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
|
||||
@@ -436,142 +378,32 @@ impl NymApiClient {
|
||||
Ok(nodes)
|
||||
}
|
||||
|
||||
/// retrieve basic information for nodes are capable of operating as a mixnode
|
||||
/// this includes legacy mixnodes and nym-nodes
|
||||
pub async fn get_all_basic_mixing_capable_nodes(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
|
||||
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
|
||||
let mut page = 0;
|
||||
let mut nodes = Vec::new();
|
||||
|
||||
loop {
|
||||
let mut res = self
|
||||
.nym_api
|
||||
.get_basic_mixing_capable_nodes(
|
||||
semver_compatibility.clone(),
|
||||
false,
|
||||
Some(page),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
nodes.append(&mut res.nodes.data);
|
||||
if nodes.len() < res.nodes.pagination.total {
|
||||
page += 1
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(nodes)
|
||||
}
|
||||
|
||||
/// retrieve basic information for all bonded nodes on the network
|
||||
pub async fn get_all_basic_nodes(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
) -> Result<Vec<SkimmedNode>, ValidatorClientError> {
|
||||
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
|
||||
let mut page = 0;
|
||||
let mut nodes = Vec::new();
|
||||
|
||||
loop {
|
||||
let mut res = self
|
||||
.nym_api
|
||||
.get_basic_nodes(semver_compatibility.clone(), false, Some(page), None)
|
||||
.await?;
|
||||
|
||||
nodes.append(&mut res.nodes.data);
|
||||
if nodes.len() < res.nodes.pagination.total {
|
||||
page += 1
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(nodes)
|
||||
}
|
||||
|
||||
pub async fn health(&self) -> Result<ApiHealthResponse, ValidatorClientError> {
|
||||
Ok(self.nym_api.health().await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_active_mixnodes(
|
||||
&self,
|
||||
) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_active_mixnodes().await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_rewarded_mixnodes(
|
||||
&self,
|
||||
) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_rewarded_mixnodes().await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_mixnodes(&self) -> Result<Vec<MixNodeDetails>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_mixnodes().await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_gateways(&self) -> Result<Vec<GatewayBond>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_gateways().await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_cached_described_gateways(
|
||||
&self,
|
||||
) -> Result<Vec<LegacyDescribedGateway>, ValidatorClientError> {
|
||||
Ok(self.nym_api.get_gateways_described().await?)
|
||||
}
|
||||
|
||||
pub async fn get_all_described_nodes(
|
||||
&self,
|
||||
) -> Result<Vec<NymNodeDescription>, ValidatorClientError> {
|
||||
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
|
||||
let mut page = 0;
|
||||
let mut descriptions = Vec::new();
|
||||
|
||||
loop {
|
||||
let mut res = self.nym_api.get_nodes_described(Some(page), None).await?;
|
||||
|
||||
descriptions.append(&mut res.data);
|
||||
if descriptions.len() < res.pagination.total {
|
||||
page += 1
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(descriptions)
|
||||
}
|
||||
|
||||
pub async fn get_all_bonded_nym_nodes(
|
||||
&self,
|
||||
) -> Result<Vec<NymNodeDetails>, ValidatorClientError> {
|
||||
// TODO: deal with paging in macro or some helper function or something, because it's the same pattern everywhere
|
||||
let mut page = 0;
|
||||
let mut bonds = Vec::new();
|
||||
|
||||
loop {
|
||||
let mut res = self.nym_api.get_nym_nodes(Some(page), None).await?;
|
||||
|
||||
bonds.append(&mut res.data);
|
||||
if bonds.len() < res.pagination.total {
|
||||
page += 1
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(bonds)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_gateway_core_status_count(
|
||||
&self,
|
||||
identity: IdentityKeyRef<'_>,
|
||||
@@ -583,7 +415,6 @@ impl NymApiClient {
|
||||
.await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_mixnode_core_status_count(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -595,7 +426,6 @@ impl NymApiClient {
|
||||
.await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_mixnode_status(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -603,7 +433,6 @@ impl NymApiClient {
|
||||
Ok(self.nym_api.get_mixnode_status(mix_id).await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_mixnode_reward_estimation(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -611,7 +440,6 @@ impl NymApiClient {
|
||||
Ok(self.nym_api.get_mixnode_reward_estimation(mix_id).await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn get_mixnode_stake_saturation(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
@@ -643,7 +471,6 @@ impl NymApiClient {
|
||||
.await?)
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
pub async fn spent_credentials_filter(
|
||||
&self,
|
||||
) -> Result<SpentCredentialsResponse, ValidatorClientError> {
|
||||
|
||||
@@ -164,7 +164,7 @@ async fn test_nym_api_connection(
|
||||
) -> ConnectionResult {
|
||||
let result = match timeout(
|
||||
Duration::from_secs(CONNECTION_TEST_TIMEOUT_SEC),
|
||||
client.health(),
|
||||
client.get_cached_mixnodes(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -11,11 +11,9 @@ use nym_api_requests::ecash::models::{
|
||||
};
|
||||
use nym_api_requests::ecash::VerificationKeyResponse;
|
||||
use nym_api_requests::models::{
|
||||
AnnotationResponse, ApiHealthResponse, LegacyDescribedMixNode, NodePerformanceResponse,
|
||||
NodeRefreshBody, NymNodeDescription,
|
||||
AnnotationResponse, LegacyDescribedMixNode, NodePerformanceResponse,
|
||||
};
|
||||
use nym_api_requests::nym_nodes::PaginatedCachedNodesResponse;
|
||||
use nym_api_requests::pagination::PaginatedResponse;
|
||||
pub use nym_api_requests::{
|
||||
ecash::{
|
||||
models::{
|
||||
@@ -40,7 +38,7 @@ use nym_contracts_common::IdentityKey;
|
||||
pub use nym_http_api_client::Client;
|
||||
use nym_http_api_client::{ApiClient, NO_PARAMS};
|
||||
use nym_mixnet_contract_common::mixnode::MixNodeDetails;
|
||||
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, NodeId, NymNodeDetails};
|
||||
use nym_mixnet_contract_common::{GatewayBond, IdentityKeyRef, NodeId};
|
||||
use time::format_description::BorrowedFormatItem;
|
||||
use time::Date;
|
||||
use tracing::instrument;
|
||||
@@ -55,26 +53,12 @@ pub fn rfc_3339_date() -> Vec<BorrowedFormatItem<'static>> {
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
|
||||
pub trait NymApiClientExt: ApiClient {
|
||||
async fn health(&self) -> Result<ApiHealthResponse, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
routes::API_STATUS_ROUTES,
|
||||
routes::HEALTH,
|
||||
],
|
||||
NO_PARAMS,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
|
||||
self.get_json(&[routes::API_VERSION, routes::MIXNODES], NO_PARAMS)
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
|
||||
self.get_json(
|
||||
@@ -89,7 +73,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateways_detailed(&self) -> Result<Vec<GatewayBondAnnotated>, NymAPIError> {
|
||||
self.get_json(
|
||||
@@ -104,7 +87,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes_detailed_unfiltered(
|
||||
&self,
|
||||
@@ -121,14 +103,12 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateways(&self) -> Result<Vec<GatewayBond>, NymAPIError> {
|
||||
self.get_json(&[routes::API_VERSION, routes::GATEWAYS], NO_PARAMS)
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateways_described(&self) -> Result<Vec<LegacyDescribedGateway>, NymAPIError> {
|
||||
self.get_json(
|
||||
@@ -138,7 +118,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes_described(&self) -> Result<Vec<LegacyDescribedMixNode>, NymAPIError> {
|
||||
self.get_json(
|
||||
@@ -148,47 +127,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn get_nodes_described(
|
||||
&self,
|
||||
page: Option<u32>,
|
||||
per_page: Option<u32>,
|
||||
) -> Result<PaginatedResponse<NymNodeDescription>, NymAPIError> {
|
||||
let mut params = Vec::new();
|
||||
|
||||
if let Some(page) = page {
|
||||
params.push(("page", page.to_string()))
|
||||
}
|
||||
|
||||
if let Some(per_page) = per_page {
|
||||
params.push(("per_page", per_page.to_string()))
|
||||
}
|
||||
|
||||
self.get_json(&[routes::API_VERSION, "nym-nodes", "described"], ¶ms)
|
||||
.await
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn get_nym_nodes(
|
||||
&self,
|
||||
page: Option<u32>,
|
||||
per_page: Option<u32>,
|
||||
) -> Result<PaginatedResponse<NymNodeDetails>, NymAPIError> {
|
||||
let mut params = Vec::new();
|
||||
|
||||
if let Some(page) = page {
|
||||
params.push(("page", page.to_string()))
|
||||
}
|
||||
|
||||
if let Some(per_page) = per_page {
|
||||
params.push(("per_page", per_page.to_string()))
|
||||
}
|
||||
|
||||
self.get_json(&[routes::API_VERSION, "nym-nodes", "bonded"], ¶ms)
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[tracing::instrument(level = "debug", skip_all)]
|
||||
async fn get_basic_mixnodes(
|
||||
&self,
|
||||
@@ -213,7 +151,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_basic_gateways(
|
||||
&self,
|
||||
@@ -241,7 +178,7 @@ pub trait NymApiClientExt: ApiClient {
|
||||
/// retrieve basic information for nodes are capable of operating as an entry gateway
|
||||
/// this includes legacy gateways and nym-nodes
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_basic_entry_assigned_nodes(
|
||||
async fn get_all_basic_entry_assigned_nodes(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
no_legacy: bool,
|
||||
@@ -322,82 +259,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
/// retrieve basic information for nodes that got assigned 'mixing' node in this epoch
|
||||
/// this includes legacy mixnodes and nym-nodes
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_basic_mixing_capable_nodes(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
no_legacy: bool,
|
||||
page: Option<u32>,
|
||||
per_page: Option<u32>,
|
||||
) -> Result<PaginatedCachedNodesResponse<SkimmedNode>, NymAPIError> {
|
||||
let mut params = Vec::new();
|
||||
|
||||
if let Some(arg) = &semver_compatibility {
|
||||
params.push(("semver_compatibility", arg.clone()))
|
||||
}
|
||||
|
||||
if no_legacy {
|
||||
params.push(("no_legacy", "true".to_string()))
|
||||
}
|
||||
|
||||
if let Some(page) = page {
|
||||
params.push(("page", page.to_string()))
|
||||
}
|
||||
|
||||
if let Some(per_page) = per_page {
|
||||
params.push(("per_page", per_page.to_string()))
|
||||
}
|
||||
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"unstable",
|
||||
"nym-nodes",
|
||||
"skimmed",
|
||||
"mixnodes",
|
||||
"all",
|
||||
],
|
||||
¶ms,
|
||||
)
|
||||
.await
|
||||
}
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
|
||||
async fn get_basic_nodes(
|
||||
&self,
|
||||
semver_compatibility: Option<String>,
|
||||
no_legacy: bool,
|
||||
page: Option<u32>,
|
||||
per_page: Option<u32>,
|
||||
) -> Result<PaginatedCachedNodesResponse<SkimmedNode>, NymAPIError> {
|
||||
let mut params = Vec::new();
|
||||
|
||||
if let Some(arg) = &semver_compatibility {
|
||||
params.push(("semver_compatibility", arg.clone()))
|
||||
}
|
||||
|
||||
if no_legacy {
|
||||
params.push(("no_legacy", "true".to_string()))
|
||||
}
|
||||
|
||||
if let Some(page) = page {
|
||||
params.push(("page", page.to_string()))
|
||||
}
|
||||
|
||||
if let Some(per_page) = per_page {
|
||||
params.push(("per_page", per_page.to_string()))
|
||||
}
|
||||
|
||||
self.get_json(
|
||||
&[routes::API_VERSION, "unstable", "nym-nodes", "skimmed"],
|
||||
¶ms,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_active_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
|
||||
self.get_json(
|
||||
@@ -407,7 +268,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_active_mixnodes_detailed(&self) -> Result<Vec<MixNodeBondAnnotated>, NymAPIError> {
|
||||
self.get_json(
|
||||
@@ -423,7 +283,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_rewarded_mixnodes(&self) -> Result<Vec<MixNodeDetails>, NymAPIError> {
|
||||
self.get_json(
|
||||
@@ -433,7 +292,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_report(
|
||||
&self,
|
||||
@@ -452,7 +310,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateway_report(
|
||||
&self,
|
||||
@@ -471,7 +328,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_history(
|
||||
&self,
|
||||
@@ -490,7 +346,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateway_history(
|
||||
&self,
|
||||
@@ -509,7 +364,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_rewarded_mixnodes_detailed(
|
||||
&self,
|
||||
@@ -527,7 +381,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateway_core_status_count(
|
||||
&self,
|
||||
@@ -560,7 +413,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
}
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_core_status_count(
|
||||
&self,
|
||||
@@ -594,7 +446,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
}
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_status(
|
||||
&self,
|
||||
@@ -613,7 +464,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_reward_estimation(
|
||||
&self,
|
||||
@@ -632,7 +482,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn compute_mixnode_reward_estimation(
|
||||
&self,
|
||||
@@ -653,7 +502,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_stake_saturation(
|
||||
&self,
|
||||
@@ -672,7 +520,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnode_inclusion_probability(
|
||||
&self,
|
||||
@@ -696,35 +543,18 @@ pub trait NymApiClientExt: ApiClient {
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
) -> Result<NodePerformanceResponse, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"nym-nodes",
|
||||
"performance",
|
||||
&node_id.to_string(),
|
||||
],
|
||||
NO_PARAMS,
|
||||
)
|
||||
.await
|
||||
self.get_json_from(format!("/v1/nym-nodes/performance/{node_id}"))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_node_annotation(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
) -> Result<AnnotationResponse, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
routes::API_VERSION,
|
||||
"nym-nodes",
|
||||
"annotation",
|
||||
&node_id.to_string(),
|
||||
],
|
||||
NO_PARAMS,
|
||||
)
|
||||
.await
|
||||
self.get_json_from(format!("/v1/nym-nodes/annotation/{node_id}"))
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
async fn get_mixnode_avg_uptime(&self, mix_id: NodeId) -> Result<UptimeResponse, NymAPIError> {
|
||||
self.get_json(
|
||||
&[
|
||||
@@ -739,7 +569,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_mixnodes_blacklisted(&self) -> Result<Vec<NodeId>, NymAPIError> {
|
||||
self.get_json(
|
||||
@@ -749,7 +578,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn get_gateways_blacklisted(&self) -> Result<Vec<IdentityKey>, NymAPIError> {
|
||||
self.get_json(
|
||||
@@ -810,7 +638,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
#[deprecated]
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn double_spending_filter_v1(&self) -> Result<SpentCredentialsResponse, NymAPIError> {
|
||||
self.get_json(
|
||||
@@ -934,18 +761,6 @@ pub trait NymApiClientExt: ApiClient {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn force_refresh_describe_cache(
|
||||
&self,
|
||||
request: &NodeRefreshBody,
|
||||
) -> Result<(), NymAPIError> {
|
||||
self.post_json(
|
||||
&[routes::API_VERSION, "nym-nodes", "refresh-described"],
|
||||
NO_PARAMS,
|
||||
request,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
async fn epoch_credentials(
|
||||
&self,
|
||||
|
||||
@@ -36,8 +36,6 @@ pub mod ecash {
|
||||
}
|
||||
|
||||
pub const STATUS_ROUTES: &str = "status";
|
||||
pub const API_STATUS_ROUTES: &str = "api-status";
|
||||
pub const HEALTH: &str = "health";
|
||||
pub const MIXNODE: &str = "mixnode";
|
||||
pub const GATEWAY: &str = "gateway";
|
||||
pub const NYM_NODES: &str = "nym-nodes";
|
||||
|
||||
@@ -10,10 +10,10 @@ use cosmrs::AccountId;
|
||||
use nym_contracts_common::signing::Nonce;
|
||||
use nym_mixnet_contract_common::gateway::{PreassignedGatewayIdsResponse, PreassignedId};
|
||||
use nym_mixnet_contract_common::nym_node::{
|
||||
EpochAssignmentResponse, NodeDetailsByIdentityResponse, NodeDetailsResponse,
|
||||
NodeOwnershipResponse, NodeRewardingDetailsResponse, PagedNymNodeBondsResponse,
|
||||
PagedNymNodeDetailsResponse, PagedUnbondedNymNodesResponse, Role, RolesMetadataResponse,
|
||||
StakeSaturationResponse, UnbondedNodeResponse, UnbondedNymNode,
|
||||
EpochAssignmentResponse, NodeDetailsByIdentityResponse, NodeOwnershipResponse,
|
||||
NodeRewardingDetailsResponse, PagedNymNodeBondsResponse, PagedNymNodeDetailsResponse,
|
||||
PagedUnbondedNymNodesResponse, Role, RolesMetadataResponse, StakeSaturationResponse,
|
||||
UnbondedNodeResponse, UnbondedNymNode,
|
||||
};
|
||||
use nym_mixnet_contract_common::reward_params::WorkFactor;
|
||||
use nym_mixnet_contract_common::{
|
||||
@@ -316,7 +316,10 @@ pub trait MixnetQueryClient {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_nymnode_details(&self, node_id: NodeId) -> Result<NodeDetailsResponse, NyxdError> {
|
||||
async fn get_nymnode_details(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
) -> Result<NodeOwnershipResponse, NyxdError> {
|
||||
self.query_mixnet_contract(MixnetQueryMsg::GetNymNodeDetails { node_id })
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -2,9 +2,10 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::context::QueryClientWithNyxd;
|
||||
use crate::utils::show_error;
|
||||
use crate::utils::{pretty_cosmwasm_coin, show_error};
|
||||
use clap::Parser;
|
||||
use comfy_table::Table;
|
||||
use nym_validator_client::client::NymApiClientExt;
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct Args {
|
||||
@@ -14,11 +15,12 @@ pub struct Args {
|
||||
}
|
||||
|
||||
pub async fn query(args: Args, client: &QueryClientWithNyxd) {
|
||||
match client.get_all_cached_described_nodes().await {
|
||||
match client.nym_api.get_gateways().await {
|
||||
Ok(res) => match args.identity_key {
|
||||
Some(identity_key) => {
|
||||
let node = res.iter().find(|node| {
|
||||
node.ed25519_identity_key()
|
||||
node.gateway
|
||||
.identity_key
|
||||
.to_string()
|
||||
.eq_ignore_ascii_case(&identity_key)
|
||||
});
|
||||
@@ -30,16 +32,14 @@ pub async fn query(args: Args, client: &QueryClientWithNyxd) {
|
||||
None => {
|
||||
let mut table = Table::new();
|
||||
|
||||
table.set_header(vec!["Node Id", "Identity Key", "Version", "Is Legacy"]);
|
||||
for node in res
|
||||
.into_iter()
|
||||
.filter(|node| node.description.declared_role.entry)
|
||||
{
|
||||
table.set_header(vec!["Identity Key", "Owner", "Host", "Bond", "Version"]);
|
||||
for node in res {
|
||||
table.add_row(vec![
|
||||
node.node_id.to_string(),
|
||||
node.ed25519_identity_key().to_base58_string(),
|
||||
node.description.build_information.build_version,
|
||||
(!node.contract_node_type.is_nym_node()).to_string(),
|
||||
node.gateway.identity_key.to_string(),
|
||||
node.owner.to_string(),
|
||||
node.gateway.host.to_string(),
|
||||
pretty_cosmwasm_coin(&node.pledge_amount),
|
||||
node.gateway.version.clone(),
|
||||
]);
|
||||
}
|
||||
|
||||
|
||||
@@ -2,9 +2,10 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::context::QueryClientWithNyxd;
|
||||
use crate::utils::show_error;
|
||||
use crate::utils::{pretty_decimal_with_denom, show_error};
|
||||
use clap::Parser;
|
||||
use comfy_table::Table;
|
||||
use nym_validator_client::client::NymApiClientExt;
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct Args {
|
||||
@@ -14,11 +15,13 @@ pub struct Args {
|
||||
}
|
||||
|
||||
pub async fn query(args: Args, client: &QueryClientWithNyxd) {
|
||||
match client.get_all_cached_described_nodes().await {
|
||||
match client.nym_api.get_mixnodes().await {
|
||||
Ok(res) => match args.identity_key {
|
||||
Some(identity_key) => {
|
||||
let node = res.iter().find(|node| {
|
||||
node.ed25519_identity_key()
|
||||
node.bond_information
|
||||
.mix_node
|
||||
.identity_key
|
||||
.to_string()
|
||||
.eq_ignore_ascii_case(&identity_key)
|
||||
});
|
||||
@@ -30,16 +33,25 @@ pub async fn query(args: Args, client: &QueryClientWithNyxd) {
|
||||
None => {
|
||||
let mut table = Table::new();
|
||||
|
||||
table.set_header(vec!["Node Id", "Identity Key", "Version", "Is Legacy"]);
|
||||
for node in res
|
||||
.into_iter()
|
||||
.filter(|node| node.description.declared_role.mixnode)
|
||||
{
|
||||
table.set_header(vec![
|
||||
"Mix id",
|
||||
"Identity Key",
|
||||
"Owner",
|
||||
"Host",
|
||||
"Bond",
|
||||
"Total Delegations",
|
||||
"Version",
|
||||
]);
|
||||
for node in res {
|
||||
let denom = &node.bond_information.original_pledge().denom;
|
||||
table.add_row(vec![
|
||||
node.node_id.to_string(),
|
||||
node.ed25519_identity_key().to_base58_string(),
|
||||
node.description.build_information.build_version,
|
||||
(!node.contract_node_type.is_nym_node()).to_string(),
|
||||
node.mix_id().to_string(),
|
||||
node.bond_information.mix_node.identity_key.clone(),
|
||||
node.bond_information.owner.clone().into_string(),
|
||||
node.bond_information.mix_node.host.clone(),
|
||||
pretty_decimal_with_denom(node.rewarding_details.operator, denom),
|
||||
pretty_decimal_with_denom(node.rewarding_details.delegates, denom),
|
||||
node.bond_information.mix_node.version,
|
||||
]);
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@ use crate::{
|
||||
use cosmwasm_schema::cw_serde;
|
||||
use cosmwasm_std::{Addr, Coin, Decimal, StdResult, Uint128};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
||||
|
||||
/// Full details associated with given mixnode.
|
||||
@@ -648,39 +647,14 @@ impl From<LegacyMixLayer> for u8 {
|
||||
export_to = "ts-packages/types/src/types/rust/PendingMixnodeChanges.ts"
|
||||
)
|
||||
)]
|
||||
// note: we had to remove `#[cw_serde]` as it enforces `#[serde(deny_unknown_fields)]` which we do not want
|
||||
// with the addition of .cost_params_change field
|
||||
#[derive(
|
||||
::cosmwasm_schema::serde::Serialize,
|
||||
::cosmwasm_schema::serde::Deserialize,
|
||||
::std::clone::Clone,
|
||||
::std::fmt::Debug,
|
||||
::std::cmp::PartialEq,
|
||||
::cosmwasm_schema::schemars::JsonSchema,
|
||||
Default,
|
||||
Copy,
|
||||
)]
|
||||
#[schemars(crate = "::cosmwasm_schema::schemars")]
|
||||
#[cw_serde]
|
||||
#[derive(Default, Copy)]
|
||||
pub struct PendingMixNodeChanges {
|
||||
pub pledge_change: Option<EpochEventId>,
|
||||
|
||||
#[serde(default)]
|
||||
pub cost_params_change: Option<IntervalEventId>,
|
||||
}
|
||||
|
||||
#[derive(Default, Copy, Clone, Debug, Serialize, Deserialize, JsonSchema)]
|
||||
pub struct LegacyPendingMixNodeChanges {
|
||||
pub pledge_change: Option<EpochEventId>,
|
||||
}
|
||||
|
||||
impl From<PendingMixNodeChanges> for LegacyPendingMixNodeChanges {
|
||||
fn from(value: PendingMixNodeChanges) -> Self {
|
||||
LegacyPendingMixNodeChanges {
|
||||
pledge_change: value.pledge_change,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PendingMixNodeChanges {
|
||||
pub fn new_empty() -> PendingMixNodeChanges {
|
||||
PendingMixNodeChanges {
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::sync::Arc;
|
||||
use time::{Date, OffsetDateTime};
|
||||
use tracing::*;
|
||||
|
||||
use nym_credentials::ecash::utils::{cred_exp_date, ecash_today, EcashTime};
|
||||
use nym_credentials::ecash::utils::{ecash_today, EcashTime};
|
||||
use nym_credentials_interface::{Bandwidth, ClientTicket, TicketType};
|
||||
use nym_gateway_requests::models::CredentialSpendingRequest;
|
||||
use nym_gateway_storage::Storage;
|
||||
@@ -131,7 +131,7 @@ impl<S: Storage + Clone + 'static> CredentialVerifier<S> {
|
||||
let bandwidth = Bandwidth::ticket_amount(credential_type.into());
|
||||
|
||||
self.bandwidth_storage_manager
|
||||
.increase_bandwidth(bandwidth, cred_exp_date())
|
||||
.increase_bandwidth(bandwidth, spend_date)
|
||||
.await?;
|
||||
|
||||
Ok(self
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
|
||||
use std::fmt::{self, Debug, Display, Formatter};
|
||||
use std::fmt::{self, Display, Formatter};
|
||||
use std::str::FromStr;
|
||||
use thiserror::Error;
|
||||
use zeroize::{Zeroize, ZeroizeOnDrop};
|
||||
@@ -112,18 +112,12 @@ impl PemStorableKeyPair for KeyPair {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Hash, Copy, Clone)]
|
||||
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug)]
|
||||
pub struct PublicKey(x25519_dalek::PublicKey);
|
||||
|
||||
impl Display for PublicKey {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
Display::fmt(&self.to_base58_string(), f)
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for PublicKey {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
Debug::fmt(&self.to_base58_string(), f)
|
||||
write!(f, "{}", self.to_base58_string())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,16 +31,8 @@ pub mod option_bs58_x25519_pubkey {
|
||||
pub fn deserialize<'de, D: Deserializer<'de>>(
|
||||
deserializer: D,
|
||||
) -> Result<Option<PublicKey>, D::Error> {
|
||||
match Option::<String>::deserialize(deserializer)? {
|
||||
None => Ok(None),
|
||||
Some(s) => {
|
||||
if s.is_empty() {
|
||||
Ok(None)
|
||||
} else {
|
||||
Some(PublicKey::from_base58_string(&s).map_err(serde::de::Error::custom))
|
||||
.transpose()
|
||||
}
|
||||
}
|
||||
}
|
||||
let s = Option::<String>::deserialize(deserializer)?;
|
||||
s.map(|s| PublicKey::from_base58_string(&s).map_err(serde::de::Error::custom))
|
||||
.transpose()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ pub use ed25519_dalek::SignatureError;
|
||||
use ed25519_dalek::{Signer, SigningKey};
|
||||
pub use ed25519_dalek::{Verifier, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH, SIGNATURE_LENGTH};
|
||||
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
|
||||
use std::fmt::{self, Debug, Display, Formatter};
|
||||
use std::fmt::{self, Display, Formatter};
|
||||
use std::str::FromStr;
|
||||
use thiserror::Error;
|
||||
use zeroize::{Zeroize, ZeroizeOnDrop};
|
||||
@@ -119,18 +119,12 @@ impl PemStorableKeyPair for KeyPair {
|
||||
}
|
||||
|
||||
/// ed25519 EdDSA Public Key
|
||||
#[derive(Copy, Clone, Eq, PartialEq)]
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
|
||||
pub struct PublicKey(ed25519_dalek::VerifyingKey);
|
||||
|
||||
impl Display for PublicKey {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
Display::fmt(&self.to_base58_string(), f)
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for PublicKey {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
Debug::fmt(&self.to_base58_string(), f)
|
||||
write!(f, "{}", self.to_base58_string())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
[package]
|
||||
name = "nym-gateway-stats-storage"
|
||||
version = "0.1.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
documentation.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
sqlx = { workspace = true, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"sqlite",
|
||||
"macros",
|
||||
"migrate",
|
||||
"time",
|
||||
] }
|
||||
time = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
nym-sphinx = { path = "../nymsphinx" }
|
||||
nym-credentials-interface = { path = "../credentials-interface" }
|
||||
|
||||
|
||||
[build-dependencies]
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
|
||||
sqlx = { workspace = true, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"sqlite",
|
||||
"macros",
|
||||
"migrate",
|
||||
] }
|
||||
@@ -0,0 +1,28 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use sqlx::{Connection, SqliteConnection};
|
||||
use std::env;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let out_dir = env::var("OUT_DIR").unwrap();
|
||||
let database_path = format!("{}/gateway-stats-example.sqlite", out_dir);
|
||||
|
||||
let mut conn = SqliteConnection::connect(&format!("sqlite://{}?mode=rwc", database_path))
|
||||
.await
|
||||
.expect("Failed to create SQLx database connection");
|
||||
|
||||
sqlx::migrate!("./migrations")
|
||||
.run(&mut conn)
|
||||
.await
|
||||
.expect("Failed to perform SQLx migrations");
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path);
|
||||
|
||||
#[cfg(target_family = "windows")]
|
||||
// for some strange reason we need to add a leading `/` to the windows path even though it's
|
||||
// not a valid windows path... but hey, it works...
|
||||
println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
/*
|
||||
* Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
* SPDX-License-Identifier: GPL-3.0-only
|
||||
*/
|
||||
|
||||
CREATE TABLE sessions_active
|
||||
(
|
||||
client_address TEXT NOT NULL PRIMARY KEY UNIQUE,
|
||||
start_time TIMESTAMP WITHOUT TIME ZONE NOT NULL,
|
||||
typ TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE sessions_finished
|
||||
(
|
||||
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
day DATE NOT NULL,
|
||||
duration_ms INTEGER NOT NULL,
|
||||
typ TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE sessions_unique_users
|
||||
(
|
||||
day DATE NOT NULL,
|
||||
client_address TEXT NOT NULL,
|
||||
PRIMARY KEY (day, client_address)
|
||||
);
|
||||
@@ -0,0 +1,13 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum StatsStorageError {
|
||||
#[error("Database experienced an internal error: {0}")]
|
||||
InternalDatabaseError(#[from] sqlx::Error),
|
||||
|
||||
#[error("Failed to perform database migration: {0}")]
|
||||
MigrationError(#[from] sqlx::migrate::MigrateError),
|
||||
}
|
||||
@@ -0,0 +1,195 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use error::StatsStorageError;
|
||||
use models::{ActiveSession, FinishedSession, SessionType, StoredFinishedSession};
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use sessions::SessionManager;
|
||||
use sqlx::ConnectOptions;
|
||||
use std::path::Path;
|
||||
use time::Date;
|
||||
use tracing::{debug, error};
|
||||
|
||||
pub mod error;
|
||||
pub mod models;
|
||||
mod sessions;
|
||||
|
||||
// note that clone here is fine as upon cloning the same underlying pool will be used
|
||||
#[derive(Clone)]
|
||||
pub struct PersistentStatsStorage {
|
||||
session_manager: SessionManager,
|
||||
}
|
||||
|
||||
impl PersistentStatsStorage {
|
||||
/// Initialises `PersistentStatsStorage` using the provided path.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `database_path`: path to the database.
|
||||
pub async fn init<P: AsRef<Path> + Send>(database_path: P) -> Result<Self, StatsStorageError> {
|
||||
debug!(
|
||||
"Attempting to connect to database {:?}",
|
||||
database_path.as_ref().as_os_str()
|
||||
);
|
||||
|
||||
// TODO: we can inject here more stuff based on our gateway global config
|
||||
// struct. Maybe different pool size or timeout intervals?
|
||||
let opts = sqlx::sqlite::SqliteConnectOptions::new()
|
||||
.filename(database_path)
|
||||
.create_if_missing(true)
|
||||
.disable_statement_logging();
|
||||
|
||||
// TODO: do we want auto_vacuum ?
|
||||
|
||||
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
|
||||
Ok(db) => db,
|
||||
Err(err) => {
|
||||
error!("Failed to connect to SQLx database: {err}");
|
||||
return Err(err.into());
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
|
||||
error!("Failed to perform migration on the SQLx database: {err}");
|
||||
return Err(err.into());
|
||||
}
|
||||
|
||||
// the cloning here are cheap as connection pool is stored behind an Arc
|
||||
Ok(PersistentStatsStorage {
|
||||
session_manager: sessions::SessionManager::new(connection_pool),
|
||||
})
|
||||
}
|
||||
|
||||
//Sessions fn
|
||||
pub async fn insert_finished_session(
|
||||
&self,
|
||||
date: Date,
|
||||
session: FinishedSession,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.insert_finished_session(
|
||||
date,
|
||||
session.duration.whole_milliseconds() as i64,
|
||||
session.typ.to_string().into(),
|
||||
)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn get_finished_sessions(
|
||||
&self,
|
||||
date: Date,
|
||||
) -> Result<Vec<StoredFinishedSession>, StatsStorageError> {
|
||||
Ok(self.session_manager.get_finished_sessions(date).await?)
|
||||
}
|
||||
|
||||
pub async fn delete_finished_sessions(
|
||||
&self,
|
||||
before_date: Date,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.delete_finished_sessions(before_date)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn insert_unique_user(
|
||||
&self,
|
||||
date: Date,
|
||||
client_address_bs58: String,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.insert_unique_user(date, client_address_bs58)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn get_unique_users_count(&self, date: Date) -> Result<i32, StatsStorageError> {
|
||||
Ok(self.session_manager.get_unique_users_count(date).await?)
|
||||
}
|
||||
|
||||
pub async fn delete_unique_users(&self, before_date: Date) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.delete_unique_users(before_date)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn insert_active_session(
|
||||
&self,
|
||||
client_address: DestinationAddressBytes,
|
||||
session: ActiveSession,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.insert_active_session(
|
||||
client_address.as_base58_string(),
|
||||
session.start,
|
||||
session.typ.to_string().into(),
|
||||
)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn update_active_session_type(
|
||||
&self,
|
||||
client_address: DestinationAddressBytes,
|
||||
session_type: SessionType,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.update_active_session_type(
|
||||
client_address.as_base58_string(),
|
||||
session_type.to_string().into(),
|
||||
)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn get_active_session(
|
||||
&self,
|
||||
client_address: DestinationAddressBytes,
|
||||
) -> Result<Option<ActiveSession>, StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.get_active_session(client_address.as_base58_string())
|
||||
.await?
|
||||
.map(Into::into))
|
||||
}
|
||||
|
||||
pub async fn get_all_active_sessions(&self) -> Result<Vec<ActiveSession>, StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.get_all_active_sessions()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn get_started_sessions_count(
|
||||
&self,
|
||||
start_date: Date,
|
||||
) -> Result<i32, StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.get_started_sessions_count(start_date)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn get_active_users(&self) -> Result<Vec<String>, StatsStorageError> {
|
||||
Ok(self.session_manager.get_active_users().await?)
|
||||
}
|
||||
|
||||
pub async fn delete_active_session(
|
||||
&self,
|
||||
client_address: DestinationAddressBytes,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
Ok(self
|
||||
.session_manager
|
||||
.delete_active_session(client_address.as_base58_string())
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn cleanup_active_sessions(&self) -> Result<(), StatsStorageError> {
|
||||
Ok(self.session_manager.cleanup_active_sessions().await?)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use nym_credentials_interface::TicketType;
|
||||
use sqlx::prelude::FromRow;
|
||||
use time::{Duration, OffsetDateTime};
|
||||
|
||||
#[derive(FromRow)]
|
||||
pub struct StoredFinishedSession {
|
||||
duration_ms: i64,
|
||||
typ: String,
|
||||
}
|
||||
|
||||
impl StoredFinishedSession {
|
||||
pub fn serialize(&self) -> (u64, String) {
|
||||
(
|
||||
self.duration_ms as u64, //we are sure that it fits in a u64, see `fn end_at`
|
||||
self.typ.clone(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FinishedSession {
|
||||
pub duration: Duration,
|
||||
pub typ: SessionType,
|
||||
}
|
||||
|
||||
#[derive(PartialEq)]
|
||||
pub enum SessionType {
|
||||
Vpn,
|
||||
Mixnet,
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl SessionType {
|
||||
pub fn to_string(&self) -> &str {
|
||||
match self {
|
||||
Self::Vpn => "vpn",
|
||||
Self::Mixnet => "mixnet",
|
||||
Self::Unknown => "unknown",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_string(s: &str) -> Self {
|
||||
match s {
|
||||
"vpn" => Self::Vpn,
|
||||
"mixnet" => Self::Mixnet,
|
||||
_ => Self::Unknown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TicketType> for SessionType {
|
||||
fn from(value: TicketType) -> Self {
|
||||
match value {
|
||||
TicketType::V1MixnetEntry => Self::Mixnet,
|
||||
TicketType::V1MixnetExit => Self::Mixnet,
|
||||
TicketType::V1WireguardEntry => Self::Vpn,
|
||||
TicketType::V1WireguardExit => Self::Vpn,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromRow)]
|
||||
pub(crate) struct StoredActiveSession {
|
||||
start_time: OffsetDateTime,
|
||||
typ: String,
|
||||
}
|
||||
|
||||
pub struct ActiveSession {
|
||||
pub start: OffsetDateTime,
|
||||
pub typ: SessionType,
|
||||
}
|
||||
|
||||
impl ActiveSession {
|
||||
pub fn new(start_time: OffsetDateTime) -> Self {
|
||||
ActiveSession {
|
||||
start: start_time,
|
||||
typ: SessionType::Unknown,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_type(&mut self, ticket_type: TicketType) {
|
||||
self.typ = ticket_type.into();
|
||||
}
|
||||
|
||||
pub fn end_at(self, stop_time: OffsetDateTime) -> Option<FinishedSession> {
|
||||
let session_duration = stop_time - self.start;
|
||||
//ensure duration is positive to fit in a u64
|
||||
//u64::max milliseconds is 500k millenia so no overflow issue
|
||||
if session_duration > Duration::ZERO {
|
||||
Some(FinishedSession {
|
||||
duration: session_duration,
|
||||
typ: self.typ,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<StoredActiveSession> for ActiveSession {
|
||||
fn from(value: StoredActiveSession) -> Self {
|
||||
ActiveSession {
|
||||
start: value.start_time,
|
||||
typ: SessionType::from_string(&value.typ),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,177 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use time::{Date, OffsetDateTime};
|
||||
|
||||
use crate::models::{StoredActiveSession, StoredFinishedSession};
|
||||
|
||||
pub(crate) type Result<T> = std::result::Result<T, sqlx::Error>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct SessionManager {
|
||||
connection_pool: sqlx::SqlitePool,
|
||||
}
|
||||
|
||||
impl SessionManager {
|
||||
/// Creates new instance of the `SessionsManager` with the provided sqlite connection pool.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `connection_pool`: database connection pool to use.
|
||||
pub(crate) fn new(connection_pool: sqlx::SqlitePool) -> Self {
|
||||
SessionManager { connection_pool }
|
||||
}
|
||||
|
||||
pub(crate) async fn insert_finished_session(
|
||||
&self,
|
||||
date: Date,
|
||||
duration_ms: i64,
|
||||
typ: String,
|
||||
) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"INSERT INTO sessions_finished (day, duration_ms, typ) VALUES (?, ?, ?)",
|
||||
date,
|
||||
duration_ms,
|
||||
typ
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_finished_sessions(
|
||||
&self,
|
||||
date: Date,
|
||||
) -> Result<Vec<StoredFinishedSession>> {
|
||||
sqlx::query_as("SELECT duration_ms, typ FROM sessions_finished WHERE day = ?")
|
||||
.bind(date)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_finished_sessions(&self, before_date: Date) -> Result<()> {
|
||||
sqlx::query!("DELETE FROM sessions_finished WHERE day <= ? ", before_date)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn insert_unique_user(
|
||||
&self,
|
||||
date: Date,
|
||||
client_address_b58: String,
|
||||
) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"INSERT OR IGNORE INTO sessions_unique_users (day, client_address) VALUES (?, ?)",
|
||||
date,
|
||||
client_address_b58,
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_unique_users_count(&self, date: Date) -> Result<i32> {
|
||||
Ok(sqlx::query!(
|
||||
"SELECT COUNT(*) as count FROM sessions_unique_users WHERE day = ?",
|
||||
date
|
||||
)
|
||||
.fetch_one(&self.connection_pool)
|
||||
.await?
|
||||
.count)
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_unique_users(&self, before_date: Date) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"DELETE FROM sessions_unique_users WHERE day <= ? ",
|
||||
before_date
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn insert_active_session(
|
||||
&self,
|
||||
client_address_b58: String,
|
||||
start_time: OffsetDateTime,
|
||||
typ: String,
|
||||
) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"INSERT INTO sessions_active (client_address, start_time, typ) VALUES (?, ?, ?)",
|
||||
client_address_b58,
|
||||
start_time,
|
||||
typ
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn update_active_session_type(
|
||||
&self,
|
||||
client_address_b58: String,
|
||||
typ: String,
|
||||
) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"UPDATE sessions_active SET typ = ? WHERE client_address = ?",
|
||||
typ,
|
||||
client_address_b58,
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_active_session(
|
||||
&self,
|
||||
client_address_b58: String,
|
||||
) -> Result<Option<StoredActiveSession>> {
|
||||
sqlx::query_as("SELECT start_time, typ FROM sessions_active WHERE client_address = ?")
|
||||
.bind(client_address_b58)
|
||||
.fetch_optional(&self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_active_sessions(&self) -> Result<Vec<StoredActiveSession>> {
|
||||
sqlx::query_as("SELECT start_time, typ FROM sessions_active")
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_started_sessions_count(&self, start_date: Date) -> Result<i32> {
|
||||
Ok(sqlx::query!(
|
||||
"SELECT COUNT(*) as count FROM sessions_active WHERE date(start_time) = ?",
|
||||
start_date
|
||||
)
|
||||
.fetch_one(&self.connection_pool)
|
||||
.await?
|
||||
.count)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_active_users(&self) -> Result<Vec<String>> {
|
||||
Ok(sqlx::query!("SELECT client_address from sessions_active")
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|record| record.client_address)
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_active_session(&self, client_address_b58: String) -> Result<()> {
|
||||
sqlx::query!(
|
||||
"DELETE FROM sessions_active WHERE client_address = ?",
|
||||
client_address_b58
|
||||
)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn cleanup_active_sessions(&self) -> Result<()> {
|
||||
sqlx::query!("DELETE FROM sessions_active")
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -35,9 +35,6 @@ pub enum HttpClientError<E: Display = String> {
|
||||
source: reqwest::Error,
|
||||
},
|
||||
|
||||
#[error("failed to deserialise received response: {source}")]
|
||||
ResponseDeserialisationFailure { source: serde_json::Error },
|
||||
|
||||
#[error("provided url is malformed: {source}")]
|
||||
MalformedUrl {
|
||||
#[from]
|
||||
@@ -315,7 +312,6 @@ impl Client {
|
||||
parse_response(res, true).await
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
pub async fn get_json_endpoint<T, S, E>(&self, endpoint: S) -> Result<T, HttpClientError<E>>
|
||||
where
|
||||
for<'a> T: Deserialize<'a>,
|
||||
|
||||
@@ -11,7 +11,6 @@ license.workspace = true
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
axum-client-ip.workspace = true
|
||||
axum.workspace = true
|
||||
bytes = { workspace = true }
|
||||
colored.workspace = true
|
||||
|
||||
@@ -1,18 +1,18 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use axum::extract::Request;
|
||||
use axum::extract::{ConnectInfo, Request};
|
||||
use axum::http::header::{HOST, USER_AGENT};
|
||||
use axum::http::HeaderValue;
|
||||
use axum::middleware::Next;
|
||||
use axum::response::IntoResponse;
|
||||
use axum_client_ip::InsecureClientIp;
|
||||
use colored::Colorize;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Instant;
|
||||
use tracing::info;
|
||||
|
||||
pub async fn logger(
|
||||
InsecureClientIp(addr): InsecureClientIp,
|
||||
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
||||
request: Request,
|
||||
next: Next,
|
||||
) -> impl IntoResponse {
|
||||
|
||||
@@ -14,6 +14,7 @@ use nym_task::TaskClient;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use std::net::SocketAddr;
|
||||
use std::net::ToSocketAddrs;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::task::JoinHandle;
|
||||
@@ -312,7 +313,7 @@ impl VerlocMeasurer {
|
||||
info!("Starting verloc measurements");
|
||||
// TODO: should we also measure gateways?
|
||||
|
||||
let all_mixes = match self.validator_client.get_all_described_nodes().await {
|
||||
let all_mixes = match self.validator_client.get_cached_mixnodes().await {
|
||||
Ok(nodes) => nodes,
|
||||
Err(err) => {
|
||||
error!(
|
||||
@@ -331,14 +332,22 @@ impl VerlocMeasurer {
|
||||
// we only care about address and identity
|
||||
let tested_nodes = all_mixes
|
||||
.into_iter()
|
||||
.filter(|n| n.description.declared_role.mixnode)
|
||||
.filter_map(|node| {
|
||||
// try to parse the identity and host
|
||||
let node_identity = node.ed25519_identity_key();
|
||||
let mix_node = node.bond_information.mix_node;
|
||||
// check if the node has sufficient version to be able to understand the packets
|
||||
let node_version = parse_version(&mix_node.version).ok()?;
|
||||
if node_version < self.config.minimum_compatible_node_version {
|
||||
return None;
|
||||
}
|
||||
|
||||
let ip = node.description.host_information.ip_address.first()?;
|
||||
let verloc_port = node.description.verloc_port();
|
||||
let verloc_host = SocketAddr::new(*ip, verloc_port);
|
||||
// try to parse the identity and host
|
||||
let node_identity =
|
||||
identity::PublicKey::from_base58_string(mix_node.identity_key).ok()?;
|
||||
|
||||
let verloc_host = (&*mix_node.host, mix_node.verloc_port)
|
||||
.to_socket_addrs()
|
||||
.ok()?
|
||||
.next()?;
|
||||
|
||||
// TODO: possible problem in the future, this does name resolution and theoretically
|
||||
// if a lot of nodes maliciously mis-configured themselves, it might take a while to resolve them all
|
||||
|
||||
@@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct TestrunAssignment {
|
||||
/// has nothing to do with GW identity key. This is PK from `gateways` table
|
||||
pub testrun_id: i64,
|
||||
pub gateway_identity_key: String,
|
||||
pub gateway_pk_id: i64,
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ nym-sphinx-types = { path = "../types" }
|
||||
nym-topology = { path = "../../topology" }
|
||||
|
||||
[target."cfg(target_arch = \"wasm32\")".dependencies.wasm-bindgen]
|
||||
version = "0.2.93"
|
||||
version = "0.2.95"
|
||||
|
||||
[dev-dependencies]
|
||||
rand_chacha = { workspace = true }
|
||||
|
||||
@@ -42,8 +42,32 @@ impl PendingSync {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BlockProcessorConfig {
|
||||
pub pruning_options: PruningOptions,
|
||||
pub store_precommits: bool,
|
||||
}
|
||||
|
||||
impl Default for BlockProcessorConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
pruning_options: PruningOptions::nothing(),
|
||||
store_precommits: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockProcessorConfig {
|
||||
pub fn new(pruning_options: PruningOptions, store_precommits: bool) -> Self {
|
||||
Self {
|
||||
pruning_options,
|
||||
store_precommits,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BlockProcessor {
|
||||
pruning_options: PruningOptions,
|
||||
config: BlockProcessorConfig,
|
||||
cancel: CancellationToken,
|
||||
synced: Arc<Notify>,
|
||||
last_processed_height: u32,
|
||||
@@ -65,9 +89,10 @@ pub struct BlockProcessor {
|
||||
msg_modules: Vec<Box<dyn MsgModule + Send>>,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
impl BlockProcessor {
|
||||
pub async fn new(
|
||||
pruning_options: PruningOptions,
|
||||
config: BlockProcessorConfig,
|
||||
cancel: CancellationToken,
|
||||
synced: Arc<Notify>,
|
||||
incoming: UnboundedReceiver<BlockToProcess>,
|
||||
@@ -82,7 +107,7 @@ impl BlockProcessor {
|
||||
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
|
||||
|
||||
Ok(BlockProcessor {
|
||||
pruning_options,
|
||||
config,
|
||||
cancel,
|
||||
synced,
|
||||
last_processed_height,
|
||||
@@ -101,7 +126,7 @@ impl BlockProcessor {
|
||||
}
|
||||
|
||||
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
|
||||
self.pruning_options = pruning_options;
|
||||
self.config.pruning_options = pruning_options;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -128,7 +153,7 @@ impl BlockProcessor {
|
||||
// we won't end up with a corrupted storage.
|
||||
let mut tx = self.storage.begin_processing_tx().await?;
|
||||
|
||||
persist_block(&full_info, &mut tx).await?;
|
||||
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
|
||||
|
||||
// let the modules do whatever they want
|
||||
// the ones wanting the full block:
|
||||
@@ -241,7 +266,7 @@ impl BlockProcessor {
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
|
||||
let keep_recent = self.pruning_options.strategy_keep_recent();
|
||||
let keep_recent = self.config.pruning_options.strategy_keep_recent();
|
||||
let last_to_keep = self.last_processed_height - keep_recent;
|
||||
|
||||
info!(
|
||||
@@ -282,12 +307,12 @@ impl BlockProcessor {
|
||||
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
|
||||
debug!("checking for storage pruning");
|
||||
|
||||
if self.pruning_options.strategy.is_nothing() {
|
||||
if self.config.pruning_options.strategy.is_nothing() {
|
||||
trace!("the current pruning strategy is 'nothing'");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let interval = self.pruning_options.strategy_interval();
|
||||
let interval = self.config.pruning_options.strategy_interval();
|
||||
if self.last_pruned_height + interval <= self.last_processed_height {
|
||||
self.prune_storage().await?;
|
||||
}
|
||||
@@ -371,7 +396,7 @@ impl BlockProcessor {
|
||||
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
|
||||
// in case we were offline for a while,
|
||||
// make sure we don't request blocks we'd have to prune anyway
|
||||
let keep_recent = self.pruning_options.strategy_keep_recent();
|
||||
let keep_recent = self.config.pruning_options.strategy_keep_recent();
|
||||
let last_to_keep = latest_block - keep_recent;
|
||||
self.last_processed_height = max(self.last_processed_height, last_to_keep);
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::block_processor::types::BlockToProcess;
|
||||
use crate::block_processor::BlockProcessor;
|
||||
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
|
||||
use crate::block_requester::{BlockRequest, BlockRequester};
|
||||
use crate::error::ScraperError;
|
||||
use crate::modules::{BlockModule, MsgModule, TxModule};
|
||||
@@ -34,6 +34,8 @@ pub struct Config {
|
||||
pub database_path: PathBuf,
|
||||
|
||||
pub pruning_options: PruningOptions,
|
||||
|
||||
pub store_precommits: bool,
|
||||
}
|
||||
|
||||
pub struct NyxdScraperBuilder {
|
||||
@@ -60,8 +62,14 @@ impl NyxdScraperBuilder {
|
||||
req_rx,
|
||||
processing_tx.clone(),
|
||||
);
|
||||
let mut block_processor = BlockProcessor::new(
|
||||
|
||||
let block_processor_config = BlockProcessorConfig::new(
|
||||
scraper.config.pruning_options,
|
||||
scraper.config.store_precommits,
|
||||
);
|
||||
|
||||
let mut block_processor = BlockProcessor::new(
|
||||
block_processor_config,
|
||||
scraper.cancel_token.clone(),
|
||||
scraper.startup_sync.clone(),
|
||||
processing_rx,
|
||||
@@ -275,8 +283,11 @@ impl NyxdScraper {
|
||||
req_tx: Sender<BlockRequest>,
|
||||
processing_rx: UnboundedReceiver<BlockToProcess>,
|
||||
) -> Result<BlockProcessor, ScraperError> {
|
||||
let block_processor_config =
|
||||
BlockProcessorConfig::new(self.config.pruning_options, self.config.store_precommits);
|
||||
|
||||
BlockProcessor::new(
|
||||
self.config.pruning_options,
|
||||
block_processor_config,
|
||||
self.cancel_token.clone(),
|
||||
self.startup_sync.clone(),
|
||||
processing_rx,
|
||||
|
||||
@@ -212,6 +212,7 @@ impl ScraperStorage {
|
||||
pub async fn persist_block(
|
||||
block: &FullBlockInformation,
|
||||
tx: &mut StorageTransaction,
|
||||
store_precommits: bool,
|
||||
) -> Result<(), ScraperError> {
|
||||
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
|
||||
|
||||
@@ -224,11 +225,12 @@ pub async fn persist_block(
|
||||
// persist block data
|
||||
persist_block_data(&block.block, total_gas, tx).await?;
|
||||
|
||||
// persist commits
|
||||
if let Some(commit) = &block.block.last_commit {
|
||||
persist_commits(commit, &block.validators, tx).await?;
|
||||
} else {
|
||||
warn!("no commits for block {}", block.block.header.height)
|
||||
if store_precommits {
|
||||
if let Some(commit) = &block.block.last_commit {
|
||||
persist_commits(commit, &block.validators, tx).await?;
|
||||
} else {
|
||||
warn!("no commits for block {}", block.block.header.height)
|
||||
}
|
||||
}
|
||||
|
||||
// persist txs
|
||||
|
||||
@@ -286,10 +286,6 @@ impl NymTopology {
|
||||
self.get_gateway(gateway_identity).is_some()
|
||||
}
|
||||
|
||||
pub fn insert_gateway(&mut self, gateway: gateway::LegacyNode) {
|
||||
self.gateways.push(gateway)
|
||||
}
|
||||
|
||||
pub fn set_gateways(&mut self, gateways: Vec<gateway::LegacyNode>) {
|
||||
self.gateways = gateways
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ impl<'a> TryFrom<&'a SkimmedNode> for LegacyNode {
|
||||
});
|
||||
}
|
||||
|
||||
let layer = match value.role {
|
||||
let layer = match value.epoch_role {
|
||||
NodeRole::Mixnode { layer } => layer
|
||||
.try_into()
|
||||
.map_err(|_| MixnodeConversionError::InvalidLayer)?,
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::deprecated::DelegationEvent;
|
||||
use crate::error::TypesError;
|
||||
use crate::mixnode::NodeCostParams;
|
||||
use cosmwasm_std::Decimal;
|
||||
use nym_mixnet_contract_common::{Delegation as MixnetContractDelegation, NodeId, NodeRewarding};
|
||||
use nym_mixnet_contract_common::{Delegation as MixnetContractDelegation, NodeId};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -70,14 +70,6 @@ pub struct DelegationWithEverything {
|
||||
pub mixnode_is_unbonding: Option<bool>,
|
||||
}
|
||||
|
||||
pub struct NodeInformation {
|
||||
pub owner: String,
|
||||
pub mix_id: NodeId,
|
||||
pub node_identity: String,
|
||||
pub rewarding_details: NodeRewarding,
|
||||
pub is_unbonding: bool,
|
||||
}
|
||||
|
||||
#[cfg_attr(feature = "generate-ts", derive(ts_rs::TS))]
|
||||
#[cfg_attr(
|
||||
feature = "generate-ts",
|
||||
|
||||
@@ -68,7 +68,7 @@ pub async fn current_network_topology_async(
|
||||
|
||||
let api_client = NymApiClient::new(url);
|
||||
let mixnodes = api_client
|
||||
.get_all_basic_active_mixing_assigned_nodes(None)
|
||||
.get_basic_active_mixing_assigned_nodes(None)
|
||||
.await?;
|
||||
let gateways = api_client.get_all_basic_entry_assigned_nodes(None).await?;
|
||||
|
||||
|
||||
@@ -99,25 +99,12 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>
|
||||
let peers = all_peers
|
||||
.into_iter()
|
||||
.map(Peer::try_from)
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.into_iter()
|
||||
.map(|mut peer| {
|
||||
// since WGApi doesn't set those values on init, let's set them to 0
|
||||
peer.rx_bytes = 0;
|
||||
peer.tx_bytes = 0;
|
||||
peer
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
for peer in peers.iter() {
|
||||
let bandwidth_manager =
|
||||
PeerController::generate_bandwidth_manager(storage.clone(), &peer.public_key)
|
||||
.await?
|
||||
.map(|bw_m| Arc::new(RwLock::new(bw_m)));
|
||||
// Update storage with *x_bytes set to 0, as in kernel peers we can't set those values
|
||||
// so we need to restart counting. Hopefully the bandwidth was counted in available_bandwidth
|
||||
storage
|
||||
.insert_wireguard_peer(peer, bandwidth_manager.is_some())
|
||||
.await?;
|
||||
peer_bandwidth_managers.insert(peer.public_key.clone(), bandwidth_manager);
|
||||
}
|
||||
wg_api.create_interface()?;
|
||||
|
||||
@@ -7,8 +7,8 @@ use defguard_wireguard_rs::{
|
||||
WireguardInterfaceApi,
|
||||
};
|
||||
use futures::channel::oneshot;
|
||||
use nym_authenticator_requests::latest::registration::{
|
||||
RemainingBandwidthData, BANDWIDTH_CAP_PER_DAY,
|
||||
use nym_authenticator_requests::{
|
||||
latest::registration::RemainingBandwidthData, v1::registration::BANDWIDTH_CAP_PER_DAY,
|
||||
};
|
||||
use nym_credential_verification::{
|
||||
bandwidth_storage_manager::BandwidthStorageManager, BandwidthFlushingBehaviourConfig,
|
||||
@@ -158,13 +158,10 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
|
||||
.ok_or(Error::MissingClientBandwidthEntry)?
|
||||
.client_id
|
||||
{
|
||||
let bandwidth = storage
|
||||
.get_available_bandwidth(client_id)
|
||||
.await?
|
||||
.ok_or(Error::MissingClientBandwidthEntry)?;
|
||||
storage.create_bandwidth_entry(client_id).await?;
|
||||
Ok(Some(BandwidthStorageManager::new(
|
||||
storage,
|
||||
ClientBandwidth::new(bandwidth.into()),
|
||||
ClientBandwidth::new(Default::default()),
|
||||
client_id,
|
||||
BandwidthFlushingBehaviourConfig::default(),
|
||||
true,
|
||||
@@ -230,7 +227,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
|
||||
// host information not updated yet
|
||||
return Ok(None);
|
||||
};
|
||||
BANDWIDTH_CAP_PER_DAY.saturating_sub(peer.rx_bytes + peer.tx_bytes) as i64
|
||||
BANDWIDTH_CAP_PER_DAY.saturating_sub((peer.rx_bytes + peer.tx_bytes) as i64)
|
||||
};
|
||||
|
||||
Ok(Some(RemainingBandwidthData {
|
||||
|
||||
@@ -6,7 +6,7 @@ use crate::peer_controller::PeerControlRequest;
|
||||
use defguard_wireguard_rs::host::Peer;
|
||||
use defguard_wireguard_rs::{host::Host, key::Key};
|
||||
use futures::channel::oneshot;
|
||||
use nym_authenticator_requests::latest::registration::BANDWIDTH_CAP_PER_DAY;
|
||||
use nym_authenticator_requests::v2::registration::BANDWIDTH_CAP_PER_DAY;
|
||||
use nym_credential_verification::bandwidth_storage_manager::BandwidthStorageManager;
|
||||
use nym_gateway_storage::models::WireguardPeer;
|
||||
use nym_gateway_storage::Storage;
|
||||
@@ -18,7 +18,7 @@ use tokio::sync::{mpsc, RwLock};
|
||||
use tokio_stream::{wrappers::IntervalStream, StreamExt};
|
||||
|
||||
pub(crate) type SharedBandwidthStorageManager<St> = Arc<RwLock<BandwidthStorageManager<St>>>;
|
||||
const AUTO_REMOVE_AFTER: Duration = Duration::from_secs(60 * 60 * 24 * 30); // 30 days
|
||||
const AUTO_REMOVE_AFTER: Duration = Duration::from_secs(60 * 60 * 24); // 24 hours
|
||||
|
||||
pub struct PeerHandle<St> {
|
||||
storage: St,
|
||||
@@ -75,8 +75,8 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
|
||||
|
||||
async fn active_peer(
|
||||
&mut self,
|
||||
storage_peer: &WireguardPeer,
|
||||
kernel_peer: &Peer,
|
||||
storage_peer: WireguardPeer,
|
||||
kernel_peer: Peer,
|
||||
) -> Result<bool, Error> {
|
||||
if let Some(bandwidth_manager) = &self.bandwidth_storage_manager {
|
||||
let spent_bandwidth = (kernel_peer.rx_bytes + kernel_peer.tx_bytes)
|
||||
@@ -84,13 +84,12 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
|
||||
.ok_or(Error::InconsistentConsumedBytes)?
|
||||
.try_into()
|
||||
.map_err(|_| Error::InconsistentConsumedBytes)?;
|
||||
if spent_bandwidth > 0
|
||||
&& bandwidth_manager
|
||||
.write()
|
||||
.await
|
||||
.try_use_bandwidth(spent_bandwidth)
|
||||
.await
|
||||
.is_err()
|
||||
if bandwidth_manager
|
||||
.write()
|
||||
.await
|
||||
.try_use_bandwidth(spent_bandwidth)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
let success = self.remove_peer().await?;
|
||||
return Ok(!success);
|
||||
@@ -98,7 +97,7 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
|
||||
} else {
|
||||
if SystemTime::now().duration_since(self.startup_timestamp)? >= AUTO_REMOVE_AFTER {
|
||||
log::debug!(
|
||||
"Peer {} has been present for 30 days, removing it",
|
||||
"Peer {} has been present for 24 hours, removing it",
|
||||
self.public_key.to_string()
|
||||
);
|
||||
let success = self.remove_peer().await?;
|
||||
@@ -136,12 +135,9 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
|
||||
log::debug!("Peer {:?} not in storage anymore, shutting down handle", self.public_key);
|
||||
return Ok(());
|
||||
};
|
||||
if !self.active_peer(&storage_peer, &kernel_peer).await? {
|
||||
if !self.active_peer(storage_peer, kernel_peer).await? {
|
||||
log::debug!("Peer {:?} doesn't have bandwidth anymore, shutting down handle", self.public_key);
|
||||
return Ok(());
|
||||
} else {
|
||||
// Update storage values
|
||||
self.storage.insert_wireguard_peer(&kernel_peer, self.bandwidth_storage_manager.is_some()).await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3689,7 +3689,8 @@
|
||||
"format": "uint32",
|
||||
"minimum": 0.0
|
||||
}
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"Percent": {
|
||||
"description": "Percent represents a value between 0 and 100% (i.e. between 0.0 and 1.0)",
|
||||
@@ -5238,7 +5239,8 @@
|
||||
"format": "uint32",
|
||||
"minimum": 0.0
|
||||
}
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"Percent": {
|
||||
"description": "Percent represents a value between 0 and 100% (i.e. between 0.0 and 1.0)",
|
||||
@@ -5573,7 +5575,8 @@
|
||||
"format": "uint32",
|
||||
"minimum": 0.0
|
||||
}
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"Percent": {
|
||||
"description": "Percent represents a value between 0 and 100% (i.e. between 0.0 and 1.0)",
|
||||
@@ -7592,7 +7595,8 @@
|
||||
"format": "uint32",
|
||||
"minimum": 0.0
|
||||
}
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"Percent": {
|
||||
"description": "Percent represents a value between 0 and 100% (i.e. between 0.0 and 1.0)",
|
||||
|
||||
@@ -315,7 +315,8 @@
|
||||
"format": "uint32",
|
||||
"minimum": 0.0
|
||||
}
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"Percent": {
|
||||
"description": "Percent represents a value between 0 and 100% (i.e. between 0.0 and 1.0)",
|
||||
|
||||
@@ -323,7 +323,8 @@
|
||||
"format": "uint32",
|
||||
"minimum": 0.0
|
||||
}
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"Percent": {
|
||||
"description": "Percent represents a value between 0 and 100% (i.e. between 0.0 and 1.0)",
|
||||
|
||||
@@ -317,7 +317,8 @@
|
||||
"format": "uint32",
|
||||
"minimum": 0.0
|
||||
}
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"Percent": {
|
||||
"description": "Percent represents a value between 0 and 100% (i.e. between 0.0 and 1.0)",
|
||||
|
||||
@@ -319,7 +319,8 @@
|
||||
"format": "uint32",
|
||||
"minimum": 0.0
|
||||
}
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"Percent": {
|
||||
"description": "Percent represents a value between 0 and 100% (i.e. between 0.0 and 1.0)",
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
use super::helpers::must_get_gateway_bond_by_owner;
|
||||
use super::storage;
|
||||
use crate::constants::default_node_costs;
|
||||
use crate::interval::storage as interval_storage;
|
||||
use crate::mixnet_contract_settings::storage as mixnet_params_storage;
|
||||
use crate::nodes::helpers::save_new_nymnode_with_id;
|
||||
use crate::nodes::transactions::add_nym_node_inner;
|
||||
@@ -116,10 +115,6 @@ pub fn try_migrate_to_nymnode(
|
||||
comment: "legacy gateway did not have a pre-assigned node id".to_string(),
|
||||
})?;
|
||||
|
||||
let current_epoch =
|
||||
interval_storage::current_interval(deps.storage)?.current_epoch_absolute_id();
|
||||
let previous_epoch = current_epoch.saturating_sub(1);
|
||||
|
||||
// create nym-node entry
|
||||
// for gateways it's quite straightforward as there are no delegations or rewards to worry about
|
||||
save_new_nymnode_with_id(
|
||||
@@ -130,7 +125,6 @@ pub fn try_migrate_to_nymnode(
|
||||
cost_params,
|
||||
info.sender.clone(),
|
||||
gateway_bond.pledge_amount,
|
||||
previous_epoch,
|
||||
)?;
|
||||
|
||||
storage::PREASSIGNED_LEGACY_IDS.remove(deps.storage, gateway_identity.clone());
|
||||
|
||||
@@ -22,8 +22,6 @@ pub(crate) fn save_new_nymnode(
|
||||
pledge: Coin,
|
||||
) -> Result<NodeId, MixnetContractError> {
|
||||
let node_id = next_nymnode_id_counter(storage)?;
|
||||
let current_epoch = interval_storage::current_interval(storage)?.current_epoch_absolute_id();
|
||||
|
||||
save_new_nymnode_with_id(
|
||||
storage,
|
||||
node_id,
|
||||
@@ -32,13 +30,11 @@ pub(crate) fn save_new_nymnode(
|
||||
cost_params,
|
||||
owner,
|
||||
pledge,
|
||||
current_epoch,
|
||||
)?;
|
||||
|
||||
Ok(node_id)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn save_new_nymnode_with_id(
|
||||
storage: &mut dyn Storage,
|
||||
node_id: NodeId,
|
||||
@@ -47,9 +43,10 @@ pub(crate) fn save_new_nymnode_with_id(
|
||||
cost_params: NodeCostParams,
|
||||
owner: Addr,
|
||||
pledge: Coin,
|
||||
last_rewarding_epoch: u32,
|
||||
) -> Result<(), MixnetContractError> {
|
||||
let node_rewarding = NodeRewarding::initialise_new(cost_params, &pledge, last_rewarding_epoch)?;
|
||||
let current_epoch = interval_storage::current_interval(storage)?.current_epoch_absolute_id();
|
||||
|
||||
let node_rewarding = NodeRewarding::initialise_new(cost_params, &pledge, current_epoch)?;
|
||||
let node_bond = NymNodeBond::new(node_id, owner, pledge, node, bonding_height);
|
||||
|
||||
// save node bond data
|
||||
|
||||
@@ -52,8 +52,8 @@ pub(crate) fn save_assignment(
|
||||
|
||||
// update metadata
|
||||
let mut metadata = ROLES_METADATA.load(storage, inactive)?;
|
||||
let highest_id = assignment.nodes.iter().max().copied().unwrap_or_default();
|
||||
metadata.set_highest_id(highest_id, assignment.role);
|
||||
let last = assignment.nodes.last().copied().unwrap_or_default();
|
||||
metadata.set_highest_id(last, assignment.role);
|
||||
metadata.set_role_count(assignment.role, assignment.nodes.len() as u32);
|
||||
if assignment.is_final_assignment() {
|
||||
metadata.fully_assigned = true
|
||||
@@ -104,10 +104,7 @@ pub(crate) fn next_nymnode_id_counter(store: &mut dyn Storage) -> StdResult<Node
|
||||
}
|
||||
|
||||
pub(crate) fn initialise_storage(storage: &mut dyn Storage) -> Result<(), MixnetContractError> {
|
||||
let active_bucket = RoleStorageBucket::default();
|
||||
let inactive_bucket = active_bucket.other();
|
||||
|
||||
ACTIVE_ROLES_BUCKET.save(storage, &active_bucket)?;
|
||||
ACTIVE_ROLES_BUCKET.save(storage, &RoleStorageBucket::default())?;
|
||||
let roles = vec![
|
||||
Role::Layer1,
|
||||
Role::Layer2,
|
||||
@@ -117,12 +114,24 @@ pub(crate) fn initialise_storage(storage: &mut dyn Storage) -> Result<(), Mixnet
|
||||
Role::Standby,
|
||||
];
|
||||
for role in roles {
|
||||
ROLES.save(storage, (active_bucket as u8, role), &vec![])?;
|
||||
ROLES.save(storage, (inactive_bucket as u8, role), &vec![])?
|
||||
ROLES.save(storage, (RoleStorageBucket::default() as u8, role), &vec![])?;
|
||||
ROLES.save(
|
||||
storage,
|
||||
(RoleStorageBucket::default().other() as u8, role),
|
||||
&vec![],
|
||||
)?
|
||||
}
|
||||
|
||||
ROLES_METADATA.save(storage, active_bucket as u8, &Default::default())?;
|
||||
ROLES_METADATA.save(storage, inactive_bucket as u8, &Default::default())?;
|
||||
ROLES_METADATA.save(
|
||||
storage,
|
||||
RoleStorageBucket::default() as u8,
|
||||
&Default::default(),
|
||||
)?;
|
||||
ROLES_METADATA.save(
|
||||
storage,
|
||||
RoleStorageBucket::default().other() as u8,
|
||||
&Default::default(),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -131,7 +140,6 @@ pub(crate) fn initialise_storage(storage: &mut dyn Storage) -> Result<(), Mixnet
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::support::tests::test_helpers;
|
||||
use crate::support::tests::test_helpers::TestSetup;
|
||||
|
||||
#[test]
|
||||
fn next_id() {
|
||||
@@ -141,33 +149,4 @@ mod tests {
|
||||
assert_eq!(i, next_nymnode_id_counter(deps.as_mut().storage).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn assigning_role_uses_highest_id_even_if_not_sorted() {
|
||||
let mut test = TestSetup::new();
|
||||
let deps = test.deps_mut();
|
||||
|
||||
let sorted = RoleAssignment {
|
||||
role: Role::EntryGateway,
|
||||
nodes: vec![1, 2, 3],
|
||||
};
|
||||
|
||||
let unsorted = RoleAssignment {
|
||||
role: Role::Layer1,
|
||||
nodes: vec![8, 5, 4],
|
||||
};
|
||||
|
||||
save_assignment(deps.storage, sorted).unwrap();
|
||||
save_assignment(deps.storage, unsorted).unwrap();
|
||||
|
||||
let storage = deps.as_ref().storage;
|
||||
|
||||
let active_bucket = ACTIVE_ROLES_BUCKET.load(storage).unwrap();
|
||||
let inactive = active_bucket.other() as u8;
|
||||
let metadata = ROLES_METADATA.load(storage, inactive).unwrap();
|
||||
|
||||
assert_eq!(metadata.entry_gateway_metadata.highest_id, 3);
|
||||
assert_eq!(metadata.layer1_metadata.highest_id, 8);
|
||||
assert_eq!(metadata.highest_rewarded_id(), 8)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,16 +90,12 @@ mod families_purge {
|
||||
|
||||
mod nym_nodes_usage {
|
||||
use crate::constants::{CONTRACT_STATE_KEY, REWARDING_PARAMS_KEY};
|
||||
use crate::interval::storage::current_interval;
|
||||
use crate::mixnet_contract_settings::storage::CONTRACT_STATE;
|
||||
use crate::nodes::storage::helpers::RoleStorageBucket;
|
||||
use crate::nodes::storage::rewarded_set::{ACTIVE_ROLES_BUCKET, ROLES, ROLES_METADATA};
|
||||
use crate::rewards::storage::RewardingStorage;
|
||||
use crate::support::helpers::ensure_epoch_in_progress_state;
|
||||
use cosmwasm_std::{Addr, Coin, DepsMut, Order, StdResult, Storage};
|
||||
use cw_storage_plus::{Item, Map};
|
||||
use mixnet_contract_common::error::MixnetContractError;
|
||||
use mixnet_contract_common::nym_node::{RewardedSetMetadata, Role};
|
||||
use mixnet_contract_common::reward_params::RewardedSetParams;
|
||||
use mixnet_contract_common::{
|
||||
ContractState, ContractStateParams, IntervalRewardParams, MigrateMsg, NodeId,
|
||||
@@ -177,9 +173,7 @@ mod nym_nodes_usage {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn preassign_gateway_ids(
|
||||
storage: &mut dyn Storage,
|
||||
) -> Result<(Option<NodeId>, Option<NodeId>), MixnetContractError> {
|
||||
fn preassign_gateway_ids(storage: &mut dyn Storage) -> Result<(), MixnetContractError> {
|
||||
// that one is a big if. we have ~100 gateways so we **might** be able to fit it within migration.
|
||||
// if not, then we'll have to do it in batches/change our approach
|
||||
|
||||
@@ -188,15 +182,8 @@ mod nym_nodes_usage {
|
||||
.map(|res| res.map(|row| row.1))
|
||||
.collect::<StdResult<Vec<_>>>()?;
|
||||
|
||||
let mut start = None;
|
||||
let mut end = None;
|
||||
for gateway in gateways {
|
||||
let id = crate::nodes::storage::next_nymnode_id_counter(storage)?;
|
||||
if start.is_none() {
|
||||
start = Some(id)
|
||||
}
|
||||
end = Some(id);
|
||||
|
||||
crate::gateways::storage::PREASSIGNED_LEGACY_IDS.save(
|
||||
storage,
|
||||
gateway.gateway.identity_key,
|
||||
@@ -204,12 +191,10 @@ mod nym_nodes_usage {
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok((start, end))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cleanup_legacy_storage(
|
||||
storage: &mut dyn Storage,
|
||||
) -> Result<Vec<NodeId>, MixnetContractError> {
|
||||
fn cleanup_legacy_storage(storage: &mut dyn Storage) -> Result<(), MixnetContractError> {
|
||||
#[derive(Copy, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct LayerDistribution {
|
||||
pub layer1: u64,
|
||||
@@ -239,11 +224,11 @@ mod nym_nodes_usage {
|
||||
.keys(storage, None, None, Order::Ascending)
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
for &node_id in &rewarded_ids {
|
||||
for node_id in rewarded_ids {
|
||||
REWARDED_SET.remove(storage, node_id)
|
||||
}
|
||||
|
||||
Ok(rewarded_ids)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn migrate_rewarded_set_params(storage: &mut dyn Storage) -> Result<(), MixnetContractError> {
|
||||
@@ -283,98 +268,6 @@ mod nym_nodes_usage {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn assign_temporary_rewarded_set(
|
||||
storage: &mut dyn Storage,
|
||||
(min_available_gateway, max_available_gateway): (Option<NodeId>, Option<NodeId>),
|
||||
current_rewarded_set_mixnodes: Vec<NodeId>,
|
||||
) -> Result<(), MixnetContractError> {
|
||||
let epoch_id = current_interval(storage)?.current_epoch_absolute_id();
|
||||
|
||||
// in the previous step we explicitly set rewarded set to 120 mixnodes and 50 entry gateways
|
||||
// note: we can't assign exit gateways because the contract itself doesn't know which might support it
|
||||
|
||||
let active_bucket = RoleStorageBucket::default();
|
||||
let inactive_bucket = active_bucket.other();
|
||||
ACTIVE_ROLES_BUCKET.save(storage, &active_bucket)?;
|
||||
|
||||
// ACTIVE BUCKET:
|
||||
let mut active_metadata = RewardedSetMetadata::new(epoch_id);
|
||||
|
||||
let mut current_rewarded_set_mixnodes = current_rewarded_set_mixnodes;
|
||||
// ensure it's sorted. it should have already been, but better safe than sorry..
|
||||
current_rewarded_set_mixnodes.sort();
|
||||
|
||||
let mut layer1 = Vec::new();
|
||||
let mut layer2 = Vec::new();
|
||||
let mut layer3 = Vec::new();
|
||||
let mut entry = Vec::new();
|
||||
|
||||
for (i, mix_id) in current_rewarded_set_mixnodes
|
||||
.into_iter()
|
||||
.take(120)
|
||||
.enumerate()
|
||||
{
|
||||
if i % 3 == 0 {
|
||||
layer1.push(mix_id);
|
||||
} else if i % 3 == 1 {
|
||||
layer2.push(mix_id);
|
||||
} else if i % 3 == 2 {
|
||||
layer3.push(mix_id);
|
||||
}
|
||||
}
|
||||
|
||||
if let (Some(min_id), Some(max_id)) = (min_available_gateway, max_available_gateway) {
|
||||
// we can assign the gateway nodes
|
||||
entry = (min_id..=max_id).take(50).collect();
|
||||
}
|
||||
|
||||
// ACTIVE BUCKET:
|
||||
active_metadata.fully_assigned = true;
|
||||
|
||||
// layer1
|
||||
ROLES.save(storage, (active_bucket as u8, Role::Layer1), &layer1)?;
|
||||
active_metadata.layer1_metadata.num_nodes = layer1.len() as u32;
|
||||
active_metadata.layer1_metadata.highest_id = layer1.last().copied().unwrap_or_default();
|
||||
|
||||
// layer2
|
||||
ROLES.save(storage, (active_bucket as u8, Role::Layer2), &layer2)?;
|
||||
active_metadata.layer2_metadata.num_nodes = layer2.len() as u32;
|
||||
active_metadata.layer2_metadata.highest_id = layer2.last().copied().unwrap_or_default();
|
||||
|
||||
// layer3
|
||||
ROLES.save(storage, (active_bucket as u8, Role::Layer3), &layer3)?;
|
||||
active_metadata.layer3_metadata.num_nodes = layer3.len() as u32;
|
||||
active_metadata.layer3_metadata.highest_id = layer3.last().copied().unwrap_or_default();
|
||||
|
||||
// entry
|
||||
ROLES.save(storage, (active_bucket as u8, Role::EntryGateway), &entry)?;
|
||||
active_metadata.entry_gateway_metadata.num_nodes = entry.len() as u32;
|
||||
active_metadata.entry_gateway_metadata.highest_id =
|
||||
entry.last().copied().unwrap_or_default();
|
||||
|
||||
// nothing for exit or standby
|
||||
ROLES.save(storage, (active_bucket as u8, Role::ExitGateway), &vec![])?;
|
||||
ROLES.save(storage, (active_bucket as u8, Role::Standby), &vec![])?;
|
||||
ROLES_METADATA.save(storage, active_bucket as u8, &active_metadata)?;
|
||||
|
||||
// SECONDARY BUCKET
|
||||
let roles = vec![
|
||||
Role::Layer1,
|
||||
Role::Layer2,
|
||||
Role::Layer3,
|
||||
Role::EntryGateway,
|
||||
Role::ExitGateway,
|
||||
Role::Standby,
|
||||
];
|
||||
for role in roles {
|
||||
ROLES.save(storage, (inactive_bucket as u8, role), &vec![])?
|
||||
}
|
||||
|
||||
ROLES_METADATA.save(storage, inactive_bucket as u8, &Default::default())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn migrate_to_nym_nodes_usage(
|
||||
deps: DepsMut<'_>,
|
||||
_msg: &MigrateMsg,
|
||||
@@ -391,18 +284,16 @@ mod nym_nodes_usage {
|
||||
|
||||
// pre-assign NodeId to all gateways (that will be applied during nym-node migration)
|
||||
// to simplify all other code during the intermediate period
|
||||
let gateways = preassign_gateway_ids(deps.storage)?;
|
||||
preassign_gateway_ids(deps.storage)?;
|
||||
|
||||
// initialise all the storage structures required by nym-nodes
|
||||
crate::nodes::storage::initialise_storage(deps.storage)?;
|
||||
|
||||
// update the simple active/rewarded set sizes to actually contain the distribution of roles
|
||||
migrate_rewarded_set_params(deps.storage)?;
|
||||
|
||||
// remove all redundant storage items
|
||||
let old_rewarded_set_mixnodes = cleanup_legacy_storage(deps.storage)?;
|
||||
|
||||
// assign initial rewarded set
|
||||
// and initialise all the storage structures required by nym-nodes
|
||||
// based on the nodes that are in the contract right now
|
||||
assign_temporary_rewarded_set(deps.storage, gateways, old_rewarded_set_mixnodes)?;
|
||||
cleanup_legacy_storage(deps.storage)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -13,8 +13,6 @@ pub use nym_explorer_api_requests::{
|
||||
|
||||
// Paths
|
||||
const API_VERSION: &str = "v1";
|
||||
const TMP: &str = "tmp";
|
||||
const UNSTABLE: &str = "unstable";
|
||||
const MIXNODES: &str = "mix-nodes";
|
||||
const GATEWAYS: &str = "gateways";
|
||||
|
||||
@@ -98,13 +96,6 @@ impl ExplorerClient {
|
||||
pub async fn get_gateways(&self) -> Result<Vec<PrettyDetailedGatewayBond>, ExplorerApiError> {
|
||||
self.query_explorer_api(&[API_VERSION, GATEWAYS]).await
|
||||
}
|
||||
|
||||
pub async fn unstable_get_gateways(
|
||||
&self,
|
||||
) -> Result<Vec<PrettyDetailedGatewayBond>, ExplorerApiError> {
|
||||
self.query_explorer_api(&[API_VERSION, TMP, UNSTABLE, GATEWAYS])
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
fn combine_url(mut base_url: Url, paths: &[&str]) -> Result<Url, ExplorerApiError> {
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
use crate::state::ExplorerApiStateContext;
|
||||
use log::{info, warn};
|
||||
use nym_explorer_api_requests::Location;
|
||||
use nym_network_defaults::DEFAULT_NYM_NODE_HTTP_PORT;
|
||||
use nym_task::TaskClient;
|
||||
|
||||
pub(crate) struct GeoLocateTask {
|
||||
@@ -26,7 +25,6 @@ impl GeoLocateTask {
|
||||
_ = interval_timer.tick() => {
|
||||
self.locate_mix_nodes().await;
|
||||
self.locate_gateways().await;
|
||||
self.locate_nym_nodes().await;
|
||||
}
|
||||
_ = self.shutdown.recv() => {
|
||||
trace!("Listener: Received shutdown");
|
||||
@@ -111,83 +109,6 @@ impl GeoLocateTask {
|
||||
trace!("All mix nodes located");
|
||||
}
|
||||
|
||||
async fn locate_nym_nodes(&mut self) {
|
||||
// I'm unwrapping to the default value to get rid of an extra indentation level from the `if let Some(...) = ...`
|
||||
// If the value is None, we'll unwrap to an empty hashmap and the `values()` loop won't do any work anyway
|
||||
let nym_nodes = self.state.inner.nymnodes.get_bonded_nymnodes().await;
|
||||
|
||||
let geo_ip = self.state.inner.geo_ip.0.clone();
|
||||
|
||||
for (i, cache_item) in nym_nodes.values().enumerate() {
|
||||
if self
|
||||
.state
|
||||
.inner
|
||||
.nymnodes
|
||||
.is_location_valid(cache_item.node_id())
|
||||
.await
|
||||
{
|
||||
// when the cached location is valid, don't locate and continue to next mix node
|
||||
continue;
|
||||
}
|
||||
|
||||
let bonded_host = &cache_item.bond_information.node.host;
|
||||
|
||||
match geo_ip.query(
|
||||
bonded_host,
|
||||
Some(
|
||||
cache_item
|
||||
.bond_information
|
||||
.node
|
||||
.custom_http_port
|
||||
.unwrap_or(DEFAULT_NYM_NODE_HTTP_PORT),
|
||||
),
|
||||
) {
|
||||
Ok(opt) => match opt {
|
||||
Some(location) => {
|
||||
let location: Location = location.into();
|
||||
|
||||
trace!(
|
||||
"{} mix nodes already located. host {} is located in {:#?}",
|
||||
i,
|
||||
bonded_host,
|
||||
location.three_letter_iso_country_code,
|
||||
);
|
||||
|
||||
if i > 0 && (i % 100) == 0 {
|
||||
info!("Located {} nym-nodes...", i + 1,);
|
||||
}
|
||||
|
||||
self.state
|
||||
.inner
|
||||
.nymnodes
|
||||
.set_location(cache_item.node_id(), Some(location))
|
||||
.await;
|
||||
|
||||
// one node has been located, so return out of the loop
|
||||
return;
|
||||
}
|
||||
None => {
|
||||
warn!("❌ Location for {bonded_host} not found.");
|
||||
self.state
|
||||
.inner
|
||||
.nymnodes
|
||||
.set_location(cache_item.node_id(), None)
|
||||
.await;
|
||||
}
|
||||
},
|
||||
Err(_e) => {
|
||||
// warn!(
|
||||
// "❌ Oh no! Location for {} failed. Error: {:#?}",
|
||||
// cache_item.mix_node().host,
|
||||
// e
|
||||
// );
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
trace!("All nym-nodes nodes located");
|
||||
}
|
||||
|
||||
async fn locate_gateways(&mut self) {
|
||||
let gateways = self.state.inner.gateways.get_gateways().await;
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ use crate::gateways::http::gateways_make_default_routes;
|
||||
use crate::http::swagger::get_docs;
|
||||
use crate::mix_node::http::mix_node_make_default_routes;
|
||||
use crate::mix_nodes::http::mix_nodes_make_default_routes;
|
||||
use crate::nym_nodes::http::unstable_temp_nymnodes_make_default_routes;
|
||||
use crate::overview::http::overview_make_default_routes;
|
||||
use crate::ping::http::ping_make_default_routes;
|
||||
use crate::service_providers::http::service_providers_make_default_routes;
|
||||
@@ -59,7 +58,6 @@ fn configure_rocket(state: ExplorerApiStateContext) -> Rocket<Build> {
|
||||
"/ping" => ping_make_default_routes(&openapi_settings),
|
||||
"/validators" => validators_make_default_routes(&openapi_settings),
|
||||
"/service-providers" => service_providers_make_default_routes(&openapi_settings),
|
||||
"/tmp/unstable" => unstable_temp_nymnodes_make_default_routes(&openapi_settings),
|
||||
};
|
||||
|
||||
building_rocket
|
||||
|
||||
@@ -22,7 +22,6 @@ mod http;
|
||||
mod location;
|
||||
mod mix_node;
|
||||
pub(crate) mod mix_nodes;
|
||||
mod nym_nodes;
|
||||
mod overview;
|
||||
mod ping;
|
||||
pub(crate) mod service_providers;
|
||||
|
||||
@@ -8,9 +8,6 @@ use nym_contracts_common::truncate_decimal;
|
||||
use nym_mixnet_contract_common::NodeId;
|
||||
use nym_validator_client::client::NymApiClientExt;
|
||||
|
||||
// use deprecated method as hopefully this whole API will be sunset soon-enough...
|
||||
// and we're only getting info for legacy node so the relevant data should still exist
|
||||
#[allow(deprecated)]
|
||||
pub(crate) async fn retrieve_mixnode_econ_stats(
|
||||
client: &ThreadsafeValidatorClient,
|
||||
mix_id: NodeId,
|
||||
|
||||
@@ -1,26 +0,0 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::state::ExplorerApiStateContext;
|
||||
use nym_explorer_api_requests::PrettyDetailedGatewayBond;
|
||||
use okapi::openapi3::OpenApi;
|
||||
use rocket::serde::json::Json;
|
||||
use rocket::{Route, State};
|
||||
use rocket_okapi::settings::OpenApiSettings;
|
||||
|
||||
pub fn unstable_temp_nymnodes_make_default_routes(
|
||||
settings: &OpenApiSettings,
|
||||
) -> (Vec<Route>, OpenApi) {
|
||||
openapi_get_routes_spec![settings: all_gateways]
|
||||
}
|
||||
|
||||
#[openapi(tag = "UNSTABLE")]
|
||||
#[get("/gateways")]
|
||||
pub(crate) async fn all_gateways(
|
||||
state: &State<ExplorerApiStateContext>,
|
||||
) -> Json<Vec<PrettyDetailedGatewayBond>> {
|
||||
let mut gateways = state.inner.gateways.get_detailed_gateways().await;
|
||||
gateways.append(&mut state.inner.nymnodes.pretty_gateways().await);
|
||||
|
||||
Json(gateways)
|
||||
}
|
||||
@@ -1,8 +0,0 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use nym_mixnet_contract_common::NodeId;
|
||||
|
||||
use crate::location::LocationCache;
|
||||
|
||||
pub(crate) type NymNodeLocationCache = LocationCache<NodeId>;
|
||||
@@ -1,10 +0,0 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
pub(crate) mod http;
|
||||
pub(crate) mod location;
|
||||
pub(crate) mod models;
|
||||
|
||||
pub(crate) const CACHE_ENTRY_TTL: Duration = Duration::from_secs(1200);
|
||||
@@ -1,154 +0,0 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::location::{LocationCache, LocationCacheItem};
|
||||
use crate::nym_nodes::location::NymNodeLocationCache;
|
||||
use crate::nym_nodes::CACHE_ENTRY_TTL;
|
||||
use nym_explorer_api_requests::{Location, PrettyDetailedGatewayBond};
|
||||
use nym_mixnet_contract_common::{Gateway, NodeId, NymNodeDetails};
|
||||
use nym_validator_client::models::NymNodeDescription;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard};
|
||||
|
||||
pub(crate) struct NymNodesCache {
|
||||
pub(crate) valid_until: SystemTime,
|
||||
pub(crate) bonded_nym_nodes: HashMap<NodeId, NymNodeDetails>,
|
||||
pub(crate) described_nodes: HashMap<NodeId, NymNodeDescription>,
|
||||
}
|
||||
|
||||
impl NymNodesCache {
|
||||
fn new() -> Self {
|
||||
NymNodesCache {
|
||||
valid_until: SystemTime::now() - Duration::from_secs(60), // in the past
|
||||
bonded_nym_nodes: Default::default(),
|
||||
described_nodes: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
// fn is_valid(&self) -> bool {
|
||||
// self.valid_until >= SystemTime::now()
|
||||
// }
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ThreadSafeNymNodesCache {
|
||||
nymnodes: Arc<RwLock<NymNodesCache>>,
|
||||
locations: Arc<RwLock<LocationCache<NodeId>>>,
|
||||
}
|
||||
|
||||
impl ThreadSafeNymNodesCache {
|
||||
pub(crate) fn new() -> Self {
|
||||
ThreadSafeNymNodesCache {
|
||||
nymnodes: Arc::new(RwLock::new(NymNodesCache::new())),
|
||||
locations: Arc::new(RwLock::new(NymNodeLocationCache::new())),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new_with_location_cache(locations: NymNodeLocationCache) -> Self {
|
||||
ThreadSafeNymNodesCache {
|
||||
nymnodes: Arc::new(RwLock::new(NymNodesCache::new())),
|
||||
locations: Arc::new(RwLock::new(locations)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn is_location_valid(&self, node_id: NodeId) -> bool {
|
||||
self.locations
|
||||
.read()
|
||||
.await
|
||||
.get(&node_id)
|
||||
.map_or(false, |cache_item| {
|
||||
cache_item.valid_until > SystemTime::now()
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn get_bonded_nymnodes(
|
||||
&self,
|
||||
) -> RwLockReadGuard<HashMap<NodeId, NymNodeDetails>> {
|
||||
let guard = self.nymnodes.read().await;
|
||||
RwLockReadGuard::map(guard, |n| &n.bonded_nym_nodes)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_locations(&self) -> NymNodeLocationCache {
|
||||
self.locations.read().await.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn set_location(&self, node_id: NodeId, location: Option<Location>) {
|
||||
// cache the location for this mix node so that it can be used when the mix node list is refreshed
|
||||
self.locations
|
||||
.write()
|
||||
.await
|
||||
.insert(node_id, LocationCacheItem::new_from_location(location));
|
||||
}
|
||||
|
||||
pub(crate) async fn update_cache(
|
||||
&self,
|
||||
all_bonds: Vec<NymNodeDetails>,
|
||||
descriptions: Vec<NymNodeDescription>,
|
||||
) {
|
||||
let mut guard = self.nymnodes.write().await;
|
||||
guard.bonded_nym_nodes = all_bonds
|
||||
.into_iter()
|
||||
.map(|details| (details.node_id(), details))
|
||||
.collect();
|
||||
guard.described_nodes = descriptions
|
||||
.into_iter()
|
||||
.map(|description| (description.node_id, description))
|
||||
.collect();
|
||||
|
||||
guard.valid_until = SystemTime::now() + CACHE_ENTRY_TTL;
|
||||
}
|
||||
|
||||
pub(crate) async fn pretty_gateways(&self) -> Vec<PrettyDetailedGatewayBond> {
|
||||
let nodes_guard = self.nymnodes.read().await;
|
||||
let location_guard = self.locations.read().await;
|
||||
|
||||
let mut pretty_gateways = vec![];
|
||||
|
||||
for (node_id, native_nymnode) in &nodes_guard.bonded_nym_nodes {
|
||||
let Some(description) = nodes_guard.described_nodes.get(node_id) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
if description.description.declared_role.entry {
|
||||
let location = location_guard.get(node_id);
|
||||
let bond = &native_nymnode.bond_information;
|
||||
|
||||
pretty_gateways.push(PrettyDetailedGatewayBond {
|
||||
pledge_amount: bond.original_pledge.clone(),
|
||||
owner: bond.owner.clone(),
|
||||
block_height: bond.bonding_height,
|
||||
gateway: Gateway {
|
||||
host: bond.node.host.clone(),
|
||||
mix_port: description.description.mix_port(),
|
||||
clients_port: description.description.mixnet_websockets.ws_port,
|
||||
location: description
|
||||
.description
|
||||
.auxiliary_details
|
||||
.location
|
||||
.as_ref()
|
||||
.map(|l| l.to_string())
|
||||
.unwrap_or_default(),
|
||||
sphinx_key: description
|
||||
.description
|
||||
.host_information
|
||||
.keys
|
||||
.x25519
|
||||
.to_base58_string(),
|
||||
identity_key: bond.node.identity_key.clone(),
|
||||
version: description
|
||||
.description
|
||||
.build_information
|
||||
.build_version
|
||||
.clone(),
|
||||
},
|
||||
proxy: None,
|
||||
location: location.and_then(|l| l.location.clone()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pretty_gateways
|
||||
}
|
||||
}
|
||||
@@ -18,8 +18,6 @@ use crate::gateways::models::ThreadsafeGatewayCache;
|
||||
use crate::mix_node::models::ThreadsafeMixNodeCache;
|
||||
use crate::mix_nodes::location::MixnodeLocationCache;
|
||||
use crate::mix_nodes::models::ThreadsafeMixNodesCache;
|
||||
use crate::nym_nodes::location::NymNodeLocationCache;
|
||||
use crate::nym_nodes::models::ThreadSafeNymNodesCache;
|
||||
use crate::ping::models::ThreadsafePingCache;
|
||||
use crate::validators::models::ThreadsafeValidatorCache;
|
||||
|
||||
@@ -32,7 +30,6 @@ pub struct ExplorerApiState {
|
||||
pub(crate) gateways: ThreadsafeGatewayCache,
|
||||
pub(crate) mixnode: ThreadsafeMixNodeCache,
|
||||
pub(crate) mixnodes: ThreadsafeMixNodesCache,
|
||||
pub(crate) nymnodes: ThreadSafeNymNodesCache,
|
||||
pub(crate) ping: ThreadsafePingCache,
|
||||
pub(crate) validators: ThreadsafeValidatorCache,
|
||||
pub(crate) geo_ip: ThreadsafeGeoIp,
|
||||
@@ -52,7 +49,6 @@ pub struct ExplorerApiStateOnDisk {
|
||||
pub(crate) country_node_distribution: CountryNodesDistribution,
|
||||
pub(crate) mixnode_location_cache: MixnodeLocationCache,
|
||||
pub(crate) gateway_location_cache: GatewayLocationCache,
|
||||
pub(crate) nymnode_location_cache: NymNodeLocationCache,
|
||||
pub(crate) as_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
@@ -89,9 +85,6 @@ impl ExplorerApiStateContext {
|
||||
mixnodes: ThreadsafeMixNodesCache::new_with_location_cache(
|
||||
state.mixnode_location_cache,
|
||||
),
|
||||
nymnodes: ThreadSafeNymNodesCache::new_with_location_cache(
|
||||
state.nymnode_location_cache,
|
||||
),
|
||||
ping: ThreadsafePingCache::new(),
|
||||
validators: ThreadsafeValidatorCache::new(),
|
||||
validator_client: ThreadsafeValidatorClient::new(),
|
||||
@@ -108,7 +101,6 @@ impl ExplorerApiStateContext {
|
||||
gateways: ThreadsafeGatewayCache::new(),
|
||||
mixnode: ThreadsafeMixNodeCache::new(),
|
||||
mixnodes: ThreadsafeMixNodesCache::new(),
|
||||
nymnodes: ThreadSafeNymNodesCache::new(),
|
||||
ping: ThreadsafePingCache::new(),
|
||||
validators: ThreadsafeValidatorCache::new(),
|
||||
validator_client: ThreadsafeValidatorClient::new(),
|
||||
@@ -125,7 +117,6 @@ impl ExplorerApiStateContext {
|
||||
country_node_distribution: self.inner.country_node_distribution.get_all().await,
|
||||
mixnode_location_cache: self.inner.mixnodes.get_locations().await,
|
||||
gateway_location_cache: self.inner.gateways.get_locations().await,
|
||||
nymnode_location_cache: self.inner.nymnodes.get_locations().await,
|
||||
as_at: Utc::now(),
|
||||
};
|
||||
serde_json::to_writer(file, &state).expect("error writing state to disk");
|
||||
|
||||
@@ -1,24 +1,22 @@
|
||||
// Copyright 2022 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::mix_nodes::CACHE_REFRESH_RATE;
|
||||
use crate::state::ExplorerApiStateContext;
|
||||
use nym_mixnet_contract_common::{GatewayBond, NymNodeDetails};
|
||||
use nym_mixnet_contract_common::GatewayBond;
|
||||
use nym_task::TaskClient;
|
||||
use nym_validator_client::models::{MixNodeBondAnnotated, NymNodeDescription};
|
||||
use nym_validator_client::models::MixNodeBondAnnotated;
|
||||
use nym_validator_client::nyxd::error::NyxdError;
|
||||
use nym_validator_client::nyxd::{Paging, TendermintRpcClient, ValidatorResponse};
|
||||
use nym_validator_client::{QueryHttpRpcValidatorClient, ValidatorClientError};
|
||||
use std::future::Future;
|
||||
use tokio::time::MissedTickBehavior;
|
||||
|
||||
use crate::mix_nodes::CACHE_REFRESH_RATE;
|
||||
use crate::state::ExplorerApiStateContext;
|
||||
|
||||
pub(crate) struct ExplorerApiTasks {
|
||||
state: ExplorerApiStateContext,
|
||||
shutdown: TaskClient,
|
||||
}
|
||||
|
||||
// allow usage of deprecated methods here as we actually want to be explicitly querying for legacy data
|
||||
#[allow(deprecated)]
|
||||
impl ExplorerApiTasks {
|
||||
pub(crate) fn new(state: ExplorerApiStateContext, shutdown: TaskClient) -> Self {
|
||||
ExplorerApiTasks { state, shutdown }
|
||||
@@ -41,28 +39,6 @@ impl ExplorerApiTasks {
|
||||
bonds
|
||||
}
|
||||
|
||||
async fn retrieve_bonded_nymnodes(&self) -> Result<Vec<NymNodeDetails>, ValidatorClientError> {
|
||||
info!("About to retrieve all nymnode bonds...");
|
||||
self.state
|
||||
.inner
|
||||
.validator_client
|
||||
.0
|
||||
.get_all_cached_bonded_nym_nodes()
|
||||
.await
|
||||
}
|
||||
|
||||
async fn retrieve_node_descriptions(
|
||||
&self,
|
||||
) -> Result<Vec<NymNodeDescription>, ValidatorClientError> {
|
||||
info!("About to retrieve node descriptions...");
|
||||
self.state
|
||||
.inner
|
||||
.validator_client
|
||||
.0
|
||||
.get_all_cached_described_nodes()
|
||||
.await
|
||||
}
|
||||
|
||||
async fn retrieve_all_mixnodes(&self) -> Vec<MixNodeBondAnnotated> {
|
||||
info!("About to retrieve all mixnode bonds...");
|
||||
self.retrieve_mixnodes(
|
||||
@@ -154,33 +130,10 @@ impl ExplorerApiTasks {
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_nymnodes_cache(&self) {
|
||||
let nym_node_bonds = self.retrieve_bonded_nymnodes().await.unwrap_or_else(|err| {
|
||||
error!("failed to retrieve nym node bonds: {err}");
|
||||
Vec::new()
|
||||
});
|
||||
|
||||
let all_descriptions = self
|
||||
.retrieve_node_descriptions()
|
||||
.await
|
||||
.unwrap_or_else(|err| {
|
||||
error!("failed to retrieve node descriptions: {err}");
|
||||
Vec::new()
|
||||
});
|
||||
|
||||
self.state
|
||||
.inner
|
||||
.nymnodes
|
||||
.update_cache(nym_node_bonds, all_descriptions)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) fn start(mut self) {
|
||||
info!("Spawning mix nodes task runner...");
|
||||
tokio::spawn(async move {
|
||||
let mut interval_timer = tokio::time::interval(CACHE_REFRESH_RATE);
|
||||
interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
_ = interval_timer.tick() => {
|
||||
@@ -194,10 +147,6 @@ impl ExplorerApiTasks {
|
||||
|
||||
info!("Updating mix node cache...");
|
||||
self.update_mixnode_cache().await;
|
||||
|
||||
info!("Updating nymnode cache...");
|
||||
self.update_nymnodes_cache().await;
|
||||
info!("Done");
|
||||
}
|
||||
_ = self.shutdown.recv() => {
|
||||
trace!("Listener: Received shutdown");
|
||||
|
||||
@@ -102,6 +102,7 @@ export const isLessThan = (a: number, b: number) => a < b;
|
||||
*/
|
||||
|
||||
export const isBalanceEnough = (fee: string, tx: string = '0', balance: string = '0') => {
|
||||
console.log('balance', balance, fee, tx);
|
||||
try {
|
||||
return Big(balance).gte(Big(fee).plus(Big(tx)));
|
||||
} catch (e) {
|
||||
|
||||
+1
-2
@@ -69,6 +69,7 @@ nym-credentials-interface = { path = "../common/credentials-interface" }
|
||||
nym-credential-verification = { path = "../common/credential-verification" }
|
||||
nym-crypto = { path = "../common/crypto" }
|
||||
nym-gateway-storage = { path = "../common/gateway-storage" }
|
||||
nym-gateway-stats-storage = { path = "../common/gateway-stats-storage" }
|
||||
nym-gateway-requests = { path = "../common/gateway-requests" }
|
||||
nym-mixnet-client = { path = "../common/client-libs/mixnet-client" }
|
||||
nym-mixnode-common = { path = "../common/mixnode-common" }
|
||||
@@ -76,11 +77,9 @@ nym-network-defaults = { path = "../common/network-defaults" }
|
||||
nym-network-requester = { path = "../service-providers/network-requester" }
|
||||
nym-node-http-api = { path = "../nym-node/nym-node-http-api" }
|
||||
nym-pemstore = { path = "../common/pemstore" }
|
||||
nym-sdk = { path = "../sdk/rust/nym-sdk" }
|
||||
nym-sphinx = { path = "../common/nymsphinx" }
|
||||
nym-statistics-common = { path = "../common/statistics" }
|
||||
nym-task = { path = "../common/task" }
|
||||
nym-topology = { path = "../common/topology" }
|
||||
nym-types = { path = "../common/types" }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
nym-ip-packet-router = { path = "../service-providers/ip-packet-router" }
|
||||
|
||||
@@ -12,6 +12,7 @@ pub const DEFAULT_PRIVATE_SPHINX_KEY_FILENAME: &str = "private_sphinx.pem";
|
||||
pub const DEFAULT_PUBLIC_SPHINX_KEY_FILENAME: &str = "public_sphinx.pem";
|
||||
|
||||
pub const DEFAULT_CLIENTS_STORAGE_FILENAME: &str = "db.sqlite";
|
||||
pub const DEFAULT_STATS_STORAGE_FILENAME: &str = "stats.sqlite";
|
||||
|
||||
pub const DEFAULT_NETWORK_REQUESTER_CONFIG_FILENAME: &str = "network_requester_config.toml";
|
||||
pub const DEFAULT_NETWORK_REQUESTER_DATA_DIR: &str = "network-requester-data";
|
||||
@@ -39,6 +40,9 @@ pub struct GatewayPaths {
|
||||
#[serde(alias = "persistent_storage")]
|
||||
pub clients_storage: PathBuf,
|
||||
|
||||
/// Path to sqlite database containing all persistent stats data.
|
||||
pub stats_storage: PathBuf,
|
||||
|
||||
/// Path to the configuration of the embedded network requester.
|
||||
#[serde(deserialize_with = "de_maybe_stringified")]
|
||||
pub network_requester_config: Option<PathBuf>,
|
||||
@@ -54,7 +58,9 @@ impl GatewayPaths {
|
||||
pub fn new_default<P: AsRef<Path>>(id: P) -> Self {
|
||||
GatewayPaths {
|
||||
keys: KeysPaths::new_default(id.as_ref()),
|
||||
clients_storage: default_data_directory(id).join(DEFAULT_CLIENTS_STORAGE_FILENAME),
|
||||
clients_storage: default_data_directory(id.as_ref())
|
||||
.join(DEFAULT_CLIENTS_STORAGE_FILENAME),
|
||||
stats_storage: default_data_directory(id).join(DEFAULT_STATS_STORAGE_FILENAME),
|
||||
// node_description: default_config_filepath(id).join(DEFAULT_DESCRIPTION_FILENAME),
|
||||
network_requester_config: None,
|
||||
ip_packet_router_config: None,
|
||||
@@ -70,6 +76,7 @@ impl GatewayPaths {
|
||||
public_sphinx_key_file: Default::default(),
|
||||
},
|
||||
clients_storage: Default::default(),
|
||||
stats_storage: Default::default(),
|
||||
network_requester_config: None,
|
||||
ip_packet_router_config: None,
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use nym_authenticator::error::AuthenticatorError;
|
||||
use nym_gateway_stats_storage::error::StatsStorageError;
|
||||
use nym_gateway_storage::error::StorageError;
|
||||
use nym_ip_packet_router::error::IpPacketRouterError;
|
||||
use nym_network_requester::error::{ClientCoreError, NetworkRequesterError};
|
||||
@@ -115,6 +116,12 @@ pub enum GatewayError {
|
||||
source: StorageError,
|
||||
},
|
||||
|
||||
#[error("stats storage failure: {source}")]
|
||||
StatsStorageError {
|
||||
#[from]
|
||||
source: StatsStorageError,
|
||||
},
|
||||
|
||||
#[error("Path to network requester configuration file hasn't been specified. Perhaps try to run `setup-network-requester`?")]
|
||||
UnspecifiedNetworkRequesterConfig,
|
||||
|
||||
|
||||
+11
-60
@@ -3,18 +3,14 @@
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::error::GatewayError;
|
||||
use async_trait::async_trait;
|
||||
|
||||
use nym_crypto::asymmetric::encryption;
|
||||
use nym_gateway_stats_storage::PersistentStatsStorage;
|
||||
use nym_gateway_storage::PersistentStorage;
|
||||
use nym_pemstore::traits::PemStorableKeyPair;
|
||||
use nym_pemstore::KeyPairPath;
|
||||
use nym_sdk::{NymApiTopologyProvider, NymApiTopologyProviderConfig, UserAgent};
|
||||
use nym_topology::{gateway, NymTopology, TopologyProvider};
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::debug;
|
||||
use url::Url;
|
||||
|
||||
pub async fn load_network_requester_config<P: AsRef<Path>>(
|
||||
id: &str,
|
||||
@@ -79,6 +75,14 @@ pub(crate) async fn initialise_main_storage(
|
||||
Ok(PersistentStorage::init(path, retrieval_limit).await?)
|
||||
}
|
||||
|
||||
pub(crate) async fn initialise_stats_storage(
|
||||
config: &Config,
|
||||
) -> Result<PersistentStatsStorage, GatewayError> {
|
||||
let path = &config.storage_paths.stats_storage;
|
||||
|
||||
Ok(PersistentStatsStorage::init(path).await?)
|
||||
}
|
||||
|
||||
pub fn load_keypair<T: PemStorableKeyPair>(
|
||||
paths: KeyPairPath,
|
||||
name: impl Into<String>,
|
||||
@@ -98,56 +102,3 @@ pub(crate) fn load_sphinx_keys(config: &Config) -> Result<encryption::KeyPair, G
|
||||
);
|
||||
load_keypair(sphinx_paths, "gateway sphinx")
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GatewayTopologyProvider {
|
||||
inner: Arc<Mutex<GatewayTopologyProviderInner>>,
|
||||
}
|
||||
|
||||
impl GatewayTopologyProvider {
|
||||
pub fn new(
|
||||
gateway_node: gateway::LegacyNode,
|
||||
user_agent: UserAgent,
|
||||
nym_api_url: Vec<Url>,
|
||||
) -> GatewayTopologyProvider {
|
||||
GatewayTopologyProvider {
|
||||
inner: Arc::new(Mutex::new(GatewayTopologyProviderInner {
|
||||
inner: NymApiTopologyProvider::new(
|
||||
NymApiTopologyProviderConfig {
|
||||
min_mixnode_performance: 50,
|
||||
min_gateway_performance: 0,
|
||||
},
|
||||
nym_api_url,
|
||||
env!("CARGO_PKG_VERSION").to_string(),
|
||||
Some(user_agent),
|
||||
),
|
||||
gateway_node,
|
||||
})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct GatewayTopologyProviderInner {
|
||||
inner: NymApiTopologyProvider,
|
||||
gateway_node: gateway::LegacyNode,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TopologyProvider for GatewayTopologyProvider {
|
||||
async fn get_new_topology(&mut self) -> Option<NymTopology> {
|
||||
let mut guard = self.inner.lock().await;
|
||||
match guard.inner.get_new_topology().await {
|
||||
None => None,
|
||||
Some(mut base) => {
|
||||
if !base.gateway_exists(&guard.gateway_node.identity_key) {
|
||||
debug!(
|
||||
"{} didn't exist in topology. inserting it.",
|
||||
guard.gateway_node.identity_key
|
||||
);
|
||||
base.insert_gateway(guard.gateway_node.clone());
|
||||
}
|
||||
Some(base)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+28
-67
@@ -13,11 +13,10 @@ use crate::node::client_handling::active_clients::ActiveClientsStore;
|
||||
use crate::node::client_handling::embedded_clients::{LocalEmbeddedClientHandle, MessageRouter};
|
||||
use crate::node::client_handling::websocket;
|
||||
use crate::node::helpers::{
|
||||
initialise_main_storage, load_network_requester_config, GatewayTopologyProvider,
|
||||
initialise_main_storage, initialise_stats_storage, load_network_requester_config,
|
||||
};
|
||||
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use nym_bin_common::bin_info;
|
||||
use nym_credential_verification::ecash::{
|
||||
credential_sender::CredentialHandlerConfig, EcashManager,
|
||||
};
|
||||
@@ -28,15 +27,13 @@ use nym_network_requester::{LocalGateway, NRServiceProviderBuilder, RequestFilte
|
||||
use nym_node_http_api::state::metrics::SharedSessionStats;
|
||||
use nym_statistics_common::events::{self, StatsEventSender};
|
||||
use nym_task::{TaskClient, TaskHandle, TaskManager};
|
||||
use nym_topology::NetworkAddress;
|
||||
use nym_types::gateway::GatewayNodeDetailsResponse;
|
||||
use nym_validator_client::client::NodeId;
|
||||
use nym_validator_client::nyxd::{Coin, CosmWasmClient};
|
||||
use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient};
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use statistics::GatewayStatisticsCollector;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tracing::*;
|
||||
@@ -46,6 +43,7 @@ pub(crate) mod helpers;
|
||||
pub(crate) mod mixnet_handling;
|
||||
pub(crate) mod statistics;
|
||||
|
||||
pub use nym_gateway_stats_storage::PersistentStatsStorage;
|
||||
pub use nym_gateway_storage::{PersistentStorage, Storage};
|
||||
|
||||
// TODO: should this struct live here?
|
||||
@@ -101,6 +99,8 @@ pub async fn create_gateway(
|
||||
|
||||
let storage = initialise_main_storage(&config).await?;
|
||||
|
||||
let stats_storage = initialise_stats_storage(&config).await?;
|
||||
|
||||
let nr_opts = network_requester_config.map(|config| LocalNetworkRequesterOpts {
|
||||
config: config.clone(),
|
||||
custom_mixnet_path: custom_mixnet.clone(),
|
||||
@@ -111,7 +111,7 @@ pub async fn create_gateway(
|
||||
custom_mixnet_path: custom_mixnet.clone(),
|
||||
});
|
||||
|
||||
Gateway::new(config, nr_opts, ip_opts, storage)
|
||||
Gateway::new(config, nr_opts, ip_opts, storage, stats_storage)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -152,7 +152,9 @@ pub struct Gateway<St = PersistentStorage> {
|
||||
/// x25519 keypair used for Diffie-Hellman. Currently only used for sphinx key derivation.
|
||||
sphinx_keypair: Arc<encryption::KeyPair>,
|
||||
|
||||
storage: St,
|
||||
client_storage: St,
|
||||
|
||||
stats_storage: PersistentStatsStorage,
|
||||
|
||||
wireguard_data: Option<nym_wireguard::WireguardData>,
|
||||
|
||||
@@ -168,10 +170,12 @@ impl<St> Gateway<St> {
|
||||
config: Config,
|
||||
network_requester_opts: Option<LocalNetworkRequesterOpts>,
|
||||
ip_packet_router_opts: Option<LocalIpPacketRouterOpts>,
|
||||
storage: St,
|
||||
client_storage: St,
|
||||
stats_storage: PersistentStatsStorage,
|
||||
) -> Result<Self, GatewayError> {
|
||||
Ok(Gateway {
|
||||
storage,
|
||||
client_storage,
|
||||
stats_storage,
|
||||
identity_keypair: Arc::new(load_identity_keys(&config)?),
|
||||
sphinx_keypair: Arc::new(helpers::load_sphinx_keys(&config)?),
|
||||
config,
|
||||
@@ -184,7 +188,7 @@ impl<St> Gateway<St> {
|
||||
task_client: None,
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_loaded(
|
||||
config: Config,
|
||||
network_requester_opts: Option<LocalNetworkRequesterOpts>,
|
||||
@@ -192,7 +196,8 @@ impl<St> Gateway<St> {
|
||||
authenticator_opts: Option<LocalAuthenticatorOpts>,
|
||||
identity_keypair: Arc<identity::KeyPair>,
|
||||
sphinx_keypair: Arc<encryption::KeyPair>,
|
||||
storage: St,
|
||||
client_storage: St,
|
||||
stats_storage: PersistentStatsStorage,
|
||||
) -> Self {
|
||||
Gateway {
|
||||
config,
|
||||
@@ -201,7 +206,8 @@ impl<St> Gateway<St> {
|
||||
authenticator_opts,
|
||||
identity_keypair,
|
||||
sphinx_keypair,
|
||||
storage,
|
||||
client_storage,
|
||||
stats_storage,
|
||||
wireguard_data: None,
|
||||
session_stats: None,
|
||||
run_http_server: true,
|
||||
@@ -230,39 +236,6 @@ impl<St> Gateway<St> {
|
||||
crate::helpers::node_details(&self.config).await
|
||||
}
|
||||
|
||||
fn gateway_topology_provider(&self) -> GatewayTopologyProvider {
|
||||
GatewayTopologyProvider::new(
|
||||
self.as_topology_node(),
|
||||
bin_info!().into(),
|
||||
self.config.gateway.nym_api_urls.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
fn as_topology_node(&self) -> nym_topology::gateway::LegacyNode {
|
||||
let ip = self
|
||||
.config
|
||||
.host
|
||||
.public_ips
|
||||
.first()
|
||||
.copied()
|
||||
.unwrap_or(IpAddr::V4(Ipv4Addr::LOCALHOST));
|
||||
let mix_host = SocketAddr::new(ip, self.config.gateway.mix_port);
|
||||
|
||||
nym_topology::gateway::LegacyNode {
|
||||
// those fields are irrelevant for the purposes of routing so it's fine if they're inaccurate.
|
||||
// the only thing that matters is the identity key (and maybe version)
|
||||
node_id: NodeId::MAX,
|
||||
mix_host,
|
||||
host: NetworkAddress::IpAddr(ip),
|
||||
clients_ws_port: self.config.gateway.clients_port,
|
||||
clients_wss_port: self.config.gateway.clients_wss_port,
|
||||
sphinx_key: *self.sphinx_keypair.public_key(),
|
||||
|
||||
identity_key: *self.identity_keypair.public_key(),
|
||||
version: env!("CARGO_PKG_VERSION").into(),
|
||||
}
|
||||
}
|
||||
|
||||
fn start_mix_socket_listener(
|
||||
&self,
|
||||
ack_sender: MixForwardingSender,
|
||||
@@ -278,7 +251,7 @@ impl<St> Gateway<St> {
|
||||
|
||||
let connection_handler = ConnectionHandler::new(
|
||||
packet_processor,
|
||||
self.storage.clone(),
|
||||
self.client_storage.clone(),
|
||||
ack_sender,
|
||||
active_clients_store,
|
||||
);
|
||||
@@ -295,7 +268,6 @@ impl<St> Gateway<St> {
|
||||
async fn start_authenticator(
|
||||
&mut self,
|
||||
forwarding_channel: MixForwardingSender,
|
||||
topology_provider: GatewayTopologyProvider,
|
||||
shutdown: TaskClient,
|
||||
ecash_verifier: Arc<EcashManager<St>>,
|
||||
) -> Result<StartedAuthenticator, Box<dyn std::error::Error + Send + Sync>>
|
||||
@@ -314,7 +286,7 @@ impl<St> Gateway<St> {
|
||||
forwarding_channel,
|
||||
router_tx,
|
||||
);
|
||||
let all_peers = self.storage.get_all_wireguard_peers().await?;
|
||||
let all_peers = self.client_storage.get_all_wireguard_peers().await?;
|
||||
let used_private_network_ips = all_peers
|
||||
.iter()
|
||||
.cloned()
|
||||
@@ -343,7 +315,6 @@ impl<St> Gateway<St> {
|
||||
.with_shutdown(shutdown.fork("authenticator"))
|
||||
.with_wait_for_gateway(true)
|
||||
.with_minimum_gateway_performance(0)
|
||||
.with_custom_topology_provider(Box::new(topology_provider))
|
||||
.with_on_start(on_start_tx);
|
||||
|
||||
if let Some(custom_mixnet) = &opts.custom_mixnet_path {
|
||||
@@ -370,7 +341,7 @@ impl<St> Gateway<St> {
|
||||
.start_with_shutdown(router_shutdown);
|
||||
|
||||
let wg_api = nym_wireguard::start_wireguard(
|
||||
self.storage.clone(),
|
||||
self.client_storage.clone(),
|
||||
all_peers,
|
||||
shutdown,
|
||||
wireguard_data,
|
||||
@@ -392,7 +363,6 @@ impl<St> Gateway<St> {
|
||||
async fn start_authenticator(
|
||||
&self,
|
||||
_forwarding_channel: MixForwardingSender,
|
||||
_topology_provider: GatewayTopologyProvider,
|
||||
_shutdown: TaskClient,
|
||||
_ecash_verifier: Arc<EcashManager<St>>,
|
||||
) -> Result<StartedAuthenticator, Box<dyn std::error::Error + Send + Sync>> {
|
||||
@@ -418,7 +388,7 @@ impl<St> Gateway<St> {
|
||||
|
||||
let shared_state = websocket::CommonHandlerState {
|
||||
ecash_verifier,
|
||||
storage: self.storage.clone(),
|
||||
storage: self.client_storage.clone(),
|
||||
local_identity: Arc::clone(&self.identity_keypair),
|
||||
only_coconut_credentials: self.config.gateway.only_coconut_credentials,
|
||||
bandwidth_cfg: (&self.config).into(),
|
||||
@@ -456,7 +426,7 @@ impl<St> Gateway<St> {
|
||||
info!("Starting gateway stats collector...");
|
||||
|
||||
let (mut stats_collector, stats_event_sender) =
|
||||
GatewayStatisticsCollector::new(shared_session_stats);
|
||||
GatewayStatisticsCollector::new(shared_session_stats, self.stats_storage.clone());
|
||||
tokio::spawn(async move { stats_collector.run(shutdown).await });
|
||||
stats_event_sender
|
||||
}
|
||||
@@ -465,7 +435,6 @@ impl<St> Gateway<St> {
|
||||
async fn start_network_requester(
|
||||
&self,
|
||||
forwarding_channel: MixForwardingSender,
|
||||
topology_provider: GatewayTopologyProvider,
|
||||
shutdown: TaskClient,
|
||||
) -> Result<StartedNetworkRequester, GatewayError> {
|
||||
info!("Starting network requester...");
|
||||
@@ -493,7 +462,6 @@ impl<St> Gateway<St> {
|
||||
.with_custom_gateway_transceiver(Box::new(transceiver))
|
||||
.with_wait_for_gateway(true)
|
||||
.with_minimum_gateway_performance(0)
|
||||
.with_custom_topology_provider(Box::new(topology_provider))
|
||||
.with_on_start(on_start_tx);
|
||||
|
||||
if let Some(custom_mixnet) = &nr_opts.custom_mixnet_path {
|
||||
@@ -531,7 +499,6 @@ impl<St> Gateway<St> {
|
||||
async fn start_ip_packet_router(
|
||||
&self,
|
||||
forwarding_channel: MixForwardingSender,
|
||||
topology_provider: GatewayTopologyProvider,
|
||||
shutdown: TaskClient,
|
||||
) -> Result<LocalEmbeddedClientHandle, GatewayError> {
|
||||
info!("Starting IP packet provider...");
|
||||
@@ -560,7 +527,6 @@ impl<St> Gateway<St> {
|
||||
.with_custom_gateway_transceiver(Box::new(transceiver))
|
||||
.with_wait_for_gateway(true)
|
||||
.with_minimum_gateway_performance(0)
|
||||
.with_custom_topology_provider(Box::new(topology_provider))
|
||||
.with_on_start(on_start_tx);
|
||||
|
||||
if let Some(custom_mixnet) = &ip_opts.custom_mixnet_path {
|
||||
@@ -622,7 +588,7 @@ impl<St> Gateway<St> {
|
||||
// TODO: if anything, this should be getting data directly from the contract
|
||||
// as opposed to the validator API
|
||||
let validator_client = self.random_api_client()?;
|
||||
let existing_nodes = match validator_client.get_all_basic_nodes(None).await {
|
||||
let existing_nodes = match validator_client.get_cached_gateways().await {
|
||||
Ok(nodes) => nodes,
|
||||
Err(err) => {
|
||||
error!("failed to grab initial network gateways - {err}\n Please try to startup again in few minutes");
|
||||
@@ -630,9 +596,9 @@ impl<St> Gateway<St> {
|
||||
}
|
||||
};
|
||||
|
||||
Ok(existing_nodes
|
||||
.iter()
|
||||
.any(|node| &node.ed25519_identity_pubkey == self.identity_keypair.public_key()))
|
||||
Ok(existing_nodes.iter().any(|node| {
|
||||
node.gateway.identity_key == self.identity_keypair.public_key().to_base58_string()
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn run(mut self) -> Result<(), GatewayError>
|
||||
@@ -677,8 +643,6 @@ impl<St> Gateway<St> {
|
||||
shutdown.fork("statistics::GatewayStatisticsCollector"),
|
||||
);
|
||||
|
||||
let topology_provider = self.gateway_topology_provider();
|
||||
|
||||
let handler_config = CredentialHandlerConfig {
|
||||
revocation_bandwidth_penalty: self
|
||||
.config
|
||||
@@ -701,7 +665,7 @@ impl<St> Gateway<St> {
|
||||
nyxd_client,
|
||||
self.identity_keypair.public_key().to_bytes(),
|
||||
shutdown.fork("EcashVerifier"),
|
||||
self.storage.clone(),
|
||||
self.client_storage.clone(),
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
@@ -727,7 +691,6 @@ impl<St> Gateway<St> {
|
||||
let embedded_nr = self
|
||||
.start_network_requester(
|
||||
mix_forwarding_channel.clone(),
|
||||
topology_provider.clone(),
|
||||
shutdown.fork("NetworkRequester"),
|
||||
)
|
||||
.await?;
|
||||
@@ -743,7 +706,6 @@ impl<St> Gateway<St> {
|
||||
let embedded_ip_sp = self
|
||||
.start_ip_packet_router(
|
||||
mix_forwarding_channel.clone(),
|
||||
topology_provider.clone(),
|
||||
shutdown.fork("ip_service_provider"),
|
||||
)
|
||||
.await?;
|
||||
@@ -756,7 +718,6 @@ impl<St> Gateway<St> {
|
||||
let embedded_auth = self
|
||||
.start_authenticator(
|
||||
mix_forwarding_channel,
|
||||
topology_provider,
|
||||
shutdown.fork("authenticator"),
|
||||
ecash_verifier,
|
||||
)
|
||||
|
||||
@@ -2,13 +2,14 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use futures::{channel::mpsc, StreamExt};
|
||||
use nym_gateway_stats_storage::PersistentStatsStorage;
|
||||
use nym_node_http_api::state::metrics::SharedSessionStats;
|
||||
use nym_statistics_common::events::{StatsEvent, StatsEventReceiver, StatsEventSender};
|
||||
use nym_task::TaskClient;
|
||||
use sessions::SessionStatsHandler;
|
||||
use std::time::Duration;
|
||||
use time::OffsetDateTime;
|
||||
use tracing::trace;
|
||||
use tracing::{error, trace, warn};
|
||||
|
||||
pub mod sessions;
|
||||
|
||||
@@ -23,21 +24,38 @@ pub(crate) struct GatewayStatisticsCollector {
|
||||
impl GatewayStatisticsCollector {
|
||||
pub fn new(
|
||||
shared_session_stats: SharedSessionStats,
|
||||
stats_storage: PersistentStatsStorage,
|
||||
) -> (GatewayStatisticsCollector, StatsEventSender) {
|
||||
let (stats_event_tx, stats_event_rx) = mpsc::unbounded();
|
||||
|
||||
let session_stats = SessionStatsHandler::new(shared_session_stats, stats_storage);
|
||||
let collector = GatewayStatisticsCollector {
|
||||
stats_event_rx,
|
||||
session_stats: SessionStatsHandler::new(shared_session_stats),
|
||||
session_stats,
|
||||
};
|
||||
(collector, stats_event_tx)
|
||||
}
|
||||
|
||||
async fn update_shared_state(&mut self, update_time: OffsetDateTime) {
|
||||
self.session_stats.update_shared_state(update_time).await;
|
||||
if let Err(e) = self
|
||||
.session_stats
|
||||
.maybe_update_shared_state(update_time)
|
||||
.await
|
||||
{
|
||||
error!("Failed to update session stats - {e}");
|
||||
}
|
||||
//here goes additionnal stats handler update
|
||||
}
|
||||
|
||||
async fn on_start(&mut self) {
|
||||
if let Err(e) = self.session_stats.on_start().await {
|
||||
error!("Failed to cleanup session stats handler - {e}");
|
||||
}
|
||||
//here goes additionnal stats handler start cleanup
|
||||
}
|
||||
|
||||
pub async fn run(&mut self, mut shutdown: TaskClient) {
|
||||
self.on_start().await;
|
||||
let mut update_interval = tokio::time::interval(STATISTICS_UPDATE_TIMER_INTERVAL);
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
@@ -53,7 +71,10 @@ impl GatewayStatisticsCollector {
|
||||
Some(stat_event) = self.stats_event_rx.next() => {
|
||||
//dispatching event to proper handler
|
||||
match stat_event {
|
||||
StatsEvent::SessionStatsEvent(event) => self.session_stats.handle_event(event),
|
||||
StatsEvent::SessionStatsEvent(event) => {
|
||||
if let Err(e) = self.session_stats.handle_event(event).await{
|
||||
warn!("Session event handling error - {e}");
|
||||
}},
|
||||
}
|
||||
},
|
||||
|
||||
|
||||
@@ -2,176 +2,158 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use nym_credentials_interface::TicketType;
|
||||
use nym_gateway_stats_storage::models::FinishedSession;
|
||||
use nym_gateway_stats_storage::PersistentStatsStorage;
|
||||
use nym_gateway_stats_storage::{error::StatsStorageError, models::ActiveSession};
|
||||
use nym_node_http_api::state::metrics::SharedSessionStats;
|
||||
use nym_sphinx::DestinationAddressBytes;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use time::{Date, Duration, OffsetDateTime};
|
||||
|
||||
use nym_statistics_common::events::SessionEvent;
|
||||
|
||||
const FINISHED_SESSIONS_CAP: usize = 1_000_000; //to be on the safe side of memory blowups until persistent storage
|
||||
|
||||
#[derive(PartialEq)]
|
||||
enum SessionType {
|
||||
Vpn,
|
||||
Mixnet,
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl SessionType {
|
||||
fn to_string(&self) -> &str {
|
||||
match self {
|
||||
Self::Vpn => "vpn",
|
||||
Self::Mixnet => "mixnet",
|
||||
Self::Unknown => "unknown",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TicketType> for SessionType {
|
||||
fn from(value: TicketType) -> Self {
|
||||
match value {
|
||||
TicketType::V1MixnetEntry => Self::Mixnet,
|
||||
TicketType::V1MixnetExit => Self::Mixnet,
|
||||
TicketType::V1WireguardEntry => Self::Vpn,
|
||||
TicketType::V1WireguardExit => Self::Vpn,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct FinishedSession {
|
||||
duration: Duration,
|
||||
typ: SessionType,
|
||||
}
|
||||
|
||||
impl FinishedSession {
|
||||
fn serialize(&self) -> (u64, String) {
|
||||
(
|
||||
self.duration.whole_milliseconds() as u64, //we are sure that it fits in a u64, see `fn end_at`
|
||||
self.typ.to_string().into(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
struct ActiveSession {
|
||||
start: OffsetDateTime,
|
||||
typ: SessionType,
|
||||
}
|
||||
|
||||
impl ActiveSession {
|
||||
fn new(start_time: OffsetDateTime) -> Self {
|
||||
ActiveSession {
|
||||
start: start_time,
|
||||
typ: SessionType::Unknown,
|
||||
}
|
||||
}
|
||||
|
||||
fn set_type(&mut self, ticket_type: TicketType) {
|
||||
self.typ = ticket_type.into();
|
||||
}
|
||||
|
||||
fn end_at(self, stop_time: OffsetDateTime) -> Option<FinishedSession> {
|
||||
let session_duration = stop_time - self.start;
|
||||
//ensure duration is positive to fit in a u64
|
||||
//u64::max milliseconds is 500k millenia so no overflow issue
|
||||
if session_duration > Duration::ZERO {
|
||||
Some(FinishedSession {
|
||||
duration: session_duration,
|
||||
typ: self.typ,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct SessionStatsHandler {
|
||||
last_update_day: Date,
|
||||
storage: PersistentStatsStorage,
|
||||
current_day: Date,
|
||||
|
||||
shared_session_stats: SharedSessionStats,
|
||||
active_sessions: HashMap<DestinationAddressBytes, ActiveSession>,
|
||||
unique_users: HashSet<DestinationAddressBytes>,
|
||||
sessions_started: u32,
|
||||
finished_sessions: Vec<FinishedSession>,
|
||||
}
|
||||
|
||||
impl SessionStatsHandler {
|
||||
pub fn new(shared_session_stats: SharedSessionStats) -> Self {
|
||||
pub fn new(shared_session_stats: SharedSessionStats, storage: PersistentStatsStorage) -> Self {
|
||||
SessionStatsHandler {
|
||||
last_update_day: OffsetDateTime::now_utc().date(),
|
||||
storage,
|
||||
current_day: OffsetDateTime::now_utc().date(),
|
||||
shared_session_stats,
|
||||
active_sessions: Default::default(),
|
||||
unique_users: Default::default(),
|
||||
sessions_started: 0,
|
||||
finished_sessions: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn handle_event(&mut self, event: SessionEvent) {
|
||||
pub(crate) async fn handle_event(
|
||||
&mut self,
|
||||
event: SessionEvent,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
match event {
|
||||
SessionEvent::SessionStart { start_time, client } => {
|
||||
self.handle_session_start(start_time, client);
|
||||
self.handle_session_start(start_time, client).await
|
||||
}
|
||||
|
||||
SessionEvent::SessionStop { stop_time, client } => {
|
||||
self.handle_session_stop(stop_time, client);
|
||||
self.handle_session_stop(stop_time, client).await
|
||||
}
|
||||
|
||||
SessionEvent::EcashTicket {
|
||||
ticket_type,
|
||||
client,
|
||||
} => self.handle_ecash_ticket(ticket_type, client),
|
||||
} => self.handle_ecash_ticket(ticket_type, client).await,
|
||||
}
|
||||
}
|
||||
fn handle_session_start(
|
||||
async fn handle_session_start(
|
||||
&mut self,
|
||||
start_time: OffsetDateTime,
|
||||
client: DestinationAddressBytes,
|
||||
) {
|
||||
self.sessions_started += 1;
|
||||
self.unique_users.insert(client);
|
||||
self.active_sessions
|
||||
.insert(client, ActiveSession::new(start_time));
|
||||
}
|
||||
fn handle_session_stop(&mut self, stop_time: OffsetDateTime, client: DestinationAddressBytes) {
|
||||
if let Some(session) = self.active_sessions.remove(&client) {
|
||||
if let Some(finished_session) = session.end_at(stop_time) {
|
||||
if self.finished_sessions.len() < FINISHED_SESSIONS_CAP {
|
||||
self.finished_sessions.push(finished_session);
|
||||
}
|
||||
}
|
||||
}
|
||||
) -> Result<(), StatsStorageError> {
|
||||
self.storage
|
||||
.insert_unique_user(self.current_day, client.as_base58_string())
|
||||
.await?;
|
||||
self.storage
|
||||
.insert_active_session(client, ActiveSession::new(start_time))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_ecash_ticket(&mut self, ticket_type: TicketType, client: DestinationAddressBytes) {
|
||||
if let Some(active_session) = self.active_sessions.get_mut(&client) {
|
||||
if active_session.typ == SessionType::Unknown {
|
||||
active_session.set_type(ticket_type);
|
||||
async fn handle_session_stop(
|
||||
&mut self,
|
||||
stop_time: OffsetDateTime,
|
||||
client: DestinationAddressBytes,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
if let Some(session) = self.storage.get_active_session(client).await? {
|
||||
if let Some(finished_session) = session.end_at(stop_time) {
|
||||
self.storage
|
||||
.insert_finished_session(self.current_day, finished_session)
|
||||
.await?;
|
||||
self.storage.delete_active_session(client).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_ecash_ticket(
|
||||
&mut self,
|
||||
ticket_type: TicketType,
|
||||
client: DestinationAddressBytes,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
self.storage
|
||||
.update_active_session_type(client, ticket_type.into())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn on_start(&mut self) -> Result<(), StatsStorageError> {
|
||||
let yesterday = OffsetDateTime::now_utc().date() - Duration::DAY;
|
||||
//publish yesterday's data if any
|
||||
self.publish_stats(yesterday).await?;
|
||||
//store "active" sessions as duration 0
|
||||
for active_session in self.storage.get_all_active_sessions().await? {
|
||||
self.storage
|
||||
.insert_finished_session(
|
||||
self.current_day,
|
||||
FinishedSession {
|
||||
duration: Duration::ZERO,
|
||||
typ: active_session.typ,
|
||||
},
|
||||
)
|
||||
.await?
|
||||
}
|
||||
//cleanup active sessions
|
||||
self.storage.cleanup_active_sessions().await?;
|
||||
|
||||
//delete old entries
|
||||
self.delete_old_stats(yesterday - Duration::DAY).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//update shared state once a day has passed, with data from the previous day
|
||||
pub(crate) async fn update_shared_state(&mut self, update_time: OffsetDateTime) {
|
||||
let update_date = update_time.date();
|
||||
if update_date != self.last_update_day {
|
||||
{
|
||||
let mut shared_state = self.shared_session_stats.write().await;
|
||||
shared_state.update_time = self.last_update_day;
|
||||
shared_state.unique_active_users = self.unique_users.len() as u32;
|
||||
shared_state.session_started = self.sessions_started;
|
||||
shared_state.sessions = self
|
||||
.finished_sessions
|
||||
.iter()
|
||||
.map(|s| s.serialize())
|
||||
.collect();
|
||||
}
|
||||
self.reset_stats(update_date);
|
||||
async fn publish_stats(&mut self, stats_date: Date) -> Result<(), StatsStorageError> {
|
||||
let finished_sessions = self.storage.get_finished_sessions(stats_date).await?;
|
||||
let user_count = self.storage.get_unique_users_count(stats_date).await?;
|
||||
let session_started = self.storage.get_started_sessions_count(stats_date).await? as u32;
|
||||
{
|
||||
let mut shared_state = self.shared_session_stats.write().await;
|
||||
shared_state.update_time = stats_date;
|
||||
shared_state.unique_active_users = user_count as u32;
|
||||
shared_state.session_started = session_started;
|
||||
shared_state.sessions = finished_sessions.iter().map(|s| s.serialize()).collect();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
pub(crate) async fn maybe_update_shared_state(
|
||||
&mut self,
|
||||
update_time: OffsetDateTime,
|
||||
) -> Result<(), StatsStorageError> {
|
||||
let update_date = update_time.date();
|
||||
if update_date != self.current_day {
|
||||
self.publish_stats(self.current_day).await?;
|
||||
self.delete_old_stats(self.current_day - Duration::DAY)
|
||||
.await?;
|
||||
self.reset_stats(update_date).await?;
|
||||
self.current_day = update_date;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn reset_stats(&mut self, reset_day: Date) {
|
||||
self.last_update_day = reset_day;
|
||||
self.unique_users = self.active_sessions.keys().copied().collect();
|
||||
self.finished_sessions = Default::default();
|
||||
self.sessions_started = 0;
|
||||
async fn reset_stats(&mut self, reset_day: Date) -> Result<(), StatsStorageError> {
|
||||
//active users reset
|
||||
let new_active_users = self.storage.get_active_users().await?;
|
||||
for user in new_active_users {
|
||||
self.storage.insert_unique_user(reset_day, user).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_old_stats(&mut self, delete_before: Date) -> Result<(), StatsStorageError> {
|
||||
self.storage.delete_finished_sessions(delete_before).await?;
|
||||
self.storage.delete_unique_users(delete_before).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,7 +234,7 @@ impl MixNode {
|
||||
// TODO: if anything, this should be getting data directly from the contract
|
||||
// as opposed to the validator API
|
||||
let validator_client = self.random_api_client();
|
||||
let existing_nodes = match validator_client.get_all_basic_nodes(None).await {
|
||||
let existing_nodes = match validator_client.get_cached_mixnodes().await {
|
||||
Ok(nodes) => nodes,
|
||||
Err(err) => {
|
||||
error!(
|
||||
@@ -245,9 +245,10 @@ impl MixNode {
|
||||
}
|
||||
};
|
||||
|
||||
existing_nodes
|
||||
.iter()
|
||||
.any(|node| &node.ed25519_identity_pubkey == self.identity_keypair.public_key())
|
||||
existing_nodes.iter().any(|node| {
|
||||
node.bond_information.mix_node.identity_key
|
||||
== self.identity_keypair.public_key().to_base58_string()
|
||||
})
|
||||
}
|
||||
|
||||
async fn wait_for_interrupt(&self, shutdown: TaskHandle) {
|
||||
|
||||
@@ -2,8 +2,10 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use cosmwasm_std::Decimal;
|
||||
use nym_mixnet_contract_common::mixnode::LegacyPendingMixNodeChanges;
|
||||
use nym_mixnet_contract_common::{GatewayBond, LegacyMixLayer, MixNodeBond, NodeId, NodeRewarding};
|
||||
use nym_mixnet_contract_common::mixnode::PendingMixNodeChanges;
|
||||
use nym_mixnet_contract_common::{
|
||||
GatewayBond, LegacyMixLayer, MixNodeBond, MixNodeDetails, NodeId, NodeRewarding,
|
||||
};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::ops::Deref;
|
||||
@@ -62,7 +64,7 @@ pub struct LegacyMixNodeDetailsWithLayer {
|
||||
|
||||
/// Adjustments to the mixnode that are ought to happen during future epoch transitions.
|
||||
#[serde(default)]
|
||||
pub pending_changes: LegacyPendingMixNodeChanges,
|
||||
pub pending_changes: PendingMixNodeChanges,
|
||||
}
|
||||
|
||||
impl LegacyMixNodeDetailsWithLayer {
|
||||
@@ -78,3 +80,13 @@ impl LegacyMixNodeDetailsWithLayer {
|
||||
self.bond_information.is_unbonding
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LegacyMixNodeDetailsWithLayer> for MixNodeDetails {
|
||||
fn from(value: LegacyMixNodeDetailsWithLayer) -> Self {
|
||||
MixNodeDetails {
|
||||
bond_information: value.bond_information.into(),
|
||||
rewarding_details: value.rewarding_details,
|
||||
pending_changes: value.pending_changes,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::helpers::unix_epoch;
|
||||
use crate::helpers::PlaceholderJsonSchemaImpl;
|
||||
use crate::legacy::{
|
||||
LegacyGatewayBondWithId, LegacyMixNodeBondWithLayer, LegacyMixNodeDetailsWithLayer,
|
||||
};
|
||||
@@ -17,7 +16,7 @@ use nym_crypto::asymmetric::x25519::{
|
||||
use nym_mixnet_contract_common::nym_node::Role;
|
||||
use nym_mixnet_contract_common::reward_params::{Performance, RewardingParams};
|
||||
use nym_mixnet_contract_common::rewarding::RewardEstimate;
|
||||
use nym_mixnet_contract_common::{GatewayBond, IdentityKey, Interval, MixNode, NodeId, Percent};
|
||||
use nym_mixnet_contract_common::{IdentityKey, Interval, MixNode, NodeId, Percent};
|
||||
use nym_network_defaults::{DEFAULT_MIX_LISTENING_PORT, DEFAULT_VERLOC_LISTENING_PORT};
|
||||
use nym_node_requests::api::v1::authenticator::models::Authenticator;
|
||||
use nym_node_requests::api::v1::gateway::models::Wireguard;
|
||||
@@ -139,48 +138,6 @@ pub struct NodePerformance {
|
||||
pub last_24h: Performance,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[cfg_attr(feature = "generate-ts", derive(ts_rs::TS))]
|
||||
#[cfg_attr(
|
||||
feature = "generate-ts",
|
||||
ts(export, export_to = "ts-packages/types/src/types/rust/DisplayRole.ts")
|
||||
)]
|
||||
pub enum DisplayRole {
|
||||
EntryGateway,
|
||||
Layer1,
|
||||
Layer2,
|
||||
Layer3,
|
||||
ExitGateway,
|
||||
Standby,
|
||||
}
|
||||
|
||||
impl From<Role> for DisplayRole {
|
||||
fn from(role: Role) -> Self {
|
||||
match role {
|
||||
Role::EntryGateway => DisplayRole::EntryGateway,
|
||||
Role::Layer1 => DisplayRole::Layer1,
|
||||
Role::Layer2 => DisplayRole::Layer2,
|
||||
Role::Layer3 => DisplayRole::Layer3,
|
||||
Role::ExitGateway => DisplayRole::ExitGateway,
|
||||
Role::Standby => DisplayRole::Standby,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DisplayRole> for Role {
|
||||
fn from(role: DisplayRole) -> Self {
|
||||
match role {
|
||||
DisplayRole::EntryGateway => Role::EntryGateway,
|
||||
DisplayRole::Layer1 => Role::Layer1,
|
||||
DisplayRole::Layer2 => Role::Layer2,
|
||||
DisplayRole::Layer3 => Role::Layer3,
|
||||
DisplayRole::ExitGateway => Role::ExitGateway,
|
||||
DisplayRole::Standby => Role::Standby,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// imo for now there's no point in exposing more than that,
|
||||
// nym-api shouldn't be calculating apy or stake saturation for you.
|
||||
// it should just return its own metrics (performance) and then you can do with it as you wish
|
||||
@@ -196,7 +153,7 @@ impl From<DisplayRole> for Role {
|
||||
pub struct NodeAnnotation {
|
||||
#[cfg_attr(feature = "generate-ts", ts(type = "string"))]
|
||||
pub last_24h_performance: Performance,
|
||||
pub current_role: Option<DisplayRole>,
|
||||
pub current_role: Option<Role>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize, JsonSchema, ToSchema)]
|
||||
@@ -329,7 +286,7 @@ impl MixNodeBondAnnotated {
|
||||
.sphinx_key
|
||||
.parse()
|
||||
.map_err(|_| MalformedNodeBond::InvalidX25519Key)?,
|
||||
role,
|
||||
epoch_role: role,
|
||||
supported_roles: DeclaredRoles {
|
||||
mixnode: true,
|
||||
entry: false,
|
||||
@@ -388,7 +345,7 @@ impl GatewayBondAnnotated {
|
||||
.sphinx_key
|
||||
.parse()
|
||||
.map_err(|_| MalformedNodeBond::InvalidX25519Key)?,
|
||||
role,
|
||||
epoch_role: role,
|
||||
supported_roles: DeclaredRoles {
|
||||
mixnode: false,
|
||||
entry: true,
|
||||
@@ -853,10 +810,6 @@ impl NymNodeDescription {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ed25519_identity_key(&self) -> ed25519::PublicKey {
|
||||
self.description.host_information.keys.ed25519
|
||||
}
|
||||
|
||||
pub fn to_skimmed_node(&self, role: NodeRole, performance: Performance) -> SkimmedNode {
|
||||
let keys = &self.description.host_information.keys;
|
||||
let entry = if self.description.declared_role.entry {
|
||||
@@ -874,7 +827,7 @@ impl NymNodeDescription {
|
||||
// we can't use the declared roles, we have to take whatever was provided in the contract.
|
||||
// why? say this node COULD operate as an exit, but it might be the case the contract decided
|
||||
// to assign it an ENTRY role only. we have to use that one instead.
|
||||
role,
|
||||
epoch_role: role,
|
||||
supported_roles: self.description.declared_role,
|
||||
entry,
|
||||
performance,
|
||||
@@ -898,12 +851,6 @@ pub enum DescribedNodeType {
|
||||
NymNode,
|
||||
}
|
||||
|
||||
impl DescribedNodeType {
|
||||
pub fn is_nym_node(&self) -> bool {
|
||||
matches!(self, DescribedNodeType::NymNode)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, schemars::JsonSchema, ToSchema)]
|
||||
#[cfg_attr(feature = "generate-ts", derive(ts_rs::TS))]
|
||||
#[cfg_attr(
|
||||
@@ -988,14 +935,14 @@ impl NymNodeData {
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema, ToSchema)]
|
||||
pub struct LegacyDescribedGateway {
|
||||
pub bond: GatewayBond,
|
||||
pub bond: LegacyGatewayBondWithId,
|
||||
pub self_described: Option<NymNodeData>,
|
||||
}
|
||||
|
||||
impl From<LegacyGatewayBondWithId> for LegacyDescribedGateway {
|
||||
fn from(bond: LegacyGatewayBondWithId) -> Self {
|
||||
LegacyDescribedGateway {
|
||||
bond: bond.bond,
|
||||
bond,
|
||||
self_described: None,
|
||||
}
|
||||
}
|
||||
@@ -1144,67 +1091,6 @@ pub struct NoiseDetails {
|
||||
pub ip_addresses: Vec<IpAddr>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, schemars::JsonSchema, ToSchema)]
|
||||
pub struct NodeRefreshBody {
|
||||
#[serde(with = "bs58_ed25519_pubkey")]
|
||||
#[schemars(with = "String")]
|
||||
pub node_identity: ed25519::PublicKey,
|
||||
|
||||
// a poor man's nonce
|
||||
pub request_timestamp: i64,
|
||||
|
||||
#[schemars(with = "PlaceholderJsonSchemaImpl")]
|
||||
pub signature: ed25519::Signature,
|
||||
}
|
||||
|
||||
impl NodeRefreshBody {
|
||||
pub fn plaintext(node_identity: ed25519::PublicKey, request_timestamp: i64) -> Vec<u8> {
|
||||
node_identity
|
||||
.to_bytes()
|
||||
.into_iter()
|
||||
.chain(request_timestamp.to_be_bytes())
|
||||
.chain(b"describe-cache-refresh-request".iter().copied())
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn new(private_key: &ed25519::PrivateKey) -> Self {
|
||||
let node_identity = private_key.public_key();
|
||||
let request_timestamp = OffsetDateTime::now_utc().unix_timestamp();
|
||||
let signature = private_key.sign(Self::plaintext(node_identity, request_timestamp));
|
||||
NodeRefreshBody {
|
||||
node_identity,
|
||||
request_timestamp,
|
||||
signature,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn verify_signature(&self) -> bool {
|
||||
self.node_identity
|
||||
.verify(
|
||||
Self::plaintext(self.node_identity, self.request_timestamp),
|
||||
&self.signature,
|
||||
)
|
||||
.is_ok()
|
||||
}
|
||||
|
||||
pub fn is_stale(&self) -> bool {
|
||||
let Ok(encoded) = OffsetDateTime::from_unix_timestamp(self.request_timestamp) else {
|
||||
return true;
|
||||
};
|
||||
let now = OffsetDateTime::now_utc();
|
||||
|
||||
if encoded > now {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (encoded + Duration::from_secs(30)) < now {
|
||||
return true;
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -141,8 +141,8 @@ pub struct SkimmedNode {
|
||||
#[schemars(with = "String")]
|
||||
pub x25519_sphinx_pubkey: x25519::PublicKey,
|
||||
|
||||
#[serde(alias = "epoch_role")]
|
||||
pub role: NodeRole,
|
||||
#[serde(alias = "role")]
|
||||
pub epoch_role: NodeRole,
|
||||
|
||||
// needed for the purposes of sending appropriate test packets
|
||||
#[serde(default)]
|
||||
@@ -157,7 +157,7 @@ pub struct SkimmedNode {
|
||||
|
||||
impl SkimmedNode {
|
||||
pub fn get_mix_layer(&self) -> Option<u8> {
|
||||
match self.role {
|
||||
match self.epoch_role {
|
||||
NodeRole::Mixnode { layer } => Some(layer),
|
||||
_ => None,
|
||||
}
|
||||
|
||||
@@ -24,21 +24,21 @@ use utoipa::IntoParams;
|
||||
pub(crate) fn aggregation_routes(ecash_state: Arc<EcashState>) -> Router<AppState> {
|
||||
Router::new()
|
||||
.route(
|
||||
"/master-verification-key",
|
||||
"/master-verification-key:epoch_id",
|
||||
axum::routing::get({
|
||||
let ecash_state = Arc::clone(&ecash_state);
|
||||
|epoch_id| master_verification_key(epoch_id, ecash_state)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/aggregated-expiration-date-signatures",
|
||||
"/aggregated-expiration-date-signatures:expiration_date",
|
||||
axum::routing::get({
|
||||
let ecash_state = Arc::clone(&ecash_state);
|
||||
|expiration_date| expiration_date_signatures(expiration_date, ecash_state)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/aggregated-coin-indices-signatures",
|
||||
"/aggregated-coin-indices-signatures:epoch_id",
|
||||
axum::routing::get({
|
||||
let ecash_state = Arc::clone(&ecash_state);
|
||||
|epoch_id| coin_indices_signatures(epoch_id, ecash_state)
|
||||
@@ -52,7 +52,7 @@ pub(crate) fn aggregation_routes(ecash_state: Arc<EcashState>) -> Router<AppStat
|
||||
params(
|
||||
EpochIdParam
|
||||
),
|
||||
path = "/v1/ecash/master-verification-key",
|
||||
path = "/v1/ecash/master-verification-key/{epoch_id}",
|
||||
responses(
|
||||
(status = 200, body = VerificationKeyResponse)
|
||||
)
|
||||
@@ -83,7 +83,7 @@ struct ExpirationDateParam {
|
||||
params(
|
||||
ExpirationDateParam
|
||||
),
|
||||
path = "/v1/ecash/aggregated-expiration-date-signatures",
|
||||
path = "/v1/ecash/aggregated-expiration-date-signatures/{epoch_id}",
|
||||
responses(
|
||||
(status = 200, body = AggregatedExpirationDateSignatureResponse)
|
||||
)
|
||||
@@ -120,7 +120,7 @@ async fn expiration_date_signatures(
|
||||
params(
|
||||
EpochIdParam
|
||||
),
|
||||
path = "/v1/ecash/aggregated-coin-indices-signatures",
|
||||
path = "/v1/ecash/aggregated-coin-indices-signatures/{epoch_id}",
|
||||
responses(
|
||||
(status = 200, body = AggregatedCoinIndicesSignatureResponse)
|
||||
)
|
||||
|
||||
@@ -7,7 +7,7 @@ use crate::ecash::helpers::blind_sign;
|
||||
use crate::ecash::state::EcashState;
|
||||
use crate::node_status_api::models::AxumResult;
|
||||
use crate::support::http::state::AppState;
|
||||
use axum::extract::Query;
|
||||
use axum::extract::Path;
|
||||
use axum::{Json, Router};
|
||||
use nym_api_requests::ecash::{
|
||||
BlindSignRequestBody, BlindedSignatureResponse, PartialCoinIndicesSignatureResponse,
|
||||
@@ -32,14 +32,14 @@ pub(crate) fn partial_signing_routes(ecash_state: Arc<EcashState>) -> Router<App
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/partial-expiration-date-signatures",
|
||||
"/partial-expiration-date-signatures:expiration_date",
|
||||
axum::routing::get({
|
||||
let ecash_state = Arc::clone(&ecash_state);
|
||||
|expiration_date| partial_expiration_date_signatures(expiration_date, ecash_state)
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/partial-coin-indices-signatures",
|
||||
"/partial-coin-indices-signatures:epoch_id",
|
||||
axum::routing::get({
|
||||
let ecash_state = Arc::clone(&ecash_state);
|
||||
|epoch_id| partial_coin_indices_signatures(epoch_id, ecash_state)
|
||||
@@ -127,14 +127,14 @@ struct ExpirationDateParam {
|
||||
params(
|
||||
ExpirationDateParam
|
||||
),
|
||||
path = "/v1/ecash/partial-expiration-date-signatures",
|
||||
path = "/v1/ecash/partial-expiration-date-signatures/{expiration_date}",
|
||||
responses(
|
||||
(status = 200, body = PartialExpirationDateSignatureResponse),
|
||||
(status = 400, body = ErrorResponse, description = "this nym-api is not an ecash signer in the current epoch"),
|
||||
)
|
||||
)]
|
||||
async fn partial_expiration_date_signatures(
|
||||
Query(ExpirationDateParam { expiration_date }): Query<ExpirationDateParam>,
|
||||
Path(ExpirationDateParam { expiration_date }): Path<ExpirationDateParam>,
|
||||
state: Arc<EcashState>,
|
||||
) -> AxumResult<Json<PartialExpirationDateSignatureResponse>> {
|
||||
state.ensure_signer().await?;
|
||||
@@ -165,14 +165,14 @@ async fn partial_expiration_date_signatures(
|
||||
params(
|
||||
EpochIdParam
|
||||
),
|
||||
path = "/v1/ecash/partial-coin-indices-signatures",
|
||||
path = "/v1/ecash/partial-coin-indices-signatures/{epoch_id}",
|
||||
responses(
|
||||
(status = 200, body = PartialExpirationDateSignatureResponse),
|
||||
(status = 400, body = ErrorResponse, description = "this nym-api is not an ecash signer in the current epoch"),
|
||||
)
|
||||
)]
|
||||
async fn partial_coin_indices_signatures(
|
||||
Query(EpochIdParam { epoch_id }): Query<EpochIdParam>,
|
||||
Path(EpochIdParam { epoch_id }): Path<EpochIdParam>,
|
||||
state: Arc<EcashState>,
|
||||
) -> AxumResult<Json<PartialCoinIndicesSignatureResponse>> {
|
||||
state.ensure_signer().await?;
|
||||
|
||||
@@ -20,7 +20,6 @@ use time::macros::time;
|
||||
use time::{OffsetDateTime, Time};
|
||||
use tracing::{error, warn};
|
||||
|
||||
#[allow(deprecated)]
|
||||
pub(crate) fn spending_routes(ecash_state: Arc<EcashState>) -> Router<AppState> {
|
||||
Router::new()
|
||||
.route(
|
||||
@@ -243,7 +242,6 @@ async fn batch_redeem_tickets(
|
||||
(status = 500, body = ErrorResponse, description = "bloomfilters got disabled"),
|
||||
)
|
||||
)]
|
||||
#[deprecated]
|
||||
async fn double_spending_filter_v1(
|
||||
_state: Arc<EcashState>,
|
||||
) -> AxumResult<Json<SpentCredentialsResponse>> {
|
||||
|
||||
@@ -1261,7 +1261,6 @@ struct TestFixture {
|
||||
impl TestFixture {
|
||||
fn build_app_state(storage: NymApiStorage) -> AppState {
|
||||
AppState {
|
||||
forced_refresh: Default::default(),
|
||||
nym_contract_cache: NymContractCache::new(),
|
||||
node_status_cache: NodeStatusCache::new(),
|
||||
circulating_supply_cache: CirculatingSupplyCache::new("unym".to_owned()),
|
||||
|
||||
@@ -169,7 +169,7 @@ impl EpochAdvancer {
|
||||
let standby_eligible = all_choices
|
||||
.iter()
|
||||
.filter(|node| {
|
||||
!exit_gateways.contains(&node.0.node_id)
|
||||
exit_gateways.contains(&node.0.node_id)
|
||||
&& !entry_gateways.contains(&node.0.node_id)
|
||||
&& !mixnodes.contains(&node.0.node_id)
|
||||
})
|
||||
@@ -228,24 +228,14 @@ impl EpochAdvancer {
|
||||
)
|
||||
}
|
||||
|
||||
let mut rewarded_set = RewardedSet {
|
||||
Ok(RewardedSet {
|
||||
entry_gateways: entry_gateways.into_iter().collect(),
|
||||
exit_gateways: exit_gateways.into_iter().collect(),
|
||||
layer1,
|
||||
layer2,
|
||||
layer3,
|
||||
standby,
|
||||
};
|
||||
|
||||
// make sure to sort the rewarded set values
|
||||
rewarded_set.entry_gateways.sort();
|
||||
rewarded_set.exit_gateways.sort();
|
||||
rewarded_set.layer1.sort();
|
||||
rewarded_set.layer2.sort();
|
||||
rewarded_set.layer3.sort();
|
||||
rewarded_set.standby.sort();
|
||||
|
||||
Ok(rewarded_set)
|
||||
})
|
||||
}
|
||||
|
||||
async fn attach_performance_to_eligible_nodes(
|
||||
|
||||
@@ -274,8 +274,8 @@ impl<R: MessageReceiver + Send> Monitor<R> {
|
||||
info!("Received {}/{} packets", total_received, total_sent);
|
||||
|
||||
let summary = self.summary_producer.produce_summary(
|
||||
prepared_packets.mixnodes_under_test,
|
||||
prepared_packets.gateways_under_test,
|
||||
prepared_packets.tested_mixnodes,
|
||||
prepared_packets.tested_gateways,
|
||||
received,
|
||||
prepared_packets.invalid_mixnodes,
|
||||
prepared_packets.invalid_gateways,
|
||||
|
||||
@@ -25,7 +25,7 @@ use std::collections::{HashMap, HashSet};
|
||||
use std::fmt::{self, Display, Formatter};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, error, info, trace};
|
||||
use tracing::{error, info, trace};
|
||||
|
||||
const DEFAULT_AVERAGE_PACKET_DELAY: Duration = Duration::from_millis(200);
|
||||
const DEFAULT_AVERAGE_ACK_DELAY: Duration = Duration::from_millis(200);
|
||||
@@ -59,10 +59,10 @@ pub(crate) struct PreparedPackets {
|
||||
pub(super) packets: Vec<GatewayPackets>,
|
||||
|
||||
/// Vector containing list of public keys and owners of all nodes mixnodes being tested.
|
||||
pub(super) mixnodes_under_test: Vec<TestableNode>,
|
||||
pub(super) tested_mixnodes: Vec<TestableNode>,
|
||||
|
||||
/// Vector containing list of public keys and owners of all gateways being tested.
|
||||
pub(super) gateways_under_test: Vec<TestableNode>,
|
||||
pub(super) tested_gateways: Vec<TestableNode>,
|
||||
|
||||
/// All mixnodes that failed to get parsed correctly or were not version compatible.
|
||||
/// They will be marked to the validator as being down for the test.
|
||||
@@ -151,38 +151,34 @@ impl PacketPreparer {
|
||||
self.contract_cache.wait_for_initial_values().await;
|
||||
self.described_cache.naive_wait_for_initial_values().await;
|
||||
|
||||
let described_nodes = self
|
||||
.described_cache
|
||||
.get()
|
||||
.await
|
||||
.expect("the self-describe cache should have been initialised!");
|
||||
|
||||
// now wait for at least `minimum_full_routes` mixnodes per layer and `minimum_full_routes` gateway to be online
|
||||
info!("Waiting for minimal topology to be online");
|
||||
let initialisation_backoff = Duration::from_secs(30);
|
||||
loop {
|
||||
let gateways = self.contract_cache.legacy_gateways_all().await;
|
||||
let mixnodes = self.contract_cache.legacy_mixnodes_all_basic().await;
|
||||
let nym_nodes = self.contract_cache.nym_nodes().await;
|
||||
|
||||
let mut gateways_count = gateways.len();
|
||||
let mut mixnodes_count = mixnodes.len();
|
||||
if gateways.len() < minimum_full_routes {
|
||||
self.topology_wait_backoff(initialisation_backoff).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
for nym_node in nym_nodes {
|
||||
if let Some(described) = described_nodes.get_description(&nym_node.node_id()) {
|
||||
if described.declared_role.mixnode {
|
||||
mixnodes_count += 1;
|
||||
} else if described.declared_role.entry {
|
||||
gateways_count += 1;
|
||||
}
|
||||
let mut layer1_count = 0;
|
||||
let mut layer2_count = 0;
|
||||
let mut layer3_count = 0;
|
||||
|
||||
for mix in mixnodes {
|
||||
match mix.layer {
|
||||
LegacyMixLayer::One => layer1_count += 1,
|
||||
LegacyMixLayer::Two => layer2_count += 1,
|
||||
LegacyMixLayer::Three => layer3_count += 1,
|
||||
}
|
||||
}
|
||||
|
||||
debug!(
|
||||
"we have {mixnodes_count} possible mixnodes and {gateways_count} possible gateways"
|
||||
);
|
||||
|
||||
if gateways_count >= minimum_full_routes && mixnodes_count * 3 >= minimum_full_routes {
|
||||
if layer1_count >= minimum_full_routes
|
||||
&& layer2_count >= minimum_full_routes
|
||||
&& layer3_count >= minimum_full_routes
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -533,41 +529,30 @@ impl PacketPreparer {
|
||||
let mixing_nym_nodes = descriptions.mixing_nym_nodes();
|
||||
let gateway_capable_nym_nodes = descriptions.entry_capable_nym_nodes();
|
||||
|
||||
let (mut mixnodes_to_test_details, invalid_mixnodes) =
|
||||
self.filter_outdated_and_malformed_mixnodes(mixnodes);
|
||||
let (mut gateways_to_test_details, invalid_gateways) =
|
||||
self.filter_outdated_and_malformed_gateways(gateways);
|
||||
let (mixnodes, invalid_mixnodes) = self.filter_outdated_and_malformed_mixnodes(mixnodes);
|
||||
let (gateways, invalid_gateways) = self.filter_outdated_and_malformed_gateways(gateways);
|
||||
|
||||
// summary of nodes that got tested
|
||||
let mut mixnodes_under_test = mixnodes_to_test_details
|
||||
.iter()
|
||||
.map(|node| node.into())
|
||||
.collect::<Vec<_>>();
|
||||
let mut gateways_under_test = gateways_to_test_details
|
||||
.iter()
|
||||
.map(|node| node.into())
|
||||
.collect::<Vec<_>>();
|
||||
let mut tested_mixnodes = mixnodes.iter().map(|node| node.into()).collect::<Vec<_>>();
|
||||
let mut tested_gateways = gateways.iter().map(|node| node.into()).collect::<Vec<_>>();
|
||||
|
||||
// try to add nym-nodes into the fold
|
||||
if let Some(rewarded_set) = rewarded_set {
|
||||
let mut rng = thread_rng();
|
||||
for mix in mixing_nym_nodes {
|
||||
if let Some(parsed) = self.nym_node_to_legacy_mix(&mut rng, &rewarded_set, mix) {
|
||||
mixnodes_under_test.push(TestableNode::from(&parsed));
|
||||
mixnodes_to_test_details.push(parsed);
|
||||
tested_mixnodes.push(TestableNode::from(&parsed));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for gateway in gateway_capable_nym_nodes {
|
||||
if let Some(parsed) = self.nym_node_to_legacy_gateway(gateway) {
|
||||
gateways_under_test.push((&parsed, gateway.node_id).into());
|
||||
gateways_to_test_details.push((parsed, gateway.node_id));
|
||||
tested_gateways.push((&parsed, gateway.node_id).into())
|
||||
}
|
||||
}
|
||||
|
||||
let packets_to_create = (test_routes.len() * self.per_node_test_packets)
|
||||
* (mixnodes_under_test.len() + gateways_under_test.len());
|
||||
* (tested_mixnodes.len() + tested_gateways.len());
|
||||
info!("Need to create {} mix packets", packets_to_create);
|
||||
|
||||
let mut all_gateway_packets = HashMap::new();
|
||||
@@ -589,7 +574,7 @@ impl PacketPreparer {
|
||||
#[allow(clippy::unwrap_used)]
|
||||
let mixnode_test_packets = mix_tester
|
||||
.mixnodes_test_packets(
|
||||
&mixnodes_to_test_details,
|
||||
&mixnodes,
|
||||
route_ext,
|
||||
self.per_node_test_packets as u32,
|
||||
None,
|
||||
@@ -603,7 +588,7 @@ impl PacketPreparer {
|
||||
gateway_packets.push_packets(mix_packets);
|
||||
|
||||
// and generate test packets for gateways (note the variable recipient)
|
||||
for (gateway, node_id) in &gateways_to_test_details {
|
||||
for (gateway, node_id) in &gateways {
|
||||
let recipient = self.create_packet_sender(gateway);
|
||||
let gateway_identity = gateway.identity_key;
|
||||
let gateway_address = gateway.clients_address();
|
||||
@@ -639,8 +624,8 @@ impl PacketPreparer {
|
||||
|
||||
PreparedPackets {
|
||||
packets,
|
||||
mixnodes_under_test,
|
||||
gateways_under_test,
|
||||
tested_mixnodes,
|
||||
tested_gateways,
|
||||
invalid_mixnodes,
|
||||
invalid_gateways,
|
||||
}
|
||||
|
||||
@@ -9,10 +9,9 @@ use crate::support::config;
|
||||
use crate::support::config::DEFAULT_NODE_DESCRIBE_BATCH_SIZE;
|
||||
use async_trait::async_trait;
|
||||
use futures::{stream, StreamExt};
|
||||
use nym_api_requests::legacy::{LegacyGatewayBondWithId, LegacyMixNodeDetailsWithLayer};
|
||||
use nym_api_requests::models::{DescribedNodeType, NymNodeData, NymNodeDescription};
|
||||
use nym_config::defaults::DEFAULT_NYM_NODE_HTTP_PORT;
|
||||
use nym_mixnet_contract_common::{LegacyMixLayer, NodeId, NymNodeDetails};
|
||||
use nym_mixnet_contract_common::{LegacyMixLayer, NodeId};
|
||||
use nym_node_requests::api::client::{NymNodeApiClientError, NymNodeApiClientExt};
|
||||
use nym_topology::gateway::GatewayConversionError;
|
||||
use nym_topology::mix::MixnodeConversionError;
|
||||
@@ -146,16 +145,11 @@ impl NodeDescriptionTopologyExt for NymNodeDescription {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DescribedNodes {
|
||||
nodes: HashMap<NodeId, NymNodeDescription>,
|
||||
}
|
||||
|
||||
impl DescribedNodes {
|
||||
pub fn force_update(&mut self, node: NymNodeDescription) {
|
||||
self.nodes.insert(node.node_id, node);
|
||||
}
|
||||
|
||||
pub fn get_description(&self, node_id: &NodeId) -> Option<&NymNodeData> {
|
||||
self.nodes.get(node_id).map(|n| &n.description)
|
||||
}
|
||||
@@ -296,8 +290,7 @@ async fn try_get_description(
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct RefreshData {
|
||||
struct RefreshData {
|
||||
host: String,
|
||||
node_id: NodeId,
|
||||
node_type: DescribedNodeType,
|
||||
@@ -305,39 +298,6 @@ pub(crate) struct RefreshData {
|
||||
port: Option<u16>,
|
||||
}
|
||||
|
||||
impl<'a> From<&'a LegacyMixNodeDetailsWithLayer> for RefreshData {
|
||||
fn from(node: &'a LegacyMixNodeDetailsWithLayer) -> Self {
|
||||
RefreshData::new(
|
||||
&node.bond_information.mix_node.host,
|
||||
DescribedNodeType::LegacyMixnode,
|
||||
node.mix_id(),
|
||||
Some(node.bond_information.mix_node.http_api_port),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a LegacyGatewayBondWithId> for RefreshData {
|
||||
fn from(node: &'a LegacyGatewayBondWithId) -> Self {
|
||||
RefreshData::new(
|
||||
&node.bond.gateway.host,
|
||||
DescribedNodeType::LegacyGateway,
|
||||
node.node_id,
|
||||
None,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a NymNodeDetails> for RefreshData {
|
||||
fn from(node: &'a NymNodeDetails) -> Self {
|
||||
RefreshData::new(
|
||||
&node.bond_information.node.host,
|
||||
DescribedNodeType::NymNode,
|
||||
node.node_id(),
|
||||
node.bond_information.node.custom_http_port,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl RefreshData {
|
||||
pub fn new(
|
||||
host: impl Into<String>,
|
||||
@@ -353,11 +313,7 @@ impl RefreshData {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn node_id(&self) -> NodeId {
|
||||
self.node_id
|
||||
}
|
||||
|
||||
pub(crate) async fn try_refresh(self) -> Option<NymNodeDescription> {
|
||||
async fn try_refresh(self) -> Option<NymNodeDescription> {
|
||||
match try_get_description(self).await {
|
||||
Ok(description) => Some(description),
|
||||
Err(err) => {
|
||||
@@ -383,13 +339,18 @@ impl CacheItemProvider for NodeDescriptionProvider {
|
||||
// - legacy gateways (because they might already be running nym-nodes, but haven't updated contract info)
|
||||
// - nym-nodes
|
||||
|
||||
let mut nodes_to_query: Vec<RefreshData> = Vec::new();
|
||||
let mut nodes_to_query = Vec::new();
|
||||
|
||||
match self.contract_cache.all_cached_legacy_mixnodes().await {
|
||||
None => error!("failed to obtain mixnodes information from the cache"),
|
||||
Some(legacy_mixnodes) => {
|
||||
for node in &**legacy_mixnodes {
|
||||
nodes_to_query.push(node.into())
|
||||
nodes_to_query.push(RefreshData::new(
|
||||
&node.bond_information.mix_node.host,
|
||||
DescribedNodeType::LegacyMixnode,
|
||||
node.mix_id(),
|
||||
Some(node.bond_information.mix_node.http_api_port),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -398,7 +359,12 @@ impl CacheItemProvider for NodeDescriptionProvider {
|
||||
None => error!("failed to obtain gateways information from the cache"),
|
||||
Some(legacy_gateways) => {
|
||||
for node in &**legacy_gateways {
|
||||
nodes_to_query.push(node.into())
|
||||
nodes_to_query.push(RefreshData::new(
|
||||
&node.bond.gateway.host,
|
||||
DescribedNodeType::LegacyGateway,
|
||||
node.node_id,
|
||||
None,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -407,7 +373,12 @@ impl CacheItemProvider for NodeDescriptionProvider {
|
||||
None => error!("failed to obtain nym-nodes information from the cache"),
|
||||
Some(nym_nodes) => {
|
||||
for node in &**nym_nodes {
|
||||
nodes_to_query.push(node.into())
|
||||
nodes_to_query.push(RefreshData::new(
|
||||
&node.bond_information.node.host,
|
||||
DescribedNodeType::NymNode,
|
||||
node.node_id(),
|
||||
node.bond_information.node.custom_http_port,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,7 +210,6 @@ impl ResolvedNodeDescribedInfo {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct UnwrappedResolvedNodeDescribedInfo {
|
||||
pub(crate) build_info: BinaryBuildInformationOwned,
|
||||
pub(crate) roles: DeclaredRoles,
|
||||
|
||||
+3
-3
@@ -209,7 +209,7 @@ pub(crate) async fn produce_node_annotations(
|
||||
legacy_mix.mix_id(),
|
||||
NodeAnnotation {
|
||||
last_24h_performance: perf,
|
||||
current_role: rewarded_set.role(legacy_mix.mix_id()).map(|r| r.into()),
|
||||
current_role: rewarded_set.role(legacy_mix.mix_id()),
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -229,7 +229,7 @@ pub(crate) async fn produce_node_annotations(
|
||||
legacy_gateway.node_id,
|
||||
NodeAnnotation {
|
||||
last_24h_performance: perf,
|
||||
current_role: rewarded_set.role(legacy_gateway.node_id).map(|r| r.into()),
|
||||
current_role: rewarded_set.role(legacy_gateway.node_id),
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -249,7 +249,7 @@ pub(crate) async fn produce_node_annotations(
|
||||
nym_node.node_id(),
|
||||
NodeAnnotation {
|
||||
last_24h_performance: perf,
|
||||
current_role: rewarded_set.role(nym_node.node_id()).map(|r| r.into()),
|
||||
current_role: rewarded_set.role(nym_node.node_id()),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ use nym_serde_helpers::date::DATE_FORMAT;
|
||||
use reqwest::StatusCode;
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::Error;
|
||||
use std::fmt::Display;
|
||||
use thiserror::Error;
|
||||
use time::{Date, OffsetDateTime};
|
||||
@@ -355,13 +354,6 @@ impl AxumErrorResponse {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn unauthorised(msg: impl Display) -> Self {
|
||||
Self {
|
||||
message: RequestError::new(msg.to_string()),
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn unprocessable_entity(msg: impl Display) -> Self {
|
||||
Self {
|
||||
message: RequestError::new(msg.to_string()),
|
||||
@@ -382,13 +374,6 @@ impl AxumErrorResponse {
|
||||
status: StatusCode::BAD_REQUEST,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn too_many(msg: impl Display) -> Self {
|
||||
Self {
|
||||
message: RequestError::new(msg.to_string()),
|
||||
status: StatusCode::TOO_MANY_REQUESTS,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<UninitialisedCache> for AxumErrorResponse {
|
||||
@@ -453,7 +438,7 @@ pub enum NymApiStorageError {
|
||||
|
||||
// I don't think we want to expose errors to the user about what really happened
|
||||
#[error("experienced internal database error")]
|
||||
InternalDatabaseError(sqlx::Error),
|
||||
InternalDatabaseError(#[from] sqlx::Error),
|
||||
|
||||
// the same is true here (also note that the message is subtly different so we would be able to distinguish them)
|
||||
#[error("experienced internal storage error")]
|
||||
@@ -464,14 +449,6 @@ pub enum NymApiStorageError {
|
||||
StartupMigrationFailure(#[from] sqlx::migrate::MigrateError),
|
||||
}
|
||||
|
||||
impl From<sqlx::Error> for NymApiStorageError {
|
||||
fn from(err: Error) -> Self {
|
||||
// those should realistically never be happening so an `error!` is warranted
|
||||
error!("storage failure: {err}");
|
||||
NymApiStorageError::InternalDatabaseError(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl NymApiStorageError {
|
||||
pub fn database_inconsistency<S: Into<String>>(reason: S) -> NymApiStorageError {
|
||||
NymApiStorageError::DatabaseInconsistency {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2021-2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::node_status_api::models::{
|
||||
@@ -9,7 +9,7 @@ use crate::storage::NymApiStorage;
|
||||
use nym_task::{TaskClient, TaskManager};
|
||||
use std::time::Duration;
|
||||
use time::{OffsetDateTime, PrimitiveDateTime, Time};
|
||||
use tokio::time::{interval_at, Instant};
|
||||
use tokio::time::{interval, sleep};
|
||||
use tracing::error;
|
||||
use tracing::{info, trace, warn};
|
||||
|
||||
@@ -93,8 +93,15 @@ impl HistoricalUptimeUpdater {
|
||||
"waiting until {update_datetime} to update the historical uptimes for the first time ({} seconds left)", time_left.as_secs()
|
||||
);
|
||||
|
||||
let start = Instant::now() + time_left;
|
||||
let mut interval = interval_at(start, ONE_DAY);
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv() => {
|
||||
trace!("UpdateHandler: Received shutdown");
|
||||
}
|
||||
_ = sleep(time_left) => {}
|
||||
}
|
||||
|
||||
let mut interval = interval(ONE_DAY);
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
@@ -102,7 +109,6 @@ impl HistoricalUptimeUpdater {
|
||||
trace!("UpdateHandler: Received shutdown");
|
||||
}
|
||||
_ = interval.tick() => {
|
||||
info!("updating historical uptimes of nodes");
|
||||
// we don't want to have another select here; uptime update is relatively speedy
|
||||
// and we don't want to exit while we're in the middle of database update
|
||||
if let Err(err) = self.update_uptimes().await {
|
||||
|
||||
-44
@@ -1,7 +1,6 @@
|
||||
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::node_describe_cache::RefreshData;
|
||||
use crate::nym_contract_cache::cache::data::CachedContractsInfo;
|
||||
use crate::support::caching::Cache;
|
||||
use data::ValidatorCacheData;
|
||||
@@ -9,7 +8,6 @@ use nym_api_requests::legacy::{
|
||||
LegacyGatewayBondWithId, LegacyMixNodeBondWithLayer, LegacyMixNodeDetailsWithLayer,
|
||||
};
|
||||
use nym_api_requests::models::MixnodeStatus;
|
||||
use nym_crypto::asymmetric::ed25519;
|
||||
use nym_mixnet_contract_common::{Interval, NodeId, NymNodeDetails, RewardedSet, RewardingParams};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
@@ -354,48 +352,6 @@ impl NymContractCache {
|
||||
self.legacy_mixnode_details(mix_id).await.1
|
||||
}
|
||||
|
||||
pub async fn get_node_refresh_data(
|
||||
&self,
|
||||
node_identity: ed25519::PublicKey,
|
||||
) -> Option<RefreshData> {
|
||||
if !self.initialised() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let inner = self.inner.read().await;
|
||||
|
||||
let encoded_identity = node_identity.to_base58_string();
|
||||
|
||||
// 1. check nymnodes
|
||||
if let Some(nym_node) = inner
|
||||
.nym_nodes
|
||||
.iter()
|
||||
.find(|n| n.bond_information.identity() == encoded_identity)
|
||||
{
|
||||
return Some(nym_node.into());
|
||||
}
|
||||
|
||||
// 2. check legacy mixnodes
|
||||
if let Some(mixnode) = inner
|
||||
.legacy_mixnodes
|
||||
.iter()
|
||||
.find(|n| n.bond_information.identity() == encoded_identity)
|
||||
{
|
||||
return Some(mixnode.into());
|
||||
}
|
||||
|
||||
// 3. check legacy gateways
|
||||
if let Some(gateway) = inner
|
||||
.legacy_gateways
|
||||
.iter()
|
||||
.find(|n| n.identity() == &encoded_identity)
|
||||
{
|
||||
return Some(gateway.into());
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
pub fn initialised(&self) -> bool {
|
||||
self.initialised.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
+1
-1
@@ -173,7 +173,7 @@ impl NymContractCacheRefresher {
|
||||
layer,
|
||||
},
|
||||
rewarding_details: detail.rewarding_details,
|
||||
pending_changes: detail.pending_changes.into(),
|
||||
pending_changes: detail.pending_changes,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -49,7 +49,7 @@ async fn get_gateways_described(
|
||||
.into_iter()
|
||||
.map(|bond| LegacyDescribedGateway {
|
||||
self_described: self_descriptions.get_description(&bond.node_id).cloned(),
|
||||
bond: bond.bond,
|
||||
bond,
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
|
||||
@@ -5,11 +5,11 @@ use crate::node_status_api::models::{AxumErrorResponse, AxumResult};
|
||||
use crate::support::http::helpers::{NodeIdParam, PaginationRequest};
|
||||
use crate::support::http::state::AppState;
|
||||
use axum::extract::{Path, Query, State};
|
||||
use axum::routing::{get, post};
|
||||
use axum::routing::get;
|
||||
use axum::{Json, Router};
|
||||
use nym_api_requests::models::{
|
||||
AnnotationResponse, NodeDatePerformanceResponse, NodePerformanceResponse, NodeRefreshBody,
|
||||
NoiseDetails, NymNodeDescription, PerformanceHistoryResponse, UptimeHistoryResponse,
|
||||
AnnotationResponse, NodeDatePerformanceResponse, NodePerformanceResponse, NoiseDetails,
|
||||
NymNodeData, PerformanceHistoryResponse, UptimeHistoryResponse,
|
||||
};
|
||||
use nym_api_requests::pagination::{PaginatedResponse, Pagination};
|
||||
use nym_contracts_common::NaiveFloat;
|
||||
@@ -17,8 +17,7 @@ use nym_mixnet_contract_common::reward_params::Performance;
|
||||
use nym_mixnet_contract_common::NymNodeDetails;
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration;
|
||||
use time::{Date, OffsetDateTime};
|
||||
use time::Date;
|
||||
use utoipa::{IntoParams, ToSchema};
|
||||
|
||||
pub(crate) mod legacy;
|
||||
@@ -26,7 +25,6 @@ pub(crate) mod unstable;
|
||||
|
||||
pub(crate) fn nym_node_routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/refresh-described", post(refresh_described))
|
||||
.route("/noise", get(nodes_noise))
|
||||
.route("/bonded", get(get_bonded_nodes))
|
||||
.route("/described", get(get_described_nodes))
|
||||
@@ -44,63 +42,6 @@ pub(crate) fn nym_node_routes() -> Router<AppState> {
|
||||
.route("/uptime-history/:node_id", get(get_node_uptime_history))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Nym Nodes",
|
||||
post,
|
||||
request_body = NodeRefreshBody,
|
||||
path = "/refresh-described",
|
||||
context_path = "/v1/nym-nodes",
|
||||
)]
|
||||
async fn refresh_described(
|
||||
State(state): State<AppState>,
|
||||
Json(request_body): Json<NodeRefreshBody>,
|
||||
) -> AxumResult<Json<()>> {
|
||||
let Some(refresh_data) = state
|
||||
.nym_contract_cache()
|
||||
.get_node_refresh_data(request_body.node_identity)
|
||||
.await
|
||||
else {
|
||||
return Err(AxumErrorResponse::not_found(format!(
|
||||
"node with identity {} does not seem to exist",
|
||||
request_body.node_identity
|
||||
)));
|
||||
};
|
||||
|
||||
if !request_body.verify_signature() {
|
||||
return Err(AxumErrorResponse::unauthorised("invalid request signature"));
|
||||
}
|
||||
|
||||
if request_body.is_stale() {
|
||||
return Err(AxumErrorResponse::bad_request("the request is stale"));
|
||||
}
|
||||
|
||||
let node_id = refresh_data.node_id();
|
||||
if let Some(last) = state.forced_refresh.last_refreshed(node_id).await {
|
||||
// max 1 refresh a minute
|
||||
let minute_ago = OffsetDateTime::now_utc() - Duration::from_secs(60);
|
||||
if last > minute_ago {
|
||||
return Err(AxumErrorResponse::too_many(
|
||||
"already refreshed node in the last minute",
|
||||
));
|
||||
}
|
||||
}
|
||||
// to make sure you can't ddos the endpoint while a request is in progress
|
||||
state.forced_refresh.set_last_refreshed(node_id).await;
|
||||
|
||||
if let Some(updated_data) = refresh_data.try_refresh().await {
|
||||
let Ok(mut describe_cache) = state.described_nodes_cache.write().await else {
|
||||
return Err(AxumErrorResponse::service_unavailable());
|
||||
};
|
||||
describe_cache.get_mut().force_update(updated_data)
|
||||
} else {
|
||||
return Err(AxumErrorResponse::unprocessable_entity(
|
||||
"failed to refresh node description",
|
||||
));
|
||||
}
|
||||
|
||||
Ok(Json(()))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Nym Nodes",
|
||||
get,
|
||||
@@ -184,27 +125,32 @@ async fn get_bonded_nodes(
|
||||
path = "/described",
|
||||
context_path = "/v1/nym-nodes",
|
||||
responses(
|
||||
(status = 200, body = PaginatedResponse<NymNodeDescription>)
|
||||
(status = 200, body = PaginatedResponse<NymNodeData>)
|
||||
),
|
||||
params(PaginationRequest)
|
||||
)]
|
||||
async fn get_described_nodes(
|
||||
State(state): State<AppState>,
|
||||
Query(pagination): Query<PaginationRequest>,
|
||||
) -> AxumResult<Json<PaginatedResponse<NymNodeDescription>>> {
|
||||
) -> AxumResult<Json<PaginatedResponse<NymNodeData>>> {
|
||||
// TODO: implement it
|
||||
let _ = pagination;
|
||||
|
||||
let cache = state.described_nodes_cache.get().await?;
|
||||
let descriptions = cache.all_nodes().cloned().collect::<Vec<_>>();
|
||||
let descriptions = cache.all_nodes();
|
||||
|
||||
let data = descriptions
|
||||
.map(|n| &n.description)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(Json(PaginatedResponse {
|
||||
pagination: Pagination {
|
||||
total: descriptions.len(),
|
||||
total: data.len(),
|
||||
page: 0,
|
||||
size: descriptions.len(),
|
||||
size: data.len(),
|
||||
},
|
||||
data: descriptions,
|
||||
data,
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::{RwLock, RwLockMappedWriteGuard, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
#[error("the cache item has not been initialised")]
|
||||
@@ -45,13 +45,6 @@ impl<T> SharedCache<T> {
|
||||
RwLockReadGuard::try_map(guard, |a| a.inner.as_ref()).map_err(|_| UninitialisedCache)
|
||||
}
|
||||
|
||||
pub(crate) async fn write(
|
||||
&self,
|
||||
) -> Result<RwLockMappedWriteGuard<'_, Cache<T>>, UninitialisedCache> {
|
||||
let guard = self.0.write().await;
|
||||
RwLockWriteGuard::try_map(guard, |a| a.inner.as_mut()).map_err(|_| UninitialisedCache)
|
||||
}
|
||||
|
||||
// ignores expiration data
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn unchecked_get_inner(
|
||||
@@ -141,10 +134,6 @@ impl<T> Cache<T> {
|
||||
self.as_at = OffsetDateTime::now_utc()
|
||||
}
|
||||
|
||||
pub(crate) fn get_mut(&mut self) -> &mut T {
|
||||
&mut self.value
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn has_expired(&self, ttl: Duration, now: Option<OffsetDateTime>) -> bool {
|
||||
let now = now.unwrap_or(OffsetDateTime::now_utc());
|
||||
|
||||
@@ -188,7 +188,6 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result<ShutdownHan
|
||||
};
|
||||
|
||||
let router = router.with_state(AppState {
|
||||
forced_refresh: Default::default(),
|
||||
nym_contract_cache: nym_contract_cache_state.clone(),
|
||||
node_status_cache: node_status_cache_state.clone(),
|
||||
circulating_supply_cache: circulating_supply_cache.clone(),
|
||||
|
||||
@@ -234,9 +234,9 @@ impl Config {
|
||||
fn default_http_socket_addr() -> SocketAddr {
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(debug_assertions)] {
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000)
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080)
|
||||
} else {
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 8000)
|
||||
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 8080)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ use utoipauto::utoipauto;
|
||||
models::CirculatingSupplyResponse,
|
||||
models::CoinSchema,
|
||||
nym_mixnet_contract_common::Interval,
|
||||
nym_api_requests::models::NodeRefreshBody,
|
||||
nym_api_requests::models::GatewayStatusReportResponse,
|
||||
nym_api_requests::models::GatewayUptimeHistoryResponse,
|
||||
nym_api_requests::models::GatewayCoreStatusResponse,
|
||||
|
||||
@@ -15,9 +15,7 @@ use nym_api_requests::models::{GatewayBondAnnotated, MixNodeBondAnnotated, NodeA
|
||||
use nym_mixnet_contract_common::NodeId;
|
||||
use nym_task::TaskManager;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::{RwLock, RwLockReadGuard};
|
||||
use tokio::sync::RwLockReadGuard;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
@@ -72,7 +70,6 @@ type AxumJoinHandle = JoinHandle<std::io::Result<()>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct AppState {
|
||||
pub(crate) forced_refresh: ForcedRefresh,
|
||||
pub(crate) nym_contract_cache: NymContractCache,
|
||||
pub(crate) node_status_cache: NodeStatusCache,
|
||||
pub(crate) circulating_supply_cache: CirculatingSupplyCache,
|
||||
@@ -82,24 +79,6 @@ pub(crate) struct AppState {
|
||||
pub(crate) node_info_cache: unstable::NodeInfoCache,
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub(crate) struct ForcedRefresh {
|
||||
pub(crate) refreshes: Arc<RwLock<HashMap<NodeId, OffsetDateTime>>>,
|
||||
}
|
||||
|
||||
impl ForcedRefresh {
|
||||
pub(crate) async fn last_refreshed(&self, node_id: NodeId) -> Option<OffsetDateTime> {
|
||||
self.refreshes.read().await.get(&node_id).copied()
|
||||
}
|
||||
|
||||
pub(crate) async fn set_last_refreshed(&self, node_id: NodeId) {
|
||||
self.refreshes
|
||||
.write()
|
||||
.await
|
||||
.insert(node_id, OffsetDateTime::now_utc());
|
||||
}
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub(crate) fn nym_contract_cache(&self) -> &NymContractCache {
|
||||
&self.nym_contract_cache
|
||||
|
||||
@@ -119,8 +119,9 @@ impl StorageManager {
|
||||
start_ts_secs: i64,
|
||||
end_ts_secs: i64,
|
||||
) -> Result<Vec<AvgGatewayReliability>, sqlx::Error> {
|
||||
let result = sqlx::query_as!(
|
||||
AvgGatewayReliability,
|
||||
// we can't use `query_as!` macro because we don't apply all required table changes during sqlx migrations.
|
||||
// some (like v3 directory) happens at runtime
|
||||
let result = sqlx::query_as(
|
||||
r#"
|
||||
SELECT
|
||||
d.node_id as "node_id: NodeId",
|
||||
@@ -134,9 +135,9 @@ impl StorageManager {
|
||||
timestamp <= ?
|
||||
GROUP BY 1
|
||||
"#,
|
||||
start_ts_secs,
|
||||
end_ts_secs
|
||||
)
|
||||
.bind(start_ts_secs)
|
||||
.bind(end_ts_secs)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(result)
|
||||
@@ -983,8 +984,7 @@ impl StorageManager {
|
||||
since: i64,
|
||||
until: i64,
|
||||
) -> Result<Vec<ActiveGateway>, sqlx::Error> {
|
||||
sqlx::query_as!(
|
||||
ActiveGateway,
|
||||
sqlx::query_as(
|
||||
r#"
|
||||
SELECT DISTINCT identity, node_id as "node_id: NodeId", id
|
||||
FROM gateway_details
|
||||
@@ -994,9 +994,9 @@ impl StorageManager {
|
||||
SELECT 1 FROM gateway_status WHERE timestamp > ? AND timestamp < ?
|
||||
)
|
||||
"#,
|
||||
since,
|
||||
until
|
||||
)
|
||||
.bind(since)
|
||||
.bind(until)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -30,6 +30,10 @@ pub async fn migrate_v3_database(
|
||||
let contract_gateways = nyxd_client.get_gateways().await?;
|
||||
let nym_nodes = nyxd_client.get_nymnodes().await?;
|
||||
|
||||
if preassigned_ids.len() != contract_gateways.len() {
|
||||
bail!("CONTRACT DATA CORRUPTION: THE NUMBER OF PREASSIGNED GATEWAY IDS IS DIFFERENT THAN THE NUMBER OF GATEWAYS")
|
||||
}
|
||||
|
||||
// assign node_id to every gateway
|
||||
let all_known = storage.get_all_known_gateways().await?;
|
||||
for gateway in all_known {
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user