Feature/rust rewarding (#750)

* Calculating gas fees

* Ability to set custom fees

* Added extra test

* Removed commented code

* Moved all msg types to common contract crate

* Temporarily disabling get_tx method

* Finishing up nymd client API

* Comment fix

* Remaining fee values

* Some cleanup

* Removed needless borrow

* Fixed imports in contract tests

* Moved error types around

* New ValidatorClient

* Experiment with new type of defaults

* Removed dead module

* Dealt with unwrap

* Migrated mixnode to use new validator client

* Migrated gateway to use new validator client

* Mixnode and gateway adjustments

* More exported defaults

* Clients using new validator client

* Fixed mixnode upgrade

* Moved default values to a new crate

* Changed behaviour of validator client features

* Migrated basic functions of validator api

* Updated config + fixed startup

* Fixed wasm client build

* Integration with the explorer api

* Removed tokio dev dependency

* Needless borrow

* Fixex wasm client build

* Fixed tauri client build

* Needless borrows

* New tables for rewarding

* Updated cosmos-sdk version

* Removed reward-specific node status routes

* New rewarding-specific config entries

* Additional network defaults

* Initial periodic rewards from validator api

* Replaced print with log

* Filtering nodes with uptime > 0

* Additional failure logging statements

* Fixed operation ordering

* Adjusted next rewarding epoch determination

* Modified rewarding behaviour to keep track of rewarding in progress

* Improved error message on config load failure

* Additional log statement

* Adjusted rewarding gas limit calculation

* Made naming slightly more consistent

* Fixed incorrect parentheses placement

* Fixed fee calculation

* Cargo fmt

* Removed failed merge artifacts

* Introduced comment for any future reward modification

* typos

* Helper functions for the future

* Making @mfahampshire 's life easier

* Redesigned epoch + rewarding skipped epochs (if possible)

* Removed old merge artifacts

* Naming consistency

* Constraining arguments

* Removed unnecessary if branch

* Ignore monitor check for current epoch

* Additional checks for current epoch data

* Monitor threshold check

* cargo fmt

* Fixed post-merge issues in transactions.rs
This commit is contained in:
Jędrzej Stuczyński
2021-09-24 15:49:21 +01:00
committed by GitHub
parent 5dfaff6296
commit 020cad897d
30 changed files with 2578 additions and 334 deletions
Generated
+22 -2
View File
@@ -3003,6 +3003,7 @@ checksum = "c44922cb3dbb1c70b5e5f443d63b64363a898564d739ba5198e3a9138442868d"
name = "network-defaults"
version = "0.1.0"
dependencies = [
"time 0.3.1",
"url",
]
@@ -3328,6 +3329,8 @@ dependencies = [
"serde",
"serde_json",
"sqlx",
"thiserror",
"time 0.3.1",
"tokio",
"topology",
"url",
@@ -5255,7 +5258,6 @@ dependencies = [
"sqlx-rt",
"stringprep",
"thiserror",
"time 0.2.27",
"tokio-stream",
"url",
"webpki",
@@ -5883,11 +5885,23 @@ dependencies = [
"libc",
"standback",
"stdweb",
"time-macros",
"time-macros 0.1.1",
"version_check",
"winapi",
]
[[package]]
name = "time"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a776787d9c5d455bec3db044586ccdd8a9c74d5da5dc319fb80f3db08808fe6"
dependencies = [
"itoa",
"libc",
"serde",
"time-macros 0.2.1",
]
[[package]]
name = "time-macros"
version = "0.1.1"
@@ -5898,6 +5912,12 @@ dependencies = [
"time-macros-impl",
]
[[package]]
name = "time-macros"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04a153416002296880a3b51329a0e3df31c779c53ec827993e865ce427982843"
[[package]]
name = "time-macros-impl"
version = "0.1.2"
@@ -158,6 +158,10 @@ impl<C> Client<C> {
self.mixnet_contract_address = Some(mixnet_contract_address)
}
pub fn get_mixnet_contract_address(&self) -> Option<cosmrs::AccountId> {
self.mixnet_contract_address.clone()
}
pub async fn get_cached_mixnodes(&self) -> Result<Vec<MixNodeBond>, ValidatorClientError> {
Ok(self.validator_api.get_mixnodes().await?)
}
@@ -15,6 +15,7 @@ use cosmrs::rpc::{Error as TendermintRpcError, HttpClient, HttpClientUrl, Simple
use cosmrs::staking::{MsgDelegate, MsgUndelegate};
use cosmrs::tx::{Fee, Msg, MsgType, SignDoc, SignerInfo};
use cosmrs::{cosmwasm, rpc, tx, AccountId, Coin};
use log::debug;
use serde::Serialize;
use sha2::Digest;
use sha2::Sha256;
@@ -256,6 +257,48 @@ pub trait SigningCosmWasmClient: CosmWasmClient {
})
}
async fn execute_multiple<I, M>(
&self,
sender_address: &AccountId,
contract_address: &AccountId,
msgs: I,
fee: Fee,
memo: impl Into<String> + Send + 'static,
) -> Result<ExecuteResult, NymdError>
where
I: IntoIterator<Item = (M, Vec<Coin>)> + Send,
M: Serialize,
{
let messages = msgs
.into_iter()
.map(|(msg, funds)| {
cosmwasm::MsgExecuteContract {
sender: sender_address.clone(),
contract: contract_address.clone(),
msg: serde_json::to_vec(&msg)?,
funds,
}
.to_msg()
.map_err(|_| NymdError::SerializationError("MsgExecuteContract".to_owned()))
})
.collect::<Result<_, _>>()?;
let tx_res = self
.sign_and_broadcast_commit(sender_address, messages, fee, memo)
.await?
.check_response()?;
debug!(
"gas wanted: {:?}, gas used: {:?}",
tx_res.deliver_tx.gas_wanted, tx_res.deliver_tx.gas_used
);
Ok(ExecuteResult {
logs: parse_raw_logs(tx_res.deliver_tx.log)?,
transaction_hash: tx_res.hash,
})
}
async fn send_tokens(
&self,
sender_address: &AccountId,
@@ -4,7 +4,6 @@
use crate::nymd::GasPrice;
use cosmrs::tx::{Fee, Gas};
use cosmrs::Coin;
use cosmwasm_std::Uint128;
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum Operation {
@@ -28,13 +27,7 @@ pub enum Operation {
}
pub(crate) fn calculate_fee(gas_price: &GasPrice, gas_limit: Gas) -> Coin {
let limit_uint128 = Uint128::from(gas_limit.value());
let amount = gas_price.amount * limit_uint128;
assert!(amount.u128() <= u64::MAX as u128);
Coin {
denom: gas_price.denom.clone(),
amount: (amount.u128() as u64).into(),
}
gas_price * gas_limit
}
impl Operation {
@@ -61,16 +54,20 @@ impl Operation {
}
}
pub(crate) fn determine_fee(&self, gas_price: &GasPrice, gas_limit: Option<Gas>) -> Fee {
pub(crate) fn determine_custom_fee(gas_price: &GasPrice, gas_limit: Gas) -> Fee {
// we need to know 2 of the following 3 parameters (the third one is being implicit) in order to construct Fee:
// (source: https://docs.cosmos.network/v0.42/basics/gas-fees.html)
// - gas price
// - gas limit
// - fees
let gas_limit = gas_limit.unwrap_or_else(|| self.default_gas_limit());
let fee = calculate_fee(gas_price, gas_limit);
Fee::from_amount_and_gas(fee, gas_limit)
}
pub(crate) fn determine_fee(&self, gas_price: &GasPrice, gas_limit: Option<Gas>) -> Fee {
let gas_limit = gas_limit.unwrap_or_else(|| self.default_gas_limit());
Self::determine_custom_fee(gas_price, gas_limit)
}
}
#[cfg(test)]
@@ -3,8 +3,10 @@
use crate::nymd::error::NymdError;
use config::defaults;
use cosmrs::Denom;
use cosmwasm_std::Decimal;
use cosmrs::tx::Gas;
use cosmrs::{Coin, Denom};
use cosmwasm_std::{Decimal, Fraction, Uint128};
use std::ops::Mul;
use std::str::FromStr;
/// A gas price, i.e. the price of a single unit of gas. This is typically a fraction of
@@ -18,6 +20,36 @@ pub struct GasPrice {
pub denom: Denom,
}
impl<'a> Mul<Gas> for &'a GasPrice {
type Output = Coin;
fn mul(self, gas_limit: Gas) -> Self::Output {
let limit_uint128 = Uint128::from(gas_limit.value());
let mut amount = self.amount * limit_uint128;
let gas_price_numerator = self.amount.numerator();
let gas_price_denominator = self.amount.denominator();
// gas price is a fraction of the smallest fee token unit, so we must ensure that
// for any multiplication, we have rounded up
//
// I don't really like the this solution as it has a theoretical chance of
// overflowing (internally cosmwasm uses U256 to avoid that)
// however, realistically that is impossible to happen as the resultant value
// would have to be way higher than our token limit of 10^15 (1 billion of tokens * 1 million for denomination)
// and max value of u128 is approximately 10^38
if limit_uint128.u128() * gas_price_numerator > amount.u128() * gas_price_denominator {
amount += Uint128::new(1);
}
assert!(amount.u128() <= u64::MAX as u128);
Coin {
denom: self.denom.clone(),
amount: (amount.u128() as u64).into(),
}
}
}
impl FromStr for GasPrice {
type Err = NymdError;
@@ -78,4 +110,15 @@ mod tests {
assert!("0.025 upunk".parse::<GasPrice>().is_err());
assert!("0.025UPUNK".parse::<GasPrice>().is_err());
}
#[test]
fn gas_limit_multiplication() {
// real world example that caused an issue when the result was rounded down
let gas_price: GasPrice = "0.025upunk".parse().unwrap();
let gas_limit: Gas = 157500u64.into();
let fee = &gas_price * gas_limit;
// the failing behaviour was result value of 3937
assert_eq!(fee.amount, 3938u64.into());
}
}
@@ -11,7 +11,6 @@ use crate::nymd::fee_helpers::Operation;
use crate::nymd::wallet::DirectSecp256k1HdWallet;
use cosmrs::rpc::endpoint::broadcast;
use cosmrs::rpc::{Error as TendermintRpcError, HttpClientUrl};
use cosmrs::tx::{Fee, Gas};
use cosmwasm_std::Coin;
use mixnet_contract::{
@@ -29,6 +28,8 @@ pub use crate::nymd::cosmwasm_client::client::CosmWasmClient;
pub use crate::nymd::cosmwasm_client::signing_client::SigningCosmWasmClient;
pub use crate::nymd::gas_price::GasPrice;
pub use cosmrs::rpc::HttpClient as QueryNymdClient;
pub use cosmrs::tendermint::Time as TendermintTime;
pub use cosmrs::tx::{Fee, Gas};
pub use cosmrs::Coin as CosmosCoin;
pub use cosmrs::{AccountId, Denom};
pub use signing_client::Client as SigningNymdClient;
@@ -150,6 +151,17 @@ impl<C> NymdClient<C> {
operation.determine_fee(&self.gas_price, gas_limit)
}
pub fn calculate_custom_fee(&self, gas_limit: impl Into<Gas>) -> Fee {
Operation::determine_custom_fee(&self.gas_price, gas_limit.into())
}
pub async fn get_current_block_timestamp(&self) -> Result<TendermintTime, NymdError>
where
C: CosmWasmClient + Sync,
{
Ok(self.client.get_block(None).await?.block.header.time)
}
pub async fn get_balance(&self, address: &AccountId) -> Result<Option<CosmosCoin>, NymdError>
where
C: CosmWasmClient + Sync,
@@ -391,6 +403,23 @@ impl<C> NymdClient<C> {
.await
}
pub async fn execute_multiple<I, M>(
&self,
contract_address: &AccountId,
msgs: I,
fee: Fee,
memo: impl Into<String> + Send + 'static,
) -> Result<ExecuteResult, NymdError>
where
C: SigningCosmWasmClient + Sync,
I: IntoIterator<Item = (M, Vec<CosmosCoin>)> + Send,
M: Serialize,
{
self.client
.execute_multiple(self.address(), contract_address, msgs, fee, memo)
.await
}
pub async fn upload(
&self,
wasm_code: Vec<u8>,
+1
View File
@@ -8,3 +8,4 @@ edition = "2018"
[dependencies]
url = "2.2"
time = { version = "0.3", features = ["macros"] }
+6
View File
@@ -1,6 +1,8 @@
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::time::Duration;
use time::OffsetDateTime;
use url::Url;
pub struct ValidatorDetails<'a> {
@@ -80,3 +82,7 @@ pub const DEFAULT_SOCKS5_LISTENING_PORT: u16 = 1080;
pub const DEFAULT_VALIDATOR_API_PORT: u16 = 8080;
pub const VALIDATOR_API_VERSION: &str = "v1";
// REWARDING
pub const DEFAULT_FIRST_EPOCH_START: OffsetDateTime = time::macros::datetime!(2021-08-23 12:00 UTC);
pub const DEFAULT_EPOCH_LENGTH: Duration = Duration::from_secs(24 * 60 * 60); // 24h
+19 -2
View File
@@ -408,9 +408,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.95"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "789da6d93f1b866ffe175afc5322a4d76c038605a1c3319bb57b06967ca98a36"
checksum = "a1fa8cddc8fbbee11227ef194b5317ed014b8acbf15139bd716a18ad3fe99ec5"
[[package]]
name = "log"
@@ -461,6 +461,7 @@ dependencies = [
name = "network-defaults"
version = "0.1.0"
dependencies = [
"time",
"url",
]
@@ -756,6 +757,22 @@ dependencies = [
"syn",
]
[[package]]
name = "time"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a776787d9c5d455bec3db044586ccdd8a9c74d5da5dc319fb80f3db08808fe6"
dependencies = [
"libc",
"time-macros",
]
[[package]]
name = "time-macros"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04a153416002296880a3b51329a0e3df31c779c53ec827993e865ce427982843"
[[package]]
name = "tinyvec"
version = "1.3.1"
+9 -1
View File
@@ -425,6 +425,10 @@ pub(crate) fn try_update_state_params(
Ok(Response::default())
}
// Note: if any changes are made to this function or anything it is calling down the stack,
// for example delegation reward distribution, the gas limits must be retested and both
// validator-api/src/rewarding/mod.rs::{MIXNODE_REWARD_OP_BASE_GAS_LIMIT, PER_MIXNODE_DELEGATION_GAS_INCREASE}
// must be updated appropriately.
pub(crate) fn try_reward_mixnode(
deps: DepsMut,
env: Env,
@@ -496,6 +500,10 @@ pub(crate) fn try_reward_mixnode(
})
}
// Note: if any changes are made to this function or anything it is calling down the stack,
// for example delegation reward distribution, the gas limits must be retested and both
// validator-api/src/rewarding/mod.rs::{GATEWAY_REWARD_OP_BASE_GAS_LIMIT, PER_GATEWAY_DELEGATION_GAS_INCREASE}
// must be updated appropriately.
pub(crate) fn try_reward_gateway(
deps: DepsMut,
env: Env,
@@ -524,7 +532,7 @@ pub(crate) fn try_reward_gateway(
}
// check if the bond even exists
let mut current_bond = match gateways(deps.storage).load(gateway_identity.as_bytes()) {
let mut current_bond = match gateways_read(deps.storage).load(gateway_identity.as_bytes()) {
Ok(bond) => bond,
Err(_) => {
return Ok(Response {
+12 -11
View File
@@ -28,25 +28,26 @@ rocket = { version = "0.5.0-rc.1", features = ["json"] }
serde = "1.0"
serde_json = "1.0"
tokio = { version = "1.4", features = ["rt-multi-thread", "macros", "signal", "time"] }
rocket_cors = { git = "https://github.com/lawliet89/rocket_cors", rev = "dfd3662c49e2f6fc37df35091cb94d82f7fb5915" }
rocket_cors = { git="https://github.com/lawliet89/rocket_cors", rev="dfd3662c49e2f6fc37df35091cb94d82f7fb5915" }
url = "2.2"
thiserror = "1"
time = { version = "0.3", features = ["serde-human-readable", "parsing"]}
anyhow = "1"
getset = "0.1.1"
rocket_sync_db_pools = { version = "0.1.0-rc.1", default-features = false }
sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"] }
rocket_sync_db_pools = {version = "0.1.0-rc.1", default-features = false}
sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"]}
## internal
config = { path = "../common/config" }
crypto = { path = "../common/crypto" }
gateway-client = { path = "../common/client-libs/gateway-client" }
mixnet-contract = { path = "../common/mixnet-contract" }
nymsphinx = { path = "../common/nymsphinx" }
topology = { path = "../common/topology" }
validator-client = { path = "../common/client-libs/validator-client", features = ["nymd-client"] }
version-checker = { path = "../common/version-checker" }
crypto = { path="../common/crypto" }
gateway-client = { path="../common/client-libs/gateway-client" }
mixnet-contract = { path="../common/mixnet-contract" }
nymsphinx = { path="../common/nymsphinx" }
topology = { path="../common/topology" }
validator-client = { path="../common/client-libs/validator-client", features = ["nymd-client"] }
version-checker = { path="../common/version-checker" }
coconut-interface = { path = "../common/coconut-interface" }
credentials = { path = "../common/credentials" }
@@ -0,0 +1,74 @@
-- table to write information about any rewarding that has already begun
-- in case the process crashes during the procedure.
-- this would prevent people from somehow purposely crashing it and getting multiple rewards
-- per epoch
CREATE TABLE epoch_rewarding
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
epoch_timestamp INTEGER NOT NULL,
finished BOOLEAN NOT NULL
);
-- for each epoch there shall be a summary
CREATE TABLE rewarding_report
(
epoch_rewarding_id INTEGER NOT NULL,
eligible_mixnodes INTEGER NOT NULL,
eligible_gateways INTEGER NOT NULL,
possibly_unrewarded_mixnodes INTEGER NOT NULL,
possibly_unrewarded_gateways INTEGER NOT NULL,
FOREIGN KEY (epoch_rewarding_id) REFERENCES epoch_rewarding (id)
);
-- containing possibly many (ideally zero!) failed reward entries
-- (this refers to a reward chunk)
CREATE TABLE failed_mixnode_reward_chunk
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
error_message VARCHAR NOT NULL,
reward_summary_id INTEGER NOT NULL,
FOREIGN KEY (reward_summary_id) REFERENCES epoch_rewarding (id)
);
CREATE TABLE failed_gateway_reward_chunk
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
error_message VARCHAR NOT NULL,
reward_summary_id INTEGER NOT NULL,
FOREIGN KEY (reward_summary_id) REFERENCES epoch_rewarding (id)
);
-- and each such failed_mixnode_reward_chunk contain mixnodes that might have been unrewarded
-- (but we don't know for sure - at least in typescript we could have gotten a timeout yet the tx still was executed)
-- this table only exists because sqlite has no arrays
CREATE TABLE possibly_unrewarded_mixnode
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
identity VARCHAR NOT NULL,
uptime INTEGER NOT NULL,
failed_mixnode_reward_chunk_id INTEGER NOT NULL,
FOREIGN KEY (failed_mixnode_reward_chunk_id) REFERENCES failed_mixnode_reward_chunk (id)
);
CREATE TABLE possibly_unrewarded_gateway
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
identity VARCHAR NOT NULL,
uptime INTEGER NOT NULL,
failed_gateway_reward_chunk_id INTEGER NOT NULL,
FOREIGN KEY (failed_gateway_reward_chunk_id) REFERENCES failed_gateway_reward_chunk (id)
)
+12
View File
@@ -138,6 +138,18 @@ impl ValidatorCache {
pub fn initialised(&self) -> bool {
self.inner.initialised.load(Ordering::Relaxed)
}
pub(crate) async fn wait_for_initial_values(&self) {
let initialisation_backoff = Duration::from_secs(5);
loop {
if self.initialised() {
break;
} else {
debug!("Validator cache hasn't been initialised yet - waiting for {:?} before trying again", initialisation_backoff);
tokio::time::sleep(initialisation_backoff).await;
}
}
}
}
impl ValidatorCacheInner {
+88 -13
View File
@@ -3,11 +3,15 @@
use crate::config::template::config_template;
use coconut_interface::{Base58, KeyPair};
use config::defaults::{default_api_endpoints, DEFAULT_MIXNET_CONTRACT_ADDRESS};
use config::defaults::{
default_api_endpoints, DEFAULT_EPOCH_LENGTH, DEFAULT_FIRST_EPOCH_START,
DEFAULT_MIXNET_CONTRACT_ADDRESS,
};
use config::NymConfig;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::time::Duration;
use time::OffsetDateTime;
use url::Url;
mod template;
@@ -23,6 +27,7 @@ const DEFAULT_GATEWAY_RESPONSE_TIMEOUT: Duration = Duration::from_millis(1_500);
const DEFAULT_GATEWAY_CONNECTION_TIMEOUT: Duration = Duration::from_millis(2_500);
const DEFAULT_CACHE_INTERVAL: Duration = Duration::from_secs(60);
const DEFAULT_MONITOR_THRESHOLD: u8 = 60;
#[derive(Debug, Default, Deserialize, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
@@ -38,6 +43,9 @@ pub struct Config {
#[serde(default)]
topology_cacher: TopologyCacher,
#[serde(default)]
rewarding: Rewarding,
}
impl NymConfig for Config {
@@ -66,16 +74,13 @@ impl NymConfig for Config {
}
#[derive(Debug, Deserialize, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(deny_unknown_fields, default)]
pub struct Base {
local_validator: Url,
/// Address of the validator contract managing the network
mixnet_contract_address: String,
/// Mnemonic (currently of the network monitor) used for rewarding
mnemonic: String,
// Avoid breaking derives for now
keypair_bs58: String,
}
@@ -87,14 +92,13 @@ impl Default for Base {
.parse()
.expect("default local validator is malformed!"),
mixnet_contract_address: DEFAULT_MIXNET_CONTRACT_ADDRESS.to_string(),
mnemonic: String::default(),
keypair_bs58: String::default(),
}
}
}
#[derive(Debug, Deserialize, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(deny_unknown_fields, default)]
pub struct NetworkMonitor {
/// Specifies whether network monitoring service is enabled in this process.
enabled: bool,
@@ -175,7 +179,7 @@ impl Default for NetworkMonitor {
}
#[derive(Debug, Deserialize, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(deny_unknown_fields, default)]
pub struct NodeStatusAPI {
/// Path to the database file containing uptime statuses for all mixnodes and gateways.
database_path: PathBuf,
@@ -196,7 +200,7 @@ impl Default for NodeStatusAPI {
}
#[derive(Debug, Deserialize, PartialEq, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(deny_unknown_fields, default)]
pub struct TopologyCacher {
#[serde(with = "humantime_serde")]
caching_interval: Duration,
@@ -210,6 +214,41 @@ impl Default for TopologyCacher {
}
}
#[derive(Debug, Deserialize, PartialEq, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Rewarding {
/// Specifies whether rewarding service is enabled in this process.
enabled: bool,
/// Mnemonic (currently of the network monitor) used for rewarding
mnemonic: String,
/// Datetime of the first rewarding epoch of the current length used for referencing
/// starting time of any subsequent epoch.
first_rewarding_epoch: OffsetDateTime,
/// Current length of the epoch. If modified `first_rewarding_epoch` should also get changed.
#[serde(with = "humantime_serde")]
epoch_length: Duration,
/// Specifies the minimum percentage of monitor test run data present in order to
/// distribute rewards for given epoch.
/// Note, only values in range 0-100 are valid
minimum_epoch_monitor_threshold: u8,
}
impl Default for Rewarding {
fn default() -> Self {
Rewarding {
enabled: false,
mnemonic: String::default(),
first_rewarding_epoch: DEFAULT_FIRST_EPOCH_START,
epoch_length: DEFAULT_EPOCH_LENGTH,
minimum_epoch_monitor_threshold: DEFAULT_MONITOR_THRESHOLD,
}
}
}
impl Config {
pub fn new() -> Self {
Config::default()
@@ -219,12 +258,17 @@ impl Config {
KeyPair::try_from_bs58(self.base.keypair_bs58.clone()).unwrap()
}
pub fn enabled_network_monitor(mut self, enabled: bool) -> Self {
pub fn with_network_monitor_enabled(mut self, enabled: bool) -> Self {
self.network_monitor.enabled = enabled;
self
}
pub fn detailed_network_monitor_report(mut self, detailed: bool) -> Self {
pub fn with_rewarding_enabled(mut self, enabled: bool) -> Self {
self.rewarding.enabled = enabled;
self
}
pub fn with_detailed_network_monitor_report(mut self, detailed: bool) -> Self {
self.network_monitor.print_detailed_report = detailed;
self
}
@@ -250,7 +294,7 @@ impl Config {
}
pub fn with_mnemonic<S: Into<String>>(mut self, mnemonic: S) -> Self {
self.base.mnemonic = mnemonic.into();
self.rewarding.mnemonic = mnemonic.into();
self
}
@@ -264,10 +308,29 @@ impl Config {
self
}
pub fn with_first_rewarding_epoch(mut self, first_epoch: OffsetDateTime) -> Self {
self.rewarding.first_rewarding_epoch = first_epoch;
self
}
pub fn with_epoch_length(mut self, epoch_length: Duration) -> Self {
self.rewarding.epoch_length = epoch_length;
self
}
pub fn with_minimum_epoch_monitor_threshold(mut self, threshold: u8) -> Self {
self.rewarding.minimum_epoch_monitor_threshold = threshold;
self
}
pub fn get_network_monitor_enabled(&self) -> bool {
self.network_monitor.enabled
}
pub fn get_rewarding_enabled(&self) -> bool {
self.rewarding.enabled
}
pub fn get_detailed_report(&self) -> bool {
self.network_monitor.print_detailed_report
}
@@ -289,7 +352,7 @@ impl Config {
}
pub fn get_mnemonic(&self) -> String {
self.base.mnemonic.clone()
self.rewarding.mnemonic.clone()
}
pub fn get_network_monitor_run_interval(&self) -> Duration {
@@ -331,4 +394,16 @@ impl Config {
pub fn get_all_validator_api_endpoints(&self) -> Vec<Url> {
self.network_monitor.all_validator_apis.clone()
}
pub fn get_first_rewarding_epoch(&self) -> OffsetDateTime {
self.rewarding.first_rewarding_epoch
}
pub fn get_epoch_length(&self) -> Duration {
self.rewarding.epoch_length
}
pub fn get_minimum_epoch_monitor_threshold(&self) -> u8 {
self.rewarding.minimum_epoch_monitor_threshold
}
}
+22
View File
@@ -72,5 +72,27 @@ packet_delivery_timeout = '{{ network_monitor.packet_delivery_timeout }}'
# Path to the database file containing uptime statuses for all mixnodes and gateways.
database_path = '{{ node_status_api.database_path }}'
##### rewarding config options #####
[rewarding]
# Specifies whether rewarding service is enabled in this process.
enabled = {{ rewarding.enabled }}
# Mnemonic (currently of the network monitor) used for rewarding
mnemonic = '{{ rewarding.mnemonic }}'
# Datetime of the first rewarding epoch of the current length used for referencing
# starting time of any subsequent epoch.
first_rewarding_epoch = '{{ rewarding.first_rewarding_epoch }}'
# Current length of the epoch. If modified `first_rewarding_epoch` should also get changed.
epoch_length = '{{ rewarding.epoch_length }}'
# Specifies the minimum percentage of monitor test run data present in order to
# distribute rewards for given epoch.
# Note, only values in range 0-100 are valid
minimum_epoch_monitor_threshold = {{ rewarding.minimum_epoch_monitor_threshold }}
"#
}
+146 -16
View File
@@ -9,21 +9,27 @@ use crate::config::Config;
use crate::network_monitor::tested_network::good_topology::parse_topology_file;
use crate::network_monitor::NetworkMonitorBuilder;
use crate::nymd_client::Client;
use crate::rewarding::epoch::Epoch;
use crate::rewarding::Rewarder;
use crate::storage::NodeStatusStorage;
use ::config::{defaults::DEFAULT_VALIDATOR_API_PORT, NymConfig};
use anyhow::Result;
use cache::ValidatorCache;
use clap::{App, Arg, ArgMatches};
use coconut::InternalSignRequest;
use log::info;
use log::{info, warn};
use rocket::fairing::AdHoc;
use rocket::http::Method;
use rocket::{Ignite, Rocket};
use rocket_cors::{AllowedHeaders, AllowedOrigins, Cors};
use std::process;
use std::sync::Arc;
use std::time::Duration;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::Notify;
use url::Url;
use validator_client::nymd::SigningNymdClient;
pub(crate) mod cache;
mod coconut;
@@ -31,9 +37,11 @@ pub(crate) mod config;
mod network_monitor;
mod node_status_api;
pub(crate) mod nymd_client;
mod rewarding;
pub(crate) mod storage;
const MONITORING_ENABLED: &str = "enable-monitor";
const REWARDING_ENABLED: &str = "enable-rewarding";
const V4_TOPOLOGY_ARG: &str = "v4-topology-filepath";
const V6_TOPOLOGY_ARG: &str = "v6-topology-filepath";
const API_VALIDATORS_ARG: &str = "api-validators";
@@ -44,6 +52,10 @@ const WRITE_CONFIG_ARG: &str = "save-config";
const KEYPAIR_ARG: &str = "keypair";
const NYMD_VALIDATOR_ARG: &str = "nymd-validator";
const EPOCH_LENGTH_ARG: &str = "epoch-length";
const FIRST_REWARDING_EPOCH_ARG: &str = "first-epoch";
const REWARDING_MONITOR_THRESHOLD_ARG: &str = "monitor-threshold";
pub(crate) const PENALISE_OUTDATED: bool = false;
fn parse_validators(raw: &str) -> Vec<Url> {
@@ -58,23 +70,33 @@ fn parse_validators(raw: &str) -> Vec<Url> {
}
fn parse_args<'a>() -> ArgMatches<'a> {
App::new("Nym Network Monitor")
App::new("Nym Validator API")
.author("Nymtech")
.arg(
Arg::with_name(MONITORING_ENABLED)
.help("specifies whether a network monitoring is enabled on this API")
.long(MONITORING_ENABLED)
.short("m")
)
.arg(
Arg::with_name(REWARDING_ENABLED)
.help("specifies whether a network rewarding is enabled on this API")
.long(REWARDING_ENABLED)
.short("r")
.requires(MONITORING_ENABLED)
)
.arg(
Arg::with_name(V4_TOPOLOGY_ARG)
.help("location of .json file containing IPv4 'good' network topology")
.long(V4_TOPOLOGY_ARG)
.requires(MONITORING_ENABLED)
)
.arg(
Arg::with_name(V6_TOPOLOGY_ARG)
.help("location of .json file containing IPv6 'good' network topology")
.long(V6_TOPOLOGY_ARG)
.takes_value(true)
.requires(MONITORING_ENABLED)
)
.arg(
Arg::with_name(NYMD_VALIDATOR_ARG)
@@ -96,22 +118,49 @@ fn parse_args<'a>() -> ArgMatches<'a> {
.arg(Arg::with_name(MNEMONIC_ARG)
.long(MNEMONIC_ARG)
.help("Mnemonic of the network monitor used for rewarding operators")
.takes_value(true),
.takes_value(true)
.requires(REWARDING_ENABLED),
)
.arg(
Arg::with_name(DETAILED_REPORT_ARG)
.help("specifies whether a detailed report should be printed after each run")
.long(DETAILED_REPORT_ARG)
.requires(MONITORING_ENABLED)
)
.arg(
Arg::with_name(WRITE_CONFIG_ARG)
.help("specifies whether a config file based on provided arguments should be saved to a file")
.long(WRITE_CONFIG_ARG)
.short("w")
)
.arg(Arg::with_name(KEYPAIR_ARG)
.help("Path to the secret key file")
.takes_value(true)
.long(KEYPAIR_ARG))
.arg(
Arg::with_name(KEYPAIR_ARG)
.help("Path to the secret key file")
.takes_value(true)
.long(KEYPAIR_ARG)
)
.arg(
Arg::with_name(FIRST_REWARDING_EPOCH_ARG)
.help("Datetime specifying beginning of the first rewarding epoch of this length. It must be a valid rfc3339 datetime.")
.takes_value(true)
.long(FIRST_REWARDING_EPOCH_ARG)
.requires(REWARDING_ENABLED)
)
.arg(
Arg::with_name(EPOCH_LENGTH_ARG)
.help("Length of the current rewarding epoch in hours")
.takes_value(true)
.long(EPOCH_LENGTH_ARG)
.requires(REWARDING_ENABLED)
)
.arg(
Arg::with_name(REWARDING_MONITOR_THRESHOLD_ARG)
.help("Specifies the minimum percentage of monitor test run data present in order to distribute rewards for given epoch.")
.takes_value(true)
.long(REWARDING_MONITOR_THRESHOLD_ARG)
.requires(REWARDING_ENABLED)
)
.get_matches()
}
@@ -148,7 +197,11 @@ fn setup_logging() {
fn override_config(mut config: Config, matches: &ArgMatches) -> Config {
if matches.is_present(MONITORING_ENABLED) {
config = config.enabled_network_monitor(true)
config = config.with_network_monitor_enabled(true)
}
if matches.is_present(REWARDING_ENABLED) {
config = config.with_rewarding_enabled(true)
}
if let Some(v4_topology_path) = matches.value_of(V4_TOPOLOGY_ARG) {
@@ -182,8 +235,34 @@ fn override_config(mut config: Config, matches: &ArgMatches) -> Config {
config = config.with_mnemonic(mnemonic)
}
if let Some(rewarding_epoch_datetime) = matches.value_of(FIRST_REWARDING_EPOCH_ARG) {
let first_epoch = OffsetDateTime::parse(rewarding_epoch_datetime, &Rfc3339)
.expect("Provided first epoch is not a valid rfc3339 datetime!");
config = config.with_first_rewarding_epoch(first_epoch)
}
if let Some(epoch_length) = matches
.value_of(EPOCH_LENGTH_ARG)
.map(|len| len.parse::<u64>())
{
let epoch_length = epoch_length.expect("Provided epoch length is not a number!");
config = config.with_epoch_length(Duration::from_secs(epoch_length * 60 * 60));
}
if let Some(monitor_threshold) = matches
.value_of(REWARDING_MONITOR_THRESHOLD_ARG)
.map(|t| t.parse::<u8>())
{
let monitor_threshold =
monitor_threshold.expect("Provided monitor threshold is not a number!");
if monitor_threshold > 100 {
panic!("Provided monitor threshold is greater than 100!");
}
config = config.with_minimum_epoch_monitor_threshold(monitor_threshold)
}
if matches.is_present(DETAILED_REPORT_ARG) {
config = config.detailed_network_monitor_report(true)
config = config.with_detailed_network_monitor_report(true)
}
if let Some(keypair_path) = matches.value_of(KEYPAIR_ARG) {
let keypair_bs58 = std::fs::read_to_string(keypair_path)
@@ -254,6 +333,45 @@ fn setup_network_monitor<'a>(
))
}
fn expected_monitor_test_runs(config: &Config) -> usize {
let epoch_length = config.get_epoch_length();
let test_delay = config.get_network_monitor_run_interval();
// this is just a rough estimate. In real world there will be slightly fewer test runs
// as they are not instantaneous and hence do not happen exactly every test_delay
(epoch_length.as_secs() / test_delay.as_secs()) as usize
}
fn setup_rewarder(
config: &Config,
rocket: &Rocket<Ignite>,
nymd_client: &Client<SigningNymdClient>,
) -> Option<Rewarder> {
if config.get_rewarding_enabled() && config.get_network_monitor_enabled() {
// get instances of managed states
let node_status_storage = rocket.state::<NodeStatusStorage>().unwrap().clone();
let validator_cache = rocket.state::<ValidatorCache>().unwrap().clone();
let first_epoch = Epoch::new(
config.get_first_rewarding_epoch(),
config.get_epoch_length(),
);
Some(Rewarder::new(
nymd_client.clone(),
validator_cache,
node_status_storage,
first_epoch,
expected_monitor_test_runs(config),
config.get_minimum_epoch_monitor_threshold(),
))
} else if config.get_rewarding_enabled() {
warn!("Cannot enable rewarding with the network monitor being disabled");
None
} else {
None
}
}
async fn setup_rocket(config: &Config, liftoff_notify: Arc<Notify>) -> Result<Rocket<Ignite>> {
// let's build our rocket!
let rocket_config = rocket::config::Config {
@@ -290,12 +408,13 @@ async fn main() -> Result<()> {
let config = match Config::load_from_file(None) {
Ok(cfg) => cfg,
Err(_) => {
let config_path = Config::default_config_file_path(None)
.into_os_string()
.into_string()
.unwrap();
warn!(
"Configuration file could not be found at {}. Using the default values.",
Config::default_config_file_path(None)
.into_os_string()
.into_string()
.unwrap()
"Could not load the configuration file from {}. Either the file did not exist or was malformed. Using the default values instead",
config_path
);
Config::new()
}
@@ -303,6 +422,10 @@ async fn main() -> Result<()> {
let matches = parse_args();
let config = override_config(config, &matches);
// if we just wanted to write data to the config, exit
if matches.is_present(WRITE_CONFIG_ARG) {
return Ok(());
}
let liftoff_notify = Arc::new(Notify::new());
// let's build our rocket!
@@ -316,13 +439,20 @@ async fn main() -> Result<()> {
if config.get_network_monitor_enabled() {
let nymd_client = Client::new_signing(&config);
let validator_cache_refresher = ValidatorCacheRefresher::new(
nymd_client,
nymd_client.clone(),
config.get_caching_interval(),
validator_cache,
validator_cache.clone(),
);
// spawn our cacher
tokio::spawn(async move { validator_cache_refresher.run().await });
if let Some(rewarder) = setup_rewarder(&config, &rocket, &nymd_client) {
info!("Periodic rewarding is starting...");
tokio::spawn(async move { rewarder.run().await });
} else {
info!("Periodic rewarding is disabled.");
}
} else {
let nymd_client = Client::new_query(&config);
let validator_cache_refresher = ValidatorCacheRefresher::new(
@@ -168,18 +168,31 @@ impl PacketPreparer {
}
pub(crate) async fn wait_for_validator_cache_initial_values(&self) {
let initialisation_backoff = Duration::from_secs(10);
// wait for the cache to get initialised
self.validator_cache.wait_for_initial_values().await;
// now wait for our "good" topology to be online
info!("Waiting for 'good' topology to be online");
let initialisation_backoff = Duration::from_secs(30);
loop {
if self.validator_cache.initialised() {
let gateways = self.validator_cache.gateways().await;
let mixnodes = self.validator_cache.mixnodes().await;
if self
.tested_network
.is_online(&mixnodes.into_inner(), &gateways.into_inner())
{
break;
} else {
debug!("Validator cache hasn't been initialised yet - waiting for {:?} before trying again", initialisation_backoff);
info!(
"Our 'good' topology is still not offline. Going to check again in {:?}",
initialisation_backoff
);
tokio::time::sleep(initialisation_backoff).await;
}
}
}
async fn get_network_nodes(&mut self) -> (Vec<MixNodeBond>, Vec<GatewayBond>) {
async fn get_network_nodes(&self) -> (Vec<MixNodeBond>, Vec<GatewayBond>) {
info!(target: "Monitor", "Obtaining network topology...");
let mixnodes = self.validator_cache.mixnodes().await.into_inner();
@@ -81,6 +81,8 @@ pub(crate) struct PacketSender {
// behaviour is unlikely.
active_gateway_clients: HashMap<[u8; PUBLIC_KEY_LENGTH], GatewayClient>,
// I guess that will be required later on if credentials are got per gateway
// aggregated_verification_key: Arc<VerificationKey>,
fresh_gateway_client_data: Arc<FreshGatewayClientData>,
gateway_connection_timeout: Duration,
max_concurrent_clients: usize,
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::network_monitor::test_packet::IpVersion;
use mixnet_contract::{GatewayBond, MixNodeBond};
use topology::{gateway, mix, NymTopology};
pub(crate) mod good_topology;
@@ -68,4 +69,40 @@ impl TestedNetwork {
pub(crate) fn v6_topology(&self) -> &NymTopology {
&self.good_v6_topology
}
/// Given slices of bonded mixnodes and gateways, checks whether all 'good' nodes are present
/// in the lists.
///
/// # Arguments
///
/// * `bonded_mixnodes`: slice of currently bonded mixnodes
/// * `bonded_gateways`: slice of currently bonded gateways
pub(crate) fn is_online(
&self,
bonded_mixnodes: &[MixNodeBond],
bonded_gateways: &[GatewayBond],
) -> bool {
// while technically this is not the most optimal way of checking all nodes as we have to
// go through entire slice multiple times, we only do it every 30s before monitor startup
// so it's not really that bad
for layer_mixes in self.good_v4_topology.mixes().values() {
for mix in layer_mixes {
if !bonded_mixnodes.iter().any(|bonded| {
bonded.mix_node.identity_key == mix.identity_key.to_base58_string()
}) {
return false;
}
}
}
for gateway in self.good_v4_topology.gateways() {
if !bonded_gateways.iter().any(|bonded| {
bonded.gateway.identity_key == gateway.identity_key.to_base58_string()
}) {
return false;
}
}
true
}
}
-3
View File
@@ -26,9 +26,6 @@ pub(crate) fn stage(database_path: PathBuf) -> AdHoc {
routes::gateway_report,
routes::mixnode_uptime_history,
routes::gateway_uptime_history,
routes::mixnodes_full_report,
routes::gateways_full_report,
routes::rewarding_chores,
],
)
})
+18 -18
View File
@@ -7,17 +7,17 @@ use rocket::http::{ContentType, Status};
use rocket::response::{self, Responder, Response};
use rocket::Request;
use serde::{Deserialize, Serialize};
use sqlx::types::time::OffsetDateTime;
use std::convert::TryFrom;
use std::fmt::{self, Display, Formatter};
use std::io::Cursor;
use time::OffsetDateTime;
// todo: put into some error enum
#[derive(Debug)]
pub struct InvalidUptime;
// value in range 0-100
#[derive(Clone, Serialize, Deserialize, Debug)]
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
pub struct Uptime(u8);
impl Uptime {
@@ -76,17 +76,17 @@ impl TryFrom<i64> for Uptime {
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct MixnodeStatusReport {
identity: String,
owner: String,
pub(crate) identity: String,
pub(crate) owner: String,
most_recent_ipv4: bool,
most_recent_ipv6: bool,
pub(crate) most_recent_ipv4: bool,
pub(crate) most_recent_ipv6: bool,
last_hour_ipv4: Uptime,
last_hour_ipv6: Uptime,
pub(crate) last_hour_ipv4: Uptime,
pub(crate) last_hour_ipv6: Uptime,
last_day_ipv4: Uptime,
last_day_ipv6: Uptime,
pub(crate) last_day_ipv4: Uptime,
pub(crate) last_day_ipv6: Uptime,
}
impl MixnodeStatusReport {
@@ -122,17 +122,17 @@ impl MixnodeStatusReport {
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct GatewayStatusReport {
identity: String,
owner: String,
pub(crate) identity: String,
pub(crate) owner: String,
most_recent_ipv4: bool,
most_recent_ipv6: bool,
pub(crate) most_recent_ipv4: bool,
pub(crate) most_recent_ipv6: bool,
last_hour_ipv4: Uptime,
last_hour_ipv6: Uptime,
pub(crate) last_hour_ipv4: Uptime,
pub(crate) last_hour_ipv6: Uptime,
last_day_ipv4: Uptime,
last_day_ipv6: Uptime,
pub(crate) last_day_ipv4: Uptime,
pub(crate) last_day_ipv6: Uptime,
}
impl GatewayStatusReport {
@@ -1,7 +1,6 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::node_status_api::local_guard::LocalRequest;
use crate::node_status_api::models::{
ErrorResponse, GatewayStatusReport, GatewayUptimeHistory, MixnodeStatusReport,
MixnodeUptimeHistory,
@@ -11,22 +10,6 @@ use rocket::http::Status;
use rocket::serde::json::Json;
use rocket::State;
#[get("/daily-chores")]
pub(crate) async fn rewarding_chores(
_local_request: LocalRequest,
storage: &State<NodeStatusStorage>,
) -> Result<&'static str, ErrorResponse> {
if storage
.daily_chores()
.await
.map_err(|err| ErrorResponse::new(err, Status::InternalServerError))?
{
Ok("Updated historical uptimes and purged old reports.")
} else {
Ok("The historical uptimes were already updated at some point today - nothing was done now.")
}
}
#[get("/mixnode/<pubkey>/report")]
pub(crate) async fn mixnode_report(
storage: &State<NodeStatusStorage>,
@@ -74,25 +57,3 @@ pub(crate) async fn gateway_uptime_history(
.map(Json)
.map_err(|err| ErrorResponse::new(err, Status::NotFound))
}
#[get("/mixnodes/all/report")]
pub(crate) async fn mixnodes_full_report(
storage: &State<NodeStatusStorage>,
) -> Result<Json<Vec<MixnodeStatusReport>>, ErrorResponse> {
storage
.get_all_mixnode_reports()
.await
.map(Json)
.map_err(|err| ErrorResponse::new(err, Status::InternalServerError))
}
#[get("/gateways/all/report")]
pub(crate) async fn gateways_full_report(
storage: &State<NodeStatusStorage>,
) -> Result<Json<Vec<GatewayStatusReport>>, ErrorResponse> {
storage
.get_all_gateway_reports()
.await
.map(Json)
.map_err(|err| ErrorResponse::new(err, Status::InternalServerError))
}
+1 -3
View File
@@ -5,15 +5,13 @@ use crate::node_status_api::models::Uptime;
use crate::node_status_api::{FIFTEEN_MINUTES, ONE_HOUR};
use crate::storage::models::NodeStatus;
use log::warn;
use sqlx::types::time::OffsetDateTime;
use std::cmp::min;
use time::OffsetDateTime;
// A temporary helper struct used to produce reports for active nodes.
pub(crate) struct ActiveNodeDayStatuses {
pub(crate) identity: String,
pub(crate) owner: String,
pub(crate) node_id: i64,
pub(crate) ipv4_statuses: Vec<NodeStatus>,
pub(crate) ipv6_statuses: Vec<NodeStatus>,
}
+164 -6
View File
@@ -2,19 +2,32 @@
// SPDX-License-Identifier: Apache-2.0
use crate::config::Config;
use crate::rewarding::{
error::RewardingError, GatewayToReward, MixnodeToReward, GATEWAY_REWARD_OP_BASE_GAS_LIMIT,
MIXNODE_REWARD_OP_BASE_GAS_LIMIT, PER_GATEWAY_DELEGATION_GAS_INCREASE,
PER_MIXNODE_DELEGATION_GAS_INCREASE, REWARDING_GAS_LIMIT_MULTIPLIER,
};
use config::defaults::DEFAULT_VALIDATOR_API_PORT;
use mixnet_contract::{GatewayBond, MixNodeBond};
use mixnet_contract::{Delegation, ExecuteMsg, GatewayBond, IdentityKey, MixNodeBond};
use std::sync::Arc;
use tokio::sync::RwLock;
use validator_client::nymd::{CosmWasmClient, QueryNymdClient, SigningNymdClient};
use validator_client::nymd::{
CosmWasmClient, Fee, QueryNymdClient, SigningCosmWasmClient, SigningNymdClient, TendermintTime,
};
use validator_client::ValidatorClientError;
#[derive(Clone)]
pub(crate) struct Client<C>(Arc<RwLock<validator_client::Client<C>>>);
impl<C> Clone for Client<C> {
fn clone(&self) -> Self {
Client(Arc::clone(&self.0))
}
}
impl Client<QueryNymdClient> {
pub(crate) fn new_query(config: &Config) -> Self {
// the api address is irrelevant here as **WE ARE THE API**
// and we won't be talking on the socket here.
let api_url = format!("http://localhost:{}", DEFAULT_VALIDATOR_API_PORT)
.parse()
.unwrap();
@@ -36,6 +49,7 @@ impl Client<QueryNymdClient> {
impl Client<SigningNymdClient> {
pub(crate) fn new_signing(config: &Config) -> Self {
// the api address is irrelevant here as **WE ARE THE API**
// and we won't be talking on the socket here.
let api_url = format!("http://localhost:{}", DEFAULT_VALIDATOR_API_PORT)
.parse()
.unwrap();
@@ -59,6 +73,25 @@ impl Client<SigningNymdClient> {
}
impl<C> Client<C> {
// a helper function for the future to obtain the current block timestamp
#[allow(dead_code)]
pub(crate) async fn current_block_timestamp(
&self,
) -> Result<TendermintTime, ValidatorClientError>
where
C: CosmWasmClient + Sync,
{
let time = self
.0
.read()
.await
.nymd
.get_current_block_timestamp()
.await?;
Ok(time)
}
pub(crate) async fn get_mixnodes(&self) -> Result<Vec<MixNodeBond>, ValidatorClientError>
where
C: CosmWasmClient + Sync,
@@ -73,8 +106,133 @@ impl<C> Client<C> {
self.0.read().await.get_all_nymd_gateways().await
}
#[allow(dead_code)]
pub(crate) async fn some_rewarding_stuff_here(&self) {
todo!()
pub(crate) async fn get_mixnode_delegations(
&self,
identity: IdentityKey,
) -> Result<Vec<Delegation>, ValidatorClientError>
where
C: CosmWasmClient + Sync,
{
self.0
.read()
.await
.get_all_nymd_mixnode_delegations(identity)
.await
}
pub(crate) async fn get_gateway_delegations(
&self,
identity: IdentityKey,
) -> Result<Vec<Delegation>, ValidatorClientError>
where
C: CosmWasmClient + Sync,
{
self.0
.read()
.await
.get_all_nymd_gateway_delegations(identity)
.await
}
async fn estimate_mixnode_reward_fees(&self, nodes: usize, total_delegations: usize) -> Fee {
let base_gas_limit = MIXNODE_REWARD_OP_BASE_GAS_LIMIT * nodes as u64
+ PER_MIXNODE_DELEGATION_GAS_INCREASE * total_delegations as u64;
let total_gas_limit = (base_gas_limit as f64 * REWARDING_GAS_LIMIT_MULTIPLIER) as u64;
self.0
.read()
.await
.nymd
.calculate_custom_fee(total_gas_limit)
}
async fn estimate_gateway_reward_fees(&self, nodes: usize, total_delegations: usize) -> Fee {
let base_gas_limit = GATEWAY_REWARD_OP_BASE_GAS_LIMIT * nodes as u64
+ PER_GATEWAY_DELEGATION_GAS_INCREASE * total_delegations as u64;
let total_gas_limit = (base_gas_limit as f64 * REWARDING_GAS_LIMIT_MULTIPLIER) as u64;
self.0
.read()
.await
.nymd
.calculate_custom_fee(total_gas_limit)
}
pub(crate) async fn reward_mixnodes(
&self,
nodes: &[MixnodeToReward],
) -> Result<(), RewardingError>
where
C: SigningCosmWasmClient + Sync,
{
let total_delegations = nodes.iter().map(|node| node.total_delegations).sum();
let fee = self
.estimate_mixnode_reward_fees(nodes.len(), total_delegations)
.await;
let msgs: Vec<(ExecuteMsg, _)> = nodes
.iter()
.map(Into::into)
.zip(std::iter::repeat(Vec::new()))
.collect();
let memo = format!("rewarding {} mixnodes", msgs.len());
let contract = self
.0
.read()
.await
.get_mixnet_contract_address()
.ok_or(RewardingError::UnspecifiedContractAddress)?;
// technically we don't require a write lock here, however, we really don't want to be executing
// multiple blocks concurrently as one of them WILL fail due to incorrect sequence number
self.0
.write()
.await
.nymd
.execute_multiple(&contract, msgs, fee, memo)
.await?;
Ok(())
}
pub(crate) async fn reward_gateways(
&self,
nodes: &[GatewayToReward],
) -> Result<(), RewardingError>
where
C: SigningCosmWasmClient + Sync,
{
let total_delegations = nodes.iter().map(|node| node.total_delegations).sum();
let fee = self
.estimate_gateway_reward_fees(nodes.len(), total_delegations)
.await;
let msgs: Vec<(ExecuteMsg, _)> = nodes
.iter()
.map(Into::into)
.zip(std::iter::repeat(Vec::new()))
.collect();
let memo = format!("rewarding {} gateways", msgs.len());
let contract = self
.0
.read()
.await
.get_mixnet_contract_address()
.ok_or(RewardingError::UnspecifiedContractAddress)?;
// technically we don't require a write lock here, however, we really don't want to be executing
// multiple blocks concurrently as one of them WILL fail due to incorrect sequence number
self.0
.write()
.await
.nymd
.execute_multiple(&contract, msgs, fee, memo)
.await?;
Ok(())
}
}
+219
View File
@@ -0,0 +1,219 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::UnixTimestamp;
use config::defaults::{DEFAULT_EPOCH_LENGTH, DEFAULT_FIRST_EPOCH_START};
use std::fmt::{Display, Formatter};
use std::time::Duration;
use time::OffsetDateTime;
// TODO: perhaps this should be moved to a commons crate?
// And become representation of system epoch?
/// Representation of rewarding epoch.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct Epoch {
start: OffsetDateTime,
length: Duration,
}
impl Epoch {
/// Creates new epoch instance.
pub const fn new(start: OffsetDateTime, length: Duration) -> Self {
Epoch { start, length }
}
/// Returns the next epoch.
pub fn next_epoch(&self) -> Self {
Epoch {
start: self.end(),
length: self.length,
}
}
/// Returns the last epoch.
pub fn previous_epoch(&self) -> Self {
Epoch {
start: self.start - self.length,
length: self.length,
}
}
/// Determines whether the provided datetime is contained within the epoch
///
/// # Arguments
///
/// * `datetime`: specified datetime
pub fn contains(&self, datetime: OffsetDateTime) -> bool {
self.start <= datetime && datetime <= self.end()
}
/// Returns new instance of [Epoch] such that the provided datetime would be within
/// its duration.
///
/// # Arguments
///
/// * `now`: current datetime
pub fn current(&self, now: OffsetDateTime) -> Self {
let mut candidate = *self;
if now > self.start {
loop {
if candidate.contains(now) {
return candidate;
}
candidate = candidate.next_epoch();
}
} else {
loop {
if candidate.contains(now) {
return candidate;
}
candidate = candidate.previous_epoch();
}
}
}
/// Returns the starting datetime of this epoch.
pub const fn start(&self) -> OffsetDateTime {
self.start
}
/// Returns the length of this epoch.
pub const fn length(&self) -> Duration {
self.length
}
/// Returns the ending datetime of this epoch.
pub fn end(&self) -> OffsetDateTime {
self.start + self.length
}
/// Returns the unix timestamp of the start of this epoch.
pub const fn start_unix_timestamp(&self) -> UnixTimestamp {
self.start().unix_timestamp()
}
/// Returns the unix timestamp of the end of this epoch.
pub fn end_unix_timestamp(&self) -> UnixTimestamp {
self.end().unix_timestamp()
}
}
impl Display for Epoch {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
let length = self.length();
let hours = length.as_secs_f32() / 3600.0;
write!(
f,
"Epoch: {} - {} ({:.1} hours)",
self.start(),
self.end(),
hours
)
}
}
impl Default for Epoch {
fn default() -> Self {
Epoch {
start: DEFAULT_FIRST_EPOCH_START,
length: DEFAULT_EPOCH_LENGTH,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn previous_epoch() {
let epoch = Epoch {
start: time::macros::datetime!(2021-08-23 12:00 UTC),
length: Duration::from_secs(24 * 60 * 60),
};
let expected = Epoch {
start: time::macros::datetime!(2021-08-22 12:00 UTC),
length: Duration::from_secs(24 * 60 * 60),
};
assert_eq!(expected, epoch.previous_epoch())
}
#[test]
fn next_epoch() {
let epoch = Epoch {
start: time::macros::datetime!(2021-08-23 12:00 UTC),
length: Duration::from_secs(24 * 60 * 60),
};
let expected = Epoch {
start: time::macros::datetime!(2021-08-24 12:00 UTC),
length: Duration::from_secs(24 * 60 * 60),
};
assert_eq!(expected, epoch.next_epoch())
}
#[test]
fn checking_for_datetime_inclusion() {
let epoch = Epoch {
start: time::macros::datetime!(2021-08-23 12:00 UTC),
length: Duration::from_secs(24 * 60 * 60),
};
// it must contain its own boundaries
assert!(epoch.contains(epoch.start));
assert!(epoch.contains(epoch.end()));
let in_the_midle = epoch.start + Duration::from_secs(epoch.length.as_secs() / 2);
assert!(epoch.contains(in_the_midle));
assert!(!epoch.contains(epoch.next_epoch().end()));
assert!(!epoch.contains(epoch.previous_epoch().start()));
}
#[test]
fn determining_current_epoch() {
let first_epoch = Epoch {
start: time::macros::datetime!(2021-08-23 12:00 UTC),
length: Duration::from_secs(24 * 60 * 60),
};
// epoch just before
let fake_now = first_epoch.start - Duration::from_secs(123);
assert_eq!(first_epoch.previous_epoch(), first_epoch.current(fake_now));
// this epoch (start boundary)
assert_eq!(first_epoch, first_epoch.current(first_epoch.start));
// this epoch (in the middle)
let fake_now = first_epoch.start + Duration::from_secs(123);
assert_eq!(first_epoch, first_epoch.current(fake_now));
// this epoch (end boundary)
assert_eq!(first_epoch, first_epoch.current(first_epoch.end()));
// next epoch
let fake_now = first_epoch.end() + Duration::from_secs(123);
assert_eq!(first_epoch.next_epoch(), first_epoch.current(fake_now));
// few epochs in the past
let fake_now =
first_epoch.start() - first_epoch.length - first_epoch.length - first_epoch.length;
assert_eq!(
first_epoch
.previous_epoch()
.previous_epoch()
.previous_epoch(),
first_epoch.current(fake_now)
);
// few epochs in the future
let fake_now =
first_epoch.end() + first_epoch.length + first_epoch.length + first_epoch.length;
assert_eq!(
first_epoch.next_epoch().next_epoch().next_epoch(),
first_epoch.current(fake_now)
);
}
}
+47
View File
@@ -0,0 +1,47 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::node_status_api::models::NodeStatusApiError;
use thiserror::Error;
use validator_client::nymd::error::NymdError;
use validator_client::ValidatorClientError;
#[derive(Debug, Error)]
pub(crate) enum RewardingError {
#[error("Could not distribute rewards as the contract address was unspecified")]
UnspecifiedContractAddress,
#[error("There were no mixnodes to reward (network is dead)")]
NoMixnodesToReward,
#[error("There were no gateways to reward (network is dead)")]
NoGatewaysToReward,
#[error("Failed to execute the smart contract - {0}")]
ContractExecutionFailure(NymdError),
// The inner error should be modified at some point...
#[error("We run into storage issues - {0}")]
StorageError(NodeStatusApiError),
#[error("Failed to query the smart contract - {0}")]
ValidatorClientError(ValidatorClientError),
}
impl From<NymdError> for RewardingError {
fn from(err: NymdError) -> Self {
RewardingError::ContractExecutionFailure(err)
}
}
impl From<NodeStatusApiError> for RewardingError {
fn from(err: NodeStatusApiError) -> Self {
RewardingError::StorageError(err)
}
}
impl From<ValidatorClientError> for RewardingError {
fn from(err: ValidatorClientError) -> Self {
RewardingError::ValidatorClientError(err)
}
}
+871
View File
@@ -0,0 +1,871 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cache::ValidatorCache;
use crate::node_status_api::models::{GatewayStatusReport, MixnodeStatusReport, Uptime};
use crate::node_status_api::ONE_DAY;
use crate::nymd_client::Client;
use crate::rewarding::epoch::Epoch;
use crate::rewarding::error::RewardingError;
use crate::storage::models::{
FailedGatewayRewardChunk, FailedMixnodeRewardChunk, PossiblyUnrewardedGateway,
PossiblyUnrewardedMixnode, RewardingReport,
};
use crate::storage::NodeStatusStorage;
use log::{error, info};
use mixnet_contract::{ExecuteMsg, IdentityKey};
use rand::{thread_rng, Rng};
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::time::Duration;
use time::OffsetDateTime;
use tokio::time::sleep;
use validator_client::nymd::SigningNymdClient;
pub(crate) mod epoch;
pub(crate) mod error;
// the actual base cost is around 125_000, but let's give ourselves a bit of safety net in case
// we introduce some tiny contract changes that would bump that value up
pub(crate) const MIXNODE_REWARD_OP_BASE_GAS_LIMIT: u64 = 150_000;
pub(crate) const GATEWAY_REWARD_OP_BASE_GAS_LIMIT: u64 = 150_000;
// For each delegation reward we perform a read and a write is being executed,
// which are the most costly parts involved in process. Both of them are ~1000 sdk gas in cost.
// However, experimentally it looks like first delegation adds total of additional ~3000 of sdk gas
// cost and each subsequent about ~2500.
// Therefore, since base cost is not tuned to the bare minimum, let's treat all of delegations as extra
// 2750 of sdk gas.
pub(crate) const PER_MIXNODE_DELEGATION_GAS_INCREASE: u64 = 2750;
pub(crate) const PER_GATEWAY_DELEGATION_GAS_INCREASE: u64 = 2750;
// Another safety net in case of contract changes,
// the calculated total gas limit is going to get multiplied by that value.
pub(crate) const REWARDING_GAS_LIMIT_MULTIPLIER: f64 = 1.05;
pub(crate) const MAX_TO_REWARD_AT_ONCE: usize = 50;
pub(crate) const REWARDING_TIME_VARIANCE: f32 = 0.05; // 5% (so for example +/-1.2h for 24h epoch)
#[derive(Debug, Clone)]
pub(crate) struct MixnodeToReward {
pub(crate) identity: IdentityKey,
pub(crate) uptime: Uptime,
/// Total number of individual addresses that have delegated to this particular node
pub(crate) total_delegations: usize,
}
#[derive(Debug, Clone)]
pub(crate) struct GatewayToReward {
pub(crate) identity: IdentityKey,
pub(crate) uptime: Uptime,
/// Total number of individual addresses that have delegated to this particular gateway
pub(crate) total_delegations: usize,
}
pub(crate) struct FailedMixnodeRewardChunkDetails {
possibly_unrewarded: Vec<MixnodeToReward>,
error_message: String,
}
pub(crate) struct FailedGatewayRewardChunkDetails {
possibly_unrewarded: Vec<GatewayToReward>,
error_message: String,
}
#[derive(Default)]
pub(crate) struct FailureData {
mixnodes: Option<Vec<FailedMixnodeRewardChunkDetails>>,
gateways: Option<Vec<FailedGatewayRewardChunkDetails>>,
}
impl<'a> From<&'a MixnodeToReward> for ExecuteMsg {
fn from(node: &MixnodeToReward) -> Self {
ExecuteMsg::RewardMixnode {
identity: node.identity.clone(),
uptime: node.uptime.u8() as u32,
}
}
}
impl<'a> From<&'a GatewayToReward> for ExecuteMsg {
fn from(node: &GatewayToReward) -> Self {
ExecuteMsg::RewardGateway {
identity: node.identity.clone(),
uptime: node.uptime.u8() as u32,
}
}
}
pub(crate) struct Rewarder {
nymd_client: Client<SigningNymdClient>,
validator_cache: ValidatorCache,
storage: NodeStatusStorage,
/// The first epoch of the current length.
first_epoch: Epoch,
/// Ideal world, expected number of network monitor test runs per epoch.
/// In reality it will be slightly lower due to network delays, but it's good enough
/// for estimations regarding percentage of available data for reward distribution.
expected_epoch_monitor_runs: usize,
/// Minimum percentage of network monitor test runs reports required in order to distribute
/// rewards.
minimum_epoch_monitor_threshold: u8,
}
impl Rewarder {
pub(crate) fn new(
nymd_client: Client<SigningNymdClient>,
validator_cache: ValidatorCache,
storage: NodeStatusStorage,
first_epoch: Epoch,
expected_epoch_monitor_runs: usize,
minimum_epoch_monitor_threshold: u8,
) -> Self {
Rewarder {
nymd_client,
validator_cache,
storage,
first_epoch,
expected_epoch_monitor_runs,
minimum_epoch_monitor_threshold,
}
}
/// Obtains the current number of delegators that have delegated their stake towards this particular mixnode.
///
/// # Arguments
///
/// * `identity`: identity key of the mixnode
async fn get_mixnode_delegators_count(
&self,
identity: IdentityKey,
) -> Result<usize, RewardingError> {
Ok(self
.nymd_client
.get_mixnode_delegations(identity)
.await?
.len())
}
/// Obtains the current number of delegators that have delegated their stake towards this particular gateway.
///
/// # Arguments
///
/// * `identity`: identity key of the gateway
async fn get_gateway_delegators_count(
&self,
identity: IdentityKey,
) -> Result<usize, RewardingError> {
Ok(self
.nymd_client
.get_gateway_delegations(identity)
.await?
.len())
}
/// Queries the smart contract in order to obtain the current list of bonded mixnodes and then
/// for each mixnode determines how many delegators it has.
async fn produce_mixnode_delegators_map(
&self,
) -> Result<HashMap<IdentityKey, usize>, RewardingError> {
// Technically we could optimise it by creating a concurrent stream and executing multiple
// queries concurrently.
//
// I've actually tested that approach and for 5300 nodes running it all sequentially was taking around 19s
// while running it with 20 concurrent queries was taking around 4.5s.
// Note that the results were a bit biased as I was testing it against remote validator
// while in real world this would be making only local requests.
// During the test my average ping times to the machine were around 2.6ms.
// So I guess the network latency was 2.6ms * 5300 = 13.78s in total in the sequential case.
//
// HOWEVER, even if the method was taking that long in real world,
// in the grand scheme of things it makes absolutely no difference. If the rewards
// distribution is delayed by 15s, it changes nothing as the process itself is not
// instantaneous.
let mut map = HashMap::new();
let bonded_mixnodes = self.validator_cache.mixnodes().await.into_inner();
for mix in bonded_mixnodes.into_iter() {
let delegator_count = self
.get_mixnode_delegators_count(mix.mix_node.identity_key.clone())
.await?;
map.insert(mix.mix_node.identity_key, delegator_count);
}
Ok(map)
}
/// Queries the smart contract in order to obtain the current list of bonded gateways and then
/// for each gateway determines how many delegators it has.
async fn produce_gateway_delegators_map(
&self,
) -> Result<HashMap<IdentityKey, usize>, RewardingError> {
// look at comments in `produce_mixnode_delegators_map` for some optimisation elaboration
let mut map = HashMap::new();
let bonded_gateways = self.validator_cache.gateways().await.into_inner();
for gateway in bonded_gateways.into_iter() {
let delegator_count = self
.get_gateway_delegators_count(gateway.gateway.identity_key.clone())
.await?;
map.insert(gateway.gateway.identity_key, delegator_count);
}
Ok(map)
}
/// Calculates the absolute uptime of given node that is then passed as one of the arguments
/// in the smart contract to determine the actual reward value.
///
/// Currently both ipv4 and ipv6 uptimes carry the same weight in the calculation.
///
/// # Arguments
///
/// * `ipv4_uptime`: ipv4 uptime of the node in the last epoch.
/// * `ipv6_uptime`: ipv6 uptime of the node in the last epoch.
fn calculate_absolute_uptime(&self, ipv4_uptime: Uptime, ipv6_uptime: Uptime) -> Uptime {
// just take average of ipv4 and ipv6 uptimes using equal weights
let abs = ((ipv4_uptime.u8() as f32 + ipv6_uptime.u8() as f32) / 2.0).round();
Uptime::try_from(abs as i64).unwrap()
}
/// Given the list of mixnodes that were tested in the last epoch, tries to determine the
/// subset that are eligible for any rewards.
///
/// As of right now, it is a rather straightforward process. It is only checked whether the node
/// is currently bonded and has uptime > 0.
/// Unlike the typescript rewards script, it currently does not look at the verloc data nor
/// whether the non-mixing ports are open.
///
/// The method also obtains the number of delegators towards the node in order to more accurately
/// approximate the required gas fees when distributing the rewards.
///
/// # Arguments
///
/// * `active_mixnodes`: list of the nodes that were tested at least once by the network monitor
/// in the last epoch.
async fn determine_eligible_mixnodes(
&self,
active_mixnodes: &[MixnodeStatusReport],
) -> Result<Vec<MixnodeToReward>, RewardingError> {
// Currently we don't have as many 'features' as in the typescript reward script,
// such as we don't check ports or verloc data anymore. However, that's fine as
// it's a good price to pay for being able to move rewarding to rust
// and the lack of port data / verloc data will eventually be balanced out anyway
// by people hesitating to delegate to nodes without them and thus those nodes disappearing
// from the active set (once introduced)
let mixnode_delegators = self.produce_mixnode_delegators_map().await?;
// 1. go through all active mixnodes
// 2. filter out nodes that are currently not bonded (as `mixnode_delegators` was obtained by
// querying the validator)
// 3. determine uptime and attach delegators count
let eligible_nodes = active_mixnodes
.iter()
.filter_map(|mix| {
mixnode_delegators
.get(&mix.identity)
.map(|&total_delegations| MixnodeToReward {
identity: mix.identity.clone(),
uptime: self
.calculate_absolute_uptime(mix.last_day_ipv4, mix.last_day_ipv6),
total_delegations,
})
})
.filter(|node| node.uptime.u8() > 0)
.collect();
Ok(eligible_nodes)
}
/// Given the list of gateways that were tested in the last epoch, tries to determine the
/// subset that are eligible for any rewards.
///
/// As of right now, it is a rather straightforward process. It is only checked whether the node
/// is currently bonded and has uptime > 0.
/// Unlike the typescript rewards script, it currently does not look at the non-mixing ports are open.
///
/// The method also obtains the number of delegators towards the node in order to more accurately
/// approximate the required gas fees when distributing the rewards.
///
/// # Arguments
///
/// * `active_gateways`: list of the nodes that were tested at least once by the network monitor
/// in the last epoch.
async fn determine_eligible_gateways(
&self,
active_gateways: &[GatewayStatusReport],
) -> Result<Vec<GatewayToReward>, RewardingError> {
let gateway_delegators = self.produce_gateway_delegators_map().await?;
let eligible_nodes = active_gateways
.iter()
.filter_map(|gateway| {
gateway_delegators
.get(&gateway.identity)
.map(|&total_delegations| GatewayToReward {
identity: gateway.identity.clone(),
uptime: self.calculate_absolute_uptime(
gateway.last_day_ipv4,
gateway.last_day_ipv6,
),
total_delegations,
})
})
.filter(|node| node.uptime.u8() > 0)
.collect();
Ok(eligible_nodes)
}
/// Obtains the lists of all mixnodes and gateways that were tested at least a single time
/// by the network monitor in the specified epoch.
///
/// # Arguments
///
/// * `epoch`: the specified epoch.
async fn get_active_nodes(
&self,
epoch: Epoch,
) -> Result<(Vec<MixnodeStatusReport>, Vec<GatewayStatusReport>), RewardingError> {
let active_mixnodes = self
.storage
.get_all_active_mixnode_reports_in_interval(
epoch.start_unix_timestamp(),
epoch.end_unix_timestamp(),
)
.await?;
let active_gateways = self
.storage
.get_all_active_gateway_reports_in_interval(
epoch.start_unix_timestamp(),
epoch.end_unix_timestamp(),
)
.await?;
Ok((active_mixnodes, active_gateways))
}
/// Using the list of mixnodes eligible for rewards, chunks it into pre-defined sized-chunks
/// and gives out the rewards by calling the smart contract.
///
/// Returns an optional vector containing list of chunks that experienced a smart contract
/// execution error during reward distribution. However, it does not necessarily imply they
/// were not rewarded. There are some edge cases where we time out waiting for block to be included
/// yet the transactions went through.
///
/// Only returns errors for problems originating from before smart contract was called, i.e.
/// we know for sure not a single node has been rewarded.
///
/// # Arguments
///
/// * `eligible_mixnodes`: list of the nodes that are eligible to receive non-zero rewards.
async fn distribute_rewards_to_mixnodes(
&self,
eligible_mixnodes: &[MixnodeToReward],
) -> Option<Vec<FailedMixnodeRewardChunkDetails>> {
let mut failed_chunks = Vec::new();
for (i, mix_chunk) in eligible_mixnodes.chunks(MAX_TO_REWARD_AT_ONCE).enumerate() {
if let Err(err) = self.nymd_client.reward_mixnodes(mix_chunk).await {
error!("failed to reward mixnodes... - {}", err);
failed_chunks.push(FailedMixnodeRewardChunkDetails {
possibly_unrewarded: mix_chunk.to_vec(),
error_message: err.to_string(),
})
}
let rewarded = i * MAX_TO_REWARD_AT_ONCE + mix_chunk.len();
let percentage = rewarded as f32 * 100.0 / eligible_mixnodes.len() as f32;
info!(
"Rewarded {} / {} mixnodes\t{:.2}%",
rewarded,
eligible_mixnodes.len(),
percentage
);
}
if failed_chunks.is_empty() {
None
} else {
Some(failed_chunks)
}
}
/// Using the list of gateways eligible for rewards, chunks it into pre-defined sized-chunks
/// and gives out the rewards by calling the smart contract.
///
/// Returns an optional vector containing list of chunks that experienced a smart contract
/// execution error during reward distribution. However, it does not necessarily imply they
/// were not rewarded. There are some edge cases where we time out waiting for block to be included
/// yet the transactions went through.
///
/// Only returns errors for problems originating from before smart contract was called, i.e.
/// we know for sure not a single node has been rewarded.
///
/// # Arguments
///
/// * `eligible_gateways`: list of the nodes that are eligible to receive non-zero rewards.
async fn distribute_rewards_to_gateways(
&self,
eligible_gateways: &[GatewayToReward],
) -> Option<Vec<FailedGatewayRewardChunkDetails>> {
let mut failed_chunks = Vec::new();
for (i, gateway_chunk) in eligible_gateways.chunks(MAX_TO_REWARD_AT_ONCE).enumerate() {
if let Err(err) = self.nymd_client.reward_gateways(gateway_chunk).await {
error!("failed to reward gateways... - {}", err);
failed_chunks.push(FailedGatewayRewardChunkDetails {
possibly_unrewarded: gateway_chunk.to_vec(),
error_message: err.to_string(),
})
}
let rewarded = i * MAX_TO_REWARD_AT_ONCE + gateway_chunk.len();
let percentage = rewarded as f32 * 100.0 / eligible_gateways.len() as f32;
info!(
"Rewarded {} / {} gateways\t{:.2}%",
rewarded,
eligible_gateways.len(),
percentage
);
}
if failed_chunks.is_empty() {
None
} else {
Some(failed_chunks)
}
}
/// Using the list of active mixnode and gateways, determine which of them are eligible for
/// rewarding and distribute the rewards.
///
/// # Arguments
///
/// * `epoch_rewarding_id`: id of the current epoch rewarding.
///
/// * `active_mixnodes`: list of the nodes that were tested at least once by the network monitor
/// in the last epoch.
///
/// * `active_gateways`: list of the nodes that were tested at least once by the network monitor
/// in the last epoch.
async fn distribute_rewards(
&self,
epoch_rewarding_id: i64,
active_mixnodes: &[MixnodeStatusReport],
active_gateways: &[GatewayStatusReport],
) -> Result<(RewardingReport, Option<FailureData>), RewardingError> {
let mut failure_data = FailureData::default();
let eligible_mixnodes = self.determine_eligible_mixnodes(active_mixnodes).await?;
if eligible_mixnodes.is_empty() {
return Err(RewardingError::NoMixnodesToReward);
}
let eligible_gateways = self.determine_eligible_gateways(active_gateways).await?;
if eligible_gateways.is_empty() {
return Err(RewardingError::NoGatewaysToReward);
}
failure_data.mixnodes = self
.distribute_rewards_to_mixnodes(&eligible_mixnodes)
.await;
failure_data.gateways = self
.distribute_rewards_to_gateways(&eligible_gateways)
.await;
let report = RewardingReport {
epoch_rewarding_id,
eligible_mixnodes: eligible_mixnodes.len() as i64,
eligible_gateways: eligible_gateways.len() as i64,
possibly_unrewarded_mixnodes: failure_data
.mixnodes
.as_ref()
.map(|chunks| {
chunks
.iter()
.map(|chunk| chunk.possibly_unrewarded.len())
.sum::<usize>() as i64
})
.unwrap_or_default(),
possibly_unrewarded_gateways: failure_data
.gateways
.as_ref()
.map(|chunks| {
chunks
.iter()
.map(|chunk| chunk.possibly_unrewarded.len())
.sum::<usize>() as i64
})
.unwrap_or_default(),
};
if failure_data.mixnodes.is_none() && failure_data.gateways.is_none() {
Ok((report, None))
} else {
Ok((report, Some(failure_data)))
}
}
/// Saves information about possibly failed rewarding for future manual inspection.
///
/// Currently there is no automated recovery mechanism.
///
/// # Arguments
///
/// * `failure_data`: information regarding nodes that might have not received reward this epoch.
///
/// * `epoch_rewarding_id`: id of the current epoch rewarding.
async fn save_failure_information(
&self,
failure_data: FailureData,
epoch_rewarding_id: i64,
) -> Result<(), RewardingError> {
if let Some(failed_mixnode_chunks) = failure_data.mixnodes {
for failed_chunk in failed_mixnode_chunks.into_iter() {
// save the chunk
let chunk_id = self
.storage
.insert_failed_mixnode_reward_chunk(FailedMixnodeRewardChunk {
epoch_rewarding_id,
error_message: failed_chunk.error_message,
})
.await?;
// and then all associated nodes
for node in failed_chunk.possibly_unrewarded.into_iter() {
self.storage
.insert_possibly_unrewarded_mixnode(PossiblyUnrewardedMixnode {
chunk_id,
identity: node.identity,
uptime: node.uptime.u8(),
})
.await?;
}
}
}
if let Some(failed_gateway_chunks) = failure_data.gateways {
for failed_chunk in failed_gateway_chunks.into_iter() {
// save the chunk
let chunk_id = self
.storage
.insert_failed_gateway_reward_chunk(FailedGatewayRewardChunk {
epoch_rewarding_id,
error_message: failed_chunk.error_message,
})
.await?;
// and then all associated nodes
for node in failed_chunk.possibly_unrewarded.into_iter() {
self.storage
.insert_possibly_unrewarded_gateway(PossiblyUnrewardedGateway {
chunk_id,
identity: node.identity,
uptime: node.uptime.u8(),
})
.await?;
}
}
}
Ok(())
}
/// Determines random positive or negative time variance that should be added to the rewarding
/// distribution time so that all validators would not attempt to hit the smart contract
/// at exactly the same time.
fn epoch_variance(&self) -> (bool, Duration) {
let mut rng = thread_rng();
let abs_variance_secs = REWARDING_TIME_VARIANCE * self.first_epoch.length().as_secs_f32();
let variance = Duration::from_secs(rng.gen_range(0, abs_variance_secs as u64));
let sign = rng.gen_bool(0.5);
(sign, variance)
}
/// Determines whether this validator has already distributed rewards for the specified epoch
/// so that it wouldn't accidentally attempt to do it again.
///
/// # Arguments
///
/// * `epoch`: epoch to check
async fn check_if_rewarding_happened_at_epoch(
&self,
epoch: Epoch,
) -> Result<bool, RewardingError> {
if let Some(entry) = self
.storage
.get_epoch_rewarding_entry(epoch.start_unix_timestamp())
.await?
{
// log error if the attempt wasn't finished. This error implies the process has crashed
// during the rewards distribution
if !entry.finished {
error!(
"It seems that we haven't successfully finished distributing rewards at {}",
epoch
)
}
Ok(true)
} else {
Ok(false)
}
}
/// Determines whether the specified epoch is eligible for rewards, i.e. it was not rewarded
/// before and we have enough network monitor test data to distribute the rewards based on them.
///
/// # Arguments
///
/// * `epoch`: epoch to check
async fn check_epoch_eligibility(&self, epoch: Epoch) -> Result<bool, RewardingError> {
if self.check_if_rewarding_happened_at_epoch(epoch).await?
|| !self.check_for_monitor_data(epoch).await?
{
Ok(false)
} else {
// we haven't sent rewards during the epoch and we have enough monitor test data
Ok(true)
}
}
/// Determines the next epoch during which the rewards should get distributed.
///
/// # Arguments
///
/// * `now`: current datetime
async fn next_rewarding_epoch(&self, now: OffsetDateTime) -> Result<Epoch, RewardingError> {
// edge case handling for when we decide to change first epoch to be at some time in the future
// (i.e. epoch length transition)
// we don't have to perform checks here as it's impossible to distribute rewards for epochs
// in the future
if self.first_epoch.start() > now {
return Ok(self.first_epoch);
}
let current_epoch = self.first_epoch.current(now);
// check previous epoch in case we had a tiny hiccup
// example:
// epochs start at 12:00pm and last for 24h (ignore variance)
// validator-api crashed at 11:59am before distributing rewards
// and restarted at 12:01 - it has all the data required to distribute the rewards
// for the previous epoch.
let previous_epoch = current_epoch.previous_epoch();
if self.check_epoch_eligibility(previous_epoch).await? {
return Ok(previous_epoch);
}
// check if rewards weren't already given out for the current epoch
// (it can happen for negative variance if the process crashed)
// note that if the epoch ends at say 12:00 and it's 11:59 and we just started,
// we might end up skipping this epoch regardless
if !self
.check_if_rewarding_happened_at_epoch(current_epoch)
.await?
{
return Ok(current_epoch);
}
// if we have given rewards for the previous and the current epoch,
// wait until the next one
Ok(current_epoch.next_epoch())
}
/// Given datetime of the rewarding epoch datetime, determine time until it ends and add (or remove)
/// a little bit of time variance from it in order to prevent all validators distributing
/// the rewards at exactly the same time instant.
///
/// # Arguments
///
/// * `rewarding_epoch`: the rewarding epoch
fn determine_delay_until_next_rewarding(&self, rewarding_epoch: Epoch) -> Option<Duration> {
let now = OffsetDateTime::now_utc();
if now > rewarding_epoch.end() {
return None;
}
// we have a positive duration so we can't fail the conversion
let until_epoch_end: Duration = (rewarding_epoch.end() - now).try_into().unwrap();
// add a bit of variance to the start time
let (sign, variance) = self.epoch_variance();
if sign {
Some(until_epoch_end + variance)
} else {
until_epoch_end.checked_sub(variance)
}
}
/// Distribute rewards to all eligible mixnodes and gateways on the network.
///
/// # Arguments
///
/// * `epoch`: current rewarding epoch
async fn perform_rewarding(&self, epoch: Epoch) -> Result<(), RewardingError> {
info!(
"Starting mixnode and gateway rewarding for epoch {} ...",
epoch
);
// get nodes that were active during the epoch
let (active_mixnodes, active_gateways) = self.get_active_nodes(epoch).await?;
// insert information about beginning the procedure (so that if we crash during it,
// we wouldn't attempt to possibly double reward operators)
let epoch_rewarding_id = self
.storage
.insert_started_epoch_rewarding(epoch.start_unix_timestamp())
.await?;
let (report, failure_data) = self
.distribute_rewards(epoch_rewarding_id, &active_mixnodes, &active_gateways)
.await?;
self.storage
.finish_rewarding_epoch_and_insert_report(report)
.await?;
if let Some(failure_data) = failure_data {
if let Err(err) = self
.save_failure_information(failure_data, epoch_rewarding_id)
.await
{
error!("failed to save information about rewarding failures!");
// TODO: should we just terminate the process here?
return Err(err);
}
}
// TODO: again, this assumes 24h epochs.
let epoch_iso_8601 = epoch.start().date().to_string();
let two_days_ago = (epoch.start() - 2 * ONE_DAY).unix_timestamp();
// NOTE: this works under assumption that epochs are 24h in length.
// If this changes then the historical uptime updates should be performed
// on a timer in another task
if self
.storage
.check_if_historical_uptimes_exist_for_date(&epoch_iso_8601)
.await?
{
error!("We have already updated uptimes for all nodes this day. If you're seeing this warning, it's likely rewards were given out twice this day!")
} else {
info!(
"Updating historical daily uptimes of all nodes and purging old status reports..."
);
self.storage
.update_historical_uptimes(&epoch_iso_8601, &active_mixnodes, &active_gateways)
.await?;
self.storage.purge_old_statuses(two_days_ago).await?;
}
Ok(())
}
/// Checks whether there is enough network monitor test run data to distribute rewards
/// for the specified epoch.
///
/// # Arguments
///
/// * `epoch`: epoch to check
async fn check_for_monitor_data(&self, epoch: Epoch) -> Result<bool, RewardingError> {
let since = epoch.start_unix_timestamp();
let until = epoch.end_unix_timestamp();
let monitor_runs = self.storage.get_monitor_runs_count(since, until).await?;
// check if we have more than threshold percentage of monitor runs for the epoch
let available = monitor_runs as f32 * 100.0 / self.expected_epoch_monitor_runs as f32;
Ok(available >= self.minimum_epoch_monitor_threshold as f32)
}
/// Waits until the next epoch starts
///
/// # Arguments
///
/// * `current_epoch`: current epoch that we want to wait out
async fn wait_until_next_epoch(&self, current_epoch: Epoch) {
let now = OffsetDateTime::now_utc();
let until_end = current_epoch.end() - now;
// otherwise it means the epoch is already over and the next one has begun
if until_end.is_positive() {
// we know for sure that the duration here is positive so conversion can't fail
sleep(until_end.try_into().unwrap()).await;
}
}
pub(crate) async fn run(&self) {
// whatever happens, we shouldn't do anything until the cache is initialised
self.validator_cache.wait_for_initial_values().await;
loop {
// Just a reference for anyone wanting to modify the code to use blockchain timestamps.
// This method is now available:
// let current_block_timestamp = self.nymd_client.current_block_timestamp().await.unwrap();
// and if you look at the source of that, you can easily use block height instead if preferred.
let now = OffsetDateTime::now_utc();
// if we haven't rewarded anyone for the previous epoch, get the start of the previous epoch
// otherwise get the start of the current epoch
// (remember, we will be rewarding at the END of the selected epoch)
let next_rewarding_epoch = match self.next_rewarding_epoch(now).await {
Ok(next_rewarding_epoch) => next_rewarding_epoch,
Err(err) => {
// I'm not entirely sure whether this is recoverable, because failure implies database errors
error!("We failed to determine time until next reward cycle ({}). Going to wait for the epoch length until next attempt", err);
sleep(self.first_epoch.length()).await;
continue;
}
};
// wait's until the start of the *next* epoch, e.g. end of the current chosen epoch
// (it could be none, for example if we are distributing overdue rewards for the previous epoch)
// plus add a bit of variance
if let Some(remaining_time) =
self.determine_delay_until_next_rewarding(next_rewarding_epoch)
{
info!("Next rewarding epoch is {}", next_rewarding_epoch);
info!(
"Rewards distribution will happen at {}. ({:?} left)",
now + remaining_time,
remaining_time
);
sleep(remaining_time).await;
} else {
info!(
"Starting reward distribution for epoch {} immediately!",
next_rewarding_epoch
);
}
// it's time to distribute rewards, however, first let's see if we have enough data to go through with it
// (consider the case of rewards being distributed every 24h at 12:00pm and validator-api
// starting for the very first time at 11:00am. It's not going to have enough data for
// rewards for the *current* epoch, but we couldn't have known that at startup)
match self.check_for_monitor_data(next_rewarding_epoch).await {
Err(_) | Ok(false) => {
warn!("We do not have sufficient monitor data to perform rewarding in this epoch ({})", next_rewarding_epoch);
self.wait_until_next_epoch(next_rewarding_epoch).await;
continue;
}
_ => (),
}
if let Err(err) = self.perform_rewarding(next_rewarding_epoch).await {
// TODO: should we just terminate the process here instead?
error!("Failed to distribute rewards! - {}", err)
}
}
}
}
+365 -78
View File
@@ -4,19 +4,48 @@
use crate::network_monitor::monitor::summary_producer::NodeResult;
use crate::node_status_api::models::{HistoricalUptime, Uptime};
use crate::node_status_api::utils::ActiveNodeDayStatuses;
use crate::storage::models::{ActiveNode, NodeStatus};
use crate::storage::models::{
ActiveNode, EpochRewarding, FailedGatewayRewardChunk, FailedMixnodeRewardChunk, NodeStatus,
PossiblyUnrewardedGateway, PossiblyUnrewardedMixnode, RewardingReport,
};
use crate::storage::UnixTimestamp;
use std::convert::TryFrom;
#[derive(Clone)]
pub(crate) struct StorageManager {
pub(super) connection_pool: sqlx::SqlitePool,
pub(crate) connection_pool: sqlx::SqlitePool,
}
// all SQL goes here
impl StorageManager {
/// Tries to obtain row id of given mixnode given its identity
pub(super) async fn get_mixnode_id(&self, identity: &str) -> Result<Option<i64>, sqlx::Error> {
let id = sqlx::query!(
"SELECT id FROM mixnode_details WHERE identity = ?",
identity
)
.fetch_optional(&self.connection_pool)
.await?
.map(|row| row.id);
Ok(id)
}
/// Tries to obtain row id of given gateway given its identity
pub(super) async fn get_gateway_id(&self, identity: &str) -> Result<Option<i64>, sqlx::Error> {
let id = sqlx::query!(
"SELECT id FROM gateway_details WHERE identity = ?",
identity
)
.fetch_optional(&self.connection_pool)
.await?
.map(|row| row.id);
Ok(id)
}
/// Tries to obtain owner value of given mixnode given its identity
pub(crate) async fn get_mixnode_owner(
pub(super) async fn get_mixnode_owner(
&self,
identity: &str,
) -> Result<Option<String>, sqlx::Error> {
@@ -32,7 +61,7 @@ impl StorageManager {
}
/// Tries to obtain owner value of given gateway given its identity
pub(crate) async fn get_gateway_owner(
pub(super) async fn get_gateway_owner(
&self,
identity: &str,
) -> Result<Option<String>, sqlx::Error> {
@@ -49,7 +78,7 @@ impl StorageManager {
/// Gets all ipv4 statuses for mixnode with particular identity that were inserted
/// into the database after the specified unix timestamp.
pub(crate) async fn get_mixnode_ipv4_statuses_since(
pub(super) async fn get_mixnode_ipv4_statuses_since(
&self,
identity: &str,
timestamp: UnixTimestamp,
@@ -72,7 +101,7 @@ impl StorageManager {
/// Gets all ipv6 statuses for mixnode with particular identity that were inserted
/// into the database after the specified unix timestamp.
pub(crate) async fn get_mixnode_ipv6_statuses_since(
pub(super) async fn get_mixnode_ipv6_statuses_since(
&self,
identity: &str,
timestamp: UnixTimestamp,
@@ -95,7 +124,7 @@ impl StorageManager {
/// Gets all ipv4 statuses for gateway with particular identity that were inserted
/// into the database after the specified unix timestamp.
pub(crate) async fn get_gateway_ipv4_statuses_since(
pub(super) async fn get_gateway_ipv4_statuses_since(
&self,
identity: &str,
timestamp: UnixTimestamp,
@@ -118,7 +147,7 @@ impl StorageManager {
/// Gets all ipv6 statuses for gateway with particular identity that were inserted
/// into the database after the specified unix timestamp.
pub(crate) async fn get_gateway_ipv6_statuses_since(
pub(super) async fn get_gateway_ipv6_statuses_since(
&self,
identity: &str,
timestamp: UnixTimestamp,
@@ -140,7 +169,7 @@ impl StorageManager {
}
/// Gets the historical daily uptime associated with the particular mixnode
pub(crate) async fn get_mixnode_historical_uptimes(
pub(super) async fn get_mixnode_historical_uptimes(
&self,
identity: &str,
) -> Result<Vec<HistoricalUptime>, sqlx::Error> {
@@ -180,7 +209,7 @@ impl StorageManager {
}
/// Gets the historical daily uptime associated with the particular gateway
pub(crate) async fn get_gateway_historical_uptimes(
pub(super) async fn get_gateway_historical_uptimes(
&self,
identity: &str,
) -> Result<Vec<HistoricalUptime>, sqlx::Error> {
@@ -220,91 +249,119 @@ impl StorageManager {
}
/// Gets all ipv4 statuses for mixnode with particular id that were inserted
/// into the database after the specified unix timestamp.
pub(crate) async fn get_mixnode_ipv4_statuses_since_by_id(
/// into the database within the specified time interval.
///
/// # Arguments
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(super) async fn get_mixnode_ipv4_statuses_by_id(
&self,
id: i64,
timestamp: UnixTimestamp,
since: UnixTimestamp,
until: UnixTimestamp,
) -> Result<Vec<NodeStatus>, sqlx::Error> {
sqlx::query_as!(
NodeStatus,
r#"
SELECT timestamp, up
FROM mixnode_ipv4_status
WHERE mixnode_details_id=? AND timestamp > ?;
WHERE mixnode_details_id=? AND timestamp > ? AND timestamp < ?;
"#,
id,
timestamp
since,
until,
)
.fetch_all(&self.connection_pool)
.await
}
/// Gets all ipv6 statuses for mixnode with particular id that were inserted
/// into the database after the specified unix timestamp.
pub(crate) async fn get_mixnode_ipv6_statuses_since_by_id(
/// into the database within the specified time interval.
///
/// # Arguments
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(super) async fn get_mixnode_ipv6_statuses_by_id(
&self,
id: i64,
timestamp: UnixTimestamp,
since: UnixTimestamp,
until: UnixTimestamp,
) -> Result<Vec<NodeStatus>, sqlx::Error> {
sqlx::query_as!(
NodeStatus,
r#"
SELECT timestamp, up
FROM mixnode_ipv6_status
WHERE mixnode_details_id=? AND timestamp > ?;
WHERE mixnode_details_id=? AND timestamp > ? AND timestamp < ?;
"#,
id,
timestamp
since,
until,
)
.fetch_all(&self.connection_pool)
.await
}
/// Gets all ipv4 statuses for gateway with particular id that were inserted
/// into the database after the specified unix timestamp.
pub(crate) async fn get_gateway_ipv4_statuses_since_by_id(
/// into the database within the specified time interval.
///
/// # Arguments
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(super) async fn get_gateway_ipv4_statuses_by_id(
&self,
id: i64,
timestamp: UnixTimestamp,
since: UnixTimestamp,
until: UnixTimestamp,
) -> Result<Vec<NodeStatus>, sqlx::Error> {
sqlx::query_as!(
NodeStatus,
r#"
SELECT timestamp, up
FROM gateway_ipv4_status
WHERE gateway_details_id=? AND timestamp > ?;
WHERE gateway_details_id=? AND timestamp > ? AND timestamp < ?;
"#,
id,
timestamp
since,
until,
)
.fetch_all(&self.connection_pool)
.await
}
/// Gets all ipv6 statuses for gateway with particular id that were inserted
/// into the database after the specified unix timestamp.
pub(crate) async fn get_gateway_ipv6_statuses_since_by_id(
/// into the database within the specified time interval.
///
/// # Arguments
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(super) async fn get_gateway_ipv6_statuses_by_id(
&self,
id: i64,
timestamp: UnixTimestamp,
since: UnixTimestamp,
until: UnixTimestamp,
) -> Result<Vec<NodeStatus>, sqlx::Error> {
sqlx::query_as!(
NodeStatus,
r#"
SELECT timestamp, up
FROM gateway_ipv6_status
WHERE gateway_details_id=? AND timestamp > ?;
WHERE gateway_details_id=? AND timestamp > ? AND timestamp < ?;
"#,
id,
timestamp
since,
until,
)
.fetch_all(&self.connection_pool)
.await
}
/// Tries to submit mixnode [`NodeResult`] from the network monitor to the database.
pub(crate) async fn submit_mixnode_statuses(
pub(super) async fn submit_mixnode_statuses(
&self,
timestamp: UnixTimestamp,
mixnode_results: Vec<NodeResult>,
@@ -356,7 +413,7 @@ impl StorageManager {
}
/// Tries to submit gateway [`NodeResult`] from the network monitor to the database.
pub(crate) async fn submit_gateway_statuses(
pub(super) async fn submit_gateway_statuses(
&self,
timestamp: UnixTimestamp,
gateway_results: Vec<NodeResult>,
@@ -412,7 +469,7 @@ impl StorageManager {
}
/// Checks whether there are already any historical uptimes with this particular date.
pub(crate) async fn check_for_historical_uptime_existence(
pub(super) async fn check_for_historical_uptime_existence(
&self,
today_iso_8601: &str,
) -> Result<bool, sqlx::Error> {
@@ -426,7 +483,7 @@ impl StorageManager {
}
/// Creates new entry for mixnode historical uptime
pub(crate) async fn insert_mixnode_historical_uptime(
pub(super) async fn insert_mixnode_historical_uptime(
&self,
node_id: i64,
date: &str,
@@ -443,8 +500,8 @@ impl StorageManager {
Ok(())
}
/// Creates new entry for gatewy historical uptime
pub(crate) async fn insert_gateway_historical_uptime(
/// Creates new entry for gateway historical uptime
pub(super) async fn insert_gateway_historical_uptime(
&self,
node_id: i64,
date: &str,
@@ -466,7 +523,7 @@ impl StorageManager {
/// # Arguments
///
/// * `timestamp`: unix timestamp at which the monitor test run has occurred
pub(crate) async fn insert_monitor_run(
pub(super) async fn insert_monitor_run(
&self,
timestamp: UnixTimestamp,
) -> Result<(), sqlx::Error> {
@@ -482,7 +539,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_monitor_runs_count(
pub(super) async fn get_monitor_runs_count(
&self,
since: UnixTimestamp,
until: UnixTimestamp,
@@ -498,7 +555,13 @@ impl StorageManager {
Ok(count)
}
pub(crate) async fn purge_old_mixnode_ipv4_statuses(
/// Removes all ipv4 statuses for all mixnodes that are older than the
/// provided timestamp. This method is indirectly called at every reward cycle.
///
/// # Arguments
///
/// * `until`: timestamp specifying the purge cutoff.
pub(super) async fn purge_old_mixnode_ipv4_statuses(
&self,
timestamp: UnixTimestamp,
) -> Result<(), sqlx::Error> {
@@ -511,7 +574,13 @@ impl StorageManager {
Ok(())
}
pub(crate) async fn purge_old_mixnode_ipv6_statuses(
/// Removes all ipv6 statuses for all mixnodes that are older than the
/// provided timestamp. This method is indirectly called at every reward cycle.
///
/// # Arguments
///
/// * `until`: timestamp specifying the purge cutoff.
pub(super) async fn purge_old_mixnode_ipv6_statuses(
&self,
timestamp: UnixTimestamp,
) -> Result<(), sqlx::Error> {
@@ -524,7 +593,13 @@ impl StorageManager {
Ok(())
}
pub(crate) async fn purge_old_gateway_ipv4_statuses(
/// Removes all ipv4 statuses for all gateways that are older than the
/// provided timestamp. This method is indirectly called at every reward cycle.
///
/// # Arguments
///
/// * `until`: timestamp specifying the purge cutoff.
pub(super) async fn purge_old_gateway_ipv4_statuses(
&self,
timestamp: UnixTimestamp,
) -> Result<(), sqlx::Error> {
@@ -537,7 +612,13 @@ impl StorageManager {
Ok(())
}
pub(crate) async fn purge_old_gateway_ipv6_statuses(
/// Removes all ipv6 statuses for all gateways that are older than the
/// provided timestamp. This method is indirectly called at every reward cycle.
///
/// # Arguments
///
/// * `until`: timestamp specifying the purge cutoff.
pub(super) async fn purge_old_gateway_ipv6_statuses(
&self,
timestamp: UnixTimestamp,
) -> Result<(), sqlx::Error> {
@@ -550,17 +631,17 @@ impl StorageManager {
Ok(())
}
// ####################################################################################################
// ALL THE METHODS BELOW ARE TEMPORARY AND WILL BE REMOVED ONCE PAYMENTS ARE DONE INSIDE VALIDATOR API
// ####################################################################################################
// NOTE: this method will go away once we move payments into the validator-api
// it just helps us to get rid of having to query for reports of each node individually
/// Returns public key, owner and id of all mixnodes that have had any ipv4 statuses submitted
/// since provided timestamp.
pub(crate) async fn get_all_active_mixnodes(
/// within the provided time interval.
///
/// # Arguments
///
/// * `since`: indicates the lower bound timestamp for deciding whether given mixnode is active
/// * `until`: indicates the upper bound timestamp for deciding whether given mixnode is active
pub(super) async fn get_all_active_mixnodes_in_interval(
&self,
timestamp: UnixTimestamp,
since: UnixTimestamp,
until: UnixTimestamp,
) -> Result<Vec<ActiveNode>, sqlx::Error> {
// find mixnode details of all nodes that have had at least 1 ipv4 status since the provided
// timestamp
@@ -574,22 +655,27 @@ impl StorageManager {
JOIN mixnode_ipv4_status
ON mixnode_details.id = mixnode_ipv4_status.mixnode_details_id
WHERE EXISTS (
SELECT 1 FROM mixnode_ipv4_status WHERE timestamp > ?
SELECT 1 FROM mixnode_ipv4_status WHERE timestamp > ? AND timestamp < ?
)
"#,
timestamp
since,
until
)
.fetch_all(&self.connection_pool)
.await
}
// NOTE: this method will go away once we move payments into the validator-api
// it just helps us to get rid of having to query for reports of each node individually
/// Returns public key, owner and id of all gateways that have had any ipv4 statuses submitted
/// since provided timestamp.
pub(crate) async fn get_all_active_gateways(
/// within the provided time interval.
///
/// # Arguments
///
/// * `since`: indicates the lower bound timestamp for deciding whether given gateway is active
/// * `until`: indicates the upper bound timestamp for deciding whether given gateway is active
pub(super) async fn get_all_active_gateways_in_interval(
&self,
timestamp: UnixTimestamp,
since: UnixTimestamp,
until: UnixTimestamp,
) -> Result<Vec<ActiveNode>, sqlx::Error> {
sqlx::query_as!(
ActiveNode,
@@ -599,38 +685,235 @@ impl StorageManager {
JOIN gateway_ipv4_status
ON gateway_details.id = gateway_ipv4_status.gateway_details_id
WHERE EXISTS (
SELECT 1 FROM gateway_ipv4_status WHERE timestamp > ?
SELECT 1 FROM gateway_ipv4_status WHERE timestamp > ? AND timestamp < ?
)
"#,
timestamp
since,
until,
)
.fetch_all(&self.connection_pool)
.await
}
// NOTE: this method will go away once we move payments into the validator-api
// it just helps us to get rid of having to query for reports of each node individually
// TODO: should that live on the 'Inner' struct or should it rather exist on the actual storage struct
// since technically it doesn't touch any SQL directly
pub(crate) async fn get_all_active_mixnodes_statuses(
/// Inserts information about starting new epoch rewarding into the database.
/// Returns id of the newly created entry.
///
/// # Arguments
///
/// * `epoch_timestamp`: Unix timestamp of this rewarding epoch.
pub(super) async fn insert_new_epoch_rewarding(
&self,
epoch_timestamp: UnixTimestamp,
) -> Result<i64, sqlx::Error> {
let res = sqlx::query!(
r#"
INSERT INTO epoch_rewarding (epoch_timestamp, finished)
VALUES (?, 0)
"#,
epoch_timestamp
)
.execute(&self.connection_pool)
.await?;
Ok(res.last_insert_rowid())
}
/// Sets the `finished` field on the epoch rewarding to true.
///
/// # Arguments
///
/// * `id`: id of the entry we want to update.
pub(super) async fn update_finished_epoch_rewarding(&self, id: i64) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
UPDATE epoch_rewarding
SET finished = 1
WHERE id = ?
"#,
id
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
// /// Tries to obtain the most recent epoch rewarding entry currently stored.
// ///
// /// Returns None if no data exists.
// pub(super) async fn get_most_recent_epoch_rewarding_entry(
// &self,
// ) -> Result<Option<EpochRewarding>, sqlx::Error> {
// sqlx::query_as!(
// EpochRewarding,
// r#"
// SELECT * FROM epoch_rewarding
// ORDER BY epoch_timestamp DESC
// LIMIT 1
// "#,
// )
// .fetch_optional(&self.connection_pool)
// .await
// }
/// Tries to obtain the epoch rewarding entry that has the provided timestamp.
///
/// Returns None if no data exists.
///
/// # Arguments
///
/// * `epoch_timestamp`: Unix timestamp of this rewarding epoch.
pub(super) async fn get_epoch_rewarding_entry(
&self,
epoch_timestamp: UnixTimestamp,
) -> Result<Option<EpochRewarding>, sqlx::Error> {
sqlx::query_as!(
EpochRewarding,
r#"
SELECT * FROM epoch_rewarding
WHERE epoch_timestamp = ?
"#,
epoch_timestamp
)
.fetch_optional(&self.connection_pool)
.await
}
/// Inserts new rewarding report into the database.
///
/// # Arguments
///
/// * `report`: report to insert into the database
pub(super) async fn insert_rewarding_report(
&self,
report: RewardingReport,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO rewarding_report
(epoch_rewarding_id, eligible_mixnodes, eligible_gateways, possibly_unrewarded_mixnodes, possibly_unrewarded_gateways)
VALUES (?, ?, ?, ?, ?);
"#,
report.epoch_rewarding_id,
report.eligible_mixnodes,
report.eligible_gateways,
report.possibly_unrewarded_mixnodes,
report.possibly_unrewarded_gateways,
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
/// Inserts new failed mixnode reward chunk information into the database.
/// Returns id of the newly created entry.
///
/// # Arguments
///
/// * `failed_chunk`: chunk information to insert.
pub(super) async fn insert_failed_mixnode_reward_chunk(
&self,
failed_chunk: FailedMixnodeRewardChunk,
) -> Result<i64, sqlx::Error> {
let res = sqlx::query!(
r#"
INSERT INTO failed_mixnode_reward_chunk (error_message, reward_summary_id) VALUES (?, ?)
"#,
failed_chunk.error_message,
failed_chunk.epoch_rewarding_id,
).execute(&self.connection_pool).await?;
Ok(res.last_insert_rowid())
}
/// Inserts new failed gateway reward chunk information into the database.
/// Returns id of the newly created entry.
///
/// # Arguments
///
/// * `failed_chunk`: chunk information to insert.
pub(super) async fn insert_failed_gateway_reward_chunk(
&self,
failed_chunk: FailedGatewayRewardChunk,
) -> Result<i64, sqlx::Error> {
let res = sqlx::query!(
r#"
INSERT INTO failed_gateway_reward_chunk (error_message, reward_summary_id) VALUES (?, ?)
"#,
failed_chunk.error_message,
failed_chunk.epoch_rewarding_id,
).execute(&self.connection_pool).await?;
Ok(res.last_insert_rowid())
}
/// Inserts information into the database about a mixnode that might have been unfairly unrewarded this epoch.
///
/// # Arguments
///
/// * `mixnode`: mixnode information to insert.
pub(super) async fn insert_possibly_unrewarded_mixnode(
&self,
mixnode: PossiblyUnrewardedMixnode,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO possibly_unrewarded_mixnode (identity, uptime, failed_mixnode_reward_chunk_id) VALUES (?, ?, ?)
"#,
mixnode.identity,
mixnode.uptime,
mixnode.chunk_id
).execute(&self.connection_pool).await?;
Ok(())
}
/// Inserts information into the database about a gateway that might have been unfairly unrewarded this epoch.
///
/// # Arguments
///
/// * `gateway`: mixnode information to insert.
pub(super) async fn insert_possibly_unrewarded_gateway(
&self,
gateway: PossiblyUnrewardedGateway,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO possibly_unrewarded_gateway (identity, uptime, failed_gateway_reward_chunk_id) VALUES (?, ?, ?)
"#,
gateway.identity,
gateway.uptime,
gateway.chunk_id
).execute(&self.connection_pool).await?;
Ok(())
}
/// Obtains all statuses of active mixnodes from the specified time interval.
///
/// # Arguments
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(super) async fn get_all_active_mixnodes_statuses_in_interval(
&self,
since: UnixTimestamp,
until: UnixTimestamp,
) -> Result<Vec<ActiveNodeDayStatuses>, sqlx::Error> {
let active_nodes = self.get_all_active_mixnodes(since).await?;
let active_nodes = self
.get_all_active_mixnodes_in_interval(since, until)
.await?;
let mut active_day_statuses = Vec::with_capacity(active_nodes.len());
for active_node in active_nodes.into_iter() {
let ipv4_statuses = self
.get_mixnode_ipv4_statuses_since_by_id(active_node.id, since)
.get_mixnode_ipv4_statuses_by_id(active_node.id, since, until)
.await?;
let ipv6_statuses = self
.get_mixnode_ipv6_statuses_since_by_id(active_node.id, since)
.get_mixnode_ipv6_statuses_by_id(active_node.id, since, until)
.await?;
let statuses = ActiveNodeDayStatuses {
identity: active_node.identity,
owner: active_node.owner,
node_id: active_node.id,
ipv4_statuses,
ipv6_statuses,
};
@@ -641,29 +924,33 @@ impl StorageManager {
Ok(active_day_statuses)
}
// NOTE: this method will go away once we move payments into the validator-api
// it just helps us to get rid of having to query for reports of each node individually
// TODO: should that live on the 'Inner' struct or should it rather exist on the actual storage struct
// since technically it doesn't touch any SQL directly
pub(crate) async fn get_all_active_gateways_statuses(
/// Obtains all statuses of active gateways from the specified time interval.
///
/// # Arguments
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(super) async fn get_all_active_gateways_statuses_in_interval(
&self,
since: UnixTimestamp,
until: UnixTimestamp,
) -> Result<Vec<ActiveNodeDayStatuses>, sqlx::Error> {
let active_nodes = self.get_all_active_gateways(since).await?;
let active_nodes = self
.get_all_active_gateways_in_interval(since, until)
.await?;
let mut active_day_statuses = Vec::with_capacity(active_nodes.len());
for active_node in active_nodes.into_iter() {
let ipv4_statuses = self
.get_gateway_ipv4_statuses_since_by_id(active_node.id, since)
.get_gateway_ipv4_statuses_by_id(active_node.id, since, until)
.await?;
let ipv6_statuses = self
.get_gateway_ipv6_statuses_since_by_id(active_node.id, since)
.get_gateway_ipv6_statuses_by_id(active_node.id, since, until)
.await?;
let statuses = ActiveNodeDayStatuses {
identity: active_node.identity,
owner: active_node.owner,
node_id: active_node.id,
ipv4_statuses,
ipv6_statuses,
};
+249 -124
View File
@@ -4,22 +4,25 @@
use crate::network_monitor::monitor::summary_producer::NodeResult;
use crate::node_status_api::models::{
GatewayStatusReport, GatewayUptimeHistory, MixnodeStatusReport, MixnodeUptimeHistory,
NodeStatusApiError, Uptime,
NodeStatusApiError,
};
use crate::node_status_api::{ONE_DAY, ONE_HOUR};
use crate::storage::manager::StorageManager;
use crate::storage::models::NodeStatus;
use crate::storage::models::{
EpochRewarding, FailedGatewayRewardChunk, FailedMixnodeRewardChunk, NodeStatus,
PossiblyUnrewardedGateway, PossiblyUnrewardedMixnode, RewardingReport,
};
use rocket::fairing::{self, AdHoc};
use rocket::{Build, Rocket};
use sqlx::types::time::OffsetDateTime;
use sqlx::ConnectOptions;
use std::path::PathBuf;
use time::OffsetDateTime;
pub(crate) mod manager;
pub(crate) mod models;
// A type alias to be more explicit about type of timestamp used.
type UnixTimestamp = i64;
pub(crate) type UnixTimestamp = i64;
// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
@@ -294,32 +297,38 @@ impl NodeStatusStorage {
))
}
// NOTE: this method will go away once we move payments into the validator-api
// it just helps us to get rid of having to query for reports of each node individually
pub(crate) async fn get_all_mixnode_reports(
/// Obtain status reports of mixnodes that were active in the specified time interval.
///
/// # Arguments
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `end`: unix timestamp indicating the upper bound interval of the selection.
// NOTE: even though the arguments would suggest this function is generic in regards to
// epoch length, the constructed reports still assume the epochs are 24h in length.
pub(crate) async fn get_all_active_mixnode_reports_in_interval(
&self,
start: UnixTimestamp,
end: UnixTimestamp,
) -> Result<Vec<MixnodeStatusReport>, NodeStatusApiError> {
let now = OffsetDateTime::now_utc();
let day_ago = (now - ONE_DAY).unix_timestamp();
let hour_ago = (now - ONE_HOUR).unix_timestamp();
if (end - start) as u64 != ONE_DAY.as_secs() {
warn!("Our current epoch length breaks the 24h length assumption")
}
let hour_ago = end - ONE_HOUR.as_secs() as i64;
// determine the number of runs the mixnodes should have been online for
let last_hour_runs_count = self
.get_monitor_runs_count(hour_ago, now.unix_timestamp())
.await?;
let last_day_runs_count = self
.get_monitor_runs_count(day_ago, now.unix_timestamp())
.await?;
let last_hour_runs_count = self.get_monitor_runs_count(hour_ago, end).await?;
let last_day_runs_count = self.get_monitor_runs_count(start, end).await?;
let reports = self
.manager
.get_all_active_mixnodes_statuses(day_ago)
.get_all_active_mixnodes_statuses_in_interval(start, end)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)?
.into_iter()
.map(|statuses| {
MixnodeStatusReport::construct_from_last_day_reports(
now,
OffsetDateTime::from_unix_timestamp(end).unwrap(),
statuses.identity,
statuses.owner,
statuses.ipv4_statuses,
@@ -333,32 +342,38 @@ impl NodeStatusStorage {
Ok(reports)
}
// NOTE: this method will go away once we move payments into the validator-api
// it just helps us to get rid of having to query for reports of each node individually
pub(crate) async fn get_all_gateway_reports(
/// Obtain status reports of gateways that were active in the specified time interval.
///
/// # Arguments
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `end`: unix timestamp indicating the upper bound interval of the selection.
// NOTE: even though the arguments would suggest this function is generic in regards to
// epoch length, the constructed reports still assume the epochs are 24h in length.
pub(crate) async fn get_all_active_gateway_reports_in_interval(
&self,
start: UnixTimestamp,
end: UnixTimestamp,
) -> Result<Vec<GatewayStatusReport>, NodeStatusApiError> {
let now = OffsetDateTime::now_utc();
let day_ago = (now - ONE_DAY).unix_timestamp();
let hour_ago = (now - ONE_HOUR).unix_timestamp();
if (end - start) as u64 != ONE_DAY.as_secs() {
warn!("Our current epoch length breaks the 24h length assumption")
}
// determine the number of runs the gateways should have been online for
let last_hour_runs_count = self
.get_monitor_runs_count(hour_ago, now.unix_timestamp())
.await?;
let last_day_runs_count = self
.get_monitor_runs_count(day_ago, now.unix_timestamp())
.await?;
let hour_ago = end - ONE_HOUR.as_secs() as i64;
// determine the number of runs the mixnodes should have been online for
let last_hour_runs_count = self.get_monitor_runs_count(hour_ago, end).await?;
let last_day_runs_count = self.get_monitor_runs_count(start, end).await?;
let reports = self
.manager
.get_all_active_gateways_statuses(day_ago)
.get_all_active_gateways_statuses_in_interval(start, end)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)?
.into_iter()
.map(|statuses| {
GatewayStatusReport::construct_from_last_day_reports(
now,
OffsetDateTime::from_unix_timestamp(end).unwrap(),
statuses.identity,
statuses.owner,
statuses.ipv4_statuses,
@@ -428,87 +443,79 @@ impl NodeStatusStorage {
Ok(run_count as usize)
}
// Called on timer/reward script
async fn update_historical_uptimes(
/// Given lists of reports of all active mixnodes and gateways, inserts the data into the
/// historical uptime tables.
///
/// This method is called at every reward cycle. Note that currently to work as expected, it
/// assumes a 24h epoch period. If this assumption is broken, this method should be called
/// on an independent timer.
///
/// # Arguments
///
/// * `today_iso_8601`: today's date expressed in ISO 8601, i.e. YYYY-MM-DD
/// * `mixnode_reports`: slice of reports for all active mixnodes
/// * `gateway_reports`: slice of reports for all active gateways
pub(crate) async fn update_historical_uptimes(
&self,
today_iso_8601: &str,
mixnode_reports: &[MixnodeStatusReport],
gateway_reports: &[GatewayStatusReport],
) -> Result<(), NodeStatusApiError> {
let now = OffsetDateTime::now_utc();
let day_ago = (now - ONE_DAY).unix_timestamp();
for report in mixnode_reports {
// if this ever fails, we have a super weird error because we just constructed report for that node
// and we never delete node data!
let node_id = match self
.manager
.get_mixnode_id(&report.identity)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)?
{
Some(node_id) => node_id,
None => {
error!(
"Somehow we failed to grab id of mixnode {} from the database!",
&report.identity
);
continue;
}
};
// get statuses for all active mixnodes...
let active_mixnodes_statuses = self
.manager
.get_all_active_mixnodes_statuses(day_ago)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)?;
for statuses in active_mixnodes_statuses.into_iter() {
let ipv4_day_up = statuses
.ipv4_statuses
.iter()
.filter(|status| status.up)
.count();
let ipv6_day_up = statuses
.ipv6_statuses
.iter()
.filter(|status| status.up)
.count();
// calculate their uptimes for the last 24h
let ipv4_uptime = Uptime::from_ratio(ipv4_day_up, statuses.ipv4_statuses.len())
.unwrap()
.u8();
let ipv6_uptime = Uptime::from_ratio(ipv6_day_up, statuses.ipv6_statuses.len())
.unwrap()
.u8();
// and insert into the database
self.manager
.insert_mixnode_historical_uptime(
statuses.node_id,
node_id,
today_iso_8601,
ipv4_uptime,
ipv6_uptime,
report.last_day_ipv4.u8(),
report.last_day_ipv4.u8(),
)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)?;
}
// get statuses for all active gateways...
let active_gateways_statuses = self
.manager
.get_all_active_gateways_statuses(day_ago)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)?;
for report in gateway_reports {
// if this ever fails, we have a super weird error because we just constructed report for that node
// and we never delete node data!
let node_id = match self
.manager
.get_gateway_id(&report.identity)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)?
{
Some(node_id) => node_id,
None => {
error!(
"Somehow we failed to grab id of gateway {} from the database!",
&report.identity
);
continue;
}
};
for statuses in active_gateways_statuses.into_iter() {
let ipv4_day_up = statuses
.ipv4_statuses
.iter()
.filter(|status| status.up)
.count();
let ipv6_day_up = statuses
.ipv6_statuses
.iter()
.filter(|status| status.up)
.count();
// calculate their uptimes for the last 24h
let ipv4_uptime = Uptime::from_ratio(ipv4_day_up, statuses.ipv4_statuses.len())
.unwrap()
.u8();
let ipv6_uptime = Uptime::from_ratio(ipv6_day_up, statuses.ipv6_statuses.len())
.unwrap()
.u8();
// and insert into the database
self.manager
.insert_gateway_historical_uptime(
statuses.node_id,
node_id,
today_iso_8601,
ipv4_uptime,
ipv6_uptime,
report.last_day_ipv4.u8(),
report.last_day_ipv4.u8(),
)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)?;
@@ -517,7 +524,7 @@ impl NodeStatusStorage {
Ok(())
}
async fn check_if_historical_uptimes_exist_for_date(
pub(crate) async fn check_if_historical_uptimes_exist_for_date(
&self,
date_iso_8601: &str,
) -> Result<bool, NodeStatusApiError> {
@@ -527,45 +534,163 @@ impl NodeStatusStorage {
.map_err(|_| NodeStatusApiError::InternalDatabaseError)
}
// Called on timer/reward script
async fn purge_old_statuses(&self) -> Result<(), NodeStatusApiError> {
let now = OffsetDateTime::now_utc();
let two_days_ago = (now - 2 * ONE_DAY).unix_timestamp();
/// Removes all ipv4 and ipv6 statuses for all mixnodes and gateways that are older than the
/// provided timestamp. This method is called at every reward cycle.
///
/// # Arguments
///
/// * `until`: timestamp specifying the purge cutoff.
pub(crate) async fn purge_old_statuses(
&self,
until: UnixTimestamp,
) -> Result<(), NodeStatusApiError> {
self.manager
.purge_old_mixnode_ipv4_statuses(two_days_ago)
.purge_old_mixnode_ipv4_statuses(until)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)?;
self.manager
.purge_old_mixnode_ipv6_statuses(two_days_ago)
.purge_old_mixnode_ipv6_statuses(until)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)?;
self.manager
.purge_old_gateway_ipv4_statuses(two_days_ago)
.purge_old_gateway_ipv4_statuses(until)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)?;
self.manager
.purge_old_gateway_ipv6_statuses(two_days_ago)
.purge_old_gateway_ipv6_statuses(until)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)
}
pub(crate) async fn daily_chores(&self) -> Result<bool, NodeStatusApiError> {
let today_iso_8601 = OffsetDateTime::now_utc().date().to_string();
////////////////////////////////////////////////////////////////////////
// TODO: Should all of the below really return a "NodeStatusApi" Errors?
////////////////////////////////////////////////////////////////////////
// if we have already performed the update for today's date, don't do anything
if self
.check_if_historical_uptimes_exist_for_date(&today_iso_8601)
.await?
{
Ok(false)
} else {
info!(
"Updating historical daily uptimes of all nodes and purging old status reports..."
);
self.update_historical_uptimes(&today_iso_8601).await?;
self.purge_old_statuses().await?;
Ok(true)
}
/// Inserts information about starting new epoch rewarding into the database.
/// Returns id of the newly created entry.
///
/// # Arguments
///
/// * `epoch_timestamp`: Unix timestamp of this rewarding epoch.
pub(crate) async fn insert_started_epoch_rewarding(
&self,
epoch_timestamp: UnixTimestamp,
) -> Result<i64, NodeStatusApiError> {
self.manager
.insert_new_epoch_rewarding(epoch_timestamp)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)
}
// /// Tries to obtain the most recent epoch rewarding entry currently stored.
// ///
// /// Returns None if no data exists.
// pub(crate) async fn get_most_recent_epoch_rewarding_entry(
// &self,
// ) -> Result<Option<EpochRewarding>, NodeStatusApiError> {
// self.manager
// .get_most_recent_epoch_rewarding_entry()
// .await
// .map_err(|_| NodeStatusApiError::InternalDatabaseError)
// }
/// Tries to obtain the epoch rewarding entry that has the provided timestamp.
///
/// Returns None if no data exists.
///
/// # Arguments
///
/// * `epoch_timestamp`: Unix timestamp of this rewarding epoch.
pub(super) async fn get_epoch_rewarding_entry(
&self,
epoch_timestamp: UnixTimestamp,
) -> Result<Option<EpochRewarding>, NodeStatusApiError> {
self.manager
.get_epoch_rewarding_entry(epoch_timestamp)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)
}
/// Sets the `finished` field on the epoch rewarding to true and inserts the rewarding report into
/// the database.
///
/// # Arguments
///
/// * `report`: report to insert into the database
pub(crate) async fn finish_rewarding_epoch_and_insert_report(
&self,
report: RewardingReport,
) -> Result<(), NodeStatusApiError> {
self.manager
.update_finished_epoch_rewarding(report.epoch_rewarding_id)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)?;
self.manager
.insert_rewarding_report(report)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)
}
/// Inserts new failed mixnode reward chunk information into the database.
/// Returns id of the newly created entry.
///
/// # Arguments
///
/// * `failed_chunk`: chunk information to insert.
pub(crate) async fn insert_failed_mixnode_reward_chunk(
&self,
failed_chunk: FailedMixnodeRewardChunk,
) -> Result<i64, NodeStatusApiError> {
self.manager
.insert_failed_mixnode_reward_chunk(failed_chunk)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)
}
/// Inserts new failed gateway reward chunk information into the database.
/// Returns id of the newly created entry.
///
/// # Arguments
///
/// * `failed_chunk`: chunk information to insert.
pub(crate) async fn insert_failed_gateway_reward_chunk(
&self,
failed_chunk: FailedGatewayRewardChunk,
) -> Result<i64, NodeStatusApiError> {
self.manager
.insert_failed_gateway_reward_chunk(failed_chunk)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)
}
/// Inserts information into the database about a mixnode that might have been unfairly unrewarded this epoch.
///
/// # Arguments
///
/// * `mixnode`: mixnode information to insert.
pub(crate) async fn insert_possibly_unrewarded_mixnode(
&self,
mixnode: PossiblyUnrewardedMixnode,
) -> Result<(), NodeStatusApiError> {
self.manager
.insert_possibly_unrewarded_mixnode(mixnode)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)
}
/// Inserts information into the database about a gateway that might have been unfairly unrewarded this epoch.
///
/// # Arguments
///
/// * `gateway`: mixnode information to insert.
pub(crate) async fn insert_possibly_unrewarded_gateway(
&self,
gateway: PossiblyUnrewardedGateway,
) -> Result<(), NodeStatusApiError> {
self.manager
.insert_possibly_unrewarded_gateway(gateway)
.await
.map_err(|_| NodeStatusApiError::InternalDatabaseError)
}
}
+48 -1
View File
@@ -1,9 +1,11 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::UnixTimestamp;
// Internally used struct to catch results from the database to calculate uptimes for given mixnode/gateway
pub(crate) struct NodeStatus {
pub(crate) timestamp: i64,
pub(crate) timestamp: UnixTimestamp,
pub(crate) up: bool,
}
@@ -13,3 +15,48 @@ pub(crate) struct ActiveNode {
pub(crate) identity: String,
pub(crate) owner: String,
}
pub(crate) struct EpochRewarding {
#[allow(dead_code)]
pub(crate) id: i64,
#[allow(dead_code)]
pub(crate) epoch_timestamp: i64,
pub(crate) finished: bool,
}
pub(crate) struct RewardingReport {
// references particular epoch_rewarding
pub(crate) epoch_rewarding_id: i64,
pub(crate) eligible_mixnodes: i64,
pub(crate) eligible_gateways: i64,
pub(crate) possibly_unrewarded_mixnodes: i64,
pub(crate) possibly_unrewarded_gateways: i64,
}
pub(crate) struct FailedMixnodeRewardChunk {
// references particular epoch_rewarding (there can be multiple chunks in a rewarding epoch)
pub(crate) epoch_rewarding_id: i64,
pub(crate) error_message: String,
}
pub(crate) struct PossiblyUnrewardedMixnode {
// references particular FailedMixnodeRewardChunk (there can be multiple nodes in a chunk)
pub(crate) chunk_id: i64,
pub(crate) identity: String,
pub(crate) uptime: u8,
}
pub(crate) struct FailedGatewayRewardChunk {
// references particular epoch_rewarding (there can be multiple chunks in a rewarding epoch)
pub(crate) epoch_rewarding_id: i64,
pub(crate) error_message: String,
}
pub(crate) struct PossiblyUnrewardedGateway {
// references particular FailedGatewayRewardChunk (there can be multiple nodes in a chunk)
pub(crate) chunk_id: i64,
pub(crate) identity: String,
pub(crate) uptime: u8,
}