Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e28f5809ff | |||
| 598a9341a4 |
Generated
+1
-1
@@ -6445,7 +6445,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nym-data-observatory"
|
||||
version = "1.0.1"
|
||||
version = "1.0.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
|
||||
@@ -119,6 +119,22 @@ where
|
||||
Ok(scraper)
|
||||
}
|
||||
|
||||
pub async fn build_unsafe(self) -> Result<NyxdScraper<S>, ScraperError> {
|
||||
self.config.pruning_options.validate()?;
|
||||
let storage =
|
||||
S::initialise(&self.config.database_storage, &self.config.run_migrations).await?;
|
||||
let rpc_client = RpcClient::new(&self.config.rpc_url)?;
|
||||
|
||||
Ok(NyxdScraper {
|
||||
config: self.config,
|
||||
task_tracker: TaskTracker::new(),
|
||||
cancel_token: CancellationToken::new(),
|
||||
startup_sync: Arc::new(Default::default()),
|
||||
storage,
|
||||
rpc_client,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn new(config: Config) -> Self {
|
||||
NyxdScraperBuilder {
|
||||
_storage: PhantomData,
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
[package]
|
||||
name = "nym-data-observatory"
|
||||
version = "1.0.1"
|
||||
version = "1.0.2"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
|
||||
@@ -6,6 +6,32 @@ Collects data about the Nym network including:
|
||||
- **Price scraper** - to get the NYM/USD token price from CoinGecko
|
||||
- **Webhooks** - trigger on messages or all messages to call with details
|
||||
|
||||
## Processing historical data or reprocessing block ranges
|
||||
|
||||
You can run the scraper to operate on a range of blocks, without listening for new blocks. This is useful for:
|
||||
|
||||
- processing historical data
|
||||
- reprocessing block ranges
|
||||
|
||||
Run this as follows to process blocks starting at 1000 and ending at 2000:
|
||||
|
||||
```bash
|
||||
./nym_data_observatory process --start 1000 --end 2000
|
||||
```
|
||||
|
||||
Or to process a single block:
|
||||
|
||||
```bash
|
||||
./nym_data_observatory process --start 1000
|
||||
```
|
||||
|
||||
Or to process 10 blocks starting at block 1000:
|
||||
|
||||
```bash
|
||||
./nym_data_observatory process --start 1000 --blocks 10
|
||||
```
|
||||
|
||||
|
||||
## Running locally
|
||||
|
||||
### 1. Install Prerequisites
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use crate::cli::commands::run::Args;
|
||||
use crate::db::DbPool;
|
||||
use nyxd_scraper_psql::{PostgresNyxdScraper, PruningOptions};
|
||||
use tracing::info;
|
||||
@@ -6,7 +5,7 @@ use tracing::info;
|
||||
pub(crate) mod webhook;
|
||||
|
||||
pub(crate) async fn run_chain_scraper(
|
||||
args: Args,
|
||||
args: crate::cli::commands::run::Args,
|
||||
config: &crate::config::Config,
|
||||
connection_pool: DbPool,
|
||||
) -> anyhow::Result<PostgresNyxdScraper> {
|
||||
@@ -19,8 +18,8 @@ pub(crate) async fn run_chain_scraper(
|
||||
.expect("no database connection string set in config");
|
||||
|
||||
let scraper = PostgresNyxdScraper::builder(nyxd_scraper_psql::Config {
|
||||
websocket_url: args.websocket_url,
|
||||
rpc_url: args.rpc_url,
|
||||
websocket_url: args.websocket_url,
|
||||
database_storage,
|
||||
pruning_options: PruningOptions::nothing(),
|
||||
store_precommits: false,
|
||||
@@ -40,3 +39,41 @@ pub(crate) async fn run_chain_scraper(
|
||||
|
||||
Ok(instance)
|
||||
}
|
||||
|
||||
pub(crate) async fn process_chain_scraper(
|
||||
args: crate::cli::commands::process::Args,
|
||||
config: &crate::config::Config,
|
||||
connection_pool: DbPool,
|
||||
start_block_height: u32,
|
||||
end_block_height: u32,
|
||||
) -> anyhow::Result<PostgresNyxdScraper> {
|
||||
let database_storage = config
|
||||
.chain_scraper_connection_string
|
||||
.clone()
|
||||
.and(args.db_connection_string)
|
||||
.expect("no database connection string set in config");
|
||||
|
||||
let scraper = PostgresNyxdScraper::builder(nyxd_scraper_psql::Config {
|
||||
rpc_url: args.rpc_url,
|
||||
websocket_url: args.websocket_url,
|
||||
database_storage,
|
||||
pruning_options: PruningOptions::nothing(),
|
||||
store_precommits: false,
|
||||
start_block: nyxd_scraper_psql::StartingBlockOpts {
|
||||
start_block_height: Some(start_block_height),
|
||||
use_best_effort_start_height: false,
|
||||
},
|
||||
run_migrations: false, // ignore the base migrations
|
||||
})
|
||||
.with_msg_module(crate::modules::wasm::WasmModule::new(connection_pool))
|
||||
.with_tx_module(webhook::WebhookModule::new(config.clone())?);
|
||||
|
||||
let instance = scraper.build_unsafe().await?;
|
||||
|
||||
info!("🏃♂️ processing blocks from {start_block_height} to {end_block_height}...");
|
||||
instance
|
||||
.unsafe_process_block_range(Some(start_block_height), Some(end_block_height))
|
||||
.await?;
|
||||
|
||||
Ok(instance)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub(crate) mod build_info;
|
||||
pub(crate) mod init;
|
||||
pub(crate) mod process;
|
||||
pub(crate) mod run;
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::env::vars::*;
|
||||
use url::Url;
|
||||
|
||||
#[derive(clap::Args, Debug, Clone)]
|
||||
pub(crate) struct Args {
|
||||
#[arg(long, alias = "start")]
|
||||
pub(crate) start_block_height: u32,
|
||||
|
||||
#[arg(long, alias = "end")]
|
||||
pub(crate) end_block_height: Option<u32>,
|
||||
|
||||
#[arg(long, alias = "blocks")]
|
||||
pub(crate) blocks_to_process: Option<u32>,
|
||||
|
||||
#[arg(long, env = NYM_DATA_OBSERVATORY_DB_URL, alias = "db_url")]
|
||||
pub(crate) db_connection_string: Option<String>,
|
||||
|
||||
#[arg(long, env = NYXD_WS, alias = "nyxd_ws", default_value = "wss://rpc.nymtech.net/websocket")]
|
||||
pub(crate) websocket_url: Url,
|
||||
|
||||
#[arg(long, env = NYXD, alias = "nyxd", default_value = "https://rpc.nymtech.net")]
|
||||
pub(crate) rpc_url: Url,
|
||||
|
||||
/// (Override) Watch for chain messages of these types
|
||||
#[clap(
|
||||
long,
|
||||
value_delimiter = ',',
|
||||
env = NYM_DATA_OBSERVATORY_WATCH_CHAIN_MESSAGE_TYPES
|
||||
)]
|
||||
pub watch_for_chain_message_types: Vec<String>,
|
||||
|
||||
/// (Override) The webhook to call when we find something
|
||||
#[clap(
|
||||
long,
|
||||
env = NYM_DATA_OBSERVATORY_WEBHOOK_URL
|
||||
)]
|
||||
pub webhook_url: Option<Url>,
|
||||
|
||||
/// (Override) Optionally, authenticate with the webhook
|
||||
#[clap(
|
||||
long,
|
||||
env = NYM_DATA_OBSERVATORY_WEBHOOK_AUTH
|
||||
)]
|
||||
pub webhook_auth: Option<String>,
|
||||
}
|
||||
@@ -0,0 +1,100 @@
|
||||
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::chain_scraper::process_chain_scraper;
|
||||
pub(crate) use crate::cli::commands::process::args::Args;
|
||||
use crate::cli::commands::run::wait_for_shutdown;
|
||||
use crate::db;
|
||||
use crate::error::NymDataObservatoryError;
|
||||
use nym_task::wait_for_signal;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::task::{JoinHandle, JoinSet};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info};
|
||||
|
||||
mod args;
|
||||
|
||||
pub(crate) async fn execute(args: Args) -> Result<(), NymDataObservatoryError> {
|
||||
let start = OffsetDateTime::now_utc();
|
||||
let scraper_args = args.clone();
|
||||
|
||||
let run_args = crate::cli::commands::run::args::Args {
|
||||
rpc_url: args.rpc_url.clone(),
|
||||
websocket_url: args.websocket_url.clone(),
|
||||
start_block_height: Some(args.start_block_height),
|
||||
db_connection_string: args.db_connection_string,
|
||||
webhook_url: args.webhook_url.clone(),
|
||||
watch_for_chain_message_types: args.watch_for_chain_message_types,
|
||||
webhook_auth: args.webhook_auth.clone(),
|
||||
};
|
||||
|
||||
let config = crate::cli::commands::run::config::get_run_config(run_args.clone())?;
|
||||
|
||||
let db_connection_string = config.chain_scraper_connection_string();
|
||||
|
||||
let start_block_height = args.start_block_height;
|
||||
let end_block_height = args
|
||||
.end_block_height
|
||||
.unwrap_or(args.start_block_height + (args.blocks_to_process.unwrap_or(1u32) - 1u32));
|
||||
|
||||
info!("nyxd rpc: {}", args.rpc_url.to_string());
|
||||
info!("start_block_height: {:#?}", start_block_height);
|
||||
info!("end_block_height: {:#?}", end_block_height);
|
||||
info!("blocks_to_process: {:#?}", args.blocks_to_process);
|
||||
|
||||
let storage = db::Storage::init(db_connection_string).await?;
|
||||
|
||||
let tasks = JoinSet::new();
|
||||
let cancellation_token = CancellationToken::new();
|
||||
|
||||
let scraper_pool = storage.pool_owned();
|
||||
let shutdown_pool = storage.pool_owned();
|
||||
|
||||
// start the blocks processing in the background, that can be cancelled by the user
|
||||
let cancel_after_processing = cancellation_token.clone();
|
||||
let scraper_token_handle: JoinHandle<anyhow::Result<CancellationToken>> = tokio::spawn({
|
||||
let config = config.clone();
|
||||
async move {
|
||||
// this only blocks until startup sync is done; it then runs on its own set of tasks
|
||||
let scraper = process_chain_scraper(
|
||||
scraper_args,
|
||||
&config,
|
||||
scraper_pool,
|
||||
start_block_height,
|
||||
end_block_height,
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!("⏰ shutting down...");
|
||||
cancel_after_processing.cancel();
|
||||
|
||||
Ok(scraper.cancel_token())
|
||||
}
|
||||
});
|
||||
|
||||
// wait for either shutdown or scraper having finished processing the block range
|
||||
tokio::select! {
|
||||
_ = wait_for_signal() => {
|
||||
info!("received shutdown signal while waiting for scraper to finish its startup");
|
||||
return Ok(())
|
||||
}
|
||||
scraper_token = scraper_token_handle => {
|
||||
let scraper_token = match scraper_token {
|
||||
Ok(Ok(token)) => token,
|
||||
Ok(Err(startup_err)) => {
|
||||
error!("failed to startup the chain scraper: {startup_err}");
|
||||
return Err(startup_err.into());
|
||||
}
|
||||
Err(runtime_err) => {
|
||||
error!("failed to finish the scraper startup task: {runtime_err}");
|
||||
return Ok(())
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
wait_for_shutdown(shutdown_pool, start, cancellation_token, scraper_token, tasks).await
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -9,8 +9,8 @@ use tokio::task::{JoinHandle, JoinSet};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info};
|
||||
|
||||
mod args;
|
||||
mod config;
|
||||
pub(crate) mod args;
|
||||
pub(crate) mod config;
|
||||
|
||||
use crate::chain_scraper::run_chain_scraper;
|
||||
use crate::db::DbPool;
|
||||
@@ -40,7 +40,7 @@ async fn try_insert_startup_information(
|
||||
.inspect_err(|err| error!("failed to insert run information: {err}"));
|
||||
}
|
||||
|
||||
async fn wait_for_shutdown(
|
||||
pub(crate) async fn wait_for_shutdown(
|
||||
db_pool: DbPool,
|
||||
start: OffsetDateTime,
|
||||
main_cancellation_token: CancellationToken,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::cli::commands::{build_info, init, run};
|
||||
use crate::cli::commands::{build_info, init, process, run};
|
||||
use crate::env::vars::*;
|
||||
use crate::error::NymDataObservatoryError;
|
||||
use clap::{Parser, Subcommand};
|
||||
@@ -50,6 +50,7 @@ impl Cli {
|
||||
Commands::BuildInfo(args) => build_info::execute(args),
|
||||
Commands::Run(args) => run::execute(*args, self.http_port).await,
|
||||
Commands::Init(args) => init::execute(args).await,
|
||||
Commands::Process(args) => process::execute(*args).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -59,6 +60,9 @@ pub(crate) enum Commands {
|
||||
/// Show build information of this binary
|
||||
BuildInfo(build_info::Args),
|
||||
|
||||
/// Process or re-process a fixed block range
|
||||
Process(Box<process::Args>),
|
||||
|
||||
/// Start this nym-chain-watcher
|
||||
Run(Box<run::Args>),
|
||||
|
||||
|
||||
Reference in New Issue
Block a user