Merge branch 'release/2024.13-magura-patched' into develop

This commit is contained in:
Jędrzej Stuczyński
2024-11-22 15:53:59 +00:00
31 changed files with 398 additions and 187 deletions
+6
View File
@@ -4,6 +4,12 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2024.13-magura-patched] (2024-11-22)
- [experimental] allow clients to change between deterministic route selection based on packet headers and a pseudorandom distribution
- Introduced a configurable limit on retransmission frequency of packets if ACKs are not received
- Filtered out invalid IP addresses on nym-api
## [2024.13-magura] (2024-11-18)
- Limit race probability ([#5145])
Generated
+5 -4
View File
@@ -4470,6 +4470,7 @@ dependencies = [
"cw2",
"cw3",
"cw4",
"dashmap",
"dirs",
"futures",
"getset",
@@ -4781,7 +4782,7 @@ dependencies = [
[[package]]
name = "nym-client"
version = "1.1.43"
version = "1.1.44"
dependencies = [
"bs58",
"clap 4.5.20",
@@ -5898,7 +5899,7 @@ dependencies = [
[[package]]
name = "nym-network-requester"
version = "1.1.44"
version = "1.1.45"
dependencies = [
"addr",
"anyhow",
@@ -5949,7 +5950,7 @@ dependencies = [
[[package]]
name = "nym-node"
version = "1.1.10"
version = "1.1.11"
dependencies = [
"anyhow",
"bip39",
@@ -6305,7 +6306,7 @@ dependencies = [
[[package]]
name = "nym-socks5-client"
version = "1.1.43"
version = "1.1.44"
dependencies = [
"bs58",
"clap 4.5.20",
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-client"
version = "1.1.43"
version = "1.1.44"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-socks5-client"
version = "1.1.43"
version = "1.1.44"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
+10 -1
View File
@@ -393,13 +393,20 @@ pub struct Traffic {
/// poisson distribution.
pub disable_main_poisson_packet_distribution: bool,
/// Specify whether route selection should be determined by the packet header.
pub deterministic_route_selection: bool,
/// Specify how many times particular packet can be retransmitted
/// None - no limit
pub maximum_number_of_retransmissions: Option<u32>,
/// Specifies the packet size used for sent messages.
/// Do not override it unless you understand the consequences of that change.
pub primary_packet_size: PacketSize,
/// Specifies the optional auxiliary packet size for optimizing message streams.
/// Note that its use decreases overall anonymity.
/// Do not set it it unless you understand the consequences of that change.
/// Do not set it unless you understand the consequences of that change.
pub secondary_packet_size: Option<PacketSize>,
pub packet_type: PacketType,
@@ -424,6 +431,8 @@ impl Default for Traffic {
average_packet_delay: DEFAULT_AVERAGE_PACKET_DELAY,
message_sending_average_delay: DEFAULT_MESSAGE_STREAM_AVERAGE_DELAY,
disable_main_poisson_packet_distribution: false,
deterministic_route_selection: false,
maximum_number_of_retransmissions: None,
primary_packet_size: PacketSize::RegularPacket,
secondary_packet_size: None,
packet_type: PacketType::Mix,
@@ -111,6 +111,7 @@ impl From<ConfigV5> for Config {
primary_packet_size: value.debug.traffic.primary_packet_size,
secondary_packet_size: value.debug.traffic.secondary_packet_size,
packet_type: value.debug.traffic.packet_type,
..Default::default()
},
cover_traffic: CoverTraffic {
loop_cover_traffic_average_delay: value
@@ -30,7 +30,8 @@ pub(crate) enum Action {
InsertPending(Vec<PendingAcknowledgement>),
/// Removes given `PendingAcknowledgement` from the 'shared' state. Also cancels the retransmission timer.
/// Initiated by `AcknowledgementListener`
/// Initiated by `AcknowledgementListener` upon receiving the acknowledgement. Also by `RetransmissionRequestListener`
/// upon deciding to abandon the data.
RemovePending(FragmentIdentifier),
/// Starts the retransmission timer on given `PendingAcknowledgement` with the `Duration` based on
@@ -41,7 +42,7 @@ pub(crate) enum Action {
/// Updates the expected delay of given `PendingAcknowledgement` with the new provided `SphinxDelay`.
/// Initiated by `RetransmissionRequestListener`
UpdateDelay(FragmentIdentifier, SphinxDelay),
UpdatePendingAck(FragmentIdentifier, SphinxDelay),
}
impl Action {
@@ -57,8 +58,8 @@ impl Action {
Action::StartTimer(frag_id)
}
pub(crate) fn new_update_delay(frag_id: FragmentIdentifier, delay: SphinxDelay) -> Self {
Action::UpdateDelay(frag_id, delay)
pub(crate) fn new_update_pending_ack(frag_id: FragmentIdentifier, delay: SphinxDelay) -> Self {
Action::UpdatePendingAck(frag_id, delay)
}
}
@@ -135,7 +136,7 @@ impl ActionController {
}
fn handle_start_timer(&mut self, frag_id: FragmentIdentifier) {
trace!("{} is starting its timer", frag_id);
trace!("{frag_id} is starting its timer");
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.get_mut(&frag_id) {
// the fact that this branch is now POSSIBLE is a sign of a need to refactor this whole
@@ -193,7 +194,7 @@ impl ActionController {
// initiated basically as a first step of retransmission. At first data has its delay updated
// (as new sphinx packet was created with new expected delivery time)
fn handle_update_delay(&mut self, frag_id: FragmentIdentifier, delay: SphinxDelay) {
fn handle_update_pending_ack(&mut self, frag_id: FragmentIdentifier, delay: SphinxDelay) {
trace!("{} is updating its delay", frag_id);
// TODO: is it possible to solve this without either locking or temporarily removing the value?
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.remove(&frag_id) {
@@ -202,7 +203,7 @@ impl ActionController {
// reference to this Arc. HOWEVER, before the Action was pushed onto the queue, the reference
// was dropped hence this unwrap is safe.
let mut inner_data = Arc::try_unwrap(pending_ack_data).unwrap();
inner_data.update_delay(delay);
inner_data.update_retransmitted(delay);
self.pending_acks_data
.insert(frag_id, (Arc::new(inner_data), queue_key));
@@ -225,7 +226,7 @@ impl ActionController {
// about it. Perhaps just reschedule it at later point?
let frag_id = expired_ack.into_inner();
trace!("{} has expired", frag_id);
trace!("{frag_id} has expired");
if let Some((pending_ack_data, queue_key)) = self.pending_acks_data.get_mut(&frag_id) {
if queue_key.is_none() {
@@ -258,7 +259,9 @@ impl ActionController {
Action::InsertPending(pending_acks) => self.handle_insert(pending_acks),
Action::RemovePending(frag_id) => self.handle_remove(frag_id),
Action::StartTimer(frag_id) => self.handle_start_timer(frag_id),
Action::UpdateDelay(frag_id, delay) => self.handle_update_delay(frag_id, delay),
Action::UpdatePendingAck(frag_id, delay) => {
self.handle_update_pending_ack(frag_id, delay)
}
}
}
@@ -71,6 +71,7 @@ pub(crate) struct PendingAcknowledgement {
delay: SphinxDelay,
destination: PacketDestination,
mix_hops: Option<u8>,
retransmissions: u32,
}
impl PendingAcknowledgement {
@@ -86,6 +87,7 @@ impl PendingAcknowledgement {
delay,
destination: PacketDestination::KnownRecipient(recipient.into()),
mix_hops,
retransmissions: 0,
}
}
@@ -105,6 +107,7 @@ impl PendingAcknowledgement {
// Messages sent using SURBs are using the number of mix hops set by the recipient when
// they provided the SURBs, so it doesn't make sense to include it here.
mix_hops: None,
retransmissions: 0,
}
}
@@ -116,8 +119,9 @@ impl PendingAcknowledgement {
self.message_chunk.clone()
}
fn update_delay(&mut self, new_delay: SphinxDelay) {
fn update_retransmitted(&mut self, new_delay: SphinxDelay) {
self.delay = new_delay;
self.retransmissions += 1;
}
}
@@ -163,6 +167,9 @@ impl AcknowledgementControllerConnectors {
/// Configurable parameters of the `AcknowledgementController`
pub(super) struct Config {
/// Specify how many times particular packet can be retransmitted
maximum_retransmissions: Option<u32>,
/// Given ack timeout in the form a * BASE_DELAY + b, it specifies the additive part `b`
ack_wait_addition: Duration,
@@ -174,8 +181,13 @@ pub(super) struct Config {
}
impl Config {
pub(super) fn new(ack_wait_addition: Duration, ack_wait_multiplier: f64) -> Self {
pub(super) fn new(
maximum_retransmissions: Option<u32>,
ack_wait_addition: Duration,
ack_wait_multiplier: f64,
) -> Self {
Config {
maximum_retransmissions,
ack_wait_addition,
ack_wait_multiplier,
packet_size: Default::default(),
@@ -238,6 +250,7 @@ where
// will listen for any ack timeouts and trigger retransmission
let retransmission_request_listener = RetransmissionRequestListener::new(
config.maximum_retransmissions,
connectors.ack_action_sender.clone(),
message_handler,
retransmission_rx,
@@ -20,6 +20,7 @@ use std::sync::{Arc, Weak};
// responsible for packet retransmission upon fired timer
pub(super) struct RetransmissionRequestListener<R> {
maximum_retransmissions: Option<u32>,
action_sender: AckActionSender,
message_handler: MessageHandler<R>,
request_receiver: RetransmissionRequestReceiver,
@@ -31,12 +32,14 @@ where
R: CryptoRng + Rng,
{
pub(super) fn new(
maximum_retransmissions: Option<u32>,
action_sender: AckActionSender,
message_handler: MessageHandler<R>,
request_receiver: RetransmissionRequestReceiver,
reply_controller_sender: ReplyControllerSender,
) -> Self {
RetransmissionRequestListener {
maximum_retransmissions,
action_sender,
message_handler,
request_receiver,
@@ -77,6 +80,18 @@ where
}
};
let frag_id = timed_out_ack.message_chunk.fragment_identifier();
if let Some(limit) = self.maximum_retransmissions {
if timed_out_ack.retransmissions >= limit {
warn!("reached maximum number of allowed retransmissions for the packet");
self.action_sender
.unbounded_send(Action::new_remove(frag_id))
.unwrap();
return;
}
}
let maybe_prepared_fragment = match &timed_out_ack.destination {
PacketDestination::Anonymous {
recipient_tag,
@@ -101,8 +116,6 @@ where
}
};
let frag_id = timed_out_ack.message_chunk.fragment_identifier();
let prepared_fragment = match maybe_prepared_fragment {
Ok(prepared_fragment) => prepared_fragment,
Err(err) => {
@@ -136,7 +149,7 @@ where
// with the additional poisson delay.
// And since Actions are executed in order `UpdateTimer` will HAVE TO be executed before `StartTimer`
self.action_sender
.unbounded_send(Action::new_update_delay(frag_id, new_delay))
.unbounded_send(Action::new_update_pending_ack(frag_id, new_delay))
.unwrap();
// send to `OutQueueControl` to eventually send to the mix network
@@ -91,6 +91,9 @@ pub(crate) struct Config {
/// and surb-based are going to be sent.
sender_address: Recipient,
/// Specify whether route selection should be determined by the packet header.
deterministic_route_selection: bool,
/// Average delay a data packet is going to get delay at a single mixnode.
average_packet_delay: Duration,
@@ -114,10 +117,12 @@ impl Config {
sender_address: Recipient,
average_packet_delay: Duration,
average_ack_delay: Duration,
deterministic_route_selection: bool,
) -> Self {
Config {
ack_key,
sender_address,
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
@@ -176,6 +181,7 @@ where
{
let message_preparer = MessagePreparer::new(
rng,
config.deterministic_route_selection,
config.sender_address,
config.average_packet_delay,
config.average_ack_delay,
@@ -634,7 +640,7 @@ where
pub(crate) fn update_ack_delay(&self, id: FragmentIdentifier, new_delay: Delay) {
self.action_sender
.unbounded_send(Action::UpdateDelay(id, new_delay))
.unbounded_send(Action::UpdatePendingAck(id, new_delay))
.expect("action control task has died")
}
@@ -65,6 +65,7 @@ pub struct Config {
impl<'a> From<&'a Config> for acknowledgement_control::Config {
fn from(cfg: &'a Config) -> Self {
acknowledgement_control::Config::new(
cfg.traffic.maximum_number_of_retransmissions,
cfg.acks.ack_wait_addition,
cfg.acks.ack_wait_multiplier,
)
@@ -97,6 +98,7 @@ impl<'a> From<&'a Config> for message_handler::Config {
cfg.self_recipient,
cfg.traffic.average_packet_delay,
cfg.acks.average_ack_delay,
cfg.traffic.deterministic_route_selection,
)
.with_custom_primary_packet_size(cfg.traffic.primary_packet_size)
.with_custom_secondary_packet_size(cfg.traffic.secondary_packet_size)
+14 -4
View File
@@ -29,6 +29,9 @@ pub struct NodeTester<R> {
packet_size: PacketSize,
/// Specify whether route selection should be determined by the packet header.
deterministic_route_selection: bool,
/// Average delay a data packet is going to get delay at a single mixnode.
average_packet_delay: Duration,
@@ -48,11 +51,13 @@ impl<R> NodeTester<R>
where
R: Rng + CryptoRng,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
rng: R,
base_topology: NymTopology,
self_address: Option<Recipient>,
packet_size: PacketSize,
deterministic_route_selection: bool,
average_packet_delay: Duration,
average_ack_delay: Duration,
ack_key: Arc<AckKey>,
@@ -62,6 +67,7 @@ where
base_topology,
self_address,
packet_size,
deterministic_route_selection,
average_packet_delay,
average_ack_delay,
num_mix_hops: DEFAULT_NUM_MIX_HOPS,
@@ -289,10 +295,18 @@ where
impl<R: CryptoRng + Rng> FragmentPreparer for NodeTester<R> {
type Rng = R;
fn deterministic_route_selection(&self) -> bool {
self.deterministic_route_selection
}
fn rng(&mut self) -> &mut Self::Rng {
&mut self.rng
}
fn nonce(&self) -> i32 {
1
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
@@ -304,8 +318,4 @@ impl<R: CryptoRng + Rng> FragmentPreparer for NodeTester<R> {
fn average_ack_delay(&self) -> Duration {
self.average_ack_delay
}
fn nonce(&self) -> i32 {
1
}
}
+28 -10
View File
@@ -18,7 +18,7 @@ use nym_sphinx_params::{PacketType, ReplySurbKeyDigestAlgorithm, DEFAULT_NUM_MIX
use nym_sphinx_types::{Delay, NymPacket};
use nym_topology::{NymTopology, NymTopologyError};
use rand::{CryptoRng, Rng, SeedableRng};
use rand_chacha::ChaCha20Rng;
use rand_chacha::ChaCha8Rng;
use nym_sphinx_chunking::monitoring;
use std::time::Duration;
@@ -51,6 +51,7 @@ impl From<PreparedFragment> for MixPacket {
pub trait FragmentPreparer {
type Rng: CryptoRng + Rng;
fn deterministic_route_selection(&self) -> bool;
fn rng(&mut self) -> &mut Self::Rng;
fn nonce(&self) -> i32;
fn num_mix_hops(&self) -> u8;
@@ -201,9 +202,7 @@ pub trait FragmentPreparer {
// could perform diffie-hellman with its own keys followed by a kdf to re-derive
// the packet encryption key
let seed = fragment.seed().wrapping_mul(self.nonce());
let mut rng = ChaCha20Rng::seed_from_u64(seed as u64);
let fragment_header = fragment.header();
let destination = packet_recipient.gateway();
let hops = mix_hops.unwrap_or(self.num_mix_hops());
monitoring::fragment_sent(&fragment, self.nonce(), *destination, hops);
@@ -241,8 +240,18 @@ pub trait FragmentPreparer {
};
// generate pseudorandom route for the packet
log::trace!("Preparing chunk for sending with {} mix hops", hops);
let route = topology.random_route_to_gateway(&mut rng, hops, destination)?;
log::trace!("Preparing chunk for sending with {hops} mix hops");
let route = if self.deterministic_route_selection() {
log::trace!("using deterministic route selection");
let seed = fragment_header.seed().wrapping_mul(self.nonce());
let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);
topology.random_route_to_gateway(&mut rng, hops, destination)?
} else {
log::trace!("using pseudorandom route selection");
let mut rng = self.rng();
topology.random_route_to_gateway(&mut rng, hops, destination)?
};
let destination = packet_recipient.as_sphinx_destination();
// including set of delays
@@ -313,6 +322,9 @@ pub struct MessagePreparer<R> {
/// Instance of a cryptographically secure random number generator.
rng: R,
/// Specify whether route selection should be determined by the packet header.
deterministic_route_selection: bool,
/// Address of this client which also represent an address to which all acknowledgements
/// and surb-based are going to be sent.
sender_address: Recipient,
@@ -336,6 +348,7 @@ where
{
pub fn new(
rng: R,
deterministic_route_selection: bool,
sender_address: Recipient,
average_packet_delay: Duration,
average_ack_delay: Duration,
@@ -344,6 +357,7 @@ where
let nonce = rng.gen();
MessagePreparer {
rng,
deterministic_route_selection,
sender_address,
average_packet_delay,
average_ack_delay,
@@ -457,10 +471,18 @@ where
impl<R: CryptoRng + Rng> FragmentPreparer for MessagePreparer<R> {
type Rng = R;
fn deterministic_route_selection(&self) -> bool {
self.deterministic_route_selection
}
fn rng(&mut self) -> &mut Self::Rng {
&mut self.rng
}
fn nonce(&self) -> i32 {
self.nonce
}
fn num_mix_hops(&self) -> u8 {
self.num_mix_hops
}
@@ -472,10 +494,6 @@ impl<R: CryptoRng + Rng> FragmentPreparer for MessagePreparer<R> {
fn average_ack_delay(&self) -> Duration {
self.average_ack_delay
}
fn nonce(&self) -> i32 {
self.nonce
}
}
/*
+11
View File
@@ -162,6 +162,13 @@ pub struct TrafficWasm {
/// a loop cover message is sent instead in order to preserve the rate.
pub message_sending_average_delay_ms: u32,
/// Specify how many times particular packet can be retransmitted
/// None - no limit
pub maximum_number_of_retransmissions: Option<u32>,
/// Specify whether route selection should be determined by the packet header.
pub deterministic_route_selection: bool,
/// Controls whether the main packet stream constantly produces packets according to the predefined
/// poisson distribution.
pub disable_main_poisson_packet_distribution: bool,
@@ -196,6 +203,8 @@ impl From<TrafficWasm> for ConfigTraffic {
message_sending_average_delay: Duration::from_millis(
traffic.message_sending_average_delay_ms as u64,
),
deterministic_route_selection: traffic.deterministic_route_selection,
maximum_number_of_retransmissions: traffic.maximum_number_of_retransmissions,
disable_main_poisson_packet_distribution: traffic
.disable_main_poisson_packet_distribution,
primary_packet_size: PacketSize::RegularPacket,
@@ -211,6 +220,8 @@ impl From<ConfigTraffic> for TrafficWasm {
average_packet_delay_ms: traffic.average_packet_delay.as_millis() as u32,
message_sending_average_delay_ms: traffic.message_sending_average_delay.as_millis()
as u32,
deterministic_route_selection: traffic.deterministic_route_selection,
maximum_number_of_retransmissions: traffic.maximum_number_of_retransmissions,
disable_main_poisson_packet_distribution: traffic
.disable_main_poisson_packet_distribution,
use_extended_packet_size: traffic.secondary_packet_size.is_some(),
@@ -88,6 +88,14 @@ pub struct TrafficWasmOverride {
#[tsify(optional)]
pub message_sending_average_delay_ms: Option<u32>,
/// Specify how many times particular packet can be retransmitted
#[tsify(optional)]
pub maximum_number_of_retransmissions: Option<u32>,
/// Specify whether route selection should be determined by the packet header.
#[tsify(optional)]
pub deterministic_route_selection: Option<bool>,
/// Controls whether the main packet stream constantly produces packets according to the predefined
/// poisson distribution.
#[tsify(optional)]
@@ -113,6 +121,10 @@ impl From<TrafficWasmOverride> for TrafficWasm {
message_sending_average_delay_ms: value
.message_sending_average_delay_ms
.unwrap_or(def.message_sending_average_delay_ms),
maximum_number_of_retransmissions: value.maximum_number_of_retransmissions,
deterministic_route_selection: value
.deterministic_route_selection
.unwrap_or(def.deterministic_route_selection),
disable_main_poisson_packet_distribution: value
.disable_main_poisson_packet_distribution
.unwrap_or(def.disable_main_poisson_packet_distribution),
+1
View File
@@ -20,6 +20,7 @@ bloomfilter = { workspace = true }
cfg-if = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive", "env"] }
console-subscriber = { workspace = true, optional = true } # validator-api needs to be built with RUSTFLAGS="--cfg tokio_unstable"
dashmap = { workspace = true }
dirs = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
+2 -2
View File
@@ -13,7 +13,7 @@ use crate::node_status_api::NodeStatusCache;
use crate::nym_contract_cache::cache::NymContractCache;
use crate::support::caching::cache::SharedCache;
use crate::support::config;
use crate::support::http::state::AppState;
use crate::support::http::state::{AppState, ForcedRefresh};
use crate::support::storage::NymApiStorage;
use async_trait::async_trait;
use axum::Router;
@@ -1264,7 +1264,7 @@ struct TestFixture {
impl TestFixture {
fn build_app_state(storage: NymApiStorage) -> AppState {
AppState {
forced_refresh: Default::default(),
forced_refresh: ForcedRefresh::new(true),
nym_contract_cache: NymContractCache::new(),
node_status_cache: NodeStatusCache::new(),
circulating_supply_cache: CirculatingSupplyCache::new("unym".to_owned()),
+1 -5
View File
@@ -14,7 +14,6 @@ use nym_sphinx::params::PacketType;
use nym_sphinx::receiver::MessageReceiver;
use nym_task::TaskClient;
use std::collections::{HashMap, HashSet};
use std::process;
use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, error, info, trace};
@@ -95,10 +94,7 @@ impl<R: MessageReceiver + Send> Monitor<R> {
)
.await
{
error!("Failed to submit monitor run information to the database - {err}",);
// TODO: slightly more graceful shutdown here
process::exit(1);
error!("Failed to submit monitor run information to the database: {err}",);
}
}
@@ -121,6 +121,7 @@ impl PacketPreparer {
test_route.topology().clone(),
self_address,
PacketSize::RegularPacket,
false,
DEFAULT_AVERAGE_PACKET_DELAY,
DEFAULT_AVERAGE_ACK_DELAY,
self.ack_key.clone(),
+37 -12
View File
@@ -57,6 +57,9 @@ pub enum NodeDescribeCacheError {
// TODO: perhaps include more details here like whether key/signature/payload was malformed
#[error("could not verify signed host information for node {node_id}")]
MissignedHostInformation { node_id: NodeId },
#[error("node {node_id} is announcing an illegal ip address")]
IllegalIpAddress { node_id: NodeId },
}
// this exists because I've been moving things around quite a lot and now the place that holds the type
@@ -199,13 +202,18 @@ impl DescribedNodes {
pub struct NodeDescriptionProvider {
contract_cache: NymContractCache,
allow_all_ips: bool,
batch_size: usize,
}
impl NodeDescriptionProvider {
pub(crate) fn new(contract_cache: NymContractCache) -> NodeDescriptionProvider {
pub(crate) fn new(
contract_cache: NymContractCache,
allow_all_ips: bool,
) -> NodeDescriptionProvider {
NodeDescriptionProvider {
contract_cache,
allow_all_ips,
batch_size: DEFAULT_NODE_DESCRIBE_BATCH_SIZE,
}
}
@@ -270,6 +278,7 @@ async fn try_get_client(
async fn try_get_description(
data: RefreshData,
allow_all_ips: bool,
) -> Result<NymNodeDescription, NodeDescribeCacheError> {
let client = try_get_client(&data.host, data.node_id, data.port).await?;
@@ -286,6 +295,12 @@ async fn try_get_description(
});
}
if !allow_all_ips && !host_info.data.check_ips() {
return Err(NodeDescribeCacheError::IllegalIpAddress {
node_id: data.node_id,
});
}
let node_info = query_for_described_data(&client, data.node_id).await?;
let description = node_info.into_node_description(host_info.data);
@@ -357,8 +372,8 @@ impl RefreshData {
self.node_id
}
pub(crate) async fn try_refresh(self) -> Option<NymNodeDescription> {
match try_get_description(self).await {
pub(crate) async fn try_refresh(self, allow_all_ips: bool) -> Option<NymNodeDescription> {
match try_get_description(self, allow_all_ips).await {
Ok(description) => Some(description),
Err(err) => {
debug!("failed to obtain node self-described data: {err}");
@@ -412,11 +427,15 @@ impl CacheItemProvider for NodeDescriptionProvider {
}
}
let nodes = stream::iter(nodes_to_query.into_iter().map(|n| n.try_refresh()))
.buffer_unordered(self.batch_size)
.filter_map(|x| async move { x.map(|d| (d.node_id, d)) })
.collect::<HashMap<_, _>>()
.await;
let nodes = stream::iter(
nodes_to_query
.into_iter()
.map(|n| n.try_refresh(self.allow_all_ips)),
)
.buffer_unordered(self.batch_size)
.filter_map(|x| async move { x.map(|d| (d.node_id, d)) })
.collect::<HashMap<_, _>>()
.await;
info!("refreshed self described data for {} nodes", nodes.len());
@@ -432,8 +451,11 @@ pub(crate) fn new_refresher(
) -> CacheRefresher<DescribedNodes, NodeDescribeCacheError> {
CacheRefresher::new(
Box::new(
NodeDescriptionProvider::new(contract_cache)
.with_batch_size(config.debug.node_describe_batch_size),
NodeDescriptionProvider::new(
contract_cache,
config.debug.node_describe_allow_illegal_ips,
)
.with_batch_size(config.debug.node_describe_batch_size),
),
config.debug.node_describe_caching_interval,
)
@@ -446,8 +468,11 @@ pub(crate) fn new_refresher_with_initial_value(
) -> CacheRefresher<DescribedNodes, NodeDescribeCacheError> {
CacheRefresher::new_with_initial_value(
Box::new(
NodeDescriptionProvider::new(contract_cache)
.with_batch_size(config.debug.node_describe_batch_size),
NodeDescriptionProvider::new(
contract_cache,
config.debug.node_describe_allow_illegal_ips,
)
.with_batch_size(config.debug.node_describe_batch_size),
),
config.debug.node_describe_caching_interval,
initial,
@@ -88,7 +88,6 @@ pub(crate) async fn submit_gateway_monitoring_results(
match state
.storage
.manager
.submit_gateway_statuses_v2(message.results())
.await
{
@@ -133,7 +132,6 @@ pub(crate) async fn submit_node_monitoring_results(
match state
.storage
.manager
.submit_mixnode_statuses_v2(message.results())
.await
{
+2 -1
View File
@@ -86,8 +86,9 @@ async fn refresh_described(
}
// to make sure you can't ddos the endpoint while a request is in progress
state.forced_refresh.set_last_refreshed(node_id).await;
let allow_all_ips = state.forced_refresh.allow_all_ip_addresses;
if let Some(updated_data) = refresh_data.try_refresh().await {
if let Some(updated_data) = refresh_data.try_refresh(allow_all_ips).await {
let Ok(mut describe_cache) = state.described_nodes_cache.write().await else {
return Err(AxumErrorResponse::service_unavailable());
};
+6 -2
View File
@@ -21,7 +21,9 @@ use crate::status::{ApiStatusState, SignerState};
use crate::support::caching::cache::SharedCache;
use crate::support::config::helpers::try_load_current_config;
use crate::support::config::Config;
use crate::support::http::state::{AppState, ShutdownHandles, TASK_MANAGER_TIMEOUT_S};
use crate::support::http::state::{
AppState, ForcedRefresh, ShutdownHandles, TASK_MANAGER_TIMEOUT_S,
};
use crate::support::http::RouterBuilder;
use crate::support::nyxd;
use crate::support::storage::runtime_migrations::m001_directory_services_v2_1::migrate_to_directory_services_v2_1;
@@ -188,7 +190,9 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result<ShutdownHan
};
let router = router.with_state(AppState {
forced_refresh: Default::default(),
forced_refresh: ForcedRefresh::new(
config.topology_cacher.debug.node_describe_allow_illegal_ips,
),
nym_contract_cache: nym_contract_cache_state.clone(),
node_status_cache: node_status_cache_state.clone(),
circulating_supply_cache: circulating_supply_cache.clone(),
+3
View File
@@ -439,6 +439,8 @@ pub struct TopologyCacherDebug {
pub node_describe_caching_interval: Duration,
pub node_describe_batch_size: usize,
pub node_describe_allow_illegal_ips: bool,
}
impl Default for TopologyCacherDebug {
@@ -447,6 +449,7 @@ impl Default for TopologyCacherDebug {
caching_interval: DEFAULT_TOPOLOGY_CACHE_INTERVAL,
node_describe_caching_interval: DEFAULT_NODE_DESCRIBE_CACHE_INTERVAL,
node_describe_batch_size: DEFAULT_NODE_DESCRIBE_BATCH_SIZE,
node_describe_allow_illegal_ips: false,
}
}
}
+9 -1
View File
@@ -82,12 +82,20 @@ pub(crate) struct AppState {
pub(crate) node_info_cache: unstable::NodeInfoCache,
}
#[derive(Clone, Default)]
#[derive(Clone)]
pub(crate) struct ForcedRefresh {
pub(crate) allow_all_ip_addresses: bool,
pub(crate) refreshes: Arc<RwLock<HashMap<NodeId, OffsetDateTime>>>,
}
impl ForcedRefresh {
pub(crate) fn new(allow_all_ip_addresses: bool) -> ForcedRefresh {
ForcedRefresh {
allow_all_ip_addresses,
refreshes: Arc::new(Default::default()),
}
}
pub(crate) async fn last_refreshed(&self, node_id: NodeId) -> Option<OffsetDateTime> {
self.refreshes.read().await.get(&node_id).copied()
}
+96 -96
View File
@@ -7,6 +7,7 @@ use crate::support::storage::models::{
ActiveGateway, ActiveMixnode, GatewayDetails, HistoricalUptime, MixnodeDetails, NodeStatus,
RewardingReport, TestedGatewayStatus, TestedMixnodeStatus, TestingRoute,
};
use crate::support::storage::DbIdCache;
use nym_mixnet_contract_common::{EpochId, IdentityKey, NodeId};
use nym_types::monitoring::NodeResult;
use sqlx::FromRow;
@@ -51,24 +52,7 @@ impl AvgGatewayReliability {
// all SQL goes here
impl StorageManager {
pub(crate) async fn get_mixnode_mix_ids_by_identity(
&self,
identity: &str,
) -> Result<Vec<NodeId>, sqlx::Error> {
let ids = sqlx::query!(
r#"SELECT mix_id as "mix_id: NodeId" FROM mixnode_details WHERE identity_key = ?"#,
identity
)
.fetch_all(&self.connection_pool)
.await?
.into_iter()
.map(|row| row.mix_id)
.collect();
Ok(ids)
}
pub(crate) async fn get_all_avg_mix_reliability_in_last_24hr(
pub(super) async fn get_all_avg_mix_reliability_in_last_24hr(
&self,
end_ts_secs: i64,
) -> Result<Vec<AvgMixnodeReliability>, sqlx::Error> {
@@ -77,7 +61,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_all_avg_gateway_reliability_in_last_24hr(
pub(super) async fn get_all_avg_gateway_reliability_in_last_24hr(
&self,
end_ts_secs: i64,
) -> Result<Vec<AvgGatewayReliability>, sqlx::Error> {
@@ -86,7 +70,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_all_avg_mix_reliability_in_time_interval(
pub(super) async fn get_all_avg_mix_reliability_in_time_interval(
&self,
start_ts_secs: i64,
end_ts_secs: i64,
@@ -114,7 +98,7 @@ impl StorageManager {
Ok(result)
}
pub(crate) async fn get_all_avg_gateway_reliability_in_interval(
pub(super) async fn get_all_avg_gateway_reliability_in_interval(
&self,
start_ts_secs: i64,
end_ts_secs: i64,
@@ -147,7 +131,7 @@ impl StorageManager {
/// # Arguments
///
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
pub(crate) async fn get_mixnode_database_id(
pub(super) async fn get_mixnode_database_id(
&self,
mix_id: NodeId,
) -> Result<Option<i64>, sqlx::Error> {
@@ -159,7 +143,7 @@ impl StorageManager {
Ok(id)
}
pub(crate) async fn get_gateway_database_id(
pub(super) async fn get_gateway_database_id(
&self,
node_id: NodeId,
) -> Result<Option<i64>, sqlx::Error> {
@@ -172,7 +156,7 @@ impl StorageManager {
}
/// Tries to obtain row id of given gateway given its identity
pub(crate) async fn get_gateway_database_id_by_identity(
pub(super) async fn get_gateway_database_id_by_identity(
&self,
identity: &str,
) -> Result<Option<i64>, sqlx::Error> {
@@ -187,7 +171,7 @@ impl StorageManager {
Ok(id)
}
pub(crate) async fn get_gateway_node_id_from_identity_key(
pub(super) async fn get_gateway_node_id_from_identity_key(
&self,
identity: &str,
) -> Result<Option<NodeId>, sqlx::Error> {
@@ -202,7 +186,7 @@ impl StorageManager {
Ok(node_id)
}
pub(crate) async fn get_gateway_identity_key(
pub(super) async fn get_gateway_identity_key(
&self,
node_id: NodeId,
) -> Result<Option<IdentityKey>, sqlx::Error> {
@@ -222,7 +206,7 @@ impl StorageManager {
/// # Arguments
///
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
pub(crate) async fn get_mixnode_identity_key(
pub(super) async fn get_mixnode_identity_key(
&self,
mix_id: NodeId,
) -> Result<Option<IdentityKey>, sqlx::Error> {
@@ -244,7 +228,7 @@ impl StorageManager {
///
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
/// * `timestamp`: unix timestamp of the lower bound of the selection.
pub(crate) async fn get_mixnode_statuses_since(
pub(super) async fn get_mixnode_statuses_since(
&self,
mix_id: NodeId,
timestamp: i64,
@@ -272,7 +256,7 @@ impl StorageManager {
///
/// * `identity`: identity (base58-encoded public key) of the gateway.
/// * `timestamp`: unix timestamp of the lower bound of the selection.
pub(crate) async fn get_gateway_statuses_since(
pub(super) async fn get_gateway_statuses_since(
&self,
node_id: NodeId,
timestamp: i64,
@@ -298,7 +282,7 @@ impl StorageManager {
/// # Arguments
///
/// * `mix_id`: mix-id (as assigned by the smart contract) of the mixnode.
pub(crate) async fn get_mixnode_historical_uptimes(
pub(super) async fn get_mixnode_historical_uptimes(
&self,
mix_id: NodeId,
) -> Result<Vec<ApiHistoricalUptime>, sqlx::Error> {
@@ -336,7 +320,7 @@ impl StorageManager {
/// # Arguments
///
/// * `identity`: identity (base58-encoded public key) of the gateway.
pub(crate) async fn get_gateway_historical_uptimes(
pub(super) async fn get_gateway_historical_uptimes(
&self,
node_id: NodeId,
) -> Result<Vec<ApiHistoricalUptime>, sqlx::Error> {
@@ -369,7 +353,7 @@ impl StorageManager {
Ok(uptimes)
}
pub(crate) async fn get_historical_mix_uptime_on(
pub(super) async fn get_historical_mix_uptime_on(
&self,
contract_node_id: i64,
date: Date,
@@ -393,7 +377,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_historical_gateway_uptime_on(
pub(super) async fn get_historical_gateway_uptime_on(
&self,
contract_node_id: i64,
date: Date,
@@ -424,7 +408,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_mixnode_statuses_by_database_id(
pub(super) async fn get_mixnode_statuses_by_database_id(
&self,
id: i64,
since: i64,
@@ -445,7 +429,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_mixnode_average_reliability_in_interval(
pub(super) async fn get_mixnode_average_reliability_in_interval(
&self,
id: i64,
start: i64,
@@ -507,7 +491,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_gateway_statuses_by_database_id(
pub(super) async fn get_gateway_statuses_by_database_id(
&self,
id: i64,
since: i64,
@@ -534,27 +518,36 @@ impl StorageManager {
///
/// * `timestamp`: unix timestamp indicating when the measurements took place.
/// * `mixnode_results`: reliability results of each node that got tested.
pub(crate) async fn submit_mixnode_statuses(
pub(super) async fn submit_mixnode_statuses(
&self,
timestamp: i64,
mixnode_results: Vec<NodeResult>,
id_cache: &DbIdCache,
) -> Result<(), sqlx::Error> {
// insert it all in a transaction to make sure all nodes are updated at the same time
// (plus it's a nice guard against new nodes)
let mut tx = self.connection_pool.begin().await?;
for mixnode_result in mixnode_results {
let mixnode_id = sqlx::query!(
r#"
INSERT OR IGNORE INTO mixnode_details(mix_id, identity_key) VALUES (?, ?);
SELECT id FROM mixnode_details WHERE mix_id = ?;
"#,
mixnode_result.node_id,
mixnode_result.identity,
mixnode_result.node_id,
)
.fetch_one(&mut *tx)
.await?
.id;
let mixnode_id = match id_cache.mixnode_db_id(mixnode_result.node_id) {
Some(id) => id,
None => {
let mixnode_id = sqlx::query!(
r#"
INSERT OR IGNORE INTO mixnode_details(mix_id, identity_key) VALUES (?, ?);
SELECT id FROM mixnode_details WHERE mix_id = ?;
"#,
mixnode_result.node_id,
mixnode_result.identity,
mixnode_result.node_id,
)
.fetch_one(&mut *tx)
.await?
.id;
id_cache.set_mixnode_db_id(mixnode_result.node_id, mixnode_id);
mixnode_id
}
};
// insert the actual status
sqlx::query!(
@@ -573,7 +566,7 @@ impl StorageManager {
tx.commit().await
}
pub(crate) async fn submit_mixnode_statuses_v2(
pub(super) async fn submit_mixnode_statuses_v2(
&self,
mixnode_results: &[NodeResult],
) -> Result<(), sqlx::Error> {
@@ -620,10 +613,11 @@ impl StorageManager {
///
/// * `timestamp`: unix timestamp indicating when the measurements took place.
/// * `gateway_results`: reliability results of each node that got tested.
pub(crate) async fn submit_gateway_statuses(
pub(super) async fn submit_gateway_statuses(
&self,
timestamp: i64,
gateway_results: Vec<NodeResult>,
id_cache: &DbIdCache,
) -> Result<(), sqlx::Error> {
// insert it all in a transaction to make sure all nodes are updated at the same time
// (plus it's a nice guard against new nodes)
@@ -631,39 +625,45 @@ impl StorageManager {
for gateway_result in gateway_results {
// if gateway info doesn't exist, insert it and get its id
// same ID "problem" as described for mixnode insertion
let gateway_id = sqlx::query!(
r#"
INSERT OR IGNORE INTO gateway_details(node_id, identity) VALUES (?, ?);
SELECT id FROM gateway_details WHERE identity = ?;
"#,
gateway_result.node_id,
gateway_result.identity,
gateway_result.identity,
)
.fetch_one(&mut *tx)
.await?
.id;
let gateway_id = match id_cache.gateway_db_id(gateway_result.node_id) {
Some(id) => id,
None => {
let gateway_id = sqlx::query!(
r#"
INSERT OR IGNORE INTO gateway_details(node_id, identity) VALUES (?, ?);
SELECT id FROM gateway_details WHERE identity = ?;
"#,
gateway_result.node_id,
gateway_result.identity,
gateway_result.identity,
)
.fetch_one(&mut *tx)
.await?
.id;
id_cache.set_gateway_db_id(gateway_result.node_id, gateway_id);
gateway_id
}
};
// insert the actual status
sqlx::query!(
r#"
INSERT INTO gateway_status (gateway_details_id, reliability, timestamp) VALUES (?, ?, ?);
"#,
gateway_id,
gateway_result.reliability,
timestamp
)
.execute(&mut *tx)
.await?;
r#"
INSERT INTO gateway_status (gateway_details_id, reliability, timestamp) VALUES (?, ?, ?);
"#,
gateway_id,
gateway_result.reliability,
timestamp
)
.execute(&mut *tx)
.await?;
}
// finally commit the transaction
tx.commit().await
}
pub(crate) async fn submit_gateway_statuses_v2(
pub(super) async fn submit_gateway_statuses_v2(
&self,
gateway_results: &[NodeResult],
) -> Result<(), sqlx::Error> {
@@ -714,7 +714,7 @@ impl StorageManager {
/// # Arguments
///
/// * `testing_route`: test route used for this particular network monitor run.
pub(crate) async fn submit_testing_route_used(
pub(super) async fn submit_testing_route_used(
&self,
testing_route: TestingRoute,
) -> Result<(), sqlx::Error> {
@@ -742,7 +742,7 @@ impl StorageManager {
///
/// * `db_mixnode_id`: id (as saved in the database) of the mixnode.
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
pub(crate) async fn get_mixnode_testing_route_presence_count_since(
pub(super) async fn get_mixnode_testing_route_presence_count_since(
&self,
db_mixnode_id: i64,
since: i64,
@@ -781,7 +781,7 @@ impl StorageManager {
///
/// * `gateway_id`: id (as saved in the database) of the gateway.
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
pub(crate) async fn get_gateway_testing_route_presence_count_since(
pub(super) async fn get_gateway_testing_route_presence_count_since(
&self,
gateway_id: i64,
since: i64,
@@ -813,7 +813,7 @@ impl StorageManager {
}
/// Checks whether there are already any historical uptimes with this particular date.
pub(crate) async fn check_for_historical_uptime_existence(
pub(super) async fn check_for_historical_uptime_existence(
&self,
today_iso_8601: &str,
) -> Result<bool, sqlx::Error> {
@@ -833,7 +833,7 @@ impl StorageManager {
/// * `node_id`: id of the mixnode (as inserted in `mixnode_details_id` table).
/// * `date`: date associated with the uptime represented in ISO 8601, i.e. YYYY-MM-DD.
/// * `uptime`: the actual uptime of the node during the specified day.
pub(crate) async fn insert_mixnode_historical_uptime(
pub(super) async fn insert_mixnode_historical_uptime(
&self,
mix_id: i64,
date: &str,
@@ -855,7 +855,7 @@ impl StorageManager {
/// * `node_id`: id of the gateway (as inserted in `gateway_details_id` table).
/// * `date`: date associated with the uptime represented in ISO 8601, i.e. YYYY-MM-DD.
/// * `uptime`: the actual uptime of the node during the specified day.
pub(crate) async fn insert_gateway_historical_uptime(
pub(super) async fn insert_gateway_historical_uptime(
&self,
db_id: i64,
date: &str,
@@ -876,7 +876,7 @@ impl StorageManager {
/// # Arguments
///
/// * `timestamp`: unix timestamp at which the monitor test run has occurred
pub(crate) async fn insert_monitor_run(&self, timestamp: i64) -> Result<i64, sqlx::Error> {
pub(super) async fn insert_monitor_run(&self, timestamp: i64) -> Result<i64, sqlx::Error> {
let res = sqlx::query!("INSERT INTO monitor_run(timestamp) VALUES (?)", timestamp)
.execute(&self.connection_pool)
.await?;
@@ -889,7 +889,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_monitor_runs_count(
pub(super) async fn get_monitor_runs_count(
&self,
since: i64,
until: i64,
@@ -911,7 +911,7 @@ impl StorageManager {
/// # Arguments
///
/// * `until`: timestamp specifying the purge cutoff.
pub(crate) async fn purge_old_mixnode_statuses(
pub(super) async fn purge_old_mixnode_statuses(
&self,
timestamp: i64,
) -> Result<(), sqlx::Error> {
@@ -927,7 +927,7 @@ impl StorageManager {
/// # Arguments
///
/// * `until`: timestamp specifying the purge cutoff.
pub(crate) async fn purge_old_gateway_statuses(
pub(super) async fn purge_old_gateway_statuses(
&self,
timestamp: i64,
) -> Result<(), sqlx::Error> {
@@ -944,7 +944,7 @@ impl StorageManager {
///
/// * `since`: indicates the lower bound timestamp for deciding whether given mixnode is active
/// * `until`: indicates the upper bound timestamp for deciding whether given mixnode is active
pub(crate) async fn get_all_active_mixnodes_in_interval(
pub(super) async fn get_all_active_mixnodes_in_interval(
&self,
since: i64,
until: i64,
@@ -978,7 +978,7 @@ impl StorageManager {
///
/// * `since`: indicates the lower bound timestamp for deciding whether given gateway is active
/// * `until`: indicates the upper bound timestamp for deciding whether given gateway is active
pub(crate) async fn get_all_active_gateways_in_interval(
pub(super) async fn get_all_active_gateways_in_interval(
&self,
since: i64,
until: i64,
@@ -1025,7 +1025,7 @@ impl StorageManager {
///
/// * `report`: report to insert into the database
#[allow(unused)]
pub(crate) async fn insert_rewarding_report(
pub(super) async fn insert_rewarding_report(
&self,
report: RewardingReport,
) -> Result<(), sqlx::Error> {
@@ -1044,7 +1044,7 @@ impl StorageManager {
}
#[allow(unused)]
pub(crate) async fn get_rewarding_report(
pub(super) async fn get_rewarding_report(
&self,
absolute_epoch_id: EpochId,
) -> Result<Option<RewardingReport>, sqlx::Error> {
@@ -1069,7 +1069,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_all_active_mixnodes_statuses_in_interval(
pub(super) async fn get_all_active_mixnodes_statuses_in_interval(
&self,
since: i64,
until: i64,
@@ -1102,7 +1102,7 @@ impl StorageManager {
///
/// * `since`: unix timestamp indicating the lower bound interval of the selection.
/// * `until`: unix timestamp indicating the upper bound interval of the selection.
pub(crate) async fn get_all_active_gateways_statuses_in_interval(
pub(super) async fn get_all_active_gateways_statuses_in_interval(
&self,
since: i64,
until: i64,
@@ -1129,7 +1129,7 @@ impl StorageManager {
Ok(active_day_statuses)
}
pub(crate) async fn get_mixnode_details_by_db_id(
pub(super) async fn get_mixnode_details_by_db_id(
&self,
id: i64,
) -> Result<Option<MixnodeDetails>, sqlx::Error> {
@@ -1142,7 +1142,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_gateway_details_by_db_id(
pub(super) async fn get_gateway_details_by_db_id(
&self,
id: i64,
) -> Result<Option<GatewayDetails>, sqlx::Error> {
@@ -1154,7 +1154,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_mixnode_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
pub(super) async fn get_mixnode_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
sqlx::query!(
r#"
SELECT COUNT(*) as count
@@ -1170,7 +1170,7 @@ impl StorageManager {
.map(|record| record.count)
}
pub(crate) async fn get_mixnode_statuses(
pub(super) async fn get_mixnode_statuses(
&self,
mix_id: NodeId,
limit: u32,
@@ -1206,7 +1206,7 @@ impl StorageManager {
.await
}
pub(crate) async fn get_gateway_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
pub(super) async fn get_gateway_statuses_count(&self, db_id: i64) -> Result<i32, sqlx::Error> {
sqlx::query!(
r#"
SELECT COUNT(*) as count
@@ -1222,7 +1222,7 @@ impl StorageManager {
.map(|record| record.count)
}
pub(crate) async fn get_gateway_statuses(
pub(super) async fn get_gateway_statuses(
&self,
gateway_identity: &str,
limit: u32,
+85 -28
View File
@@ -1,6 +1,7 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use self::manager::{AvgGatewayReliability, AvgMixnodeReliability};
use crate::network_monitor::test_route::TestRoute;
use crate::node_status_api::models::{
GatewayStatusReport, GatewayUptimeHistory, HistoricalUptime as ApiHistoricalUptime,
@@ -12,37 +13,71 @@ use crate::storage::models::{NodeStatus, TestingRoute};
use crate::support::storage::models::{
GatewayDetails, HistoricalUptime, MixnodeDetails, TestedGatewayStatus, TestedMixnodeStatus,
};
use dashmap::DashMap;
use nym_mixnet_contract_common::NodeId;
use nym_types::monitoring::NodeResult;
use sqlx::ConnectOptions;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use time::{Date, OffsetDateTime};
use tracing::log::LevelFilter;
use tracing::{error, info, warn};
use self::manager::{AvgGatewayReliability, AvgMixnodeReliability};
pub(crate) mod manager;
pub(crate) mod models;
pub(crate) mod runtime_migrations;
#[derive(Default)]
pub(crate) struct DbIdCache {
pub mixnodes_v1: DashMap<NodeId, i64>,
pub gateways_v1: DashMap<NodeId, i64>,
}
impl DbIdCache {
pub(crate) fn mixnode_db_id(&self, node_id: NodeId) -> Option<i64> {
self.mixnodes_v1.get(&node_id).map(|v| *v)
}
pub(crate) fn gateway_db_id(&self, node_id: NodeId) -> Option<i64> {
self.gateways_v1.get(&node_id).map(|v| *v)
}
pub(crate) fn set_mixnode_db_id(&self, node_id: NodeId, db_id: i64) {
self.mixnodes_v1.insert(node_id, db_id);
}
pub(crate) fn set_gateway_db_id(&self, node_id: NodeId, db_id: i64) {
self.gateways_v1.insert(node_id, db_id);
}
}
// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
pub(crate) struct NymApiStorage {
pub manager: StorageManager,
pub db_id_cache: Arc<DbIdCache>,
}
impl NymApiStorage {
pub async fn init<P: AsRef<Path>>(database_path: P) -> Result<Self, NymApiStorageError> {
// TODO: we can inject here more stuff based on our nym-api global config
// struct. Maybe different pool size or timeout intervals?
let opts = sqlx::sqlite::SqliteConnectOptions::new()
let connect_opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
.log_statements(LevelFilter::Trace)
.log_slow_statements(LevelFilter::Warn, Duration::from_millis(250));
// TODO: do we want auto_vacuum ?
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
let pool_opts = sqlx::sqlite::SqlitePoolOptions::new()
.min_connections(5)
.max_connections(25)
.acquire_timeout(Duration::from_secs(60));
let connection_pool = match pool_opts.connect_with(connect_opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
@@ -59,32 +94,38 @@ impl NymApiStorage {
let storage = NymApiStorage {
manager: StorageManager { connection_pool },
db_id_cache: Arc::new(Default::default()),
};
Ok(storage)
}
#[allow(unused)]
pub(crate) async fn mix_identity_to_mix_ids(
pub(crate) async fn get_mixnode_database_id(
&self,
identity: &str,
) -> Result<Vec<NodeId>, NymApiStorageError> {
Ok(self
.manager
.get_mixnode_mix_ids_by_identity(identity)
.await?)
node_id: NodeId,
) -> Result<Option<i64>, NymApiStorageError> {
if let Some(cached) = self.db_id_cache.mixnode_db_id(node_id) {
return Ok(Some(cached));
}
if let Some(retrieved) = self.manager.get_mixnode_database_id(node_id).await? {
self.db_id_cache.set_mixnode_db_id(node_id, retrieved);
return Ok(Some(retrieved));
}
Ok(None)
}
#[allow(unused)]
pub(crate) async fn mix_identity_to_latest_mix_id(
pub(crate) async fn get_gateway_database_id(
&self,
identity: &str,
) -> Result<Option<NodeId>, NymApiStorageError> {
Ok(self
.mix_identity_to_mix_ids(identity)
.await?
.into_iter()
.max())
node_id: NodeId,
) -> Result<Option<i64>, NymApiStorageError> {
if let Some(cached) = self.db_id_cache.gateway_db_id(node_id) {
return Ok(Some(cached));
}
if let Some(retrieved) = self.manager.get_gateway_database_id(node_id).await? {
self.db_id_cache.set_gateway_db_id(node_id, retrieved);
return Ok(Some(retrieved));
}
Ok(None)
}
pub(crate) async fn get_all_avg_gateway_reliability_in_last_24hr(
@@ -576,7 +617,6 @@ impl NymApiStorage {
// we MUST have those entries in the database, otherwise the route wouldn't have been chosen
// in the first place
let layer1_mix_db_id = self
.manager
.get_mixnode_database_id(test_route.layer_one_mix().mix_id)
.await?
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
@@ -584,7 +624,6 @@ impl NymApiStorage {
})?;
let layer2_mix_db_id = self
.manager
.get_mixnode_database_id(test_route.layer_two_mix().mix_id)
.await?
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
@@ -592,7 +631,6 @@ impl NymApiStorage {
})?;
let layer3_mix_db_id = self
.manager
.get_mixnode_database_id(test_route.layer_three_mix().mix_id)
.await?
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
@@ -600,7 +638,6 @@ impl NymApiStorage {
})?;
let gateway_db_id = self
.manager
.get_gateway_database_id(test_route.gateway().node_id)
.await?
.ok_or_else(|| NymApiStorageError::DatabaseInconsistency {
@@ -701,11 +738,11 @@ impl NymApiStorage {
let monitor_run_id = self.manager.insert_monitor_run(now).await?;
self.manager
.submit_mixnode_statuses(now, mixnode_results)
.submit_mixnode_statuses(now, mixnode_results, &self.db_id_cache)
.await?;
self.manager
.submit_gateway_statuses(now, gateway_results)
.submit_gateway_statuses(now, gateway_results, &self.db_id_cache)
.await?;
for test_route in test_routes {
@@ -715,6 +752,26 @@ impl NymApiStorage {
Ok(())
}
pub(crate) async fn submit_mixnode_statuses_v2(
&self,
mixnode_results: &[NodeResult],
) -> Result<(), NymApiStorageError> {
self.manager
.submit_mixnode_statuses_v2(mixnode_results)
.await?;
Ok(())
}
pub(crate) async fn submit_gateway_statuses_v2(
&self,
gateway_results: &[NodeResult],
) -> Result<(), NymApiStorageError> {
self.manager
.submit_gateway_statuses_v2(gateway_results)
.await?;
Ok(())
}
/// Obtains number of network monitor test runs that have occurred within the specified interval.
///
/// # Arguments
+1 -1
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-node"
version = "1.1.10"
version = "1.1.11"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -59,6 +59,17 @@ pub struct HostInformation {
pub keys: HostKeys,
}
impl HostInformation {
pub fn check_ips(&self) -> bool {
for ip in &self.ip_address {
if ip.is_unspecified() || ip.is_loopback() || ip.is_multicast() {
return false;
}
}
true
}
}
#[derive(Serialize)]
pub struct LegacyHostInformationV2 {
pub ip_address: Vec<IpAddr>,
@@ -4,7 +4,7 @@
[package]
name = "nym-network-requester"
license = "GPL-3.0"
version = "1.1.44"
version = "1.1.45"
authors.workspace = true
edition.workspace = true
rust-version = "1.70"
+1
View File
@@ -231,6 +231,7 @@ impl NymNodeTesterBuilder {
self.base_topology,
Some(address(&managed_keys, gateway_identity)),
PacketSize::default(),
false,
Duration::from_millis(5),
Duration::from_millis(5),
managed_keys.ack_key(),