Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 161a8a7d4d | |||
| 25aa04686e | |||
| 9b6355b256 | |||
| 65272d7bf6 | |||
| 810dce5ee8 | |||
| 58ec878256 | |||
| a5c1e4abf0 | |||
| 3a1003c564 | |||
| 1cdd8f6c08 |
@@ -75,28 +75,29 @@ jobs:
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: build
|
||||
args: --workspace
|
||||
# Enable wireguard by default on linux only
|
||||
args: --workspace --features wireguard
|
||||
|
||||
- name: Build all examples
|
||||
if: matrix.os == 'custom-linux'
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: build
|
||||
args: --workspace --examples
|
||||
args: --workspace --examples --features wireguard
|
||||
|
||||
- name: Run all tests
|
||||
if: matrix.os == 'custom-linux'
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: test
|
||||
args: --workspace
|
||||
args: --workspace --features wireguard
|
||||
|
||||
- name: Run expensive tests
|
||||
if: (github.ref == 'refs/heads/develop' || github.event.pull_request.base.ref == 'develop' || github.event.pull_request.base.ref == 'master') && matrix.os == 'custom-linux'
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: test
|
||||
args: --workspace -- --ignored
|
||||
args: --workspace --features wireguard -- --ignored
|
||||
|
||||
- name: Annotate with clippy checks
|
||||
if: matrix.os == 'custom-linux'
|
||||
@@ -104,10 +105,10 @@ jobs:
|
||||
continue-on-error: true
|
||||
with:
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
args: --workspace
|
||||
args: --workspace --features wireguard
|
||||
|
||||
- name: Clippy
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: clippy
|
||||
args: --workspace --all-targets -- -D warnings
|
||||
args: --workspace --all-targets --features wireguard -- -D warnings
|
||||
|
||||
Generated
+37
@@ -6657,10 +6657,23 @@ dependencies = [
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nym-ip-packet-requests"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"bytes",
|
||||
"nym-service-providers-common",
|
||||
"nym-sphinx",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nym-ip-packet-router"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"bytes",
|
||||
"etherparse",
|
||||
"futures",
|
||||
"log",
|
||||
@@ -6668,11 +6681,13 @@ dependencies = [
|
||||
"nym-client-core",
|
||||
"nym-config",
|
||||
"nym-exit-policy",
|
||||
"nym-ip-packet-requests",
|
||||
"nym-network-requester",
|
||||
"nym-sdk",
|
||||
"nym-service-providers-common",
|
||||
"nym-sphinx",
|
||||
"nym-task",
|
||||
"nym-tun",
|
||||
"nym-wireguard",
|
||||
"nym-wireguard-types",
|
||||
"reqwest",
|
||||
@@ -6920,6 +6935,7 @@ dependencies = [
|
||||
"nym-crypto",
|
||||
"nym-node-requests",
|
||||
"nym-task",
|
||||
"nym-wireguard",
|
||||
"nym-wireguard-types",
|
||||
"rand 0.7.3",
|
||||
"serde",
|
||||
@@ -7462,6 +7478,19 @@ dependencies = [
|
||||
"wasm-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nym-tun"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"boringtun",
|
||||
"etherparse",
|
||||
"log",
|
||||
"nym-wireguard-types",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-tun",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nym-types"
|
||||
version = "1.0.0"
|
||||
@@ -7581,6 +7610,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-recursion",
|
||||
"base64 0.21.4",
|
||||
"bincode",
|
||||
"boringtun",
|
||||
"bytes",
|
||||
"dashmap",
|
||||
@@ -7589,7 +7619,9 @@ dependencies = [
|
||||
"ip_network",
|
||||
"ip_network_table",
|
||||
"log",
|
||||
"nym-sphinx",
|
||||
"nym-task",
|
||||
"nym-tun",
|
||||
"nym-wireguard-types",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
@@ -7605,14 +7637,19 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"base64 0.21.4",
|
||||
"boringtun",
|
||||
"bytes",
|
||||
"dashmap",
|
||||
"hmac 0.12.1",
|
||||
"ip_network",
|
||||
"ip_network_table",
|
||||
"log",
|
||||
"nym-crypto",
|
||||
"rand 0.7.3",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2 0.10.8",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"utoipa",
|
||||
"x25519-dalek 2.0.0",
|
||||
]
|
||||
|
||||
@@ -49,6 +49,7 @@ members = [
|
||||
"common/exit-policy",
|
||||
"common/http-api-client",
|
||||
"common/inclusion-probability",
|
||||
"common/ip-packet-requests",
|
||||
"common/ledger",
|
||||
"common/mixnode-common",
|
||||
"common/network-defaults",
|
||||
@@ -74,6 +75,7 @@ members = [
|
||||
"common/store-cipher",
|
||||
"common/task",
|
||||
"common/topology",
|
||||
"common/tun",
|
||||
"common/types",
|
||||
"common/wasm/client-core",
|
||||
"common/wasm/storage",
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "nym-ip-packet-requests"
|
||||
version = "0.1.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
documentation.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
bincode = "1.3.3"
|
||||
bytes = "1.5.0"
|
||||
nym-service-providers-common = { path = "../../service-providers/common" }
|
||||
nym-sphinx = { path = "../nymsphinx" }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
@@ -0,0 +1,33 @@
|
||||
use nym_service_providers_common::interface;
|
||||
|
||||
pub type IpPacketRouterRequest = interface::Request<TaggedIpPacket>;
|
||||
pub type IpPacketRouterResponse = interface::Response<IpPacket>;
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
pub struct TaggedIpPacket {
|
||||
pub packet: bytes::Bytes,
|
||||
pub return_address: nym_sphinx::addressing::clients::Recipient,
|
||||
pub return_mix_hops: Option<u8>,
|
||||
pub return_mix_delays: Option<f64>,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
pub struct IpPacket {
|
||||
pub packet: bytes::Bytes,
|
||||
}
|
||||
|
||||
impl TaggedIpPacket {
|
||||
pub fn from_message(
|
||||
message: &nym_sphinx::receiver::ReconstructedMessage,
|
||||
) -> Result<Self, bincode::Error> {
|
||||
use bincode::Options;
|
||||
make_bincode_serializer().deserialize(&message.message)
|
||||
}
|
||||
}
|
||||
|
||||
fn make_bincode_serializer() -> impl bincode::Options {
|
||||
use bincode::Options;
|
||||
bincode::DefaultOptions::new()
|
||||
.with_big_endian()
|
||||
.with_varint_encoding()
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
[package]
|
||||
name = "nym-tun"
|
||||
version = "0.1.0"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
documentation.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
thiserror.workspace = true
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util", "time", "sync", "macros"] }
|
||||
etherparse = "0.13.0"
|
||||
log.workspace = true
|
||||
# TODO: remove
|
||||
boringtun = { workspace = true }
|
||||
nym-wireguard-types = { path = "../wireguard-types", optional = true }
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
tokio-tun = "0.9.0"
|
||||
|
||||
[features]
|
||||
wireguard = ["nym-wireguard-types"]
|
||||
@@ -0,0 +1,7 @@
|
||||
#[cfg(target_os = "linux")]
|
||||
mod linux;
|
||||
|
||||
pub mod tun_task_channel;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub use linux::tun_device;
|
||||
+115
-89
@@ -1,22 +1,67 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use etherparse::{InternetSlice, SlicedPacket};
|
||||
use tap::TapFallible;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
use crate::{
|
||||
event::Event,
|
||||
tun_task_channel::{
|
||||
tun_task_channel, tun_task_response_channel, TunTaskPayload, TunTaskResponseRx,
|
||||
TunTaskResponseTx, TunTaskRx, TunTaskTx,
|
||||
},
|
||||
udp_listener::PeersByIp,
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
time::timeout,
|
||||
};
|
||||
|
||||
use crate::tun_task_channel::{
|
||||
tun_task_channel, tun_task_response_channel, TunTaskPayload, TunTaskResponseRx,
|
||||
TunTaskResponseSendError, TunTaskResponseTx, TunTaskRx, TunTaskTx,
|
||||
};
|
||||
|
||||
#[cfg(feature = "wireguard")]
|
||||
use nym_wireguard_types::tun_common::{
|
||||
active_peers::{PeerEventSenderError, PeersByIp},
|
||||
event::Event,
|
||||
};
|
||||
|
||||
#[cfg(feature = "wireguard")]
|
||||
const MUTEX_LOCK_TIMEOUT_MS: u64 = 200;
|
||||
const TUN_WRITE_TIMEOUT_MS: u64 = 1000;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum TunDeviceError {
|
||||
#[error("timeout writing to tun device, dropping packet")]
|
||||
TunWriteTimeout,
|
||||
|
||||
#[error("error writing to tun device: {source}")]
|
||||
TunWriteError { source: std::io::Error },
|
||||
|
||||
#[cfg(feature = "wireguard")]
|
||||
#[error("failed forwarding packet to peer: {source}")]
|
||||
ForwardToPeerFailed {
|
||||
#[from]
|
||||
source: PeerEventSenderError,
|
||||
},
|
||||
|
||||
#[error("failed to forward responding packet with tag: {source}")]
|
||||
ForwardNatResponseFailed {
|
||||
#[from]
|
||||
source: TunTaskResponseSendError,
|
||||
},
|
||||
|
||||
#[error("unable to parse destination address from packet")]
|
||||
UnableToParseDstAdddress,
|
||||
|
||||
#[error("unable to parse source address from packet")]
|
||||
UnableToParseSrcAddress {
|
||||
#[from]
|
||||
source: etherparse::ReadError,
|
||||
},
|
||||
|
||||
#[error("unable to parse source address from packet: ip header missing")]
|
||||
UnableToParseSrcAddressIpHeaderMissing,
|
||||
|
||||
#[error("unable to lock peer mutex")]
|
||||
FailedToLockPeer,
|
||||
}
|
||||
|
||||
fn setup_tokio_tun_device(name: &str, address: Ipv4Addr, netmask: Ipv4Addr) -> tokio_tun::Tun {
|
||||
log::info!("Creating TUN device with: address={address}, netmask={netmask}");
|
||||
// Read MTU size from env variable NYM_MTU_SIZE, else default to 1420.
|
||||
@@ -51,6 +96,7 @@ pub struct TunDevice {
|
||||
|
||||
pub enum RoutingMode {
|
||||
// The routing table, as how wireguard does it
|
||||
#[cfg(feature = "wireguard")]
|
||||
AllowedIps(AllowedIpsInner),
|
||||
|
||||
// This is an alternative to the routing table, where we just match outgoing source IP with
|
||||
@@ -65,13 +111,27 @@ impl RoutingMode {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new_allowed_ips(peers_by_ip: Arc<tokio::sync::Mutex<PeersByIp>>) -> Self {
|
||||
#[cfg(feature = "wireguard")]
|
||||
pub fn new_allowed_ips(peers_by_ip: std::sync::Arc<tokio::sync::Mutex<PeersByIp>>) -> Self {
|
||||
RoutingMode::AllowedIps(AllowedIpsInner { peers_by_ip })
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "wireguard")]
|
||||
pub struct AllowedIpsInner {
|
||||
peers_by_ip: Arc<tokio::sync::Mutex<PeersByIp>>,
|
||||
peers_by_ip: std::sync::Arc<tokio::sync::Mutex<PeersByIp>>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "wireguard")]
|
||||
impl AllowedIpsInner {
|
||||
async fn lock(&self) -> Result<tokio::sync::MutexGuard<PeersByIp>, TunDeviceError> {
|
||||
timeout(
|
||||
Duration::from_millis(MUTEX_LOCK_TIMEOUT_MS),
|
||||
self.peers_by_ip.as_ref().lock(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| TunDeviceError::FailedToLockPeer)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NatInner {
|
||||
@@ -114,48 +174,38 @@ impl TunDevice {
|
||||
}
|
||||
|
||||
// Send outbound packets out on the wild internet
|
||||
async fn handle_tun_write(&mut self, data: TunTaskPayload) {
|
||||
async fn handle_tun_write(&mut self, data: TunTaskPayload) -> Result<(), TunDeviceError> {
|
||||
let (tag, packet) = data;
|
||||
let Some(dst_addr) = boringtun::noise::Tunn::dst_address(&packet) else {
|
||||
log::error!("Unable to parse dst_address in packet that was supposed to be written to tun device");
|
||||
return;
|
||||
};
|
||||
let Some(src_addr) = parse_src_address(&packet) else {
|
||||
log::error!("Unable to parse src_address in packet that was supposed to be written to tun device");
|
||||
return;
|
||||
};
|
||||
log::info!(
|
||||
let dst_addr = boringtun::noise::Tunn::dst_address(&packet)
|
||||
.ok_or_else(|| TunDeviceError::UnableToParseDstAdddress)?;
|
||||
|
||||
let src_addr = parse_src_address(&packet)?;
|
||||
log::debug!(
|
||||
"iface: write Packet({src_addr} -> {dst_addr}, {} bytes)",
|
||||
packet.len()
|
||||
);
|
||||
|
||||
// TODO: expire old entries
|
||||
#[allow(irrefutable_let_patterns)]
|
||||
if let RoutingMode::Nat(nat_table) = &mut self.routing_mode {
|
||||
nat_table.nat_table.insert(src_addr, tag);
|
||||
}
|
||||
|
||||
tokio::time::timeout(
|
||||
std::time::Duration::from_millis(1000),
|
||||
timeout(
|
||||
Duration::from_millis(TUN_WRITE_TIMEOUT_MS),
|
||||
self.tun.write_all(&packet),
|
||||
)
|
||||
.await
|
||||
.tap_err(|err| {
|
||||
log::error!("iface: write error: {err}");
|
||||
})
|
||||
.ok();
|
||||
.map_err(|_| TunDeviceError::TunWriteTimeout)?
|
||||
.map_err(|err| TunDeviceError::TunWriteError { source: err })
|
||||
}
|
||||
|
||||
// Receive reponse packets from the wild internet
|
||||
async fn handle_tun_read(&self, packet: &[u8]) {
|
||||
let Some(dst_addr) = boringtun::noise::Tunn::dst_address(packet) else {
|
||||
log::error!("Unable to parse dst_address in packet that was read from tun device");
|
||||
return;
|
||||
};
|
||||
let Some(src_addr) = parse_src_address(packet) else {
|
||||
log::error!("Unable to parse src_address in packet that was read from tun device");
|
||||
return;
|
||||
};
|
||||
log::info!(
|
||||
async fn handle_tun_read(&self, packet: &[u8]) -> Result<(), TunDeviceError> {
|
||||
let dst_addr = boringtun::noise::Tunn::dst_address(packet)
|
||||
.ok_or(TunDeviceError::UnableToParseDstAdddress)?;
|
||||
let src_addr = parse_src_address(packet)?;
|
||||
log::debug!(
|
||||
"iface: read Packet({src_addr} -> {dst_addr}, {} bytes)",
|
||||
packet.len(),
|
||||
);
|
||||
@@ -164,47 +214,32 @@ impl TunDevice {
|
||||
|
||||
match self.routing_mode {
|
||||
// This is how wireguard does it, by consulting the AllowedIPs table.
|
||||
#[cfg(feature = "wireguard")]
|
||||
RoutingMode::AllowedIps(ref peers_by_ip) => {
|
||||
let Ok(peers) = tokio::time::timeout(
|
||||
std::time::Duration::from_millis(1000),
|
||||
peers_by_ip.peers_by_ip.as_ref().lock(),
|
||||
)
|
||||
.await
|
||||
else {
|
||||
log::error!("Failed to lock peer");
|
||||
return;
|
||||
};
|
||||
|
||||
let peers = peers_by_ip.lock().await?;
|
||||
if let Some(peer_tx) = peers.longest_match(dst_addr).map(|(_, tx)| tx) {
|
||||
log::info!("Forward packet to wg tunnel");
|
||||
tokio::time::timeout(
|
||||
std::time::Duration::from_millis(1000),
|
||||
peer_tx.send(Event::Ip(packet.to_vec().into())),
|
||||
)
|
||||
.await
|
||||
.tap_err(|err| log::error!("Failed to forward packet to wg tunnel: {err}"))
|
||||
.ok();
|
||||
return;
|
||||
log::debug!("Forward packet to wg tunnel");
|
||||
return peer_tx
|
||||
.send(Event::Ip(packet.to_vec().into()))
|
||||
.await
|
||||
.map_err(|err| err.into());
|
||||
}
|
||||
}
|
||||
|
||||
// But we can also do it by consulting the NAT table.
|
||||
RoutingMode::Nat(ref nat_table) => {
|
||||
if let Some(tag) = nat_table.nat_table.get(&dst_addr) {
|
||||
log::info!("Forward packet with tag: {tag}");
|
||||
tokio::time::timeout(
|
||||
std::time::Duration::from_millis(1000),
|
||||
self.tun_task_response_tx.send((*tag, packet.to_vec())),
|
||||
)
|
||||
.await
|
||||
.tap_err(|err| log::error!("Failed to foward packet with tag: {err}"))
|
||||
.ok();
|
||||
return;
|
||||
log::debug!("Forward packet with NAT tag: {tag}");
|
||||
return self
|
||||
.tun_task_response_tx
|
||||
.try_send((*tag, packet.to_vec()))
|
||||
.map_err(|err| err.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("No peer found, packet dropped");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn run(mut self) {
|
||||
@@ -216,13 +251,9 @@ impl TunDevice {
|
||||
len = self.tun.read(&mut buf) => match len {
|
||||
Ok(len) => {
|
||||
let packet = &buf[..len];
|
||||
tokio::time::timeout(
|
||||
std::time::Duration::from_millis(1000),
|
||||
self.handle_tun_read(packet)
|
||||
)
|
||||
.await
|
||||
.tap_err(|_err| log::error!("Failed: handle_tun_read timeout"))
|
||||
.ok();
|
||||
if let Err(err) = self.handle_tun_read(packet).await {
|
||||
log::error!("iface: handle_tun_read failed: {err}")
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
log::info!("iface: read error: {err}");
|
||||
@@ -231,13 +262,9 @@ impl TunDevice {
|
||||
},
|
||||
// Writing to the TUN device
|
||||
Some(data) = self.tun_task_rx.recv() => {
|
||||
tokio::time::timeout(
|
||||
std::time::Duration::from_millis(1000),
|
||||
self.handle_tun_write(data)
|
||||
)
|
||||
.await
|
||||
.tap_err(|_err| log::error!("Failed: handle_tun_write timeout"))
|
||||
.ok();
|
||||
if let Err(err) = self.handle_tun_write(data).await {
|
||||
log::error!("ifcae: handle_tun_write failed: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -249,12 +276,11 @@ impl TunDevice {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_src_address(packet: &[u8]) -> Option<IpAddr> {
|
||||
let headers = SlicedPacket::from_ip(packet)
|
||||
.tap_err(|err| log::error!("Unable to parse IP packet: {err:?}"))
|
||||
.ok()?;
|
||||
Some(match headers.ip? {
|
||||
InternetSlice::Ipv4(ip, _) => ip.source_addr().into(),
|
||||
InternetSlice::Ipv6(ip, _) => ip.source_addr().into(),
|
||||
})
|
||||
fn parse_src_address(packet: &[u8]) -> Result<IpAddr, TunDeviceError> {
|
||||
let headers = SlicedPacket::from_ip(packet)?;
|
||||
match headers.ip {
|
||||
Some(InternetSlice::Ipv4(ip, _)) => Ok(ip.source_addr().into()),
|
||||
Some(InternetSlice::Ipv6(ip, _)) => Ok(ip.source_addr().into()),
|
||||
None => Err(TunDeviceError::UnableToParseSrcAddressIpHeaderMissing),
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,82 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::sync::mpsc::{
|
||||
self,
|
||||
error::{SendError, SendTimeoutError, TrySendError},
|
||||
};
|
||||
|
||||
pub(crate) type TunTaskPayload = (u64, Vec<u8>);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TunTaskTx(mpsc::Sender<TunTaskPayload>);
|
||||
pub(crate) struct TunTaskRx(mpsc::Receiver<TunTaskPayload>);
|
||||
|
||||
impl TunTaskTx {
|
||||
pub async fn send(&self, data: TunTaskPayload) -> Result<(), SendError<TunTaskPayload>> {
|
||||
self.0.send(data).await
|
||||
}
|
||||
|
||||
pub fn try_send(&self, data: TunTaskPayload) -> Result<(), TrySendError<TunTaskPayload>> {
|
||||
self.0.try_send(data)
|
||||
}
|
||||
}
|
||||
|
||||
impl TunTaskRx {
|
||||
pub(crate) async fn recv(&mut self) -> Option<TunTaskPayload> {
|
||||
self.0.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn tun_task_channel() -> (TunTaskTx, TunTaskRx) {
|
||||
let (tun_task_tx, tun_task_rx) = tokio::sync::mpsc::channel(128);
|
||||
(TunTaskTx(tun_task_tx), TunTaskRx(tun_task_rx))
|
||||
}
|
||||
|
||||
const TUN_TASK_RESPONSE_SEND_TIMEOUT_MS: u64 = 1_000;
|
||||
|
||||
// Send responses back from the tun device back to the PacketRelayer
|
||||
pub(crate) struct TunTaskResponseTx(mpsc::Sender<TunTaskPayload>);
|
||||
pub struct TunTaskResponseRx(mpsc::Receiver<TunTaskPayload>);
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum TunTaskResponseSendError {
|
||||
#[error("failed to send tun response: {0}")]
|
||||
SendTimeoutError(#[from] SendTimeoutError<TunTaskPayload>),
|
||||
|
||||
#[error("failed to send tun response: {0}")]
|
||||
SendError(#[from] SendError<TunTaskPayload>),
|
||||
|
||||
#[error("failed to send tun response: {0}")]
|
||||
TrySendError(#[from] TrySendError<TunTaskPayload>),
|
||||
}
|
||||
|
||||
impl TunTaskResponseTx {
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn send(&self, data: TunTaskPayload) -> Result<(), TunTaskResponseSendError> {
|
||||
Ok(self
|
||||
.0
|
||||
.send_timeout(
|
||||
data,
|
||||
Duration::from_millis(TUN_TASK_RESPONSE_SEND_TIMEOUT_MS),
|
||||
)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub(crate) fn try_send(&self, data: TunTaskPayload) -> Result<(), TunTaskResponseSendError> {
|
||||
Ok(self.0.try_send(data)?)
|
||||
}
|
||||
}
|
||||
|
||||
impl TunTaskResponseRx {
|
||||
pub async fn recv(&mut self) -> Option<TunTaskPayload> {
|
||||
self.0.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn tun_task_response_channel() -> (TunTaskResponseTx, TunTaskResponseRx) {
|
||||
let (tun_task_tx, tun_task_rx) = tokio::sync::mpsc::channel(128);
|
||||
(
|
||||
TunTaskResponseTx(tun_task_tx),
|
||||
TunTaskResponseRx(tun_task_rx),
|
||||
)
|
||||
}
|
||||
@@ -12,9 +12,14 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
base64 = { workspace = true }
|
||||
bytes = "1.5.0"
|
||||
dashmap = { workspace = true }
|
||||
ip_network = "0.4.1"
|
||||
ip_network_table = "0.2.0"
|
||||
log = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["sync", "time"] }
|
||||
|
||||
nym-crypto = { path = "../crypto", features = ["asymmetric"] }
|
||||
|
||||
@@ -45,4 +50,4 @@ nym-crypto = { path = "../crypto", features = ["rand"]}
|
||||
default = ["verify"]
|
||||
openapi = ["utoipa", "serde_json"]
|
||||
# this is moved to a separate feature as we really need clients to import it (especially, *cough*, wasm)
|
||||
verify = ["hmac", "sha2"]
|
||||
verify = ["hmac", "sha2"]
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
pub mod error;
|
||||
pub mod public_key;
|
||||
pub mod registration;
|
||||
pub mod tun_common;
|
||||
|
||||
pub use error::Error;
|
||||
pub use public_key::PeerPublicKey;
|
||||
|
||||
+33
-15
@@ -1,4 +1,4 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::{net::SocketAddr, time::Duration};
|
||||
|
||||
use boringtun::x25519;
|
||||
use dashmap::{
|
||||
@@ -7,26 +7,47 @@ use dashmap::{
|
||||
};
|
||||
use tokio::sync::mpsc::{self};
|
||||
|
||||
use crate::event::Event;
|
||||
use crate::tun_common::{event::Event, network_table::NetworkTable};
|
||||
|
||||
// Registered peers
|
||||
pub type PeersByIp = NetworkTable<PeerEventSender>;
|
||||
|
||||
// Channels that are used to communicate with the various tunnels
|
||||
#[derive(Clone)]
|
||||
pub struct PeerEventSender(mpsc::Sender<Event>);
|
||||
pub(crate) struct PeerEventReceiver(mpsc::Receiver<Event>);
|
||||
pub struct PeerEventReceiver(mpsc::Receiver<Event>);
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum PeerEventSenderError {
|
||||
#[error("send failed: timeout: {source}")]
|
||||
SendTimeoutError {
|
||||
#[from]
|
||||
source: mpsc::error::SendTimeoutError<Event>,
|
||||
},
|
||||
|
||||
#[error("send failed: {source}")]
|
||||
SendError {
|
||||
#[from]
|
||||
source: mpsc::error::SendError<Event>,
|
||||
},
|
||||
}
|
||||
|
||||
impl PeerEventSender {
|
||||
pub(crate) async fn send(&self, event: Event) -> Result<(), mpsc::error::SendError<Event>> {
|
||||
self.0.send(event).await
|
||||
pub async fn send(&self, event: Event) -> Result<(), PeerEventSenderError> {
|
||||
Ok(self
|
||||
.0
|
||||
.send_timeout(event, Duration::from_millis(1000))
|
||||
.await?)
|
||||
}
|
||||
}
|
||||
|
||||
impl PeerEventReceiver {
|
||||
pub(crate) async fn recv(&mut self) -> Option<Event> {
|
||||
pub async fn recv(&mut self) -> Option<Event> {
|
||||
self.0.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn peer_event_channel() -> (PeerEventSender, PeerEventReceiver) {
|
||||
pub fn peer_event_channel() -> (PeerEventSender, PeerEventReceiver) {
|
||||
let (tx, rx) = mpsc::channel(16);
|
||||
(PeerEventSender(tx), PeerEventReceiver(rx))
|
||||
}
|
||||
@@ -35,20 +56,20 @@ pub(crate) type PeersByKey = DashMap<x25519::PublicKey, PeerEventSender>;
|
||||
pub(crate) type PeersByAddr = DashMap<SocketAddr, PeerEventSender>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct ActivePeers {
|
||||
pub struct ActivePeers {
|
||||
active_peers: PeersByKey,
|
||||
active_peers_by_addr: PeersByAddr,
|
||||
}
|
||||
|
||||
impl ActivePeers {
|
||||
pub(crate) fn remove(&self, public_key: &x25519::PublicKey) {
|
||||
pub fn remove(&self, public_key: &x25519::PublicKey) {
|
||||
log::info!("Removing peer: {public_key:?}");
|
||||
self.active_peers.remove(public_key);
|
||||
log::warn!("TODO: remove from peers_by_ip?");
|
||||
log::warn!("TODO: remove from peers_by_addr");
|
||||
}
|
||||
|
||||
pub(crate) fn insert(
|
||||
pub fn insert(
|
||||
&self,
|
||||
public_key: x25519::PublicKey,
|
||||
addr: SocketAddr,
|
||||
@@ -58,17 +79,14 @@ impl ActivePeers {
|
||||
self.active_peers_by_addr.insert(addr, peer_tx);
|
||||
}
|
||||
|
||||
pub(crate) fn get_by_key_mut(
|
||||
pub fn get_by_key_mut(
|
||||
&self,
|
||||
public_key: &x25519::PublicKey,
|
||||
) -> Option<RefMut<'_, x25519::PublicKey, PeerEventSender>> {
|
||||
self.active_peers.get_mut(public_key)
|
||||
}
|
||||
|
||||
pub(crate) fn get_by_addr(
|
||||
&self,
|
||||
addr: &SocketAddr,
|
||||
) -> Option<Ref<'_, SocketAddr, PeerEventSender>> {
|
||||
pub fn get_by_addr(&self, addr: &SocketAddr) -> Option<Ref<'_, SocketAddr, PeerEventSender>> {
|
||||
self.active_peers_by_addr.get(addr)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
pub mod active_peers;
|
||||
pub mod event;
|
||||
pub mod network_table;
|
||||
+1
-1
@@ -9,7 +9,7 @@ pub struct NetworkTable<T> {
|
||||
}
|
||||
|
||||
impl<T> NetworkTable<T> {
|
||||
pub(crate) fn new() -> Self {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
ips: IpNetworkTable::new(),
|
||||
}
|
||||
@@ -13,6 +13,7 @@ license.workspace = true
|
||||
[dependencies]
|
||||
async-recursion = "1.0.4"
|
||||
base64 = "0.21.3"
|
||||
bincode = "1.3.3"
|
||||
# The latest version on crates.io at the time of writing this (6.0.0) has a
|
||||
# version mismatch with x25519-dalek/curve25519-dalek that is resolved in the
|
||||
# latest commit. So pick that for now.
|
||||
@@ -27,6 +28,8 @@ ip_network_table = "0.2.0"
|
||||
log.workspace = true
|
||||
nym-task = { path = "../task" }
|
||||
nym-wireguard-types = { path = "../wireguard-types" }
|
||||
nym-sphinx = { path = "../nymsphinx" }
|
||||
nym-tun = { path = "../tun" , features = ["wireguard"] }
|
||||
rand.workspace = true
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
tap.workspace = true
|
||||
|
||||
@@ -3,15 +3,15 @@
|
||||
// #![warn(clippy::expect_used)]
|
||||
// #![warn(clippy::unwrap_used)]
|
||||
|
||||
mod active_peers;
|
||||
// mod active_peers;
|
||||
mod error;
|
||||
mod event;
|
||||
mod network_table;
|
||||
// mod event;
|
||||
// mod network_table;
|
||||
mod packet_relayer;
|
||||
mod platform;
|
||||
// mod platform;
|
||||
mod registered_peers;
|
||||
mod setup;
|
||||
pub mod tun_task_channel;
|
||||
pub mod setup;
|
||||
// pub mod tun_task_channel;
|
||||
mod udp_listener;
|
||||
mod wg_tunnel;
|
||||
|
||||
@@ -20,7 +20,9 @@ use std::sync::Arc;
|
||||
|
||||
// Currently the module related to setting up the virtual network device is platform specific.
|
||||
#[cfg(target_os = "linux")]
|
||||
pub use platform::linux::tun_device;
|
||||
use nym_tun::tun_device;
|
||||
|
||||
use nym_tun::tun_task_channel;
|
||||
|
||||
/// Start wireguard UDP listener and TUN device
|
||||
///
|
||||
@@ -36,7 +38,9 @@ pub async fn start_wireguard(
|
||||
|
||||
// We can optionally index peers by their IP like standard wireguard. If we don't then we do
|
||||
// plain NAT where we match incoming destination IP with outgoing source IP.
|
||||
let peers_by_ip = Arc::new(tokio::sync::Mutex::new(network_table::NetworkTable::new()));
|
||||
|
||||
use nym_wireguard_types::tun_common::network_table::NetworkTable;
|
||||
let peers_by_ip = Arc::new(tokio::sync::Mutex::new(NetworkTable::new()));
|
||||
|
||||
// Alternative 1:
|
||||
let routing_mode = tun_device::RoutingMode::new_allowed_ips(peers_by_ip.clone());
|
||||
|
||||
@@ -3,11 +3,9 @@ use std::{collections::HashMap, sync::Arc};
|
||||
use tap::TapFallible;
|
||||
use tokio::sync::mpsc::{self};
|
||||
|
||||
use crate::{
|
||||
active_peers::PeerEventSender,
|
||||
event::Event,
|
||||
tun_task_channel::{TunTaskResponseRx, TunTaskTx},
|
||||
};
|
||||
use crate::tun_task_channel::{TunTaskResponseRx, TunTaskTx};
|
||||
|
||||
use nym_wireguard_types::tun_common::{active_peers::PeerEventSender, event::Event};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PacketRelaySender(pub(crate) mpsc::Sender<(u64, Vec<u8>)>);
|
||||
|
||||
@@ -1,54 +0,0 @@
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub(crate) type TunTaskPayload = (u64, Vec<u8>);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TunTaskTx(mpsc::Sender<TunTaskPayload>);
|
||||
pub(crate) struct TunTaskRx(mpsc::Receiver<TunTaskPayload>);
|
||||
|
||||
impl TunTaskTx {
|
||||
pub async fn send(
|
||||
&self,
|
||||
data: TunTaskPayload,
|
||||
) -> Result<(), tokio::sync::mpsc::error::SendError<TunTaskPayload>> {
|
||||
self.0.send(data).await
|
||||
}
|
||||
}
|
||||
|
||||
impl TunTaskRx {
|
||||
pub(crate) async fn recv(&mut self) -> Option<TunTaskPayload> {
|
||||
self.0.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn tun_task_channel() -> (TunTaskTx, TunTaskRx) {
|
||||
let (tun_task_tx, tun_task_rx) = tokio::sync::mpsc::channel(16);
|
||||
(TunTaskTx(tun_task_tx), TunTaskRx(tun_task_rx))
|
||||
}
|
||||
|
||||
// Send responses back from the tun device back to the PacketRelayer
|
||||
pub(crate) struct TunTaskResponseTx(mpsc::Sender<TunTaskPayload>);
|
||||
pub struct TunTaskResponseRx(mpsc::Receiver<TunTaskPayload>);
|
||||
|
||||
impl TunTaskResponseTx {
|
||||
pub(crate) async fn send(
|
||||
&self,
|
||||
data: TunTaskPayload,
|
||||
) -> Result<(), tokio::sync::mpsc::error::SendError<TunTaskPayload>> {
|
||||
self.0.send(data).await
|
||||
}
|
||||
}
|
||||
|
||||
impl TunTaskResponseRx {
|
||||
pub async fn recv(&mut self) -> Option<TunTaskPayload> {
|
||||
self.0.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn tun_task_response_channel() -> (TunTaskResponseTx, TunTaskResponseRx) {
|
||||
let (tun_task_tx, tun_task_rx) = tokio::sync::mpsc::channel(16);
|
||||
(
|
||||
TunTaskResponseTx(tun_task_tx),
|
||||
TunTaskResponseRx(tun_task_rx),
|
||||
)
|
||||
}
|
||||
@@ -7,15 +7,19 @@ use boringtun::{
|
||||
use futures::StreamExt;
|
||||
use log::error;
|
||||
use nym_task::TaskClient;
|
||||
use nym_wireguard_types::{registration::GatewayClientRegistry, PeerPublicKey, WG_PORT};
|
||||
use nym_wireguard_types::{
|
||||
registration::GatewayClientRegistry,
|
||||
tun_common::{
|
||||
active_peers::{ActivePeers, PeersByIp},
|
||||
event::Event,
|
||||
},
|
||||
PeerPublicKey, WG_PORT,
|
||||
};
|
||||
use tap::TapFallible;
|
||||
use tokio::{net::UdpSocket, sync::Mutex};
|
||||
|
||||
use crate::{
|
||||
active_peers::{ActivePeers, PeerEventSender},
|
||||
error::WgError,
|
||||
event::Event,
|
||||
network_table::NetworkTable,
|
||||
packet_relayer::PacketRelaySender,
|
||||
registered_peers::{RegisteredPeer, RegisteredPeers},
|
||||
setup::{self, WG_ADDRESS},
|
||||
@@ -24,9 +28,6 @@ use crate::{
|
||||
|
||||
const MAX_PACKET: usize = 65535;
|
||||
|
||||
// Registered peers
|
||||
pub(crate) type PeersByIp = NetworkTable<PeerEventSender>;
|
||||
|
||||
async fn add_test_peer(registered_peers: &mut RegisteredPeers) {
|
||||
let peer_static_public = PeerPublicKey::new(setup::peer_static_public_key());
|
||||
let peer_index = 0;
|
||||
|
||||
@@ -7,18 +7,16 @@ use boringtun::{
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use log::{debug, error, info, warn};
|
||||
use nym_wireguard_types::tun_common::{
|
||||
active_peers::{peer_event_channel, PeerEventReceiver, PeerEventSender},
|
||||
event::Event,
|
||||
network_table::NetworkTable,
|
||||
};
|
||||
use rand::RngCore;
|
||||
use tap::TapFallible;
|
||||
use tokio::{net::UdpSocket, sync::broadcast, time::timeout};
|
||||
|
||||
use crate::{
|
||||
active_peers::{peer_event_channel, PeerEventReceiver, PeerEventSender},
|
||||
error::WgError,
|
||||
event::Event,
|
||||
network_table::NetworkTable,
|
||||
packet_relayer::PacketRelaySender,
|
||||
registered_peers::PeerIdx,
|
||||
};
|
||||
use crate::{error::WgError, packet_relayer::PacketRelaySender, registered_peers::PeerIdx};
|
||||
|
||||
const HANDSHAKE_MAX_RATE: u64 = 10;
|
||||
|
||||
|
||||
+4
-1
@@ -76,7 +76,7 @@ nym-statistics-common = { path = "../common/statistics" }
|
||||
nym-task = { path = "../common/task" }
|
||||
nym-types = { path = "../common/types" }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
nym-wireguard = { path = "../common/wireguard" }
|
||||
nym-wireguard = { path = "../common/wireguard", optional = true }
|
||||
nym-ip-packet-router = { path = "../service-providers/ip-packet-router" }
|
||||
|
||||
[dev-dependencies]
|
||||
@@ -92,3 +92,6 @@ sqlx = { version = "0.5", features = [
|
||||
"macros",
|
||||
"migrate",
|
||||
] }
|
||||
|
||||
[features]
|
||||
wireguard = ["nym-wireguard"]
|
||||
|
||||
@@ -229,14 +229,14 @@ impl<'a> HttpApiBuilder<'a> {
|
||||
IpAddr::from(Ipv4Addr::new(10, 1, 0, 0)),
|
||||
self.gateway_config.wireguard.private_network_prefix,
|
||||
)?;
|
||||
let wg_state = self.client_registry.map(|client_registry| {
|
||||
let wg_state = self.client_registry.and_then(|client_registry| {
|
||||
WireguardAppState::new(
|
||||
self.sphinx_keypair,
|
||||
client_registry,
|
||||
Default::default(),
|
||||
self.gateway_config.wireguard.bind_address.port(),
|
||||
wireguard_private_network,
|
||||
)
|
||||
.ok()
|
||||
});
|
||||
|
||||
let router = nym_node::http::NymNodeRouter::new(config, wg_state);
|
||||
|
||||
+10
-12
@@ -200,6 +200,7 @@ impl<St> Gateway<St> {
|
||||
mixnet_handling::Listener::new(listening_address, shutdown).start(connection_handler);
|
||||
}
|
||||
|
||||
#[cfg(feature = "wireguard")]
|
||||
async fn start_wireguard(
|
||||
&self,
|
||||
shutdown: TaskClient,
|
||||
@@ -519,18 +520,15 @@ impl<St> Gateway<St> {
|
||||
Arc::new(coconut_verifier),
|
||||
);
|
||||
|
||||
// TODO: later we'll make this a commandline flag
|
||||
let wireguard_enabled = std::env::var("NYM_ENABLE_WIREGUARD")
|
||||
.map(|v| v == "1")
|
||||
.unwrap_or(false);
|
||||
if wireguard_enabled {
|
||||
if let Err(err) = self
|
||||
.start_wireguard(shutdown.subscribe().named("wireguard"))
|
||||
.await
|
||||
{
|
||||
// that's a nasty workaround, but anyhow errors are generally nicer, especially on exit
|
||||
bail!("{err}")
|
||||
}
|
||||
// Once this is a bit more mature, make this a commandline flag instead of a compile time
|
||||
// flag
|
||||
#[cfg(feature = "wireguard")]
|
||||
if let Err(err) = self
|
||||
.start_wireguard(shutdown.subscribe().named("wireguard"))
|
||||
.await
|
||||
{
|
||||
// that's a nasty workaround, but anyhow errors are generally nicer, especially on exit
|
||||
bail!("{err}")
|
||||
}
|
||||
|
||||
info!("Finished nym gateway startup procedure - it should now be able to receive mix and client traffic!");
|
||||
|
||||
@@ -50,6 +50,7 @@ nym-config = { path = "../common/config" }
|
||||
nym-crypto = { path = "../common/crypto", features = ["asymmetric" ]}
|
||||
nym-node-requests = { path = "nym-node-requests", default-features = false, features = ["openapi"]}
|
||||
nym-task = { path = "../common/task" }
|
||||
nym-wireguard = { path = "../common/wireguard" }
|
||||
nym-wireguard-types = { path = "../common/wireguard-types", features = ["verify"] }
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@@ -31,6 +31,12 @@ pub enum NymNodeError {
|
||||
source: WireguardError,
|
||||
},
|
||||
|
||||
#[error(transparent)]
|
||||
KeyRecoveryError {
|
||||
#[from]
|
||||
source: nym_crypto::asymmetric::encryption::KeyRecoveryError,
|
||||
},
|
||||
|
||||
#[error("unimplemented")]
|
||||
Unimplemented,
|
||||
}
|
||||
|
||||
+2
-5
@@ -31,10 +31,7 @@ async fn process_final_message(
|
||||
}
|
||||
};
|
||||
|
||||
if client
|
||||
.verify(state.dh_keypair.private_key(), preshared_nonce)
|
||||
.is_ok()
|
||||
{
|
||||
if client.verify(&state.private_key, preshared_nonce).is_ok() {
|
||||
state.registration_in_progress.remove(&client.pub_key());
|
||||
state.client_registry.insert(client.pub_key(), client);
|
||||
|
||||
@@ -104,7 +101,7 @@ pub(crate) async fn register_client(
|
||||
// mark it as used, even though it's not final
|
||||
*private_ip_ref = false;
|
||||
let gateway_data = GatewayClient::new(
|
||||
state.dh_keypair.private_key(),
|
||||
&state.private_key,
|
||||
remote_public,
|
||||
*private_ip_ref.key(),
|
||||
nonce,
|
||||
|
||||
@@ -8,8 +8,9 @@ use crate::wireguard::types::{GatewayClientRegistry, PendingRegistrations};
|
||||
use axum::routing::{get, post};
|
||||
use axum::Router;
|
||||
use ipnetwork::IpNetwork;
|
||||
use nym_crypto::asymmetric::encryption;
|
||||
use nym_crypto::asymmetric::encryption::PrivateKey;
|
||||
use nym_node_requests::routes::api::v1::gateway::client_interfaces::wireguard;
|
||||
use nym_wireguard::setup;
|
||||
use nym_wireguard_types::registration::PrivateIPs;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -24,15 +25,16 @@ pub struct WireguardAppState {
|
||||
|
||||
impl WireguardAppState {
|
||||
pub fn new(
|
||||
dh_keypair: Arc<encryption::KeyPair>,
|
||||
client_registry: Arc<GatewayClientRegistry>,
|
||||
registration_in_progress: Arc<PendingRegistrations>,
|
||||
binding_port: u16,
|
||||
private_ip_network: IpNetwork,
|
||||
) -> Self {
|
||||
WireguardAppState {
|
||||
) -> Result<Self, crate::error::NymNodeError> {
|
||||
Ok(WireguardAppState {
|
||||
inner: Some(WireguardAppStateInner {
|
||||
dh_keypair,
|
||||
private_key: Arc::new(PrivateKey::from_bytes(
|
||||
setup::server_static_private_key().as_ref(),
|
||||
)?),
|
||||
client_registry,
|
||||
registration_in_progress,
|
||||
binding_port,
|
||||
@@ -40,7 +42,7 @@ impl WireguardAppState {
|
||||
private_ip_network.iter().map(|ip| (ip, true)).collect(),
|
||||
),
|
||||
}),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// #[allow(dead_code)]
|
||||
@@ -79,7 +81,7 @@ macro_rules! get_state {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct WireguardAppStateInner {
|
||||
dh_keypair: Arc<encryption::KeyPair>,
|
||||
private_key: Arc<PrivateKey>,
|
||||
client_registry: Arc<GatewayClientRegistry>,
|
||||
registration_in_progress: Arc<PendingRegistrations>,
|
||||
binding_port: u16,
|
||||
@@ -112,6 +114,7 @@ mod test {
|
||||
PeerPublicKey,
|
||||
};
|
||||
use nym_node_requests::routes::api::v1::gateway::client_interfaces::wireguard;
|
||||
use nym_wireguard::setup::server_static_private_key;
|
||||
use nym_wireguard_types::registration::HmacSha256;
|
||||
use std::net::IpAddr;
|
||||
use std::str::FromStr;
|
||||
@@ -130,8 +133,15 @@ mod test {
|
||||
// 6. Gateway verifies mac digest and nonce, and stores client's public key and socket address and port
|
||||
|
||||
let mut rng = rand::thread_rng();
|
||||
let gateway_private_key =
|
||||
encryption::PrivateKey::from_bytes(server_static_private_key().as_bytes()).unwrap();
|
||||
let gateway_public_key = encryption::PublicKey::from(&gateway_private_key);
|
||||
|
||||
let gateway_key_pair = encryption::KeyPair::new(&mut rng);
|
||||
let gateway_key_pair = encryption::KeyPair::from_bytes(
|
||||
&gateway_private_key.to_bytes(),
|
||||
&gateway_public_key.to_bytes(),
|
||||
)
|
||||
.unwrap();
|
||||
let client_key_pair = encryption::KeyPair::new(&mut rng);
|
||||
|
||||
let gateway_static_public =
|
||||
@@ -158,7 +168,7 @@ mod test {
|
||||
let state = WireguardAppState {
|
||||
inner: Some(WireguardAppStateInner {
|
||||
client_registry: Arc::clone(&client_registry),
|
||||
dh_keypair: Arc::new(gateway_key_pair),
|
||||
private_key: Arc::new(gateway_private_key),
|
||||
registration_in_progress: Arc::clone(®istration_in_progress),
|
||||
binding_port: 8080,
|
||||
free_private_network_ips,
|
||||
|
||||
@@ -9,6 +9,8 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
bincode = "1.3.3"
|
||||
bytes = "1.5.0"
|
||||
etherparse = "0.13.0"
|
||||
futures = { workspace = true }
|
||||
log = { workspace = true }
|
||||
@@ -16,11 +18,13 @@ nym-bin-common = { path = "../../common/bin-common" }
|
||||
nym-client-core = { path = "../../common/client-core" }
|
||||
nym-config = { path = "../../common/config" }
|
||||
nym-exit-policy = { path = "../../common/exit-policy" }
|
||||
nym-ip-packet-requests = { path = "../../common/ip-packet-requests" }
|
||||
nym-network-requester = { path = "../network-requester" }
|
||||
nym-sdk = { path = "../../sdk/rust/nym-sdk" }
|
||||
nym-service-providers-common = { path = "../common" }
|
||||
nym-sphinx = { path = "../../common/nymsphinx" }
|
||||
nym-task = { path = "../../common/task" }
|
||||
nym-tun = { path = "../../common/tun" }
|
||||
nym-wireguard = { path = "../../common/wireguard" }
|
||||
nym-wireguard-types = { path = "../../common/wireguard-types" }
|
||||
reqwest.workspace = true
|
||||
|
||||
@@ -30,6 +30,9 @@ pub enum IpPacketRouterError {
|
||||
#[error("the entity wrapping the network requester has disconnected")]
|
||||
DisconnectedParent,
|
||||
|
||||
#[error("failed to deserialize tagged packet: {source}")]
|
||||
FailedToDeserializeTaggedPacket { source: bincode::Error },
|
||||
|
||||
#[error("failed to parse incoming packet: {source}")]
|
||||
PacketParseFailed { source: etherparse::ReadError },
|
||||
|
||||
@@ -39,6 +42,11 @@ pub enum IpPacketRouterError {
|
||||
#[error("parsed packet is missing transport header")]
|
||||
PacketMissingTransportHeader,
|
||||
|
||||
#[error("failed to send packet to tun device: {source}")]
|
||||
FailedToSendPacketToTun {
|
||||
source: tokio::sync::mpsc::error::TrySendError<(u64, Vec<u8>)>,
|
||||
},
|
||||
|
||||
#[error("the provided socket address, '{addr}' is not covered by the exit policy!")]
|
||||
AddressNotCoveredByExitPolicy { addr: SocketAddr },
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ use nym_sdk::{
|
||||
use nym_sphinx::receiver::ReconstructedMessage;
|
||||
use nym_task::{connections::TransmissionLane, TaskClient, TaskHandle};
|
||||
use request_filter::RequestFilter;
|
||||
use tap::TapFallible;
|
||||
|
||||
use crate::config::BaseClientConfig;
|
||||
|
||||
@@ -140,13 +139,13 @@ impl IpPacketRouterBuilder {
|
||||
let self_address = *mixnet_client.nym_address();
|
||||
|
||||
// Create the TUN device that we interact with the rest of the world with
|
||||
let config = nym_wireguard::tun_device::TunDeviceConfig {
|
||||
let config = nym_tun::tun_device::TunDeviceConfig {
|
||||
base_name: TUN_BASE_NAME.to_string(),
|
||||
ip: TUN_DEVICE_ADDRESS.parse().unwrap(),
|
||||
netmask: TUN_DEVICE_NETMASK.parse().unwrap(),
|
||||
};
|
||||
let (tun, tun_task_tx, tun_task_response_rx) = nym_wireguard::tun_device::TunDevice::new(
|
||||
nym_wireguard::tun_device::RoutingMode::new_nat(),
|
||||
let (tun, tun_task_tx, tun_task_response_rx) = nym_tun::tun_device::TunDevice::new(
|
||||
nym_tun::tun_device::RoutingMode::new_nat(),
|
||||
config,
|
||||
);
|
||||
tun.start();
|
||||
@@ -184,8 +183,8 @@ impl IpPacketRouterBuilder {
|
||||
struct IpPacketRouter {
|
||||
_config: Config,
|
||||
request_filter: request_filter::RequestFilter,
|
||||
tun_task_tx: nym_wireguard::tun_task_channel::TunTaskTx,
|
||||
tun_task_response_rx: nym_wireguard::tun_task_channel::TunTaskResponseRx,
|
||||
tun_task_tx: nym_tun::tun_task_channel::TunTaskTx,
|
||||
tun_task_response_rx: nym_tun::tun_task_channel::TunTaskResponseRx,
|
||||
mixnet_client: nym_sdk::mixnet::MixnetClient,
|
||||
task_handle: TaskHandle,
|
||||
}
|
||||
@@ -202,7 +201,9 @@ impl IpPacketRouter {
|
||||
},
|
||||
msg = self.mixnet_client.next() => {
|
||||
if let Some(msg) = msg {
|
||||
self.on_message(msg).await.ok();
|
||||
if let Err(err) = self.on_message(msg).await {
|
||||
log::error!("Error handling mixnet message: {err}");
|
||||
};
|
||||
} else {
|
||||
log::trace!("IpPacketRouter [main loop]: stopping since channel closed");
|
||||
break;
|
||||
@@ -225,13 +226,9 @@ impl IpPacketRouter {
|
||||
let packet_type = None;
|
||||
let input_message = InputMessage::new_regular(recipient, packet, lane, packet_type);
|
||||
|
||||
self.mixnet_client
|
||||
.send(input_message)
|
||||
.await
|
||||
.tap_err(|err| {
|
||||
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
|
||||
})
|
||||
.ok();
|
||||
if let Err(err) = self.mixnet_client.send(input_message).await {
|
||||
log::error!("IpPacketRouter [main loop]: failed to send packet to mixnet: {err}");
|
||||
};
|
||||
} else {
|
||||
log::error!("NYM_CLIENT_ADDR not set or invalid");
|
||||
}
|
||||
@@ -251,7 +248,13 @@ impl IpPacketRouter {
|
||||
&mut self,
|
||||
reconstructed: ReconstructedMessage,
|
||||
) -> Result<(), IpPacketRouterError> {
|
||||
log::info!("Received message: {:?}", reconstructed.sender_tag);
|
||||
log::debug!(
|
||||
"Received message with sender_tag: {:?}",
|
||||
reconstructed.sender_tag
|
||||
);
|
||||
|
||||
let tagged_packet = nym_ip_packet_requests::TaggedIpPacket::from_message(&reconstructed)
|
||||
.map_err(|err| IpPacketRouterError::FailedToDeserializeTaggedPacket { source: err })?;
|
||||
|
||||
// We don't forward packets that we are not able to parse. BUT, there might be a good
|
||||
// reason to still forward them.
|
||||
@@ -265,7 +268,7 @@ impl IpPacketRouter {
|
||||
src_addr,
|
||||
dst_addr,
|
||||
dst,
|
||||
} = parse_packet(&reconstructed.message)?;
|
||||
} = parse_packet(&tagged_packet.packet)?;
|
||||
|
||||
let dst_str = dst.map_or(dst_addr.to_string(), |dst| dst.to_string());
|
||||
log::info!("Received packet: {packet_type}: {src_addr} -> {dst_str}");
|
||||
@@ -274,27 +277,19 @@ impl IpPacketRouter {
|
||||
if let Some(dst) = dst {
|
||||
if !self.request_filter.check_address(&dst).await {
|
||||
log::warn!("Failed filter check: {dst}");
|
||||
|
||||
// TODO: send back a response here
|
||||
|
||||
// TODO: we could consider sending back a response here
|
||||
return Err(IpPacketRouterError::AddressFailedFilterCheck { addr: dst });
|
||||
}
|
||||
} else {
|
||||
// TODO: are we always allowing packets without port numbers?
|
||||
log::warn!(
|
||||
"Ignoring filter check for packet without port number! (TODO: is this correct?)"
|
||||
);
|
||||
// TODO: we should also filter packets without port number
|
||||
log::warn!("Ignoring filter check for packet without port number! TODO!");
|
||||
}
|
||||
|
||||
// TODO: set the tag correctly. Can we just reuse sender_tag?
|
||||
let peer_tag = 0;
|
||||
self.tun_task_tx
|
||||
.send((peer_tag, reconstructed.message))
|
||||
.await
|
||||
.tap_err(|err| {
|
||||
log::error!("Failed to send packet to tun device: {err}");
|
||||
})
|
||||
.ok();
|
||||
.try_send((peer_tag, tagged_packet.packet.into()))
|
||||
.map_err(|err| IpPacketRouterError::FailedToSendPacketToTun { source: err })?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -316,7 +311,7 @@ fn parse_packet(packet: &[u8]) -> Result<ParsedPacket, IpPacketRouterError> {
|
||||
let (packet_type, dst_port) = match headers.transport {
|
||||
Some(etherparse::TransportSlice::Udp(header)) => ("ipv4", Some(header.destination_port())),
|
||||
Some(etherparse::TransportSlice::Tcp(header)) => ("ipv6", Some(header.destination_port())),
|
||||
Some(etherparse::TransportSlice::Icmpv4(_)) => ("icpv4", None),
|
||||
Some(etherparse::TransportSlice::Icmpv4(_)) => ("icmpv4", None),
|
||||
Some(etherparse::TransportSlice::Icmpv6(_)) => ("icmpv6", None),
|
||||
Some(etherparse::TransportSlice::Unknown(_)) => ("unknown", None),
|
||||
None => {
|
||||
|
||||
Reference in New Issue
Block a user