Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f62bc5addb | |||
| 0d4254f65a | |||
| 25e35384d9 | |||
| e4479f2655 | |||
| deef1092a8 | |||
| efaf55c1b3 |
Generated
+1
@@ -6704,6 +6704,7 @@ dependencies = [
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-tun",
|
||||
"tokio-util",
|
||||
"url",
|
||||
]
|
||||
|
||||
|
||||
@@ -168,6 +168,7 @@ impl MixnetClient {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MixnetClientSender {
|
||||
client_input: ClientInput,
|
||||
packet_type: Option<PacketType>,
|
||||
|
||||
@@ -40,6 +40,7 @@ serde_json = { workspace = true }
|
||||
tap.workspace = true
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
url.workspace = true
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
|
||||
@@ -9,7 +9,7 @@ mod constants;
|
||||
pub mod error;
|
||||
mod ip_packet_router;
|
||||
mod mixnet_client;
|
||||
mod mixnet_listener;
|
||||
pub mod mixnet_listener;
|
||||
pub mod request_filter;
|
||||
mod tun_listener;
|
||||
mod util;
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::{
|
||||
net::{IpAddr, SocketAddr},
|
||||
};
|
||||
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use futures::StreamExt;
|
||||
use nym_ip_packet_requests::{
|
||||
DynamicConnectFailureReason, IpPacketRequest, IpPacketRequestData, IpPacketResponse,
|
||||
@@ -13,6 +14,7 @@ use nym_sphinx::receiver::ReconstructedMessage;
|
||||
use nym_task::TaskHandle;
|
||||
#[cfg(target_os = "linux")]
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
|
||||
use crate::{
|
||||
config::Config,
|
||||
@@ -26,6 +28,80 @@ use crate::{
|
||||
},
|
||||
};
|
||||
|
||||
// Tokio codec for bundling multiple IP packets into one buffer that is at most 1500 bytes long.
|
||||
// These packets are separated by a 2 byte length prefix.
|
||||
pub struct BundledIpPacketCodec {
|
||||
buffer: BytesMut,
|
||||
}
|
||||
|
||||
impl BundledIpPacketCodec {
|
||||
pub fn new() -> Self {
|
||||
BundledIpPacketCodec {
|
||||
buffer: BytesMut::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn flush_current_buffer(&mut self) -> Bytes {
|
||||
let mut buffer_so_far = BytesMut::new();
|
||||
// TODO: is it possible to move the buffer instead of copying it?
|
||||
buffer_so_far.extend_from_slice(&self.buffer);
|
||||
self.buffer = BytesMut::new();
|
||||
buffer_so_far.freeze()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.buffer.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl Encoder<Bytes> for BundledIpPacketCodec {
|
||||
type Error = IpPacketRouterError;
|
||||
|
||||
fn encode(&mut self, packet: Bytes, dst: &mut BytesMut) -> Result<()> {
|
||||
let packet_size = packet.len();
|
||||
|
||||
if self.buffer.len() + packet_size + 2 > 1500 {
|
||||
// If the packet doesn't fit in the buffer, send the buffer and then add it to the buffer
|
||||
dst.extend_from_slice(&self.buffer);
|
||||
self.buffer = BytesMut::new();
|
||||
}
|
||||
|
||||
// Add the packet to the buffer
|
||||
self.buffer
|
||||
.extend_from_slice(&(packet_size as u16).to_be_bytes());
|
||||
self.buffer.extend_from_slice(&packet);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Decoder for BundledIpPacketCodec {
|
||||
type Item = Bytes;
|
||||
type Error = IpPacketRouterError;
|
||||
|
||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
|
||||
if src.len() < 2 {
|
||||
// Not enough bytes to read the length prefix
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let packet_size = u16::from_be_bytes([src[0], src[1]]) as usize;
|
||||
|
||||
if src.len() < packet_size + 2 {
|
||||
// Not enough bytes to read the packet
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Remove the length prefix
|
||||
src.advance(2);
|
||||
|
||||
// Read the packet
|
||||
let packet = src.split_to(packet_size);
|
||||
|
||||
Ok(Some(packet.freeze()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub(crate) struct MixnetListener {
|
||||
pub(crate) _config: Config,
|
||||
@@ -41,13 +117,14 @@ pub(crate) struct ConnectedClients {
|
||||
connected_client_tx: tokio::sync::mpsc::UnboundedSender<ConnectedClientEvent>,
|
||||
}
|
||||
|
||||
// TODO: move this to the tun_listener module?
|
||||
pub(crate) struct ConnectedClientsListener {
|
||||
clients: HashMap<IpAddr, ConnectedClient>,
|
||||
clients: HashMap<IpAddr, ConnectedClientMirror>,
|
||||
pub(crate) connected_client_rx: tokio::sync::mpsc::UnboundedReceiver<ConnectedClientEvent>,
|
||||
}
|
||||
|
||||
impl ConnectedClientsListener {
|
||||
pub(crate) fn get(&self, ip: &IpAddr) -> Option<&ConnectedClient> {
|
||||
pub(crate) fn get(&self, ip: &IpAddr) -> Option<&ConnectedClientMirror> {
|
||||
self.clients.get(ip)
|
||||
}
|
||||
|
||||
@@ -58,14 +135,16 @@ impl ConnectedClientsListener {
|
||||
ip,
|
||||
nym_address,
|
||||
mix_hops,
|
||||
forward_from_tun_tx,
|
||||
} = *connected_event;
|
||||
log::trace!("Connect client: {ip}");
|
||||
self.clients.insert(
|
||||
ip,
|
||||
ConnectedClient {
|
||||
ConnectedClientMirror {
|
||||
nym_address,
|
||||
mix_hops,
|
||||
last_activity: std::time::Instant::now(),
|
||||
forward_from_tun_tx,
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -75,6 +154,11 @@ impl ConnectedClientsListener {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TEMP
|
||||
// pub(crate) fn get_first(&self) -> Option<&ConnectedClientMirror> {
|
||||
// self.clients.values().next()
|
||||
// }
|
||||
}
|
||||
|
||||
impl ConnectedClients {
|
||||
@@ -118,20 +202,33 @@ impl ConnectedClients {
|
||||
.find(|client| client.nym_address == *nym_address)
|
||||
}
|
||||
|
||||
fn connect(&mut self, ip: IpAddr, nym_address: Recipient, mix_hops: Option<u8>) {
|
||||
fn connect(
|
||||
&mut self,
|
||||
ip: IpAddr,
|
||||
nym_address: Recipient,
|
||||
mix_hops: Option<u8>,
|
||||
forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
|
||||
close_tx: tokio::sync::oneshot::Sender<()>,
|
||||
) {
|
||||
// The map of connected clients that the mixnet listener keeps track of. It monitors
|
||||
// activity and disconnects clients that have been inactive for too long.
|
||||
self.clients.insert(
|
||||
ip,
|
||||
ConnectedClient {
|
||||
nym_address,
|
||||
mix_hops,
|
||||
last_activity: std::time::Instant::now(),
|
||||
close_tx: Some(close_tx),
|
||||
},
|
||||
);
|
||||
// Send the connected client info to the tun listener, which will use it to forward packets
|
||||
// to the connected client handler.
|
||||
self.connected_client_tx
|
||||
.send(ConnectedClientEvent::Connect(Box::new(ConnectEvent {
|
||||
ip,
|
||||
nym_address,
|
||||
mix_hops,
|
||||
forward_from_tun_tx,
|
||||
})))
|
||||
.unwrap();
|
||||
}
|
||||
@@ -160,6 +257,7 @@ impl ConnectedClients {
|
||||
.collect();
|
||||
for ip in inactive_clients {
|
||||
log::info!("Disconnect inactive client: {ip}");
|
||||
// TODO: confirm this also stops the connected client handler
|
||||
self.clients.remove(&ip);
|
||||
self.connected_client_tx
|
||||
.send(ConnectedClientEvent::Disconnect(DisconnectEvent(ip)))
|
||||
@@ -176,6 +274,31 @@ pub(crate) struct ConnectedClient {
|
||||
pub(crate) nym_address: Recipient,
|
||||
pub(crate) mix_hops: Option<u8>,
|
||||
pub(crate) last_activity: std::time::Instant,
|
||||
// Send to connected clients listener to stop
|
||||
// This is inside an Option only because we want to send in Drop
|
||||
pub(crate) close_tx: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
// Forward to the connected clients listener packets that we have read from the TUN
|
||||
// pub(crate) tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl Drop for ConnectedClient {
|
||||
fn drop(&mut self) {
|
||||
log::info!("Dropping connected client: {}", self.nym_address);
|
||||
if let Some(close_tx) = self.close_tx.take() {
|
||||
log::info!("Sending close signal to connected client handler");
|
||||
close_tx.send(()).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct ConnectedClientMirror {
|
||||
pub(crate) nym_address: Recipient,
|
||||
pub(crate) mix_hops: Option<u8>,
|
||||
pub(crate) last_activity: std::time::Instant,
|
||||
// Send to connected clients listener to stop
|
||||
// pub(crate) close_tx: tokio::sync::oneshot::Sender<()>,
|
||||
// Forward to the connected clients listener packets that we have read from the TUN
|
||||
pub(crate) forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
@@ -217,8 +340,27 @@ impl MixnetListener {
|
||||
}
|
||||
(false, false) => {
|
||||
log::info!("Connecting a new client");
|
||||
self.connected_clients
|
||||
.connect(requested_ip, reply_to, reply_to_hops);
|
||||
|
||||
// Spawn the ConnectedClientHandler task
|
||||
let (close_tx, close_rx) = tokio::sync::oneshot::channel();
|
||||
let (forward_from_tun_tx, forward_from_tun_rx) =
|
||||
tokio::sync::mpsc::unbounded_channel();
|
||||
let connected_client_handler = crate::tun_listener::ConnectedClientHandler::new(
|
||||
reply_to,
|
||||
reply_to_hops,
|
||||
forward_from_tun_rx,
|
||||
self.mixnet_client.split_sender(),
|
||||
close_rx,
|
||||
);
|
||||
connected_client_handler.start();
|
||||
|
||||
self.connected_clients.connect(
|
||||
requested_ip,
|
||||
reply_to,
|
||||
reply_to_hops,
|
||||
forward_from_tun_tx,
|
||||
close_tx,
|
||||
);
|
||||
Ok(Some(IpPacketResponse::new_static_connect_success(
|
||||
request_id, reply_to,
|
||||
)))
|
||||
@@ -285,8 +427,25 @@ impl MixnetListener {
|
||||
)));
|
||||
};
|
||||
|
||||
self.connected_clients
|
||||
.connect(new_ip, reply_to, reply_to_hops);
|
||||
// Spawn ConnectedClientHandler
|
||||
let (close_tx, close_rx) = tokio::sync::oneshot::channel();
|
||||
let (forward_from_tun_tx, forward_from_tun_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let connected_client_handler = crate::tun_listener::ConnectedClientHandler::new(
|
||||
reply_to,
|
||||
reply_to_hops,
|
||||
forward_from_tun_rx,
|
||||
self.mixnet_client.split_sender(),
|
||||
close_rx,
|
||||
);
|
||||
connected_client_handler.start();
|
||||
|
||||
self.connected_clients.connect(
|
||||
new_ip,
|
||||
reply_to,
|
||||
reply_to_hops,
|
||||
forward_from_tun_tx,
|
||||
close_tx,
|
||||
);
|
||||
Ok(Some(IpPacketResponse::new_dynamic_connect_success(
|
||||
request_id, reply_to, new_ip,
|
||||
)))
|
||||
@@ -298,6 +457,20 @@ impl MixnetListener {
|
||||
) -> Result<Option<IpPacketResponse>> {
|
||||
log::trace!("Received data request");
|
||||
|
||||
let mut codec = BundledIpPacketCodec::new();
|
||||
// convert from Bytes to BytesMut
|
||||
let mut bytes = BytesMut::new();
|
||||
bytes.extend_from_slice(&data_request.ip_packet);
|
||||
// let mut bytes = BytesMut::new(data_request.ip_packet.clone());
|
||||
while let Ok(Some(p)) = codec.decode(&mut bytes) {
|
||||
if let Err(err) = self.handle_packet(&p).await {
|
||||
log::error!("mixnet_listener: failed to handle packet: {err}");
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn handle_packet(&mut self, ip_packet: &Bytes) -> Result<Option<IpPacketRouterError>> {
|
||||
// We don't forward packets that we are not able to parse. BUT, there might be a good
|
||||
// reason to still forward them.
|
||||
//
|
||||
@@ -310,7 +483,8 @@ impl MixnetListener {
|
||||
src_addr,
|
||||
dst_addr,
|
||||
dst,
|
||||
} = parse_packet(&data_request.ip_packet)?;
|
||||
} = parse_packet(&ip_packet)?;
|
||||
// } = parse_packet(&data_request.ip_packet)?;
|
||||
|
||||
let dst_str = dst.map_or(dst_addr.to_string(), |dst| dst.to_string());
|
||||
log::info!("Received packet: {packet_type}: {src_addr} -> {dst_str}");
|
||||
@@ -331,7 +505,8 @@ impl MixnetListener {
|
||||
}
|
||||
|
||||
// TODO: consider changing from Vec<u8> to bytes::Bytes?
|
||||
let packet = data_request.ip_packet;
|
||||
// let packet = data_request.ip_packet;
|
||||
let packet = ip_packet;
|
||||
self.tun_writer
|
||||
.write_all(&packet)
|
||||
.await
|
||||
@@ -450,4 +625,5 @@ pub(crate) struct ConnectEvent {
|
||||
pub(crate) ip: IpAddr,
|
||||
pub(crate) nym_address: Recipient,
|
||||
pub(crate) mix_hops: Option<u8>,
|
||||
pub(crate) forward_from_tun_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
|
||||
}
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use nym_ip_packet_requests::IpPacketResponse;
|
||||
use nym_sdk::mixnet::MixnetMessageSender;
|
||||
use nym_sdk::mixnet::{MixnetMessageSender, Recipient};
|
||||
use nym_task::TaskClient;
|
||||
#[cfg(target_os = "linux")]
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
|
||||
use crate::{
|
||||
error::{IpPacketRouterError, Result},
|
||||
@@ -10,6 +14,143 @@ use crate::{
|
||||
util::{create_message::create_input_message, parse_ip::parse_dst_addr},
|
||||
};
|
||||
|
||||
// Data flow is
|
||||
// mixnet_listener -> decode -> handle_packet -> write_to_tun
|
||||
// tun_listener -> (task: send to connected client handler for processing -> encode) -> mixnet_sender
|
||||
// This handler is spawned as a task, and it listens to IP packets passed from the tun_listener,
|
||||
// encodes it, and then sends to mixnet.
|
||||
pub(crate) struct ConnectedClientHandler {
|
||||
nym_address: Recipient,
|
||||
mix_hops: Option<u8>,
|
||||
tun_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
|
||||
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
|
||||
codec: mixnet_listener::BundledIpPacketCodec,
|
||||
codec_timer: tokio::time::Interval,
|
||||
close_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
}
|
||||
|
||||
impl ConnectedClientHandler {
|
||||
pub(crate) fn new(
|
||||
nym_address: Recipient,
|
||||
mix_hops: Option<u8>,
|
||||
tun_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
|
||||
mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender,
|
||||
close_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) -> Self {
|
||||
Self {
|
||||
nym_address,
|
||||
mix_hops,
|
||||
tun_rx,
|
||||
mixnet_client_sender,
|
||||
codec: mixnet_listener::BundledIpPacketCodec::new(),
|
||||
codec_timer: tokio::time::interval(Duration::from_millis(20)),
|
||||
close_rx,
|
||||
}
|
||||
}
|
||||
|
||||
async fn flush_current_bundled_packets_and_send(
|
||||
&mut self,
|
||||
// bundled_packet_codec: &mut mixnet_listener::BundledIpPacketCodec,
|
||||
// bundle_timer: &mut tokio::time::Interval,
|
||||
) -> Result<()> {
|
||||
let mut bundled_packets = self.codec.flush_current_buffer();
|
||||
if !bundled_packets.is_empty() {
|
||||
// // TEMPORARY
|
||||
// let Some(connect_client) = self.connected_clients.get_first() else {
|
||||
// return Ok(());
|
||||
// };
|
||||
// let mixnet_listener::ConnectedClient {
|
||||
// nym_address,
|
||||
// mix_hops,
|
||||
// ..
|
||||
// } = connect_client;
|
||||
|
||||
let response_packet = IpPacketResponse::new_ip_packet(bundled_packets)
|
||||
.to_bytes()
|
||||
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket {
|
||||
source: err,
|
||||
})?;
|
||||
let input_message =
|
||||
create_input_message(self.nym_address, response_packet, self.mix_hops);
|
||||
|
||||
self.mixnet_client_sender
|
||||
.send(input_message)
|
||||
.await
|
||||
.map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_packet(&mut self, packet: Vec<u8>) -> Result<()> {
|
||||
// If we are the first packet, start the timer
|
||||
if self.codec.is_empty() {
|
||||
self.codec_timer.reset();
|
||||
}
|
||||
|
||||
// Bunch together
|
||||
let packet_bytes = Bytes::from(packet);
|
||||
let mut bundled_packets = BytesMut::new();
|
||||
self.codec
|
||||
.encode(packet_bytes, &mut bundled_packets)
|
||||
.unwrap();
|
||||
if bundled_packets.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let bundled_packets = bundled_packets.freeze();
|
||||
self.codec_timer.reset();
|
||||
|
||||
// let response_packet = IpPacketResponse::new_ip_packet(packet.into())
|
||||
let response_packet = IpPacketResponse::new_ip_packet(bundled_packets)
|
||||
.to_bytes()
|
||||
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket { source: err })?;
|
||||
let input_message = create_input_message(self.nym_address, response_packet, self.mix_hops);
|
||||
|
||||
self.mixnet_client_sender
|
||||
.send(input_message)
|
||||
.await
|
||||
.map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn run(mut self) -> Result<()> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut self.close_rx => {
|
||||
log::warn!("ConnectedClientHandler: received shutdown");
|
||||
break;
|
||||
},
|
||||
_ = self.codec_timer.tick() => {
|
||||
if let Err(err) = self.flush_current_bundled_packets_and_send().await {
|
||||
log::error!("connected client handler: failed to flush and send bundled packets: {err}");
|
||||
}
|
||||
},
|
||||
packet = self.tun_rx.recv() => match packet {
|
||||
Some(packet) => {
|
||||
if let Err(err) = self.handle_packet(packet).await {
|
||||
log::error!("connected client handler: failed to handle packet: {err}");
|
||||
}
|
||||
},
|
||||
None => {
|
||||
log::error!("connected client handler: tun channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log::warn!("ConnectedClientHandler: exiting");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn start(self) {
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = self.run().await {
|
||||
log::error!("connected client handler has failed: {err}")
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Reads packet from TUN and writes to mixnet client
|
||||
#[cfg(target_os = "linux")]
|
||||
pub(crate) struct TunListener {
|
||||
@@ -21,30 +162,90 @@ pub(crate) struct TunListener {
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
impl TunListener {
|
||||
async fn handle_packet(&mut self, buf: &[u8], len: usize) -> Result<()> {
|
||||
// async fn flush_current_bundled_packets_and_send(
|
||||
// &mut self,
|
||||
// bundled_packet_codec: &mut mixnet_listener::BundledIpPacketCodec,
|
||||
// bundle_timer: &mut tokio::time::Interval,
|
||||
// ) -> Result<()> {
|
||||
// let mut bundled_packets = bundled_packet_codec.flush_current_buffer();
|
||||
// if !bundled_packets.is_empty() {
|
||||
// // TEMPORARY
|
||||
// let Some(connect_client) = self.connected_clients.get_first() else {
|
||||
// return Ok(());
|
||||
// };
|
||||
// let mixnet_listener::ConnectedClient {
|
||||
// nym_address,
|
||||
// mix_hops,
|
||||
// ..
|
||||
// } = connect_client;
|
||||
//
|
||||
// let response_packet = IpPacketResponse::new_ip_packet(bundled_packets)
|
||||
// .to_bytes()
|
||||
// .map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket {
|
||||
// source: err,
|
||||
// })?;
|
||||
// let input_message = create_input_message(*nym_address, response_packet, *mix_hops);
|
||||
//
|
||||
// self.mixnet_client_sender
|
||||
// .send(input_message)
|
||||
// .await
|
||||
// .map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })?;
|
||||
// }
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
async fn handle_packet(
|
||||
&mut self,
|
||||
buf: &[u8],
|
||||
len: usize,
|
||||
bundled_packet_codec: &mut mixnet_listener::BundledIpPacketCodec,
|
||||
bundle_timer: &mut tokio::time::Interval,
|
||||
) -> Result<()> {
|
||||
let Some(dst_addr) = parse_dst_addr(&buf[..len]) else {
|
||||
log::warn!("Failed to parse packet");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if let Some(mixnet_listener::ConnectedClient {
|
||||
if let Some(mixnet_listener::ConnectedClientMirror {
|
||||
nym_address,
|
||||
mix_hops,
|
||||
..
|
||||
last_activity: _last_activity,
|
||||
forward_from_tun_tx,
|
||||
}) = self.connected_clients.get(&dst_addr)
|
||||
{
|
||||
let packet = buf[..len].to_vec();
|
||||
let response_packet = IpPacketResponse::new_ip_packet(packet.into())
|
||||
.to_bytes()
|
||||
.map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket {
|
||||
source: err,
|
||||
})?;
|
||||
let input_message = create_input_message(*nym_address, response_packet, *mix_hops);
|
||||
|
||||
self.mixnet_client_sender
|
||||
.send(input_message)
|
||||
.await
|
||||
.map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })?;
|
||||
forward_from_tun_tx.send(packet).unwrap();
|
||||
|
||||
// // If we are the first packet, start the timer
|
||||
// if bundled_packet_codec.is_empty() {
|
||||
// bundle_timer.reset();
|
||||
// }
|
||||
//
|
||||
// // Bunch together
|
||||
// let packet_bytes = Bytes::from(packet);
|
||||
// let mut bundled_packets = BytesMut::new();
|
||||
// bundled_packet_codec
|
||||
// .encode(packet_bytes, &mut bundled_packets)
|
||||
// .unwrap();
|
||||
// if bundled_packets.is_empty() {
|
||||
// return Ok(());
|
||||
// }
|
||||
// let bundled_packets = bundled_packets.freeze();
|
||||
// bundle_timer.reset();
|
||||
//
|
||||
// // let response_packet = IpPacketResponse::new_ip_packet(packet.into())
|
||||
// let response_packet = IpPacketResponse::new_ip_packet(bundled_packets)
|
||||
// .to_bytes()
|
||||
// .map_err(|err| IpPacketRouterError::FailedToSerializeResponsePacket {
|
||||
// source: err,
|
||||
// })?;
|
||||
// let input_message = create_input_message(*nym_address, response_packet, *mix_hops);
|
||||
//
|
||||
// self.mixnet_client_sender
|
||||
// .send(input_message)
|
||||
// .await
|
||||
// .map_err(|err| IpPacketRouterError::FailedToSendPacketToMixnet { source: err })?;
|
||||
} else {
|
||||
log::info!("No registered nym-address for packet - dropping");
|
||||
}
|
||||
@@ -54,6 +255,11 @@ impl TunListener {
|
||||
|
||||
async fn run(mut self) -> Result<()> {
|
||||
let mut buf = [0u8; 65535];
|
||||
|
||||
let mut bundled_packet_codec = mixnet_listener::BundledIpPacketCodec::new();
|
||||
// tokio timer for flushing the buffer
|
||||
let mut bundle_timer = tokio::time::interval(Duration::from_millis(20));
|
||||
|
||||
while !self.task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
_ = self.task_client.recv() => {
|
||||
@@ -67,9 +273,14 @@ impl TunListener {
|
||||
break;
|
||||
},
|
||||
},
|
||||
// _ = bundle_timer.tick() => {
|
||||
// if let Err(err) = self.flush_current_bundled_packets_and_send(&mut bundled_packet_codec, &mut bundle_timer).await {
|
||||
// log::error!("tun: failed to flush and send bundled packets: {err}");
|
||||
// }
|
||||
// },
|
||||
len = self.tun_reader.read(&mut buf) => match len {
|
||||
Ok(len) => {
|
||||
if let Err(err) = self.handle_packet(&buf, len).await {
|
||||
if let Err(err) = self.handle_packet(&buf, len, &mut bundled_packet_codec, &mut bundle_timer).await {
|
||||
log::error!("tun: failed to handle packet: {err}");
|
||||
}
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user