Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7e0659a5e8 | |||
| 49da178f80 | |||
| fa462b282d | |||
| b7f5a29613 |
Generated
+1
@@ -4470,6 +4470,7 @@ dependencies = [
|
||||
"cw2",
|
||||
"cw3",
|
||||
"cw4",
|
||||
"dashmap",
|
||||
"dirs",
|
||||
"futures",
|
||||
"getset",
|
||||
|
||||
@@ -20,6 +20,7 @@ bloomfilter = { workspace = true }
|
||||
cfg-if = { workspace = true }
|
||||
clap = { workspace = true, features = ["cargo", "derive", "env"] }
|
||||
console-subscriber = { workspace = true, optional = true } # validator-api needs to be built with RUSTFLAGS="--cfg tokio_unstable"
|
||||
dashmap = { workspace = true }
|
||||
dirs = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
|
||||
@@ -14,7 +14,6 @@ use nym_sphinx::params::PacketType;
|
||||
use nym_sphinx::receiver::MessageReceiver;
|
||||
use nym_task::TaskClient;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::process;
|
||||
use tokio::time::{sleep, Duration, Instant};
|
||||
use tracing::{debug, error, info, trace};
|
||||
|
||||
@@ -95,10 +94,7 @@ impl<R: MessageReceiver + Send> Monitor<R> {
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("Failed to submit monitor run information to the database - {err}",);
|
||||
|
||||
// TODO: slightly more graceful shutdown here
|
||||
process::exit(1);
|
||||
error!("Failed to submit monitor run information to the database: {err}",);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -89,7 +89,6 @@ pub(crate) async fn submit_gateway_monitoring_results(
|
||||
|
||||
match state
|
||||
.storage
|
||||
.manager
|
||||
.submit_gateway_statuses_v2(message.results())
|
||||
.await
|
||||
{
|
||||
@@ -134,7 +133,6 @@ pub(crate) async fn submit_node_monitoring_results(
|
||||
|
||||
match state
|
||||
.storage
|
||||
.manager
|
||||
.submit_mixnode_statuses_v2(message.results())
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::support::storage::models::{
|
||||
ActiveGateway, ActiveMixnode, GatewayDetails, HistoricalUptime, MixnodeDetails, NodeStatus,
|
||||
RewardingReport, TestedGatewayStatus, TestedMixnodeStatus, TestingRoute,
|
||||
};
|
||||
use crate::support::storage::DbIdCache;
|
||||
use nym_mixnet_contract_common::{EpochId, IdentityKey, NodeId};
|
||||
use nym_types::monitoring::NodeResult;
|
||||
use sqlx::FromRow;
|
||||
@@ -51,24 +52,7 @@ impl AvgGatewayReliability {
|
||||
|
||||
// all SQL goes here
|
||||
impl StorageManager {
|
||||
pub(crate) async fn get_mixnode_mix_ids_by_identity(
|
||||
&self,
|
||||
identity: &str,
|
||||
) -> Result<Vec<NodeId>, sqlx::Error> {
|
||||
let ids = sqlx::query!(
|
||||
r#"SELECT mix_id as "mix_id: NodeId" FROM mixnode_details WHERE identity_key = ?"#,
|
||||
identity
|
||||
)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| row.mix_id)
|
||||
.collect();
|
||||
|
||||
Ok(ids)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_avg_mix_reliability_in_last_24hr(
|
||||
pub(super) async fn get_all_avg_mix_reliability_in_last_24hr(
|
||||
&self,
|
||||
end_ts_secs: i64,
|
||||
) -> Result<Vec<AvgMixnodeReliability>, sqlx::Error> {
|
||||
@@ -77,7 +61,7 @@ impl StorageManager {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_avg_gateway_reliability_in_last_24hr(
|
||||
pub(super) async fn get_all_avg_gateway_reliability_in_last_24hr(
|
||||
&self,
|
||||
end_ts_secs: i64,
|
||||
) -> Result<Vec<AvgGatewayReliability>, sqlx::Error> {
|
||||
@@ -86,7 +70,7 @@ impl StorageManager {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_avg_mix_reliability_in_time_interval(
|
||||
pub(super) async fn get_all_avg_mix_reliability_in_time_interval(
|
||||
&self,
|
||||
start_ts_secs: i64,
|
||||
end_ts_secs: i64,
|
||||
@@ -114,7 +98,7 @@ impl StorageManager {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_avg_gateway_reliability_in_interval(
|
||||
pub(super) async fn get_all_avg_gateway_reliability_in_interval(
|
||||
&self,
|
||||
start_ts_secs: i64,
|
||||
end_ts_secs: i64,
|
||||
@@ -147,7 +131,7 @@ impl StorageManager {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
|
||||
pub(crate) async fn get_mixnode_database_id(
|
||||
pub(super) async fn get_mixnode_database_id(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
) -> Result<Option<i64>, sqlx::Error> {
|
||||
@@ -159,7 +143,7 @@ impl StorageManager {
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_gateway_database_id(
|
||||
pub(super) async fn get_gateway_database_id(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
) -> Result<Option<i64>, sqlx::Error> {
|
||||
@@ -172,7 +156,7 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
/// Tries to obtain row id of given gateway given its identity
|
||||
pub(crate) async fn get_gateway_database_id_by_identity(
|
||||
pub(super) async fn get_gateway_database_id_by_identity(
|
||||
&self,
|
||||
identity: &str,
|
||||
) -> Result<Option<i64>, sqlx::Error> {
|
||||
@@ -187,7 +171,7 @@ impl StorageManager {
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_gateway_node_id_from_identity_key(
|
||||
pub(super) async fn get_gateway_node_id_from_identity_key(
|
||||
&self,
|
||||
identity: &str,
|
||||
) -> Result<Option<NodeId>, sqlx::Error> {
|
||||
@@ -202,7 +186,7 @@ impl StorageManager {
|
||||
Ok(node_id)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_gateway_identity_key(
|
||||
pub(super) async fn get_gateway_identity_key(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
) -> Result<Option<IdentityKey>, sqlx::Error> {
|
||||
@@ -222,7 +206,7 @@ impl StorageManager {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
|
||||
pub(crate) async fn get_mixnode_identity_key(
|
||||
pub(super) async fn get_mixnode_identity_key(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
) -> Result<Option<IdentityKey>, sqlx::Error> {
|
||||
@@ -244,7 +228,7 @@ impl StorageManager {
|
||||
///
|
||||
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
|
||||
/// * `timestamp`: unix timestamp of the lower bound of the selection.
|
||||
pub(crate) async fn get_mixnode_statuses_since(
|
||||
pub(super) async fn get_mixnode_statuses_since(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
timestamp: i64,
|
||||
@@ -272,7 +256,7 @@ impl StorageManager {
|
||||
///
|
||||
/// * `identity`: identity (base58-encoded public key) of the gateway.
|
||||
/// * `timestamp`: unix timestamp of the lower bound of the selection.
|
||||
pub(crate) async fn get_gateway_statuses_since(
|
||||
pub(super) async fn get_gateway_statuses_since(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
timestamp: i64,
|
||||
@@ -298,7 +282,7 @@ impl StorageManager {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
|
||||
pub(crate) async fn get_mixnode_historical_uptimes(
|
||||
pub(super) async fn get_mixnode_historical_uptimes(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
) -> Result<Vec<ApiHistoricalUptime>, sqlx::Error> {
|
||||
@@ -336,7 +320,7 @@ impl StorageManager {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `identity`: identity (base58-encoded public key) of the gateway.
|
||||
pub(crate) async fn get_gateway_historical_uptimes(
|
||||
pub(super) async fn get_gateway_historical_uptimes(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
) -> Result<Vec<ApiHistoricalUptime>, sqlx::Error> {
|
||||
@@ -369,7 +353,7 @@ impl StorageManager {
|
||||
Ok(uptimes)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_historical_mix_uptime_on(
|
||||
pub(super) async fn get_historical_mix_uptime_on(
|
||||
&self,
|
||||
contract_node_id: i64,
|
||||
date: Date,
|
||||
@@ -393,7 +377,7 @@ impl StorageManager {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_historical_gateway_uptime_on(
|
||||
pub(super) async fn get_historical_gateway_uptime_on(
|
||||
&self,
|
||||
contract_node_id: i64,
|
||||
date: Date,
|
||||
@@ -424,7 +408,7 @@ impl StorageManager {
|
||||
///
|
||||
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
|
||||
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
|
||||
pub(crate) async fn get_mixnode_statuses_by_database_id(
|
||||
pub(super) async fn get_mixnode_statuses_by_database_id(
|
||||
&self,
|
||||
id: i64,
|
||||
since: i64,
|
||||
@@ -445,7 +429,7 @@ impl StorageManager {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_mixnode_average_reliability_in_interval(
|
||||
pub(super) async fn get_mixnode_average_reliability_in_interval(
|
||||
&self,
|
||||
id: i64,
|
||||
start: i64,
|
||||
@@ -507,7 +491,7 @@ impl StorageManager {
|
||||
///
|
||||
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
|
||||
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
|
||||
pub(crate) async fn get_gateway_statuses_by_database_id(
|
||||
pub(super) async fn get_gateway_statuses_by_database_id(
|
||||
&self,
|
||||
id: i64,
|
||||
since: i64,
|
||||
@@ -534,27 +518,36 @@ impl StorageManager {
|
||||
///
|
||||
/// * `timestamp`: unix timestamp indicating when the measurements took place.
|
||||
/// * `mixnode_results`: reliability results of each node that got tested.
|
||||
pub(crate) async fn submit_mixnode_statuses(
|
||||
pub(super) async fn submit_mixnode_statuses(
|
||||
&self,
|
||||
timestamp: i64,
|
||||
mixnode_results: Vec<NodeResult>,
|
||||
id_cache: &DbIdCache,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
// insert it all in a transaction to make sure all nodes are updated at the same time
|
||||
// (plus it's a nice guard against new nodes)
|
||||
let mut tx = self.connection_pool.begin().await?;
|
||||
for mixnode_result in mixnode_results {
|
||||
let mixnode_id = sqlx::query!(
|
||||
r#"
|
||||
INSERT OR IGNORE INTO mixnode_details(mix_id, identity_key) VALUES (?, ?);
|
||||
SELECT id FROM mixnode_details WHERE mix_id = ?;
|
||||
"#,
|
||||
mixnode_result.node_id,
|
||||
mixnode_result.identity,
|
||||
mixnode_result.node_id,
|
||||
)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?
|
||||
.id;
|
||||
let mixnode_id = match id_cache.mixnode_db_id(mixnode_result.node_id) {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
let mixnode_id = sqlx::query!(
|
||||
r#"
|
||||
INSERT OR IGNORE INTO mixnode_details(mix_id, identity_key) VALUES (?, ?);
|
||||
SELECT id FROM mixnode_details WHERE mix_id = ?;
|
||||
"#,
|
||||
mixnode_result.node_id,
|
||||
mixnode_result.identity,
|
||||
mixnode_result.node_id,
|
||||
)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?
|
||||
.id;
|
||||
id_cache.set_mixnode_db_id(mixnode_result.node_id, mixnode_id);
|
||||
|
||||
mixnode_id
|
||||
}
|
||||
};
|
||||
|
||||
// insert the actual status
|
||||
sqlx::query!(
|
||||
@@ -573,7 +566,7 @@ impl StorageManager {
|
||||
tx.commit().await
|
||||
}
|
||||
|
||||
pub(crate) async fn submit_mixnode_statuses_v2(
|
||||
pub(super) async fn submit_mixnode_statuses_v2(
|
||||
&self,
|
||||
mixnode_results: &[NodeResult],
|
||||
) -> Result<(), sqlx::Error> {
|
||||
@@ -620,10 +613,11 @@ impl StorageManager {
|
||||
///
|
||||
/// * `timestamp`: unix timestamp indicating when the measurements took place.
|
||||
/// * `gateway_results`: reliability results of each node that got tested.
|
||||
pub(crate) async fn submit_gateway_statuses(
|
||||
pub(super) async fn submit_gateway_statuses(
|
||||
&self,
|
||||
timestamp: i64,
|
||||
gateway_results: Vec<NodeResult>,
|
||||
id_cache: &DbIdCache,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
// insert it all in a transaction to make sure all nodes are updated at the same time
|
||||
// (plus it's a nice guard against new nodes)
|
||||
@@ -631,39 +625,45 @@ impl StorageManager {
|
||||
|
||||
for gateway_result in gateway_results {
|
||||
// if gateway info doesn't exist, insert it and get its id
|
||||
|
||||
// same ID "problem" as described for mixnode insertion
|
||||
let gateway_id = sqlx::query!(
|
||||
r#"
|
||||
INSERT OR IGNORE INTO gateway_details(node_id, identity) VALUES (?, ?);
|
||||
SELECT id FROM gateway_details WHERE identity = ?;
|
||||
"#,
|
||||
gateway_result.node_id,
|
||||
gateway_result.identity,
|
||||
gateway_result.identity,
|
||||
)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?
|
||||
.id;
|
||||
let gateway_id = match id_cache.gateway_db_id(gateway_result.node_id) {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
let gateway_id = sqlx::query!(
|
||||
r#"
|
||||
INSERT OR IGNORE INTO gateway_details(node_id, identity) VALUES (?, ?);
|
||||
SELECT id FROM gateway_details WHERE identity = ?;
|
||||
"#,
|
||||
gateway_result.node_id,
|
||||
gateway_result.identity,
|
||||
gateway_result.identity,
|
||||
)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?
|
||||
.id;
|
||||
id_cache.set_gateway_db_id(gateway_result.node_id, gateway_id);
|
||||
gateway_id
|
||||
}
|
||||
};
|
||||
|
||||
// insert the actual status
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO gateway_status (gateway_details_id, reliability, timestamp) VALUES (?, ?, ?);
|
||||
"#,
|
||||
gateway_id,
|
||||
gateway_result.reliability,
|
||||
timestamp
|
||||
)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
r#"
|
||||
INSERT INTO gateway_status (gateway_details_id, reliability, timestamp) VALUES (?, ?, ?);
|
||||
"#,
|
||||
gateway_id,
|
||||
gateway_result.reliability,
|
||||
timestamp
|
||||
)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// finally commit the transaction
|
||||
tx.commit().await
|
||||
}
|
||||
|
||||
pub(crate) async fn submit_gateway_statuses_v2(
|
||||
pub(super) async fn submit_gateway_statuses_v2(
|
||||
&self,
|
||||
gateway_results: &[NodeResult],
|
||||
) -> Result<(), sqlx::Error> {
|
||||
@@ -714,7 +714,7 @@ impl StorageManager {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `testing_route`: test route used for this particular network monitor run.
|
||||
pub(crate) async fn submit_testing_route_used(
|
||||
pub(super) async fn submit_testing_route_used(
|
||||
&self,
|
||||
testing_route: TestingRoute,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
@@ -742,7 +742,7 @@ impl StorageManager {
|
||||
///
|
||||
/// * `db_mixnode_id`: id (as saved in the database) of the mixnode.
|
||||
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
|
||||
pub(crate) async fn get_mixnode_testing_route_presence_count_since(
|
||||
pub(super) async fn get_mixnode_testing_route_presence_count_since(
|
||||
&self,
|
||||
db_mixnode_id: i64,
|
||||
since: i64,
|
||||
@@ -781,7 +781,7 @@ impl StorageManager {
|
||||
///
|
||||
/// * `gateway_id`: id (as saved in the database) of the gateway.
|
||||
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
|
||||
pub(crate) async fn get_gateway_testing_route_presence_count_since(
|
||||
pub(super) async fn get_gateway_testing_route_presence_count_since(
|
||||
&self,
|
||||
gateway_id: i64,
|
||||
since: i64,
|
||||
@@ -813,7 +813,7 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
/// Checks whether there are already any historical uptimes with this particular date.
|
||||
pub(crate) async fn check_for_historical_uptime_existence(
|
||||
pub(super) async fn check_for_historical_uptime_existence(
|
||||
&self,
|
||||
today_iso_8601: &str,
|
||||
) -> Result<bool, sqlx::Error> {
|
||||
@@ -833,7 +833,7 @@ impl StorageManager {
|
||||
/// * `node_id`: id of the mixnode (as inserted in `mixnode_details_id` table).
|
||||
/// * `date`: date associated with the uptime represented in ISO 8601, i.e. YYYY-MM-DD.
|
||||
/// * `uptime`: the actual uptime of the node during the specified day.
|
||||
pub(crate) async fn insert_mixnode_historical_uptime(
|
||||
pub(super) async fn insert_mixnode_historical_uptime(
|
||||
&self,
|
||||
mix_id: i64,
|
||||
date: &str,
|
||||
@@ -855,7 +855,7 @@ impl StorageManager {
|
||||
/// * `node_id`: id of the gateway (as inserted in `gateway_details_id` table).
|
||||
/// * `date`: date associated with the uptime represented in ISO 8601, i.e. YYYY-MM-DD.
|
||||
/// * `uptime`: the actual uptime of the node during the specified day.
|
||||
pub(crate) async fn insert_gateway_historical_uptime(
|
||||
pub(super) async fn insert_gateway_historical_uptime(
|
||||
&self,
|
||||
db_id: i64,
|
||||
date: &str,
|
||||
@@ -876,7 +876,7 @@ impl StorageManager {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `timestamp`: unix timestamp at which the monitor test run has occurred
|
||||
pub(crate) async fn insert_monitor_run(&self, timestamp: i64) -> Result<i64, sqlx::Error> {
|
||||
pub(super) async fn insert_monitor_run(&self, timestamp: i64) -> Result<i64, sqlx::Error> {
|
||||
let res = sqlx::query!("INSERT INTO monitor_run(timestamp) VALUES (?)", timestamp)
|
||||
.execute(&self.connection_pool)
|
||||
.await?;
|
||||
@@ -889,7 +889,7 @@ impl StorageManager {
|
||||
///
|
||||
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
|
||||
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
|
||||
pub(crate) async fn get_monitor_runs_count(
|
||||
pub(super) async fn get_monitor_runs_count(
|
||||
&self,
|
||||
since: i64,
|
||||
until: i64,
|
||||
@@ -911,7 +911,7 @@ impl StorageManager {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `until`: timestamp specifying the purge cutoff.
|
||||
pub(crate) async fn purge_old_mixnode_statuses(
|
||||
pub(super) async fn purge_old_mixnode_statuses(
|
||||
&self,
|
||||
timestamp: i64,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
@@ -927,7 +927,7 @@ impl StorageManager {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `until`: timestamp specifying the purge cutoff.
|
||||
pub(crate) async fn purge_old_gateway_statuses(
|
||||
pub(super) async fn purge_old_gateway_statuses(
|
||||
&self,
|
||||
timestamp: i64,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
@@ -944,7 +944,7 @@ impl StorageManager {
|
||||
///
|
||||
/// * `since`: indicates the lower bound timestamp for deciding whether given mixnode is active
|
||||
/// * `until`: indicates the upper bound timestamp for deciding whether given mixnode is active
|
||||
pub(crate) async fn get_all_active_mixnodes_in_interval(
|
||||
pub(super) async fn get_all_active_mixnodes_in_interval(
|
||||
&self,
|
||||
since: i64,
|
||||
until: i64,
|
||||
@@ -978,7 +978,7 @@ impl StorageManager {
|
||||
///
|
||||
/// * `since`: indicates the lower bound timestamp for deciding whether given gateway is active
|
||||
/// * `until`: indicates the upper bound timestamp for deciding whether given gateway is active
|
||||
pub(crate) async fn get_all_active_gateways_in_interval(
|
||||
pub(super) async fn get_all_active_gateways_in_interval(
|
||||
&self,
|
||||
since: i64,
|
||||
until: i64,
|
||||
@@ -1025,7 +1025,7 @@ impl StorageManager {
|
||||
///
|
||||
/// * `report`: report to insert into the database
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn insert_rewarding_report(
|
||||
pub(super) async fn insert_rewarding_report(
|
||||
&self,
|
||||
report: RewardingReport,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
@@ -1044,7 +1044,7 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn get_rewarding_report(
|
||||
pub(super) async fn get_rewarding_report(
|
||||
&self,
|
||||
absolute_epoch_id: EpochId,
|
||||
) -> Result<Option<RewardingReport>, sqlx::Error> {
|
||||
@@ -1069,7 +1069,7 @@ impl StorageManager {
|
||||
///
|
||||
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
|
||||
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
|
||||
pub(crate) async fn get_all_active_mixnodes_statuses_in_interval(
|
||||
pub(super) async fn get_all_active_mixnodes_statuses_in_interval(
|
||||
&self,
|
||||
since: i64,
|
||||
until: i64,
|
||||
@@ -1102,7 +1102,7 @@ impl StorageManager {
|
||||
///
|
||||
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
|
||||
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
|
||||
pub(crate) async fn get_all_active_gateways_statuses_in_interval(
|
||||
pub(super) async fn get_all_active_gateways_statuses_in_interval(
|
||||
&self,
|
||||
since: i64,
|
||||
until: i64,
|
||||
@@ -1129,7 +1129,7 @@ impl StorageManager {
|
||||
Ok(active_day_statuses)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_mixnode_details_by_db_id(
|
||||
pub(super) async fn get_mixnode_details_by_db_id(
|
||||
&self,
|
||||
id: i64,
|
||||
) -> Result<Option<MixnodeDetails>, sqlx::Error> {
|
||||
@@ -1142,7 +1142,7 @@ impl StorageManager {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_gateway_details_by_db_id(
|
||||
pub(super) async fn get_gateway_details_by_db_id(
|
||||
&self,
|
||||
id: i64,
|
||||
) -> Result<Option<GatewayDetails>, sqlx::Error> {
|
||||
@@ -1154,7 +1154,7 @@ impl StorageManager {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_mixnode_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
|
||||
pub(super) async fn get_mixnode_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
SELECT COUNT(*) as count
|
||||
@@ -1170,7 +1170,7 @@ impl StorageManager {
|
||||
.map(|record| record.count)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_mixnode_statuses(
|
||||
pub(super) async fn get_mixnode_statuses(
|
||||
&self,
|
||||
mix_id: NodeId,
|
||||
limit: u32,
|
||||
@@ -1206,7 +1206,7 @@ impl StorageManager {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_gateway_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
|
||||
pub(super) async fn get_gateway_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
|
||||
sqlx::query!(
|
||||
r#"
|
||||
SELECT COUNT(*) as count
|
||||
@@ -1222,7 +1222,7 @@ impl StorageManager {
|
||||
.map(|record| record.count)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_gateway_statuses(
|
||||
pub(super) async fn get_gateway_statuses(
|
||||
&self,
|
||||
gateway_identity: &str,
|
||||
limit: u32,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use self::manager::{AvgGatewayReliability, AvgMixnodeReliability};
|
||||
use crate::network_monitor::test_route::TestRoute;
|
||||
use crate::node_status_api::models::{
|
||||
GatewayStatusReport, GatewayUptimeHistory, HistoricalUptime as ApiHistoricalUptime,
|
||||
@@ -12,36 +13,70 @@ use crate::storage::models::{NodeStatus, TestingRoute};
|
||||
use crate::support::storage::models::{
|
||||
GatewayDetails, HistoricalUptime, MixnodeDetails, TestedGatewayStatus, TestedMixnodeStatus,
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use nym_mixnet_contract_common::NodeId;
|
||||
use nym_types::monitoring::NodeResult;
|
||||
use sqlx::ConnectOptions;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use time::{Date, OffsetDateTime};
|
||||
use tracing::log::LevelFilter;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use self::manager::{AvgGatewayReliability, AvgMixnodeReliability};
|
||||
|
||||
pub(crate) mod manager;
|
||||
pub(crate) mod models;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct DbIdCache {
|
||||
pub mixnodes_v1: DashMap<NodeId, i64>,
|
||||
pub gateways_v1: DashMap<NodeId, i64>,
|
||||
}
|
||||
|
||||
impl DbIdCache {
|
||||
pub(crate) fn mixnode_db_id(&self, node_id: NodeId) -> Option<i64> {
|
||||
self.mixnodes_v1.get(&node_id).map(|v| *v)
|
||||
}
|
||||
|
||||
pub(crate) fn gateway_db_id(&self, node_id: NodeId) -> Option<i64> {
|
||||
self.gateways_v1.get(&node_id).map(|v| *v)
|
||||
}
|
||||
|
||||
pub(crate) fn set_mixnode_db_id(&self, node_id: NodeId, db_id: i64) {
|
||||
self.mixnodes_v1.insert(node_id, db_id);
|
||||
}
|
||||
|
||||
pub(crate) fn set_gateway_db_id(&self, node_id: NodeId, db_id: i64) {
|
||||
self.gateways_v1.insert(node_id, db_id);
|
||||
}
|
||||
}
|
||||
|
||||
// note that clone here is fine as upon cloning the same underlying pool will be used
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct NymApiStorage {
|
||||
pub manager: StorageManager,
|
||||
|
||||
pub db_id_cache: Arc<DbIdCache>,
|
||||
}
|
||||
|
||||
impl NymApiStorage {
|
||||
pub async fn init<P: AsRef<Path>>(database_path: P) -> Result<Self, NymApiStorageError> {
|
||||
// TODO: we can inject here more stuff based on our nym-api global config
|
||||
// struct. Maybe different pool size or timeout intervals?
|
||||
let opts = sqlx::sqlite::SqliteConnectOptions::new()
|
||||
let connect_opts = sqlx::sqlite::SqliteConnectOptions::new()
|
||||
.filename(database_path)
|
||||
.create_if_missing(true)
|
||||
.disable_statement_logging();
|
||||
.log_statements(LevelFilter::Trace)
|
||||
.log_slow_statements(LevelFilter::Warn, Duration::from_millis(250));
|
||||
|
||||
// TODO: do we want auto_vacuum ?
|
||||
|
||||
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
|
||||
let pool_opts = sqlx::sqlite::SqlitePoolOptions::new()
|
||||
.min_connections(5)
|
||||
.max_connections(25)
|
||||
.acquire_timeout(Duration::from_secs(60));
|
||||
|
||||
let connection_pool = match pool_opts.connect_with(connect_opts).await {
|
||||
Ok(db) => db,
|
||||
Err(err) => {
|
||||
error!("Failed to connect to SQLx database: {err}");
|
||||
@@ -58,32 +93,38 @@ impl NymApiStorage {
|
||||
|
||||
let storage = NymApiStorage {
|
||||
manager: StorageManager { connection_pool },
|
||||
db_id_cache: Arc::new(Default::default()),
|
||||
};
|
||||
|
||||
Ok(storage)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn mix_identity_to_mix_ids(
|
||||
pub(crate) async fn get_mixnode_database_id(
|
||||
&self,
|
||||
identity: &str,
|
||||
) -> Result<Vec<NodeId>, NymApiStorageError> {
|
||||
Ok(self
|
||||
.manager
|
||||
.get_mixnode_mix_ids_by_identity(identity)
|
||||
.await?)
|
||||
node_id: NodeId,
|
||||
) -> Result<Option<i64>, NymApiStorageError> {
|
||||
if let Some(cached) = self.db_id_cache.mixnode_db_id(node_id) {
|
||||
return Ok(Some(cached));
|
||||
}
|
||||
if let Some(retrieved) = self.manager.get_mixnode_database_id(node_id).await? {
|
||||
self.db_id_cache.set_mixnode_db_id(node_id, retrieved);
|
||||
return Ok(Some(retrieved));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn mix_identity_to_latest_mix_id(
|
||||
pub(crate) async fn get_gateway_database_id(
|
||||
&self,
|
||||
identity: &str,
|
||||
) -> Result<Option<NodeId>, NymApiStorageError> {
|
||||
Ok(self
|
||||
.mix_identity_to_mix_ids(identity)
|
||||
.await?
|
||||
.into_iter()
|
||||
.max())
|
||||
node_id: NodeId,
|
||||
) -> Result<Option<i64>, NymApiStorageError> {
|
||||
if let Some(cached) = self.db_id_cache.gateway_db_id(node_id) {
|
||||
return Ok(Some(cached));
|
||||
}
|
||||
if let Some(retrieved) = self.manager.get_gateway_database_id(node_id).await? {
|
||||
self.db_id_cache.set_gateway_db_id(node_id, retrieved);
|
||||
return Ok(Some(retrieved));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_all_avg_gateway_reliability_in_last_24hr(
|
||||
@@ -536,7 +577,6 @@ impl NymApiStorage {
|
||||
// we MUST have those entries in the database, otherwise the route wouldn't have been chosen
|
||||
// in the first place
|
||||
let layer1_mix_db_id = self
|
||||
.manager
|
||||
.get_mixnode_database_id(test_route.layer_one_mix().mix_id)
|
||||
.await?
|
||||
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
|
||||
@@ -544,7 +584,6 @@ impl NymApiStorage {
|
||||
})?;
|
||||
|
||||
let layer2_mix_db_id = self
|
||||
.manager
|
||||
.get_mixnode_database_id(test_route.layer_two_mix().mix_id)
|
||||
.await?
|
||||
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
|
||||
@@ -552,7 +591,6 @@ impl NymApiStorage {
|
||||
})?;
|
||||
|
||||
let layer3_mix_db_id = self
|
||||
.manager
|
||||
.get_mixnode_database_id(test_route.layer_three_mix().mix_id)
|
||||
.await?
|
||||
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
|
||||
@@ -560,7 +598,6 @@ impl NymApiStorage {
|
||||
})?;
|
||||
|
||||
let gateway_db_id = self
|
||||
.manager
|
||||
.get_gateway_database_id(test_route.gateway().node_id)
|
||||
.await?
|
||||
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
|
||||
@@ -661,11 +698,11 @@ impl NymApiStorage {
|
||||
let monitor_run_id = self.manager.insert_monitor_run(now).await?;
|
||||
|
||||
self.manager
|
||||
.submit_mixnode_statuses(now, mixnode_results)
|
||||
.submit_mixnode_statuses(now, mixnode_results, &self.db_id_cache)
|
||||
.await?;
|
||||
|
||||
self.manager
|
||||
.submit_gateway_statuses(now, gateway_results)
|
||||
.submit_gateway_statuses(now, gateway_results, &self.db_id_cache)
|
||||
.await?;
|
||||
|
||||
for test_route in test_routes {
|
||||
@@ -675,6 +712,26 @@ impl NymApiStorage {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn submit_mixnode_statuses_v2(
|
||||
&self,
|
||||
mixnode_results: &[NodeResult],
|
||||
) -> Result<(), NymApiStorageError> {
|
||||
self.manager
|
||||
.submit_mixnode_statuses_v2(mixnode_results)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn submit_gateway_statuses_v2(
|
||||
&self,
|
||||
gateway_results: &[NodeResult],
|
||||
) -> Result<(), NymApiStorageError> {
|
||||
self.manager
|
||||
.submit_gateway_statuses_v2(gateway_results)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Obtains number of network monitor test runs that have occurred within the specified interval.
|
||||
///
|
||||
/// # Arguments
|
||||
|
||||
Reference in New Issue
Block a user