Compare commits

..

9 Commits

Author SHA1 Message Date
Jon Häggblad 161a8a7d4d Create tun_common subdir 2023-11-20 10:19:47 +01:00
Jon Häggblad 25aa04686e new structure compiles 2023-11-20 10:06:34 +01:00
Jon Häggblad 9b6355b256 wip 2023-11-20 08:55:26 +01:00
Jon Häggblad 65272d7bf6 wip: extract crates 2023-11-18 15:29:55 +01:00
Jon Häggblad 810dce5ee8 Use common interface request response 2023-11-18 14:17:27 +01:00
Jon Häggblad 58ec878256 wip 2023-11-17 16:21:40 +01:00
Bogdan-Ștefan Neacşu a5c1e4abf0 Expose the same pub key that's used for wg (#4157) 2023-11-17 13:04:22 +00:00
Jon Häggblad 3a1003c564 Create TaggedPacket (#4156)
* Create TaggedPacket

* Fix bug passing the correct data
2023-11-17 12:30:15 +01:00
Jon Häggblad 1cdd8f6c08 Rework error handling in tun device (#4146)
* Rework error handling in tun device

* Extract out timeout constants

* Experiment with timeouts

* Update error msg

* try_send in one direction as hotfix for deadlock

* Downgrade some log from info to debug

* Update comment

* rustfmt
2023-11-17 09:52:05 +01:00
32 changed files with 484 additions and 253 deletions
+7 -6
View File
@@ -75,28 +75,29 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: build
args: --workspace
# Enable wireguard by default on linux only
args: --workspace --features wireguard
- name: Build all examples
if: matrix.os == 'custom-linux'
uses: actions-rs/cargo@v1
with:
command: build
args: --workspace --examples
args: --workspace --examples --features wireguard
- name: Run all tests
if: matrix.os == 'custom-linux'
uses: actions-rs/cargo@v1
with:
command: test
args: --workspace
args: --workspace --features wireguard
- name: Run expensive tests
if: (github.ref == 'refs/heads/develop' || github.event.pull_request.base.ref == 'develop' || github.event.pull_request.base.ref == 'master') && matrix.os == 'custom-linux'
uses: actions-rs/cargo@v1
with:
command: test
args: --workspace -- --ignored
args: --workspace --features wireguard -- --ignored
- name: Annotate with clippy checks
if: matrix.os == 'custom-linux'
@@ -104,10 +105,10 @@ jobs:
continue-on-error: true
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --workspace
args: --workspace --features wireguard
- name: Clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: --workspace --all-targets -- -D warnings
args: --workspace --all-targets --features wireguard -- -D warnings
Generated
+37
View File
@@ -6657,10 +6657,23 @@ dependencies = [
"thiserror",
]
[[package]]
name = "nym-ip-packet-requests"
version = "0.1.0"
dependencies = [
"bincode",
"bytes",
"nym-service-providers-common",
"nym-sphinx",
"serde",
]
[[package]]
name = "nym-ip-packet-router"
version = "0.1.0"
dependencies = [
"bincode",
"bytes",
"etherparse",
"futures",
"log",
@@ -6668,11 +6681,13 @@ dependencies = [
"nym-client-core",
"nym-config",
"nym-exit-policy",
"nym-ip-packet-requests",
"nym-network-requester",
"nym-sdk",
"nym-service-providers-common",
"nym-sphinx",
"nym-task",
"nym-tun",
"nym-wireguard",
"nym-wireguard-types",
"reqwest",
@@ -6920,6 +6935,7 @@ dependencies = [
"nym-crypto",
"nym-node-requests",
"nym-task",
"nym-wireguard",
"nym-wireguard-types",
"rand 0.7.3",
"serde",
@@ -7462,6 +7478,19 @@ dependencies = [
"wasm-utils",
]
[[package]]
name = "nym-tun"
version = "0.1.0"
dependencies = [
"boringtun",
"etherparse",
"log",
"nym-wireguard-types",
"thiserror",
"tokio",
"tokio-tun",
]
[[package]]
name = "nym-types"
version = "1.0.0"
@@ -7581,6 +7610,7 @@ version = "0.1.0"
dependencies = [
"async-recursion",
"base64 0.21.4",
"bincode",
"boringtun",
"bytes",
"dashmap",
@@ -7589,7 +7619,9 @@ dependencies = [
"ip_network",
"ip_network_table",
"log",
"nym-sphinx",
"nym-task",
"nym-tun",
"nym-wireguard-types",
"rand 0.8.5",
"serde",
@@ -7605,14 +7637,19 @@ version = "0.1.0"
dependencies = [
"base64 0.21.4",
"boringtun",
"bytes",
"dashmap",
"hmac 0.12.1",
"ip_network",
"ip_network_table",
"log",
"nym-crypto",
"rand 0.7.3",
"serde",
"serde_json",
"sha2 0.10.8",
"thiserror",
"tokio",
"utoipa",
"x25519-dalek 2.0.0",
]
+2
View File
@@ -49,6 +49,7 @@ members = [
"common/exit-policy",
"common/http-api-client",
"common/inclusion-probability",
"common/ip-packet-requests",
"common/ledger",
"common/mixnode-common",
"common/network-defaults",
@@ -74,6 +75,7 @@ members = [
"common/store-cipher",
"common/task",
"common/topology",
"common/tun",
"common/types",
"common/wasm/client-core",
"common/wasm/storage",
+18
View File
@@ -0,0 +1,18 @@
[package]
name = "nym-ip-packet-requests"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bincode = "1.3.3"
bytes = "1.5.0"
nym-service-providers-common = { path = "../../service-providers/common" }
nym-sphinx = { path = "../nymsphinx" }
serde = { workspace = true, features = ["derive"] }
+33
View File
@@ -0,0 +1,33 @@
use nym_service_providers_common::interface;
pub type IpPacketRouterRequest = interface::Request<TaggedIpPacket>;
pub type IpPacketRouterResponse = interface::Response<IpPacket>;
#[derive(serde::Serialize, serde::Deserialize)]
pub struct TaggedIpPacket {
pub packet: bytes::Bytes,
pub return_address: nym_sphinx::addressing::clients::Recipient,
pub return_mix_hops: Option<u8>,
pub return_mix_delays: Option<f64>,
}
#[derive(serde::Serialize, serde::Deserialize)]
pub struct IpPacket {
pub packet: bytes::Bytes,
}
impl TaggedIpPacket {
pub fn from_message(
message: &nym_sphinx::receiver::ReconstructedMessage,
) -> Result<Self, bincode::Error> {
use bincode::Options;
make_bincode_serializer().deserialize(&message.message)
}
}
fn make_bincode_serializer() -> impl bincode::Options {
use bincode::Options;
bincode::DefaultOptions::new()
.with_big_endian()
.with_varint_encoding()
}
+26
View File
@@ -0,0 +1,26 @@
[package]
name = "nym-tun"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
thiserror.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util", "time", "sync", "macros"] }
etherparse = "0.13.0"
log.workspace = true
# TODO: remove
boringtun = { workspace = true }
nym-wireguard-types = { path = "../wireguard-types", optional = true }
[target.'cfg(target_os = "linux")'.dependencies]
tokio-tun = "0.9.0"
[features]
wireguard = ["nym-wireguard-types"]
+7
View File
@@ -0,0 +1,7 @@
#[cfg(target_os = "linux")]
mod linux;
pub mod tun_task_channel;
#[cfg(target_os = "linux")]
pub use linux::tun_device;
@@ -1,22 +1,67 @@
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr},
sync::Arc,
time::Duration,
};
use etherparse::{InternetSlice, SlicedPacket};
use tap::TapFallible;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::{
event::Event,
tun_task_channel::{
tun_task_channel, tun_task_response_channel, TunTaskPayload, TunTaskResponseRx,
TunTaskResponseTx, TunTaskRx, TunTaskTx,
},
udp_listener::PeersByIp,
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
time::timeout,
};
use crate::tun_task_channel::{
tun_task_channel, tun_task_response_channel, TunTaskPayload, TunTaskResponseRx,
TunTaskResponseSendError, TunTaskResponseTx, TunTaskRx, TunTaskTx,
};
#[cfg(feature = "wireguard")]
use nym_wireguard_types::tun_common::{
active_peers::{PeerEventSenderError, PeersByIp},
event::Event,
};
#[cfg(feature = "wireguard")]
const MUTEX_LOCK_TIMEOUT_MS: u64 = 200;
const TUN_WRITE_TIMEOUT_MS: u64 = 1000;
#[derive(thiserror::Error, Debug)]
pub enum TunDeviceError {
#[error("timeout writing to tun device, dropping packet")]
TunWriteTimeout,
#[error("error writing to tun device: {source}")]
TunWriteError { source: std::io::Error },
#[cfg(feature = "wireguard")]
#[error("failed forwarding packet to peer: {source}")]
ForwardToPeerFailed {
#[from]
source: PeerEventSenderError,
},
#[error("failed to forward responding packet with tag: {source}")]
ForwardNatResponseFailed {
#[from]
source: TunTaskResponseSendError,
},
#[error("unable to parse destination address from packet")]
UnableToParseDstAdddress,
#[error("unable to parse source address from packet")]
UnableToParseSrcAddress {
#[from]
source: etherparse::ReadError,
},
#[error("unable to parse source address from packet: ip header missing")]
UnableToParseSrcAddressIpHeaderMissing,
#[error("unable to lock peer mutex")]
FailedToLockPeer,
}
fn setup_tokio_tun_device(name: &str, address: Ipv4Addr, netmask: Ipv4Addr) -> tokio_tun::Tun {
log::info!("Creating TUN device with: address={address}, netmask={netmask}");
// Read MTU size from env variable NYM_MTU_SIZE, else default to 1420.
@@ -51,6 +96,7 @@ pub struct TunDevice {
pub enum RoutingMode {
// The routing table, as how wireguard does it
#[cfg(feature = "wireguard")]
AllowedIps(AllowedIpsInner),
// This is an alternative to the routing table, where we just match outgoing source IP with
@@ -65,13 +111,27 @@ impl RoutingMode {
})
}
pub fn new_allowed_ips(peers_by_ip: Arc<tokio::sync::Mutex<PeersByIp>>) -> Self {
#[cfg(feature = "wireguard")]
pub fn new_allowed_ips(peers_by_ip: std::sync::Arc<tokio::sync::Mutex<PeersByIp>>) -> Self {
RoutingMode::AllowedIps(AllowedIpsInner { peers_by_ip })
}
}
#[cfg(feature = "wireguard")]
pub struct AllowedIpsInner {
peers_by_ip: Arc<tokio::sync::Mutex<PeersByIp>>,
peers_by_ip: std::sync::Arc<tokio::sync::Mutex<PeersByIp>>,
}
#[cfg(feature = "wireguard")]
impl AllowedIpsInner {
async fn lock(&self) -> Result<tokio::sync::MutexGuard<PeersByIp>, TunDeviceError> {
timeout(
Duration::from_millis(MUTEX_LOCK_TIMEOUT_MS),
self.peers_by_ip.as_ref().lock(),
)
.await
.map_err(|_| TunDeviceError::FailedToLockPeer)
}
}
pub struct NatInner {
@@ -114,48 +174,38 @@ impl TunDevice {
}
// Send outbound packets out on the wild internet
async fn handle_tun_write(&mut self, data: TunTaskPayload) {
async fn handle_tun_write(&mut self, data: TunTaskPayload) -> Result<(), TunDeviceError> {
let (tag, packet) = data;
let Some(dst_addr) = boringtun::noise::Tunn::dst_address(&packet) else {
log::error!("Unable to parse dst_address in packet that was supposed to be written to tun device");
return;
};
let Some(src_addr) = parse_src_address(&packet) else {
log::error!("Unable to parse src_address in packet that was supposed to be written to tun device");
return;
};
log::info!(
let dst_addr = boringtun::noise::Tunn::dst_address(&packet)
.ok_or_else(|| TunDeviceError::UnableToParseDstAdddress)?;
let src_addr = parse_src_address(&packet)?;
log::debug!(
"iface: write Packet({src_addr} -> {dst_addr}, {} bytes)",
packet.len()
);
// TODO: expire old entries
#[allow(irrefutable_let_patterns)]
if let RoutingMode::Nat(nat_table) = &mut self.routing_mode {
nat_table.nat_table.insert(src_addr, tag);
}
tokio::time::timeout(
std::time::Duration::from_millis(1000),
timeout(
Duration::from_millis(TUN_WRITE_TIMEOUT_MS),
self.tun.write_all(&packet),
)
.await
.tap_err(|err| {
log::error!("iface: write error: {err}");
})
.ok();
.map_err(|_| TunDeviceError::TunWriteTimeout)?
.map_err(|err| TunDeviceError::TunWriteError { source: err })
}
// Receive reponse packets from the wild internet
async fn handle_tun_read(&self, packet: &[u8]) {
let Some(dst_addr) = boringtun::noise::Tunn::dst_address(packet) else {
log::error!("Unable to parse dst_address in packet that was read from tun device");
return;
};
let Some(src_addr) = parse_src_address(packet) else {
log::error!("Unable to parse src_address in packet that was read from tun device");
return;
};
log::info!(
async fn handle_tun_read(&self, packet: &[u8]) -> Result<(), TunDeviceError> {
let dst_addr = boringtun::noise::Tunn::dst_address(packet)
.ok_or(TunDeviceError::UnableToParseDstAdddress)?;
let src_addr = parse_src_address(packet)?;
log::debug!(
"iface: read Packet({src_addr} -> {dst_addr}, {} bytes)",
packet.len(),
);
@@ -164,47 +214,32 @@ impl TunDevice {
match self.routing_mode {
// This is how wireguard does it, by consulting the AllowedIPs table.
#[cfg(feature = "wireguard")]
RoutingMode::AllowedIps(ref peers_by_ip) => {
let Ok(peers) = tokio::time::timeout(
std::time::Duration::from_millis(1000),
peers_by_ip.peers_by_ip.as_ref().lock(),
)
.await
else {
log::error!("Failed to lock peer");
return;
};
let peers = peers_by_ip.lock().await?;
if let Some(peer_tx) = peers.longest_match(dst_addr).map(|(_, tx)| tx) {
log::info!("Forward packet to wg tunnel");
tokio::time::timeout(
std::time::Duration::from_millis(1000),
peer_tx.send(Event::Ip(packet.to_vec().into())),
)
.await
.tap_err(|err| log::error!("Failed to forward packet to wg tunnel: {err}"))
.ok();
return;
log::debug!("Forward packet to wg tunnel");
return peer_tx
.send(Event::Ip(packet.to_vec().into()))
.await
.map_err(|err| err.into());
}
}
// But we can also do it by consulting the NAT table.
RoutingMode::Nat(ref nat_table) => {
if let Some(tag) = nat_table.nat_table.get(&dst_addr) {
log::info!("Forward packet with tag: {tag}");
tokio::time::timeout(
std::time::Duration::from_millis(1000),
self.tun_task_response_tx.send((*tag, packet.to_vec())),
)
.await
.tap_err(|err| log::error!("Failed to foward packet with tag: {err}"))
.ok();
return;
log::debug!("Forward packet with NAT tag: {tag}");
return self
.tun_task_response_tx
.try_send((*tag, packet.to_vec()))
.map_err(|err| err.into());
}
}
}
log::info!("No peer found, packet dropped");
Ok(())
}
pub async fn run(mut self) {
@@ -216,13 +251,9 @@ impl TunDevice {
len = self.tun.read(&mut buf) => match len {
Ok(len) => {
let packet = &buf[..len];
tokio::time::timeout(
std::time::Duration::from_millis(1000),
self.handle_tun_read(packet)
)
.await
.tap_err(|_err| log::error!("Failed: handle_tun_read timeout"))
.ok();
if let Err(err) = self.handle_tun_read(packet).await {
log::error!("iface: handle_tun_read failed: {err}")
}
},
Err(err) => {
log::info!("iface: read error: {err}");
@@ -231,13 +262,9 @@ impl TunDevice {
},
// Writing to the TUN device
Some(data) = self.tun_task_rx.recv() => {
tokio::time::timeout(
std::time::Duration::from_millis(1000),
self.handle_tun_write(data)
)
.await
.tap_err(|_err| log::error!("Failed: handle_tun_write timeout"))
.ok();
if let Err(err) = self.handle_tun_write(data).await {
log::error!("ifcae: handle_tun_write failed: {err}");
}
}
}
}
@@ -249,12 +276,11 @@ impl TunDevice {
}
}
fn parse_src_address(packet: &[u8]) -> Option<IpAddr> {
let headers = SlicedPacket::from_ip(packet)
.tap_err(|err| log::error!("Unable to parse IP packet: {err:?}"))
.ok()?;
Some(match headers.ip? {
InternetSlice::Ipv4(ip, _) => ip.source_addr().into(),
InternetSlice::Ipv6(ip, _) => ip.source_addr().into(),
})
fn parse_src_address(packet: &[u8]) -> Result<IpAddr, TunDeviceError> {
let headers = SlicedPacket::from_ip(packet)?;
match headers.ip {
Some(InternetSlice::Ipv4(ip, _)) => Ok(ip.source_addr().into()),
Some(InternetSlice::Ipv6(ip, _)) => Ok(ip.source_addr().into()),
None => Err(TunDeviceError::UnableToParseSrcAddressIpHeaderMissing),
}
}
+82
View File
@@ -0,0 +1,82 @@
use std::time::Duration;
use tokio::sync::mpsc::{
self,
error::{SendError, SendTimeoutError, TrySendError},
};
pub(crate) type TunTaskPayload = (u64, Vec<u8>);
#[derive(Clone)]
pub struct TunTaskTx(mpsc::Sender<TunTaskPayload>);
pub(crate) struct TunTaskRx(mpsc::Receiver<TunTaskPayload>);
impl TunTaskTx {
pub async fn send(&self, data: TunTaskPayload) -> Result<(), SendError<TunTaskPayload>> {
self.0.send(data).await
}
pub fn try_send(&self, data: TunTaskPayload) -> Result<(), TrySendError<TunTaskPayload>> {
self.0.try_send(data)
}
}
impl TunTaskRx {
pub(crate) async fn recv(&mut self) -> Option<TunTaskPayload> {
self.0.recv().await
}
}
pub(crate) fn tun_task_channel() -> (TunTaskTx, TunTaskRx) {
let (tun_task_tx, tun_task_rx) = tokio::sync::mpsc::channel(128);
(TunTaskTx(tun_task_tx), TunTaskRx(tun_task_rx))
}
const TUN_TASK_RESPONSE_SEND_TIMEOUT_MS: u64 = 1_000;
// Send responses back from the tun device back to the PacketRelayer
pub(crate) struct TunTaskResponseTx(mpsc::Sender<TunTaskPayload>);
pub struct TunTaskResponseRx(mpsc::Receiver<TunTaskPayload>);
#[derive(thiserror::Error, Debug)]
pub enum TunTaskResponseSendError {
#[error("failed to send tun response: {0}")]
SendTimeoutError(#[from] SendTimeoutError<TunTaskPayload>),
#[error("failed to send tun response: {0}")]
SendError(#[from] SendError<TunTaskPayload>),
#[error("failed to send tun response: {0}")]
TrySendError(#[from] TrySendError<TunTaskPayload>),
}
impl TunTaskResponseTx {
#[allow(unused)]
pub(crate) async fn send(&self, data: TunTaskPayload) -> Result<(), TunTaskResponseSendError> {
Ok(self
.0
.send_timeout(
data,
Duration::from_millis(TUN_TASK_RESPONSE_SEND_TIMEOUT_MS),
)
.await?)
}
pub(crate) fn try_send(&self, data: TunTaskPayload) -> Result<(), TunTaskResponseSendError> {
Ok(self.0.try_send(data)?)
}
}
impl TunTaskResponseRx {
pub async fn recv(&mut self) -> Option<TunTaskPayload> {
self.0.recv().await
}
}
pub(crate) fn tun_task_response_channel() -> (TunTaskResponseTx, TunTaskResponseRx) {
let (tun_task_tx, tun_task_rx) = tokio::sync::mpsc::channel(128);
(
TunTaskResponseTx(tun_task_tx),
TunTaskResponseRx(tun_task_rx),
)
}
+6 -1
View File
@@ -12,9 +12,14 @@ license.workspace = true
[dependencies]
base64 = { workspace = true }
bytes = "1.5.0"
dashmap = { workspace = true }
ip_network = "0.4.1"
ip_network_table = "0.2.0"
log = { workspace = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["sync", "time"] }
nym-crypto = { path = "../crypto", features = ["asymmetric"] }
@@ -45,4 +50,4 @@ nym-crypto = { path = "../crypto", features = ["rand"]}
default = ["verify"]
openapi = ["utoipa", "serde_json"]
# this is moved to a separate feature as we really need clients to import it (especially, *cough*, wasm)
verify = ["hmac", "sha2"]
verify = ["hmac", "sha2"]
+1
View File
@@ -4,6 +4,7 @@
pub mod error;
pub mod public_key;
pub mod registration;
pub mod tun_common;
pub use error::Error;
pub use public_key::PeerPublicKey;
@@ -1,4 +1,4 @@
use std::net::SocketAddr;
use std::{net::SocketAddr, time::Duration};
use boringtun::x25519;
use dashmap::{
@@ -7,26 +7,47 @@ use dashmap::{
};
use tokio::sync::mpsc::{self};
use crate::event::Event;
use crate::tun_common::{event::Event, network_table::NetworkTable};
// Registered peers
pub type PeersByIp = NetworkTable<PeerEventSender>;
// Channels that are used to communicate with the various tunnels
#[derive(Clone)]
pub struct PeerEventSender(mpsc::Sender<Event>);
pub(crate) struct PeerEventReceiver(mpsc::Receiver<Event>);
pub struct PeerEventReceiver(mpsc::Receiver<Event>);
#[derive(thiserror::Error, Debug)]
pub enum PeerEventSenderError {
#[error("send failed: timeout: {source}")]
SendTimeoutError {
#[from]
source: mpsc::error::SendTimeoutError<Event>,
},
#[error("send failed: {source}")]
SendError {
#[from]
source: mpsc::error::SendError<Event>,
},
}
impl PeerEventSender {
pub(crate) async fn send(&self, event: Event) -> Result<(), mpsc::error::SendError<Event>> {
self.0.send(event).await
pub async fn send(&self, event: Event) -> Result<(), PeerEventSenderError> {
Ok(self
.0
.send_timeout(event, Duration::from_millis(1000))
.await?)
}
}
impl PeerEventReceiver {
pub(crate) async fn recv(&mut self) -> Option<Event> {
pub async fn recv(&mut self) -> Option<Event> {
self.0.recv().await
}
}
pub(crate) fn peer_event_channel() -> (PeerEventSender, PeerEventReceiver) {
pub fn peer_event_channel() -> (PeerEventSender, PeerEventReceiver) {
let (tx, rx) = mpsc::channel(16);
(PeerEventSender(tx), PeerEventReceiver(rx))
}
@@ -35,20 +56,20 @@ pub(crate) type PeersByKey = DashMap<x25519::PublicKey, PeerEventSender>;
pub(crate) type PeersByAddr = DashMap<SocketAddr, PeerEventSender>;
#[derive(Default)]
pub(crate) struct ActivePeers {
pub struct ActivePeers {
active_peers: PeersByKey,
active_peers_by_addr: PeersByAddr,
}
impl ActivePeers {
pub(crate) fn remove(&self, public_key: &x25519::PublicKey) {
pub fn remove(&self, public_key: &x25519::PublicKey) {
log::info!("Removing peer: {public_key:?}");
self.active_peers.remove(public_key);
log::warn!("TODO: remove from peers_by_ip?");
log::warn!("TODO: remove from peers_by_addr");
}
pub(crate) fn insert(
pub fn insert(
&self,
public_key: x25519::PublicKey,
addr: SocketAddr,
@@ -58,17 +79,14 @@ impl ActivePeers {
self.active_peers_by_addr.insert(addr, peer_tx);
}
pub(crate) fn get_by_key_mut(
pub fn get_by_key_mut(
&self,
public_key: &x25519::PublicKey,
) -> Option<RefMut<'_, x25519::PublicKey, PeerEventSender>> {
self.active_peers.get_mut(public_key)
}
pub(crate) fn get_by_addr(
&self,
addr: &SocketAddr,
) -> Option<Ref<'_, SocketAddr, PeerEventSender>> {
pub fn get_by_addr(&self, addr: &SocketAddr) -> Option<Ref<'_, SocketAddr, PeerEventSender>> {
self.active_peers_by_addr.get(addr)
}
}
@@ -0,0 +1,3 @@
pub mod active_peers;
pub mod event;
pub mod network_table;
@@ -9,7 +9,7 @@ pub struct NetworkTable<T> {
}
impl<T> NetworkTable<T> {
pub(crate) fn new() -> Self {
pub fn new() -> Self {
Self {
ips: IpNetworkTable::new(),
}
+3
View File
@@ -13,6 +13,7 @@ license.workspace = true
[dependencies]
async-recursion = "1.0.4"
base64 = "0.21.3"
bincode = "1.3.3"
# The latest version on crates.io at the time of writing this (6.0.0) has a
# version mismatch with x25519-dalek/curve25519-dalek that is resolved in the
# latest commit. So pick that for now.
@@ -27,6 +28,8 @@ ip_network_table = "0.2.0"
log.workspace = true
nym-task = { path = "../task" }
nym-wireguard-types = { path = "../wireguard-types" }
nym-sphinx = { path = "../nymsphinx" }
nym-tun = { path = "../tun" , features = ["wireguard"] }
rand.workspace = true
serde = { workspace = true, features = ["derive"] }
tap.workspace = true
+12 -8
View File
@@ -3,15 +3,15 @@
// #![warn(clippy::expect_used)]
// #![warn(clippy::unwrap_used)]
mod active_peers;
// mod active_peers;
mod error;
mod event;
mod network_table;
// mod event;
// mod network_table;
mod packet_relayer;
mod platform;
// mod platform;
mod registered_peers;
mod setup;
pub mod tun_task_channel;
pub mod setup;
// pub mod tun_task_channel;
mod udp_listener;
mod wg_tunnel;
@@ -20,7 +20,9 @@ use std::sync::Arc;
// Currently the module related to setting up the virtual network device is platform specific.
#[cfg(target_os = "linux")]
pub use platform::linux::tun_device;
use nym_tun::tun_device;
use nym_tun::tun_task_channel;
/// Start wireguard UDP listener and TUN device
///
@@ -36,7 +38,9 @@ pub async fn start_wireguard(
// We can optionally index peers by their IP like standard wireguard. If we don't then we do
// plain NAT where we match incoming destination IP with outgoing source IP.
let peers_by_ip = Arc::new(tokio::sync::Mutex::new(network_table::NetworkTable::new()));
use nym_wireguard_types::tun_common::network_table::NetworkTable;
let peers_by_ip = Arc::new(tokio::sync::Mutex::new(NetworkTable::new()));
// Alternative 1:
let routing_mode = tun_device::RoutingMode::new_allowed_ips(peers_by_ip.clone());
+3 -5
View File
@@ -3,11 +3,9 @@ use std::{collections::HashMap, sync::Arc};
use tap::TapFallible;
use tokio::sync::mpsc::{self};
use crate::{
active_peers::PeerEventSender,
event::Event,
tun_task_channel::{TunTaskResponseRx, TunTaskTx},
};
use crate::tun_task_channel::{TunTaskResponseRx, TunTaskTx};
use nym_wireguard_types::tun_common::{active_peers::PeerEventSender, event::Event};
#[derive(Clone)]
pub struct PacketRelaySender(pub(crate) mpsc::Sender<(u64, Vec<u8>)>);
-54
View File
@@ -1,54 +0,0 @@
use tokio::sync::mpsc;
pub(crate) type TunTaskPayload = (u64, Vec<u8>);
#[derive(Clone)]
pub struct TunTaskTx(mpsc::Sender<TunTaskPayload>);
pub(crate) struct TunTaskRx(mpsc::Receiver<TunTaskPayload>);
impl TunTaskTx {
pub async fn send(
&self,
data: TunTaskPayload,
) -> Result<(), tokio::sync::mpsc::error::SendError<TunTaskPayload>> {
self.0.send(data).await
}
}
impl TunTaskRx {
pub(crate) async fn recv(&mut self) -> Option<TunTaskPayload> {
self.0.recv().await
}
}
pub(crate) fn tun_task_channel() -> (TunTaskTx, TunTaskRx) {
let (tun_task_tx, tun_task_rx) = tokio::sync::mpsc::channel(16);
(TunTaskTx(tun_task_tx), TunTaskRx(tun_task_rx))
}
// Send responses back from the tun device back to the PacketRelayer
pub(crate) struct TunTaskResponseTx(mpsc::Sender<TunTaskPayload>);
pub struct TunTaskResponseRx(mpsc::Receiver<TunTaskPayload>);
impl TunTaskResponseTx {
pub(crate) async fn send(
&self,
data: TunTaskPayload,
) -> Result<(), tokio::sync::mpsc::error::SendError<TunTaskPayload>> {
self.0.send(data).await
}
}
impl TunTaskResponseRx {
pub async fn recv(&mut self) -> Option<TunTaskPayload> {
self.0.recv().await
}
}
pub(crate) fn tun_task_response_channel() -> (TunTaskResponseTx, TunTaskResponseRx) {
let (tun_task_tx, tun_task_rx) = tokio::sync::mpsc::channel(16);
(
TunTaskResponseTx(tun_task_tx),
TunTaskResponseRx(tun_task_rx),
)
}
+8 -7
View File
@@ -7,15 +7,19 @@ use boringtun::{
use futures::StreamExt;
use log::error;
use nym_task::TaskClient;
use nym_wireguard_types::{registration::GatewayClientRegistry, PeerPublicKey, WG_PORT};
use nym_wireguard_types::{
registration::GatewayClientRegistry,
tun_common::{
active_peers::{ActivePeers, PeersByIp},
event::Event,
},
PeerPublicKey, WG_PORT,
};
use tap::TapFallible;
use tokio::{net::UdpSocket, sync::Mutex};
use crate::{
active_peers::{ActivePeers, PeerEventSender},
error::WgError,
event::Event,
network_table::NetworkTable,
packet_relayer::PacketRelaySender,
registered_peers::{RegisteredPeer, RegisteredPeers},
setup::{self, WG_ADDRESS},
@@ -24,9 +28,6 @@ use crate::{
const MAX_PACKET: usize = 65535;
// Registered peers
pub(crate) type PeersByIp = NetworkTable<PeerEventSender>;
async fn add_test_peer(registered_peers: &mut RegisteredPeers) {
let peer_static_public = PeerPublicKey::new(setup::peer_static_public_key());
let peer_index = 0;
+6 -8
View File
@@ -7,18 +7,16 @@ use boringtun::{
};
use bytes::Bytes;
use log::{debug, error, info, warn};
use nym_wireguard_types::tun_common::{
active_peers::{peer_event_channel, PeerEventReceiver, PeerEventSender},
event::Event,
network_table::NetworkTable,
};
use rand::RngCore;
use tap::TapFallible;
use tokio::{net::UdpSocket, sync::broadcast, time::timeout};
use crate::{
active_peers::{peer_event_channel, PeerEventReceiver, PeerEventSender},
error::WgError,
event::Event,
network_table::NetworkTable,
packet_relayer::PacketRelaySender,
registered_peers::PeerIdx,
};
use crate::{error::WgError, packet_relayer::PacketRelaySender, registered_peers::PeerIdx};
const HANDSHAKE_MAX_RATE: u64 = 10;
+4 -1
View File
@@ -76,7 +76,7 @@ nym-statistics-common = { path = "../common/statistics" }
nym-task = { path = "../common/task" }
nym-types = { path = "../common/types" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-wireguard = { path = "../common/wireguard" }
nym-wireguard = { path = "../common/wireguard", optional = true }
nym-ip-packet-router = { path = "../service-providers/ip-packet-router" }
[dev-dependencies]
@@ -92,3 +92,6 @@ sqlx = { version = "0.5", features = [
"macros",
"migrate",
] }
[features]
wireguard = ["nym-wireguard"]
+2 -2
View File
@@ -229,14 +229,14 @@ impl<'a> HttpApiBuilder<'a> {
IpAddr::from(Ipv4Addr::new(10, 1, 0, 0)),
self.gateway_config.wireguard.private_network_prefix,
)?;
let wg_state = self.client_registry.map(|client_registry| {
let wg_state = self.client_registry.and_then(|client_registry| {
WireguardAppState::new(
self.sphinx_keypair,
client_registry,
Default::default(),
self.gateway_config.wireguard.bind_address.port(),
wireguard_private_network,
)
.ok()
});
let router = nym_node::http::NymNodeRouter::new(config, wg_state);
+10 -12
View File
@@ -200,6 +200,7 @@ impl<St> Gateway<St> {
mixnet_handling::Listener::new(listening_address, shutdown).start(connection_handler);
}
#[cfg(feature = "wireguard")]
async fn start_wireguard(
&self,
shutdown: TaskClient,
@@ -519,18 +520,15 @@ impl<St> Gateway<St> {
Arc::new(coconut_verifier),
);
// TODO: later we'll make this a commandline flag
let wireguard_enabled = std::env::var("NYM_ENABLE_WIREGUARD")
.map(|v| v == "1")
.unwrap_or(false);
if wireguard_enabled {
if let Err(err) = self
.start_wireguard(shutdown.subscribe().named("wireguard"))
.await
{
// that's a nasty workaround, but anyhow errors are generally nicer, especially on exit
bail!("{err}")
}
// Once this is a bit more mature, make this a commandline flag instead of a compile time
// flag
#[cfg(feature = "wireguard")]
if let Err(err) = self
.start_wireguard(shutdown.subscribe().named("wireguard"))
.await
{
// that's a nasty workaround, but anyhow errors are generally nicer, especially on exit
bail!("{err}")
}
info!("Finished nym gateway startup procedure - it should now be able to receive mix and client traffic!");
+1
View File
@@ -50,6 +50,7 @@ nym-config = { path = "../common/config" }
nym-crypto = { path = "../common/crypto", features = ["asymmetric" ]}
nym-node-requests = { path = "nym-node-requests", default-features = false, features = ["openapi"]}
nym-task = { path = "../common/task" }
nym-wireguard = { path = "../common/wireguard" }
nym-wireguard-types = { path = "../common/wireguard-types", features = ["verify"] }
[dev-dependencies]
+6
View File
@@ -31,6 +31,12 @@ pub enum NymNodeError {
source: WireguardError,
},
#[error(transparent)]
KeyRecoveryError {
#[from]
source: nym_crypto::asymmetric::encryption::KeyRecoveryError,
},
#[error("unimplemented")]
Unimplemented,
}
@@ -31,10 +31,7 @@ async fn process_final_message(
}
};
if client
.verify(state.dh_keypair.private_key(), preshared_nonce)
.is_ok()
{
if client.verify(&state.private_key, preshared_nonce).is_ok() {
state.registration_in_progress.remove(&client.pub_key());
state.client_registry.insert(client.pub_key(), client);
@@ -104,7 +101,7 @@ pub(crate) async fn register_client(
// mark it as used, even though it's not final
*private_ip_ref = false;
let gateway_data = GatewayClient::new(
state.dh_keypair.private_key(),
&state.private_key,
remote_public,
*private_ip_ref.key(),
nonce,
@@ -8,8 +8,9 @@ use crate::wireguard::types::{GatewayClientRegistry, PendingRegistrations};
use axum::routing::{get, post};
use axum::Router;
use ipnetwork::IpNetwork;
use nym_crypto::asymmetric::encryption;
use nym_crypto::asymmetric::encryption::PrivateKey;
use nym_node_requests::routes::api::v1::gateway::client_interfaces::wireguard;
use nym_wireguard::setup;
use nym_wireguard_types::registration::PrivateIPs;
use std::sync::Arc;
@@ -24,15 +25,16 @@ pub struct WireguardAppState {
impl WireguardAppState {
pub fn new(
dh_keypair: Arc<encryption::KeyPair>,
client_registry: Arc<GatewayClientRegistry>,
registration_in_progress: Arc<PendingRegistrations>,
binding_port: u16,
private_ip_network: IpNetwork,
) -> Self {
WireguardAppState {
) -> Result<Self, crate::error::NymNodeError> {
Ok(WireguardAppState {
inner: Some(WireguardAppStateInner {
dh_keypair,
private_key: Arc::new(PrivateKey::from_bytes(
setup::server_static_private_key().as_ref(),
)?),
client_registry,
registration_in_progress,
binding_port,
@@ -40,7 +42,7 @@ impl WireguardAppState {
private_ip_network.iter().map(|ip| (ip, true)).collect(),
),
}),
}
})
}
// #[allow(dead_code)]
@@ -79,7 +81,7 @@ macro_rules! get_state {
#[derive(Clone)]
pub(crate) struct WireguardAppStateInner {
dh_keypair: Arc<encryption::KeyPair>,
private_key: Arc<PrivateKey>,
client_registry: Arc<GatewayClientRegistry>,
registration_in_progress: Arc<PendingRegistrations>,
binding_port: u16,
@@ -112,6 +114,7 @@ mod test {
PeerPublicKey,
};
use nym_node_requests::routes::api::v1::gateway::client_interfaces::wireguard;
use nym_wireguard::setup::server_static_private_key;
use nym_wireguard_types::registration::HmacSha256;
use std::net::IpAddr;
use std::str::FromStr;
@@ -130,8 +133,15 @@ mod test {
// 6. Gateway verifies mac digest and nonce, and stores client's public key and socket address and port
let mut rng = rand::thread_rng();
let gateway_private_key =
encryption::PrivateKey::from_bytes(server_static_private_key().as_bytes()).unwrap();
let gateway_public_key = encryption::PublicKey::from(&gateway_private_key);
let gateway_key_pair = encryption::KeyPair::new(&mut rng);
let gateway_key_pair = encryption::KeyPair::from_bytes(
&gateway_private_key.to_bytes(),
&gateway_public_key.to_bytes(),
)
.unwrap();
let client_key_pair = encryption::KeyPair::new(&mut rng);
let gateway_static_public =
@@ -158,7 +168,7 @@ mod test {
let state = WireguardAppState {
inner: Some(WireguardAppStateInner {
client_registry: Arc::clone(&client_registry),
dh_keypair: Arc::new(gateway_key_pair),
private_key: Arc::new(gateway_private_key),
registration_in_progress: Arc::clone(&registration_in_progress),
binding_port: 8080,
free_private_network_ips,
@@ -9,6 +9,8 @@ edition.workspace = true
license.workspace = true
[dependencies]
bincode = "1.3.3"
bytes = "1.5.0"
etherparse = "0.13.0"
futures = { workspace = true }
log = { workspace = true }
@@ -16,11 +18,13 @@ nym-bin-common = { path = "../../common/bin-common" }
nym-client-core = { path = "../../common/client-core" }
nym-config = { path = "../../common/config" }
nym-exit-policy = { path = "../../common/exit-policy" }
nym-ip-packet-requests = { path = "../../common/ip-packet-requests" }
nym-network-requester = { path = "../network-requester" }
nym-sdk = { path = "../../sdk/rust/nym-sdk" }
nym-service-providers-common = { path = "../common" }
nym-sphinx = { path = "../../common/nymsphinx" }
nym-task = { path = "../../common/task" }
nym-tun = { path = "../../common/tun" }
nym-wireguard = { path = "../../common/wireguard" }
nym-wireguard-types = { path = "../../common/wireguard-types" }
reqwest.workspace = true
@@ -30,6 +30,9 @@ pub enum IpPacketRouterError {
#[error("the entity wrapping the network requester has disconnected")]
DisconnectedParent,
#[error("failed to deserialize tagged packet: {source}")]
FailedToDeserializeTaggedPacket { source: bincode::Error },
#[error("failed to parse incoming packet: {source}")]
PacketParseFailed { source: etherparse::ReadError },
@@ -39,6 +42,11 @@ pub enum IpPacketRouterError {
#[error("parsed packet is missing transport header")]
PacketMissingTransportHeader,
#[error("failed to send packet to tun device: {source}")]
FailedToSendPacketToTun {
source: tokio::sync::mpsc::error::TrySendError<(u64, Vec<u8>)>,
},
#[error("the provided socket address, '{addr}' is not covered by the exit policy!")]
AddressNotCoveredByExitPolicy { addr: SocketAddr },
+25 -30
View File
@@ -18,7 +18,6 @@ use nym_sdk::{
use nym_sphinx::receiver::ReconstructedMessage;
use nym_task::{connections::TransmissionLane, TaskClient, TaskHandle};
use request_filter::RequestFilter;
use tap::TapFallible;
use crate::config::BaseClientConfig;
@@ -140,13 +139,13 @@ impl IpPacketRouterBuilder {
let self_address = *mixnet_client.nym_address();
// Create the TUN device that we interact with the rest of the world with
let config = nym_wireguard::tun_device::TunDeviceConfig {
let config = nym_tun::tun_device::TunDeviceConfig {
base_name: TUN_BASE_NAME.to_string(),
ip: TUN_DEVICE_ADDRESS.parse().unwrap(),
netmask: TUN_DEVICE_NETMASK.parse().unwrap(),
};
let (tun, tun_task_tx, tun_task_response_rx) = nym_wireguard::tun_device::TunDevice::new(
nym_wireguard::tun_device::RoutingMode::new_nat(),
let (tun, tun_task_tx, tun_task_response_rx) = nym_tun::tun_device::TunDevice::new(
nym_tun::tun_device::RoutingMode::new_nat(),
config,
);
tun.start();
@@ -184,8 +183,8 @@ impl IpPacketRouterBuilder {
struct IpPacketRouter {
_config: Config,
request_filter: request_filter::RequestFilter,
tun_task_tx: nym_wireguard::tun_task_channel::TunTaskTx,
tun_task_response_rx: nym_wireguard::tun_task_channel::TunTaskResponseRx,
tun_task_tx: nym_tun::tun_task_channel::TunTaskTx,
tun_task_response_rx: nym_tun::tun_task_channel::TunTaskResponseRx,
mixnet_client: nym_sdk::mixnet::MixnetClient,
task_handle: TaskHandle,
}
@@ -202,7 +201,9 @@ impl IpPacketRouter {
},
msg = self.mixnet_client.next() => {
if let Some(msg) = msg {
self.on_message(msg).await.ok();
if let Err(err) = self.on_message(msg).await {
log::error!("Error handling mixnet message: {err}");
};
} else {
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
break;
@@ -225,13 +226,9 @@ impl IpPacketRouter {
let packet_type = None;
let input_message = InputMessage::new_regular(recipient, packet, lane, packet_type);
self.mixnet_client
.send(input_message)
.await
.tap_err(|err| {
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
})
.ok();
if let Err(err) = self.mixnet_client.send(input_message).await {
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
};
} else {
log::error!("NYM_CLIENT_ADDR not set or invalid");
}
@@ -251,7 +248,13 @@ impl IpPacketRouter {
&mut self,
reconstructed: ReconstructedMessage,
) -> Result<(), IpPacketRouterError> {
log::info!("Received message: {:?}", reconstructed.sender_tag);
log::debug!(
"Received message with sender_tag: {:?}",
reconstructed.sender_tag
);
let tagged_packet = nym_ip_packet_requests::TaggedIpPacket::from_message(&reconstructed)
.map_err(|err| IpPacketRouterError::FailedToDeserializeTaggedPacket { source: err })?;
// We don't forward packets that we are not able to parse. BUT, there might be a good
// reason to still forward them.
@@ -265,7 +268,7 @@ impl IpPacketRouter {
src_addr,
dst_addr,
dst,
} = parse_packet(&reconstructed.message)?;
} = parse_packet(&tagged_packet.packet)?;
let dst_str = dst.map_or(dst_addr.to_string(), |dst| dst.to_string());
log::info!("Received packet: {packet_type}: {src_addr} -> {dst_str}");
@@ -274,27 +277,19 @@ impl IpPacketRouter {
if let Some(dst) = dst {
if !self.request_filter.check_address(&dst).await {
log::warn!("Failed filter check: {dst}");
// TODO: send back a response here
// TODO: we could consider sending back a response here
return Err(IpPacketRouterError::AddressFailedFilterCheck { addr: dst });
}
} else {
// TODO: are we always allowing packets without port numbers?
log::warn!(
"Ignoring filter check for packet without port number! (TODO: is this correct?)"
);
// TODO: we should also filter packets without port number
log::warn!("Ignoring filter check for packet without port number! TODO!");
}
// TODO: set the tag correctly. Can we just reuse sender_tag?
let peer_tag = 0;
self.tun_task_tx
.send((peer_tag, reconstructed.message))
.await
.tap_err(|err| {
log::error!("Failed to send packet to tun device: {err}");
})
.ok();
.try_send((peer_tag, tagged_packet.packet.into()))
.map_err(|err| IpPacketRouterError::FailedToSendPacketToTun { source: err })?;
Ok(())
}
@@ -316,7 +311,7 @@ fn parse_packet(packet: &[u8]) -> Result<ParsedPacket, IpPacketRouterError> {
let (packet_type, dst_port) = match headers.transport {
Some(etherparse::TransportSlice::Udp(header)) => ("ipv4", Some(header.destination_port())),
Some(etherparse::TransportSlice::Tcp(header)) => ("ipv6", Some(header.destination_port())),
Some(etherparse::TransportSlice::Icmpv4(_)) => ("icpv4", None),
Some(etherparse::TransportSlice::Icmpv4(_)) => ("icmpv4", None),
Some(etherparse::TransportSlice::Icmpv6(_)) => ("icmpv6", None),
Some(etherparse::TransportSlice::Unknown(_)) => ("unknown", None),
None => {