Feature/replay protection (#5682)

* remove old packettype + fix: apply routing filter BEFORE delaying

* updated sphinx crate for allow usage of reply tags

* full pipeline for placeholder checking of packet replay

* replay protection with batched insertion

* running background task for clearing/flushing the BF

* allow disabling the replay detection + cleanup

* allow unwrap in bench code
This commit is contained in:
Jędrzej Stuczyński
2025-04-08 09:50:25 +01:00
committed by GitHub
parent 0870911b3c
commit 0e38126fc5
45 changed files with 3230 additions and 754 deletions
Generated
+20 -4
View File
@@ -908,6 +908,16 @@ dependencies = [
"generic-array 0.14.7",
]
[[package]]
name = "bloomfilter"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f6d7f06817e48ea4e17532fa61bc4e8b9a101437f0623f69d2ea54284f3a817"
dependencies = [
"getrandom 0.2.15",
"siphasher 1.0.1",
]
[[package]]
name = "bls12_381"
version = "0.8.0"
@@ -1564,6 +1574,7 @@ dependencies = [
"ciborium",
"clap",
"criterion-plot",
"futures",
"is-terminal",
"itertools 0.10.5",
"num-traits",
@@ -1576,6 +1587,7 @@ dependencies = [
"serde_derive",
"serde_json",
"tinytemplate",
"tokio",
"walkdir",
]
@@ -6115,15 +6127,18 @@ dependencies = [
"axum 0.7.9",
"bip39",
"blake2 0.8.1",
"bloomfilter",
"bs58",
"cargo_metadata 0.18.1",
"celes",
"chacha",
"clap",
"colored",
"criterion",
"csv",
"cupid",
"futures",
"hkdf",
"human-repr",
"humantime-serde",
"indicatif",
@@ -6163,6 +6178,7 @@ dependencies = [
"rand 0.8.5",
"serde",
"serde_json",
"sha2 0.10.8",
"sysinfo",
"thiserror 2.0.12",
"time",
@@ -6713,8 +6729,6 @@ name = "nym-sphinx-framing"
version = "0.1.0"
dependencies = [
"bytes",
"log",
"nym-metrics",
"nym-sphinx-acknowledgements",
"nym-sphinx-addressing",
"nym-sphinx-forwarding",
@@ -6723,6 +6737,7 @@ dependencies = [
"thiserror 2.0.12",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
@@ -9246,9 +9261,9 @@ dependencies = [
[[package]]
name = "sphinx-packet"
version = "0.3.2"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c23047e0cf36ff6904603f499fd13153425cdf5ba47bfbaedbc999da0bd92f4e"
checksum = "b63a72efe7dce8a546d5cb855e60699ae69203d0d7e4335a654eb87e93d7d141"
dependencies = [
"aes",
"arrayref",
@@ -9267,6 +9282,7 @@ dependencies = [
"sha2 0.10.8",
"subtle 2.6.1",
"x25519-dalek",
"zeroize",
]
[[package]]
+2 -5
View File
@@ -204,7 +204,7 @@ bip39 = { version = "2.0.0", features = ["zeroize"] }
bit-vec = "0.7.0" # can we unify those?
bitvec = "1.0.0"
blake3 = "1.7.0"
bloomfilter = "1.0.14"
bloomfilter = "3.0.1"
bs58 = "0.5.1"
bytecodec = "0.4.15"
bytes = "1.10.1"
@@ -303,9 +303,6 @@ rand_seeder = "0.2.3"
rayon = "1.5.1"
regex = "1.10.6"
reqwest = { version = "0.12.15", default-features = false }
rocket = "0.5.0"
rocket_cors = "0.6.0"
rocket_okapi = "0.8.0"
rs_merkle = "1.5.0"
safer-ffi = "0.1.13"
schemars = "0.8.22"
@@ -320,7 +317,7 @@ serde_with = "3.9.0"
serde_yaml = "0.9.25"
sha2 = "0.10.8"
si-scale = "0.2.3"
sphinx-packet = "=0.3.2"
sphinx-packet = "=0.5.0"
sqlx = "0.7.4"
strum = "0.26"
strum_macros = "0.26"
@@ -10,9 +10,6 @@ pub enum MixProcessingError {
#[error("failed to recover the expected SURB-Ack packet: {0}")]
MalformedSurbAck(#[from] SurbAckRecoveryError),
#[error("the received packet was set to use the very old and very much deprecated 'VPN' mode")]
ReceivedOldTypeVpnPacket,
#[error("failed to process received Nym packet: {0}")]
NymPacketProcessingError(#[from] PacketProcessingError),
}
-29
View File
@@ -37,32 +37,3 @@ impl TicketTypeRepr {
}
}
}
// Constants for bloom filter for double spending detection
//Chosen for FP of
//Calculator at https://hur.st/bloomfilter/
pub const ECASH_DS_BLOOMFILTER_PARAMS: BloomfilterParameters = BloomfilterParameters {
num_hashes: 10,
bitmap_size: 1_500_000_000,
sip_keys: [
(12345678910111213141, 1415926535897932384),
(7182818284590452353, 3571113171923293137),
],
};
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct BloomfilterParameters {
pub num_hashes: u32,
pub bitmap_size: u64,
pub sip_keys: [(u64, u64); 2],
}
impl BloomfilterParameters {
pub const fn byte_size(&self) -> u64 {
self.bitmap_size / 8
}
pub const fn default_ecash() -> Self {
ECASH_DS_BLOOMFILTER_PARAMS
}
}
+17
View File
@@ -363,6 +363,7 @@ impl MetricsController {
buffer
}
#[inline(always)]
pub fn to_writer(&self, writer: &mut dyn std::io::Write) {
let metrics = self.gather();
match writer.write_all(&metrics) {
@@ -371,6 +372,7 @@ impl MetricsController {
}
}
#[inline(always)]
pub fn register_int_gauge<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
let Some(metric) = Metric::new_int_gauge(name, help.into().unwrap_or(name)) else {
return;
@@ -378,6 +380,7 @@ impl MetricsController {
self.register_metric(metric);
}
#[inline(always)]
pub fn register_float_gauge<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
let Some(metric) = Metric::new_float_gauge(name, help.into().unwrap_or(name)) else {
return;
@@ -385,6 +388,7 @@ impl MetricsController {
self.register_metric(metric);
}
#[inline(always)]
pub fn register_int_counter<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
let Some(metric) = Metric::new_int_counter(name, help.into().unwrap_or(name)) else {
return;
@@ -392,6 +396,7 @@ impl MetricsController {
self.register_metric(metric);
}
#[inline(always)]
pub fn register_histogram<'a>(
&self,
name: &str,
@@ -404,6 +409,7 @@ impl MetricsController {
self.register_metric(metric);
}
#[inline(always)]
pub fn set(&self, name: &str, value: i64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.set(value);
@@ -413,6 +419,7 @@ impl MetricsController {
}
}
#[inline(always)]
pub fn set_float(&self, name: &str, value: f64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.set_float(value);
@@ -422,6 +429,7 @@ impl MetricsController {
}
}
#[inline(always)]
pub fn add_to_histogram(&self, name: &str, value: f64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.add_histogram_observation(value);
@@ -431,12 +439,14 @@ impl MetricsController {
}
}
#[inline(always)]
pub fn start_timer(&self, name: &str) -> Option<HistogramTimer> {
self.registry_index
.get(name)
.and_then(|metric| metric.start_timer())
}
#[inline(always)]
pub fn inc(&self, name: &str) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.inc();
@@ -446,6 +456,7 @@ impl MetricsController {
}
}
#[inline(always)]
pub fn inc_by(&self, name: &str, value: i64) -> bool {
if let Some(metric) = self.registry_index.get(name) {
metric.inc_by(value);
@@ -455,6 +466,7 @@ impl MetricsController {
}
}
#[inline(always)]
pub fn maybe_register_and_set<'a>(
&self,
name: &str,
@@ -468,6 +480,7 @@ impl MetricsController {
}
}
#[inline(always)]
pub fn maybe_register_and_set_float<'a>(
&self,
name: &str,
@@ -481,6 +494,7 @@ impl MetricsController {
}
}
#[inline(always)]
pub fn maybe_register_and_add_to_histogram<'a>(
&self,
name: &str,
@@ -495,6 +509,7 @@ impl MetricsController {
}
}
#[inline(always)]
pub fn maybe_register_and_inc<'a>(&self, name: &str, help: impl Into<Option<&'a str>>) {
if !self.inc(name) {
let help = help.into();
@@ -503,6 +518,7 @@ impl MetricsController {
}
}
#[inline(always)]
pub fn maybe_register_and_inc_by<'a>(
&self,
name: &str,
@@ -516,6 +532,7 @@ impl MetricsController {
}
}
#[inline(always)]
pub fn register_metric(&self, metric: impl Into<Metric>) {
let m = metric.into();
let fq_name = m.fq_name();
@@ -57,8 +57,6 @@ impl SurbAck {
let packet_size = match packet_type {
PacketType::Outfox => surb_ack_payload.len().max(MIN_PACKET_SIZE),
PacketType::Mix => PacketSize::AckPacket.payload_size(),
#[allow(deprecated)]
PacketType::Vpn => PacketSize::AckPacket.payload_size(),
};
let surb_ack_packet = match packet_type {
@@ -75,14 +73,6 @@ impl SurbAck {
&destination,
&delays,
)?,
#[allow(deprecated)]
PacketType::Vpn => NymPacket::sphinx_build(
packet_size,
surb_ack_payload,
&route,
&destination,
&delays,
)?,
};
// in our case, the last hop is a gateway that does NOT do any delays
@@ -106,8 +96,6 @@ impl SurbAck {
PacketSize::OutfoxAckPacket.size() + MAX_NODE_ADDRESS_UNPADDED_LEN
}
PacketType::Mix => PacketSize::AckPacket.size() + MAX_NODE_ADDRESS_UNPADDED_LEN,
#[allow(deprecated)]
PacketType::Vpn => PacketSize::AckPacket.size() + MAX_NODE_ADDRESS_UNPADDED_LEN,
}
}
@@ -139,8 +127,6 @@ impl SurbAck {
let packet = match packet_type {
PacketType::Outfox => NymPacket::outfox_from_bytes(&b[address_offset..])?,
PacketType::Mix => NymPacket::sphinx_from_bytes(&b[address_offset..])?,
#[allow(deprecated)]
PacketType::Vpn => NymPacket::sphinx_from_bytes(&b[address_offset..])?,
};
Ok((address, packet))
-8
View File
@@ -132,14 +132,6 @@ where
&destination,
&delays,
)?,
#[allow(deprecated)]
PacketType::Vpn => NymPacket::sphinx_build(
packet_size.payload_size(),
packet_payload,
&route,
&destination,
&delays,
)?,
PacketType::Outfox => NymPacket::outfox_build(
packet_payload,
&route,
+1 -2
View File
@@ -11,12 +11,11 @@ repository = { workspace = true }
bytes = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
thiserror = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
nym-sphinx-types = { path = "../types", features = ["sphinx", "outfox"] }
nym-sphinx-params = { path = "../params", features = ["sphinx", "outfox"] }
nym-sphinx-forwarding = { path = "../forwarding" }
nym-metrics = { path = "../../nym-metrics" }
nym-sphinx-addressing = { path = "../addressing" }
nym-sphinx-acknowledgements = { path = "../acknowledgements" }
-2
View File
@@ -85,8 +85,6 @@ impl Decoder for NymCodec {
match header.packet_type {
PacketType::Outfox => NymPacket::outfox_from_bytes(slice)?,
PacketType::Mix => NymPacket::sphinx_from_bytes(slice)?,
#[allow(deprecated)]
PacketType::Vpn => NymPacket::sphinx_from_bytes(slice)?,
}
} else {
return Ok(None);
+8
View File
@@ -47,6 +47,14 @@ impl FramedNymPacket {
pub fn into_inner(self) -> NymPacket {
self.packet
}
pub fn packet(&self) -> &NymPacket {
&self.packet
}
pub fn is_sphinx(&self) -> bool {
self.packet.is_sphinx()
}
}
// Contains any metadata that might be useful for sending between mix nodes.
+192 -91
View File
@@ -1,18 +1,20 @@
use log::{debug, error, info, trace};
// Copyright 2021-2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::packet::FramedNymPacket;
use nym_sphinx_acknowledgements::surb_ack::{SurbAck, SurbAckRecoveryError};
use nym_sphinx_addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressError};
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_params::{PacketSize, PacketType};
use nym_sphinx_types::header::shared_secret::ExpandedSharedSecret;
use nym_sphinx_types::{
Delay as SphinxDelay, DestinationAddressBytes, NodeAddressBytes, NymPacket, NymPacketError,
NymProcessedPacket, OutfoxError, PrivateKey, ProcessedPacketData, SphinxError,
Version as SphinxPacketVersion,
NymProcessedPacket, OutfoxError, OutfoxProcessedPacket, PrivateKey, ProcessedPacketData,
SphinxError, Version as SphinxPacketVersion, REPLAY_TAG_SIZE,
};
use std::fmt::Display;
use thiserror::Error;
use crate::packet::FramedNymPacket;
use nym_metrics::nanos;
use nym_sphinx_forwarding::packet::MixPacket;
use tracing::{debug, error, info, trace};
#[derive(Debug)]
pub enum MixProcessingResultData {
@@ -49,6 +51,26 @@ pub struct MixProcessingResult {
pub processing_data: MixProcessingResultData,
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum PartialMixProcessingResult {
Sphinx {
expanded_shared_secret: ExpandedSharedSecret,
},
Outfox,
}
impl PartialMixProcessingResult {
pub fn replay_tag(&self) -> Option<&[u8; REPLAY_TAG_SIZE]> {
match self {
PartialMixProcessingResult::Sphinx {
expanded_shared_secret,
} => Some(expanded_shared_secret.replay_tag()),
PartialMixProcessingResult::Outfox => None,
}
}
}
type ForwardAck = MixPacket;
#[derive(Debug)]
@@ -75,59 +97,192 @@ pub enum PacketProcessingError {
#[error("failed to recover the expected SURB-Ack packet: {0}")]
MalformedSurbAck(#[from] SurbAckRecoveryError),
#[error("the received packet was set to use the very old and very much deprecated 'VPN' mode")]
ReceivedOldTypeVpnPacket,
#[error("failed to process received outfox packet: {0}")]
OutfoxProcessingError(#[from] OutfoxError),
#[error("attempted to partially process an outfox packet")]
PartialOutfoxProcessing,
#[error("this packet has already been processed before")]
PacketReplay,
}
pub struct PartiallyUnwrappedPacket {
received_data: FramedNymPacket,
partial_result: PartialMixProcessingResult,
}
impl PartiallyUnwrappedPacket {
/// Attempt to partially unwrap received packet to derive relevant keys
/// to allow us to reject it for obvious bad behaviour (like replay or invalid mac)
/// without performing full processing
pub fn new(
received_data: FramedNymPacket,
sphinx_key: &PrivateKey,
) -> Result<Self, PacketProcessingError> {
let partial_result = match received_data.packet() {
NymPacket::Sphinx(packet) => {
let expanded_shared_secret =
packet.header.compute_expanded_shared_secret(sphinx_key);
// don't continue if the header is malformed
packet
.header
.ensure_header_integrity(&expanded_shared_secret)?;
PartialMixProcessingResult::Sphinx {
expanded_shared_secret,
}
}
NymPacket::Outfox(_) => PartialMixProcessingResult::Outfox,
};
Ok(PartiallyUnwrappedPacket {
received_data,
partial_result,
})
}
pub fn finalise_unwrapping(self) -> Result<MixProcessingResult, PacketProcessingError> {
let packet_size = self.received_data.packet_size();
let packet_type = self.received_data.packet_type();
let packet = self.received_data.into_inner();
// currently partial unwrapping is only implemented for sphinx packets.
// attempting to call it for anything else should result in a failure
let (
NymPacket::Sphinx(packet),
PartialMixProcessingResult::Sphinx {
expanded_shared_secret,
},
) = (packet, self.partial_result)
else {
return Err(PacketProcessingError::PartialOutfoxProcessing);
};
let processed_packet = packet.process_with_expanded_secret(&expanded_shared_secret)?;
wrap_processed_sphinx_packet(processed_packet, packet_size, packet_type)
}
pub fn replay_tag(&self) -> Option<&[u8; REPLAY_TAG_SIZE]> {
self.partial_result.replay_tag()
}
}
impl From<(FramedNymPacket, PartialMixProcessingResult)> for PartiallyUnwrappedPacket {
fn from(
(received_data, partial_result): (FramedNymPacket, PartialMixProcessingResult),
) -> Self {
PartiallyUnwrappedPacket {
received_data,
partial_result,
}
}
}
pub fn process_framed_packet(
received: FramedNymPacket,
sphinx_key: &PrivateKey,
) -> Result<MixProcessingResult, PacketProcessingError> {
nanos!("process_received", {
let packet_size = received.packet_size();
let packet_type = received.packet_type();
let packet_size = received.packet_size();
let packet_type = received.packet_type();
// unwrap the sphinx packet and if possible and appropriate, cache keys
let processed_packet = perform_framed_unwrapping(received, sphinx_key)?;
// unwrap the sphinx packet
let processed_packet = perform_framed_unwrapping(received, sphinx_key)?;
// for forward packets, extract next hop and set delay (but do NOT delay here)
// for final packets, extract SURBAck
let final_processing_result =
perform_final_processing(processed_packet, packet_size, packet_type);
if final_processing_result.is_err() {
error!("{:?}", final_processing_result)
}
final_processing_result
})
// for forward packets, extract next hop and set delay (but do NOT delay here)
// for final packets, extract SURBAck
perform_final_processing(processed_packet, packet_size, packet_type)
}
fn perform_framed_unwrapping(
received: FramedNymPacket,
sphinx_key: &PrivateKey,
) -> Result<NymProcessedPacket, PacketProcessingError> {
nanos!("perform_initial_unwrapping", {
let packet = received.into_inner();
perform_framed_packet_processing(packet, sphinx_key)
})
let packet = received.into_inner();
perform_framed_packet_processing(packet, sphinx_key)
}
fn perform_framed_packet_processing(
packet: NymPacket,
sphinx_key: &PrivateKey,
) -> Result<NymProcessedPacket, PacketProcessingError> {
nanos!("perform_initial_packet_processing", {
packet.process(sphinx_key).map_err(|err| {
debug!("Failed to unwrap NymPacket packet: {err}");
PacketProcessingError::NymPacketProcessingError(err)
})
packet.process(sphinx_key).map_err(|err| {
debug!("Failed to unwrap NymPacket packet: {err}");
PacketProcessingError::NymPacketProcessingError(err)
})
}
fn wrap_processed_sphinx_packet(
packet: nym_sphinx_types::ProcessedPacket,
packet_size: PacketSize,
packet_type: PacketType,
) -> Result<MixProcessingResult, PacketProcessingError> {
let processing_data = match packet.data {
ProcessedPacketData::ForwardHop {
next_hop_packet,
next_hop_address,
delay,
} => process_forward_hop(
NymPacket::Sphinx(next_hop_packet),
next_hop_address,
delay,
packet_type,
),
// right now there's no use for the surb_id included in the header - probably it should get removed from the
// sphinx all together?
ProcessedPacketData::FinalHop {
destination,
identifier: _,
payload,
} => process_final_hop(
destination,
payload.recover_plaintext()?,
packet_size,
packet_type,
),
}?;
Ok(MixProcessingResult {
packet_version: MixPacketVersion::Sphinx(packet.version),
processing_data,
})
}
fn wrap_processed_outfox_packet(
packet: OutfoxProcessedPacket,
packet_size: PacketSize,
packet_type: PacketType,
) -> Result<MixProcessingResult, PacketProcessingError> {
let next_address = *packet.next_address();
let packet = packet.into_packet();
if packet.is_final_hop() {
let processing_data = process_final_hop(
DestinationAddressBytes::from_bytes(next_address),
packet.recover_plaintext()?.to_vec(),
packet_size,
packet_type,
)?;
Ok(MixProcessingResult {
packet_version: MixPacketVersion::Outfox,
processing_data,
})
} else {
let packet = MixPacket::new(
NymNodeRoutingAddress::try_from_bytes(&next_address)?,
NymPacket::Outfox(packet),
PacketType::Outfox,
);
Ok(MixProcessingResult {
packet_version: MixPacketVersion::Outfox,
processing_data: MixProcessingResultData::ForwardHop {
packet,
delay: None,
},
})
}
}
fn perform_final_processing(
packet: NymProcessedPacket,
packet_size: PacketSize,
@@ -135,64 +290,10 @@ fn perform_final_processing(
) -> Result<MixProcessingResult, PacketProcessingError> {
match packet {
NymProcessedPacket::Sphinx(packet) => {
let processing_data = match packet.data {
ProcessedPacketData::ForwardHop {
next_hop_packet,
next_hop_address,
delay,
} => process_forward_hop(
NymPacket::Sphinx(next_hop_packet),
next_hop_address,
delay,
packet_type,
),
// right now there's no use for the surb_id included in the header - probably it should get removed from the
// sphinx all together?
ProcessedPacketData::FinalHop {
destination,
identifier: _,
payload,
} => process_final_hop(
destination,
payload.recover_plaintext()?,
packet_size,
packet_type,
),
}?;
Ok(MixProcessingResult {
packet_version: MixPacketVersion::Sphinx(packet.version),
processing_data,
})
wrap_processed_sphinx_packet(packet, packet_size, packet_type)
}
NymProcessedPacket::Outfox(packet) => {
let next_address = *packet.next_address();
let packet = packet.into_packet();
if packet.is_final_hop() {
let processing_data = process_final_hop(
DestinationAddressBytes::from_bytes(next_address),
packet.recover_plaintext()?.to_vec(),
packet_size,
packet_type,
)?;
Ok(MixProcessingResult {
packet_version: MixPacketVersion::Outfox,
processing_data,
})
} else {
let packet = MixPacket::new(
NymNodeRoutingAddress::try_from_bytes(&next_address)?,
NymPacket::Outfox(packet),
PacketType::Outfox,
);
Ok(MixProcessingResult {
packet_version: MixPacketVersion::Outfox,
processing_data: MixProcessingResultData::ForwardHop {
packet,
delay: None,
},
})
}
wrap_processed_outfox_packet(packet, packet_size, packet_type)
}
}
}
@@ -272,9 +272,6 @@ impl PacketSize {
let overhead = match packet_type {
#[cfg(feature = "sphinx")]
PacketType::Mix => SPHINX_PACKET_OVERHEAD,
#[allow(deprecated)]
#[cfg(feature = "sphinx")]
PacketType::Vpn => SPHINX_PACKET_OVERHEAD,
#[cfg(feature = "outfox")]
PacketType::Outfox => OUTFOX_PACKET_OVERHEAD,
_ => 0,
@@ -27,11 +27,6 @@ pub enum PacketType {
#[serde(alias = "sphinx")]
Mix = 0,
/// Represents a packet that should be sent through the network as fast as possible.
#[deprecated]
#[serde(rename = "unsupported-mix-vpn")]
Vpn = 1,
/// Abusing this to add Outfox support
#[serde(rename = "outfox")]
Outfox = 2,
@@ -41,8 +36,6 @@ impl fmt::Display for PacketType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
PacketType::Mix => write!(f, "Mix"),
#[allow(deprecated)]
PacketType::Vpn => write!(f, "Vpn"),
PacketType::Outfox => write!(f, "Outfox"),
}
}
-8
View File
@@ -270,14 +270,6 @@ pub trait FragmentPreparer {
&destination,
&delays,
)?,
#[allow(deprecated)]
PacketType::Vpn => NymPacket::sphinx_build(
packet_size.payload_size(),
packet_payload,
&route,
&destination,
&delays,
)?,
};
// from the previously constructed route extract the first hop
+8 -3
View File
@@ -5,7 +5,7 @@ use std::{array::TryFromSliceError, fmt};
use thiserror::Error;
#[cfg(feature = "outfox")]
use nym_outfox::packet::{OutfoxPacket, OutfoxProcessedPacket};
pub use nym_outfox::packet::{OutfoxPacket, OutfoxProcessedPacket};
#[cfg(feature = "sphinx")]
pub use sphinx_packet::{SphinxPacket, SphinxPacketBuilder};
@@ -21,7 +21,7 @@ pub use nym_outfox::{
pub use sphinx_packet::{
constants::{
self, DESTINATION_ADDRESS_LENGTH, IDENTIFIER_LENGTH, MAX_PATH_LENGTH, NODE_ADDRESS_LENGTH,
PAYLOAD_KEY_SIZE,
PAYLOAD_KEY_SIZE, REPLAY_TAG_SIZE,
},
crypto::{self, PrivateKey, PublicKey},
header::{self, delays, delays::Delay, ProcessedHeader, SphinxHeader, HEADER_SIZE},
@@ -176,10 +176,15 @@ impl NymPacket {
}
#[cfg(feature = "sphinx")]
pub fn as_sphinx_packet(self) -> Option<SphinxPacket> {
pub fn to_sphinx_packet(self) -> Option<SphinxPacket> {
match self {
NymPacket::Sphinx(packet) => Some(packet),
_ => None,
}
}
#[cfg(feature = "sphinx")]
pub fn is_sphinx(&self) -> bool {
matches!(self, NymPacket::Sphinx(_))
}
}
+1 -1
View File
@@ -209,7 +209,7 @@ impl ShutdownManager {
legacy_task_manager: None,
shutdown_signals: Default::default(),
tracker: Default::default(),
max_shutdown_duration: Default::default(),
max_shutdown_duration: Duration::from_secs(10),
};
// we need to add an explicit watcher for the cancellation token being cancelled
+13 -11
View File
@@ -19,6 +19,7 @@ anyhow.workspace = true
arc-swap = { workspace = true }
bip39 = { workspace = true, features = ["zeroize"] }
bs58.workspace = true
bloomfilter = { workspace = true }
celes = { workspace = true } # country codes
colored = { workspace = true }
csv = { workspace = true }
@@ -95,24 +96,25 @@ nym-ip-packet-router = { path = "../service-providers/ip-packet-router" }
# throughput tester to recreate lioness
# we don't care about particular versions - just pull whatever is used by sphinx
[dependencies.lioness]
version = "*"
[dependencies.chacha]
version = "*"
[dependencies.arrayref]
version = "*"
[dependencies.blake2]
version = "*"
lioness = "*"
chacha = "*"
arrayref = "*"
blake2 = "=0.8.1"
sha2 = { workspace = true }
hkdf = { workspace = true }
[[bench]]
name = "benchmarks"
harness = false
[build-dependencies]
# temporary bonding information v1 (to grab and parse nym-mixnode and nym-gateway package versions)
cargo_metadata = { workspace = true }
[dev-dependencies]
criterion = { workspace = true, features = ["async_tokio"] }
[lints]
workspace = true
+78
View File
@@ -0,0 +1,78 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// unwraps in tests/benches are fine...
#![allow(clippy::unwrap_used)]
use bloomfilter::Bloom;
use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion};
use nym_sphinx_types::REPLAY_TAG_SIZE;
use rand::{thread_rng, Rng};
use std::sync::Mutex;
pub fn uncontested_bloomfilter_check(c: &mut Criterion) {
let mut bloomfilter = Bloom::new_for_fp_rate(725760000, 1e-5).unwrap();
c.bench_function("bf_725760000_1e-5_check", |b| {
b.iter_batched(
|| {
let mut rng = thread_rng();
let mut reply_tag = [0; REPLAY_TAG_SIZE];
rng.fill(&mut reply_tag);
reply_tag
},
|replay_tag| {
black_box(bloomfilter.check_and_set(&replay_tag));
},
BatchSize::SmallInput,
)
});
}
pub fn uncontested_bloomfilter_check_with_exclusive_mutex(c: &mut Criterion) {
let bloomfilter = Mutex::new(Bloom::new_for_fp_rate(725760000, 1e-5).unwrap());
c.bench_function("bf_725760000_1e-5_uncontested_std_mutex_check", |b| {
b.iter_batched(
|| {
let mut rng = thread_rng();
let mut reply_tag = [0; REPLAY_TAG_SIZE];
rng.fill(&mut reply_tag);
reply_tag
},
|replay_tag| {
black_box(bloomfilter.lock().unwrap().check_and_set(&replay_tag));
},
BatchSize::SmallInput,
)
});
}
pub fn uncontested_bloomfilter_check_with_exclusive_tokio_mutex(c: &mut Criterion) {
let bloomfilter = tokio::sync::Mutex::new(Bloom::new_for_fp_rate(725760000, 1e-5).unwrap());
let runtime = tokio::runtime::Runtime::new().unwrap();
c.bench_function("bf_725760000_1e-5_uncontested_tokio_mutex_check", |b| {
b.to_async(&runtime).iter_batched(
|| {
let mut rng = thread_rng();
let mut reply_tag = [0; REPLAY_TAG_SIZE];
rng.fill(&mut reply_tag);
reply_tag
},
async |replay_tag| {
black_box(bloomfilter.lock().await.check_and_set(&replay_tag));
},
BatchSize::SmallInput,
)
});
}
criterion_group!(
nym_node_benches,
uncontested_bloomfilter_check,
uncontested_bloomfilter_check_with_exclusive_mutex,
uncontested_bloomfilter_check_with_exclusive_tokio_mutex
);
// TODO: somehow bench heavily contested cases...
criterion_main!(nym_node_benches);
+15
View File
@@ -46,6 +46,13 @@ impl MixingStats {
.store(update_timestamp, Ordering::Release);
}
pub fn ingress_replayed_packet(&self, source: IpAddr) {
self.ingress
.replayed_packets_received
.fetch_add(1, Ordering::Relaxed);
self.ingress.senders.entry(source).or_default().replayed += 1;
}
pub fn ingress_malformed_packet(&self, source: IpAddr) {
self.ingress
.malformed_packets_received
@@ -197,6 +204,7 @@ pub struct IngressRecipientStats {
pub forward_packets: IngressPacketsStats,
pub final_hop_packets: IngressPacketsStats,
pub malformed: usize,
pub replayed: usize,
}
#[derive(Debug, Default, Copy, Clone, Hash, PartialEq, Eq)]
@@ -232,6 +240,9 @@ pub struct IngressMixingStats {
// packets that failed to get unwrapped
malformed_packets_received: AtomicUsize,
// packets that were already received and processed before
replayed_packets_received: AtomicUsize,
// (forward) packets that had invalid, i.e. too large, delays
excessive_delay_packets: AtomicUsize,
@@ -253,6 +264,10 @@ impl IngressMixingStats {
self.final_hop_packets_received.load(Ordering::Relaxed)
}
pub fn replayed_packets_received(&self) -> usize {
self.replayed_packets_received.load(Ordering::Relaxed)
}
pub fn malformed_packets_received(&self) -> usize {
self.malformed_packets_received.load(Ordering::Relaxed)
}
@@ -39,6 +39,9 @@ pub enum PrometheusMetric {
#[strum(props(help = "The number of ingress final hop sphinx packets received"))]
MixnetIngressFinalHopPacketsReceived,
#[strum(props(help = "The number of ingress replayed sphinx packets received"))]
MixnetIngressReplayedPacketsReceived,
#[strum(props(help = "The number of ingress malformed sphinx packets received"))]
MixnetIngressMalformedPacketsReceived,
@@ -208,6 +211,9 @@ impl PrometheusMetric {
PrometheusMetric::MixnetIngressFinalHopPacketsReceived => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::MixnetIngressReplayedPacketsReceived => {
Metric::new_int_gauge(&name, help)
}
PrometheusMetric::MixnetIngressMalformedPacketsReceived => {
Metric::new_int_gauge(&name, help)
}
@@ -382,7 +388,7 @@ mod tests {
// a sanity check for anyone adding new metrics. if this test fails,
// make sure any methods on `PrometheusMetric` enum don't need updating
// or require custom Display impl
assert_eq!(38, PrometheusMetric::COUNT)
assert_eq!(39, PrometheusMetric::COUNT)
}
#[test]
@@ -40,6 +40,9 @@ pub mod packets {
// packets that failed to get unwrapped
pub malformed_packets_received: usize,
// packets that were already received and processed before
pub replayed_packets_received: usize,
// (forward) packets that had invalid, i.e. too large, delays
pub excessive_delay_packets: usize,
+1 -1
View File
@@ -167,7 +167,7 @@ impl Args {
)
.with_host(self.host.build_config_section())
.with_http(self.http.build_config_section())
.with_mixnet(self.mixnet.build_config_section())
.with_mixnet(self.mixnet.build_config_section(&data_dir))
.with_wireguard(self.wireguard.build_config_section(&data_dir))
.with_storage_paths(NymNodePaths::new(&data_dir))
.with_verloc(self.verloc.build_config_section())
+13 -2
View File
@@ -220,12 +220,20 @@ pub(crate) struct MixnetArgs {
env = NYMNODE_UNSAFE_DISABLE_NOISE
)]
pub(crate) unsafe_disable_noise: bool,
/// Specifies whether this node should **NOT** be using replay protection
#[clap(
hide = true,
long,
env = NYMNODE_UNSAFE_DISABLE_REPLAY_PROTECTION
)]
pub(crate) unsafe_disable_replay_protection: bool,
}
impl MixnetArgs {
// TODO: could we perhaps make a clap error here and call `safe_exit` instead?
pub(crate) fn build_config_section(self) -> config::Mixnet {
self.override_config_section(config::Mixnet::default())
pub(crate) fn build_config_section<P: AsRef<Path>>(self, data_dir: P) -> config::Mixnet {
self.override_config_section(config::Mixnet::new_default(data_dir))
}
pub(crate) fn override_config_section(self, mut section: config::Mixnet) -> config::Mixnet {
@@ -244,6 +252,9 @@ impl MixnetArgs {
if self.unsafe_disable_noise {
section.debug.unsafe_disable_noise = true
}
if self.unsafe_disable_replay_protection {
section.replay_protection.debug.unsafe_disabled = true
}
section
}
}
+181 -6
View File
@@ -1,11 +1,12 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::persistence::NymNodePaths;
use crate::config::persistence::{NymNodePaths, ReplayProtectionPaths};
use crate::config::template::CONFIG_TEMPLATE;
use crate::error::NymNodeError;
use celes::Country;
use clap::ValueEnum;
use human_repr::HumanCount;
use nym_bin_common::logging::LoggingSettings;
use nym_config::defaults::{
mainnet, var_names, DEFAULT_MIX_LISTENING_PORT, DEFAULT_NYM_NODE_HTTP_PORT,
@@ -26,6 +27,7 @@ use std::fmt::{Display, Formatter};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::time::Duration;
use sysinfo::System;
use tracing::{debug, error};
use url::Url;
@@ -42,6 +44,7 @@ pub mod upgrade_helpers;
pub use crate::config::gateway_tasks::GatewayTasksConfig;
pub use crate::config::metrics::MetricsConfig;
pub use crate::config::service_providers::ServiceProvidersConfig;
use crate::node::replay_protection::{bitmap_size, items_in_bloomfilter};
const DEFAULT_NYMNODES_DIR: &str = "nym-nodes";
@@ -264,7 +267,9 @@ impl ConfigBuilder {
modes: self.modes,
host: self.host.unwrap_or_default(),
http: self.http.unwrap_or_default(),
mixnet: self.mixnet.unwrap_or_default(),
mixnet: self
.mixnet
.unwrap_or_else(|| Mixnet::new_default(&self.data_dir)),
verloc: self.verloc.unwrap_or_default(),
wireguard: self
.wireguard
@@ -417,6 +422,12 @@ impl Config {
pub fn read_from_toml_file<P: AsRef<Path>>(path: P) -> Result<Self, NymNodeError> {
Self::read_from_path(path)
}
pub fn validate(&self) -> Result<(), NymNodeError> {
self.mixnet.validate()?;
Ok(())
}
}
// TODO: this is very much a WIP. we need proper ssl certificate support here
@@ -496,7 +507,6 @@ impl Default for Http {
}
#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
#[serde(default)]
#[serde(deny_unknown_fields)]
pub struct Mixnet {
/// Address this node will bind to for listening for mixnet packets
@@ -516,13 +526,35 @@ pub struct Mixnet {
/// Addresses to nyxd which the node uses to interact with the nyx chain.
pub nyxd_urls: Vec<Url>,
/// Settings for controlling replay detection
pub replay_protection: ReplayProtection,
#[serde(default)]
pub debug: MixnetDebug,
}
impl Mixnet {
pub fn validate(&self) -> Result<(), NymNodeError> {
if self.nym_api_urls.is_empty() {
return Err(NymNodeError::config_validation_failure(
"no nym api urls provided",
));
}
if self.nyxd_urls.is_empty() {
return Err(NymNodeError::config_validation_failure(
"no nyxd urls provided",
));
}
self.replay_protection.validate()?;
Ok(())
}
}
#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
#[serde(default)]
#[serde(deny_unknown_fields)]
pub struct MixnetDebug {
/// Specifies the duration of time this node is willing to delay a forward packet for.
#[serde(with = "humantime_serde")]
@@ -549,6 +581,148 @@ pub struct MixnetDebug {
pub unsafe_disable_noise: bool,
}
#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
pub struct ReplayProtection {
/// Paths for current bloomfilters
pub storage_paths: persistence::ReplayProtectionPaths,
#[serde(default)]
pub debug: ReplayProtectionDebug,
}
impl ReplayProtection {
pub fn new_default<P: AsRef<Path>>(data_dir: P) -> Self {
ReplayProtection {
storage_paths: ReplayProtectionPaths::new(data_dir),
debug: Default::default(),
}
}
}
impl ReplayProtection {
pub fn validate(&self) -> Result<(), NymNodeError> {
self.debug.validate()?;
Ok(())
}
}
#[derive(Debug, Copy, Clone, Deserialize, PartialEq, Serialize)]
#[serde(default)]
pub struct ReplayProtectionDebug {
/// Specifies whether this node should **NOT** use replay protection
pub unsafe_disabled: bool,
/// How long the processing task is willing to skip mutex acquisition before it will block the thread
/// until it actually obtains it
pub maximum_replay_detection_deferral: Duration,
/// How many packets the processing task is willing to queue before it will block the thread
/// until it obtains the mutex
pub maximum_replay_detection_pending_packets: usize,
/// Probability of false positives, fraction between 0 and 1 or a number indicating 1-in-p
pub false_positive_rate: f64,
/// Defines initial expected number of packets this node will process a second,
/// so that an initial bloomfilter could be established.
/// As the node is running and BF are cleared, the value will be adjusted dynamically
pub initial_expected_packets_per_second: usize,
/// Defines minimum expected number of packets this node will process a second
/// when used for calculating the BF size after reset.
/// This is to avoid degenerate cases where node receives 0 packets (because say it's misconfigured)
/// and it constructs an empty bloomfilter.
pub bloomfilter_minimum_packets_per_second_size: usize,
/// Specifies the amount the bloomfilter size is going to get multiplied by after each reset.
/// It's performed in case the traffic rates increase before the next bloomfilter update.
pub bloomfilter_size_multiplier: f64,
// NOTE: this field is temporary until replay detection bloomfilter rotation is tied
// to key rotation
/// Specifies how often the bloomfilter is cleared
#[serde(with = "humantime_serde")]
pub bloomfilter_reset_rate: Duration,
/// Specifies how often the bloomfilter is flushed to disk for recovery in case of a crash
#[serde(with = "humantime_serde")]
pub bloomfilter_disk_flushing_rate: Duration,
}
impl ReplayProtectionDebug {
pub const DEFAULT_MAXIMUM_REPLAY_DETECTION_DEFERRAL: Duration = Duration::from_millis(50);
pub const DEFAULT_MAXIMUM_REPLAY_DETECTION_PENDING_PACKETS: usize = 100;
// 12% (completely arbitrary)
pub const DEFAULT_BLOOMFILTER_SIZE_MULTIPLIER: f64 = 1.12;
// 10^-5
pub const DEFAULT_REPLAY_DETECTION_FALSE_POSITIVE_RATE: f64 = 1e-5;
// 25h (key rotation will be happening every 24h + 1h of overlap)
pub const DEFAULT_REPLAY_DETECTION_BF_RESET_RATE: Duration = Duration::from_secs(25 * 60 * 60);
// we must have some reasonable balance between losing values and trashing the disk.
// since on average HDD it would take ~30s to save a 2GB bloomfilter
pub const DEFAULT_BF_DISK_FLUSHING_RATE: Duration = Duration::from_secs(10 * 60);
// this value will have to be adjusted in the future
pub const DEFAULT_INITIAL_EXPECTED_PACKETS_PER_SECOND: usize = 2000;
pub const DEFAULT_BLOOMFILTER_MINIMUM_PACKETS_PER_SECOND_SIZE: usize = 200;
pub fn validate(&self) -> Result<(), NymNodeError> {
if self.false_positive_rate >= 1.0 || self.false_positive_rate <= 0.0 {
return Err(NymNodeError::config_validation_failure(
"false positive rate for replay detection can't be larger than (or equal to) 1 or smaller than (or equal to) 0",
));
}
let items_in_filter = items_in_bloomfilter(
self.bloomfilter_reset_rate,
self.initial_expected_packets_per_second,
);
let bitmap_size = bitmap_size(self.false_positive_rate, items_in_filter);
let bloomfilter_size = bitmap_size / 8;
let mut sys_info = System::new();
sys_info.refresh_memory();
// we'll need 2x size of the bloomfilter
// as during key transition we'll have to simultaneously use two filters
// plus we also need to make a memcopy during disk flush
let required_memory = 2 * bloomfilter_size;
let memory = sys_info.available_memory();
if (memory as usize) < required_memory {
return Err(NymNodeError::config_validation_failure(
format!("system does not have sufficient memory to allocate required replay protection bloomfilters. {} is available whilst at least {} is needed",memory.human_count_bytes(), required_memory.human_count_bytes())));
}
Ok(())
}
}
impl Default for ReplayProtectionDebug {
fn default() -> Self {
ReplayProtectionDebug {
unsafe_disabled: false,
maximum_replay_detection_deferral: Self::DEFAULT_MAXIMUM_REPLAY_DETECTION_DEFERRAL,
maximum_replay_detection_pending_packets:
Self::DEFAULT_MAXIMUM_REPLAY_DETECTION_PENDING_PACKETS,
false_positive_rate: Self::DEFAULT_REPLAY_DETECTION_FALSE_POSITIVE_RATE,
initial_expected_packets_per_second: Self::DEFAULT_INITIAL_EXPECTED_PACKETS_PER_SECOND,
bloomfilter_minimum_packets_per_second_size:
Self::DEFAULT_BLOOMFILTER_MINIMUM_PACKETS_PER_SECOND_SIZE,
bloomfilter_size_multiplier: Self::DEFAULT_BLOOMFILTER_SIZE_MULTIPLIER,
bloomfilter_reset_rate: Self::DEFAULT_REPLAY_DETECTION_BF_RESET_RATE,
bloomfilter_disk_flushing_rate: Self::DEFAULT_BF_DISK_FLUSHING_RATE,
}
}
}
impl MixnetDebug {
// given that genuine clients are using mean delay of 50ms,
// the probability of them delaying for over 10s is 10^-87
@@ -574,8 +748,8 @@ impl Default for MixnetDebug {
}
}
impl Default for Mixnet {
fn default() -> Self {
impl Mixnet {
pub fn new_default<P: AsRef<Path>>(data_dir: P) -> Self {
// SAFETY:
// our hardcoded values should always be valid
#[allow(clippy::expect_used)]
@@ -598,6 +772,7 @@ impl Default for Mixnet {
announce_port: None,
nym_api_urls,
nyxd_urls,
replay_protection: ReplayProtection::new_default(data_dir),
debug: Default::default(),
}
}
+2
View File
@@ -8,6 +8,7 @@ mod old_config_v4;
mod old_config_v5;
mod old_config_v6;
mod old_config_v7;
mod old_config_v8;
pub use old_config_v1::try_upgrade_config_v1;
pub use old_config_v2::try_upgrade_config_v2;
@@ -16,3 +17,4 @@ pub use old_config_v4::try_upgrade_config_v4;
pub use old_config_v5::try_upgrade_config_v5;
pub use old_config_v6::try_upgrade_config_v6;
pub use old_config_v7::try_upgrade_config_v7;
pub use old_config_v8::try_upgrade_config_v8;
+41 -340
View File
@@ -1,22 +1,20 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
#![allow(dead_code)]
use crate::config::authenticator::{Authenticator, AuthenticatorDebug};
use crate::config::gateway_tasks::ZkNymTicketHandlerDebug;
use crate::config::service_providers::{
IpPacketRouter, IpPacketRouterDebug, NetworkRequester, NetworkRequesterDebug,
use crate::config::old_configs::old_config_v8::{
AuthenticatorDebugV8, AuthenticatorPathsV8, AuthenticatorV8, ConfigV8,
GatewayTasksConfigDebugV8, GatewayTasksConfigV8, GatewayTasksPathsV8, HostV8, HttpV8,
IpPacketRouterDebugV8, IpPacketRouterPathsV8, IpPacketRouterV8, KeysPathsV8, LoggingSettingsV8,
MixnetDebugV8, MixnetV8, NetworkRequesterDebugV8, NetworkRequesterPathsV8, NetworkRequesterV8,
NodeModesV8, NymNodePathsV8, ServiceProvidersConfigDebugV8, ServiceProvidersConfigV8,
ServiceProvidersPathsV8, VerlocDebugV8, VerlocV8, WireguardPathsV8, WireguardV8,
ZkNymTicketHandlerDebugV8,
};
use crate::config::*;
use crate::error::{EntryGatewayError, NymNodeError};
use crate::error::NymNodeError;
use celes::Country;
use clap::ValueEnum;
use gateway_tasks::DEFAULT_WS_PORT;
use nym_client_core_config_types::{
disk_persistence::{ClientKeysPaths, CommonClientPaths},
DebugConfig as ClientDebugConfig,
};
use nym_client_core_config_types::DebugConfig as ClientDebugConfig;
use nym_config::defaults::{mainnet, var_names};
use nym_config::helpers::inaddr_any;
use nym_config::{
@@ -24,17 +22,13 @@ use nym_config::{
serde_helpers::{de_maybe_port, de_maybe_stringified},
};
use nym_config::{parse_urls, read_config_from_toml_file};
use persistence::*;
use serde::{Deserialize, Serialize};
use std::fs::create_dir_all;
use std::env;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::time::Duration;
use std::{env, fs, io};
use tracing::info;
use tracing::{debug, instrument};
use url::Url;
use zeroize::Zeroizing;
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
@@ -132,14 +126,6 @@ impl From<&[NodeModeV7]> for NodeModesV7 {
}
impl NodeModesV7 {
pub fn any_enabled(&self) -> bool {
self.mixnode || self.entry || self.exit
}
pub fn standalone_exit(&self) -> bool {
!self.mixnode && !self.entry && self.exit
}
pub fn with_mode(&mut self, mode: NodeModeV7) -> &mut Self {
match mode {
NodeModeV7::Mixnode => self.with_mixnode(),
@@ -149,10 +135,6 @@ impl NodeModesV7 {
}
}
pub fn expects_final_hop_traffic(&self) -> bool {
self.entry || self.exit
}
pub fn with_mixnode(&mut self) -> &mut Self {
self.mixnode = true;
self
@@ -320,45 +302,6 @@ pub struct KeysPathsV7 {
pub public_x25519_noise_key_file: PathBuf,
}
impl KeysPathsV7 {
pub fn new<P: AsRef<Path>>(data_dir: P) -> Self {
let data_dir = data_dir.as_ref();
KeysPathsV7 {
private_ed25519_identity_key_file: data_dir
.join(DEFAULT_ED25519_PRIVATE_IDENTITY_KEY_FILENAME),
public_ed25519_identity_key_file: data_dir
.join(DEFAULT_ED25519_PUBLIC_IDENTITY_KEY_FILENAME),
private_x25519_sphinx_key_file: data_dir
.join(DEFAULT_X25519_PRIVATE_SPHINX_KEY_FILENAME),
public_x25519_sphinx_key_file: data_dir.join(DEFAULT_X25519_PUBLIC_SPHINX_KEY_FILENAME),
private_x25519_noise_key_file: data_dir.join(DEFAULT_X25519_PRIVATE_NOISE_KEY_FILENAME),
public_x25519_noise_key_file: data_dir.join(DEFAULT_X25519_PUBLIC_NOISE_KEY_FILENAME),
}
}
pub fn ed25519_identity_storage_paths(&self) -> nym_pemstore::KeyPairPath {
nym_pemstore::KeyPairPath::new(
&self.private_ed25519_identity_key_file,
&self.public_ed25519_identity_key_file,
)
}
pub fn x25519_sphinx_storage_paths(&self) -> nym_pemstore::KeyPairPath {
nym_pemstore::KeyPairPath::new(
&self.private_x25519_sphinx_key_file,
&self.public_x25519_sphinx_key_file,
)
}
pub fn x25519_noise_storage_paths(&self) -> nym_pemstore::KeyPairPath {
nym_pemstore::KeyPairPath::new(
&self.private_x25519_noise_key_file,
&self.public_x25519_noise_key_file,
)
}
}
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct NymNodePathsV7 {
@@ -701,56 +644,6 @@ pub struct NetworkRequesterPathsV7 {
// it's possible we might have to add credential storage here for return tickets
}
impl NetworkRequesterPathsV7 {
pub fn new<P: AsRef<Path>>(data_dir: P) -> Self {
let data_dir = data_dir.as_ref();
NetworkRequesterPathsV7 {
private_ed25519_identity_key_file: data_dir
.join(DEFAULT_ED25519_NR_PRIVATE_IDENTITY_KEY_FILENAME),
public_ed25519_identity_key_file: data_dir
.join(DEFAULT_ED25519_NR_PUBLIC_IDENTITY_KEY_FILENAME),
private_x25519_diffie_hellman_key_file: data_dir
.join(DEFAULT_X25519_NR_PRIVATE_DH_KEY_FILENAME),
public_x25519_diffie_hellman_key_file: data_dir
.join(DEFAULT_X25519_NR_PUBLIC_DH_KEY_FILENAME),
ack_key_file: data_dir.join(DEFAULT_NR_ACK_KEY_FILENAME),
reply_surb_database: data_dir.join(DEFAULT_NR_REPLY_SURB_DB_FILENAME),
gateway_registrations: data_dir.join(DEFAULT_NR_GATEWAYS_DB_FILENAME),
}
}
pub fn to_common_client_paths(&self) -> CommonClientPaths {
CommonClientPaths {
keys: ClientKeysPaths {
private_identity_key_file: self.private_ed25519_identity_key_file.clone(),
public_identity_key_file: self.public_ed25519_identity_key_file.clone(),
private_encryption_key_file: self.private_x25519_diffie_hellman_key_file.clone(),
public_encryption_key_file: self.public_x25519_diffie_hellman_key_file.clone(),
ack_key_file: self.ack_key_file.clone(),
},
gateway_registrations: self.gateway_registrations.clone(),
// not needed for embedded providers
credentials_database: Default::default(),
reply_surb_database: self.reply_surb_database.clone(),
}
}
pub fn ed25519_identity_storage_paths(&self) -> nym_pemstore::KeyPairPath {
nym_pemstore::KeyPairPath::new(
&self.private_ed25519_identity_key_file,
&self.public_ed25519_identity_key_file,
)
}
pub fn x25519_diffie_hellman_storage_paths(&self) -> nym_pemstore::KeyPairPath {
nym_pemstore::KeyPairPath::new(
&self.private_x25519_diffie_hellman_key_file,
&self.public_x25519_diffie_hellman_key_file,
)
}
}
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IpPacketRouterPathsV7 {
@@ -781,56 +674,6 @@ pub struct IpPacketRouterPathsV7 {
// it's possible we might have to add credential storage here for return tickets
}
impl IpPacketRouterPathsV7 {
pub fn new<P: AsRef<Path>>(data_dir: P) -> Self {
let data_dir = data_dir.as_ref();
IpPacketRouterPathsV7 {
private_ed25519_identity_key_file: data_dir
.join(DEFAULT_ED25519_IPR_PRIVATE_IDENTITY_KEY_FILENAME),
public_ed25519_identity_key_file: data_dir
.join(DEFAULT_ED25519_IPR_PUBLIC_IDENTITY_KEY_FILENAME),
private_x25519_diffie_hellman_key_file: data_dir
.join(DEFAULT_X25519_IPR_PRIVATE_DH_KEY_FILENAME),
public_x25519_diffie_hellman_key_file: data_dir
.join(DEFAULT_X25519_IPR_PUBLIC_DH_KEY_FILENAME),
ack_key_file: data_dir.join(DEFAULT_IPR_ACK_KEY_FILENAME),
reply_surb_database: data_dir.join(DEFAULT_IPR_REPLY_SURB_DB_FILENAME),
gateway_registrations: data_dir.join(DEFAULT_IPR_GATEWAYS_DB_FILENAME),
}
}
pub fn to_common_client_paths(&self) -> CommonClientPaths {
CommonClientPaths {
keys: ClientKeysPaths {
private_identity_key_file: self.private_ed25519_identity_key_file.clone(),
public_identity_key_file: self.public_ed25519_identity_key_file.clone(),
private_encryption_key_file: self.private_x25519_diffie_hellman_key_file.clone(),
public_encryption_key_file: self.public_x25519_diffie_hellman_key_file.clone(),
ack_key_file: self.ack_key_file.clone(),
},
gateway_registrations: self.gateway_registrations.clone(),
// not needed for embedded providers
credentials_database: Default::default(),
reply_surb_database: self.reply_surb_database.clone(),
}
}
pub fn ed25519_identity_storage_paths(&self) -> nym_pemstore::KeyPairPath {
nym_pemstore::KeyPairPath::new(
&self.private_ed25519_identity_key_file,
&self.public_ed25519_identity_key_file,
)
}
pub fn x25519_diffie_hellman_storage_paths(&self) -> nym_pemstore::KeyPairPath {
nym_pemstore::KeyPairPath::new(
&self.private_x25519_diffie_hellman_key_file,
&self.public_x25519_diffie_hellman_key_file,
)
}
}
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct AuthenticatorPathsV7 {
@@ -861,56 +704,6 @@ pub struct AuthenticatorPathsV7 {
// it's possible we might have to add credential storage here for return tickets
}
impl AuthenticatorPathsV7 {
pub fn new<P: AsRef<Path>>(data_dir: P) -> Self {
let data_dir = data_dir.as_ref();
AuthenticatorPathsV7 {
private_ed25519_identity_key_file: data_dir
.join(DEFAULT_ED25519_AUTH_PRIVATE_IDENTITY_KEY_FILENAME),
public_ed25519_identity_key_file: data_dir
.join(DEFAULT_ED25519_AUTH_PUBLIC_IDENTITY_KEY_FILENAME),
private_x25519_diffie_hellman_key_file: data_dir
.join(DEFAULT_X25519_AUTH_PRIVATE_DH_KEY_FILENAME),
public_x25519_diffie_hellman_key_file: data_dir
.join(DEFAULT_X25519_AUTH_PUBLIC_DH_KEY_FILENAME),
ack_key_file: data_dir.join(DEFAULT_AUTH_ACK_KEY_FILENAME),
reply_surb_database: data_dir.join(DEFAULT_AUTH_REPLY_SURB_DB_FILENAME),
gateway_registrations: data_dir.join(DEFAULT_AUTH_GATEWAYS_DB_FILENAME),
}
}
pub fn to_common_client_paths(&self) -> CommonClientPaths {
CommonClientPaths {
keys: ClientKeysPaths {
private_identity_key_file: self.private_ed25519_identity_key_file.clone(),
public_identity_key_file: self.public_ed25519_identity_key_file.clone(),
private_encryption_key_file: self.private_x25519_diffie_hellman_key_file.clone(),
public_encryption_key_file: self.public_x25519_diffie_hellman_key_file.clone(),
ack_key_file: self.ack_key_file.clone(),
},
gateway_registrations: self.gateway_registrations.clone(),
// not needed for embedded providers
credentials_database: Default::default(),
reply_surb_database: self.reply_surb_database.clone(),
}
}
pub fn ed25519_identity_storage_paths(&self) -> nym_pemstore::KeyPairPath {
nym_pemstore::KeyPairPath::new(
&self.private_ed25519_identity_key_file,
&self.public_ed25519_identity_key_file,
)
}
pub fn x25519_diffie_hellman_storage_paths(&self) -> nym_pemstore::KeyPairPath {
nym_pemstore::KeyPairPath::new(
&self.private_x25519_diffie_hellman_key_file,
&self.public_x25519_diffie_hellman_key_file,
)
}
}
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ExitGatewayPathsV7 {
@@ -1106,53 +899,6 @@ pub struct GatewayTasksPathsV7 {
pub cosmos_mnemonic: PathBuf,
}
impl GatewayTasksPathsV7 {
pub fn new<P: AsRef<Path>>(data_dir: P) -> Self {
GatewayTasksPathsV7 {
clients_storage: data_dir.as_ref().join(DEFAULT_CLIENTS_STORAGE_FILENAME),
stats_storage: data_dir.as_ref().join(DEFAULT_STATS_STORAGE_FILENAME),
cosmos_mnemonic: data_dir.as_ref().join(DEFAULT_MNEMONIC_FILENAME),
}
}
pub fn load_mnemonic_from_file(&self) -> Result<Zeroizing<bip39::Mnemonic>, EntryGatewayError> {
let stringified =
Zeroizing::new(fs::read_to_string(&self.cosmos_mnemonic).map_err(|source| {
EntryGatewayError::MnemonicLoadFailure {
path: self.cosmos_mnemonic.clone(),
source,
}
})?);
Ok(Zeroizing::new(bip39::Mnemonic::parse::<&str>(
stringified.as_ref(),
)?))
}
pub fn save_mnemonic_to_file(
&self,
mnemonic: &bip39::Mnemonic,
) -> Result<(), EntryGatewayError> {
// wrapper for io errors
fn _save_to_file(path: &Path, mnemonic: &bip39::Mnemonic) -> io::Result<()> {
if let Some(parent) = path.parent() {
create_dir_all(parent)?;
}
info!("saving entry gateway mnemonic to '{}'", path.display());
let stringified = Zeroizing::new(mnemonic.to_string());
fs::write(path, &stringified)
}
_save_to_file(&self.cosmos_mnemonic, mnemonic).map_err(|source| {
EntryGatewayError::MnemonicSaveFailure {
path: self.cosmos_mnemonic.clone(),
source,
}
})
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StaleMessageDebugV7 {
/// Specifies how often the clean-up task should check for stale data.
@@ -1258,19 +1004,6 @@ pub struct GatewayTasksConfigV7 {
pub debug: GatewayTasksConfigDebugV7,
}
impl GatewayTasksConfigV7 {
pub fn new_default<P: AsRef<Path>>(data_dir: P) -> Self {
GatewayTasksConfigV7 {
storage_paths: GatewayTasksPathsV7::new(data_dir),
enforce_zk_nyms: false,
bind_address: SocketAddr::new(in6addr_any_init(), DEFAULT_WS_PORT),
announce_ws_port: None,
announce_wss_port: None,
debug: Default::default(),
}
}
}
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ServiceProvidersPathsV7 {
@@ -1288,19 +1021,6 @@ pub struct ServiceProvidersPathsV7 {
pub authenticator: AuthenticatorPathsV7,
}
impl ServiceProvidersPathsV7 {
pub fn new<P: AsRef<Path>>(data_dir: P) -> Self {
let data_dir = data_dir.as_ref();
ServiceProvidersPathsV7 {
clients_storage: data_dir.join(DEFAULT_CLIENTS_STORAGE_FILENAME),
stats_storage: data_dir.join(DEFAULT_STATS_STORAGE_FILENAME),
network_requester: NetworkRequesterPathsV7::new(data_dir),
ip_packet_router: IpPacketRouterPathsV7::new(data_dir),
authenticator: AuthenticatorPathsV7::new(data_dir),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ServiceProvidersConfigDebugV7 {
@@ -1342,25 +1062,6 @@ pub struct ServiceProvidersConfigV7 {
pub debug: ServiceProvidersConfigDebugV7,
}
impl ServiceProvidersConfigV7 {
pub fn new_default<P: AsRef<Path>>(data_dir: P) -> Self {
#[allow(clippy::expect_used)]
// SAFETY:
// we expect our default values to be well-formed
ServiceProvidersConfigV7 {
storage_paths: ServiceProvidersPathsV7::new(data_dir),
open_proxy: false,
upstream_exit_policy_url: mainnet::EXIT_POLICY_URL
.parse()
.expect("invalid default exit policy URL"),
network_requester: Default::default(),
ip_packet_router: Default::default(),
authenticator: Default::default(),
debug: Default::default(),
}
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct MetricsConfigV7 {
@@ -1498,7 +1199,7 @@ impl ConfigV7 {
pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
path: P,
prev_config: Option<ConfigV7>,
) -> Result<Config, NymNodeError> {
) -> Result<ConfigV8, NymNodeError> {
debug!("attempting to load v7 config...");
let old_cfg = if let Some(prev_config) = prev_config {
@@ -1507,20 +1208,20 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
ConfigV7::read_from_path(&path)?
};
let cfg = Config {
let cfg = ConfigV8 {
save_path: old_cfg.save_path,
id: old_cfg.id,
modes: NodeModes {
modes: NodeModesV8 {
mixnode: old_cfg.modes.mixnode,
entry: old_cfg.modes.entry,
exit: old_cfg.modes.exit,
},
host: Host {
host: HostV8 {
public_ips: old_cfg.host.public_ips,
hostname: old_cfg.host.hostname,
location: old_cfg.host.location,
},
mixnet: Mixnet {
mixnet: MixnetV8 {
bind_address: {
if old_cfg.mixnet.bind_address.ip().is_unspecified() {
SocketAddr::new(in6addr_any_init(), old_cfg.mixnet.bind_address.port())
@@ -1531,7 +1232,7 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
announce_port: old_cfg.mixnet.announce_port,
nym_api_urls: old_cfg.mixnet.nym_api_urls,
nyxd_urls: old_cfg.mixnet.nyxd_urls,
debug: MixnetDebug {
debug: MixnetDebugV8 {
maximum_forward_packet_delay: old_cfg.mixnet.debug.maximum_forward_packet_delay,
packet_forwarding_initial_backoff: old_cfg
.mixnet
@@ -1546,8 +1247,8 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
unsafe_disable_noise: old_cfg.mixnet.debug.unsafe_disable_noise,
},
},
storage_paths: NymNodePaths {
keys: KeysPaths {
storage_paths: NymNodePathsV8 {
keys: KeysPathsV8 {
private_ed25519_identity_key_file: old_cfg
.storage_paths
.keys
@@ -1575,7 +1276,7 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
},
description: old_cfg.storage_paths.description,
},
http: Http {
http: HttpV8 {
bind_address: {
if old_cfg.http.bind_address.ip().is_unspecified() {
SocketAddr::new(in6addr_any_init(), old_cfg.http.bind_address.port())
@@ -1590,7 +1291,7 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
expose_crypto_hardware: old_cfg.http.expose_crypto_hardware,
..Default::default()
},
verloc: Verloc {
verloc: VerlocV8 {
bind_address: {
if old_cfg.verloc.bind_address.ip().is_unspecified() {
SocketAddr::new(in6addr_any_init(), old_cfg.verloc.bind_address.port())
@@ -1599,7 +1300,7 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
}
},
announce_port: old_cfg.verloc.announce_port,
debug: VerlocDebug {
debug: VerlocDebugV8 {
packets_per_node: old_cfg.verloc.debug.packets_per_node,
connection_timeout: old_cfg.verloc.debug.connection_timeout,
packet_timeout: old_cfg.verloc.debug.packet_timeout,
@@ -1609,7 +1310,7 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
retry_timeout: old_cfg.verloc.debug.retry_timeout,
},
},
wireguard: Wireguard {
wireguard: WireguardV8 {
enabled: old_cfg.wireguard.enabled,
bind_address: {
if old_cfg.wireguard.bind_address.ip().is_unspecified() {
@@ -1623,7 +1324,7 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
announced_port: old_cfg.wireguard.announced_port,
private_network_prefix_v4: old_cfg.wireguard.private_network_prefix_v4,
private_network_prefix_v6: old_cfg.wireguard.private_network_prefix_v6,
storage_paths: WireguardPaths {
storage_paths: WireguardPathsV8 {
private_diffie_hellman_key_file: old_cfg
.wireguard
.storage_paths
@@ -1634,8 +1335,8 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
.public_diffie_hellman_key_file,
},
},
gateway_tasks: GatewayTasksConfig {
storage_paths: GatewayTasksPaths {
gateway_tasks: GatewayTasksConfigV8 {
storage_paths: GatewayTasksPathsV8 {
clients_storage: old_cfg.gateway_tasks.storage_paths.clients_storage,
stats_storage: old_cfg.gateway_tasks.storage_paths.stats_storage,
cosmos_mnemonic: old_cfg.gateway_tasks.storage_paths.cosmos_mnemonic,
@@ -1653,9 +1354,9 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
},
announce_ws_port: old_cfg.gateway_tasks.announce_ws_port,
announce_wss_port: old_cfg.gateway_tasks.announce_wss_port,
debug: gateway_tasks::Debug {
debug: GatewayTasksConfigDebugV8 {
message_retrieval_limit: old_cfg.gateway_tasks.debug.message_retrieval_limit,
zk_nym_tickets: ZkNymTicketHandlerDebug {
zk_nym_tickets: ZkNymTicketHandlerDebugV8 {
revocation_bandwidth_penalty: old_cfg
.gateway_tasks
.debug
@@ -1681,11 +1382,11 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
..Default::default()
},
},
service_providers: ServiceProvidersConfig {
storage_paths: ServiceProvidersPaths {
service_providers: ServiceProvidersConfigV8 {
storage_paths: ServiceProvidersPathsV8 {
clients_storage: old_cfg.service_providers.storage_paths.clients_storage,
stats_storage: old_cfg.service_providers.storage_paths.stats_storage,
network_requester: NetworkRequesterPaths {
network_requester: NetworkRequesterPathsV8 {
private_ed25519_identity_key_file: old_cfg
.service_providers
.storage_paths
@@ -1722,7 +1423,7 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
.network_requester
.gateway_registrations,
},
ip_packet_router: IpPacketRouterPaths {
ip_packet_router: IpPacketRouterPathsV8 {
private_ed25519_identity_key_file: old_cfg
.service_providers
.storage_paths
@@ -1759,7 +1460,7 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
.ip_packet_router
.gateway_registrations,
},
authenticator: AuthenticatorPaths {
authenticator: AuthenticatorPathsV8 {
private_ed25519_identity_key_file: old_cfg
.service_providers
.storage_paths
@@ -1799,8 +1500,8 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
},
open_proxy: old_cfg.service_providers.open_proxy,
upstream_exit_policy_url: old_cfg.service_providers.upstream_exit_policy_url,
network_requester: NetworkRequester {
debug: NetworkRequesterDebug {
network_requester: NetworkRequesterV8 {
debug: NetworkRequesterDebugV8 {
enabled: old_cfg.service_providers.network_requester.debug.enabled,
disable_poisson_rate: old_cfg
.service_providers
@@ -1814,8 +1515,8 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
.client_debug,
},
},
ip_packet_router: IpPacketRouter {
debug: IpPacketRouterDebug {
ip_packet_router: IpPacketRouterV8 {
debug: IpPacketRouterDebugV8 {
enabled: old_cfg.service_providers.ip_packet_router.debug.enabled,
disable_poisson_rate: old_cfg
.service_providers
@@ -1829,8 +1530,8 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
.client_debug,
},
},
authenticator: Authenticator {
debug: AuthenticatorDebug {
authenticator: AuthenticatorV8 {
debug: AuthenticatorDebugV8 {
enabled: old_cfg.service_providers.authenticator.debug.enabled,
disable_poisson_rate: old_cfg
.service_providers
@@ -1840,12 +1541,12 @@ pub async fn try_upgrade_config_v7<P: AsRef<Path>>(
client_debug: old_cfg.service_providers.authenticator.debug.client_debug,
},
},
debug: service_providers::Debug {
debug: ServiceProvidersConfigDebugV8 {
message_retrieval_limit: old_cfg.service_providers.debug.message_retrieval_limit,
},
},
metrics: Default::default(),
logging: LoggingSettings {},
logging: LoggingSettingsV8 {},
debug: Default::default(),
};
Ok(cfg)
File diff suppressed because it is too large Load Diff
+35
View File
@@ -55,6 +55,12 @@ pub const DEFAULT_AUTH_GATEWAYS_DB_FILENAME: &str = "auth_gateways_info_store.sq
pub const DEFAULT_X25519_WG_DH_KEY_FILENAME: &str = "x25519_wg_dh";
pub const DEFAULT_X25519_WG_PUBLIC_DH_KEY_FILENAME: &str = "x25519_wg_dh.pub";
// Replay Detection
pub const DEFAULT_RD_BLOOMFILTER_SUBDIR: &str = "replay-detection";
pub const DEFAULT_RD_BLOOMFILTER_FILE_EXT: &str = "bloom";
pub const DEFAULT_RD_BLOOMFILTER_FLUSH_FILE_EXT: &str = "flush";
pub const CURRENT_RD_BLOOMFILTER_FILENAME: &str = "current";
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct NymNodePaths {
@@ -490,3 +496,32 @@ impl WireguardPaths {
)
}
}
#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ReplayProtectionPaths {
/// Path to the directory storing currently used bloomfilter(s).
pub current_bloomfilters_directory: PathBuf,
}
impl ReplayProtectionPaths {
pub fn current_bloomfilter_filepath(&self) -> PathBuf {
self.current_bloomfilters_directory
.join(CURRENT_RD_BLOOMFILTER_FILENAME)
.with_extension(DEFAULT_RD_BLOOMFILTER_FILE_EXT)
}
pub fn current_bloomfilter_being_flushed_filepath(&self) -> PathBuf {
self.current_bloomfilters_directory
.join(CURRENT_RD_BLOOMFILTER_FILENAME)
.with_extension(DEFAULT_RD_BLOOMFILTER_FLUSH_FILE_EXT)
}
}
impl ReplayProtectionPaths {
pub fn new<P: AsRef<Path>>(data_dir: P) -> Self {
ReplayProtectionPaths {
current_bloomfilters_directory: data_dir.as_ref().join(DEFAULT_RD_BLOOMFILTER_SUBDIR),
}
}
}
+6
View File
@@ -65,6 +65,12 @@ nyxd_urls = [
{{#each mixnet.nyxd_urls }}'{{this}}',{{/each}}
]
[mixnet.replay_protection]
[mixnet.replay_protection.storage_paths]
# Path to the directory storing currently used bloomfilter(s).
current_bloomfilters_directory = '{{ mixnet.replay_protection.storage_paths.current_bloomfilters_directory }}'
# Storage paths to persistent nym-node data, such as its long term keys.
[storage_paths]
+5 -2
View File
@@ -15,7 +15,8 @@ async fn try_upgrade_config(path: &Path) -> Result<(), NymNodeError> {
let cfg = try_upgrade_config_v4(path, cfg).await.ok();
let cfg = try_upgrade_config_v5(path, cfg).await.ok();
let cfg = try_upgrade_config_v6(path, cfg).await.ok();
match try_upgrade_config_v7(path, cfg).await {
let cfg = try_upgrade_config_v7(path, cfg).await.ok();
match try_upgrade_config_v8(path, cfg).await {
Ok(cfg) => cfg.save(),
Err(e) => {
tracing::error!("Failed to finish upgrade - {e}");
@@ -35,5 +36,7 @@ pub async fn try_load_current_config<P: AsRef<Path>>(
}
try_upgrade_config(config_path.as_ref()).await?;
Config::read_from_toml_file(config_path)
let loaded = Config::read_from_toml_file(config_path)?;
loaded.validate()?;
Ok(loaded)
}
+1
View File
@@ -40,6 +40,7 @@ pub mod vars {
pub const NYMNODE_NYM_APIS_ARG: &str = "NYMNODE_NYM_APIS";
pub const NYMNODE_NYXD_URLS_ARG: &str = "NYMNODE_NYXD";
pub const NYMNODE_UNSAFE_DISABLE_NOISE: &str = "UNSAFE_DISABLE_NOISE";
pub const NYMNODE_UNSAFE_DISABLE_REPLAY_PROTECTION: &str = "UNSAFE_DISABLE_REPLAY_PROTECTION";
// wireguard:
pub const NYMNODE_WG_ENABLED_ARG: &str = "NYMNODE_WG_ENABLED";
+21
View File
@@ -82,6 +82,9 @@ pub enum NymNodeError {
source: io::Error,
},
#[error("failed to validate loaded config: {error}")]
ConfigValidationFailure { error: String },
#[error("the node description file is malformed: {source}")]
MalformedDescriptionFile {
#[source]
@@ -148,6 +151,12 @@ pub enum NymNodeError {
)]
InitialTopologyQueryFailure { source: ValidatorClientError },
#[error("experienced critical failure with the replay detection bloomfilter: {message}")]
BloomfilterFailure { message: &'static str },
#[error("failed to save/load the bloomfilter: {source} using path: {}", path.display())]
BloomfilterIoFailure { source: io::Error, path: PathBuf },
#[error(transparent)]
GatewayFailure(#[from] nym_gateway::GatewayError),
@@ -168,6 +177,18 @@ pub enum NymNodeError {
FailedUpgrade,
}
impl NymNodeError {
pub fn config_validation_failure<S: Into<String>>(error: S) -> Self {
NymNodeError::ConfigValidationFailure {
error: error.into(),
}
}
pub fn bloomfilter_failure(message: &'static str) -> Self {
NymNodeError::BloomfilterFailure { message }
}
}
#[derive(Debug, Error)]
pub enum EntryGatewayError {
#[error(transparent)]
@@ -38,6 +38,7 @@ fn build_response(metrics: &NymNodeMetrics) -> PacketsStats {
forward_hop_packets_received: metrics.mixnet.ingress.forward_hop_packets_received(),
final_hop_packets_received: metrics.mixnet.ingress.final_hop_packets_received(),
malformed_packets_received: metrics.mixnet.ingress.malformed_packets_received(),
replayed_packets_received: metrics.mixnet.ingress.replayed_packets_received(),
excessive_delay_packets: metrics.mixnet.ingress.excessive_delay_packets(),
forward_hop_packets_dropped: metrics.mixnet.ingress.forward_hop_packets_dropped(),
final_hop_packets_dropped: metrics.mixnet.ingress.final_hop_packets_dropped(),
@@ -64,6 +64,10 @@ impl OnUpdateMetricsHandler for PrometheusGlobalNodeMetricsRegistryUpdater {
MixnetIngressMalformedPacketsReceived,
self.metrics.mixnet.ingress.malformed_packets_received() as i64,
);
self.prometheus_wrapper.set(
MixnetIngressReplayedPacketsReceived,
self.metrics.mixnet.ingress.replayed_packets_received() as i64,
);
self.prometheus_wrapper.set(
MixnetIngressExcessiveDelayPacketsReceived,
self.metrics.mixnet.ingress.excessive_delay_packets() as i64,
+233 -32
View File
@@ -3,24 +3,69 @@
use crate::node::mixnet::shared::SharedData;
use futures::StreamExt;
use nym_metrics::nanos;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_framing::codec::NymCodec;
use nym_sphinx_framing::packet::FramedNymPacket;
use nym_sphinx_framing::processing::{
process_framed_packet, MixProcessingResultData, ProcessedFinalHop,
process_framed_packet, MixProcessingResult, MixProcessingResultData, PacketProcessingError,
PartiallyUnwrappedPacket, ProcessedFinalHop,
};
use nym_sphinx_types::Delay;
use nym_sphinx_types::{Delay, REPLAY_TAG_SIZE};
use std::mem;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::codec::Framed;
use tracing::{debug, error, instrument, trace};
use tracing::{debug, error, instrument, trace, warn};
struct PendingReplayCheckPackets {
packets: Vec<PartiallyUnwrappedPacket>,
last_acquired_mutex: Instant,
}
impl PendingReplayCheckPackets {
fn new() -> PendingReplayCheckPackets {
PendingReplayCheckPackets {
packets: vec![],
last_acquired_mutex: Instant::now(),
}
}
fn reset(&mut self, now: Instant) -> Vec<PartiallyUnwrappedPacket> {
self.last_acquired_mutex = now;
mem::take(&mut self.packets)
}
fn push(&mut self, now: Instant, packet: PartiallyUnwrappedPacket) {
if self.packets.is_empty() {
self.last_acquired_mutex = now;
}
self.packets.push(packet);
}
fn replay_tags(&self) -> Vec<&[u8; REPLAY_TAG_SIZE]> {
let mut replay_tags = Vec::with_capacity(self.packets.len());
for packet in &self.packets {
let Some(replay_tag) = packet.replay_tag() else {
error!(
"corrupted batch of {} packets - replay tag was missing",
self.packets.len()
);
return Vec::new();
};
replay_tags.push(replay_tag);
}
replay_tags
}
}
pub(crate) struct ConnectionHandler {
shared: SharedData,
mixnet_connection: Framed<TcpStream, NymCodec>,
remote_address: SocketAddr,
// packets pending for replay detection
pending_packets: PendingReplayCheckPackets,
}
impl Drop for ConnectionHandler {
@@ -45,6 +90,7 @@ impl ConnectionHandler {
shared: SharedData {
processing_config: shared.processing_config,
sphinx_keys: shared.sphinx_keys.clone(),
replay_protection_filter: shared.replay_protection_filter.clone(),
mixnet_forwarder: shared.mixnet_forwarder.clone(),
final_hop: shared.final_hop.clone(),
metrics: shared.metrics.clone(),
@@ -52,6 +98,7 @@ impl ConnectionHandler {
},
remote_address,
mixnet_connection: Framed::new(tcp_stream, NymCodec),
pending_packets: PendingReplayCheckPackets::new(),
}
}
@@ -60,9 +107,8 @@ impl ConnectionHandler {
/// the skew caused by being stuck in the channel queue.
/// This method also clamps the maximum allowed delay so that nobody could send a bunch of packets
/// with, for example, delays of 1 year thus causing denial of service
fn create_delay_target(&self, delay: Option<Delay>) -> Option<Instant> {
fn create_delay_target(&self, now: Instant, delay: Option<Delay>) -> Option<Instant> {
let delay = delay?.to_duration();
let now = Instant::now();
let delay = if delay > self.shared.processing_config.maximum_packet_delay {
self.shared.processing_config.maximum_packet_delay
@@ -77,14 +123,14 @@ impl ConnectionHandler {
Some(now + delay)
}
fn handle_forward_packet(&self, mix_packet: MixPacket, delay: Option<Delay>) {
fn handle_forward_packet(&self, now: Instant, mix_packet: MixPacket, delay: Option<Delay>) {
if !self.shared.processing_config.forward_hop_processing_enabled {
trace!("this nym-node does not support forward hop packets");
self.shared.dropped_forward_packet(self.remote_address.ip());
return;
}
let forward_instant = self.create_delay_target(delay);
let forward_instant = self.create_delay_target(now, delay);
self.shared.forward_mix_packet(mix_packet, forward_instant);
}
@@ -128,33 +174,188 @@ impl ConnectionHandler {
self.shared.forward_ack_packet(final_hop_data.forward_ack);
}
#[instrument(skip(self, packet), level = "debug")]
async fn handle_received_nym_packet(&self, packet: FramedNymPacket) {
// TODO: here be replay attack detection with bloomfilters and all the fancy stuff
//
fn within_deferral_threshold(&self, now: Instant) -> bool {
let time_threshold = now
.saturating_duration_since(self.pending_packets.last_acquired_mutex)
<= self
.shared
.processing_config
.maximum_replay_detection_deferral;
nanos!("handle_received_nym_packet", {
// 1. attempt to unwrap the packet
let count_threshold = self.pending_packets.packets.len()
< self
.shared
.processing_config
.maximum_replay_detection_pending_packets;
// time threshold is ignored if we currently have 0 packets queued up
if self.pending_packets.packets.is_empty() {
return true;
}
trace!(
"within deferral time threshold: {time_threshold}, count threshold: {count_threshold}"
);
if !time_threshold {
warn!(
"{}: time failure - {}",
self.remote_address,
self.pending_packets.packets.len()
)
}
if !count_threshold {
warn!("{}, count failure", self.remote_address)
}
time_threshold && count_threshold
}
async fn handle_received_packet_with_replay_detection(
&mut self,
now: Instant,
packet: FramedNymPacket,
) {
// 1. derive and expand shared secret
// also check the header integrity
let partially_unwrapped = match PartiallyUnwrappedPacket::new(
packet,
self.shared.sphinx_keys.private_key().as_ref(),
) {
Ok(unwrapped) => unwrapped,
Err(err) => {
trace!("failed to process received mix packet: {err}");
self.shared
.metrics
.mixnet
.ingress_malformed_packet(self.remote_address.ip());
return;
}
};
self.pending_packets.push(now, partially_unwrapped);
// 2. check for packet replay
// 2.1 first try it without locking
if self.handle_pending_packets_batch_no_locking(now).await {
return;
}
// 2.2 if we're within deferral threshold, just leave it queued up for another call
if self.within_deferral_threshold(now) {
return;
}
// 2.3. otherwise block until we obtain the lock and clear the whole batch
self.handle_pending_packets_batch(now).await;
}
async fn handle_unwrapped_packet(
&self,
now: Instant,
unwrapped_packet: Result<MixProcessingResult, PacketProcessingError>,
) {
// 2. increment our favourite metrics stats
self.shared
.update_metrics(&unwrapped_packet, self.remote_address.ip());
// 3. forward the packet to the relevant sink (if enabled)
match unwrapped_packet {
Err(err) => trace!("failed to process received mix packet: {err}"),
Ok(processed_packet) => match processed_packet.processing_data {
MixProcessingResultData::ForwardHop { packet, delay } => {
self.handle_forward_packet(now, packet, delay);
}
MixProcessingResultData::FinalHop { final_hop_data } => {
self.handle_final_hop(final_hop_data).await;
}
},
}
}
async fn handle_post_replay_detection_packets(
&self,
now: Instant,
packets: Vec<PartiallyUnwrappedPacket>,
replay_check_results: Vec<bool>,
) {
for (packet, replayed) in packets.into_iter().zip(replay_check_results) {
let unwrapped_packet = if replayed {
Err(PacketProcessingError::PacketReplay)
} else {
packet.finalise_unwrapping()
};
self.handle_unwrapped_packet(now, unwrapped_packet).await;
}
}
async fn handle_pending_packets_batch_no_locking(&mut self, now: Instant) -> bool {
let replay_tags = self.pending_packets.replay_tags();
if replay_tags.is_empty() {
return false;
}
let replay_check_results = match self
.shared
.replay_protection_filter
.batch_try_check_and_set(&replay_tags)
{
None => return false,
Some(Ok(replay_check_results)) => replay_check_results,
Some(Err(_)) => {
// our mutex got poisoned - we have to shut down
error!("CRITICAL FAILURE: replay bloomfilter mutex poisoning!");
self.shared.shutdown.cancel();
return false;
}
};
let batch = self.pending_packets.reset(now);
self.handle_post_replay_detection_packets(now, batch, replay_check_results)
.await;
true
}
async fn handle_pending_packets_batch(&mut self, now: Instant) {
let batch = self.pending_packets.reset(now);
let replay_tags = self.pending_packets.replay_tags();
if replay_tags.is_empty() {
return;
}
let Ok(replay_check_results) = self
.shared
.replay_protection_filter
.batch_check_and_set(&replay_tags)
else {
// our mutex got poisoned - we have to shut down
error!("CRITICAL FAILURE: replay bloomfilter mutex poisoning!");
self.shared.shutdown.cancel();
return;
};
self.handle_post_replay_detection_packets(now, batch, replay_check_results)
.await;
}
#[instrument(skip(self, packet), level = "debug")]
async fn handle_received_nym_packet(&mut self, packet: FramedNymPacket) {
let now = Instant::now();
// 1. attempt to unwrap the packet
// if it's a sphinx packet attempt to do pre-processing and replay detection
if packet.is_sphinx() && !self.shared.replay_protection_filter.disabled() {
self.handle_received_packet_with_replay_detection(now, packet)
.await;
} else {
// otherwise just skip that whole procedure and go straight to payload unwrapping
// (assuming the basic framing is valid)
let unwrapped_packet =
process_framed_packet(packet, self.shared.sphinx_keys.private_key().as_ref());
// 2. increment our favourite metrics stats
self.shared
.update_metrics(&unwrapped_packet, self.remote_address.ip());
// 3. forward the packet to the relevant sink (if enabled)
match unwrapped_packet {
Err(err) => trace!("failed to process received mix packet: {err}"),
Ok(processed_packet) => match processed_packet.processing_data {
MixProcessingResultData::ForwardHop { packet, delay } => {
self.handle_forward_packet(packet, delay);
}
MixProcessingResultData::FinalHop { final_hop_data } => {
self.handle_final_hop(final_hop_data).await;
}
},
}
})
self.handle_unwrapped_packet(now, unwrapped_packet).await;
};
}
#[instrument(
@@ -1,7 +1,7 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::shared_network::RoutingFilter;
use crate::node::routing_filter::RoutingFilter;
use futures::StreamExt;
use nym_mixnet_client::forwarder::{
mix_forwarding_channels, MixForwardingReceiver, MixForwardingSender, PacketToForward,
@@ -60,14 +60,6 @@ impl<C, F> PacketForwarder<C, F> {
{
let next_hop = packet.next_hop();
if !self.routing_filter.should_route(next_hop.as_ref().ip()) {
debug!("dropping packet as the egress address does not belong to any known node");
self.metrics
.mixnet
.egress_dropped_forward_packet(next_hop.into());
return;
}
let packet_type = packet.packet_type();
let packet = packet.into_packet();
@@ -110,6 +102,16 @@ impl<C, F> PacketForwarder<C, F> {
C: SendWithoutResponse,
F: RoutingFilter,
{
let next_hop = new_packet.packet.next_hop();
if !self.routing_filter.should_route(next_hop.as_ref().ip()) {
debug!("dropping packet as the egress address does not belong to any known node");
self.metrics
.mixnet
.egress_dropped_forward_packet(next_hop.into());
return;
}
// in case of a zero delay packet, don't bother putting it in the delay queue,
// just forward it immediately
if let Some(instant) = new_packet.forward_delay_target {
+21
View File
@@ -4,6 +4,7 @@
use crate::config::Config;
use crate::node::mixnet::handler::ConnectionHandler;
use crate::node::mixnet::SharedFinalHopData;
use crate::node::replay_protection::bloomfilter::ReplayProtectionBloomfilter;
use nym_crypto::asymmetric::x25519;
use nym_gateway::node::GatewayStorageError;
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketToForward};
@@ -29,6 +30,13 @@ pub(crate) mod final_hop;
#[derive(Clone, Copy)]
pub(crate) struct ProcessingConfig {
pub(crate) maximum_packet_delay: Duration,
/// how long the task is willing to skip mutex acquisition before it will block the thread
/// until it actually obtains it
pub(crate) maximum_replay_detection_deferral: Duration,
/// how many packets the task is willing to queue before it will block the thread
/// until it obtains the mutex
pub(crate) maximum_replay_detection_pending_packets: usize,
pub(crate) forward_hop_processing_enabled: bool,
pub(crate) final_hop_processing_enabled: bool,
@@ -38,6 +46,16 @@ impl ProcessingConfig {
pub(crate) fn new(config: &Config) -> Self {
ProcessingConfig {
maximum_packet_delay: config.mixnet.debug.maximum_forward_packet_delay,
maximum_replay_detection_deferral: config
.mixnet
.replay_protection
.debug
.maximum_replay_detection_deferral,
maximum_replay_detection_pending_packets: config
.mixnet
.replay_protection
.debug
.maximum_replay_detection_pending_packets,
forward_hop_processing_enabled: config.modes.mixnode,
final_hop_processing_enabled: config.modes.expects_final_hop_traffic()
|| config.wireguard.enabled,
@@ -49,6 +67,7 @@ impl ProcessingConfig {
pub(crate) struct SharedData {
pub(super) processing_config: ProcessingConfig,
pub(super) sphinx_keys: Arc<x25519::KeyPair>,
pub(super) replay_protection_filter: ReplayProtectionBloomfilter,
// used for FORWARD mix packets and FINAL ack packets
pub(super) mixnet_forwarder: MixForwardingSender,
@@ -71,6 +90,7 @@ impl SharedData {
pub(crate) fn new(
processing_config: ProcessingConfig,
x25519_keys: Arc<x25519::KeyPair>,
replay_protection_filter: ReplayProtectionBloomfilter,
mixnet_forwarder: MixForwardingSender,
final_hop: SharedFinalHopData,
metrics: NymNodeMetrics,
@@ -79,6 +99,7 @@ impl SharedData {
SharedData {
processing_config,
sphinx_keys: x25519_keys,
replay_protection_filter,
mixnet_forwarder,
final_hop,
metrics,
+43 -12
View File
@@ -28,9 +28,10 @@ use crate::node::metrics::handler::pending_egress_packets_updater::PendingEgress
use crate::node::mixnet::packet_forwarding::PacketForwarder;
use crate::node::mixnet::shared::ProcessingConfig;
use crate::node::mixnet::SharedFinalHopData;
use crate::node::shared_network::{
CachedNetwork, CachedTopologyProvider, NetworkRefresher, OpenFilter, RoutingFilter,
};
use crate::node::replay_protection::background_task::ReplayProtectionBackgroundTask;
use crate::node::replay_protection::bloomfilter::ReplayProtectionBloomfilter;
use crate::node::routing_filter::{OpenFilter, RoutingFilter};
use crate::node::shared_network::{CachedNetwork, CachedTopologyProvider, NetworkRefresher};
use nym_bin_common::bin_info;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_gateway::node::{ActiveClientsStore, GatewayTasksBuilder};
@@ -69,6 +70,8 @@ pub mod helpers;
pub(crate) mod http;
pub(crate) mod metrics;
pub(crate) mod mixnet;
pub(crate) mod replay_protection;
mod routing_filter;
mod shared_network;
pub struct GatewayTasksData {
@@ -979,12 +982,35 @@ impl NymNode {
events_sender
}
pub(crate) fn start_mixnet_listener<F>(
pub(crate) async fn setup_replay_detection(
&self,
) -> Result<ReplayProtectionBloomfilter, NymNodeError> {
if self.config.mixnet.replay_protection.debug.unsafe_disabled {
return Ok(ReplayProtectionBloomfilter::new_disabled());
}
// create the background task for the bloomfilter
// to reset it and flush it to disk
let mut replay_detection_background = ReplayProtectionBackgroundTask::new(
&self.config,
self.metrics.clone(),
self.shutdown_manager
.clone_token("replay-detection-background"),
)
.await?;
let replay_protection_bloomfilter = replay_detection_background.global_bloomfilter();
self.shutdown_manager
.spawn(async move { replay_detection_background.run().await });
Ok(replay_protection_bloomfilter)
}
pub(crate) async fn start_mixnet_listener<F>(
&self,
active_clients_store: &ActiveClientsStore,
routing_filter: F,
shutdown: ShutdownToken,
) -> (MixForwardingSender, ActiveConnections)
) -> Result<(MixForwardingSender, ActiveConnections), NymNodeError>
where
F: RoutingFilter + Send + Sync + 'static,
{
@@ -1012,6 +1038,7 @@ impl NymNode {
);
let active_connections = mixnet_client.active_connections();
let replay_protection_bloomfilter = self.setup_replay_detection().await?;
let mut packet_forwarder = PacketForwarder::new(
mixnet_client,
routing_filter,
@@ -1029,6 +1056,7 @@ impl NymNode {
let shared = mixnet::SharedData::new(
processing_config,
self.x25519_sphinx_keys.clone(),
replay_protection_bloomfilter,
mix_packet_sender.clone(),
final_hop_data,
self.metrics.clone(),
@@ -1036,7 +1064,7 @@ impl NymNode {
);
mixnet::Listener::new(self.config.mixnet.bind_address, shared).start();
(mix_packet_sender, active_connections)
Ok((mix_packet_sender, active_connections))
}
pub(crate) async fn run_minimal_mixnet_processing(self) -> Result<(), NymNodeError> {
@@ -1044,7 +1072,8 @@ impl NymNode {
&ActiveClientsStore::new(),
OpenFilter,
self.shutdown_manager.clone_token("mixnet-traffic"),
);
)
.await?;
self.shutdown_manager.close();
self.shutdown_manager.wait_for_shutdown_signal().await;
@@ -1081,11 +1110,13 @@ impl NymNode {
let network_refresher = self.build_network_refresher().await?;
let active_clients_store = ActiveClientsStore::new();
let (mix_packet_sender, active_egress_mixnet_connections) = self.start_mixnet_listener(
&active_clients_store,
network_refresher.routing_filter(),
self.shutdown_manager.clone_token("mixnet-traffic"),
);
let (mix_packet_sender, active_egress_mixnet_connections) = self
.start_mixnet_listener(
&active_clients_store,
network_refresher.routing_filter(),
self.shutdown_manager.clone_token("mixnet-traffic"),
)
.await?;
let metrics_sender = self.setup_metrics_backend(
active_clients_store.clone(),
@@ -0,0 +1,229 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::Config;
use crate::error::NymNodeError;
use crate::node::replay_protection::bloomfilter::ReplayProtectionBloomfilter;
use crate::node::replay_protection::items_in_bloomfilter;
use human_repr::HumanCount;
use nym_node_metrics::NymNodeMetrics;
use nym_task::ShutdownToken;
use std::cmp::max;
use std::fs;
use std::path::PathBuf;
use std::time::Duration;
use tokio::time::{interval, Instant};
use tracing::{error, info, trace, warn};
struct LastResetData {
packets_received_at_last_reset: usize,
reset_time: Instant,
}
struct ReplayProtectionBackgroundTaskConfig {
current_bloomfilter_path: PathBuf,
current_bloomfilter_temp_flush_path: PathBuf,
false_positive_rate: f64,
filter_reset_rate: Duration,
disk_flushing_rate: Duration,
bloomfilter_size_multiplier: f64,
minimum_bloomfilter_packets_per_second: usize,
}
impl From<&Config> for ReplayProtectionBackgroundTaskConfig {
fn from(config: &Config) -> Self {
ReplayProtectionBackgroundTaskConfig {
current_bloomfilter_path: config
.mixnet
.replay_protection
.storage_paths
.current_bloomfilter_filepath(),
current_bloomfilter_temp_flush_path: config
.mixnet
.replay_protection
.storage_paths
.current_bloomfilter_being_flushed_filepath(),
false_positive_rate: config.mixnet.replay_protection.debug.false_positive_rate,
filter_reset_rate: config.mixnet.replay_protection.debug.bloomfilter_reset_rate,
disk_flushing_rate: config
.mixnet
.replay_protection
.debug
.bloomfilter_disk_flushing_rate,
bloomfilter_size_multiplier: config
.mixnet
.replay_protection
.debug
.bloomfilter_size_multiplier,
minimum_bloomfilter_packets_per_second: config
.mixnet
.replay_protection
.debug
.bloomfilter_minimum_packets_per_second_size,
}
}
}
// background task responsible for periodically flushing the bloomfilter to disk
// as well as clearing it up on the specified timer
// (in the future this will be enforced by key rotation)
pub struct ReplayProtectionBackgroundTask {
config: ReplayProtectionBackgroundTaskConfig,
last_reset: LastResetData,
filter: ReplayProtectionBloomfilter,
metrics: NymNodeMetrics,
shutdown_token: ShutdownToken,
}
impl ReplayProtectionBackgroundTask {
pub(crate) async fn new(
config: &Config,
metrics: NymNodeMetrics,
shutdown_token: ShutdownToken,
) -> Result<Self, NymNodeError> {
let task_config: ReplayProtectionBackgroundTaskConfig = config.into();
if task_config.current_bloomfilter_temp_flush_path.exists() {
error!(
"bloomfilter didn't get successfully flushed to disk and its data got corrupted"
);
fs::remove_file(&task_config.current_bloomfilter_temp_flush_path).map_err(|source| {
NymNodeError::BloomfilterIoFailure {
source,
path: task_config.current_bloomfilter_temp_flush_path.clone(),
}
})?
}
// if there's nothing on disk, we must create a new filter
let bloomfilter = if task_config.current_bloomfilter_path.exists() {
ReplayProtectionBloomfilter::load(&task_config.current_bloomfilter_path).await?
} else {
let bf_items = items_in_bloomfilter(
task_config.filter_reset_rate,
config
.mixnet
.replay_protection
.debug
.initial_expected_packets_per_second,
);
ReplayProtectionBloomfilter::new_empty(bf_items, task_config.false_positive_rate)?
};
Ok(ReplayProtectionBackgroundTask {
config: task_config,
last_reset: LastResetData {
packets_received_at_last_reset: 0,
reset_time: Instant::now(),
},
filter: bloomfilter,
metrics,
shutdown_token,
})
}
pub(crate) fn global_bloomfilter(&self) -> ReplayProtectionBloomfilter {
self.filter.clone()
}
async fn flush_to_disk(&self) -> Result<(), NymNodeError> {
if let Some(temp_parent) = self.config.current_bloomfilter_temp_flush_path.parent() {
fs::create_dir_all(temp_parent).map_err(|source| {
NymNodeError::BloomfilterIoFailure {
source,
path: temp_parent.to_path_buf(),
}
})?
}
if let Some(current_parent) = self.config.current_bloomfilter_temp_flush_path.parent() {
fs::create_dir_all(current_parent).map_err(|source| {
NymNodeError::BloomfilterIoFailure {
source,
path: current_parent.to_path_buf(),
}
})?
}
// because it takes a while to actually write the file to disk,
// we first write bytes to temporary location,
// and then we move it to the correct path
let temp = &self.config.current_bloomfilter_temp_flush_path;
self.filter.flush_to_disk(temp).await?;
fs::rename(temp, &self.config.current_bloomfilter_path).map_err(|source| {
NymNodeError::BloomfilterIoFailure {
source,
path: self.config.current_bloomfilter_path.clone(),
}
})?;
Ok(())
}
fn reset_bloomfilter(&mut self) -> Result<(), NymNodeError> {
// 1. determine parameters for new bloomfilter
let received = self.metrics.mixnet.ingress.forward_hop_packets_received()
+ self.metrics.mixnet.ingress.final_hop_packets_received();
let time_delta = self.last_reset.reset_time.elapsed();
let received_since_last_reset = received - self.last_reset.packets_received_at_last_reset;
let received_per_second =
(received_since_last_reset as f64 / time_delta.as_secs_f64()).round() as usize;
let bf_received = max(
received_per_second,
self.config.minimum_bloomfilter_packets_per_second,
);
let items_in_new_filter = items_in_bloomfilter(self.config.filter_reset_rate, bf_received);
let adjusted =
(items_in_new_filter as f64 * self.config.bloomfilter_size_multiplier).round() as usize;
info!(
"resetting bloom filter. new expected number of packets: {} that preserve fp rate of {}",
adjusted.human_count_bare(),
self.config.false_positive_rate
);
// 2. update the filter
self.last_reset.reset_time = Instant::now();
self.last_reset.packets_received_at_last_reset = received_since_last_reset;
// if this fails with the mutex getting poisoned, the next received packet is going to cause
// a shutdown, so we don't have to propagate it here
self.filter.reset(adjusted, self.config.false_positive_rate)
}
pub(crate) async fn run(&mut self) {
let mut reset_timer = interval(self.config.filter_reset_rate);
reset_timer.reset();
let mut flush_timer = interval(self.config.disk_flushing_rate);
flush_timer.reset();
loop {
tokio::select! {
biased;
_ = self.shutdown_token.cancelled() => {
trace!("ReplayProtectionBackgroundTask: Received shutdown");
break;
}
_ = reset_timer.tick() => {
if let Err(err) = self.reset_bloomfilter() {
error!("failed to reset the bloomfilter: {err}")
}
}
_ = flush_timer.tick() => {
if let Err(err) = self.flush_to_disk().await {
error!("failed to flush bloomfilter to disk: {err}")
}
}
}
}
info!("SHUTDOWN: flushing replay detection bloomfilter to disk. this might take a while. DO NOT INTERRUPT THIS PROCESS");
if let Err(err) = self.flush_to_disk().await {
warn!("failed to flush replay detection bloom filter on shutdown: {err}");
}
}
}
@@ -0,0 +1,223 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::NymNodeError;
use bloomfilter::Bloom;
use human_repr::HumanDuration;
use nym_sphinx_types::REPLAY_TAG_SIZE;
use std::path::Path;
use std::sync::{Arc, PoisonError, TryLockError};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::time::Instant;
use tracing::{debug, info};
// it appears that now std Mutex is faster (or comparable) to parking_lot
// in high contention situations: https://github.com/rust-lang/rust/pull/95035#issuecomment-1073966631
// (tokio's async Mutex has too much overhead due to the number of access required)
#[derive(Clone)]
pub(crate) struct ReplayProtectionBloomfilter {
disabled: bool,
inner: Arc<std::sync::Mutex<ReplayProtectionBloomfilterInner>>,
}
impl ReplayProtectionBloomfilter {
pub(crate) fn new_empty(items_count: usize, fp_p: f64) -> Result<Self, NymNodeError> {
Ok(ReplayProtectionBloomfilter {
disabled: false,
inner: Arc::new(std::sync::Mutex::new(ReplayProtectionBloomfilterInner {
current_filter: Bloom::new_for_fp_rate(items_count, fp_p)
.map_err(NymNodeError::bloomfilter_failure)?,
})),
})
}
// SAFETY: the hardcoded values of 1,1 are valid
#[allow(clippy::unwrap_used)]
pub(crate) fn new_disabled() -> Self {
// well, technically it's not fully empty, but the memory footprint is negligible
ReplayProtectionBloomfilter {
disabled: true,
inner: Arc::new(std::sync::Mutex::new(ReplayProtectionBloomfilterInner {
current_filter: Bloom::new(1, 1).unwrap(),
})),
}
}
pub(crate) fn disabled(&self) -> bool {
self.disabled
}
pub(crate) fn reset(&self, items_count: usize, fp_p: f64) -> Result<(), NymNodeError> {
// 1. build the new filter
let new_inner = ReplayProtectionBloomfilterInner {
current_filter: Bloom::new_for_fp_rate(items_count, fp_p)
.map_err(NymNodeError::bloomfilter_failure)?,
};
// 2. swap it
let mut guard = self
.inner
.lock()
.map_err(|_| NymNodeError::BloomfilterFailure {
message: "mutex got poisoned",
})?;
*guard = new_inner;
Ok(())
}
// NOTE: with key rotations we'll have to check whether the file is still valid and which
// key it corresponds to, but that's a future problem
pub(crate) async fn load<P: AsRef<Path>>(path: P) -> Result<Self, NymNodeError> {
info!("attempting to load prior replay detection bloomfilter...");
let path = path.as_ref();
let mut file =
File::open(path)
.await
.map_err(|source| NymNodeError::BloomfilterIoFailure {
source,
path: path.to_path_buf(),
})?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)
.await
.map_err(|source| NymNodeError::BloomfilterIoFailure {
source,
path: path.to_path_buf(),
})?;
Ok(ReplayProtectionBloomfilter {
disabled: false,
inner: Arc::new(std::sync::Mutex::new(ReplayProtectionBloomfilterInner {
current_filter: Bloom::from_bytes(buf)
.map_err(NymNodeError::bloomfilter_failure)?,
})),
})
}
// average HDD has the write speed of ~80MB/s so a 2GB bloomfilter would take almost 30s to write...
// and this function is explicitly async and using tokio's async operations, because otherwise
// we'd have to go through the whole hassle of using spawn_blocking and awaiting that one instead
pub(crate) async fn flush_to_disk<P: AsRef<Path>>(&self, path: P) -> Result<(), NymNodeError> {
debug!("flushing replay protection bloomfilter to disk...");
let start = Instant::now();
let path = path.as_ref();
let mut file =
File::create(path)
.await
.map_err(|source| NymNodeError::BloomfilterIoFailure {
source,
path: path.to_path_buf(),
})?;
let data = self.bytes().map_err(|_| NymNodeError::BloomfilterFailure {
message: "mutex got poisoned",
})?;
file.write_all(&data)
.await
.map_err(|source| NymNodeError::BloomfilterIoFailure {
source,
path: path.to_path_buf(),
})?;
let elapsed = start.elapsed();
info!(
"flushed replay protection bloomfilter to disk. it took: {}",
elapsed.human_duration()
);
Ok(())
}
}
struct ReplayProtectionBloomfilterInner {
// metadata to do with epochs, etc.
current_filter: Bloom<[u8; REPLAY_TAG_SIZE]>,
// overlap_filter: bloomfilter::Bloom<[u8; REPLAY_TAG_SIZE]>,
}
impl ReplayProtectionBloomfilter {
#[allow(dead_code)]
pub(crate) fn check_and_set(
&self,
replay_tag: &[u8; REPLAY_TAG_SIZE],
) -> Result<bool, PoisonError<()>> {
let Ok(mut guard) = self.inner.lock() else {
return Err(PoisonError::new(()));
};
Ok(guard.current_filter.check_and_set(replay_tag))
}
#[allow(dead_code)]
pub(crate) fn try_check_and_set(
&self,
replay_tag: &[u8; REPLAY_TAG_SIZE],
) -> Option<Result<bool, PoisonError<()>>> {
let mut guard = match self.inner.try_lock() {
Ok(guard) => guard,
Err(TryLockError::Poisoned(_)) => return Some(Err(PoisonError::new(()))),
Err(TryLockError::WouldBlock) => return None,
};
Some(Ok(guard.current_filter.check_and_set(replay_tag)))
}
pub(crate) fn batch_try_check_and_set(
&self,
reply_tags: &[&[u8; REPLAY_TAG_SIZE]],
) -> Option<Result<Vec<bool>, PoisonError<()>>> {
let mut guard = match self.inner.try_lock() {
Ok(guard) => guard,
Err(TryLockError::Poisoned(_)) => return Some(Err(PoisonError::new(()))),
Err(TryLockError::WouldBlock) => return None,
};
let mut result = Vec::with_capacity(reply_tags.len());
for tag in reply_tags {
result.push(guard.current_filter.check_and_set(tag));
}
// for testing throughput without disabling checks:
// return Some(Ok(vec![false; reply_tags.len()]));
Some(Ok(result))
}
pub(crate) fn batch_check_and_set(
&self,
reply_tags: &[&[u8; REPLAY_TAG_SIZE]],
) -> Result<Vec<bool>, PoisonError<()>> {
let Ok(mut guard) = self.inner.lock() else {
return Err(PoisonError::new(()));
};
let mut result = Vec::with_capacity(reply_tags.len());
for tag in reply_tags {
result.push(guard.current_filter.check_and_set(tag));
}
// for testing throughput without disabling checks:
// return Ok(vec![false; reply_tags.len()]);
Ok(result)
}
#[allow(dead_code)]
pub(crate) fn clear(&self) -> Result<(), PoisonError<()>> {
let mut guard = self.inner.lock().map_err(|_| PoisonError::new(()))?;
guard.current_filter.clear();
Ok(())
}
// due to the size of the bloomfilter, extra caution has to be applied when using this method
// note: we're not getting reference to bytes as this method is used when flushing data to the disk
// (which takes ~30s) and we can't block the mutex for that long.
fn bytes(&self) -> Result<Vec<u8>, PoisonError<()>> {
let guard = self.inner.lock().map_err(|_| PoisonError::new(()))?;
Ok(guard.current_filter.to_bytes())
}
}
@@ -0,0 +1,52 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use std::f64::consts::LN_2;
use std::time::Duration;
pub(crate) mod background_task;
pub(crate) mod bloomfilter;
pub fn bitmap_size(false_positive_rate: f64, items_in_filter: usize) -> usize {
/// Equivalent to ln(1 / 2^ln(2)) = ln^2(2)
const NEG_LN_2_POW_2: f64 = -0.48045301391820144f64;
assert!(items_in_filter < f64::MAX.floor() as usize);
((items_in_filter as f64 * false_positive_rate.ln()) / NEG_LN_2_POW_2).ceil() as usize
}
#[allow(dead_code)]
pub fn num_of_hash_functions(items_in_filter: usize, bitmap_size: usize) -> usize {
((bitmap_size as f64 / items_in_filter as f64) * LN_2).round() as usize
}
pub fn items_in_bloomfilter(reset_rate: Duration, packets_per_second: usize) -> usize {
reset_rate.as_secs() as usize * packets_per_second
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn calculating_bitmap_size() {
let fpr = 1e-5;
let items_in_filter = 725760000;
let expected_bitmap_size = 17391129920;
assert_eq!(bitmap_size(fpr, items_in_filter), expected_bitmap_size);
}
#[test]
fn calculating_number_of_hash_functions() {
let items_in_filter = 725760000;
let bitmap_size = 17391129920;
let expected_hashes = 17;
assert_eq!(
num_of_hash_functions(items_in_filter, bitmap_size),
expected_hashes
);
}
}
+44
View File
@@ -0,0 +1,44 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use std::net::IpAddr;
pub(crate) mod network_filter;
pub(crate) trait RoutingFilter {
fn should_route(&self, ip: IpAddr) -> bool;
}
#[derive(Debug, Copy, Clone, Default)]
pub(crate) struct OpenFilter;
impl RoutingFilter for OpenFilter {
fn should_route(&self, _: IpAddr) -> bool {
true
}
}
// #[derive(Default)]
// pub(crate) struct ComposedRoutingFilter {
// layers: Vec<Box<dyn RoutingFilter + Send + Sync + 'static>>,
// }
//
// impl ComposedRoutingFilter {
// pub(crate) fn new() -> Self {
// Self::default()
// }
//
// pub(crate) fn with_filter<F: RoutingFilter + Send + Sync + 'static>(
// mut self,
// filter: F,
// ) -> Self {
// self.layers.push(Box::new(filter));
// self
// }
// }
//
// impl RoutingFilter for ComposedRoutingFilter {
// fn should_route(&self, ip: IpAddr) -> bool {
// self.layers.iter().all(|l| l.should_route(ip))
// }
// }
@@ -0,0 +1,128 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::mixnet::packet_forwarding::global::is_global_ip;
use crate::node::routing_filter::RoutingFilter;
use arc_swap::ArcSwap;
use std::collections::HashSet;
use std::net::IpAddr;
use std::sync::Arc;
use tokio::sync::RwLock;
impl RoutingFilter for NetworkRoutingFilter {
fn should_route(&self, ip: IpAddr) -> bool {
// only allow non-global ips on testnets
if self.testnet_mode && !is_global_ip(&ip) {
return true;
}
self.attempt_resolve(ip).should_route()
}
}
#[derive(Clone)]
pub(crate) struct NetworkRoutingFilter {
testnet_mode: bool,
pub(crate) resolved: KnownNodes,
// while this is technically behind a lock, it should not be called too often as once resolved it will
// be present on the arcswap in either allowed or denied section
pub(crate) pending: UnknownNodes,
}
impl NetworkRoutingFilter {
pub(crate) fn new_empty(testnet_mode: bool) -> Self {
NetworkRoutingFilter {
testnet_mode,
resolved: Default::default(),
pending: Default::default(),
}
}
pub(crate) fn attempt_resolve(&self, ip: IpAddr) -> Resolution {
if self.resolved.inner.allowed.load().contains(&ip) {
Resolution::Accept
} else if self.resolved.inner.denied.load().contains(&ip) {
Resolution::Deny
} else {
self.pending.try_insert(ip);
Resolution::Unknown
}
}
pub(crate) fn allowed_nodes_copy(&self) -> HashSet<IpAddr> {
self.resolved.inner.allowed.load_full().as_ref().clone()
}
pub(crate) fn denied_nodes_copy(&self) -> HashSet<IpAddr> {
self.resolved.inner.denied.load_full().as_ref().clone()
}
}
#[derive(Clone, Default)]
pub(crate) struct UnknownNodes(Arc<RwLock<HashSet<IpAddr>>>);
impl UnknownNodes {
fn try_insert(&self, ip: IpAddr) {
// if we can immediately grab the lock to push it into the pending queue, amazing, let's do it
// otherwise we can do it next time we see this ip
// (if we can't hold the lock, it means it's being updated at this very moment which is actually a good thing)
if let Ok(mut guard) = self.0.try_write() {
guard.insert(ip);
}
}
pub(crate) async fn clear(&self) {
self.0.write().await.clear();
}
pub(crate) async fn nodes(&self) -> HashSet<IpAddr> {
self.0.read().await.clone()
}
}
// for now we don't care about keys, etc.
// we only want to know if given ip belongs to a known node
#[derive(Debug, Default, Clone)]
pub(crate) struct KnownNodes {
inner: Arc<KnownNodesInner>,
}
#[derive(Debug, Default)]
struct KnownNodesInner {
allowed: ArcSwap<HashSet<IpAddr>>,
denied: ArcSwap<HashSet<IpAddr>>,
}
pub(crate) enum Resolution {
Unknown,
Deny,
Accept,
}
impl From<bool> for Resolution {
fn from(value: bool) -> Self {
if value {
Resolution::Accept
} else {
Resolution::Deny
}
}
}
impl Resolution {
pub(crate) fn should_route(&self) -> bool {
matches!(self, Resolution::Accept)
}
}
impl KnownNodes {
pub(crate) fn swap_allowed(&self, new: HashSet<IpAddr>) {
self.inner.allowed.store(Arc::new(new))
}
pub(crate) fn swap_denied(&self, new: HashSet<IpAddr>) {
self.inner.denied.store(Arc::new(new))
}
}
+4 -148
View File
@@ -2,8 +2,7 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::NymNodeError;
use crate::node::mixnet::packet_forwarding::global::is_global_ip;
use arc_swap::ArcSwap;
use crate::node::routing_filter::network_filter::NetworkRoutingFilter;
use async_trait::async_trait;
use nym_gateway::node::UserAgent;
use nym_node_metrics::prometheus_wrapper::{PrometheusMetric, PROMETHEUS_METRICS};
@@ -23,129 +22,6 @@ use tracing::log::error;
use tracing::{debug, trace, warn};
use url::Url;
pub(crate) trait RoutingFilter {
fn should_route(&self, ip: IpAddr) -> bool;
}
#[derive(Debug, Copy, Clone, Default)]
pub(crate) struct OpenFilter;
impl RoutingFilter for OpenFilter {
fn should_route(&self, _: IpAddr) -> bool {
true
}
}
impl RoutingFilter for NetworkRoutingFilter {
fn should_route(&self, ip: IpAddr) -> bool {
// only allow non-global ips on testnets
if self.testnet_mode && !is_global_ip(&ip) {
return true;
}
self.attempt_resolve(ip).should_route()
}
}
#[derive(Clone)]
pub(crate) struct NetworkRoutingFilter {
testnet_mode: bool,
resolved: KnownNodes,
// while this is technically behind a lock, it should not be called too often as once resolved it will
// be present on the arcswap in either allowed or denied section
pending: UnknownNodes,
}
impl NetworkRoutingFilter {
fn new_empty(testnet_mode: bool) -> Self {
NetworkRoutingFilter {
testnet_mode,
resolved: Default::default(),
pending: Default::default(),
}
}
pub(crate) fn attempt_resolve(&self, ip: IpAddr) -> Resolution {
if self.resolved.inner.allowed.load().contains(&ip) {
Resolution::Accept
} else if self.resolved.inner.denied.load().contains(&ip) {
Resolution::Deny
} else {
self.pending.try_insert(ip);
Resolution::Unknown
}
}
}
#[derive(Clone, Default)]
struct UnknownNodes(Arc<RwLock<HashSet<IpAddr>>>);
impl UnknownNodes {
fn try_insert(&self, ip: IpAddr) {
// if we can immediately grab the lock to push it into the pending queue, amazing, let's do it
// otherwise we can do it next time we see this ip
// (if we can't hold the lock, it means it's being updated at this very moment which is actually a good thing)
if let Ok(mut guard) = self.0.try_write() {
guard.insert(ip);
}
}
async fn clear(&self) {
self.0.write().await.clear();
}
async fn nodes(&self) -> HashSet<IpAddr> {
self.0.read().await.clone()
}
}
// for now we don't care about keys, etc.
// we only want to know if given ip belongs to a known node
#[derive(Debug, Default, Clone)]
pub(crate) struct KnownNodes {
inner: Arc<KnownNodesInner>,
}
#[derive(Debug, Default)]
struct KnownNodesInner {
allowed: ArcSwap<HashSet<IpAddr>>,
denied: ArcSwap<HashSet<IpAddr>>,
}
pub(crate) enum Resolution {
Unknown,
Deny,
Accept,
}
impl From<bool> for Resolution {
fn from(value: bool) -> Self {
if value {
Resolution::Accept
} else {
Resolution::Deny
}
}
}
impl Resolution {
pub(crate) fn should_route(&self) -> bool {
matches!(self, Resolution::Accept)
}
}
impl KnownNodes {
fn swap_allowed(&self, new: HashSet<IpAddr>) {
self.inner.allowed.store(Arc::new(new))
}
fn swap_denied(&self, new: HashSet<IpAddr>) {
self.inner.denied.store(Arc::new(new))
}
}
struct NodesQuerier {
client: NymApiClient,
nym_api_urls: Vec<Url>,
@@ -316,26 +192,6 @@ impl NetworkRefresher {
Ok(this)
}
fn allowed_nodes_copy(&self) -> HashSet<IpAddr> {
self.routing_filter
.resolved
.inner
.allowed
.load_full()
.as_ref()
.clone()
}
fn denied_nodes_copy(&self) -> HashSet<IpAddr> {
self.routing_filter
.resolved
.inner
.denied
.load_full()
.as_ref()
.clone()
}
async fn inspect_pending(&mut self) {
let to_resolve = self.routing_filter.pending.nodes().await;
@@ -344,8 +200,8 @@ impl NetworkRefresher {
return;
}
let mut allowed = self.allowed_nodes_copy();
let mut denied = self.denied_nodes_copy();
let mut allowed = self.routing_filter().allowed_nodes_copy();
let mut denied = self.routing_filter().denied_nodes_copy();
// short circuit: check if the pending nodes are not already resolved
// (it could happen due to lack of full sync between pending lock and arcswap(s))
@@ -389,7 +245,7 @@ impl NetworkRefresher {
.collect::<HashSet<_>>();
let pending = self.routing_filter.pending.nodes().await;
let mut current_denied = self.denied_nodes_copy();
let mut current_denied = self.routing_filter.denied_nodes_copy();
for allowed in &known_nodes {
// if some node has become known, it should be removed from the denied set
+26 -10
View File
@@ -7,6 +7,7 @@ use arrayref::array_ref;
use blake2::VarBlake2b;
use chacha::ChaCha;
use futures::{stream, SinkExt, Stream, StreamExt};
use hkdf::Hkdf;
use human_repr::{HumanCount, HumanDuration, HumanThroughput};
use lioness::Lioness;
use nym_crypto::asymmetric::x25519;
@@ -15,13 +16,15 @@ use nym_sphinx_framing::codec::{NymCodec, NymCodecError};
use nym_sphinx_framing::packet::FramedNymPacket;
use nym_sphinx_params::PacketSize;
use nym_sphinx_routing::generate_hop_delays;
use nym_sphinx_types::constants::{EXPANDED_SHARED_SECRET_LENGTH, HKDF_INPUT_SEED};
use nym_sphinx_types::header::keys::PayloadKey;
use nym_sphinx_types::{
Destination, DestinationAddressBytes, Node, NymPacket, SphinxHeader,
DESTINATION_ADDRESS_LENGTH, IDENTIFIER_LENGTH,
Destination, DestinationAddressBytes, Node, NymPacket, DESTINATION_ADDRESS_LENGTH,
IDENTIFIER_LENGTH,
};
use nym_task::ShutdownToken;
use rand::rngs::OsRng;
use sha2::Sha256;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
@@ -96,6 +99,18 @@ pub(crate) struct ThroughputTestingClient {
payload_key: PayloadKey,
}
fn rederive_lioness_payload_key(shared_secret: &[u8; 32]) -> PayloadKey {
let hkdf = Hkdf::<Sha256>::new(None, shared_secret);
// expanded shared secret
let mut output = [0u8; EXPANDED_SHARED_SECRET_LENGTH];
// SAFETY: the length of the provided okm is within the allowed range
#[allow(clippy::unwrap_used)]
hkdf.expand(HKDF_INPUT_SEED, &mut output).unwrap();
*array_ref!(&output, 32, 192)
}
impl ThroughputTestingClient {
pub(crate) async fn try_create(
initial_sending_delay: Duration,
@@ -145,16 +160,17 @@ impl ThroughputTestingClient {
// SAFETY: we constructed a sphinx packet...
#[allow(clippy::unwrap_used)]
let sphinx_packet = forward_packet.as_sphinx_packet().unwrap();
let sphinx_packet = forward_packet.to_sphinx_packet().unwrap();
let header = &sphinx_packet.header;
// derive the routing keys of our node so we could tag the payload to figure out latency
// derive the expanded shared secret for our node so we could tag the payload to figure out latency
// by tagging the packet
let routing_keys = SphinxHeader::compute_routing_keys(
&header.shared_secret,
(&node_keys.private_key()).as_ref(),
);
let payload_key = routing_keys.payload_key;
let shared_secret = node_keys
.private_key()
.as_ref()
.diffie_hellman(&header.shared_secret);
let payload_key = rederive_lioness_payload_key(shared_secret.as_bytes());
let unwrapped_payload = sphinx_packet.payload.unwrap(&payload_key)?;
let unwrapped_forward_payload_bytes = unwrapped_payload.into_bytes();
@@ -271,7 +287,7 @@ impl ThroughputTestingClient {
let inner = received.into_inner();
// safety: we sent a sphinx packet...
#[allow(clippy::unwrap_used)]
let sphinx = inner.as_sphinx_packet().unwrap();
let sphinx = inner.to_sphinx_packet().unwrap();
let tag = PacketTag::from_bytes(sphinx.payload.as_bytes());
self.stats.new_received(tag.elapsed_nanos());