Compare commits

...

5 Commits

Author SHA1 Message Date
benedetta davico 8dbcd7d07c TEMP gh runner fix 2024-12-03 10:28:27 +01:00
Bogdan-Ștefan Neacşu 60c21a8d1d Fix backwards compat mac generation (#5202) 2024-12-02 19:52:59 +02:00
Jędrzej Stuczyński feefde9022 Bugfix/credential proxy sequencing (#5187)
* using common middleware for all http servers

* improved span handling in credential-proxy

* ensure increase in sequence number upon making deposit

* added explicit connect options for the db

* fixed further instances of incorrect span instrumentation

* batch deposit requests together to improve concurrency

* ignore cancelled requests

* updated credential proxy version to 0.1.4

* adjusted Dockerfile with new binary location

* log binary version on startup

* reduce default log level

* guard against unavaiable commit sha

* apply review comments: dont exit(0), instead just shutdown normally

* add skip_webhook parameter to obtain-async

* removing dead code
2024-12-02 14:52:35 +00:00
benedetta davico 645be5fa22 Update ci-build-upload-binaries.yml 2024-12-02 14:03:44 +01:00
benedetta davico ac56717b23 Update ci-build-upload-binaries.yml 2024-12-02 13:48:05 +01:00
38 changed files with 779 additions and 432 deletions
@@ -21,7 +21,7 @@ jobs:
strategy:
fail-fast: false
matrix:
platform: [ arc-ubuntu-20.04 ]
platform: [ arc-ubuntu-22.04 ]
runs-on: ${{ matrix.platform }}
env:
Generated
+5 -1
View File
@@ -5093,7 +5093,7 @@ dependencies = [
[[package]]
name = "nym-credential-proxy"
version = "0.1.3"
version = "0.1.6"
dependencies = [
"anyhow",
"async-trait",
@@ -5113,6 +5113,7 @@ dependencies = [
"nym-credentials",
"nym-credentials-interface",
"nym-crypto",
"nym-ecash-contract-common",
"nym-http-api-common",
"nym-network-defaults",
"nym-validator-client",
@@ -5620,12 +5621,15 @@ dependencies = [
"axum-client-ip",
"bytes",
"colored",
"futures",
"mime",
"serde",
"serde_json",
"serde_yaml",
"tower 0.4.13",
"tracing",
"utoipa",
"zeroize",
]
[[package]]
@@ -17,7 +17,7 @@ use nym_validator_client::coconut::all_ecash_api_clients;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::EcashSigningClient;
use nym_validator_client::nyxd::contract_traits::{DkgQueryClient, EcashQueryClient};
use nym_validator_client::nyxd::cosmwasm_client::ToSingletonContractData;
use nym_validator_client::nyxd::cosmwasm_client::ContractResponseData;
use nym_validator_client::EcashApiClient;
use rand::rngs::OsRng;
@@ -13,6 +13,44 @@ use tracing::error;
pub use cosmrs::abci::MsgResponse;
pub fn parse_singleton_u32_from_contract_response(b: Vec<u8>) -> Result<u32, NyxdError> {
if b.len() != 4 {
return Err(NyxdError::MalformedResponseData {
got: b.len(),
expected: 4,
});
}
Ok(u32::from_be_bytes([b[0], b[1], b[2], b[3]]))
}
pub fn parse_singleton_u64_from_contract_response(b: Vec<u8>) -> Result<u64, NyxdError> {
if b.len() != 8 {
return Err(NyxdError::MalformedResponseData {
got: b.len(),
expected: 8,
});
}
Ok(u64::from_be_bytes([
b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
]))
}
#[derive(Debug, Clone)]
pub struct ParsedContractResponse {
pub message_index: usize,
pub response: Vec<u8>,
}
impl ParsedContractResponse {
pub fn parse_singleton_u32_contract_data(self) -> Result<u32, NyxdError> {
parse_singleton_u32_from_contract_response(self.response)
}
pub fn parse_singleton_u64_contract_data(self) -> Result<u64, NyxdError> {
parse_singleton_u64_from_contract_response(self.response)
}
}
pub fn parse_msg_responses(data: Bytes) -> Vec<MsgResponse> {
// it seems that currently, on wasmd 0.43 + tendermint-rs 0.37 + cosmrs 0.17.0-pre
// the data is left in undecoded base64 form, but I'd imagine this might change so if the decoding fails,
@@ -34,35 +72,25 @@ pub fn parse_msg_responses(data: Bytes) -> Vec<MsgResponse> {
}
// requires there's a single response message
pub trait ToSingletonContractData: Sized {
pub trait ContractResponseData: Sized {
fn parse_singleton_u32_contract_data(&self) -> Result<u32, NyxdError> {
let b = self.to_singleton_contract_data()?;
if b.len() != 4 {
return Err(NyxdError::MalformedResponseData {
got: b.len(),
expected: 4,
});
}
Ok(u32::from_be_bytes([b[0], b[1], b[2], b[3]]))
parse_singleton_u32_from_contract_response(b)
}
fn parse_singleton_u64_contract_data(&self) -> Result<u64, NyxdError> {
let b = self.to_singleton_contract_data()?;
if b.len() != 8 {
return Err(NyxdError::MalformedResponseData {
got: b.len(),
expected: 8,
});
}
Ok(u64::from_be_bytes([
b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
]))
parse_singleton_u64_from_contract_response(b)
}
fn to_singleton_contract_data(&self) -> Result<Vec<u8>, NyxdError>;
fn to_unchecked_contract_data(&self) -> Result<Vec<Vec<u8>>, NyxdError>;
fn to_contract_data(&self) -> Result<Vec<ParsedContractResponse>, NyxdError>;
}
impl ToSingletonContractData for ExecuteResult {
impl ContractResponseData for ExecuteResult {
fn to_singleton_contract_data(&self) -> Result<Vec<u8>, NyxdError> {
if self.msg_responses.len() != 1 {
return Err(NyxdError::UnexpectedNumberOfMsgResponses {
@@ -72,6 +100,30 @@ impl ToSingletonContractData for ExecuteResult {
self.msg_responses[0].to_contract_response_data()
}
fn to_unchecked_contract_data(&self) -> Result<Vec<Vec<u8>>, NyxdError> {
self.msg_responses
.iter()
.map(ToContractResponseData::to_contract_response_data)
.collect()
}
fn to_contract_data(&self) -> Result<Vec<ParsedContractResponse>, NyxdError> {
let mut response = Vec::new();
for (message_index, msg) in self.msg_responses.iter().enumerate() {
// unfortunately `Name` trait has not been derived for `MsgExecuteContractResponse`,
// so we have to make an explicit string comparison instead
if msg.type_url == "/cosmwasm.wasm.v1.MsgExecuteContractResponse" {
response.push(ParsedContractResponse {
message_index,
response: msg.to_contract_response_data()?,
})
}
}
Ok(response)
}
}
pub trait ToContractResponseData: Sized {
@@ -23,7 +23,7 @@ use tendermint_rpc::endpoint::*;
use tendermint_rpc::query::Query;
use tendermint_rpc::{Error as TendermintRpcError, Order, Paging, SimpleRequest};
pub use helpers::{ToContractResponseData, ToSingletonContractData};
pub use helpers::{ContractResponseData, ToContractResponseData};
#[cfg(feature = "http-client")]
use crate::http_client;
@@ -18,7 +18,7 @@ use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::{
EcashSigningClient, MultisigQueryClient, MultisigSigningClient, PagedMultisigQueryClient,
};
use nym_validator_client::nyxd::cosmwasm_client::ToSingletonContractData;
use nym_validator_client::nyxd::cosmwasm_client::ContractResponseData;
use nym_validator_client::nyxd::cw3::Status;
use nym_validator_client::nyxd::AccountId;
use nym_validator_client::EcashApiClient;
+3
View File
@@ -15,12 +15,15 @@ axum-client-ip.workspace = true
axum.workspace = true
bytes = { workspace = true }
colored.workspace = true
futures = { workspace = true }
mime = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
serde_yaml = { workspace = true }
tower = { workspace = true }
tracing.workspace = true
utoipa = { workspace = true, optional = true }
zeroize = { workspace = true }
[features]
utoipa = ["dep:utoipa"]
+1 -1
View File
@@ -7,7 +7,7 @@ use axum::Json;
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
pub mod logging;
pub mod middleware;
#[derive(Debug, Clone)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
@@ -1,5 +1,5 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// SPDX-License-Identifier: Apache-2.0
use axum::http::{header, HeaderValue, StatusCode};
use axum::response::IntoResponse;
@@ -1,5 +1,5 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// SPDX-License-Identifier: Apache-2.0
use axum::extract::Request;
use axum::http::header::{HOST, USER_AGENT};
@@ -11,6 +11,7 @@ use colored::Colorize;
use std::time::Instant;
use tracing::info;
/// Simple logger for requests
pub async fn logger(
InsecureClientIp(addr): InsecureClientIp,
request: Request,
@@ -0,0 +1,5 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod bearer_auth;
pub mod logging;
+1 -1
View File
@@ -17,7 +17,7 @@ use axum::response::Redirect;
use axum::routing::get;
use axum::Router;
use core::net::SocketAddr;
use nym_http_api_common::logging::logger;
use nym_http_api_common::middleware::logging::logger;
use tokio::net::TcpListener;
use tokio_util::sync::WaitForCancellationFutureOwned;
use tower_http::cors::CorsLayer;
@@ -34,7 +34,6 @@ nym-serde-helpers = { path = "../../common/serde-helpers", features = ["bs58"] }
workspace = true
features = ["tokio"]
[features]
default = ["query-types"]
query-types = ["nym-http-api-common"]
@@ -268,6 +268,9 @@ pub struct WebhookTicketbookWalletSharesRequest {
pub struct TicketbookObtainQueryParams {
pub output: Option<Output>,
#[serde(default)]
pub skip_webhook: bool,
pub include_master_verification_key: bool,
pub include_coin_index_signatures: bool,
@@ -1,6 +1,6 @@
[package]
name = "nym-credential-proxy"
version = "0.1.3"
version = "0.1.6"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -48,6 +48,7 @@ nym-config = { path = "../../common/config" }
nym-crypto = { path = "../../common/crypto", features = ["asymmetric", "rand", "serde"] }
nym-credentials = { path = "../../common/credentials" }
nym-credentials-interface = { path = "../../common/credentials-interface" }
nym-ecash-contract-common = { path = "../../common/cosmwasm-smart-contracts/ecash-contract" }
nym-http-api-common = { path = "../../common/http-api-common", features = ["utoipa"] }
nym-validator-client = { path = "../../common/client-libs/validator-client" }
nym-network-defaults = { path = "../../common/network-defaults" }
@@ -30,6 +30,6 @@ RUN apt update && apt install -yy curl ca-certificates
WORKDIR /nym
COPY --from=builder /usr/src/nym/nym-credential-proxy/target/release/nym-credential-proxy ./
COPY --from=builder /usr/src/nym/target/release/nym-credential-proxy ./
ENTRYPOINT [ "/nym/nym-credential-proxy" ]
@@ -55,6 +55,15 @@ pub struct Cli {
)]
pub(crate) http_auth_token: String,
/// Specify the maximum number of deposits the credential proxy can make in a single transaction
/// (default: 32)
#[clap(
long,
env = "NYM_CREDENTIAL_PROXY_MAX_CONCURRENT_DEPOSITS",
default_value_t = 32
)]
pub(crate) max_concurrent_deposits: usize,
#[clap(long, env = "NYM_CREDENTIAL_PROXY_PERSISTENT_STORAGE_STORAGE")]
pub(crate) persistent_storage_path: Option<PathBuf>,
}
@@ -1,6 +1,7 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::deposit_maker::{DepositRequest, DepositResponse};
use crate::error::VpnApiError;
use crate::http::state::ApiState;
use crate::storage::models::BlindedShares;
@@ -14,21 +15,48 @@ use nym_credentials::IssuanceTicketBook;
use nym_credentials_interface::Base58;
use nym_crypto::asymmetric::ed25519;
use nym_validator_client::ecash::BlindSignRequestBody;
use nym_validator_client::nyxd::contract_traits::EcashSigningClient;
use nym_validator_client::nyxd::cosmwasm_client::ToSingletonContractData;
use nym_validator_client::nyxd::Coin;
use rand::rngs::OsRng;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::Mutex;
use tokio::time::timeout;
use tracing::{debug, error, info, instrument};
use tokio::sync::{oneshot, Mutex};
use tokio::time::{timeout, Instant};
use tracing::{debug, error, info, instrument, warn};
use uuid::Uuid;
// use the same type alias as our contract without importing the whole thing just for this single line
pub type NodeId = u64;
#[instrument(skip(state), ret, err(Display))]
async fn make_deposit(
state: &ApiState,
pub_key: ed25519::PublicKey,
deposit_amount: &Coin,
) -> Result<DepositResponse, VpnApiError> {
let start = Instant::now();
let (on_done_tx, on_done_rx) = oneshot::channel();
let request = DepositRequest::new(pub_key, deposit_amount, on_done_tx);
state.request_deposit(request).await;
let time_taken = start.elapsed();
let formatted = humantime::format_duration(time_taken);
let Ok(deposit_response) = on_done_rx.await else {
error!("failed to receive deposit response: the corresponding sender channel got dropped by the DepositMaker!");
return Err(VpnApiError::DepositFailure);
};
if time_taken > Duration::from_secs(20) {
warn!("attempting to resolve deposit request took {formatted}. perhaps the buffer is too small or the process/chain is overloaded?")
} else {
debug!("attempting to resolve deposit request took {formatted}")
}
deposit_response.ok_or(VpnApiError::DepositFailure)
}
#[instrument(
skip(state, request_data, request, requested_on),
fields(
@@ -59,25 +87,12 @@ pub(crate) async fn try_obtain_wallet_shares(
.await?;
let ecash_api_clients = state.ecash_clients(epoch).await?.clone();
let chain_write_permit = state.start_chain_tx().await;
let DepositResponse {
deposit_id,
tx_hash,
} = make_deposit(state, *ed25519_keypair.public_key(), &deposit_amount).await?;
info!("starting the deposit!");
// TODO: batch those up
// TODO: batch those up
let deposit_res = chain_write_permit
.make_ticketbook_deposit(
ed25519_keypair.public_key().to_base58_string(),
deposit_amount.clone(),
None,
)
.await?;
// explicitly drop it here so other tasks could start using it
drop(chain_write_permit);
let deposit_id = deposit_res.parse_singleton_u32_contract_data()?;
let tx_hash = deposit_res.transaction_hash;
info!(deposit_id = %deposit_id, tx_hash = %tx_hash, "deposit finished");
info!(deposit_id = %deposit_id, "deposit finished");
// store the deposit information so if we fail, we could perhaps still reuse it for another issuance
state
@@ -342,6 +357,7 @@ pub(crate) async fn try_obtain_blinded_ticketbook_async(
params: TicketbookObtainQueryParams,
pending: BlindedShares,
) {
let skip_webhook = params.skip_webhook;
if let Err(err) = try_obtain_blinded_ticketbook_async_inner(
&state,
request,
@@ -352,6 +368,11 @@ pub(crate) async fn try_obtain_blinded_ticketbook_async(
)
.await
{
if skip_webhook {
info!(uuid = %request,"the webhook is not going to be called for this request");
return;
}
// post to the webhook to notify of errors on this side
if let Err(webhook_err) = try_trigger_webhook_request_for_error(
&state,
@@ -0,0 +1,205 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::VpnApiError;
use crate::http::state::ChainClient;
use nym_crypto::asymmetric::ed25519;
use nym_ecash_contract_common::deposit::DepositId;
use nym_validator_client::nyxd::cosmwasm_client::ContractResponseData;
use nym_validator_client::nyxd::{Coin, Hash};
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
#[derive(Debug)]
pub(crate) struct DepositResponse {
pub tx_hash: Hash,
pub deposit_id: DepositId,
}
pub(crate) struct DepositRequest {
pubkey: ed25519::PublicKey,
deposit_amount: Coin,
on_done: oneshot::Sender<Option<DepositResponse>>,
}
impl DepositRequest {
pub(crate) fn new(
pubkey: ed25519::PublicKey,
deposit_amount: &Coin,
on_done: oneshot::Sender<Option<DepositResponse>>,
) -> Self {
DepositRequest {
pubkey,
deposit_amount: deposit_amount.clone(),
on_done,
}
}
}
pub(crate) type DepositRequestReceiver = mpsc::Receiver<DepositRequest>;
pub(crate) fn new_control_channels(
max_concurrent_deposits: usize,
) -> (DepositRequestSender, DepositRequestReceiver) {
let (tx, rx) = mpsc::channel(max_concurrent_deposits);
(tx.into(), rx)
}
#[derive(Debug, Clone)]
pub struct DepositRequestSender(mpsc::Sender<DepositRequest>);
impl From<mpsc::Sender<DepositRequest>> for DepositRequestSender {
fn from(inner: mpsc::Sender<DepositRequest>) -> Self {
DepositRequestSender(inner)
}
}
impl DepositRequestSender {
pub(crate) async fn request_deposit(&self, request: DepositRequest) {
if self.0.send(request).await.is_err() {
error!("failed to request deposit: the DepositMaker must have died!")
}
}
}
pub(crate) struct DepositMaker {
client: ChainClient,
max_concurrent_deposits: usize,
deposit_request_sender: DepositRequestSender,
deposit_request_receiver: DepositRequestReceiver,
short_sha: &'static str,
cancellation_token: CancellationToken,
}
impl DepositMaker {
pub(crate) fn new(
short_sha: &'static str,
client: ChainClient,
max_concurrent_deposits: usize,
cancellation_token: CancellationToken,
) -> Self {
let (deposit_request_sender, deposit_request_receiver) =
new_control_channels(max_concurrent_deposits);
DepositMaker {
client,
max_concurrent_deposits,
deposit_request_sender,
deposit_request_receiver,
short_sha,
cancellation_token,
}
}
pub(crate) fn deposit_request_sender(&self) -> DepositRequestSender {
self.deposit_request_sender.clone()
}
pub(crate) async fn process_deposit_requests(
&mut self,
requests: Vec<DepositRequest>,
) -> Result<(), VpnApiError> {
let chain_write_permit = self.client.start_chain_tx().await;
info!("starting deposits");
let mut contents = Vec::new();
let mut replies = Vec::new();
for request in requests {
// check if the channel is still open in case the receiver client has cancelled the request
if request.on_done.is_closed() {
warn!(
"the request for deposit from {} got cancelled",
request.pubkey
);
continue;
}
contents.push((request.pubkey.to_base58_string(), request.deposit_amount));
replies.push(request.on_done);
}
let deposits_res = chain_write_permit
.make_deposits(self.short_sha, contents)
.await;
let execute_res = match deposits_res {
Ok(res) => res,
Err(err) => {
// we have to let requesters know the deposit(s) failed
for reply in replies {
if reply.send(None).is_err() {
warn!("one of the deposit requesters has been terminated")
}
}
return Err(err);
}
};
let tx_hash = execute_res.transaction_hash;
info!("{} deposits made in transaction: {tx_hash}", replies.len());
let contract_data = match execute_res.to_contract_data() {
Ok(contract_data) => contract_data,
Err(err) => {
// that one is tricky. deposits technically got made, but we somehow failed to parse response,
// in this case terminate the proxy with 0 exit code so it wouldn't get automatically restarted
// because it requires some serious MANUAL intervention
error!("CRITICAL FAILURE: failed to parse out deposit information from the contract transaction. either the chain got upgraded and the schema changed or the ecash contract got changed! terminating the process. it has to be inspected manually. error was: {err}");
self.cancellation_token.cancel();
return Err(VpnApiError::DepositFailure);
}
};
if contract_data.len() != replies.len() {
// another critical failure, that one should be quite impossible and thus has to be manually inspected
error!("CRITICAL FAILURE: failed to parse out all deposit information from the contract transaction. got {} responses while we sent {} deposits! either the chain got upgraded and the schema changed or the ecash contract got changed! terminating the process. it has to be inspected manually", contract_data.len(), replies.len());
self.cancellation_token.cancel();
return Err(VpnApiError::DepositFailure);
}
for (reply_channel, response) in replies.into_iter().zip(contract_data) {
let response_index = response.message_index;
let deposit_id = match response.parse_singleton_u32_contract_data() {
Ok(deposit_id) => deposit_id,
Err(err) => {
// another impossibility
error!("CRITICAL FAILURE: failed to parse out deposit id out of the response at index {response_index}: {err}. either the chain got upgraded and the schema changed or the ecash contract got changed! terminating the process. it has to be inspected manually");
self.cancellation_token.cancel();
return Err(VpnApiError::DepositFailure);
}
};
if reply_channel
.send(Some(DepositResponse {
deposit_id,
tx_hash,
}))
.is_err()
{
warn!("one of the deposit requesters has been terminated. deposit {deposit_id} will remain unclaimed!");
// this shouldn't happen as the requester task shouldn't be killed, but it's not a critical failure
// we just lost some tokens, but it's not an undefined on-chain behaviour
}
}
Ok(())
}
pub async fn run_forever(mut self) {
info!("starting the deposit maker task");
loop {
let mut receive_buffer = Vec::with_capacity(self.max_concurrent_deposits);
tokio::select! {
_ = self.cancellation_token.cancelled() => {
break
}
received = self.deposit_request_receiver.recv_many(&mut receive_buffer, self.max_concurrent_deposits) => {
debug!("received {received} deposit requests");
if let Err(err) = self.process_deposit_requests(receive_buffer).await {
error!("failed to process received deposit requests: {err}")
}
}
}
}
}
}
@@ -115,6 +115,9 @@ pub enum VpnApiError {
#[error("timed out while attempting to obtain partial wallet from {client_repr}")]
EcashApiRequestTimeout { client_repr: String },
#[error("failed to create deposit")]
DepositFailure,
}
impl VpnApiError {
@@ -1,64 +0,0 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use axum::{
extract::{ConnectInfo, Request},
http::{
header::{HOST, USER_AGENT},
HeaderValue,
},
middleware::Next,
response::IntoResponse,
};
use colored::*;
use std::net::SocketAddr;
use tokio::time::Instant;
use tracing::info;
/// Simple logger for requests
pub async fn logger(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
req: Request,
next: Next,
) -> impl IntoResponse {
let method = req.method().to_string().green();
let uri = req.uri().to_string().blue();
let agent = header_map(
req.headers().get(USER_AGENT),
"Unknown User Agent".to_string(),
);
let host = header_map(req.headers().get(HOST), "Unknown Host".to_string());
let start = Instant::now();
let res = next.run(req).await;
let time_taken = start.elapsed();
let status = res.status();
let print_status = if status.is_client_error() || status.is_server_error() {
status.to_string().red()
} else if status.is_success() {
status.to_string().green()
} else {
status.to_string().yellow()
};
let taken = "time taken".bold();
let time_taken = match time_taken.as_millis() {
ms if ms > 500 => format!("{taken}: {}", format!("{ms}ms").red()),
ms if ms > 200 => format!("{taken}: {}", format!("{ms}ms").yellow()),
ms if ms > 50 => format!("{taken}: {}", format!("{ms}ms").bright_yellow()),
ms => format!("{taken}: {ms}ms"),
};
let agent_str = "agent".bold();
info!("[{addr} -> {host}] {method} '{uri}': {print_status} {time_taken} {agent_str}: {agent}");
res
}
fn header_map(header: Option<&HeaderValue>, msg: String) -> String {
header
.map(|x| x.to_str().unwrap_or(&msg).to_string())
.unwrap_or(msg)
}
@@ -1,5 +0,0 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
pub mod auth;
pub mod logging;
@@ -10,7 +10,6 @@ use tokio_util::sync::CancellationToken;
use tracing::info;
pub mod helpers;
pub mod middleware;
pub mod router;
pub mod state;
pub mod types;
@@ -22,10 +21,15 @@ pub struct HttpServer {
}
impl HttpServer {
pub fn new(bind_address: SocketAddr, state: ApiState, auth_token: String) -> Self {
pub fn new(
bind_address: SocketAddr,
state: ApiState,
auth_token: String,
cancellation: CancellationToken,
) -> Self {
HttpServer {
bind_address,
cancellation: state.cancellation_token(),
cancellation,
router: build_router(state, auth_token),
}
}
@@ -4,8 +4,8 @@
use crate::http::state::ApiState;
use axum::Router;
use nym_credential_proxy_requests::routes;
use nym_http_api_common::middleware::bearer_auth::AuthLayer;
use crate::http::middleware::auth::AuthLayer;
pub(crate) use nym_http_api_common::{Output, OutputParams};
pub mod v1;
@@ -1,13 +1,11 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::http::middleware::auth::AuthLayer;
use crate::http::state::ApiState;
use axum::Router;
use nym_credential_proxy_requests::routes::api::v1;
use nym_http_api_common::middleware::bearer_auth::AuthLayer;
// pub mod bandwidth_voucher;
// pub mod freepass;
pub mod openapi;
pub mod ticketbook;
@@ -21,7 +21,7 @@ use nym_credential_proxy_requests::api::v1::ticketbook::models::{
use nym_credential_proxy_requests::routes::api::v1::ticketbook;
use nym_http_api_common::{FormattedResponse, OutputParams};
use time::OffsetDateTime;
use tracing::{error, info, span, warn, Level};
use tracing::{error, info, span, warn, Instrument, Level};
pub(crate) mod shares;
@@ -71,55 +71,58 @@ pub(crate) async fn obtain_ticketbook_shares(
let requested_on = OffsetDateTime::now_utc();
let span = span!(Level::INFO, "obtain ticketboook", uuid = %uuid);
let _entered = span.enter();
info!("");
async move {
info!("");
let output = params.output.unwrap_or_default();
let output = params.output.unwrap_or_default();
state.ensure_not_in_epoch_transition(Some(uuid)).await?;
let epoch_id = state
.current_epoch_id()
.await
.map_err(|err| RequestError::new_server_error(err, uuid))?;
state.ensure_not_in_epoch_transition(Some(uuid)).await?;
let epoch_id = state
.current_epoch_id()
.await
.map_err(|err| RequestError::new_server_error(err, uuid))?;
if let Err(err) = ensure_sane_expiration_date(payload.expiration_date) {
warn!("failure due to invalid expiration date");
return Err(RequestError::new_with_uuid(
err.to_string(),
uuid,
StatusCode::BAD_REQUEST,
));
}
if let Err(err) = ensure_sane_expiration_date(payload.expiration_date) {
warn!("failure due to invalid expiration date");
return Err(RequestError::new_with_uuid(
err.to_string(),
uuid,
StatusCode::BAD_REQUEST,
));
}
// if additional data was requested, grab them first in case there are any cache/network issues
let (
master_verification_key,
aggregated_expiration_date_signatures,
aggregated_coin_index_signatures,
) = state
.response_global_data(
params.include_master_verification_key,
params.include_expiration_date_signatures,
params.include_coin_index_signatures,
// if additional data was requested, grab them first in case there are any cache/network issues
let (
master_verification_key,
aggregated_expiration_date_signatures,
aggregated_coin_index_signatures,
) = state
.response_global_data(
params.include_master_verification_key,
params.include_expiration_date_signatures,
params.include_coin_index_signatures,
epoch_id,
payload.expiration_date,
uuid,
)
.await?;
let shares = try_obtain_wallet_shares(&state, uuid, requested_on, payload)
.await
.inspect_err(|err| warn!("request failure: {err}"))
.map_err(|err| RequestError::new(err.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?;
info!("request was successful!");
Ok(output.to_response(TicketbookWalletSharesResponse {
epoch_id,
payload.expiration_date,
uuid,
)
.await?;
let shares = try_obtain_wallet_shares(&state, uuid, requested_on, payload)
.await
.inspect_err(|err| warn!("request failure: {err}"))
.map_err(|err| RequestError::new(err.to_string(), StatusCode::INTERNAL_SERVER_ERROR))?;
info!("request was successful!");
Ok(output.to_response(TicketbookWalletSharesResponse {
epoch_id,
shares,
master_verification_key,
aggregated_coin_index_signatures,
aggregated_expiration_date_signatures,
}))
shares,
master_verification_key,
aggregated_coin_index_signatures,
aggregated_expiration_date_signatures,
}))
}
.instrument(span)
.await
}
/// Attempt to obtain blinded shares of an ecash ticketbook wallet asynchronously
@@ -159,63 +162,69 @@ pub(crate) async fn obtain_ticketbook_shares_async(
let requested_on = OffsetDateTime::now_utc();
let span = span!(Level::INFO, "[async] obtain ticketboook", uuid = %uuid);
let _entered = span.enter();
info!("");
async move {
info!("");
let output = params.output.unwrap_or_default();
let output = params.output.unwrap_or_default();
// 1. perform basic validation
state.ensure_not_in_epoch_transition(Some(uuid)).await?;
// 1. perform basic validation
state.ensure_not_in_epoch_transition(Some(uuid)).await?;
if let Err(err) = ensure_sane_expiration_date(payload.inner.expiration_date) {
warn!("failure due to invalid expiration date");
return Err(RequestError::new_with_uuid(
err.to_string(),
uuid,
StatusCode::BAD_REQUEST,
));
}
// 2. store the request to retrieve the id
let pending = match state
.storage()
.insert_new_pending_async_shares_request(uuid, &payload.device_id, &payload.credential_id)
.await
{
Err(err) => {
error!("failed to insert new pending async shares: {err}");
if let Err(err) = ensure_sane_expiration_date(payload.inner.expiration_date) {
warn!("failure due to invalid expiration date");
return Err(RequestError::new_with_uuid(
err.to_string(),
uuid,
StatusCode::CONFLICT,
StatusCode::BAD_REQUEST,
));
}
Ok(pending) => pending,
};
let id = pending.id;
// 3. try to spawn a new task attempting to resolve the request
if state
.try_spawn(try_obtain_blinded_ticketbook_async(
state.clone(),
uuid,
requested_on,
payload,
params,
pending,
))
.is_none()
{
// we're going through the shutdown
return Err(RequestError::new_with_uuid(
"server shutdown in progress",
uuid,
StatusCode::INTERNAL_SERVER_ERROR,
));
// 2. store the request to retrieve the id
let pending = match state
.storage()
.insert_new_pending_async_shares_request(
uuid,
&payload.device_id,
&payload.credential_id,
)
.await
{
Err(err) => {
error!("failed to insert new pending async shares: {err}");
return Err(RequestError::new_with_uuid(
err.to_string(),
uuid,
StatusCode::CONFLICT,
));
}
Ok(pending) => pending,
};
let id = pending.id;
// 3. try to spawn a new task attempting to resolve the request
if state
.try_spawn(try_obtain_blinded_ticketbook_async(
state.clone(),
uuid,
requested_on,
payload,
params,
pending,
))
.is_none()
{
// we're going through the shutdown
return Err(RequestError::new_with_uuid(
"server shutdown in progress",
uuid,
StatusCode::INTERNAL_SERVER_ERROR,
));
}
// 4. in the meantime, return the id to the user
Ok(output.to_response(TicketbookWalletSharesAsyncResponse { id, uuid }))
}
// 4. in the meantime, return the id to the user
Ok(output.to_response(TicketbookWalletSharesAsyncResponse { id, uuid }))
.instrument(span)
.await
}
/// Obtain the current value of the bandwidth voucher deposit
@@ -17,7 +17,7 @@ use nym_credential_proxy_requests::api::v1::ticketbook::models::{
use nym_credential_proxy_requests::routes::api::v1::ticketbook::shares;
use nym_http_api_common::OutputParams;
use nym_validator_client::nym_api::EpochId;
use tracing::{debug, span, Level};
use tracing::{debug, span, Instrument, Level};
use uuid::Uuid;
async fn shares_to_response(
@@ -100,50 +100,51 @@ pub(crate) async fn query_for_shares_by_id(
let uuid = random_uuid();
let span = span!(Level::INFO, "query shares by id", uuid = %uuid, share_id = %share_id);
let _entered = span.enter();
debug!("");
async move {
debug!("");
// TODO: edge case: this will **NOT** work if shares got created in epoch X,
// but this query happened in epoch X+1
let shares = match state
.storage()
.load_wallet_shares_by_shares_id(share_id)
.await
{
Ok(shares) => {
if shares.is_empty() {
debug!("shares not found");
// TODO: edge case: this will **NOT** work if shares got created in epoch X,
// but this query happened in epoch X+1
let shares = match state
.storage()
.load_wallet_shares_by_shares_id(share_id)
.await
{
Ok(shares) => {
if shares.is_empty() {
debug!("shares not found");
// check for explicit error
match state
.storage()
.load_shares_error_by_shares_id(share_id)
.await
{
Ok(maybe_error_message) => {
if let Some(error_message) = maybe_error_message {
return Err(RequestError::new_with_uuid(
format!("failed to obtain wallet shares: {error_message} - share_id = {share_id}"),
uuid,
StatusCode::INTERNAL_SERVER_ERROR,
));
// check for explicit error
match state
.storage()
.load_shares_error_by_shares_id(share_id)
.await
{
Ok(maybe_error_message) => {
if let Some(error_message) = maybe_error_message {
return Err(RequestError::new_with_uuid(
format!("failed to obtain wallet shares: {error_message} - share_id = {share_id}"),
uuid,
StatusCode::INTERNAL_SERVER_ERROR,
));
}
}
Err(err) => return db_failure(err, uuid),
}
Err(err) => return db_failure(err, uuid),
return Err(RequestError::new_with_uuid(
format!("not found - share_id = {share_id}"),
uuid,
StatusCode::NOT_FOUND,
));
}
return Err(RequestError::new_with_uuid(
format!("not found - share_id = {share_id}"),
uuid,
StatusCode::NOT_FOUND,
));
shares
}
shares
}
Err(err) => return db_failure(err, uuid),
};
Err(err) => return db_failure(err, uuid),
};
shares_to_response(state, uuid, shares, params).await
shares_to_response(state, uuid, shares, params).await
}.instrument(span).await
}
/// Query by id for blinded wallet shares of a ticketbook
@@ -174,50 +175,51 @@ pub(crate) async fn query_for_shares_by_device_id_and_credential_id(
let uuid = random_uuid();
let span = span!(Level::INFO, "query shares by device and credential ids", uuid = %uuid, device_id = %device_id, credential_id = %credential_id);
let _entered = span.enter();
debug!("");
async move {
debug!("");
// TODO: edge case: this will **NOT** work if shares got created in epoch X,
// but this query happened in epoch X+1
let shares = match state
.storage()
.load_wallet_shares_by_device_and_credential_id(&device_id, &credential_id)
.await
{
Ok(shares) => {
if shares.is_empty() {
debug!("shares not found");
// TODO: edge case: this will **NOT** work if shares got created in epoch X,
// but this query happened in epoch X+1
let shares = match state
.storage()
.load_wallet_shares_by_device_and_credential_id(&device_id, &credential_id)
.await
{
Ok(shares) => {
if shares.is_empty() {
debug!("shares not found");
// check for explicit error
match state
.storage()
.load_shares_error_by_device_and_credential_id(&device_id, &credential_id)
.await
{
Ok(maybe_error_message) => {
if let Some(error_message) = maybe_error_message {
return Err(RequestError::new_with_uuid(
format!("failed to obtain wallet shares: {error_message} - device_id = {device_id}, credential_id = {credential_id}"),
uuid,
StatusCode::INTERNAL_SERVER_ERROR,
));
// check for explicit error
match state
.storage()
.load_shares_error_by_device_and_credential_id(&device_id, &credential_id)
.await
{
Ok(maybe_error_message) => {
if let Some(error_message) = maybe_error_message {
return Err(RequestError::new_with_uuid(
format!("failed to obtain wallet shares: {error_message} - device_id = {device_id}, credential_id = {credential_id}"),
uuid,
StatusCode::INTERNAL_SERVER_ERROR,
));
}
}
Err(err) => return db_failure(err, uuid),
}
Err(err) => return db_failure(err, uuid),
return Err(RequestError::new_with_uuid(
format!("not found - device_id = {device_id}, credential_id = {credential_id}"),
uuid,
StatusCode::NOT_FOUND,
));
}
return Err(RequestError::new_with_uuid(
format!("not found - device_id = {device_id}, credential_id = {credential_id}"),
uuid,
StatusCode::NOT_FOUND,
));
shares
}
shares
}
Err(err) => return db_failure(err, uuid),
};
Err(err) => return db_failure(err, uuid),
};
shares_to_response(state, uuid, shares, params).await
shares_to_response(state, uuid, shares, params).await
}.instrument(span).await
}
pub(crate) fn routes() -> Router<ApiState> {
@@ -1,13 +1,13 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::http::middleware::auth::AuthLayer;
use crate::http::middleware::logging;
use crate::http::state::ApiState;
use axum::response::Redirect;
use axum::routing::{get, MethodRouter};
use axum::Router;
use nym_credential_proxy_requests::routes;
use nym_http_api_common::middleware::bearer_auth::AuthLayer;
use nym_http_api_common::middleware::logging;
use std::sync::Arc;
use zeroize::Zeroizing;
@@ -1,6 +1,7 @@
// Copyright 2024 Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::deposit_maker::{DepositRequest, DepositRequestSender};
use crate::error::VpnApiError;
use crate::helpers::LockTimer;
use crate::http::types::RequestError;
@@ -28,20 +29,24 @@ use nym_credentials::{
AggregatedCoinIndicesSignatures, AggregatedExpirationDateSignatures, EpochVerificationKey,
};
use nym_credentials_interface::VerificationKeyAuth;
use nym_ecash_contract_common::msg::ExecuteMsg;
use nym_validator_client::coconut::EcashApiError;
use nym_validator_client::nym_api::EpochId;
use nym_validator_client::nyxd::contract_traits::dkg_query_client::Epoch;
use nym_validator_client::nyxd::contract_traits::{
DkgQueryClient, EcashQueryClient, NymContractsProvider, PagedDkgQueryClient,
};
use nym_validator_client::nyxd::{Coin, NyxdClient};
use nym_validator_client::nyxd::cosmwasm_client::types::ExecuteResult;
use nym_validator_client::nyxd::{Coin, CosmWasmClient, NyxdClient};
use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient, EcashApiClient};
use std::future::Future;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use time::{Date, OffsetDateTime};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tracing::{debug, info, warn};
@@ -59,36 +64,19 @@ impl ApiState {
pub async fn new(
storage: VpnApiStorage,
zk_nym_web_hook_config: ZkNymWebHookConfig,
mnemonic: Mnemonic,
client: ChainClient,
deposit_requester: DepositRequestSender,
cancellation_token: CancellationToken,
) -> Result<Self, VpnApiError> {
let network_details = nym_network_defaults::NymNetworkDetails::new_from_env();
let client_config = nyxd::Config::try_from_nym_network_details(&network_details)?;
let nyxd_url = network_details
.endpoints
.first()
.ok_or_else(|| VpnApiError::NoNyxEndpointsAvailable)?
.nyxd_url
.as_str();
let client = NyxdClient::connect_with_mnemonic(client_config, nyxd_url, mnemonic)?;
if client.ecash_contract_address().is_none() {
return Err(VpnApiError::UnavailableEcashContract);
}
if client.dkg_contract_address().is_none() {
return Err(VpnApiError::UnavailableDKGContract);
}
let state = ApiState {
inner: Arc::new(ApiStateInner {
storage,
client: RwLock::new(client),
client,
ecash_state: EcashState::default(),
zk_nym_web_hook_config,
task_tracker: TaskTracker::new(),
cancellation_token: CancellationToken::new(),
deposit_requester,
cancellation_token,
}),
};
@@ -136,10 +124,6 @@ impl ApiState {
self.inner.task_tracker.wait().await
}
pub(crate) fn cancellation_token(&self) -> CancellationToken {
self.inner.cancellation_token.clone()
}
pub(crate) fn zk_nym_web_hook(&self) -> &ZkNymWebHookConfig {
&self.inner.zk_nym_web_hook_config
}
@@ -220,16 +204,19 @@ impl ApiState {
}
pub(crate) async fn query_chain(&self) -> RwLockReadGuard<DirectSigningHttpRpcNyxdClient> {
let _acquire_timer = LockTimer::new("acquire chain query permit");
self.inner.client.read().await
self.inner.client.query_chain().await
}
pub(crate) async fn start_chain_tx(&self) -> ChainWritePermit {
let _acquire_timer = LockTimer::new("acquire exclusive chain write permit");
pub(crate) async fn request_deposit(&self, request: DepositRequest) {
let start = Instant::now();
self.inner.deposit_requester.request_deposit(request).await;
ChainWritePermit {
lock_timer: LockTimer::new("exclusive chain access permit"),
inner: self.inner.client.write().await,
let time_taken = start.elapsed();
let formatted = humantime::format_duration(time_taken);
if time_taken > Duration::from_secs(10) {
warn!("attempting to push new deposit request onto the queue took {formatted}. perhaps the buffer is too small or the process/chain is overloaded?")
} else {
debug!("attempting to push new deposit request onto the queue took {formatted}")
}
}
@@ -604,10 +591,57 @@ impl ApiState {
}
}
#[derive(Clone)]
pub struct ChainClient(Arc<RwLock<DirectSigningHttpRpcNyxdClient>>);
impl ChainClient {
pub fn new(mnemonic: Mnemonic) -> Result<Self, VpnApiError> {
let network_details = nym_network_defaults::NymNetworkDetails::new_from_env();
let client_config = nyxd::Config::try_from_nym_network_details(&network_details)?;
let nyxd_url = network_details
.endpoints
.first()
.ok_or_else(|| VpnApiError::NoNyxEndpointsAvailable)?
.nyxd_url
.as_str();
let client = NyxdClient::connect_with_mnemonic(client_config, nyxd_url, mnemonic)?;
if client.ecash_contract_address().is_none() {
return Err(VpnApiError::UnavailableEcashContract);
}
if client.dkg_contract_address().is_none() {
return Err(VpnApiError::UnavailableDKGContract);
}
Ok(ChainClient(Arc::new(RwLock::new(client))))
}
pub(crate) async fn query_chain(&self) -> ChainReadPermit {
let _acquire_timer = LockTimer::new("acquire chain query permit");
self.0.read().await
}
pub(crate) async fn start_chain_tx(&self) -> ChainWritePermit {
let _acquire_timer = LockTimer::new("acquire exclusive chain write permit");
ChainWritePermit {
lock_timer: LockTimer::new("exclusive chain access permit"),
inner: self.0.write().await,
}
}
}
//
struct ApiStateInner {
storage: VpnApiStorage,
client: RwLock<DirectSigningHttpRpcNyxdClient>,
client: ChainClient,
deposit_requester: DepositRequestSender,
zk_nym_web_hook_config: ZkNymWebHookConfig,
@@ -666,6 +700,8 @@ pub(crate) struct EcashState {
CachedImmutableItems<Date, AggregatedExpirationDateSignatures>,
}
pub(crate) type ChainReadPermit<'a> = RwLockReadGuard<'a, DirectSigningHttpRpcNyxdClient>;
// explicitly wrap the WriteGuard for extra information regarding time taken
pub(crate) struct ChainWritePermit<'a> {
// it's not really dead, we only care about it being dropped
@@ -674,6 +710,55 @@ pub(crate) struct ChainWritePermit<'a> {
inner: RwLockWriteGuard<'a, DirectSigningHttpRpcNyxdClient>,
}
impl<'a> ChainWritePermit<'a> {
pub(crate) async fn make_deposits(
self,
short_sha: &'static str,
info: Vec<(String, Coin)>,
) -> Result<ExecuteResult, VpnApiError> {
let address = self.inner.address();
let starting_sequence = self.inner.get_sequence(&address).await?.sequence;
let deposits = info.len();
let ecash_contract = self
.inner
.ecash_contract_address()
.ok_or(VpnApiError::UnavailableEcashContract)?;
let deposit_messages = info
.into_iter()
.map(|(identity_key, amount)| {
(
ExecuteMsg::DepositTicketBookFunds { identity_key },
vec![amount],
)
})
.collect::<Vec<_>>();
let res = self
.inner
.execute_multiple(
ecash_contract,
deposit_messages,
None,
format!("cp-{short_sha}: performing {deposits} deposits"),
)
.await?;
loop {
let updated_sequence = self.inner.get_sequence(&address).await?.sequence;
if updated_sequence > starting_sequence {
break;
}
warn!("wrong sequence number... waiting before releasing chain lock");
tokio::time::sleep(Duration::from_millis(50)).await;
}
Ok(res)
}
}
impl Deref for ChainWritePermit<'_> {
type Target = DirectSigningHttpRpcNyxdClient;
@@ -7,19 +7,23 @@
#![warn(clippy::dbg_macro)]
use crate::cli::Cli;
use crate::deposit_maker::DepositMaker;
use crate::error::VpnApiError;
use crate::http::state::ApiState;
use crate::http::state::{ApiState, ChainClient};
use crate::http::HttpServer;
use crate::storage::VpnApiStorage;
use crate::tasks::StoragePruner;
use clap::Parser;
use nym_bin_common::logging::setup_tracing_logger;
use nym_bin_common::{bin_info, bin_info_owned};
use nym_network_defaults::setup_env;
use tracing::{info, trace};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, trace};
pub mod cli;
pub mod config;
pub mod credentials;
mod deposit_maker;
pub mod error;
pub mod helpers;
pub mod http;
@@ -50,6 +54,20 @@ pub async fn wait_for_signal() {
}
}
fn build_sha_short() -> &'static str {
let bin_info = bin_info!();
if bin_info.commit_sha.len() < 7 {
panic!("unavailable build commit sha")
}
if bin_info.commit_sha == "VERGEN_IDEMPOTENT_OUTPUT" {
error!("the binary hasn't been built correctly. it doesn't have a commit sha information");
return "unknown";
}
&bin_info.commit_sha[..7]
}
async fn run_api(cli: Cli) -> Result<(), VpnApiError> {
// create the tasks
let bind_address = cli.bind_address();
@@ -58,14 +76,37 @@ async fn run_api(cli: Cli) -> Result<(), VpnApiError> {
let mnemonic = cli.mnemonic;
let auth_token = cli.http_auth_token;
let webhook_cfg = cli.webhook;
let api_state = ApiState::new(storage.clone(), webhook_cfg, mnemonic).await?;
let http_server = HttpServer::new(bind_address, api_state.clone(), auth_token);
let chain_client = ChainClient::new(mnemonic)?;
let cancellation_token = CancellationToken::new();
let storage_pruner = StoragePruner::new(api_state.cancellation_token(), storage);
let deposit_maker = DepositMaker::new(
build_sha_short(),
chain_client.clone(),
cli.max_concurrent_deposits,
cancellation_token.clone(),
);
let deposit_request_sender = deposit_maker.deposit_request_sender();
let api_state = ApiState::new(
storage.clone(),
webhook_cfg,
chain_client,
deposit_request_sender,
cancellation_token.clone(),
)
.await?;
let http_server = HttpServer::new(
bind_address,
api_state.clone(),
auth_token,
cancellation_token.clone(),
);
let storage_pruner = StoragePruner::new(cancellation_token, storage);
// spawn all the tasks
api_state.try_spawn(http_server.run_forever());
api_state.try_spawn(storage_pruner.run_forever());
api_state.try_spawn(deposit_maker.run_forever());
// wait for cancel signal (SIGINT, SIGTERM or SIGQUIT)
wait_for_signal().await;
@@ -78,10 +119,10 @@ async fn run_api(cli: Cli) -> Result<(), VpnApiError> {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
std::env::set_var(
"RUST_LOG",
"trace,handlebars=warn,tendermint_rpc=warn,h2=warn,hyper=warn,rustls=warn,reqwest=warn,tungstenite=warn,async_tungstenite=warn,tokio_util=warn,tokio_tungstenite=warn,tokio-util=warn,nym_validator_client=info",
);
// std::env::set_var(
// "RUST_LOG",
// "trace,handlebars=warn,tendermint_rpc=warn,h2=warn,hyper=warn,rustls=warn,reqwest=warn,tungstenite=warn,async_tungstenite=warn,tokio_util=warn,tokio_tungstenite=warn,tokio-util=warn,axum=warn,sqlx-core=warn,nym_validator_client=info",
// );
let cli = Cli::parse();
cli.webhook.ensure_valid_client_url()?;
@@ -90,6 +131,9 @@ async fn main() -> anyhow::Result<()> {
setup_env(cli.config_env_file.as_ref());
setup_tracing_logger();
let bin_info = bin_info_owned!();
info!("using the following version: {bin_info}");
run_api(cli).await?;
Ok(())
}
@@ -19,7 +19,9 @@ use nym_validator_client::nyxd::Coin;
use sqlx::ConnectOptions;
use std::fmt::Debug;
use std::path::Path;
use std::time::Duration;
use time::{Date, OffsetDateTime};
use tracing::log::LevelFilter;
use tracing::{debug, error, info, instrument};
use uuid::Uuid;
use zeroize::Zeroizing;
@@ -40,9 +42,15 @@ impl VpnApiStorage {
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
.log_statements(LevelFilter::Trace)
.log_slow_statements(LevelFilter::Warn, Duration::from_millis(250));
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
let pool_opts = sqlx::sqlite::SqlitePoolOptions::new()
.min_connections(5)
.max_connections(25)
.acquire_timeout(Duration::from_secs(60));
let connection_pool = match pool_opts.connect_with(opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
@@ -19,10 +19,11 @@ impl StoragePruner {
}
pub async fn run_forever(self) {
while !self.cancellation_token.is_cancelled() {
info!("starting the storage pruner task");
loop {
tokio::select! {
_ = self.cancellation_token.cancelled() => {
// The token was cancelled, task can shut down
break
}
_ = tokio::time::sleep(std::time::Duration::from_secs(60 * 60)) => {
match self.storage.prune_old_blinded_shares().await {
@@ -5,7 +5,7 @@ use crate::error::VpnApiError;
use clap::Args;
use reqwest::header::AUTHORIZATION;
use serde::Serialize;
use tracing::{debug, error, instrument, span, Level};
use tracing::{debug, error, instrument, span, Instrument, Level};
use url::Url;
use uuid::Uuid;
@@ -46,30 +46,33 @@ impl ZkNymWebHookConfig {
pub async fn try_trigger<T: Serialize + ?Sized>(&self, original_uuid: Uuid, payload: &T) {
let url = self.unchecked_client_url();
let span = span!(Level::DEBUG, "webhook", uuid = %original_uuid, url = %url);
let _entered = span.enter();
debug!("🕸️ about to trigger the webhook");
async move {
debug!("🕸️ about to trigger the webhook");
match reqwest::Client::new()
.post(url.clone())
.header(AUTHORIZATION, self.bearer_token())
.json(payload)
.send()
.await
{
Ok(res) => {
if !res.status().is_success() {
error!("❌🕸️ failed to call webhook: {res:?}");
} else {
debug!("✅🕸️ webhook triggered successfully: {res:?}");
if let Ok(body) = res.text().await {
debug!("body = {body}");
match reqwest::Client::new()
.post(url.clone())
.header(AUTHORIZATION, self.bearer_token())
.json(payload)
.send()
.await
{
Ok(res) => {
if !res.status().is_success() {
error!("❌🕸️ failed to call webhook: {res:?}");
} else {
debug!("✅🕸️ webhook triggered successfully: {res:?}");
if let Ok(body) = res.text().await {
debug!("body = {body}");
}
}
}
}
Err(err) => {
error!("failed to call webhook: {err}")
Err(err) => {
error!("failed to call webhook: {err}")
}
}
}
.instrument(span)
.await
}
}
-1
View File
@@ -11,7 +11,6 @@ use std::net::SocketAddr;
use tracing::{debug, error};
pub mod error;
pub mod middleware;
pub mod router;
pub mod state;
@@ -1,51 +0,0 @@
// Copyright 2023-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use axum::{
extract::{ConnectInfo, Request},
http::{
header::{HOST, USER_AGENT},
HeaderValue,
},
middleware::Next,
response::IntoResponse,
};
use colored::*;
use std::net::SocketAddr;
use tracing::info;
/// Simple logger for requests
pub async fn logger(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
req: Request,
next: Next,
) -> impl IntoResponse {
let method = req.method().to_string().green();
let uri = req.uri().to_string().blue();
let agent = header_map(
req.headers().get(USER_AGENT),
"Unknown User Agent".to_string(),
);
let host = header_map(req.headers().get(HOST), "Unknown Host".to_string());
let res = next.run(req).await;
let status = res.status();
let print_status = if status.is_client_error() || status.is_server_error() {
status.to_string().red()
} else if status.is_success() {
status.to_string().green()
} else {
status.to_string().yellow()
};
info!(target: "incoming request", "[{addr} -> {host}] {method} '{uri}': {print_status} / agent: {agent}");
res
}
fn header_map(header: Option<&HeaderValue>, msg: String) -> String {
header
.map(|x| x.to_str().unwrap_or(&msg).to_string())
.unwrap_or(msg)
}
@@ -1,4 +0,0 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
pub mod logging;
+1 -1
View File
@@ -2,12 +2,12 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::NymNodeHttpError;
use crate::middleware::logging;
use crate::state::AppState;
use crate::NymNodeHTTPServer;
use axum::response::Redirect;
use axum::routing::get;
use axum::Router;
use nym_http_api_common::middleware::logging;
use nym_node_requests::api::v1::authenticator::models::Authenticator;
use nym_node_requests::api::v1::gateway::models::{Gateway, Wireguard};
use nym_node_requests::api::v1::ip_packet_router::models::IpPacketRouter;
@@ -316,6 +316,7 @@ impl<S: Storage + Clone + 'static> MixnetListener<S> {
.filter(|r| r.1.is_none())
.choose(&mut thread_rng())
.ok_or(AuthenticatorError::NoFreeIp)?;
let private_ips = *private_ip_ref.0;
// mark it as used, even though it's not final
*private_ip_ref.1 = Some(SystemTime::now());
let gateway_data = GatewayClient::new(
@@ -337,11 +338,12 @@ impl<S: Storage + Clone + 'static> MixnetListener<S> {
v1::response::AuthenticatorResponse::new_pending_registration_success(
v1::registration::RegistrationData {
nonce: registration_data.nonce,
gateway_data: v1::GatewayClient {
pub_key: gateway_data.pub_key,
private_ip: gateway_data.private_ips.ipv4.into(),
mac: v1::ClientMac::new(gateway_data.mac.to_vec()),
},
gateway_data: v1::registration::GatewayClient::new(
self.keypair().private_key(),
remote_public.inner(),
private_ips.ipv4.into(),
nonce,
),
wg_port: registration_data.wg_port,
},
request_id,
@@ -356,7 +358,12 @@ impl<S: Storage + Clone + 'static> MixnetListener<S> {
v2::response::AuthenticatorResponse::new_pending_registration_success(
v2::registration::RegistrationData {
nonce: registration_data.nonce,
gateway_data: registration_data.gateway_data.into(),
gateway_data: v2::registration::GatewayClient::new(
self.keypair().private_key(),
remote_public.inner(),
private_ips.ipv4.into(),
nonce,
),
wg_port: registration_data.wg_port,
},
request_id,
@@ -371,7 +378,12 @@ impl<S: Storage + Clone + 'static> MixnetListener<S> {
v3::response::AuthenticatorResponse::new_pending_registration_success(
v3::registration::RegistrationData {
nonce: registration_data.nonce,
gateway_data: registration_data.gateway_data.into(),
gateway_data: v3::registration::GatewayClient::new(
self.keypair().private_key(),
remote_public.inner(),
private_ips.ipv4.into(),
nonce,
),
wg_port: registration_data.wg_port,
},
request_id,