Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 26b86dd887 | |||
| 1bb786962f | |||
| 8cf2a29a6c | |||
| 4d62ff6ff1 |
Generated
+17
-1
@@ -2772,6 +2772,9 @@ name = "fastrand"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
|
||||
dependencies = [
|
||||
"getrandom 0.2.16",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ff"
|
||||
@@ -2844,6 +2847,18 @@ dependencies = [
|
||||
"spin",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flume"
|
||||
version = "0.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be"
|
||||
dependencies = [
|
||||
"fastrand",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"spin",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fnv"
|
||||
version = "1.0.7"
|
||||
@@ -7149,6 +7164,7 @@ dependencies = [
|
||||
"csv",
|
||||
"cupid",
|
||||
"dashmap",
|
||||
"flume 0.12.0",
|
||||
"futures",
|
||||
"hex",
|
||||
"hkdf",
|
||||
@@ -11069,7 +11085,7 @@ checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea"
|
||||
dependencies = [
|
||||
"atoi",
|
||||
"chrono",
|
||||
"flume",
|
||||
"flume 0.11.1",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-executor",
|
||||
|
||||
@@ -269,6 +269,7 @@ etherparse = "0.13.0"
|
||||
eyre = "0.6.9"
|
||||
fastrand = "2.1.1"
|
||||
flate2 = "1.1.1"
|
||||
flume = "0.12.0"
|
||||
futures = "0.3.31"
|
||||
futures-util = "0.3"
|
||||
generic-array = "0.14.7"
|
||||
|
||||
@@ -110,7 +110,7 @@ pub enum PacketProcessingError {
|
||||
PacketReplay,
|
||||
}
|
||||
|
||||
pub struct PartialyUnwrappedPacketWithKeyRotation {
|
||||
pub struct PartiallyUnwrappedPacketWithKeyRotation {
|
||||
pub packet: PartiallyUnwrappedPacket,
|
||||
pub used_key_rotation: u32,
|
||||
}
|
||||
@@ -183,8 +183,8 @@ impl PartiallyUnwrappedPacket {
|
||||
pub fn with_key_rotation(
|
||||
self,
|
||||
used_key_rotation: u32,
|
||||
) -> PartialyUnwrappedPacketWithKeyRotation {
|
||||
PartialyUnwrappedPacketWithKeyRotation {
|
||||
) -> PartiallyUnwrappedPacketWithKeyRotation {
|
||||
PartiallyUnwrappedPacketWithKeyRotation {
|
||||
packet: self,
|
||||
used_key_rotation,
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ console-subscriber = { workspace = true, optional = true }
|
||||
csv = { workspace = true }
|
||||
clap = { workspace = true, features = ["cargo", "env"] }
|
||||
dashmap = { workspace = true }
|
||||
flume= { workspace = true }
|
||||
futures = { workspace = true }
|
||||
hex = { workspace = true }
|
||||
humantime-serde = { workspace = true }
|
||||
|
||||
@@ -104,6 +104,12 @@ impl MixingStats {
|
||||
.dropped += 1;
|
||||
}
|
||||
|
||||
pub fn ingress_dropped_overflow_packet(&self) {
|
||||
self.ingress
|
||||
.overflow_packets_dropped
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn ingress_dropped_final_hop_packet(&self, source: IpAddr) {
|
||||
self.ingress
|
||||
.final_hop_packets_dropped
|
||||
@@ -252,6 +258,9 @@ pub struct IngressMixingStats {
|
||||
// final hop packets (i.e. to gateway)
|
||||
final_hop_packets_dropped: AtomicUsize,
|
||||
|
||||
/// Packets that were dropped because the ingress channel queue was full
|
||||
overflow_packets_dropped: AtomicUsize,
|
||||
|
||||
senders: DashMap<IpAddr, IngressRecipientStats>,
|
||||
}
|
||||
|
||||
@@ -284,6 +293,10 @@ impl IngressMixingStats {
|
||||
self.final_hop_packets_dropped.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn overflow_packets_dropped(&self) -> usize {
|
||||
self.overflow_packets_dropped.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn senders(&self) -> &DashMap<IpAddr, IngressRecipientStats> {
|
||||
&self.senders
|
||||
}
|
||||
|
||||
@@ -657,13 +657,10 @@ pub struct ReplayProtectionDebug {
|
||||
/// Specifies whether this node should **NOT** use replay protection
|
||||
pub unsafe_disabled: bool,
|
||||
|
||||
/// How long the processing task is willing to skip mutex acquisition before it will block the thread
|
||||
/// until it actually obtains it
|
||||
pub maximum_replay_detection_deferral: Duration,
|
||||
|
||||
/// How many packets the processing task is willing to queue before it will block the thread
|
||||
/// until it obtains the mutex
|
||||
pub maximum_replay_detection_pending_packets: usize,
|
||||
/// Channel capacity for the mixnet ingress channel. This determines the maximum number of
|
||||
/// packets that can be queued waiting for ingest processing. Once the queue is full packets
|
||||
/// will still be taken off the wire, but dropped as the node is too busy to handle them.
|
||||
pub ingress_channel_maximum_capacity: usize,
|
||||
|
||||
/// Probability of false positives, fraction between 0 and 1 or a number indicating 1-in-p
|
||||
pub false_positive_rate: f64,
|
||||
@@ -689,9 +686,7 @@ pub struct ReplayProtectionDebug {
|
||||
}
|
||||
|
||||
impl ReplayProtectionDebug {
|
||||
pub const DEFAULT_MAXIMUM_REPLAY_DETECTION_DEFERRAL: Duration = Duration::from_millis(50);
|
||||
|
||||
pub const DEFAULT_MAXIMUM_REPLAY_DETECTION_PENDING_PACKETS: usize = 100;
|
||||
pub const DEFAULT_INGRESS_CHANNEL_MAXIMUM_CAPACITY: usize = 2000;
|
||||
|
||||
// 12% (completely arbitrary)
|
||||
pub const DEFAULT_BLOOMFILTER_SIZE_MULTIPLIER: f64 = 1.12;
|
||||
@@ -755,9 +750,7 @@ impl Default for ReplayProtectionDebug {
|
||||
fn default() -> Self {
|
||||
ReplayProtectionDebug {
|
||||
unsafe_disabled: false,
|
||||
maximum_replay_detection_deferral: Self::DEFAULT_MAXIMUM_REPLAY_DETECTION_DEFERRAL,
|
||||
maximum_replay_detection_pending_packets:
|
||||
Self::DEFAULT_MAXIMUM_REPLAY_DETECTION_PENDING_PACKETS,
|
||||
ingress_channel_maximum_capacity: Self::DEFAULT_INGRESS_CHANNEL_MAXIMUM_CAPACITY,
|
||||
false_positive_rate: Self::DEFAULT_REPLAY_DETECTION_FALSE_POSITIVE_RATE,
|
||||
initial_expected_packets_per_second: Self::DEFAULT_INITIAL_EXPECTED_PACKETS_PER_SECOND,
|
||||
bloomfilter_minimum_packets_per_second_size:
|
||||
@@ -802,7 +795,7 @@ impl MixnetDebug {
|
||||
const DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF: Duration = Duration::from_millis(10_000);
|
||||
const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_millis(300_000);
|
||||
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
|
||||
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000;
|
||||
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 128;
|
||||
}
|
||||
|
||||
impl Default for MixnetDebug {
|
||||
|
||||
@@ -614,16 +614,8 @@ pub async fn try_upgrade_config_v12<P: AsRef<Path>>(
|
||||
},
|
||||
debug: ReplayProtectionDebug {
|
||||
unsafe_disabled: old_cfg.mixnet.replay_protection.debug.unsafe_disabled,
|
||||
maximum_replay_detection_deferral: old_cfg
|
||||
.mixnet
|
||||
.replay_protection
|
||||
.debug
|
||||
.maximum_replay_detection_deferral,
|
||||
maximum_replay_detection_pending_packets: old_cfg
|
||||
.mixnet
|
||||
.replay_protection
|
||||
.debug
|
||||
.maximum_replay_detection_pending_packets,
|
||||
ingress_channel_maximum_capacity:
|
||||
ReplayProtectionDebug::DEFAULT_INGRESS_CHANNEL_MAXIMUM_CAPACITY,
|
||||
false_positive_rate: old_cfg.mixnet.replay_protection.debug.false_positive_rate,
|
||||
initial_expected_packets_per_second: old_cfg
|
||||
.mixnet
|
||||
|
||||
@@ -1,675 +0,0 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::node::key_rotation::active_keys::SphinxKeyGuard;
|
||||
use crate::node::mixnet::shared::SharedData;
|
||||
use futures::StreamExt;
|
||||
use nym_noise::connection::Connection;
|
||||
use nym_noise::upgrade_noise_responder;
|
||||
use nym_sphinx_forwarding::packet::MixPacket;
|
||||
use nym_sphinx_framing::codec::NymCodec;
|
||||
use nym_sphinx_framing::packet::FramedNymPacket;
|
||||
use nym_sphinx_framing::processing::{
|
||||
MixProcessingResult, MixProcessingResultData, PacketProcessingError, PartiallyUnwrappedPacket,
|
||||
PartialyUnwrappedPacketWithKeyRotation, ProcessedFinalHop, process_framed_packet,
|
||||
};
|
||||
use nym_sphinx_params::SphinxKeyRotation;
|
||||
use nym_sphinx_types::{Delay, REPLAY_TAG_SIZE};
|
||||
use std::collections::HashMap;
|
||||
use std::mem;
|
||||
use std::net::SocketAddr;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::codec::Framed;
|
||||
use tracing::{Span, debug, error, instrument, trace, warn};
|
||||
|
||||
/// How often (in packets) the stream-level span updates its packet count.
|
||||
const SPAN_UPDATE_INTERVAL: u64 = 10_000;
|
||||
|
||||
struct PendingReplayCheckPackets {
|
||||
// map of rotation id used for packet creation to the packets
|
||||
packets: HashMap<u32, Vec<PartiallyUnwrappedPacket>>,
|
||||
last_acquired_mutex: Instant,
|
||||
}
|
||||
|
||||
impl PendingReplayCheckPackets {
|
||||
fn new() -> PendingReplayCheckPackets {
|
||||
PendingReplayCheckPackets {
|
||||
packets: Default::default(),
|
||||
last_acquired_mutex: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
fn reset(&mut self, now: Instant) -> HashMap<u32, Vec<PartiallyUnwrappedPacket>> {
|
||||
self.last_acquired_mutex = now;
|
||||
mem::take(&mut self.packets)
|
||||
}
|
||||
|
||||
fn push(&mut self, now: Instant, packet: PartialyUnwrappedPacketWithKeyRotation) {
|
||||
if self.packets.is_empty() {
|
||||
self.last_acquired_mutex = now;
|
||||
}
|
||||
self.packets
|
||||
.entry(packet.used_key_rotation)
|
||||
.or_default()
|
||||
.push(packet.packet)
|
||||
}
|
||||
|
||||
fn total_count(&self) -> usize {
|
||||
self.packets.values().map(|v| v.len()).sum()
|
||||
}
|
||||
|
||||
fn replay_tags(&self) -> HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>> {
|
||||
let mut replay_tags = HashMap::with_capacity(self.packets.len());
|
||||
'outer: for (rotation_id, packets) in &self.packets {
|
||||
let mut rotation_replay_tags = Vec::with_capacity(packets.len());
|
||||
for packet in packets {
|
||||
let Some(replay_tag) = packet.replay_tag() else {
|
||||
error!(
|
||||
"corrupted batch of {} packets - replay tag was missing",
|
||||
self.packets.len()
|
||||
);
|
||||
replay_tags.insert(*rotation_id, Vec::new());
|
||||
continue 'outer;
|
||||
};
|
||||
rotation_replay_tags.push(replay_tag);
|
||||
}
|
||||
replay_tags.insert(*rotation_id, rotation_replay_tags);
|
||||
}
|
||||
replay_tags
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct ConnectionHandler {
|
||||
shared: SharedData,
|
||||
remote_address: SocketAddr,
|
||||
|
||||
// packets pending for replay detection
|
||||
pending_packets: PendingReplayCheckPackets,
|
||||
}
|
||||
|
||||
impl Drop for ConnectionHandler {
|
||||
fn drop(&mut self) {
|
||||
self.shared
|
||||
.metrics
|
||||
.network
|
||||
.disconnected_ingress_mixnet_client()
|
||||
}
|
||||
}
|
||||
|
||||
impl ConnectionHandler {
|
||||
pub(crate) fn new(shared: &SharedData, remote_address: SocketAddr) -> Self {
|
||||
shared.metrics.network.new_active_ingress_mixnet_client();
|
||||
|
||||
ConnectionHandler {
|
||||
shared: SharedData {
|
||||
processing_config: shared.processing_config,
|
||||
sphinx_keys: shared.sphinx_keys.clone(),
|
||||
replay_protection_filter: shared.replay_protection_filter.clone(),
|
||||
mixnet_forwarder: shared.mixnet_forwarder.clone(),
|
||||
final_hop: shared.final_hop.clone(),
|
||||
noise_config: shared.noise_config.clone(),
|
||||
metrics: shared.metrics.clone(),
|
||||
shutdown_token: shared.shutdown_token.child_token(),
|
||||
},
|
||||
remote_address,
|
||||
pending_packets: PendingReplayCheckPackets::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Determine instant at which packet should get forwarded to the next hop.
|
||||
/// By using [`Instant`] rather than explicit [`Duration`] we minimise effects of
|
||||
/// the skew caused by being stuck in the channel queue.
|
||||
/// This method also clamps the maximum allowed delay so that nobody could send a bunch of packets
|
||||
/// with, for example, delays of 1 year thus causing denial of service
|
||||
fn create_delay_target(&self, now: Instant, delay: Option<Delay>) -> Option<Instant> {
|
||||
let delay = delay?.to_duration();
|
||||
|
||||
let delay = if delay > self.shared.processing_config.maximum_packet_delay {
|
||||
self.shared.processing_config.maximum_packet_delay
|
||||
} else {
|
||||
delay
|
||||
};
|
||||
trace!(
|
||||
"received packet will be delayed for {}ms",
|
||||
delay.as_millis()
|
||||
);
|
||||
|
||||
Some(now + delay)
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.forward_packet",
|
||||
skip(self, mix_packet, delay),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote_addr = %self.remote_address,
|
||||
delay_ms = tracing::field::Empty,
|
||||
)
|
||||
)]
|
||||
fn handle_forward_packet(&self, now: Instant, mix_packet: MixPacket, delay: Option<Delay>) {
|
||||
if !self.shared.processing_config.forward_hop_processing_enabled {
|
||||
warn!(
|
||||
event = "packet.dropped.forward_disabled",
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping packet: forward hop processing disabled"
|
||||
);
|
||||
self.shared.dropped_forward_packet(self.remote_address.ip());
|
||||
return;
|
||||
}
|
||||
|
||||
let forward_instant = self.create_delay_target(now, delay);
|
||||
if let Some(target) = forward_instant {
|
||||
Span::current().record(
|
||||
"delay_ms",
|
||||
target.saturating_duration_since(now).as_millis() as u64,
|
||||
);
|
||||
}
|
||||
self.shared.forward_mix_packet(mix_packet, forward_instant);
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.final_hop",
|
||||
skip(self, final_hop_data),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote_addr = %self.remote_address,
|
||||
client_online,
|
||||
disk_fallback = false,
|
||||
ack_forwarded = false,
|
||||
)
|
||||
)]
|
||||
async fn handle_final_hop(&self, final_hop_data: ProcessedFinalHop) {
|
||||
if !self.shared.processing_config.final_hop_processing_enabled {
|
||||
warn!(
|
||||
event = "packet.dropped.final_hop_disabled",
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping packet: final hop processing disabled"
|
||||
);
|
||||
self.shared
|
||||
.dropped_final_hop_packet(self.remote_address.ip());
|
||||
return;
|
||||
}
|
||||
|
||||
let client = final_hop_data.destination;
|
||||
let message = final_hop_data.message;
|
||||
let has_ack = final_hop_data.forward_ack.is_some();
|
||||
|
||||
// if possible attempt to push message directly to the client
|
||||
match self.shared.try_push_message_to_client(client, message) {
|
||||
Err(unsent_plaintext) => {
|
||||
// if that failed, store it on disk
|
||||
Span::current().record("client_online", false);
|
||||
match self
|
||||
.shared
|
||||
.store_processed_packet_payload(client, unsent_plaintext)
|
||||
.await
|
||||
{
|
||||
Err(err) => error!("Failed to store client data - {err}"),
|
||||
Ok(_) => {
|
||||
Span::current().record("disk_fallback", true);
|
||||
self.shared
|
||||
.metrics
|
||||
.mixnet
|
||||
.egress
|
||||
.add_disk_persisted_packet();
|
||||
trace!("Stored packet for {client}")
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => {
|
||||
Span::current().record("client_online", true);
|
||||
trace!("Pushed received packet to {client}");
|
||||
}
|
||||
}
|
||||
|
||||
// if we managed to either push message directly to the [online] client or store it at
|
||||
// disk, forward the ack
|
||||
self.shared.forward_ack_packet(final_hop_data.forward_ack);
|
||||
if has_ack {
|
||||
Span::current().record("ack_forwarded", true);
|
||||
}
|
||||
}
|
||||
|
||||
fn within_deferral_threshold(&self, now: Instant) -> bool {
|
||||
let time_threshold = now
|
||||
.saturating_duration_since(self.pending_packets.last_acquired_mutex)
|
||||
<= self
|
||||
.shared
|
||||
.processing_config
|
||||
.maximum_replay_detection_deferral;
|
||||
|
||||
let count_threshold = self.pending_packets.packets.len()
|
||||
< self
|
||||
.shared
|
||||
.processing_config
|
||||
.maximum_replay_detection_pending_packets;
|
||||
|
||||
// time threshold is ignored if we currently have 0 packets queued up
|
||||
if self.pending_packets.packets.is_empty() {
|
||||
return true;
|
||||
}
|
||||
|
||||
trace!(
|
||||
"within deferral time threshold: {time_threshold}, count threshold: {count_threshold}"
|
||||
);
|
||||
|
||||
if !time_threshold {
|
||||
warn!(
|
||||
event = "replay_detection.deferral_exceeded",
|
||||
threshold_type = "time",
|
||||
deferred_count = self.pending_packets.total_count(),
|
||||
deferral_ms = now.saturating_duration_since(self.pending_packets.last_acquired_mutex).as_millis() as u64,
|
||||
remote_addr = %self.remote_address,
|
||||
"{}: time deferral threshold exceeded with {} pending packets",
|
||||
self.remote_address,
|
||||
self.pending_packets.total_count()
|
||||
)
|
||||
}
|
||||
|
||||
if !count_threshold {
|
||||
warn!(
|
||||
event = "replay_detection.deferral_exceeded",
|
||||
threshold_type = "count",
|
||||
deferred_count = self.pending_packets.total_count(),
|
||||
remote_addr = %self.remote_address,
|
||||
"{}: count deferral threshold exceeded",
|
||||
self.remote_address
|
||||
)
|
||||
}
|
||||
|
||||
time_threshold && count_threshold
|
||||
}
|
||||
|
||||
/// Resolve the sphinx key for the given rotation, recording the rotation
|
||||
/// label on the current tracing span. Returns `ExpiredKey` if the requested
|
||||
/// odd/even key has already been rotated out.
|
||||
fn resolve_rotation_key(
|
||||
&self,
|
||||
rotation: SphinxKeyRotation,
|
||||
) -> Result<SphinxKeyGuard, PacketProcessingError> {
|
||||
let rotation_label = match rotation {
|
||||
SphinxKeyRotation::Unknown => "unknown",
|
||||
SphinxKeyRotation::OddRotation => "odd",
|
||||
SphinxKeyRotation::EvenRotation => "even",
|
||||
};
|
||||
Span::current().record("key_rotation", rotation_label);
|
||||
|
||||
match rotation {
|
||||
SphinxKeyRotation::Unknown => Ok(self.shared.sphinx_keys.primary()),
|
||||
SphinxKeyRotation::OddRotation => self.shared.sphinx_keys.odd().ok_or_else(|| {
|
||||
warn!(
|
||||
event = "packet.dropped.expired_key",
|
||||
key_rotation = "odd",
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping packet: odd key rotation expired"
|
||||
);
|
||||
PacketProcessingError::ExpiredKey
|
||||
}),
|
||||
SphinxKeyRotation::EvenRotation => self.shared.sphinx_keys.even().ok_or_else(|| {
|
||||
warn!(
|
||||
event = "packet.dropped.expired_key",
|
||||
key_rotation = "even",
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping packet: even key rotation expired"
|
||||
);
|
||||
PacketProcessingError::ExpiredKey
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.sphinx_partial_unwrap",
|
||||
skip(self, packet),
|
||||
level = "debug",
|
||||
fields(key_rotation, unwrap_result,)
|
||||
)]
|
||||
fn try_partially_unwrap_packet(
|
||||
&self,
|
||||
packet: FramedNymPacket,
|
||||
) -> Result<PartialyUnwrappedPacketWithKeyRotation, PacketProcessingError> {
|
||||
let rotation = packet.header().key_rotation;
|
||||
|
||||
let result = match rotation {
|
||||
SphinxKeyRotation::Unknown => {
|
||||
// Unknown rotation: try primary, fallback to secondary
|
||||
let primary = self.resolve_rotation_key(rotation)?;
|
||||
let primary_rotation = primary.rotation_id();
|
||||
|
||||
match PartiallyUnwrappedPacket::new(packet, primary.inner().as_ref()) {
|
||||
Ok(unwrapped_packet) => {
|
||||
Ok(unwrapped_packet.with_key_rotation(primary_rotation))
|
||||
}
|
||||
Err((packet, err)) => {
|
||||
if let Some(secondary) = self.shared.sphinx_keys.secondary() {
|
||||
let secondary_rotation = secondary.rotation_id();
|
||||
PartiallyUnwrappedPacket::new(packet, secondary.inner().as_ref())
|
||||
.map_err(|(_, err)| err)
|
||||
.map(|p| p.with_key_rotation(secondary_rotation))
|
||||
} else {
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
let key = self.resolve_rotation_key(rotation)?;
|
||||
let rotation_id = key.rotation_id();
|
||||
PartiallyUnwrappedPacket::new(packet, key.inner().as_ref())
|
||||
.map_err(|(_, err)| err)
|
||||
.map(|p| p.with_key_rotation(rotation_id))
|
||||
}
|
||||
};
|
||||
|
||||
Span::current().record("unwrap_result", if result.is_ok() { "ok" } else { "err" });
|
||||
result
|
||||
}
|
||||
|
||||
async fn handle_received_packet_with_replay_detection(
|
||||
&mut self,
|
||||
now: Instant,
|
||||
packet: FramedNymPacket,
|
||||
) {
|
||||
// 1. derive and expand shared secret
|
||||
// also check the header integrity
|
||||
let partially_unwrapped = match self.try_partially_unwrap_packet(packet) {
|
||||
Ok(unwrapped) => unwrapped,
|
||||
Err(err) => {
|
||||
trace!("failed to process received mix packet: {err}");
|
||||
warn!(
|
||||
event = "packet.dropped.malformed",
|
||||
error = %err,
|
||||
remote_addr = %self.remote_address,
|
||||
"dropping malformed packet"
|
||||
);
|
||||
self.shared
|
||||
.metrics
|
||||
.mixnet
|
||||
.ingress_malformed_packet(self.remote_address.ip());
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
self.pending_packets.push(now, partially_unwrapped);
|
||||
|
||||
// 2. check for packet replay
|
||||
// 2.1 first try it without locking
|
||||
if self.handle_pending_packets_batch_no_locking(now).await {
|
||||
return;
|
||||
}
|
||||
|
||||
// 2.2 if we're within deferral threshold, just leave it queued up for another call
|
||||
if self.within_deferral_threshold(now) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 2.3. otherwise block until we obtain the lock and clear the whole batch
|
||||
self.handle_pending_packets_batch(now).await;
|
||||
}
|
||||
|
||||
async fn handle_unwrapped_packet(
|
||||
&self,
|
||||
now: Instant,
|
||||
unwrapped_packet: Result<MixProcessingResult, PacketProcessingError>,
|
||||
) {
|
||||
// 2. increment our favourite metrics stats
|
||||
self.shared
|
||||
.update_metrics(&unwrapped_packet, self.remote_address.ip());
|
||||
|
||||
// 3. forward the packet to the relevant sink (if enabled)
|
||||
match unwrapped_packet {
|
||||
Err(err) => {
|
||||
trace!("failed to process received mix packet: {err}");
|
||||
}
|
||||
Ok(processed_packet) => match processed_packet.processing_data {
|
||||
MixProcessingResultData::ForwardHop { packet, delay } => {
|
||||
self.handle_forward_packet(now, packet, delay);
|
||||
}
|
||||
MixProcessingResultData::FinalHop { final_hop_data } => {
|
||||
self.handle_final_hop(final_hop_data).await;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_post_replay_detection_packets(
|
||||
&self,
|
||||
now: Instant,
|
||||
packets: HashMap<u32, Vec<PartiallyUnwrappedPacket>>,
|
||||
replay_check_results: HashMap<u32, Vec<bool>>,
|
||||
) {
|
||||
let mut replays_detected: u64 = 0;
|
||||
for (rotation_id, packets) in packets {
|
||||
let Some(replay_checks) = replay_check_results.get(&rotation_id) else {
|
||||
// this should never happen, but if we messed up, and it does, don't panic, just drop the packets
|
||||
error!("inconsistent replay check result - no values for rotation {rotation_id}");
|
||||
continue;
|
||||
};
|
||||
for (packet, &replayed) in packets.into_iter().zip(replay_checks) {
|
||||
let unwrapped_packet = if replayed {
|
||||
replays_detected += 1;
|
||||
warn!(
|
||||
event = "packet.dropped.replay",
|
||||
remote_addr = %self.remote_address,
|
||||
rotation_id,
|
||||
"dropping replayed packet"
|
||||
);
|
||||
Err(PacketProcessingError::PacketReplay)
|
||||
} else {
|
||||
packet.finalise_unwrapping()
|
||||
};
|
||||
|
||||
self.handle_unwrapped_packet(now, unwrapped_packet).await;
|
||||
}
|
||||
}
|
||||
if replays_detected > 0 {
|
||||
debug!(
|
||||
replays_detected,
|
||||
remote_addr = %self.remote_address,
|
||||
"replay detection batch completed with replays"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_pending_packets_batch_no_locking(&mut self, now: Instant) -> bool {
|
||||
let replay_tags = self.pending_packets.replay_tags();
|
||||
if replay_tags.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let replay_check_results = match self
|
||||
.shared
|
||||
.replay_protection_filter
|
||||
.batch_try_check_and_set(&replay_tags)
|
||||
{
|
||||
None => return false,
|
||||
Some(Ok(replay_check_results)) => replay_check_results,
|
||||
Some(Err(_)) => {
|
||||
// our mutex got poisoned - we have to shut down
|
||||
error!("CRITICAL FAILURE: replay bloomfilter mutex poisoning!");
|
||||
self.shared.shutdown_token.cancel();
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let batch = self.pending_packets.reset(now);
|
||||
self.handle_post_replay_detection_packets(now, batch, replay_check_results)
|
||||
.await;
|
||||
true
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.replay_check_batch",
|
||||
skip(self),
|
||||
level = "debug",
|
||||
fields(batch_size, mutex_wait_ms,)
|
||||
)]
|
||||
async fn handle_pending_packets_batch(&mut self, now: Instant) {
|
||||
let replay_tags = self.pending_packets.replay_tags();
|
||||
if replay_tags.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let batch_size = self.pending_packets.total_count();
|
||||
Span::current().record("batch_size", batch_size as u64);
|
||||
|
||||
let mutex_start = Instant::now();
|
||||
let Ok(replay_check_results) = self
|
||||
.shared
|
||||
.replay_protection_filter
|
||||
.batch_check_and_set(&replay_tags)
|
||||
else {
|
||||
// our mutex got poisoned - we have to shut down
|
||||
error!("CRITICAL FAILURE: replay bloomfilter mutex poisoning!");
|
||||
self.shared.shutdown_token.cancel();
|
||||
return;
|
||||
};
|
||||
Span::current().record("mutex_wait_ms", mutex_start.elapsed().as_millis() as u64);
|
||||
|
||||
let batch = self.pending_packets.reset(now);
|
||||
self.handle_post_replay_detection_packets(now, batch, replay_check_results)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.sphinx_full_unwrap",
|
||||
skip(self, packet),
|
||||
level = "debug",
|
||||
fields(key_rotation)
|
||||
)]
|
||||
fn try_full_unwrap_packet(
|
||||
&self,
|
||||
packet: FramedNymPacket,
|
||||
) -> Result<MixProcessingResult, PacketProcessingError> {
|
||||
let key = self.resolve_rotation_key(packet.header().key_rotation)?;
|
||||
process_framed_packet(packet, key.inner().as_ref())
|
||||
}
|
||||
|
||||
async fn handle_received_packet_with_no_replay_detection(
|
||||
&mut self,
|
||||
now: Instant,
|
||||
packet: FramedNymPacket,
|
||||
) {
|
||||
let unwrapped_packet = self.try_full_unwrap_packet(packet);
|
||||
self.handle_unwrapped_packet(now, unwrapped_packet).await;
|
||||
}
|
||||
|
||||
#[instrument(skip(self, packet), level = "debug")]
|
||||
async fn handle_received_nym_packet(&mut self, packet: FramedNymPacket) {
|
||||
let now = Instant::now();
|
||||
|
||||
// 1. attempt to unwrap the packet
|
||||
// if it's a sphinx packet attempt to do pre-processing and replay detection
|
||||
if packet.is_sphinx() && !self.shared.replay_protection_filter.disabled() {
|
||||
self.handle_received_packet_with_replay_detection(now, packet)
|
||||
.await;
|
||||
} else {
|
||||
// otherwise just skip that whole procedure and go straight to payload unwrapping
|
||||
// (assuming the basic framing is valid)
|
||||
self.handle_received_packet_with_no_replay_detection(now, packet)
|
||||
.await;
|
||||
};
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.connection",
|
||||
skip(self, socket),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote = %self.remote_address,
|
||||
noise_handshake_ms = tracing::field::Empty,
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn handle_connection(&mut self, socket: TcpStream) {
|
||||
let handshake_start = Instant::now();
|
||||
let noise_stream = match upgrade_noise_responder(socket, &self.shared.noise_config).await {
|
||||
Ok(noise_stream) => noise_stream,
|
||||
Err(err) => {
|
||||
Span::current().record(
|
||||
"noise_handshake_ms",
|
||||
handshake_start.elapsed().as_millis() as u64,
|
||||
);
|
||||
warn!(
|
||||
event = "connection.failed.noise",
|
||||
remote_addr = %self.remote_address,
|
||||
error = %err,
|
||||
"Noise responder handshake failed"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
Span::current().record(
|
||||
"noise_handshake_ms",
|
||||
handshake_start.elapsed().as_millis() as u64,
|
||||
);
|
||||
debug!(
|
||||
"Noise responder handshake completed for {:?}",
|
||||
self.remote_address
|
||||
);
|
||||
self.handle_stream(Framed::new(noise_stream, NymCodec))
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.stream",
|
||||
skip(self, mixnet_connection),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote = %self.remote_address,
|
||||
packets_processed = 0u64,
|
||||
exit_reason,
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn handle_stream(
|
||||
&mut self,
|
||||
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
|
||||
) {
|
||||
let mut packets_processed: u64 = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.shared.shutdown_token.cancelled() => {
|
||||
trace!("connection handler: received shutdown");
|
||||
Span::current().record("exit_reason", "shutdown");
|
||||
break
|
||||
}
|
||||
maybe_framed_nym_packet = mixnet_connection.next() => {
|
||||
match maybe_framed_nym_packet {
|
||||
Some(Ok(packet)) => {
|
||||
self.handle_received_nym_packet(packet).await;
|
||||
packets_processed += 1;
|
||||
if packets_processed.is_multiple_of(SPAN_UPDATE_INTERVAL) {
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
}
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
warn!(
|
||||
event = "connection.corrupted",
|
||||
remote_addr = %self.remote_address,
|
||||
error = %err,
|
||||
packets_processed,
|
||||
"connection stream corrupted"
|
||||
);
|
||||
Span::current().record("exit_reason", "corrupted");
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
return
|
||||
}
|
||||
None => {
|
||||
debug!(
|
||||
remote_addr = %self.remote_address,
|
||||
packets_processed,
|
||||
"connection closed by remote"
|
||||
);
|
||||
Span::current().record("exit_reason", "closed_by_remote");
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
debug!("exiting and closing connection");
|
||||
}
|
||||
}
|
||||
@@ -2,20 +2,40 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::node::mixnet::SharedData;
|
||||
use crate::node::mixnet::packet_ingest::{IngressNymPacket, MixIngestSender};
|
||||
use futures::StreamExt;
|
||||
use nym_noise::connection::Connection;
|
||||
use nym_noise::upgrade_noise_responder;
|
||||
use nym_sphinx_framing::codec::NymCodec;
|
||||
use nym_task::ShutdownToken;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::codec::Framed;
|
||||
use tracing::{Span, debug, error, info, instrument, trace, warn};
|
||||
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use tracing::{debug, error, info, trace};
|
||||
use std::time::Instant;
|
||||
|
||||
/// How often (in packets) the stream-level span updates its packet count.
|
||||
const SPAN_UPDATE_INTERVAL: u64 = 10_000;
|
||||
|
||||
pub(crate) struct Listener {
|
||||
bind_address: SocketAddr,
|
||||
shared_data: SharedData,
|
||||
ingest_sender: MixIngestSender,
|
||||
}
|
||||
|
||||
impl Listener {
|
||||
pub(crate) fn new(bind_address: SocketAddr, shared_data: SharedData) -> Self {
|
||||
pub(crate) fn new(
|
||||
bind_address: SocketAddr,
|
||||
shared_data: SharedData,
|
||||
ingest_sender: MixIngestSender,
|
||||
) -> Self {
|
||||
Listener {
|
||||
bind_address,
|
||||
shared_data,
|
||||
ingest_sender,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,10 +62,181 @@ impl Listener {
|
||||
break
|
||||
}
|
||||
connection = tcp_listener.accept() => {
|
||||
self.shared_data.try_handle_connection(connection);
|
||||
self.try_handle_connection(connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!("mixnet socket listener: Exiting");
|
||||
}
|
||||
|
||||
fn try_handle_connection(
|
||||
&self,
|
||||
accepted: io::Result<(TcpStream, SocketAddr)>,
|
||||
) -> Option<JoinHandle<()>> {
|
||||
match accepted {
|
||||
Ok((socket, remote_addr)) => {
|
||||
debug!("accepted incoming mixnet connection from: {remote_addr}");
|
||||
|
||||
let conn_handler = ConnectionHandler::new(
|
||||
&self.shared_data,
|
||||
remote_addr,
|
||||
self.ingest_sender.clone(),
|
||||
);
|
||||
let join_handle =
|
||||
tokio::spawn(async move { conn_handler.handle_connection(socket).await });
|
||||
self.shared_data.log_connected_clients();
|
||||
Some(join_handle)
|
||||
}
|
||||
Err(err) => {
|
||||
debug!("failed to accept incoming mixnet connection: {err}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ConnectionHandler {
|
||||
remote_addr: SocketAddr,
|
||||
shared_data: SharedData,
|
||||
ingest_sender: MixIngestSender,
|
||||
}
|
||||
|
||||
impl ConnectionHandler {
|
||||
fn new(shared: &SharedData, remote_addr: SocketAddr, ingest_sender: MixIngestSender) -> Self {
|
||||
Self {
|
||||
ingest_sender,
|
||||
remote_addr,
|
||||
shared_data: SharedData {
|
||||
processing_config: shared.processing_config,
|
||||
sphinx_keys: shared.sphinx_keys.clone(),
|
||||
replay_protection_filter: shared.replay_protection_filter.clone(),
|
||||
mixnet_forwarder: shared.mixnet_forwarder.clone(),
|
||||
final_hop: shared.final_hop.clone(),
|
||||
noise_config: shared.noise_config.clone(),
|
||||
metrics: shared.metrics.clone(),
|
||||
shutdown_token: shared.shutdown_token.child_token(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.connection",
|
||||
skip(self, socket),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote = %self.remote_addr,
|
||||
noise_handshake_ms = tracing::field::Empty,
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn handle_connection(&self, socket: TcpStream) {
|
||||
let handshake_start = Instant::now();
|
||||
let noise_stream =
|
||||
match upgrade_noise_responder(socket, &self.shared_data.noise_config).await {
|
||||
Ok(noise_stream) => noise_stream,
|
||||
Err(err) => {
|
||||
Span::current().record(
|
||||
"noise_handshake_ms",
|
||||
handshake_start.elapsed().as_millis() as u64,
|
||||
);
|
||||
warn!(
|
||||
event = "connection.failed.noise",
|
||||
remote_addr = %self.remote_addr,
|
||||
error = %err,
|
||||
"Noise responder handshake failed"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
Span::current().record(
|
||||
"noise_handshake_ms",
|
||||
handshake_start.elapsed().as_millis() as u64,
|
||||
);
|
||||
debug!(
|
||||
"Noise responder handshake completed for {:?}",
|
||||
self.remote_addr
|
||||
);
|
||||
self.handle_stream(Framed::new(noise_stream, NymCodec))
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.stream",
|
||||
skip(self, mixnet_connection),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote = %self.remote_addr,
|
||||
packets_processed = 0u64,
|
||||
exit_reason,
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn handle_stream(
|
||||
&self,
|
||||
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
|
||||
) {
|
||||
let mut packets_processed: u64 = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.shared_data.shutdown_token.cancelled() => {
|
||||
trace!("connection handler: received shutdown");
|
||||
Span::current().record("exit_reason", "shutdown");
|
||||
break
|
||||
}
|
||||
maybe_framed_nym_packet = mixnet_connection.next() => {
|
||||
match maybe_framed_nym_packet {
|
||||
Some(Ok(packet)) => {
|
||||
let ingress_packet = IngressNymPacket::new(packet, Instant::now(), self.remote_addr);
|
||||
self.handle_received_nym_packet(ingress_packet);
|
||||
packets_processed += 1;
|
||||
if packets_processed.is_multiple_of(SPAN_UPDATE_INTERVAL) {
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
}
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
warn!(
|
||||
event = "connection.corrupted",
|
||||
remote_addr = %self.remote_addr,
|
||||
error = %err,
|
||||
packets_processed,
|
||||
"connection stream corrupted"
|
||||
);
|
||||
Span::current().record("exit_reason", "corrupted");
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
return
|
||||
}
|
||||
None => {
|
||||
debug!(
|
||||
remote_addr = %self.remote_addr,
|
||||
packets_processed,
|
||||
"connection closed by remote"
|
||||
);
|
||||
Span::current().record("exit_reason", "closed_by_remote");
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Span::current().record("packets_processed", packets_processed);
|
||||
debug!("exiting and closing connection");
|
||||
}
|
||||
|
||||
/// Attempt to add the packet to the processing queue. If there is no capacity available the packet will
|
||||
/// be dropped and the `mixnet_ingress_overflow_packets_dropped` metric will be incremented.
|
||||
fn handle_received_nym_packet(&self, packet: IngressNymPacket) {
|
||||
match self.ingest_sender.ingest_packet(packet) {
|
||||
Ok(_) => {}
|
||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => self
|
||||
.shared_data
|
||||
.metrics
|
||||
.mixnet
|
||||
.ingress_dropped_overflow_packet(),
|
||||
Err(err) => {
|
||||
error!("unexpected error using ingress channel - shutting down: {err}");
|
||||
self.shared_data.shutdown_token.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
pub(crate) mod handler;
|
||||
pub(crate) mod listener;
|
||||
pub(crate) mod packet_forwarding;
|
||||
pub(crate) mod packet_ingest;
|
||||
pub(crate) mod shared;
|
||||
|
||||
pub(crate) use listener::Listener;
|
||||
|
||||
@@ -0,0 +1,560 @@
|
||||
//! Mix Packet Ingest Worker implementation and tooling
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Instant;
|
||||
use std::{io, num::NonZero};
|
||||
|
||||
use crate::node::{
|
||||
key_rotation::active_keys::SphinxKeyGuard,
|
||||
mixnet::{SharedData, shared::ProcessingConfig},
|
||||
};
|
||||
use nym_node_metrics::mixnet::PacketKind;
|
||||
use nym_sphinx_forwarding::packet::MixPacket;
|
||||
use nym_sphinx_framing::packet::FramedNymPacket;
|
||||
use nym_sphinx_framing::processing::{
|
||||
MixPacketVersion, MixProcessingResult, MixProcessingResultData, PacketProcessingError,
|
||||
PartiallyUnwrappedPacket, PartiallyUnwrappedPacketWithKeyRotation, ProcessedFinalHop,
|
||||
process_framed_packet,
|
||||
};
|
||||
use nym_sphinx_params::SphinxKeyRotation;
|
||||
use nym_sphinx_types::Delay;
|
||||
use nym_task::ShutdownToken;
|
||||
use tracing::{Span, debug, error, info, instrument, trace, warn};
|
||||
|
||||
pub(crate) struct IngressNymPacket {
|
||||
packet: FramedNymPacket,
|
||||
received_at: Instant,
|
||||
received_from: SocketAddr,
|
||||
}
|
||||
|
||||
impl IngressNymPacket {
|
||||
pub(crate) fn new(
|
||||
packet: FramedNymPacket,
|
||||
received_at: Instant,
|
||||
received_from: SocketAddr,
|
||||
) -> Self {
|
||||
Self {
|
||||
packet,
|
||||
received_at,
|
||||
received_from,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MixPacketIngest {
|
||||
shared: SharedData,
|
||||
|
||||
// Keep a copy of the channel to prevent accidental exit
|
||||
packet_sender: MixIngestSender,
|
||||
packet_receiver: MixIngestReceiver,
|
||||
}
|
||||
|
||||
impl MixPacketIngest {
|
||||
pub fn new(shared: &SharedData) -> Self {
|
||||
let (packet_sender, packet_receiver) = mix_ingest_channels(&shared.processing_config);
|
||||
|
||||
Self {
|
||||
shared: SharedData {
|
||||
processing_config: shared.processing_config,
|
||||
sphinx_keys: shared.sphinx_keys.clone(),
|
||||
replay_protection_filter: shared.replay_protection_filter.clone(),
|
||||
mixnet_forwarder: shared.mixnet_forwarder.clone(),
|
||||
final_hop: shared.final_hop.clone(),
|
||||
noise_config: shared.noise_config.clone(),
|
||||
metrics: shared.metrics.clone(),
|
||||
shutdown_token: shared.shutdown_token.child_token(),
|
||||
},
|
||||
|
||||
packet_sender,
|
||||
packet_receiver,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sender(&self) -> MixIngestSender {
|
||||
self.packet_sender.clone()
|
||||
}
|
||||
|
||||
pub async fn run(&mut self, shutdown_token: ShutdownToken) {
|
||||
// this one is impossible to ever panic
|
||||
#[allow(clippy::unwrap_used)]
|
||||
let default_worker_count = NonZero::new(1).unwrap();
|
||||
let num_threads = std::thread::available_parallelism()
|
||||
.inspect_err(|e| warn!("unable to query available parallelism: {e}"))
|
||||
.unwrap_or(default_worker_count)
|
||||
.get();
|
||||
|
||||
let mut workers = tokio::task::JoinSet::new();
|
||||
|
||||
for _ in 0..num_threads {
|
||||
let recv = self.packet_receiver.clone();
|
||||
let worker = MixPacketIngestWorker {
|
||||
packet_receiver: recv,
|
||||
shared: SharedData {
|
||||
processing_config: self.shared.processing_config,
|
||||
sphinx_keys: self.shared.sphinx_keys.clone(),
|
||||
replay_protection_filter: self.shared.replay_protection_filter.clone(),
|
||||
mixnet_forwarder: self.shared.mixnet_forwarder.clone(),
|
||||
final_hop: self.shared.final_hop.clone(),
|
||||
noise_config: self.shared.noise_config.clone(),
|
||||
metrics: self.shared.metrics.clone(),
|
||||
shutdown_token: self.shared.shutdown_token.child_token(),
|
||||
},
|
||||
};
|
||||
|
||||
let worker_token = shutdown_token.child_token();
|
||||
workers.spawn(worker.run(worker_token));
|
||||
}
|
||||
info!("launched {num_threads} mix ingest workers");
|
||||
|
||||
while let Some(res) = workers.join_next().await {
|
||||
if let Err(err) = res {
|
||||
warn!("ingest worker closed with error: {err}");
|
||||
} else if !shutdown_token.is_cancelled() {
|
||||
warn!("ingest worker closed unexpectedly!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct MixPacketIngestWorker {
|
||||
shared: SharedData,
|
||||
|
||||
packet_receiver: MixIngestReceiver,
|
||||
}
|
||||
|
||||
impl MixPacketIngestWorker {
|
||||
pub async fn run(mut self, shutdown_token: ShutdownToken) {
|
||||
trace!("starting PacketIngest");
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown_token.cancelled() => {
|
||||
debug!("PacketIngest: Received shutdown");
|
||||
break;
|
||||
}
|
||||
new_packet = self.packet_receiver.recv_async() => {
|
||||
// this one is impossible to ever panic - the parent struct maintains a sender
|
||||
// and waits for this process to end. Therefore it can't happen that ALL senders
|
||||
// are dropped
|
||||
#[allow(clippy::expect_used)]
|
||||
let new_packet = new_packet.expect("the ingest receiver closed somehow");
|
||||
self.handle_ingest_packet(new_packet).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_ingest_packet(&mut self, packet: IngressNymPacket) {
|
||||
// 1. attempt to unwrap the packet
|
||||
// if it's a sphinx packet attempt to do pre-processing and replay detection
|
||||
if packet.packet.is_sphinx() && !self.shared.replay_protection_filter.disabled() {
|
||||
self.handle_received_packet_with_replay_detection(packet)
|
||||
.await;
|
||||
} else {
|
||||
// otherwise just skip that whole procedure and go straight to payload unwrapping
|
||||
// (assuming the basic framing is valid)
|
||||
self.handle_received_packet_with_no_replay_detection(packet)
|
||||
.await;
|
||||
};
|
||||
}
|
||||
|
||||
async fn handle_received_packet_with_replay_detection(&mut self, packet: IngressNymPacket) {
|
||||
let source = packet.received_from;
|
||||
let received_at = packet.received_at;
|
||||
|
||||
// 1. derive and expand shared secret
|
||||
// also check the header integrity
|
||||
let partially_unwrapped = match self.try_partially_unwrap_packet(packet) {
|
||||
Ok(unwrapped) => unwrapped,
|
||||
Err(err) => {
|
||||
trace!("failed to process received mix packet: {err}");
|
||||
warn!(
|
||||
event = "packet.dropped.malformed",
|
||||
error = %err,
|
||||
remote_addr = %source,
|
||||
"dropping malformed packet"
|
||||
);
|
||||
self.shared
|
||||
.metrics
|
||||
.mixnet
|
||||
.ingress_malformed_packet(source.ip());
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let rotation_id = partially_unwrapped.used_key_rotation;
|
||||
let Some(replay_tag) = partially_unwrapped.packet.replay_tag() else {
|
||||
error!("corrupted packet - replay tag was missing");
|
||||
return;
|
||||
};
|
||||
|
||||
let mutex_start = Instant::now();
|
||||
let Ok(replayed) = self
|
||||
.shared
|
||||
.replay_protection_filter
|
||||
.check_and_set(rotation_id, replay_tag)
|
||||
else {
|
||||
// our mutex got poisoned - we have to shut down
|
||||
error!("CRITICAL FAILURE: replay bloomfilter mutex poisoning!");
|
||||
self.shared.shutdown_token.cancel();
|
||||
return;
|
||||
};
|
||||
Span::current().record("mutex_wait_ms", mutex_start.elapsed().as_millis() as u64);
|
||||
|
||||
let unwrapped_packet = if replayed {
|
||||
warn!(
|
||||
event = "packet.dropped.replay",
|
||||
remote_addr = %source,
|
||||
rotation_id,
|
||||
"dropping replayed packet"
|
||||
);
|
||||
Err(PacketProcessingError::PacketReplay)
|
||||
} else {
|
||||
partially_unwrapped.packet.finalise_unwrapping()
|
||||
};
|
||||
|
||||
self.handle_unwrapped_packet(unwrapped_packet, source, received_at)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn handle_received_packet_with_no_replay_detection(&mut self, packet: IngressNymPacket) {
|
||||
let source = packet.received_from;
|
||||
let received_at = packet.received_at;
|
||||
let unwrapped_packet = self.try_full_unwrap_packet(packet);
|
||||
self.handle_unwrapped_packet(unwrapped_packet, source, received_at)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.sphinx_full_unwrap",
|
||||
skip(self, packet),
|
||||
level = "debug",
|
||||
fields(key_rotation)
|
||||
)]
|
||||
fn try_full_unwrap_packet(
|
||||
&self,
|
||||
packet: IngressNymPacket,
|
||||
) -> Result<MixProcessingResult, PacketProcessingError> {
|
||||
let key = self.resolve_rotation_key(packet.packet.header().key_rotation)?;
|
||||
process_framed_packet(packet.packet, key.inner().as_ref())
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.sphinx_partial_unwrap",
|
||||
skip(self, packet),
|
||||
level = "debug",
|
||||
fields(key_rotation, unwrap_result,)
|
||||
)]
|
||||
fn try_partially_unwrap_packet(
|
||||
&self,
|
||||
packet: IngressNymPacket,
|
||||
) -> Result<PartiallyUnwrappedPacketWithKeyRotation, PacketProcessingError> {
|
||||
let rotation = packet.packet.header().key_rotation;
|
||||
|
||||
let result = match rotation {
|
||||
SphinxKeyRotation::Unknown => {
|
||||
// Unknown rotation: try primary, fallback to secondary
|
||||
let primary = self.resolve_rotation_key(rotation)?;
|
||||
let primary_rotation = primary.rotation_id();
|
||||
|
||||
match PartiallyUnwrappedPacket::new(packet.packet, primary.inner().as_ref()) {
|
||||
Ok(unwrapped_packet) => {
|
||||
Ok(unwrapped_packet.with_key_rotation(primary_rotation))
|
||||
}
|
||||
Err((packet, err)) => {
|
||||
if let Some(secondary) = self.shared.sphinx_keys.secondary() {
|
||||
let secondary_rotation = secondary.rotation_id();
|
||||
PartiallyUnwrappedPacket::new(packet, secondary.inner().as_ref())
|
||||
.map_err(|(_, err)| err)
|
||||
.map(|p| p.with_key_rotation(secondary_rotation))
|
||||
} else {
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
let key = self.resolve_rotation_key(rotation)?;
|
||||
let rotation_id = key.rotation_id();
|
||||
PartiallyUnwrappedPacket::new(packet.packet, key.inner().as_ref())
|
||||
.map_err(|(_, err)| err)
|
||||
.map(|p| p.with_key_rotation(rotation_id))
|
||||
}
|
||||
};
|
||||
|
||||
Span::current().record("unwrap_result", if result.is_ok() { "ok" } else { "err" });
|
||||
result
|
||||
}
|
||||
|
||||
async fn handle_unwrapped_packet(
|
||||
&self,
|
||||
unwrapped_packet: Result<MixProcessingResult, PacketProcessingError>,
|
||||
source: SocketAddr,
|
||||
received_at: Instant,
|
||||
) {
|
||||
// 2. increment our favourite metrics stats
|
||||
self.update_metrics(&unwrapped_packet, source);
|
||||
|
||||
// 3. forward the packet to the relevant sink (if enabled)
|
||||
match unwrapped_packet {
|
||||
Err(err) => {
|
||||
trace!("failed to process received mix packet: {err}");
|
||||
}
|
||||
Ok(processed_packet) => match processed_packet.processing_data {
|
||||
MixProcessingResultData::ForwardHop { packet, delay } => {
|
||||
self.handle_forward_packet(packet, received_at, source, delay);
|
||||
}
|
||||
MixProcessingResultData::FinalHop { final_hop_data } => {
|
||||
self.handle_final_hop(final_hop_data, source).await;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn update_metrics(
|
||||
&self,
|
||||
processing_result: &Result<MixProcessingResult, PacketProcessingError>,
|
||||
source: SocketAddr,
|
||||
) {
|
||||
let Ok(processing_result) = processing_result else {
|
||||
self.shared
|
||||
.metrics
|
||||
.mixnet
|
||||
.ingress_malformed_packet(source.ip());
|
||||
return;
|
||||
};
|
||||
|
||||
let packet_version = convert_to_metrics_version(processing_result.packet_version);
|
||||
|
||||
match processing_result.processing_data {
|
||||
MixProcessingResultData::ForwardHop { delay, .. } => {
|
||||
self.shared
|
||||
.metrics
|
||||
.mixnet
|
||||
.ingress_received_forward_packet(source.ip(), packet_version);
|
||||
|
||||
// check if the delay wasn't excessive
|
||||
if let Some(delay) = delay
|
||||
&& delay.to_duration() > self.shared.processing_config.maximum_packet_delay
|
||||
{
|
||||
self.shared.metrics.mixnet.ingress_excessive_delay_packet()
|
||||
}
|
||||
}
|
||||
MixProcessingResultData::FinalHop { .. } => {
|
||||
self.shared
|
||||
.metrics
|
||||
.mixnet
|
||||
.ingress_received_final_hop_packet(source.ip(), packet_version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolve the sphinx key for the given rotation, recording the rotation
|
||||
/// label on the current tracing span. Returns `ExpiredKey` if the requested
|
||||
/// odd/even key has already been rotated out.
|
||||
fn resolve_rotation_key(
|
||||
&self,
|
||||
rotation: SphinxKeyRotation,
|
||||
) -> Result<SphinxKeyGuard, PacketProcessingError> {
|
||||
let rotation_label = match rotation {
|
||||
SphinxKeyRotation::Unknown => "unknown",
|
||||
SphinxKeyRotation::OddRotation => "odd",
|
||||
SphinxKeyRotation::EvenRotation => "even",
|
||||
};
|
||||
Span::current().record("key_rotation", rotation_label);
|
||||
|
||||
match rotation {
|
||||
SphinxKeyRotation::Unknown => Ok(self.shared.sphinx_keys.primary()),
|
||||
SphinxKeyRotation::OddRotation => self.shared.sphinx_keys.odd().ok_or_else(|| {
|
||||
warn!(
|
||||
event = "packet.dropped.expired_key",
|
||||
key_rotation = "odd",
|
||||
"dropping packet: odd key rotation expired"
|
||||
);
|
||||
PacketProcessingError::ExpiredKey
|
||||
}),
|
||||
SphinxKeyRotation::EvenRotation => self.shared.sphinx_keys.even().ok_or_else(|| {
|
||||
warn!(
|
||||
event = "packet.dropped.expired_key",
|
||||
key_rotation = "even",
|
||||
"dropping packet: even key rotation expired"
|
||||
);
|
||||
PacketProcessingError::ExpiredKey
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.forward_packet",
|
||||
skip(self, mix_packet, delay),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote_addr = %remote_addr,
|
||||
delay_ms = tracing::field::Empty,
|
||||
)
|
||||
)]
|
||||
fn handle_forward_packet(
|
||||
&self,
|
||||
mix_packet: MixPacket,
|
||||
received_at: Instant,
|
||||
remote_addr: SocketAddr,
|
||||
delay: Option<Delay>,
|
||||
) {
|
||||
if !self.shared.processing_config.forward_hop_processing_enabled {
|
||||
warn!(
|
||||
event = "packet.dropped.forward_disabled",
|
||||
remote_addr = %remote_addr,
|
||||
"dropping packet: forward hop processing disabled"
|
||||
);
|
||||
self.shared.dropped_forward_packet(remote_addr.ip());
|
||||
return;
|
||||
}
|
||||
|
||||
let forward_instant = self.create_delay_target(received_at, delay);
|
||||
if let Some(target) = forward_instant {
|
||||
Span::current().record(
|
||||
"delay_ms",
|
||||
target.saturating_duration_since(received_at).as_millis() as u64,
|
||||
);
|
||||
}
|
||||
self.shared.forward_mix_packet(mix_packet, forward_instant);
|
||||
}
|
||||
|
||||
/// Determine instant at which packet should get forwarded to the next hop.
|
||||
/// By using [`Instant`] rather than explicit [`Duration`] we minimise effects of
|
||||
/// the skew caused by being stuck in the channel queue.
|
||||
/// This method also clamps the maximum allowed delay so that nobody could send a bunch of packets
|
||||
/// with, for example, delays of 1 year thus causing denial of service
|
||||
fn create_delay_target(&self, received_at: Instant, delay: Option<Delay>) -> Option<Instant> {
|
||||
let delay = delay?.to_duration();
|
||||
|
||||
let delay = if delay > self.shared.processing_config.maximum_packet_delay {
|
||||
self.shared.processing_config.maximum_packet_delay
|
||||
} else {
|
||||
delay
|
||||
};
|
||||
trace!(
|
||||
"received packet will be delayed for {}ms",
|
||||
delay.as_millis()
|
||||
);
|
||||
|
||||
Some(received_at + delay)
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
name = "mixnode.final_hop",
|
||||
skip(self, final_hop_data),
|
||||
level = "debug",
|
||||
fields(
|
||||
remote_addr = %remote_addr,
|
||||
client_online,
|
||||
disk_fallback = false,
|
||||
ack_forwarded = false,
|
||||
)
|
||||
)]
|
||||
async fn handle_final_hop(&self, final_hop_data: ProcessedFinalHop, remote_addr: SocketAddr) {
|
||||
if !self.shared.processing_config.final_hop_processing_enabled {
|
||||
warn!(
|
||||
event = "packet.dropped.final_hop_disabled",
|
||||
remote_addr = %remote_addr,
|
||||
"dropping packet: final hop processing disabled"
|
||||
);
|
||||
self.shared.dropped_final_hop_packet(remote_addr.ip());
|
||||
return;
|
||||
}
|
||||
|
||||
let client = final_hop_data.destination;
|
||||
let message = final_hop_data.message;
|
||||
let has_ack = final_hop_data.forward_ack.is_some();
|
||||
|
||||
// if possible attempt to push message directly to the client
|
||||
match self.shared.try_push_message_to_client(client, message) {
|
||||
Err(unsent_plaintext) => {
|
||||
// if that failed, store it on disk
|
||||
Span::current().record("client_online", false);
|
||||
match self
|
||||
.shared
|
||||
.store_processed_packet_payload(client, unsent_plaintext)
|
||||
.await
|
||||
{
|
||||
Err(err) => error!("Failed to store client data - {err}"),
|
||||
Ok(_) => {
|
||||
Span::current().record("disk_fallback", true);
|
||||
self.shared
|
||||
.metrics
|
||||
.mixnet
|
||||
.egress
|
||||
.add_disk_persisted_packet();
|
||||
trace!("Stored packet for {client}")
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => {
|
||||
Span::current().record("client_online", true);
|
||||
trace!("Pushed received packet to {client}");
|
||||
}
|
||||
}
|
||||
|
||||
// if we managed to either push message directly to the [online] client or store it at
|
||||
// disk, forward the ack
|
||||
self.shared.forward_ack_packet(final_hop_data.forward_ack);
|
||||
if has_ack {
|
||||
Span::current().record("ack_forwarded", true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn convert_to_metrics_version(processed: MixPacketVersion) -> PacketKind {
|
||||
match processed {
|
||||
MixPacketVersion::Outfox => PacketKind::Outfox,
|
||||
MixPacketVersion::Sphinx(sphinx_version) => PacketKind::Sphinx(sphinx_version.value()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mix_ingest_channels(
|
||||
processing_config: &ProcessingConfig,
|
||||
) -> (MixIngestSender, MixIngestReceiver) {
|
||||
let (tx, rx) = flume::bounded(processing_config.ingress_channel_maximum_capacity);
|
||||
(MixIngestSender(tx), rx)
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MixIngestSender(flume::Sender<IngressNymPacket>);
|
||||
|
||||
impl MixIngestSender {
|
||||
pub fn ingest_packet(&self, packet: impl Into<IngressNymPacket>) -> io::Result<()> {
|
||||
let sender = &self.0;
|
||||
|
||||
// we are using a bounded channel so unwrap is safe
|
||||
#[allow(clippy::unwrap_used)]
|
||||
let channel_capacity = sender.capacity().unwrap();
|
||||
let channel_used = sender.len();
|
||||
|
||||
let sending_res = sender.try_send(packet.into());
|
||||
|
||||
sending_res.map_err(|err| match err {
|
||||
flume::TrySendError::Full(_) => {
|
||||
warn!(
|
||||
event = "mixnode.ingress_try_send",
|
||||
result = "full_dropped",
|
||||
channel_capacity,
|
||||
channel_used,
|
||||
"dropping packet: ingress buffer is full ({channel_used}/{channel_capacity})"
|
||||
);
|
||||
io::Error::new(io::ErrorKind::WouldBlock, "ingress queue is full")
|
||||
}
|
||||
flume::TrySendError::Disconnected(_) => {
|
||||
debug!(
|
||||
event = "mixnode.ingress_try_send",
|
||||
result = "closed",
|
||||
channel_capacity,
|
||||
channel_used,
|
||||
"ingress queue is closed"
|
||||
);
|
||||
io::Error::new(io::ErrorKind::BrokenPipe, "ingress packet channel closed")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub type MixIngestReceiver = flume::Receiver<IngressNymPacket>;
|
||||
@@ -4,25 +4,16 @@
|
||||
use crate::config::Config;
|
||||
use crate::node::key_rotation::active_keys::ActiveSphinxKeys;
|
||||
use crate::node::mixnet::SharedFinalHopData;
|
||||
use crate::node::mixnet::handler::ConnectionHandler;
|
||||
use crate::node::replay_protection::bloomfilter::ReplayProtectionBloomfilters;
|
||||
use nym_gateway::node::GatewayStorageError;
|
||||
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketToForward};
|
||||
use nym_node_metrics::NymNodeMetrics;
|
||||
use nym_node_metrics::mixnet::PacketKind;
|
||||
use nym_noise::config::NoiseConfig;
|
||||
use nym_sphinx_forwarding::packet::MixPacket;
|
||||
use nym_sphinx_framing::processing::{
|
||||
MixPacketVersion, MixProcessingResult, MixProcessingResultData, PacketProcessingError,
|
||||
};
|
||||
use nym_sphinx_types::DestinationAddressBytes;
|
||||
use nym_task::ShutdownToken;
|
||||
use std::io;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Instant;
|
||||
use std::net::IpAddr;
|
||||
use std::time::{Duration, Instant};
|
||||
use tracing::{debug, error};
|
||||
|
||||
pub(crate) mod final_hop;
|
||||
@@ -30,13 +21,11 @@ pub(crate) mod final_hop;
|
||||
#[derive(Clone, Copy)]
|
||||
pub(crate) struct ProcessingConfig {
|
||||
pub(crate) maximum_packet_delay: Duration,
|
||||
/// how long the task is willing to skip mutex acquisition before it will block the thread
|
||||
/// until it actually obtains it
|
||||
pub(crate) maximum_replay_detection_deferral: Duration,
|
||||
|
||||
/// how many packets the task is willing to queue before it will block the thread
|
||||
/// until it obtains the mutex
|
||||
pub(crate) maximum_replay_detection_pending_packets: usize,
|
||||
/// Channel capacity for the mixnet ingress channel. This determines the maximum number of
|
||||
/// packets that can be queued waiting for ingest processing. Once the queue is full packets
|
||||
/// will still be taken off the wire, but dropped as the node is too busy to handle them.
|
||||
pub(crate) ingress_channel_maximum_capacity: usize,
|
||||
|
||||
pub(crate) forward_hop_processing_enabled: bool,
|
||||
pub(crate) final_hop_processing_enabled: bool,
|
||||
@@ -46,16 +35,11 @@ impl ProcessingConfig {
|
||||
pub(crate) fn new(config: &Config) -> Self {
|
||||
ProcessingConfig {
|
||||
maximum_packet_delay: config.mixnet.debug.maximum_forward_packet_delay,
|
||||
maximum_replay_detection_deferral: config
|
||||
ingress_channel_maximum_capacity: config
|
||||
.mixnet
|
||||
.replay_protection
|
||||
.debug
|
||||
.maximum_replay_detection_deferral,
|
||||
maximum_replay_detection_pending_packets: config
|
||||
.mixnet
|
||||
.replay_protection
|
||||
.debug
|
||||
.maximum_replay_detection_pending_packets,
|
||||
.ingress_channel_maximum_capacity,
|
||||
forward_hop_processing_enabled: config.modes.mixnode,
|
||||
final_hop_processing_enabled: config.modes.expects_final_hop_traffic()
|
||||
|| config.wireguard.enabled,
|
||||
@@ -83,13 +67,6 @@ pub(crate) struct SharedData {
|
||||
pub(super) shutdown_token: ShutdownToken,
|
||||
}
|
||||
|
||||
fn convert_to_metrics_version(processed: MixPacketVersion) -> PacketKind {
|
||||
match processed {
|
||||
MixPacketVersion::Outfox => PacketKind::Outfox,
|
||||
MixPacketVersion::Sphinx(sphinx_version) => PacketKind::Sphinx(sphinx_version.value()),
|
||||
}
|
||||
}
|
||||
|
||||
impl SharedData {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
@@ -131,64 +108,11 @@ impl SharedData {
|
||||
self.metrics.mixnet.ingress_dropped_final_hop_packet(source)
|
||||
}
|
||||
|
||||
pub(super) fn update_metrics(
|
||||
&self,
|
||||
processing_result: &Result<MixProcessingResult, PacketProcessingError>,
|
||||
source: IpAddr,
|
||||
) {
|
||||
let Ok(processing_result) = processing_result else {
|
||||
self.metrics.mixnet.ingress_malformed_packet(source);
|
||||
return;
|
||||
};
|
||||
|
||||
let packet_version = convert_to_metrics_version(processing_result.packet_version);
|
||||
|
||||
match processing_result.processing_data {
|
||||
MixProcessingResultData::ForwardHop { delay, .. } => {
|
||||
self.metrics
|
||||
.mixnet
|
||||
.ingress_received_forward_packet(source, packet_version);
|
||||
|
||||
// check if the delay wasn't excessive
|
||||
if let Some(delay) = delay
|
||||
&& delay.to_duration() > self.processing_config.maximum_packet_delay
|
||||
{
|
||||
self.metrics.mixnet.ingress_excessive_delay_packet()
|
||||
}
|
||||
}
|
||||
MixProcessingResultData::FinalHop { .. } => {
|
||||
self.metrics
|
||||
.mixnet
|
||||
.ingress_received_final_hop_packet(source, packet_version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn try_handle_connection(
|
||||
&self,
|
||||
accepted: io::Result<(TcpStream, SocketAddr)>,
|
||||
) -> Option<JoinHandle<()>> {
|
||||
match accepted {
|
||||
Ok((socket, remote_addr)) => {
|
||||
debug!("accepted incoming mixnet connection from: {remote_addr}");
|
||||
let mut handler = ConnectionHandler::new(self, remote_addr);
|
||||
let join_handle =
|
||||
tokio::spawn(async move { handler.handle_connection(socket).await });
|
||||
self.log_connected_clients();
|
||||
Some(join_handle)
|
||||
}
|
||||
Err(err) => {
|
||||
debug!("failed to accept incoming mixnet connection: {err}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn forward_mix_packet(&self, packet: MixPacket, delay_until: Option<Instant>) {
|
||||
let has_delay = delay_until.is_some();
|
||||
if self
|
||||
.mixnet_forwarder
|
||||
.forward_packet(PacketToForward::new(packet, delay_until))
|
||||
.forward_packet(PacketToForward::new(packet, delay_until.map(Into::into)))
|
||||
.is_err()
|
||||
&& !self.shutdown_token.is_cancelled()
|
||||
{
|
||||
|
||||
@@ -32,6 +32,7 @@ use crate::node::metrics::handler::mixnet_data_cleaner::MixnetMetricsCleaner;
|
||||
use crate::node::metrics::handler::pending_egress_packets_updater::PendingEgressPacketsUpdater;
|
||||
use crate::node::mixnet::SharedFinalHopData;
|
||||
use crate::node::mixnet::packet_forwarding::PacketForwarder;
|
||||
use crate::node::mixnet::packet_ingest::MixPacketIngest;
|
||||
use crate::node::mixnet::shared::ProcessingConfig;
|
||||
use crate::node::nym_apis_client::NymApisClient;
|
||||
use crate::node::replay_protection::background_task::ReplayProtectionDiskFlush;
|
||||
@@ -1289,7 +1290,16 @@ impl NymNode {
|
||||
self.shutdown_token(),
|
||||
);
|
||||
|
||||
let mut mixnet_listener = mixnet::Listener::new(self.config.mixnet.bind_address, shared);
|
||||
let mut mix_packet_ingest = MixPacketIngest::new(&shared);
|
||||
let ingest_sender = mix_packet_ingest.sender();
|
||||
let ingest_shutdown = self.shutdown_token();
|
||||
self.shutdown_tracker().try_spawn_named(
|
||||
async move { mix_packet_ingest.run(ingest_shutdown).await },
|
||||
"MixPacketIngest",
|
||||
);
|
||||
|
||||
let mut mixnet_listener =
|
||||
mixnet::Listener::new(self.config.mixnet.bind_address, shared, ingest_sender);
|
||||
|
||||
let shutdown_token = self.shutdown_token();
|
||||
self.shutdown_tracker().try_spawn_named(
|
||||
|
||||
@@ -247,10 +247,11 @@ impl ReplayProtectionBloomfilters {
|
||||
// map from particular rotation id to vector of results, based on the order of requests received
|
||||
type BatchCheckResult = HashMap<u32, Vec<bool>>;
|
||||
|
||||
#[allow(unused)]
|
||||
impl ReplayProtectionBloomfilters {
|
||||
pub(crate) fn batch_try_check_and_set(
|
||||
&self,
|
||||
reply_tags: &HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>>,
|
||||
replay_tags: &HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>>,
|
||||
) -> Option<Result<BatchCheckResult, PoisonError<()>>> {
|
||||
let mut guard = match self.inner.try_lock() {
|
||||
Ok(guard) => guard,
|
||||
@@ -258,18 +259,43 @@ impl ReplayProtectionBloomfilters {
|
||||
Err(TryLockError::WouldBlock) => return None,
|
||||
};
|
||||
|
||||
Some(Ok(guard.batch_check_and_set(reply_tags)))
|
||||
Some(Ok(guard.batch_check_and_set(replay_tags)))
|
||||
}
|
||||
|
||||
pub(crate) fn batch_check_and_set(
|
||||
&self,
|
||||
reply_tags: &HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>>,
|
||||
replay_tags: &HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>>,
|
||||
) -> Result<HashMap<u32, Vec<bool>>, PoisonError<()>> {
|
||||
let Ok(mut guard) = self.inner.lock() else {
|
||||
return Err(PoisonError::new(()));
|
||||
};
|
||||
|
||||
Ok(guard.batch_check_and_set(reply_tags))
|
||||
Ok(guard.batch_check_and_set(replay_tags))
|
||||
}
|
||||
|
||||
pub(crate) fn try_check_and_set(
|
||||
&self,
|
||||
rotation_id: u32,
|
||||
replay_tag: &[u8; REPLAY_TAG_SIZE],
|
||||
) -> Option<Result<bool, PoisonError<()>>> {
|
||||
let mut guard = match self.inner.try_lock() {
|
||||
Ok(guard) => guard,
|
||||
Err(TryLockError::Poisoned(_)) => return Some(Err(PoisonError::new(()))),
|
||||
Err(TryLockError::WouldBlock) => return None,
|
||||
};
|
||||
|
||||
Some(Ok(guard.check_and_set(rotation_id, replay_tag)))
|
||||
}
|
||||
|
||||
pub(crate) fn check_and_set(
|
||||
&self,
|
||||
rotation_id: u32,
|
||||
replay_tag: &[u8; REPLAY_TAG_SIZE],
|
||||
) -> Result<bool, PoisonError<()>> {
|
||||
let Ok(mut guard) = self.inner.lock() else {
|
||||
return Err(PoisonError::new(()));
|
||||
};
|
||||
Ok(guard.check_and_set(rotation_id, replay_tag))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -287,50 +313,47 @@ struct ReplayProtectionBloomfiltersInner {
|
||||
pre_announced: Option<RotationFilter>,
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
impl ReplayProtectionBloomfiltersInner {
|
||||
fn batch_check_and_set(
|
||||
&mut self,
|
||||
reply_tags: &HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>>,
|
||||
replay_tags: &HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>>,
|
||||
) -> HashMap<u32, Vec<bool>> {
|
||||
let mut result = HashMap::with_capacity(reply_tags.len());
|
||||
for (&rotation_id, reply_tags) in reply_tags {
|
||||
// try to 'find' the relevant filter. we might be doing 3 reads here, but realistically it's
|
||||
// going to be 'primary' most of the time and even if not, it's just few ns of overhead...
|
||||
let filter = if self.primary.metadata.rotation_id == rotation_id {
|
||||
Some(&mut self.primary.data)
|
||||
} else if let Some(secondary) = &mut self.overlap {
|
||||
// if let chaining won't be stable until 1.88 so we have to do the Option workaround
|
||||
if secondary.metadata.rotation_id == rotation_id {
|
||||
Some(&mut secondary.data)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else if let Some(pre_announced) = &mut self.pre_announced {
|
||||
if pre_announced.metadata.rotation_id == rotation_id {
|
||||
Some(&mut pre_announced.data)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let Some(filter) = filter else {
|
||||
// if we've received a packet from an unknown rotation, it most likely means it has been replayed
|
||||
// from an older rotation, so mark it as such
|
||||
result.insert(rotation_id, vec![false; reply_tags.len()]);
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut rotation_results = Vec::with_capacity(reply_tags.len());
|
||||
for tag in reply_tags {
|
||||
rotation_results.push(filter.check_and_set(tag))
|
||||
let mut result = HashMap::with_capacity(replay_tags.len());
|
||||
for (&rotation_id, replay_tags) in replay_tags {
|
||||
let mut rotation_results = Vec::with_capacity(replay_tags.len());
|
||||
for tag in replay_tags {
|
||||
rotation_results.push(self.check_and_set(rotation_id, tag))
|
||||
}
|
||||
result.insert(rotation_id, rotation_results);
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn check_and_set(&mut self, rotation_id: u32, replay_tag: &[u8; 32]) -> bool {
|
||||
let filter = if self.primary.metadata.rotation_id == rotation_id {
|
||||
Some(&mut self.primary.data)
|
||||
} else if let Some(secondary) = &mut self.overlap
|
||||
&& secondary.metadata.rotation_id == rotation_id
|
||||
{
|
||||
Some(&mut secondary.data)
|
||||
} else if let Some(pre_announced) = &mut self.pre_announced
|
||||
&& pre_announced.metadata.rotation_id == rotation_id
|
||||
{
|
||||
Some(&mut pre_announced.data)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let Some(filter) = filter else {
|
||||
// if we've received a packet from an unknown rotation, it most likely means it has been replayed
|
||||
// from an older rotation, so mark it as such
|
||||
return false;
|
||||
};
|
||||
|
||||
filter.check_and_set(replay_tag)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RotationFilter {
|
||||
|
||||
Reference in New Issue
Block a user