parking branch

This commit is contained in:
Simon Wicky
2026-05-29 15:07:59 +02:00
parent fc79fe4738
commit 9a38f1c3a6
23 changed files with 744 additions and 40 deletions
Generated
+1
View File
@@ -6280,6 +6280,7 @@ dependencies = [
"nym-gateway-requests",
"nym-http-api-client",
"nym-id",
"nym-lp-data",
"nym-mixnet-client",
"nym-mixnet-contract-common",
"nym-network-defaults",
+1
View File
@@ -60,6 +60,7 @@ nym-client-core-surb-storage = { workspace = true }
nym-client-core-gateways-storage = { workspace = true }
nym-ecash-time = { workspace = true }
nym-mixnet-contract-common = { workspace = true }
nym-lp-data = { workspace = true }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
nym-mixnet-client = { workspace = true }
@@ -11,6 +11,8 @@ use crate::client::event_control::EventControl;
use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
use crate::client::key_manager::ClientKeys;
use crate::client::key_manager::persistence::KeyStore;
use crate::client::lp::data::LpDataSetup;
use crate::client::lp::data::shared::SharedLpDataState;
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
use crate::client::real_messages_control;
@@ -636,7 +638,6 @@ where
{
Err(ClientCoreError::CustomGatewaySelectionExpected)
} else {
// and make sure to invalidate the task client, so we wouldn't cause premature shutdown
custom_gateway_transceiver.set_packet_router(packet_router)?;
Ok(custom_gateway_transceiver)
};
@@ -817,6 +818,24 @@ where
(mix_tx, client_tx)
}
#[allow(dead_code)]
fn build_lp_data_tasks(
config: &Config,
encryption_keys: Arc<x25519::KeyPair>,
identity_keys: Arc<ed25519::KeyPair>,
input_receiver: InputMessageReceiver,
shutdown_tracker: &ShutdownTracker,
) -> Result<LpDataSetup, ClientCoreError> {
let shared_state = SharedLpDataState::new(
config.debug,
encryption_keys,
identity_keys,
shutdown_tracker.clone_shutdown_token(),
);
LpDataSetup::new(shared_state, input_receiver, shutdown_tracker.clone())
}
// TODO: rename it as it implies the data is persistent whilst one can use InMemBackend
async fn setup_persistent_reply_storage(
backend: S::ReplyStore,
@@ -1063,12 +1082,27 @@ where
)
.await?;
// SW keep all the above
// LP Data channel
// let lp_data_tasks = Self::build_lp_data_tasks(
// &self.config,
// encryption_keys.clone(),
// identity_keys.clone(),
// input_receiver,
// &shutdown_tracker.clone(),
// )?;
// lp_data_tasks.start_tasks();
// SW Piping between inbound and outbound
let gateway_packet_router = PacketRouter::new(
ack_sender,
mixnet_messages_sender,
shutdown_tracker.clone_shutdown_token(),
);
// SW this needs to become the IO handler
let gateway_transceiver = Self::setup_gateway_transceiver(
self.custom_gateway_transceiver,
&self.config,
@@ -1090,6 +1124,7 @@ where
)
.await?;
// SW turn into inbound pipeline
Self::start_received_messages_buffer_controller(
encryption_keys,
received_buffer_request_receiver,
@@ -1100,6 +1135,8 @@ where
&shutdown_tracker.clone(),
);
// SW the rest below is outbound pipeline
// The message_sender is the transmitter for any component generating sphinx packets
// that are to be sent to the mixnet. They are used by cover traffic stream and real
// traffic stream.
@@ -0,0 +1,52 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_lp_data::packet::frame::LpFrameKind;
use nym_sphinx::addressing::nodes::NymNodeRoutingAddressError;
use nym_sphinx::forwarding::packet::MixPacketFormattingError;
use nym_sphinx::framing::processing::PacketProcessingError;
use nym_sphinx::{OutfoxError, SphinxError};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum LpDataHandlerError {
#[error(transparent)]
PacketFormattingError(#[from] MixPacketFormattingError),
#[error(transparent)]
PacketProcessingError(#[from] PacketProcessingError),
#[error(transparent)]
NymNodeRoutingAddressError(#[from] NymNodeRoutingAddressError),
#[error("failed to process received sphinx packet: {0}")]
SphinxProcessingError(#[from] SphinxError),
#[error("failed to process received outfox packet: {0}")]
OutfoxProcessingError(#[from] OutfoxError),
#[error("received payload type of an unexpected type: {typ:?}")]
UnexpectedLpPayload { typ: LpFrameKind },
#[error("received an Lp Frame kind that we don't support: {typ:?}")]
UnsupportedLpFrameKind { typ: LpFrameKind },
#[error("unwrapped a packet into a forward hop packet. This is no longer supported")]
ForwardHop,
#[error("{0}")]
Internal(String),
#[error("{0}")]
Other(String),
}
impl LpDataHandlerError {
pub fn internal(message: impl Into<String>) -> Self {
LpDataHandlerError::Internal(message.into())
}
pub fn other(message: impl Into<String>) -> Self {
LpDataHandlerError::Other(message.into())
}
}
@@ -0,0 +1,56 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_lp_data::packet::frame::{LpFrameAttributes, LpFrameHeader, LpFrameKind};
use nym_sphinx::forwarding::packet::MixPacketFormattingError;
use nym_sphinx::params::SphinxKeyRotation;
use crate::client::lp::data::handler::error::LpDataHandlerError;
/// Message types supported by clients
#[derive(Debug, Clone, Copy)]
pub enum ClientMessage {
Sphinx(SphinxMessage),
Outfox(OutfoxMessage),
}
impl ClientMessage {
pub fn from_frame_header(header: LpFrameHeader) -> Result<Self, LpDataHandlerError> {
match header.kind {
LpFrameKind::SphinxPacket => {
Ok(ClientMessage::Sphinx(header.frame_attributes.try_into()?))
}
LpFrameKind::OutfoxPacket => {
Ok(ClientMessage::Outfox(header.frame_attributes.try_into()?))
}
_ => Err(LpDataHandlerError::UnsupportedLpFrameKind { typ: header.kind }),
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct SphinxMessage {
pub key_rotation: SphinxKeyRotation,
}
impl TryFrom<LpFrameAttributes> for SphinxMessage {
type Error = LpDataHandlerError;
fn try_from(value: LpFrameAttributes) -> Result<Self, Self::Error> {
let key_rotation = value[0]
.try_into()
.map_err(MixPacketFormattingError::InvalidKeyRotation)?;
Ok(SphinxMessage { key_rotation })
}
}
impl From<SphinxMessage> for LpFrameAttributes {
fn from(value: SphinxMessage) -> Self {
let mut attrs = [0; 14];
attrs[0] = value.key_rotation as u8;
attrs
}
}
// For now there are no differences. We can augment this variant when we will need it
pub type OutfoxMessage = SphinxMessage;
@@ -0,0 +1,216 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::inbound_messages::InputMessageReceiver;
use crate::client::lp::LpDataHandlerError;
use crate::client::lp::data::PACKET_BUFFER_SIZE;
use crate::client::lp::data::shared::SharedLpDataState;
use nym_lp_data::clients::traits::ClientUnwrappingPipeline;
use nym_lp_data::common::traits::TransportUnwrap;
use nym_lp_data::packet::{EncryptedLpPacket, MalformedLpPacketError};
use nym_lp_data::{AddressedTimedData, TimedData};
use std::sync::{Arc, mpsc};
use std::time::Instant;
use std::{net::SocketAddr, time::Duration};
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::interval;
use tracing::*;
pub mod error;
pub mod messages;
pub mod pipeline;
mod processing;
const PIPELINE_TICKING_DURATION: Duration = Duration::from_millis(1);
/// Bounded queue depth in front of each worker; keeps memory bounded under
/// bursty load and provides drop-based backpressure.
const WORKER_QUEUE_DEPTH: usize = 128;
type WorkerOutput = Result<Option<Vec<u8>>, MalformedLpPacketError>;
/// LP Data Handler for UDP data plane, acts as a pipeline driver and buffer
/// for delaying packets. Heavy per-packet processing is fanned out across a
/// pool of worker threads spawned on the shared blocking pool tracked by the
/// surrounding [`nym_task::ShutdownTracker`].
pub struct LpDataHandler {
/// Shared state
shared_state: Arc<SharedLpDataState>,
// Outbound pipeline
/// Channel to receive data for the outbound pipeline
outbound_input_rx: InputMessageReceiver,
/// Buffer for outbound packet
outbound_pkt_buffer: Vec<AddressedTimedData<EncryptedLpPacket>>,
/// Channel to send outgoing data from the outbound pipeline
outbound_output_tx: tokio::sync::mpsc::Sender<(EncryptedLpPacket, SocketAddr)>,
// Inbound pipeline
/// Channel to receive incoming data for the inbound pipeline
inbound_input_rx: mpsc::Receiver<EncryptedLpPacket>,
/// Per-worker job queues (round-robin dispatch).
worker_input_txs: Vec<mpsc::SyncSender<TimedData<EncryptedLpPacket>>>,
/// Aggregated processed packets returned by the workers. (Inbound data)
worker_output_rx: mpsc::Receiver<WorkerOutput>,
/// Shutdown token
shutdown: nym_task::ShutdownToken,
}
impl LpDataHandler {
pub(crate) fn new(
shared_state: Arc<SharedLpDataState>,
outbound_input_rx: InputMessageReceiver,
outbound_output_tx: tokio::sync::mpsc::Sender<(EncryptedLpPacket, SocketAddr)>,
inbound_input_rx: mpsc::Receiver<EncryptedLpPacket>,
// SW TODO : inbound output (worker_output_rx)
shutdown_tracker: &nym_task::ShutdownTracker,
) -> Result<Self, LpDataHandlerError> {
let (worker_output_tx, worker_output_rx) = mpsc::sync_channel(PACKET_BUFFER_SIZE);
// Allow at least one worker, even if the config says 0
let worker_count = 4; // SW Put that in the config
// Create workers. They will stop naturally when worker_output_rx is dropped.
// The mode is decided once here; each closure picks the right pipeline type so
// the worker loop monomorphizes against a single concrete pipeline.
let worker_input_txs = (0..worker_count)
.map(|_| {
let (worker_input_tx, _worker_input_rx) = mpsc::sync_channel(WORKER_QUEUE_DEPTH);
let _worker_state = shared_state.clone();
let _worker_output = worker_output_tx.clone();
shutdown_tracker.spawn_blocking(move || {
// Instantiat pipeline
todo!()
//Self::run_worker(pipeline, worker_input_rx, worker_output);
});
worker_input_tx
})
.collect();
Ok(Self {
shared_state,
outbound_input_rx,
outbound_pkt_buffer: Vec::new(),
outbound_output_tx,
inbound_input_rx,
worker_input_txs,
worker_output_rx,
shutdown: shutdown_tracker.clone_shutdown_token(),
})
}
pub async fn run(&mut self) {
info!(
workers = self.worker_input_txs.len(),
"Starting LP data handler"
);
let mut ticking_interval = interval(PIPELINE_TICKING_DURATION);
let mut next_worker = 0;
loop {
tokio::select! {
biased;
_ = self.shutdown.cancelled() => {
info!("LP data handler: received shutdown signal");
break;
}
timestamp = ticking_interval.tick() => {
let std_timestamp: Instant = timestamp.into();
// Drain processed packets returned by workers.
while let Ok(processing_result) = self.worker_output_rx.try_recv() {
match processing_result {
Ok(_packets) => {
// Dispatch to application
todo!()
},
Err(e) => {
warn!("LP data worker: error processing packet : {e}");
},
}
}
// Dispatch incoming packets to workers.
while let Ok(input) = self.inbound_input_rx.try_recv() {
next_worker = self.dispatch_to_workers(
TimedData::new(std_timestamp, input),
next_worker,
);
}
// Run outbound pipeline
while let Ok(_input) = self.outbound_input_rx.try_recv() {
// Run outbound pipeline and stack result in outbound_pkt_buffer
todo!()
}
// Send packets that needs sending
for pkt in self.outbound_pkt_buffer.extract_if(.., |p| p.data.timestamp <= std_timestamp) {
if let Err(e) = self.outbound_output_tx.try_send((pkt.data.data, pkt.dst)) {
match e {
TrySendError::Full(_) => {
warn!("LP data handler: packet sending buffer is full, the client might be overloaded");
},
TrySendError::Closed(_) => {
break;
},
}
}
}
}
}
}
// Workers will stop because we are dropping the receiving channel
info!("LP data handler shutdown complete");
}
/// Round-robin dispatch a job across worker queues. If the chosen worker is
/// full, fall through to the next one; if all are saturated, drop the packet
/// (UDP-style) and bump a metric. Returns the worker index to start from on
/// the next dispatch.
fn dispatch_to_workers(&self, mut job: TimedData<EncryptedLpPacket>, start: usize) -> usize {
let n = self.worker_input_txs.len();
for offset in 0..n {
let idx = (start + offset) % n;
match self.worker_input_txs[idx].try_send(job) {
Ok(()) => return (idx + 1) % n,
Err(mpsc::TrySendError::Full(returned)) => {
job = returned;
}
Err(mpsc::TrySendError::Disconnected(returned)) => {
error!(
"LP data worker {idx} disconnected; this shouldn't happen outside of shut down"
);
job = returned;
}
}
}
warn!("LP data handler: all workers saturated, dropping packet");
start
}
fn run_worker<P>(
mut pipeline: P,
input_rx: mpsc::Receiver<TimedData<EncryptedLpPacket>>,
output_tx: mpsc::SyncSender<WorkerOutput>,
) where
P: ClientUnwrappingPipeline<EncryptedLpPacket, ()> // SW fill in message kind
+ TransportUnwrap<EncryptedLpPacket, Error = MalformedLpPacketError>, // This is needed to specify the error type
{
while let Ok(input) = input_rx.recv() {
// Blocking is fine, we don't want to unclog ourself and process a new packet that will be dropped anyway
if let Err(e) = output_tx.send(pipeline.unwrap(input.data, input.timestamp)) {
trace!(
"Failed to send processing data back to handler : {e}. We are probably shutting down"
);
return;
}
}
}
}
@@ -0,0 +1,4 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// TODO
@@ -0,0 +1,5 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub(crate) mod outfox;
pub(crate) mod sphinx;
@@ -0,0 +1,37 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_lp_data::TimedPayload;
use nym_sphinx::OutfoxPacket;
use tracing::warn;
use crate::client::lp::data::{
handler::{error::LpDataHandlerError, messages::OutfoxMessage},
shared::SharedLpDataState,
};
pub(crate) fn process(
shared_state: &SharedLpDataState,
outfox_packet: TimedPayload,
_metadata: OutfoxMessage,
) -> Result<TimedPayload, LpDataHandlerError> {
let TimedPayload {
data: outfox_bytes,
timestamp: arrival_timestamp,
} = outfox_packet;
let mut outfox_packet = OutfoxPacket::try_from(outfox_bytes.as_slice())?;
let _next_address =
outfox_packet.decode_next_layer(shared_state.encryption_keys.private_key().as_ref())?;
if outfox_packet.is_final_hop() {
Ok(TimedPayload::new(
arrival_timestamp,
outfox_packet.payload().to_vec(),
))
} else {
warn!("Dropping forward hop packet in a client");
Err(LpDataHandlerError::ForwardHop)
}
}
@@ -0,0 +1,39 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nym_lp_data::TimedPayload;
use nym_sphinx::{ProcessedPacketData, SphinxPacket};
use tracing::warn;
use crate::client::lp::data::{
handler::{error::LpDataHandlerError, messages::SphinxMessage},
shared::SharedLpDataState,
};
pub(crate) fn process(
shared_state: &SharedLpDataState,
sphinx_packet: TimedPayload,
_metadata: SphinxMessage,
) -> Result<TimedPayload, LpDataHandlerError> {
let TimedPayload {
data: sphinx_bytes,
timestamp: arrival_timestamp,
} = sphinx_packet;
let sphinx_packet = SphinxPacket::from_bytes(&sphinx_bytes)?;
// Final processing
let processed_packet =
sphinx_packet.process(shared_state.encryption_keys.private_key().as_ref())?;
match processed_packet.data {
ProcessedPacketData::ForwardHop { .. } => {
warn!("Dropping forward hop packet in a client");
Err(LpDataHandlerError::ForwardHop)
}
ProcessedPacketData::FinalHop { payload, .. } => Ok(TimedPayload::new(
arrival_timestamp,
payload.recover_plaintext()?,
)),
}
}
@@ -0,0 +1,107 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::lp::data::MAX_UDP_PACKET_SIZE;
use crate::client::lp::data::shared::SharedLpDataState;
use crate::error::ClientCoreError;
use nym_lp_data::packet::EncryptedLpPacket;
use std::net::SocketAddr;
use std::sync::{Arc, mpsc, mpsc::TrySendError};
use tokio::net::UdpSocket;
use tracing::log::warn;
use tracing::{error, info};
/// LP UDP listener that accepts TCP connections on port 51264 (by default)
pub(crate) struct LpDataListener {
/// Shared state
shared_state: Arc<SharedLpDataState>,
/// Channel to send incoming data to the processing pipeline
inbound_input_tx: mpsc::SyncSender<EncryptedLpPacket>,
// This has to be a tokio channel, to be async and bounded
/// Channel to receive outgoing data from the processling pipeline
outbound_output_rx: tokio::sync::mpsc::Receiver<(EncryptedLpPacket, SocketAddr)>,
/// Shutdown token
shutdown: nym_task::ShutdownToken,
}
impl LpDataListener {
pub fn new(
shared_state: Arc<SharedLpDataState>,
inbound_input_tx: mpsc::SyncSender<EncryptedLpPacket>,
outbound_output_rx: tokio::sync::mpsc::Receiver<(EncryptedLpPacket, SocketAddr)>,
shutdown: nym_task::ShutdownToken,
) -> Self {
Self {
shared_state,
inbound_input_tx,
outbound_output_rx,
shutdown,
}
}
pub async fn run(&mut self) -> Result<(), ClientCoreError> {
let socket = UdpSocket::bind("[::]:0").await.map_err(|source| {
error!("Failed to bind LP data socket: {source}");
ClientCoreError::LpBindFailure { source }
})?;
info!("Started LP data socket on {}", socket.local_addr()?);
let mut buf = vec![0u8; MAX_UDP_PACKET_SIZE];
loop {
tokio::select! {
biased;
_ = self.shutdown.cancelled() => {
info!("LP data listener: received shutdown signal");
break;
}
result = self.outbound_output_rx.recv() => {
match result {
Some((payload, dst_addr)) => {
if let Err(e) = socket.send_to(&payload.to_bytes(), dst_addr).await {
warn!("LP data packet error to {dst_addr}: {e}");
}
}
None => {
warn!("LP outgoing packet channel closed");
break;
}
}
}
result = socket.recv_from(&mut buf) => {
match result {
Ok((len, src_addr)) => {
info!("received {len} bytes from {src_addr} on the LP Data socket");
if let Ok(encrypted_packet) = EncryptedLpPacket::decode(&buf[..len]) {
if let Err(e) = self.inbound_input_tx.try_send(encrypted_packet) {
match e {
TrySendError::Full(_) => {
warn!("LP data listener: packet sending buffer is full, the client might be overloaded");
},
TrySendError::Disconnected(_) => {
warn!("LP data listener: incoming packet channel is closed");
break;
},
}
}
} else {
warn!("Error reading LP packet from wire");
}
}
Err(e) => {
warn!("LP data socket recv error: {e}");
}
}
}
}
}
info!("LP data handler shutdown complete");
Ok(())
}
}
@@ -0,0 +1,103 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// Parking the branch
#![allow(clippy::todo)]
#![allow(dead_code)]
#![allow(clippy::incompatible_msrv)]
use std::sync::{Arc, mpsc};
use crate::client::inbound_messages::InputMessageReceiver;
use crate::client::lp::data::handler::LpDataHandler;
use crate::client::lp::data::listener::LpDataListener;
use crate::client::lp::data::shared::SharedLpDataState;
use crate::error::ClientCoreError;
use nym_task::ShutdownTracker;
use tracing::error;
/// Maximum UDP packet size we'll accept
/// Sphinx packets are typically ~2KB, LP overhead is ~50 bytes, so 4KB is plenty
const MAX_UDP_PACKET_SIZE: usize = 4096;
pub(crate) const PACKET_BUFFER_SIZE: usize = 100;
pub mod handler;
mod listener;
pub mod shared;
pub struct LpDataSetup {
listener: LpDataListener,
handler: LpDataHandler,
/// Shutdown coordination
shutdown: ShutdownTracker,
}
impl LpDataSetup {
pub(crate) fn new(
shared_state: SharedLpDataState,
outbound_input_rx: InputMessageReceiver,
shutdown: ShutdownTracker,
) -> Result<Self, ClientCoreError> {
let (inbound_input_tx, inbound_input_rx) = mpsc::sync_channel(PACKET_BUFFER_SIZE);
let (outbound_output_tx, outbound_output_rx) =
tokio::sync::mpsc::channel(PACKET_BUFFER_SIZE);
let shared_state = Arc::new(shared_state);
let listener = LpDataListener::new(
shared_state.clone(),
inbound_input_tx,
outbound_output_rx,
shutdown.clone_shutdown_token(),
);
let handler = LpDataHandler::new(
shared_state,
outbound_input_rx,
outbound_output_tx,
inbound_input_rx,
&shutdown,
)?;
Ok(LpDataSetup {
listener,
handler,
shutdown,
})
}
pub fn start_tasks(mut self) {
// Spawn the UDP data handler for LP data plane
// The data handler listens on UDP port 51264 and processes LP-wrapped Sphinx packets
// from registered clients. It decrypts the LP layer and forwards the Sphinx packets
let shutdown_token = self.shutdown.clone_shutdown_token();
let mut listener = self.listener;
self.shutdown.try_spawn_named(
async move {
if let Err(err) = listener.run().await {
shutdown_token.cancel();
error!("LP data listener error: {err}");
}
},
"LP::LpDataListener",
);
self.shutdown
.try_spawn_named(async move { self.handler.run().await }, "LP::LpDataHandler");
}
}
#[cfg(test)]
mod tests {
use super::*;
// Sphinx packets are typically around 2KB
// 4KB should be plenty with room to spare
const _: () = {
assert!(MAX_UDP_PACKET_SIZE >= 2048 + 100);
};
}
@@ -0,0 +1,38 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use nym_client_core_config_types::DebugConfig;
use nym_crypto::asymmetric::{ed25519, x25519};
use nym_lp_data::fragmentation::reconstruction::MessageReconstructor;
use nym_task::ShutdownToken;
/// Shared state for LP data plane
pub struct SharedLpDataState {
pub(crate) config: DebugConfig,
pub(crate) encryption_keys: Arc<x25519::KeyPair>,
pub(crate) identity_keys: Arc<ed25519::KeyPair>,
pub(crate) message_reconstructor: MessageReconstructor,
pub(crate) shutdown_token: ShutdownToken,
}
impl SharedLpDataState {
pub(crate) fn new(
config: DebugConfig,
encryption_keys: Arc<x25519::KeyPair>,
identity_keys: Arc<ed25519::KeyPair>,
shutdown_token: ShutdownToken,
) -> Self {
SharedLpDataState {
config,
encryption_keys,
identity_keys,
message_reconstructor: Default::default(),
shutdown_token,
}
}
}
+6
View File
@@ -0,0 +1,6 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub use data::handler::error::LpDataHandlerError;
pub mod data;
+1
View File
@@ -7,6 +7,7 @@ pub(crate) mod event_control;
pub(crate) mod helpers;
pub mod inbound_messages;
pub mod key_manager;
pub mod lp;
pub mod mix_traffic;
pub mod real_messages_control;
pub mod received_buffer;
+7
View File
@@ -1,6 +1,7 @@
// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::client::lp::LpDataHandlerError;
use crate::client::mix_traffic::transceiver::ErasedGatewayError;
use nym_crypto::asymmetric::ed25519::Ed25519RecoveryError;
use nym_gateway_client::error::GatewayClientError;
@@ -263,6 +264,12 @@ pub enum ClientCoreError {
#[error("Could not access task registry, {0}")]
RegistryAccess(#[from] RegistryAccessError),
#[error("failed to bind LP UDP socket: {source}")]
LpBindFailure { source: std::io::Error },
#[error(transparent)]
LpFailure(#[from] LpDataHandlerError),
}
impl From<tungstenite::Error> for ClientCoreError {
-1
View File
@@ -149,7 +149,6 @@ nym-test-utils = { workspace = true }
tokio-console = ["console-subscriber", "nym-task/tokio-tracing"]
otel = ["nym-bin-common/otel-otlp", "dep:opentelemetry", "dep:opentelemetry_sdk"]
mix-sim = []
defaul = ["mix-sim"]
[lints]
workspace = true
@@ -15,16 +15,14 @@ pub enum MixMessage {
Outfox(OutfoxMixMessage),
}
impl MixMessage {
pub fn from_frame_header(header: LpFrameHeader) -> Result<Self, LpDataHandlerError> {
match header.kind {
LpFrameKind::SphinxPacket => {
Ok(MixMessage::Sphinx(header.frame_attributes.try_into()?))
}
LpFrameKind::OutfoxPacket => {
Ok(MixMessage::Outfox(header.frame_attributes.try_into()?))
}
_ => Err(LpDataHandlerError::UnsupportedLpFrameKind { typ: header.kind }),
impl TryFrom<LpFrameHeader> for MixMessage {
type Error = LpDataHandlerError;
fn try_from(value: LpFrameHeader) -> Result<Self, Self::Error> {
match value.kind {
LpFrameKind::SphinxPacket => Ok(MixMessage::Sphinx(value.frame_attributes.try_into()?)),
LpFrameKind::OutfoxPacket => Ok(MixMessage::Outfox(value.frame_attributes.try_into()?)),
_ => Err(LpDataHandlerError::UnsupportedLpFrameKind { typ: value.kind }),
}
}
}
@@ -28,19 +28,23 @@ impl NymNodeMessage {
pub fn new_outfox_mix_message(message: OutfoxMixMessage) -> Self {
Self::Mix(MixMessage::Outfox(message))
}
}
pub fn from_frame_header(header: LpFrameHeader) -> Result<Self, LpDataHandlerError> {
match header.kind {
impl TryFrom<LpFrameHeader> for NymNodeMessage {
type Error = LpDataHandlerError;
fn try_from(value: LpFrameHeader) -> Result<Self, Self::Error> {
match value.kind {
LpFrameKind::SphinxPacket | LpFrameKind::OutfoxPacket => {
Ok(NymNodeMessage::Mix(MixMessage::from_frame_header(header)?))
Ok(NymNodeMessage::Mix(value.try_into()?))
}
LpFrameKind::ForwardSphinxPacket => Ok(NymNodeMessage::ForwardSphinx(
header.frame_attributes.try_into()?,
value.frame_attributes.try_into()?,
)),
LpFrameKind::ForwardOutfoxPacket => Ok(NymNodeMessage::ForwardOutfox(
header.frame_attributes.try_into()?,
value.frame_attributes.try_into()?,
)),
_ => Err(LpDataHandlerError::UnsupportedLpFrameKind { typ: header.kind }),
_ => Err(LpDataHandlerError::UnsupportedLpFrameKind { typ: value.kind }),
}
}
}
+6 -19
View File
@@ -19,10 +19,10 @@ use crate::node::lp::data::PACKET_BUFFER_SIZE;
use crate::node::lp::data::handler::pipeline::{MixingNodeDataPipeline, NymNodeDataPipeline};
use crate::node::lp::data::shared::{SharedGatewayLpDataState, SharedLpDataState};
use crate::node::lp::error::LpHandlerError;
use nym_lp_data::AddressedTimedData;
use nym_lp_data::common::traits::TransportUnwrap;
use nym_lp_data::nymnodes::traits::NymNodeProcessingPipeline;
use nym_lp_data::packet::{EncryptedLpPacket, MalformedLpPacketError};
use nym_lp_data::{AddressedTimedData, TimedData};
use nym_metrics::inc;
use rand::rngs::OsRng;
use std::sync::{Arc, mpsc};
@@ -45,12 +45,6 @@ const WORKER_QUEUE_DEPTH: usize = 128;
type WorkerOutput = Result<Vec<AddressedTimedData<EncryptedLpPacket>>, MalformedLpPacketError>;
/// A single packet processing job dispatched to a worker thread.
struct WorkerInput {
packet: EncryptedLpPacket,
timestamp: Instant,
}
/// LP Data Handler for UDP data plane, acts as a pipeline driver and buffer
/// for delaying packets. Heavy per-packet processing is fanned out across a
/// pool of worker threads spawned on the shared blocking pool tracked by the
@@ -66,7 +60,7 @@ pub struct LpDataHandler {
output_tx: tokio::sync::mpsc::Sender<(EncryptedLpPacket, SocketAddr)>,
/// Per-worker job queues (round-robin dispatch).
worker_input_txs: Vec<mpsc::SyncSender<WorkerInput>>,
worker_input_txs: Vec<mpsc::SyncSender<TimedData<EncryptedLpPacket>>>,
/// Aggregated processed packets returned by the workers.
worker_output_rx: mpsc::Receiver<WorkerOutput>,
@@ -176,8 +170,7 @@ impl LpDataHandler {
// Dispatch incoming packets to workers.
while let Ok(input) = self.input_rx.try_recv() {
next_worker = self.dispatch_to_workers(
input,
std_timestamp,
TimedData::new(std_timestamp, input),
next_worker,
);
}
@@ -209,14 +202,8 @@ impl LpDataHandler {
/// full, fall through to the next one; if all are saturated, drop the packet
/// (UDP-style) and bump a metric. Returns the worker index to start from on
/// the next dispatch.
fn dispatch_to_workers(
&self,
packet: EncryptedLpPacket,
timestamp: Instant,
start: usize,
) -> usize {
fn dispatch_to_workers(&self, mut job: TimedData<EncryptedLpPacket>, start: usize) -> usize {
let n = self.worker_input_txs.len();
let mut job = WorkerInput { packet, timestamp };
for offset in 0..n {
let idx = (start + offset) % n;
match self.worker_input_txs[idx].try_send(job) {
@@ -240,7 +227,7 @@ impl LpDataHandler {
fn run_worker<P>(
mut pipeline: P,
input_rx: mpsc::Receiver<WorkerInput>,
input_rx: mpsc::Receiver<TimedData<EncryptedLpPacket>>,
output_tx: mpsc::SyncSender<WorkerOutput>,
) where
P: NymNodeProcessingPipeline<EncryptedLpPacket>
@@ -248,7 +235,7 @@ impl LpDataHandler {
{
while let Ok(input) = input_rx.recv() {
// Blocking is fine, we don't want to unclog ourself and process a new packet that will be dropped anyway
if let Err(e) = output_tx.send(pipeline.process(input.packet, input.timestamp)) {
if let Err(e) = output_tx.send(pipeline.process(input.data, input.timestamp)) {
trace!(
"Failed to send processing data back to handler : {e}. We are probably shutting down"
);
@@ -156,7 +156,10 @@ impl<R: Rng> FramingUnwrap<MixMessage> for MixingNodeDataPipeline<R> {
frame: TimedData<Self::Frame>,
) -> Option<(TimedPayload, MixMessage)> {
let reassembled = self.wire.frame_to_maybe_message(frame)?;
let message_kind = MixMessage::from_frame_header(reassembled.data.header)
let message_kind = reassembled
.data
.header
.try_into()
.inspect_err(|e| warn!("{e}"))
.ok()?;
@@ -204,7 +204,10 @@ impl<R: Rng> FramingUnwrap<NymNodeMessage> for NymNodeDataPipeline<R> {
frame: TimedData<Self::Frame>,
) -> Option<(TimedPayload, NymNodeMessage)> {
let reassembled = self.wire.frame_to_maybe_message(frame)?;
let message_kind = NymNodeMessage::from_frame_header(reassembled.data.header)
let message_kind = reassembled
.data
.header
.try_into()
.inspect_err(|e| warn!("{e}"))
.ok()?;
+1 -1
View File
@@ -12,7 +12,7 @@ use tokio::net::UdpSocket;
use tracing::log::warn;
use tracing::{error, info};
/// LP UDP listener that accepts TCP connections on port 51264 (by default)
/// LP UDP listener
pub(crate) struct LpDataListener {
/// Shared state
shared_state: Arc<SharedLpDataState>,