Compare commits

...

4 Commits

Author SHA1 Message Date
Jędrzej Stuczyński 7e0659a5e8 store in-memory cache of node database ids 2024-11-22 15:35:06 +00:00
Jędrzej Stuczyński 49da178f80 dont blow up upon failing to submit network monitor results 2024-11-22 15:10:26 +00:00
Jędrzej Stuczyński fa462b282d adding additional pool options 2024-11-22 15:08:05 +00:00
Jędrzej Stuczyński b7f5a29613 added statement logging to nym-api db 2024-11-22 15:04:10 +00:00
6 changed files with 184 additions and 131 deletions
Generated
+1
View File
@@ -4470,6 +4470,7 @@ dependencies = [
"cw2",
"cw3",
"cw4",
"dashmap",
"dirs",
"futures",
"getset",
+1
View File
@@ -20,6 +20,7 @@ bloomfilter = { workspace = true }
cfg-if = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive", "env"] }
console-subscriber = { workspace = true, optional = true } # validator-api needs to be built with RUSTFLAGS="--cfg tokio_unstable"
dashmap = { workspace = true }
dirs = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
+1 -5
View File
@@ -14,7 +14,6 @@ use nym_sphinx::params::PacketType;
use nym_sphinx::receiver::MessageReceiver;
use nym_task::TaskClient;
use std::collections::{HashMap, HashSet};
use std::process;
use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, error, info, trace};
@@ -95,10 +94,7 @@ impl<R: MessageReceiver + Send> Monitor<R> {
)
.await
{
error!("Failed to submit monitor run information to the database - {err}",);
// TODO: slightly more graceful shutdown here
process::exit(1);
error!("Failed to submit monitor run information to the database: {err}",);
}
}
@@ -89,7 +89,6 @@ pub(crate) async fn submit_gateway_monitoring_results(
match state
.storage
.manager
.submit_gateway_statuses_v2(message.results())
.await
{
@@ -134,7 +133,6 @@ pub(crate) async fn submit_node_monitoring_results(
match state
.storage
.manager
.submit_mixnode_statuses_v2(message.results())
.await
{
+96 -96
View File
@@ -7,6 +7,7 @@ use crate::support::storage::models::{
ActiveGateway, ActiveMixnode, GatewayDetails, HistoricalUptime, MixnodeDetails, NodeStatus,
RewardingReport, TestedGatewayStatus, TestedMixnodeStatus, TestingRoute,
};
use crate::support::storage::DbIdCache;
use nym_mixnet_contract_common::{EpochId, IdentityKey, NodeId};
use nym_types::monitoring::NodeResult;
use sqlx::FromRow;
@@ -51,24 +52,7 @@ impl AvgGatewayReliability {
// all SQL goes here
impl StorageManager {
pub(crate) async fn get_mixnode_mix_ids_by_identity(
&self,
identity: &str,
) -> Result<Vec<NodeId>, sqlx::Error> {
let ids = sqlx::query!(
r#"SELECT mix_id as "mix_id: NodeId" FROM mixnode_details WHERE identity_key = ?"#,
identity
)
.fetch_all(&self.connection_pool)
.await?
.into_iter()
.map(|row| row.mix_id)
.collect();
Ok(ids)
}
pub(crate) async fn get_all_avg_mix_reliability_in_last_24hr(
pub(super) async fn get_all_avg_mix_reliability_in_last_24hr(
&self,
end_ts_secs: i64,
) -> Result<Vec<AvgMixnodeReliability>, sqlx::Error> {
@@ -77,7 +61,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_all_avg_gateway_reliability_in_last_24hr(
pub(super) async fn get_all_avg_gateway_reliability_in_last_24hr(
&self,
end_ts_secs: i64,
) -> Result<Vec<AvgGatewayReliability>, sqlx::Error> {
@@ -86,7 +70,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_all_avg_mix_reliability_in_time_interval(
pub(super) async fn get_all_avg_mix_reliability_in_time_interval(
&self,
start_ts_secs: i64,
end_ts_secs: i64,
@@ -114,7 +98,7 @@ impl StorageManager {
Ok(result)
}
pub(crate) async fn get_all_avg_gateway_reliability_in_interval(
pub(super) async fn get_all_avg_gateway_reliability_in_interval(
&self,
start_ts_secs: i64,
end_ts_secs: i64,
@@ -147,7 +131,7 @@ impl StorageManager {
/// # Arguments
///
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
pub(crate) async fn get_mixnode_database_id(
pub(super) async fn get_mixnode_database_id(
&self,
mix_id: NodeId,
) -> Result<Option<i64>, sqlx::Error> {
@@ -159,7 +143,7 @@ impl StorageManager {
Ok(id)
}
pub(crate) async fn get_gateway_database_id(
pub(super) async fn get_gateway_database_id(
&self,
node_id: NodeId,
) -> Result<Option<i64>, sqlx::Error> {
@@ -172,7 +156,7 @@ impl StorageManager {
}
/// Tries to obtain row id of given gateway given its identity
pub(crate) async fn get_gateway_database_id_by_identity(
pub(super) async fn get_gateway_database_id_by_identity(
&self,
identity: &str,
) -> Result<Option<i64>, sqlx::Error> {
@@ -187,7 +171,7 @@ impl StorageManager {
Ok(id)
}
pub(crate) async fn get_gateway_node_id_from_identity_key(
pub(super) async fn get_gateway_node_id_from_identity_key(
&self,
identity: &str,
) -> Result<Option<NodeId>, sqlx::Error> {
@@ -202,7 +186,7 @@ impl StorageManager {
Ok(node_id)
}
pub(crate) async fn get_gateway_identity_key(
pub(super) async fn get_gateway_identity_key(
&self,
node_id: NodeId,
) -> Result<Option<IdentityKey>, sqlx::Error> {
@@ -222,7 +206,7 @@ impl StorageManager {
/// # Arguments
///
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
pub(crate) async fn get_mixnode_identity_key(
pub(super) async fn get_mixnode_identity_key(
&self,
mix_id: NodeId,
) -> Result<Option<IdentityKey>, sqlx::Error> {
@@ -244,7 +228,7 @@ impl StorageManager {
///
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
/// * `timestamp`: unix timestamp of the lower bound of the selection.
pub(crate) async fn get_mixnode_statuses_since(
pub(super) async fn get_mixnode_statuses_since(
&self,
mix_id: NodeId,
timestamp: i64,
@@ -272,7 +256,7 @@ impl StorageManager {
///
/// * `identity`: identity (base58-encoded public key) of the gateway.
/// * `timestamp`: unix timestamp of the lower bound of the selection.
pub(crate) async fn get_gateway_statuses_since(
pub(super) async fn get_gateway_statuses_since(
&self,
node_id: NodeId,
timestamp: i64,
@@ -298,7 +282,7 @@ impl StorageManager {
/// # Arguments
///
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
pub(crate) async fn get_mixnode_historical_uptimes(
pub(super) async fn get_mixnode_historical_uptimes(
&self,
mix_id: NodeId,
) -> Result<Vec<ApiHistoricalUptime>, sqlx::Error> {
@@ -336,7 +320,7 @@ impl StorageManager {
/// # Arguments
///
/// * `identity`: identity (base58-encoded public key) of the gateway.
pub(crate) async fn get_gateway_historical_uptimes(
pub(super) async fn get_gateway_historical_uptimes(
&self,
node_id: NodeId,
) -> Result<Vec<ApiHistoricalUptime>, sqlx::Error> {
@@ -369,7 +353,7 @@ impl StorageManager {
Ok(uptimes)
}
pub(crate) async fn get_historical_mix_uptime_on(
pub(super) async fn get_historical_mix_uptime_on(
&self,
contract_node_id: i64,
date: Date,
@@ -393,7 +377,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_historical_gateway_uptime_on(
pub(super) async fn get_historical_gateway_uptime_on(
&self,
contract_node_id: i64,
date: Date,
@@ -424,7 +408,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_mixnode_statuses_by_database_id(
pub(super) async fn get_mixnode_statuses_by_database_id(
&self,
id: i64,
since: i64,
@@ -445,7 +429,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_mixnode_average_reliability_in_interval(
pub(super) async fn get_mixnode_average_reliability_in_interval(
&self,
id: i64,
start: i64,
@@ -507,7 +491,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_gateway_statuses_by_database_id(
pub(super) async fn get_gateway_statuses_by_database_id(
&self,
id: i64,
since: i64,
@@ -534,27 +518,36 @@ impl StorageManager {
///
/// * `timestamp`: unix timestamp indicating when the measurements took place.
/// * `mixnode_results`: reliability results of each node that got tested.
pub(crate) async fn submit_mixnode_statuses(
pub(super) async fn submit_mixnode_statuses(
&self,
timestamp: i64,
mixnode_results: Vec<NodeResult>,
id_cache: &DbIdCache,
) -> Result<(), sqlx::Error> {
// insert it all in a transaction to make sure all nodes are updated at the same time
// (plus it's a nice guard against new nodes)
let mut tx = self.connection_pool.begin().await?;
for mixnode_result in mixnode_results {
let mixnode_id = sqlx::query!(
r#"
INSERT OR IGNORE INTO mixnode_details(mix_id, identity_key) VALUES (?, ?);
SELECT id FROM mixnode_details WHERE mix_id = ?;
"#,
mixnode_result.node_id,
mixnode_result.identity,
mixnode_result.node_id,
)
.fetch_one(&mut *tx)
.await?
.id;
let mixnode_id = match id_cache.mixnode_db_id(mixnode_result.node_id) {
Some(id) => id,
None => {
let mixnode_id = sqlx::query!(
r#"
INSERT OR IGNORE INTO mixnode_details(mix_id, identity_key) VALUES (?, ?);
SELECT id FROM mixnode_details WHERE mix_id = ?;
"#,
mixnode_result.node_id,
mixnode_result.identity,
mixnode_result.node_id,
)
.fetch_one(&mut *tx)
.await?
.id;
id_cache.set_mixnode_db_id(mixnode_result.node_id, mixnode_id);
mixnode_id
}
};
// insert the actual status
sqlx::query!(
@@ -573,7 +566,7 @@ impl StorageManager {
tx.commit().await
}
pub(crate) async fn submit_mixnode_statuses_v2(
pub(super) async fn submit_mixnode_statuses_v2(
&self,
mixnode_results: &[NodeResult],
) -> Result<(), sqlx::Error> {
@@ -620,10 +613,11 @@ impl StorageManager {
///
/// * `timestamp`: unix timestamp indicating when the measurements took place.
/// * `gateway_results`: reliability results of each node that got tested.
pub(crate) async fn submit_gateway_statuses(
pub(super) async fn submit_gateway_statuses(
&self,
timestamp: i64,
gateway_results: Vec<NodeResult>,
id_cache: &DbIdCache,
) -> Result<(), sqlx::Error> {
// insert it all in a transaction to make sure all nodes are updated at the same time
// (plus it's a nice guard against new nodes)
@@ -631,39 +625,45 @@ impl StorageManager {
for gateway_result in gateway_results {
// if gateway info doesn't exist, insert it and get its id
// same ID "problem" as described for mixnode insertion
let gateway_id = sqlx::query!(
r#"
INSERT OR IGNORE INTO gateway_details(node_id, identity) VALUES (?, ?);
SELECT id FROM gateway_details WHERE identity = ?;
"#,
gateway_result.node_id,
gateway_result.identity,
gateway_result.identity,
)
.fetch_one(&mut *tx)
.await?
.id;
let gateway_id = match id_cache.gateway_db_id(gateway_result.node_id) {
Some(id) => id,
None => {
let gateway_id = sqlx::query!(
r#"
INSERT OR IGNORE INTO gateway_details(node_id, identity) VALUES (?, ?);
SELECT id FROM gateway_details WHERE identity = ?;
"#,
gateway_result.node_id,
gateway_result.identity,
gateway_result.identity,
)
.fetch_one(&mut *tx)
.await?
.id;
id_cache.set_gateway_db_id(gateway_result.node_id, gateway_id);
gateway_id
}
};
// insert the actual status
sqlx::query!(
r#"
INSERT INTO gateway_status (gateway_details_id, reliability, timestamp) VALUES (?, ?, ?);
"#,
gateway_id,
gateway_result.reliability,
timestamp
)
.execute(&mut *tx)
.await?;
r#"
INSERT INTO gateway_status (gateway_details_id, reliability, timestamp) VALUES (?, ?, ?);
"#,
gateway_id,
gateway_result.reliability,
timestamp
)
.execute(&mut *tx)
.await?;
}
// finally commit the transaction
tx.commit().await
}
pub(crate) async fn submit_gateway_statuses_v2(
pub(super) async fn submit_gateway_statuses_v2(
&self,
gateway_results: &[NodeResult],
) -> Result<(), sqlx::Error> {
@@ -714,7 +714,7 @@ impl StorageManager {
/// # Arguments
///
/// * `testing_route`: test route used for this particular network monitor run.
pub(crate) async fn submit_testing_route_used(
pub(super) async fn submit_testing_route_used(
&self,
testing_route: TestingRoute,
) -> Result<(), sqlx::Error> {
@@ -742,7 +742,7 @@ impl StorageManager {
///
/// * `db_mixnode_id`: id (as saved in the database) of the mixnode.
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
pub(crate) async fn get_mixnode_testing_route_presence_count_since(
pub(super) async fn get_mixnode_testing_route_presence_count_since(
&self,
db_mixnode_id: i64,
since: i64,
@@ -781,7 +781,7 @@ impl StorageManager {
///
/// * `gateway_id`: id (as saved in the database) of the gateway.
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
pub(crate) async fn get_gateway_testing_route_presence_count_since(
pub(super) async fn get_gateway_testing_route_presence_count_since(
&self,
gateway_id: i64,
since: i64,
@@ -813,7 +813,7 @@ impl StorageManager {
}
/// Checks whether there are already any historical uptimes with this particular date.
pub(crate) async fn check_for_historical_uptime_existence(
pub(super) async fn check_for_historical_uptime_existence(
&self,
today_iso_8601: &str,
) -> Result<bool, sqlx::Error> {
@@ -833,7 +833,7 @@ impl StorageManager {
/// * `node_id`: id of the mixnode (as inserted in `mixnode_details_id` table).
/// * `date`: date associated with the uptime represented in ISO 8601, i.e. YYYY-MM-DD.
/// * `uptime`: the actual uptime of the node during the specified day.
pub(crate) async fn insert_mixnode_historical_uptime(
pub(super) async fn insert_mixnode_historical_uptime(
&self,
mix_id: i64,
date: &str,
@@ -855,7 +855,7 @@ impl StorageManager {
/// * `node_id`: id of the gateway (as inserted in `gateway_details_id` table).
/// * `date`: date associated with the uptime represented in ISO 8601, i.e. YYYY-MM-DD.
/// * `uptime`: the actual uptime of the node during the specified day.
pub(crate) async fn insert_gateway_historical_uptime(
pub(super) async fn insert_gateway_historical_uptime(
&self,
db_id: i64,
date: &str,
@@ -876,7 +876,7 @@ impl StorageManager {
/// # Arguments
///
/// * `timestamp`: unix timestamp at which the monitor test run has occurred
pub(crate) async fn insert_monitor_run(&self, timestamp: i64) -> Result<i64, sqlx::Error> {
pub(super) async fn insert_monitor_run(&self, timestamp: i64) -> Result<i64, sqlx::Error> {
let res = sqlx::query!("INSERT INTO monitor_run(timestamp) VALUES (?)", timestamp)
.execute(&self.connection_pool)
.await?;
@@ -889,7 +889,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_monitor_runs_count(
pub(super) async fn get_monitor_runs_count(
&self,
since: i64,
until: i64,
@@ -911,7 +911,7 @@ impl StorageManager {
/// # Arguments
///
/// * `until`: timestamp specifying the purge cutoff.
pub(crate) async fn purge_old_mixnode_statuses(
pub(super) async fn purge_old_mixnode_statuses(
&self,
timestamp: i64,
) -> Result<(), sqlx::Error> {
@@ -927,7 +927,7 @@ impl StorageManager {
/// # Arguments
///
/// * `until`: timestamp specifying the purge cutoff.
pub(crate) async fn purge_old_gateway_statuses(
pub(super) async fn purge_old_gateway_statuses(
&self,
timestamp: i64,
) -> Result<(), sqlx::Error> {
@@ -944,7 +944,7 @@ impl StorageManager {
///
/// * `since`: indicates the lower bound timestamp for deciding whether given mixnode is active
/// * `until`: indicates the upper bound timestamp for deciding whether given mixnode is active
pub(crate) async fn get_all_active_mixnodes_in_interval(
pub(super) async fn get_all_active_mixnodes_in_interval(
&self,
since: i64,
until: i64,
@@ -978,7 +978,7 @@ impl StorageManager {
///
/// * `since`: indicates the lower bound timestamp for deciding whether given gateway is active
/// * `until`: indicates the upper bound timestamp for deciding whether given gateway is active
pub(crate) async fn get_all_active_gateways_in_interval(
pub(super) async fn get_all_active_gateways_in_interval(
&self,
since: i64,
until: i64,
@@ -1025,7 +1025,7 @@ impl StorageManager {
///
/// * `report`: report to insert into the database
#[allow(unused)]
pub(crate) async fn insert_rewarding_report(
pub(super) async fn insert_rewarding_report(
&self,
report: RewardingReport,
) -> Result<(), sqlx::Error> {
@@ -1044,7 +1044,7 @@ impl StorageManager {
}
#[allow(unused)]
pub(crate) async fn get_rewarding_report(
pub(super) async fn get_rewarding_report(
&self,
absolute_epoch_id: EpochId,
) -> Result<Option<RewardingReport>, sqlx::Error> {
@@ -1069,7 +1069,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_all_active_mixnodes_statuses_in_interval(
pub(super) async fn get_all_active_mixnodes_statuses_in_interval(
&self,
since: i64,
until: i64,
@@ -1102,7 +1102,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_all_active_gateways_statuses_in_interval(
pub(super) async fn get_all_active_gateways_statuses_in_interval(
&self,
since: i64,
until: i64,
@@ -1129,7 +1129,7 @@ impl StorageManager {
Ok(active_day_statuses)
}
pub(crate) async fn get_mixnode_details_by_db_id(
pub(super) async fn get_mixnode_details_by_db_id(
&self,
id: i64,
) -> Result<Option<MixnodeDetails>, sqlx::Error> {
@@ -1142,7 +1142,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_gateway_details_by_db_id(
pub(super) async fn get_gateway_details_by_db_id(
&self,
id: i64,
) -> Result<Option<GatewayDetails>, sqlx::Error> {
@@ -1154,7 +1154,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_mixnode_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
pub(super) async fn get_mixnode_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
sqlx::query!(
r#"
SELECT COUNT(*) as count
@@ -1170,7 +1170,7 @@ impl StorageManager {
.map(|record| record.count)
}
pub(crate) async fn get_mixnode_statuses(
pub(super) async fn get_mixnode_statuses(
&self,
mix_id: NodeId,
limit: u32,
@@ -1206,7 +1206,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_gateway_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
pub(super) async fn get_gateway_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
sqlx::query!(
r#"
SELECT COUNT(*) as count
@@ -1222,7 +1222,7 @@ impl StorageManager {
.map(|record| record.count)
}
pub(crate) async fn get_gateway_statuses(
pub(super) async fn get_gateway_statuses(
&self,
gateway_identity: &str,
limit: u32,
+85 -28
View File
@@ -1,6 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use self::manager::{AvgGatewayReliability, AvgMixnodeReliability};
use crate::network_monitor::test_route::TestRoute;
use crate::node_status_api::models::{
GatewayStatusReport, GatewayUptimeHistory, HistoricalUptime as ApiHistoricalUptime,
@@ -12,36 +13,70 @@ use crate::storage::models::{NodeStatus, TestingRoute};
use crate::support::storage::models::{
GatewayDetails, HistoricalUptime, MixnodeDetails, TestedGatewayStatus, TestedMixnodeStatus,
};
use dashmap::DashMap;
use nym_mixnet_contract_common::NodeId;
use nym_types::monitoring::NodeResult;
use sqlx::ConnectOptions;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use time::{Date, OffsetDateTime};
use tracing::log::LevelFilter;
use tracing::{error, info, warn};
use self::manager::{AvgGatewayReliability, AvgMixnodeReliability};
pub(crate) mod manager;
pub(crate) mod models;
#[derive(Default)]
pub(crate) struct DbIdCache {
pub mixnodes_v1: DashMap<NodeId, i64>,
pub gateways_v1: DashMap<NodeId, i64>,
}
impl DbIdCache {
pub(crate) fn mixnode_db_id(&self, node_id: NodeId) -> Option<i64> {
self.mixnodes_v1.get(&node_id).map(|v| *v)
}
pub(crate) fn gateway_db_id(&self, node_id: NodeId) -> Option<i64> {
self.gateways_v1.get(&node_id).map(|v| *v)
}
pub(crate) fn set_mixnode_db_id(&self, node_id: NodeId, db_id: i64) {
self.mixnodes_v1.insert(node_id, db_id);
}
pub(crate) fn set_gateway_db_id(&self, node_id: NodeId, db_id: i64) {
self.gateways_v1.insert(node_id, db_id);
}
}
// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
pub(crate) struct NymApiStorage {
pub manager: StorageManager,
pub db_id_cache: Arc<DbIdCache>,
}
impl NymApiStorage {
pub async fn init<P: AsRef<Path>>(database_path: P) -> Result<Self, NymApiStorageError> {
// TODO: we can inject here more stuff based on our nym-api global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
let connect_opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
.log_statements(LevelFilter::Trace)
.log_slow_statements(LevelFilter::Warn, Duration::from_millis(250));
// TODO: do we want auto_vacuum ?
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
let pool_opts = sqlx::sqlite::SqlitePoolOptions::new()
.min_connections(5)
.max_connections(25)
.acquire_timeout(Duration::from_secs(60));
let connection_pool = match pool_opts.connect_with(connect_opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
@@ -58,32 +93,38 @@ impl NymApiStorage {
let storage = NymApiStorage {
manager: StorageManager { connection_pool },
db_id_cache: Arc::new(Default::default()),
};
Ok(storage)
}
#[allow(unused)]
pub(crate) async fn mix_identity_to_mix_ids(
pub(crate) async fn get_mixnode_database_id(
&self,
identity: &str,
) -> Result<Vec<NodeId>, NymApiStorageError> {
Ok(self
.manager
.get_mixnode_mix_ids_by_identity(identity)
.await?)
node_id: NodeId,
) -> Result<Option<i64>, NymApiStorageError> {
if let Some(cached) = self.db_id_cache.mixnode_db_id(node_id) {
return Ok(Some(cached));
}
if let Some(retrieved) = self.manager.get_mixnode_database_id(node_id).await? {
self.db_id_cache.set_mixnode_db_id(node_id, retrieved);
return Ok(Some(retrieved));
}
Ok(None)
}
#[allow(unused)]
pub(crate) async fn mix_identity_to_latest_mix_id(
pub(crate) async fn get_gateway_database_id(
&self,
identity: &str,
) -> Result<Option<NodeId>, NymApiStorageError> {
Ok(self
.mix_identity_to_mix_ids(identity)
.await?
.into_iter()
.max())
node_id: NodeId,
) -> Result<Option<i64>, NymApiStorageError> {
if let Some(cached) = self.db_id_cache.gateway_db_id(node_id) {
return Ok(Some(cached));
}
if let Some(retrieved) = self.manager.get_gateway_database_id(node_id).await? {
self.db_id_cache.set_gateway_db_id(node_id, retrieved);
return Ok(Some(retrieved));
}
Ok(None)
}
pub(crate) async fn get_all_avg_gateway_reliability_in_last_24hr(
@@ -536,7 +577,6 @@ impl NymApiStorage {
// we MUST have those entries in the database, otherwise the route wouldn't have been chosen
// in the first place
let layer1_mix_db_id = self
.manager
.get_mixnode_database_id(test_route.layer_one_mix().mix_id)
.await?
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
@@ -544,7 +584,6 @@ impl NymApiStorage {
})?;
let layer2_mix_db_id = self
.manager
.get_mixnode_database_id(test_route.layer_two_mix().mix_id)
.await?
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
@@ -552,7 +591,6 @@ impl NymApiStorage {
})?;
let layer3_mix_db_id = self
.manager
.get_mixnode_database_id(test_route.layer_three_mix().mix_id)
.await?
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
@@ -560,7 +598,6 @@ impl NymApiStorage {
})?;
let gateway_db_id = self
.manager
.get_gateway_database_id(test_route.gateway().node_id)
.await?
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
@@ -661,11 +698,11 @@ impl NymApiStorage {
let monitor_run_id = self.manager.insert_monitor_run(now).await?;
self.manager
.submit_mixnode_statuses(now, mixnode_results)
.submit_mixnode_statuses(now, mixnode_results, &self.db_id_cache)
.await?;
self.manager
.submit_gateway_statuses(now, gateway_results)
.submit_gateway_statuses(now, gateway_results, &self.db_id_cache)
.await?;
for test_route in test_routes {
@@ -675,6 +712,26 @@ impl NymApiStorage {
Ok(())
}
pub(crate) async fn submit_mixnode_statuses_v2(
&self,
mixnode_results: &[NodeResult],
) -> Result<(), NymApiStorageError> {
self.manager
.submit_mixnode_statuses_v2(mixnode_results)
.await?;
Ok(())
}
pub(crate) async fn submit_gateway_statuses_v2(
&self,
gateway_results: &[NodeResult],
) -> Result<(), NymApiStorageError> {
self.manager
.submit_gateway_statuses_v2(gateway_results)
.await?;
Ok(())
}
/// Obtains number of network monitor test runs that have occurred within the specified interval.
///
/// # Arguments