827c13b69e
dont build netstack in CI additional rust 2024 fixes fixes removed temp.rs first round of cleanup removed duplicated NS types moved gateway probe to the monorepo
345 lines
12 KiB
Rust
345 lines
12 KiB
Rust
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
use crate::error::CredentialProxyError;
|
|
use crate::storage::models::BlindedShares;
|
|
use crate::ticketbook_manager::TicketbookManager;
|
|
use futures::{StreamExt, stream};
|
|
use nym_compact_ecash::Base58;
|
|
use nym_credential_proxy_requests::api::v1::ticketbook::models::{
|
|
TicketbookAsyncRequest, TicketbookObtainParams, TicketbookRequest,
|
|
TicketbookWalletSharesResponse, WalletShare, WebhookTicketbookWalletShares,
|
|
WebhookTicketbookWalletSharesRequest,
|
|
};
|
|
use nym_validator_client::client::NymApiClientExt;
|
|
use nym_validator_client::ecash::BlindSignRequestBody;
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use time::OffsetDateTime;
|
|
use tokio::sync::Mutex;
|
|
use tokio::time::timeout;
|
|
use tracing::{debug, error, info, instrument};
|
|
use uuid::Uuid;
|
|
|
|
impl TicketbookManager {
|
|
#[instrument(
|
|
skip(self, request_data, request, requested_on),
|
|
fields(
|
|
expiration_date = %request_data.expiration_date,
|
|
ticketbook_type = %request_data.ticketbook_type
|
|
)
|
|
)]
|
|
pub async fn try_obtain_wallet_shares(
|
|
&self,
|
|
request: Uuid,
|
|
requested_on: OffsetDateTime,
|
|
request_data: TicketbookRequest,
|
|
) -> Result<Vec<WalletShare>, CredentialProxyError> {
|
|
// don't proceed if we don't have quorum available as the request will definitely fail
|
|
if !self.state.ecash_state().quorum_state.available() {
|
|
return Err(CredentialProxyError::UnavailableSigningQuorum);
|
|
}
|
|
|
|
let epoch = self.state.current_epoch_id().await?;
|
|
let threshold = self.state.ecash_threshold(epoch).await?;
|
|
let expiration_date = request_data.expiration_date;
|
|
|
|
// before we commit to making the deposit, ensure we have required signatures cached and stored
|
|
self.ensure_global_data_cached(epoch, expiration_date)
|
|
.await?;
|
|
let ecash_api_clients = self.state.ecash_clients(epoch).await?.clone();
|
|
|
|
let deposit_data = self
|
|
.state
|
|
.get_deposit(request, requested_on, request_data.ecash_pubkey)
|
|
.await?;
|
|
let deposit_id = deposit_data.deposit_id;
|
|
let signature = deposit_data.sign_ticketbook_plaintext(&request_data.withdrawal_request);
|
|
|
|
let credential_request = BlindSignRequestBody::new(
|
|
request_data.withdrawal_request.into(),
|
|
deposit_id,
|
|
signature,
|
|
request_data.ecash_pubkey,
|
|
request_data.expiration_date,
|
|
request_data.ticketbook_type,
|
|
);
|
|
|
|
let wallet_shares = Arc::new(Mutex::new(HashMap::new()));
|
|
|
|
info!("attempting to contract all nym-apis for the partial wallets...");
|
|
stream::iter(ecash_api_clients)
|
|
.for_each_concurrent(None, |client| async {
|
|
// move the client into the block
|
|
let client = client;
|
|
|
|
debug!("contacting {client} for blinded partial wallet");
|
|
let res = timeout(
|
|
Duration::from_secs(5),
|
|
client.api_client.blind_sign(&credential_request),
|
|
)
|
|
.await
|
|
.map_err(|_| CredentialProxyError::EcashApiRequestTimeout {
|
|
client_repr: client.to_string(),
|
|
})
|
|
.and_then(|res| res.map_err(Into::into));
|
|
|
|
// 1. try to store it
|
|
if let Err(err) = self
|
|
.state
|
|
.storage()
|
|
.insert_partial_wallet_share(
|
|
deposit_id,
|
|
epoch,
|
|
expiration_date,
|
|
client.node_id,
|
|
&res,
|
|
)
|
|
.await
|
|
{
|
|
error!("failed to persist issued partial share: {err}")
|
|
}
|
|
|
|
// 2. add it to the map
|
|
match res {
|
|
Ok(share) => {
|
|
wallet_shares
|
|
.lock()
|
|
.await
|
|
.insert(client.node_id, share.blinded_signature);
|
|
}
|
|
Err(err) => {
|
|
error!("failed to obtain partial blinded wallet share from {client}: {err}")
|
|
}
|
|
}
|
|
})
|
|
.await;
|
|
|
|
// SAFETY: the futures have completed, so we MUST have the only arc reference
|
|
#[allow(clippy::unwrap_used)]
|
|
let wallet_shares = Arc::into_inner(wallet_shares).unwrap().into_inner();
|
|
let shares = wallet_shares.len();
|
|
|
|
if shares < threshold as usize {
|
|
let err = CredentialProxyError::InsufficientNumberOfCredentials {
|
|
available: shares,
|
|
threshold,
|
|
};
|
|
self.state
|
|
.insert_deposit_usage_error(deposit_id, err.to_string())
|
|
.await;
|
|
return Err(err);
|
|
}
|
|
|
|
Ok(wallet_shares
|
|
.into_iter()
|
|
.map(|(node_index, share)| WalletShare {
|
|
node_index,
|
|
bs58_encoded_share: share.to_bs58(),
|
|
})
|
|
.collect())
|
|
}
|
|
|
|
pub async fn try_obtain_wallet_shares_async(
|
|
&self,
|
|
request: Uuid,
|
|
requested_on: OffsetDateTime,
|
|
request_data: TicketbookRequest,
|
|
device_id: &str,
|
|
credential_id: &str,
|
|
) -> Result<Vec<WalletShare>, CredentialProxyError> {
|
|
let shares = match self
|
|
.try_obtain_wallet_shares(request, requested_on, request_data)
|
|
.await
|
|
{
|
|
Ok(shares) => shares,
|
|
Err(err) => {
|
|
let obtained = match err {
|
|
CredentialProxyError::InsufficientNumberOfCredentials { available, .. } => {
|
|
available
|
|
}
|
|
_ => 0,
|
|
};
|
|
|
|
// currently there's no retry mechanisms, but, who knows, that might change
|
|
if let Err(err) = self
|
|
.state
|
|
.storage()
|
|
.update_pending_async_blinded_shares_error(
|
|
obtained,
|
|
device_id,
|
|
credential_id,
|
|
&err.to_string(),
|
|
)
|
|
.await
|
|
{
|
|
error!("failed to update database with the error information: {err}")
|
|
}
|
|
return Err(err);
|
|
}
|
|
};
|
|
|
|
Ok(shares)
|
|
}
|
|
|
|
async fn try_obtain_blinded_ticketbook_async_inner(
|
|
&self,
|
|
request: Uuid,
|
|
requested_on: OffsetDateTime,
|
|
request_data: TicketbookAsyncRequest,
|
|
params: TicketbookObtainParams,
|
|
pending: &BlindedShares,
|
|
) -> Result<(), CredentialProxyError> {
|
|
let epoch_id = self.state.current_epoch_id().await?;
|
|
|
|
let device_id = &request_data.device_id;
|
|
let credential_id = &request_data.credential_id;
|
|
let secret = request_data.secret.clone();
|
|
|
|
// 1. try to obtain global data
|
|
let (
|
|
master_verification_key,
|
|
aggregated_expiration_date_signatures,
|
|
aggregated_coin_index_signatures,
|
|
) = self
|
|
.state
|
|
.global_data(params.global, epoch_id, request_data.inner.expiration_date)
|
|
.await?;
|
|
|
|
// 2. try to obtain shares (failures are written to the DB)
|
|
let shares = self
|
|
.try_obtain_wallet_shares_async(
|
|
request,
|
|
requested_on,
|
|
request_data.inner,
|
|
device_id,
|
|
credential_id,
|
|
)
|
|
.await?;
|
|
|
|
// 3. update the storage, if possible
|
|
// (as long as we can trigger webhook, we should still be good)
|
|
if let Err(err) = self
|
|
.state
|
|
.storage()
|
|
.update_pending_async_blinded_shares_issued(shares.len(), device_id, credential_id)
|
|
.await
|
|
{
|
|
error!(uuid = %request, "failed to update db with issued information: {err}")
|
|
}
|
|
|
|
// 4. build the webhook request body
|
|
let data = Some(TicketbookWalletSharesResponse {
|
|
epoch_id,
|
|
shares,
|
|
master_verification_key,
|
|
aggregated_coin_index_signatures,
|
|
aggregated_expiration_date_signatures,
|
|
});
|
|
|
|
let ticketbook_wallet_shares = WebhookTicketbookWalletShares {
|
|
id: pending.id,
|
|
status: pending.status.to_string(),
|
|
device_id: device_id.clone(),
|
|
credential_id: credential_id.clone(),
|
|
data,
|
|
error_message: None,
|
|
created: pending.created,
|
|
updated: pending.updated,
|
|
};
|
|
|
|
let webhook_request = WebhookTicketbookWalletSharesRequest {
|
|
ticketbook_wallet_shares,
|
|
secret,
|
|
};
|
|
|
|
// 5. call the webhook
|
|
self.webhook.try_trigger(request, &webhook_request).await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn try_trigger_webhook_request_for_error(
|
|
&self,
|
|
request: Uuid,
|
|
request_data: TicketbookAsyncRequest,
|
|
pending: &BlindedShares,
|
|
error_message: String,
|
|
) -> Result<(), CredentialProxyError> {
|
|
let device_id = &request_data.device_id;
|
|
let credential_id = &request_data.credential_id;
|
|
let secret = request_data.secret.clone();
|
|
|
|
let ticketbook_wallet_shares = WebhookTicketbookWalletShares {
|
|
id: pending.id,
|
|
status: "error".to_string(),
|
|
device_id: device_id.clone(),
|
|
credential_id: credential_id.clone(),
|
|
data: None,
|
|
error_message: Some(error_message),
|
|
created: pending.created,
|
|
updated: pending.updated,
|
|
};
|
|
|
|
let webhook_request = WebhookTicketbookWalletSharesRequest {
|
|
ticketbook_wallet_shares,
|
|
secret,
|
|
};
|
|
|
|
self.webhook.try_trigger(request, &webhook_request).await;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[instrument(
|
|
skip_all,
|
|
fields(
|
|
credential_id = %request_data.credential_id,
|
|
device_id = %request_data.device_id)
|
|
)
|
|
]
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub(crate) async fn try_obtain_blinded_ticketbook_async(
|
|
&self,
|
|
request: Uuid,
|
|
requested_on: OffsetDateTime,
|
|
request_data: TicketbookAsyncRequest,
|
|
params: TicketbookObtainParams,
|
|
pending: BlindedShares,
|
|
) {
|
|
let skip_webhook = params.skip_webhook;
|
|
if let Err(err) = self
|
|
.try_obtain_blinded_ticketbook_async_inner(
|
|
request,
|
|
requested_on,
|
|
request_data.clone(),
|
|
params,
|
|
&pending,
|
|
)
|
|
.await
|
|
{
|
|
if skip_webhook {
|
|
info!(uuid = %request,"the webhook is not going to be called for this request");
|
|
return;
|
|
}
|
|
|
|
// post to the webhook to notify of errors on this side
|
|
if let Err(webhook_err) = self
|
|
.try_trigger_webhook_request_for_error(
|
|
request,
|
|
request_data,
|
|
&pending,
|
|
format!("Failed to get ticketbook: {err}"),
|
|
)
|
|
.await
|
|
{
|
|
error!(uuid = %request, "failed to make webhook request to report error: {webhook_err}")
|
|
}
|
|
error!(uuid = %request, "failed to resolve the blinded ticketbook issuance: {err}")
|
|
} else {
|
|
info!(uuid = %request, "managed to resolve the blinded ticketbook issuance")
|
|
}
|
|
}
|
|
}
|