Compare commits
1 Commits
test_crate2
...
simon/udp
| Author | SHA1 | Date | |
|---|---|---|---|
| 005b67ef0f |
@@ -15,9 +15,10 @@ use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::net::{TcpStream, UdpSocket};
|
||||
use tokio::time::sleep;
|
||||
use tokio_util::codec::Framed;
|
||||
use tokio_util::udp::UdpFramed;
|
||||
|
||||
pub struct Config {
|
||||
initial_reconnection_backoff: Duration,
|
||||
@@ -57,136 +58,98 @@ pub trait SendWithoutResponse {
|
||||
}
|
||||
|
||||
pub struct Client {
|
||||
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
|
||||
conn_new: Option<mpsc::Sender<(FramedNymPacket, SocketAddr)>>,
|
||||
config: Config,
|
||||
}
|
||||
|
||||
struct ConnectionSender {
|
||||
channel: mpsc::Sender<FramedNymPacket>,
|
||||
current_reconnection_attempt: Arc<AtomicU32>,
|
||||
}
|
||||
|
||||
impl ConnectionSender {
|
||||
fn new(channel: mpsc::Sender<FramedNymPacket>) -> Self {
|
||||
ConnectionSender {
|
||||
channel,
|
||||
current_reconnection_attempt: Arc::new(AtomicU32::new(0)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(config: Config) -> Client {
|
||||
Client {
|
||||
conn_new: HashMap::new(),
|
||||
conn_new: None,
|
||||
config,
|
||||
}
|
||||
}
|
||||
|
||||
async fn manage_connection(
|
||||
address: SocketAddr,
|
||||
receiver: mpsc::Receiver<FramedNymPacket>,
|
||||
connection_timeout: Duration,
|
||||
current_reconnection: &AtomicU32,
|
||||
receiver: mpsc::Receiver<(FramedNymPacket, SocketAddr)>,
|
||||
) {
|
||||
let connection_fut = TcpStream::connect(address);
|
||||
|
||||
let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
|
||||
Ok(stream_res) => match stream_res {
|
||||
Ok(stream) => {
|
||||
debug!("Managed to establish connection to {}", address);
|
||||
// if we managed to connect, reset the reconnection count (whatever it might have been)
|
||||
current_reconnection.store(0, Ordering::Release);
|
||||
Framed::new(stream, NymCodec)
|
||||
}
|
||||
Err(err) => {
|
||||
debug!(
|
||||
"failed to establish connection to {} (err: {})",
|
||||
address, err
|
||||
);
|
||||
return;
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
debug!(
|
||||
"failed to connect to {} within {:?}",
|
||||
address, connection_timeout
|
||||
);
|
||||
|
||||
// we failed to connect - increase reconnection attempt
|
||||
current_reconnection.fetch_add(1, Ordering::SeqCst);
|
||||
let socket = match UdpSocket::bind("0.0.0.0:0").await {
|
||||
Ok(socket) => socket,
|
||||
Err(err) => {
|
||||
error!("Failed to bind to - {err}. Are you sure nothing else is running on the specified port and your user has sufficient permission to bind to the requested address?");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let framed_conn = UdpFramed::new(socket, NymCodec);
|
||||
// Take whatever the receiver channel produces and put it on the connection.
|
||||
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
|
||||
// about neither receiver nor the connection, it doesn't matter which one gets consumed
|
||||
if let Err(err) = receiver.map(Ok).forward(conn).await {
|
||||
if let Err(err) = receiver.map(Ok).forward(framed_conn).await {
|
||||
warn!("Failed to forward packets to {} - {err}", address);
|
||||
}
|
||||
|
||||
debug!(
|
||||
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
|
||||
address
|
||||
"connection manager is finished. Either the connection failed or mixnet client got dropped"
|
||||
);
|
||||
}
|
||||
|
||||
/// If we're trying to reconnect, determine how long we should wait.
|
||||
fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
|
||||
if current_attempt == 0 {
|
||||
None
|
||||
} else {
|
||||
let exp = 2_u32.checked_pow(current_attempt);
|
||||
let backoff = exp
|
||||
.and_then(|exp| self.config.initial_reconnection_backoff.checked_mul(exp))
|
||||
.unwrap_or(self.config.maximum_reconnection_backoff);
|
||||
// fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
|
||||
// if current_attempt == 0 {
|
||||
// None
|
||||
// } else {
|
||||
// let exp = 2_u32.checked_pow(current_attempt);
|
||||
// let backoff = exp
|
||||
// .and_then(|exp| self.config.initial_reconnection_backoff.checked_mul(exp))
|
||||
// .unwrap_or(self.config.maximum_reconnection_backoff);
|
||||
|
||||
Some(std::cmp::min(
|
||||
backoff,
|
||||
self.config.maximum_reconnection_backoff,
|
||||
))
|
||||
}
|
||||
}
|
||||
// Some(std::cmp::min(
|
||||
// backoff,
|
||||
// self.config.maximum_reconnection_backoff,
|
||||
// ))
|
||||
// }
|
||||
// }
|
||||
|
||||
fn make_connection(&mut self, address: NymNodeRoutingAddress, pending_packet: FramedNymPacket) {
|
||||
let (mut sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
|
||||
|
||||
// this CAN'T fail because we just created the channel which has a non-zero capacity
|
||||
if self.config.maximum_connection_buffer_size > 0 {
|
||||
sender.try_send(pending_packet).unwrap();
|
||||
sender.try_send((pending_packet, address.into())).unwrap();
|
||||
}
|
||||
self.conn_new = Some(sender);
|
||||
|
||||
// if we already tried to connect to `address` before, grab the current attempt count
|
||||
let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
|
||||
existing.channel = sender;
|
||||
Arc::clone(&existing.current_reconnection_attempt)
|
||||
} else {
|
||||
let new_entry = ConnectionSender::new(sender);
|
||||
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
|
||||
self.conn_new.insert(address, new_entry);
|
||||
current_attempt
|
||||
};
|
||||
// let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
|
||||
// existing.channel = sender;
|
||||
// Arc::clone(&existing.current_reconnection_attempt)
|
||||
// } else {
|
||||
// let new_entry = ConnectionSender::new(sender);
|
||||
// let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
|
||||
// self.conn_new.insert(address, new_entry);
|
||||
// current_attempt
|
||||
// };
|
||||
|
||||
// load the actual value.
|
||||
let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
|
||||
let backoff = self.determine_backoff(reconnection_attempt);
|
||||
// let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
|
||||
// let backoff = self.determine_backoff(reconnection_attempt);
|
||||
|
||||
// copy the value before moving into another task
|
||||
let initial_connection_timeout = self.config.initial_connection_timeout;
|
||||
// let initial_connection_timeout = self.config.initial_connection_timeout;
|
||||
|
||||
tokio::spawn(async move {
|
||||
// before executing the manager, wait for what was specified, if anything
|
||||
if let Some(backoff) = backoff {
|
||||
trace!("waiting for {:?} before attempting connection", backoff);
|
||||
sleep(backoff).await;
|
||||
}
|
||||
// if let Some(backoff) = backoff {
|
||||
// trace!("waiting for {:?} before attempting connection", backoff);
|
||||
// sleep(backoff).await;
|
||||
// }
|
||||
|
||||
Self::manage_connection(
|
||||
address.into(),
|
||||
receiver,
|
||||
initial_connection_timeout,
|
||||
¤t_reconnection_attempt,
|
||||
//initial_connection_timeout,
|
||||
//¤t_reconnection_attempt,
|
||||
)
|
||||
.await
|
||||
});
|
||||
@@ -204,48 +167,53 @@ impl SendWithoutResponse for Client {
|
||||
let framed_packet =
|
||||
FramedNymPacket::new(packet, packet_type, self.config.use_legacy_version);
|
||||
|
||||
if let Some(sender) = self.conn_new.get_mut(&address) {
|
||||
if let Err(err) = sender.channel.try_send(framed_packet) {
|
||||
if err.is_full() {
|
||||
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
|
||||
// it's not a 'big' error, but we did not manage to send the packet
|
||||
// if the queue is full, we can't really do anything but to drop the packet
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::WouldBlock,
|
||||
"connection queue is full",
|
||||
))
|
||||
} else if err.is_disconnected() {
|
||||
debug!(
|
||||
"Connection to {} seems to be dead. attempting to re-establish it...",
|
||||
address
|
||||
);
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue
|
||||
// it up to send it as soon as the connection is re-established
|
||||
self.make_connection(address, err.into_inner());
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::ConnectionAborted,
|
||||
"reconnection attempt is in progress",
|
||||
))
|
||||
match &self.conn_new {
|
||||
Some(sender) => {
|
||||
if let Err(err) = sender.clone().try_send((framed_packet, address.into())) {
|
||||
if err.is_full() {
|
||||
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
|
||||
// it's not a 'big' error, but we did not manage to send the packet
|
||||
// if the queue is full, we can't really do anything but to drop the packet
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::WouldBlock,
|
||||
"connection queue is full",
|
||||
))
|
||||
} else if err.is_disconnected() {
|
||||
debug!(
|
||||
"Connection to {} seems to be dead. attempting to re-establish it...",
|
||||
address
|
||||
);
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue
|
||||
// it up to send it as soon as the connection is re-established
|
||||
self.make_connection(address, err.into_inner().0);
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::ConnectionAborted,
|
||||
"reconnection attempt is in progress",
|
||||
))
|
||||
} else {
|
||||
// this can't really happen, but let's safe-guard against it in case something changes in futures library
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"unknown connection buffer error",
|
||||
))
|
||||
}
|
||||
} else {
|
||||
// this can't really happen, but let's safe-guard against it in case something changes in futures library
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"unknown connection buffer error",
|
||||
))
|
||||
debug!("Sending packet to {:?}", address);
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
// there was never a connection to begin with
|
||||
debug!("establishing initial connection to {}", address);
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
|
||||
// for sending for as soon as the connection is created
|
||||
self.make_connection(address, framed_packet);
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::NotConnected,
|
||||
"connection is in progress",
|
||||
))
|
||||
|
||||
None => {
|
||||
// there was never a connection to begin with
|
||||
debug!("establishing initial connection");
|
||||
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
|
||||
// for sending for as soon as the connection is created
|
||||
self.make_connection(address, framed_packet);
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::NotConnected,
|
||||
"connection is in progress",
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ const ACK_IV_SIZE: usize = 16;
|
||||
|
||||
const ACK_PACKET_SIZE: usize = ACK_IV_SIZE + FRAG_ID_LEN + SPHINX_PACKET_OVERHEAD;
|
||||
const REGULAR_PACKET_SIZE: usize = 2 * 1024 + SPHINX_PACKET_OVERHEAD;
|
||||
const EXTENDED_PACKET_SIZE_8: usize = 8 * 1024 + SPHINX_PACKET_OVERHEAD;
|
||||
const EXTENDED_PACKET_SIZE_8: usize = 1 * 512 + SPHINX_PACKET_OVERHEAD;
|
||||
const EXTENDED_PACKET_SIZE_16: usize = 16 * 1024 + SPHINX_PACKET_OVERHEAD;
|
||||
const EXTENDED_PACKET_SIZE_32: usize = 32 * 1024 + SPHINX_PACKET_OVERHEAD;
|
||||
|
||||
|
||||
@@ -155,7 +155,7 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
self.forward_ack(forward_ack, client_address);
|
||||
}
|
||||
|
||||
async fn handle_received_packet(&mut self, framed_sphinx_packet: FramedNymPacket) {
|
||||
pub(crate) async fn handle_received_packet(&mut self, framed_sphinx_packet: FramedNymPacket) {
|
||||
//
|
||||
// TODO: here be replay attack detection - it will require similar key cache to the one in
|
||||
// packet processor for vpn packets,
|
||||
@@ -174,47 +174,47 @@ impl<St: Storage> ConnectionHandler<St> {
|
||||
self.handle_processed_packet(processed_final_hop).await
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_connection(
|
||||
mut self,
|
||||
conn: TcpStream,
|
||||
remote: SocketAddr,
|
||||
mut shutdown: TaskClient,
|
||||
) {
|
||||
debug!("Starting connection handler for {:?}", remote);
|
||||
shutdown.mark_as_success();
|
||||
let mut framed_conn = Framed::new(conn, NymCodec);
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("ConnectionHandler: received shutdown");
|
||||
}
|
||||
framed_sphinx_packet = framed_conn.next() => {
|
||||
match framed_sphinx_packet {
|
||||
Some(Ok(framed_sphinx_packet)) => {
|
||||
// TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// synchronously under higher load in single and multi-threaded situation.
|
||||
// pub(crate) async fn handle_connection(
|
||||
// mut self,
|
||||
// conn: TcpStream,
|
||||
// remote: SocketAddr,
|
||||
// mut shutdown: TaskClient,
|
||||
// ) {
|
||||
// debug!("Starting connection handler for {:?}", remote);
|
||||
// shutdown.mark_as_success();
|
||||
// let mut framed_conn = Framed::new(conn, NymCodec);
|
||||
// while !shutdown.is_shutdown() {
|
||||
// tokio::select! {
|
||||
// biased;
|
||||
// _ = shutdown.recv() => {
|
||||
// log::trace!("ConnectionHandler: received shutdown");
|
||||
// }
|
||||
// framed_sphinx_packet = framed_conn.next() => {
|
||||
// match framed_sphinx_packet {
|
||||
// Some(Ok(framed_sphinx_packet)) => {
|
||||
// // TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// // synchronously under higher load in single and multi-threaded situation.
|
||||
|
||||
// in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// but we already handle multiple concurrent connections so if anything, making
|
||||
// that change would only slow things down
|
||||
self.handle_received_packet(framed_sphinx_packet).await;
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
error!(
|
||||
"The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
);
|
||||
return;
|
||||
}
|
||||
None => break, // stream got closed by remote
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// // in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// // but we already handle multiple concurrent connections so if anything, making
|
||||
// // that change would only slow things down
|
||||
// self.handle_received_packet(framed_sphinx_packet).await;
|
||||
// }
|
||||
// Some(Err(err)) => {
|
||||
// error!(
|
||||
// "The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
// );
|
||||
// return;
|
||||
// }
|
||||
// None => break, // stream got closed by remote
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
info!(
|
||||
"Closing connection from {:?}",
|
||||
framed_conn.into_inner().peer_addr()
|
||||
);
|
||||
}
|
||||
// info!(
|
||||
// "Closing connection from {:?}",
|
||||
// framed_conn.into_inner().peer_addr()
|
||||
// );
|
||||
// }
|
||||
}
|
||||
|
||||
@@ -3,11 +3,14 @@
|
||||
|
||||
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
|
||||
use crate::node::storage::Storage;
|
||||
use futures::StreamExt;
|
||||
use log::*;
|
||||
use nym_sphinx::framing::codec::NymCodec;
|
||||
use nym_task::TaskClient;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::udp::UdpFramed;
|
||||
|
||||
pub(crate) struct Listener {
|
||||
address: SocketAddr,
|
||||
@@ -20,12 +23,12 @@ impl Listener {
|
||||
Listener { address, shutdown }
|
||||
}
|
||||
|
||||
pub(crate) async fn run<St>(&mut self, connection_handler: ConnectionHandler<St>)
|
||||
pub(crate) async fn run<St>(&mut self, mut connection_handler: ConnectionHandler<St>)
|
||||
where
|
||||
St: Storage + Clone + 'static,
|
||||
{
|
||||
info!("Starting mixnet listener at {}", self.address);
|
||||
let tcp_listener = match tokio::net::TcpListener::bind(self.address).await {
|
||||
let socket = match tokio::net::UdpSocket::bind(self.address).await {
|
||||
Ok(listener) => listener,
|
||||
Err(err) => {
|
||||
error!("Failed to bind to {} - {err}. Are you sure nothing else is running on the specified port and your user has sufficient permission to bind to the requested address?", self.address);
|
||||
@@ -33,19 +36,33 @@ impl Listener {
|
||||
}
|
||||
};
|
||||
|
||||
let mut framed_conn = UdpFramed::new(socket, NymCodec);
|
||||
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("mixnet_handling::Listener: Received shutdown");
|
||||
}
|
||||
connection = tcp_listener.accept() => {
|
||||
match connection {
|
||||
Ok((socket, remote_addr)) => {
|
||||
let handler = connection_handler.clone();
|
||||
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()));
|
||||
framed_sphinx_packet = framed_conn.next() => {
|
||||
match framed_sphinx_packet {
|
||||
Some(Ok((framed_sphinx_packet, remote))) => {
|
||||
// TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// synchronously under higher load in single and multi-threaded situation.
|
||||
|
||||
// in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// but we already handle multiple concurrent connections so if anything, making
|
||||
// that change would only slow things down
|
||||
debug!("Handling packet from {remote:?}");
|
||||
connection_handler.handle_received_packet(framed_sphinx_packet).await;
|
||||
}
|
||||
Err(err) => warn!("failed to get client: {err}"),
|
||||
Some(Err(err)) => {
|
||||
error!(
|
||||
"The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
);
|
||||
return;
|
||||
}
|
||||
None => break, // stream got closed by remote
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+1
-1
@@ -33,7 +33,7 @@ serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sysinfo = "0.27.7"
|
||||
tokio = { version = "1.21.2", features = ["rt-multi-thread", "net", "signal"] }
|
||||
tokio-util = { version = "0.7.3", features = ["codec"] }
|
||||
tokio-util = { version = "0.7.3", features = ["codec", "net"] }
|
||||
toml = "0.5.8"
|
||||
url = { version = "2.2", features = ["serde"] }
|
||||
cfg-if = "1.0.0"
|
||||
|
||||
@@ -54,7 +54,7 @@ impl ConnectionHandler {
|
||||
feature = "cpucycles",
|
||||
instrument(skip(self, framed_sphinx_packet), fields(cpucycles))
|
||||
)]
|
||||
fn handle_received_packet(&self, framed_sphinx_packet: FramedNymPacket) {
|
||||
pub(crate) async fn handle_received_packet(&self, framed_sphinx_packet: FramedNymPacket) {
|
||||
//
|
||||
// TODO: here be replay attack detection - it will require similar key cache to the one in
|
||||
// packet processor for vpn packets,
|
||||
@@ -78,50 +78,50 @@ impl ConnectionHandler {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_connection(
|
||||
self,
|
||||
conn: TcpStream,
|
||||
remote: SocketAddr,
|
||||
mut shutdown: TaskClient,
|
||||
) {
|
||||
debug!("Starting connection handler for {:?}", remote);
|
||||
shutdown.mark_as_success();
|
||||
let mut framed_conn = Framed::new(conn, NymCodec);
|
||||
while !shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = shutdown.recv() => {
|
||||
log::trace!("ConnectionHandler: received shutdown");
|
||||
}
|
||||
framed_sphinx_packet = framed_conn.next() => {
|
||||
match framed_sphinx_packet {
|
||||
Some(Ok(framed_sphinx_packet)) => {
|
||||
// TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// synchronously (without delaying inside of course,
|
||||
// delay is moved to a global DelayQueue)
|
||||
// under higher load in single and multi-threaded situation.
|
||||
// pub(crate) async fn handle_connection(
|
||||
// self,
|
||||
// conn: TcpStream,
|
||||
// remote: SocketAddr,
|
||||
// mut shutdown: TaskClient,
|
||||
// ) {
|
||||
// debug!("Starting connection handler for {:?}", remote);
|
||||
// //shutdown.mark_as_success();
|
||||
// //let mut framed_conn = Framed::new(conn, NymCodec);
|
||||
// while !shutdown.is_shutdown() {
|
||||
// tokio::select! {
|
||||
// biased;
|
||||
// _ = shutdown.recv() => {
|
||||
// log::trace!("ConnectionHandler: received shutdown");
|
||||
// }
|
||||
// framed_sphinx_packet = framed_conn.next() => {
|
||||
// match framed_sphinx_packet {
|
||||
// Some(Ok(framed_sphinx_packet)) => {
|
||||
// // TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// // synchronously (without delaying inside of course,
|
||||
// // delay is moved to a global DelayQueue)
|
||||
// // under higher load in single and multi-threaded situation.
|
||||
|
||||
// in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// but we already handle multiple concurrent connections so if anything, making
|
||||
// that change would only slow things down
|
||||
self.handle_received_packet(framed_sphinx_packet);
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
error!(
|
||||
"{remote:?} - The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
);
|
||||
return;
|
||||
}
|
||||
None => break, // stream got closed by remote
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
// // in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// // but we already handle multiple concurrent connections so if anything, making
|
||||
// // that change would only slow things down
|
||||
// self.handle_received_packet(framed_sphinx_packet);
|
||||
// }
|
||||
// Some(Err(err)) => {
|
||||
// error!(
|
||||
// "{remote:?} - The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
// );
|
||||
// return;
|
||||
// }
|
||||
// None => break, // stream got closed by remote
|
||||
// }
|
||||
// },
|
||||
// }
|
||||
// }
|
||||
|
||||
info!(
|
||||
"Closing connection from {:?}",
|
||||
framed_conn.into_inner().peer_addr()
|
||||
);
|
||||
log::trace!("ConnectionHandler: Exiting");
|
||||
}
|
||||
// info!(
|
||||
// "Closing connection from {:?}",
|
||||
// framed_conn.into_inner().peer_addr()
|
||||
// );
|
||||
// log::trace!("ConnectionHandler: Exiting");
|
||||
// }
|
||||
}
|
||||
|
||||
@@ -2,10 +2,13 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::node::listener::connection_handler::ConnectionHandler;
|
||||
use futures::StreamExt;
|
||||
use nym_sphinx::framing::codec::NymCodec;
|
||||
use std::net::SocketAddr;
|
||||
use std::process;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::udp::UdpFramed;
|
||||
#[cfg(feature = "cpucycles")]
|
||||
use tracing::error;
|
||||
|
||||
@@ -25,7 +28,7 @@ impl Listener {
|
||||
|
||||
async fn run(&mut self, connection_handler: ConnectionHandler) {
|
||||
log::trace!("Starting Listener");
|
||||
let listener = match TcpListener::bind(self.address).await {
|
||||
let socket = match UdpSocket::bind(self.address).await {
|
||||
Ok(listener) => listener,
|
||||
Err(err) => {
|
||||
error!("Failed to bind to {} - {err}. Are you sure nothing else is running on the specified port and your user has sufficient permission to bind to the requested address?", self.address);
|
||||
@@ -33,19 +36,36 @@ impl Listener {
|
||||
}
|
||||
};
|
||||
|
||||
let mut framed_conn = UdpFramed::new(socket, NymCodec);
|
||||
|
||||
while !self.shutdown.is_shutdown() {
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = self.shutdown.recv() => {
|
||||
log::trace!("Listener: Received shutdown");
|
||||
}
|
||||
connection = listener.accept() => {
|
||||
match connection {
|
||||
Ok((socket, remote_addr)) => {
|
||||
let handler = connection_handler.clone();
|
||||
tokio::spawn(handler.handle_connection(socket, remote_addr, self.shutdown.clone()));
|
||||
},
|
||||
framed_sphinx_packet = framed_conn.next() => {
|
||||
match framed_sphinx_packet {
|
||||
Some(Ok((framed_sphinx_packet, remote))) => {
|
||||
// TODO: benchmark spawning tokio task with full processing vs just processing it
|
||||
// synchronously (without delaying inside of course,
|
||||
// delay is moved to a global DelayQueue)
|
||||
// under higher load in single and multi-threaded situation.
|
||||
|
||||
// in theory we could process multiple sphinx packet from the same connection in parallel,
|
||||
// but we already handle multiple concurrent connections so if anything, making
|
||||
// that change would only slow things down
|
||||
debug!("Handling packet from {remote:?}");
|
||||
let connection_handler_clone = connection_handler.clone();
|
||||
tokio::spawn(async move {connection_handler_clone.handle_received_packet(framed_sphinx_packet).await });
|
||||
}
|
||||
Err(err) => warn!("Failed to accept incoming connection - {err}"),
|
||||
Some(Err(err)) => {
|
||||
error!(
|
||||
"The socket connection got corrupted with error: {err}. Closing the socket",
|
||||
);
|
||||
return;
|
||||
}
|
||||
None => break, // stream got closed by remote
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user