Feature/credential proxy crate (#6018)

* moved storage and deposits buffer to the common lib

* move more of the state into the shared lib

* extracted the rest of the features into the shared lib

* fixed test imports

* clippy
This commit is contained in:
Jędrzej Stuczyński
2025-09-10 09:28:38 +01:00
committed by GitHub
parent 7c5f10a219
commit d3cdaf373b
46 changed files with 1945 additions and 1716 deletions
Generated
+38
View File
@@ -5464,6 +5464,7 @@ dependencies = [
"nym-bin-common",
"nym-compact-ecash",
"nym-config",
"nym-credential-proxy-lib",
"nym-credential-proxy-requests",
"nym-credentials",
"nym-credentials-interface",
@@ -5494,6 +5495,43 @@ dependencies = [
"zeroize",
]
[[package]]
name = "nym-credential-proxy-lib"
version = "0.1.0"
dependencies = [
"anyhow",
"axum 0.7.9",
"bip39",
"bs58",
"futures",
"humantime",
"nym-compact-ecash",
"nym-credential-proxy-requests",
"nym-credentials",
"nym-credentials-interface",
"nym-crypto",
"nym-ecash-contract-common",
"nym-ecash-signer-check",
"nym-network-defaults",
"nym-validator-client",
"rand 0.8.5",
"reqwest 0.12.22",
"serde",
"serde_json",
"sqlx",
"strum",
"strum_macros",
"tempfile",
"thiserror 2.0.12",
"time",
"tokio",
"tokio-util",
"tracing",
"url",
"uuid",
"zeroize",
]
[[package]]
name = "nym-credential-proxy-requests"
version = "0.1.0"
+1
View File
@@ -43,6 +43,7 @@ members = [
"common/cosmwasm-smart-contracts/nym-performance-contract",
"common/cosmwasm-smart-contracts/nym-pool-contract",
"common/cosmwasm-smart-contracts/vesting-contract",
"common/credential-proxy",
"common/credential-storage",
"common/credential-utils",
"common/credential-verification",
+55
View File
@@ -0,0 +1,55 @@
[package]
name = "nym-credential-proxy-lib"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
anyhow = { workspace = true }
axum = { workspace = true }
bip39 = { workspace = true, features = ["zeroize"] }
bs58 = { workspace = true }
futures = { workspace = true }
humantime = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true, features = ["rustls-tls"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
strum = { workspace = true, features = ["derive"] }
strum_macros = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"] }
time = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
tokio-util = { workspace = true, features = ["rt"] }
tracing = { workspace = true }
uuid = { workspace = true, features = ["serde"] }
url = { workspace = true }
zeroize = { workspace = true }
nym-credentials = { path = "../credentials" }
nym-crypto = { path = "../crypto", features = ["asymmetric", "rand", "serde"] }
nym-credentials-interface = { path = "../credentials-interface" }
nym-credential-proxy-requests = { path = "../../nym-credential-proxy/nym-credential-proxy-requests" }
nym-ecash-signer-check = { path = "../ecash-signer-check" }
nym-ecash-contract-common = { path = "../cosmwasm-smart-contracts/ecash-contract" }
nym-compact-ecash = { path = "../nym_offline_compact_ecash" }
nym-validator-client = { path = "../client-libs/validator-client" }
nym-network-defaults = { path = "../network-defaults" }
[dev-dependencies]
tempfile = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
[lints]
workspace = true
@@ -10,11 +10,11 @@ use nym_validator_client::nyxd::{Coin, Hash};
use time::OffsetDateTime;
use zeroize::Zeroizing;
pub(crate) struct BufferedDeposit {
pub(crate) deposit_id: u32,
pub struct BufferedDeposit {
pub deposit_id: u32,
// note: this type implements `ZeroizeOnDrop`
pub(crate) ed25519_private_key: ed25519::PrivateKey,
pub ed25519_private_key: ed25519::PrivateKey,
}
impl TryFrom<StorableEcashDeposit> for BufferedDeposit {
@@ -36,14 +36,14 @@ impl TryFrom<StorableEcashDeposit> for BufferedDeposit {
}
impl BufferedDeposit {
pub(crate) fn new(deposit_id: u32, ed25519_private_key: ed25519::PrivateKey) -> Self {
pub fn new(deposit_id: u32, ed25519_private_key: ed25519::PrivateKey) -> Self {
BufferedDeposit {
deposit_id,
ed25519_private_key,
}
}
pub(crate) fn sign_ticketbook_plaintext(
pub fn sign_ticketbook_plaintext(
&self,
withdrawal_request: &WithdrawalRequest,
) -> ed25519::Signature {
@@ -52,13 +52,13 @@ impl BufferedDeposit {
}
}
pub(crate) struct PerformedDeposits {
pub(crate) deposits_data: Vec<BufferedDeposit>,
pub struct PerformedDeposits {
pub deposits_data: Vec<BufferedDeposit>,
// shared by all performed deposits as they were included in the same tx
pub(crate) tx_hash: Hash,
pub(crate) requested_on: OffsetDateTime,
pub(crate) deposit_amount: Coin,
pub tx_hash: Hash,
pub requested_on: OffsetDateTime,
pub deposit_amount: Coin,
}
impl PerformedDeposits {
@@ -1,11 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::deposits_buffer::helpers::{request_sizes, BufferedDeposit, PerformedDeposits};
use crate::deposits_buffer::helpers::request_sizes;
use crate::deposits_buffer::refill_task::RefillTask;
use crate::error::CredentialProxyError;
use crate::http::state::required_deposit_cache::RequiredDepositCache;
use crate::http::state::ChainClient;
use crate::shared_state::nyxd_client::ChainClient;
use crate::shared_state::required_deposit_cache::RequiredDepositCache;
use crate::storage::CredentialProxyStorage;
use nym_compact_ecash::PublicKeyUser;
use nym_crypto::asymmetric::ed25519;
@@ -21,6 +21,8 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use uuid::Uuid;
pub use helpers::{BufferedDeposit, PerformedDeposits};
pub(crate) mod helpers;
mod refill_task;
@@ -43,12 +45,12 @@ struct DepositsBufferInner {
}
#[derive(Clone)]
pub(crate) struct DepositsBuffer {
pub struct DepositsBuffer {
inner: Arc<DepositsBufferInner>,
}
impl DepositsBuffer {
pub(crate) async fn new(
pub async fn new(
storage: CredentialProxyStorage,
client: ChainClient,
required_deposit_cache: RequiredDepositCache,
@@ -250,7 +252,7 @@ impl DepositsBuffer {
}
}
pub(crate) async fn get_valid_deposit(
pub async fn get_valid_deposit(
&self,
request_uuid: Uuid,
requested_on: OffsetDateTime,
@@ -290,7 +292,7 @@ impl DepositsBuffer {
}
}
pub(crate) async fn wait_for_shutdown(&self) {
pub async fn wait_for_shutdown(&self) {
let task_handle = self.inner.deposits_refill_task.take_task_join_handle();
if let Some(task_handle) = task_handle {
if !task_handle.is_finished() {
@@ -1,4 +1,4 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// Copyright 2025 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use nym_ecash_signer_check::SignerCheckError;
@@ -128,6 +128,30 @@ pub enum CredentialProxyError {
#[from]
source: SignerCheckError,
},
#[error(
"this operation couldn't be completed as the program is in the process of shutting down"
)]
ShutdownInProgress,
#[error("failed to obtain wallet shares with id {id}: {message}")]
ShareByIdLoadError { message: String, id: i64 },
#[error("failed to obtain wallet shares with device_id {device_id} and credential_id: {credential_id}: {message}")]
ShareByDeviceLoadError {
message: String,
device_id: String,
credential_id: String,
},
#[error("could not find shares with id {id}")]
SharesByIdNotFound { id: i64 },
#[error("could not find shares with device_id {device_id} and credential_id: {credential_id}")]
SharesByDeviceNotFound {
device_id: String,
credential_id: String,
},
}
impl CredentialProxyError {
+67
View File
@@ -0,0 +1,67 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use rand::rngs::OsRng;
use rand::RngCore;
use time::OffsetDateTime;
use tracing::{debug, info, warn};
use uuid::Uuid;
pub fn random_uuid() -> Uuid {
let mut bytes = [0u8; 16];
let mut rng = OsRng;
rng.fill_bytes(&mut bytes);
Uuid::from_bytes(bytes)
}
pub struct LockTimer {
created: OffsetDateTime,
message: String,
}
impl LockTimer {
pub fn new<S: Into<String>>(message: S) -> Self {
LockTimer {
message: message.into(),
..Default::default()
}
}
}
impl Drop for LockTimer {
fn drop(&mut self) {
let time_taken = OffsetDateTime::now_utc() - self.created;
let time_taken_formatted = humantime::format_duration(time_taken.unsigned_abs());
if time_taken > time::Duration::SECOND * 10 {
warn!(time_taken = %time_taken_formatted, "{}", self.message)
} else if time_taken > time::Duration::SECOND * 5 {
info!(time_taken = %time_taken_formatted, "{}", self.message)
} else {
debug!(time_taken = %time_taken_formatted, "{}", self.message)
};
}
}
impl Default for LockTimer {
fn default() -> Self {
LockTimer {
created: OffsetDateTime::now_utc(),
message: "released the lock".to_string(),
}
}
}
// #[allow(clippy::panic)]
// fn build_sha_short() -> &'static str {
// let bin_info = bin_info!();
// if bin_info.commit_sha.len() < 7 {
// panic!("unavailable build commit sha")
// }
//
// if bin_info.commit_sha == "VERGEN_IDEMPOTENT_OUTPUT" {
// error!("the binary hasn't been built correctly. it doesn't have a commit sha information");
// return "unknown";
// }
//
// &bin_info.commit_sha[..7]
// }
@@ -1,11 +1,12 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::CredentialProxyError;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::Json;
use nym_credential_proxy_requests::api::v1::ErrorResponse;
use tracing::warn;
use uuid::Uuid;
#[derive(Debug, Clone)]
@@ -35,6 +36,10 @@ impl RequestError {
}
}
pub fn new_plain_error(err: CredentialProxyError) -> Self {
Self::from_err(err, StatusCode::INTERNAL_SERVER_ERROR)
}
pub fn new_server_error(err: CredentialProxyError, uuid: Uuid) -> Self {
RequestError::new_with_uuid(err.to_string(), uuid, StatusCode::INTERNAL_SERVER_ERROR)
}
@@ -59,3 +64,12 @@ impl IntoResponse for RequestError {
(self.status, Json(self.inner)).into_response()
}
}
pub fn db_failure<T>(err: CredentialProxyError, uuid: Uuid) -> Result<T, RequestError> {
warn!("db failure: {err}");
Err(RequestError::new_with_uuid(
format!("oh no, something went wrong {err}"),
uuid,
StatusCode::INTERNAL_SERVER_ERROR,
))
}
+13
View File
@@ -0,0 +1,13 @@
// Copyright 2025 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
pub mod deposits_buffer;
pub mod error;
pub mod helpers;
pub mod http_helpers;
pub mod nym_api_helpers;
pub mod quorum_checker;
pub mod shared_state;
pub mod storage;
pub mod ticketbook_manager;
pub mod webhook;
@@ -19,9 +19,9 @@ use time::{Date, OffsetDateTime};
use tokio::sync::{Mutex, RwLock, RwLockReadGuard};
use tracing::warn;
pub(crate) struct CachedEpoch {
pub struct CachedEpoch {
valid_until: OffsetDateTime,
pub(crate) current_epoch: Epoch,
pub current_epoch: Epoch,
}
impl Default for CachedEpoch {
@@ -34,11 +34,11 @@ impl Default for CachedEpoch {
}
impl CachedEpoch {
pub(crate) fn is_valid(&self) -> bool {
pub fn is_valid(&self) -> bool {
self.valid_until > OffsetDateTime::now_utc()
}
pub(crate) fn update(&mut self, epoch: Epoch) {
pub fn update(&mut self, epoch: Epoch) {
let now = OffsetDateTime::now_utc();
let validity_duration = if let Some(epoch_finish) = epoch.deadline {
@@ -58,13 +58,13 @@ impl CachedEpoch {
}
// a map of items that never change for given key
pub(crate) struct CachedImmutableItems<K, V> {
pub struct CachedImmutableItems<K, V> {
// I wonder if there's a more efficient structure with OnceLock or OnceCell or something
inner: RwLock<HashMap<K, V>>,
}
// an item that stays constant throughout given epoch
pub(crate) type CachedImmutableEpochItem<T> = CachedImmutableItems<EpochId, T>;
pub type CachedImmutableEpochItem<T> = CachedImmutableItems<EpochId, T>;
impl<K, V> Default for CachedImmutableItems<K, V> {
fn default() -> Self {
@@ -86,11 +86,7 @@ impl<K, V> CachedImmutableItems<K, V>
where
K: Eq + Hash,
{
pub(crate) async fn get_or_init<F, U, E>(
&self,
key: K,
f: F,
) -> Result<RwLockReadGuard<'_, V>, E>
pub async fn get_or_init<F, U, E>(&self, key: K, f: F) -> Result<RwLockReadGuard<'_, V>, E>
where
F: FnOnce() -> U,
U: Future<Output = Result<V, E>>,
@@ -129,9 +125,7 @@ where
}
}
pub(crate) fn ensure_sane_expiration_date(
expiration_date: Date,
) -> Result<(), CredentialProxyError> {
pub fn ensure_sane_expiration_date(expiration_date: Date) -> Result<(), CredentialProxyError> {
let today = ecash_today();
if expiration_date < today.date() {
@@ -146,7 +140,7 @@ pub(crate) fn ensure_sane_expiration_date(
Ok(())
}
pub(crate) async fn query_all_threshold_apis<F, T, U>(
pub async fn query_all_threshold_apis<F, T, U>(
all_apis: Vec<EcashApiClient>,
threshold: u64,
f: F,
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::CredentialProxyError;
use crate::http::state::ChainClient;
use crate::shared_state::nyxd_client::ChainClient;
use nym_ecash_signer_check::{check_known_dealers, dkg_details_with_client};
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -12,17 +12,17 @@ use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
#[derive(Clone)]
pub(crate) struct QuorumState {
pub struct QuorumState {
available: Arc<AtomicBool>,
}
impl QuorumState {
pub(crate) fn available(&self) -> bool {
pub fn available(&self) -> bool {
self.available.load(Ordering::Acquire)
}
}
pub(crate) struct QuorumStateChecker {
pub struct QuorumStateChecker {
client: ChainClient,
cancellation_token: CancellationToken,
check_interval: Duration,
@@ -0,0 +1,49 @@
// Copyright 2025 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::nym_api_helpers::{CachedEpoch, CachedImmutableEpochItem, CachedImmutableItems};
use crate::quorum_checker::QuorumState;
use crate::shared_state::required_deposit_cache::RequiredDepositCache;
use nym_compact_ecash::VerificationKeyAuth;
use nym_credentials::{AggregatedCoinIndicesSignatures, AggregatedExpirationDateSignatures};
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::EcashApiClient;
use time::Date;
use tokio::sync::RwLock;
pub struct EcashState {
pub required_deposit_cache: RequiredDepositCache,
pub quorum_state: QuorumState,
pub cached_epoch: RwLock<CachedEpoch>,
pub master_verification_key: CachedImmutableEpochItem<VerificationKeyAuth>,
pub threshold_values: CachedImmutableEpochItem<u64>,
pub epoch_clients: CachedImmutableEpochItem<Vec<EcashApiClient>>,
pub coin_index_signatures: CachedImmutableEpochItem<AggregatedCoinIndicesSignatures>,
pub expiration_date_signatures:
CachedImmutableItems<(EpochId, Date), AggregatedExpirationDateSignatures>,
}
impl EcashState {
pub fn new(
required_deposit_cache: RequiredDepositCache,
quorum_state: QuorumState,
) -> EcashState {
EcashState {
required_deposit_cache,
quorum_state,
cached_epoch: Default::default(),
master_verification_key: Default::default(),
threshold_values: Default::default(),
epoch_clients: Default::default(),
coin_index_signatures: Default::default(),
expiration_date_signatures: Default::default(),
}
}
}
@@ -0,0 +1,495 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::deposits_buffer::{BufferedDeposit, DepositsBuffer};
use crate::error::CredentialProxyError;
use crate::nym_api_helpers::{ensure_sane_expiration_date, query_all_threshold_apis};
use crate::shared_state::ecash_state::EcashState;
use crate::shared_state::nyxd_client::ChainClient;
use crate::storage::CredentialProxyStorage;
use nym_compact_ecash::scheme::coin_indices_signatures::{
aggregate_annotated_indices_signatures, CoinIndexSignatureShare,
};
use nym_compact_ecash::scheme::expiration_date_signatures::{
aggregate_annotated_expiration_signatures, ExpirationDateSignatureShare,
};
use nym_compact_ecash::{Base58, PublicKeyUser, VerificationKeyAuth};
use nym_credential_proxy_requests::api::v1::ticketbook::models::{
AggregatedCoinIndicesSignaturesResponse, AggregatedExpirationDateSignaturesResponse,
GlobalDataParams, MasterVerificationKeyResponse,
};
use nym_credentials::ecash::utils::EcashTime;
use nym_credentials::{
AggregatedCoinIndicesSignatures, AggregatedExpirationDateSignatures, EpochVerificationKey,
};
use nym_ecash_contract_common::deposit::DepositId;
use nym_validator_client::coconut::EcashApiError;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::dkg_query_client::Epoch;
use nym_validator_client::nyxd::contract_traits::{DkgQueryClient, PagedDkgQueryClient};
use nym_validator_client::nyxd::Coin;
use nym_validator_client::{DirectSigningHttpRpcNyxdClient, EcashApiClient};
use std::sync::Arc;
use std::time::Duration;
use time::{Date, OffsetDateTime};
use tokio::sync::RwLockReadGuard;
use tokio::time::Instant;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
pub mod ecash_state;
pub mod nyxd_client;
pub mod required_deposit_cache;
#[derive(Clone)]
pub struct CredentialProxyState {
inner: Arc<CredentialProxyStateInner>,
}
impl CredentialProxyState {
pub fn new(
storage: CredentialProxyStorage,
client: ChainClient,
deposits_buffer: DepositsBuffer,
ecash_state: EcashState,
) -> Self {
CredentialProxyState {
inner: Arc::new(CredentialProxyStateInner {
storage,
client,
deposits_buffer,
ecash_state,
}),
}
}
pub fn storage(&self) -> &CredentialProxyStorage {
&self.inner.storage
}
pub async fn deposit_amount(&self) -> Result<Coin, CredentialProxyError> {
self.ecash_state()
.required_deposit_cache
.get_or_update(self.client())
.await
}
pub fn client(&self) -> &ChainClient {
&self.inner.client
}
pub fn deposits_buffer(&self) -> &DepositsBuffer {
&self.inner.deposits_buffer
}
pub fn ecash_state(&self) -> &EcashState {
&self.inner.ecash_state
}
pub(crate) async fn query_chain(&self) -> RwLockReadGuard<'_, DirectSigningHttpRpcNyxdClient> {
self.inner.client.query_chain().await
}
pub async fn ensure_credentials_issuable(&self) -> Result<(), CredentialProxyError> {
let epoch = self.current_epoch().await?;
if epoch.state.is_final() {
Ok(())
} else if let Some(final_timestamp) = epoch.final_timestamp_secs() {
// SAFETY: the timestamp values in our DKG contract should be valid timestamps,
// otherwise it means the chain is seriously misbehaving
#[allow(clippy::unwrap_used)]
let finish_dt = OffsetDateTime::from_unix_timestamp(final_timestamp as i64).unwrap();
Err(CredentialProxyError::CredentialsNotYetIssuable {
availability: finish_dt,
})
} else if epoch.state.is_waiting_initialisation() {
Err(CredentialProxyError::UninitialisedDkg)
} else {
Err(CredentialProxyError::UnknownEcashFailure)
}
}
pub async fn get_deposit(
&self,
request_uuid: Uuid,
requested_on: OffsetDateTime,
client_pubkey: PublicKeyUser,
) -> Result<BufferedDeposit, CredentialProxyError> {
let start = Instant::now();
let deposit = self
.deposits_buffer()
.get_valid_deposit(request_uuid, requested_on, client_pubkey)
.await;
let time_taken = start.elapsed();
let formatted = humantime::format_duration(time_taken);
if time_taken > Duration::from_secs(10) {
warn!("attempting to get buffered deposit took {formatted}. perhaps the buffer is too small or the process/chain is overloaded?")
} else {
debug!("attempting to get buffered deposit took {formatted}")
};
deposit
}
pub async fn insert_deposit_usage_error(&self, deposit_id: DepositId, error: String) {
if let Err(err) = self
.storage()
.insert_deposit_usage_error(deposit_id, error)
.await
{
error!("failed to insert information about deposit (id: {deposit_id}) usage failure: {err}")
}
}
pub async fn current_epoch_id(&self) -> Result<EpochId, CredentialProxyError> {
let read_guard = self.inner.ecash_state.cached_epoch.read().await;
if read_guard.is_valid() {
return Ok(read_guard.current_epoch.epoch_id);
}
// update cache
drop(read_guard);
let mut write_guard = self.inner.ecash_state.cached_epoch.write().await;
let epoch = self.query_chain().await.get_current_epoch().await?;
write_guard.update(epoch);
Ok(epoch.epoch_id)
}
pub async fn current_epoch(&self) -> Result<Epoch, CredentialProxyError> {
let read_guard = self.ecash_state().cached_epoch.read().await;
if read_guard.is_valid() {
return Ok(read_guard.current_epoch);
}
// update cache
drop(read_guard);
let mut write_guard = self.ecash_state().cached_epoch.write().await;
let epoch = self.query_chain().await.get_current_epoch().await?;
write_guard.update(epoch);
Ok(epoch)
}
pub async fn global_data(
&self,
global_data: GlobalDataParams,
epoch_id: EpochId,
expiration_date: Date,
) -> Result<
(
Option<MasterVerificationKeyResponse>,
Option<AggregatedExpirationDateSignaturesResponse>,
Option<AggregatedCoinIndicesSignaturesResponse>,
),
CredentialProxyError,
> {
let master_verification_key = if global_data.include_master_verification_key {
debug!("including master verification key in the response");
Some(
self.master_verification_key(Some(epoch_id))
.await
.map(|key| MasterVerificationKeyResponse {
epoch_id,
bs58_encoded_key: key.to_bs58(),
})
.inspect_err(|err| warn!("request failure: {err}"))?,
)
} else {
None
};
let aggregated_expiration_date_signatures =
if global_data.include_expiration_date_signatures {
debug!("including expiration date signatures in the response");
Some(
self.master_expiration_date_signatures(epoch_id, expiration_date)
.await
.map(|signatures| AggregatedExpirationDateSignaturesResponse {
signatures: signatures.clone(),
})
.inspect_err(|err| warn!("request failure: {err}"))?,
)
} else {
None
};
let aggregated_coin_index_signatures = if global_data.include_coin_index_signatures {
debug!("including coin index signatures in the response");
Some(
self.master_coin_index_signatures(Some(epoch_id))
.await
.map(|signatures| AggregatedCoinIndicesSignaturesResponse {
signatures: signatures.clone(),
})
.inspect_err(|err| warn!("request failure: {err}"))?,
)
} else {
None
};
Ok((
master_verification_key,
aggregated_expiration_date_signatures,
aggregated_coin_index_signatures,
))
}
pub async fn master_verification_key(
&self,
epoch_id: Option<EpochId>,
) -> Result<RwLockReadGuard<'_, VerificationKeyAuth>, CredentialProxyError> {
let epoch_id = match epoch_id {
Some(id) => id,
None => self.current_epoch_id().await?,
};
self.inner
.ecash_state
.master_verification_key
.get_or_init(epoch_id, || async {
// 1. check the storage
if let Some(stored) = self
.inner
.storage
.get_master_verification_key(epoch_id)
.await?
{
return Ok(stored.key);
}
info!("attempting to establish master verification key for epoch {epoch_id}...");
// 2. perform actual aggregation
let all_apis = self.ecash_clients(epoch_id).await?;
let threshold = self.ecash_threshold(epoch_id).await?;
if all_apis.len() < threshold as usize {
return Err(CredentialProxyError::InsufficientNumberOfSigners {
threshold,
available: all_apis.len(),
});
}
let master_key = nym_credentials::aggregate_verification_keys(&all_apis)?;
let epoch = EpochVerificationKey {
epoch_id,
key: master_key,
};
// 3. save the key in the storage for when we reboot
self.inner
.storage
.insert_master_verification_key(&epoch)
.await?;
Ok(epoch.key)
})
.await
}
pub async fn master_coin_index_signatures(
&self,
epoch_id: Option<EpochId>,
) -> Result<RwLockReadGuard<'_, AggregatedCoinIndicesSignatures>, CredentialProxyError> {
let epoch_id = match epoch_id {
Some(id) => id,
None => self.current_epoch_id().await?,
};
self.inner
.ecash_state
.coin_index_signatures
.get_or_init(epoch_id, || async {
// 1. check the storage
if let Some(master_sigs) = self
.inner
.storage
.get_master_coin_index_signatures(epoch_id)
.await?
{
return Ok(master_sigs);
}
info!(
"attempting to establish master coin index signatures for epoch {epoch_id}..."
);
// 2. go around APIs and attempt to aggregate the data
let master_vk = self.master_verification_key(Some(epoch_id)).await?;
let all_apis = self.ecash_clients(epoch_id).await?;
let threshold = self.ecash_threshold(epoch_id).await?;
let get_partial_signatures = |api: EcashApiClient| async {
// move the api into the closure
let api = api;
let node_index = api.node_id;
let partial_vk = api.verification_key;
let partial = api
.api_client
.partial_coin_indices_signatures(Some(epoch_id))
.await?
.signatures;
Ok(CoinIndexSignatureShare {
index: node_index,
key: partial_vk,
signatures: partial,
})
};
let shares =
query_all_threshold_apis(all_apis.clone(), threshold, get_partial_signatures)
.await?;
let aggregated = aggregate_annotated_indices_signatures(
nym_credentials_interface::ecash_parameters(),
&master_vk,
&shares,
)?;
let sigs = AggregatedCoinIndicesSignatures {
epoch_id,
signatures: aggregated,
};
// 3. save the signatures in the storage for when we reboot
self.inner
.storage
.insert_master_coin_index_signatures(&sigs)
.await?;
Ok(sigs)
})
.await
}
pub async fn master_expiration_date_signatures(
&self,
epoch_id: EpochId,
expiration_date: Date,
) -> Result<RwLockReadGuard<'_, AggregatedExpirationDateSignatures>, CredentialProxyError> {
self.inner.ecash_state
.expiration_date_signatures
.get_or_init((epoch_id, expiration_date), || async {
// 1. sanity check to see if the expiration_date is not nonsense
ensure_sane_expiration_date(expiration_date)?;
// 2. check the storage
if let Some(master_sigs) = self
.storage()
.get_master_expiration_date_signatures(expiration_date, epoch_id)
.await?
{
return Ok(master_sigs);
}
info!(
"attempting to establish master expiration date signatures for {expiration_date} and epoch {epoch_id}..."
);
// 3. go around APIs and attempt to aggregate the data
let epoch_id = self.current_epoch_id().await?;
let master_vk = self.master_verification_key(Some(epoch_id)).await?;
let all_apis = self.ecash_clients(epoch_id).await?;
let threshold = self.ecash_threshold(epoch_id).await?;
let get_partial_signatures = |api: EcashApiClient| async {
// move the api into the closure
let api = api;
let node_index = api.node_id;
let partial_vk = api.verification_key;
let partial = api
.api_client
.partial_expiration_date_signatures(Some(expiration_date), Some(epoch_id))
.await?
.signatures;
Ok(ExpirationDateSignatureShare {
index: node_index,
key: partial_vk,
signatures: partial,
})
};
let shares =
query_all_threshold_apis(all_apis.clone(), threshold, get_partial_signatures)
.await?;
let aggregated = aggregate_annotated_expiration_signatures(
&master_vk,
expiration_date.ecash_unix_timestamp(),
&shares,
)?;
let sigs = AggregatedExpirationDateSignatures {
epoch_id,
expiration_date,
signatures: aggregated,
};
// 4. save the signatures in the storage for when we reboot
self.inner.storage
.insert_master_expiration_date_signatures(&sigs)
.await?;
Ok(sigs)
})
.await
}
pub async fn ecash_clients(
&self,
epoch_id: EpochId,
) -> Result<RwLockReadGuard<'_, Vec<EcashApiClient>>, CredentialProxyError> {
self.inner
.ecash_state
.epoch_clients
.get_or_init(epoch_id, || async {
Ok(self
.query_chain()
.await
.get_all_verification_key_shares(epoch_id)
.await?
.into_iter()
.map(TryInto::try_into)
.collect::<anyhow::Result<Vec<_>, EcashApiError>>()?)
})
.await
}
pub async fn ecash_threshold(&self, epoch_id: EpochId) -> Result<u64, CredentialProxyError> {
self.inner
.ecash_state
.threshold_values
.get_or_init(epoch_id, || async {
if let Some(threshold) = self
.query_chain()
.await
.get_epoch_threshold(epoch_id)
.await?
{
Ok(threshold)
} else {
Err(CredentialProxyError::UnavailableThreshold { epoch_id })
}
})
.await
.map(|t| *t)
}
}
struct CredentialProxyStateInner {
storage: CredentialProxyStorage,
client: ChainClient,
deposits_buffer: DepositsBuffer,
ecash_state: EcashState,
}
@@ -0,0 +1,126 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::CredentialProxyError;
use crate::helpers::LockTimer;
use nym_ecash_contract_common::msg::ExecuteMsg;
use nym_validator_client::nyxd::contract_traits::NymContractsProvider;
use nym_validator_client::nyxd::cosmwasm_client::types::ExecuteResult;
use nym_validator_client::nyxd::{Coin, CosmWasmClient, NyxdClient};
use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::{instrument, warn};
#[derive(Clone)]
pub struct ChainClient(Arc<RwLock<DirectSigningHttpRpcNyxdClient>>);
impl ChainClient {
pub fn new(mnemonic: bip39::Mnemonic) -> Result<Self, CredentialProxyError> {
let network_details = nym_network_defaults::NymNetworkDetails::new_from_env();
let client_config = nyxd::Config::try_from_nym_network_details(&network_details)?;
let nyxd_url = network_details
.endpoints
.first()
.ok_or_else(|| CredentialProxyError::NoNyxEndpointsAvailable)?
.nyxd_url
.as_str();
let client = NyxdClient::connect_with_mnemonic(client_config, nyxd_url, mnemonic)?;
if client.ecash_contract_address().is_none() {
return Err(CredentialProxyError::UnavailableEcashContract);
}
if client.dkg_contract_address().is_none() {
return Err(CredentialProxyError::UnavailableDKGContract);
}
Ok(ChainClient(Arc::new(RwLock::new(client))))
}
pub async fn query_chain(&self) -> ChainReadPermit<'_> {
let _acquire_timer = LockTimer::new("acquire chain query permit");
self.0.read().await
}
pub async fn start_chain_tx(&self) -> ChainWritePermit<'_> {
let _acquire_timer = LockTimer::new("acquire exclusive chain write permit");
ChainWritePermit {
lock_timer: LockTimer::new("exclusive chain access permit"),
inner: self.0.write().await,
}
}
}
pub type ChainReadPermit<'a> = RwLockReadGuard<'a, DirectSigningHttpRpcNyxdClient>;
// explicitly wrap the WriteGuard for extra information regarding time taken
pub struct ChainWritePermit<'a> {
// it's not really dead, we only care about it being dropped
#[allow(dead_code)]
lock_timer: LockTimer,
inner: RwLockWriteGuard<'a, DirectSigningHttpRpcNyxdClient>,
}
impl ChainWritePermit<'_> {
#[instrument(skip(self, short_sha, info), err(Display))]
pub async fn make_deposits(
self,
short_sha: &'static str,
info: Vec<(String, Coin)>,
) -> Result<ExecuteResult, CredentialProxyError> {
let address = self.inner.address();
let starting_sequence = self.inner.get_sequence(&address).await?.sequence;
let deposits = info.len();
let ecash_contract = self
.inner
.ecash_contract_address()
.ok_or(CredentialProxyError::UnavailableEcashContract)?;
let deposit_messages = info
.into_iter()
.map(|(identity_key, amount)| {
(
ExecuteMsg::DepositTicketBookFunds { identity_key },
vec![amount],
)
})
.collect::<Vec<_>>();
let res = self
.inner
.execute_multiple(
ecash_contract,
deposit_messages,
None,
format!("cp-{short_sha}: performing {deposits} deposits"),
)
.await?;
loop {
let updated_sequence = self.inner.get_sequence(&address).await?.sequence;
if updated_sequence > starting_sequence {
break;
}
warn!("wrong sequence number... waiting before releasing chain lock");
tokio::time::sleep(Duration::from_millis(50)).await;
}
Ok(res)
}
}
impl Deref for ChainWritePermit<'_> {
type Target = DirectSigningHttpRpcNyxdClient;
fn deref(&self) -> &Self::Target {
self.inner.deref()
}
}
@@ -2,14 +2,14 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::CredentialProxyError;
use crate::http::state::ChainClient;
use crate::shared_state::nyxd_client::ChainClient;
use nym_validator_client::nyxd::contract_traits::EcashQueryClient;
use nym_validator_client::nyxd::Coin;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::RwLock;
pub(crate) struct CachedDeposit {
pub struct CachedDeposit {
valid_until: OffsetDateTime,
required_amount: Coin,
}
@@ -40,12 +40,12 @@ impl Default for CachedDeposit {
}
#[derive(Clone, Default)]
pub(crate) struct RequiredDepositCache {
pub struct RequiredDepositCache {
inner: Arc<RwLock<CachedDeposit>>,
}
impl RequiredDepositCache {
pub(crate) async fn get_or_update(
pub async fn get_or_update(
&self,
chain_client: &ChainClient,
) -> Result<Coin, CredentialProxyError> {
@@ -1,7 +1,6 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::credentials::ticketbook::NodeId;
use crate::deposits_buffer::helpers::{BufferedDeposit, PerformedDeposits};
use crate::error::CredentialProxyError;
use crate::storage::manager::SqliteStorageManager;
@@ -26,6 +25,10 @@ use uuid::Uuid;
mod manager;
pub mod models;
pub(crate) mod pruner;
// TODO: proper import
type NodeId = u64;
#[derive(Clone)]
pub struct CredentialProxyStorage {
@@ -74,7 +77,7 @@ impl CredentialProxyStorage {
}
#[allow(dead_code)]
pub(crate) async fn load_blinded_shares_status_by_shares_id(
pub async fn load_blinded_shares_status_by_shares_id(
&self,
id: i64,
) -> Result<Option<BlindedShares>, CredentialProxyError> {
@@ -84,7 +87,7 @@ impl CredentialProxyStorage {
.await?)
}
pub(crate) async fn load_wallet_shares_by_shares_id(
pub async fn load_wallet_shares_by_shares_id(
&self,
id: i64,
) -> Result<Vec<MinimalWalletShare>, CredentialProxyError> {
@@ -94,7 +97,7 @@ impl CredentialProxyStorage {
.await?)
}
pub(crate) async fn load_shares_error_by_shares_id(
pub async fn load_shares_error_by_shares_id(
&self,
id: i64,
) -> Result<Option<String>, CredentialProxyError> {
@@ -105,7 +108,7 @@ impl CredentialProxyStorage {
}
#[allow(dead_code)]
pub(crate) async fn load_blinded_shares_status_by_device_and_credential_id(
pub async fn load_blinded_shares_status_by_device_and_credential_id(
&self,
device_id: &str,
credential_id: &str,
@@ -116,7 +119,7 @@ impl CredentialProxyStorage {
.await?)
}
pub(crate) async fn load_wallet_shares_by_device_and_credential_id(
pub async fn load_wallet_shares_by_device_and_credential_id(
&self,
device_id: &str,
credential_id: &str,
@@ -127,7 +130,7 @@ impl CredentialProxyStorage {
.await?)
}
pub(crate) async fn load_shares_error_by_device_and_credential_id(
pub async fn load_shares_error_by_device_and_credential_id(
&self,
device_id: &str,
credential_id: &str,
@@ -138,7 +141,7 @@ impl CredentialProxyStorage {
.await?)
}
pub(crate) async fn insert_new_pending_async_shares_request(
pub async fn insert_new_pending_async_shares_request(
&self,
request: Uuid,
device_id: &str,
@@ -150,7 +153,7 @@ impl CredentialProxyStorage {
.await?)
}
pub(crate) async fn update_pending_async_blinded_shares_issued(
pub async fn update_pending_async_blinded_shares_issued(
&self,
available_shares: usize,
device_id: &str,
@@ -166,7 +169,7 @@ impl CredentialProxyStorage {
.await?)
}
pub(crate) async fn update_pending_async_blinded_shares_error(
pub async fn update_pending_async_blinded_shares_error(
&self,
available_shares: usize,
device_id: &str,
@@ -184,7 +187,7 @@ impl CredentialProxyStorage {
.await?)
}
pub(crate) async fn prune_old_blinded_shares(&self) -> Result<(), CredentialProxyError> {
pub async fn prune_old_blinded_shares(&self) -> Result<(), CredentialProxyError> {
let max_age = OffsetDateTime::now_utc() - time::Duration::days(31);
self.storage_manager
@@ -199,7 +202,7 @@ impl CredentialProxyStorage {
Ok(())
}
pub(crate) async fn insert_new_deposits(
pub async fn insert_new_deposits(
&self,
deposits: &PerformedDeposits,
) -> Result<(), CredentialProxyError> {
@@ -211,9 +214,7 @@ impl CredentialProxyStorage {
Ok(())
}
pub(crate) async fn load_unused_deposits(
&self,
) -> Result<Vec<BufferedDeposit>, CredentialProxyError> {
pub async fn load_unused_deposits(&self) -> Result<Vec<BufferedDeposit>, CredentialProxyError> {
self.storage_manager
.load_unused_deposits()
.await?
@@ -222,7 +223,7 @@ impl CredentialProxyStorage {
.collect()
}
pub(crate) async fn insert_deposit_usage(
pub async fn insert_deposit_usage(
&self,
deposit_id: DepositId,
requested_on: OffsetDateTime,
@@ -240,7 +241,7 @@ impl CredentialProxyStorage {
Ok(())
}
pub(crate) async fn insert_deposit_usage_error(
pub async fn insert_deposit_usage_error(
&self,
deposit_id: DepositId,
error: String,
@@ -251,7 +252,7 @@ impl CredentialProxyStorage {
Ok(())
}
pub(crate) async fn insert_partial_wallet_share(
pub async fn insert_partial_wallet_share(
&self,
deposit_id: DepositId,
epoch_id: EpochId,
@@ -291,7 +292,7 @@ impl CredentialProxyStorage {
Ok(())
}
pub(crate) async fn get_master_verification_key(
pub async fn get_master_verification_key(
&self,
epoch_id: EpochId,
) -> Result<Option<EpochVerificationKey>, CredentialProxyError> {
@@ -309,7 +310,7 @@ impl CredentialProxyStorage {
Ok(Some(deserialised))
}
pub(crate) async fn insert_master_verification_key(
pub async fn insert_master_verification_key(
&self,
key: &EpochVerificationKey,
) -> Result<(), CredentialProxyError> {
@@ -320,7 +321,7 @@ impl CredentialProxyStorage {
.await?)
}
pub(crate) async fn get_master_coin_index_signatures(
pub async fn get_master_coin_index_signatures(
&self,
epoch_id: EpochId,
) -> Result<Option<AggregatedCoinIndicesSignatures>, CredentialProxyError> {
@@ -340,7 +341,7 @@ impl CredentialProxyStorage {
Ok(Some(deserialised))
}
pub(crate) async fn insert_master_coin_index_signatures(
pub async fn insert_master_coin_index_signatures(
&self,
signatures: &AggregatedCoinIndicesSignatures,
) -> Result<(), CredentialProxyError> {
@@ -355,7 +356,7 @@ impl CredentialProxyStorage {
Ok(())
}
pub(crate) async fn get_master_expiration_date_signatures(
pub async fn get_master_expiration_date_signatures(
&self,
expiration_date: Date,
epoch_id: EpochId,
@@ -376,7 +377,7 @@ impl CredentialProxyStorage {
Ok(Some(deserialised))
}
pub(crate) async fn insert_master_expiration_date_signatures(
pub async fn insert_master_expiration_date_signatures(
&self,
signatures: &AggregatedExpirationDateSignatures,
) -> Result<(), CredentialProxyError> {
@@ -398,7 +399,7 @@ impl CredentialProxyStorage {
#[cfg(test)]
mod tests {
use super::*;
use crate::http::helpers;
use crate::helpers::random_uuid;
use crate::storage::models::BlindedSharesStatus;
use nym_compact_ecash::scheme::keygen::KeyPairUser;
use nym_crypto::asymmetric::ed25519;
@@ -480,7 +481,7 @@ mod tests {
async fn test_add() -> anyhow::Result<()> {
let storage = get_storage().await?;
let dummy_uuid = helpers::random_uuid();
let dummy_uuid = random_uuid();
println!("🚀 insert_pending_blinded_share...");
storage.insert_dummy_used_deposit(dummy_uuid).await?;
@@ -1,5 +1,5 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::CredentialProxyStorage;
use tokio_util::sync::CancellationToken;
@@ -0,0 +1,163 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::deposits_buffer::DepositsBuffer;
use crate::error::CredentialProxyError;
use crate::quorum_checker::QuorumStateChecker;
use crate::shared_state::ecash_state::EcashState;
use crate::shared_state::nyxd_client::ChainClient;
use crate::shared_state::required_deposit_cache::RequiredDepositCache;
use crate::shared_state::CredentialProxyState;
use crate::storage::pruner::StoragePruner;
use crate::storage::CredentialProxyStorage;
use crate::webhook::ZkNymWebhook;
use nym_credentials::ecash::utils::ecash_today;
use nym_validator_client::nym_api::EpochId;
use std::future::Future;
use std::time::Duration;
use time::Date;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
mod shares_handlers;
pub mod ticketbook_handlers;
pub mod wallet_shares;
#[derive(Clone, Default)]
pub struct ShutdownTracker {
pub shutdown_token: CancellationToken,
pub tracker: TaskTracker,
}
#[derive(Clone)]
pub struct TicketbookManager {
pub(crate) state: CredentialProxyState,
pub(crate) webhook: ZkNymWebhook,
pub(crate) shutdown_tracker: ShutdownTracker,
}
impl TicketbookManager {
pub async fn new(
build_sha: &'static str,
quorum_check_interval: Duration,
deposits_buffer_size: usize,
max_concurrent_deposits: usize,
storage: CredentialProxyStorage,
mnemonic: bip39::Mnemonic,
webhook: ZkNymWebhook,
) -> Result<Self, CredentialProxyError> {
let chain_client = ChainClient::new(mnemonic)?;
let shutdown_tracker = ShutdownTracker::default();
let quorum_state_checker = QuorumStateChecker::new(
chain_client.clone(),
quorum_check_interval,
shutdown_tracker.shutdown_token.clone(),
)
.await?;
let required_deposit_cache = RequiredDepositCache::default();
let deposits_buffer = DepositsBuffer::new(
storage.clone(),
chain_client.clone(),
required_deposit_cache.clone(),
build_sha,
deposits_buffer_size,
max_concurrent_deposits,
shutdown_tracker.shutdown_token.clone(),
)
.await?;
let storage_pruner =
StoragePruner::new(shutdown_tracker.shutdown_token.clone(), storage.clone());
let this = TicketbookManager {
state: CredentialProxyState::new(
storage.clone(),
chain_client,
deposits_buffer,
EcashState::new(
required_deposit_cache,
quorum_state_checker.quorum_state_ref(),
),
),
webhook,
shutdown_tracker,
};
// since this is startup,
// might as well do all the needed network queries to establish needed global signatures
// if we don't already have them
this.build_initial_cache().await?;
// spawn the background tasks
this.try_spawn_in_background(quorum_state_checker.run_forever());
this.try_spawn_in_background(storage_pruner.run_forever());
Ok(this)
}
async fn build_initial_cache(&self) -> Result<(), CredentialProxyError> {
let today = ecash_today().date();
let epoch_id = self.state.current_epoch_id().await?;
let _ = self.state.deposit_amount().await?;
let _ = self.state.master_verification_key(Some(epoch_id)).await?;
let _ = self.state.ecash_threshold(epoch_id).await?;
let _ = self.state.ecash_clients(epoch_id).await?;
let _ = self
.state
.master_coin_index_signatures(Some(epoch_id))
.await?;
let _ = self
.state
.master_expiration_date_signatures(epoch_id, today)
.await?;
Ok(())
}
pub async fn cancel_and_wait(&self) {
self.shutdown_tracker.shutdown_token.cancel();
self.state.deposits_buffer().wait_for_shutdown().await;
self.shutdown_tracker.tracker.wait().await
}
pub fn shutdown_token(&self) -> CancellationToken {
self.shutdown_tracker.shutdown_token.clone()
}
/// Ensure the required global data for the specified epoch and expiration date exists in our cache (and storage)
async fn ensure_global_data_cached(
&self,
epoch: EpochId,
expiration_date: Date,
) -> Result<(), CredentialProxyError> {
let _ = self.state.master_verification_key(Some(epoch)).await?;
let _ = self.state.master_coin_index_signatures(Some(epoch)).await?;
let _ = self
.state
.master_expiration_date_signatures(epoch, expiration_date)
.await?;
Ok(())
}
pub fn try_spawn_in_background<F>(&self, task: F) -> Option<JoinHandle<F::Output>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
// don't spawn new task if we've received cancellation token
if self.shutdown_tracker.shutdown_token.is_cancelled() {
None
} else {
self.shutdown_tracker.tracker.reopen();
// TODO: later use a task queue since most requests will be blocked waiting on chain permit anyway
let join_handle = self.shutdown_tracker.tracker.spawn(task);
self.shutdown_tracker.tracker.close();
Some(join_handle)
}
}
}
@@ -0,0 +1,145 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::CredentialProxyError;
use crate::storage::models::MinimalWalletShare;
use crate::ticketbook_manager::TicketbookManager;
use nym_credential_proxy_requests::api::v1::ticketbook::models::{
GlobalDataParams, TicketbookWalletSharesResponse,
};
use nym_validator_client::nym_api::EpochId;
use tracing::{debug, span, Instrument, Level};
use uuid::Uuid;
impl TicketbookManager {
async fn shares_to_response(
&self,
shares: Vec<MinimalWalletShare>,
params: GlobalDataParams,
) -> Result<TicketbookWalletSharesResponse, CredentialProxyError> {
// in all calls we ensured the shares are non-empty
#[allow(clippy::unwrap_used)]
let first = shares.first().unwrap();
let expiration_date = first.expiration_date;
let epoch_id = first.epoch_id as EpochId;
let threshold = self.state.ecash_threshold(epoch_id).await?;
if shares.len() < threshold as usize {
return Err(CredentialProxyError::InsufficientNumberOfCredentials {
available: shares.len(),
threshold,
});
}
// grab any requested additional data
let (
master_verification_key,
aggregated_expiration_date_signatures,
aggregated_coin_index_signatures,
) = self
.state
.global_data(params, epoch_id, expiration_date)
.await?;
// finally produce a response
Ok(TicketbookWalletSharesResponse {
epoch_id,
shares: shares.into_iter().map(Into::into).collect(),
master_verification_key,
aggregated_coin_index_signatures,
aggregated_expiration_date_signatures,
})
}
/// Query by id for blinded shares of a bandwidth voucher
pub async fn query_for_shares_by_id(
&self,
uuid: Uuid,
params: GlobalDataParams,
share_id: i64,
) -> Result<TicketbookWalletSharesResponse, CredentialProxyError> {
let span = span!(Level::INFO, "query shares by id", uuid = %uuid, share_id = %share_id);
async move {
debug!("");
// TODO: edge case: this will **NOT** work if shares got created in epoch X,
// but this query happened in epoch X+1
let shares = self
.state
.storage()
.load_wallet_shares_by_shares_id(share_id)
.await?;
if shares.is_empty() {
debug!("shares not found");
// check for explicit error
if let Some(error_message) = self
.state
.storage()
.load_shares_error_by_shares_id(share_id)
.await?
{
return Err(CredentialProxyError::ShareByIdLoadError {
message: error_message,
id: share_id,
});
}
return Err(CredentialProxyError::SharesByIdNotFound { id: share_id });
}
self.shares_to_response(shares, params).await
}
.instrument(span)
.await
}
/// Query by id for blinded wallet shares of a ticketbook
pub async fn query_for_shares_by_device_id_and_credential_id(
&self,
uuid: Uuid,
params: GlobalDataParams,
device_id: String,
credential_id: String,
) -> Result<TicketbookWalletSharesResponse, CredentialProxyError> {
let span = span!(Level::INFO, "query shares by device and credential ids", uuid = %uuid, device_id = %device_id, credential_id = %credential_id);
async move {
debug!("");
// TODO: edge case: this will **NOT** work if shares got created in epoch X,
// but this query happened in epoch X+1
let shares = self
.state
.storage()
.load_wallet_shares_by_device_and_credential_id(&device_id, &credential_id)
.await?;
if shares.is_empty() {
debug!("shares not found");
// check for explicit error
if let Some(error_message) = self
.state
.storage()
.load_shares_error_by_device_and_credential_id(&device_id, &credential_id)
.await?
{
return Err(CredentialProxyError::ShareByDeviceLoadError {
message: error_message,
device_id,
credential_id,
});
}
return Err(CredentialProxyError::SharesByDeviceNotFound {
device_id,
credential_id,
});
}
self.shares_to_response(shares, params).await
}
.instrument(span)
.await
}
}
@@ -0,0 +1,164 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::CredentialProxyError;
use crate::nym_api_helpers::ensure_sane_expiration_date;
use crate::ticketbook_manager::TicketbookManager;
use nym_compact_ecash::Base58;
use nym_credential_proxy_requests::api::v1::ticketbook::models::{
CurrentEpochResponse, DepositResponse, GlobalDataParams, MasterVerificationKeyResponse,
PartialVerificationKey, PartialVerificationKeysResponse, TicketbookAsyncRequest,
TicketbookObtainParams, TicketbookRequest, TicketbookWalletSharesAsyncResponse,
TicketbookWalletSharesResponse,
};
use time::OffsetDateTime;
use tracing::{error, info, span, warn, Instrument, Level};
use uuid::Uuid;
impl TicketbookManager {
pub async fn obtain_ticketbook_shares(
&self,
uuid: Uuid,
request: TicketbookRequest,
params: GlobalDataParams,
) -> Result<TicketbookWalletSharesResponse, CredentialProxyError> {
let requested_on = OffsetDateTime::now_utc();
let span = span!(Level::INFO, "obtain ticketboook", uuid = %uuid);
async move {
info!("");
self.state.ensure_credentials_issuable().await?;
let epoch_id = self.state.current_epoch_id().await?;
ensure_sane_expiration_date(request.expiration_date)?;
// if additional data was requested, grab them first in case there are any cache/network issues
let (
master_verification_key,
aggregated_expiration_date_signatures,
aggregated_coin_index_signatures,
) = self
.state
.global_data(params, epoch_id, request.expiration_date)
.await?;
let shares = self
.try_obtain_wallet_shares(uuid, requested_on, request)
.await
.inspect_err(|err| warn!("shares request failure: {err}"))?;
info!("request was successful!");
Ok(TicketbookWalletSharesResponse {
epoch_id,
shares,
master_verification_key,
aggregated_coin_index_signatures,
aggregated_expiration_date_signatures,
})
}
.instrument(span)
.await
}
pub async fn obtain_ticketbook_shares_async(
&self,
uuid: Uuid,
request: TicketbookAsyncRequest,
params: TicketbookObtainParams,
) -> Result<TicketbookWalletSharesAsyncResponse, CredentialProxyError> {
let requested_on = OffsetDateTime::now_utc();
let span = span!(Level::INFO, "[async] obtain ticketboook", uuid = %uuid);
async move {
info!("");
// 1. perform basic validation
self.state.ensure_credentials_issuable().await?;
ensure_sane_expiration_date(request.inner.expiration_date)?;
// 2. store the request to retrieve the id
let pending = self
.state
.storage()
.insert_new_pending_async_shares_request(
uuid,
&request.device_id,
&request.credential_id,
)
.await
.inspect_err(|err| error!("failed to insert new pending async shares: {err}"))?;
let id = pending.id;
// 3. try to spawn a new task attempting to resolve the request
let this = self.clone();
if self
.try_spawn_in_background(async move {
this.try_obtain_blinded_ticketbook_async(
uuid,
requested_on,
request,
params,
pending,
)
.await
})
.is_none()
{
warn!("could not start async ticketbook issuance due to shutdown in progress");
return Err(CredentialProxyError::ShutdownInProgress);
}
// 4. in the meantime, return the id to the user
Ok(TicketbookWalletSharesAsyncResponse { id, uuid })
}
.instrument(span)
.await
}
pub async fn current_deposit(&self) -> Result<DepositResponse, CredentialProxyError> {
let current_deposit = self.state.deposit_amount().await?;
Ok(DepositResponse {
current_deposit_amount: current_deposit.amount,
current_deposit_denom: current_deposit.denom,
})
}
pub async fn partial_verification_keys(
&self,
) -> Result<PartialVerificationKeysResponse, CredentialProxyError> {
self.state.ensure_credentials_issuable().await?;
let epoch_id = self.state.current_epoch_id().await?;
let signers = self.state.ecash_clients(epoch_id).await?;
Ok(PartialVerificationKeysResponse {
epoch_id,
keys: signers
.iter()
.map(|signer| PartialVerificationKey {
node_index: signer.node_id,
bs58_encoded_key: signer.verification_key.to_bs58(),
})
.collect(),
})
}
pub async fn master_verification_key(
&self,
) -> Result<MasterVerificationKeyResponse, CredentialProxyError> {
self.state.ensure_credentials_issuable().await?;
let epoch_id = self.state.current_epoch_id().await?;
let key = self.state.master_verification_key(Some(epoch_id)).await?;
Ok(MasterVerificationKeyResponse {
epoch_id,
bs58_encoded_key: key.to_bs58(),
})
}
pub async fn current_epoch(&self) -> Result<CurrentEpochResponse, CredentialProxyError> {
self.state.ensure_credentials_issuable().await?;
let epoch_id = self.state.current_epoch_id().await?;
Ok(CurrentEpochResponse { epoch_id })
}
}
@@ -0,0 +1,343 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::CredentialProxyError;
use crate::storage::models::BlindedShares;
use crate::ticketbook_manager::TicketbookManager;
use futures::{stream, StreamExt};
use nym_compact_ecash::Base58;
use nym_credential_proxy_requests::api::v1::ticketbook::models::{
TicketbookAsyncRequest, TicketbookObtainParams, TicketbookRequest,
TicketbookWalletSharesResponse, WalletShare, WebhookTicketbookWalletShares,
WebhookTicketbookWalletSharesRequest,
};
use nym_validator_client::ecash::BlindSignRequestBody;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::Mutex;
use tokio::time::timeout;
use tracing::{debug, error, info, instrument};
use uuid::Uuid;
impl TicketbookManager {
#[instrument(
skip(self, request_data, request, requested_on),
fields(
expiration_date = %request_data.expiration_date,
ticketbook_type = %request_data.ticketbook_type
)
)]
pub async fn try_obtain_wallet_shares(
&self,
request: Uuid,
requested_on: OffsetDateTime,
request_data: TicketbookRequest,
) -> Result<Vec<WalletShare>, CredentialProxyError> {
// don't proceed if we don't have quorum available as the request will definitely fail
if !self.state.ecash_state().quorum_state.available() {
return Err(CredentialProxyError::UnavailableSigningQuorum);
}
let epoch = self.state.current_epoch_id().await?;
let threshold = self.state.ecash_threshold(epoch).await?;
let expiration_date = request_data.expiration_date;
// before we commit to making the deposit, ensure we have required signatures cached and stored
self.ensure_global_data_cached(epoch, expiration_date)
.await?;
let ecash_api_clients = self.state.ecash_clients(epoch).await?.clone();
let deposit_data = self
.state
.get_deposit(request, requested_on, request_data.ecash_pubkey)
.await?;
let deposit_id = deposit_data.deposit_id;
let signature = deposit_data.sign_ticketbook_plaintext(&request_data.withdrawal_request);
let credential_request = BlindSignRequestBody::new(
request_data.withdrawal_request.into(),
deposit_id,
signature,
request_data.ecash_pubkey,
request_data.expiration_date,
request_data.ticketbook_type,
);
let wallet_shares = Arc::new(Mutex::new(HashMap::new()));
info!("attempting to contract all nym-apis for the partial wallets...");
stream::iter(ecash_api_clients)
.for_each_concurrent(None, |client| async {
// move the client into the block
let client = client;
debug!("contacting {client} for blinded partial wallet");
let res = timeout(
Duration::from_secs(5),
client.api_client.blind_sign(&credential_request),
)
.await
.map_err(|_| CredentialProxyError::EcashApiRequestTimeout {
client_repr: client.to_string(),
})
.and_then(|res| res.map_err(Into::into));
// 1. try to store it
if let Err(err) = self
.state
.storage()
.insert_partial_wallet_share(
deposit_id,
epoch,
expiration_date,
client.node_id,
&res,
)
.await
{
error!("failed to persist issued partial share: {err}")
}
// 2. add it to the map
match res {
Ok(share) => {
wallet_shares
.lock()
.await
.insert(client.node_id, share.blinded_signature);
}
Err(err) => {
error!("failed to obtain partial blinded wallet share from {client}: {err}")
}
}
})
.await;
// SAFETY: the futures have completed, so we MUST have the only arc reference
#[allow(clippy::unwrap_used)]
let wallet_shares = Arc::into_inner(wallet_shares).unwrap().into_inner();
let shares = wallet_shares.len();
if shares < threshold as usize {
let err = CredentialProxyError::InsufficientNumberOfCredentials {
available: shares,
threshold,
};
self.state
.insert_deposit_usage_error(deposit_id, err.to_string())
.await;
return Err(err);
}
Ok(wallet_shares
.into_iter()
.map(|(node_index, share)| WalletShare {
node_index,
bs58_encoded_share: share.to_bs58(),
})
.collect())
}
pub async fn try_obtain_wallet_shares_async(
&self,
request: Uuid,
requested_on: OffsetDateTime,
request_data: TicketbookRequest,
device_id: &str,
credential_id: &str,
) -> Result<Vec<WalletShare>, CredentialProxyError> {
let shares = match self
.try_obtain_wallet_shares(request, requested_on, request_data)
.await
{
Ok(shares) => shares,
Err(err) => {
let obtained = match err {
CredentialProxyError::InsufficientNumberOfCredentials { available, .. } => {
available
}
_ => 0,
};
// currently there's no retry mechanisms, but, who knows, that might change
if let Err(err) = self
.state
.storage()
.update_pending_async_blinded_shares_error(
obtained,
device_id,
credential_id,
&err.to_string(),
)
.await
{
error!("failed to update database with the error information: {err}")
}
return Err(err);
}
};
Ok(shares)
}
async fn try_obtain_blinded_ticketbook_async_inner(
&self,
request: Uuid,
requested_on: OffsetDateTime,
request_data: TicketbookAsyncRequest,
params: TicketbookObtainParams,
pending: &BlindedShares,
) -> Result<(), CredentialProxyError> {
let epoch_id = self.state.current_epoch_id().await?;
let device_id = &request_data.device_id;
let credential_id = &request_data.credential_id;
let secret = request_data.secret.clone();
// 1. try to obtain global data
let (
master_verification_key,
aggregated_expiration_date_signatures,
aggregated_coin_index_signatures,
) = self
.state
.global_data(params.global, epoch_id, request_data.inner.expiration_date)
.await?;
// 2. try to obtain shares (failures are written to the DB)
let shares = self
.try_obtain_wallet_shares_async(
request,
requested_on,
request_data.inner,
device_id,
credential_id,
)
.await?;
// 3. update the storage, if possible
// (as long as we can trigger webhook, we should still be good)
if let Err(err) = self
.state
.storage()
.update_pending_async_blinded_shares_issued(shares.len(), device_id, credential_id)
.await
{
error!(uuid = %request, "failed to update db with issued information: {err}")
}
// 4. build the webhook request body
let data = Some(TicketbookWalletSharesResponse {
epoch_id,
shares,
master_verification_key,
aggregated_coin_index_signatures,
aggregated_expiration_date_signatures,
});
let ticketbook_wallet_shares = WebhookTicketbookWalletShares {
id: pending.id,
status: pending.status.to_string(),
device_id: device_id.clone(),
credential_id: credential_id.clone(),
data,
error_message: None,
created: pending.created,
updated: pending.updated,
};
let webhook_request = WebhookTicketbookWalletSharesRequest {
ticketbook_wallet_shares,
secret,
};
// 5. call the webhook
self.webhook.try_trigger(request, &webhook_request).await;
Ok(())
}
async fn try_trigger_webhook_request_for_error(
&self,
request: Uuid,
request_data: TicketbookAsyncRequest,
pending: &BlindedShares,
error_message: String,
) -> Result<(), CredentialProxyError> {
let device_id = &request_data.device_id;
let credential_id = &request_data.credential_id;
let secret = request_data.secret.clone();
let ticketbook_wallet_shares = WebhookTicketbookWalletShares {
id: pending.id,
status: "error".to_string(),
device_id: device_id.clone(),
credential_id: credential_id.clone(),
data: None,
error_message: Some(error_message),
created: pending.created,
updated: pending.updated,
};
let webhook_request = WebhookTicketbookWalletSharesRequest {
ticketbook_wallet_shares,
secret,
};
self.webhook.try_trigger(request, &webhook_request).await;
Ok(())
}
#[instrument(
skip_all,
fields(
credential_id = %request_data.credential_id,
device_id = %request_data.device_id)
)
]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn try_obtain_blinded_ticketbook_async(
&self,
request: Uuid,
requested_on: OffsetDateTime,
request_data: TicketbookAsyncRequest,
params: TicketbookObtainParams,
pending: BlindedShares,
) {
let skip_webhook = params.skip_webhook;
if let Err(err) = self
.try_obtain_blinded_ticketbook_async_inner(
request,
requested_on,
request_data.clone(),
params,
&pending,
)
.await
{
if skip_webhook {
info!(uuid = %request,"the webhook is not going to be called for this request");
return;
}
// post to the webhook to notify of errors on this side
if let Err(webhook_err) = self
.try_trigger_webhook_request_for_error(
request,
request_data,
&pending,
format!("Failed to get ticketbook: {err}"),
)
.await
{
error!(uuid = %request, "failed to make webhook request to report error: {webhook_err}")
}
error!(uuid = %request, "failed to resolve the blinded ticketbook issuance: {err}")
} else {
info!(uuid = %request, "managed to resolve the blinded ticketbook issuance")
}
}
}
@@ -1,57 +1,34 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::CredentialProxyError;
use clap::Args;
use reqwest::header::AUTHORIZATION;
use serde::Serialize;
use tracing::{debug, error, instrument, span, Instrument, Level};
use url::Url;
use uuid::Uuid;
#[derive(Args, Debug, Clone)]
pub struct ZkNymWebHookConfig {
#[clap(long, env = "WEBHOOK_ZK_NYMS_URL")]
pub webhook_url: Url,
#[derive(Debug, Clone)]
pub struct ZkNymWebhook {
pub webhook_client_url: Url,
#[clap(long, env = "WEBHOOK_ZK_NYMS_CLIENT_ID")]
pub webhook_client_id: String,
#[clap(long, env = "WEBHOOK_ZK_NYMS_CLIENT_SECRET")]
pub webhook_client_secret: String,
}
impl ZkNymWebHookConfig {
pub fn ensure_valid_client_url(&self) -> Result<(), CredentialProxyError> {
self.client_url()
.map_err(|_| CredentialProxyError::InvalidWebhookUrl)
.map(|_| ())
}
fn client_url(&self) -> Result<Url, url::ParseError> {
self.webhook_url.join(&self.webhook_client_id)
}
fn unchecked_client_url(&self) -> Url {
// we ensured we have valid url on startup
#[allow(clippy::unwrap_used)]
self.client_url().unwrap()
}
impl ZkNymWebhook {
fn bearer_token(&self) -> String {
format!("Bearer {}", self.webhook_client_secret)
}
#[instrument(skip_all)]
pub async fn try_trigger<T: Serialize + ?Sized>(&self, original_uuid: Uuid, payload: &T) {
let url = self.unchecked_client_url();
let url = self.webhook_client_url.clone();
let span = span!(Level::DEBUG, "webhook", uuid = %original_uuid, url = %url);
async move {
debug!("🕸️ about to trigger the webhook");
match reqwest::Client::new()
.post(url.clone())
.post(url)
.header(AUTHORIZATION, self.bearer_token())
.json(payload)
.send()
+1 -1
View File
@@ -744,7 +744,7 @@ version = "0.1.0"
dependencies = [
"cosmwasm-std",
"quote",
"syn 1.0.109",
"syn 2.0.98",
]
[[package]]
@@ -37,5 +37,5 @@ features = ["tokio"]
[features]
default = ["query-types"]
query-types = ["nym-http-api-common", "nym-http-api-common/output"]
openapi = ["utoipa"]
openapi = ["utoipa", "nym-http-api-common/utoipa"]
tsify = ["dep:tsify", "wasm-bindgen"]
@@ -268,12 +268,20 @@ pub struct WebhookTicketbookWalletSharesRequest {
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema, utoipa::IntoParams))]
#[cfg(feature = "query-types")]
#[serde(default, rename_all = "kebab-case")]
pub struct TicketbookObtainQueryParams {
pub output: Option<Output>,
pub struct TicketbookObtainParams {
#[serde(default)]
pub skip_webhook: bool,
#[serde(default)]
#[serde(flatten)]
pub global: GlobalDataParams,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema, utoipa::IntoParams))]
#[cfg(feature = "query-types")]
#[serde(default, rename_all = "kebab-case")]
pub struct GlobalDataParams {
pub include_master_verification_key: bool,
pub include_coin_index_signatures: bool,
@@ -281,6 +289,18 @@ pub struct TicketbookObtainQueryParams {
pub include_expiration_date_signatures: bool,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema, utoipa::IntoParams))]
#[cfg(feature = "query-types")]
#[serde(default, rename_all = "kebab-case")]
pub struct TicketbookObtainQueryParams {
pub output: Option<Output>,
#[serde(default)]
#[serde(flatten)]
pub obtain_params: TicketbookObtainParams,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema, utoipa::IntoParams))]
#[cfg(feature = "query-types")]
@@ -288,9 +308,7 @@ pub struct TicketbookObtainQueryParams {
pub struct SharesQueryParams {
pub output: Option<Output>,
pub include_master_verification_key: bool,
pub include_coin_index_signatures: bool,
pub include_expiration_date_signatures: bool,
#[serde(default)]
#[serde(flatten)]
pub global: GlobalDataParams,
}
@@ -53,6 +53,8 @@ nym-network-defaults = { path = "../../common/network-defaults" }
nym-credential-proxy-requests = { path = "../nym-credential-proxy-requests", features = ["openapi"] }
nym-ecash-signer-check = { path = "../../common/ecash-signer-check" }
nym-credential-proxy-lib = { path = "../../common/credential-proxy" }
[dev-dependencies]
tempfile = { workspace = true }
@@ -2,16 +2,18 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::default_database_filepath;
use crate::webhook::ZkNymWebHookConfig;
use clap::builder::ArgPredicate;
use clap::Parser;
use clap::{Args, Parser};
use nym_bin_common::bin_info;
use nym_credential_proxy_lib::error::CredentialProxyError;
use nym_credential_proxy_lib::webhook::ZkNymWebhook;
use std::fs::create_dir_all;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::OnceLock;
use std::time::Duration;
use tracing::info;
use url::Url;
fn pretty_build_info_static() -> &'static str {
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
@@ -86,6 +88,32 @@ pub struct Cli {
pub(crate) persistent_storage_path: Option<PathBuf>,
}
#[derive(Args, Debug, Clone)]
pub struct ZkNymWebHookConfig {
#[clap(long, env = "WEBHOOK_ZK_NYMS_URL")]
pub webhook_url: Url,
#[clap(long, env = "WEBHOOK_ZK_NYMS_CLIENT_ID")]
pub webhook_client_id: String,
#[clap(long, env = "WEBHOOK_ZK_NYMS_CLIENT_SECRET")]
pub webhook_client_secret: String,
}
impl TryFrom<ZkNymWebHookConfig> for ZkNymWebhook {
type Error = CredentialProxyError;
fn try_from(cfg: ZkNymWebHookConfig) -> Result<Self, Self::Error> {
Ok(ZkNymWebhook {
webhook_client_url: cfg
.webhook_url
.join(&cfg.webhook_client_id)
.map_err(|_| CredentialProxyError::InvalidWebhookUrl)?,
webhook_client_secret: cfg.webhook_client_secret,
})
}
}
impl Cli {
pub fn bind_address(&self) -> SocketAddr {
// SAFETY:
@@ -1,4 +0,0 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
pub mod ticketbook;
@@ -1,346 +0,0 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::CredentialProxyError;
use crate::http::state::ApiState;
use crate::storage::models::BlindedShares;
use futures::{stream, StreamExt};
use nym_credential_proxy_requests::api::v1::ticketbook::models::{
TicketbookAsyncRequest, TicketbookObtainQueryParams, TicketbookRequest,
TicketbookWalletSharesResponse, WalletShare, WebhookTicketbookWalletShares,
WebhookTicketbookWalletSharesRequest,
};
use nym_credentials_interface::Base58;
use nym_validator_client::ecash::BlindSignRequestBody;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::Mutex;
use tokio::time::timeout;
use tracing::{debug, error, info, instrument};
use uuid::Uuid;
// use the same type alias as our contract without importing the whole thing just for this single line
pub type NodeId = u64;
#[instrument(
skip(state, request_data, request, requested_on),
fields(
expiration_date = %request_data.expiration_date,
ticketbook_type = %request_data.ticketbook_type
)
)]
pub(crate) async fn try_obtain_wallet_shares(
state: &ApiState,
request: Uuid,
requested_on: OffsetDateTime,
request_data: TicketbookRequest,
) -> Result<Vec<WalletShare>, CredentialProxyError> {
// don't proceed if we don't have quorum available as the request will definitely fail
if !state.quorum_available() {
return Err(CredentialProxyError::UnavailableSigningQuorum);
}
let epoch = state.current_epoch_id().await?;
let threshold = state.ecash_threshold(epoch).await?;
let expiration_date = request_data.expiration_date;
// before we commit to making the deposit, ensure we have required signatures cached and stored
let _ = state.master_verification_key(Some(epoch)).await?;
let _ = state.master_coin_index_signatures(Some(epoch)).await?;
let _ = state
.master_expiration_date_signatures(epoch, expiration_date)
.await?;
let ecash_api_clients = state.ecash_clients(epoch).await?.clone();
let deposit_data = state
.get_deposit(request, requested_on, request_data.ecash_pubkey)
.await?;
let deposit_id = deposit_data.deposit_id;
let signature = deposit_data.sign_ticketbook_plaintext(&request_data.withdrawal_request);
let credential_request = BlindSignRequestBody::new(
request_data.withdrawal_request.into(),
deposit_id,
signature,
request_data.ecash_pubkey,
request_data.expiration_date,
request_data.ticketbook_type,
);
let wallet_shares = Arc::new(Mutex::new(HashMap::new()));
info!("attempting to contract all nym-apis for the partial wallets...");
stream::iter(ecash_api_clients)
.for_each_concurrent(None, |client| async {
// move the client into the block
let client = client;
debug!("contacting {client} for blinded partial wallet");
let res = timeout(
Duration::from_secs(5),
client.api_client.blind_sign(&credential_request),
)
.await
.map_err(|_| CredentialProxyError::EcashApiRequestTimeout {
client_repr: client.to_string(),
})
.and_then(|res| res.map_err(Into::into));
// 1. try to store it
if let Err(err) = state
.storage()
.insert_partial_wallet_share(
deposit_id,
epoch,
expiration_date,
client.node_id,
&res,
)
.await
{
error!("failed to persist issued partial share: {err}")
}
// 2. add it to the map
match res {
Ok(share) => {
wallet_shares
.lock()
.await
.insert(client.node_id, share.blinded_signature);
}
Err(err) => {
error!("failed to obtain partial blinded wallet share from {client}: {err}")
}
}
})
.await;
// SAFETY: the futures have completed, so we MUST have the only arc reference
#[allow(clippy::unwrap_used)]
let wallet_shares = Arc::into_inner(wallet_shares).unwrap().into_inner();
let shares = wallet_shares.len();
if shares < threshold as usize {
let err = CredentialProxyError::InsufficientNumberOfCredentials {
available: shares,
threshold,
};
state
.insert_deposit_usage_error(deposit_id, err.to_string())
.await;
return Err(err);
}
Ok(wallet_shares
.into_iter()
.map(|(node_index, share)| WalletShare {
node_index,
bs58_encoded_share: share.to_bs58(),
})
.collect())
}
// same as try_obtain_wallet_shares, but writes failures into the db
async fn try_obtain_wallet_shares_async(
state: &ApiState,
request: Uuid,
requested_on: OffsetDateTime,
request_data: TicketbookRequest,
device_id: &str,
credential_id: &str,
) -> Result<Vec<WalletShare>, CredentialProxyError> {
let shares = match try_obtain_wallet_shares(state, request, requested_on, request_data).await {
Ok(shares) => shares,
Err(err) => {
let obtained = match err {
CredentialProxyError::InsufficientNumberOfCredentials { available, .. } => {
available
}
_ => 0,
};
// currently there's no retry mechanisms, but, who knows, that might change
if let Err(err) = state
.storage()
.update_pending_async_blinded_shares_error(
obtained,
device_id,
credential_id,
&err.to_string(),
)
.await
{
error!("failed to update database with the error information: {err}")
}
return Err(err);
}
};
Ok(shares)
}
async fn try_obtain_blinded_ticketbook_async_inner(
state: &ApiState,
request: Uuid,
requested_on: OffsetDateTime,
request_data: TicketbookAsyncRequest,
params: TicketbookObtainQueryParams,
pending: &BlindedShares,
) -> Result<(), CredentialProxyError> {
let epoch_id = state.current_epoch_id().await?;
let device_id = &request_data.device_id;
let credential_id = &request_data.credential_id;
let secret = request_data.secret.clone();
// 1. try to obtain global data
let (
master_verification_key,
aggregated_expiration_date_signatures,
aggregated_coin_index_signatures,
) = state
.global_data(
params.include_master_verification_key,
params.include_coin_index_signatures,
params.include_expiration_date_signatures,
epoch_id,
request_data.inner.expiration_date,
)
.await?;
// 2. try to obtain shares (failures are written to the DB)
let shares = try_obtain_wallet_shares_async(
state,
request,
requested_on,
request_data.inner,
device_id,
credential_id,
)
.await?;
// 3. update the storage, if possible
// (as long as we can trigger webhook, we should still be good)
if let Err(err) = state
.storage()
.update_pending_async_blinded_shares_issued(shares.len(), device_id, credential_id)
.await
{
error!(uuid = %request, "failed to update db with issued information: {err}")
}
// 4. build the webhook request body
let data = Some(TicketbookWalletSharesResponse {
epoch_id,
shares,
master_verification_key,
aggregated_coin_index_signatures,
aggregated_expiration_date_signatures,
});
let ticketbook_wallet_shares = WebhookTicketbookWalletShares {
id: pending.id,
status: pending.status.to_string(),
device_id: device_id.clone(),
credential_id: credential_id.clone(),
data,
error_message: None,
created: pending.created,
updated: pending.updated,
};
let webhook_request = WebhookTicketbookWalletSharesRequest {
ticketbook_wallet_shares,
secret,
};
// 5. call the webhook
state
.zk_nym_web_hook()
.try_trigger(request, &webhook_request)
.await;
Ok(())
}
async fn try_trigger_webhook_request_for_error(
state: &ApiState,
request: Uuid,
request_data: TicketbookAsyncRequest,
pending: &BlindedShares,
error_message: String,
) -> Result<(), CredentialProxyError> {
let device_id = &request_data.device_id;
let credential_id = &request_data.credential_id;
let secret = request_data.secret.clone();
let ticketbook_wallet_shares = WebhookTicketbookWalletShares {
id: pending.id,
status: "error".to_string(),
device_id: device_id.clone(),
credential_id: credential_id.clone(),
data: None,
error_message: Some(error_message),
created: pending.created,
updated: pending.updated,
};
let webhook_request = WebhookTicketbookWalletSharesRequest {
ticketbook_wallet_shares,
secret,
};
state
.zk_nym_web_hook()
.try_trigger(request, &webhook_request)
.await;
Ok(())
}
#[instrument(skip_all, fields(credential_id = %request_data.credential_id, device_id = %request_data.device_id))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn try_obtain_blinded_ticketbook_async(
state: ApiState,
request: Uuid,
requested_on: OffsetDateTime,
request_data: TicketbookAsyncRequest,
params: TicketbookObtainQueryParams,
pending: BlindedShares,
) {
let skip_webhook = params.skip_webhook;
if let Err(err) = try_obtain_blinded_ticketbook_async_inner(
&state,
request,
requested_on,
request_data.clone(),
params,
&pending,
)
.await
{
if skip_webhook {
info!(uuid = %request,"the webhook is not going to be called for this request");
return;
}
// post to the webhook to notify of errors on this side
if let Err(webhook_err) = try_trigger_webhook_request_for_error(
&state,
request,
request_data,
&pending,
format!("Failed to get ticketbook: {err}"),
)
.await
{
error!(uuid = %request, "failed to make webhook request to report error: {webhook_err}")
}
error!(uuid = %request, "failed to resolve the blinded ticketbook issuance: {err}")
} else {
info!(uuid = %request, "managed to resolve the blinded ticketbook issuance")
}
}
@@ -1,61 +1,12 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::{cli::Cli, http::HttpServer};
use nym_bin_common::bin_info;
use time::OffsetDateTime;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::deposits_buffer::DepositsBuffer;
use crate::http::state::required_deposit_cache::RequiredDepositCache;
use crate::quorum_checker::QuorumStateChecker;
use crate::{
cli::Cli,
error::CredentialProxyError,
http::{
state::{ApiState, ChainClient},
HttpServer,
},
storage::CredentialProxyStorage,
tasks::StoragePruner,
};
pub struct LockTimer {
created: OffsetDateTime,
message: String,
}
impl LockTimer {
pub fn new<S: Into<String>>(message: S) -> Self {
LockTimer {
message: message.into(),
..Default::default()
}
}
}
impl Drop for LockTimer {
fn drop(&mut self) {
let time_taken = OffsetDateTime::now_utc() - self.created;
let time_taken_formatted = humantime::format_duration(time_taken.unsigned_abs());
if time_taken > time::Duration::SECOND * 10 {
warn!(time_taken = %time_taken_formatted, "{}", self.message)
} else if time_taken > time::Duration::SECOND * 5 {
info!(time_taken = %time_taken_formatted, "{}", self.message)
} else {
debug!(time_taken = %time_taken_formatted, "{}", self.message)
};
}
}
impl Default for LockTimer {
fn default() -> Self {
LockTimer {
created: OffsetDateTime::now_utc(),
message: "released the lock".to_string(),
}
}
}
use nym_credential_proxy_lib::error::CredentialProxyError;
use nym_credential_proxy_lib::storage::CredentialProxyStorage;
use nym_credential_proxy_lib::ticketbook_manager::TicketbookManager;
use tracing::{error, info};
pub async fn wait_for_signal() {
use tokio::signal::unix::{signal, SignalKind};
@@ -95,66 +46,33 @@ fn build_sha_short() -> &'static str {
}
pub(crate) async fn run_api(cli: Cli) -> Result<(), CredentialProxyError> {
// create the tasks
let bind_address = cli.bind_address();
let storage = CredentialProxyStorage::init(cli.persistent_storage_path()).await?;
let mnemonic = cli.mnemonic;
let auth_token = cli.http_auth_token;
let webhook_cfg = cli.webhook;
let chain_client = ChainClient::new(mnemonic)?;
let cancellation_token = CancellationToken::new();
let required_deposit_cache = RequiredDepositCache::default();
let quorum_state_checker = QuorumStateChecker::new(
chain_client.clone(),
cli.quorum_check_interval,
cancellation_token.clone(),
)
.await?;
let quorum_state = quorum_state_checker.quorum_state_ref();
let deposits_buffer = DepositsBuffer::new(
storage.clone(),
chain_client.clone(),
required_deposit_cache.clone(),
let ticketbook_manager = TicketbookManager::new(
build_sha_short(),
cli.quorum_check_interval,
cli.deposits_buffer_size,
cli.max_concurrent_deposits,
cancellation_token.clone(),
storage,
mnemonic,
webhook_cfg.try_into()?,
)
.await?;
// let deposit_request_sender = deposit_maker.deposit_request_sender();
let api_state = ApiState::new(
storage.clone(),
quorum_state,
webhook_cfg,
chain_client,
deposits_buffer,
required_deposit_cache,
cancellation_token.clone(),
)
.await?;
let http_server = HttpServer::new(
bind_address,
api_state.clone(),
auth_token,
cancellation_token.clone(),
);
let storage_pruner = StoragePruner::new(cancellation_token, storage);
let http_server = HttpServer::new(bind_address, ticketbook_manager.clone(), auth_token);
// spawn all the tasks
api_state.try_spawn(http_server.run_forever());
api_state.try_spawn(storage_pruner.run_forever());
api_state.try_spawn(quorum_state_checker.run_forever());
// spawn the http server as a separate task / thread(-ish)
http_server.spawn_as_task();
// wait for cancel signal (SIGINT, SIGTERM or SIGQUIT)
wait_for_signal().await;
// cancel all the tasks and wait for all task to terminate
api_state.cancel_and_wait().await;
ticketbook_manager.cancel_and_wait().await;
Ok(())
}
@@ -1,26 +0,0 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::CredentialProxyError;
use crate::http::types::RequestError;
use axum::http::StatusCode;
use rand::rngs::OsRng;
use rand::RngCore;
use tracing::warn;
use uuid::Uuid;
pub fn random_uuid() -> Uuid {
let mut bytes = [0u8; 16];
let mut rng = OsRng;
rng.fill_bytes(&mut bytes);
Uuid::from_bytes(bytes)
}
pub fn db_failure<T>(err: CredentialProxyError, uuid: Uuid) -> Result<T, RequestError> {
warn!("db failure: {err}");
Err(RequestError::new_with_uuid(
format!("oh no, something went wrong {err}"),
uuid,
StatusCode::INTERNAL_SERVER_ERROR,
))
}
@@ -1,56 +1,54 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::CredentialProxyError;
use crate::http::router::build_router;
use crate::http::state::ApiState;
use axum::Router;
use nym_credential_proxy_lib::error::CredentialProxyError;
use nym_credential_proxy_lib::ticketbook_manager::TicketbookManager;
use std::net::SocketAddr;
use tokio_util::sync::CancellationToken;
use tracing::info;
pub mod helpers;
pub mod router;
pub mod state;
pub mod types;
pub struct HttpServer {
bind_address: SocketAddr,
cancellation: CancellationToken,
router: Router,
ticketbook_manager: TicketbookManager,
auth_token: String,
}
impl HttpServer {
pub fn new(
bind_address: SocketAddr,
state: ApiState,
ticketbook_manager: TicketbookManager,
auth_token: String,
cancellation: CancellationToken,
) -> Self {
HttpServer {
bind_address,
cancellation,
router: build_router(state, auth_token),
ticketbook_manager,
auth_token,
}
}
pub async fn run_forever(self) -> Result<(), CredentialProxyError> {
let address = self.bind_address;
info!("starting the http server on http://{address}");
pub fn spawn_as_task(self) {
let cancellation = self.ticketbook_manager.shutdown_token();
let listener = tokio::net::TcpListener::bind(address)
let ticketbook_manager = self.ticketbook_manager.clone();
ticketbook_manager.try_spawn_in_background(async move {
let address = self.bind_address;
let router = build_router(self.ticketbook_manager, self.auth_token);
info!("starting the http server on http://{address}");
let listener = tokio::net::TcpListener::bind(address)
.await
.map_err(|source| CredentialProxyError::SocketBindFailure { address, source })?;
axum::serve(
listener,
router.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(async move { cancellation.cancelled().await })
.await
.map_err(|source| CredentialProxyError::SocketBindFailure { address, source })?;
let cancellation = self.cancellation;
axum::serve(
listener,
self.router
.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(async move { cancellation.cancelled().await })
.await
.map_err(|source| CredentialProxyError::HttpServerFailure { source })
.map_err(|source| CredentialProxyError::HttpServerFailure { source })
});
}
}
@@ -1,27 +1,19 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::credentials::ticketbook::{
try_obtain_blinded_ticketbook_async, try_obtain_wallet_shares,
};
use crate::http::helpers::random_uuid;
use crate::http::state::ApiState;
use crate::http::types::RequestError;
use crate::nym_api_helpers::ensure_sane_expiration_date;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use axum::routing::{get, post};
use axum::{Json, Router};
use nym_compact_ecash::Base58;
use nym_credential_proxy_lib::helpers::random_uuid;
use nym_credential_proxy_lib::http_helpers::RequestError;
use nym_credential_proxy_requests::api::v1::ticketbook::models::{
CurrentEpochResponse, DepositResponse, MasterVerificationKeyResponse, PartialVerificationKey,
CurrentEpochResponse, DepositResponse, MasterVerificationKeyResponse,
PartialVerificationKeysResponse, TicketbookAsyncRequest, TicketbookObtainQueryParams,
TicketbookRequest, TicketbookWalletSharesAsyncResponse, TicketbookWalletSharesResponse,
};
use nym_credential_proxy_requests::routes::api::v1::ticketbook;
use nym_http_api_common::{FormattedResponse, OutputParams};
use time::OffsetDateTime;
use tracing::{error, info, span, warn, Instrument, Level};
pub(crate) mod shares;
@@ -68,61 +60,15 @@ pub(crate) async fn obtain_ticketbook_shares(
Json(payload): Json<TicketbookRequest>,
) -> Result<FormattedTicketbookWalletSharesResponse, RequestError> {
let uuid = random_uuid();
let requested_on = OffsetDateTime::now_utc();
let output = params.output.unwrap_or_default();
let span = span!(Level::INFO, "obtain ticketboook", uuid = %uuid);
async move {
info!("");
let response = state
.inner_state()
.obtain_ticketbook_shares(uuid, payload, params.obtain_params.global)
.await
.map_err(|err| RequestError::new_server_error(err, uuid))?;
let output = params.output.unwrap_or_default();
state.ensure_not_in_epoch_transition(Some(uuid)).await?;
let epoch_id = state
.current_epoch_id()
.await
.map_err(|err| RequestError::new_server_error(err, uuid))?;
if let Err(err) = ensure_sane_expiration_date(payload.expiration_date) {
warn!("failure due to invalid expiration date");
return Err(RequestError::new_with_uuid(
err.to_string(),
uuid,
StatusCode::BAD_REQUEST,
));
}
// if additional data was requested, grab them first in case there are any cache/network issues
let (
master_verification_key,
aggregated_expiration_date_signatures,
aggregated_coin_index_signatures,
) = state
.response_global_data(
params.include_master_verification_key,
params.include_expiration_date_signatures,
params.include_coin_index_signatures,
epoch_id,
payload.expiration_date,
uuid,
)
.await?;
let shares = try_obtain_wallet_shares(&state, uuid, requested_on, payload)
.await
.inspect_err(|err| warn!("request failure: {err}"))
.map_err(|err| RequestError::new(err.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?;
info!("request was successful!");
Ok(output.to_response(TicketbookWalletSharesResponse {
epoch_id,
shares,
master_verification_key,
aggregated_coin_index_signatures,
aggregated_expiration_date_signatures,
}))
}
.instrument(span)
.await
Ok(output.to_response(response))
}
/// Attempt to obtain blinded shares of an ecash ticketbook wallet asynchronously
@@ -159,72 +105,15 @@ pub(crate) async fn obtain_ticketbook_shares_async(
Json(payload): Json<TicketbookAsyncRequest>,
) -> Result<FormattedTicketbookWalletSharesAsyncResponse, RequestError> {
let uuid = random_uuid();
let requested_on = OffsetDateTime::now_utc();
let output = params.output.unwrap_or_default();
let span = span!(Level::INFO, "[async] obtain ticketboook", uuid = %uuid);
async move {
info!("");
let output = params.output.unwrap_or_default();
let response = state
.inner_state()
.obtain_ticketbook_shares_async(uuid, payload, params.obtain_params)
.await
.map_err(|err| RequestError::new_server_error(err, uuid))?;
// 1. perform basic validation
state.ensure_not_in_epoch_transition(Some(uuid)).await?;
if let Err(err) = ensure_sane_expiration_date(payload.inner.expiration_date) {
warn!("failure due to invalid expiration date");
return Err(RequestError::new_with_uuid(
err.to_string(),
uuid,
StatusCode::BAD_REQUEST,
));
}
// 2. store the request to retrieve the id
let pending = match state
.storage()
.insert_new_pending_async_shares_request(
uuid,
&payload.device_id,
&payload.credential_id,
)
.await
{
Err(err) => {
error!("failed to insert new pending async shares: {err}");
return Err(RequestError::new_with_uuid(
err.to_string(),
uuid,
StatusCode::CONFLICT,
));
}
Ok(pending) => pending,
};
let id = pending.id;
// 3. try to spawn a new task attempting to resolve the request
if state
.try_spawn(try_obtain_blinded_ticketbook_async(
state.clone(),
uuid,
requested_on,
payload,
params,
pending,
))
.is_none()
{
// we're going through the shutdown
return Err(RequestError::new_with_uuid(
"server shutdown in progress",
uuid,
StatusCode::INTERNAL_SERVER_ERROR,
));
}
// 4. in the meantime, return the id to the user
Ok(output.to_response(TicketbookWalletSharesAsyncResponse { id, uuid }))
}
.instrument(span)
.await
Ok(output.to_response(response))
}
/// Obtain the current value of the bandwidth voucher deposit
@@ -251,15 +140,14 @@ pub(crate) async fn current_deposit(
State(state): State<ApiState>,
) -> Result<FormattedDepositResponse, RequestError> {
let output = output.output.unwrap_or_default();
let current_deposit = state
.deposit_amount()
.await
.map_err(|err| RequestError::new(err.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?;
Ok(output.to_response(DepositResponse {
current_deposit_amount: current_deposit.amount,
current_deposit_denom: current_deposit.denom,
}))
let response = state
.inner_state()
.current_deposit()
.await
.map_err(RequestError::new_plain_error)?;
Ok(output.to_response(response))
}
/// Obtain partial verification keys of all signers for the current epoch.
@@ -288,28 +176,13 @@ pub(crate) async fn partial_verification_keys(
) -> Result<FormattedPartialVerificationKeysResponse, RequestError> {
let output = output.output.unwrap_or_default();
state.ensure_not_in_epoch_transition(None).await?;
let epoch_id = state
.current_epoch_id()
let response = state
.inner_state()
.partial_verification_keys()
.await
.map_err(|err| RequestError::new(err.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?;
.map_err(RequestError::new_plain_error)?;
let signers = state
.ecash_clients(epoch_id)
.await
.map_err(|err| RequestError::new(err.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?;
Ok(output.to_response(PartialVerificationKeysResponse {
epoch_id,
keys: signers
.iter()
.map(|signer| PartialVerificationKey {
node_index: signer.node_id,
bs58_encoded_key: signer.verification_key.to_bs58(),
})
.collect(),
}))
Ok(output.to_response(response))
}
/// Obtain the master verification key for the current epoch.
@@ -338,22 +211,13 @@ pub(crate) async fn master_verification_key(
) -> Result<FormattedMasterVerificationKeyResponse, RequestError> {
let output = output.output.unwrap_or_default();
state.ensure_not_in_epoch_transition(None).await?;
let epoch_id = state
.current_epoch_id()
let response = state
.inner_state()
.master_verification_key()
.await
.map_err(|err| RequestError::new(err.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?;
.map_err(RequestError::new_plain_error)?;
let key = state
.master_verification_key(Some(epoch_id))
.await
.map_err(|err| RequestError::new(err.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?;
Ok(output.to_response(MasterVerificationKeyResponse {
epoch_id,
bs58_encoded_key: key.to_bs58(),
}))
Ok(output.to_response(response))
}
/// Obtain the id of the current epoch.
@@ -383,14 +247,13 @@ pub(crate) async fn current_epoch(
) -> Result<FormattedCurrentEpochResponse, RequestError> {
let output = output.output.unwrap_or_default();
state.ensure_not_in_epoch_transition(None).await?;
let epoch_id = state
.current_epoch_id()
let response = state
.inner_state()
.current_epoch()
.await
.map_err(|err| RequestError::new(err.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?;
.map_err(RequestError::new_plain_error)?;
Ok(output.to_response(CurrentEpochResponse { epoch_id }))
Ok(output.to_response(response))
}
pub(crate) fn routes() -> Router<ApiState> {
@@ -1,76 +1,18 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::CredentialProxyError;
use crate::http::helpers::{db_failure, random_uuid};
use crate::http::router::api::v1::ticketbook::FormattedTicketbookWalletSharesResponse;
use crate::http::state::ApiState;
use crate::http::types::RequestError;
use crate::storage::models::MinimalWalletShare;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::routing::get;
use axum::Router;
use nym_credential_proxy_lib::helpers::random_uuid;
use nym_credential_proxy_lib::http_helpers::RequestError;
use nym_credential_proxy_requests::api::v1::ticketbook::models::{
SharesQueryParams, TicketbookWalletSharesResponse,
};
use nym_credential_proxy_requests::routes::api::v1::ticketbook::shares;
use nym_http_api_common::OutputParams;
use nym_validator_client::nym_api::EpochId;
use tracing::{debug, span, Instrument, Level};
use uuid::Uuid;
async fn shares_to_response(
state: ApiState,
uuid: Uuid,
shares: Vec<MinimalWalletShare>,
params: SharesQueryParams,
) -> Result<FormattedTicketbookWalletSharesResponse, RequestError> {
// in all calls we ensured the shares are non-empty
#[allow(clippy::unwrap_used)]
let first = shares.first().unwrap();
let expiration_date = first.expiration_date;
let epoch_id = first.epoch_id as EpochId;
let threshold = state.response_ecash_threshold(uuid, epoch_id).await?;
if shares.len() < threshold as usize {
return Err(RequestError::new_server_error(
CredentialProxyError::InsufficientNumberOfCredentials {
available: shares.len(),
threshold,
},
uuid,
));
}
// grab any requested additional data
let (
master_verification_key,
aggregated_expiration_date_signatures,
aggregated_coin_index_signatures,
) = state
.response_global_data(
params.include_master_verification_key,
params.include_expiration_date_signatures,
params.include_coin_index_signatures,
epoch_id,
expiration_date,
uuid,
)
.await?;
// finally produce a response
Ok(params
.output
.unwrap_or_default()
.to_response(TicketbookWalletSharesResponse {
epoch_id,
shares: shares.into_iter().map(Into::into).collect(),
master_verification_key,
aggregated_coin_index_signatures,
aggregated_expiration_date_signatures,
}))
}
/// Query by id for blinded shares of a bandwidth voucher
#[utoipa::path(
@@ -98,53 +40,15 @@ pub(crate) async fn query_for_shares_by_id(
Path(share_id): Path<i64>,
) -> Result<FormattedTicketbookWalletSharesResponse, RequestError> {
let uuid = random_uuid();
let output = params.output.unwrap_or_default();
let span = span!(Level::INFO, "query shares by id", uuid = %uuid, share_id = %share_id);
async move {
debug!("");
let response = state
.inner_state()
.query_for_shares_by_id(uuid, params.global, share_id)
.await
.map_err(|err| RequestError::new_server_error(err, uuid))?;
// TODO: edge case: this will **NOT** work if shares got created in epoch X,
// but this query happened in epoch X+1
let shares = match state
.storage()
.load_wallet_shares_by_shares_id(share_id)
.await
{
Ok(shares) => {
if shares.is_empty() {
debug!("shares not found");
// check for explicit error
match state
.storage()
.load_shares_error_by_shares_id(share_id)
.await
{
Ok(maybe_error_message) => {
if let Some(error_message) = maybe_error_message {
return Err(RequestError::new_with_uuid(
format!("failed to obtain wallet shares: {error_message} - share_id = {share_id}"),
uuid,
StatusCode::INTERNAL_SERVER_ERROR,
));
}
}
Err(err) => return db_failure(err, uuid),
}
return Err(RequestError::new_with_uuid(
format!("not found - share_id = {share_id}"),
uuid,
StatusCode::NOT_FOUND,
));
}
shares
}
Err(err) => return db_failure(err, uuid),
};
shares_to_response(state, uuid, shares, params).await
}.instrument(span).await
Ok(output.to_response(response))
}
/// Query by id for blinded wallet shares of a ticketbook
@@ -173,53 +77,20 @@ pub(crate) async fn query_for_shares_by_device_id_and_credential_id(
Path((device_id, credential_id)): Path<(String, String)>,
) -> Result<FormattedTicketbookWalletSharesResponse, RequestError> {
let uuid = random_uuid();
let output = params.output.unwrap_or_default();
let span = span!(Level::INFO, "query shares by device and credential ids", uuid = %uuid, device_id = %device_id, credential_id = %credential_id);
async move {
debug!("");
let response = state
.inner_state()
.query_for_shares_by_device_id_and_credential_id(
uuid,
params.global,
device_id,
credential_id,
)
.await
.map_err(|err| RequestError::new_server_error(err, uuid))?;
// TODO: edge case: this will **NOT** work if shares got created in epoch X,
// but this query happened in epoch X+1
let shares = match state
.storage()
.load_wallet_shares_by_device_and_credential_id(&device_id, &credential_id)
.await
{
Ok(shares) => {
if shares.is_empty() {
debug!("shares not found");
// check for explicit error
match state
.storage()
.load_shares_error_by_device_and_credential_id(&device_id, &credential_id)
.await
{
Ok(maybe_error_message) => {
if let Some(error_message) = maybe_error_message {
return Err(RequestError::new_with_uuid(
format!("failed to obtain wallet shares: {error_message} - device_id = {device_id}, credential_id = {credential_id}"),
uuid,
StatusCode::INTERNAL_SERVER_ERROR,
));
}
}
Err(err) => return db_failure(err, uuid),
}
return Err(RequestError::new_with_uuid(
format!("not found - device_id = {device_id}, credential_id = {credential_id}"),
uuid,
StatusCode::NOT_FOUND,
));
}
shares
}
Err(err) => return db_failure(err, uuid),
};
shares_to_response(state, uuid, shares, params).await
}.instrument(span).await
Ok(output.to_response(response))
}
pub(crate) fn routes() -> Router<ApiState> {
@@ -18,7 +18,7 @@ fn swagger_redirect<S: Clone + Send + Sync + 'static>() -> MethodRouter<S> {
get(|| async { Redirect::to("/api/v1/swagger/") })
}
pub fn build_router(state: ApiState, auth_token: String) -> Router {
pub fn build_router(state: impl Into<ApiState>, auth_token: String) -> Router {
// let auth_layer = from_extractor::<RequireAuth>();
let auth_middleware = AuthLayer::new(Arc::new(Zeroizing::new(auth_token)));
@@ -32,7 +32,7 @@ pub fn build_router(state: ApiState, auth_token: String) -> Router {
// we don't have to be using middleware, but we already had that code
// we might want something like: https://github.com/tokio-rs/axum/blob/main/examples/tracing-aka-logging/src/main.rs#L44 instead
.layer(axum::middleware::from_fn(logging::log_request_info))
.with_state(state);
.with_state(state.into());
cfg_if::cfg_if! {
if #[cfg(feature = "cors")] {
@@ -1,776 +1,21 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::deposits_buffer::helpers::BufferedDeposit;
use crate::deposits_buffer::DepositsBuffer;
use crate::error::CredentialProxyError;
use crate::helpers::LockTimer;
use crate::http::state::required_deposit_cache::RequiredDepositCache;
use crate::http::types::RequestError;
use crate::nym_api_helpers::{
ensure_sane_expiration_date, query_all_threshold_apis, CachedEpoch, CachedImmutableEpochItem,
CachedImmutableItems,
};
use crate::quorum_checker::QuorumState;
use crate::storage::CredentialProxyStorage;
use crate::webhook::ZkNymWebHookConfig;
use axum::http::StatusCode;
use bip39::Mnemonic;
use nym_compact_ecash::scheme::coin_indices_signatures::{
aggregate_annotated_indices_signatures, CoinIndexSignatureShare,
};
use nym_compact_ecash::scheme::expiration_date_signatures::{
aggregate_annotated_expiration_signatures, ExpirationDateSignatureShare,
};
use nym_compact_ecash::{Base58, PublicKeyUser};
use nym_credential_proxy_requests::api::v1::ticketbook::models::{
AggregatedCoinIndicesSignaturesResponse, AggregatedExpirationDateSignaturesResponse,
MasterVerificationKeyResponse,
};
use nym_credentials::ecash::utils::{ecash_today, EcashTime};
use nym_credentials::{
AggregatedCoinIndicesSignatures, AggregatedExpirationDateSignatures, EpochVerificationKey,
};
use nym_credentials_interface::VerificationKeyAuth;
use nym_ecash_contract_common::deposit::DepositId;
use nym_ecash_contract_common::msg::ExecuteMsg;
use nym_validator_client::coconut::EcashApiError;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::dkg_query_client::Epoch;
use nym_validator_client::nyxd::contract_traits::{
DkgQueryClient, NymContractsProvider, PagedDkgQueryClient,
};
use nym_validator_client::nyxd::cosmwasm_client::types::ExecuteResult;
use nym_validator_client::nyxd::{Coin, CosmWasmClient, NyxdClient};
use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient, EcashApiClient};
use std::future::Future;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use time::{Date, OffsetDateTime};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tracing::{debug, error, info, instrument, warn};
use uuid::Uuid;
use nym_credential_proxy_lib::ticketbook_manager::TicketbookManager;
pub(crate) mod required_deposit_cache;
// currently we need to hold our keypair so that we could request a freepass credential
#[derive(Clone)]
pub struct ApiState {
inner: Arc<CredentialProxyStateInner>,
inner: TicketbookManager,
}
impl From<TicketbookManager> for ApiState {
fn from(inner: TicketbookManager) -> Self {
Self { inner }
}
}
// a lot of functionalities, mostly to do with caching and storage is just copy-pasted from nym-api,
// since we have to do more or less the same work
impl ApiState {
pub(crate) async fn new(
storage: CredentialProxyStorage,
quorum_state: QuorumState,
zk_nym_web_hook_config: ZkNymWebHookConfig,
client: ChainClient,
deposits_buffer: DepositsBuffer,
required_deposit_cache: RequiredDepositCache,
cancellation_token: CancellationToken,
) -> Result<Self, CredentialProxyError> {
let state = ApiState {
inner: Arc::new(CredentialProxyStateInner {
storage,
client,
ecash_state: EcashState {
required_deposit_cache,
quorum_state,
cached_epoch: Default::default(),
master_verification_key: Default::default(),
threshold_values: Default::default(),
epoch_clients: Default::default(),
coin_index_signatures: Default::default(),
expiration_date_signatures: Default::default(),
},
zk_nym_web_hook_config,
task_tracker: TaskTracker::new(),
deposits_buffer,
cancellation_token,
}),
};
// since this is startup,
// might as well do all the needed network queries to establish needed global signatures
// if we don't already have them
state.build_initial_cache().await?;
Ok(state)
}
async fn build_initial_cache(&self) -> Result<(), CredentialProxyError> {
let today = ecash_today().date();
let epoch_id = self.current_epoch_id().await?;
let _ = self.deposit_amount().await?;
let _ = self.master_verification_key(Some(epoch_id)).await?;
let _ = self.ecash_threshold(epoch_id).await?;
let _ = self.ecash_clients(epoch_id).await?;
let _ = self.master_coin_index_signatures(Some(epoch_id)).await?;
let _ = self
.master_expiration_date_signatures(epoch_id, today)
.await?;
Ok(())
}
pub(crate) fn try_spawn<F>(&self, task: F) -> Option<JoinHandle<F::Output>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
// don't spawn new task if we've received cancellation token
if self.inner.cancellation_token.is_cancelled() {
None
} else {
self.inner.task_tracker.reopen();
// TODO: later use a task queue since most requests will be blocked waiting on chain permit anyway
let join_handle = self.inner.task_tracker.spawn(task);
self.inner.task_tracker.close();
Some(join_handle)
}
}
pub(crate) async fn cancel_and_wait(&self) {
self.inner.cancellation_token.cancel();
self.inner.deposits_buffer.wait_for_shutdown().await;
self.inner.task_tracker.wait().await
}
pub(crate) fn zk_nym_web_hook(&self) -> &ZkNymWebHookConfig {
&self.inner.zk_nym_web_hook_config
}
pub(crate) fn quorum_available(&self) -> bool {
self.inner.ecash_state.quorum_state.available()
}
async fn ensure_credentials_issuable(&self) -> Result<(), CredentialProxyError> {
let epoch = self.current_epoch().await?;
if epoch.state.is_final() {
Ok(())
} else if let Some(final_timestamp) = epoch.final_timestamp_secs() {
// SAFETY: the timestamp values in our DKG contract should be valid timestamps,
// otherwise it means the chain is seriously misbehaving
#[allow(clippy::unwrap_used)]
let finish_dt = OffsetDateTime::from_unix_timestamp(final_timestamp as i64).unwrap();
Err(CredentialProxyError::CredentialsNotYetIssuable {
availability: finish_dt,
})
} else if epoch.state.is_waiting_initialisation() {
Err(CredentialProxyError::UninitialisedDkg)
} else {
Err(CredentialProxyError::UnknownEcashFailure)
}
}
pub(crate) fn storage(&self) -> &CredentialProxyStorage {
&self.inner.storage
}
pub async fn deposit_amount(&self) -> Result<Coin, CredentialProxyError> {
self.inner
.ecash_state
.required_deposit_cache
.get_or_update(&self.inner.client)
.await
}
async fn current_epoch(&self) -> Result<Epoch, CredentialProxyError> {
let read_guard = self.inner.ecash_state.cached_epoch.read().await;
if read_guard.is_valid() {
return Ok(read_guard.current_epoch);
}
// update cache
drop(read_guard);
let mut write_guard = self.inner.ecash_state.cached_epoch.write().await;
let epoch = self.query_chain().await.get_current_epoch().await?;
write_guard.update(epoch);
Ok(epoch)
}
pub async fn current_epoch_id(&self) -> Result<EpochId, CredentialProxyError> {
let read_guard = self.inner.ecash_state.cached_epoch.read().await;
if read_guard.is_valid() {
return Ok(read_guard.current_epoch.epoch_id);
}
// update cache
drop(read_guard);
let mut write_guard = self.inner.ecash_state.cached_epoch.write().await;
let epoch = self.query_chain().await.get_current_epoch().await?;
write_guard.update(epoch);
Ok(epoch.epoch_id)
}
pub(crate) async fn query_chain(&self) -> RwLockReadGuard<'_, DirectSigningHttpRpcNyxdClient> {
self.inner.client.query_chain().await
}
pub(crate) async fn get_deposit(
&self,
request_uuid: Uuid,
requested_on: OffsetDateTime,
client_pubkey: PublicKeyUser,
) -> Result<BufferedDeposit, CredentialProxyError> {
let start = Instant::now();
let deposit = self
.inner
.deposits_buffer
.get_valid_deposit(request_uuid, requested_on, client_pubkey)
.await;
let time_taken = start.elapsed();
let formatted = humantime::format_duration(time_taken);
if time_taken > Duration::from_secs(10) {
warn!("attempting to get buffered deposit took {formatted}. perhaps the buffer is too small or the process/chain is overloaded?")
} else {
debug!("attempting to get buffered deposit took {formatted}")
};
deposit
}
pub(crate) async fn insert_deposit_usage_error(&self, deposit_id: DepositId, error: String) {
if let Err(err) = self
.inner
.storage
.insert_deposit_usage_error(deposit_id, error)
.await
{
error!("failed to insert information about deposit (id: {deposit_id}) usage failure: {err}")
}
}
pub(crate) async fn global_data(
&self,
include_master_verification_key: bool,
include_expiration_date_signatures: bool,
include_coin_index_signatures: bool,
epoch_id: EpochId,
expiration_date: Date,
) -> Result<
(
Option<MasterVerificationKeyResponse>,
Option<AggregatedExpirationDateSignaturesResponse>,
Option<AggregatedCoinIndicesSignaturesResponse>,
),
CredentialProxyError,
> {
let master_verification_key = if include_master_verification_key {
debug!("including master verification key in the response");
Some(
self.master_verification_key(Some(epoch_id))
.await
.map(|key| MasterVerificationKeyResponse {
epoch_id,
bs58_encoded_key: key.to_bs58(),
})
.inspect_err(|err| warn!("request failure: {err}"))?,
)
} else {
None
};
let aggregated_expiration_date_signatures = if include_expiration_date_signatures {
debug!("including expiration date signatures in the response");
Some(
self.master_expiration_date_signatures(epoch_id, expiration_date)
.await
.map(|signatures| AggregatedExpirationDateSignaturesResponse {
signatures: signatures.clone(),
})
.inspect_err(|err| warn!("request failure: {err}"))?,
)
} else {
None
};
let aggregated_coin_index_signatures = if include_coin_index_signatures {
debug!("including coin index signatures in the response");
Some(
self.master_coin_index_signatures(Some(epoch_id))
.await
.map(|signatures| AggregatedCoinIndicesSignaturesResponse {
signatures: signatures.clone(),
})
.inspect_err(|err| warn!("request failure: {err}"))?,
)
} else {
None
};
Ok((
master_verification_key,
aggregated_expiration_date_signatures,
aggregated_coin_index_signatures,
))
}
pub(crate) async fn response_global_data(
&self,
include_master_verification_key: bool,
include_expiration_date_signatures: bool,
include_coin_index_signatures: bool,
epoch_id: EpochId,
expiration_date: Date,
uuid: Uuid,
) -> Result<
(
Option<MasterVerificationKeyResponse>,
Option<AggregatedExpirationDateSignaturesResponse>,
Option<AggregatedCoinIndicesSignaturesResponse>,
),
RequestError,
> {
self.global_data(
include_master_verification_key,
include_expiration_date_signatures,
include_coin_index_signatures,
epoch_id,
expiration_date,
)
.await
.map_err(|err| RequestError::new_server_error(err, uuid))
}
pub async fn ensure_not_in_epoch_transition(
&self,
uuid: Option<Uuid>,
) -> Result<(), RequestError> {
if let Err(err) = self.ensure_credentials_issuable().await {
return if let Some(uuid) = uuid {
Err(RequestError::new_with_uuid(
err.to_string(),
uuid,
StatusCode::SERVICE_UNAVAILABLE,
))
} else {
Err(RequestError::new(
err.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
))
};
}
Ok(())
}
pub(crate) async fn ecash_clients(
&self,
epoch_id: EpochId,
) -> Result<RwLockReadGuard<'_, Vec<EcashApiClient>>, CredentialProxyError> {
self.inner
.ecash_state
.epoch_clients
.get_or_init(epoch_id, || async {
Ok(self
.query_chain()
.await
.get_all_verification_key_shares(epoch_id)
.await?
.into_iter()
.map(TryInto::try_into)
.collect::<anyhow::Result<Vec<_>, EcashApiError>>()?)
})
.await
}
pub(crate) async fn ecash_threshold(
&self,
epoch_id: EpochId,
) -> Result<u64, CredentialProxyError> {
self.inner
.ecash_state
.threshold_values
.get_or_init(epoch_id, || async {
if let Some(threshold) = self
.query_chain()
.await
.get_epoch_threshold(epoch_id)
.await?
{
Ok(threshold)
} else {
Err(CredentialProxyError::UnavailableThreshold { epoch_id })
}
})
.await
.map(|t| *t)
}
pub(crate) async fn response_ecash_threshold(
&self,
uuid: Uuid,
epoch_id: EpochId,
) -> Result<u64, RequestError> {
self.ecash_threshold(epoch_id)
.await
.map_err(|err| RequestError::new_server_error(err, uuid))
}
pub(crate) async fn master_verification_key(
&self,
epoch_id: Option<EpochId>,
) -> Result<RwLockReadGuard<'_, VerificationKeyAuth>, CredentialProxyError> {
let epoch_id = match epoch_id {
Some(id) => id,
None => self.current_epoch_id().await?,
};
self.inner
.ecash_state
.master_verification_key
.get_or_init(epoch_id, || async {
// 1. check the storage
if let Some(stored) = self
.inner
.storage
.get_master_verification_key(epoch_id)
.await?
{
return Ok(stored.key);
}
info!("attempting to establish master verification key for epoch {epoch_id}...");
// 2. perform actual aggregation
let all_apis = self.ecash_clients(epoch_id).await?;
let threshold = self.ecash_threshold(epoch_id).await?;
if all_apis.len() < threshold as usize {
return Err(CredentialProxyError::InsufficientNumberOfSigners {
threshold,
available: all_apis.len(),
});
}
let master_key = nym_credentials::aggregate_verification_keys(&all_apis)?;
let epoch = EpochVerificationKey {
epoch_id,
key: master_key,
};
// 3. save the key in the storage for when we reboot
self.inner
.storage
.insert_master_verification_key(&epoch)
.await?;
Ok(epoch.key)
})
.await
}
pub(crate) async fn master_coin_index_signatures(
&self,
epoch_id: Option<EpochId>,
) -> Result<RwLockReadGuard<'_, AggregatedCoinIndicesSignatures>, CredentialProxyError> {
let epoch_id = match epoch_id {
Some(id) => id,
None => self.current_epoch_id().await?,
};
self.inner
.ecash_state
.coin_index_signatures
.get_or_init(epoch_id, || async {
// 1. check the storage
if let Some(master_sigs) = self
.inner
.storage
.get_master_coin_index_signatures(epoch_id)
.await?
{
return Ok(master_sigs);
}
info!(
"attempting to establish master coin index signatures for epoch {epoch_id}..."
);
// 2. go around APIs and attempt to aggregate the data
let master_vk = self.master_verification_key(Some(epoch_id)).await?;
let all_apis = self.ecash_clients(epoch_id).await?;
let threshold = self.ecash_threshold(epoch_id).await?;
let get_partial_signatures = |api: EcashApiClient| async {
// move the api into the closure
let api = api;
let node_index = api.node_id;
let partial_vk = api.verification_key;
let partial = api
.api_client
.partial_coin_indices_signatures(Some(epoch_id))
.await?
.signatures;
Ok(CoinIndexSignatureShare {
index: node_index,
key: partial_vk,
signatures: partial,
})
};
let shares =
query_all_threshold_apis(all_apis.clone(), threshold, get_partial_signatures)
.await?;
let aggregated = aggregate_annotated_indices_signatures(
nym_credentials_interface::ecash_parameters(),
&master_vk,
&shares,
)?;
let sigs = AggregatedCoinIndicesSignatures {
epoch_id,
signatures: aggregated,
};
// 3. save the signatures in the storage for when we reboot
self.inner
.storage
.insert_master_coin_index_signatures(&sigs)
.await?;
Ok(sigs)
})
.await
}
pub(crate) async fn master_expiration_date_signatures(
&self,
epoch_id: EpochId,
expiration_date: Date,
) -> Result<RwLockReadGuard<'_, AggregatedExpirationDateSignatures>, CredentialProxyError> {
self.inner
.ecash_state
.expiration_date_signatures
.get_or_init((epoch_id, expiration_date), || async {
// 1. sanity check to see if the expiration_date is not nonsense
ensure_sane_expiration_date(expiration_date)?;
// 2. check the storage
if let Some(master_sigs) = self
.inner
.storage
.get_master_expiration_date_signatures(expiration_date, epoch_id)
.await?
{
return Ok(master_sigs);
}
info!(
"attempting to establish master expiration date signatures for {expiration_date} and epoch {epoch_id}..."
);
// 3. go around APIs and attempt to aggregate the data
let epoch_id = self.current_epoch_id().await?;
let master_vk = self.master_verification_key(Some(epoch_id)).await?;
let all_apis = self.ecash_clients(epoch_id).await?;
let threshold = self.ecash_threshold(epoch_id).await?;
let get_partial_signatures = |api: EcashApiClient| async {
// move the api into the closure
let api = api;
let node_index = api.node_id;
let partial_vk = api.verification_key;
let partial = api
.api_client
.partial_expiration_date_signatures(Some(expiration_date), Some(epoch_id))
.await?
.signatures;
Ok(ExpirationDateSignatureShare {
index: node_index,
key: partial_vk,
signatures: partial,
})
};
let shares =
query_all_threshold_apis(all_apis.clone(), threshold, get_partial_signatures)
.await?;
let aggregated = aggregate_annotated_expiration_signatures(
&master_vk,
expiration_date.ecash_unix_timestamp(),
&shares,
)?;
let sigs = AggregatedExpirationDateSignatures {
epoch_id,
expiration_date,
signatures: aggregated,
};
// 4. save the signatures in the storage for when we reboot
self.inner
.storage
.insert_master_expiration_date_signatures(&sigs)
.await?;
Ok(sigs)
})
.await
}
}
#[derive(Clone)]
pub struct ChainClient(Arc<RwLock<DirectSigningHttpRpcNyxdClient>>);
impl ChainClient {
pub fn new(mnemonic: Mnemonic) -> Result<Self, CredentialProxyError> {
let network_details = nym_network_defaults::NymNetworkDetails::new_from_env();
let client_config = nyxd::Config::try_from_nym_network_details(&network_details)?;
let nyxd_url = network_details
.endpoints
.first()
.ok_or_else(|| CredentialProxyError::NoNyxEndpointsAvailable)?
.nyxd_url
.as_str();
let client = NyxdClient::connect_with_mnemonic(client_config, nyxd_url, mnemonic)?;
if client.ecash_contract_address().is_none() {
return Err(CredentialProxyError::UnavailableEcashContract);
}
if client.dkg_contract_address().is_none() {
return Err(CredentialProxyError::UnavailableDKGContract);
}
Ok(ChainClient(Arc::new(RwLock::new(client))))
}
pub(crate) async fn query_chain(&self) -> ChainReadPermit<'_> {
let _acquire_timer = LockTimer::new("acquire chain query permit");
self.0.read().await
}
pub(crate) async fn start_chain_tx(&self) -> ChainWritePermit<'_> {
let _acquire_timer = LockTimer::new("acquire exclusive chain write permit");
ChainWritePermit {
lock_timer: LockTimer::new("exclusive chain access permit"),
inner: self.0.write().await,
}
}
}
struct CredentialProxyStateInner {
storage: CredentialProxyStorage,
client: ChainClient,
deposits_buffer: DepositsBuffer,
zk_nym_web_hook_config: ZkNymWebHookConfig,
ecash_state: EcashState,
task_tracker: TaskTracker,
cancellation_token: CancellationToken,
}
pub(crate) struct EcashState {
pub(crate) required_deposit_cache: RequiredDepositCache,
pub(crate) quorum_state: QuorumState,
pub(crate) cached_epoch: RwLock<CachedEpoch>,
pub(crate) master_verification_key: CachedImmutableEpochItem<VerificationKeyAuth>,
pub(crate) threshold_values: CachedImmutableEpochItem<u64>,
pub(crate) epoch_clients: CachedImmutableEpochItem<Vec<EcashApiClient>>,
pub(crate) coin_index_signatures: CachedImmutableEpochItem<AggregatedCoinIndicesSignatures>,
pub(crate) expiration_date_signatures:
CachedImmutableItems<(EpochId, Date), AggregatedExpirationDateSignatures>,
}
pub(crate) type ChainReadPermit<'a> = RwLockReadGuard<'a, DirectSigningHttpRpcNyxdClient>;
// explicitly wrap the WriteGuard for extra information regarding time taken
pub(crate) struct ChainWritePermit<'a> {
// it's not really dead, we only care about it being dropped
#[allow(dead_code)]
lock_timer: LockTimer,
inner: RwLockWriteGuard<'a, DirectSigningHttpRpcNyxdClient>,
}
impl ChainWritePermit<'_> {
#[instrument(skip(self, short_sha, info), err(Display))]
pub(crate) async fn make_deposits(
self,
short_sha: &'static str,
info: Vec<(String, Coin)>,
) -> Result<ExecuteResult, CredentialProxyError> {
let address = self.inner.address();
let starting_sequence = self.inner.get_sequence(&address).await?.sequence;
let deposits = info.len();
let ecash_contract = self
.inner
.ecash_contract_address()
.ok_or(CredentialProxyError::UnavailableEcashContract)?;
let deposit_messages = info
.into_iter()
.map(|(identity_key, amount)| {
(
ExecuteMsg::DepositTicketBookFunds { identity_key },
vec![amount],
)
})
.collect::<Vec<_>>();
let res = self
.inner
.execute_multiple(
ecash_contract,
deposit_messages,
None,
format!("cp-{short_sha}: performing {deposits} deposits"),
)
.await?;
loop {
let updated_sequence = self.inner.get_sequence(&address).await?.sequence;
if updated_sequence > starting_sequence {
break;
}
warn!("wrong sequence number... waiting before releasing chain lock");
tokio::time::sleep(Duration::from_millis(50)).await;
}
Ok(res)
}
}
impl Deref for ChainWritePermit<'_> {
type Target = DirectSigningHttpRpcNyxdClient;
fn deref(&self) -> &Self::Target {
self.inner.deref()
pub(crate) fn inner_state(&self) -> &TicketbookManager {
&self.inner
}
}
@@ -12,16 +12,8 @@ cfg_if::cfg_if! {
pub mod cli;
pub mod config;
pub mod credentials;
pub mod error;
pub mod helpers;
pub mod http;
pub mod nym_api_helpers;
pub mod storage;
pub mod tasks;
mod webhook;
mod deposits_buffer;
mod quorum_checker;
}
}
@@ -34,7 +26,6 @@ async fn main() -> anyhow::Result<()> {
// );
let cli = Cli::parse();
cli.webhook.ensure_valid_client_url()?;
trace!("args: {cli:#?}");
setup_env(cli.config_env_file.as_ref());