nyxd-scraper: add optional starting height parameter to scrape before listening for new blocks
This commit is contained in:
@@ -47,7 +47,10 @@ pub struct NyxdScraperBuilder {
|
||||
}
|
||||
|
||||
impl NyxdScraperBuilder {
|
||||
pub async fn build_and_start(self) -> Result<NyxdScraper, ScraperError> {
|
||||
pub async fn build_and_start(
|
||||
self,
|
||||
start_block: Option<u32>,
|
||||
) -> Result<NyxdScraper, ScraperError> {
|
||||
let scraper = NyxdScraper::new(self.config).await?;
|
||||
|
||||
let (processing_tx, processing_rx) = unbounded_channel();
|
||||
@@ -90,6 +93,10 @@ impl NyxdScraperBuilder {
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(height) = start_block {
|
||||
scraper.process_block_range(Some(height), None).await?;
|
||||
}
|
||||
|
||||
scraper.start_tasks(block_requester, block_processor, chain_subscriber);
|
||||
|
||||
Ok(scraper)
|
||||
@@ -202,10 +209,10 @@ impl NyxdScraper {
|
||||
.await?
|
||||
.with_pruning(PruningOptions::nothing());
|
||||
|
||||
let current_height = self.rpc_client.current_block_height().await? as u32;
|
||||
let mut current_height = self.rpc_client.current_block_height().await? as u32;
|
||||
let last_processed = block_processor.last_process_height();
|
||||
|
||||
let starting_height = match starting_height {
|
||||
let mut starting_height = match starting_height {
|
||||
// always attempt to use whatever the user has provided
|
||||
Some(explicit) => explicit,
|
||||
None => {
|
||||
@@ -219,7 +226,8 @@ impl NyxdScraper {
|
||||
}
|
||||
};
|
||||
|
||||
let end_height = match end_height {
|
||||
let must_catch_up = end_height.is_none();
|
||||
let mut end_height = match end_height {
|
||||
// always attempt to use whatever the user has provided
|
||||
Some(explicit) => explicit,
|
||||
None => {
|
||||
@@ -234,32 +242,62 @@ impl NyxdScraper {
|
||||
}
|
||||
};
|
||||
|
||||
info!(
|
||||
starting_height = starting_height,
|
||||
end_height = end_height,
|
||||
"attempting to process block range"
|
||||
);
|
||||
let mut last_processed = starting_height;
|
||||
|
||||
let range = (starting_height..=end_height).collect::<Vec<_>>();
|
||||
while last_processed < current_height {
|
||||
info!(
|
||||
starting_height = starting_height,
|
||||
end_height = end_height,
|
||||
"attempting to process block range"
|
||||
);
|
||||
|
||||
// the most likely bottleneck here are going to be the chain queries,
|
||||
// so batch multiple requests
|
||||
for batch in range.chunks(4) {
|
||||
let batch_result = join_all(
|
||||
batch
|
||||
.iter()
|
||||
.map(|height| self.rpc_client.get_basic_block_details(*height)),
|
||||
)
|
||||
.await;
|
||||
for result in batch_result {
|
||||
match result {
|
||||
Ok(block) => block_processor.process_block(block.into()).await?,
|
||||
Err(err) => {
|
||||
error!("failed to retrieve the block: {err}. stopping...");
|
||||
return Err(err);
|
||||
let range = (starting_height..=end_height).collect::<Vec<_>>();
|
||||
|
||||
// the most likely bottleneck here are going to be the chain queries,
|
||||
// so batch multiple requests
|
||||
for batch in range.chunks(4) {
|
||||
let batch_result = join_all(
|
||||
batch
|
||||
.iter()
|
||||
.map(|height| self.rpc_client.get_basic_block_details(*height)),
|
||||
)
|
||||
.await;
|
||||
for result in batch_result {
|
||||
match result {
|
||||
Ok(block) => block_processor.process_block(block.into()).await?,
|
||||
Err(err) => {
|
||||
error!("failed to retrieve the block: {err}. stopping...");
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if we don't need to catch up, return early
|
||||
if !must_catch_up {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// check if we have caught up to the current block height
|
||||
last_processed = end_height;
|
||||
current_height = self.rpc_client.current_block_height().await? as u32;
|
||||
|
||||
info!(
|
||||
last_processed = last_processed,
|
||||
current_height = current_height,
|
||||
"🏃 still need to catch up..."
|
||||
);
|
||||
|
||||
starting_height = last_processed + 1;
|
||||
end_height = current_height;
|
||||
}
|
||||
|
||||
if must_catch_up {
|
||||
info!(
|
||||
last_processed = last_processed,
|
||||
current_height = current_height,
|
||||
"✅ block processing has caught up!"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -8,6 +8,10 @@ pub(crate) async fn run_chain_scraper() -> anyhow::Result<ScraperStorage> {
|
||||
let websocket_url = reqwest::Url::parse(&websocket_url)?;
|
||||
let rpc_url = reqwest::Url::parse(&rpc_url)?;
|
||||
|
||||
let start_block_height = std::env::var("NYXD_SCRAPER_START_HEIGHT")
|
||||
.ok()
|
||||
.and_then(|value| value.parse::<u32>().ok());
|
||||
|
||||
let scraper = NyxdScraper::builder(Config {
|
||||
websocket_url,
|
||||
rpc_url,
|
||||
@@ -16,7 +20,7 @@ pub(crate) async fn run_chain_scraper() -> anyhow::Result<ScraperStorage> {
|
||||
store_precommits: false,
|
||||
});
|
||||
|
||||
let storage = scraper.build_and_start().await?;
|
||||
let instance = scraper.build_and_start(start_block_height).await?;
|
||||
|
||||
Ok(storage.storage)
|
||||
Ok(instance.storage)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user