Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1bdcf9c3cf | |||
| 4ebb9cd239 | |||
| 620d68ea2f |
@@ -42,8 +42,32 @@ impl PendingSync {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct BlockProcessorConfig {
|
||||||
|
pub pruning_options: PruningOptions,
|
||||||
|
pub store_precommits: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for BlockProcessorConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
pruning_options: PruningOptions::nothing(),
|
||||||
|
store_precommits: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BlockProcessorConfig {
|
||||||
|
pub fn new(pruning_options: PruningOptions, store_precommits: bool) -> Self {
|
||||||
|
Self {
|
||||||
|
pruning_options,
|
||||||
|
store_precommits,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct BlockProcessor {
|
pub struct BlockProcessor {
|
||||||
pruning_options: PruningOptions,
|
config: BlockProcessorConfig,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
synced: Arc<Notify>,
|
synced: Arc<Notify>,
|
||||||
last_processed_height: u32,
|
last_processed_height: u32,
|
||||||
@@ -65,9 +89,10 @@ pub struct BlockProcessor {
|
|||||||
msg_modules: Vec<Box<dyn MsgModule + Send>>,
|
msg_modules: Vec<Box<dyn MsgModule + Send>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
impl BlockProcessor {
|
impl BlockProcessor {
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
pruning_options: PruningOptions,
|
config: BlockProcessorConfig,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
synced: Arc<Notify>,
|
synced: Arc<Notify>,
|
||||||
incoming: UnboundedReceiver<BlockToProcess>,
|
incoming: UnboundedReceiver<BlockToProcess>,
|
||||||
@@ -82,7 +107,7 @@ impl BlockProcessor {
|
|||||||
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
|
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
|
||||||
|
|
||||||
Ok(BlockProcessor {
|
Ok(BlockProcessor {
|
||||||
pruning_options,
|
config,
|
||||||
cancel,
|
cancel,
|
||||||
synced,
|
synced,
|
||||||
last_processed_height,
|
last_processed_height,
|
||||||
@@ -101,7 +126,7 @@ impl BlockProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
|
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
|
||||||
self.pruning_options = pruning_options;
|
self.config.pruning_options = pruning_options;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -128,7 +153,7 @@ impl BlockProcessor {
|
|||||||
// we won't end up with a corrupted storage.
|
// we won't end up with a corrupted storage.
|
||||||
let mut tx = self.storage.begin_processing_tx().await?;
|
let mut tx = self.storage.begin_processing_tx().await?;
|
||||||
|
|
||||||
persist_block(&full_info, &mut tx).await?;
|
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
|
||||||
|
|
||||||
// let the modules do whatever they want
|
// let the modules do whatever they want
|
||||||
// the ones wanting the full block:
|
// the ones wanting the full block:
|
||||||
@@ -241,7 +266,7 @@ impl BlockProcessor {
|
|||||||
|
|
||||||
#[instrument(skip(self))]
|
#[instrument(skip(self))]
|
||||||
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
|
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
|
||||||
let keep_recent = self.pruning_options.strategy_keep_recent();
|
let keep_recent = self.config.pruning_options.strategy_keep_recent();
|
||||||
let last_to_keep = self.last_processed_height - keep_recent;
|
let last_to_keep = self.last_processed_height - keep_recent;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
@@ -282,12 +307,12 @@ impl BlockProcessor {
|
|||||||
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
|
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
|
||||||
debug!("checking for storage pruning");
|
debug!("checking for storage pruning");
|
||||||
|
|
||||||
if self.pruning_options.strategy.is_nothing() {
|
if self.config.pruning_options.strategy.is_nothing() {
|
||||||
trace!("the current pruning strategy is 'nothing'");
|
trace!("the current pruning strategy is 'nothing'");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let interval = self.pruning_options.strategy_interval();
|
let interval = self.config.pruning_options.strategy_interval();
|
||||||
if self.last_pruned_height + interval <= self.last_processed_height {
|
if self.last_pruned_height + interval <= self.last_processed_height {
|
||||||
self.prune_storage().await?;
|
self.prune_storage().await?;
|
||||||
}
|
}
|
||||||
@@ -371,7 +396,7 @@ impl BlockProcessor {
|
|||||||
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
|
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
|
||||||
// in case we were offline for a while,
|
// in case we were offline for a while,
|
||||||
// make sure we don't request blocks we'd have to prune anyway
|
// make sure we don't request blocks we'd have to prune anyway
|
||||||
let keep_recent = self.pruning_options.strategy_keep_recent();
|
let keep_recent = self.config.pruning_options.strategy_keep_recent();
|
||||||
let last_to_keep = latest_block - keep_recent;
|
let last_to_keep = latest_block - keep_recent;
|
||||||
self.last_processed_height = max(self.last_processed_height, last_to_keep);
|
self.last_processed_height = max(self.last_processed_height, last_to_keep);
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
use crate::block_processor::types::BlockToProcess;
|
use crate::block_processor::types::BlockToProcess;
|
||||||
use crate::block_processor::BlockProcessor;
|
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
|
||||||
use crate::block_requester::{BlockRequest, BlockRequester};
|
use crate::block_requester::{BlockRequest, BlockRequester};
|
||||||
use crate::error::ScraperError;
|
use crate::error::ScraperError;
|
||||||
use crate::modules::{BlockModule, MsgModule, TxModule};
|
use crate::modules::{BlockModule, MsgModule, TxModule};
|
||||||
@@ -34,6 +34,8 @@ pub struct Config {
|
|||||||
pub database_path: PathBuf,
|
pub database_path: PathBuf,
|
||||||
|
|
||||||
pub pruning_options: PruningOptions,
|
pub pruning_options: PruningOptions,
|
||||||
|
|
||||||
|
pub store_precommits: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NyxdScraperBuilder {
|
pub struct NyxdScraperBuilder {
|
||||||
@@ -60,8 +62,14 @@ impl NyxdScraperBuilder {
|
|||||||
req_rx,
|
req_rx,
|
||||||
processing_tx.clone(),
|
processing_tx.clone(),
|
||||||
);
|
);
|
||||||
let mut block_processor = BlockProcessor::new(
|
|
||||||
|
let block_processor_config = BlockProcessorConfig::new(
|
||||||
scraper.config.pruning_options,
|
scraper.config.pruning_options,
|
||||||
|
scraper.config.store_precommits,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut block_processor = BlockProcessor::new(
|
||||||
|
block_processor_config,
|
||||||
scraper.cancel_token.clone(),
|
scraper.cancel_token.clone(),
|
||||||
scraper.startup_sync.clone(),
|
scraper.startup_sync.clone(),
|
||||||
processing_rx,
|
processing_rx,
|
||||||
@@ -275,8 +283,11 @@ impl NyxdScraper {
|
|||||||
req_tx: Sender<BlockRequest>,
|
req_tx: Sender<BlockRequest>,
|
||||||
processing_rx: UnboundedReceiver<BlockToProcess>,
|
processing_rx: UnboundedReceiver<BlockToProcess>,
|
||||||
) -> Result<BlockProcessor, ScraperError> {
|
) -> Result<BlockProcessor, ScraperError> {
|
||||||
|
let block_processor_config =
|
||||||
|
BlockProcessorConfig::new(self.config.pruning_options, self.config.store_precommits);
|
||||||
|
|
||||||
BlockProcessor::new(
|
BlockProcessor::new(
|
||||||
self.config.pruning_options,
|
block_processor_config,
|
||||||
self.cancel_token.clone(),
|
self.cancel_token.clone(),
|
||||||
self.startup_sync.clone(),
|
self.startup_sync.clone(),
|
||||||
processing_rx,
|
processing_rx,
|
||||||
|
|||||||
@@ -212,6 +212,7 @@ impl ScraperStorage {
|
|||||||
pub async fn persist_block(
|
pub async fn persist_block(
|
||||||
block: &FullBlockInformation,
|
block: &FullBlockInformation,
|
||||||
tx: &mut StorageTransaction,
|
tx: &mut StorageTransaction,
|
||||||
|
store_precommits: bool,
|
||||||
) -> Result<(), ScraperError> {
|
) -> Result<(), ScraperError> {
|
||||||
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
|
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
|
||||||
|
|
||||||
@@ -224,11 +225,12 @@ pub async fn persist_block(
|
|||||||
// persist block data
|
// persist block data
|
||||||
persist_block_data(&block.block, total_gas, tx).await?;
|
persist_block_data(&block.block, total_gas, tx).await?;
|
||||||
|
|
||||||
// persist commits
|
if store_precommits {
|
||||||
if let Some(commit) = &block.block.last_commit {
|
if let Some(commit) = &block.block.last_commit {
|
||||||
persist_commits(commit, &block.validators, tx).await?;
|
persist_commits(commit, &block.validators, tx).await?;
|
||||||
} else {
|
} else {
|
||||||
warn!("no commits for block {}", block.block.header.height)
|
warn!("no commits for block {}", block.block.header.height)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// persist txs
|
// persist txs
|
||||||
|
|||||||
@@ -107,6 +107,7 @@ impl Config {
|
|||||||
nyxd_scraper: NyxdScraper {
|
nyxd_scraper: NyxdScraper {
|
||||||
websocket_url,
|
websocket_url,
|
||||||
pruning: Default::default(),
|
pruning: Default::default(),
|
||||||
|
store_precommits: true,
|
||||||
},
|
},
|
||||||
base: Base {
|
base: Base {
|
||||||
upstream_nyxd: nyxd_url,
|
upstream_nyxd: nyxd_url,
|
||||||
@@ -122,6 +123,7 @@ impl Config {
|
|||||||
rpc_url: self.base.upstream_nyxd.clone(),
|
rpc_url: self.base.upstream_nyxd.clone(),
|
||||||
database_path: self.storage_paths.nyxd_scraper.clone(),
|
database_path: self.storage_paths.nyxd_scraper.clone(),
|
||||||
pruning_options: self.nyxd_scraper.pruning,
|
pruning_options: self.nyxd_scraper.pruning,
|
||||||
|
store_precommits: self.nyxd_scraper.store_precommits,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -249,7 +251,14 @@ pub struct NyxdScraper {
|
|||||||
// if the value is missing, use `nothing` pruning as this was the past behaviour
|
// if the value is missing, use `nothing` pruning as this was the past behaviour
|
||||||
#[serde(default = "PruningOptions::nothing")]
|
#[serde(default = "PruningOptions::nothing")]
|
||||||
pub pruning: PruningOptions,
|
pub pruning: PruningOptions,
|
||||||
// TODO: debug with everything that's currently hardcoded in the scraper
|
|
||||||
|
/// Specifies whether to store pre-commits within the database.
|
||||||
|
#[serde(default = "default_store_precommits")]
|
||||||
|
pub store_precommits: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_store_precommits() -> bool {
|
||||||
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NyxdScraper {
|
impl NyxdScraper {
|
||||||
|
|||||||
Reference in New Issue
Block a user