Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 74ab09b05f |
Generated
+2
-4
@@ -717,8 +717,7 @@ checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
|
||||
[[package]]
|
||||
name = "cosmos-sdk-proto"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c0254ffee603f5301d6a66963d9e1cc5091479c22e2e925e1f7689c8027a0828"
|
||||
source = "git+https://github.com/nymtech/cosmos-rust?branch=bugfix/account-id-length-validation#911fbe1236cfed591783ccef01018f7ccc97c496"
|
||||
dependencies = [
|
||||
"prost",
|
||||
"prost-types",
|
||||
@@ -728,8 +727,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "cosmrs"
|
||||
version = "0.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "505ea048e9ff2f906d6b954f9f8157d903ca468bfb301d906b40ecc25ba6838d"
|
||||
source = "git+https://github.com/nymtech/cosmos-rust?branch=bugfix/account-id-length-validation#911fbe1236cfed591783ccef01018f7ccc97c496"
|
||||
dependencies = [
|
||||
"bip32",
|
||||
"cosmos-sdk-proto",
|
||||
|
||||
Generated
+2
-2
@@ -810,7 +810,7 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
|
||||
|
||||
[[package]]
|
||||
name = "mixnet-contract"
|
||||
version = "1.0.0-rc.1"
|
||||
version = "1.0.0"
|
||||
dependencies = [
|
||||
"az",
|
||||
"bs58",
|
||||
@@ -1518,7 +1518,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
|
||||
|
||||
[[package]]
|
||||
name = "vesting-contract"
|
||||
version = "1.0.0-rc.1"
|
||||
version = "1.0.0"
|
||||
dependencies = [
|
||||
"config",
|
||||
"cosmwasm-std",
|
||||
|
||||
@@ -435,14 +435,6 @@ impl Config {
|
||||
self.network_monitor.minimum_test_routes
|
||||
}
|
||||
|
||||
pub fn get_min_mixnode_reliability(&self) -> u8 {
|
||||
self.network_monitor.min_mixnode_reliability
|
||||
}
|
||||
|
||||
pub fn get_min_gateway_reliability(&self) -> u8 {
|
||||
self.network_monitor.min_gateway_reliability
|
||||
}
|
||||
|
||||
pub fn get_route_test_packets(&self) -> usize {
|
||||
self.network_monitor.route_test_packets
|
||||
}
|
||||
|
||||
@@ -19,11 +19,6 @@ mixnet_contract_address = '{{ base.mixnet_contract_address }}'
|
||||
##### network monitor config options #####
|
||||
|
||||
[network_monitor]
|
||||
|
||||
# Mixnodes and gateways with relialability lower the this get blacklisted by network monitor, get no traffic and cannot be selected into a rewarded set.
|
||||
min_mixnode_reliability = {{ network_monitor.min_mixnode_reliability }} # deafults to 50
|
||||
min_gateway_reliability = {{ network_monitor.min_gateway_reliability }} # defaults to 20
|
||||
|
||||
# Specifies whether network monitoring service is enabled in this process.
|
||||
enabled = {{ network_monitor.enabled }}
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::collections::HashSet;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{Notify, RwLock};
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time;
|
||||
use validator_api_requests::models::MixnodeStatus;
|
||||
use validator_client::nymd::CosmWasmClient;
|
||||
@@ -28,7 +28,6 @@ pub struct ValidatorCacheRefresher<C> {
|
||||
nymd_client: Client<C>,
|
||||
cache: ValidatorCache,
|
||||
caching_interval: Duration,
|
||||
update_rewarded_set_notify: Option<Arc<Notify>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -89,13 +88,11 @@ impl<C> ValidatorCacheRefresher<C> {
|
||||
nymd_client: Client<C>,
|
||||
caching_interval: Duration,
|
||||
cache: ValidatorCache,
|
||||
update_rewarded_set_notify: Option<Arc<Notify>>,
|
||||
) -> Self {
|
||||
ValidatorCacheRefresher {
|
||||
nymd_client,
|
||||
cache,
|
||||
caching_interval,
|
||||
update_rewarded_set_notify,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,6 +133,7 @@ impl<C> ValidatorCacheRefresher<C> {
|
||||
self.collect_rewarded_and_active_set_details(&mixnodes, rewarded_set_identities);
|
||||
|
||||
let epoch_rewarding_params = self.nymd_client.get_current_epoch_reward_params().await?;
|
||||
let current_epoch = self.nymd_client.get_current_epoch().await?;
|
||||
|
||||
info!(
|
||||
"Updating validator cache. There are {} mixnodes and {} gateways",
|
||||
@@ -150,23 +148,10 @@ impl<C> ValidatorCacheRefresher<C> {
|
||||
rewarded_set,
|
||||
active_set,
|
||||
epoch_rewarding_params,
|
||||
current_epoch,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(notify) = &self.update_rewarded_set_notify {
|
||||
let update_details = self
|
||||
.nymd_client
|
||||
.get_current_rewarded_set_update_details()
|
||||
.await?;
|
||||
|
||||
if update_details.last_refreshed_block + (update_details.refresh_rate_blocks as u64)
|
||||
< update_details.current_height
|
||||
{
|
||||
// there's only ever a single waiter -> the set updater
|
||||
notify.notify_one()
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -221,6 +206,7 @@ impl ValidatorCache {
|
||||
rewarded_set: Vec<MixNodeBond>,
|
||||
active_set: Vec<MixNodeBond>,
|
||||
epoch_rewarding_params: EpochRewardParams,
|
||||
current_epoch: Interval,
|
||||
) {
|
||||
let mut inner = self.inner.write().await;
|
||||
|
||||
@@ -229,6 +215,7 @@ impl ValidatorCache {
|
||||
inner.rewarded_set.update(rewarded_set);
|
||||
inner.active_set.update(active_set);
|
||||
inner.current_reward_params.update(epoch_rewarding_params);
|
||||
inner.current_epoch.update(Some(current_epoch));
|
||||
}
|
||||
|
||||
pub async fn mixnodes_blacklist(&self) -> Cache<HashSet<IdentityKey>> {
|
||||
|
||||
@@ -231,6 +231,8 @@ fn setup_logging() {
|
||||
.filter_module("sled", log::LevelFilter::Warn)
|
||||
.filter_module("tungstenite", log::LevelFilter::Warn)
|
||||
.filter_module("tokio_tungstenite", log::LevelFilter::Warn)
|
||||
.filter_module("_", log::LevelFilter::Warn)
|
||||
.filter_module("rocket::server", log::LevelFilter::Warn)
|
||||
.init();
|
||||
}
|
||||
|
||||
@@ -461,14 +463,11 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> {
|
||||
// if network monitor is disabled, we're not going to be sending any rewarding hence
|
||||
// we're not starting signing client
|
||||
if config.get_network_monitor_enabled() {
|
||||
let rewarded_set_update_notify = Arc::new(Notify::new());
|
||||
|
||||
let nymd_client = Client::new_signing(&config);
|
||||
let validator_cache_refresher = ValidatorCacheRefresher::new(
|
||||
nymd_client.clone(),
|
||||
config.get_caching_interval(),
|
||||
validator_cache.clone(),
|
||||
Some(Arc::clone(&rewarded_set_update_notify)),
|
||||
);
|
||||
|
||||
// spawn our cacher
|
||||
@@ -480,13 +479,8 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> {
|
||||
let uptime_updater = HistoricalUptimeUpdater::new(storage.clone());
|
||||
tokio::spawn(async move { uptime_updater.run().await });
|
||||
|
||||
let mut rewarded_set_updater = RewardedSetUpdater::new(
|
||||
nymd_client,
|
||||
rewarded_set_update_notify,
|
||||
validator_cache.clone(),
|
||||
storage,
|
||||
)
|
||||
.await?;
|
||||
let mut rewarded_set_updater =
|
||||
RewardedSetUpdater::new(nymd_client, validator_cache.clone(), storage).await?;
|
||||
|
||||
// spawn rewarded set updater
|
||||
tokio::spawn(async move { rewarded_set_updater.run().await.unwrap() });
|
||||
@@ -496,7 +490,6 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> {
|
||||
nymd_client,
|
||||
config.get_caching_interval(),
|
||||
validator_cache,
|
||||
None,
|
||||
);
|
||||
|
||||
// spawn our cacher
|
||||
|
||||
@@ -42,8 +42,6 @@ pub(super) struct Monitor {
|
||||
/// The minimum number of test routes that need to be constructed (and working) in order for
|
||||
/// a monitor test run to be valid.
|
||||
minimum_test_routes: usize,
|
||||
min_mixnode_reliability: u8,
|
||||
min_gateway_reliability: u8,
|
||||
}
|
||||
|
||||
impl Monitor {
|
||||
@@ -68,8 +66,6 @@ impl Monitor {
|
||||
route_test_packets: config.get_route_test_packets(),
|
||||
test_routes: config.get_test_routes(),
|
||||
minimum_test_routes: config.get_minimum_test_routes(),
|
||||
min_mixnode_reliability: config.get_min_mixnode_reliability(),
|
||||
min_gateway_reliability: config.get_min_gateway_reliability(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,35 +74,6 @@ impl Monitor {
|
||||
async fn submit_new_node_statuses(&mut self, test_summary: TestSummary) {
|
||||
// indicate our run has completed successfully and should be used in any future
|
||||
// uptime calculations
|
||||
|
||||
for result in test_summary.mixnode_results.iter() {
|
||||
if result.reliability < self.min_mixnode_reliability {
|
||||
self.packet_preparer
|
||||
.validator_cache()
|
||||
.insert_mixnodes_blacklist(result.identity.clone())
|
||||
.await;
|
||||
} else {
|
||||
self.packet_preparer
|
||||
.validator_cache()
|
||||
.remove_mixnodes_blacklist(&result.identity)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
for result in test_summary.gateway_results.iter() {
|
||||
if result.reliability < self.min_gateway_reliability {
|
||||
self.packet_preparer
|
||||
.validator_cache()
|
||||
.insert_gateways_blacklist(result.identity.clone())
|
||||
.await;
|
||||
} else {
|
||||
self.packet_preparer
|
||||
.validator_cache()
|
||||
.remove_gateways_blacklist(&result.identity)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(err) = self
|
||||
.node_status_storage
|
||||
.insert_monitor_run_results(
|
||||
|
||||
@@ -151,10 +151,6 @@ impl PacketPreparer {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn validator_cache(&mut self) -> &mut ValidatorCache {
|
||||
&mut self.validator_cache
|
||||
}
|
||||
|
||||
async fn wrap_test_packet(
|
||||
&mut self,
|
||||
packet: &TestPacket,
|
||||
|
||||
@@ -25,6 +25,15 @@ impl Uptime {
|
||||
Uptime(0)
|
||||
}
|
||||
|
||||
pub fn new(uptime: f32) -> Self {
|
||||
if uptime > 100f32 {
|
||||
error!("Got uptime {}, max is 100, returning 0", uptime);
|
||||
Uptime(0)
|
||||
} else {
|
||||
Uptime(uptime as u8)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_ratio(numerator: usize, denominator: usize) -> Result<Self, InvalidUptime> {
|
||||
if denominator == 0 {
|
||||
return Ok(Self::zero());
|
||||
|
||||
@@ -118,18 +118,16 @@ pub(crate) async fn get_mixnode_reward_estimation(
|
||||
) -> Result<Json<RewardEstimationResponse>, ErrorResponse> {
|
||||
let (bond, status) = cache.mixnode_details(&identity).await;
|
||||
if let Some(bond) = bond {
|
||||
let interval_reward_params = cache.epoch_reward_params().await;
|
||||
let as_at = interval_reward_params.timestamp();
|
||||
let interval_reward_params = interval_reward_params.into_inner();
|
||||
let reward_params = cache.epoch_reward_params().await;
|
||||
let as_at = reward_params.timestamp();
|
||||
let reward_params = reward_params.into_inner();
|
||||
|
||||
let current_epoch = cache.current_epoch().await.into_inner();
|
||||
info!("{:?}", current_epoch);
|
||||
|
||||
let uptime = if let Some(epoch) = current_epoch {
|
||||
storage
|
||||
.get_average_mixnode_uptime_in_interval(
|
||||
&identity,
|
||||
epoch.start_unix_timestamp(),
|
||||
epoch.end_unix_timestamp(),
|
||||
)
|
||||
.get_average_mixnode_uptime_in_the_last_24hrs(&identity, epoch.end_unix_timestamp())
|
||||
.await
|
||||
.map_err(|err| ErrorResponse::new(err.to_string(), Status::NotFound))?
|
||||
} else {
|
||||
@@ -137,7 +135,7 @@ pub(crate) async fn get_mixnode_reward_estimation(
|
||||
};
|
||||
|
||||
let node_reward_params = NodeRewardParams::new(0, uptime.u8() as u128, status.is_active());
|
||||
let reward_params = RewardParams::new(interval_reward_params, node_reward_params);
|
||||
let reward_params = RewardParams::new(reward_params, node_reward_params);
|
||||
let epoch_start = if let Some(epoch) = current_epoch {
|
||||
epoch.start_unix_timestamp()
|
||||
} else {
|
||||
|
||||
@@ -8,7 +8,6 @@ use mixnet_contract_common::Interval;
|
||||
use mixnet_contract_common::{
|
||||
reward_params::EpochRewardParams, ContractStateParams, Delegation, ExecuteMsg, GatewayBond,
|
||||
IdentityKey, MixNodeBond, MixnodeRewardingStatusResponse, RewardedSetNodeStatus,
|
||||
RewardedSetUpdateDetails,
|
||||
};
|
||||
use serde::Serialize;
|
||||
use std::sync::Arc;
|
||||
@@ -265,19 +264,6 @@ impl<C> Client<C> {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_current_rewarded_set_update_details(
|
||||
&self,
|
||||
) -> Result<RewardedSetUpdateDetails, ValidatorClientError>
|
||||
where
|
||||
C: CosmWasmClient + Sync,
|
||||
{
|
||||
self.0
|
||||
.read()
|
||||
.await
|
||||
.get_current_rewarded_set_update_details()
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn advance_current_epoch(&self) -> Result<(), ValidatorClientError>
|
||||
where
|
||||
|
||||
@@ -22,9 +22,9 @@ use mixnet_contract_common::{IdentityKey, Interval, MixNodeBond};
|
||||
use rand::prelude::SliceRandom;
|
||||
use rand::rngs::OsRng;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::Notify;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::time::sleep;
|
||||
use validator_client::nymd::{CosmosCoin, SigningNymdClient};
|
||||
|
||||
pub(crate) mod error;
|
||||
@@ -62,7 +62,6 @@ type Epoch = Interval;
|
||||
|
||||
pub struct RewardedSetUpdater {
|
||||
nymd_client: Client<SigningNymdClient>,
|
||||
update_rewarded_set_notify: Arc<Notify>,
|
||||
validator_cache: ValidatorCache,
|
||||
storage: ValidatorApiStorage,
|
||||
}
|
||||
@@ -74,13 +73,11 @@ impl RewardedSetUpdater {
|
||||
|
||||
pub(crate) async fn new(
|
||||
nymd_client: Client<SigningNymdClient>,
|
||||
update_rewarded_set_notify: Arc<Notify>,
|
||||
validator_cache: ValidatorCache,
|
||||
storage: ValidatorApiStorage,
|
||||
) -> Result<Self, RewardingError> {
|
||||
Ok(RewardedSetUpdater {
|
||||
nymd_client,
|
||||
update_rewarded_set_notify,
|
||||
validator_cache,
|
||||
storage,
|
||||
})
|
||||
@@ -139,7 +136,7 @@ impl RewardedSetUpdater {
|
||||
.insert_rewarding_report(rewarding_report)
|
||||
.await?;
|
||||
|
||||
Ok(self.generate_reward_messages(&to_reward).await?)
|
||||
self.generate_reward_messages(&to_reward).await
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
@@ -177,9 +174,8 @@ impl RewardedSetUpdater {
|
||||
for rewarded_node in rewarded_set.into_iter() {
|
||||
let uptime = self
|
||||
.storage
|
||||
.get_average_mixnode_uptime_in_interval(
|
||||
.get_average_mixnode_uptime_in_the_last_24hrs(
|
||||
rewarded_node.identity(),
|
||||
epoch.start_unix_timestamp(),
|
||||
epoch.end_unix_timestamp(),
|
||||
)
|
||||
.await?;
|
||||
@@ -200,7 +196,7 @@ impl RewardedSetUpdater {
|
||||
}
|
||||
|
||||
// This is where the epoch gets advanced, and all epoch related transactions originate
|
||||
async fn update_rewarded_set(&self) -> Result<(), RewardingError> {
|
||||
async fn update(&self) -> Result<(), RewardingError> {
|
||||
let epoch = self.epoch().await?;
|
||||
log::info!("Starting rewarded set update");
|
||||
// we know the entries are not stale, as a matter of fact they were JUST updated, since we got notified
|
||||
@@ -242,13 +238,85 @@ impl RewardedSetUpdater {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_blacklist(&mut self, epoch: &Interval) -> Result<(), RewardingError> {
|
||||
info!("Updating blacklist");
|
||||
let mixnodes = self
|
||||
.storage
|
||||
.get_all_avg_mix_reliability_in_last_24hr(epoch.end_unix_timestamp())
|
||||
.await?;
|
||||
let gateways = self
|
||||
.storage
|
||||
.get_all_avg_gateway_reliability_in_last_24hr(epoch.end_unix_timestamp())
|
||||
.await?;
|
||||
for mix in mixnodes {
|
||||
if mix.value() <= 50.0 {
|
||||
self.validator_cache
|
||||
.insert_mixnodes_blacklist(mix.identity().to_owned())
|
||||
.await;
|
||||
} else {
|
||||
self.validator_cache
|
||||
.remove_mixnodes_blacklist(mix.identity())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
for gateway in gateways {
|
||||
if gateway.value() <= 50.0 {
|
||||
self.validator_cache
|
||||
.insert_gateways_blacklist(gateway.identity().to_owned())
|
||||
.await;
|
||||
} else {
|
||||
self.validator_cache
|
||||
.remove_gateways_blacklist(gateway.identity())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Blacklisted mixnodes: {}",
|
||||
self.validator_cache
|
||||
.mixnodes_blacklist()
|
||||
.await
|
||||
.into_inner()
|
||||
.len()
|
||||
);
|
||||
info!(
|
||||
"Blacklisted gateways: {}",
|
||||
self.validator_cache
|
||||
.gateways_blacklist()
|
||||
.await
|
||||
.into_inner()
|
||||
.len()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn run(&mut self) -> Result<(), RewardingError> {
|
||||
self.validator_cache.wait_for_initial_values().await;
|
||||
|
||||
loop {
|
||||
// wait until the cache refresher determined its time to update the rewarded/active sets
|
||||
self.update_rewarded_set_notify.notified().await;
|
||||
self.update_rewarded_set().await?;
|
||||
let time = OffsetDateTime::now_utc().unix_timestamp();
|
||||
let epoch = self.epoch().await?;
|
||||
let time_to_epoch_change = epoch.end_unix_timestamp() - time;
|
||||
if time_to_epoch_change <= 0 {
|
||||
self.update_blacklist(&epoch).await?;
|
||||
log::info!(
|
||||
"Time to epoch change is {}, updating rewarded set",
|
||||
time_to_epoch_change
|
||||
);
|
||||
self.update().await?;
|
||||
} else {
|
||||
log::info!(
|
||||
"Waiting for epoch change, time to epoch change is {}",
|
||||
time_to_epoch_change
|
||||
);
|
||||
// Sleep at most 300 before checking again, to keep logs busy
|
||||
let s = time_to_epoch_change.min(300).max(0) as u64;
|
||||
sleep(Duration::from_secs(s)).await;
|
||||
}
|
||||
// allow some blocks to pass
|
||||
sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
#[allow(unreachable_code)]
|
||||
Ok(())
|
||||
|
||||
@@ -11,8 +11,97 @@ pub(crate) struct StorageManager {
|
||||
pub(crate) connection_pool: sqlx::SqlitePool,
|
||||
}
|
||||
|
||||
pub struct AvgReliability {
|
||||
identity: String,
|
||||
value: Option<f32>,
|
||||
}
|
||||
|
||||
impl AvgReliability {
|
||||
pub fn identity(&self) -> &str {
|
||||
&self.identity
|
||||
}
|
||||
|
||||
pub fn value(&self) -> f32 {
|
||||
self.value.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
// all SQL goes here
|
||||
impl StorageManager {
|
||||
pub(super) async fn get_all_avg_mix_reliability_in_last_24hr(
|
||||
&self,
|
||||
end_ts_secs: i64,
|
||||
) -> Result<Vec<AvgReliability>, sqlx::Error> {
|
||||
let start_ts_secs = end_ts_secs - 86400;
|
||||
self.get_all_avg_mix_reliability_in_interval(start_ts_secs, end_ts_secs)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(super) async fn get_all_avg_gateway_reliability_in_last_24hr(
|
||||
&self,
|
||||
end_ts_secs: i64,
|
||||
) -> Result<Vec<AvgReliability>, sqlx::Error> {
|
||||
let start_ts_secs = end_ts_secs - 86400;
|
||||
self.get_all_avg_gateway_reliability_in_interval(start_ts_secs, end_ts_secs)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(super) async fn get_all_avg_mix_reliability_in_interval(
|
||||
&self,
|
||||
start_ts_secs: i64,
|
||||
end_ts_secs: i64,
|
||||
) -> Result<Vec<AvgReliability>, sqlx::Error> {
|
||||
let result = sqlx::query_as!(
|
||||
AvgReliability,
|
||||
r#"
|
||||
SELECT
|
||||
d.identity as "identity: String",
|
||||
AVG(s.reliability) as "value: f32"
|
||||
FROM
|
||||
mixnode_details d
|
||||
JOIN
|
||||
mixnode_status s on d.id = s.mixnode_details_id
|
||||
WHERE
|
||||
timestamp >= ? AND
|
||||
timestamp <= ?
|
||||
GROUP BY 1
|
||||
"#,
|
||||
start_ts_secs,
|
||||
end_ts_secs
|
||||
)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub(super) async fn get_all_avg_gateway_reliability_in_interval(
|
||||
&self,
|
||||
start_ts_secs: i64,
|
||||
end_ts_secs: i64,
|
||||
) -> Result<Vec<AvgReliability>, sqlx::Error> {
|
||||
let result = sqlx::query_as!(
|
||||
AvgReliability,
|
||||
r#"
|
||||
SELECT
|
||||
d.identity as "identity: String",
|
||||
AVG(reliability) as "value: f32"
|
||||
FROM
|
||||
gateway_details d
|
||||
JOIN
|
||||
gateway_status s on d.id = s.gateway_details_id
|
||||
WHERE
|
||||
timestamp >= ? AND
|
||||
timestamp <= ?
|
||||
GROUP BY 1
|
||||
"#,
|
||||
start_ts_secs,
|
||||
end_ts_secs
|
||||
)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Tries to obtain row id of given mixnode given its identity.
|
||||
///
|
||||
/// # Arguments
|
||||
@@ -247,6 +336,26 @@ impl StorageManager {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(super) async fn get_average_reliability_in_interval(
|
||||
&self,
|
||||
id: i64,
|
||||
start: i64,
|
||||
end: i64,
|
||||
) -> Result<Option<f32>, sqlx::Error> {
|
||||
let result = sqlx::query!(
|
||||
r#"
|
||||
SELECT AVG(reliability) as "reliability: f32" FROM mixnode_status
|
||||
WHERE mixnode_details_id= ? AND timestamp >= ? AND timestamp <= ?
|
||||
"#,
|
||||
id,
|
||||
start,
|
||||
end
|
||||
)
|
||||
.fetch_one(&self.connection_pool)
|
||||
.await?;
|
||||
Ok(result.reliability)
|
||||
}
|
||||
|
||||
/// Gets all reliability statuses for gateway with particular id that were inserted
|
||||
/// into the database within the specified time interval.
|
||||
///
|
||||
|
||||
@@ -16,6 +16,8 @@ use sqlx::ConnectOptions;
|
||||
use std::path::PathBuf;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use self::manager::AvgReliability;
|
||||
|
||||
pub(crate) mod manager;
|
||||
pub(crate) mod models;
|
||||
|
||||
@@ -65,6 +67,32 @@ impl ValidatorApiStorage {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_avg_gateway_reliability_in_last_24hr(
|
||||
&self,
|
||||
end_ts_secs: i64,
|
||||
) -> Result<Vec<AvgReliability>, ValidatorApiStorageError> {
|
||||
let result = self
|
||||
.manager
|
||||
.get_all_avg_gateway_reliability_in_last_24hr(end_ts_secs)
|
||||
.await
|
||||
.map_err(|e| ValidatorApiStorageError::InternalDatabaseError(format!("{}", e)))?;
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_avg_mix_reliability_in_last_24hr(
|
||||
&self,
|
||||
end_ts_secs: i64,
|
||||
) -> Result<Vec<AvgReliability>, ValidatorApiStorageError> {
|
||||
let result = self
|
||||
.manager
|
||||
.get_all_avg_mix_reliability_in_last_24hr(end_ts_secs)
|
||||
.await
|
||||
.map_err(|e| ValidatorApiStorageError::InternalDatabaseError(format!("{}", e)))?;
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Gets all statuses for particular mixnode that were inserted
|
||||
/// since the provided timestamp.
|
||||
///
|
||||
@@ -258,6 +286,16 @@ impl ValidatorApiStorage {
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) async fn get_average_mixnode_uptime_in_the_last_24hrs(
|
||||
&self,
|
||||
identity: &str,
|
||||
end_ts_secs: i64,
|
||||
) -> Result<Uptime, ValidatorApiStorageError> {
|
||||
let start = end_ts_secs - 86400;
|
||||
self.get_average_mixnode_uptime_in_interval(identity, start, end_ts_secs)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Based on the data available in the validator API, determines the average uptime of particular
|
||||
/// mixnode during the specified time interval.
|
||||
///
|
||||
@@ -282,28 +320,17 @@ impl ValidatorApiStorage {
|
||||
None => return Ok(Uptime::zero()),
|
||||
};
|
||||
|
||||
let monitor_runs = self.get_monitor_runs_count(start, end).await?;
|
||||
let mixnode_statuses = self
|
||||
let reliability = self
|
||||
.manager
|
||||
.get_mixnode_statuses_by_id(mixnode_database_id, start, end)
|
||||
.get_average_reliability_in_interval(mixnode_database_id, start, end)
|
||||
.await
|
||||
.map_err(|e| ValidatorApiStorageError::InternalDatabaseError(format!("{}", e)))?;
|
||||
|
||||
let mut total: f32 = 0.0;
|
||||
for mixnode_status in mixnode_statuses {
|
||||
total += mixnode_status.reliability() as f32;
|
||||
if let Some(reliability) = reliability {
|
||||
Ok(Uptime::new(reliability))
|
||||
} else {
|
||||
Ok(Uptime::zero())
|
||||
}
|
||||
|
||||
let uptime = match Uptime::from_uptime_sum(total, monitor_runs) {
|
||||
Ok(uptime) => uptime,
|
||||
Err(_) => {
|
||||
// this should really ever happen...
|
||||
error!("mixnode {} has uptime > 100!", identity);
|
||||
Uptime::default()
|
||||
}
|
||||
};
|
||||
|
||||
Ok(uptime)
|
||||
}
|
||||
|
||||
/// Obtain status reports of mixnodes that were active in the specified time interval.
|
||||
|
||||
Reference in New Issue
Block a user