Compare commits

...

1 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu 3aa2d859a2 Gateway ipv6 separation of peers 2024-11-13 16:57:23 +00:00
7 changed files with 115 additions and 61 deletions
Generated
+1
View File
@@ -6872,6 +6872,7 @@ dependencies = [
"nym-network-defaults",
"nym-task",
"nym-wireguard-types",
"serde",
"thiserror",
"tokio",
"tokio-stream",
+2 -1
View File
@@ -50,7 +50,8 @@ pub mod wireguard {
pub const WG_PORT: u16 = 51822;
// The interface used to route traffic
pub const WG_TUN_BASE_NAME: &str = "nymwg";
pub const WG_TUN_BASE_NAME_V4: &str = "nymwg";
pub const WG_TUN_BASE_NAME_V6: &str = "nymwgv6";
pub const WG_TUN_DEVICE_ADDRESS: &str = "10.1.0.1";
pub const WG_TUN_DEVICE_IP_ADDRESS_V4: Ipv4Addr = Ipv4Addr::new(10, 1, 0, 1);
pub const WG_TUN_DEVICE_NETMASK_V4: u8 = 16;
+3
View File
@@ -23,6 +23,9 @@ futures = { workspace = true }
x25519-dalek = { workspace = true }
ip_network = { workspace = true }
log.workspace = true
serde = { workspace = true, features = [
"derive",
] } # for config serialization/deserialization
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
tokio-stream = { workspace = true }
+27
View File
@@ -0,0 +1,27 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use defguard_wireguard_rs::{host::Peer, key::Key};
use serde::{Deserialize, Serialize};
#[derive(Default, Clone, Serialize, Deserialize)]
pub struct Hosts {
host_v4: defguard_wireguard_rs::host::Host,
host_v6: defguard_wireguard_rs::host::Host,
}
impl Hosts {
pub fn new(
host_v4: defguard_wireguard_rs::host::Host,
host_v6: defguard_wireguard_rs::host::Host,
) -> Self {
Self { host_v4, host_v6 }
}
pub fn get(&self, key: &Key) -> Option<&Peer> {
self.host_v4
.peers
.get(key)
.or_else(|| self.host_v6.peers.get(key))
}
}
+63 -38
View File
@@ -6,34 +6,44 @@
// #![warn(clippy::expect_used)]
// #![warn(clippy::unwrap_used)]
use defguard_wireguard_rs::WGApi;
#[cfg(target_os = "linux")]
use defguard_wireguard_rs::{host::Peer, key::Key, net::IpAddrMask};
use defguard_wireguard_rs::{WGApi, WireguardInterfaceApi};
use hosts::Hosts;
use nym_crypto::asymmetric::encryption::KeyPair;
#[cfg(target_os = "linux")]
use nym_network_defaults::constants::WG_TUN_BASE_NAME;
use nym_network_defaults::constants::{WG_TUN_BASE_NAME_V4, WG_TUN_BASE_NAME_V6};
use nym_wireguard_types::Config;
use peer_controller::PeerControlRequest;
use std::sync::Arc;
use tokio::sync::mpsc::{self, Receiver, Sender};
pub(crate) mod error;
pub mod hosts;
pub mod peer_controller;
pub mod peer_handle;
pub struct WgApiWrapper {
inner: WGApi,
api_v4: WGApi,
api_v6: WGApi,
}
impl WgApiWrapper {
pub fn new(wg_api: WGApi) -> Self {
WgApiWrapper { inner: wg_api }
pub fn new(api_v4: WGApi, api_v6: WGApi) -> Self {
WgApiWrapper { api_v4, api_v6 }
}
pub fn get_hosts(&self) -> Result<Hosts, Box<dyn std::error::Error + Send + Sync + 'static>> {
let host_v4 = self.api_v4.read_interface_data()?;
let host_v6 = self.api_v6.read_interface_data()?;
let hosts = Hosts::new(host_v4, host_v6);
Ok(hosts)
}
}
impl Drop for WgApiWrapper {
fn drop(&mut self) {
if let Err(e) = defguard_wireguard_rs::WireguardInterfaceApi::remove_interface(&self.inner)
if let Err(e) = defguard_wireguard_rs::WireguardInterfaceApi::remove_interface(&self.api_v4)
{
log::error!("Could not remove the wireguard interface: {:?}", e);
}
@@ -93,9 +103,13 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>
use std::collections::HashMap;
use tokio::sync::RwLock;
let ifname = String::from(WG_TUN_BASE_NAME);
let wg_api = defguard_wireguard_rs::WGApi::new(ifname.clone(), false)?;
let ifname_v4 = String::from(WG_TUN_BASE_NAME_V4);
let ifname_v6 = String::from(WG_TUN_BASE_NAME_V6);
let wg_api_v4 = defguard_wireguard_rs::WGApi::new(ifname_v4.clone(), false)?;
let wg_api_v6 = defguard_wireguard_rs::WGApi::new(ifname_v6.clone(), false)?;
let mut peer_bandwidth_managers = HashMap::with_capacity(all_peers.len());
let mut peers_v4 = vec![];
let mut peers_v6 = vec![];
let peers = all_peers
.into_iter()
.map(Peer::try_from)
@@ -108,7 +122,10 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>
peer
})
.collect::<Vec<_>>();
for peer in peers.iter() {
for peer in peers.into_iter() {
let Some(ip_mask) = peer.allowed_ips.first() else {
continue;
};
let bandwidth_manager =
PeerController::generate_bandwidth_manager(storage.clone(), &peer.public_key)
.await?
@@ -116,34 +133,35 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>
// Update storage with *x_bytes set to 0, as in kernel peers we can't set those values
// so we need to restart counting. Hopefully the bandwidth was counted in available_bandwidth
storage
.insert_wireguard_peer(peer, bandwidth_manager.is_some())
.insert_wireguard_peer(&peer, bandwidth_manager.is_some())
.await?;
peer_bandwidth_managers.insert(peer.public_key.clone(), bandwidth_manager);
if ip_mask.ip.is_ipv4() {
peers_v4.push(peer);
} else {
peers_v6.push(peer);
}
}
wg_api.create_interface()?;
let interface_config = InterfaceConfiguration {
name: ifname.clone(),
wg_api_v4.create_interface()?;
wg_api_v6.create_interface()?;
wg_api_v4.configure_interface(&InterfaceConfiguration {
name: ifname_v4.clone(),
prvkey: BASE64_STANDARD.encode(wireguard_data.inner.keypair().private_key().to_bytes()),
address: wireguard_data.inner.config().private_ipv4.to_string(),
port: wireguard_data.inner.config().announced_port as u32,
peers,
peers: peers_v4,
mtu: None,
};
wg_api.configure_interface(&interface_config)?;
std::process::Command::new("ip")
.args([
"-6",
"addr",
"add",
&format!(
"{}/{}",
wireguard_data.inner.config().private_ipv6,
wireguard_data.inner.config().private_network_prefix_v6
),
"dev",
(&ifname),
])
.output()?;
})?;
wg_api_v6.configure_interface(&InterfaceConfiguration {
name: ifname_v6.clone(),
prvkey: BASE64_STANDARD.encode(wireguard_data.inner.keypair().private_key().to_bytes()),
address: wireguard_data.inner.config().private_ipv6.to_string(),
port: wireguard_data.inner.config().announced_port as u32,
peers: peers_v6,
mtu: None,
})?;
// Use a dummy peer to create routing rule for the entire network space
let mut catch_all_peer = Peer::new(Key::new([0; 32]));
@@ -155,18 +173,25 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>
wireguard_data.inner.config().private_ipv6,
wireguard_data.inner.config().private_network_prefix_v6,
)?;
catch_all_peer.set_allowed_ips(vec![
IpAddrMask::new(network_v4.network_address(), network_v4.netmask()),
IpAddrMask::new(network_v6.network_address(), network_v6.netmask()),
]);
wg_api.configure_peer_routing(&[catch_all_peer])?;
let host = wg_api.read_interface_data()?;
let wg_api = std::sync::Arc::new(WgApiWrapper::new(wg_api));
catch_all_peer.set_allowed_ips(vec![IpAddrMask::new(
network_v4.network_address(),
network_v4.netmask(),
)]);
wg_api_v4.configure_peer_routing(&[catch_all_peer.clone()])?;
catch_all_peer.set_allowed_ips(vec![IpAddrMask::new(
network_v6.network_address(),
network_v6.netmask(),
)]);
wg_api_v6.configure_peer_routing(&[catch_all_peer])?;
let wg_api = std::sync::Arc::new(WgApiWrapper::new(wg_api_v4, wg_api_v6));
let hosts = wg_api.get_hosts()?;
let mut controller = PeerController::new(
storage,
wg_api.clone(),
host,
hosts,
peer_bandwidth_managers,
wireguard_data.inner.peer_tx.clone(),
wireguard_data.peer_rx,
+13 -16
View File
@@ -1,11 +1,7 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use defguard_wireguard_rs::{
host::{Host, Peer},
key::Key,
WireguardInterfaceApi,
};
use defguard_wireguard_rs::{host::Peer, key::Key, WireguardInterfaceApi};
use futures::channel::oneshot;
use nym_authenticator_requests::{
latest::registration::RemainingBandwidthData, v1::registration::BANDWIDTH_CAP_PER_DAY,
@@ -20,6 +16,7 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use crate::hosts::Hosts;
use crate::peer_handle::PeerHandle;
use crate::WgApiWrapper;
use crate::{error::Error, peer_handle::SharedBandwidthStorageManager};
@@ -68,7 +65,7 @@ pub struct PeerController<St: Storage + Clone + 'static> {
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
wg_api: Arc<WgApiWrapper>,
host_information: Arc<RwLock<Host>>,
hosts_information: Arc<RwLock<Hosts>>,
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager<St>>>,
timeout_check_interval: IntervalStream,
task_client: nym_task::TaskClient,
@@ -78,7 +75,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
pub fn new(
storage: St,
wg_api: Arc<WgApiWrapper>,
initial_host_information: Host,
initial_hosts_information: Hosts,
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager<St>>>,
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
@@ -87,12 +84,12 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
let timeout_check_interval = tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(DEFAULT_PEER_TIMEOUT_CHECK),
);
let host_information = Arc::new(RwLock::new(initial_host_information));
let hosts_information = Arc::new(RwLock::new(initial_hosts_information));
for (public_key, bandwidth_storage_manager) in bw_storage_managers.iter() {
let mut handle = PeerHandle::new(
storage.clone(),
public_key.clone(),
host_information.clone(),
hosts_information.clone(),
bandwidth_storage_manager.clone(),
request_tx.clone(),
&task_client,
@@ -107,7 +104,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
PeerController {
storage,
wg_api,
host_information,
hosts_information,
bw_storage_managers,
request_tx,
request_rx,
@@ -122,7 +119,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
self.storage.insert_wireguard_peer(peer, false).await?;
}
let ret: Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> =
self.wg_api.inner.configure_peer(peer);
self.wg_api.api_v4.configure_peer(peer);
if client_id.is_none() && ret.is_err() {
// Try to revert the insertion in storage
if self
@@ -141,7 +138,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
pub async fn remove_peer(&mut self, key: &Key) -> Result<(), Error> {
self.storage.remove_wireguard_peer(&key.to_string()).await?;
self.bw_storage_managers.remove(key);
let ret = self.wg_api.inner.remove_peer(key);
let ret = self.wg_api.api_v4.remove_peer(key);
if ret.is_err() {
log::error!("Wireguard peer could not be removed from wireguard kernel module. Process should be restarted so that the interface is reset.");
}
@@ -187,7 +184,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
let mut handle = PeerHandle::new(
self.storage.clone(),
peer.public_key.clone(),
self.host_information.clone(),
self.hosts_information.clone(),
bandwidth_storage_manager.clone(),
self.request_tx.clone(),
&self.task_client,
@@ -226,7 +223,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
.available_bandwidth()
.await
} else {
let Some(peer) = self.host_information.read().await.peers.get(key).cloned() else {
let Some(peer) = self.hosts_information.read().await.get(key).cloned() else {
// host information not updated yet
return Ok(None);
};
@@ -242,11 +239,11 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
loop {
tokio::select! {
_ = self.timeout_check_interval.next() => {
let Ok(host) = self.wg_api.inner.read_interface_data() else {
let Ok(hosts) = self.wg_api.get_hosts() else {
log::error!("Can't read wireguard kernel data");
continue;
};
*self.host_information.write().await = host;
*self.hosts_information.write().await = hosts;
}
_ = self.task_client.recv() => {
log::trace!("PeerController handler: Received shutdown");
+6 -6
View File
@@ -2,9 +2,10 @@
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use crate::hosts::Hosts;
use crate::peer_controller::PeerControlRequest;
use defguard_wireguard_rs::host::Peer;
use defguard_wireguard_rs::{host::Host, key::Key};
use defguard_wireguard_rs::key::Key;
use futures::channel::oneshot;
use nym_authenticator_requests::v2::registration::BANDWIDTH_CAP_PER_DAY;
use nym_credential_verification::bandwidth_storage_manager::BandwidthStorageManager;
@@ -23,7 +24,7 @@ const AUTO_REMOVE_AFTER: Duration = Duration::from_secs(60 * 60 * 24); // 24 hou
pub struct PeerHandle<St> {
storage: St,
public_key: Key,
host_information: Arc<RwLock<Host>>,
hosts_information: Arc<RwLock<Hosts>>,
bandwidth_storage_manager: Option<SharedBandwidthStorageManager<St>>,
request_tx: mpsc::Sender<PeerControlRequest>,
timeout_check_interval: IntervalStream,
@@ -35,7 +36,7 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
pub fn new(
storage: St,
public_key: Key,
host_information: Arc<RwLock<Host>>,
hosts_information: Arc<RwLock<Hosts>>,
bandwidth_storage_manager: Option<SharedBandwidthStorageManager<St>>,
request_tx: mpsc::Sender<PeerControlRequest>,
task_client: &TaskClient,
@@ -48,7 +49,7 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
PeerHandle {
storage,
public_key,
host_information,
hosts_information,
bandwidth_storage_manager,
request_tx,
timeout_check_interval,
@@ -123,10 +124,9 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
tokio::select! {
_ = self.timeout_check_interval.next() => {
let Some(kernel_peer) = self
.host_information
.hosts_information
.read()
.await
.peers
.get(&self.public_key)
.cloned() else {
// the host information hasn't beed updated yet