Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a4b880ba4a | |||
| 76d6c51f91 | |||
| 3a3d854eb7 | |||
| 748dd1faf3 | |||
| c2cb343135 | |||
| 0581d40bbf |
Generated
+1
@@ -6619,6 +6619,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"bytes",
|
||||
"dashmap",
|
||||
"etherparse",
|
||||
"futures",
|
||||
"log",
|
||||
|
||||
@@ -180,10 +180,10 @@ impl TunDevice {
|
||||
);
|
||||
|
||||
// TODO: expire old entries
|
||||
#[allow(irrefutable_let_patterns)]
|
||||
if let RoutingMode::Nat(nat_table) = &mut self.routing_mode {
|
||||
nat_table.nat_table.insert(src_addr, tag);
|
||||
}
|
||||
// #[allow(irrefutable_let_patterns)]
|
||||
// if let RoutingMode::Nat(nat_table) = &mut self.routing_mode {
|
||||
// nat_table.nat_table.insert(src_addr, tag);
|
||||
// }
|
||||
|
||||
timeout(
|
||||
Duration::from_millis(TUN_WRITE_TIMEOUT_MS),
|
||||
@@ -196,11 +196,13 @@ impl TunDevice {
|
||||
|
||||
// Receive reponse packets from the wild internet
|
||||
async fn handle_tun_read(&self, packet: &[u8]) -> Result<(), TunDeviceError> {
|
||||
let ParsedAddresses { src_addr, dst_addr } = parse_src_dst_address(packet)?;
|
||||
log::debug!(
|
||||
"iface: read Packet({dst_addr} <- {src_addr}, {} bytes)",
|
||||
packet.len(),
|
||||
);
|
||||
// let ParsedAddresses { src_addr, dst_addr } = parse_src_dst_address(packet)?;
|
||||
// log::debug!(
|
||||
// "iface: read Packet({dst_addr} <- {src_addr}, {} bytes)",
|
||||
// packet.len(),
|
||||
// );
|
||||
let dst_addr =
|
||||
parse_dst_addr(packet).ok_or(TunDeviceError::UnableToParseAddressIpHeaderMissing)?;
|
||||
|
||||
// Route packet to the correct peer.
|
||||
|
||||
@@ -220,13 +222,14 @@ impl TunDevice {
|
||||
|
||||
// But we can also do it by consulting the NAT table.
|
||||
RoutingMode::Nat(ref nat_table) => {
|
||||
if let Some(tag) = nat_table.nat_table.get(&dst_addr) {
|
||||
log::debug!("Forward packet with NAT tag: {tag}");
|
||||
return self
|
||||
.tun_task_response_tx
|
||||
.try_send((*tag, packet.to_vec()))
|
||||
.map_err(|err| err.into());
|
||||
}
|
||||
// if let Some(tag) = nat_table.nat_table.get(&dst_addr) {
|
||||
// log::debug!("Forward packet with NAT tag: {tag}");
|
||||
return self
|
||||
.tun_task_response_tx
|
||||
// .try_send((*tag, packet.to_vec()))
|
||||
.try_send((0, packet.to_vec()))
|
||||
.map_err(|err| err.into());
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -287,3 +290,37 @@ fn parse_src_dst_address(packet: &[u8]) -> Result<ParsedAddresses, TunDeviceErro
|
||||
None => Err(TunDeviceError::UnableToParseAddressIpHeaderMissing),
|
||||
}
|
||||
}
|
||||
|
||||
// Copyright (c) 2019 Cloudflare, Inc. All rights reserved.
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
const IPV4_MIN_HEADER_SIZE: usize = 20;
|
||||
const IPV4_DST_IP_OFF: usize = 16;
|
||||
const IPV4_IP_SZ: usize = 4;
|
||||
|
||||
const IPV6_MIN_HEADER_SIZE: usize = 40;
|
||||
const IPV6_DST_IP_OFF: usize = 24;
|
||||
const IPV6_IP_SZ: usize = 16;
|
||||
|
||||
pub fn parse_dst_addr(packet: &[u8]) -> Option<IpAddr> {
|
||||
if packet.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
match packet[0] >> 4 {
|
||||
4 if packet.len() >= IPV4_MIN_HEADER_SIZE => {
|
||||
let addr_bytes: [u8; IPV4_IP_SZ] = packet
|
||||
[IPV4_DST_IP_OFF..IPV4_DST_IP_OFF + IPV4_IP_SZ]
|
||||
.try_into()
|
||||
.unwrap();
|
||||
Some(IpAddr::from(addr_bytes))
|
||||
}
|
||||
6 if packet.len() >= IPV6_MIN_HEADER_SIZE => {
|
||||
let addr_bytes: [u8; IPV6_IP_SZ] = packet
|
||||
[IPV6_DST_IP_OFF..IPV6_DST_IP_OFF + IPV6_IP_SZ]
|
||||
.try_into()
|
||||
.unwrap();
|
||||
Some(IpAddr::from(addr_bytes))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,6 +168,7 @@ impl MixnetClient {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MixnetClientSender {
|
||||
client_input: ClientInput,
|
||||
packet_type: Option<PacketType>,
|
||||
|
||||
@@ -11,6 +11,7 @@ license.workspace = true
|
||||
[dependencies]
|
||||
bincode = "1.3.3"
|
||||
bytes = "1.5.0"
|
||||
dashmap.workspace = true
|
||||
etherparse = "0.13.0"
|
||||
futures = { workspace = true }
|
||||
log = { workspace = true }
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::{
|
||||
collections::HashMap,
|
||||
net::{IpAddr, SocketAddr},
|
||||
path::Path,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
@@ -167,7 +168,7 @@ impl IpPacketRouterBuilder {
|
||||
_config: self.config,
|
||||
request_filter: request_filter.clone(),
|
||||
tun_task_tx,
|
||||
tun_task_response_rx,
|
||||
tun_task_response_rx: Some(tun_task_response_rx),
|
||||
mixnet_client,
|
||||
task_handle,
|
||||
connected_clients: Default::default(),
|
||||
@@ -195,10 +196,9 @@ struct IpPacketRouter {
|
||||
_config: Config,
|
||||
request_filter: request_filter::RequestFilter,
|
||||
tun_task_tx: nym_tun::tun_task_channel::TunTaskTx,
|
||||
tun_task_response_rx: nym_tun::tun_task_channel::TunTaskResponseRx,
|
||||
tun_task_response_rx: Option<nym_tun::tun_task_channel::TunTaskResponseRx>,
|
||||
mixnet_client: nym_sdk::mixnet::MixnetClient,
|
||||
task_handle: TaskHandle,
|
||||
|
||||
connected_clients: HashMap<IpAddr, ConnectedClient>,
|
||||
}
|
||||
|
||||
@@ -212,6 +212,7 @@ impl IpPacketRouter {
|
||||
async fn on_static_connect_request(
|
||||
&mut self,
|
||||
connect_request: nym_ip_packet_requests::StaticConnectRequest,
|
||||
connected_client_tx: &tokio::sync::mpsc::Sender<ConnectedClientEvent>,
|
||||
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
|
||||
log::info!(
|
||||
"Received static connect request from {sender_address}",
|
||||
@@ -225,10 +226,16 @@ impl IpPacketRouter {
|
||||
|
||||
// Check that the IP is available in the set of connected clients
|
||||
let is_ip_taken = self.connected_clients.contains_key(&requested_ip);
|
||||
// let is_ip_taken = connected_clients
|
||||
// .lock()
|
||||
// .unwrap()
|
||||
// .contains_key(&requested_ip);
|
||||
|
||||
// Check that the nym address isn't already registered
|
||||
let is_nym_address_taken = self
|
||||
.connected_clients
|
||||
// .lock()
|
||||
// .unwrap()
|
||||
.values()
|
||||
.any(|client| client.nym_address == reply_to);
|
||||
|
||||
@@ -254,6 +261,10 @@ impl IpPacketRouter {
|
||||
last_activity: std::time::Instant::now(),
|
||||
},
|
||||
);
|
||||
connected_client_tx
|
||||
.send(ConnectedClientEvent::Connect(requested_ip, reply_to))
|
||||
.await
|
||||
.unwrap();
|
||||
Ok(Some(IpPacketResponse::new_static_connect_success(
|
||||
request_id, reply_to,
|
||||
)))
|
||||
@@ -280,6 +291,7 @@ impl IpPacketRouter {
|
||||
async fn on_dynamic_connect_request(
|
||||
&mut self,
|
||||
connect_request: nym_ip_packet_requests::DynamicConnectRequest,
|
||||
connected_client_tx: &tokio::sync::mpsc::Sender<ConnectedClientEvent>,
|
||||
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
|
||||
log::info!(
|
||||
"Received dynamic connect request from {sender_address}",
|
||||
@@ -332,6 +344,10 @@ impl IpPacketRouter {
|
||||
last_activity: std::time::Instant::now(),
|
||||
},
|
||||
);
|
||||
connected_client_tx
|
||||
.send(ConnectedClientEvent::Connect(new_ip, reply_to))
|
||||
.await
|
||||
.unwrap();
|
||||
Ok(Some(IpPacketResponse::new_dynamic_connect_success(
|
||||
request_id, reply_to, new_ip,
|
||||
)))
|
||||
@@ -393,6 +409,7 @@ impl IpPacketRouter {
|
||||
async fn on_reconstructed_message(
|
||||
&mut self,
|
||||
reconstructed: ReconstructedMessage,
|
||||
connected_client_tx: &tokio::sync::mpsc::Sender<ConnectedClientEvent>,
|
||||
) -> Result<Option<IpPacketResponse>, IpPacketRouterError> {
|
||||
log::debug!(
|
||||
"Received message with sender_tag: {:?}",
|
||||
@@ -414,10 +431,12 @@ impl IpPacketRouter {
|
||||
|
||||
match request.data {
|
||||
IpPacketRequestData::StaticConnect(connect_request) => {
|
||||
self.on_static_connect_request(connect_request).await
|
||||
self.on_static_connect_request(connect_request, connected_client_tx)
|
||||
.await
|
||||
}
|
||||
IpPacketRequestData::DynamicConnect(connect_request) => {
|
||||
self.on_dynamic_connect_request(connect_request).await
|
||||
self.on_dynamic_connect_request(connect_request, connected_client_tx)
|
||||
.await
|
||||
}
|
||||
IpPacketRequestData::Data(data_request) => self.on_data_request(data_request).await,
|
||||
}
|
||||
@@ -427,6 +446,84 @@ impl IpPacketRouter {
|
||||
let mut task_client = self.task_handle.fork("main_loop");
|
||||
let mut disconnect_timer = tokio::time::interval(DISCONNECT_TIMER_INTERVAL);
|
||||
|
||||
let mixnet_client_sender = self.mixnet_client.split_sender();
|
||||
let mixnet_client_sender_clone = mixnet_client_sender.clone();
|
||||
|
||||
// let connected_clients = Arc::new(std::sync::Mutex::new(
|
||||
// HashMap::<IpAddr, ConnectedClient>::new(),
|
||||
// ));
|
||||
// let connected_clients_clone = connected_clients.clone();
|
||||
|
||||
let tun_task_response_rx = self.tun_task_response_rx.take();
|
||||
|
||||
let (connected_client_tx, mut connected_client_rx) = tokio::sync::mpsc::channel(16);
|
||||
|
||||
// Spawn TUN listener
|
||||
tokio::spawn(async move {
|
||||
let mut connected_clients = HashMap::<IpAddr, ConnectedClient>::new();
|
||||
let mut tun_task_response_rx = tun_task_response_rx.unwrap();
|
||||
loop {
|
||||
tokio::select! {
|
||||
connected_client_event = connected_client_rx.recv() => {
|
||||
match connected_client_event {
|
||||
Some(ConnectedClientEvent::Connect(ip, nym_addr)) => {
|
||||
log::trace!("Connect client: {ip}");
|
||||
connected_clients.insert(ip, ConnectedClient {
|
||||
nym_address: nym_addr,
|
||||
last_activity: std::time::Instant::now(),
|
||||
});
|
||||
},
|
||||
Some(ConnectedClientEvent::Disconnect(ip)) => {
|
||||
log::trace!("Disconnect client: {ip}");
|
||||
connected_clients.remove(&ip);
|
||||
},
|
||||
None => {},
|
||||
}
|
||||
},
|
||||
packet = tun_task_response_rx.recv() => {
|
||||
if let Some((_tag, packet)) = packet {
|
||||
// TODO: skip full parsing since we only need dst_addr
|
||||
// let Ok(ParsedPacket {
|
||||
// packet_type: _,
|
||||
// src_addr: _,
|
||||
// dst_addr,
|
||||
// dst: _,
|
||||
// }) = parse_packet(&packet) else {
|
||||
// log::warn!("Failed to parse packet");
|
||||
// continue;
|
||||
// };
|
||||
let Some(dst_addr) = parse_dst_addr(&packet) else {
|
||||
log::warn!("Failed to parse packet");
|
||||
continue;
|
||||
};
|
||||
|
||||
let recipient = connected_clients.get(&dst_addr).map(|c| c.nym_address);
|
||||
|
||||
if let Some(recipient) = recipient {
|
||||
let lane = TransmissionLane::General;
|
||||
let packet_type = None;
|
||||
let response_packet = IpPacketResponse::new_ip_packet(packet.into()).to_bytes();
|
||||
let Ok(response_packet) = response_packet else {
|
||||
log::error!("Failed to serialize response packet");
|
||||
continue;
|
||||
};
|
||||
let input_message = InputMessage::new_regular(recipient, response_packet, lane, packet_type);
|
||||
|
||||
if let Err(err) = mixnet_client_sender_clone.send(input_message).await {
|
||||
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
|
||||
};
|
||||
} else {
|
||||
log::error!("IpPacketRouter [main loop]: no nym-address recipient for packet");
|
||||
}
|
||||
} else {
|
||||
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
while !task_client.is_shutdown() {
|
||||
tokio::select! {
|
||||
_ = task_client.recv() => {
|
||||
@@ -446,11 +543,12 @@ impl IpPacketRouter {
|
||||
for ip in inactive_clients {
|
||||
log::info!("Disconnect inactive client: {ip}");
|
||||
self.connected_clients.remove(&ip);
|
||||
connected_client_tx.send(ConnectedClientEvent::Disconnect(ip)).await.unwrap();
|
||||
}
|
||||
},
|
||||
msg = self.mixnet_client.next() => {
|
||||
if let Some(msg) = msg {
|
||||
match self.on_reconstructed_message(msg).await {
|
||||
match self.on_reconstructed_message(msg, &connected_client_tx).await {
|
||||
Ok(Some(response)) => {
|
||||
let Some(recipient) = response.recipient() else {
|
||||
log::error!("IpPacketRouter [main loop]: failed to get recipient from response");
|
||||
@@ -464,7 +562,7 @@ impl IpPacketRouter {
|
||||
let lane = TransmissionLane::General;
|
||||
let packet_type = None;
|
||||
let input_message = InputMessage::new_regular(*recipient, response_packet, lane, packet_type);
|
||||
if let Err(err) = self.mixnet_client.send(input_message).await {
|
||||
if let Err(err) = mixnet_client_sender.send(input_message).await {
|
||||
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
|
||||
};
|
||||
},
|
||||
@@ -474,49 +572,12 @@ impl IpPacketRouter {
|
||||
Err(err) => {
|
||||
log::error!("Error handling mixnet message: {err}");
|
||||
}
|
||||
|
||||
};
|
||||
} else {
|
||||
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
|
||||
break;
|
||||
};
|
||||
},
|
||||
packet = self.tun_task_response_rx.recv() => {
|
||||
if let Some((_tag, packet)) = packet {
|
||||
// TODO: skip full parsing since we only need dst_addr
|
||||
let Ok(ParsedPacket {
|
||||
packet_type: _,
|
||||
src_addr: _,
|
||||
dst_addr,
|
||||
dst: _,
|
||||
}) = parse_packet(&packet) else {
|
||||
log::warn!("Failed to parse packet");
|
||||
continue;
|
||||
};
|
||||
|
||||
let recipient = self.connected_clients.get(&dst_addr).map(|c| c.nym_address);
|
||||
|
||||
if let Some(recipient) = recipient {
|
||||
let lane = TransmissionLane::General;
|
||||
let packet_type = None;
|
||||
let response_packet = IpPacketResponse::new_ip_packet(packet.into()).to_bytes();
|
||||
let Ok(response_packet) = response_packet else {
|
||||
log::error!("Failed to serialize response packet");
|
||||
continue;
|
||||
};
|
||||
let input_message = InputMessage::new_regular(recipient, response_packet, lane, packet_type);
|
||||
|
||||
if let Err(err) = self.mixnet_client.send(input_message).await {
|
||||
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
|
||||
};
|
||||
} else {
|
||||
log::error!("IpPacketRouter [main loop]: no nym-address recipient for packet");
|
||||
}
|
||||
} else {
|
||||
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -525,6 +586,11 @@ impl IpPacketRouter {
|
||||
}
|
||||
}
|
||||
|
||||
enum ConnectedClientEvent {
|
||||
Disconnect(IpAddr),
|
||||
Connect(IpAddr, Recipient),
|
||||
}
|
||||
|
||||
struct ParsedPacket<'a> {
|
||||
packet_type: &'a str,
|
||||
src_addr: IpAddr,
|
||||
@@ -539,8 +605,8 @@ fn parse_packet(packet: &[u8]) -> Result<ParsedPacket, IpPacketRouterError> {
|
||||
})?;
|
||||
|
||||
let (packet_type, dst_port) = match headers.transport {
|
||||
Some(etherparse::TransportSlice::Udp(header)) => ("ipv4", Some(header.destination_port())),
|
||||
Some(etherparse::TransportSlice::Tcp(header)) => ("ipv6", Some(header.destination_port())),
|
||||
Some(etherparse::TransportSlice::Udp(header)) => ("udp", Some(header.destination_port())),
|
||||
Some(etherparse::TransportSlice::Tcp(header)) => ("tcp", Some(header.destination_port())),
|
||||
Some(etherparse::TransportSlice::Icmpv4(_)) => ("icmpv4", None),
|
||||
Some(etherparse::TransportSlice::Icmpv6(_)) => ("icmpv6", None),
|
||||
Some(etherparse::TransportSlice::Unknown(_)) => ("unknown", None),
|
||||
@@ -620,3 +686,37 @@ async fn create_mixnet_client(
|
||||
.await
|
||||
.map_err(|err| IpPacketRouterError::FailedToConnectToMixnet { source: err })
|
||||
}
|
||||
|
||||
// Copyright (c) 2019 Cloudflare, Inc. All rights reserved.
|
||||
// SPDX-License-Identifier: BSD-3-Clause
|
||||
const IPV4_MIN_HEADER_SIZE: usize = 20;
|
||||
const IPV4_DST_IP_OFF: usize = 16;
|
||||
const IPV4_IP_SZ: usize = 4;
|
||||
|
||||
const IPV6_MIN_HEADER_SIZE: usize = 40;
|
||||
const IPV6_DST_IP_OFF: usize = 24;
|
||||
const IPV6_IP_SZ: usize = 16;
|
||||
|
||||
pub fn parse_dst_addr(packet: &[u8]) -> Option<IpAddr> {
|
||||
if packet.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
match packet[0] >> 4 {
|
||||
4 if packet.len() >= IPV4_MIN_HEADER_SIZE => {
|
||||
let addr_bytes: [u8; IPV4_IP_SZ] = packet
|
||||
[IPV4_DST_IP_OFF..IPV4_DST_IP_OFF + IPV4_IP_SZ]
|
||||
.try_into()
|
||||
.unwrap();
|
||||
Some(IpAddr::from(addr_bytes))
|
||||
}
|
||||
6 if packet.len() >= IPV6_MIN_HEADER_SIZE => {
|
||||
let addr_bytes: [u8; IPV6_IP_SZ] = packet
|
||||
[IPV6_DST_IP_OFF..IPV6_DST_IP_OFF + IPV6_IP_SZ]
|
||||
.try_into()
|
||||
.unwrap();
|
||||
Some(IpAddr::from(addr_bytes))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user