Compare commits

...

1 Commits

Author SHA1 Message Date
Simon Wicky 005b67ef0f squash 2026-02-13 10:57:42 +01:00
7 changed files with 233 additions and 228 deletions
+90 -122
View File
@@ -15,9 +15,10 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::net::{TcpStream, UdpSocket};
use tokio::time::sleep;
use tokio_util::codec::Framed;
use tokio_util::udp::UdpFramed;
pub struct Config {
initial_reconnection_backoff: Duration,
@@ -57,136 +58,98 @@ pub trait SendWithoutResponse {
}
pub struct Client {
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
conn_new: Option<mpsc::Sender<(FramedNymPacket, SocketAddr)>>,
config: Config,
}
struct ConnectionSender {
channel: mpsc::Sender<FramedNymPacket>,
current_reconnection_attempt: Arc<AtomicU32>,
}
impl ConnectionSender {
fn new(channel: mpsc::Sender<FramedNymPacket>) -> Self {
ConnectionSender {
channel,
current_reconnection_attempt: Arc::new(AtomicU32::new(0)),
}
}
}
impl Client {
pub fn new(config: Config) -> Client {
Client {
conn_new: HashMap::new(),
conn_new: None,
config,
}
}
async fn manage_connection(
address: SocketAddr,
receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: &AtomicU32,
receiver: mpsc::Receiver<(FramedNymPacket, SocketAddr)>,
) {
let connection_fut = TcpStream::connect(address);
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to {}", address);
// if we managed to connect, reset the reconnection count (whatever it might have been)
current_reconnection.store(0, Ordering::Release);
Framed::new(stream, NymCodec)
}
Err(err) => {
debug!(
"failed to establish connection to {} (err: {})",
address, err
);
return;
}
},
Err(_) => {
debug!(
"failed to connect to {} within {:?}",
address, connection_timeout
);
// we failed to connect - increase reconnection attempt
current_reconnection.fetch_add(1, Ordering::SeqCst);
let socket = match UdpSocket::bind("0.0.0.0:0").await {
Ok(socket) => socket,
Err(err) => {
error!("Failed to bind to - {err}. Are you sure nothing else is running on the specified port and your user has sufficient permission to bind to the requested address?");
return;
}
};
let framed_conn = UdpFramed::new(socket, NymCodec);
// Take whatever the receiver channel produces and put it on the connection.
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
// about neither receiver nor the connection, it doesn't matter which one gets consumed
if let Err(err) = receiver.map(Ok).forward(conn).await {
if let Err(err) = receiver.map(Ok).forward(framed_conn).await {
warn!("Failed to forward packets to {} - {err}", address);
}
debug!(
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
address
"connection manager is finished. Either the connection failed or mixnet client got dropped"
);
}
/// If we're trying to reconnect, determine how long we should wait.
fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
if current_attempt == 0 {
None
} else {
let exp = 2_u32.checked_pow(current_attempt);
let backoff = exp
.and_then(|exp| self.config.initial_reconnection_backoff.checked_mul(exp))
.unwrap_or(self.config.maximum_reconnection_backoff);
// fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
// if current_attempt == 0 {
// None
// } else {
// let exp = 2_u32.checked_pow(current_attempt);
// let backoff = exp
// .and_then(|exp| self.config.initial_reconnection_backoff.checked_mul(exp))
// .unwrap_or(self.config.maximum_reconnection_backoff);
Some(std::cmp::min(
backoff,
self.config.maximum_reconnection_backoff,
))
}
}
// Some(std::cmp::min(
// backoff,
// self.config.maximum_reconnection_backoff,
// ))
// }
// }
fn make_connection(&mut self, address: NymNodeRoutingAddress, pending_packet: FramedNymPacket) {
let (mut sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
// this CAN'T fail because we just created the channel which has a non-zero capacity
if self.config.maximum_connection_buffer_size > 0 {
sender.try_send(pending_packet).unwrap();
sender.try_send((pending_packet, address.into())).unwrap();
}
self.conn_new = Some(sender);
// if we already tried to connect to `address` before, grab the current attempt count
let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
existing.channel = sender;
Arc::clone(&existing.current_reconnection_attempt)
} else {
let new_entry = ConnectionSender::new(sender);
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
self.conn_new.insert(address, new_entry);
current_attempt
};
// let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
// existing.channel = sender;
// Arc::clone(&existing.current_reconnection_attempt)
// } else {
// let new_entry = ConnectionSender::new(sender);
// let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
// self.conn_new.insert(address, new_entry);
// current_attempt
// };
// load the actual value.
let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
let backoff = self.determine_backoff(reconnection_attempt);
// let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
// let backoff = self.determine_backoff(reconnection_attempt);
// copy the value before moving into another task
let initial_connection_timeout = self.config.initial_connection_timeout;
// let initial_connection_timeout = self.config.initial_connection_timeout;
tokio::spawn(async move {
// before executing the manager, wait for what was specified, if anything
if let Some(backoff) = backoff {
trace!("waiting for {:?} before attempting connection", backoff);
sleep(backoff).await;
}
// if let Some(backoff) = backoff {
// trace!("waiting for {:?} before attempting connection", backoff);
// sleep(backoff).await;
// }
Self::manage_connection(
address.into(),
receiver,
initial_connection_timeout,
&current_reconnection_attempt,
//initial_connection_timeout,
//&current_reconnection_attempt,
)
.await
});
@@ -204,48 +167,53 @@ impl SendWithoutResponse for Client {
let framed_packet =
FramedNymPacket::new(packet, packet_type, self.config.use_legacy_version);
if let Some(sender) = self.conn_new.get_mut(&address) {
if let Err(err) = sender.channel.try_send(framed_packet) {
if err.is_full() {
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
Err(io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
))
} else if err.is_disconnected() {
debug!(
"Connection to {} seems to be dead. attempting to re-establish it...",
address
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, err.into_inner());
Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
"reconnection attempt is in progress",
))
match &self.conn_new {
Some(sender) => {
if let Err(err) = sender.clone().try_send((framed_packet, address.into())) {
if err.is_full() {
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
Err(io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
))
} else if err.is_disconnected() {
debug!(
"Connection to {} seems to be dead. attempting to re-establish it...",
address
);
// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, err.into_inner().0);
Err(io::Error::new(
io::ErrorKind::ConnectionAborted,
"reconnection attempt is in progress",
))
} else {
// this can't really happen, but let's safe-guard against it in case something changes in futures library
Err(io::Error::new(
io::ErrorKind::Other,
"unknown connection buffer error",
))
}
} else {
// this can't really happen, but let's safe-guard against it in case something changes in futures library
Err(io::Error::new(
io::ErrorKind::Other,
"unknown connection buffer error",
))
debug!("Sending packet to {:?}", address);
Ok(())
}
} else {
Ok(())
}
} else {
// there was never a connection to begin with
debug!("establishing initial connection to {}", address);
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
self.make_connection(address, framed_packet);
Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection is in progress",
))
None => {
// there was never a connection to begin with
debug!("establishing initial connection");
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
self.make_connection(address, framed_packet);
Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection is in progress",
))
}
}
}
}
+1 -1
View File
@@ -26,7 +26,7 @@ const ACK_IV_SIZE: usize = 16;
const ACK_PACKET_SIZE: usize = ACK_IV_SIZE + FRAG_ID_LEN + SPHINX_PACKET_OVERHEAD;
const REGULAR_PACKET_SIZE: usize = 2 * 1024 + SPHINX_PACKET_OVERHEAD;
const EXTENDED_PACKET_SIZE_8: usize = 8 * 1024 + SPHINX_PACKET_OVERHEAD;
const EXTENDED_PACKET_SIZE_8: usize = 1 * 512 + SPHINX_PACKET_OVERHEAD;
const EXTENDED_PACKET_SIZE_16: usize = 16 * 1024 + SPHINX_PACKET_OVERHEAD;
const EXTENDED_PACKET_SIZE_32: usize = 32 * 1024 + SPHINX_PACKET_OVERHEAD;
@@ -155,7 +155,7 @@ impl<St: Storage> ConnectionHandler<St> {
self.forward_ack(forward_ack, client_address);
}
async fn handle_received_packet(&mut self, framed_sphinx_packet: FramedNymPacket) {
pub(crate) async fn handle_received_packet(&mut self, framed_sphinx_packet: FramedNymPacket) {
//
// TODO: here be replay attack detection - it will require similar key cache to the one in
// packet processor for vpn packets,
@@ -174,47 +174,47 @@ impl<St: Storage> ConnectionHandler<St> {
self.handle_processed_packet(processed_final_hop).await
}
pub(crate) async fn handle_connection(
mut self,
conn: TcpStream,
remote: SocketAddr,
mut shutdown: TaskClient,
) {
debug!("Starting connection handler for {:?}", remote);
shutdown.mark_as_success();
let mut framed_conn = Framed::new(conn, NymCodec);
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
log::trace!("ConnectionHandler: received shutdown");
}
framed_sphinx_packet = framed_conn.next() => {
match framed_sphinx_packet {
Some(Ok(framed_sphinx_packet)) => {
// TODO: benchmark spawning tokio task with full processing vs just processing it
// synchronously under higher load in single and multi-threaded situation.
// pub(crate) async fn handle_connection(
// mut self,
// conn: TcpStream,
// remote: SocketAddr,
// mut shutdown: TaskClient,
// ) {
// debug!("Starting connection handler for {:?}", remote);
// shutdown.mark_as_success();
// let mut framed_conn = Framed::new(conn, NymCodec);
// while !shutdown.is_shutdown() {
// tokio::select! {
// biased;
// _ = shutdown.recv() => {
// log::trace!("ConnectionHandler: received shutdown");
// }
// framed_sphinx_packet = framed_conn.next() => {
// match framed_sphinx_packet {
// Some(Ok(framed_sphinx_packet)) => {
// // TODO: benchmark spawning tokio task with full processing vs just processing it
// // synchronously under higher load in single and multi-threaded situation.
// in theory we could process multiple sphinx packet from the same connection in parallel,
// but we already handle multiple concurrent connections so if anything, making
// that change would only slow things down
self.handle_received_packet(framed_sphinx_packet).await;
}
Some(Err(err)) => {
error!(
"The socket connection got corrupted with error: {err}. Closing the socket",
);
return;
}
None => break, // stream got closed by remote
}
}
}
}
// // in theory we could process multiple sphinx packet from the same connection in parallel,
// // but we already handle multiple concurrent connections so if anything, making
// // that change would only slow things down
// self.handle_received_packet(framed_sphinx_packet).await;
// }
// Some(Err(err)) => {
// error!(
// "The socket connection got corrupted with error: {err}. Closing the socket",
// );
// return;
// }
// None => break, // stream got closed by remote
// }
// }
// }
// }
info!(
"Closing connection from {:?}",
framed_conn.into_inner().peer_addr()
);
}
// info!(
// "Closing connection from {:?}",
// framed_conn.into_inner().peer_addr()
// );
// }
}
@@ -3,11 +3,14 @@
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
use crate::node::storage::Storage;
use futures::StreamExt;
use log::*;
use nym_sphinx::framing::codec::NymCodec;
use nym_task::TaskClient;
use std::net::SocketAddr;
use std::process;
use tokio::task::JoinHandle;
use tokio_util::udp::UdpFramed;
pub(crate) struct Listener {
address: SocketAddr,
@@ -20,12 +23,12 @@ impl Listener {
Listener { address, shutdown }
}
pub(crate) async fn run<St>(&mut self, connection_handler: ConnectionHandler<St>)
pub(crate) async fn run<St>(&mut self, mut connection_handler: ConnectionHandler<St>)
where
St: Storage + Clone + 'static,
{
info!("Starting mixnet listener at {}", self.address);
let tcp_listener = match tokio::net::TcpListener::bind(self.address).await {
let socket = match tokio::net::UdpSocket::bind(self.address).await {
Ok(listener) => listener,
Err(err) => {
error!("Failed to bind to {} - {err}. Are you sure nothing else is running on the specified port and your user has sufficient permission to bind to the requested address?", self.address);
@@ -33,19 +36,33 @@ impl Listener {
}
};
let mut framed_conn = UdpFramed::new(socket, NymCodec);
while !self.shutdown.is_shutdown() {
tokio::select! {
biased;
_ = self.shutdown.recv() => {
log::trace!("mixnet_handling::Listener: Received shutdown");
}
connection = tcp_listener.accept() => {
match connection {
Ok((socket, remote_addr)) => {
let handler = connection_handler.clone();
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()));
framed_sphinx_packet = framed_conn.next() => {
match framed_sphinx_packet {
Some(Ok((framed_sphinx_packet, remote))) => {
// TODO: benchmark spawning tokio task with full processing vs just processing it
// synchronously under higher load in single and multi-threaded situation.
// in theory we could process multiple sphinx packet from the same connection in parallel,
// but we already handle multiple concurrent connections so if anything, making
// that change would only slow things down
debug!("Handling packet from {remote:?}");
connection_handler.handle_received_packet(framed_sphinx_packet).await;
}
Err(err) => warn!("failed to get client: {err}"),
Some(Err(err)) => {
error!(
"The socket connection got corrupted with error: {err}. Closing the socket",
);
return;
}
None => break, // stream got closed by remote
}
}
}
+1 -1
View File
@@ -33,7 +33,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sysinfo = "0.27.7"
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] }
tokio-util = { version = "0.7.3", features = ["codec"] }
tokio-util = { version = "0.7.3", features = ["codec", "net"] }
toml = "0.5.8"
url = { version = "2.2", features = ["serde"] }
cfg-if = "1.0.0"
@@ -54,7 +54,7 @@ impl ConnectionHandler {
feature = "cpucycles",
instrument(skip(self, framed_sphinx_packet), fields(cpucycles))
)]
fn handle_received_packet(&self, framed_sphinx_packet: FramedNymPacket) {
pub(crate) async fn handle_received_packet(&self, framed_sphinx_packet: FramedNymPacket) {
//
// TODO: here be replay attack detection - it will require similar key cache to the one in
// packet processor for vpn packets,
@@ -78,50 +78,50 @@ impl ConnectionHandler {
})
}
pub(crate) async fn handle_connection(
self,
conn: TcpStream,
remote: SocketAddr,
mut shutdown: TaskClient,
) {
debug!("Starting connection handler for {:?}", remote);
shutdown.mark_as_success();
let mut framed_conn = Framed::new(conn, NymCodec);
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
log::trace!("ConnectionHandler: received shutdown");
}
framed_sphinx_packet = framed_conn.next() => {
match framed_sphinx_packet {
Some(Ok(framed_sphinx_packet)) => {
// TODO: benchmark spawning tokio task with full processing vs just processing it
// synchronously (without delaying inside of course,
// delay is moved to a global DelayQueue)
// under higher load in single and multi-threaded situation.
// pub(crate) async fn handle_connection(
// self,
// conn: TcpStream,
// remote: SocketAddr,
// mut shutdown: TaskClient,
// ) {
// debug!("Starting connection handler for {:?}", remote);
// //shutdown.mark_as_success();
// //let mut framed_conn = Framed::new(conn, NymCodec);
// while !shutdown.is_shutdown() {
// tokio::select! {
// biased;
// _ = shutdown.recv() => {
// log::trace!("ConnectionHandler: received shutdown");
// }
// framed_sphinx_packet = framed_conn.next() => {
// match framed_sphinx_packet {
// Some(Ok(framed_sphinx_packet)) => {
// // TODO: benchmark spawning tokio task with full processing vs just processing it
// // synchronously (without delaying inside of course,
// // delay is moved to a global DelayQueue)
// // under higher load in single and multi-threaded situation.
// in theory we could process multiple sphinx packet from the same connection in parallel,
// but we already handle multiple concurrent connections so if anything, making
// that change would only slow things down
self.handle_received_packet(framed_sphinx_packet);
}
Some(Err(err)) => {
error!(
"{remote:?} - The socket connection got corrupted with error: {err}. Closing the socket",
);
return;
}
None => break, // stream got closed by remote
}
},
}
}
// // in theory we could process multiple sphinx packet from the same connection in parallel,
// // but we already handle multiple concurrent connections so if anything, making
// // that change would only slow things down
// self.handle_received_packet(framed_sphinx_packet);
// }
// Some(Err(err)) => {
// error!(
// "{remote:?} - The socket connection got corrupted with error: {err}. Closing the socket",
// );
// return;
// }
// None => break, // stream got closed by remote
// }
// },
// }
// }
info!(
"Closing connection from {:?}",
framed_conn.into_inner().peer_addr()
);
log::trace!("ConnectionHandler: Exiting");
}
// info!(
// "Closing connection from {:?}",
// framed_conn.into_inner().peer_addr()
// );
// log::trace!("ConnectionHandler: Exiting");
// }
}
+29 -9
View File
@@ -2,10 +2,13 @@
// SPDX-License-Identifier: Apache-2.0
use crate::node::listener::connection_handler::ConnectionHandler;
use futures::StreamExt;
use nym_sphinx::framing::codec::NymCodec;
use std::net::SocketAddr;
use std::process;
use tokio::net::TcpListener;
use tokio::net::UdpSocket;
use tokio::task::JoinHandle;
use tokio_util::udp::UdpFramed;
#[cfg(feature = "cpucycles")]
use tracing::error;
@@ -25,7 +28,7 @@ impl Listener {
async fn run(&mut self, connection_handler: ConnectionHandler) {
log::trace!("Starting Listener");
let listener = match TcpListener::bind(self.address).await {
let socket = match UdpSocket::bind(self.address).await {
Ok(listener) => listener,
Err(err) => {
error!("Failed to bind to {} - {err}. Are you sure nothing else is running on the specified port and your user has sufficient permission to bind to the requested address?", self.address);
@@ -33,19 +36,36 @@ impl Listener {
}
};
let mut framed_conn = UdpFramed::new(socket, NymCodec);
while !self.shutdown.is_shutdown() {
tokio::select! {
biased;
_ = self.shutdown.recv() => {
log::trace!("Listener: Received shutdown");
}
connection = listener.accept() => {
match connection {
Ok((socket, remote_addr)) => {
let handler = connection_handler.clone();
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()));
},
framed_sphinx_packet = framed_conn.next() => {
match framed_sphinx_packet {
Some(Ok((framed_sphinx_packet, remote))) => {
// TODO: benchmark spawning tokio task with full processing vs just processing it
// synchronously (without delaying inside of course,
// delay is moved to a global DelayQueue)
// under higher load in single and multi-threaded situation.
// in theory we could process multiple sphinx packet from the same connection in parallel,
// but we already handle multiple concurrent connections so if anything, making
// that change would only slow things down
debug!("Handling packet from {remote:?}");
let connection_handler_clone = connection_handler.clone();
tokio::spawn(async move {connection_handler_clone.handle_received_packet(framed_sphinx_packet).await });
}
Err(err) => warn!("Failed to accept incoming connection - {err}"),
Some(Err(err)) => {
error!(
"The socket connection got corrupted with error: {err}. Closing the socket",
);
return;
}
None => break, // stream got closed by remote
}
},
};