Compare commits

...

2 Commits

Author SHA1 Message Date
Mark Sinclair e28f5809ff nym-data-observatory: bump version 2026-03-17 15:46:44 +00:00
Mark Sinclair 598a9341a4 nym-data-observatory: add CLI command to process blocks from a range 2026-03-17 15:26:56 +00:00
10 changed files with 241 additions and 9 deletions
Generated
+1 -1
View File
@@ -6445,7 +6445,7 @@ dependencies = [
[[package]]
name = "nym-data-observatory"
version = "1.0.1"
version = "1.0.2"
dependencies = [
"anyhow",
"async-trait",
+16
View File
@@ -119,6 +119,22 @@ where
Ok(scraper)
}
/// Assembles a [`NyxdScraper`] directly from this builder's configuration.
///
/// Steps, in order:
/// 1. the configured pruning options are validated,
/// 2. the storage backend `S` is initialised from the configured database storage,
/// 3. an RPC client is created for the configured RPC url.
///
/// The returned scraper owns a fresh task tracker, a fresh cancellation token and a
/// default-initialised startup-sync state.
///
/// NOTE(review): only `self.config` is read here and no task is spawned by this function;
/// presumably that is what "unsafe" refers to compared with the regular build path - confirm.
///
/// # Errors
/// Propagates any failure of the validation, storage initialisation or RPC client creation.
pub async fn build_unsafe(self) -> Result<NyxdScraper<S>, ScraperError> {
    // refuse to touch the storage at all if the pruning configuration is invalid
    self.config.pruning_options.validate()?;

    let database = &self.config.database_storage;
    let migrate = &self.config.run_migrations;
    let storage = S::initialise(database, migrate).await?;

    let rpc_client = RpcClient::new(&self.config.rpc_url)?;

    let task_tracker = TaskTracker::new();
    let cancel_token = CancellationToken::new();
    let startup_sync = Arc::new(Default::default());

    let scraper = NyxdScraper {
        config: self.config,
        task_tracker,
        cancel_token,
        startup_sync,
        storage,
        rpc_client,
    };
    Ok(scraper)
}
pub fn new(config: Config) -> Self {
NyxdScraperBuilder {
_storage: PhantomData,
+1 -1
View File
@@ -3,7 +3,7 @@
[package]
name = "nym-data-observatory"
version = "1.0.1"
version = "1.0.2"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
+26
View File
@@ -6,6 +6,32 @@ Collects data about the Nym network including:
- **Price scraper** - to get the NYM/USD token price from CoinGecko
- **Webhooks** - call a webhook with the details of selected message types, or of all messages
## Processing historical data or reprocessing block ranges
You can run the scraper to operate on a range of blocks, without listening for new blocks. This is useful for:
- processing historical data
- reprocessing block ranges
Run this as follows to process blocks starting at 1000 and ending at 2000:
```bash
./nym_data_observatory process --start 1000 --end 2000
```
Or to process a single block:
```bash
./nym_data_observatory process --start 1000
```
Or to process 10 blocks starting at block 1000 (i.e. blocks 1000 to 1009):
```bash
./nym_data_observatory process --start 1000 --blocks 10
```
## Running locally
### 1. Install Prerequisites
+40 -3
View File
@@ -1,4 +1,3 @@
use crate::cli::commands::run::Args;
use crate::db::DbPool;
use nyxd_scraper_psql::{PostgresNyxdScraper, PruningOptions};
use tracing::info;
@@ -6,7 +5,7 @@ use tracing::info;
pub(crate) mod webhook;
pub(crate) async fn run_chain_scraper(
args: Args,
args: crate::cli::commands::run::Args,
config: &crate::config::Config,
connection_pool: DbPool,
) -> anyhow::Result<PostgresNyxdScraper> {
@@ -19,8 +18,8 @@ pub(crate) async fn run_chain_scraper(
.expect("no database connection string set in config");
let scraper = PostgresNyxdScraper::builder(nyxd_scraper_psql::Config {
websocket_url: args.websocket_url,
rpc_url: args.rpc_url,
websocket_url: args.websocket_url,
database_storage,
pruning_options: PruningOptions::nothing(),
store_precommits: false,
@@ -40,3 +39,41 @@ pub(crate) async fn run_chain_scraper(
Ok(instance)
}
/// Processes a fixed range of blocks with a scraper that is built but never started.
///
/// The scraper is assembled with `build_unsafe` (base migrations are not run) and is then asked
/// to process the blocks from `start_block_height` to `end_block_height` through
/// `unsafe_process_block_range`. This future resolves only after that call has returned.
///
/// The database connection string is taken from `args` (command line / environment) when
/// present, falling back to the value stored in `config`.
///
/// # Errors
/// Returns an error if no database connection string is available from either source, if the
/// webhook module or the scraper cannot be constructed, or if processing the range fails.
pub(crate) async fn process_chain_scraper(
    args: crate::cli::commands::process::Args,
    config: &crate::config::Config,
    connection_pool: DbPool,
    start_block_height: u32,
    end_block_height: u32,
) -> anyhow::Result<PostgresNyxdScraper> {
    // An explicitly passed connection string wins, the config value is the fallback.
    // (`Option::and` would require *both* to be set and yield `None` otherwise.)
    let database_storage = args
        .db_connection_string
        .or_else(|| config.chain_scraper_connection_string.clone())
        .ok_or_else(|| anyhow::anyhow!("no database connection string set in config"))?;

    let scraper = PostgresNyxdScraper::builder(nyxd_scraper_psql::Config {
        rpc_url: args.rpc_url,
        websocket_url: args.websocket_url,
        database_storage,
        pruning_options: PruningOptions::nothing(),
        store_precommits: false,
        start_block: nyxd_scraper_psql::StartingBlockOpts {
            start_block_height: Some(start_block_height),
            use_best_effort_start_height: false,
        },
        run_migrations: false, // ignore the base migrations
    })
    .with_msg_module(crate::modules::wasm::WasmModule::new(connection_pool))
    .with_tx_module(webhook::WebhookModule::new(config.clone())?);

    let instance = scraper.build_unsafe().await?;

    info!("🏃‍♂️ processing blocks from {start_block_height} to {end_block_height}...");

    instance
        .unsafe_process_block_range(Some(start_block_height), Some(end_block_height))
        .await?;

    Ok(instance)
}
@@ -1,3 +1,4 @@
pub(crate) mod build_info;
pub(crate) mod init;
pub(crate) mod process;
pub(crate) mod run;
@@ -0,0 +1,48 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::env::vars::*;
use url::Url;
// Command-line arguments of the `process` command, which works on a fixed block range.
// The doc comment on each field doubles as the `--help` text clap generates for it.
#[derive(clap::Args, Debug, Clone)]
pub(crate) struct Args {
    /// Height of the first block to process
    #[arg(long, alias = "start")]
    pub(crate) start_block_height: u32,

    /// Height of the last block to process; takes precedence over the number of blocks
    #[arg(long, alias = "end")]
    pub(crate) end_block_height: Option<u32>,

    /// Number of blocks to process, counted from the start block (defaults to 1);
    /// only used when no end block height is given
    #[arg(long, alias = "blocks")]
    pub(crate) blocks_to_process: Option<u32>,

    /// (Override) Database connection string
    #[arg(long, env = NYM_DATA_OBSERVATORY_DB_URL, alias = "db_url")]
    pub(crate) db_connection_string: Option<String>,

    /// Websocket url of the nyxd node
    #[arg(long, env = NYXD_WS, alias = "nyxd_ws", default_value = "wss://rpc.nymtech.net/websocket")]
    pub(crate) websocket_url: Url,

    /// RPC url of the nyxd node
    #[arg(long, env = NYXD, alias = "nyxd", default_value = "https://rpc.nymtech.net")]
    pub(crate) rpc_url: Url,

    /// (Override) Watch for chain messages of these types
    #[arg(
        long,
        value_delimiter = ',',
        env = NYM_DATA_OBSERVATORY_WATCH_CHAIN_MESSAGE_TYPES
    )]
    pub watch_for_chain_message_types: Vec<String>,

    /// (Override) The webhook to call when we find something
    #[arg(
        long,
        env = NYM_DATA_OBSERVATORY_WEBHOOK_URL
    )]
    pub webhook_url: Option<Url>,

    /// (Override) Optionally, authenticate with the webhook
    #[arg(
        long,
        env = NYM_DATA_OBSERVATORY_WEBHOOK_AUTH
    )]
    pub webhook_auth: Option<String>,
}
@@ -0,0 +1,100 @@
// Copyright 2026 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::chain_scraper::process_chain_scraper;
pub(crate) use crate::cli::commands::process::args::Args;
use crate::cli::commands::run::wait_for_shutdown;
use crate::db;
use crate::error::NymDataObservatoryError;
use nym_task::wait_for_signal;
use time::OffsetDateTime;
use tokio::task::{JoinHandle, JoinSet};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
mod args;
/// Entry point of the `process` command: processes a fixed block range and then shuts down.
///
/// The last block is `end_block_height` when given; otherwise it is derived from
/// `blocks_to_process` (one block when that is missing too). The processing runs in a spawned
/// task, and this function waits for either that task to finish or a shutdown signal.
///
/// # Errors
/// Returns an error if the run configuration cannot be built, if the storage cannot be
/// initialised, or if processing the block range fails.
pub(crate) async fn execute(args: Args) -> Result<(), NymDataObservatoryError> {
    let start = OffsetDateTime::now_utc();

    // An explicit request for zero blocks used to underflow in `blocks - 1`
    // (panic in debug builds, wrapped range in release builds); treat it as a no-op.
    if args.end_block_height.is_none() && args.blocks_to_process == Some(0) {
        info!("blocks_to_process is 0 and no end block height was given, nothing to process");
        return Ok(());
    }

    let start_block_height = args.start_block_height;
    let end_block_height = match args.end_block_height {
        Some(end) => end,
        None => {
            // the range includes the start block, hence `blocks - 1` further blocks
            let further_blocks = args.blocks_to_process.unwrap_or(1).saturating_sub(1);
            start_block_height.saturating_add(further_blocks)
        }
    };

    let scraper_args = args.clone();

    let run_args = crate::cli::commands::run::args::Args {
        rpc_url: args.rpc_url.clone(),
        websocket_url: args.websocket_url.clone(),
        start_block_height: Some(args.start_block_height),
        db_connection_string: args.db_connection_string,
        webhook_url: args.webhook_url,
        watch_for_chain_message_types: args.watch_for_chain_message_types,
        webhook_auth: args.webhook_auth,
    };

    let config = crate::cli::commands::run::config::get_run_config(run_args)?;

    let db_connection_string = config.chain_scraper_connection_string();

    info!("nyxd rpc: {}", args.rpc_url);
    info!("start_block_height: {:#?}", start_block_height);
    info!("end_block_height: {:#?}", end_block_height);
    info!("blocks_to_process: {:#?}", args.blocks_to_process);

    let storage = db::Storage::init(db_connection_string).await?;

    let tasks = JoinSet::new();
    let cancellation_token = CancellationToken::new();
    let scraper_pool = storage.pool_owned();
    let shutdown_pool = storage.pool_owned();

    // start the blocks processing in the background, that can be cancelled by the user
    let cancel_after_processing = cancellation_token.clone();
    let scraper_token_handle: JoinHandle<anyhow::Result<CancellationToken>> = tokio::spawn({
        let config = config.clone();
        async move {
            // unlike the `run` command, this resolves only once the whole range was processed
            let scraper = process_chain_scraper(
                scraper_args,
                &config,
                scraper_pool,
                start_block_height,
                end_block_height,
            )
            .await?;

            info!("⏰ shutting down...");
            // the work is done: trigger the regular shutdown path ourselves
            cancel_after_processing.cancel();

            Ok(scraper.cancel_token())
        }
    });

    // wait for either shutdown or scraper having finished processing the block range
    tokio::select! {
        _ = wait_for_signal() => {
            info!("received shutdown signal while waiting for scraper to finish its startup");
            return Ok(())
        }
        scraper_token = scraper_token_handle => {
            let scraper_token = match scraper_token {
                Ok(Ok(token)) => token,
                Ok(Err(startup_err)) => {
                    error!("failed to startup the chain scraper: {startup_err}");
                    return Err(startup_err.into());
                }
                Err(runtime_err) => {
                    error!("failed to finish the scraper startup task: {runtime_err}");
                    return Ok(())
                }
            };

            wait_for_shutdown(shutdown_pool, start, cancellation_token, scraper_token, tasks).await
        }
    }

    Ok(())
}
@@ -9,8 +9,8 @@ use tokio::task::{JoinHandle, JoinSet};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
mod args;
mod config;
pub(crate) mod args;
pub(crate) mod config;
use crate::chain_scraper::run_chain_scraper;
use crate::db::DbPool;
@@ -40,7 +40,7 @@ async fn try_insert_startup_information(
.inspect_err(|err| error!("failed to insert run information: {err}"));
}
async fn wait_for_shutdown(
pub(crate) async fn wait_for_shutdown(
db_pool: DbPool,
start: OffsetDateTime,
main_cancellation_token: CancellationToken,
+5 -1
View File
@@ -1,7 +1,7 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::cli::commands::{build_info, init, run};
use crate::cli::commands::{build_info, init, process, run};
use crate::env::vars::*;
use crate::error::NymDataObservatoryError;
use clap::{Parser, Subcommand};
@@ -50,6 +50,7 @@ impl Cli {
Commands::BuildInfo(args) => build_info::execute(args),
Commands::Run(args) => run::execute(*args, self.http_port).await,
Commands::Init(args) => init::execute(args).await,
Commands::Process(args) => process::execute(*args).await,
}
}
}
@@ -59,6 +60,9 @@ pub(crate) enum Commands {
/// Show build information of this binary
BuildInfo(build_info::Args),
/// Process or re-process a fixed block range
Process(Box<process::Args>),
/// Start this nym-chain-watcher
Run(Box<run::Args>),