Compare commits

...

7 Commits

Author SHA1 Message Date
Bogdan-Ștefan Neacşu c1776bff02 More win fix 2025-06-24 14:20:12 +00:00
Bogdan-Ștefan Neacşu 72a43d5bf8 Fix clippy 2025-06-24 14:20:12 +00:00
Bogdan-Ștefan Neacşu 21cb5597f0 Use correct LevelFilter 2025-06-24 14:20:12 +00:00
Bogdan-Ștefan Neacşu b60d22feb2 Add log_slow_statements to gateway storage 2025-06-24 14:20:12 +00:00
Bogdan-Ștefan Neacşu 7957d33d38 Use make_bincode_serializer like in other places 2025-06-24 14:20:12 +00:00
Bogdan-Ștefan Neacşu ec9635447a Fix windows different API 2025-06-24 14:20:12 +00:00
Bogdan-Ștefan Neacşu 2f47aa5cda Set cached storage counters to 0 (#5812)
* Set cached storage counters to 0

* u64 to i64 log possible error

* Check addition too

Debug commit

Remove more data from wg storage peer

Put actual ticket type in storage

Simplify add peer

Finish rebase

Pass defguard Peer

Cache less data for consumption

GatewayStorage traits

Wg API trait

Mock test structures

Unit test for peer controller

EcashManager trait

Init test of Authenticator

Remove peer test
2025-06-24 14:20:12 +00:00
47 changed files with 1599 additions and 1486 deletions
Generated
+9 -2
View File
@@ -2290,9 +2290,9 @@ dependencies = [
[[package]]
name = "dyn-clone"
version = "1.0.18"
version = "1.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feeef44e73baff3a26d371801df019877a9866a8c493d315ab00177843314f35"
checksum = "1c7a8fb8a9fbf66c1f703fe16184d10ca0ee9d23be5b4436400408ba54a95005"
[[package]]
name = "easy-addr"
@@ -5000,6 +5000,7 @@ dependencies = [
"serde",
"serde_json",
"thiserror 2.0.12",
"time",
"tokio",
"tokio-stream",
"tokio-util",
@@ -5600,9 +5601,11 @@ dependencies = [
name = "nym-credential-verification"
version = "0.1.0"
dependencies = [
"async-trait",
"bs58",
"cosmwasm-std",
"cw-utils",
"dyn-clone",
"futures",
"nym-api-requests 0.1.0",
"nym-credentials",
@@ -5986,8 +5989,10 @@ dependencies = [
name = "nym-gateway-storage"
version = "0.1.0"
dependencies = [
"async-trait",
"bincode",
"defguard_wireguard_rs",
"dyn-clone",
"nym-credentials-interface 0.1.0",
"nym-gateway-requests",
"nym-sphinx 0.1.0",
@@ -7913,11 +7918,13 @@ dependencies = [
name = "nym-wireguard"
version = "0.1.0"
dependencies = [
"async-trait",
"base64 0.22.1",
"bincode",
"chrono",
"dashmap",
"defguard_wireguard_rs",
"dyn-clone",
"futures",
"ip_network",
"log",
+1
View File
@@ -233,6 +233,7 @@ digest = "0.10.7"
dirs = "5.0"
doc-comment = "0.3"
dotenvy = "0.15.6"
dyn-clone = "1.0.19"
ecdsa = "0.16"
ed25519-dalek = "2.1"
encoding_rs = "0.8.35"
@@ -28,8 +28,6 @@ pub type HmacSha256 = Hmac<Sha256>;
pub type Nonce = u64;
pub type Taken = Option<SystemTime>;
pub const BANDWIDTH_CAP_PER_DAY: u64 = 250 * 1024 * 1024 * 1024; // 250 GB
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct IpPair {
pub ipv4: Ipv4Addr,
@@ -11,9 +11,11 @@ rust-version.workspace = true
readme.workspace = true
[dependencies]
async-trait = { workspace = true }
bs58 = { workspace = true }
cosmwasm-std = { workspace = true }
cw-utils = { workspace = true }
dyn-clone = { workspace = true }
futures = { workspace = true }
rand = { workspace = true }
si-scale = { workspace = true }
@@ -7,25 +7,36 @@ use crate::ClientBandwidth;
use nym_credentials::ecash::utils::ecash_today;
use nym_credentials_interface::Bandwidth;
use nym_gateway_requests::ServerResponse;
use nym_gateway_storage::GatewayStorage;
use nym_gateway_storage::traits::BandwidthGatewayStorage;
use si_scale::helpers::bibytes2;
use time::OffsetDateTime;
use tracing::*;
const FREE_TESTNET_BANDWIDTH_VALUE: Bandwidth = Bandwidth::new_unchecked(64 * 1024 * 1024 * 1024); // 64GB
#[derive(Clone)]
pub struct BandwidthStorageManager {
pub(crate) storage: GatewayStorage,
pub(crate) storage: Box<dyn BandwidthGatewayStorage + Send + Sync>,
pub(crate) client_bandwidth: ClientBandwidth,
pub(crate) client_id: i64,
pub(crate) bandwidth_cfg: BandwidthFlushingBehaviourConfig,
pub(crate) only_coconut_credentials: bool,
}
impl Clone for BandwidthStorageManager {
fn clone(&self) -> Self {
Self {
storage: dyn_clone::clone_box(&*self.storage),
client_bandwidth: self.client_bandwidth.clone(),
client_id: self.client_id,
bandwidth_cfg: self.bandwidth_cfg,
only_coconut_credentials: self.only_coconut_credentials,
}
}
}
impl BandwidthStorageManager {
pub fn new(
storage: GatewayStorage,
storage: Box<dyn BandwidthGatewayStorage + Send + Sync>,
client_bandwidth: ClientBandwidth,
client_id: i64,
bandwidth_cfg: BandwidthFlushingBehaviourConfig,
+123 -35
View File
@@ -2,12 +2,14 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::Error;
use async_trait::async_trait;
use credential_sender::CredentialHandler;
use credential_sender::CredentialHandlerConfig;
use error::EcashTicketError;
use futures::channel::mpsc::{self, UnboundedSender};
use nym_credentials::CredentialSpendingData;
use nym_credentials_interface::{ClientTicket, CompactEcashError, NymPayInfo, VerificationKeyAuth};
use nym_gateway_storage::traits::BandwidthGatewayStorage;
use nym_gateway_storage::GatewayStorage;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::DirectSigningHttpRpcNyxdClient;
@@ -20,6 +22,7 @@ pub mod credential_sender;
pub mod error;
mod helpers;
mod state;
pub mod traits;
pub const TIME_RANGE_SEC: i64 = 30;
@@ -31,44 +34,21 @@ pub struct EcashManager {
cred_sender: UnboundedSender<ClientTicket>,
}
impl EcashManager {
pub async fn new(
credential_handler_cfg: CredentialHandlerConfig,
nyxd_client: DirectSigningHttpRpcNyxdClient,
pk_bytes: [u8; 32],
shutdown: nym_task::TaskClient,
storage: GatewayStorage,
) -> Result<Self, Error> {
let shared_state = SharedState::new(nyxd_client, storage).await?;
let (cred_sender, cred_receiver) = mpsc::unbounded();
let cs =
CredentialHandler::new(credential_handler_cfg, cred_receiver, shared_state.clone())
.await?;
cs.start(shutdown);
Ok(EcashManager {
shared_state,
pk_bytes,
pay_infos: Default::default(),
cred_sender,
})
}
pub async fn verification_key(
#[async_trait]
impl traits::EcashManager for EcashManager {
async fn verification_key(
&self,
epoch_id: EpochId,
) -> Result<RwLockReadGuard<VerificationKeyAuth>, EcashTicketError> {
self.shared_state.verification_key(epoch_id).await
}
pub fn storage(&self) -> &GatewayStorage {
&self.shared_state.storage
fn storage(&self) -> Box<dyn BandwidthGatewayStorage + Send + Sync> {
dyn_clone::clone_box(&*self.shared_state.storage)
}
//Check for duplicate pay_info, then check the payment, then insert pay_info if everything succeeded
pub async fn check_payment(
async fn check_payment(
&self,
credential: &CredentialSpendingData,
aggregated_verification_key: &VerificationKeyAuth,
@@ -88,6 +68,40 @@ impl EcashManager {
.await
}
fn async_verify(&self, ticket: ClientTicket) {
// TODO: I guess do something for shutdowns
let _ = self
.cred_sender
.unbounded_send(ticket)
.inspect_err(|_| error!("failed to send the client ticket for verification task"));
}
}
impl EcashManager {
pub async fn new(
credential_handler_cfg: CredentialHandlerConfig,
nyxd_client: DirectSigningHttpRpcNyxdClient,
pk_bytes: [u8; 32],
shutdown: nym_task::TaskClient,
storage: GatewayStorage,
) -> Result<Self, Error> {
let shared_state = SharedState::new(nyxd_client, Box::new(storage)).await?;
let (cred_sender, cred_receiver) = mpsc::unbounded();
let cs =
CredentialHandler::new(credential_handler_cfg, cred_receiver, shared_state.clone())
.await?;
cs.start(shutdown);
Ok(EcashManager {
shared_state,
pk_bytes,
pay_infos: Default::default(),
cred_sender,
})
}
pub async fn verify_pay_info(&self, pay_info: NymPayInfo) -> Result<usize, EcashTicketError> {
//Public key check
if pay_info.pk() != self.pk_bytes {
@@ -152,12 +166,86 @@ impl EcashManager {
inner.insert(index, pay_info);
Ok(())
}
}
pub fn async_verify(&self, ticket: ClientTicket) {
// TODO: I guess do something for shutdowns
let _ = self
.cred_sender
.unbounded_send(ticket)
.inspect_err(|_| error!("failed to send the client ticket for verification task"));
pub struct MockEcashManager {
verfication_key: tokio::sync::RwLock<VerificationKeyAuth>,
storage: Box<dyn BandwidthGatewayStorage + Send + Sync>,
}
impl MockEcashManager {
pub fn new(storage: Box<dyn BandwidthGatewayStorage + Send + Sync>) -> Self {
Self {
verfication_key: tokio::sync::RwLock::new(
VerificationKeyAuth::from_bytes(&[
129, 187, 76, 12, 1, 51, 46, 26, 132, 205, 148, 109, 140, 131, 50, 119, 45,
128, 51, 218, 106, 70, 181, 74, 244, 38, 162, 62, 42, 12, 5, 100, 7, 136, 32,
155, 18, 219, 195, 182, 3, 56, 168, 16, 93, 154, 249, 230, 16, 202, 90, 134,
246, 25, 98, 6, 175, 215, 188, 239, 71, 84, 66, 1, 43, 66, 197, 180, 216, 80,
55, 185, 140, 216, 14, 48, 244, 214, 20, 68, 106, 41, 48, 252, 188, 181, 231,
170, 23, 211, 215, 12, 91, 147, 47, 7, 4, 0, 0, 0, 0, 0, 0, 0, 174, 31, 237,
215, 159, 183, 71, 125, 90, 147, 84, 78, 49, 216, 66, 232, 92, 206, 41, 230,
239, 209, 211, 166, 131, 190, 148, 36, 225, 194, 146, 6, 120, 34, 194, 5, 154,
155, 234, 41, 191, 119, 227, 51, 91, 128, 151, 240, 129, 208, 253, 171, 234,
170, 71, 139, 251, 78, 49, 35, 218, 16, 77, 150, 177, 204, 83, 210, 67, 147,
66, 162, 58, 25, 96, 168, 61, 180, 92, 21, 18, 78, 194, 98, 176, 123, 122, 176,
81, 150, 187, 20, 64, 69, 0, 134, 142, 3, 84, 108, 3, 55, 107, 111, 73, 31, 46,
51, 225, 248, 202, 173, 194, 24, 104, 96, 31, 61, 24, 140, 220, 31, 176, 200,
30, 217, 66, 58, 11, 181, 158, 196, 179, 199, 177, 7, 210, 4, 119, 142, 149,
59, 3, 186, 145, 27, 230, 125, 230, 246, 197, 196, 119, 70, 239, 115, 99, 215,
63, 205, 63, 74, 108, 201, 42, 226, 150, 137, 3, 157, 45, 25, 163, 54, 107,
153, 61, 141, 64, 207, 139, 41, 203, 39, 36, 97, 181, 72, 206, 235, 221, 178,
171, 60, 4, 6, 170, 181, 213, 10, 216, 53, 28, 32, 33, 41, 224, 60, 247, 206,
137, 108, 251, 229, 234, 112, 65, 145, 124, 212, 125, 116, 154, 114, 2, 125,
202, 24, 25, 196, 219, 104, 200, 131, 133, 180, 39, 21, 144, 204, 8, 151, 218,
99, 64, 209, 47, 5, 42, 13, 214, 139, 54, 112, 224, 53, 238, 250, 56, 42, 105,
15, 21, 238, 99, 225, 79, 121, 104, 155, 230, 243, 133, 47, 39, 147, 98, 45,
113, 137, 200, 102, 151, 122, 174, 9, 250, 17, 138, 191, 129, 202, 244, 107,
75, 48, 141, 136, 89, 168, 124, 88, 174, 251, 17, 35, 146, 88, 76, 134, 102,
105, 204, 16, 176, 214, 63, 13, 170, 225, 250, 112, 7, 237, 161, 160, 15, 71,
10, 130, 137, 69, 186, 64, 223, 188, 5, 5, 228, 57, 214, 134, 247, 20, 171,
140, 43, 230, 57, 29, 127, 136, 169, 80, 14, 137, 130, 200, 205, 222, 81, 143,
40, 77, 68, 197, 91, 142, 91, 84, 164, 15, 133, 242, 149, 255, 173, 201, 108,
208, 23, 188, 230, 158, 146, 54, 198, 52, 148, 123, 202, 52, 222, 50, 4, 62,
211, 208, 176, 61, 104, 151, 227, 192, 224, 200, 132, 53, 187, 240, 254, 150,
60, 30, 140, 11, 63, 71, 12, 30, 233, 255, 144, 250, 16, 81, 38, 33, 9, 185,
195, 214, 0, 119, 117, 94, 100, 103, 144, 10, 189, 65, 113, 114, 192, 11, 177,
214, 223, 218, 36, 139, 183, 2, 206, 247, 245, 88, 62, 231, 183, 50, 46, 95,
202, 152, 82, 244, 80, 173, 192, 147, 51, 248, 46, 181, 194, 205, 233, 67, 144,
155, 250, 142, 124, 71, 9, 136, 142, 88, 29, 99, 222, 43, 181, 172, 120, 187,
179, 172, 240, 231, 57, 236, 195, 158, 182, 203, 19, 49, 220, 180, 212, 101,
105, 239, 58, 215, 0, 50, 100, 172, 29, 236, 170, 108, 129, 150, 5, 64, 238,
59, 50, 4, 21, 131, 197, 142, 191, 76, 101, 140, 133, 112, 38, 235, 113, 203,
22, 161, 204, 84, 73, 125, 219, 70, 62, 67, 119, 52, 130, 208, 180, 231, 78,
141, 181, 13, 207, 196, 126, 159, 70, 34, 195, 70,
])
.unwrap(),
),
storage: dyn_clone::clone_box(&*storage),
}
}
}
#[async_trait]
impl traits::EcashManager for MockEcashManager {
async fn verification_key(
&self,
_epoch_id: EpochId,
) -> Result<RwLockReadGuard<VerificationKeyAuth>, EcashTicketError> {
Ok(self.verfication_key.read().await)
}
fn storage(&self) -> Box<dyn BandwidthGatewayStorage + Send + Sync> {
dyn_clone::clone_box(&*self.storage)
}
async fn check_payment(
&self,
_credential: &CredentialSpendingData,
_aggregated_verification_key: &VerificationKeyAuth,
) -> Result<(), EcashTicketError> {
Ok(())
}
fn async_verify(&self, _ticket: ClientTicket) {}
}
@@ -6,7 +6,7 @@ use crate::Error;
use cosmwasm_std::{from_json, CosmosMsg, WasmMsg};
use nym_credentials_interface::VerificationKeyAuth;
use nym_ecash_contract_common::msg::ExecuteMsg;
use nym_gateway_storage::GatewayStorage;
use nym_gateway_storage::traits::BandwidthGatewayStorage;
use nym_validator_client::coconut::all_ecash_api_clients;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::{
@@ -22,18 +22,28 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::{error, trace, warn};
// state shared by different subtasks dealing with credentials
#[derive(Clone)]
pub(crate) struct SharedState {
pub(crate) nyxd_client: Arc<RwLock<DirectSigningHttpRpcNyxdClient>>,
pub(crate) address: AccountId,
pub(crate) epoch_data: Arc<RwLock<BTreeMap<EpochId, EpochState>>>,
pub(crate) storage: GatewayStorage,
pub(crate) storage: Box<dyn BandwidthGatewayStorage + Send + Sync>,
}
impl Clone for SharedState {
fn clone(&self) -> Self {
Self {
nyxd_client: self.nyxd_client.clone(),
address: self.address.clone(),
epoch_data: self.epoch_data.clone(),
storage: dyn_clone::clone_box(&*self.storage),
}
}
}
impl SharedState {
pub(crate) async fn new(
nyxd_client: DirectSigningHttpRpcNyxdClient,
storage: GatewayStorage,
storage: Box<dyn BandwidthGatewayStorage + Send + Sync>,
) -> Result<Self, Error> {
let address = nyxd_client.address();
@@ -0,0 +1,23 @@
use async_trait::async_trait;
use nym_credentials::CredentialSpendingData;
use nym_credentials_interface::{ClientTicket, VerificationKeyAuth};
use nym_gateway_storage::traits::BandwidthGatewayStorage;
use nym_validator_client::nym_api::EpochId;
use tokio::sync::RwLockReadGuard;
use crate::ecash::error::EcashTicketError;
#[async_trait]
pub trait EcashManager {
async fn verification_key(
&self,
epoch_id: EpochId,
) -> Result<RwLockReadGuard<VerificationKeyAuth>, EcashTicketError>;
fn storage(&self) -> Box<dyn BandwidthGatewayStorage + Send + Sync>;
async fn check_payment(
&self,
credential: &CredentialSpendingData,
aggregated_verification_key: &VerificationKeyAuth,
) -> Result<(), EcashTicketError>;
fn async_verify(&self, ticket: ClientTicket);
}
+3 -3
View File
@@ -1,8 +1,8 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::ecash::traits::EcashManager;
use bandwidth_storage_manager::BandwidthStorageManager;
use ecash::EcashManager;
use nym_credentials::ecash::utils::{cred_exp_date, ecash_today, EcashTime};
use nym_credentials_interface::{Bandwidth, ClientTicket, TicketType};
use nym_gateway_requests::models::CredentialSpendingRequest;
@@ -20,14 +20,14 @@ pub mod error;
pub struct CredentialVerifier {
credential: CredentialSpendingRequest,
ecash_verifier: Arc<EcashManager>,
ecash_verifier: Arc<dyn EcashManager + Send + Sync>,
bandwidth_storage_manager: BandwidthStorageManager,
}
impl CredentialVerifier {
pub fn new(
credential: CredentialSpendingRequest,
ecash_verifier: Arc<EcashManager>,
ecash_verifier: Arc<dyn EcashManager + Send + Sync>,
bandwidth_storage_manager: BandwidthStorageManager,
) -> Self {
CredentialVerifier {
+7
View File
@@ -9,8 +9,10 @@ edition.workspace = true
license.workspace = true
[dependencies]
async-trait = { workspace = true }
bincode = { workspace = true }
defguard_wireguard_rs = { workspace = true }
dyn-clone = { workspace = true }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
@@ -21,6 +23,7 @@ sqlx = { workspace = true, features = [
] }
time = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["sync"], optional = true }
tracing = { workspace = true }
nym-credentials-interface = { path = "../credentials-interface" }
@@ -35,3 +38,7 @@ sqlx = { workspace = true, features = [
"macros",
"migrate",
] }
[features]
default = []
mock = ["tokio"]
@@ -0,0 +1,19 @@
/*
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
DELETE FROM wireguard_peer WHERE client_id IS NULL;
CREATE TABLE wireguard_peer_new
(
public_key TEXT NOT NULL PRIMARY KEY UNIQUE,
allowed_ips BLOB NOT NULL,
client_id INTEGER REFERENCES clients(id) NOT NULL
);
INSERT INTO wireguard_peer_new (public_key, allowed_ips, client_id)
SELECT public_key, allowed_ips, client_id FROM wireguard_peer;
DROP TABLE wireguard_peer;
ALTER TABLE wireguard_peer_new RENAME TO wireguard_peer;
+13
View File
@@ -3,6 +3,8 @@
use std::str::FromStr;
use nym_credentials_interface::TicketType;
use crate::models::Client;
#[derive(Debug, PartialEq, sqlx::Type)]
@@ -15,6 +17,17 @@ pub enum ClientType {
ExitWireguard,
}
impl From<TicketType> for ClientType {
fn from(value: TicketType) -> Self {
match value {
TicketType::V1MixnetEntry => ClientType::EntryMixnet,
TicketType::V1MixnetExit => ClientType::ExitMixnet,
TicketType::V1WireguardEntry => ClientType::EntryWireguard,
TicketType::V1WireguardExit => ClientType::ExitWireguard,
}
}
}
impl FromStr for ClientType {
type Err = &'static str;
+77 -74
View File
@@ -1,6 +1,7 @@
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use async_trait::async_trait;
use bandwidth::BandwidthManager;
use clients::{ClientManager, ClientType};
use models::{
@@ -15,10 +16,10 @@ use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
ConnectOptions,
};
use std::path::Path;
use std::{path::Path, time::Duration};
use tickets::TicketStorageManager;
use time::OffsetDateTime;
use tracing::{debug, error};
use tracing::{debug, error, log::LevelFilter};
pub mod bandwidth;
mod clients;
@@ -27,11 +28,21 @@ mod inboxes;
pub mod models;
mod shared_keys;
mod tickets;
pub mod traits;
mod wireguard_peers;
pub use error::GatewayStorageError;
pub use inboxes::InboxManager;
use crate::traits::{BandwidthGatewayStorage, InboxGatewayStorage, SharedKeyGatewayStorage};
fn make_bincode_serializer() -> impl bincode::Options {
use bincode::Options;
bincode::DefaultOptions::new()
.with_big_endian()
.with_varint_encoding()
}
// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
pub struct GatewayStorage {
@@ -71,6 +82,21 @@ impl GatewayStorage {
&self.wireguard_peer_manager
}
pub async fn handle_forget_me(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), GatewayStorageError> {
let client_id = self.get_mixnet_client_id(client_address).await?;
self.inbox_manager()
.remove_messages_for_client(&client_address.as_base58_string())
.await?;
self.bandwidth_manager().remove_client(client_id).await?;
self.shared_key_manager()
.remove_shared_keys(&client_address.as_base58_string())
.await?;
Ok(())
}
/// Initialises `PersistentStorage` using the provided path.
///
/// # Arguments
@@ -92,6 +118,7 @@ impl GatewayStorage {
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.log_slow_statements(LevelFilter::Warn, Duration::from_millis(250))
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
@@ -123,8 +150,9 @@ impl GatewayStorage {
}
}
impl GatewayStorage {
pub async fn get_mixnet_client_id(
#[async_trait]
impl SharedKeyGatewayStorage for GatewayStorage {
async fn get_mixnet_client_id(
&self,
client_address: DestinationAddressBytes,
) -> Result<i64, GatewayStorageError> {
@@ -134,22 +162,7 @@ impl GatewayStorage {
.await?)
}
pub async fn handle_forget_me(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), GatewayStorageError> {
let client_id = self.get_mixnet_client_id(client_address).await?;
self.inbox_manager()
.remove_messages_for_client(&client_address.as_base58_string())
.await?;
self.bandwidth_manager().remove_client(client_id).await?;
self.shared_key_manager()
.remove_shared_keys(&client_address.as_base58_string())
.await?;
Ok(())
}
pub async fn insert_shared_keys(
async fn insert_shared_keys(
&self,
client_address: DestinationAddressBytes,
shared_keys: &SharedGatewayKey,
@@ -178,7 +191,7 @@ impl GatewayStorage {
Ok(client_id)
}
pub async fn get_shared_keys(
async fn get_shared_keys(
&self,
client_address: DestinationAddressBytes,
) -> Result<Option<PersistedSharedKeys>, GatewayStorageError> {
@@ -190,7 +203,7 @@ impl GatewayStorage {
}
#[allow(dead_code)]
pub async fn remove_shared_keys(
async fn remove_shared_keys(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), GatewayStorageError> {
@@ -200,7 +213,7 @@ impl GatewayStorage {
Ok(())
}
pub async fn update_last_used_authentication_timestamp(
async fn update_last_used_authentication_timestamp(
&self,
client_id: i64,
last_used_authentication_timestamp: OffsetDateTime,
@@ -214,12 +227,15 @@ impl GatewayStorage {
Ok(())
}
pub async fn get_client(&self, client_id: i64) -> Result<Option<Client>, GatewayStorageError> {
async fn get_client(&self, client_id: i64) -> Result<Option<Client>, GatewayStorageError> {
let client = self.client_manager.get_client(client_id).await?;
Ok(client)
}
}
pub async fn store_message(
#[async_trait]
impl InboxGatewayStorage for GatewayStorage {
async fn store_message(
&self,
client_address: DestinationAddressBytes,
message: Vec<u8>,
@@ -230,7 +246,7 @@ impl GatewayStorage {
Ok(())
}
pub async fn retrieve_messages(
async fn retrieve_messages(
&self,
client_address: DestinationAddressBytes,
start_after: Option<i64>,
@@ -242,19 +258,22 @@ impl GatewayStorage {
Ok(messages)
}
pub async fn remove_messages(&self, ids: Vec<i64>) -> Result<(), GatewayStorageError> {
async fn remove_messages(&self, ids: Vec<i64>) -> Result<(), GatewayStorageError> {
for id in ids {
self.inbox_manager.remove_message(id).await?;
}
Ok(())
}
}
pub async fn create_bandwidth_entry(&self, client_id: i64) -> Result<(), GatewayStorageError> {
#[async_trait]
impl BandwidthGatewayStorage for GatewayStorage {
async fn create_bandwidth_entry(&self, client_id: i64) -> Result<(), GatewayStorageError> {
self.bandwidth_manager.insert_new_client(client_id).await?;
Ok(())
}
pub async fn set_expiration(
async fn set_expiration(
&self,
client_id: i64,
expiration: OffsetDateTime,
@@ -265,12 +284,12 @@ impl GatewayStorage {
Ok(())
}
pub async fn reset_bandwidth(&self, client_id: i64) -> Result<(), GatewayStorageError> {
async fn reset_bandwidth(&self, client_id: i64) -> Result<(), GatewayStorageError> {
self.bandwidth_manager.reset_bandwidth(client_id).await?;
Ok(())
}
pub async fn get_available_bandwidth(
async fn get_available_bandwidth(
&self,
client_id: i64,
) -> Result<Option<PersistedBandwidth>, GatewayStorageError> {
@@ -280,7 +299,7 @@ impl GatewayStorage {
.await?)
}
pub async fn increase_bandwidth(
async fn increase_bandwidth(
&self,
client_id: i64,
amount: i64,
@@ -291,7 +310,7 @@ impl GatewayStorage {
.await?)
}
pub async fn revoke_ticket_bandwidth(
async fn revoke_ticket_bandwidth(
&self,
ticket_id: i64,
amount: i64,
@@ -302,7 +321,7 @@ impl GatewayStorage {
.await?)
}
pub async fn decrease_bandwidth(
async fn decrease_bandwidth(
&self,
client_id: i64,
amount: i64,
@@ -313,7 +332,7 @@ impl GatewayStorage {
.await?)
}
pub async fn insert_epoch_signers(
async fn insert_epoch_signers(
&self,
epoch_id: i64,
signer_ids: Vec<i64>,
@@ -324,7 +343,7 @@ impl GatewayStorage {
Ok(())
}
pub async fn insert_received_ticket(
async fn insert_received_ticket(
&self,
client_id: i64,
received_at: OffsetDateTime,
@@ -344,11 +363,11 @@ impl GatewayStorage {
Ok(ticket_id)
}
pub async fn contains_ticket(&self, serial_number: &[u8]) -> Result<bool, GatewayStorageError> {
async fn contains_ticket(&self, serial_number: &[u8]) -> Result<bool, GatewayStorageError> {
Ok(self.ticket_manager.has_ticket_data(serial_number).await?)
}
pub async fn insert_ticket_verification(
async fn insert_ticket_verification(
&self,
ticket_id: i64,
signer_id: i64,
@@ -361,7 +380,7 @@ impl GatewayStorage {
Ok(())
}
pub async fn update_rejected_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> {
async fn update_rejected_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> {
// set the ticket as rejected
self.ticket_manager.set_rejected_ticket(ticket_id).await?;
@@ -372,7 +391,7 @@ impl GatewayStorage {
Ok(())
}
pub async fn update_verified_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> {
async fn update_verified_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> {
// 1. insert into verified table
self.ticket_manager
.insert_verified_ticket(ticket_id)
@@ -386,7 +405,7 @@ impl GatewayStorage {
Ok(())
}
pub async fn remove_verified_ticket_binary_data(
async fn remove_verified_ticket_binary_data(
&self,
ticket_id: i64,
) -> Result<(), GatewayStorageError> {
@@ -396,7 +415,7 @@ impl GatewayStorage {
Ok(())
}
pub async fn get_all_verified_tickets_with_sn(
async fn get_all_verified_tickets_with_sn(
&self,
) -> Result<Vec<VerifiedTicket>, GatewayStorageError> {
Ok(self
@@ -405,7 +424,7 @@ impl GatewayStorage {
.await?)
}
pub async fn get_all_proposed_tickets_with_sn(
async fn get_all_proposed_tickets_with_sn(
&self,
proposal_id: u32,
) -> Result<Vec<VerifiedTicket>, GatewayStorageError> {
@@ -415,7 +434,7 @@ impl GatewayStorage {
.await?)
}
pub async fn insert_redemption_proposal(
async fn insert_redemption_proposal(
&self,
tickets: &[VerifiedTicket],
proposal_id: u32,
@@ -438,7 +457,7 @@ impl GatewayStorage {
Ok(())
}
pub async fn clear_post_proposal_data(
async fn clear_post_proposal_data(
&self,
proposal_id: u32,
resolved_at: OffsetDateTime,
@@ -462,13 +481,11 @@ impl GatewayStorage {
Ok(())
}
pub async fn latest_proposal(&self) -> Result<Option<RedemptionProposal>, GatewayStorageError> {
async fn latest_proposal(&self) -> Result<Option<RedemptionProposal>, GatewayStorageError> {
Ok(self.ticket_manager.get_latest_redemption_proposal().await?)
}
pub async fn get_all_unverified_tickets(
&self,
) -> Result<Vec<ClientTicket>, GatewayStorageError> {
async fn get_all_unverified_tickets(&self) -> Result<Vec<ClientTicket>, GatewayStorageError> {
self.ticket_manager
.get_unverified_tickets()
.await?
@@ -477,21 +494,21 @@ impl GatewayStorage {
.collect()
}
pub async fn get_all_unresolved_proposals(&self) -> Result<Vec<i64>, GatewayStorageError> {
async fn get_all_unresolved_proposals(&self) -> Result<Vec<i64>, GatewayStorageError> {
Ok(self
.ticket_manager
.get_all_unresolved_redemption_proposal_ids()
.await?)
}
pub async fn get_votes(&self, ticket_id: i64) -> Result<Vec<i64>, GatewayStorageError> {
async fn get_votes(&self, ticket_id: i64) -> Result<Vec<i64>, GatewayStorageError> {
Ok(self
.ticket_manager
.get_verification_votes(ticket_id)
.await?)
}
pub async fn get_signers(&self, epoch_id: i64) -> Result<Vec<i64>, GatewayStorageError> {
async fn get_signers(&self, epoch_id: i64) -> Result<Vec<i64>, GatewayStorageError> {
Ok(self.ticket_manager.get_epoch_signers(epoch_id).await?)
}
@@ -500,34 +517,20 @@ impl GatewayStorage {
/// # Arguments
///
/// * `peer`: wireguard peer data to be stored
/// * `with_client_id`: if the peer should have a corresponding client_id
/// (created with entry wireguard ticket) or live without one (or with an
/// exiting one), for temporary backwards compatibility.
pub async fn insert_wireguard_peer(
async fn insert_wireguard_peer(
&self,
peer: &defguard_wireguard_rs::host::Peer,
with_client_id: bool,
) -> Result<Option<i64>, GatewayStorageError> {
client_type: ClientType,
) -> Result<i64, GatewayStorageError> {
let client_id = match self
.wireguard_peer_manager
.retrieve_peer(&peer.public_key.to_string())
.await?
{
Some(peer) => peer.client_id,
_ => {
if with_client_id {
Some(
self.client_manager
.insert_client(ClientType::EntryWireguard)
.await?,
)
} else {
None
}
}
None => self.client_manager.insert_client(client_type).await?,
};
let mut peer = WireguardPeer::from(peer.clone());
peer.client_id = client_id;
let peer = WireguardPeer::from_defguard_peer(peer.clone(), client_id)?;
self.wireguard_peer_manager.insert_peer(&peer).await?;
Ok(client_id)
}
@@ -537,7 +540,7 @@ impl GatewayStorage {
/// # Arguments
///
/// * `peer_public_key`: wireguard public key of the peer to be retrieved.
pub async fn get_wireguard_peer(
async fn get_wireguard_peer(
&self,
peer_public_key: &str,
) -> Result<Option<WireguardPeer>, GatewayStorageError> {
@@ -549,7 +552,7 @@ impl GatewayStorage {
}
/// Retrieves all wireguard peers.
pub async fn get_all_wireguard_peers(&self) -> Result<Vec<WireguardPeer>, GatewayStorageError> {
async fn get_all_wireguard_peers(&self) -> Result<Vec<WireguardPeer>, GatewayStorageError> {
let ret = self.wireguard_peer_manager.retrieve_all_peers().await?;
Ok(ret)
}
@@ -559,7 +562,7 @@ impl GatewayStorage {
/// # Arguments
///
/// * `peer_public_key`: wireguard public key of the peer to be removed.
pub async fn remove_wireguard_peer(
async fn remove_wireguard_peer(
&self,
peer_public_key: &str,
) -> Result<(), GatewayStorageError> {
+15 -61
View File
@@ -1,9 +1,7 @@
// Copyright 2021-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use std::time::SystemTime;
use crate::error::GatewayStorageError;
use crate::{error::GatewayStorageError, make_bincode_serializer};
use nym_credentials_interface::{AvailableBandwidth, ClientTicket, CredentialSpendingData};
use nym_gateway_requests::shared_key::{LegacySharedKeys, SharedGatewayKey, SharedSymmetricKey};
use sqlx::FromRow;
@@ -112,35 +110,23 @@ impl TryFrom<UnverifiedTicketData> for ClientTicket {
#[derive(Debug, Clone, FromRow)]
pub struct WireguardPeer {
pub public_key: String,
pub preshared_key: Option<String>,
pub protocol_version: Option<i64>,
pub endpoint: Option<String>,
pub last_handshake: Option<OffsetDateTime>,
pub tx_bytes: i64,
pub rx_bytes: i64,
pub persistent_keepalive_interval: Option<i64>,
pub allowed_ips: Vec<u8>,
pub client_id: Option<i64>,
pub client_id: i64,
}
impl From<defguard_wireguard_rs::host::Peer> for WireguardPeer {
fn from(value: defguard_wireguard_rs::host::Peer) -> Self {
WireguardPeer {
impl WireguardPeer {
pub fn from_defguard_peer(
value: defguard_wireguard_rs::host::Peer,
client_id: i64,
) -> Result<Self, crate::error::GatewayStorageError> {
Ok(WireguardPeer {
public_key: value.public_key.to_string(),
preshared_key: value.preshared_key.as_ref().map(|k| k.to_string()),
protocol_version: value.protocol_version.map(|v| v as i64),
endpoint: value.endpoint.map(|e| e.to_string()),
last_handshake: value.last_handshake.map(OffsetDateTime::from),
tx_bytes: value.tx_bytes as i64,
rx_bytes: value.rx_bytes as i64,
persistent_keepalive_interval: value.persistent_keepalive_interval.map(|v| v as i64),
allowed_ips: bincode::Options::serialize(
bincode::DefaultOptions::new(),
&value.allowed_ips,
)
.unwrap_or_default(),
client_id: None,
}
allowed_ips: bincode::Options::serialize(make_bincode_serializer(), &value.allowed_ips)
.map_err(|e| {
crate::error::GatewayStorageError::TypeConversion(format!("allowed ips {e}"))
})?,
client_id,
})
}
}
@@ -154,44 +140,12 @@ impl TryFrom<WireguardPeer> for defguard_wireguard_rs::host::Peer {
.as_str()
.try_into()
.map_err(|e| Self::Error::TypeConversion(format!("public key {e}")))?,
preshared_key: value
.preshared_key
.as_deref()
.map(TryFrom::try_from)
.transpose()
.map_err(|e| Self::Error::TypeConversion(format!("preshared key {e}")))?,
protocol_version: value
.protocol_version
.map(TryFrom::try_from)
.transpose()
.map_err(|e| Self::Error::TypeConversion(format!("protocol version {e}")))?,
endpoint: value
.endpoint
.as_deref()
.map(|e| e.parse())
.transpose()
.map_err(|e| Self::Error::TypeConversion(format!("endpoint {e}")))?,
last_handshake: value.last_handshake.map(SystemTime::from),
tx_bytes: value
.tx_bytes
.try_into()
.map_err(|e| Self::Error::TypeConversion(format!("tx bytes {e}")))?,
rx_bytes: value
.rx_bytes
.try_into()
.map_err(|e| Self::Error::TypeConversion(format!("rx bytes {e}")))?,
persistent_keepalive_interval: value
.persistent_keepalive_interval
.map(TryFrom::try_from)
.transpose()
.map_err(|e| {
Self::Error::TypeConversion(format!("persistent keepalive interval {e}"))
})?,
allowed_ips: bincode::Options::deserialize(
bincode::DefaultOptions::new(),
&value.allowed_ips,
)
.map_err(|e| Self::Error::TypeConversion(format!("allowed ips {e}")))?,
..Default::default()
})
}
}
+511
View File
@@ -0,0 +1,511 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use async_trait::async_trait;
use nym_credentials_interface::ClientTicket;
use nym_gateway_requests::SharedGatewayKey;
use nym_sphinx::DestinationAddressBytes;
use time::OffsetDateTime;
use crate::{
clients::ClientType,
models::{
Client, PersistedBandwidth, PersistedSharedKeys, RedemptionProposal, StoredMessage,
VerifiedTicket, WireguardPeer,
},
GatewayStorageError,
};
#[async_trait]
pub trait SharedKeyGatewayStorage {
async fn get_mixnet_client_id(
&self,
client_address: DestinationAddressBytes,
) -> Result<i64, GatewayStorageError>;
async fn insert_shared_keys(
&self,
client_address: DestinationAddressBytes,
shared_keys: &SharedGatewayKey,
) -> Result<i64, GatewayStorageError>;
async fn get_shared_keys(
&self,
client_address: DestinationAddressBytes,
) -> Result<Option<PersistedSharedKeys>, GatewayStorageError>;
#[allow(dead_code)]
async fn remove_shared_keys(
&self,
client_address: DestinationAddressBytes,
) -> Result<(), GatewayStorageError>;
async fn update_last_used_authentication_timestamp(
&self,
client_id: i64,
last_used_authentication_timestamp: OffsetDateTime,
) -> Result<(), GatewayStorageError>;
async fn get_client(&self, client_id: i64) -> Result<Option<Client>, GatewayStorageError>;
}
#[async_trait]
pub trait InboxGatewayStorage {
async fn store_message(
&self,
client_address: DestinationAddressBytes,
message: Vec<u8>,
) -> Result<(), GatewayStorageError>;
async fn retrieve_messages(
&self,
client_address: DestinationAddressBytes,
start_after: Option<i64>,
) -> Result<(Vec<StoredMessage>, Option<i64>), GatewayStorageError>;
async fn remove_messages(&self, ids: Vec<i64>) -> Result<(), GatewayStorageError>;
}
#[async_trait]
pub trait BandwidthGatewayStorage: dyn_clone::DynClone {
async fn create_bandwidth_entry(&self, client_id: i64) -> Result<(), GatewayStorageError>;
async fn set_expiration(
&self,
client_id: i64,
expiration: OffsetDateTime,
) -> Result<(), GatewayStorageError>;
async fn reset_bandwidth(&self, client_id: i64) -> Result<(), GatewayStorageError>;
async fn get_available_bandwidth(
&self,
client_id: i64,
) -> Result<Option<PersistedBandwidth>, GatewayStorageError>;
async fn increase_bandwidth(
&self,
client_id: i64,
amount: i64,
) -> Result<i64, GatewayStorageError>;
async fn revoke_ticket_bandwidth(
&self,
ticket_id: i64,
amount: i64,
) -> Result<(), GatewayStorageError>;
async fn decrease_bandwidth(
&self,
client_id: i64,
amount: i64,
) -> Result<i64, GatewayStorageError>;
async fn insert_epoch_signers(
&self,
epoch_id: i64,
signer_ids: Vec<i64>,
) -> Result<(), GatewayStorageError>;
async fn insert_received_ticket(
&self,
client_id: i64,
received_at: OffsetDateTime,
serial_number: Vec<u8>,
data: Vec<u8>,
) -> Result<i64, GatewayStorageError>;
async fn contains_ticket(&self, serial_number: &[u8]) -> Result<bool, GatewayStorageError>;
async fn insert_ticket_verification(
&self,
ticket_id: i64,
signer_id: i64,
verified_at: OffsetDateTime,
accepted: bool,
) -> Result<(), GatewayStorageError>;
async fn update_rejected_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError>;
async fn update_verified_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError>;
async fn remove_verified_ticket_binary_data(
&self,
ticket_id: i64,
) -> Result<(), GatewayStorageError>;
async fn get_all_verified_tickets_with_sn(
&self,
) -> Result<Vec<VerifiedTicket>, GatewayStorageError>;
async fn get_all_proposed_tickets_with_sn(
&self,
proposal_id: u32,
) -> Result<Vec<VerifiedTicket>, GatewayStorageError>;
async fn insert_redemption_proposal(
&self,
tickets: &[VerifiedTicket],
proposal_id: u32,
created_at: OffsetDateTime,
) -> Result<(), GatewayStorageError>;
async fn clear_post_proposal_data(
&self,
proposal_id: u32,
resolved_at: OffsetDateTime,
rejected: bool,
) -> Result<(), GatewayStorageError>;
async fn latest_proposal(&self) -> Result<Option<RedemptionProposal>, GatewayStorageError>;
async fn get_all_unverified_tickets(&self) -> Result<Vec<ClientTicket>, GatewayStorageError>;
async fn get_all_unresolved_proposals(&self) -> Result<Vec<i64>, GatewayStorageError>;
async fn get_votes(&self, ticket_id: i64) -> Result<Vec<i64>, GatewayStorageError>;
async fn get_signers(&self, epoch_id: i64) -> Result<Vec<i64>, GatewayStorageError>;
/// Insert a wireguard peer in the storage.
///
/// # Arguments
///
/// * `peer`: wireguard peer data to be stored
async fn insert_wireguard_peer(
&self,
peer: &defguard_wireguard_rs::host::Peer,
client_type: ClientType,
) -> Result<i64, GatewayStorageError>;
/// Tries to retrieve available bandwidth for the particular peer.
///
/// # Arguments
///
/// * `peer_public_key`: wireguard public key of the peer to be retrieved.
async fn get_wireguard_peer(
&self,
peer_public_key: &str,
) -> Result<Option<WireguardPeer>, GatewayStorageError>;
/// Retrieves all wireguard peers.
async fn get_all_wireguard_peers(&self) -> Result<Vec<WireguardPeer>, GatewayStorageError>;
/// Remove a wireguard peer from the storage.
///
/// # Arguments
///
/// * `peer_public_key`: wireguard public key of the peer to be removed.
async fn remove_wireguard_peer(&self, peer_public_key: &str)
-> Result<(), GatewayStorageError>;
}
#[cfg(feature = "mock")]
pub mod mock {
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use super::*;
struct EcashSigner {
_epoch_id: i64,
_signer_id: i64,
}
struct ReceivedTicket {
client_id: i64,
_received_at: OffsetDateTime,
rejected: Option<bool>,
}
struct TicketData {
serial_number: Vec<u8>,
data: Option<Vec<u8>>,
}
struct TicketVerification {
_ticket_id: i64,
_signer_id: i64,
_verified_at: OffsetDateTime,
_accepted: bool,
}
#[derive(Clone, Default)]
pub struct MockGatewayStorage {
available_bandwidth: Arc<RwLock<HashMap<i64, PersistedBandwidth>>>,
ecash_signers: Arc<RwLock<Vec<EcashSigner>>>,
received_ticket: Arc<RwLock<HashMap<i64, ReceivedTicket>>>,
ticket_data: Arc<RwLock<HashMap<i64, TicketData>>>,
ticket_verification: Arc<RwLock<HashMap<i64, TicketVerification>>>,
verified_tickets: Arc<RwLock<Vec<i64>>>,
wireguard_peers: Arc<RwLock<HashMap<String, WireguardPeer>>>,
clients: Arc<RwLock<HashMap<i64, String>>>,
}
#[async_trait]
impl BandwidthGatewayStorage for MockGatewayStorage {
async fn create_bandwidth_entry(&self, client_id: i64) -> Result<(), GatewayStorageError> {
self.available_bandwidth.write().await.insert(
client_id,
PersistedBandwidth {
client_id,
available: 0,
expiration: Some(OffsetDateTime::UNIX_EPOCH),
},
);
Ok(())
}
async fn set_expiration(
&self,
client_id: i64,
expiration: OffsetDateTime,
) -> Result<(), GatewayStorageError> {
if let Some(bw) = self.available_bandwidth.write().await.get_mut(&client_id) {
bw.expiration = Some(expiration);
}
Ok(())
}
async fn reset_bandwidth(&self, client_id: i64) -> Result<(), GatewayStorageError> {
if let Some(bw) = self.available_bandwidth.write().await.get_mut(&client_id) {
bw.available = 0;
bw.expiration = Some(OffsetDateTime::UNIX_EPOCH);
}
Ok(())
}
async fn get_available_bandwidth(
&self,
client_id: i64,
) -> Result<Option<PersistedBandwidth>, GatewayStorageError> {
Ok(self
.available_bandwidth
.read()
.await
.get(&client_id)
.cloned())
}
async fn increase_bandwidth(
&self,
client_id: i64,
amount: i64,
) -> Result<i64, GatewayStorageError> {
self.available_bandwidth
.write()
.await
.get_mut(&client_id)
.map(|bw| {
bw.available += amount;
bw.available
})
.ok_or(GatewayStorageError::InternalDatabaseError(
sqlx::Error::RowNotFound,
))
}
async fn revoke_ticket_bandwidth(
&self,
ticket_id: i64,
amount: i64,
) -> Result<(), GatewayStorageError> {
if let Some(client_id) = self
.received_ticket
.read()
.await
.get(&ticket_id)
.map(|ticket| ticket.client_id)
{
if let Some(bw) = self.available_bandwidth.write().await.get_mut(&client_id) {
bw.available -= amount;
}
}
Ok(())
}
async fn decrease_bandwidth(
&self,
client_id: i64,
amount: i64,
) -> Result<i64, GatewayStorageError> {
self.available_bandwidth
.write()
.await
.get_mut(&client_id)
.map(|bw| {
bw.available -= amount;
bw.available
})
.ok_or(GatewayStorageError::InternalDatabaseError(
sqlx::Error::RowNotFound,
))
}
async fn insert_epoch_signers(
&self,
_epoch_id: i64,
signer_ids: Vec<i64>,
) -> Result<(), GatewayStorageError> {
self.ecash_signers
.write()
.await
.extend(signer_ids.iter().map(|signer_id| EcashSigner {
_epoch_id,
_signer_id: *signer_id,
}));
Ok(())
}
async fn insert_received_ticket(
&self,
client_id: i64,
_received_at: OffsetDateTime,
serial_number: Vec<u8>,
data: Vec<u8>,
) -> Result<i64, GatewayStorageError> {
let mut received_ticket = self.received_ticket.write().await;
let mut ticket_data = self.ticket_data.write().await;
let ticket_id = received_ticket.len() as i64;
received_ticket.insert(
ticket_id,
ReceivedTicket {
client_id,
_received_at,
rejected: None,
},
);
ticket_data.insert(
ticket_id,
TicketData {
serial_number,
data: Some(data),
},
);
Ok(ticket_id)
}
async fn contains_ticket(&self, serial_number: &[u8]) -> Result<bool, GatewayStorageError> {
Ok(self
.ticket_data
.read()
.await
.values()
.any(|ticket_data| ticket_data.serial_number == serial_number))
}
async fn insert_ticket_verification(
&self,
_ticket_id: i64,
_signer_id: i64,
_verified_at: OffsetDateTime,
_accepted: bool,
) -> Result<(), GatewayStorageError> {
self.ticket_verification.write().await.insert(
_ticket_id,
TicketVerification {
_ticket_id,
_signer_id,
_verified_at,
_accepted,
},
);
Ok(())
}
async fn update_rejected_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> {
if let Some(ticket) = self.received_ticket.write().await.get_mut(&ticket_id) {
ticket.rejected = Some(true);
}
self.ticket_data.write().await.remove(&ticket_id);
Ok(())
}
async fn update_verified_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> {
self.verified_tickets.write().await.push(ticket_id);
self.ticket_verification.write().await.remove(&ticket_id);
Ok(())
}
async fn remove_verified_ticket_binary_data(
&self,
ticket_id: i64,
) -> Result<(), GatewayStorageError> {
if let Some(ticket) = self.ticket_data.write().await.get_mut(&ticket_id) {
ticket.data = None;
}
Ok(())
}
async fn get_all_verified_tickets_with_sn(
&self,
) -> Result<Vec<VerifiedTicket>, GatewayStorageError> {
todo!()
}
async fn get_all_proposed_tickets_with_sn(
&self,
_proposal_id: u32,
) -> Result<Vec<VerifiedTicket>, GatewayStorageError> {
todo!()
}
async fn insert_redemption_proposal(
&self,
_tickets: &[VerifiedTicket],
_proposal_id: u32,
_created_at: OffsetDateTime,
) -> Result<(), GatewayStorageError> {
todo!()
}
async fn clear_post_proposal_data(
&self,
_proposal_id: u32,
_resolved_at: OffsetDateTime,
_rejected: bool,
) -> Result<(), GatewayStorageError> {
todo!()
}
async fn latest_proposal(&self) -> Result<Option<RedemptionProposal>, GatewayStorageError> {
todo!()
}
async fn get_all_unverified_tickets(
&self,
) -> Result<Vec<ClientTicket>, GatewayStorageError> {
todo!()
}
async fn get_all_unresolved_proposals(&self) -> Result<Vec<i64>, GatewayStorageError> {
todo!()
}
async fn get_votes(&self, _ticket_id: i64) -> Result<Vec<i64>, GatewayStorageError> {
todo!()
}
async fn get_signers(&self, _epoch_id: i64) -> Result<Vec<i64>, GatewayStorageError> {
todo!()
}
async fn insert_wireguard_peer(
&self,
peer: &defguard_wireguard_rs::host::Peer,
client_type: ClientType,
) -> Result<i64, GatewayStorageError> {
let mut wireguard_peers = self.wireguard_peers.write().await;
let mut clients = self.clients.write().await;
let client_id = if let Some(peer) = wireguard_peers.get(&peer.public_key.to_string()) {
peer.client_id
} else {
let client_id = clients.len() as i64;
clients.insert(client_id, client_type.to_string());
client_id
};
wireguard_peers.insert(
peer.public_key.to_string(),
WireguardPeer::from_defguard_peer(peer.clone(), client_id)?,
);
Ok(client_id)
}
async fn get_wireguard_peer(
&self,
peer_public_key: &str,
) -> Result<Option<WireguardPeer>, GatewayStorageError> {
Ok(self
.wireguard_peers
.read()
.await
.get(peer_public_key)
.cloned())
}
async fn get_all_wireguard_peers(&self) -> Result<Vec<WireguardPeer>, GatewayStorageError> {
todo!()
}
async fn remove_wireguard_peer(
&self,
peer_public_key: &str,
) -> Result<(), GatewayStorageError> {
self.wireguard_peers.write().await.remove(peer_public_key);
Ok(())
}
}
}
@@ -27,15 +27,18 @@ impl WgPeerManager {
pub(crate) async fn insert_peer(&self, peer: &WireguardPeer) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT OR IGNORE INTO wireguard_peer(public_key, preshared_key, protocol_version, endpoint, last_handshake, tx_bytes, rx_bytes, persistent_keepalive_interval, allowed_ips, client_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
INSERT OR IGNORE INTO wireguard_peer(public_key, allowed_ips, client_id)
VALUES (?, ?, ?);
UPDATE wireguard_peer
SET preshared_key = ?, protocol_version = ?, endpoint = ?, last_handshake = ?, tx_bytes = ?, rx_bytes = ?, persistent_keepalive_interval = ?, allowed_ips = ?, client_id = ?
SET allowed_ips = ?, client_id = ?
WHERE public_key = ?
"#,
peer.public_key, peer.preshared_key, peer.protocol_version, peer.endpoint, peer.last_handshake, peer.tx_bytes, peer.rx_bytes, peer.persistent_keepalive_interval, peer.allowed_ips, peer.client_id,
peer.preshared_key, peer.protocol_version, peer.endpoint, peer.last_handshake, peer.tx_bytes, peer.rx_bytes, peer.persistent_keepalive_interval, peer.allowed_ips, peer.client_id,
peer.public_key,
peer.allowed_ips,
peer.client_id,
peer.allowed_ips,
peer.client_id,
peer.public_key,
)
.execute(&self.connection_pool)
+10
View File
@@ -11,11 +11,13 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = { workspace = true }
base64 = { workspace = true }
bincode = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true }
defguard_wireguard_rs = { workspace = true }
dyn-clone = { workspace = true }
futures = { workspace = true }
# The latest version on crates.io at the time of writing this (6.0.0) has a
# version mismatch with x25519-dalek/curve25519-dalek that is resolved in the
@@ -37,3 +39,11 @@ nym-network-defaults = { path = "../network-defaults" }
nym-task = { path = "../task" }
nym-wireguard-types = { path = "../wireguard-types" }
nym-node-metrics = { path = "../../nym-node/nym-node-metrics" }
[dev-dependencies]
nym-gateway-storage = { path = "../gateway-storage", features = ["mock"] }
[features]
default = []
mock = ["nym-gateway-storage/mock"]
+4 -4
View File
@@ -3,18 +3,18 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("traffic byte data needs to be increasing")]
InconsistentConsumedBytes,
#[error("{0}")]
Defguard(#[from] defguard_wireguard_rs::error::WireguardInterfaceError),
#[error("internal {0}")]
Internal(String),
#[error("storage should have the requested bandwidht entry")]
#[error("storage should have the requested bandwidth entry")]
MissingClientBandwidthEntry,
#[error("kernel should have the requested client entry: {0}")]
MissingClientKernelEntry(String),
#[error("{0}")]
GatewayStorage(#[from] nym_gateway_storage::error::GatewayStorageError),
+87 -29
View File
@@ -6,16 +6,15 @@
// #![warn(clippy::expect_used)]
// #![warn(clippy::unwrap_used)]
use defguard_wireguard_rs::WGApi;
use defguard_wireguard_rs::{host::Peer, key::Key, net::IpAddrMask, WGApi, WireguardInterfaceApi};
use nym_crypto::asymmetric::x25519::KeyPair;
#[cfg(target_os = "linux")]
use nym_gateway_storage::GatewayStorage;
use nym_wireguard_types::Config;
use peer_controller::PeerControlRequest;
use std::sync::Arc;
use tokio::sync::mpsc::{self, Receiver, Sender};
#[cfg(target_os = "linux")]
use defguard_wireguard_rs::{host::Peer, key::Key, net::IpAddrMask};
#[cfg(target_os = "linux")]
use nym_network_defaults::constants::WG_TUN_BASE_NAME;
@@ -28,6 +27,81 @@ pub struct WgApiWrapper {
inner: WGApi,
}
impl WireguardInterfaceApi for WgApiWrapper {
fn create_interface(
&self,
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
self.inner.create_interface()
}
fn assign_address(
&self,
address: &IpAddrMask,
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
self.inner.assign_address(address)
}
fn configure_peer_routing(
&self,
peers: &[Peer],
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
self.inner.configure_peer_routing(peers)
}
#[cfg(not(target_os = "windows"))]
fn configure_interface(
&self,
config: &defguard_wireguard_rs::InterfaceConfiguration,
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
self.inner.configure_interface(config)
}
#[cfg(target_os = "windows")]
fn configure_interface(
&self,
config: &defguard_wireguard_rs::InterfaceConfiguration,
dns: &[std::net::IpAddr],
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
self.inner.configure_interface(config, dns)
}
fn remove_interface(
&self,
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
self.inner.remove_interface()
}
fn configure_peer(
&self,
peer: &Peer,
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
self.inner.configure_peer(peer)
}
fn remove_peer(
&self,
peer_pubkey: &Key,
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
self.inner.remove_peer(peer_pubkey)
}
fn read_interface_data(
&self,
) -> Result<
defguard_wireguard_rs::host::Host,
defguard_wireguard_rs::error::WireguardInterfaceError,
> {
self.inner.read_interface_data()
}
fn configure_dns(
&self,
dns: &[std::net::IpAddr],
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
self.inner.configure_dns(dns)
}
}
impl WgApiWrapper {
pub fn new(wg_api: WGApi) -> Self {
WgApiWrapper { inner: wg_api }
@@ -84,9 +158,9 @@ pub struct WireguardData {
/// Start wireguard device
#[cfg(target_os = "linux")]
pub async fn start_wireguard(
storage: nym_gateway_storage::GatewayStorage,
storage: GatewayStorage,
metrics: nym_node_metrics::NymNodeMetrics,
all_peers: Vec<nym_gateway_storage::models::WireguardPeer>,
peers: Vec<Peer>,
task_client: nym_task::TaskClient,
wireguard_data: WireguardData,
) -> Result<std::sync::Arc<WgApiWrapper>, Box<dyn std::error::Error + Send + Sync + 'static>> {
@@ -100,29 +174,13 @@ pub async fn start_wireguard(
let ifname = String::from(WG_TUN_BASE_NAME);
let wg_api = defguard_wireguard_rs::WGApi::new(ifname.clone(), false)?;
let mut peer_bandwidth_managers = HashMap::with_capacity(all_peers.len());
let peers = all_peers
.into_iter()
.map(Peer::try_from)
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.map(|mut peer| {
// since WGApi doesn't set those values on init, let's set them to 0
peer.rx_bytes = 0;
peer.tx_bytes = 0;
peer
})
.collect::<Vec<_>>();
let mut peer_bandwidth_managers = HashMap::with_capacity(peers.len());
for peer in peers.iter() {
let bandwidth_manager =
PeerController::generate_bandwidth_manager(storage.clone(), &peer.public_key)
.await?
.map(|bw_m| Arc::new(RwLock::new(bw_m)));
// Update storage with *x_bytes set to 0, as in kernel peers we can't set those values
// so we need to restart counting. Hopefully the bandwidth was counted in available_bandwidth
storage
.insert_wireguard_peer(peer, bandwidth_manager.is_some())
.await?;
let bandwidth_manager = Arc::new(RwLock::new(
PeerController::generate_bandwidth_manager(Box::new(storage.clone()), &peer.public_key)
.await?,
));
peer_bandwidth_managers.insert(peer.public_key.clone(), (bandwidth_manager, peer.clone()));
}
@@ -175,7 +233,7 @@ pub async fn start_wireguard(
let host = wg_api.read_interface_data()?;
let wg_api = std::sync::Arc::new(WgApiWrapper::new(wg_api));
let mut controller = PeerController::new(
storage,
Box::new(storage),
metrics,
wg_api.clone(),
host,
+206 -125
View File
@@ -8,14 +8,11 @@ use defguard_wireguard_rs::{
};
use futures::channel::oneshot;
use log::info;
use nym_authenticator_requests::latest::registration::{
RemainingBandwidthData, BANDWIDTH_CAP_PER_DAY,
};
use nym_credential_verification::{
bandwidth_storage_manager::BandwidthStorageManager, BandwidthFlushingBehaviourConfig,
ClientBandwidth,
};
use nym_gateway_storage::GatewayStorage;
use nym_gateway_storage::traits::BandwidthGatewayStorage;
use nym_node_metrics::NymNodeMetrics;
use nym_wireguard_types::DEFAULT_PEER_TIMEOUT_CHECK;
use std::time::{Duration, SystemTime};
@@ -23,14 +20,12 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use crate::WgApiWrapper;
use crate::{error::Error, peer_handle::SharedBandwidthStorageManager};
use crate::{peer_handle::PeerHandle, peer_storage_manager::PeerStorageManager};
use crate::{peer_handle::PeerHandle, peer_storage_manager::CachedPeerManager};
pub enum PeerControlRequest {
AddPeer {
peer: Peer,
client_id: Option<i64>,
response_tx: oneshot::Sender<AddPeerControlResponse>,
},
RemovePeer {
@@ -41,10 +36,6 @@ pub enum PeerControlRequest {
key: Key,
response_tx: oneshot::Sender<QueryPeerControlResponse>,
},
QueryBandwidth {
key: Key,
response_tx: oneshot::Sender<QueryBandwidthControlResponse>,
},
GetClientBandwidth {
key: Key,
response_tx: oneshot::Sender<GetClientBandwidthControlResponse>,
@@ -64,17 +55,12 @@ pub struct QueryPeerControlResponse {
pub peer: Option<Peer>,
}
pub struct QueryBandwidthControlResponse {
pub success: bool,
pub bandwidth_data: Option<RemainingBandwidthData>,
}
pub struct GetClientBandwidthControlResponse {
pub client_bandwidth: Option<ClientBandwidth>,
}
pub struct PeerController {
storage: GatewayStorage,
storage: Box<dyn BandwidthGatewayStorage + Send + Sync>,
// we have "all" metrics of a node, but they're behind a single Arc pointer,
// so the overhead is minimal
@@ -83,9 +69,9 @@ pub struct PeerController {
// used to receive commands from individual handles too
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
wg_api: Arc<WgApiWrapper>,
wg_api: Arc<dyn WireguardInterfaceApi + Send + Sync>,
host_information: Arc<RwLock<Host>>,
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager>>,
bw_storage_managers: HashMap<Key, SharedBandwidthStorageManager>,
timeout_check_interval: IntervalStream,
task_client: nym_task::TaskClient,
}
@@ -93,11 +79,11 @@ pub struct PeerController {
impl PeerController {
#[allow(clippy::too_many_arguments)]
pub fn new(
storage: GatewayStorage,
storage: Box<dyn BandwidthGatewayStorage + Send + Sync>,
metrics: NymNodeMetrics,
wg_api: Arc<WgApiWrapper>,
wg_api: Arc<dyn WireguardInterfaceApi + Send + Sync>,
initial_host_information: Host,
bw_storage_managers: HashMap<Key, (Option<SharedBandwidthStorageManager>, Peer)>,
bw_storage_managers: HashMap<Key, (SharedBandwidthStorageManager, Peer)>,
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
task_client: nym_task::TaskClient,
@@ -107,15 +93,11 @@ impl PeerController {
);
let host_information = Arc::new(RwLock::new(initial_host_information));
for (public_key, (bandwidth_storage_manager, peer)) in bw_storage_managers.iter() {
let peer_storage_manager = PeerStorageManager::new(
storage.clone(),
peer.clone(),
bandwidth_storage_manager.is_some(),
);
let cached_peer_manager = CachedPeerManager::new(peer);
let mut handle = PeerHandle::new(
public_key.clone(),
host_information.clone(),
peer_storage_manager,
cached_peer_manager,
bandwidth_storage_manager.clone(),
request_tx.clone(),
&task_client,
@@ -144,32 +126,11 @@ impl PeerController {
}
}
// Function that should be used for peer insertion, to handle both storage and kernel interaction
pub async fn add_peer(&self, peer: &Peer, client_id: Option<i64>) -> Result<(), Error> {
if client_id.is_none() {
self.storage.insert_wireguard_peer(peer, false).await?;
}
let ret: Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> =
self.wg_api.inner.configure_peer(peer);
if client_id.is_none() && ret.is_err() {
// Try to revert the insertion in storage
if self
.storage
.remove_wireguard_peer(&peer.public_key.to_string())
.await
.is_err()
{
log::error!("The storage has been corrupted. Wireguard peer {} will persist in storage indefinitely.", peer.public_key);
}
}
Ok(ret?)
}
// Function that should be used for peer removal, to handle both storage and kernel interaction
pub async fn remove_peer(&mut self, key: &Key) -> Result<(), Error> {
self.storage.remove_wireguard_peer(&key.to_string()).await?;
self.bw_storage_managers.remove(key);
let ret = self.wg_api.inner.remove_peer(key);
let ret = self.wg_api.remove_peer(key);
if ret.is_err() {
log::error!("Wireguard peer could not be removed from wireguard kernel module. Process should be restarted so that the interface is reset.");
}
@@ -177,50 +138,43 @@ impl PeerController {
}
pub async fn generate_bandwidth_manager(
storage: GatewayStorage,
storage: Box<dyn BandwidthGatewayStorage + Send + Sync>,
public_key: &Key,
) -> Result<Option<BandwidthStorageManager>, Error> {
if let Some(client_id) = storage
) -> Result<BandwidthStorageManager, Error> {
let client_id = storage
.get_wireguard_peer(&public_key.to_string())
.await?
.ok_or(Error::MissingClientBandwidthEntry)?
.client_id
{
let bandwidth = storage
.get_available_bandwidth(client_id)
.await?
.ok_or(Error::MissingClientBandwidthEntry)?;
Ok(Some(BandwidthStorageManager::new(
storage,
ClientBandwidth::new(bandwidth.into()),
client_id,
BandwidthFlushingBehaviourConfig::default(),
true,
)))
} else {
Ok(None)
}
.client_id;
let bandwidth = storage
.get_available_bandwidth(client_id)
.await?
.ok_or(Error::MissingClientBandwidthEntry)?;
Ok(BandwidthStorageManager::new(
storage,
ClientBandwidth::new(bandwidth.into()),
client_id,
BandwidthFlushingBehaviourConfig::default(),
true,
))
}
async fn handle_add_request(
&mut self,
peer: &Peer,
client_id: Option<i64>,
) -> Result<(), Error> {
self.add_peer(peer, client_id).await?;
let bandwidth_storage_manager =
Self::generate_bandwidth_manager(self.storage.clone(), &peer.public_key)
.await?
.map(|bw_m| Arc::new(RwLock::new(bw_m)));
let peer_storage_manager = PeerStorageManager::new(
self.storage.clone(),
peer.clone(),
bandwidth_storage_manager.is_some(),
);
async fn handle_add_request(&mut self, peer: &Peer) -> Result<(), Error> {
self.wg_api.configure_peer(peer)?;
let bandwidth_storage_manager = Arc::new(RwLock::new(
Self::generate_bandwidth_manager(
dyn_clone::clone_box(&*self.storage),
&peer.public_key,
)
.await?,
));
let cached_peer_manager = CachedPeerManager::new(peer);
let mut handle = PeerHandle::new(
peer.public_key.clone(),
self.host_information.clone(),
peer_storage_manager,
cached_peer_manager,
bandwidth_storage_manager.clone(),
self.request_tx.clone(),
&self.task_client,
@@ -228,7 +182,7 @@ impl PeerController {
self.bw_storage_managers
.insert(peer.public_key.clone(), bandwidth_storage_manager);
// try to immediately update the host information, to eliminate races
if let Ok(host_information) = self.wg_api.inner.read_interface_data() {
if let Ok(host_information) = self.wg_api.read_interface_data() {
*self.host_information.write().await = host_information;
}
let public_key = peer.public_key.clone();
@@ -248,35 +202,8 @@ impl PeerController {
.transpose()?)
}
async fn handle_query_bandwidth(
&self,
key: &Key,
) -> Result<Option<RemainingBandwidthData>, Error> {
let Some(bandwidth_storage_manager) = self.bw_storage_managers.get(key) else {
return Ok(None);
};
let available_bandwidth = if let Some(bandwidth_storage_manager) = bandwidth_storage_manager
{
bandwidth_storage_manager
.read()
.await
.available_bandwidth()
.await
} else {
let Some(peer) = self.host_information.read().await.peers.get(key).cloned() else {
// host information not updated yet
return Ok(None);
};
BANDWIDTH_CAP_PER_DAY.saturating_sub(peer.rx_bytes + peer.tx_bytes) as i64
};
Ok(Some(RemainingBandwidthData {
available_bandwidth,
}))
}
async fn handle_get_client_bandwidth(&self, key: &Key) -> Option<ClientBandwidth> {
if let Some(Some(bandwidth_storage_manager)) = self.bw_storage_managers.get(key) {
if let Some(bandwidth_storage_manager) = self.bw_storage_managers.get(key) {
Some(bandwidth_storage_manager.read().await.client_bandwidth())
} else {
None
@@ -362,7 +289,7 @@ impl PeerController {
loop {
tokio::select! {
_ = self.timeout_check_interval.next() => {
let Ok(host) = self.wg_api.inner.read_interface_data() else {
let Ok(host) = self.wg_api.read_interface_data() else {
log::error!("Can't read wireguard kernel data");
continue;
};
@@ -376,8 +303,8 @@ impl PeerController {
}
msg = self.request_rx.recv() => {
match msg {
Some(PeerControlRequest::AddPeer { peer, client_id, response_tx }) => {
let ret = self.handle_add_request(&peer, client_id).await;
Some(PeerControlRequest::AddPeer { peer, response_tx }) => {
let ret = self.handle_add_request(&peer).await;
if ret.is_ok() {
response_tx.send(AddPeerControlResponse { success: true }).ok();
} else {
@@ -396,14 +323,6 @@ impl PeerController {
response_tx.send(QueryPeerControlResponse { success: false, peer: None }).ok();
}
}
Some(PeerControlRequest::QueryBandwidth { key, response_tx }) => {
let ret = self.handle_query_bandwidth(&key).await;
if let Ok(bandwidth_data) = ret {
response_tx.send(QueryBandwidthControlResponse { success: true, bandwidth_data }).ok();
} else {
response_tx.send(QueryBandwidthControlResponse { success: false, bandwidth_data: None }).ok();
}
}
Some(PeerControlRequest::GetClientBandwidth { key, response_tx }) => {
let client_bandwidth = self.handle_get_client_bandwidth(&key).await;
response_tx.send(GetClientBandwidthControlResponse { client_bandwidth }).ok();
@@ -419,3 +338,165 @@ impl PeerController {
}
}
}
#[cfg(feature = "mock")]
#[derive(Default)]
struct MockWgApi {
peers: std::sync::RwLock<HashMap<Key, Peer>>,
}
#[cfg(feature = "mock")]
impl WireguardInterfaceApi for MockWgApi {
fn create_interface(
&self,
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
todo!()
}
fn assign_address(
&self,
_address: &defguard_wireguard_rs::net::IpAddrMask,
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
todo!()
}
fn configure_peer_routing(
&self,
_peers: &[Peer],
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
todo!()
}
#[cfg(not(target_os = "windows"))]
fn configure_interface(
&self,
_config: &defguard_wireguard_rs::InterfaceConfiguration,
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
todo!()
}
#[cfg(target_os = "windows")]
fn configure_interface(
&self,
config: &defguard_wireguard_rs::InterfaceConfiguration,
dns: &[std::net::IpAddr],
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
todo!()
}
fn remove_interface(
&self,
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
todo!()
}
fn configure_peer(
&self,
peer: &Peer,
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
self.peers
.write()
.unwrap()
.insert(peer.public_key.clone(), peer.clone());
Ok(())
}
fn remove_peer(
&self,
peer_pubkey: &Key,
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
self.peers.write().unwrap().remove(peer_pubkey);
Ok(())
}
fn read_interface_data(
&self,
) -> Result<Host, defguard_wireguard_rs::error::WireguardInterfaceError> {
let mut host = Host::default();
host.peers = self.peers.read().unwrap().clone();
Ok(host)
}
fn configure_dns(
&self,
_dns: &[std::net::IpAddr],
) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> {
todo!()
}
}
#[cfg(feature = "mock")]
pub fn start_controller(
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
) -> (
nym_gateway_storage::traits::mock::MockGatewayStorage,
nym_task::TaskManager,
) {
let storage = nym_gateway_storage::traits::mock::MockGatewayStorage::default();
let wg_api = Arc::new(MockWgApi::default());
let task_manager = nym_task::TaskManager::default();
let mut peer_controller = PeerController::new(
Box::new(storage.clone()),
Default::default(),
wg_api,
Default::default(),
Default::default(),
request_tx,
request_rx,
task_manager.subscribe(),
);
tokio::spawn(async move { peer_controller.run().await });
(storage, task_manager)
}
#[cfg(feature = "mock")]
pub async fn stop_controller(mut task_manager: nym_task::TaskManager) {
task_manager.signal_shutdown().unwrap();
task_manager.wait_for_shutdown().await;
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn start_and_stop() {
let (request_tx, request_rx) = mpsc::channel(1);
let (_, task_manager) = start_controller(request_tx.clone(), request_rx);
stop_controller(task_manager).await;
}
// #[tokio::test]
// async fn add_peer() {
// let (request_tx, storage, mut task_manager) = start_controller();
// let peer = Peer::default();
// let (response_tx, response_rx) = oneshot::channel();
// request_tx
// .send(PeerControlRequest::AddPeer {
// peer: peer.clone(),
// response_tx,
// })
// .await
// .unwrap();
// let response = response_rx.await.unwrap();
// assert!(!response.success);
// storage
// .insert_wireguard_peer(&peer, FromStr::from_str("entry_wireguard").unwrap())
// .await
// .unwrap();
// let (response_tx, response_rx) = oneshot::channel();
// request_tx
// .send(PeerControlRequest::AddPeer { peer, response_tx })
// .await
// .unwrap();
// let response = response_rx.await.unwrap();
// assert!(response.success);
// task_manager.signal_shutdown().unwrap();
// task_manager.wait_for_shutdown().await;
// }
}
+53 -81
View File
@@ -3,13 +3,10 @@
use crate::error::Error;
use crate::peer_controller::PeerControlRequest;
use crate::peer_storage_manager::PeerStorageManager;
use defguard_wireguard_rs::host::Peer;
use crate::peer_storage_manager::{CachedPeerManager, PeerInformation};
use defguard_wireguard_rs::{host::Host, key::Key};
use futures::channel::oneshot;
use nym_authenticator_requests::latest::registration::BANDWIDTH_CAP_PER_DAY;
use nym_credential_verification::bandwidth_storage_manager::BandwidthStorageManager;
use nym_gateway_storage::models::WireguardPeer;
use nym_task::TaskClient;
use nym_wireguard_types::DEFAULT_PEER_TIMEOUT_CHECK;
use std::sync::Arc;
@@ -21,8 +18,8 @@ pub(crate) type SharedBandwidthStorageManager = Arc<RwLock<BandwidthStorageManag
pub struct PeerHandle {
public_key: Key,
host_information: Arc<RwLock<Host>>,
peer_storage_manager: PeerStorageManager,
bandwidth_storage_manager: Option<SharedBandwidthStorageManager>,
cached_peer: CachedPeerManager,
bandwidth_storage_manager: SharedBandwidthStorageManager,
request_tx: mpsc::Sender<PeerControlRequest>,
timeout_check_interval: IntervalStream,
task_client: TaskClient,
@@ -32,8 +29,8 @@ impl PeerHandle {
pub fn new(
public_key: Key,
host_information: Arc<RwLock<Host>>,
peer_storage_manager: PeerStorageManager,
bandwidth_storage_manager: Option<SharedBandwidthStorageManager>,
cached_peer: CachedPeerManager,
bandwidth_storage_manager: SharedBandwidthStorageManager,
request_tx: mpsc::Sender<PeerControlRequest>,
task_client: &TaskClient,
) -> Self {
@@ -45,7 +42,7 @@ impl PeerHandle {
PeerHandle {
public_key,
host_information,
peer_storage_manager,
cached_peer,
bandwidth_storage_manager,
request_tx,
timeout_check_interval,
@@ -69,14 +66,10 @@ impl PeerHandle {
Ok(success)
}
fn compute_spent_bandwidth(kernel_peer: &Peer, storage_peer: &WireguardPeer) -> Option<u64> {
let storage_peer_rx_bytes = u64::try_from(storage_peer.rx_bytes)
.inspect_err(|e| tracing::error!("Storage rx bytes could not be converted: {e}"))
.ok()?;
let storage_peer_tx_bytes = u64::try_from(storage_peer.tx_bytes)
.inspect_err(|e| tracing::error!("Storage tx bytes could not be converted: {e}"))
.ok()?;
fn compute_spent_bandwidth(
kernel_peer: PeerInformation,
cached_peer: PeerInformation,
) -> Option<u64> {
let kernel_total = kernel_peer
.rx_bytes
.checked_add(kernel_peer.tx_bytes)
@@ -88,21 +81,26 @@ impl PeerHandle {
);
None
})?;
let storage_total = storage_peer_rx_bytes
.checked_add(storage_peer_tx_bytes)
let cached_total = cached_peer
.rx_bytes
.checked_add(cached_peer.tx_bytes)
.or_else(|| {
tracing::error!("Overflow on storage adding bytes: {storage_peer_rx_bytes} + {storage_peer_tx_bytes}");
tracing::error!(
"Overflow on cached adding bytes: {} + {}",
cached_peer.rx_bytes,
cached_peer.tx_bytes
);
None
})?;
kernel_total.checked_sub(storage_total).or_else(|| {
tracing::error!("Overflow on spent bandwidth subtraction: kernel - storage = {kernel_total} - {storage_total}");
kernel_total.checked_sub(cached_total).or_else(|| {
tracing::error!("Overflow on spent bandwidth subtraction: kernel - cached = {kernel_total} - {cached_total}");
None
})
}
async fn active_peer(&mut self, kernel_peer: &Peer) -> Result<bool, Error> {
let Some(storage_peer) = self.peer_storage_manager.get_peer() else {
async fn active_peer(&mut self, kernel_peer: PeerInformation) -> Result<bool, Error> {
let Some(cached_peer) = self.cached_peer.get_peer() else {
log::debug!(
"Peer {:?} not in storage anymore, shutting down handle",
self.public_key
@@ -110,76 +108,51 @@ impl PeerHandle {
return Ok(false);
};
if let Some(bandwidth_manager) = &self.bandwidth_storage_manager {
let spent_bandwidth = Self::compute_spent_bandwidth(kernel_peer, &storage_peer)
.unwrap_or_else(|| {
// if gateway restarted, the kernel values restart from 0
// and we should restart from 0 in storage as well
if let Some(peer_information) =
self.peer_storage_manager.peer_information.as_mut()
{
peer_information.force_sync = true;
peer_information.peer.rx_bytes = kernel_peer.rx_bytes;
peer_information.peer.tx_bytes = kernel_peer.tx_bytes;
}
0
})
.try_into()
.map_err(|_| Error::InconsistentConsumedBytes)?;
if spent_bandwidth > 0 {
self.peer_storage_manager.update_trx(kernel_peer);
if bandwidth_manager
.write()
.await
.try_use_bandwidth(spent_bandwidth)
.await
.is_err()
{
tracing::debug!(
"Peer {} is out of bandwidth, removing it",
kernel_peer.public_key.to_string()
);
let success = self.remove_peer().await?;
self.peer_storage_manager.remove_peer();
return Ok(!success);
}
}
} else {
let spent_bandwidth = kernel_peer.rx_bytes + kernel_peer.tx_bytes;
if spent_bandwidth >= BANDWIDTH_CAP_PER_DAY {
log::debug!(
"Peer {} doesn't have bandwidth anymore, removing it",
self.public_key
);
let success = self.remove_peer().await?;
return Ok(!success);
}
let spent_bandwidth = Self::compute_spent_bandwidth(kernel_peer, cached_peer)
.unwrap_or_default()
.try_into()
.inspect_err(|err| tracing::error!("Could not convert from u64 to i64: {err:?}"))
.unwrap_or_default();
self.cached_peer.update(kernel_peer);
if spent_bandwidth > 0
&& self
.bandwidth_storage_manager
.write()
.await
.try_use_bandwidth(spent_bandwidth)
.await
.is_err()
{
tracing::debug!(
"Peer {} is out of bandwidth, removing it",
self.public_key.to_string()
);
let success = self.remove_peer().await?;
self.cached_peer.remove_peer();
return Ok(!success);
}
Ok(true)
}
async fn continue_checking(&mut self) -> Result<bool, Error> {
let Some(kernel_peer) = self
let kernel_peer = self
.host_information
.read()
.await
.peers
.get(&self.public_key)
.cloned()
else {
// the host information hasn't beed updated yet
return Ok(true);
};
if !self.active_peer(&kernel_peer).await? {
.ok_or(Error::MissingClientKernelEntry(self.public_key.to_string()))?
.into();
if !self.active_peer(kernel_peer).await? {
log::debug!(
"Peer {:?} is not active anymore, shutting down handle",
self.public_key
);
Ok(false)
} else {
// Update storage values
self.peer_storage_manager.sync_storage_peer().await?;
Ok(true)
}
}
@@ -208,11 +181,10 @@ impl PeerHandle {
_ = self.task_client.recv() => {
log::trace!("PeerHandle: Received shutdown");
if let Some(bandwidth_manager) = &self.bandwidth_storage_manager {
if let Err(e) = bandwidth_manager.write().await.sync_storage_bandwidth().await {
log::error!("Storage sync failed - {e}, unaccounted bandwidth might have been consumed");
}
if let Err(e) = self.bandwidth_storage_manager.write().await.sync_storage_bandwidth().await {
log::error!("Storage sync failed - {e}, unaccounted bandwidth might have been consumed");
}
log::trace!("PeerHandle: Finished shutdown");
}
}
+21 -91
View File
@@ -1,12 +1,8 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::Error;
use defguard_wireguard_rs::host::Peer;
use nym_gateway_storage::models::WireguardPeer;
use nym_gateway_storage::GatewayStorage;
use std::time::Duration;
use time::OffsetDateTime;
const DEFAULT_PEER_MAX_FLUSHING_RATE: Duration = Duration::from_secs(60 * 60 * 24); // 24h
const DEFAULT_PEER_MAX_DELTA_FLUSHING_AMOUNT: u64 = 512 * 1024 * 1024; // 512MB
@@ -29,116 +25,50 @@ impl Default for PeerFlushingBehaviourConfig {
}
}
pub struct PeerStorageManager {
pub(crate) storage: GatewayStorage,
pub struct CachedPeerManager {
pub(crate) peer_information: Option<PeerInformation>,
pub(crate) cfg: PeerFlushingBehaviourConfig,
pub(crate) with_client_id: bool,
}
impl PeerStorageManager {
pub(crate) fn new(storage: GatewayStorage, peer: Peer, with_client_id: bool) -> Self {
let peer_information = Some(PeerInformation::new(peer));
impl CachedPeerManager {
pub(crate) fn new(peer: &Peer) -> Self {
Self {
storage,
peer_information,
cfg: PeerFlushingBehaviourConfig::default(),
with_client_id,
peer_information: Some(peer.into()),
}
}
pub(crate) fn get_peer(&self) -> Option<WireguardPeer> {
pub(crate) fn get_peer(&self) -> Option<PeerInformation> {
self.peer_information
.as_ref()
.map(|p| p.peer.clone().into())
}
pub(crate) fn remove_peer(&mut self) {
self.peer_information = None;
}
pub(crate) fn update_trx(&mut self, kernel_peer: &Peer) {
pub(crate) fn update(&mut self, kernel_peer: PeerInformation) {
if let Some(peer_information) = self.peer_information.as_mut() {
peer_information.update_trx_bytes(kernel_peer.tx_bytes, kernel_peer.rx_bytes);
peer_information.update_trx_bytes(kernel_peer);
}
}
pub(crate) async fn sync_storage_peer(&mut self) -> Result<(), Error> {
let Some(peer_information) = self.peer_information.as_mut() else {
return Ok(());
};
if !peer_information.should_sync(self.cfg) {
return Ok(());
}
if self
.storage
.get_wireguard_peer(&peer_information.peer().public_key.to_string())
.await?
.is_none()
{
self.peer_information = None;
return Ok(());
}
self.storage
.insert_wireguard_peer(peer_information.peer(), self.with_client_id)
.await?;
peer_information.resync_peer_with_storage();
Ok(())
}
}
#[derive(Clone, Debug)]
#[derive(Clone, Copy, Debug)]
pub(crate) struct PeerInformation {
pub(crate) peer: Peer,
pub(crate) last_synced: OffsetDateTime,
pub(crate) tx_bytes: u64,
pub(crate) rx_bytes: u64,
}
pub(crate) bytes_delta_since_sync: u64,
pub(crate) force_sync: bool,
impl From<&Peer> for PeerInformation {
fn from(value: &Peer) -> Self {
Self {
tx_bytes: value.tx_bytes,
rx_bytes: value.rx_bytes,
}
}
}
impl PeerInformation {
pub fn new(peer: Peer) -> PeerInformation {
PeerInformation {
peer,
last_synced: OffsetDateTime::now_utc(),
bytes_delta_since_sync: 0,
force_sync: false,
}
}
pub(crate) fn should_sync(&self, cfg: PeerFlushingBehaviourConfig) -> bool {
if self.force_sync {
return true;
}
if self.bytes_delta_since_sync >= cfg.peer_max_delta_flushing_amount {
return true;
}
if self.last_synced + cfg.peer_max_flushing_rate < OffsetDateTime::now_utc()
&& self.bytes_delta_since_sync != 0
{
return true;
}
false
}
pub(crate) fn peer(&self) -> &Peer {
&self.peer
}
pub(crate) fn update_trx_bytes(&mut self, tx_bytes: u64, rx_bytes: u64) {
self.bytes_delta_since_sync += tx_bytes.saturating_sub(self.peer.tx_bytes)
+ rx_bytes.saturating_sub(self.peer.rx_bytes);
self.peer.tx_bytes = tx_bytes;
self.peer.rx_bytes = rx_bytes;
}
pub(crate) fn resync_peer_with_storage(&mut self) {
self.bytes_delta_since_sync = 0;
self.last_synced = OffsetDateTime::now_utc();
self.force_sync = false;
pub(crate) fn update_trx_bytes(&mut self, peer: PeerInformation) {
self.tx_bytes = peer.tx_bytes;
self.rx_bytes = peer.rx_bytes;
}
}
@@ -24,6 +24,8 @@ use nym_gateway_requests::{
SimpleGatewayRequestsError,
};
use nym_gateway_storage::error::GatewayStorageError;
use nym_gateway_storage::traits::BandwidthGatewayStorage;
use nym_gateway_storage::traits::SharedKeyGatewayStorage;
use nym_node_metrics::events::MetricsEvent;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_statistics_common::{gateways::GatewaySessionEvent, types::SessionType};
@@ -190,7 +192,7 @@ impl<R, S> AuthenticatedHandler<R, S> {
let handler = AuthenticatedHandler {
bandwidth_storage_manager: BandwidthStorageManager::new(
fresh.shared_state.storage.clone(),
Box::new(fresh.shared_state.storage.clone()),
ClientBandwidth::new(bandwidth.into()),
client.id,
fresh.shared_state.cfg.bandwidth,
@@ -27,6 +27,9 @@ use nym_gateway_requests::{
INITIAL_PROTOCOL_VERSION,
};
use nym_gateway_storage::error::GatewayStorageError;
use nym_gateway_storage::traits::BandwidthGatewayStorage;
use nym_gateway_storage::traits::InboxGatewayStorage;
use nym_gateway_storage::traits::SharedKeyGatewayStorage;
use nym_node_metrics::events::MetricsEvent;
use nym_sphinx::DestinationAddressBytes;
use nym_task::TaskClient;
+10 -9
View File
@@ -13,7 +13,6 @@ use nym_credential_verification::ecash::{
credential_sender::CredentialHandlerConfig, EcashManager,
};
use nym_crypto::asymmetric::ed25519;
use nym_gateway_storage::models::WireguardPeer;
use nym_ip_packet_router::IpPacketRouter;
use nym_mixnet_client::forwarder::MixForwardingSender;
use nym_network_defaults::NymNetworkDetails;
@@ -38,7 +37,7 @@ mod stale_data_cleaner;
use crate::node::stale_data_cleaner::StaleMessagesCleaner;
pub use client_handling::active_clients::ActiveClientsStore;
pub use nym_gateway_stats_storage::PersistentStatsStorage;
pub use nym_gateway_storage::{error::GatewayStorageError, GatewayStorage};
pub use nym_gateway_storage::{error::GatewayStorageError, traits::*, GatewayStorage};
use nym_node_metrics::NymNodeMetrics;
pub use nym_sdk::{NymApiTopologyProvider, NymApiTopologyProviderConfig, UserAgent};
@@ -93,7 +92,7 @@ pub struct GatewayTasksBuilder {
// populated and cached as necessary
ecash_manager: Option<Arc<EcashManager>>,
wireguard_peers: Option<Vec<WireguardPeer>>,
wireguard_peers: Option<Vec<defguard_wireguard_rs::host::Peer>>,
wireguard_networks: Option<Vec<IpAddr>>,
}
@@ -357,12 +356,12 @@ impl GatewayTasksBuilder {
async fn build_wireguard_peers_and_networks(
&self,
) -> Result<(Vec<WireguardPeer>, Vec<IpAddr>), GatewayError> {
) -> Result<(Vec<defguard_wireguard_rs::host::Peer>, Vec<IpAddr>), GatewayError> {
let mut used_private_network_ips = vec![];
let mut all_peers = vec![];
for wireguard_peer in self.storage.get_all_wireguard_peers().await?.into_iter() {
let mut peer = defguard_wireguard_rs::host::Peer::try_from(wireguard_peer.clone())?;
let Some(peer) = peer.allowed_ips.pop() else {
let Some(allowed_ip) = peer.allowed_ips.pop() else {
let peer_identity = &peer.public_key;
warn!("Peer {peer_identity} has empty allowed ips. It will be removed",);
self.storage
@@ -370,8 +369,8 @@ impl GatewayTasksBuilder {
.await?;
continue;
};
used_private_network_ips.push(peer.ip);
all_peers.push(wireguard_peer);
used_private_network_ips.push(allowed_ip.ip);
all_peers.push(peer);
}
Ok((all_peers, used_private_network_ips))
@@ -379,7 +378,9 @@ impl GatewayTasksBuilder {
// only used under linux
#[allow(dead_code)]
async fn get_wireguard_peers(&mut self) -> Result<Vec<WireguardPeer>, GatewayError> {
async fn get_wireguard_peers(
&mut self,
) -> Result<Vec<defguard_wireguard_rs::host::Peer>, GatewayError> {
if let Some(cached) = self.wireguard_peers.take() {
return Ok(cached);
}
@@ -432,8 +433,8 @@ impl GatewayTasksBuilder {
opts.config.clone(),
wireguard_data.inner.clone(),
used_private_network_ips,
ecash_manager,
)
.with_ecash_verifier(ecash_manager)
.with_custom_gateway_transceiver(transceiver)
.with_shutdown(self.shutdown.fork("authenticator_sp"))
.with_wait_for_gateway(true)
+3 -1
View File
@@ -1,7 +1,9 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_gateway::node::{ActiveClientsStore, GatewayStorage, GatewayStorageError};
use nym_gateway::node::{
ActiveClientsStore, GatewayStorage, GatewayStorageError, InboxGatewayStorage,
};
use nym_sphinx_types::DestinationAddressBytes;
use tracing::debug;
@@ -54,3 +54,6 @@ nym-wireguard-types = { path = "../../common/wireguard-types" }
[dev-dependencies]
mock_instant = "0.5.3"
time = { workspace = true }
nym-wireguard = { path = "../../common/wireguard", features = ["mock"] }
@@ -31,7 +31,7 @@ pub struct Authenticator {
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send + Sync>>,
wireguard_gateway_data: WireguardGatewayData,
ecash_verifier: Option<Arc<EcashManager>>,
ecash_verifier: Arc<EcashManager>,
used_private_network_ips: Vec<IpAddr>,
shutdown: Option<TaskClient>,
on_start: Option<oneshot::Sender<OnStartData>>,
@@ -42,13 +42,14 @@ impl Authenticator {
config: Config,
wireguard_gateway_data: WireguardGatewayData,
used_private_network_ips: Vec<IpAddr>,
ecash_verifier: Arc<EcashManager>,
) -> Self {
Self {
config,
wait_for_gateway: false,
custom_topology_provider: None,
custom_gateway_transceiver: None,
ecash_verifier: None,
ecash_verifier,
wireguard_gateway_data,
used_private_network_ips,
shutdown: None,
@@ -56,13 +57,6 @@ impl Authenticator {
}
}
#[must_use]
#[allow(unused)]
pub fn with_ecash_verifier(mut self, ecash_verifier: Arc<EcashManager>) -> Self {
self.ecash_verifier = Some(ecash_verifier);
self
}
#[must_use]
#[allow(unused)]
pub fn with_shutdown(mut self, shutdown: TaskClient) -> Self {
@@ -1,30 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliAuthenticatorClient;
use nym_authenticator::error::AuthenticatorError;
use nym_bin_common::output_format::OutputFormat;
use nym_client_core::cli_helpers::client_add_gateway::{add_gateway, CommonClientAddGatewayArgs};
#[derive(clap::Args)]
pub(crate) struct Args {
#[command(flatten)]
common_args: CommonClientAddGatewayArgs,
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
impl AsRef<CommonClientAddGatewayArgs> for Args {
fn as_ref(&self) -> &CommonClientAddGatewayArgs {
&self.common_args
}
}
pub(crate) async fn execute(args: Args) -> Result<(), AuthenticatorError> {
let output = args.output;
let res = add_gateway::<CliAuthenticatorClient, _>(args, None).await?;
println!("{}", output.format(&res));
Ok(())
}
@@ -1,16 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use clap::Args;
use nym_bin_common::bin_info_owned;
use nym_bin_common::output_format::OutputFormat;
#[derive(Args)]
pub(crate) struct BuildInfo {
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
pub(crate) fn execute(args: BuildInfo) {
println!("{}", args.output.format(&bin_info_owned!()))
}
@@ -1,16 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliAuthenticatorClient;
use nym_authenticator::error::AuthenticatorError;
use nym_client_core::cli_helpers::client_import_coin_index_signatures::{
import_coin_index_signatures, CommonClientImportCoinIndexSignaturesArgs,
};
pub(crate) async fn execute(
args: CommonClientImportCoinIndexSignaturesArgs,
) -> Result<(), AuthenticatorError> {
import_coin_index_signatures::<CliAuthenticatorClient, _>(args).await?;
println!("successfully imported coin index signatures!");
Ok(())
}
@@ -1,14 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliAuthenticatorClient;
use nym_authenticator::error::AuthenticatorError;
use nym_client_core::cli_helpers::client_import_credential::{
import_credential, CommonClientImportTicketBookArgs,
};
pub async fn execute(args: CommonClientImportTicketBookArgs) -> Result<(), AuthenticatorError> {
import_credential::<CliAuthenticatorClient, _>(args).await?;
println!("successfully imported credential!");
Ok(())
}
@@ -1,16 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliAuthenticatorClient;
use nym_authenticator::error::AuthenticatorError;
use nym_client_core::cli_helpers::client_import_expiration_date_signatures::{
import_expiration_date_signatures, CommonClientImportExpirationDateSignaturesArgs,
};
pub(crate) async fn execute(
args: CommonClientImportExpirationDateSignaturesArgs,
) -> Result<(), AuthenticatorError> {
import_expiration_date_signatures::<CliAuthenticatorClient, _>(args).await?;
println!("successfully imported expiration date signatures!");
Ok(())
}
@@ -1,16 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliAuthenticatorClient;
use nym_authenticator::error::AuthenticatorError;
use nym_client_core::cli_helpers::client_import_master_verification_key::{
import_master_verification_key, CommonClientImportMasterVerificationKeyArgs,
};
pub(crate) async fn execute(
args: CommonClientImportMasterVerificationKeyArgs,
) -> Result<(), AuthenticatorError> {
import_master_verification_key::<CliAuthenticatorClient, _>(args).await?;
println!("successfully imported master verification key!");
Ok(())
}
@@ -1,59 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use clap::{Args, Subcommand};
use nym_authenticator::error::AuthenticatorError;
use nym_client_core::cli_helpers::client_import_coin_index_signatures::CommonClientImportCoinIndexSignaturesArgs;
use nym_client_core::cli_helpers::client_import_credential::CommonClientImportTicketBookArgs;
use nym_client_core::cli_helpers::client_import_expiration_date_signatures::CommonClientImportExpirationDateSignaturesArgs;
use nym_client_core::cli_helpers::client_import_master_verification_key::CommonClientImportMasterVerificationKeyArgs;
pub(crate) mod import_coin_index_signatures;
pub(crate) mod import_credential;
pub(crate) mod import_expiration_date_signatures;
pub(crate) mod import_master_verification_key;
pub(crate) mod show_ticketbooks;
#[derive(Args)]
#[clap(args_conflicts_with_subcommands = true, subcommand_required = true)]
pub struct Ecash {
#[clap(subcommand)]
pub command: EcashCommands,
}
impl Ecash {
pub async fn execute(self) -> Result<(), AuthenticatorError> {
match self.command {
EcashCommands::ShowTicketBooks(args) => show_ticketbooks::execute(args).await?,
EcashCommands::ImportTicketBook(args) => import_credential::execute(args).await?,
EcashCommands::ImportCoinIndexSignatures(args) => {
import_coin_index_signatures::execute(args).await?
}
EcashCommands::ImportExpirationDateSignatures(args) => {
import_expiration_date_signatures::execute(args).await?
}
EcashCommands::ImportMasterVerificationKey(args) => {
import_master_verification_key::execute(args).await?
}
}
Ok(())
}
}
#[derive(Subcommand)]
pub enum EcashCommands {
/// Display information associated with the imported ticketbooks,
ShowTicketBooks(show_ticketbooks::Args),
/// Import a pre-generated ticketbook
ImportTicketBook(CommonClientImportTicketBookArgs),
/// Import coin index signatures needed for ticketbooks
ImportCoinIndexSignatures(CommonClientImportCoinIndexSignaturesArgs),
/// Import expiration date signatures needed for ticketbooks
ImportExpirationDateSignatures(CommonClientImportExpirationDateSignaturesArgs),
/// Import master verification key needed for ticketbooks
ImportMasterVerificationKey(CommonClientImportMasterVerificationKeyArgs),
}
@@ -1,32 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliAuthenticatorClient;
use nym_authenticator::error::AuthenticatorError;
use nym_bin_common::output_format::OutputFormat;
use nym_client_core::cli_helpers::client_show_ticketbooks::{
show_ticketbooks, CommonShowTicketbooksArgs,
};
#[derive(clap::Args)]
pub(crate) struct Args {
#[command(flatten)]
common_args: CommonShowTicketbooksArgs,
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
impl AsRef<CommonShowTicketbooksArgs> for Args {
fn as_ref(&self) -> &CommonShowTicketbooksArgs {
&self.common_args
}
}
pub(crate) async fn execute(args: Args) -> Result<(), AuthenticatorError> {
let output = args.output;
let res = show_ticketbooks::<CliAuthenticatorClient, _>(args).await?;
println!("{}", output.format(&res));
Ok(())
}
@@ -1,96 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::{override_config, CliAuthenticatorClient, OverrideConfig};
use clap::Args;
use nym_authenticator::{
config::{default_config_directory, default_config_filepath, default_data_directory, Config},
error::AuthenticatorError,
};
use nym_bin_common::output_format::OutputFormat;
use nym_client_core::cli_helpers::client_init::{
initialise_client, CommonClientInitArgs, InitResultsWithConfig, InitialisableClient,
};
use serde::Serialize;
use std::{fmt::Display, fs, path::PathBuf};
impl InitialisableClient for CliAuthenticatorClient {
type InitArgs = Init;
fn initialise_storage_paths(id: &str) -> Result<(), Self::Error> {
fs::create_dir_all(default_data_directory(id))?;
fs::create_dir_all(default_config_directory(id))?;
Ok(())
}
fn default_config_path(id: &str) -> PathBuf {
default_config_filepath(id)
}
fn construct_config(init_args: &Self::InitArgs) -> Self::Config {
override_config(
Config::new(&init_args.common_args.id),
OverrideConfig::from(init_args.clone()),
)
}
}
#[derive(Args, Clone, Debug)]
pub(crate) struct Init {
#[command(flatten)]
common_args: CommonClientInitArgs,
#[clap(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
impl From<Init> for OverrideConfig {
fn from(init_config: Init) -> Self {
OverrideConfig {
nym_apis: init_config.common_args.nym_apis,
nyxd_urls: init_config.common_args.nyxd_urls,
enabled_credentials_mode: init_config.common_args.enabled_credentials_mode,
}
}
}
impl AsRef<CommonClientInitArgs> for Init {
fn as_ref(&self) -> &CommonClientInitArgs {
&self.common_args
}
}
#[derive(Debug, Serialize)]
pub struct InitResults {
#[serde(flatten)]
client_core: nym_client_core::init::types::InitResults,
client_address: String,
}
impl InitResults {
fn new(res: InitResultsWithConfig<Config>) -> Self {
Self {
client_address: res.init_results.address.to_string(),
client_core: res.init_results,
}
}
}
impl Display for InitResults {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "{}", self.client_core)?;
write!(f, "Address of this authenticator: {}", self.client_address)
}
}
pub(crate) async fn execute(args: Init) -> Result<(), AuthenticatorError> {
eprintln!("Initialising client...");
let output = args.output;
let res = initialise_client::<CliAuthenticatorClient>(args, None).await?;
let init_results = InitResults::new(res);
println!("{}", output.format(&init_results));
Ok(())
}
@@ -1,32 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliAuthenticatorClient;
use nym_authenticator::error::AuthenticatorError;
use nym_bin_common::output_format::OutputFormat;
use nym_client_core::cli_helpers::client_list_gateways::{
list_gateways, CommonClientListGatewaysArgs,
};
#[derive(clap::Args)]
pub(crate) struct Args {
#[command(flatten)]
common_args: CommonClientListGatewaysArgs,
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
impl AsRef<CommonClientListGatewaysArgs> for Args {
fn as_ref(&self) -> &CommonClientListGatewaysArgs {
&self.common_args
}
}
pub(crate) async fn execute(args: Args) -> Result<(), AuthenticatorError> {
let output = args.output;
let res = list_gateways::<CliAuthenticatorClient, _>(args).await?;
println!("{}", output.format(&res));
Ok(())
}
@@ -1,173 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::ecash::Ecash;
use clap::{CommandFactory, Parser, Subcommand};
use log::error;
use nym_authenticator::{
config::{helpers::try_upgrade_config, BaseClientConfig, Config},
error::AuthenticatorError,
};
use nym_bin_common::bin_info;
use nym_bin_common::completions::{fig_generate, ArgShell};
use nym_client_core::cli_helpers::CliClient;
use std::sync::OnceLock;
mod add_gateway;
mod build_info;
pub mod ecash;
mod init;
mod list_gateways;
mod peer_handler;
mod request;
mod run;
mod sign;
mod switch_gateway;
pub(crate) struct CliAuthenticatorClient;
impl CliClient for CliAuthenticatorClient {
const NAME: &'static str = "authenticator";
type Error = AuthenticatorError;
type Config = Config;
async fn try_upgrade_outdated_config(id: &str) -> Result<(), Self::Error> {
try_upgrade_config(id).await
}
async fn try_load_current_config(id: &str) -> Result<Self::Config, Self::Error> {
try_load_current_config(id).await
}
}
fn pretty_build_info_static() -> &'static str {
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
}
#[derive(Parser)]
#[command(author = "Nymtech", version, about, long_version = pretty_build_info_static())]
pub(crate) struct Cli {
/// Path pointing to an env file that configures the client.
#[arg(short, long)]
pub(crate) config_env_file: Option<std::path::PathBuf>,
/// Flag used for disabling the printed banner in tty.
#[arg(long)]
pub(crate) no_banner: bool,
#[command(subcommand)]
command: Commands,
}
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
pub(crate) enum Commands {
/// Initialize an authenticator. Do this first!
Init(init::Init),
/// Run the authenticator with the provided configuration and optionally override
/// parameters.
Run(run::Run),
/// Make a dummy request to a running authenticator
Request(request::Request),
/// Ecash-related functionalities
Ecash(Ecash),
/// List all registered with gateways
ListGateways(list_gateways::Args),
/// Add new gateway to this client
AddGateway(add_gateway::Args),
/// Change the currently active gateway. Note that you must have already registered with the new gateway!
SwitchGateway(switch_gateway::Args),
/// Sign to prove ownership of this authenticator
Sign(sign::Sign),
/// Show build information of this binary
BuildInfo(build_info::BuildInfo),
/// Generate shell completions
Completions(ArgShell),
/// Generate Fig specification
GenerateFigSpec,
}
// Configuration that can be overridden.
pub(crate) struct OverrideConfig {
nym_apis: Option<Vec<url::Url>>,
nyxd_urls: Option<Vec<url::Url>>,
enabled_credentials_mode: Option<bool>,
}
pub(crate) fn override_config(config: Config, args: OverrideConfig) -> Config {
config
.with_optional_base_custom_env(
BaseClientConfig::with_custom_nym_apis,
args.nym_apis,
nym_network_defaults::var_names::NYM_API,
nym_config::parse_urls,
)
.with_optional_base_custom_env(
BaseClientConfig::with_custom_nyxd,
args.nyxd_urls,
nym_network_defaults::var_names::NYXD,
nym_config::parse_urls,
)
.with_optional_base(
BaseClientConfig::with_disabled_credentials,
args.enabled_credentials_mode.map(|b| !b),
)
}
pub(crate) async fn execute(args: Cli) -> Result<(), AuthenticatorError> {
let bin_name = "nym-authenticator";
match args.command {
Commands::Init(m) => init::execute(m).await?,
Commands::Run(m) => run::execute(&m).await?,
Commands::Request(r) => request::execute(&r).await?,
Commands::Ecash(ecash) => ecash.execute().await?,
Commands::ListGateways(args) => list_gateways::execute(args).await?,
Commands::AddGateway(args) => add_gateway::execute(args).await?,
Commands::SwitchGateway(args) => switch_gateway::execute(args).await?,
Commands::Sign(m) => sign::execute(&m).await?,
Commands::BuildInfo(m) => build_info::execute(m),
Commands::Completions(s) => s.generate(&mut Cli::command(), bin_name),
Commands::GenerateFigSpec => fig_generate(&mut Cli::command(), bin_name),
}
Ok(())
}
async fn try_load_current_config(id: &str) -> Result<Config, AuthenticatorError> {
// try to load the config as is
if let Ok(cfg) = Config::read_from_default_path(id) {
return if !cfg.validate() {
Err(AuthenticatorError::ConfigValidationFailure)
} else {
Ok(cfg)
};
}
// we couldn't load it - try upgrading it from older revisions
try_upgrade_config(id).await?;
let config = match Config::read_from_default_path(id) {
Ok(cfg) => cfg,
Err(err) => {
error!("Failed to load config for {id}. Are you sure you have run `init` before? (Error was: {err})");
return Err(AuthenticatorError::FailedToLoadConfig(id.to_string()));
}
};
if !config.validate() {
return Err(AuthenticatorError::ConfigValidationFailure);
}
Ok(config)
}
@@ -1,143 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::try_load_current_config;
use crate::cli::AuthenticatorError;
use crate::cli::{override_config, OverrideConfig};
use clap::{Args, Subcommand};
use nym_authenticator_requests::latest::{
registration::{ClientMac, FinalMessage, GatewayClient, InitMessage, IpPair},
request::{AuthenticatorRequest, AuthenticatorRequestData},
};
use nym_client_core::cli_helpers::client_run::CommonClientRunArgs;
use nym_sdk::mixnet::{MixnetMessageSender, Recipient, TransmissionLane};
use nym_task::TaskHandle;
use nym_wireguard_types::PeerPublicKey;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::str::FromStr;
use std::time::Duration;
use tokio::time::sleep;
#[allow(clippy::struct_excessive_bools)]
#[derive(Args, Clone)]
pub(crate) struct Request {
#[command(flatten)]
common_args: CommonClientRunArgs,
#[command(subcommand)]
request: RequestType,
authenticator_recipient: String,
}
impl From<Request> for OverrideConfig {
fn from(request_config: Request) -> Self {
OverrideConfig {
nym_apis: None,
nyxd_urls: request_config.common_args.nyxd_urls,
enabled_credentials_mode: request_config.common_args.enabled_credentials_mode,
}
}
}
#[derive(Clone, Subcommand)]
pub(crate) enum RequestType {
Initial(Initial),
Final(Final),
QueryBandwidth(QueryBandwidth),
}
#[derive(Args, Clone, Debug)]
pub(crate) struct Initial {
pub_key: String,
}
#[derive(Args, Clone, Debug)]
pub(crate) struct Final {
pub_key: String,
private_ipv4: String,
private_ipv6: String,
mac: String,
}
#[derive(Args, Clone, Debug)]
pub(crate) struct QueryBandwidth {
pub_key: String,
}
impl TryFrom<RequestType> for AuthenticatorRequestData {
type Error = AuthenticatorError;
fn try_from(value: RequestType) -> Result<Self, Self::Error> {
let ret = match value {
RequestType::Initial(req) => AuthenticatorRequestData::Initial(InitMessage::new(
PeerPublicKey::from_str(&req.pub_key)?,
)),
RequestType::Final(req) => AuthenticatorRequestData::Final(Box::new(FinalMessage {
gateway_client: GatewayClient {
pub_key: PeerPublicKey::from_str(&req.pub_key)?,
private_ips: IpPair::new(
Ipv4Addr::from_str(&req.private_ipv4)?,
Ipv6Addr::from_str(&req.private_ipv6)?,
),
mac: ClientMac::from_str(&req.mac)?,
},
credential: None,
})),
RequestType::QueryBandwidth(req) => {
AuthenticatorRequestData::QueryBandwidth(PeerPublicKey::from_str(&req.pub_key)?)
}
};
Ok(ret)
}
}
pub(crate) async fn execute(args: &Request) -> Result<(), AuthenticatorError> {
let mut config = try_load_current_config(&args.common_args.id).await?;
config = override_config(config, OverrideConfig::from(args.clone()));
let shutdown = TaskHandle::default();
let mixnet_client = nym_authenticator::mixnet_client::create_mixnet_client(
&config.base,
shutdown.get_handle().named("nym_sdk::MixnetClient"),
None,
None,
false,
&config.storage_paths.common_paths,
)
.await?;
let request_data = AuthenticatorRequestData::try_from(args.request.clone())?;
let authenticator_recipient = Recipient::from_str(&args.authenticator_recipient)?;
let (request, _) = match request_data {
AuthenticatorRequestData::Initial(init_message) => {
AuthenticatorRequest::new_initial_request(init_message)
}
AuthenticatorRequestData::Final(final_message) => {
AuthenticatorRequest::new_final_request(*final_message)
}
AuthenticatorRequestData::QueryBandwidth(query_message) => {
AuthenticatorRequest::new_query_request(query_message)
}
AuthenticatorRequestData::TopUpBandwidth(top_up_message) => {
AuthenticatorRequest::new_topup_request(*top_up_message)
}
};
mixnet_client
.split_sender()
.send(nym_sdk::mixnet::InputMessage::new_regular(
authenticator_recipient,
request.to_bytes().unwrap(),
TransmissionLane::General,
None,
))
.await
.map_err(|source| AuthenticatorError::FailedToSendPacketToMixnet {
source: Box::new(source),
})?;
log::info!("Sent request, sleeping 60 seconds or until killed");
sleep(Duration::from_secs(60)).await;
Ok(())
}
@@ -1,55 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::peer_handler::DummyHandler;
use crate::cli::try_load_current_config;
use crate::cli::{override_config, OverrideConfig};
use clap::Args;
use nym_authenticator::error::AuthenticatorError;
use nym_client_core::cli_helpers::client_run::CommonClientRunArgs;
use nym_crypto::asymmetric::x25519::KeyPair;
use nym_task::TaskHandle;
use nym_wireguard::WireguardGatewayData;
use rand::rngs::OsRng;
use std::sync::Arc;
#[allow(clippy::struct_excessive_bools)]
#[derive(Args, Clone)]
pub(crate) struct Run {
#[command(flatten)]
common_args: CommonClientRunArgs,
}
impl From<Run> for OverrideConfig {
fn from(run_config: Run) -> Self {
OverrideConfig {
nym_apis: None,
nyxd_urls: run_config.common_args.nyxd_urls,
enabled_credentials_mode: run_config.common_args.enabled_credentials_mode,
}
}
}
pub(crate) async fn execute(args: &Run) -> Result<(), AuthenticatorError> {
let mut config = try_load_current_config(&args.common_args.id).await?;
config = override_config(config, OverrideConfig::from(args.clone()));
log::debug!("Using config: {:#?}", config);
log::info!("Starting authenticator service provider");
let (wireguard_gateway_data, peer_rx) = WireguardGatewayData::new(
config.authenticator.clone().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let task_handler = TaskHandle::default();
let handler = DummyHandler::new(peer_rx, task_handler.fork("peer_handler"));
tokio::spawn(async move {
handler.run().await;
});
let mut server = nym_authenticator::Authenticator::new(config, wireguard_gateway_data, vec![]);
if let Some(custom_mixnet) = &args.common_args.custom_mixnet {
server = server.with_stored_topology(custom_mixnet)?
}
server.run_service_provider().await
}
@@ -1,74 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::try_load_current_config;
use clap::Args;
use nym_authenticator::error::AuthenticatorError;
use nym_bin_common::output_format::OutputFormat;
use nym_client_core::client::key_manager::persistence::OnDiskKeys;
use nym_client_core::error::ClientCoreError;
use nym_crypto::asymmetric::ed25519;
use nym_types::helpers::ConsoleSigningOutput;
#[derive(Args, Clone)]
pub(crate) struct Sign {
/// The id of the mixnode you want to sign with
#[arg(long)]
id: String,
/// Signs a transaction-specific payload, that is going to be sent to the smart contract, with your identity key
#[arg(long)]
contract_msg: String,
#[arg(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
fn print_signed_contract_msg(
private_key: &ed25519::PrivateKey,
raw_msg: &str,
output: OutputFormat,
) {
let trimmed = raw_msg.trim();
eprintln!(">>> attempting to sign {trimmed}");
let Ok(decoded) = bs58::decode(trimmed).into_vec() else {
println!("it seems you have incorrectly copied the message to sign. Make sure you didn't accidentally skip any characters");
return;
};
eprintln!(">>> decoding the message...");
// we don't really care about what particular information is embedded inside of it,
// we just want to know if user correctly copied the string, i.e. whether it's a valid bs58 encoded json
if serde_json::from_slice::<serde_json::Value>(&decoded).is_err() {
println!("it seems you have incorrectly copied the message to sign. Make sure you didn't accidentally skip any characters");
return;
};
// if this is a valid json, it MUST be a valid string
let decoded_string = String::from_utf8(decoded.clone()).unwrap();
let signature = private_key.sign(&decoded).to_base58_string();
let sign_output = ConsoleSigningOutput::new(decoded_string, signature);
println!("{}", output.format(&sign_output));
}
pub(crate) async fn execute(args: &Sign) -> Result<(), AuthenticatorError> {
let config = try_load_current_config(&args.id).await?;
let key_store = OnDiskKeys::new(config.storage_paths.common_paths.keys);
let identity_keypair = key_store.load_identity_keypair().map_err(|source| {
AuthenticatorError::ClientCoreError(ClientCoreError::KeyStoreError {
source: Box::new(source),
})
})?;
print_signed_contract_msg(
identity_keypair.private_key(),
&args.contract_msg,
args.output,
);
Ok(())
}
@@ -1,24 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cli::CliAuthenticatorClient;
use nym_authenticator::error::AuthenticatorError;
use nym_client_core::cli_helpers::client_switch_gateway::{
switch_gateway, CommonClientSwitchGatewaysArgs,
};
#[derive(clap::Args, Clone, Debug)]
pub struct Args {
#[command(flatten)]
common_args: CommonClientSwitchGatewaysArgs,
}
impl AsRef<CommonClientSwitchGatewaysArgs> for Args {
fn as_ref(&self) -> &CommonClientSwitchGatewaysArgs {
&self.common_args
}
}
pub(crate) async fn execute(args: Args) -> Result<(), AuthenticatorError> {
switch_gateway::<CliAuthenticatorClient, _>(args).await
}
+7 -7
View File
@@ -17,6 +17,9 @@ pub enum AuthenticatorError {
#[error("{0}")]
CredentialVerificationError(#[from] nym_credential_verification::Error),
#[error("invalid credential type")]
InvalidCredentialType,
#[error("the entity wrapping the network requester has disconnected")]
DisconnectedParent,
@@ -77,13 +80,7 @@ pub enum AuthenticatorError {
#[error("peers can't be interacted with anymore")]
PeerInteractionStopped,
#[error("operation is not supported")]
UnsupportedOperation,
#[error("operation unavailable for older client")]
OldClient,
#[error("storage should have the requested bandwidht entry")]
#[error("storage should have the requested bandwidth entry")]
MissingClientBandwidthEntry,
#[error("unknown version number")]
@@ -103,6 +100,9 @@ pub enum AuthenticatorError {
#[error("{0}")]
RecipientFormatting(#[from] nym_sdk::mixnet::RecipientFormattingError),
#[error("no credential received")]
NoCredentialReceived,
}
pub type Result<T> = std::result::Result<T, AuthenticatorError>;
@@ -1,20 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
mod cli;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
use clap::Parser;
let args = cli::Cli::parse();
nym_bin_common::logging::setup_tracing_logger();
nym_network_defaults::setup_env(args.config_env_file.as_ref());
if !args.no_banner {
nym_bin_common::logging::maybe_print_banner(clap::crate_name!(), clap::crate_version!());
}
cli::execute(args).await?;
Ok(())
}
@@ -23,13 +23,15 @@ use nym_authenticator_requests::{
},
v1, v2, v3, v4, v5, CURRENT_VERSION,
};
use nym_credential_verification::ecash::traits::EcashManager;
use nym_credential_verification::{
bandwidth_storage_manager::BandwidthStorageManager, ecash::EcashManager,
BandwidthFlushingBehaviourConfig, ClientBandwidth, CredentialVerifier,
bandwidth_storage_manager::BandwidthStorageManager, BandwidthFlushingBehaviourConfig,
ClientBandwidth, CredentialVerifier,
};
use nym_credentials_interface::CredentialSpendingData;
use nym_credentials_interface::{CredentialSpendingData, TicketType};
use nym_crypto::asymmetric::x25519::KeyPair;
use nym_gateway_requests::models::CredentialSpendingRequest;
use nym_gateway_storage::models::PersistedBandwidth;
use nym_sdk::mixnet::{
AnonymousSenderTag, InputMessage, MixnetMessageSender, Recipient, TransmissionLane,
};
@@ -74,7 +76,7 @@ pub(crate) struct MixnetListener {
pub(crate) peer_manager: PeerManager,
pub(crate) ecash_verifier: Option<Arc<EcashManager>>,
pub(crate) ecash_verifier: Arc<dyn EcashManager + Send + Sync>,
pub(crate) timeout_check_interval: IntervalStream,
@@ -88,7 +90,7 @@ impl MixnetListener {
wireguard_gateway_data: WireguardGatewayData,
mixnet_client: nym_sdk::mixnet::MixnetClient,
task_handle: TaskHandle,
ecash_verifier: Option<Arc<EcashManager>>,
ecash_verifier: Arc<dyn EcashManager + Send + Sync>,
) -> Self {
let timeout_check_interval =
IntervalStream::new(tokio::time::interval(DEFAULT_REGISTRATION_TIMEOUT_CHECK));
@@ -500,38 +502,37 @@ impl MixnetListener {
128,
));
// If gateway does ecash verification and client sends a credential, we do the additional
// credential verification. Later this will become mandatory.
if let (Some(ecash_verifier), Some(credential)) =
(self.ecash_verifier.clone(), final_message.credential())
let Some(credential) = final_message.credential() else {
return Err(AuthenticatorError::NoCredentialReceived);
};
let client_id = self
.ecash_verifier
.storage()
.insert_wireguard_peer(
&peer,
TicketType::try_from_encoded(credential.payment.t_type)
.map_err(|_| AuthenticatorError::InvalidCredentialType)?
.into(),
)
.await?;
if let Err(e) =
credential_verification(self.ecash_verifier.clone(), credential, client_id).await
{
let client_id = ecash_verifier
self.ecash_verifier
.storage()
.insert_wireguard_peer(&peer, true)
.await?
.ok_or(AuthenticatorError::InternalError(
"peer with ticket shouldn't have been used before without a ticket".to_string(),
))?;
if let Err(e) =
Self::credential_verification(ecash_verifier.clone(), credential, client_id).await
{
ecash_verifier
.storage()
.remove_wireguard_peer(&peer.public_key.to_string())
.await?;
return Err(e);
}
let public_key = peer.public_key.to_string();
if let Err(e) = self.peer_manager.add_peer(peer, Some(client_id)).await {
ecash_verifier
.storage()
.remove_wireguard_peer(&public_key)
.await?;
return Err(e);
}
} else {
self.peer_manager.add_peer(peer, None).await?;
.remove_wireguard_peer(&peer.public_key.to_string())
.await?;
return Err(e);
}
let public_key = peer.public_key.to_string();
if let Err(e) = self.peer_manager.add_peer(peer).await {
self.ecash_verifier
.storage()
.remove_wireguard_peer(&public_key)
.await?;
return Err(e);
}
registred_and_free
.registration_in_progres
.remove(&final_message.pub_key());
@@ -596,37 +597,6 @@ impl MixnetListener {
Ok((bytes, reply_to))
}
async fn credential_verification(
ecash_verifier: Arc<EcashManager>,
credential: CredentialSpendingData,
client_id: i64,
) -> Result<i64> {
ecash_verifier
.storage()
.create_bandwidth_entry(client_id)
.await?;
let bandwidth = ecash_verifier
.storage()
.get_available_bandwidth(client_id)
.await?
.ok_or(AuthenticatorError::InternalError(
"bandwidth entry should have just been created".to_string(),
))?;
let client_bandwidth = ClientBandwidth::new(bandwidth.into());
let mut verifier = CredentialVerifier::new(
CredentialSpendingRequest::new(credential),
ecash_verifier.clone(),
BandwidthStorageManager::new(
ecash_verifier.storage().clone(),
client_bandwidth,
client_id,
BandwidthFlushingBehaviourConfig::default(),
true,
),
);
Ok(verifier.verify().await?)
}
async fn on_query_bandwidth_request(
&mut self,
msg: Box<dyn QueryBandwidthMessage + Send + Sync + 'static>,
@@ -634,12 +604,12 @@ impl MixnetListener {
request_id: u64,
reply_to: Option<Recipient>,
) -> AuthenticatorHandleResult {
let bandwidth_data = self.peer_manager.query_bandwidth(msg).await?;
let bandwidth_data = self.peer_manager.query_bandwidth(msg.pub_key()).await?;
let bytes = match AuthenticatorVersion::from(protocol) {
AuthenticatorVersion::V1 => {
v1::response::AuthenticatorResponse::new_remaining_bandwidth(
bandwidth_data.map(|data| v1::registration::RemainingBandwidthData {
available_bandwidth: data.available_bandwidth as u64,
available_bandwidth: data as u64,
suspended: false,
}),
reply_to.ok_or(AuthenticatorError::MissingReplyToForOldClient)?,
@@ -652,8 +622,10 @@ impl MixnetListener {
}
AuthenticatorVersion::V2 => {
v2::response::AuthenticatorResponse::new_remaining_bandwidth(
bandwidth_data.map(|data| v2::registration::RemainingBandwidthData {
available_bandwidth: data.available_bandwidth,
bandwidth_data.map(|available_bandwidth| {
v2::registration::RemainingBandwidthData {
available_bandwidth,
}
}),
reply_to.ok_or(AuthenticatorError::MissingReplyToForOldClient)?,
request_id,
@@ -665,8 +637,10 @@ impl MixnetListener {
}
AuthenticatorVersion::V3 => {
v3::response::AuthenticatorResponse::new_remaining_bandwidth(
bandwidth_data.map(|data| v3::registration::RemainingBandwidthData {
available_bandwidth: data.available_bandwidth,
bandwidth_data.map(|available_bandwidth| {
v3::registration::RemainingBandwidthData {
available_bandwidth,
}
}),
reply_to.ok_or(AuthenticatorError::MissingReplyToForOldClient)?,
request_id,
@@ -678,8 +652,10 @@ impl MixnetListener {
}
AuthenticatorVersion::V4 => {
v4::response::AuthenticatorResponse::new_remaining_bandwidth(
bandwidth_data.map(|data| v4::registration::RemainingBandwidthData {
available_bandwidth: data.available_bandwidth,
bandwidth_data.map(|available_bandwidth| {
v4::registration::RemainingBandwidthData {
available_bandwidth,
}
}),
reply_to.ok_or(AuthenticatorError::MissingReplyToForOldClient)?,
request_id,
@@ -691,8 +667,10 @@ impl MixnetListener {
}
AuthenticatorVersion::V5 => {
v5::response::AuthenticatorResponse::new_remaining_bandwidth(
bandwidth_data.map(|data| v5::registration::RemainingBandwidthData {
available_bandwidth: data.available_bandwidth,
bandwidth_data.map(|available_bandwidth| {
v5::registration::RemainingBandwidthData {
available_bandwidth,
}
}),
request_id,
)
@@ -713,17 +691,13 @@ impl MixnetListener {
request_id: u64,
reply_to: Option<Recipient>,
) -> AuthenticatorHandleResult {
let Some(ecash_verifier) = self.ecash_verifier.clone() else {
return Err(AuthenticatorError::UnsupportedOperation);
};
let client_id = ecash_verifier
let client_id = self
.ecash_verifier
.storage()
.get_wireguard_peer(&msg.pub_key().to_string())
.await?
.ok_or(AuthenticatorError::MissingClientBandwidthEntry)?
.client_id
.ok_or(AuthenticatorError::OldClient)?;
.client_id;
let client_bandwidth = self
.peer_manager
.query_client_bandwidth(msg.pub_key())
@@ -737,9 +711,9 @@ impl MixnetListener {
let credential = msg.credential();
let mut verifier = CredentialVerifier::new(
CredentialSpendingRequest::new(credential.clone()),
ecash_verifier.clone(),
self.ecash_verifier.clone(),
BandwidthStorageManager::new(
ecash_verifier.storage().clone(),
self.ecash_verifier.storage(),
client_bandwidth,
client_id,
BandwidthFlushingBehaviourConfig::default(),
@@ -907,6 +881,45 @@ impl MixnetListener {
}
}
pub async fn credential_storage_preparation(
ecash_verifier: Arc<dyn EcashManager + Send + Sync>,
client_id: i64,
) -> Result<PersistedBandwidth> {
ecash_verifier
.storage()
.create_bandwidth_entry(client_id)
.await?;
let bandwidth = ecash_verifier
.storage()
.get_available_bandwidth(client_id)
.await?
.ok_or(AuthenticatorError::InternalError(
"bandwidth entry should have just been created".to_string(),
))?;
Ok(bandwidth)
}
async fn credential_verification(
ecash_verifier: Arc<dyn EcashManager + Send + Sync>,
credential: CredentialSpendingData,
client_id: i64,
) -> Result<i64> {
let bandwidth = credential_storage_preparation(ecash_verifier.clone(), client_id).await?;
let client_bandwidth = ClientBandwidth::new(bandwidth.into());
let mut verifier = CredentialVerifier::new(
CredentialSpendingRequest::new(credential),
ecash_verifier.clone(),
BandwidthStorageManager::new(
ecash_verifier.storage(),
client_bandwidth,
client_id,
BandwidthFlushingBehaviourConfig::default(),
true,
),
);
Ok(verifier.verify().await?)
}
fn deserialize_request(reconstructed: &ReconstructedMessage) -> Result<AuthenticatorRequest> {
let request_version = *reconstructed
.message
@@ -4,15 +4,11 @@
use crate::error::*;
use defguard_wireguard_rs::{host::Peer, key::Key};
use futures::channel::oneshot;
use nym_authenticator_requests::{
latest::registration::{GatewayClient, RemainingBandwidthData},
traits::QueryBandwidthMessage,
};
use nym_credential_verification::ClientBandwidth;
use nym_wireguard::{
peer_controller::{
AddPeerControlResponse, GetClientBandwidthControlResponse, PeerControlRequest,
QueryBandwidthControlResponse, QueryPeerControlResponse, RemovePeerControlResponse,
QueryPeerControlResponse, RemovePeerControlResponse,
},
WireguardGatewayData,
};
@@ -28,13 +24,9 @@ impl PeerManager {
wireguard_gateway_data,
}
}
pub async fn add_peer(&mut self, peer: Peer, client_id: Option<i64>) -> Result<()> {
pub async fn add_peer(&mut self, peer: Peer) -> Result<()> {
let (response_tx, response_rx) = oneshot::channel();
let msg = PeerControlRequest::AddPeer {
peer,
client_id,
response_tx,
};
let msg = PeerControlRequest::AddPeer { peer, response_tx };
self.wireguard_gateway_data
.peer_tx()
.send(msg)
@@ -52,8 +44,8 @@ impl PeerManager {
Ok(())
}
pub async fn _remove_peer(&mut self, client: &GatewayClient) -> Result<()> {
let key = Key::new(client.pub_key().to_bytes());
pub async fn _remove_peer(&mut self, pub_key: PeerPublicKey) -> Result<()> {
let key = Key::new(pub_key.to_bytes());
let (response_tx, response_rx) = oneshot::channel();
let msg = PeerControlRequest::RemovePeer { key, response_tx };
self.wireguard_gateway_data
@@ -63,7 +55,7 @@ impl PeerManager {
.map_err(|_| AuthenticatorError::PeerInteractionStopped)?;
let RemovePeerControlResponse { success } = response_rx.await.map_err(|_| {
AuthenticatorError::InternalError("no response for add peer".to_string())
AuthenticatorError::InternalError("no response for remove peer".to_string())
})?;
if !success {
return Err(AuthenticatorError::InternalError(
@@ -94,31 +86,13 @@ impl PeerManager {
Ok(peer)
}
pub async fn query_bandwidth(
&mut self,
msg: Box<dyn QueryBandwidthMessage + Send + Sync + 'static>,
) -> Result<Option<RemainingBandwidthData>> {
let key = Key::new(msg.pub_key().to_bytes());
let (response_tx, response_rx) = oneshot::channel();
let msg = PeerControlRequest::QueryBandwidth { key, response_tx };
self.wireguard_gateway_data
.peer_tx()
.send(msg)
.await
.map_err(|_| AuthenticatorError::PeerInteractionStopped)?;
let QueryBandwidthControlResponse {
success,
bandwidth_data,
} = response_rx.await.map_err(|_| {
AuthenticatorError::InternalError("no response for query bandwidth".to_string())
})?;
if !success {
return Err(AuthenticatorError::InternalError(
"querying bandwidth could not be performed".to_string(),
));
}
Ok(bandwidth_data)
pub async fn query_bandwidth(&mut self, public_key: PeerPublicKey) -> Result<Option<i64>> {
let res = if let Some(client_bandwidth) = self.query_client_bandwidth(public_key).await? {
Some(client_bandwidth.available().await)
} else {
None
};
Ok(res)
}
pub async fn query_client_bandwidth(
@@ -143,3 +117,239 @@ impl PeerManager {
Ok(client_bandwidth)
}
}
#[cfg(test)]
mod tests {
use std::{str::FromStr, sync::Arc};
use nym_credential_verification::{
bandwidth_storage_manager::BandwidthStorageManager, ecash::MockEcashManager,
};
use nym_credentials_interface::Bandwidth;
use nym_crypto::asymmetric::x25519::KeyPair;
use nym_gateway_storage::traits::{mock::MockGatewayStorage, BandwidthGatewayStorage};
use nym_wireguard::peer_controller::{start_controller, stop_controller};
use rand::rngs::OsRng;
use time::{Duration, OffsetDateTime};
use crate::{config::Authenticator, mixnet_listener::credential_storage_preparation};
use super::*;
#[tokio::test]
async fn add_peer() {
let (wireguard_data, request_rx) = WireguardGatewayData::new(
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let mut peer_manager = PeerManager::new(wireguard_data);
let (storage, task_manager) = start_controller(
peer_manager.wireguard_gateway_data.peer_tx().clone(),
request_rx,
);
let peer = Peer::default();
let ecash_manager = MockEcashManager::new(Box::new(storage.clone()));
assert!(peer_manager.add_peer(peer.clone()).await.is_err());
let client_id = storage
.insert_wireguard_peer(&peer, FromStr::from_str("entry_wireguard").unwrap())
.await
.unwrap();
assert!(peer_manager.add_peer(peer.clone()).await.is_err());
credential_storage_preparation(Arc::new(ecash_manager), client_id)
.await
.unwrap();
peer_manager.add_peer(peer.clone()).await.unwrap();
stop_controller(task_manager).await;
}
async fn helper_add_peer(storage: &MockGatewayStorage, peer_manager: &mut PeerManager) -> i64 {
let peer = Peer::default();
let ecash_manager = MockEcashManager::new(Box::new(storage.clone()));
let client_id = storage
.insert_wireguard_peer(&peer, FromStr::from_str("entry_wireguard").unwrap())
.await
.unwrap();
credential_storage_preparation(Arc::new(ecash_manager), client_id)
.await
.unwrap();
peer_manager.add_peer(peer.clone()).await.unwrap();
client_id
}
#[tokio::test]
async fn remove_peer() {
let (wireguard_data, request_rx) = WireguardGatewayData::new(
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let mut peer_manager = PeerManager::new(wireguard_data);
let key = Key::default();
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
let (storage, task_manager) = start_controller(
peer_manager.wireguard_gateway_data.peer_tx().clone(),
request_rx,
);
helper_add_peer(&storage, &mut peer_manager).await;
peer_manager._remove_peer(public_key).await.unwrap();
stop_controller(task_manager).await;
}
#[tokio::test]
async fn query_peer() {
let (wireguard_data, request_rx) = WireguardGatewayData::new(
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let mut peer_manager = PeerManager::new(wireguard_data);
let key = Key::default();
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
let (storage, task_manager) = start_controller(
peer_manager.wireguard_gateway_data.peer_tx().clone(),
request_rx,
);
assert!(peer_manager.query_peer(public_key).await.unwrap().is_none());
helper_add_peer(&storage, &mut peer_manager).await;
let peer = peer_manager.query_peer(public_key).await.unwrap().unwrap();
assert_eq!(peer.public_key, key);
stop_controller(task_manager).await;
}
#[tokio::test]
async fn query_bandwidth() {
let (wireguard_data, request_rx) = WireguardGatewayData::new(
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let mut peer_manager = PeerManager::new(wireguard_data);
let key = Key::default();
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
let (storage, task_manager) = start_controller(
peer_manager.wireguard_gateway_data.peer_tx().clone(),
request_rx,
);
assert!(peer_manager
.query_bandwidth(public_key)
.await
.unwrap()
.is_none());
helper_add_peer(&storage, &mut peer_manager).await;
let available_bandwidth = peer_manager
.query_bandwidth(public_key)
.await
.unwrap()
.unwrap();
assert_eq!(available_bandwidth, 0);
stop_controller(task_manager).await;
}
#[tokio::test]
async fn query_client_bandwidth() {
let (wireguard_data, request_rx) = WireguardGatewayData::new(
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let mut peer_manager = PeerManager::new(wireguard_data);
let key = Key::default();
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
let (storage, task_manager) = start_controller(
peer_manager.wireguard_gateway_data.peer_tx().clone(),
request_rx,
);
assert!(peer_manager
.query_client_bandwidth(public_key)
.await
.unwrap()
.is_none());
helper_add_peer(&storage, &mut peer_manager).await;
let available_bandwidth = peer_manager
.query_client_bandwidth(public_key)
.await
.unwrap()
.unwrap()
.available()
.await;
assert_eq!(available_bandwidth, 0);
stop_controller(task_manager).await;
}
#[tokio::test]
async fn increase_decrease_bandwidth() {
let (wireguard_data, request_rx) = WireguardGatewayData::new(
Authenticator::default().into(),
Arc::new(KeyPair::new(&mut OsRng)),
);
let mut peer_manager = PeerManager::new(wireguard_data);
let key = Key::default();
let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap();
let top_up = 42;
let consume = 4;
let (storage, task_manager) = start_controller(
peer_manager.wireguard_gateway_data.peer_tx().clone(),
request_rx,
);
let client_id = helper_add_peer(&storage, &mut peer_manager).await;
let client_bandwidth = peer_manager
.query_client_bandwidth(public_key)
.await
.unwrap()
.unwrap();
let mut bw_manager = BandwidthStorageManager::new(
Box::new(storage),
client_bandwidth.clone(),
client_id,
Default::default(),
true,
);
bw_manager
.increase_bandwidth(
Bandwidth::new_unchecked(top_up as u64),
OffsetDateTime::now_utc()
.checked_add(Duration::minutes(1))
.unwrap(),
)
.await
.unwrap();
assert_eq!(client_bandwidth.available().await, top_up);
assert_eq!(
peer_manager
.query_bandwidth(public_key)
.await
.unwrap()
.unwrap(),
top_up
);
bw_manager.try_use_bandwidth(consume).await.unwrap();
let remaining = top_up - consume;
assert_eq!(client_bandwidth.available().await, remaining);
assert_eq!(
peer_manager
.query_bandwidth(public_key)
.await
.unwrap()
.unwrap(),
remaining
);
stop_controller(task_manager).await;
}
}