Compare commits

..

8 Commits

Author SHA1 Message Date
Jędrzej Stuczyński a9308efe10 logs for all messages in authenticator 2024-10-21 17:15:03 +01:00
Bogdan-Ștefan Neacşu 9a45de5874 Remove stale free riders after 24 hours (#5002) 2024-10-21 13:15:08 +03:00
Bogdan-Ștefan Neacşu 2f894b9be3 Remove race on initial req processing (#5001) 2024-10-21 13:15:01 +03:00
Bogdan-Ștefan Neacşu 18c6fd3e3e Gateway peer fixes (#4985)
* Create bandwidth entry

* Remove mismatch possibilities
2024-10-18 12:43:36 +03:00
benedettadavico d75c7eaaaf update changelog and bump binaries 2024-10-17 08:51:39 +02:00
Jędrzej Stuczyński f786dbeaa7 Merge pull request #4960 from nymtech/chore/remove-bloomfilters-for-double-spending
nym-node: don't use bloomfilters for double spending checks
2024-10-14 09:44:33 +01:00
Jędrzej Stuczyński eae76cce10 disabled bloomfilter exporting in nym-api 2024-10-11 08:54:50 +01:00
Jędrzej Stuczyński 9341db5d08 removed gateway/nym-node using global double spending bloomfilter 2024-10-10 17:09:51 +01:00
26 changed files with 158 additions and 449 deletions
+74
View File
@@ -4,6 +4,80 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://
## [Unreleased]
## [2024.12-aero] (2024-10-17)
- nym-node: don't use bloomfilters for double spending checks ([#4960])
- bugfix: replace unreachable macro with an error return ([#4958])
- [DOCs:/operators]: Update FAQ sphinx size ([#4946])
- [DOCs/operators]: Release notes v2024.11-wedel ([#4939])
- Fix handle drop ([#4934])
- Assume offline mode ([#4926])
- Make ip-packet-request VERSION pub ([#4925])
- Expose error type ([#4924])
- Fix argument to cargo-deny action ([#4922])
- Fix nymvpn.com url in mainnet defaults ([#4920])
- Check both version and type in message header ([#4918])
- Bump http-api-client default timeout to 30 sec ([#4917])
- Max/proxy ffi ([#4906])
- Data Observatory stub ([#4905])
- Fix missing duplication of modified tables ([#4904])
- Update cargo deny ([#4901])
- docs: add hostname instructions for wss ([#4900])
- build(deps): bump the patch-updates group across 1 directory with 9 updates ([#4898])
- Fix clippy for beta toolchain ([#4897])
- Remove clippy github PR annotations ([#4896])
- Fix apt install in ci-build-upload-binaries.yml ([#4894])
- Update network monitor entrypoint ([#4893])
- Update nym-vpn metapackage and replace nymvpn-x with nym-vpn-app ([#4889])
- Entry wireguard tickets ([#4888])
- Build and Push CI ([#4887])
- Feature/updated gateway registration ([#4885])
- Few fixes to NNM pre deploy ([#4883])
- Fix sql serde with enum ([#4875])
- allow clients to send stateless gateway requests without prior registration ([#4873])
- chore: remove queued migration for adding explicit admin ([#4871])
- Gateway database modifications for different modes ([#4868])
- build(deps): bump strum from 0.25.0 to 0.26.3 ([#4848])
- Use serde from workspace ([#4833])
- build(deps): bump toml from 0.5.11 to 0.8.14 ([#4805])
- Max/rust sdk stream abstraction ([#4743])
[#4960]: https://github.com/nymtech/nym/pull/4960
[#4958]: https://github.com/nymtech/nym/pull/4958
[#4946]: https://github.com/nymtech/nym/pull/4946
[#4939]: https://github.com/nymtech/nym/pull/4939
[#4934]: https://github.com/nymtech/nym/pull/4934
[#4926]: https://github.com/nymtech/nym/pull/4926
[#4925]: https://github.com/nymtech/nym/pull/4925
[#4924]: https://github.com/nymtech/nym/pull/4924
[#4922]: https://github.com/nymtech/nym/pull/4922
[#4920]: https://github.com/nymtech/nym/pull/4920
[#4918]: https://github.com/nymtech/nym/pull/4918
[#4917]: https://github.com/nymtech/nym/pull/4917
[#4906]: https://github.com/nymtech/nym/pull/4906
[#4905]: https://github.com/nymtech/nym/pull/4905
[#4904]: https://github.com/nymtech/nym/pull/4904
[#4901]: https://github.com/nymtech/nym/pull/4901
[#4900]: https://github.com/nymtech/nym/pull/4900
[#4898]: https://github.com/nymtech/nym/pull/4898
[#4897]: https://github.com/nymtech/nym/pull/4897
[#4896]: https://github.com/nymtech/nym/pull/4896
[#4894]: https://github.com/nymtech/nym/pull/4894
[#4893]: https://github.com/nymtech/nym/pull/4893
[#4889]: https://github.com/nymtech/nym/pull/4889
[#4888]: https://github.com/nymtech/nym/pull/4888
[#4887]: https://github.com/nymtech/nym/pull/4887
[#4885]: https://github.com/nymtech/nym/pull/4885
[#4883]: https://github.com/nymtech/nym/pull/4883
[#4875]: https://github.com/nymtech/nym/pull/4875
[#4873]: https://github.com/nymtech/nym/pull/4873
[#4871]: https://github.com/nymtech/nym/pull/4871
[#4868]: https://github.com/nymtech/nym/pull/4868
[#4848]: https://github.com/nymtech/nym/pull/4848
[#4833]: https://github.com/nymtech/nym/pull/4833
[#4805]: https://github.com/nymtech/nym/pull/4805
[#4743]: https://github.com/nymtech/nym/pull/4743
## [2024.11-wedel] (2024-09-23)
- Backport #4894 to fix ci ([#4899])
Generated
+8 -42
View File
@@ -2317,7 +2317,7 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "explorer-api"
version = "1.1.40"
version = "1.1.41"
dependencies = [
"chrono",
"clap 4.5.17",
@@ -4223,7 +4223,7 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "nym-api"
version = "1.1.44"
version = "1.1.45"
dependencies = [
"anyhow",
"async-trait",
@@ -4294,9 +4294,6 @@ dependencies = [
"tap",
"tempfile",
"thiserror",
"tikv-jemalloc-ctl",
"tikv-jemalloc-sys",
"tikv-jemallocator",
"time",
"tokio",
"tokio-stream",
@@ -4472,7 +4469,7 @@ dependencies = [
[[package]]
name = "nym-cli"
version = "1.1.42"
version = "1.1.43"
dependencies = [
"anyhow",
"base64 0.22.1",
@@ -4553,7 +4550,7 @@ dependencies = [
[[package]]
name = "nym-client"
version = "1.1.41"
version = "1.1.42"
dependencies = [
"bs58",
"clap 4.5.17",
@@ -5592,7 +5589,7 @@ dependencies = [
[[package]]
name = "nym-network-requester"
version = "1.1.42"
version = "1.1.43"
dependencies = [
"addr",
"anyhow",
@@ -5643,7 +5640,7 @@ dependencies = [
[[package]]
name = "nym-node"
version = "1.1.8"
version = "1.1.9"
dependencies = [
"anyhow",
"bip39",
@@ -5932,7 +5929,7 @@ dependencies = [
[[package]]
name = "nym-socks5-client"
version = "1.1.41"
version = "1.1.42"
dependencies = [
"bs58",
"clap 4.5.17",
@@ -6460,7 +6457,7 @@ dependencies = [
[[package]]
name = "nymvisor"
version = "0.1.7"
version = "0.1.8"
dependencies = [
"anyhow",
"bytes",
@@ -9135,37 +9132,6 @@ dependencies = [
"threadpool",
]
[[package]]
name = "tikv-jemalloc-ctl"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f21f216790c8df74ce3ab25b534e0718da5a1916719771d3fec23315c99e468b"
dependencies = [
"libc",
"paste",
"tikv-jemalloc-sys",
]
[[package]]
name = "tikv-jemalloc-sys"
version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "tikv-jemallocator"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865"
dependencies = [
"libc",
"tikv-jemalloc-sys",
]
[[package]]
name = "time"
version = "0.3.36"
-1
View File
@@ -5,7 +5,6 @@
panic = "abort"
opt-level = "s"
overflow-checks = true
debug = true
[profile.dev]
panic = "abort"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-client"
version = "1.1.41"
version = "1.1.42"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>", "Jędrzej Stuczyński <andrew@nymtech.net>"]
description = "Implementation of the Nym Client"
edition = "2021"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-socks5-client"
version = "1.1.41"
version = "1.1.42"
authors = ["Dave Hrycyszyn <futurechimp@users.noreply.github.com>"]
description = "A SOCKS5 localhost proxy that converts incoming messages to Sphinx and sends them to a Nym address"
edition = "2021"
@@ -1,100 +0,0 @@
// Copyright 2022-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::ecash::error::EcashTicketError;
use crate::ecash::state::SharedState;
use nym_ecash_double_spending::DoubleSpendingFilter;
use nym_gateway_storage::Storage;
use nym_task::TaskClient;
use nym_validator_client::client::NymApiClientExt;
use nym_validator_client::EcashApiClient;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::sync::Arc;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::time::{interval, Duration};
use tracing::{info, trace, warn};
#[derive(Clone)]
pub(crate) struct DoubleSpendingDetector<S> {
spent_serial_numbers: Arc<RwLock<DoubleSpendingFilter>>,
shared_state: SharedState<S>,
}
impl<S> DoubleSpendingDetector<S>
where
S: Storage + Clone + Send + Sync + 'static,
{
pub(crate) fn new(shared_state: SharedState<S>) -> Self {
DoubleSpendingDetector {
spent_serial_numbers: Arc::new(RwLock::new(DoubleSpendingFilter::new_empty_ecash())),
shared_state,
}
}
pub(crate) async fn check(&self, serial_number: &Vec<u8>) -> bool {
self.spent_serial_numbers.read().await.check(serial_number)
}
async fn latest_api_endpoints(
&self,
) -> Result<RwLockReadGuard<Vec<EcashApiClient>>, EcashTicketError> {
let epoch_id = self.shared_state.current_epoch_id().await?;
self.shared_state.api_clients(epoch_id).await
}
async fn refresh_bloomfilter(&self) {
let mut filter_builder = self.spent_serial_numbers.read().await.rebuild();
let api_clients = match self.latest_api_endpoints().await {
Ok(clients) => clients,
Err(err) => {
warn!("failed to obtain current api clients: {err}");
return;
}
};
let mut clients = api_clients
.iter()
.map(|c| c.api_client.clone())
.collect::<Vec<_>>();
clients.shuffle(&mut thread_rng());
for client in clients {
match client.nym_api.double_spending_filter_v1().await {
Ok(response) => {
// due to relative big size of the filter, query only one api since all of them should contain
// roughly the same data anyway.
filter_builder.add_bytes(&response.bitmap);
*self.spent_serial_numbers.write().await = filter_builder.build();
return;
}
Err(err) => {
warn!("Validator @ {} could not be reached. There might be a problem with the ecash endpoint: {err}", client.api_url());
}
}
}
warn!("none of the validators could be reached. the bloomfilter will remain unchanged.");
}
async fn run(&self, mut shutdown: TaskClient) {
info!("Starting Ecash DoubleSpendingDetector");
let mut interval = interval(Duration::from_secs(600));
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
trace!("ecash_verifier::DoubleSpendingDetector : received shutdown");
},
_ = interval.tick() => self.refresh_bloomfilter().await,
}
}
}
pub(crate) fn start(self, shutdown: nym_task::TaskClient) {
tokio::spawn(async move { self.run(shutdown).await });
}
}
@@ -4,7 +4,6 @@
use crate::Error;
use credential_sender::CredentialHandler;
use credential_sender::CredentialHandlerConfig;
use double_spending::DoubleSpendingDetector;
use error::EcashTicketError;
use futures::channel::mpsc::{self, UnboundedSender};
use nym_credentials::CredentialSpendingData;
@@ -18,7 +17,6 @@ use tokio::sync::{Mutex, RwLockReadGuard};
use tracing::error;
pub mod credential_sender;
pub(crate) mod double_spending;
pub mod error;
mod helpers;
mod state;
@@ -31,7 +29,6 @@ pub struct EcashManager<S> {
pk_bytes: [u8; 32], // bytes representation of a pub key representing the verifier
pay_infos: Mutex<Vec<NymPayInfo>>,
cred_sender: UnboundedSender<ClientTicket>,
double_spend_detector: DoubleSpendingDetector<S>,
}
impl<S> EcashManager<S>
@@ -47,9 +44,6 @@ where
) -> Result<Self, Error> {
let shared_state = SharedState::new(nyxd_client, storage).await?;
let double_spend_detector = DoubleSpendingDetector::new(shared_state.clone());
double_spend_detector.clone().start(shutdown.clone());
let (cred_sender, cred_receiver) = mpsc::unbounded();
let cs =
@@ -62,7 +56,6 @@ where
pk_bytes,
pay_infos: Default::default(),
cred_sender,
double_spend_detector,
})
}
@@ -163,10 +156,6 @@ where
Ok(())
}
pub async fn check_double_spend(&self, serial_number: &Vec<u8>) -> bool {
self.double_spend_detector.check(serial_number).await
}
pub fn async_verify(&self, ticket: ClientTicket) {
// TODO: I guess do something for shutdowns
let _ = self
-13
View File
@@ -53,18 +53,6 @@ impl<S: Storage + Clone + 'static> CredentialVerifier<S> {
Ok(())
}
async fn check_bloomfilter(&self, serial_number: &Vec<u8>) -> Result<()> {
trace!("checking the bloomfilter...");
let spent = self.ecash_verifier.check_double_spend(serial_number).await;
if spent {
trace!("the credential has already been spent before at some gateway before (bloomfilter failure)");
return Err(Error::BandwidthCredentialAlreadySpent);
}
Ok(())
}
async fn check_local_db_for_double_spending(&self, serial_number: &[u8]) -> Result<()> {
trace!("checking local db for double spending...");
@@ -128,7 +116,6 @@ impl<S: Storage + Clone + 'static> CredentialVerifier<S> {
}
self.check_credential_spending_date(spend_date.ecash_date())?;
self.check_bloomfilter(&serial_number).await?;
self.check_local_db_for_double_spending(&serial_number)
.await?;
+3 -3
View File
@@ -3,9 +3,6 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("peers in wireguard don't match with in-memory ")]
PeerMismatch,
#[error("traffic byte data needs to be increasing")]
InconsistentConsumedBytes,
@@ -20,4 +17,7 @@ pub enum Error {
#[error("{0}")]
GatewayStorage(#[from] nym_gateway_storage::error::StorageError),
#[error("{0}")]
SystemTime(#[from] std::time::SystemTimeError),
}
+6 -13
View File
@@ -160,13 +160,10 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
.ok_or(Error::MissingClientBandwidthEntry)?
.client_id
{
let bandwidth = storage
.get_available_bandwidth(client_id)
.await?
.ok_or(Error::MissingClientBandwidthEntry)?;
storage.create_bandwidth_entry(client_id).await?;
Ok(Some(BandwidthStorageManager::new(
storage,
ClientBandwidth::new(bandwidth.into()),
ClientBandwidth::new(Default::default()),
client_id,
BandwidthFlushingBehaviourConfig::default(),
true,
@@ -228,14 +225,10 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
.available_bandwidth()
.await
} else {
let peer = self
.host_information
.read()
.await
.peers
.get(key)
.ok_or(Error::PeerMismatch)?
.clone();
let Some(peer) = self.host_information.read().await.peers.get(key).cloned() else {
// host information not updated yet
return Ok(None);
};
BANDWIDTH_CAP_PER_DAY.saturating_sub((peer.rx_bytes + peer.tx_bytes) as i64)
};
+37 -18
View File
@@ -3,6 +3,7 @@
use crate::error::Error;
use crate::peer_controller::PeerControlRequest;
use defguard_wireguard_rs::host::Peer;
use defguard_wireguard_rs::{host::Host, key::Key};
use futures::channel::oneshot;
use nym_authenticator_requests::v2::registration::BANDWIDTH_CAP_PER_DAY;
@@ -12,10 +13,12 @@ use nym_gateway_storage::Storage;
use nym_task::TaskClient;
use nym_wireguard_types::DEFAULT_PEER_TIMEOUT_CHECK;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
pub(crate) type SharedBandwidthStorageManager<St> = Arc<RwLock<BandwidthStorageManager<St>>>;
const AUTO_REMOVE_AFTER: Duration = Duration::from_secs(60 * 60 * 24); // 24 hours
pub struct PeerHandle<St> {
storage: St,
@@ -25,6 +28,7 @@ pub struct PeerHandle<St> {
request_tx: mpsc::Sender<PeerControlRequest>,
timeout_check_interval: IntervalStream,
task_client: TaskClient,
startup_timestamp: SystemTime,
}
impl<St: Storage + Clone + 'static> PeerHandle<St> {
@@ -49,14 +53,11 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
request_tx,
timeout_check_interval,
task_client,
startup_timestamp: SystemTime::now(),
}
}
async fn remove_depleted_peer(&self) -> Result<bool, Error> {
log::debug!(
"Peer {} doesn't have bandwidth anymore, removing it",
self.public_key.to_string()
);
async fn remove_peer(&self) -> Result<bool, Error> {
let (response_tx, response_rx) = oneshot::channel();
self.request_tx
.send(PeerControlRequest::RemovePeer {
@@ -72,15 +73,11 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
Ok(success)
}
async fn active_peer(&mut self, storage_peer: WireguardPeer) -> Result<bool, Error> {
let kernel_peer = self
.host_information
.read()
.await
.peers
.get(&self.public_key)
.ok_or(Error::PeerMismatch)?
.clone();
async fn active_peer(
&mut self,
storage_peer: WireguardPeer,
kernel_peer: Peer,
) -> Result<bool, Error> {
if let Some(bandwidth_manager) = &self.bandwidth_storage_manager {
let spent_bandwidth = (kernel_peer.rx_bytes + kernel_peer.tx_bytes)
.checked_sub(storage_peer.rx_bytes as u64 + storage_peer.tx_bytes as u64)
@@ -94,13 +91,25 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
.await
.is_err()
{
let success = self.remove_depleted_peer().await?;
let success = self.remove_peer().await?;
return Ok(!success);
}
} else {
if SystemTime::now().duration_since(self.startup_timestamp)? >= AUTO_REMOVE_AFTER {
log::debug!(
"Peer {} has been present for 24 hours, removing it",
self.public_key.to_string()
);
let success = self.remove_peer().await?;
return Ok(!success);
}
let spent_bandwidth = kernel_peer.rx_bytes + kernel_peer.tx_bytes;
if spent_bandwidth >= BANDWIDTH_CAP_PER_DAY {
let success = self.remove_depleted_peer().await?;
log::debug!(
"Peer {} doesn't have bandwidth anymore, removing it",
self.public_key.to_string()
);
let success = self.remove_peer().await?;
return Ok(!success);
}
}
@@ -112,11 +121,21 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
while !self.task_client.is_shutdown() {
tokio::select! {
_ = self.timeout_check_interval.next() => {
let Some(peer) = self.storage.get_wireguard_peer(&self.public_key.to_string()).await? else {
let Some(kernel_peer) = self
.host_information
.read()
.await
.peers
.get(&self.public_key)
.cloned() else {
// the host information hasn't beed updated yet
continue;
};
let Some(storage_peer) = self.storage.get_wireguard_peer(&self.public_key.to_string()).await? else {
log::debug!("Peer {:?} not in storage anymore, shutting down handle", self.public_key);
return Ok(());
};
if !self.active_peer(peer).await? {
if !self.active_peer(storage_peer, kernel_peer).await? {
log::debug!("Peer {:?} doesn't have bandwidth anymore, shutting down handle", self.public_key);
return Ok(());
}
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "explorer-api"
version = "1.1.40"
version = "1.1.41"
edition = "2021"
license.workspace = true
+3 -8
View File
@@ -4,7 +4,7 @@
[package]
name = "nym-api"
license = "GPL-3.0"
version = "1.1.44"
version = "1.1.45"
authors = [
"Dave Hrycyszyn <futurechimp@users.noreply.github.com>",
"Jędrzej Stuczyński <andrew@nymtech.net>",
@@ -76,7 +76,7 @@ axum = { workspace = true, features = ["tokio"], optional = true }
axum-extra = { workspace = true, features = ["typed-header"], optional = true }
tower-http = { workspace = true, features = ["cors", "trace"], optional = true }
utoipa = { workspace = true, features = ["axum_extras", "time"], optional = true }
utoipa-swagger-ui = { workspace = true, features = ["axum"], optional = true }
utoipa-swagger-ui = { workspace = true, features = ["axum"], optional = true}
utoipauto = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, features = ["env-filter"], optional = true }
tracing = { workspace = true, optional = true }
@@ -112,7 +112,7 @@ cw4 = { workspace = true }
nym-dkg = { path = "../common/dkg", features = ["cw-types"] }
nym-gateway-client = { path = "../common/client-libs/gateway-client" }
nym-inclusion-probability = { path = "../common/inclusion-probability" }
nym-mixnet-contract-common = { path = "../common/cosmwasm-smart-contracts/mixnet-contract", features = ["utoipa"] }
nym-mixnet-contract-common = { path = "../common/cosmwasm-smart-contracts/mixnet-contract", features = ["utoipa"]}
nym-vesting-contract-common = { path = "../common/cosmwasm-smart-contracts/vesting-contract" }
nym-contracts-common = { path = "../common/cosmwasm-smart-contracts/contracts-common" }
nym-multisig-contract-common = { path = "../common/cosmwasm-smart-contracts/multisig-contract" }
@@ -129,10 +129,6 @@ nym-node-requests = { path = "../nym-node/nym-node-requests" }
nym-types = { path = "../common/types" }
nym-http-api-common = { path = "../common/http-api-common", features = ["utoipa"] }
tikv-jemallocator = { version = "0.6", optional = true, features = ["profiling"] }
tikv-jemalloc-sys = { version = "0.6", optional = true, features = ["stats", "profiling", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", optional = true, features = ["use_std", "stats", "profiling"] }
[features]
no-reward = []
generate-ts = ["ts-rs"]
@@ -147,7 +143,6 @@ axum = ["dep:axum",
"nym-http-api-common/utoipa",
"nym-mixnet-contract-common/utoipa"
]
memory-prof = ["tikv-jemallocator", "tikv-jemalloc-ctl", "tikv-jemalloc-sys"]
[build-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
+2 -5
View File
@@ -193,10 +193,7 @@ pub async fn batch_redeem_tickets(
#[openapi(tag = "Ecash")]
#[get("/double-spending-filter-v1")]
pub async fn double_spending_filter_v1(
state: &RocketState<EcashState>,
_state: &RocketState<EcashState>,
) -> crate::ecash::error::Result<Json<SpentCredentialsResponse>> {
let spent_credentials_export = state.get_bloomfilter_bytes().await;
Ok(Json(SpentCredentialsResponse::new(
spent_credentials_export,
)))
Err(EcashError::Restricted)
}
@@ -3,7 +3,7 @@
use crate::ecash::error::EcashError;
use crate::ecash::state::EcashState;
use crate::node_status_api::models::AxumResult;
use crate::node_status_api::models::{AxumErrorResponse, AxumResult};
use crate::v2::AxumAppState;
use axum::{Json, Router};
use nym_api_requests::constants::MIN_BATCH_REDEMPTION_DELAY;
@@ -236,10 +236,7 @@ async fn batch_redeem_tickets(
)
)]
async fn double_spending_filter_v1(
state: Arc<EcashState>,
_state: Arc<EcashState>,
) -> AxumResult<Json<SpentCredentialsResponse>> {
let spent_credentials_export = state.get_bloomfilter_bytes().await;
Ok(Json(SpentCredentialsResponse::new(
spent_credentials_export,
)))
AxumResult::Err(AxumErrorResponse::internal_msg("permanently restricted"))
}
+3
View File
@@ -32,6 +32,9 @@ pub type Result<T, E = EcashError> = std::result::Result<T, E>;
#[derive(Debug, Error)]
pub enum EcashError {
#[error("permanently restricted")]
Restricted,
#[error(transparent)]
IOError(#[from] std::io::Error),
+1 -51
View File
@@ -9,9 +9,8 @@ use crate::ecash::keys::KeyPair;
use nym_config::defaults::BloomfilterParameters;
use nym_crypto::asymmetric::identity;
use nym_ecash_double_spending::DoubleSpendingFilter;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use time::{Date, OffsetDateTime};
use time::Date;
use tokio::sync::RwLock;
pub(crate) struct TicketDoubleSpendingFilter {
@@ -70,10 +69,6 @@ impl TicketDoubleSpendingFilter {
self.today_filter.dump_bitmap()
}
pub(crate) fn export_global_bitmap(&self) -> Vec<u8> {
self.global_filter.dump_bitmap()
}
pub(crate) fn advance_day(&mut self, date: Date, new_global: DoubleSpendingFilter) {
self.built_on = date;
self.global_filter = new_global;
@@ -81,17 +76,6 @@ impl TicketDoubleSpendingFilter {
}
}
pub(crate) struct ExportedDoubleSpendingFilterData {
pub(crate) last_exported_at: OffsetDateTime,
pub(crate) bytes: Vec<u8>,
}
#[derive(Clone)]
pub(crate) struct ExportedDoubleSpendingFilter {
pub(crate) being_exported: Arc<AtomicBool>,
pub(crate) data: Arc<RwLock<ExportedDoubleSpendingFilterData>>,
}
pub(crate) struct LocalEcashState {
pub(crate) ecash_keypair: KeyPair,
pub(crate) identity_keypair: identity::KeyPair,
@@ -102,9 +86,6 @@ pub(crate) struct LocalEcashState {
// the actual, up to date, bloomfilter
pub(crate) double_spending_filter: Arc<RwLock<TicketDoubleSpendingFilter>>,
// the cached byte representation of the bloomfilter to be used by the clients
pub(crate) exported_double_spending_filter: ExportedDoubleSpendingFilter,
}
impl LocalEcashState {
@@ -118,38 +99,7 @@ impl LocalEcashState {
identity_keypair,
partial_coin_index_signatures: Default::default(),
partial_expiration_date_signatures: Default::default(),
exported_double_spending_filter: ExportedDoubleSpendingFilter {
being_exported: Arc::new(Default::default()),
data: Arc::new(RwLock::new(ExportedDoubleSpendingFilterData {
last_exported_at: OffsetDateTime::now_utc(),
bytes: double_spending_filter.export_global_bitmap(),
})),
},
double_spending_filter: Arc::new(RwLock::new(double_spending_filter)),
}
}
pub(crate) fn maybe_background_update_exported_bloomfilter(&self) {
// make sure another query hasn't already spawned an exporting task
let Ok(should_export) = self
.exported_double_spending_filter
.being_exported
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
else {
return;
};
let filter = self.double_spending_filter.clone();
let exported = self.exported_double_spending_filter.clone();
if should_export {
tokio::spawn(async move {
log::debug!("exporting bloomfilter bitmap");
let new = filter.read().await.export_global_bitmap();
let mut exported_guard = exported.data.write().await;
exported_guard.last_exported_at = OffsetDateTime::now_utc();
exported_guard.bytes = new;
});
}
}
}
+1 -14
View File
@@ -45,7 +45,7 @@ use nym_ecash_time::cred_exp_date;
use nym_validator_client::nyxd::AccountId;
use nym_validator_client::EcashApiClient;
use time::ext::NumericalDuration;
use time::{Date, Duration, OffsetDateTime};
use time::{Date, OffsetDateTime};
use tokio::sync::RwLockReadGuard;
pub(crate) mod auxiliary;
@@ -839,17 +839,4 @@ impl EcashState {
res
}
pub async fn get_bloomfilter_bytes(&self) -> Vec<u8> {
let guard = self.local.exported_double_spending_filter.data.read().await;
let bytes = guard.bytes.clone();
// see if it's been > 5min since last export (that value is arbitrary)
if guard.last_exported_at + Duration::minutes(5) < OffsetDateTime::now_utc() {
self.local.maybe_background_update_exported_bloomfilter();
}
bytes
}
}
+2 -13
View File
@@ -24,7 +24,7 @@ use circulating_supply_api::cache::CirculatingSupplyCache;
use clap::Parser;
use ecash::dkg::controller::DkgController;
use node_status_api::NodeStatusCache;
use nym_bin_common::logging::{setup_logging, setup_tracing_logger};
use nym_bin_common::logging::setup_logging;
use nym_config::defaults::NymNetworkDetails;
use nym_contract_cache::cache::NymContractCache;
use nym_sphinx::receiver::SphinxMessageReceiver;
@@ -44,10 +44,6 @@ pub(crate) mod nym_nodes;
mod status;
pub(crate) mod support;
#[cfg(feature = "memory-prof")]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[cfg(feature = "axum")]
mod v2;
@@ -62,16 +58,9 @@ async fn main() -> Result<(), anyhow::Error> {
cfg_if::cfg_if! {if #[cfg(feature = "console-subscriber")] {
// instrument tokio console subscriber needs RUSTFLAGS="--cfg tokio_unstable" at build time
console_subscriber::init();
} else {
setup_tracing_logger();
}}
// setup_tracing_logger();
// std::env::set_var("MALLOC_CONF", "prof:true,lg_prof_interval:28");
// setup_tracing_logger();
setup_logging();
// TODO rocket: replace with tracing logger once rocket is eliminated from code
info!("Starting nym api...");
-131
View File
@@ -1,131 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::node_status_api::models::RocketErrorResponse;
use okapi::openapi3::{OpenApi, Responses};
use rocket::http::Status;
use rocket::response::Responder;
use rocket::{response, Request, Route};
use rocket_okapi::gen::OpenApiGenerator;
use rocket_okapi::response::OpenApiResponderInner;
use rocket_okapi::settings::OpenApiSettings;
use rocket_okapi::util::ensure_status_code_exists;
use rocket_okapi::{openapi, openapi_get_routes_spec};
// code taken from https://github.dev/GreptimeTeam/greptimedb/blob/develop/src/cmd/src/bin/greptime.rs
#[cfg(feature = "memory-prof")]
pub mod memory_prof {
const PROF_DUMP: &[u8] = b"prof.dump\0";
// const OPT_PROF: &[u8] = b"opt.prof\0";
use anyhow::{bail, Context};
use nym_config::{must_get_home, DEFAULT_NYM_APIS_DIR, NYM_DIR};
use std::ffi::{c_char, CString};
use time::OffsetDateTime;
use tokio::fs::create_dir_all;
use tokio::io::AsyncReadExt;
pub async fn dump_profile() -> anyhow::Result<Vec<u8>> {
if !is_prof_enabled()? {
bail!("memory profiling is not enabled")
}
let now = OffsetDateTime::now_utc();
let dump_path = must_get_home()
.join(NYM_DIR)
.join(DEFAULT_NYM_APIS_DIR)
.join("memory_dumps")
.join(format!("{}", now.unix_timestamp()))
.join("nym-api.hprof");
let parent = dump_path.parent().unwrap();
create_dir_all(&parent).await?;
info!("using {} for the memory dump", dump_path.display());
let path = dump_path
.to_str()
.context("the temp dir contained invalid characters")?
.to_string();
let mut bytes = CString::new(path.as_str())
.context("could not construct a CString out of the path")?
.into_bytes_with_nul();
{
// #safety: we always expect a valid temp file path to write profiling data to.
let ptr = bytes.as_mut_ptr() as *mut c_char;
unsafe {
tikv_jemalloc_ctl::raw::write(PROF_DUMP, ptr).context(format!(
"failed to dump profiling data to {}",
dump_path.display()
))?
}
}
let mut f = tokio::fs::File::open(path.as_str())
.await
.context("failed to open the dump file")?;
let mut buf = vec![];
let _ = f
.read_to_end(&mut buf)
.await
.context("failed to read the dump file")?;
Ok(buf)
}
fn is_prof_enabled() -> anyhow::Result<bool> {
Ok(tikv_jemalloc_ctl::profiling::prof::read()?)
// Ok(unsafe {
// tikv_jemalloc_ctl::raw::read::<bool>(OPT_PROF)
// .context("failed to check the OPT_PROF")?
// })
}
}
pub struct BinaryResponse {
inner: Vec<u8>,
}
impl<'r, 'o: 'r> Responder<'r, 'o> for BinaryResponse {
fn respond_to(self, _req: &'r Request<'_>) -> response::Result<'o> {
let mut res = rocket::Response::new();
res.set_sized_body(self.inner.len(), std::io::Cursor::new(self.inner));
Ok(res)
}
}
impl OpenApiResponderInner for BinaryResponse {
fn responses(_gen: &mut OpenApiGenerator) -> rocket_okapi::Result<Responses> {
let mut responses = Responses::default();
ensure_status_code_exists(&mut responses, 200);
Ok(responses)
}
}
/// foomp
#[cfg(feature = "memory-prof")]
#[openapi(tag = "profiling")]
#[get("/mem")]
pub async fn mem_prof_handler() -> Result<BinaryResponse, RocketErrorResponse> {
let dump_data = memory_prof::dump_profile()
.await
.map_err(|err| RocketErrorResponse::new(err.to_string(), Status::InternalServerError))?;
Ok(BinaryResponse { inner: dump_data })
}
#[cfg(not(feature = "memory-prof"))]
#[openapi(tag = "profiling")]
#[get("/mem")]
pub async fn mem_prof_handler() -> RocketErrorResponse {
RocketErrorResponse::new("The 'mem-prof' feature is disabled", Status::NotImplemented)
}
pub(crate) fn api_status_routes(settings: &OpenApiSettings) -> (Vec<Route>, OpenApi) {
openapi_get_routes_spec![
settings:
mem_prof_handler
]
}
-3
View File
@@ -28,8 +28,6 @@ use rocket_okapi::swagger_ui::make_swagger_ui;
pub(crate) mod helpers;
pub(crate) mod openapi;
pub(crate) mod mem_prof;
pub(crate) async fn setup_rocket(
config: &Config,
network_details: NetworkDetails,
@@ -54,7 +52,6 @@ pub(crate) async fn setup_rocket(
"/api-status" => api_status_routes(&openapi_settings),
"/ecash" => ecash::routes_open_api(&openapi_settings, config.coconut_signer.enabled),
"" => nym_node_routes_deprecated(&openapi_settings),
"/prof" => mem_prof::api_status_routes(&openapi_settings),
// => when we move those routes, we'll need to add a redirection for backwards compatibility
"/unstable/nym-nodes" => nym_node_routes_next(&openapi_settings)
+1 -1
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-node"
version = "1.1.8"
version = "1.1.9"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -8,7 +8,7 @@ use std::{
use crate::{error::AuthenticatorError, peer_manager::PeerManager};
use futures::StreamExt;
use log::warn;
use log::{error, warn};
use nym_authenticator_requests::v2::{
self,
registration::{
@@ -152,10 +152,8 @@ impl<S: Storage + Clone + 'static> MixnetListener<S> {
) -> AuthenticatorHandleResult {
let remote_public = init_message.pub_key;
let nonce: u64 = fastrand::u64(..);
if let Some(registration_data) = self
.registred_and_free
.read()
.await
let mut registred_and_free = self.registred_and_free.write().await;
if let Some(registration_data) = registred_and_free
.registration_in_progres
.get(&remote_public)
{
@@ -184,7 +182,6 @@ impl<S: Storage + Clone + 'static> MixnetListener<S> {
));
}
let mut registred_and_free = self.registred_and_free.write().await;
let private_ip_ref = registred_and_free
.free_private_network_ips
.iter_mut()
@@ -289,10 +286,6 @@ impl<S: Storage + Clone + 'static> MixnetListener<S> {
credential: CredentialSpendingData,
client_id: i64,
) -> Result<i64> {
ecash_verifier
.storage()
.create_bandwidth_entry(client_id)
.await?;
let bandwidth = ecash_verifier
.storage()
.get_available_bandwidth(client_id)
@@ -340,6 +333,7 @@ impl<S: Storage + Clone + 'static> MixnetListener<S> {
let request = match deserialize_request(&reconstructed) {
Err(AuthenticatorError::InvalidPacketVersion(version)) => {
warn!("[DBG-TEMP]: failed to deserialize request - wrong packet version");
return self.on_version_mismatch(version, &reconstructed);
}
req => req,
@@ -347,14 +341,17 @@ impl<S: Storage + Clone + 'static> MixnetListener<S> {
match request.data {
AuthenticatorRequestData::Initial(init_msg) => {
warn!("[DBG-TEMP]: received 'init_msg' - {init_msg:?}");
self.on_initial_request(init_msg, request.request_id, request.reply_to)
.await
}
AuthenticatorRequestData::Final(final_msg) => {
warn!("[DBG-TEMP]: received 'final_msg': {final_msg:?}");
self.on_final_request(*final_msg, request.request_id, request.reply_to)
.await
}
AuthenticatorRequestData::QueryBandwidth(peer_public_key) => {
warn!("[DBG-TEMP]: received 'query_bandwidth_msg' for {peer_public_key}");
self.on_query_bandwidth_request(
peer_public_key,
request.request_id,
@@ -408,6 +405,7 @@ impl<S: Storage + Clone + 'static> MixnetListener<S> {
if let Some(msg) = msg {
match self.on_reconstructed_message(msg).await {
Ok(response) => {
warn!("[DBG-TEMP]: produced the following response: {response:?}");
if let Err(err) = self.handle_response(response).await {
log::error!("Mixnet listener failed to handle response: {err}");
}
@@ -4,7 +4,7 @@
[package]
name = "nym-network-requester"
license = "GPL-3.0"
version = "1.1.42"
version = "1.1.43"
authors.workspace = true
edition.workspace = true
rust-version = "1.70"
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nym-cli"
version = "1.1.42"
version = "1.1.43"
authors.workspace = true
edition = "2021"
license.workspace = true
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "nymvisor"
version = "0.1.7"
version = "0.1.8"
authors.workspace = true
repository.workspace = true
homepage.workspace = true