credential proxy pool (#6726)

* fix?

* version

* unit test

* additional logs for stalled deposits

---------

Co-authored-by: benedettadavico <benedettadavico@users.noreply.github.com>
Co-authored-by: Jędrzej Stuczyński <jedrzej.stuczynski@gmail.com>
This commit is contained in:
benedetta davico
2026-05-01 10:21:28 +02:00
committed by GitHub
parent d0f2c08cd1
commit d23a42f7f5
3 changed files with 76 additions and 8 deletions
@@ -13,8 +13,9 @@ use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::Mutex as AsyncMutex;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, warn};
use tracing::{debug, error, info, instrument, warn};
use uuid::Uuid;
pub use helpers::{BufferedDeposit, PerformedDeposits, make_deposits_request, split_deposits};
@@ -146,9 +147,14 @@ impl DepositsBuffer {
// if we're here, we know we're below the threshold
fn maybe_refill_deposits(&self) {
if let Some(mut guard) = self.inner.deposits_refill_task.try_get_new_task_guard() {
if let Some((mut guard, completion_guard)) =
self.inner.deposits_refill_task.try_get_new_task_guard()
{
let this = self.clone();
*guard = Some(tokio::spawn(async move { this.refill_deposits().await }));
*guard = Some(tokio::spawn(async move {
let _completion_guard = completion_guard;
this.refill_deposits().await
}));
}
}
@@ -179,6 +185,8 @@ impl DepositsBuffer {
requested_on: OffsetDateTime,
client_pubkey: PublicKeyUser,
) -> Result<BufferedDeposit, CredentialProxyError> {
let wait_start = Instant::now();
let mut i = 0;
loop {
tokio::time::sleep(Duration::from_millis(500)).await;
if let Some(buffered_deposit) = self.inner.unused_deposits.lock().await.pop() {
@@ -195,6 +203,15 @@ impl DepositsBuffer {
// make sure there's always a task working in the background in case deposits get used up too quickly
self.maybe_refill_deposits()
}
i += 1;
let elapsed = wait_start.elapsed();
if elapsed > Duration::from_secs(5) && i % 10 == 0 {
warn!("we've been waiting for over 5s to make a deposit - something is wrong!")
} else if elapsed > Duration::from_secs(10) && i % 5 == 0 {
error!(
"we've been waiting for over 10s to make a deposit - something is SERIOUSLY wrong!"
)
}
}
}
@@ -3,12 +3,22 @@
use crate::error::CredentialProxyError;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Mutex as StdMutex, MutexGuard};
use std::sync::{Arc, Mutex as StdMutex, MutexGuard};
use tokio::task::JoinHandle;
use tracing::{debug, error};
pub(super) type RefillTaskResult = Result<(), CredentialProxyError>;
pub(super) struct InProgressGuard {
in_progress: Arc<AtomicBool>,
}
impl Drop for InProgressGuard {
fn drop(&mut self) {
self.in_progress.store(false, Ordering::SeqCst);
}
}
#[derive(Default)]
pub(super) struct RefillTask {
// note that we can only have a single transaction in progress (or it'd mess up with our sequence numbers)
@@ -16,7 +26,7 @@ pub(super) struct RefillTask {
// we'll have to increase the number of deposits per transaction
join_handle: StdMutex<Option<JoinHandle<RefillTaskResult>>>,
in_progress: AtomicBool,
in_progress: Arc<AtomicBool>,
}
impl RefillTask {
@@ -28,9 +38,15 @@ impl RefillTask {
.is_ok()
}
/// Returns `None` if a refill is already in progress. On success, returns the
/// join-handle guard (to store the new `JoinHandle` into) and an [`InProgressGuard`]
/// that **must be moved into the spawned task** — it resets the flag when dropped.
pub(super) fn try_get_new_task_guard(
&self,
) -> Option<MutexGuard<'_, Option<JoinHandle<RefillTaskResult>>>> {
) -> Option<(
MutexGuard<'_, Option<JoinHandle<RefillTaskResult>>>,
InProgressGuard,
)> {
// sanity check for concurrent request
if !self.try_set_in_progress() {
debug!("another task has already started deposit refill request");
@@ -48,7 +64,11 @@ impl RefillTask {
}
}
Some(guard)
let completion_guard = InProgressGuard {
in_progress: Arc::clone(&self.in_progress),
};
Some((guard, completion_guard))
}
pub(super) fn take_task_join_handle(&self) -> Option<JoinHandle<RefillTaskResult>> {
@@ -56,3 +76,34 @@ impl RefillTask {
self.join_handle.lock().expect("mutex got poisoned").take()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn in_progress_resets_after_guard_drop() {
let task = RefillTask::default();
let (guard, completion_guard) = task.try_get_new_task_guard().unwrap();
drop(guard);
assert!(task.try_get_new_task_guard().is_none());
drop(completion_guard);
assert!(task.try_get_new_task_guard().is_some());
}
#[test]
fn in_progress_resets_on_panic() {
let task = RefillTask::default();
let (guard, completion_guard) = task.try_get_new_task_guard().unwrap();
drop(guard);
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _g = completion_guard;
panic!("simulated refill task panic");
}));
assert!(task.try_get_new_task_guard().is_some());
}
}
@@ -1,6 +1,6 @@
[package]
name = "nym-credential-proxy"
version = "0.3.1"
version = "0.3.2-rc"
authors.workspace = true
edition.workspace = true
license.workspace = true