Compare commits

...

4 Commits

Author SHA1 Message Date
jmwample 26b86dd887 appease clippy 2026-03-11 11:36:57 -06:00
jmwample 1bb786962f reduce size of outgoing buffer for forward mix traffic 2026-03-11 09:30:22 -06:00
jmwample 8cf2a29a6c mpmc ingest workers using flume 2026-03-09 11:07:20 -06:00
jmwample 4d62ff6ff1 adapt mixnet ingest to channel based model 2026-03-04 15:39:51 -07:00
14 changed files with 880 additions and 831 deletions
Generated
+17 -1
View File
@@ -2772,6 +2772,9 @@ name = "fastrand"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
dependencies = [
"getrandom 0.2.16",
]
[[package]]
name = "ff"
@@ -2844,6 +2847,18 @@ dependencies = [
"spin",
]
[[package]]
name = "flume"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be"
dependencies = [
"fastrand",
"futures-core",
"futures-sink",
"spin",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -7149,6 +7164,7 @@ dependencies = [
"csv",
"cupid",
"dashmap",
"flume 0.12.0",
"futures",
"hex",
"hkdf",
@@ -11069,7 +11085,7 @@ checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea"
dependencies = [
"atoi",
"chrono",
"flume",
"flume 0.11.1",
"futures-channel",
"futures-core",
"futures-executor",
+1
View File
@@ -269,6 +269,7 @@ etherparse = "0.13.0"
eyre = "0.6.9"
fastrand = "2.1.1"
flate2 = "1.1.1"
flume = "0.12.0"
futures = "0.3.31"
futures-util = "0.3"
generic-array = "0.14.7"
+3 -3
View File
@@ -110,7 +110,7 @@ pub enum PacketProcessingError {
PacketReplay,
}
pub struct PartialyUnwrappedPacketWithKeyRotation {
pub struct PartiallyUnwrappedPacketWithKeyRotation {
pub packet: PartiallyUnwrappedPacket,
pub used_key_rotation: u32,
}
@@ -183,8 +183,8 @@ impl PartiallyUnwrappedPacket {
pub fn with_key_rotation(
self,
used_key_rotation: u32,
) -> PartialyUnwrappedPacketWithKeyRotation {
PartialyUnwrappedPacketWithKeyRotation {
) -> PartiallyUnwrappedPacketWithKeyRotation {
PartiallyUnwrappedPacketWithKeyRotation {
packet: self,
used_key_rotation,
}
+1
View File
@@ -30,6 +30,7 @@ console-subscriber = { workspace = true, optional = true }
csv = { workspace = true }
clap = { workspace = true, features = ["cargo", "env"] }
dashmap = { workspace = true }
flume= { workspace = true }
futures = { workspace = true }
hex = { workspace = true }
humantime-serde = { workspace = true }
+13
View File
@@ -104,6 +104,12 @@ impl MixingStats {
.dropped += 1;
}
pub fn ingress_dropped_overflow_packet(&self) {
self.ingress
.overflow_packets_dropped
.fetch_add(1, Ordering::Relaxed);
}
pub fn ingress_dropped_final_hop_packet(&self, source: IpAddr) {
self.ingress
.final_hop_packets_dropped
@@ -252,6 +258,9 @@ pub struct IngressMixingStats {
// final hop packets (i.e. to gateway)
final_hop_packets_dropped: AtomicUsize,
/// Packets that were dropped because the ingress channel queue was full
overflow_packets_dropped: AtomicUsize,
senders: DashMap<IpAddr, IngressRecipientStats>,
}
@@ -284,6 +293,10 @@ impl IngressMixingStats {
self.final_hop_packets_dropped.load(Ordering::Relaxed)
}
pub fn overflow_packets_dropped(&self) -> usize {
self.overflow_packets_dropped.load(Ordering::Relaxed)
}
pub fn senders(&self) -> &DashMap<IpAddr, IngressRecipientStats> {
&self.senders
}
+7 -14
View File
@@ -657,13 +657,10 @@ pub struct ReplayProtectionDebug {
/// Specifies whether this node should **NOT** use replay protection
pub unsafe_disabled: bool,
/// How long the processing task is willing to skip mutex acquisition before it will block the thread
/// until it actually obtains it
pub maximum_replay_detection_deferral: Duration,
/// How many packets the processing task is willing to queue before it will block the thread
/// until it obtains the mutex
pub maximum_replay_detection_pending_packets: usize,
/// Channel capacity for the mixnet ingress channel. This determines the maximum number of
/// packets that can be queued waiting for ingest processing. Once the queue is full packets
/// will still be taken off the wire, but dropped as the node is too busy to handle them.
pub ingress_channel_maximum_capacity: usize,
/// Probability of false positives, fraction between 0 and 1 or a number indicating 1-in-p
pub false_positive_rate: f64,
@@ -689,9 +686,7 @@ pub struct ReplayProtectionDebug {
}
impl ReplayProtectionDebug {
pub const DEFAULT_MAXIMUM_REPLAY_DETECTION_DEFERRAL: Duration = Duration::from_millis(50);
pub const DEFAULT_MAXIMUM_REPLAY_DETECTION_PENDING_PACKETS: usize = 100;
pub const DEFAULT_INGRESS_CHANNEL_MAXIMUM_CAPACITY: usize = 2000;
// 12% (completely arbitrary)
pub const DEFAULT_BLOOMFILTER_SIZE_MULTIPLIER: f64 = 1.12;
@@ -755,9 +750,7 @@ impl Default for ReplayProtectionDebug {
fn default() -> Self {
ReplayProtectionDebug {
unsafe_disabled: false,
maximum_replay_detection_deferral: Self::DEFAULT_MAXIMUM_REPLAY_DETECTION_DEFERRAL,
maximum_replay_detection_pending_packets:
Self::DEFAULT_MAXIMUM_REPLAY_DETECTION_PENDING_PACKETS,
ingress_channel_maximum_capacity: Self::DEFAULT_INGRESS_CHANNEL_MAXIMUM_CAPACITY,
false_positive_rate: Self::DEFAULT_REPLAY_DETECTION_FALSE_POSITIVE_RATE,
initial_expected_packets_per_second: Self::DEFAULT_INITIAL_EXPECTED_PACKETS_PER_SECOND,
bloomfilter_minimum_packets_per_second_size:
@@ -802,7 +795,7 @@ impl MixnetDebug {
const DEFAULT_PACKET_FORWARDING_INITIAL_BACKOFF: Duration = Duration::from_millis(10_000);
const DEFAULT_PACKET_FORWARDING_MAXIMUM_BACKOFF: Duration = Duration::from_millis(300_000);
const DEFAULT_INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(1_500);
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 2000;
const DEFAULT_MAXIMUM_CONNECTION_BUFFER_SIZE: usize = 128;
}
impl Default for MixnetDebug {
@@ -614,16 +614,8 @@ pub async fn try_upgrade_config_v12<P: AsRef<Path>>(
},
debug: ReplayProtectionDebug {
unsafe_disabled: old_cfg.mixnet.replay_protection.debug.unsafe_disabled,
maximum_replay_detection_deferral: old_cfg
.mixnet
.replay_protection
.debug
.maximum_replay_detection_deferral,
maximum_replay_detection_pending_packets: old_cfg
.mixnet
.replay_protection
.debug
.maximum_replay_detection_pending_packets,
ingress_channel_maximum_capacity:
ReplayProtectionDebug::DEFAULT_INGRESS_CHANNEL_MAXIMUM_CAPACITY,
false_positive_rate: old_cfg.mixnet.replay_protection.debug.false_positive_rate,
initial_expected_packets_per_second: old_cfg
.mixnet
-675
View File
@@ -1,675 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::key_rotation::active_keys::SphinxKeyGuard;
use crate::node::mixnet::shared::SharedData;
use futures::StreamExt;
use nym_noise::connection::Connection;
use nym_noise::upgrade_noise_responder;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_framing::codec::NymCodec;
use nym_sphinx_framing::packet::FramedNymPacket;
use nym_sphinx_framing::processing::{
MixProcessingResult, MixProcessingResultData, PacketProcessingError, PartiallyUnwrappedPacket,
PartialyUnwrappedPacketWithKeyRotation, ProcessedFinalHop, process_framed_packet,
};
use nym_sphinx_params::SphinxKeyRotation;
use nym_sphinx_types::{Delay, REPLAY_TAG_SIZE};
use std::collections::HashMap;
use std::mem;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::codec::Framed;
use tracing::{Span, debug, error, instrument, trace, warn};
/// How often (in packets) the stream-level span updates its packet count.
const SPAN_UPDATE_INTERVAL: u64 = 10_000;
struct PendingReplayCheckPackets {
// map of rotation id used for packet creation to the packets
packets: HashMap<u32, Vec<PartiallyUnwrappedPacket>>,
last_acquired_mutex: Instant,
}
impl PendingReplayCheckPackets {
fn new() -> PendingReplayCheckPackets {
PendingReplayCheckPackets {
packets: Default::default(),
last_acquired_mutex: Instant::now(),
}
}
fn reset(&mut self, now: Instant) -> HashMap<u32, Vec<PartiallyUnwrappedPacket>> {
self.last_acquired_mutex = now;
mem::take(&mut self.packets)
}
fn push(&mut self, now: Instant, packet: PartialyUnwrappedPacketWithKeyRotation) {
if self.packets.is_empty() {
self.last_acquired_mutex = now;
}
self.packets
.entry(packet.used_key_rotation)
.or_default()
.push(packet.packet)
}
fn total_count(&self) -> usize {
self.packets.values().map(|v| v.len()).sum()
}
fn replay_tags(&self) -> HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>> {
let mut replay_tags = HashMap::with_capacity(self.packets.len());
'outer: for (rotation_id, packets) in &self.packets {
let mut rotation_replay_tags = Vec::with_capacity(packets.len());
for packet in packets {
let Some(replay_tag) = packet.replay_tag() else {
error!(
"corrupted batch of {} packets - replay tag was missing",
self.packets.len()
);
replay_tags.insert(*rotation_id, Vec::new());
continue 'outer;
};
rotation_replay_tags.push(replay_tag);
}
replay_tags.insert(*rotation_id, rotation_replay_tags);
}
replay_tags
}
}
pub(crate) struct ConnectionHandler {
shared: SharedData,
remote_address: SocketAddr,
// packets pending for replay detection
pending_packets: PendingReplayCheckPackets,
}
impl Drop for ConnectionHandler {
fn drop(&mut self) {
self.shared
.metrics
.network
.disconnected_ingress_mixnet_client()
}
}
impl ConnectionHandler {
pub(crate) fn new(shared: &SharedData, remote_address: SocketAddr) -> Self {
shared.metrics.network.new_active_ingress_mixnet_client();
ConnectionHandler {
shared: SharedData {
processing_config: shared.processing_config,
sphinx_keys: shared.sphinx_keys.clone(),
replay_protection_filter: shared.replay_protection_filter.clone(),
mixnet_forwarder: shared.mixnet_forwarder.clone(),
final_hop: shared.final_hop.clone(),
noise_config: shared.noise_config.clone(),
metrics: shared.metrics.clone(),
shutdown_token: shared.shutdown_token.child_token(),
},
remote_address,
pending_packets: PendingReplayCheckPackets::new(),
}
}
/// Determine instant at which packet should get forwarded to the next hop.
/// By using [`Instant`] rather than explicit [`Duration`] we minimise effects of
/// the skew caused by being stuck in the channel queue.
/// This method also clamps the maximum allowed delay so that nobody could send a bunch of packets
/// with, for example, delays of 1 year thus causing denial of service
fn create_delay_target(&self, now: Instant, delay: Option<Delay>) -> Option<Instant> {
let delay = delay?.to_duration();
let delay = if delay > self.shared.processing_config.maximum_packet_delay {
self.shared.processing_config.maximum_packet_delay
} else {
delay
};
trace!(
"received packet will be delayed for {}ms",
delay.as_millis()
);
Some(now + delay)
}
#[instrument(
name = "mixnode.forward_packet",
skip(self, mix_packet, delay),
level = "debug",
fields(
remote_addr = %self.remote_address,
delay_ms = tracing::field::Empty,
)
)]
fn handle_forward_packet(&self, now: Instant, mix_packet: MixPacket, delay: Option<Delay>) {
if !self.shared.processing_config.forward_hop_processing_enabled {
warn!(
event = "packet.dropped.forward_disabled",
remote_addr = %self.remote_address,
"dropping packet: forward hop processing disabled"
);
self.shared.dropped_forward_packet(self.remote_address.ip());
return;
}
let forward_instant = self.create_delay_target(now, delay);
if let Some(target) = forward_instant {
Span::current().record(
"delay_ms",
target.saturating_duration_since(now).as_millis() as u64,
);
}
self.shared.forward_mix_packet(mix_packet, forward_instant);
}
#[instrument(
name = "mixnode.final_hop",
skip(self, final_hop_data),
level = "debug",
fields(
remote_addr = %self.remote_address,
client_online,
disk_fallback = false,
ack_forwarded = false,
)
)]
async fn handle_final_hop(&self, final_hop_data: ProcessedFinalHop) {
if !self.shared.processing_config.final_hop_processing_enabled {
warn!(
event = "packet.dropped.final_hop_disabled",
remote_addr = %self.remote_address,
"dropping packet: final hop processing disabled"
);
self.shared
.dropped_final_hop_packet(self.remote_address.ip());
return;
}
let client = final_hop_data.destination;
let message = final_hop_data.message;
let has_ack = final_hop_data.forward_ack.is_some();
// if possible attempt to push message directly to the client
match self.shared.try_push_message_to_client(client, message) {
Err(unsent_plaintext) => {
// if that failed, store it on disk
Span::current().record("client_online", false);
match self
.shared
.store_processed_packet_payload(client, unsent_plaintext)
.await
{
Err(err) => error!("Failed to store client data - {err}"),
Ok(_) => {
Span::current().record("disk_fallback", true);
self.shared
.metrics
.mixnet
.egress
.add_disk_persisted_packet();
trace!("Stored packet for {client}")
}
}
}
Ok(_) => {
Span::current().record("client_online", true);
trace!("Pushed received packet to {client}");
}
}
// if we managed to either push message directly to the [online] client or store it at
// disk, forward the ack
self.shared.forward_ack_packet(final_hop_data.forward_ack);
if has_ack {
Span::current().record("ack_forwarded", true);
}
}
fn within_deferral_threshold(&self, now: Instant) -> bool {
let time_threshold = now
.saturating_duration_since(self.pending_packets.last_acquired_mutex)
<= self
.shared
.processing_config
.maximum_replay_detection_deferral;
let count_threshold = self.pending_packets.packets.len()
< self
.shared
.processing_config
.maximum_replay_detection_pending_packets;
// time threshold is ignored if we currently have 0 packets queued up
if self.pending_packets.packets.is_empty() {
return true;
}
trace!(
"within deferral time threshold: {time_threshold}, count threshold: {count_threshold}"
);
if !time_threshold {
warn!(
event = "replay_detection.deferral_exceeded",
threshold_type = "time",
deferred_count = self.pending_packets.total_count(),
deferral_ms = now.saturating_duration_since(self.pending_packets.last_acquired_mutex).as_millis() as u64,
remote_addr = %self.remote_address,
"{}: time deferral threshold exceeded with {} pending packets",
self.remote_address,
self.pending_packets.total_count()
)
}
if !count_threshold {
warn!(
event = "replay_detection.deferral_exceeded",
threshold_type = "count",
deferred_count = self.pending_packets.total_count(),
remote_addr = %self.remote_address,
"{}: count deferral threshold exceeded",
self.remote_address
)
}
time_threshold && count_threshold
}
/// Resolve the sphinx key for the given rotation, recording the rotation
/// label on the current tracing span. Returns `ExpiredKey` if the requested
/// odd/even key has already been rotated out.
fn resolve_rotation_key(
&self,
rotation: SphinxKeyRotation,
) -> Result<SphinxKeyGuard, PacketProcessingError> {
let rotation_label = match rotation {
SphinxKeyRotation::Unknown => "unknown",
SphinxKeyRotation::OddRotation => "odd",
SphinxKeyRotation::EvenRotation => "even",
};
Span::current().record("key_rotation", rotation_label);
match rotation {
SphinxKeyRotation::Unknown => Ok(self.shared.sphinx_keys.primary()),
SphinxKeyRotation::OddRotation => self.shared.sphinx_keys.odd().ok_or_else(|| {
warn!(
event = "packet.dropped.expired_key",
key_rotation = "odd",
remote_addr = %self.remote_address,
"dropping packet: odd key rotation expired"
);
PacketProcessingError::ExpiredKey
}),
SphinxKeyRotation::EvenRotation => self.shared.sphinx_keys.even().ok_or_else(|| {
warn!(
event = "packet.dropped.expired_key",
key_rotation = "even",
remote_addr = %self.remote_address,
"dropping packet: even key rotation expired"
);
PacketProcessingError::ExpiredKey
}),
}
}
#[instrument(
name = "mixnode.sphinx_partial_unwrap",
skip(self, packet),
level = "debug",
fields(key_rotation, unwrap_result,)
)]
fn try_partially_unwrap_packet(
&self,
packet: FramedNymPacket,
) -> Result<PartialyUnwrappedPacketWithKeyRotation, PacketProcessingError> {
let rotation = packet.header().key_rotation;
let result = match rotation {
SphinxKeyRotation::Unknown => {
// Unknown rotation: try primary, fallback to secondary
let primary = self.resolve_rotation_key(rotation)?;
let primary_rotation = primary.rotation_id();
match PartiallyUnwrappedPacket::new(packet, primary.inner().as_ref()) {
Ok(unwrapped_packet) => {
Ok(unwrapped_packet.with_key_rotation(primary_rotation))
}
Err((packet, err)) => {
if let Some(secondary) = self.shared.sphinx_keys.secondary() {
let secondary_rotation = secondary.rotation_id();
PartiallyUnwrappedPacket::new(packet, secondary.inner().as_ref())
.map_err(|(_, err)| err)
.map(|p| p.with_key_rotation(secondary_rotation))
} else {
Err(err)
}
}
}
}
_ => {
let key = self.resolve_rotation_key(rotation)?;
let rotation_id = key.rotation_id();
PartiallyUnwrappedPacket::new(packet, key.inner().as_ref())
.map_err(|(_, err)| err)
.map(|p| p.with_key_rotation(rotation_id))
}
};
Span::current().record("unwrap_result", if result.is_ok() { "ok" } else { "err" });
result
}
async fn handle_received_packet_with_replay_detection(
&mut self,
now: Instant,
packet: FramedNymPacket,
) {
// 1. derive and expand shared secret
// also check the header integrity
let partially_unwrapped = match self.try_partially_unwrap_packet(packet) {
Ok(unwrapped) => unwrapped,
Err(err) => {
trace!("failed to process received mix packet: {err}");
warn!(
event = "packet.dropped.malformed",
error = %err,
remote_addr = %self.remote_address,
"dropping malformed packet"
);
self.shared
.metrics
.mixnet
.ingress_malformed_packet(self.remote_address.ip());
return;
}
};
self.pending_packets.push(now, partially_unwrapped);
// 2. check for packet replay
// 2.1 first try it without locking
if self.handle_pending_packets_batch_no_locking(now).await {
return;
}
// 2.2 if we're within deferral threshold, just leave it queued up for another call
if self.within_deferral_threshold(now) {
return;
}
// 2.3. otherwise block until we obtain the lock and clear the whole batch
self.handle_pending_packets_batch(now).await;
}
async fn handle_unwrapped_packet(
&self,
now: Instant,
unwrapped_packet: Result<MixProcessingResult, PacketProcessingError>,
) {
// 2. increment our favourite metrics stats
self.shared
.update_metrics(&unwrapped_packet, self.remote_address.ip());
// 3. forward the packet to the relevant sink (if enabled)
match unwrapped_packet {
Err(err) => {
trace!("failed to process received mix packet: {err}");
}
Ok(processed_packet) => match processed_packet.processing_data {
MixProcessingResultData::ForwardHop { packet, delay } => {
self.handle_forward_packet(now, packet, delay);
}
MixProcessingResultData::FinalHop { final_hop_data } => {
self.handle_final_hop(final_hop_data).await;
}
},
}
}
async fn handle_post_replay_detection_packets(
&self,
now: Instant,
packets: HashMap<u32, Vec<PartiallyUnwrappedPacket>>,
replay_check_results: HashMap<u32, Vec<bool>>,
) {
let mut replays_detected: u64 = 0;
for (rotation_id, packets) in packets {
let Some(replay_checks) = replay_check_results.get(&rotation_id) else {
// this should never happen, but if we messed up, and it does, don't panic, just drop the packets
error!("inconsistent replay check result - no values for rotation {rotation_id}");
continue;
};
for (packet, &replayed) in packets.into_iter().zip(replay_checks) {
let unwrapped_packet = if replayed {
replays_detected += 1;
warn!(
event = "packet.dropped.replay",
remote_addr = %self.remote_address,
rotation_id,
"dropping replayed packet"
);
Err(PacketProcessingError::PacketReplay)
} else {
packet.finalise_unwrapping()
};
self.handle_unwrapped_packet(now, unwrapped_packet).await;
}
}
if replays_detected > 0 {
debug!(
replays_detected,
remote_addr = %self.remote_address,
"replay detection batch completed with replays"
);
}
}
async fn handle_pending_packets_batch_no_locking(&mut self, now: Instant) -> bool {
let replay_tags = self.pending_packets.replay_tags();
if replay_tags.is_empty() {
return false;
}
let replay_check_results = match self
.shared
.replay_protection_filter
.batch_try_check_and_set(&replay_tags)
{
None => return false,
Some(Ok(replay_check_results)) => replay_check_results,
Some(Err(_)) => {
// our mutex got poisoned - we have to shut down
error!("CRITICAL FAILURE: replay bloomfilter mutex poisoning!");
self.shared.shutdown_token.cancel();
return false;
}
};
let batch = self.pending_packets.reset(now);
self.handle_post_replay_detection_packets(now, batch, replay_check_results)
.await;
true
}
#[instrument(
name = "mixnode.replay_check_batch",
skip(self),
level = "debug",
fields(batch_size, mutex_wait_ms,)
)]
async fn handle_pending_packets_batch(&mut self, now: Instant) {
let replay_tags = self.pending_packets.replay_tags();
if replay_tags.is_empty() {
return;
}
let batch_size = self.pending_packets.total_count();
Span::current().record("batch_size", batch_size as u64);
let mutex_start = Instant::now();
let Ok(replay_check_results) = self
.shared
.replay_protection_filter
.batch_check_and_set(&replay_tags)
else {
// our mutex got poisoned - we have to shut down
error!("CRITICAL FAILURE: replay bloomfilter mutex poisoning!");
self.shared.shutdown_token.cancel();
return;
};
Span::current().record("mutex_wait_ms", mutex_start.elapsed().as_millis() as u64);
let batch = self.pending_packets.reset(now);
self.handle_post_replay_detection_packets(now, batch, replay_check_results)
.await;
}
#[instrument(
name = "mixnode.sphinx_full_unwrap",
skip(self, packet),
level = "debug",
fields(key_rotation)
)]
fn try_full_unwrap_packet(
&self,
packet: FramedNymPacket,
) -> Result<MixProcessingResult, PacketProcessingError> {
let key = self.resolve_rotation_key(packet.header().key_rotation)?;
process_framed_packet(packet, key.inner().as_ref())
}
async fn handle_received_packet_with_no_replay_detection(
&mut self,
now: Instant,
packet: FramedNymPacket,
) {
let unwrapped_packet = self.try_full_unwrap_packet(packet);
self.handle_unwrapped_packet(now, unwrapped_packet).await;
}
#[instrument(skip(self, packet), level = "debug")]
async fn handle_received_nym_packet(&mut self, packet: FramedNymPacket) {
let now = Instant::now();
// 1. attempt to unwrap the packet
// if it's a sphinx packet attempt to do pre-processing and replay detection
if packet.is_sphinx() && !self.shared.replay_protection_filter.disabled() {
self.handle_received_packet_with_replay_detection(now, packet)
.await;
} else {
// otherwise just skip that whole procedure and go straight to payload unwrapping
// (assuming the basic framing is valid)
self.handle_received_packet_with_no_replay_detection(now, packet)
.await;
};
}
#[instrument(
name = "mixnode.connection",
skip(self, socket),
level = "debug",
fields(
remote = %self.remote_address,
noise_handshake_ms = tracing::field::Empty,
)
)]
pub(crate) async fn handle_connection(&mut self, socket: TcpStream) {
let handshake_start = Instant::now();
let noise_stream = match upgrade_noise_responder(socket, &self.shared.noise_config).await {
Ok(noise_stream) => noise_stream,
Err(err) => {
Span::current().record(
"noise_handshake_ms",
handshake_start.elapsed().as_millis() as u64,
);
warn!(
event = "connection.failed.noise",
remote_addr = %self.remote_address,
error = %err,
"Noise responder handshake failed"
);
return;
}
};
Span::current().record(
"noise_handshake_ms",
handshake_start.elapsed().as_millis() as u64,
);
debug!(
"Noise responder handshake completed for {:?}",
self.remote_address
);
self.handle_stream(Framed::new(noise_stream, NymCodec))
.await
}
#[instrument(
name = "mixnode.stream",
skip(self, mixnet_connection),
level = "debug",
fields(
remote = %self.remote_address,
packets_processed = 0u64,
exit_reason,
)
)]
pub(crate) async fn handle_stream(
&mut self,
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
) {
let mut packets_processed: u64 = 0;
loop {
tokio::select! {
biased;
_ = self.shared.shutdown_token.cancelled() => {
trace!("connection handler: received shutdown");
Span::current().record("exit_reason", "shutdown");
break
}
maybe_framed_nym_packet = mixnet_connection.next() => {
match maybe_framed_nym_packet {
Some(Ok(packet)) => {
self.handle_received_nym_packet(packet).await;
packets_processed += 1;
if packets_processed.is_multiple_of(SPAN_UPDATE_INTERVAL) {
Span::current().record("packets_processed", packets_processed);
}
}
Some(Err(err)) => {
warn!(
event = "connection.corrupted",
remote_addr = %self.remote_address,
error = %err,
packets_processed,
"connection stream corrupted"
);
Span::current().record("exit_reason", "corrupted");
Span::current().record("packets_processed", packets_processed);
return
}
None => {
debug!(
remote_addr = %self.remote_address,
packets_processed,
"connection closed by remote"
);
Span::current().record("exit_reason", "closed_by_remote");
Span::current().record("packets_processed", packets_processed);
return
}
}
}
}
}
Span::current().record("packets_processed", packets_processed);
debug!("exiting and closing connection");
}
}
+194 -3
View File
@@ -2,20 +2,40 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::node::mixnet::SharedData;
use crate::node::mixnet::packet_ingest::{IngressNymPacket, MixIngestSender};
use futures::StreamExt;
use nym_noise::connection::Connection;
use nym_noise::upgrade_noise_responder;
use nym_sphinx_framing::codec::NymCodec;
use nym_task::ShutdownToken;
use tokio::net::TcpStream;
use tokio::task::JoinHandle;
use tokio_util::codec::Framed;
use tracing::{Span, debug, error, info, instrument, trace, warn};
use std::io;
use std::net::SocketAddr;
use tracing::{debug, error, info, trace};
use std::time::Instant;
/// How often (in packets) the stream-level span updates its packet count.
const SPAN_UPDATE_INTERVAL: u64 = 10_000;
pub(crate) struct Listener {
bind_address: SocketAddr,
shared_data: SharedData,
ingest_sender: MixIngestSender,
}
impl Listener {
pub(crate) fn new(bind_address: SocketAddr, shared_data: SharedData) -> Self {
pub(crate) fn new(
bind_address: SocketAddr,
shared_data: SharedData,
ingest_sender: MixIngestSender,
) -> Self {
Listener {
bind_address,
shared_data,
ingest_sender,
}
}
@@ -42,10 +62,181 @@ impl Listener {
break
}
connection = tcp_listener.accept() => {
self.shared_data.try_handle_connection(connection);
self.try_handle_connection(connection);
}
}
}
debug!("mixnet socket listener: Exiting");
}
fn try_handle_connection(
&self,
accepted: io::Result<(TcpStream, SocketAddr)>,
) -> Option<JoinHandle<()>> {
match accepted {
Ok((socket, remote_addr)) => {
debug!("accepted incoming mixnet connection from: {remote_addr}");
let conn_handler = ConnectionHandler::new(
&self.shared_data,
remote_addr,
self.ingest_sender.clone(),
);
let join_handle =
tokio::spawn(async move { conn_handler.handle_connection(socket).await });
self.shared_data.log_connected_clients();
Some(join_handle)
}
Err(err) => {
debug!("failed to accept incoming mixnet connection: {err}");
None
}
}
}
}
struct ConnectionHandler {
remote_addr: SocketAddr,
shared_data: SharedData,
ingest_sender: MixIngestSender,
}
impl ConnectionHandler {
fn new(shared: &SharedData, remote_addr: SocketAddr, ingest_sender: MixIngestSender) -> Self {
Self {
ingest_sender,
remote_addr,
shared_data: SharedData {
processing_config: shared.processing_config,
sphinx_keys: shared.sphinx_keys.clone(),
replay_protection_filter: shared.replay_protection_filter.clone(),
mixnet_forwarder: shared.mixnet_forwarder.clone(),
final_hop: shared.final_hop.clone(),
noise_config: shared.noise_config.clone(),
metrics: shared.metrics.clone(),
shutdown_token: shared.shutdown_token.child_token(),
},
}
}
#[instrument(
name = "mixnode.connection",
skip(self, socket),
level = "debug",
fields(
remote = %self.remote_addr,
noise_handshake_ms = tracing::field::Empty,
)
)]
pub(crate) async fn handle_connection(&self, socket: TcpStream) {
let handshake_start = Instant::now();
let noise_stream =
match upgrade_noise_responder(socket, &self.shared_data.noise_config).await {
Ok(noise_stream) => noise_stream,
Err(err) => {
Span::current().record(
"noise_handshake_ms",
handshake_start.elapsed().as_millis() as u64,
);
warn!(
event = "connection.failed.noise",
remote_addr = %self.remote_addr,
error = %err,
"Noise responder handshake failed"
);
return;
}
};
Span::current().record(
"noise_handshake_ms",
handshake_start.elapsed().as_millis() as u64,
);
debug!(
"Noise responder handshake completed for {:?}",
self.remote_addr
);
self.handle_stream(Framed::new(noise_stream, NymCodec))
.await
}
#[instrument(
name = "mixnode.stream",
skip(self, mixnet_connection),
level = "debug",
fields(
remote = %self.remote_addr,
packets_processed = 0u64,
exit_reason,
)
)]
pub(crate) async fn handle_stream(
&self,
mut mixnet_connection: Framed<Connection<TcpStream>, NymCodec>,
) {
let mut packets_processed: u64 = 0;
loop {
tokio::select! {
biased;
_ = self.shared_data.shutdown_token.cancelled() => {
trace!("connection handler: received shutdown");
Span::current().record("exit_reason", "shutdown");
break
}
maybe_framed_nym_packet = mixnet_connection.next() => {
match maybe_framed_nym_packet {
Some(Ok(packet)) => {
let ingress_packet = IngressNymPacket::new(packet, Instant::now(), self.remote_addr);
self.handle_received_nym_packet(ingress_packet);
packets_processed += 1;
if packets_processed.is_multiple_of(SPAN_UPDATE_INTERVAL) {
Span::current().record("packets_processed", packets_processed);
}
}
Some(Err(err)) => {
warn!(
event = "connection.corrupted",
remote_addr = %self.remote_addr,
error = %err,
packets_processed,
"connection stream corrupted"
);
Span::current().record("exit_reason", "corrupted");
Span::current().record("packets_processed", packets_processed);
return
}
None => {
debug!(
remote_addr = %self.remote_addr,
packets_processed,
"connection closed by remote"
);
Span::current().record("exit_reason", "closed_by_remote");
Span::current().record("packets_processed", packets_processed);
return
}
}
}
}
}
Span::current().record("packets_processed", packets_processed);
debug!("exiting and closing connection");
}
/// Attempt to add the packet to the processing queue. If there is no capacity available the packet will
/// be dropped and the `mixnet_ingress_overflow_packets_dropped` metric will be incremented.
fn handle_received_nym_packet(&self, packet: IngressNymPacket) {
match self.ingest_sender.ingest_packet(packet) {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => self
.shared_data
.metrics
.mixnet
.ingress_dropped_overflow_packet(),
Err(err) => {
error!("unexpected error using ingress channel - shutting down: {err}");
self.shared_data.shutdown_token.cancel();
}
}
}
}
+1 -1
View File
@@ -1,9 +1,9 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
pub(crate) mod handler;
pub(crate) mod listener;
pub(crate) mod packet_forwarding;
pub(crate) mod packet_ingest;
pub(crate) mod shared;
pub(crate) use listener::Listener;
+560
View File
@@ -0,0 +1,560 @@
//! Mix Packet Ingest Worker implementation and tooling
use std::net::SocketAddr;
use std::time::Instant;
use std::{io, num::NonZero};
use crate::node::{
key_rotation::active_keys::SphinxKeyGuard,
mixnet::{SharedData, shared::ProcessingConfig},
};
use nym_node_metrics::mixnet::PacketKind;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_framing::packet::FramedNymPacket;
use nym_sphinx_framing::processing::{
MixPacketVersion, MixProcessingResult, MixProcessingResultData, PacketProcessingError,
PartiallyUnwrappedPacket, PartiallyUnwrappedPacketWithKeyRotation, ProcessedFinalHop,
process_framed_packet,
};
use nym_sphinx_params::SphinxKeyRotation;
use nym_sphinx_types::Delay;
use nym_task::ShutdownToken;
use tracing::{Span, debug, error, info, instrument, trace, warn};
pub(crate) struct IngressNymPacket {
packet: FramedNymPacket,
received_at: Instant,
received_from: SocketAddr,
}
impl IngressNymPacket {
pub(crate) fn new(
packet: FramedNymPacket,
received_at: Instant,
received_from: SocketAddr,
) -> Self {
Self {
packet,
received_at,
received_from,
}
}
}
pub struct MixPacketIngest {
shared: SharedData,
// Keep a copy of the channel to prevent accidental exit
packet_sender: MixIngestSender,
packet_receiver: MixIngestReceiver,
}
impl MixPacketIngest {
pub fn new(shared: &SharedData) -> Self {
let (packet_sender, packet_receiver) = mix_ingest_channels(&shared.processing_config);
Self {
shared: SharedData {
processing_config: shared.processing_config,
sphinx_keys: shared.sphinx_keys.clone(),
replay_protection_filter: shared.replay_protection_filter.clone(),
mixnet_forwarder: shared.mixnet_forwarder.clone(),
final_hop: shared.final_hop.clone(),
noise_config: shared.noise_config.clone(),
metrics: shared.metrics.clone(),
shutdown_token: shared.shutdown_token.child_token(),
},
packet_sender,
packet_receiver,
}
}
pub fn sender(&self) -> MixIngestSender {
self.packet_sender.clone()
}
pub async fn run(&mut self, shutdown_token: ShutdownToken) {
// this one is impossible to ever panic
#[allow(clippy::unwrap_used)]
let default_worker_count = NonZero::new(1).unwrap();
let num_threads = std::thread::available_parallelism()
.inspect_err(|e| warn!("unable to query available parallelism: {e}"))
.unwrap_or(default_worker_count)
.get();
let mut workers = tokio::task::JoinSet::new();
for _ in 0..num_threads {
let recv = self.packet_receiver.clone();
let worker = MixPacketIngestWorker {
packet_receiver: recv,
shared: SharedData {
processing_config: self.shared.processing_config,
sphinx_keys: self.shared.sphinx_keys.clone(),
replay_protection_filter: self.shared.replay_protection_filter.clone(),
mixnet_forwarder: self.shared.mixnet_forwarder.clone(),
final_hop: self.shared.final_hop.clone(),
noise_config: self.shared.noise_config.clone(),
metrics: self.shared.metrics.clone(),
shutdown_token: self.shared.shutdown_token.child_token(),
},
};
let worker_token = shutdown_token.child_token();
workers.spawn(worker.run(worker_token));
}
info!("launched {num_threads} mix ingest workers");
while let Some(res) = workers.join_next().await {
if let Err(err) = res {
warn!("ingest worker closed with error: {err}");
} else if !shutdown_token.is_cancelled() {
warn!("ingest worker closed unexpectedly!");
}
}
}
}
struct MixPacketIngestWorker {
shared: SharedData,
packet_receiver: MixIngestReceiver,
}
impl MixPacketIngestWorker {
pub async fn run(mut self, shutdown_token: ShutdownToken) {
trace!("starting PacketIngest");
loop {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
debug!("PacketIngest: Received shutdown");
break;
}
new_packet = self.packet_receiver.recv_async() => {
// this one is impossible to ever panic - the parent struct maintains a sender
// and waits for this process to end. Therefore it can't happen that ALL senders
// are dropped
#[allow(clippy::expect_used)]
let new_packet = new_packet.expect("the ingest receiver closed somehow");
self.handle_ingest_packet(new_packet).await;
}
}
}
}
async fn handle_ingest_packet(&mut self, packet: IngressNymPacket) {
// 1. attempt to unwrap the packet
// if it's a sphinx packet attempt to do pre-processing and replay detection
if packet.packet.is_sphinx() && !self.shared.replay_protection_filter.disabled() {
self.handle_received_packet_with_replay_detection(packet)
.await;
} else {
// otherwise just skip that whole procedure and go straight to payload unwrapping
// (assuming the basic framing is valid)
self.handle_received_packet_with_no_replay_detection(packet)
.await;
};
}
async fn handle_received_packet_with_replay_detection(&mut self, packet: IngressNymPacket) {
let source = packet.received_from;
let received_at = packet.received_at;
// 1. derive and expand shared secret
// also check the header integrity
let partially_unwrapped = match self.try_partially_unwrap_packet(packet) {
Ok(unwrapped) => unwrapped,
Err(err) => {
trace!("failed to process received mix packet: {err}");
warn!(
event = "packet.dropped.malformed",
error = %err,
remote_addr = %source,
"dropping malformed packet"
);
self.shared
.metrics
.mixnet
.ingress_malformed_packet(source.ip());
return;
}
};
let rotation_id = partially_unwrapped.used_key_rotation;
let Some(replay_tag) = partially_unwrapped.packet.replay_tag() else {
error!("corrupted packet - replay tag was missing");
return;
};
let mutex_start = Instant::now();
let Ok(replayed) = self
.shared
.replay_protection_filter
.check_and_set(rotation_id, replay_tag)
else {
// our mutex got poisoned - we have to shut down
error!("CRITICAL FAILURE: replay bloomfilter mutex poisoning!");
self.shared.shutdown_token.cancel();
return;
};
Span::current().record("mutex_wait_ms", mutex_start.elapsed().as_millis() as u64);
let unwrapped_packet = if replayed {
warn!(
event = "packet.dropped.replay",
remote_addr = %source,
rotation_id,
"dropping replayed packet"
);
Err(PacketProcessingError::PacketReplay)
} else {
partially_unwrapped.packet.finalise_unwrapping()
};
self.handle_unwrapped_packet(unwrapped_packet, source, received_at)
.await;
}
async fn handle_received_packet_with_no_replay_detection(&mut self, packet: IngressNymPacket) {
let source = packet.received_from;
let received_at = packet.received_at;
let unwrapped_packet = self.try_full_unwrap_packet(packet);
self.handle_unwrapped_packet(unwrapped_packet, source, received_at)
.await;
}
#[instrument(
name = "mixnode.sphinx_full_unwrap",
skip(self, packet),
level = "debug",
fields(key_rotation)
)]
fn try_full_unwrap_packet(
&self,
packet: IngressNymPacket,
) -> Result<MixProcessingResult, PacketProcessingError> {
let key = self.resolve_rotation_key(packet.packet.header().key_rotation)?;
process_framed_packet(packet.packet, key.inner().as_ref())
}
#[instrument(
name = "mixnode.sphinx_partial_unwrap",
skip(self, packet),
level = "debug",
fields(key_rotation, unwrap_result,)
)]
fn try_partially_unwrap_packet(
&self,
packet: IngressNymPacket,
) -> Result<PartiallyUnwrappedPacketWithKeyRotation, PacketProcessingError> {
let rotation = packet.packet.header().key_rotation;
let result = match rotation {
SphinxKeyRotation::Unknown => {
// Unknown rotation: try primary, fallback to secondary
let primary = self.resolve_rotation_key(rotation)?;
let primary_rotation = primary.rotation_id();
match PartiallyUnwrappedPacket::new(packet.packet, primary.inner().as_ref()) {
Ok(unwrapped_packet) => {
Ok(unwrapped_packet.with_key_rotation(primary_rotation))
}
Err((packet, err)) => {
if let Some(secondary) = self.shared.sphinx_keys.secondary() {
let secondary_rotation = secondary.rotation_id();
PartiallyUnwrappedPacket::new(packet, secondary.inner().as_ref())
.map_err(|(_, err)| err)
.map(|p| p.with_key_rotation(secondary_rotation))
} else {
Err(err)
}
}
}
}
_ => {
let key = self.resolve_rotation_key(rotation)?;
let rotation_id = key.rotation_id();
PartiallyUnwrappedPacket::new(packet.packet, key.inner().as_ref())
.map_err(|(_, err)| err)
.map(|p| p.with_key_rotation(rotation_id))
}
};
Span::current().record("unwrap_result", if result.is_ok() { "ok" } else { "err" });
result
}
async fn handle_unwrapped_packet(
&self,
unwrapped_packet: Result<MixProcessingResult, PacketProcessingError>,
source: SocketAddr,
received_at: Instant,
) {
// 2. increment our favourite metrics stats
self.update_metrics(&unwrapped_packet, source);
// 3. forward the packet to the relevant sink (if enabled)
match unwrapped_packet {
Err(err) => {
trace!("failed to process received mix packet: {err}");
}
Ok(processed_packet) => match processed_packet.processing_data {
MixProcessingResultData::ForwardHop { packet, delay } => {
self.handle_forward_packet(packet, received_at, source, delay);
}
MixProcessingResultData::FinalHop { final_hop_data } => {
self.handle_final_hop(final_hop_data, source).await;
}
},
}
}
fn update_metrics(
&self,
processing_result: &Result<MixProcessingResult, PacketProcessingError>,
source: SocketAddr,
) {
let Ok(processing_result) = processing_result else {
self.shared
.metrics
.mixnet
.ingress_malformed_packet(source.ip());
return;
};
let packet_version = convert_to_metrics_version(processing_result.packet_version);
match processing_result.processing_data {
MixProcessingResultData::ForwardHop { delay, .. } => {
self.shared
.metrics
.mixnet
.ingress_received_forward_packet(source.ip(), packet_version);
// check if the delay wasn't excessive
if let Some(delay) = delay
&& delay.to_duration() > self.shared.processing_config.maximum_packet_delay
{
self.shared.metrics.mixnet.ingress_excessive_delay_packet()
}
}
MixProcessingResultData::FinalHop { .. } => {
self.shared
.metrics
.mixnet
.ingress_received_final_hop_packet(source.ip(), packet_version);
}
}
}
/// Resolve the sphinx key for the given rotation, recording the rotation
/// label on the current tracing span. Returns `ExpiredKey` if the requested
/// odd/even key has already been rotated out.
fn resolve_rotation_key(
&self,
rotation: SphinxKeyRotation,
) -> Result<SphinxKeyGuard, PacketProcessingError> {
let rotation_label = match rotation {
SphinxKeyRotation::Unknown => "unknown",
SphinxKeyRotation::OddRotation => "odd",
SphinxKeyRotation::EvenRotation => "even",
};
Span::current().record("key_rotation", rotation_label);
match rotation {
SphinxKeyRotation::Unknown => Ok(self.shared.sphinx_keys.primary()),
SphinxKeyRotation::OddRotation => self.shared.sphinx_keys.odd().ok_or_else(|| {
warn!(
event = "packet.dropped.expired_key",
key_rotation = "odd",
"dropping packet: odd key rotation expired"
);
PacketProcessingError::ExpiredKey
}),
SphinxKeyRotation::EvenRotation => self.shared.sphinx_keys.even().ok_or_else(|| {
warn!(
event = "packet.dropped.expired_key",
key_rotation = "even",
"dropping packet: even key rotation expired"
);
PacketProcessingError::ExpiredKey
}),
}
}
#[instrument(
name = "mixnode.forward_packet",
skip(self, mix_packet, delay),
level = "debug",
fields(
remote_addr = %remote_addr,
delay_ms = tracing::field::Empty,
)
)]
fn handle_forward_packet(
&self,
mix_packet: MixPacket,
received_at: Instant,
remote_addr: SocketAddr,
delay: Option<Delay>,
) {
if !self.shared.processing_config.forward_hop_processing_enabled {
warn!(
event = "packet.dropped.forward_disabled",
remote_addr = %remote_addr,
"dropping packet: forward hop processing disabled"
);
self.shared.dropped_forward_packet(remote_addr.ip());
return;
}
let forward_instant = self.create_delay_target(received_at, delay);
if let Some(target) = forward_instant {
Span::current().record(
"delay_ms",
target.saturating_duration_since(received_at).as_millis() as u64,
);
}
self.shared.forward_mix_packet(mix_packet, forward_instant);
}
/// Determine instant at which packet should get forwarded to the next hop.
/// By using [`Instant`] rather than explicit [`Duration`] we minimise effects of
/// the skew caused by being stuck in the channel queue.
/// This method also clamps the maximum allowed delay so that nobody could send a bunch of packets
/// with, for example, delays of 1 year thus causing denial of service
fn create_delay_target(&self, received_at: Instant, delay: Option<Delay>) -> Option<Instant> {
let delay = delay?.to_duration();
let delay = if delay > self.shared.processing_config.maximum_packet_delay {
self.shared.processing_config.maximum_packet_delay
} else {
delay
};
trace!(
"received packet will be delayed for {}ms",
delay.as_millis()
);
Some(received_at + delay)
}
#[instrument(
name = "mixnode.final_hop",
skip(self, final_hop_data),
level = "debug",
fields(
remote_addr = %remote_addr,
client_online,
disk_fallback = false,
ack_forwarded = false,
)
)]
async fn handle_final_hop(&self, final_hop_data: ProcessedFinalHop, remote_addr: SocketAddr) {
if !self.shared.processing_config.final_hop_processing_enabled {
warn!(
event = "packet.dropped.final_hop_disabled",
remote_addr = %remote_addr,
"dropping packet: final hop processing disabled"
);
self.shared.dropped_final_hop_packet(remote_addr.ip());
return;
}
let client = final_hop_data.destination;
let message = final_hop_data.message;
let has_ack = final_hop_data.forward_ack.is_some();
// if possible attempt to push message directly to the client
match self.shared.try_push_message_to_client(client, message) {
Err(unsent_plaintext) => {
// if that failed, store it on disk
Span::current().record("client_online", false);
match self
.shared
.store_processed_packet_payload(client, unsent_plaintext)
.await
{
Err(err) => error!("Failed to store client data - {err}"),
Ok(_) => {
Span::current().record("disk_fallback", true);
self.shared
.metrics
.mixnet
.egress
.add_disk_persisted_packet();
trace!("Stored packet for {client}")
}
}
}
Ok(_) => {
Span::current().record("client_online", true);
trace!("Pushed received packet to {client}");
}
}
// if we managed to either push message directly to the [online] client or store it at
// disk, forward the ack
self.shared.forward_ack_packet(final_hop_data.forward_ack);
if has_ack {
Span::current().record("ack_forwarded", true);
}
}
}
fn convert_to_metrics_version(processed: MixPacketVersion) -> PacketKind {
match processed {
MixPacketVersion::Outfox => PacketKind::Outfox,
MixPacketVersion::Sphinx(sphinx_version) => PacketKind::Sphinx(sphinx_version.value()),
}
}
pub fn mix_ingest_channels(
processing_config: &ProcessingConfig,
) -> (MixIngestSender, MixIngestReceiver) {
let (tx, rx) = flume::bounded(processing_config.ingress_channel_maximum_capacity);
(MixIngestSender(tx), rx)
}
#[derive(Clone)]
pub struct MixIngestSender(flume::Sender<IngressNymPacket>);
impl MixIngestSender {
pub fn ingest_packet(&self, packet: impl Into<IngressNymPacket>) -> io::Result<()> {
let sender = &self.0;
// we are using a bounded channel so unwrap is safe
#[allow(clippy::unwrap_used)]
let channel_capacity = sender.capacity().unwrap();
let channel_used = sender.len();
let sending_res = sender.try_send(packet.into());
sending_res.map_err(|err| match err {
flume::TrySendError::Full(_) => {
warn!(
event = "mixnode.ingress_try_send",
result = "full_dropped",
channel_capacity,
channel_used,
"dropping packet: ingress buffer is full ({channel_used}/{channel_capacity})"
);
io::Error::new(io::ErrorKind::WouldBlock, "ingress queue is full")
}
flume::TrySendError::Disconnected(_) => {
debug!(
event = "mixnode.ingress_try_send",
result = "closed",
channel_capacity,
channel_used,
"ingress queue is closed"
);
io::Error::new(io::ErrorKind::BrokenPipe, "ingress packet channel closed")
}
})
}
}
pub type MixIngestReceiver = flume::Receiver<IngressNymPacket>;
+9 -85
View File
@@ -4,25 +4,16 @@
use crate::config::Config;
use crate::node::key_rotation::active_keys::ActiveSphinxKeys;
use crate::node::mixnet::SharedFinalHopData;
use crate::node::mixnet::handler::ConnectionHandler;
use crate::node::replay_protection::bloomfilter::ReplayProtectionBloomfilters;
use nym_gateway::node::GatewayStorageError;
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketToForward};
use nym_node_metrics::NymNodeMetrics;
use nym_node_metrics::mixnet::PacketKind;
use nym_noise::config::NoiseConfig;
use nym_sphinx_forwarding::packet::MixPacket;
use nym_sphinx_framing::processing::{
MixPacketVersion, MixProcessingResult, MixProcessingResultData, PacketProcessingError,
};
use nym_sphinx_types::DestinationAddressBytes;
use nym_task::ShutdownToken;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use std::net::IpAddr;
use std::time::{Duration, Instant};
use tracing::{debug, error};
pub(crate) mod final_hop;
@@ -30,13 +21,11 @@ pub(crate) mod final_hop;
#[derive(Clone, Copy)]
pub(crate) struct ProcessingConfig {
pub(crate) maximum_packet_delay: Duration,
/// how long the task is willing to skip mutex acquisition before it will block the thread
/// until it actually obtains it
pub(crate) maximum_replay_detection_deferral: Duration,
/// how many packets the task is willing to queue before it will block the thread
/// until it obtains the mutex
pub(crate) maximum_replay_detection_pending_packets: usize,
/// Channel capacity for the mixnet ingress channel. This determines the maximum number of
/// packets that can be queued waiting for ingest processing. Once the queue is full packets
/// will still be taken off the wire, but dropped as the node is too busy to handle them.
pub(crate) ingress_channel_maximum_capacity: usize,
pub(crate) forward_hop_processing_enabled: bool,
pub(crate) final_hop_processing_enabled: bool,
@@ -46,16 +35,11 @@ impl ProcessingConfig {
pub(crate) fn new(config: &Config) -> Self {
ProcessingConfig {
maximum_packet_delay: config.mixnet.debug.maximum_forward_packet_delay,
maximum_replay_detection_deferral: config
ingress_channel_maximum_capacity: config
.mixnet
.replay_protection
.debug
.maximum_replay_detection_deferral,
maximum_replay_detection_pending_packets: config
.mixnet
.replay_protection
.debug
.maximum_replay_detection_pending_packets,
.ingress_channel_maximum_capacity,
forward_hop_processing_enabled: config.modes.mixnode,
final_hop_processing_enabled: config.modes.expects_final_hop_traffic()
|| config.wireguard.enabled,
@@ -83,13 +67,6 @@ pub(crate) struct SharedData {
pub(super) shutdown_token: ShutdownToken,
}
fn convert_to_metrics_version(processed: MixPacketVersion) -> PacketKind {
match processed {
MixPacketVersion::Outfox => PacketKind::Outfox,
MixPacketVersion::Sphinx(sphinx_version) => PacketKind::Sphinx(sphinx_version.value()),
}
}
impl SharedData {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
@@ -131,64 +108,11 @@ impl SharedData {
self.metrics.mixnet.ingress_dropped_final_hop_packet(source)
}
pub(super) fn update_metrics(
&self,
processing_result: &Result<MixProcessingResult, PacketProcessingError>,
source: IpAddr,
) {
let Ok(processing_result) = processing_result else {
self.metrics.mixnet.ingress_malformed_packet(source);
return;
};
let packet_version = convert_to_metrics_version(processing_result.packet_version);
match processing_result.processing_data {
MixProcessingResultData::ForwardHop { delay, .. } => {
self.metrics
.mixnet
.ingress_received_forward_packet(source, packet_version);
// check if the delay wasn't excessive
if let Some(delay) = delay
&& delay.to_duration() > self.processing_config.maximum_packet_delay
{
self.metrics.mixnet.ingress_excessive_delay_packet()
}
}
MixProcessingResultData::FinalHop { .. } => {
self.metrics
.mixnet
.ingress_received_final_hop_packet(source, packet_version);
}
}
}
pub(super) fn try_handle_connection(
&self,
accepted: io::Result<(TcpStream, SocketAddr)>,
) -> Option<JoinHandle<()>> {
match accepted {
Ok((socket, remote_addr)) => {
debug!("accepted incoming mixnet connection from: {remote_addr}");
let mut handler = ConnectionHandler::new(self, remote_addr);
let join_handle =
tokio::spawn(async move { handler.handle_connection(socket).await });
self.log_connected_clients();
Some(join_handle)
}
Err(err) => {
debug!("failed to accept incoming mixnet connection: {err}");
None
}
}
}
pub(super) fn forward_mix_packet(&self, packet: MixPacket, delay_until: Option<Instant>) {
let has_delay = delay_until.is_some();
if self
.mixnet_forwarder
.forward_packet(PacketToForward::new(packet, delay_until))
.forward_packet(PacketToForward::new(packet, delay_until.map(Into::into)))
.is_err()
&& !self.shutdown_token.is_cancelled()
{
+11 -1
View File
@@ -32,6 +32,7 @@ use crate::node::metrics::handler::mixnet_data_cleaner::MixnetMetricsCleaner;
use crate::node::metrics::handler::pending_egress_packets_updater::PendingEgressPacketsUpdater;
use crate::node::mixnet::SharedFinalHopData;
use crate::node::mixnet::packet_forwarding::PacketForwarder;
use crate::node::mixnet::packet_ingest::MixPacketIngest;
use crate::node::mixnet::shared::ProcessingConfig;
use crate::node::nym_apis_client::NymApisClient;
use crate::node::replay_protection::background_task::ReplayProtectionDiskFlush;
@@ -1289,7 +1290,16 @@ impl NymNode {
self.shutdown_token(),
);
let mut mixnet_listener = mixnet::Listener::new(self.config.mixnet.bind_address, shared);
let mut mix_packet_ingest = MixPacketIngest::new(&shared);
let ingest_sender = mix_packet_ingest.sender();
let ingest_shutdown = self.shutdown_token();
self.shutdown_tracker().try_spawn_named(
async move { mix_packet_ingest.run(ingest_shutdown).await },
"MixPacketIngest",
);
let mut mixnet_listener =
mixnet::Listener::new(self.config.mixnet.bind_address, shared, ingest_sender);
let shutdown_token = self.shutdown_token();
self.shutdown_tracker().try_spawn_named(
@@ -247,10 +247,11 @@ impl ReplayProtectionBloomfilters {
// map from particular rotation id to vector of results, based on the order of requests received
type BatchCheckResult = HashMap<u32, Vec<bool>>;
#[allow(unused)]
impl ReplayProtectionBloomfilters {
pub(crate) fn batch_try_check_and_set(
&self,
reply_tags: &HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>>,
replay_tags: &HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>>,
) -> Option<Result<BatchCheckResult, PoisonError<()>>> {
let mut guard = match self.inner.try_lock() {
Ok(guard) => guard,
@@ -258,18 +259,43 @@ impl ReplayProtectionBloomfilters {
Err(TryLockError::WouldBlock) => return None,
};
Some(Ok(guard.batch_check_and_set(reply_tags)))
Some(Ok(guard.batch_check_and_set(replay_tags)))
}
pub(crate) fn batch_check_and_set(
&self,
reply_tags: &HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>>,
replay_tags: &HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>>,
) -> Result<HashMap<u32, Vec<bool>>, PoisonError<()>> {
let Ok(mut guard) = self.inner.lock() else {
return Err(PoisonError::new(()));
};
Ok(guard.batch_check_and_set(reply_tags))
Ok(guard.batch_check_and_set(replay_tags))
}
pub(crate) fn try_check_and_set(
&self,
rotation_id: u32,
replay_tag: &[u8; REPLAY_TAG_SIZE],
) -> Option<Result<bool, PoisonError<()>>> {
let mut guard = match self.inner.try_lock() {
Ok(guard) => guard,
Err(TryLockError::Poisoned(_)) => return Some(Err(PoisonError::new(()))),
Err(TryLockError::WouldBlock) => return None,
};
Some(Ok(guard.check_and_set(rotation_id, replay_tag)))
}
pub(crate) fn check_and_set(
&self,
rotation_id: u32,
replay_tag: &[u8; REPLAY_TAG_SIZE],
) -> Result<bool, PoisonError<()>> {
let Ok(mut guard) = self.inner.lock() else {
return Err(PoisonError::new(()));
};
Ok(guard.check_and_set(rotation_id, replay_tag))
}
}
@@ -287,50 +313,47 @@ struct ReplayProtectionBloomfiltersInner {
pre_announced: Option<RotationFilter>,
}
#[allow(unused)]
impl ReplayProtectionBloomfiltersInner {
fn batch_check_and_set(
&mut self,
reply_tags: &HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>>,
replay_tags: &HashMap<u32, Vec<&[u8; REPLAY_TAG_SIZE]>>,
) -> HashMap<u32, Vec<bool>> {
let mut result = HashMap::with_capacity(reply_tags.len());
for (&rotation_id, reply_tags) in reply_tags {
// try to 'find' the relevant filter. we might be doing 3 reads here, but realistically it's
// going to be 'primary' most of the time and even if not, it's just few ns of overhead...
let filter = if self.primary.metadata.rotation_id == rotation_id {
Some(&mut self.primary.data)
} else if let Some(secondary) = &mut self.overlap {
// if let chaining won't be stable until 1.88 so we have to do the Option workaround
if secondary.metadata.rotation_id == rotation_id {
Some(&mut secondary.data)
} else {
None
}
} else if let Some(pre_announced) = &mut self.pre_announced {
if pre_announced.metadata.rotation_id == rotation_id {
Some(&mut pre_announced.data)
} else {
None
}
} else {
None
};
let Some(filter) = filter else {
// if we've received a packet from an unknown rotation, it most likely means it has been replayed
// from an older rotation, so mark it as such
result.insert(rotation_id, vec![false; reply_tags.len()]);
continue;
};
let mut rotation_results = Vec::with_capacity(reply_tags.len());
for tag in reply_tags {
rotation_results.push(filter.check_and_set(tag))
let mut result = HashMap::with_capacity(replay_tags.len());
for (&rotation_id, replay_tags) in replay_tags {
let mut rotation_results = Vec::with_capacity(replay_tags.len());
for tag in replay_tags {
rotation_results.push(self.check_and_set(rotation_id, tag))
}
result.insert(rotation_id, rotation_results);
}
result
}
fn check_and_set(&mut self, rotation_id: u32, replay_tag: &[u8; 32]) -> bool {
let filter = if self.primary.metadata.rotation_id == rotation_id {
Some(&mut self.primary.data)
} else if let Some(secondary) = &mut self.overlap
&& secondary.metadata.rotation_id == rotation_id
{
Some(&mut secondary.data)
} else if let Some(pre_announced) = &mut self.pre_announced
&& pre_announced.metadata.rotation_id == rotation_id
{
Some(&mut pre_announced.data)
} else {
None
};
let Some(filter) = filter else {
// if we've received a packet from an unknown rotation, it most likely means it has been replayed
// from an older rotation, so mark it as such
return false;
};
filter.check_and_set(replay_tag)
}
}
pub(crate) struct RotationFilter {