Compare commits

...

6 Commits

Author SHA1 Message Date
Jon Häggblad f62bc5addb wip: first try on connected client listener 2024-02-07 09:38:17 +01:00
Jon Häggblad 0d4254f65a wip: start play with connected client handler 2024-02-06 21:34:24 +01:00
Jon Häggblad 25e35384d9 Lower timer to 20ms 2024-02-05 13:14:45 +01:00
Jon Häggblad e4479f2655 Bundle timer 2024-02-02 00:20:06 +01:00
Jon Häggblad deef1092a8 wip: bundle packets from tun 2024-02-01 18:30:50 +01:00
Jon Häggblad efaf55c1b3 wip 2024-01-31 23:44:18 +01:00
6 changed files with 416 additions and 26 deletions
Generated
+1
View File
@@ -6704,6 +6704,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-tun",
"tokio-util",
"url",
]
@@ -168,6 +168,7 @@ impl MixnetClient {
}
}
#[derive(Clone)]
pub struct MixnetClientSender {
client_input: ClientInput,
packet_type: Option<PacketType>,
@@ -40,6 +40,7 @@ serde_json = { workspace = true }
tap.workspace = true
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
tokio-util = { workspace = true, features = ["codec"] }
url.workspace = true
[target.'cfg(target_os = "linux")'.dependencies]
@@ -9,7 +9,7 @@ mod constants;
pub mod error;
mod ip_packet_router;
mod mixnet_client;
mod mixnet_listener;
pub mod mixnet_listener;
pub mod request_filter;
mod tun_listener;
mod util;
@@ -3,6 +3,7 @@ use std::{
net::{IpAddr, SocketAddr},
};
use bytes::{Buf, Bytes, BytesMut};
use futures::StreamExt;
use nym_ip_packet_requests::{
DynamicConnectFailureReason, IpPacketRequest, IpPacketRequestData, IpPacketResponse,
@@ -13,6 +14,7 @@ use nym_sphinx::receiver::ReconstructedMessage;
use nym_task::TaskHandle;
#[cfg(target_os = "linux")]
use tokio::io::AsyncWriteExt;
use tokio_util::codec::{Decoder, Encoder};
use crate::{
config::Config,
@@ -26,6 +28,80 @@ use crate::{
},
};
// Tokio codec for bundling multiple IP packets into one buffer that is at most 1500 bytes long.
// These packets are separated by a 2 byte length prefix.
pub struct BundledIpPacketCodec {
buffer: BytesMut,
}
impl BundledIpPacketCodec {
pub fn new() -> Self {
BundledIpPacketCodec {
buffer: BytesMut::new(),
}
}
pub fn flush_current_buffer(&mut self) -> Bytes {
let mut buffer_so_far = BytesMut::new();
// TODO: is it possible to move the buffer instead of copying it?
buffer_so_far.extend_from_slice(&self.buffer);
self.buffer = BytesMut::new();
buffer_so_far.freeze()
}
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
}
impl Encoder<Bytes> for BundledIpPacketCodec {
type Error = IpPacketRouterError;
fn encode(&mut self, packet: Bytes, dst: &mut BytesMut) -> Result<()> {
let packet_size = packet.len();
if self.buffer.len() + packet_size + 2 > 1500 {
// If the packet doesn't fit in the buffer, send the buffer and then add it to the buffer
dst.extend_from_slice(&self.buffer);
self.buffer = BytesMut::new();
}
// Add the packet to the buffer
self.buffer
.extend_from_slice(&(packet_size as u16).to_be_bytes());
self.buffer.extend_from_slice(&packet);
Ok(())
}
}
impl Decoder for BundledIpPacketCodec {
type Item = Bytes;
type Error = IpPacketRouterError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
if src.len() < 2 {
// Not enough bytes to read the length prefix
return Ok(None);
}
let packet_size = u16::from_be_bytes([src[0], src[1]]) as usize;
if src.len() < packet_size + 2 {
// Not enough bytes to read the packet
return Ok(None);
}
// Remove the length prefix
src.advance(2);
// Read the packet
let packet = src.split_to(packet_size);
Ok(Some(packet.freeze()))
}
}
#[cfg(target_os = "linux")]
pub(crate) struct MixnetListener {
pub(crate) _config: Config,
@@ -41,13 +117,14 @@ pub(crate) struct ConnectedClients {
connected_client_tx: tokio::sync::mpsc::UnboundedSender<ConnectedClientEvent>,
}
// TODO: move this to the tun_listener module?
pub(crate) struct ConnectedClientsListener {
clients: HashMap<IpAddr, ConnectedClient>,
clients: HashMap<IpAddr, ConnectedClientMirror>,
pub(crate) connected_client_rx: tokio::sync::mpsc::UnboundedReceiver<ConnectedClientEvent>,
}
impl ConnectedClientsListener {
pub(crate) fn get(&self, ip: &IpAddr) -> Option<&ConnectedClient> {
pub(crate) fn get(&self, ip: &IpAddr) -> Option<&ConnectedClientMirror> {
self.clients.get(ip)
}
@@ -58,14 +135,16 @@ impl ConnectedClientsListener {
ip,
nym_address,
mix_hops,
forward_from_tun_tx,
} = *connected_event;
log::trace!("Connect client: {ip}");
self.clients.insert(
ip,
ConnectedClient {
ConnectedClientMirror {
nym_address,
mix_hops,
last_activity: std::time::Instant::now(),
forward_from_tun_tx,
},
);
}
@@ -75,6 +154,11 @@ impl ConnectedClientsListener {
}
}
}
// TEMP
// pub(crate) fn get_first(&self) -> Option<&ConnectedClientMirror> {
// self.clients.values().next()
// }
}
impl ConnectedClients {
@@ -118,20 +202,33 @@ impl ConnectedClients {
.find(|client| client.nym_address == *nym_address)
}
fn connect(&mut self, ip: IpAddr, nym_address: Recipient, mix_hops: Option<u8>) {
fn connect(
&mut self,
ip: IpAddr,
nym_address: Recipient,
mix_hops: Option<u8>,
forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
close_tx: tokio::sync::oneshot::Sender<()>,
) {
// The map of connected clients that the mixnet listener keeps track of. It monitors
// activity and disconnects clients that have been inactive for too long.
self.clients.insert(
ip,
ConnectedClient {
nym_address,
mix_hops,
last_activity: std::time::Instant::now(),
close_tx: Some(close_tx),
},
);
// Send the connected client info to the tun listener, which will use it to forward packets
// to the connected client handler.
self.connected_client_tx
.send(ConnectedClientEvent::Connect(Box::new(ConnectEvent {
ip,
nym_address,
mix_hops,
forward_from_tun_tx,
})))
.unwrap();
}
@@ -160,6 +257,7 @@ impl ConnectedClients {
.collect();
for ip in inactive_clients {
log::info!("Disconnect inactive client: {ip}");
// TODO: confirm this also stops the connected client handler
self.clients.remove(&ip);
self.connected_client_tx
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(ip)))
@@ -176,6 +274,31 @@ pub(crate) struct ConnectedClient {
pub(crate) nym_address: Recipient,
pub(crate) mix_hops: Option<u8>,
pub(crate) last_activity: std::time::Instant,
// Send to connected clients listener to stop
// This is inside an Option only because we want to send in Drop
pub(crate) close_tx: Option<tokio::sync::oneshot::Sender<()>>,
// Forward to the connected clients listener packets that we have read from the TUN
// pub(crate) tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
}
impl Drop for ConnectedClient {
fn drop(&mut self) {
log::info!("Dropping connected client: {}", self.nym_address);
if let Some(close_tx) = self.close_tx.take() {
log::info!("Sending close signal to connected client handler");
close_tx.send(()).unwrap();
}
}
}
pub(crate) struct ConnectedClientMirror {
pub(crate) nym_address: Recipient,
pub(crate) mix_hops: Option<u8>,
pub(crate) last_activity: std::time::Instant,
// Send to connected clients listener to stop
// pub(crate) close_tx: tokio::sync::oneshot::Sender<()>,
// Forward to the connected clients listener packets that we have read from the TUN
pub(crate) forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
}
#[cfg(target_os = "linux")]
@@ -217,8 +340,27 @@ impl MixnetListener {
}
(false, false) => {
log::info!("Connecting a new client");
self.connected_clients
.connect(requested_ip, reply_to, reply_to_hops);
// Spawn the ConnectedClientHandler task
let (close_tx, close_rx) = tokio::sync::oneshot::channel();
let (forward_from_tun_tx, forward_from_tun_rx) =
tokio::sync::mpsc::unbounded_channel();
let connected_client_handler = crate::tun_listener::ConnectedClientHandler::new(
reply_to,
reply_to_hops,
forward_from_tun_rx,
self.mixnet_client.split_sender(),
close_rx,
);
connected_client_handler.start();
self.connected_clients.connect(
requested_ip,
reply_to,
reply_to_hops,
forward_from_tun_tx,
close_tx,
);
Ok(Some(IpPacketResponse::new_static_connect_success(
request_id, reply_to,
)))
@@ -285,8 +427,25 @@ impl MixnetListener {
)));
};
self.connected_clients
.connect(new_ip, reply_to, reply_to_hops);
// Spawn ConnectedClientHandler
let (close_tx, close_rx) = tokio::sync::oneshot::channel();
let (forward_from_tun_tx, forward_from_tun_rx) = tokio::sync::mpsc::unbounded_channel();
let connected_client_handler = crate::tun_listener::ConnectedClientHandler::new(
reply_to,
reply_to_hops,
forward_from_tun_rx,
self.mixnet_client.split_sender(),
close_rx,
);
connected_client_handler.start();
self.connected_clients.connect(
new_ip,
reply_to,
reply_to_hops,
forward_from_tun_tx,
close_tx,
);
Ok(Some(IpPacketResponse::new_dynamic_connect_success(
request_id, reply_to, new_ip,
)))
@@ -298,6 +457,20 @@ impl MixnetListener {
) -> Result<Option<IpPacketResponse>> {
log::trace!("Received data request");
let mut codec = BundledIpPacketCodec::new();
// convert from Bytes to BytesMut
let mut bytes = BytesMut::new();
bytes.extend_from_slice(&data_request.ip_packet);
// let mut bytes = BytesMut::new(data_request.ip_packet.clone());
while let Ok(Some(p)) = codec.decode(&mut bytes) {
if let Err(err) = self.handle_packet(&p).await {
log::error!("mixnet_listener: failed to handle packet: {err}");
}
}
Ok(None)
}
async fn handle_packet(&mut self, ip_packet: &Bytes) -> Result<Option<IpPacketRouterError>> {
// We don't forward packets that we are not able to parse. BUT, there might be a good
// reason to still forward them.
//
@@ -310,7 +483,8 @@ impl MixnetListener {
src_addr,
dst_addr,
dst,
} = parse_packet(&data_request.ip_packet)?;
} = parse_packet(&ip_packet)?;
// } = parse_packet(&data_request.ip_packet)?;
let dst_str = dst.map_or(dst_addr.to_string(), |dst| dst.to_string());
log::info!("Received packet: {packet_type}: {src_addr} -> {dst_str}");
@@ -331,7 +505,8 @@ impl MixnetListener {
}
// TODO: consider changing from Vec<u8> to bytes::Bytes?
let packet = data_request.ip_packet;
// let packet = data_request.ip_packet;
let packet = ip_packet;
self.tun_writer
.write_all(&packet)
.await
@@ -450,4 +625,5 @@ pub(crate) struct ConnectEvent {
pub(crate) ip: IpAddr,
pub(crate) nym_address: Recipient,
pub(crate) mix_hops: Option<u8>,
pub(crate) forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
}
@@ -1,8 +1,12 @@
use std::time::Duration;
use bytes::{Buf, Bytes, BytesMut};
use nym_ip_packet_requests::IpPacketResponse;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_sdk::mixnet::{MixnetMessageSender, Recipient};
use nym_task::TaskClient;
#[cfg(target_os = "linux")]
use tokio::io::AsyncReadExt;
use tokio_util::codec::{Decoder, Encoder};
use crate::{
error::{IpPacketRouterError, Result},
@@ -10,6 +14,143 @@ use crate::{
util::{create_message::create_input_message, parse_ip::parse_dst_addr},
};
// Data flow is
// mixnet_listener -> decode -> handle_packet -> write_to_tun
// tun_listener -> (task: send to connected client handler for processing -> encode) -> mixnet_sender
// This handler is spawned as a task, and it listens to IP packets passed from the tun_listener,
// encodes it, and then sends to mixnet.
pub(crate) struct ConnectedClientHandler {
nym_address: Recipient,
mix_hops: Option<u8>,
tun_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
codec: mixnet_listener::BundledIpPacketCodec,
codec_timer: tokio::time::Interval,
close_rx: tokio::sync::oneshot::Receiver<()>,
}
impl ConnectedClientHandler {
pub(crate) fn new(
nym_address: Recipient,
mix_hops: Option<u8>,
tun_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
close_rx: tokio::sync::oneshot::Receiver<()>,
) -> Self {
Self {
nym_address,
mix_hops,
tun_rx,
mixnet_client_sender,
codec: mixnet_listener::BundledIpPacketCodec::new(),
codec_timer: tokio::time::interval(Duration::from_millis(20)),
close_rx,
}
}
async fn flush_current_bundled_packets_and_send(
&mut self,
// bundled_packet_codec: &mut mixnet_listener::BundledIpPacketCodec,
// bundle_timer: &mut tokio::time::Interval,
) -> Result<()> {
let mut bundled_packets = self.codec.flush_current_buffer();
if !bundled_packets.is_empty() {
// // TEMPORARY
// let Some(connect_client) = self.connected_clients.get_first() else {
// return Ok(());
// };
// let mixnet_listener::ConnectedClient {
// nym_address,
// mix_hops,
// ..
// } = connect_client;
let response_packet = IpPacketResponse::new_ip_packet(bundled_packets)
.to_bytes()
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket {
source: err,
})?;
let input_message =
create_input_message(self.nym_address, response_packet, self.mix_hops);
self.mixnet_client_sender
.send(input_message)
.await
.map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })?;
}
Ok(())
}
async fn handle_packet(&mut self, packet: Vec<u8>) -> Result<()> {
// If we are the first packet, start the timer
if self.codec.is_empty() {
self.codec_timer.reset();
}
// Bunch together
let packet_bytes = Bytes::from(packet);
let mut bundled_packets = BytesMut::new();
self.codec
.encode(packet_bytes, &mut bundled_packets)
.unwrap();
if bundled_packets.is_empty() {
return Ok(());
}
let bundled_packets = bundled_packets.freeze();
self.codec_timer.reset();
// let response_packet = IpPacketResponse::new_ip_packet(packet.into())
let response_packet = IpPacketResponse::new_ip_packet(bundled_packets)
.to_bytes()
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket { source: err })?;
let input_message = create_input_message(self.nym_address, response_packet, self.mix_hops);
self.mixnet_client_sender
.send(input_message)
.await
.map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })?;
Ok(())
}
pub(crate) async fn run(mut self) -> Result<()> {
loop {
tokio::select! {
_ = &mut self.close_rx => {
log::warn!("ConnectedClientHandler: received shutdown");
break;
},
_ = self.codec_timer.tick() => {
if let Err(err) = self.flush_current_bundled_packets_and_send().await {
log::error!("connected client handler: failed to flush and send bundled packets: {err}");
}
},
packet = self.tun_rx.recv() => match packet {
Some(packet) => {
if let Err(err) = self.handle_packet(packet).await {
log::error!("connected client handler: failed to handle packet: {err}");
}
},
None => {
log::error!("connected client handler: tun channel closed");
break;
}
}
}
}
log::warn!("ConnectedClientHandler: exiting");
Ok(())
}
pub(crate) fn start(self) {
tokio::spawn(async move {
if let Err(err) = self.run().await {
log::error!("connected client handler has failed: {err}")
}
});
}
}
// Reads packet from TUN and writes to mixnet client
#[cfg(target_os = "linux")]
pub(crate) struct TunListener {
@@ -21,30 +162,90 @@ pub(crate) struct TunListener {
#[cfg(target_os = "linux")]
impl TunListener {
async fn handle_packet(&mut self, buf: &[u8], len: usize) -> Result<()> {
// async fn flush_current_bundled_packets_and_send(
// &mut self,
// bundled_packet_codec: &mut mixnet_listener::BundledIpPacketCodec,
// bundle_timer: &mut tokio::time::Interval,
// ) -> Result<()> {
// let mut bundled_packets = bundled_packet_codec.flush_current_buffer();
// if !bundled_packets.is_empty() {
// // TEMPORARY
// let Some(connect_client) = self.connected_clients.get_first() else {
// return Ok(());
// };
// let mixnet_listener::ConnectedClient {
// nym_address,
// mix_hops,
// ..
// } = connect_client;
//
// let response_packet = IpPacketResponse::new_ip_packet(bundled_packets)
// .to_bytes()
// .map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket {
// source: err,
// })?;
// let input_message = create_input_message(*nym_address, response_packet, *mix_hops);
//
// self.mixnet_client_sender
// .send(input_message)
// .await
// .map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })?;
// }
// Ok(())
// }
async fn handle_packet(
&mut self,
buf: &[u8],
len: usize,
bundled_packet_codec: &mut mixnet_listener::BundledIpPacketCodec,
bundle_timer: &mut tokio::time::Interval,
) -> Result<()> {
let Some(dst_addr) = parse_dst_addr(&buf[..len]) else {
log::warn!("Failed to parse packet");
return Ok(());
};
if let Some(mixnet_listener::ConnectedClient {
if let Some(mixnet_listener::ConnectedClientMirror {
nym_address,
mix_hops,
..
last_activity: _last_activity,
forward_from_tun_tx,
}) = self.connected_clients.get(&dst_addr)
{
let packet = buf[..len].to_vec();
let response_packet = IpPacketResponse::new_ip_packet(packet.into())
.to_bytes()
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket {
source: err,
})?;
let input_message = create_input_message(*nym_address, response_packet, *mix_hops);
self.mixnet_client_sender
.send(input_message)
.await
.map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })?;
forward_from_tun_tx.send(packet).unwrap();
// // If we are the first packet, start the timer
// if bundled_packet_codec.is_empty() {
// bundle_timer.reset();
// }
//
// // Bunch together
// let packet_bytes = Bytes::from(packet);
// let mut bundled_packets = BytesMut::new();
// bundled_packet_codec
// .encode(packet_bytes, &mut bundled_packets)
// .unwrap();
// if bundled_packets.is_empty() {
// return Ok(());
// }
// let bundled_packets = bundled_packets.freeze();
// bundle_timer.reset();
//
// // let response_packet = IpPacketResponse::new_ip_packet(packet.into())
// let response_packet = IpPacketResponse::new_ip_packet(bundled_packets)
// .to_bytes()
// .map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket {
// source: err,
// })?;
// let input_message = create_input_message(*nym_address, response_packet, *mix_hops);
//
// self.mixnet_client_sender
// .send(input_message)
// .await
// .map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })?;
} else {
log::info!("No registered nym-address for packet - dropping");
}
@@ -54,6 +255,11 @@ impl TunListener {
async fn run(mut self) -> Result<()> {
let mut buf = [0u8; 65535];
let mut bundled_packet_codec = mixnet_listener::BundledIpPacketCodec::new();
// tokio timer for flushing the buffer
let mut bundle_timer = tokio::time::interval(Duration::from_millis(20));
while !self.task_client.is_shutdown() {
tokio::select! {
_ = self.task_client.recv() => {
@@ -67,9 +273,14 @@ impl TunListener {
break;
},
},
// _ = bundle_timer.tick() => {
// if let Err(err) = self.flush_current_bundled_packets_and_send(&mut bundled_packet_codec, &mut bundle_timer).await {
// log::error!("tun: failed to flush and send bundled packets: {err}");
// }
// },
len = self.tun_reader.read(&mut buf) => match len {
Ok(len) => {
if let Err(err) = self.handle_packet(&buf, len).await {
if let Err(err) = self.handle_packet(&buf, len, &mut bundled_packet_codec, &mut bundle_timer).await {
log::error!("tun: failed to handle packet: {err}");
}
},