Compare commits

...

6 Commits

Author SHA1 Message Date
Jon Häggblad a4b880ba4a More optimized parsing 2023-11-28 10:11:41 +01:00
Jon Häggblad 76d6c51f91 try a different parse_dst_addr 2023-11-28 09:59:15 +01:00
Jon Häggblad 3a3d854eb7 Try with split state 2023-11-28 09:42:55 +01:00
Jon Häggblad 748dd1faf3 Fix typo in log 2023-11-28 09:25:05 +01:00
Jon Häggblad c2cb343135 Fix connected clients 2023-11-28 08:56:20 +01:00
Jon Häggblad 0581d40bbf Initial split out tun listener: 2023-11-28 08:43:03 +01:00
5 changed files with 202 additions and 62 deletions
Generated
+1
View File
@@ -6619,6 +6619,7 @@ version = "0.1.0"
dependencies = [
"bincode",
"bytes",
"dashmap",
"etherparse",
"futures",
"log",
+53 -16
View File
@@ -180,10 +180,10 @@ impl TunDevice {
);
// TODO: expire old entries
#[allow(irrefutable_let_patterns)]
if let RoutingMode::Nat(nat_table) = &mut self.routing_mode {
nat_table.nat_table.insert(src_addr, tag);
}
// #[allow(irrefutable_let_patterns)]
// if let RoutingMode::Nat(nat_table) = &mut self.routing_mode {
// nat_table.nat_table.insert(src_addr, tag);
// }
timeout(
Duration::from_millis(TUN_WRITE_TIMEOUT_MS),
@@ -196,11 +196,13 @@ impl TunDevice {
// Receive reponse packets from the wild internet
async fn handle_tun_read(&self, packet: &[u8]) -> Result<(), TunDeviceError> {
let ParsedAddresses { src_addr, dst_addr } = parse_src_dst_address(packet)?;
log::debug!(
"iface: read Packet({dst_addr} <- {src_addr}, {} bytes)",
packet.len(),
);
// let ParsedAddresses { src_addr, dst_addr } = parse_src_dst_address(packet)?;
// log::debug!(
// "iface: read Packet({dst_addr} <- {src_addr}, {} bytes)",
// packet.len(),
// );
let dst_addr =
parse_dst_addr(packet).ok_or(TunDeviceError::UnableToParseAddressIpHeaderMissing)?;
// Route packet to the correct peer.
@@ -220,13 +222,14 @@ impl TunDevice {
// But we can also do it by consulting the NAT table.
RoutingMode::Nat(ref nat_table) => {
if let Some(tag) = nat_table.nat_table.get(&dst_addr) {
log::debug!("Forward packet with NAT tag: {tag}");
return self
.tun_task_response_tx
.try_send((*tag, packet.to_vec()))
.map_err(|err| err.into());
}
// if let Some(tag) = nat_table.nat_table.get(&dst_addr) {
// log::debug!("Forward packet with NAT tag: {tag}");
return self
.tun_task_response_tx
// .try_send((*tag, packet.to_vec()))
.try_send((0, packet.to_vec()))
.map_err(|err| err.into());
// }
}
}
@@ -287,3 +290,37 @@ fn parse_src_dst_address(packet: &[u8]) -> Result<ParsedAddresses, TunDeviceErro
None => Err(TunDeviceError::UnableToParseAddressIpHeaderMissing),
}
}
// Copyright (c) 2019 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause
const IPV4_MIN_HEADER_SIZE: usize = 20;
const IPV4_DST_IP_OFF: usize = 16;
const IPV4_IP_SZ: usize = 4;
const IPV6_MIN_HEADER_SIZE: usize = 40;
const IPV6_DST_IP_OFF: usize = 24;
const IPV6_IP_SZ: usize = 16;
pub fn parse_dst_addr(packet: &[u8]) -> Option<IpAddr> {
if packet.is_empty() {
return None;
}
match packet[0] >> 4 {
4 if packet.len() >= IPV4_MIN_HEADER_SIZE => {
let addr_bytes: [u8; IPV4_IP_SZ] = packet
[IPV4_DST_IP_OFF..IPV4_DST_IP_OFF + IPV4_IP_SZ]
.try_into()
.unwrap();
Some(IpAddr::from(addr_bytes))
}
6 if packet.len() >= IPV6_MIN_HEADER_SIZE => {
let addr_bytes: [u8; IPV6_IP_SZ] = packet
[IPV6_DST_IP_OFF..IPV6_DST_IP_OFF + IPV6_IP_SZ]
.try_into()
.unwrap();
Some(IpAddr::from(addr_bytes))
}
_ => None,
}
}
@@ -168,6 +168,7 @@ impl MixnetClient {
}
}
#[derive(Clone)]
pub struct MixnetClientSender {
client_input: ClientInput,
packet_type: Option<PacketType>,
@@ -11,6 +11,7 @@ license.workspace = true
[dependencies]
bincode = "1.3.3"
bytes = "1.5.0"
dashmap.workspace = true
etherparse = "0.13.0"
futures = { workspace = true }
log = { workspace = true }
+146 -46
View File
@@ -4,6 +4,7 @@ use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
path::Path,
sync::Arc,
time::Duration,
};
@@ -167,7 +168,7 @@ impl IpPacketRouterBuilder {
_config: self.config,
request_filter: request_filter.clone(),
tun_task_tx,
tun_task_response_rx,
tun_task_response_rx: Some(tun_task_response_rx),
mixnet_client,
task_handle,
connected_clients: Default::default(),
@@ -195,10 +196,9 @@ struct IpPacketRouter {
_config: Config,
request_filter: request_filter::RequestFilter,
tun_task_tx: nym_tun::tun_task_channel::TunTaskTx,
tun_task_response_rx: nym_tun::tun_task_channel::TunTaskResponseRx,
tun_task_response_rx: Option<nym_tun::tun_task_channel::TunTaskResponseRx>,
mixnet_client: nym_sdk::mixnet::MixnetClient,
task_handle: TaskHandle,
connected_clients: HashMap<IpAddr, ConnectedClient>,
}
@@ -212,6 +212,7 @@ impl IpPacketRouter {
async fn on_static_connect_request(
&mut self,
connect_request: nym_ip_packet_requests::StaticConnectRequest,
connected_client_tx: &tokio::sync::mpsc::Sender<ConnectedClientEvent>,
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
log::info!(
"Received static connect request from {sender_address}",
@@ -225,10 +226,16 @@ impl IpPacketRouter {
// Check that the IP is available in the set of connected clients
let is_ip_taken = self.connected_clients.contains_key(&requested_ip);
// let is_ip_taken = connected_clients
// .lock()
// .unwrap()
// .contains_key(&requested_ip);
// Check that the nym address isn't already registered
let is_nym_address_taken = self
.connected_clients
// .lock()
// .unwrap()
.values()
.any(|client| client.nym_address == reply_to);
@@ -254,6 +261,10 @@ impl IpPacketRouter {
last_activity: std::time::Instant::now(),
},
);
connected_client_tx
.send(ConnectedClientEvent::Connect(requested_ip, reply_to))
.await
.unwrap();
Ok(Some(IpPacketResponse::new_static_connect_success(
request_id, reply_to,
)))
@@ -280,6 +291,7 @@ impl IpPacketRouter {
async fn on_dynamic_connect_request(
&mut self,
connect_request: nym_ip_packet_requests::DynamicConnectRequest,
connected_client_tx: &tokio::sync::mpsc::Sender<ConnectedClientEvent>,
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
log::info!(
"Received dynamic connect request from {sender_address}",
@@ -332,6 +344,10 @@ impl IpPacketRouter {
last_activity: std::time::Instant::now(),
},
);
connected_client_tx
.send(ConnectedClientEvent::Connect(new_ip, reply_to))
.await
.unwrap();
Ok(Some(IpPacketResponse::new_dynamic_connect_success(
request_id, reply_to, new_ip,
)))
@@ -393,6 +409,7 @@ impl IpPacketRouter {
async fn on_reconstructed_message(
&mut self,
reconstructed: ReconstructedMessage,
connected_client_tx: &tokio::sync::mpsc::Sender<ConnectedClientEvent>,
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
log::debug!(
"Received message with sender_tag: {:?}",
@@ -414,10 +431,12 @@ impl IpPacketRouter {
match request.data {
IpPacketRequestData::StaticConnect(connect_request) => {
self.on_static_connect_request(connect_request).await
self.on_static_connect_request(connect_request, connected_client_tx)
.await
}
IpPacketRequestData::DynamicConnect(connect_request) => {
self.on_dynamic_connect_request(connect_request).await
self.on_dynamic_connect_request(connect_request, connected_client_tx)
.await
}
IpPacketRequestData::Data(data_request) => self.on_data_request(data_request).await,
}
@@ -427,6 +446,84 @@ impl IpPacketRouter {
let mut task_client = self.task_handle.fork("main_loop");
let mut disconnect_timer = tokio::time::interval(DISCONNECT_TIMER_INTERVAL);
let mixnet_client_sender = self.mixnet_client.split_sender();
let mixnet_client_sender_clone = mixnet_client_sender.clone();
// let connected_clients = Arc::new(std::sync::Mutex::new(
// HashMap::<IpAddr, ConnectedClient>::new(),
// ));
// let connected_clients_clone = connected_clients.clone();
let tun_task_response_rx = self.tun_task_response_rx.take();
let (connected_client_tx, mut connected_client_rx) = tokio::sync::mpsc::channel(16);
// Spawn TUN listener
tokio::spawn(async move {
let mut connected_clients = HashMap::<IpAddr, ConnectedClient>::new();
let mut tun_task_response_rx = tun_task_response_rx.unwrap();
loop {
tokio::select! {
connected_client_event = connected_client_rx.recv() => {
match connected_client_event {
Some(ConnectedClientEvent::Connect(ip, nym_addr)) => {
log::trace!("Connect client: {ip}");
connected_clients.insert(ip, ConnectedClient {
nym_address: nym_addr,
last_activity: std::time::Instant::now(),
});
},
Some(ConnectedClientEvent::Disconnect(ip)) => {
log::trace!("Disconnect client: {ip}");
connected_clients.remove(&ip);
},
None => {},
}
},
packet = tun_task_response_rx.recv() => {
if let Some((_tag, packet)) = packet {
// TODO: skip full parsing since we only need dst_addr
// let Ok(ParsedPacket {
// packet_type: _,
// src_addr: _,
// dst_addr,
// dst: _,
// }) = parse_packet(&packet) else {
// log::warn!("Failed to parse packet");
// continue;
// };
let Some(dst_addr) = parse_dst_addr(&packet) else {
log::warn!("Failed to parse packet");
continue;
};
let recipient = connected_clients.get(&dst_addr).map(|c| c.nym_address);
if let Some(recipient) = recipient {
let lane = TransmissionLane::General;
let packet_type = None;
let response_packet = IpPacketResponse::new_ip_packet(packet.into()).to_bytes();
let Ok(response_packet) = response_packet else {
log::error!("Failed to serialize response packet");
continue;
};
let input_message = InputMessage::new_regular(recipient, response_packet, lane, packet_type);
if let Err(err) = mixnet_client_sender_clone.send(input_message).await {
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
};
} else {
log::error!("IpPacketRouter [main loop]: no nym-address recipient for packet");
}
} else {
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
break;
}
}
}
}
});
while !task_client.is_shutdown() {
tokio::select! {
_ = task_client.recv() => {
@@ -446,11 +543,12 @@ impl IpPacketRouter {
for ip in inactive_clients {
log::info!("Disconnect inactive client: {ip}");
self.connected_clients.remove(&ip);
connected_client_tx.send(ConnectedClientEvent::Disconnect(ip)).await.unwrap();
}
},
msg = self.mixnet_client.next() => {
if let Some(msg) = msg {
match self.on_reconstructed_message(msg).await {
match self.on_reconstructed_message(msg, &connected_client_tx).await {
Ok(Some(response)) => {
let Some(recipient) = response.recipient() else {
log::error!("IpPacketRouter [main loop]: failed to get recipient from response");
@@ -464,7 +562,7 @@ impl IpPacketRouter {
let lane = TransmissionLane::General;
let packet_type = None;
let input_message = InputMessage::new_regular(*recipient, response_packet, lane, packet_type);
if let Err(err) = self.mixnet_client.send(input_message).await {
if let Err(err) = mixnet_client_sender.send(input_message).await {
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
};
},
@@ -474,49 +572,12 @@ impl IpPacketRouter {
Err(err) => {
log::error!("Error handling mixnet message: {err}");
}
};
} else {
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
break;
};
},
packet = self.tun_task_response_rx.recv() => {
if let Some((_tag, packet)) = packet {
// TODO: skip full parsing since we only need dst_addr
let Ok(ParsedPacket {
packet_type: _,
src_addr: _,
dst_addr,
dst: _,
}) = parse_packet(&packet) else {
log::warn!("Failed to parse packet");
continue;
};
let recipient = self.connected_clients.get(&dst_addr).map(|c| c.nym_address);
if let Some(recipient) = recipient {
let lane = TransmissionLane::General;
let packet_type = None;
let response_packet = IpPacketResponse::new_ip_packet(packet.into()).to_bytes();
let Ok(response_packet) = response_packet else {
log::error!("Failed to serialize response packet");
continue;
};
let input_message = InputMessage::new_regular(recipient, response_packet, lane, packet_type);
if let Err(err) = self.mixnet_client.send(input_message).await {
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
};
} else {
log::error!("IpPacketRouter [main loop]: no nym-address recipient for packet");
}
} else {
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
break;
}
}
}
}
@@ -525,6 +586,11 @@ impl IpPacketRouter {
}
}
enum ConnectedClientEvent {
Disconnect(IpAddr),
Connect(IpAddr, Recipient),
}
struct ParsedPacket<'a> {
packet_type: &'a str,
src_addr: IpAddr,
@@ -539,8 +605,8 @@ fn parse_packet(packet: &[u8]) -> Result<ParsedPacket, IpPacketRouterError> {
})?;
let (packet_type, dst_port) = match headers.transport {
Some(etherparse::TransportSlice::Udp(header)) => ("ipv4", Some(header.destination_port())),
Some(etherparse::TransportSlice::Tcp(header)) => ("ipv6", Some(header.destination_port())),
Some(etherparse::TransportSlice::Udp(header)) => ("udp", Some(header.destination_port())),
Some(etherparse::TransportSlice::Tcp(header)) => ("tcp", Some(header.destination_port())),
Some(etherparse::TransportSlice::Icmpv4(_)) => ("icmpv4", None),
Some(etherparse::TransportSlice::Icmpv6(_)) => ("icmpv6", None),
Some(etherparse::TransportSlice::Unknown(_)) => ("unknown", None),
@@ -620,3 +686,37 @@ async fn create_mixnet_client(
.await
.map_err(|err| IpPacketRouterError::FailedToConnectToMixnet { source: err })
}
// Copyright (c) 2019 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause
const IPV4_MIN_HEADER_SIZE: usize = 20;
const IPV4_DST_IP_OFF: usize = 16;
const IPV4_IP_SZ: usize = 4;
const IPV6_MIN_HEADER_SIZE: usize = 40;
const IPV6_DST_IP_OFF: usize = 24;
const IPV6_IP_SZ: usize = 16;
pub fn parse_dst_addr(packet: &[u8]) -> Option<IpAddr> {
if packet.is_empty() {
return None;
}
match packet[0] >> 4 {
4 if packet.len() >= IPV4_MIN_HEADER_SIZE => {
let addr_bytes: [u8; IPV4_IP_SZ] = packet
[IPV4_DST_IP_OFF..IPV4_DST_IP_OFF + IPV4_IP_SZ]
.try_into()
.unwrap();
Some(IpAddr::from(addr_bytes))
}
6 if packet.len() >= IPV6_MIN_HEADER_SIZE => {
let addr_bytes: [u8; IPV6_IP_SZ] = packet
[IPV6_DST_IP_OFF..IPV6_DST_IP_OFF + IPV6_IP_SZ]
.try_into()
.unwrap();
Some(IpAddr::from(addr_bytes))
}
_ => None,
}
}