chore: add retries for retrieving chain data (#6847)
* chore: add retries for retrieving chain data * added retry backoff
This commit is contained in:
committed by
GitHub
parent
2705330595
commit
52949f825a
@@ -10,6 +10,7 @@ use crate::{Any, MessageRegistry, ParsedTransactionDetails, default_message_regi
|
||||
use futures::StreamExt;
|
||||
use futures::future::join3;
|
||||
use std::collections::BTreeMap;
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use tendermint::{Block, Hash};
|
||||
use tendermint_rpc::endpoint::{block, block_results, tx, validators};
|
||||
@@ -18,6 +19,38 @@ use tokio::sync::Mutex;
|
||||
use tracing::{debug, instrument, warn};
|
||||
use url::Url;
|
||||
|
||||
const MAX_QUERY_ATTEMPTS: usize = 3;
|
||||
|
||||
/// Runs `op` up to `max_attempts` times (at least once), returning the first success or, on full
|
||||
/// exhaustion, the last error encountered.
|
||||
async fn query_with_retries<F, Fut, T>(mut max_attempts: usize, op: F) -> Result<T, ScraperError>
|
||||
where
|
||||
F: Fn() -> Fut,
|
||||
Fut: Future<Output = Result<T, ScraperError>>,
|
||||
{
|
||||
if max_attempts == 0 {
|
||||
max_attempts = 1;
|
||||
}
|
||||
|
||||
let mut last_err = None;
|
||||
|
||||
for i in 0..max_attempts {
|
||||
match op().await {
|
||||
Ok(result) => return Ok(result),
|
||||
Err(err) => {
|
||||
debug!("query failed, retrying {}/{max_attempts} - {err}", i + 1);
|
||||
last_err = Some(err);
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(300 * (i as u64 + 1))).await;
|
||||
}
|
||||
|
||||
// SAFETY: max_attempts >= 1, so we only reach here after at least one recorded failure
|
||||
#[allow(clippy::unwrap_used)]
|
||||
Err(last_err.unwrap())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct RetrievalConfig {
|
||||
pub get_validators: bool,
|
||||
@@ -173,13 +206,24 @@ impl RpcClient {
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip(self), err(Display))]
|
||||
async fn get_block_results_with_retries(
|
||||
&self,
|
||||
height: u32,
|
||||
max_attempts: usize,
|
||||
) -> Result<block_results::Response, ScraperError> {
|
||||
query_with_retries(max_attempts, || self.get_block_results(height)).await
|
||||
}
|
||||
|
||||
async fn maybe_get_block_results(
|
||||
&self,
|
||||
height: u32,
|
||||
retrieve: bool,
|
||||
) -> Result<Option<block_results::Response>, ScraperError> {
|
||||
if retrieve {
|
||||
self.get_block_results(height).await.map(Some)
|
||||
self.get_block_results_with_retries(height, MAX_QUERY_ATTEMPTS)
|
||||
.await
|
||||
.map(Some)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
@@ -219,8 +263,6 @@ impl RpcClient {
|
||||
|
||||
// "Data is just a wrapper for a list of transactions, where transactions are arbitrary byte arrays"
|
||||
// source: https://github.com/tendermint/spec/blob/d46cd7f573a2c6a2399fcab2cde981330aa63f37/spec/core/data_structures.md#data
|
||||
//
|
||||
// I hate that zip as much as you, dear reader, but for some reason the compiler didn't let me remove the `move`
|
||||
futures::stream::iter(
|
||||
raw.iter()
|
||||
.map(tx_hash)
|
||||
@@ -228,12 +270,14 @@ impl RpcClient {
|
||||
.zip(std::iter::repeat(ordered_results.clone())),
|
||||
)
|
||||
.for_each_concurrent(4, |((id, tx_hash), ordered_results)| async move {
|
||||
let res = self.get_transaction_result(tx_hash).await;
|
||||
let res = self
|
||||
.get_transaction_result_with_retries(tx_hash, MAX_QUERY_ATTEMPTS)
|
||||
.await;
|
||||
ordered_results.lock().await.insert(id, res);
|
||||
})
|
||||
.await;
|
||||
|
||||
// safety the futures have completed so we MUST have the only arc reference
|
||||
// safety: the futures have completed so we MUST have the only arc reference
|
||||
#[allow(clippy::unwrap_used)]
|
||||
let inner = Arc::into_inner(ordered_results).unwrap().into_inner();
|
||||
|
||||
@@ -266,6 +310,15 @@ impl RpcClient {
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip(self, tx_hash), fields(tx_hash = %tx_hash), err(Display))]
|
||||
async fn get_transaction_result_with_retries(
|
||||
&self,
|
||||
tx_hash: Hash,
|
||||
max_attempts: usize,
|
||||
) -> Result<tx::Response, ScraperError> {
|
||||
query_with_retries(max_attempts, || self.get_transaction_result(tx_hash)).await
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn get_validators_details(
|
||||
&self,
|
||||
@@ -282,13 +335,24 @@ impl RpcClient {
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip(self), err(Display))]
|
||||
async fn get_validators_details_with_retries(
|
||||
&self,
|
||||
height: u32,
|
||||
max_attempts: usize,
|
||||
) -> Result<validators::Response, ScraperError> {
|
||||
query_with_retries(max_attempts, || self.get_validators_details(height)).await
|
||||
}
|
||||
|
||||
async fn maybe_get_validators_details(
|
||||
&self,
|
||||
height: u32,
|
||||
retrieve: bool,
|
||||
) -> Result<Option<validators::Response>, ScraperError> {
|
||||
if retrieve {
|
||||
self.get_validators_details(height).await.map(Some)
|
||||
self.get_validators_details_with_retries(height, MAX_QUERY_ATTEMPTS)
|
||||
.await
|
||||
.map(Some)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user