Compare commits

...

1 Commits

Author SHA1 Message Date
Drazen Urch 74ab09b05f Have reward set updater run its own timer (#1200)
* Have reward set updater run its own timer

* Filter rocket log spam

* Take last day of uptime for rewarding (#1202)

* Take last day of uptime for rewarding

* Rejigger calculations

* Blacklist based on last 24 hr

* Cleanup

* Clippy
2022-04-10 15:52:30 +02:00
14 changed files with 261 additions and 136 deletions
Generated
+2 -4
View File
@@ -717,8 +717,7 @@ checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "cosmos-sdk-proto"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0254ffee603f5301d6a66963d9e1cc5091479c22e2e925e1f7689c8027a0828"
source = "git+https://github.com/nymtech/cosmos-rust?branch=bugfix/account-id-length-validation#911fbe1236cfed591783ccef01018f7ccc97c496"
dependencies = [
"prost",
"prost-types",
@@ -728,8 +727,7 @@ dependencies = [
[[package]]
name = "cosmrs"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "505ea048e9ff2f906d6b954f9f8157d903ca468bfb301d906b40ecc25ba6838d"
source = "git+https://github.com/nymtech/cosmos-rust?branch=bugfix/account-id-length-validation#911fbe1236cfed591783ccef01018f7ccc97c496"
dependencies = [
"bip32",
"cosmos-sdk-proto",
+2 -2
View File
@@ -810,7 +810,7 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "mixnet-contract"
version = "1.0.0-rc.1"
version = "1.0.0"
dependencies = [
"az",
"bs58",
@@ -1518,7 +1518,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "vesting-contract"
version = "1.0.0-rc.1"
version = "1.0.0"
dependencies = [
"config",
"cosmwasm-std",
-8
View File
@@ -435,14 +435,6 @@ impl Config {
self.network_monitor.minimum_test_routes
}
pub fn get_min_mixnode_reliability(&self) -> u8 {
self.network_monitor.min_mixnode_reliability
}
pub fn get_min_gateway_reliability(&self) -> u8 {
self.network_monitor.min_gateway_reliability
}
pub fn get_route_test_packets(&self) -> usize {
self.network_monitor.route_test_packets
}
-5
View File
@@ -19,11 +19,6 @@ mixnet_contract_address = '{{ base.mixnet_contract_address }}'
##### network monitor config options #####
[network_monitor]
# Mixnodes and gateways with relialability lower the this get blacklisted by network monitor, get no traffic and cannot be selected into a rewarded set.
min_mixnode_reliability = {{ network_monitor.min_mixnode_reliability }} # deafults to 50
min_gateway_reliability = {{ network_monitor.min_gateway_reliability }} # defaults to 20
# Specifies whether network monitoring service is enabled in this process.
enabled = {{ network_monitor.enabled }}
+5 -18
View File
@@ -17,7 +17,7 @@ use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Notify, RwLock};
use tokio::sync::RwLock;
use tokio::time;
use validator_api_requests::models::MixnodeStatus;
use validator_client::nymd::CosmWasmClient;
@@ -28,7 +28,6 @@ pub struct ValidatorCacheRefresher<C> {
nymd_client: Client<C>,
cache: ValidatorCache,
caching_interval: Duration,
update_rewarded_set_notify: Option<Arc<Notify>>,
}
#[derive(Clone)]
@@ -89,13 +88,11 @@ impl<C> ValidatorCacheRefresher<C> {
nymd_client: Client<C>,
caching_interval: Duration,
cache: ValidatorCache,
update_rewarded_set_notify: Option<Arc<Notify>>,
) -> Self {
ValidatorCacheRefresher {
nymd_client,
cache,
caching_interval,
update_rewarded_set_notify,
}
}
@@ -136,6 +133,7 @@ impl<C> ValidatorCacheRefresher<C> {
self.collect_rewarded_and_active_set_details(&mixnodes, rewarded_set_identities);
let epoch_rewarding_params = self.nymd_client.get_current_epoch_reward_params().await?;
let current_epoch = self.nymd_client.get_current_epoch().await?;
info!(
"Updating validator cache. There are {} mixnodes and {} gateways",
@@ -150,23 +148,10 @@ impl<C> ValidatorCacheRefresher<C> {
rewarded_set,
active_set,
epoch_rewarding_params,
current_epoch,
)
.await;
if let Some(notify) = &self.update_rewarded_set_notify {
let update_details = self
.nymd_client
.get_current_rewarded_set_update_details()
.await?;
if update_details.last_refreshed_block + (update_details.refresh_rate_blocks as u64)
< update_details.current_height
{
// there's only ever a single waiter -> the set updater
notify.notify_one()
}
}
Ok(())
}
@@ -221,6 +206,7 @@ impl ValidatorCache {
rewarded_set: Vec<MixNodeBond>,
active_set: Vec<MixNodeBond>,
epoch_rewarding_params: EpochRewardParams,
current_epoch: Interval,
) {
let mut inner = self.inner.write().await;
@@ -229,6 +215,7 @@ impl ValidatorCache {
inner.rewarded_set.update(rewarded_set);
inner.active_set.update(active_set);
inner.current_reward_params.update(epoch_rewarding_params);
inner.current_epoch.update(Some(current_epoch));
}
pub async fn mixnodes_blacklist(&self) -> Cache<HashSet<IdentityKey>> {
+4 -11
View File
@@ -231,6 +231,8 @@ fn setup_logging() {
.filter_module("sled", log::LevelFilter::Warn)
.filter_module("tungstenite", log::LevelFilter::Warn)
.filter_module("tokio_tungstenite", log::LevelFilter::Warn)
.filter_module("_", log::LevelFilter::Warn)
.filter_module("rocket::server", log::LevelFilter::Warn)
.init();
}
@@ -461,14 +463,11 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> {
// if network monitor is disabled, we're not going to be sending any rewarding hence
// we're not starting signing client
if config.get_network_monitor_enabled() {
let rewarded_set_update_notify = Arc::new(Notify::new());
let nymd_client = Client::new_signing(&config);
let validator_cache_refresher = ValidatorCacheRefresher::new(
nymd_client.clone(),
config.get_caching_interval(),
validator_cache.clone(),
Some(Arc::clone(&rewarded_set_update_notify)),
);
// spawn our cacher
@@ -480,13 +479,8 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> {
let uptime_updater = HistoricalUptimeUpdater::new(storage.clone());
tokio::spawn(async move { uptime_updater.run().await });
let mut rewarded_set_updater = RewardedSetUpdater::new(
nymd_client,
rewarded_set_update_notify,
validator_cache.clone(),
storage,
)
.await?;
let mut rewarded_set_updater =
RewardedSetUpdater::new(nymd_client, validator_cache.clone(), storage).await?;
// spawn rewarded set updater
tokio::spawn(async move { rewarded_set_updater.run().await.unwrap() });
@@ -496,7 +490,6 @@ async fn run_validator_api(matches: ArgMatches<'static>) -> Result<()> {
nymd_client,
config.get_caching_interval(),
validator_cache,
None,
);
// spawn our cacher
@@ -42,8 +42,6 @@ pub(super) struct Monitor {
/// The minimum number of test routes that need to be constructed (and working) in order for
/// a monitor test run to be valid.
minimum_test_routes: usize,
min_mixnode_reliability: u8,
min_gateway_reliability: u8,
}
impl Monitor {
@@ -68,8 +66,6 @@ impl Monitor {
route_test_packets: config.get_route_test_packets(),
test_routes: config.get_test_routes(),
minimum_test_routes: config.get_minimum_test_routes(),
min_mixnode_reliability: config.get_min_mixnode_reliability(),
min_gateway_reliability: config.get_min_gateway_reliability(),
}
}
@@ -78,35 +74,6 @@ impl Monitor {
async fn submit_new_node_statuses(&mut self, test_summary: TestSummary) {
// indicate our run has completed successfully and should be used in any future
// uptime calculations
for result in test_summary.mixnode_results.iter() {
if result.reliability < self.min_mixnode_reliability {
self.packet_preparer
.validator_cache()
.insert_mixnodes_blacklist(result.identity.clone())
.await;
} else {
self.packet_preparer
.validator_cache()
.remove_mixnodes_blacklist(&result.identity)
.await;
}
}
for result in test_summary.gateway_results.iter() {
if result.reliability < self.min_gateway_reliability {
self.packet_preparer
.validator_cache()
.insert_gateways_blacklist(result.identity.clone())
.await;
} else {
self.packet_preparer
.validator_cache()
.remove_gateways_blacklist(&result.identity)
.await;
}
}
if let Err(err) = self
.node_status_storage
.insert_monitor_run_results(
@@ -151,10 +151,6 @@ impl PacketPreparer {
}
}
pub fn validator_cache(&mut self) -> &mut ValidatorCache {
&mut self.validator_cache
}
async fn wrap_test_packet(
&mut self,
packet: &TestPacket,
@@ -25,6 +25,15 @@ impl Uptime {
Uptime(0)
}
pub fn new(uptime: f32) -> Self {
if uptime > 100f32 {
error!("Got uptime {}, max is 100, returning 0", uptime);
Uptime(0)
} else {
Uptime(uptime as u8)
}
}
pub fn from_ratio(numerator: usize, denominator: usize) -> Result<Self, InvalidUptime> {
if denominator == 0 {
return Ok(Self::zero());
+7 -9
View File
@@ -118,18 +118,16 @@ pub(crate) async fn get_mixnode_reward_estimation(
) -> Result<Json<RewardEstimationResponse>, ErrorResponse> {
let (bond, status) = cache.mixnode_details(&identity).await;
if let Some(bond) = bond {
let interval_reward_params = cache.epoch_reward_params().await;
let as_at = interval_reward_params.timestamp();
let interval_reward_params = interval_reward_params.into_inner();
let reward_params = cache.epoch_reward_params().await;
let as_at = reward_params.timestamp();
let reward_params = reward_params.into_inner();
let current_epoch = cache.current_epoch().await.into_inner();
info!("{:?}", current_epoch);
let uptime = if let Some(epoch) = current_epoch {
storage
.get_average_mixnode_uptime_in_interval(
&identity,
epoch.start_unix_timestamp(),
epoch.end_unix_timestamp(),
)
.get_average_mixnode_uptime_in_the_last_24hrs(&identity, epoch.end_unix_timestamp())
.await
.map_err(|err| ErrorResponse::new(err.to_string(), Status::NotFound))?
} else {
@@ -137,7 +135,7 @@ pub(crate) async fn get_mixnode_reward_estimation(
};
let node_reward_params = NodeRewardParams::new(0, uptime.u8() as u128, status.is_active());
let reward_params = RewardParams::new(interval_reward_params, node_reward_params);
let reward_params = RewardParams::new(reward_params, node_reward_params);
let epoch_start = if let Some(epoch) = current_epoch {
epoch.start_unix_timestamp()
} else {
-14
View File
@@ -8,7 +8,6 @@ use mixnet_contract_common::Interval;
use mixnet_contract_common::{
reward_params::EpochRewardParams, ContractStateParams, Delegation, ExecuteMsg, GatewayBond,
IdentityKey, MixNodeBond, MixnodeRewardingStatusResponse, RewardedSetNodeStatus,
RewardedSetUpdateDetails,
};
use serde::Serialize;
use std::sync::Arc;
@@ -265,19 +264,6 @@ impl<C> Client<C> {
.await
}
pub(crate) async fn get_current_rewarded_set_update_details(
&self,
) -> Result<RewardedSetUpdateDetails, ValidatorClientError>
where
C: CosmWasmClient + Sync,
{
self.0
.read()
.await
.get_current_rewarded_set_update_details()
.await
}
#[allow(dead_code)]
pub(crate) async fn advance_current_epoch(&self) -> Result<(), ValidatorClientError>
where
+79 -11
View File
@@ -22,9 +22,9 @@ use mixnet_contract_common::{IdentityKey, Interval, MixNodeBond};
use rand::prelude::SliceRandom;
use rand::rngs::OsRng;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use time::OffsetDateTime;
use tokio::time::sleep;
use validator_client::nymd::{CosmosCoin, SigningNymdClient};
pub(crate) mod error;
@@ -62,7 +62,6 @@ type Epoch = Interval;
pub struct RewardedSetUpdater {
nymd_client: Client<SigningNymdClient>,
update_rewarded_set_notify: Arc<Notify>,
validator_cache: ValidatorCache,
storage: ValidatorApiStorage,
}
@@ -74,13 +73,11 @@ impl RewardedSetUpdater {
pub(crate) async fn new(
nymd_client: Client<SigningNymdClient>,
update_rewarded_set_notify: Arc<Notify>,
validator_cache: ValidatorCache,
storage: ValidatorApiStorage,
) -> Result<Self, RewardingError> {
Ok(RewardedSetUpdater {
nymd_client,
update_rewarded_set_notify,
validator_cache,
storage,
})
@@ -139,7 +136,7 @@ impl RewardedSetUpdater {
.insert_rewarding_report(rewarding_report)
.await?;
Ok(self.generate_reward_messages(&to_reward).await?)
self.generate_reward_messages(&to_reward).await
}
#[allow(unused_variables)]
@@ -177,9 +174,8 @@ impl RewardedSetUpdater {
for rewarded_node in rewarded_set.into_iter() {
let uptime = self
.storage
.get_average_mixnode_uptime_in_interval(
.get_average_mixnode_uptime_in_the_last_24hrs(
rewarded_node.identity(),
epoch.start_unix_timestamp(),
epoch.end_unix_timestamp(),
)
.await?;
@@ -200,7 +196,7 @@ impl RewardedSetUpdater {
}
// This is where the epoch gets advanced, and all epoch related transactions originate
async fn update_rewarded_set(&self) -> Result<(), RewardingError> {
async fn update(&self) -> Result<(), RewardingError> {
let epoch = self.epoch().await?;
log::info!("Starting rewarded set update");
// we know the entries are not stale, as a matter of fact they were JUST updated, since we got notified
@@ -242,13 +238,85 @@ impl RewardedSetUpdater {
Ok(())
}
async fn update_blacklist(&mut self, epoch: &Interval) -> Result<(), RewardingError> {
info!("Updating blacklist");
let mixnodes = self
.storage
.get_all_avg_mix_reliability_in_last_24hr(epoch.end_unix_timestamp())
.await?;
let gateways = self
.storage
.get_all_avg_gateway_reliability_in_last_24hr(epoch.end_unix_timestamp())
.await?;
for mix in mixnodes {
if mix.value() <= 50.0 {
self.validator_cache
.insert_mixnodes_blacklist(mix.identity().to_owned())
.await;
} else {
self.validator_cache
.remove_mixnodes_blacklist(mix.identity())
.await;
}
}
for gateway in gateways {
if gateway.value() <= 50.0 {
self.validator_cache
.insert_gateways_blacklist(gateway.identity().to_owned())
.await;
} else {
self.validator_cache
.remove_gateways_blacklist(gateway.identity())
.await;
}
}
info!(
"Blacklisted mixnodes: {}",
self.validator_cache
.mixnodes_blacklist()
.await
.into_inner()
.len()
);
info!(
"Blacklisted gateways: {}",
self.validator_cache
.gateways_blacklist()
.await
.into_inner()
.len()
);
Ok(())
}
pub(crate) async fn run(&mut self) -> Result<(), RewardingError> {
self.validator_cache.wait_for_initial_values().await;
loop {
// wait until the cache refresher determined its time to update the rewarded/active sets
self.update_rewarded_set_notify.notified().await;
self.update_rewarded_set().await?;
let time = OffsetDateTime::now_utc().unix_timestamp();
let epoch = self.epoch().await?;
let time_to_epoch_change = epoch.end_unix_timestamp() - time;
if time_to_epoch_change <= 0 {
self.update_blacklist(&epoch).await?;
log::info!(
"Time to epoch change is {}, updating rewarded set",
time_to_epoch_change
);
self.update().await?;
} else {
log::info!(
"Waiting for epoch change, time to epoch change is {}",
time_to_epoch_change
);
// Sleep at most 300 before checking again, to keep logs busy
let s = time_to_epoch_change.min(300).max(0) as u64;
sleep(Duration::from_secs(s)).await;
}
// allow some blocks to pass
sleep(Duration::from_secs(10)).await;
}
#[allow(unreachable_code)]
Ok(())
+109
View File
@@ -11,8 +11,97 @@ pub(crate) struct StorageManager {
pub(crate) connection_pool: sqlx::SqlitePool,
}
pub struct AvgReliability {
identity: String,
value: Option<f32>,
}
impl AvgReliability {
pub fn identity(&self) -> &str {
&self.identity
}
pub fn value(&self) -> f32 {
self.value.unwrap_or_default()
}
}
// all SQL goes here
impl StorageManager {
pub(super) async fn get_all_avg_mix_reliability_in_last_24hr(
&self,
end_ts_secs: i64,
) -> Result<Vec<AvgReliability>, sqlx::Error> {
let start_ts_secs = end_ts_secs - 86400;
self.get_all_avg_mix_reliability_in_interval(start_ts_secs, end_ts_secs)
.await
}
pub(super) async fn get_all_avg_gateway_reliability_in_last_24hr(
&self,
end_ts_secs: i64,
) -> Result<Vec<AvgReliability>, sqlx::Error> {
let start_ts_secs = end_ts_secs - 86400;
self.get_all_avg_gateway_reliability_in_interval(start_ts_secs, end_ts_secs)
.await
}
pub(super) async fn get_all_avg_mix_reliability_in_interval(
&self,
start_ts_secs: i64,
end_ts_secs: i64,
) -> Result<Vec<AvgReliability>, sqlx::Error> {
let result = sqlx::query_as!(
AvgReliability,
r#"
SELECT
d.identity as "identity: String",
AVG(s.reliability) as "value: f32"
FROM
mixnode_details d
JOIN
mixnode_status s on d.id = s.mixnode_details_id
WHERE
timestamp >= ? AND
timestamp <= ?
GROUP BY 1
"#,
start_ts_secs,
end_ts_secs
)
.fetch_all(&self.connection_pool)
.await?;
Ok(result)
}
pub(super) async fn get_all_avg_gateway_reliability_in_interval(
&self,
start_ts_secs: i64,
end_ts_secs: i64,
) -> Result<Vec<AvgReliability>, sqlx::Error> {
let result = sqlx::query_as!(
AvgReliability,
r#"
SELECT
d.identity as "identity: String",
AVG(reliability) as "value: f32"
FROM
gateway_details d
JOIN
gateway_status s on d.id = s.gateway_details_id
WHERE
timestamp >= ? AND
timestamp <= ?
GROUP BY 1
"#,
start_ts_secs,
end_ts_secs
)
.fetch_all(&self.connection_pool)
.await?;
Ok(result)
}
/// Tries to obtain row id of given mixnode given its identity.
///
/// # Arguments
@@ -247,6 +336,26 @@ impl StorageManager {
.await
}
pub(super) async fn get_average_reliability_in_interval(
&self,
id: i64,
start: i64,
end: i64,
) -> Result<Option<f32>, sqlx::Error> {
let result = sqlx::query!(
r#"
SELECT AVG(reliability) as "reliability: f32" FROM mixnode_status
WHERE mixnode_details_id= ? AND timestamp >= ? AND timestamp <= ?
"#,
id,
start,
end
)
.fetch_one(&self.connection_pool)
.await?;
Ok(result.reliability)
}
/// Gets all reliability statuses for gateway with particular id that were inserted
/// into the database within the specified time interval.
///
+44 -17
View File
@@ -16,6 +16,8 @@ use sqlx::ConnectOptions;
use std::path::PathBuf;
use time::OffsetDateTime;
use self::manager::AvgReliability;
pub(crate) mod manager;
pub(crate) mod models;
@@ -65,6 +67,32 @@ impl ValidatorApiStorage {
})
}
pub(crate) async fn get_all_avg_gateway_reliability_in_last_24hr(
&self,
end_ts_secs: i64,
) -> Result<Vec<AvgReliability>, ValidatorApiStorageError> {
let result = self
.manager
.get_all_avg_gateway_reliability_in_last_24hr(end_ts_secs)
.await
.map_err(|e| ValidatorApiStorageError::InternalDatabaseError(format!("{}", e)))?;
Ok(result)
}
pub(crate) async fn get_all_avg_mix_reliability_in_last_24hr(
&self,
end_ts_secs: i64,
) -> Result<Vec<AvgReliability>, ValidatorApiStorageError> {
let result = self
.manager
.get_all_avg_mix_reliability_in_last_24hr(end_ts_secs)
.await
.map_err(|e| ValidatorApiStorageError::InternalDatabaseError(format!("{}", e)))?;
Ok(result)
}
/// Gets all statuses for particular mixnode that were inserted
/// since the provided timestamp.
///
@@ -258,6 +286,16 @@ impl ValidatorApiStorage {
))
}
pub(crate) async fn get_average_mixnode_uptime_in_the_last_24hrs(
&self,
identity: &str,
end_ts_secs: i64,
) -> Result<Uptime, ValidatorApiStorageError> {
let start = end_ts_secs - 86400;
self.get_average_mixnode_uptime_in_interval(identity, start, end_ts_secs)
.await
}
/// Based on the data available in the validator API, determines the average uptime of particular
/// mixnode during the specified time interval.
///
@@ -282,28 +320,17 @@ impl ValidatorApiStorage {
None => return Ok(Uptime::zero()),
};
let monitor_runs = self.get_monitor_runs_count(start, end).await?;
let mixnode_statuses = self
let reliability = self
.manager
.get_mixnode_statuses_by_id(mixnode_database_id, start, end)
.get_average_reliability_in_interval(mixnode_database_id, start, end)
.await
.map_err(|e| ValidatorApiStorageError::InternalDatabaseError(format!("{}", e)))?;
let mut total: f32 = 0.0;
for mixnode_status in mixnode_statuses {
total += mixnode_status.reliability() as f32;
if let Some(reliability) = reliability {
Ok(Uptime::new(reliability))
} else {
Ok(Uptime::zero())
}
let uptime = match Uptime::from_uptime_sum(total, monitor_runs) {
Ok(uptime) => uptime,
Err(_) => {
// this should really ever happen...
error!("mixnode {} has uptime > 100!", identity);
Uptime::default()
}
};
Ok(uptime)
}
/// Obtain status reports of mixnodes that were active in the specified time interval.