Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1bdcf9c3cf | |||
| 4ebb9cd239 | |||
| 620d68ea2f |
@@ -42,8 +42,32 @@ impl PendingSync {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BlockProcessorConfig {
|
||||
pub pruning_options: PruningOptions,
|
||||
pub store_precommits: bool,
|
||||
}
|
||||
|
||||
impl Default for BlockProcessorConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
pruning_options: PruningOptions::nothing(),
|
||||
store_precommits: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockProcessorConfig {
|
||||
pub fn new(pruning_options: PruningOptions, store_precommits: bool) -> Self {
|
||||
Self {
|
||||
pruning_options,
|
||||
store_precommits,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BlockProcessor {
|
||||
pruning_options: PruningOptions,
|
||||
config: BlockProcessorConfig,
|
||||
cancel: CancellationToken,
|
||||
synced: Arc<Notify>,
|
||||
last_processed_height: u32,
|
||||
@@ -65,9 +89,10 @@ pub struct BlockProcessor {
|
||||
msg_modules: Vec<Box<dyn MsgModule + Send>>,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
impl BlockProcessor {
|
||||
pub async fn new(
|
||||
pruning_options: PruningOptions,
|
||||
config: BlockProcessorConfig,
|
||||
cancel: CancellationToken,
|
||||
synced: Arc<Notify>,
|
||||
incoming: UnboundedReceiver<BlockToProcess>,
|
||||
@@ -82,7 +107,7 @@ impl BlockProcessor {
|
||||
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
|
||||
|
||||
Ok(BlockProcessor {
|
||||
pruning_options,
|
||||
config,
|
||||
cancel,
|
||||
synced,
|
||||
last_processed_height,
|
||||
@@ -101,7 +126,7 @@ impl BlockProcessor {
|
||||
}
|
||||
|
||||
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
|
||||
self.pruning_options = pruning_options;
|
||||
self.config.pruning_options = pruning_options;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -128,7 +153,7 @@ impl BlockProcessor {
|
||||
// we won't end up with a corrupted storage.
|
||||
let mut tx = self.storage.begin_processing_tx().await?;
|
||||
|
||||
persist_block(&full_info, &mut tx).await?;
|
||||
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
|
||||
|
||||
// let the modules do whatever they want
|
||||
// the ones wanting the full block:
|
||||
@@ -241,7 +266,7 @@ impl BlockProcessor {
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
|
||||
let keep_recent = self.pruning_options.strategy_keep_recent();
|
||||
let keep_recent = self.config.pruning_options.strategy_keep_recent();
|
||||
let last_to_keep = self.last_processed_height - keep_recent;
|
||||
|
||||
info!(
|
||||
@@ -282,12 +307,12 @@ impl BlockProcessor {
|
||||
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
|
||||
debug!("checking for storage pruning");
|
||||
|
||||
if self.pruning_options.strategy.is_nothing() {
|
||||
if self.config.pruning_options.strategy.is_nothing() {
|
||||
trace!("the current pruning strategy is 'nothing'");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let interval = self.pruning_options.strategy_interval();
|
||||
let interval = self.config.pruning_options.strategy_interval();
|
||||
if self.last_pruned_height + interval <= self.last_processed_height {
|
||||
self.prune_storage().await?;
|
||||
}
|
||||
@@ -371,7 +396,7 @@ impl BlockProcessor {
|
||||
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
|
||||
// in case we were offline for a while,
|
||||
// make sure we don't request blocks we'd have to prune anyway
|
||||
let keep_recent = self.pruning_options.strategy_keep_recent();
|
||||
let keep_recent = self.config.pruning_options.strategy_keep_recent();
|
||||
let last_to_keep = latest_block - keep_recent;
|
||||
self.last_processed_height = max(self.last_processed_height, last_to_keep);
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::block_processor::types::BlockToProcess;
|
||||
use crate::block_processor::BlockProcessor;
|
||||
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
|
||||
use crate::block_requester::{BlockRequest, BlockRequester};
|
||||
use crate::error::ScraperError;
|
||||
use crate::modules::{BlockModule, MsgModule, TxModule};
|
||||
@@ -34,6 +34,8 @@ pub struct Config {
|
||||
pub database_path: PathBuf,
|
||||
|
||||
pub pruning_options: PruningOptions,
|
||||
|
||||
pub store_precommits: bool,
|
||||
}
|
||||
|
||||
pub struct NyxdScraperBuilder {
|
||||
@@ -60,8 +62,14 @@ impl NyxdScraperBuilder {
|
||||
req_rx,
|
||||
processing_tx.clone(),
|
||||
);
|
||||
let mut block_processor = BlockProcessor::new(
|
||||
|
||||
let block_processor_config = BlockProcessorConfig::new(
|
||||
scraper.config.pruning_options,
|
||||
scraper.config.store_precommits,
|
||||
);
|
||||
|
||||
let mut block_processor = BlockProcessor::new(
|
||||
block_processor_config,
|
||||
scraper.cancel_token.clone(),
|
||||
scraper.startup_sync.clone(),
|
||||
processing_rx,
|
||||
@@ -275,8 +283,11 @@ impl NyxdScraper {
|
||||
req_tx: Sender<BlockRequest>,
|
||||
processing_rx: UnboundedReceiver<BlockToProcess>,
|
||||
) -> Result<BlockProcessor, ScraperError> {
|
||||
let block_processor_config =
|
||||
BlockProcessorConfig::new(self.config.pruning_options, self.config.store_precommits);
|
||||
|
||||
BlockProcessor::new(
|
||||
self.config.pruning_options,
|
||||
block_processor_config,
|
||||
self.cancel_token.clone(),
|
||||
self.startup_sync.clone(),
|
||||
processing_rx,
|
||||
|
||||
@@ -212,6 +212,7 @@ impl ScraperStorage {
|
||||
pub async fn persist_block(
|
||||
block: &FullBlockInformation,
|
||||
tx: &mut StorageTransaction,
|
||||
store_precommits: bool,
|
||||
) -> Result<(), ScraperError> {
|
||||
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
|
||||
|
||||
@@ -224,11 +225,12 @@ pub async fn persist_block(
|
||||
// persist block data
|
||||
persist_block_data(&block.block, total_gas, tx).await?;
|
||||
|
||||
// persist commits
|
||||
if let Some(commit) = &block.block.last_commit {
|
||||
persist_commits(commit, &block.validators, tx).await?;
|
||||
} else {
|
||||
warn!("no commits for block {}", block.block.header.height)
|
||||
if store_precommits {
|
||||
if let Some(commit) = &block.block.last_commit {
|
||||
persist_commits(commit, &block.validators, tx).await?;
|
||||
} else {
|
||||
warn!("no commits for block {}", block.block.header.height)
|
||||
}
|
||||
}
|
||||
|
||||
// persist txs
|
||||
|
||||
@@ -107,6 +107,7 @@ impl Config {
|
||||
nyxd_scraper: NyxdScraper {
|
||||
websocket_url,
|
||||
pruning: Default::default(),
|
||||
store_precommits: true,
|
||||
},
|
||||
base: Base {
|
||||
upstream_nyxd: nyxd_url,
|
||||
@@ -122,6 +123,7 @@ impl Config {
|
||||
rpc_url: self.base.upstream_nyxd.clone(),
|
||||
database_path: self.storage_paths.nyxd_scraper.clone(),
|
||||
pruning_options: self.nyxd_scraper.pruning,
|
||||
store_precommits: self.nyxd_scraper.store_precommits,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -249,7 +251,14 @@ pub struct NyxdScraper {
|
||||
// if the value is missing, use `nothing` pruning as this was the past behaviour
|
||||
#[serde(default = "PruningOptions::nothing")]
|
||||
pub pruning: PruningOptions,
|
||||
// TODO: debug with everything that's currently hardcoded in the scraper
|
||||
|
||||
/// Specifies whether to store pre-commits within the database.
|
||||
#[serde(default = "default_store_precommits")]
|
||||
pub store_precommits: bool,
|
||||
}
|
||||
|
||||
fn default_store_precommits() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
impl NyxdScraper {
|
||||
|
||||
Reference in New Issue
Block a user