Compare commits

...

4 Commits

Author SHA1 Message Date
durch 8f0c427734 Dummy WG implementation - cleaned up 2023-08-30 12:47:48 +02:00
durch 7dfc396f4f Each packet to its own thread 2023-07-25 17:18:06 +02:00
durch 2bf44db72f Tun arc and mutex 2023-07-25 16:46:24 +02:00
durch ebfecba933 Wireguard POC 2023-06-27 11:45:25 +02:00
12 changed files with 1660 additions and 259 deletions
+2 -1
View File
@@ -43,4 +43,5 @@ envs/qwerty.env
.parcel-cache
**/.DS_Store
cpu-cycles/libcpucycles/build
foxyfox.env
foxyfox.env
gateway/deploy.sh
Generated
+1069 -255
View File
File diff suppressed because it is too large Load Diff
+30 -3
View File
@@ -34,16 +34,41 @@ pretty_env_logger = "0.4"
rand = "0.7"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls", "sqlite", "macros", "migrate", ] }
sqlx = { version = "0.5", features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
] }
subtle-encoding = { version = "0.5", features = ["bech32-preview"] }
thiserror = "1"
tokio = { version = "1.24.1", features = [ "rt-multi-thread", "net", "signal", "fs", ] }
tokio = { version = "1.24.1", features = [
"rt-multi-thread",
"net",
"signal",
"fs",
] }
tokio-stream = { version = "0.1.11", features = ["fs"] }
tokio-tungstenite = "0.14"
tokio-util = { version = "0.7.4", features = ["codec"] }
url = { version = "2.2", features = ["serde"] }
zeroize = { workspace = true }
# wireguard
# Forked it to be able to bump x25519-dalek to rc.3
boringtun = { git = "https://github.com/durch/boringtun.git" }
base64 = "0.21"
x25519-dalek = { version = "=2.0.0-rc.3", features = [
"reusable_secrets",
"static_secrets",
] }
etherparse = "0.13.0"
pnet = "0.34.0"
bytes = "1.4.0"
async-recursion = "1.0.4"
smoltcp = "0.10.0"
tun-tap = "0.1.3"
# internal
nym-api-requests = { path = "../nym-api/nym-api-requests" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
@@ -60,7 +85,9 @@ nym-sphinx = { path = "../common/nymsphinx" }
nym-statistics-common = { path = "../common/statistics" }
nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-validator-client = { path = "../common/client-libs/validator-client", features = [ "nyxd-client" ] }
nym-validator-client = { path = "../common/client-libs/validator-client", features = [
"nyxd-client",
] }
[build-dependencies]
tokio = { version = "1.24.1", features = ["rt-multi-thread", "macros"] }
+1
View File
@@ -0,0 +1 @@
gA3NCDl+xOorR3heFVB47FlGunsZgS4RDX2M0IY73lc=
+1
View File
@@ -0,0 +1 @@
mxV/mw7WZTe+0Msa0kvJHMHERDA/cSskiZWQce+TdEs=
+1
View File
@@ -0,0 +1 @@
AEqXrLFT4qjYq3wmX0456iv94uM6nDj5ugp6Jedcflg=
+1
View File
@@ -0,0 +1 @@
WM8s8bYegwMa0TJ+xIwhk+dImk2IpDUKslDBCZPizlE=
+5
View File
@@ -10,6 +10,7 @@ use crate::node::client_handling::websocket::connection_handler::coconut::Coconu
use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler;
use crate::node::statistics::collector::GatewayStatisticsCollector;
use crate::node::storage::Storage;
use crate::node::wireguard::wireguard;
use log::*;
use nym_bin_common::output_format::OutputFormat;
use nym_crypto::asymmetric::{encryption, identity};
@@ -28,6 +29,8 @@ pub(crate) mod client_handling;
pub(crate) mod mixnet_handling;
pub(crate) mod statistics;
pub(crate) mod storage;
mod wg;
pub(crate) mod wireguard;
/// Wire up and create Gateway instance
pub(crate) async fn create_gateway(config: Config) -> Gateway<PersistentStorage> {
@@ -297,6 +300,8 @@ impl<St> Gateway<St> {
Arc::new(coconut_verifier),
);
tokio::spawn(wireguard());
info!("Finished nym gateway startup procedure - it should now be able to receive mix and client traffic!");
self.wait_for_interrupt(shutdown).await
+31
View File
@@ -0,0 +1,31 @@
use bytes::Bytes;
use std::fmt::{Display, Formatter};
#[derive(Debug, Clone)]
pub enum Event {
/// Dumb event with no data.
Dumb,
/// IP packet received from the WireGuard tunnel that should be passed through to the corresponding virtual device/internet.
/// Original implementation also has protocol here since it understands it, but we'll have to infer it downstream
WgPacket(Bytes),
/// IP packet to be sent through the WireGuard tunnel as crafted by the virtual device.
IpPacket(Bytes),
}
impl Display for Event {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Event::Dumb => {
write!(f, "Dumb{{}}")
}
Event::WgPacket(data) => {
let size = data.len();
write!(f, "WgPacket{{ size={} }}", size)
}
Event::IpPacket(data) => {
let size = data.len();
write!(f, "IpPacket{{ size={} }}", size)
}
}
}
}
+437
View File
@@ -0,0 +1,437 @@
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use async_recursion::async_recursion;
use base64::engine::general_purpose;
use base64::Engine as _;
use boringtun::noise::errors::WireGuardError;
use boringtun::noise::{Tunn, TunnResult};
use etherparse::{InternetSlice, PacketBuilder, SlicedPacket, TransportSlice};
use log::{debug, info, warn};
use pnet::packet::ip::IpNextHeaderProtocols;
use pnet::packet::ipv4::{Ipv4Packet, MutableIpv4Packet};
use pnet::packet::{MutablePacket, Packet, PacketSize};
use pnet::transport::{ipv4_packet_iter, transport_channel};
use tokio::net::UdpSocket;
use tokio::sync::{Mutex, RwLock};
use tokio::time::{sleep, timeout};
use x25519_dalek::StaticSecret;
use crate::error;
use self::events::Event;
pub mod events;
const MAX_PACKET: usize = 65536;
/// A WireGuard tunnel. Encapsulates and decapsulates IP packets,
/// recieves packets from the client on the udp_rx channel,
/// and events from the internet on the eth_rx channel,
/// sends data through udp socket or datalink sender directly.
/// For now all tunnels recieve all events and filter on the source_peer_addr
pub struct WireGuardTunnel {
source_peer_addr: Arc<RwLock<Option<(Ipv4Addr, u16)>>>,
/// `boringtun` peer/tunnel implementation, used for crypto & WG protocol.
peer: Arc<Mutex<Tunn>>,
udp: Arc<UdpSocket>,
peer_endpoint: SocketAddr,
bus_rx: tokio::sync::broadcast::Receiver<Event>,
bus_tx: tokio::sync::broadcast::Sender<Event>,
}
pub fn handle_l3_packet(data: &[u8], destination_addr: Ipv4Addr) -> Vec<u8> {
let (mut tx, mut rx) = transport_channel(
65535,
pnet::transport::TransportChannelType::Layer3(IpNextHeaderProtocols::Tcp),
)
.unwrap();
let mut rx_iterator = ipv4_packet_iter(&mut rx);
let mut must_send = true;
let mut cnt = 0;
while let Ok((packet, addr)) = rx_iterator.next() {
if must_send {
let data = data.to_vec();
let incoming_packet = Ipv4Packet::new(&data).unwrap();
let mut new_packet = vec![0; incoming_packet.packet_size()];
let mut outgoing_packet = MutableIpv4Packet::new(&mut new_packet).unwrap();
outgoing_packet.clone_from(&incoming_packet);
info!(
"Sending (ttl={}, proto={} from {} to {}({})",
outgoing_packet.get_ttl(),
outgoing_packet.get_next_level_protocol(),
outgoing_packet.get_source(),
outgoing_packet.get_destination(),
destination_addr
);
outgoing_packet.set_source("95.217.227.118".parse().unwrap());
let sent = tx
.send_to(outgoing_packet, IpAddr::V4(destination_addr))
.unwrap();
info!("Sent L3 packet ({sent})");
must_send = false;
continue;
}
cnt += 1;
let source = packet.get_source();
let destination = packet.get_destination();
info!("Ignoring packet from {source}");
if source == destination_addr {
info!("({addr}){source} -> {destination}");
return packet.payload().to_vec();
}
if cnt >= 10 {
break;
}
}
vec![]
}
impl WireGuardTunnel {
async fn set_source_peer_addr(&self, source_addr: Ipv4Addr, source_port: Option<u16>) {
{
if self.source_peer_addr.read().await.is_some() {
return;
}
}
let mut source_peer_addr = self.source_peer_addr.write().await;
*source_peer_addr = Some((source_addr, source_port.unwrap_or(0)))
}
pub async fn spin_off(mut self) {
info!("Spun off WG tunnel");
// We'll receive both inbound and outbound packages on the same channel, and filter on packet type
loop {
tokio::select! {
packet = self.bus_rx.recv() => {
match packet {
Ok(p) => {
info!("{p}");
match p {
Event::IpPacket(data) => self.consume_eth(&data).await,
Event::WgPacket(data) => self.consume_wg(&data).await,
_ => {}
}
},
Err(e) => error!("{e}")
}
},
_ = sleep(Duration::from_millis(5))=> {
let mut send_buf = [0u8; MAX_PACKET];
let tun_result = {
let mut tun = timeout(Duration::from_millis(100), self.peer()).await.unwrap();
tun.update_timers(&mut send_buf)
};
self.handle_routine_tun_result(tun_result).await;
}
}
}
}
pub async fn consume_eth(&self, data: &[u8]) {
let parsed_packet = SlicedPacket::from_ethernet(data).unwrap();
debug!("{parsed_packet:?}");
let (source_addr, destination_addr) = match parsed_packet.ip.unwrap() {
InternetSlice::Ipv4(ip, _) => (ip.source_addr(), ip.destination_addr()),
_ => unimplemented!(),
};
let (source_port, destination_port, icmp_type) = match parsed_packet.transport.as_ref() {
Some(TransportSlice::Tcp(tcp)) => {
(Some(tcp.source_port()), Some(tcp.destination_port()), None)
}
Some(TransportSlice::Udp(udp)) => {
(Some(udp.source_port()), Some(udp.destination_port()), None)
}
Some(TransportSlice::Icmpv4(icmp)) => (None, None, Some(icmp.icmp_type())),
Some(TransportSlice::Icmpv6(_)) => panic!("ICMPv6"),
Some(TransportSlice::Unknown(_)) => panic!("Unknown"),
None => panic!("No transport layer"),
};
debug!(
"{:?}:{:?} -> {:?}:{:?} - ({:?})",
source_addr, source_port, destination_addr, destination_port, icmp_type
);
if destination_addr == self.source_peer_addr.read().await.unwrap().0 {
info!("Sending {} to {}", data.len(), self.peer_endpoint);
} else {
return;
}
let response_packet_builder =
PacketBuilder::ipv4(source_addr.octets(), destination_addr.octets(), 64);
let mut response_packet =
Vec::<u8>::with_capacity(response_packet_builder.size(parsed_packet.payload.len()));
match parsed_packet.transport.as_ref() {
Some(TransportSlice::Udp(udp)) => {
debug!("UDP: {}, {}", udp.length(), udp.destination_port());
let response_packet_builder =
response_packet_builder.udp(source_port.unwrap(), destination_port.unwrap());
response_packet_builder
.write(&mut response_packet, parsed_packet.payload)
.unwrap();
}
Some(TransportSlice::Tcp(tcp)) => {
let response_packet_builder = response_packet_builder.tcp(
destination_port.unwrap(),
source_port.unwrap(),
tcp.sequence_number(),
tcp.window_size(),
);
response_packet_builder
.write(&mut response_packet, parsed_packet.payload)
.unwrap();
}
Some(TransportSlice::Icmpv4(icmp)) => {
info!("{:?}", icmp.icmp_type());
let response_packet_builder = response_packet_builder.icmpv4(icmp.icmp_type());
response_packet_builder
.write(&mut response_packet, parsed_packet.payload)
.unwrap();
}
None => {}
_ => unimplemented!(),
};
let encapsulated_response_packet = self.encapsulate_packet(&response_packet).await;
// let packet = Tunn::parse_incoming_packet(&response_packet).unwrap();
// info!("Sending {packet:?} to {addr}");
let sent = self
.udp
.send_to(&encapsulated_response_packet, self.peer_endpoint)
.await
.unwrap();
info!(
"[{}:{} ({sent})-> {}:{}] -> {}",
destination_addr,
destination_port.unwrap_or(0),
source_addr,
source_port.unwrap_or(0),
self.peer_endpoint
);
}
// TODO: extend to work with IPv6
pub async fn produce_eth(&self, packet_bytes: &[u8]) -> Vec<u8> {
let outgoing_packet = SlicedPacket::from_ip(packet_bytes).unwrap();
let (source_addr, destination_addr) = match outgoing_packet.ip.unwrap() {
InternetSlice::Ipv4(ip, _) => (ip.source_addr(), ip.destination_addr()),
_ => unimplemented!(),
};
let (source_port, destination_port, icmp_type) = match outgoing_packet.transport.as_ref() {
Some(TransportSlice::Tcp(tcp)) => {
(Some(tcp.source_port()), Some(tcp.destination_port()), None)
}
Some(TransportSlice::Udp(udp)) => {
(Some(udp.source_port()), Some(udp.destination_port()), None)
}
Some(TransportSlice::Icmpv4(icmp)) => (None, None, Some(icmp.icmp_type())),
Some(TransportSlice::Icmpv6(_)) => panic!("ICMPv6"),
Some(TransportSlice::Unknown(_)) => panic!("Unknown"),
None => panic!("No transport layer"),
};
info!(
"{:?}:{:?} -> {:?}:{:?} - ({:?})",
source_addr, source_port, destination_addr, destination_port, icmp_type
);
self.set_source_peer_addr(source_addr, source_port).await;
handle_l3_packet(packet_bytes, destination_addr)
}
/// WireGuard consumption task. Receives encrypted packets from the WireGuard peer,
/// decapsulates them, and dispatches newly received IP packets.
async fn consume_wg(&self, data: &[u8]) {
let mut send_buf = [0u8; MAX_PACKET];
let mut peer = self.peer().await;
match peer.decapsulate(None, data, &mut send_buf) {
TunnResult::WriteToNetwork(packet) => {
match self.udp.send_to(packet, self.peer_endpoint).await {
Ok(_) => {}
Err(e) => {
error!("Failed to send decapsulation-instructed packet to WireGuard endpoint: {:?}", e);
}
};
loop {
let mut send_buf = [0u8; MAX_PACKET];
match peer.decapsulate(None, &[], &mut send_buf) {
TunnResult::WriteToNetwork(packet) => {
match self.udp.send_to(packet, self.peer_endpoint).await {
Ok(_) => {}
Err(e) => {
error!("Failed to send decapsulation-instructed packet to WireGuard endpoint: {:?}", e);
break;
}
};
}
_ => {
break;
}
}
}
}
TunnResult::WriteToTunnelV4(packet, _) | TunnResult::WriteToTunnelV6(packet, _) => {
info!(
"WireGuard endpoint sent IP packet of {} bytes",
packet.len()
);
let response = self.produce_eth(packet).await;
if !response.is_empty() {
self.bus_tx.send(Event::IpPacket(response.into())).unwrap();
}
}
x => warn!("{x:?}"),
}
}
async fn encapsulate_packet(&self, payload: &[u8]) -> Vec<u8> {
let len = 148.max(payload.len() + 32);
let mut dst = vec![0; len];
let mut t = self.peer().await;
let packet = t.encapsulate(payload, &mut dst);
match packet {
TunnResult::WriteToNetwork(p) => p.to_vec(),
unexpected => {
error!("{:?}", unexpected);
vec![]
}
}
}
pub async fn peer(&self) -> tokio::sync::MutexGuard<'_, Tunn> {
self.peer.lock().await
}
pub async fn new(
peer_static_public: x25519_dalek::PublicKey,
udp: Arc<UdpSocket>,
peer_endpoint: SocketAddr,
bus_tx: tokio::sync::broadcast::Sender<Event>,
) -> Self {
let peer = Arc::new(Mutex::new(Self::create_tunnel(peer_static_public)));
Self {
source_peer_addr: Arc::new(RwLock::new(None)),
peer,
udp,
peer_endpoint,
bus_rx: bus_tx.subscribe(),
bus_tx,
}
}
fn create_tunnel(peer_static_public: x25519_dalek::PublicKey) -> Tunn {
let secret_bytes: [u8; 32] = general_purpose::STANDARD
.decode("AEqXrLFT4qjYq3wmX0456iv94uM6nDj5ugp6Jedcflg=")
.unwrap()
.try_into()
.unwrap();
let private_key = StaticSecret::try_from(secret_bytes).unwrap();
Tunn::new(private_key, peer_static_public, None, None, 0, None).unwrap()
}
/// Encapsulates and sends an IP packet back to the WireGuard client.
pub async fn send_ip_packet(&self, packet: &[u8]) -> anyhow::Result<()> {
let mut send_buf = [0u8; MAX_PACKET];
match self.peer().await.encapsulate(packet, &mut send_buf) {
TunnResult::WriteToNetwork(packet) => {
self.udp.send_to(packet, self.peer_endpoint).await.unwrap();
debug!(
"Sent {} bytes to WireGuard endpoint (encrypted IP packet)",
packet.len()
);
}
TunnResult::Err(e) => {
error!("Failed to encapsulate IP packet: {:?}", e);
}
TunnResult::Done => {
// Ignored
}
other => {
error!(
"Unexpected WireGuard state during encapsulation: {:?}",
other
);
}
};
Ok(())
}
#[async_recursion]
async fn handle_routine_tun_result<'a: 'async_recursion>(&self, result: TunnResult<'a>) -> () {
match result {
TunnResult::WriteToNetwork(packet) => {
info!(
"Sending routine packet of {} bytes to WireGuard endpoint",
packet.len()
);
match self.udp.send_to(packet, self.peer_endpoint).await {
Ok(_) => {}
Err(e) => {
error!(
"Failed to send routine packet to WireGuard endpoint: {:?}",
e
);
}
};
}
TunnResult::Err(WireGuardError::ConnectionExpired) => {
warn!("Wireguard handshake has expired!");
let mut buf = vec![0u8; MAX_PACKET];
let result = self
.peer()
.await
.format_handshake_initiation(&mut buf[..], false);
self.handle_routine_tun_result(result).await
}
TunnResult::Err(e) => {
error!(
"Failed to prepare routine packet for WireGuard endpoint: {:?}",
e
);
}
TunnResult::Done => {
// Sleep for a bit
// tokio::time::sleep(Duration::from_millis(1)).await;
}
other => {
warn!("Unexpected WireGuard routine task state: {:?}", other);
}
};
}
// fn route_protocol(&self, packet: &[u8]) -> Option<Protocol> {
// match IpVersion::of_packet(packet) {
// Ok(IpVersion::Ipv4) => Ipv4Packet::new_checked(&packet)
// .ok()
// // Only care if the packet is destined for this tunnel
// .filter(|packet| Ipv4Addr::from(packet.dst_addr()) == self.source_peer_ip)
// .and_then(|packet| match packet.next_header() {
// IpProtocol::Tcp => Some(Protocol::Tcp),
// IpProtocol::Udp => Some(Protocol::Udp),
// // Unrecognized protocol, so we cannot determine where to route
// _ => None,
// }),
// Ok(IpVersion::Ipv6) => Ipv6Packet::new_checked(&packet)
// .ok()
// // Only care if the packet is destined for this tunnel
// .filter(|packet| Ipv6Addr::from(packet.dst_addr()) == self.source_peer_ip)
// .and_then(|packet| match packet.next_header() {
// IpProtocol::Tcp => Some(Protocol::Tcp),
// IpProtocol::Udp => Some(Protocol::Udp),
// // Unrecognized protocol, so we cannot determine where to route
// _ => None,
// }),
// _ => None,
// }
// }
}
+77
View File
@@ -0,0 +1,77 @@
use base64::engine::general_purpose;
use base64::Engine as _;
use log::{error, info};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::broadcast;
use x25519_dalek::{PublicKey, StaticSecret};
use crate::node::wg::events::Event;
use crate::node::wg::WireGuardTunnel;
pub async fn wireguard() {
let wg_address = "0.0.0.0:51820";
let sock = Arc::new(UdpSocket::bind(wg_address).await.unwrap());
info!("wg listening on {wg_address}");
// Secret key ofthe gateway, we'll need a way to generate this from the IdentityKey, might be enough to do some base58 -> base64 conversion
let secret_bytes: [u8; 32] = general_purpose::STANDARD
.decode("AEqXrLFT4qjYq3wmX0456iv94uM6nDj5ugp6Jedcflg=")
.unwrap()
.try_into()
.unwrap();
// Hardcoded peer public key, we'll need a way to register those, private key for that one is `aMUcuAgTiFCHQ/fHqEQRvpLWBxh8sKA7f7lSyWymrGE=`
// Wireguard configuration that works with this setup is below, this needs to be put into the wireguard client of choice.
// Working in this case means that they go through the handshake, and client
// starts sending data packets to the gateway.
//
// [Interface]
// PrivateKey = aMUcuAgTiFCHQ/fHqEQRvpLWBxh8sKA7f7lSyWymrGE=
// Address = 10.8.0.0/24
// DNS = 1.1.1.1
//
// [Peer]
// PublicKey = y6/iGYraJjON6pw9fcBa5vLRbGsQqprFLfWKyJQnlWs=
// AllowedIPs = 0.0.0.0/0
// Endpoint = 127.0.0.1:51820
let peer_public_bytes: [u8; 32] = general_purpose::STANDARD
.decode("mxV/mw7WZTe+0Msa0kvJHMHERDA/cSskiZWQce+TdEs=")
.unwrap()
.try_into()
.unwrap();
let peer_public = PublicKey::from(peer_public_bytes);
let secret = StaticSecret::try_from(secret_bytes).unwrap();
let public = PublicKey::from(&secret);
info!(
"wg public key: {}",
general_purpose::STANDARD.encode(public)
);
let mut buf = [0; 1024];
let mut peers = HashSet::new();
let (bus_tx, _) = broadcast::channel(128);
while let Ok((len, addr)) = sock.recv_from(&mut buf).await {
info!("Received {} bytes from {}", len, addr);
if peers.contains(&addr) {
bus_tx
.send(Event::WgPacket(buf[..len].to_vec().into()))
.map_err(|e| error!("{e}"))
.unwrap();
} else {
info!("New peer with endpoint {addr}");
let tun =
WireGuardTunnel::new(peer_public, Arc::clone(&sock), addr, bus_tx.clone()).await;
peers.insert(addr);
tokio::spawn(tun.spin_off());
bus_tx
.send(Event::WgPacket(buf[..len].to_vec().into()))
.map_err(|e| error!("{e}"))
.unwrap();
}
}
panic!("Not OK");
}
@@ -0,0 +1,5 @@
{
"name": "@nymproject/nym-client-wasm",
"version": "1.0.0",
"sideEffects": false
}