Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3aa2d859a2 |
Generated
+1
@@ -6872,6 +6872,7 @@ dependencies = [
|
||||
"nym-network-defaults",
|
||||
"nym-task",
|
||||
"nym-wireguard-types",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
|
||||
@@ -50,7 +50,8 @@ pub mod wireguard {
|
||||
pub const WG_PORT: u16 = 51822;
|
||||
|
||||
// The interface used to route traffic
|
||||
pub const WG_TUN_BASE_NAME: &str = "nymwg";
|
||||
pub const WG_TUN_BASE_NAME_V4: &str = "nymwg";
|
||||
pub const WG_TUN_BASE_NAME_V6: &str = "nymwgv6";
|
||||
pub const WG_TUN_DEVICE_ADDRESS: &str = "10.1.0.1";
|
||||
pub const WG_TUN_DEVICE_IP_ADDRESS_V4: Ipv4Addr = Ipv4Addr::new(10, 1, 0, 1);
|
||||
pub const WG_TUN_DEVICE_NETMASK_V4: u8 = 16;
|
||||
|
||||
@@ -23,6 +23,9 @@ futures = { workspace = true }
|
||||
x25519-dalek = { workspace = true }
|
||||
ip_network = { workspace = true }
|
||||
log.workspace = true
|
||||
serde = { workspace = true, features = [
|
||||
"derive",
|
||||
] } # for config serialization/deserialization
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
|
||||
tokio-stream = { workspace = true }
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use defguard_wireguard_rs::{host::Peer, key::Key};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Default, Clone, Serialize, Deserialize)]
|
||||
pub struct Hosts {
|
||||
host_v4: defguard_wireguard_rs::host::Host,
|
||||
host_v6: defguard_wireguard_rs::host::Host,
|
||||
}
|
||||
|
||||
impl Hosts {
|
||||
pub fn new(
|
||||
host_v4: defguard_wireguard_rs::host::Host,
|
||||
host_v6: defguard_wireguard_rs::host::Host,
|
||||
) -> Self {
|
||||
Self { host_v4, host_v6 }
|
||||
}
|
||||
|
||||
pub fn get(&self, key: &Key) -> Option<&Peer> {
|
||||
self.host_v4
|
||||
.peers
|
||||
.get(key)
|
||||
.or_else(|| self.host_v6.peers.get(key))
|
||||
}
|
||||
}
|
||||
+63
-38
@@ -6,34 +6,44 @@
|
||||
// #![warn(clippy::expect_used)]
|
||||
// #![warn(clippy::unwrap_used)]
|
||||
|
||||
use defguard_wireguard_rs::WGApi;
|
||||
#[cfg(target_os = "linux")]
|
||||
use defguard_wireguard_rs::{host::Peer, key::Key, net::IpAddrMask};
|
||||
use defguard_wireguard_rs::{WGApi, WireguardInterfaceApi};
|
||||
use hosts::Hosts;
|
||||
use nym_crypto::asymmetric::encryption::KeyPair;
|
||||
#[cfg(target_os = "linux")]
|
||||
use nym_network_defaults::constants::WG_TUN_BASE_NAME;
|
||||
use nym_network_defaults::constants::{WG_TUN_BASE_NAME_V4, WG_TUN_BASE_NAME_V6};
|
||||
use nym_wireguard_types::Config;
|
||||
use peer_controller::PeerControlRequest;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc::{self, Receiver, Sender};
|
||||
|
||||
pub(crate) mod error;
|
||||
pub mod hosts;
|
||||
pub mod peer_controller;
|
||||
pub mod peer_handle;
|
||||
|
||||
pub struct WgApiWrapper {
|
||||
inner: WGApi,
|
||||
api_v4: WGApi,
|
||||
api_v6: WGApi,
|
||||
}
|
||||
|
||||
impl WgApiWrapper {
|
||||
pub fn new(wg_api: WGApi) -> Self {
|
||||
WgApiWrapper { inner: wg_api }
|
||||
pub fn new(api_v4: WGApi, api_v6: WGApi) -> Self {
|
||||
WgApiWrapper { api_v4, api_v6 }
|
||||
}
|
||||
|
||||
pub fn get_hosts(&self) -> Result<Hosts, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
||||
let host_v4 = self.api_v4.read_interface_data()?;
|
||||
let host_v6 = self.api_v6.read_interface_data()?;
|
||||
let hosts = Hosts::new(host_v4, host_v6);
|
||||
Ok(hosts)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WgApiWrapper {
|
||||
fn drop(&mut self) {
|
||||
if let Err(e) = defguard_wireguard_rs::WireguardInterfaceApi::remove_interface(&self.inner)
|
||||
if let Err(e) = defguard_wireguard_rs::WireguardInterfaceApi::remove_interface(&self.api_v4)
|
||||
{
|
||||
log::error!("Could not remove the wireguard interface: {:?}", e);
|
||||
}
|
||||
@@ -93,9 +103,13 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>
|
||||
use std::collections::HashMap;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
let ifname = String::from(WG_TUN_BASE_NAME);
|
||||
let wg_api = defguard_wireguard_rs::WGApi::new(ifname.clone(), false)?;
|
||||
let ifname_v4 = String::from(WG_TUN_BASE_NAME_V4);
|
||||
let ifname_v6 = String::from(WG_TUN_BASE_NAME_V6);
|
||||
let wg_api_v4 = defguard_wireguard_rs::WGApi::new(ifname_v4.clone(), false)?;
|
||||
let wg_api_v6 = defguard_wireguard_rs::WGApi::new(ifname_v6.clone(), false)?;
|
||||
let mut peer_bandwidth_managers = HashMap::with_capacity(all_peers.len());
|
||||
let mut peers_v4 = vec![];
|
||||
let mut peers_v6 = vec![];
|
||||
let peers = all_peers
|
||||
.into_iter()
|
||||
.map(Peer::try_from)
|
||||
@@ -108,7 +122,10 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>
|
||||
peer
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
for peer in peers.iter() {
|
||||
for peer in peers.into_iter() {
|
||||
let Some(ip_mask) = peer.allowed_ips.first() else {
|
||||
continue;
|
||||
};
|
||||
let bandwidth_manager =
|
||||
PeerController::generate_bandwidth_manager(storage.clone(), &peer.public_key)
|
||||
.await?
|
||||
@@ -116,34 +133,35 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>
|
||||
// Update storage with *x_bytes set to 0, as in kernel peers we can't set those values
|
||||
// so we need to restart counting. Hopefully the bandwidth was counted in available_bandwidth
|
||||
storage
|
||||
.insert_wireguard_peer(peer, bandwidth_manager.is_some())
|
||||
.insert_wireguard_peer(&peer, bandwidth_manager.is_some())
|
||||
.await?;
|
||||
peer_bandwidth_managers.insert(peer.public_key.clone(), bandwidth_manager);
|
||||
if ip_mask.ip.is_ipv4() {
|
||||
peers_v4.push(peer);
|
||||
} else {
|
||||
peers_v6.push(peer);
|
||||
}
|
||||
}
|
||||
wg_api.create_interface()?;
|
||||
let interface_config = InterfaceConfiguration {
|
||||
name: ifname.clone(),
|
||||
wg_api_v4.create_interface()?;
|
||||
wg_api_v6.create_interface()?;
|
||||
|
||||
wg_api_v4.configure_interface(&InterfaceConfiguration {
|
||||
name: ifname_v4.clone(),
|
||||
prvkey: BASE64_STANDARD.encode(wireguard_data.inner.keypair().private_key().to_bytes()),
|
||||
address: wireguard_data.inner.config().private_ipv4.to_string(),
|
||||
port: wireguard_data.inner.config().announced_port as u32,
|
||||
peers,
|
||||
peers: peers_v4,
|
||||
mtu: None,
|
||||
};
|
||||
wg_api.configure_interface(&interface_config)?;
|
||||
std::process::Command::new("ip")
|
||||
.args([
|
||||
"-6",
|
||||
"addr",
|
||||
"add",
|
||||
&format!(
|
||||
"{}/{}",
|
||||
wireguard_data.inner.config().private_ipv6,
|
||||
wireguard_data.inner.config().private_network_prefix_v6
|
||||
),
|
||||
"dev",
|
||||
(&ifname),
|
||||
])
|
||||
.output()?;
|
||||
})?;
|
||||
|
||||
wg_api_v6.configure_interface(&InterfaceConfiguration {
|
||||
name: ifname_v6.clone(),
|
||||
prvkey: BASE64_STANDARD.encode(wireguard_data.inner.keypair().private_key().to_bytes()),
|
||||
address: wireguard_data.inner.config().private_ipv6.to_string(),
|
||||
port: wireguard_data.inner.config().announced_port as u32,
|
||||
peers: peers_v6,
|
||||
mtu: None,
|
||||
})?;
|
||||
|
||||
// Use a dummy peer to create routing rule for the entire network space
|
||||
let mut catch_all_peer = Peer::new(Key::new([0; 32]));
|
||||
@@ -155,18 +173,25 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>
|
||||
wireguard_data.inner.config().private_ipv6,
|
||||
wireguard_data.inner.config().private_network_prefix_v6,
|
||||
)?;
|
||||
catch_all_peer.set_allowed_ips(vec![
|
||||
IpAddrMask::new(network_v4.network_address(), network_v4.netmask()),
|
||||
IpAddrMask::new(network_v6.network_address(), network_v6.netmask()),
|
||||
]);
|
||||
wg_api.configure_peer_routing(&[catch_all_peer])?;
|
||||
|
||||
let host = wg_api.read_interface_data()?;
|
||||
let wg_api = std::sync::Arc::new(WgApiWrapper::new(wg_api));
|
||||
catch_all_peer.set_allowed_ips(vec![IpAddrMask::new(
|
||||
network_v4.network_address(),
|
||||
network_v4.netmask(),
|
||||
)]);
|
||||
wg_api_v4.configure_peer_routing(&[catch_all_peer.clone()])?;
|
||||
|
||||
catch_all_peer.set_allowed_ips(vec![IpAddrMask::new(
|
||||
network_v6.network_address(),
|
||||
network_v6.netmask(),
|
||||
)]);
|
||||
wg_api_v6.configure_peer_routing(&[catch_all_peer])?;
|
||||
|
||||
let wg_api = std::sync::Arc::new(WgApiWrapper::new(wg_api_v4, wg_api_v6));
|
||||
let hosts = wg_api.get_hosts()?;
|
||||
let mut controller = PeerController::new(
|
||||
storage,
|
||||
wg_api.clone(),
|
||||
host,
|
||||
hosts,
|
||||
peer_bandwidth_managers,
|
||||
wireguard_data.inner.peer_tx.clone(),
|
||||
wireguard_data.peer_rx,
|
||||
|
||||
@@ -1,11 +1,7 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use defguard_wireguard_rs::{
|
||||
host::{Host, Peer},
|
||||
key::Key,
|
||||
WireguardInterfaceApi,
|
||||
};
|
||||
use defguard_wireguard_rs::{host::Peer, key::Key, WireguardInterfaceApi};
|
||||
use futures::channel::oneshot;
|
||||
use nym_authenticator_requests::{
|
||||
latest::registration::RemainingBandwidthData, v1::registration::BANDWIDTH_CAP_PER_DAY,
|
||||
@@ -20,6 +16,7 @@ use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::sync::{mpsc, RwLock};
|
||||
use tokio_stream::{wrappers::IntervalStream, StreamExt};
|
||||
|
||||
use crate::hosts::Hosts;
|
||||
use crate::peer_handle::PeerHandle;
|
||||
use crate::WgApiWrapper;
|
||||
use crate::{error::Error, peer_handle::SharedBandwidthStorageManager};
|
||||
@@ -68,7 +65,7 @@ pub struct PeerController<St: Storage + Clone + 'static> {
|
||||
request_tx: mpsc::Sender<PeerControlRequest>,
|
||||
request_rx: mpsc::Receiver<PeerControlRequest>,
|
||||
wg_api: Arc<WgApiWrapper>,
|
||||
host_information: Arc<RwLock<Host>>,
|
||||
hosts_information: Arc<RwLock<Hosts>>,
|
||||
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager<St>>>,
|
||||
timeout_check_interval: IntervalStream,
|
||||
task_client: nym_task::TaskClient,
|
||||
@@ -78,7 +75,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
|
||||
pub fn new(
|
||||
storage: St,
|
||||
wg_api: Arc<WgApiWrapper>,
|
||||
initial_host_information: Host,
|
||||
initial_hosts_information: Hosts,
|
||||
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager<St>>>,
|
||||
request_tx: mpsc::Sender<PeerControlRequest>,
|
||||
request_rx: mpsc::Receiver<PeerControlRequest>,
|
||||
@@ -87,12 +84,12 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
|
||||
let timeout_check_interval = tokio_stream::wrappers::IntervalStream::new(
|
||||
tokio::time::interval(DEFAULT_PEER_TIMEOUT_CHECK),
|
||||
);
|
||||
let host_information = Arc::new(RwLock::new(initial_host_information));
|
||||
let hosts_information = Arc::new(RwLock::new(initial_hosts_information));
|
||||
for (public_key, bandwidth_storage_manager) in bw_storage_managers.iter() {
|
||||
let mut handle = PeerHandle::new(
|
||||
storage.clone(),
|
||||
public_key.clone(),
|
||||
host_information.clone(),
|
||||
hosts_information.clone(),
|
||||
bandwidth_storage_manager.clone(),
|
||||
request_tx.clone(),
|
||||
&task_client,
|
||||
@@ -107,7 +104,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
|
||||
PeerController {
|
||||
storage,
|
||||
wg_api,
|
||||
host_information,
|
||||
hosts_information,
|
||||
bw_storage_managers,
|
||||
request_tx,
|
||||
request_rx,
|
||||
@@ -122,7 +119,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
|
||||
self.storage.insert_wireguard_peer(peer, false).await?;
|
||||
}
|
||||
let ret: Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> =
|
||||
self.wg_api.inner.configure_peer(peer);
|
||||
self.wg_api.api_v4.configure_peer(peer);
|
||||
if client_id.is_none() && ret.is_err() {
|
||||
// Try to revert the insertion in storage
|
||||
if self
|
||||
@@ -141,7 +138,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
|
||||
pub async fn remove_peer(&mut self, key: &Key) -> Result<(), Error> {
|
||||
self.storage.remove_wireguard_peer(&key.to_string()).await?;
|
||||
self.bw_storage_managers.remove(key);
|
||||
let ret = self.wg_api.inner.remove_peer(key);
|
||||
let ret = self.wg_api.api_v4.remove_peer(key);
|
||||
if ret.is_err() {
|
||||
log::error!("Wireguard peer could not be removed from wireguard kernel module. Process should be restarted so that the interface is reset.");
|
||||
}
|
||||
@@ -187,7 +184,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
|
||||
let mut handle = PeerHandle::new(
|
||||
self.storage.clone(),
|
||||
peer.public_key.clone(),
|
||||
self.host_information.clone(),
|
||||
self.hosts_information.clone(),
|
||||
bandwidth_storage_manager.clone(),
|
||||
self.request_tx.clone(),
|
||||
&self.task_client,
|
||||
@@ -226,7 +223,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
|
||||
.available_bandwidth()
|
||||
.await
|
||||
} else {
|
||||
let Some(peer) = self.host_information.read().await.peers.get(key).cloned() else {
|
||||
let Some(peer) = self.hosts_information.read().await.get(key).cloned() else {
|
||||
// host information not updated yet
|
||||
return Ok(None);
|
||||
};
|
||||
@@ -242,11 +239,11 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = self.timeout_check_interval.next() => {
|
||||
let Ok(host) = self.wg_api.inner.read_interface_data() else {
|
||||
let Ok(hosts) = self.wg_api.get_hosts() else {
|
||||
log::error!("Can't read wireguard kernel data");
|
||||
continue;
|
||||
};
|
||||
*self.host_information.write().await = host;
|
||||
*self.hosts_information.write().await = hosts;
|
||||
}
|
||||
_ = self.task_client.recv() => {
|
||||
log::trace!("PeerController handler: Received shutdown");
|
||||
|
||||
@@ -2,9 +2,10 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::hosts::Hosts;
|
||||
use crate::peer_controller::PeerControlRequest;
|
||||
use defguard_wireguard_rs::host::Peer;
|
||||
use defguard_wireguard_rs::{host::Host, key::Key};
|
||||
use defguard_wireguard_rs::key::Key;
|
||||
use futures::channel::oneshot;
|
||||
use nym_authenticator_requests::v2::registration::BANDWIDTH_CAP_PER_DAY;
|
||||
use nym_credential_verification::bandwidth_storage_manager::BandwidthStorageManager;
|
||||
@@ -23,7 +24,7 @@ const AUTO_REMOVE_AFTER: Duration = Duration::from_secs(60 * 60 * 24); // 24 hou
|
||||
pub struct PeerHandle<St> {
|
||||
storage: St,
|
||||
public_key: Key,
|
||||
host_information: Arc<RwLock<Host>>,
|
||||
hosts_information: Arc<RwLock<Hosts>>,
|
||||
bandwidth_storage_manager: Option<SharedBandwidthStorageManager<St>>,
|
||||
request_tx: mpsc::Sender<PeerControlRequest>,
|
||||
timeout_check_interval: IntervalStream,
|
||||
@@ -35,7 +36,7 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
|
||||
pub fn new(
|
||||
storage: St,
|
||||
public_key: Key,
|
||||
host_information: Arc<RwLock<Host>>,
|
||||
hosts_information: Arc<RwLock<Hosts>>,
|
||||
bandwidth_storage_manager: Option<SharedBandwidthStorageManager<St>>,
|
||||
request_tx: mpsc::Sender<PeerControlRequest>,
|
||||
task_client: &TaskClient,
|
||||
@@ -48,7 +49,7 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
|
||||
PeerHandle {
|
||||
storage,
|
||||
public_key,
|
||||
host_information,
|
||||
hosts_information,
|
||||
bandwidth_storage_manager,
|
||||
request_tx,
|
||||
timeout_check_interval,
|
||||
@@ -123,10 +124,9 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
|
||||
tokio::select! {
|
||||
_ = self.timeout_check_interval.next() => {
|
||||
let Some(kernel_peer) = self
|
||||
.host_information
|
||||
.hosts_information
|
||||
.read()
|
||||
.await
|
||||
.peers
|
||||
.get(&self.public_key)
|
||||
.cloned() else {
|
||||
// the host information hasn't beed updated yet
|
||||
|
||||
Reference in New Issue
Block a user