Compare commits

...

30 Commits

Author SHA1 Message Date
Mark Sinclair 2376183f92 Bump version 2024-12-11 21:32:35 +00:00
Sachin Kamath 4781f12751 bugfix: dont manually set last_processed_height for pruning=nothing strat. 2024-12-12 01:38:15 +05:30
Sachin Kamath 5036268963 chain-scraper : use tx module for parsing transactions 2024-12-12 01:13:22 +05:30
Mark Sinclair cabc53dced nyx-chain-watcher: return average price over 24 hours 2024-12-11 15:33:18 +00:00
Mark Sinclair d731384bca formatting 2024-12-10 19:58:21 +00:00
Mark Sinclair accbbd09fa change webhook payload to have a structured coin for funds 2024-12-10 17:43:33 +00:00
Mark Sinclair df1e8f6471 bump version 2024-12-09 17:27:29 +00:00
Mark Sinclair 7c1e0e319e add websocket rpcs to env files 2024-12-09 17:26:47 +00:00
Jędrzej Stuczyński 3ce0510044 added env variable to nuke the db 2024-12-09 16:58:40 +00:00
Jędrzej Stuczyński 083488e2fd even more logs 2024-12-09 16:49:00 +00:00
Jędrzej Stuczyński 49e6c6933f extra logs 2024-12-09 16:34:17 +00:00
Jędrzej Stuczyński 3211907bf4 explicitly build websocket client in 0.37 compat mode 2024-12-09 16:13:15 +00:00
Jędrzej Stuczyński 14ac613574 allow conversion from CometBFT block subscription 2024-12-09 16:13:02 +00:00
Mark Sinclair 06032b8a48 Bump version 2024-12-09 15:29:28 +00:00
Mark Sinclair 64091eb175 Bump version 2024-12-09 15:07:43 +00:00
Jędrzej Stuczyński 6204014395 implemented starting block logic inside the chain scraper itself 2024-12-09 13:43:49 +00:00
Mark Sinclair d4680382f5 Fix docker entry point and bump version 2024-12-06 21:42:29 +00:00
Mark Sinclair 2e9ab39002 Add Dockerfile and workflow to build 2024-12-06 20:57:49 +00:00
Mark Sinclair 4877a93201 parse message index and process all log entries 2024-12-06 20:11:45 +00:00
Mark Sinclair d38abba9a3 init saves example config 2024-12-06 20:11:14 +00:00
Mark Sinclair 93b0063f73 Add example to README 2024-12-06 20:10:46 +00:00
Mark Sinclair 247a4cefa6 Remove migration from seed app 2024-12-06 20:10:36 +00:00
Mark Sinclair ade88af3cb Add config, overrides and CLI 2024-12-06 18:12:45 +00:00
Mark Sinclair 0b99c1f467 Move nym-data-observatory (v0) to nyx-chain-watcher 2024-12-06 17:32:21 +00:00
Mark Sinclair 46fb4617e8 data-observatory: renamed transactions to payments because there is already transaction in the base scraper schema 2024-12-06 17:32:21 +00:00
Mark Sinclair ed821d1a57 nyxd-scraper: add optional starting height parameter to scrape before listening for new blocks 2024-12-06 17:32:21 +00:00
Sachin Kamath 23e52653d7 observatory 0.1 2024-12-06 17:32:21 +00:00
Sachin Kamath 8fba032e26 fix review comments 2024-12-06 17:32:21 +00:00
Sachin Kamath 504b2af388 clippy 2024-12-06 17:32:21 +00:00
Sachin Kamath 5a8e91542e nyxd-scraper: add config to make pre-commit storage optional 2024-12-06 17:32:21 +00:00
76 changed files with 1972 additions and 641 deletions
@@ -0,0 +1,55 @@
name: Build and upload Nyx Chain Watcher container to harbor.nymte.ch
on:
workflow_dispatch:
env:
WORKING_DIRECTORY: "nyx-chain-watcher"
CONTAINER_NAME: "nyx-chain-watcher"
jobs:
build-container:
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
- name: Checkout repo
uses: actions/checkout@v4
- name: Configure git identity
run: |
git config --global user.email "lawrence@nymtech.net"
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.44.5
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: Check if tag exists
run: |
if git rev-parse ${{ steps.get_version.outputs.value }} >/dev/null 2>&1; then
echo "Tag ${{ steps.get_version.outputs.value }} already exists"
fi
- name: Remove existing tag if exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
fi
- name: Create tag
run: |
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
- name: BuildAndPushImageOnHarbor
run: |
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
+4 -1
View File
@@ -51,4 +51,7 @@ ppa-private-key.b64
ppa-private-key.asc
nym-network-monitor/topology.json
nym-network-monitor/__pycache__
nym-network-monitor/*.key
nym-network-monitor/*.key
*.sqlite
.build
Generated
+35 -25
View File
@@ -5064,6 +5064,7 @@ dependencies = [
"log",
"nym-network-defaults",
"serde",
"thiserror",
"toml 0.8.14",
"url",
]
@@ -5300,31 +5301,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "nym-data-observatory"
version = "0.1.0"
dependencies = [
"anyhow",
"axum 0.7.7",
"chrono",
"clap 4.5.20",
"nym-bin-common",
"nym-network-defaults",
"nym-node-requests",
"nym-task",
"serde",
"serde_json",
"sqlx",
"tokio",
"tokio-util",
"tower-http",
"tracing",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
"utoipauto",
]
[[package]]
name = "nym-dkg"
version = "0.1.0"
@@ -6957,6 +6933,40 @@ dependencies = [
"url",
]
[[package]]
name = "nyx-chain-watcher"
version = "0.1.6"
dependencies = [
"anyhow",
"async-trait",
"axum 0.7.7",
"chrono",
"clap 4.5.20",
"nym-bin-common",
"nym-config",
"nym-network-defaults",
"nym-node-requests",
"nym-task",
"nym-validator-client",
"nyxd-scraper",
"reqwest 0.12.4",
"rocket",
"schemars",
"serde",
"serde_json",
"sqlx",
"thiserror",
"time",
"tokio",
"tokio-util",
"tower-http",
"tracing",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
"utoipauto",
]
[[package]]
name = "nyxd-scraper"
version = "0.1.0"
+2 -2
View File
@@ -119,8 +119,8 @@ members = [
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-data-observatory",
"nym-network-monitor",
"nyx-chain-watcher",
"nym-node",
"nym-node/nym-node-requests",
"nym-node/nym-node-metrics",
@@ -157,11 +157,11 @@ default-members = [
"explorer-api",
"nym-api",
"nym-credential-proxy/nym-credential-proxy",
"nym-data-observatory",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
+2 -1
View File
@@ -12,7 +12,8 @@ dirs = { workspace = true, optional = true }
handlebars = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["derive"] }
toml = { workspace = true }
thiserror = { workspace = true }
toml = { workspace = true, features = ["display"] }
url = { workspace = true }
nym-network-defaults = { path = "../network-defaults", features = ["utoipa"] }
+10
View File
@@ -0,0 +1,10 @@
use std::io;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum NymConfigTomlError {
#[error(transparent)]
FileIoFailure(#[from] io::Error),
#[error(transparent)]
TomlSerializeFailure(#[from] toml::ser::Error),
}
+37
View File
@@ -13,6 +13,7 @@ pub use helpers::{parse_urls, OptionalSet};
pub use toml::de::Error as TomlDeError;
pub mod defaults;
pub mod error;
pub mod helpers;
pub mod legacy_helpers;
pub mod serde_helpers;
@@ -95,6 +96,42 @@ where
config.format_to_writer(file)
}
pub fn save_unformatted_config_to_file<C, P>(
config: &C,
path: P,
) -> Result<(), error::NymConfigTomlError>
where
C: Serialize + ?Sized,
P: AsRef<Path>,
{
let path = path.as_ref();
log::info!("saving config file to {}", path.display());
if let Some(parent) = path.parent() {
create_dir_all(parent)?;
}
let mut file = File::create(path)?;
// TODO: check for whether any of our configs store anything sensitive
// and change that to 0o644 instead
#[cfg(target_family = "unix")]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(path)?.permissions();
perms.set_mode(0o600);
fs::set_permissions(path, perms)?;
}
// let serde format the TOML in whatever ugly way it chooses
// TODO: in https://docs.rs/toml/latest/toml/fn.to_string_pretty.html it recommends using
// https://docs.rs/toml_edit/latest/toml_edit/struct.DocumentMut.html to preserve formatting
let toml_string = toml::to_string_pretty(config)?;
Ok(file.write_all(toml_string.as_bytes())?)
}
pub fn deserialize_config_from_toml_str<C>(raw: &str) -> Result<C, TomlDeError>
where
C: DeserializeOwned,
+99 -12
View File
@@ -42,8 +42,43 @@ impl PendingSync {
}
}
#[derive(Debug, Clone)]
pub struct BlockProcessorConfig {
pub pruning_options: PruningOptions,
pub store_precommits: bool,
pub explicit_starting_block_height: Option<u32>,
pub use_best_effort_start_height: bool,
}
impl Default for BlockProcessorConfig {
fn default() -> Self {
Self {
pruning_options: PruningOptions::nothing(),
store_precommits: true,
explicit_starting_block_height: None,
use_best_effort_start_height: false,
}
}
}
impl BlockProcessorConfig {
pub fn new(
pruning_options: PruningOptions,
store_precommits: bool,
explicit_starting_block_height: Option<u32>,
use_best_effort_start_height: bool,
) -> Self {
Self {
pruning_options,
store_precommits,
explicit_starting_block_height,
use_best_effort_start_height,
}
}
}
pub struct BlockProcessor {
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
last_processed_height: u32,
@@ -65,9 +100,10 @@ pub struct BlockProcessor {
msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
#[allow(clippy::too_many_arguments)]
impl BlockProcessor {
pub async fn new(
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
@@ -81,8 +117,10 @@ impl BlockProcessor {
let last_pruned = storage.get_pruned_height().await?;
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
debug!(last_processed_height = %last_processed_height, pruned_height = %last_pruned_height, "setting up block processor...");
Ok(BlockProcessor {
pruning_options,
config,
cancel,
synced,
last_processed_height,
@@ -101,7 +139,7 @@ impl BlockProcessor {
}
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.pruning_options = pruning_options;
self.config.pruning_options = pruning_options;
self
}
@@ -128,7 +166,7 @@ impl BlockProcessor {
// we won't end up with a corrupted storage.
let mut tx = self.storage.begin_processing_tx().await?;
persist_block(&full_info, &mut tx).await?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
// let the modules do whatever they want
// the ones wanting the full block:
@@ -241,7 +279,7 @@ impl BlockProcessor {
#[instrument(skip(self))]
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = self.last_processed_height - keep_recent;
info!(
@@ -282,12 +320,12 @@ impl BlockProcessor {
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
debug!("checking for storage pruning");
if self.pruning_options.strategy.is_nothing() {
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}
let interval = self.pruning_options.strategy_interval();
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= self.last_processed_height {
self.prune_storage().await?;
}
@@ -363,20 +401,69 @@ impl BlockProcessor {
// but we need it to help the compiler figure out the future is `Send`
async fn startup_resync(&mut self) -> Result<(), ScraperError> {
assert!(self.pending_sync.is_empty());
info!("attempting to run startup resync...");
self.maybe_prune_storage().await?;
let latest_block = self.rpc_client.current_block_height().await? as u32;
info!("obtained latest block height: {latest_block}");
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
info!("we have already processed some blocks in the past - attempting to resume...");
// in case we were offline for a while,
// make sure we don't request blocks we'd have to prune anyway
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = latest_block - keep_recent;
self.last_processed_height = max(self.last_processed_height, last_to_keep);
if !self.config.pruning_options.strategy.is_nothing() {
self.last_processed_height = max(self.last_processed_height, last_to_keep);
}
let request_range = self.last_processed_height + 1..latest_block + 1;
info!("we need to request {request_range:?} to resync");
info!(
keep_recent = %keep_recent,
last_to_keep = %last_to_keep,
last_processed_height = %self.last_processed_height,
"we need to request {request_range:?} to resync"
);
self.request_missing_blocks(request_range).await?;
return Ok(());
}
// this is the first time starting up
if self.last_processed_height == 0 {
info!("this is the first time starting up");
let Some(starting_height) = self.config.explicit_starting_block_height else {
info!("no starting block height set - will use the default behaviour");
// nothing to do
return Ok(());
};
info!("attempting to start the scraper from block {starting_height}");
let earliest_available =
self.rpc_client.earliest_available_block_height().await? as u32;
info!("earliest available block height: {earliest_available}");
if earliest_available > starting_height && self.config.use_best_effort_start_height {
error!("the earliest available block is higher than the desired starting height");
return Err(ScraperError::BlocksUnavailable {
height: starting_height,
});
}
let starting_height = if earliest_available > starting_height {
// add few additional blocks to account for all the startup waiting
// because the node might have pruned few blocks since
earliest_available + 10
} else {
starting_height
};
let request_range = starting_height..latest_block + 1;
info!("going to start the scraper from block {starting_height}");
info!("we need to request {request_range:?} before properly starting up");
self.request_missing_blocks(request_range).await?;
}
@@ -384,7 +471,7 @@ impl BlockProcessor {
}
pub(crate) async fn run(&mut self) {
info!("starting processing loop");
info!("starting block processor processing loop");
// sure, we could be more efficient and reset it on every processed block,
// but the overhead is so minimal that it doesn't matter
@@ -84,13 +84,7 @@ impl TryFrom<Event> for BlockToProcess {
// TODO: we're losing `result_begin_block` and `result_end_block` here but maybe that's fine?
let maybe_block = match event.data {
// we don't care about `NewBlock` until CometBFT 0.38, i.e. until we upgrade to wasmd 0.50
EventData::NewBlock { .. } => {
return Err(ScraperError::InvalidSubscriptionEvent {
query,
kind: "NewBlock".to_string(),
})
}
EventData::NewBlock { block, .. } => block,
EventData::LegacyNewBlock { block, .. } => block,
EventData::Tx { .. } => {
return Err(ScraperError::InvalidSubscriptionEvent {
+3
View File
@@ -19,6 +19,9 @@ pub enum ScraperError {
#[error("the block scraper is already running")]
ScraperAlreadyRunning,
#[error("block information for height {height} is not available on the provided rpc endpoint")]
BlocksUnavailable { height: u32 },
#[error("failed to establish websocket connection to {url}: {source}")]
WebSocketConnectionFailure {
url: String,
+2 -1
View File
@@ -15,6 +15,7 @@ pub(crate) mod scraper;
pub mod storage;
pub use block_processor::pruning::{PruningOptions, PruningStrategy};
pub use block_processor::types::ParsedTransactionResponse;
pub use modules::{BlockModule, MsgModule, TxModule};
pub use scraper::{Config, NyxdScraper};
pub use scraper::{Config, NyxdScraper, StartingBlockOpts};
pub use storage::models;
+11
View File
@@ -117,6 +117,17 @@ impl RpcClient {
Ok(info.last_block_height.value())
}
pub(crate) async fn earliest_available_block_height(&self) -> Result<u64, ScraperError> {
debug!("getting earliest available block height");
let status = self
.inner
.status()
.await
.map_err(|source| ScraperError::AbciInfoQueryFailure { source })?;
Ok(status.sync_info.earliest_block_height.value())
}
async fn get_transaction_results(
&self,
raw: &[Vec<u8>],
+99 -30
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::BlockToProcess;
use crate::block_processor::BlockProcessor;
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
use crate::block_requester::{BlockRequest, BlockRequester};
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
@@ -24,6 +24,15 @@ use url::Url;
mod subscriber;
#[derive(Default, Clone, Copy)]
pub struct StartingBlockOpts {
pub start_block_height: Option<u32>,
/// If the scraper fails to start from the desired height, rather than failing,
/// attempt to use the next available height
pub use_best_effort_start_height: bool,
}
pub struct Config {
/// Url to the websocket endpoint of a validator, for example `wss://rpc.nymtech.net/websocket`
pub websocket_url: Url,
@@ -34,6 +43,10 @@ pub struct Config {
pub database_path: PathBuf,
pub pruning_options: PruningOptions,
pub store_precommits: bool,
pub start_block: StartingBlockOpts,
}
pub struct NyxdScraperBuilder {
@@ -60,8 +73,16 @@ impl NyxdScraperBuilder {
req_rx,
processing_tx.clone(),
);
let mut block_processor = BlockProcessor::new(
let block_processor_config = BlockProcessorConfig::new(
scraper.config.pruning_options,
scraper.config.store_precommits,
scraper.config.start_block.start_block_height,
scraper.config.start_block.use_best_effort_start_height,
);
let mut block_processor = BlockProcessor::new(
block_processor_config,
scraper.cancel_token.clone(),
scraper.startup_sync.clone(),
processing_rx,
@@ -118,7 +139,7 @@ pub struct NyxdScraper {
task_tracker: TaskTracker,
cancel_token: CancellationToken,
startup_sync: Arc<Notify>,
pub storage: ScraperStorage,
storage: ScraperStorage,
rpc_client: RpcClient,
}
@@ -142,6 +163,10 @@ impl NyxdScraper {
})
}
pub fn storage(&self) -> ScraperStorage {
self.storage.clone()
}
fn start_tasks(
&self,
mut block_requester: BlockRequester,
@@ -158,7 +183,10 @@ impl NyxdScraper {
self.task_tracker.close();
}
pub async fn process_single_block(&self, height: u32) -> Result<(), ScraperError> {
// DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
// AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
// YOU WILL BE FIRED IF YOU USE IT : )
pub async fn unsafe_process_single_block(&self, height: u32) -> Result<(), ScraperError> {
info!(height = height, "attempting to process a single block");
if !self.task_tracker.is_empty() {
return Err(ScraperError::ScraperAlreadyRunning);
@@ -177,7 +205,10 @@ impl NyxdScraper {
block_processor.process_block(block.into()).await
}
pub async fn process_block_range(
// DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
// AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
// YOU WILL BE FIRED IF YOU USE IT : )
pub async fn unsafe_process_block_range(
&self,
starting_height: Option<u32>,
end_height: Option<u32>,
@@ -194,10 +225,10 @@ impl NyxdScraper {
.await?
.with_pruning(PruningOptions::nothing());
let current_height = self.rpc_client.current_block_height().await? as u32;
let mut current_height = self.rpc_client.current_block_height().await? as u32;
let last_processed = block_processor.last_process_height();
let starting_height = match starting_height {
let mut starting_height = match starting_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
@@ -211,7 +242,8 @@ impl NyxdScraper {
}
};
let end_height = match end_height {
let must_catch_up = end_height.is_none();
let mut end_height = match end_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
@@ -226,32 +258,62 @@ impl NyxdScraper {
}
};
info!(
starting_height = starting_height,
end_height = end_height,
"attempting to process block range"
);
let mut last_processed = starting_height;
let range = (starting_height..=end_height).collect::<Vec<_>>();
while last_processed < current_height {
info!(
starting_height = starting_height,
end_height = end_height,
"attempting to process block range"
);
// the most likely bottleneck here are going to be the chain queries,
// so batch multiple requests
for batch in range.chunks(4) {
let batch_result = join_all(
batch
.iter()
.map(|height| self.rpc_client.get_basic_block_details(*height)),
)
.await;
for result in batch_result {
match result {
Ok(block) => block_processor.process_block(block.into()).await?,
Err(err) => {
error!("failed to retrieve the block: {err}. stopping...");
return Err(err);
let range = (starting_height..=end_height).collect::<Vec<_>>();
// the most likely bottleneck here are going to be the chain queries,
// so batch multiple requests
for batch in range.chunks(4) {
let batch_result = join_all(
batch
.iter()
.map(|height| self.rpc_client.get_basic_block_details(*height)),
)
.await;
for result in batch_result {
match result {
Ok(block) => block_processor.process_block(block.into()).await?,
Err(err) => {
error!("failed to retrieve the block: {err}. stopping...");
return Err(err);
}
}
}
}
// if we don't need to catch up, return early
if !must_catch_up {
return Ok(());
}
// check if we have caught up to the current block height
last_processed = end_height;
current_height = self.rpc_client.current_block_height().await? as u32;
info!(
last_processed = last_processed,
current_height = current_height,
"🏃 still need to catch up..."
);
starting_height = last_processed + 1;
end_height = current_height;
}
if must_catch_up {
info!(
last_processed = last_processed,
current_height = current_height,
"✅ block processing has caught up!"
);
}
Ok(())
@@ -275,8 +337,15 @@ impl NyxdScraper {
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> Result<BlockProcessor, ScraperError> {
BlockProcessor::new(
let block_processor_config = BlockProcessorConfig::new(
self.config.pruning_options,
self.config.store_precommits,
self.config.start_block.start_block_height,
self.config.start_block.use_best_effort_start_height,
);
BlockProcessor::new(
block_processor_config,
self.cancel_token.clone(),
self.startup_sync.clone(),
processing_rx,
+11 -1
View File
@@ -3,6 +3,7 @@
use crate::block_processor::types::BlockToProcess;
use crate::error::ScraperError;
use tendermint_rpc::client::CompatMode;
use tendermint_rpc::event::Event;
use tendermint_rpc::query::EventType;
use tendermint_rpc::{SubscriptionClient, WebSocketClient, WebSocketClientDriver};
@@ -38,7 +39,16 @@ impl ChainSubscriber {
) -> Result<Self, ScraperError> {
// sure, we could have just used websocket client entirely, but let's keep the logic for
// getting current blocks and historical blocks completely separate with the dual connection
let (client, driver) = WebSocketClient::new(websocket_endpoint.as_str())
let websocket_url = websocket_endpoint.as_str().try_into().map_err(|source| {
ScraperError::WebSocketConnectionFailure {
url: websocket_endpoint.to_string(),
source,
}
})?;
let (client, driver) = WebSocketClient::builder(websocket_url)
.compat_mode(CompatMode::V0_37)
.build()
.await
.map_err(|source| ScraperError::WebSocketConnectionFailure {
url: websocket_endpoint.to_string(),
+13 -5
View File
@@ -50,6 +50,12 @@ pub(crate) fn log_db_operation_time(op_name: &str, start_time: Instant) {
impl ScraperStorage {
#[instrument]
pub async fn init<P: AsRef<Path> + Debug>(database_path: P) -> Result<Self, ScraperError> {
let database_path = database_path.as_ref();
debug!(
"initialising scraper database path to '{}'",
database_path.display()
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(database_path)
.create_if_missing(true)
@@ -212,6 +218,7 @@ impl ScraperStorage {
pub async fn persist_block(
block: &FullBlockInformation,
tx: &mut StorageTransaction,
store_precommits: bool,
) -> Result<(), ScraperError> {
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
@@ -224,11 +231,12 @@ pub async fn persist_block(
// persist block data
persist_block_data(&block.block, total_gas, tx).await?;
// persist commits
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
if store_precommits {
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
// persist txs
+2 -1
View File
@@ -19,6 +19,7 @@ MULTISIG_CONTRACT_ADDRESS=n1zwv6feuzhy6a9wekh96cd57lsarmqlwxdypdsplw6zhfncqw6ftq
COCONUT_DKG_CONTRACT_ADDRESS=n1aakfpghcanxtc45gpqlx8j3rq0zcpyf49qmhm9mdjrfx036h4z5sy2vfh9
EXPLORER_API=https://canary-explorer.performance.nymte.ch/api/
NYXD=https://canary-validator.performance.nymte.ch
NYXD=https://rpc.canary-validator.performance.nymte.ch
NYM_API=https://canary-api.performance.nymte.ch/api/
NYXD_WS=wss://rpc.canary-validator.performance.nymte.ch/websocket
NYM_VPN_API=https://nym-vpn-api-git-deploy-canary-nyx-network-staging.vercel.app/api/
+2 -1
View File
@@ -19,6 +19,7 @@ VESTING_CONTRACT_ADDRESS=n1jlzdxnyces4hrhqz68dqk28mrw5jgwtcfq0c2funcwrmw0dx9l9s8
REWARDING_VALIDATOR_ADDRESS=n1rfvpsynktze6wvn6ldskj8xgwfzzk5v6pnff39
EXPLORER_API=https://qa-network-explorer.qa.nymte.ch/api/
NYXD=https://qa-validator.qa.nymte.ch
NYXD=https://rpc.qa-validator.qa.nymte.ch
NYXD_WS=wss://rpc.qa-validator.qa.nymte.ch/websocket
NYM_API=https://qa-nym-api.qa.nymte.ch/api/
NYM_VPN_API=https://nym-vpn-api-git-deploy-qa-nyx-network-staging.vercel.app/api/
+1 -1
View File
@@ -21,6 +21,6 @@ ECASH_CONTRACT_ADDRESS=n1v3vydvs2ued84yv3khqwtgldmgwn0elljsdh08dr5s2j9x4rc5fs9jl
STATISTICS_SERVICE_DOMAIN_ADDRESS="http://0.0.0.0"
EXPLORER_API=https://sandbox-explorer.nymtech.net/api/
NYXD=https://rpc.sandbox.nymtech.net
NYXD_WS=wss://rpc.sandbox.nymtech.net/websocket/
NYXD_WS=wss://rpc.sandbox.nymtech.net/websocket
NYM_API=https://sandbox-nym-api1.nymtech.net/api/
NYM_VPN_API=https://nym-vpn-api-git-deploy-sandbox-nyx-network-staging.vercel.app/api/
@@ -1,16 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO responses\n (joke_id, joke, date_created)\n VALUES\n ($1, $2, $3)\n ON CONFLICT(joke_id) DO UPDATE SET\n joke=excluded.joke,\n date_created=excluded.date_created;",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Varchar",
"Text",
"Int4"
]
},
"nullable": []
},
"hash": "249faa11b88b749f50342bb5c9cc41d20896db543eed74a6f320c041bcbb723d"
}
@@ -1,34 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "SELECT joke_id, joke, date_created FROM responses WHERE joke_id = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "joke_id",
"type_info": "Varchar"
},
{
"ordinal": 1,
"name": "joke",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "date_created",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false,
false
]
},
"hash": "aff7fbd06728004d2f2226d20c32f1482df00de2dc1d2b4debbb2e12553d997b"
}
@@ -1,32 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "SELECT joke_id, joke, date_created FROM responses",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "joke_id",
"type_info": "Varchar"
},
{
"ordinal": 1,
"name": "joke",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "date_created",
"type_info": "Int4"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false
]
},
"hash": "e53f479f8cead3dc8aa1875e5d450ad69686cf6a109e37d6c3f0623c3e9f91d0"
}
-27
View File
@@ -1,27 +0,0 @@
FROM rust:latest AS builder
COPY ./ /usr/src/nym
WORKDIR /usr/src/nym/nym-data-observatory
RUN cargo build --release
#-------------------------------------------------------------------
# The following environment variables are required at runtime:
#
# NYM_DATA_OBSERVATORY_CONNECTION_URL
#
# And optionally:
#
# NYM_DATA_OBSERVATORY_HTTP_PORT
#
# see https://github.com/nymtech/nym/blob/develop/nym-data-observatory/src/main.rs for details
#-------------------------------------------------------------------
FROM ubuntu:24.04
RUN apt update && apt install -yy curl ca-certificates
WORKDIR /nym
COPY --from=builder /usr/src/nym/target/release/nym-data-observatory ./
ENTRYPOINT [ "/nym/nym-data-observatory" ]
-58
View File
@@ -1,58 +0,0 @@
use anyhow::Result;
use sqlx::{Connection, PgConnection};
use std::io::Write;
use std::{collections::HashMap, fs::File};
const POSTGRES_USER: &str = "nym";
const POSTGRES_PASSWORD: &str = "password123";
const POSTGRES_DB: &str = "data_obs_db";
/// if schema changes, rerun `cargo sqlx prepare` with a running DB
/// https://github.com/launchbadge/sqlx/blob/main/sqlx-cli/README.md#enable-building-in-offline-mode-with-query
#[tokio::main]
async fn main() -> Result<()> {
let db_url =
format!("postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@localhost:5432/{POSTGRES_DB}");
export_db_variables(&db_url)?;
// if a live DB is reachable, use that
if PgConnection::connect(&db_url).await.is_ok() {
println!("cargo::rustc-env=SQLX_OFFLINE=false");
run_migrations(&db_url).await?;
} else {
// by default, run in offline mode
println!("cargo::rustc-env=SQLX_OFFLINE=true");
}
rerun_if_changed();
Ok(())
}
fn export_db_variables(db_url: &str) -> Result<()> {
let mut map = HashMap::new();
map.insert("POSTGRES_USER", POSTGRES_USER);
map.insert("POSTGRES_PASSWORD", POSTGRES_PASSWORD);
map.insert("POSTGRES_DB", POSTGRES_DB);
map.insert("DATABASE_URL", db_url);
let mut file = File::create(".env")?;
for (var, value) in map.iter() {
println!("cargo::rustc-env={}={}", var, value);
writeln!(file, "{}={}", var, value).expect("Failed to write to dotenv file");
}
Ok(())
}
async fn run_migrations(db_url: &str) -> Result<()> {
let mut conn = PgConnection::connect(db_url).await?;
sqlx::migrate!("./migrations").run(&mut conn).await?;
Ok(())
}
fn rerun_if_changed() {
println!("cargo::rerun-if-changed=migrations");
println!("cargo::rerun-if-changed=src/db/queries");
}
-31
View File
@@ -1,31 +0,0 @@
services:
postgres:
image: postgres:13
container_name: nym-data-observatory-pg
environment:
POSTGRES_PASSWORD: password
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
data-observatory:
depends_on:
- postgres
image: nym-data-observatory:latest
build:
context: ../
dockerfile: nym-data-observatory/Dockerfile
container_name: nym-data-observatory
environment:
NYM_DATA_OBSERVATORY_CONNECTION_USERNAME: "postgres"
NYM_DATA_OBSERVATORY_CONNECTION_PASSWORD: "password"
NYM_DATA_OBSERVATORY_CONNECTION_HOST: "postgres"
NYM_DATA_OBSERVATORY_CONNECTION_PORT: "5432"
NYM_DATA_OBSERVATORY_CONNECTION_DB: ""
NYM_DATA_OBSERVATORY_HTTP_PORT: 8000
env_file:
- ../envs/qa.env
volumes:
pgdata:
@@ -1,6 +0,0 @@
CREATE TABLE responses (
id SERIAL PRIMARY KEY,
joke_id VARCHAR NOT NULL UNIQUE,
joke TEXT NOT NULL,
date_created INTEGER NOT NULL
);
-13
View File
@@ -1,13 +0,0 @@
#!/bin/bash
# .env is generated in build.rs
source .env
# Launching a container in such a way that it's destroyed after you detach from the terminal:
docker compose up
# docker exec -it nym-data-observatory-pg /bin/bash
# psql -U youruser -d yourdb
echo "Tearing down containers to have a clean slate"
docker compose down -v
@@ -1,61 +0,0 @@
use core::str;
use serde::Deserialize;
use tokio::process::Command;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use crate::db::{self, DbPool};
const REFRESH_DELAY: Duration = Duration::from_secs(15);
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
pub(crate) async fn spawn_in_background(db_pool: DbPool) -> JoinHandle<()> {
loop {
tracing::info!("Running in a loop 🏃");
if let Err(e) = some_network_action(&db_pool).await {
tracing::error!(
"❌ Run failed: {e}, retrying in {}s...",
FAILURE_RETRY_DELAY.as_secs()
);
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
} else {
tracing::info!(
"✅ Run successful, sleeping for {}s...",
REFRESH_DELAY.as_secs()
);
tokio::time::sleep(REFRESH_DELAY).await;
}
}
}
#[derive(Deserialize, Debug)]
pub(crate) struct Response {
#[serde(rename(deserialize = "id"))]
pub(crate) joke_id: String,
pub(crate) joke: String,
#[serde(rename(deserialize = "status"))]
pub(crate) _status: u16,
}
async fn some_network_action(pool: &DbPool) -> anyhow::Result<()> {
// for demonstration purposes only. You should use reqwest if you need it
let output = Command::new("curl")
.arg("-H")
.arg("Accept: application/json")
.arg("https://icanhazdadjoke.com/")
.output()
.await?;
if !output.status.success() {
anyhow::bail!("Curl command failed with status: {}", output.status);
}
let response_str = str::from_utf8(&output.stdout)?;
let joke_response: Response = serde_json::from_str(response_str)?;
tracing::info!("{:?}", joke_response.joke);
db::queries::insert_joke(pool, joke_response.into()).await?;
Ok(())
}
-22
View File
@@ -1,22 +0,0 @@
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use crate::background_task::Response;
#[derive(Serialize, Deserialize, Debug, ToSchema)]
pub(crate) struct JokeDto {
pub(crate) joke_id: String,
pub(crate) joke: String,
pub(crate) date_created: i32,
}
impl From<Response> for JokeDto {
fn from(value: Response) -> Self {
Self {
joke_id: value.joke_id,
joke: value.joke,
// casting not smart, can implicitly panic, don't do this in prod
date_created: chrono::offset::Utc::now().timestamp() as i32,
}
}
}
@@ -1,39 +0,0 @@
use crate::db::{models::JokeDto, DbPool};
pub(crate) async fn insert_joke(pool: &DbPool, joke: JokeDto) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
sqlx::query!(
"INSERT INTO responses
(joke_id, joke, date_created)
VALUES
($1, $2, $3)
ON CONFLICT(joke_id) DO UPDATE SET
joke=excluded.joke,
date_created=excluded.date_created;",
joke.joke_id,
joke.joke,
joke.date_created as i32,
)
.execute(&mut *conn)
.await?;
Ok(())
}
pub(crate) async fn select_joke_by_id(pool: &DbPool, joke_id: &str) -> anyhow::Result<JokeDto> {
sqlx::query_as!(
JokeDto,
"SELECT joke_id, joke, date_created FROM responses WHERE joke_id = $1",
joke_id
)
.fetch_one(pool)
.await
.map_err(anyhow::Error::from)
}
pub(crate) async fn select_all(pool: &DbPool) -> anyhow::Result<Vec<JokeDto>> {
sqlx::query_as!(JokeDto, "SELECT joke_id, joke, date_created FROM responses",)
.fetch_all(pool)
.await
.map_err(anyhow::Error::from)
}
@@ -1,5 +0,0 @@
// group queries in files by theme
mod joke;
// re-exporting allows us to access all queries via `queries::bla``
pub(crate) use joke::{insert_joke, select_all, select_joke_by_id};
@@ -1,78 +0,0 @@
use axum::{
extract::{Path, State},
Json, Router,
};
use serde::Deserialize;
use utoipa::IntoParams;
use crate::{
db::{
models::JokeDto,
queries::{self, select_joke_by_id},
},
http::{
error::{Error, HttpResult},
state::AppState,
},
};
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(jokes))
.route("/:joke_id", axum::routing::get(joke_by_id))
.route("/fetch_another", axum::routing::get(fetch_another))
}
#[utoipa::path(
tag = "Dad Jokes",
get,
path = "/v1/jokes",
responses(
(status = 200, body = Vec<JokeDto>)
)
)]
async fn jokes(State(state): State<AppState>) -> HttpResult<Json<Vec<JokeDto>>> {
queries::select_all(state.db_pool())
.await
.map(Json::from)
.map_err(|_| Error::internal())
}
#[derive(Deserialize, IntoParams)]
#[into_params(parameter_in = Path)]
struct JokeIdParam {
joke_id: String,
}
#[utoipa::path(
tag = "Dad Jokes",
get,
params(
JokeIdParam
),
path = "/v1/jokes/{joke_id}",
responses(
(status = 200, body = JokeDto)
)
)]
async fn joke_by_id(
Path(JokeIdParam { joke_id }): Path<JokeIdParam>,
State(state): State<AppState>,
) -> HttpResult<Json<JokeDto>> {
select_joke_by_id(state.db_pool(), &joke_id)
.await
.map(Json::from)
.map_err(|_| Error::not_found(joke_id))
}
#[utoipa::path(
tag = "Dad Jokes",
get,
path = "/v1/jokes/fetch_another",
responses(
(status = 200, body = String)
)
)]
async fn fetch_another(State(_state): State<AppState>) -> HttpResult<Json<String>> {
Ok(Json(String::from("Done boss, check the DB")))
}
@@ -1,21 +0,0 @@
use axum::{extract::State, Json, Router};
use crate::http::{error::HttpResult, state::AppState};
pub(crate) fn routes() -> Router<AppState> {
Router::new().route("/", axum::routing::get(mixnodes))
}
#[utoipa::path(
tag = "Mixnodes",
get,
path = "/v1/mixnodes",
responses(
(status = 200, body = String)
)
)]
async fn mixnodes(State(_state): State<AppState>) -> HttpResult<Json<serde_json::Value>> {
Ok(Json(
serde_json::json!({"message": "😎 Nothing to see here, move along 😎"}),
))
}
-78
View File
@@ -1,78 +0,0 @@
use clap::Parser;
use nym_network_defaults::setup_env;
use nym_task::signal::wait_for_signal;
mod background_task;
mod db;
mod http;
mod logging;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// Port to listen on
#[arg(long, default_value_t = 8000, env = "NYM_DATA_OBSERVATORY_HTTP_PORT")]
http_port: u16,
/// Path to the environment variables file. If you don't provide one, variables for the mainnet will be used.
#[arg(short, long, default_value = None, env = "NYM_DATA_OBSERVATORY_ENV_FILE")]
env_file: Option<String>,
/// DB connection username
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_USERNAME")]
connection_username: String,
/// DB connection password
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_PASSWORD")]
connection_password: String,
/// DB connection host
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_HOST")]
connection_host: String,
/// DB connection port
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_PORT")]
connection_port: String,
/// DB connection database name
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_DB")]
connection_db: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::setup_tracing_logger();
let args = Args::parse();
setup_env(args.env_file); // Defaults to mainnet if empty
let connection_url = format!(
"postgres://{}:{}@{}:{}/{}",
args.connection_username,
args.connection_password,
args.connection_host,
args.connection_port,
args.connection_db
);
let storage = db::Storage::init(connection_url).await?;
let db_pool = storage.pool_owned().await;
tokio::spawn(async move {
background_task::spawn_in_background(db_pool).await;
tracing::info!("Started task");
});
let shutdown_handles = http::server::start_http_api(storage.pool_owned().await, args.http_port)
.await
.expect("Failed to start server");
tracing::info!("Started HTTP server on port {}", args.http_port);
wait_for_signal().await;
if let Err(err) = shutdown_handles.shutdown().await {
tracing::error!("{err}");
};
Ok(())
}
@@ -26,7 +26,7 @@ pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {
NyxdScraper::new(config.scraper_config())
.await?
.process_single_block(args.height)
.unsafe_process_single_block(args.height)
.await?;
Ok(())
}
@@ -39,7 +39,7 @@ pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {
NyxdScraper::new(config.scraper_config())
.await?
.process_block_range(args.start_height, args.stop_height)
.unsafe_process_block_range(args.start_height, args.stop_height)
.await?;
Ok(())
}
+11 -1
View File
@@ -112,6 +112,7 @@ impl Config {
nyxd_scraper: NyxdScraper {
websocket_url,
pruning: Default::default(),
store_precommits: true,
},
base: Base {
upstream_nyxd: nyxd_url,
@@ -127,6 +128,8 @@ impl Config {
rpc_url: self.base.upstream_nyxd.clone(),
database_path: self.storage_paths.nyxd_scraper.clone(),
pruning_options: self.nyxd_scraper.pruning,
store_precommits: self.nyxd_scraper.store_precommits,
start_block_height: None,
}
}
@@ -314,7 +317,14 @@ pub struct NyxdScraper {
// if the value is missing, use `nothing` pruning as this was the past behaviour
#[serde(default = "PruningOptions::nothing")]
pub pruning: PruningOptions,
// TODO: debug with everything that's currently hardcoded in the scraper
/// Specifies whether to store pre-commits within the database.
#[serde(default = "default_store_precommits")]
pub store_precommits: bool,
}
fn default_store_precommits() -> bool {
true
}
impl NyxdScraper {
+1
View File
@@ -3163,6 +3163,7 @@ dependencies = [
"log",
"nym-network-defaults",
"serde",
"thiserror",
"toml 0.8.19",
"url",
]
@@ -1,9 +1,9 @@
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
# SPDX-License-Identifier: Apache-2.0
# SPDX-License-Identifier: GPL-3.0-only
[package]
name = "nym-data-observatory"
version = "0.1.0"
name = "nyx-chain-watcher"
version = "0.1.6"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -15,18 +15,27 @@ readme.workspace = true
[dependencies]
anyhow = { workspace = true }
async-trait.workspace = true
axum = { workspace = true, features = ["tokio"] }
chrono = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
nym-bin-common = { path = "../common/bin-common" }
clap = { workspace = true, features = ["cargo", "derive", "env"] }
nym-config = { path = "../common/config" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
nym-network-defaults = { path = "../common/network-defaults" }
nym-task = { path = "../common/task" }
nym-node-requests = { path = "../nym-node/nym-node-requests", features = [
"openapi",
] }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nyxd-scraper = {path = "../common/nyxd-scraper"}
reqwest = {workspace= true, features = ["rustls-tls"]}
rocket = { workspace = true }
schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "time"] }
thiserror = { workspace = true }
time = {version = "0.3.36"}
tokio = { workspace = true, features = ["process", "rt-multi-thread"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
@@ -40,4 +49,4 @@ utoipauto = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] }
+32
View File
@@ -0,0 +1,32 @@
FROM rust:latest AS builder
COPY ./ /usr/src/nym
WORKDIR /usr/src/nym/nyx-chain-watcher
RUN cargo build --release
#-------------------------------------------------------------------
# The following environment variables are required at runtime:
#
# NYX_CHAIN_WATCHER_DATABASE_PATH = /mnt/nyx-chain-watchter.sqlite
# NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH = /mnt/chain-history.sqlite
# NYX_CHAIN_WATCHER_WATCH_ACCOUNTS = "n1...,n1...,n1..."
#
# And optionally:
#
# NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES = "/cosmos.bank.v1beta1.MsgSend,/ibc.applications.transfer.v1.MsgTransfer"
# NYX_CHAIN_WATCHER_CONFIG_ENV_FILE_ARG = /mnt/sandbox.env for sandbox environment
#
# see https://github.com/nymtech/nym/blob/develop/nyx-chain-watcher/src/cli/commands/run/args.rs for details
# and https://github.com/nymtech/nym/blob/develop/nyx-chain-watcher/src/env.rs for env vars
#-------------------------------------------------------------------
FROM ubuntu:24.04
RUN apt update && apt install -yy curl ca-certificates
WORKDIR /nym
COPY --from=builder /usr/src/nym/target/release/nyx-chain-watcher ./
ENTRYPOINT [ "/nym/nyx-chain-watcher", "run" ]
+19
View File
@@ -0,0 +1,19 @@
# Nyx Chain Watcher
A simple binary to watch addresses on the Nyx chain and to call webhooks when particular message types are in a block.
Look in [env.rs](./src/env.rs) for the names of environment variables that can be overridden.
## Running locally
```
NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH=chain_history.sqlite \
NYX_CHAIN_WATCHER_DATABASE_PATH=nyx_chain_watcher.sqlite \
NYX_CHAIN_WATCHER_WATCH_ACCOUNTS=n1...,n1...,n1... \
NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES="/cosmos.bank.v1beta1.MsgSend,/ibc.applications.transfer.v1.MsgTransfer"
NYX_CHAIN_WATCHER_WEBHOOK_URL="https://webhook.site" \
NYX_CHAIN_WATCHER_WEBHOOK_AUTH=1234 \
cargo run -- run
```
+53
View File
@@ -0,0 +1,53 @@
use anyhow::Result;
use sqlx::{sqlite::SqliteConnectOptions, Connection, SqliteConnection};
use std::io::Write;
use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr};
#[tokio::main]
async fn main() -> Result<()> {
let db_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join(".build")
.join("nyx_chain_watcher.sqlite");
// Create the database directory if it doesn't exist
if let Some(parent) = db_path.parent() {
std::fs::create_dir_all(parent)?;
}
let db_url = format!("sqlite:{}", db_path.display());
// Ensure database file is created with proper permissions
let connect_options = SqliteConnectOptions::from_str(&db_url)?
.create_if_missing(true)
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.foreign_keys(true);
// Create initial connection to ensure database exists
let mut conn = SqliteConnection::connect_with(&connect_options).await?;
export_db_variables(&db_url)?;
println!("cargo:rustc-env=SQLX_OFFLINE=false");
// Run migrations after ensuring database exists
sqlx::migrate!("./migrations").run(&mut conn).await?;
// Add rerun-if-changed directives
println!("cargo:rerun-if-changed=migrations");
println!("cargo:rerun-if-changed=build.rs");
println!("cargo:rerun-if-changed=src");
Ok(())
}
fn export_db_variables(db_url: &str) -> Result<()> {
let mut map = HashMap::new();
map.insert("DATABASE_URL", db_url);
let mut file = File::create(".env")?;
for (var, value) in map.iter() {
println!("cargo:rustc-env={}={}", var, value);
writeln!(file, "{}={}", var, value)?;
}
Ok(())
}
@@ -0,0 +1,7 @@
CREATE TABLE price_history (
timestamp INTEGER PRIMARY KEY,
chf REAL NOT NULL,
usd REAL NOT NULL,
eur REAL NOT NULL,
btc REAL NOT NULL
);
@@ -0,0 +1,10 @@
CREATE TABLE payments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transaction_hash TEXT NOT NULL UNIQUE,
sender_address TEXT NOT NULL,
receiver_address TEXT NOT NULL,
amount REAL NOT NULL,
timestamp INTEGER NOT NULL,
height INTEGER NOT NULL,
memo TEXT
);
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tx_hash TEXT NOT NULL,
height INTEGER NOT NULL,
message_index INTEGER NOT NULL,
sender TEXT NOT NULL,
recipient TEXT NOT NULL,
amount TEXT NOT NULL,
memo TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE(tx_hash, message_index)
);
+198
View File
@@ -0,0 +1,198 @@
use crate::config::PaymentWatcherConfig;
use crate::env::vars::{
NYXD_SCRAPER_START_HEIGHT, NYXD_SCRAPER_UNSAFE_NUKE_DB,
NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT,
};
use async_trait::async_trait;
use nyxd_scraper::{
error::ScraperError, storage::StorageTransaction, NyxdScraper, ParsedTransactionResponse,
PruningOptions, TxModule,
};
use sqlx::SqlitePool;
use std::fs;
use tracing::{info, warn};
pub(crate) async fn run_chain_scraper(
config: &crate::config::Config,
db_pool: SqlitePool,
) -> anyhow::Result<NyxdScraper> {
let websocket_url = std::env::var("NYXD_WS").expect("NYXD_WS not defined");
let rpc_url = std::env::var("NYXD").expect("NYXD not defined");
let websocket_url = reqwest::Url::parse(&websocket_url)?;
let rpc_url = reqwest::Url::parse(&rpc_url)?;
// why are those not part of CLI? : (
let start_block_height = match std::env::var(NYXD_SCRAPER_START_HEIGHT).ok() {
None => None,
// blow up if passed malformed env value
Some(raw) => Some(raw.parse()?),
};
let use_best_effort_start_height =
match std::env::var(NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT).ok() {
None => false,
// blow up if passed malformed env value
Some(raw) => raw.parse()?,
};
let nuke_db: bool = match std::env::var(NYXD_SCRAPER_UNSAFE_NUKE_DB).ok() {
None => false,
// blow up if passed malformed env value
Some(raw) => raw.parse()?,
};
if nuke_db {
warn!("☢️☢️☢️ NUKING THE SCRAPER DATABASE");
fs::remove_file(config.chain_scraper_database_path())?;
}
if config.payment_watcher_config.is_none() {
anyhow::bail!("No payment watcher config found, not running chain scraper");
}
let scraper = NyxdScraper::builder(nyxd_scraper::Config {
websocket_url,
rpc_url,
database_path: config.chain_scraper_database_path().into(),
pruning_options: PruningOptions::nothing(),
store_precommits: false,
start_block: nyxd_scraper::StartingBlockOpts {
start_block_height,
use_best_effort_start_height,
},
})
.with_tx_module(EventScraperModule::new(
db_pool,
config.payment_watcher_config.clone().unwrap_or_default(),
));
let instance = scraper.build_and_start().await?;
info!("🚧 blocking until the chain has caught up...");
instance.wait_for_startup_sync().await;
Ok(instance)
}
pub struct EventScraperModule {
db_pool: SqlitePool,
payment_config: PaymentWatcherConfig,
}
impl EventScraperModule {
pub fn new(db_pool: SqlitePool, payment_config: PaymentWatcherConfig) -> Self {
Self {
db_pool,
payment_config,
}
}
#[allow(clippy::too_many_arguments)]
async fn store_transfer_event(
&self,
tx_hash: &str,
height: i64,
message_index: i64,
sender: String,
recipient: String,
amount: String,
memo: Option<String>,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO transactions (tx_hash, height, message_index, sender, recipient, amount, memo)
VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
tx_hash,
height,
message_index,
sender,
recipient,
amount,
memo
)
.execute(&self.db_pool)
.await?;
Ok(())
}
}
#[async_trait]
impl TxModule for EventScraperModule {
async fn handle_tx(
&mut self,
tx: &ParsedTransactionResponse,
_: &mut StorageTransaction,
) -> Result<(), ScraperError> {
let events = &tx.tx_result.events;
let height = tx.height.value() as i64;
let tx_hash = tx.hash.to_string();
let memo = tx.tx.body.memo.clone();
// Don't process failed transactions
if !tx.tx_result.code.is_ok() {
return Ok(());
}
// Process each event
for event in events {
// Only process transfer events
if event.kind == "transfer" {
let mut recipient = None;
let mut sender = None;
let mut amount = None;
// TODO: get message index from event
let message_index = 0;
// Extract transfer event attributes
for attr in &event.attributes {
if let (Ok(key), Ok(value)) = (attr.key_str(), attr.value_str()) {
match key {
"recipient" => recipient = Some(value.to_string()),
"sender" => sender = Some(value.to_string()),
"amount" => amount = Some(value.to_string()),
_ => continue,
}
}
}
// If we have all required fields, check if recipient is watched and store
if let (Some(recipient), Some(sender), Some(amount)) = (recipient, sender, amount) {
// Check if any watcher is watching this recipient
let is_watched = self.payment_config.watchers.iter().any(|watcher| {
if let Some(watched_accounts) =
&watcher.watch_for_transfer_recipient_accounts
{
watched_accounts
.iter()
.any(|account| account.to_string() == recipient)
} else {
false
}
});
if is_watched {
if let Err(e) = self
.store_transfer_event(
&tx_hash,
height,
message_index,
sender,
recipient,
amount,
Some(memo.clone()),
)
.await
{
warn!("Failed to store transfer event: {}", e);
}
}
}
}
}
Ok(())
}
}
@@ -0,0 +1,17 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::NyxChainWatcherError;
use nym_bin_common::bin_info_owned;
use nym_bin_common::output_format::OutputFormat;
#[derive(clap::Args, Debug)]
pub(crate) struct Args {
#[clap(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
pub(crate) fn execute(args: Args) -> Result<(), NyxChainWatcherError> {
println!("{}", args.output.format(&bin_info_owned!()));
Ok(())
}
@@ -0,0 +1,44 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::cli::DEFAULT_NYX_CHAIN_WATCHER_ID;
use crate::config::payments_watcher::HttpAuthenticationOptions::AuthorizationBearerToken;
use crate::config::payments_watcher::PaymentWatcherEntry;
use crate::config::{default_config_filepath, Config, ConfigBuilder, PaymentWatcherConfig};
use crate::error::NyxChainWatcherError;
use nym_config::save_unformatted_config_to_file;
use nym_validator_client::nyxd::AccountId;
use std::str::FromStr;
#[derive(clap::Args, Debug)]
pub(crate) struct Args {}
pub(crate) async fn execute(_args: Args) -> Result<(), NyxChainWatcherError> {
let config_path = default_config_filepath();
let data_dir = Config::default_data_directory(&config_path)?;
let builder = ConfigBuilder::new(config_path.clone(), data_dir).with_payment_watcher_config(
PaymentWatcherConfig {
watchers: vec![PaymentWatcherEntry {
id: DEFAULT_NYX_CHAIN_WATCHER_ID.to_string(),
webhook_url: "https://webhook.site".to_string(),
watch_for_transfer_recipient_accounts: Some(vec![AccountId::from_str(
"n17g9a2pwwkg8m60wf59pq6mv0c2wusg9ukparkz",
)
.unwrap()]),
authentication: Some(AuthorizationBearerToken {
token: "1234".to_string(),
}),
description: None,
watch_for_chain_message_types: Some(vec![
"/cosmos.bank.v1beta1.MsgSend".to_string(),
"/ibc.applications.transfer.v1.MsgTransfer".to_string(),
]),
}],
},
);
let config = builder.build();
Ok(save_unformatted_config_to_file(&config, &config_path)?)
}
@@ -0,0 +1,3 @@
pub(crate) mod build_info;
pub(crate) mod init;
pub(crate) mod run;
@@ -0,0 +1,101 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::env::vars::*;
use nym_validator_client::nyxd::AccountId;
#[derive(clap::Args, Debug)]
pub(crate) struct Args {
/// (Override) SQLite database file path for chain watcher
#[arg(long, env = NYX_CHAIN_WATCHER_DATABASE_PATH)]
pub(crate) chain_watcher_db_path: Option<String>,
/// (Override) SQLite database file path for chain scraper history
#[arg(long, env = NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH)]
pub(crate) chain_history_db_path: Option<String>,
/// (Override) Watch for transfers to these recipient accounts
#[clap(
long,
value_delimiter = ',',
env = NYX_CHAIN_WATCHER_WATCH_ACCOUNTS
)]
pub watch_for_transfer_recipient_accounts: Option<Vec<AccountId>>,
/// (Override) Watch for chain messages of these types
#[clap(
long,
value_delimiter = ',',
env = NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES
)]
pub watch_for_chain_message_types: Option<Vec<String>>,
/// (Override) The webhook to call when we find something
#[clap(
long,
env = NYX_CHAIN_WATCHER_WEBHOOK_URL
)]
pub webhook_url: Option<String>,
/// (Override) Optionally, authenticate with the webhook
#[clap(
long,
env = NYX_CHAIN_WATCHER_WEBHOOK_AUTH
)]
pub webhook_auth: Option<String>,
}
/*impl Args {
pub(super) fn take_mnemonic(&mut self) -> Option<Zeroizing<bip39::Mnemonic>> {
self.entry_gateway.mnemonic.take().map(Zeroizing::new)
}
}
impl Args {
pub(crate) fn build_config(self) -> Result<Config, NymNodeError> {
let config_path = self.config.config_path();
let data_dir = Config::default_data_directory(&config_path)?;
let id = self
.config
.id()
.clone()
.ok_or(NymNodeError::MissingInitArg {
section: "global".to_string(),
name: "id".to_string(),
})?;
let config = ConfigBuilder::new(id, config_path.clone(), data_dir.clone())
.with_mode(self.mode.unwrap_or_default())
.with_host(self.host.build_config_section())
.with_http(self.http.build_config_section())
.with_mixnet(self.mixnet.build_config_section())
.with_wireguard(self.wireguard.build_config_section(&data_dir))
.with_storage_paths(NymNodePaths::new(&data_dir))
.with_mixnode(self.mixnode.build_config_section())
.with_entry_gateway(self.entry_gateway.build_config_section(&data_dir))
.with_exit_gateway(self.exit_gateway.build_config_section(&data_dir))
.build();
Ok(config)
}
pub(crate) fn override_config(self, mut config: Config) -> Config {
if let Some(mode) = self.mode {
config.mode = mode;
}
config.host = self.host.override_config_section(config.host);
config.http = self.http.override_config_section(config.http);
config.mixnet = self.mixnet.override_config_section(config.mixnet);
config.wireguard = self.wireguard.override_config_section(config.wireguard);
config.mixnode = self.mixnode.override_config_section(config.mixnode);
config.entry_gateway = self
.entry_gateway
.override_config_section(config.entry_gateway);
config.exit_gateway = self
.exit_gateway
.override_config_section(config.exit_gateway);
config
}
}
*/
@@ -0,0 +1,84 @@
use crate::cli::commands::run::args::Args;
use crate::cli::DEFAULT_NYX_CHAIN_WATCHER_ID;
use crate::config::payments_watcher::{HttpAuthenticationOptions, PaymentWatcherEntry};
use crate::config::{default_config_filepath, Config, ConfigBuilder, PaymentWatcherConfig};
use crate::error::NyxChainWatcherError;
use tracing::{info, warn};
pub(crate) fn get_run_config(args: Args) -> Result<Config, NyxChainWatcherError> {
info!("{args:#?}");
let Args {
ref watch_for_transfer_recipient_accounts,
mut watch_for_chain_message_types,
webhook_auth,
ref chain_watcher_db_path,
ref chain_history_db_path,
webhook_url,
} = args;
// if there are no args set, then try load the config
if args.watch_for_transfer_recipient_accounts.is_none()
&& args.watch_for_transfer_recipient_accounts.is_none()
&& args.chain_watcher_db_path.is_none()
{
info!("Loading default config file...");
return Config::read_from_toml_file_in_default_location();
}
// set default messages
if watch_for_chain_message_types.is_none() {
watch_for_chain_message_types = Some(vec!["/cosmos.bank.v1beta1.MsgSend".to_string()]);
}
// warn if no accounts set
if watch_for_transfer_recipient_accounts.is_none() {
warn!(
"You did not specify any accounts to watch in {}. Only chain data will be stored.",
crate::env::vars::NYX_CHAIN_WATCHER_WATCH_ACCOUNTS
);
}
let config_path = default_config_filepath();
let data_dir = Config::default_data_directory(&config_path)?;
let mut builder = ConfigBuilder::new(config_path, data_dir);
if let Some(db_path) = chain_watcher_db_path {
info!("Overriding database url with '{db_path}'");
builder = builder.with_db_path(db_path.clone());
}
if let Some(db_path) = chain_history_db_path {
info!("Overriding chain history database url with '{db_path}'");
builder = builder.with_chain_scraper_db_path(db_path.clone());
}
if let Some(webhook_url) = webhook_url {
let authentication =
webhook_auth.map(|token| HttpAuthenticationOptions::AuthorizationBearerToken { token });
let watcher_config = PaymentWatcherConfig {
watchers: vec![PaymentWatcherEntry {
id: DEFAULT_NYX_CHAIN_WATCHER_ID.to_string(),
description: None,
watch_for_transfer_recipient_accounts: watch_for_transfer_recipient_accounts
.clone(),
watch_for_chain_message_types,
webhook_url,
authentication,
}],
};
info!("Overriding watcher config with env vars");
builder = builder.with_payment_watcher_config(watcher_config);
} else {
warn!(
"You did not specify a webhook in {}. Only database items will be stored.",
crate::env::vars::NYX_CHAIN_WATCHER_WEBHOOK_URL
);
}
Ok(builder.build())
}
@@ -0,0 +1,90 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::NyxChainWatcherError;
use tokio::join;
use tracing::{error, info, trace};
mod args;
mod config;
use crate::chain_scraper::run_chain_scraper;
use crate::{db, http, payment_listener, price_scraper};
pub(crate) use args::Args;
use nym_task::signal::wait_for_signal;
pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWatcherError> {
trace!("passed arguments: {args:#?}");
let config = config::get_run_config(args)?;
let db_path = config.database_path();
info!("Config is {config:#?}");
info!(
"Database path is {:?}",
std::path::Path::new(&db_path)
.canonicalize()
.unwrap_or_default()
);
info!(
"Chain History Database path is {:?}",
std::path::Path::new(&config.chain_scraper_database_path())
.canonicalize()
.unwrap_or_default()
);
// Ensure parent directory exists
if let Some(parent) = std::path::Path::new(&db_path).parent() {
std::fs::create_dir_all(parent)?;
}
let connection_url = format!("sqlite://{}?mode=rwc", db_path);
let storage = db::Storage::init(connection_url).await?;
let watcher_pool = storage.pool_owned().await;
// Spawn the chain scraper and get its storage
// Spawn the payment listener task
let payment_listener_handle = tokio::spawn({
let price_scraper_pool = storage.pool_owned().await;
let scraper_pool = storage.pool_owned().await;
run_chain_scraper(&config, scraper_pool).await?;
let payment_watcher_config = config.payment_watcher_config.unwrap_or_default();
async move {
if let Err(e) =
payment_listener::run_payment_listener(payment_watcher_config, price_scraper_pool)
.await
{
error!("Payment listener error: {}", e);
}
Ok::<_, anyhow::Error>(())
}
});
// Clone pool for each task that needs it
//let background_pool = db_pool.clone();
let price_scraper_handle = tokio::spawn(async move {
price_scraper::run_price_scraper(&watcher_pool).await;
});
let shutdown_handles = http::server::start_http_api(storage.pool_owned().await, http_port)
.await
.expect("Failed to start server");
info!("Started HTTP server on port {}", http_port);
// Wait for the short-lived tasks to complete
let _ = join!(price_scraper_handle, payment_listener_handle);
// Wait for a signal to terminate the long-running task
wait_for_signal().await;
if let Err(err) = shutdown_handles.shutdown().await {
error!("{err}");
};
Ok(())
}
+67
View File
@@ -0,0 +1,67 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::cli::commands::{build_info, init, run};
use crate::env::vars::*;
use crate::error::NyxChainWatcherError;
use clap::{Parser, Subcommand};
use nym_bin_common::bin_info;
use std::sync::OnceLock;
mod commands;
pub const DEFAULT_NYX_CHAIN_WATCHER_ID: &str = "default-nyx-chain-watcher";
// Helper for passing LONG_VERSION to clap
fn pretty_build_info_static() -> &'static str {
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
}
#[derive(Parser, Debug)]
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
pub(crate) struct Cli {
/// Path pointing to an env file that configures the nym-chain-watcher and overrides any preconfigured values.
#[clap(
short,
long,
env = NYX_CHAIN_WATCHER_CONFIG_ENV_FILE_ARG
)]
pub(crate) config_env_file: Option<std::path::PathBuf>,
/// Flag used for disabling the printed banner in tty.
#[clap(
long,
env = NYX_CHAIN_WATCHER_NO_BANNER_ARG
)]
pub(crate) no_banner: bool,
/// Port to listen on
#[arg(long, default_value_t = 8000, env = "NYX_CHAIN_WATCHER_HTTP_PORT")]
pub(crate) http_port: u16,
#[clap(subcommand)]
command: Commands,
}
impl Cli {
pub(crate) async fn execute(self) -> Result<(), NyxChainWatcherError> {
match self.command {
Commands::BuildInfo(args) => build_info::execute(args),
Commands::Run(args) => run::execute(*args, self.http_port).await,
Commands::Init(args) => init::execute(args).await,
}
}
}
#[derive(Subcommand, Debug)]
pub(crate) enum Commands {
/// Show build information of this binary
BuildInfo(build_info::Args),
/// Start this nym-chain-watcher
Run(Box<run::Args>),
/// Initialise config
Init(init::Args),
}
+249
View File
@@ -0,0 +1,249 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::template::CONFIG_TEMPLATE;
use nym_bin_common::logging::LoggingSettings;
use nym_config::{
must_get_home, read_config_from_toml_file, save_unformatted_config_to_file, NymConfigTemplate,
DEFAULT_CONFIG_DIR, DEFAULT_CONFIG_FILENAME, DEFAULT_DATA_DIR, NYM_DIR,
};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use tracing::{debug, error};
pub(crate) mod payments_watcher;
mod template;
pub use crate::config::payments_watcher::PaymentWatcherConfig;
use crate::error::NyxChainWatcherError;
const DEFAULT_NYM_CHAIN_WATCHER_DIR: &str = "nym-chain-watcher";
pub(crate) const DEFAULT_NYM_CHAIN_WATCHER_DB_FILENAME: &str = "nyx_chain_watcher.sqlite";
pub(crate) const DEFAULT_NYM_CHAIN_SCRAPER_HISTORY_DB_FILENAME: &str = "chain_history.sqlite";
/// Derive default path to nym-chain-watcher's config directory.
/// It should get resolved to `$HOME/.nym/nym-chain-watcher/config`
pub fn default_config_directory() -> PathBuf {
must_get_home()
.join(NYM_DIR)
.join(DEFAULT_NYM_CHAIN_WATCHER_DIR)
.join(DEFAULT_CONFIG_DIR)
}
/// Derive default path to nym-chain-watcher's config file.
/// It should get resolved to `$HOME/.nym/nym-chain-watcher/config/config.toml`
pub fn default_config_filepath() -> PathBuf {
default_config_directory().join(DEFAULT_CONFIG_FILENAME)
}
pub struct ConfigBuilder {
pub config_path: PathBuf,
pub data_dir: PathBuf,
pub db_path: Option<String>,
pub chain_scraper_db_path: Option<String>,
pub payment_watcher_config: Option<PaymentWatcherConfig>,
pub logging: Option<LoggingSettings>,
}
impl ConfigBuilder {
pub fn new(config_path: PathBuf, data_dir: PathBuf) -> Self {
ConfigBuilder {
config_path,
data_dir,
payment_watcher_config: None,
logging: None,
db_path: None,
chain_scraper_db_path: None,
}
}
pub fn with_db_path(mut self, db_path: String) -> Self {
self.db_path = Some(db_path);
self
}
pub fn with_chain_scraper_db_path(mut self, chain_scraper_db_path: String) -> Self {
self.chain_scraper_db_path = Some(chain_scraper_db_path);
self
}
#[allow(dead_code)]
pub fn with_payment_watcher_config(
mut self,
payment_watcher_config: impl Into<PaymentWatcherConfig>,
) -> Self {
self.payment_watcher_config = Some(payment_watcher_config.into());
self
}
#[allow(dead_code)]
pub fn with_logging(mut self, section: impl Into<Option<LoggingSettings>>) -> Self {
self.logging = section.into();
self
}
pub fn build(self) -> Config {
Config {
logging: self.logging.unwrap_or_default(),
save_path: Some(self.config_path),
payment_watcher_config: self.payment_watcher_config,
data_dir: self.data_dir,
db_path: self.db_path,
chain_scraper_db_path: self.chain_scraper_db_path,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
// additional metadata holding on-disk location of this config file
#[serde(skip)]
pub(crate) save_path: Option<PathBuf>,
#[serde(skip)]
pub(crate) data_dir: PathBuf,
#[serde(skip)]
db_path: Option<String>,
#[serde(skip)]
chain_scraper_db_path: Option<String>,
pub payment_watcher_config: Option<PaymentWatcherConfig>,
#[serde(default)]
pub logging: LoggingSettings,
}
impl NymConfigTemplate for Config {
fn template(&self) -> &'static str {
CONFIG_TEMPLATE
}
}
impl Config {
#[allow(unused)]
pub fn save(&self) -> Result<(), NyxChainWatcherError> {
let save_location = self.save_location();
debug!(
"attempting to save config file to '{}'",
save_location.display()
);
save_unformatted_config_to_file(self, &save_location).map_err(|source| {
NyxChainWatcherError::UnformattedConfigSaveFailure {
path: save_location,
source,
}
})
}
#[allow(unused)]
pub fn save_location(&self) -> PathBuf {
self.save_path
.clone()
.unwrap_or(self.default_save_location())
}
#[allow(unused)]
pub fn default_save_location(&self) -> PathBuf {
default_config_filepath()
}
pub fn default_data_directory<P: AsRef<Path>>(
config_path: P,
) -> Result<PathBuf, NyxChainWatcherError> {
let config_path = config_path.as_ref();
// we got a proper path to the .toml file
let Some(config_dir) = config_path.parent() else {
error!(
"'{}' does not have a parent directory. Have you pointed to the fs root?",
config_path.display()
);
return Err(NyxChainWatcherError::DataDirDerivationFailure);
};
let Some(config_dir_name) = config_dir.file_name() else {
error!(
"could not obtain parent directory name of '{}'. Have you used relative paths?",
config_path.display()
);
return Err(NyxChainWatcherError::DataDirDerivationFailure);
};
if config_dir_name != DEFAULT_CONFIG_DIR {
error!(
"the parent directory of '{}' ({}) is not {DEFAULT_CONFIG_DIR}. currently this is not supported",
config_path.display(), config_dir_name.to_str().unwrap_or("UNKNOWN")
);
return Err(NyxChainWatcherError::DataDirDerivationFailure);
}
let Some(node_dir) = config_dir.parent() else {
error!(
"'{}' does not have a parent directory. Have you pointed to the fs root?",
config_dir.display()
);
return Err(NyxChainWatcherError::DataDirDerivationFailure);
};
Ok(node_dir.join(DEFAULT_DATA_DIR))
}
pub fn database_path(&self) -> String {
self.db_path.clone().unwrap_or_else(|| {
let mut path = self.data_dir.clone().to_path_buf();
path.push(DEFAULT_NYM_CHAIN_WATCHER_DB_FILENAME);
path.to_str()
.unwrap_or(DEFAULT_NYM_CHAIN_WATCHER_DB_FILENAME)
.to_string()
})
}
pub fn chain_scraper_database_path(&self) -> String {
self.chain_scraper_db_path.clone().unwrap_or_else(|| {
let mut path = self.data_dir.clone().to_path_buf();
path.push(DEFAULT_NYM_CHAIN_SCRAPER_HISTORY_DB_FILENAME);
path.to_str()
.unwrap_or(DEFAULT_NYM_CHAIN_SCRAPER_HISTORY_DB_FILENAME)
.to_string()
})
}
// simple wrapper that reads config file and assigns path location
fn read_from_path<P: AsRef<Path>>(path: P, data_dir: P) -> Result<Self, NyxChainWatcherError> {
let path = path.as_ref();
let data_dir = data_dir.as_ref();
let mut loaded: Config = read_config_from_toml_file(path).map_err(|source| {
NyxChainWatcherError::ConfigLoadFailure {
path: path.to_path_buf(),
source,
}
})?;
loaded.data_dir = data_dir.to_path_buf();
loaded.save_path = Some(path.to_path_buf());
debug!("loaded config file from {}", path.display());
Ok(loaded)
}
#[allow(unused)]
pub fn read_from_toml_file<P: AsRef<Path>>(
path: P,
data_dir: P,
) -> Result<Self, NyxChainWatcherError> {
Self::read_from_path(path, data_dir)
}
pub fn read_from_toml_file_in_default_location() -> Result<Self, NyxChainWatcherError> {
let config_path = default_config_filepath();
let data_dir = Config::default_data_directory(&config_path)?;
Self::read_from_path(config_path, data_dir)
}
}
@@ -0,0 +1,23 @@
use nym_validator_client::nyxd::AccountId;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PaymentWatcherConfig {
pub watchers: Vec<PaymentWatcherEntry>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaymentWatcherEntry {
pub id: String,
pub description: Option<String>,
pub webhook_url: String,
pub watch_for_transfer_recipient_accounts: Option<Vec<AccountId>>,
pub watch_for_chain_message_types: Option<Vec<String>>,
pub authentication: Option<HttpAuthenticationOptions>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HttpAuthenticationOptions {
AuthorizationBearerToken { token: String },
}
+29
View File
@@ -0,0 +1,29 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// While using normal toml marshalling would have been way simpler with less overhead,
// I think it's useful to have comments attached to the saved config file to explain behaviour of
// particular fields.
// Note: any changes to the template must be reflected in the appropriate structs.
pub(crate) const CONFIG_TEMPLATE: &str = r#"
# This is a TOML config file.
# For more information, see https://github.com/toml-lang/toml
[payment_watcher_config]
{{#each payment_watcher_config.watchers }}
[[watchers]]
id={{this.id}}
description='{{this.description}}'
webhook_url='{{this.webhook_url}}'
{{/each}}
##### logging configuration options #####
[logging]
# TODO
"#;
@@ -1,13 +1,16 @@
use anyhow::{anyhow, Result};
use sqlx::{migrate::Migrator, postgres::PgConnectOptions, ConnectOptions, PgPool};
use sqlx::{migrate::Migrator, sqlite::SqliteConnectOptions, SqlitePool};
use std::str::FromStr;
pub(crate) mod models;
pub(crate) mod queries;
pub(crate) mod queries {
pub mod payments;
pub mod price;
}
static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
pub(crate) type DbPool = PgPool;
pub(crate) type DbPool = SqlitePool;
pub(crate) struct Storage {
pool: DbPool,
@@ -16,13 +19,16 @@ pub(crate) struct Storage {
impl Storage {
pub async fn init(connection_url: String) -> Result<Self> {
let connect_options =
PgConnectOptions::from_str(&connection_url)?.disable_statement_logging();
SqliteConnectOptions::from_str(&connection_url)?.create_if_missing(true);
let pool = DbPool::connect_with(connect_options)
.await
.map_err(|err| anyhow!("Failed to connect to {}: {}", &connection_url, err))?;
MIGRATOR.run(&pool).await?;
MIGRATOR
.run(&pool)
.await
.map_err(|err| anyhow!("Failed to run migrations: {}", err))?;
Ok(Storage { pool })
}
+41
View File
@@ -0,0 +1,41 @@
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
#[derive(Clone, Deserialize, Debug)]
pub(crate) struct CurrencyPrices {
pub(crate) chf: f32,
pub(crate) usd: f32,
pub(crate) eur: f32,
pub(crate) btc: f32,
}
// Struct to hold Coingecko response
#[derive(Clone, Deserialize, Debug, ToSchema)]
pub(crate) struct CoingeckoPriceResponse {
pub(crate) nym: CurrencyPrices,
}
#[derive(Clone, Deserialize, Debug, ToSchema)]
pub(crate) struct PriceRecord {
pub(crate) timestamp: i64,
pub(crate) nym: CurrencyPrices,
}
#[derive(Serialize, Deserialize, Debug, ToSchema)]
pub(crate) struct PriceHistory {
pub(crate) timestamp: i64,
pub(crate) chf: f64,
pub(crate) usd: f64,
pub(crate) eur: f64,
pub(crate) btc: f64,
}
#[derive(Serialize, Deserialize, Debug, ToSchema)]
pub(crate) struct PaymentRecord {
pub(crate) transaction_hash: String,
pub(crate) sender_address: String,
pub(crate) receiver_address: String,
pub(crate) amount: f64,
pub(crate) timestamp: i64,
pub(crate) height: i64,
}
+6
View File
@@ -0,0 +1,6 @@
mod payments;
mod price;
// re-exporting allows us to access all queries via `queries::bla``
pub(crate) use payments::{get_last_checked_height, insert_payment};
pub(crate) use price::{get_latest_price, insert_nym_prices};
@@ -0,0 +1,41 @@
use crate::db::DbPool;
use anyhow::Result;
pub async fn get_last_checked_height(pool: &DbPool) -> Result<i64> {
let result = sqlx::query_scalar!("SELECT MAX(height) FROM payments")
.fetch_one(pool)
.await?;
Ok(result.unwrap_or(0))
}
pub async fn insert_payment(
pool: &DbPool,
transaction_hash: String,
sender_address: String,
receiver_address: String,
amount: f64,
height: i64,
memo: Option<String>,
) -> Result<()> {
let timestamp = chrono::Utc::now().timestamp();
sqlx::query!(
r#"
INSERT INTO payments (
transaction_hash, sender_address, receiver_address,
amount, height, timestamp, memo
) VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
transaction_hash,
sender_address,
receiver_address,
amount,
height,
timestamp,
memo,
)
.execute(pool)
.await?;
Ok(())
}
+86
View File
@@ -0,0 +1,86 @@
use crate::db::models::{PriceHistory, PriceRecord};
use crate::db::DbPool;
use chrono::Local;
use std::ops::Sub;
pub(crate) async fn insert_nym_prices(
pool: &DbPool,
price_data: PriceRecord,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
let timestamp = price_data.timestamp;
sqlx::query!(
"INSERT INTO price_history
(timestamp, chf, usd, eur, btc)
VALUES
($1, $2, $3, $4, $5)
ON CONFLICT(timestamp) DO UPDATE SET
chf=excluded.chf,
usd=excluded.usd,
eur=excluded.eur,
btc=excluded.btc;",
timestamp,
price_data.nym.chf,
price_data.nym.usd,
price_data.nym.eur,
price_data.nym.btc,
)
.execute(&mut *conn)
.await?;
Ok(())
}
pub(crate) async fn get_latest_price(pool: &DbPool) -> anyhow::Result<PriceHistory> {
let result = sqlx::query!(
"SELECT timestamp, chf, usd, eur, btc FROM price_history ORDER BY timestamp DESC LIMIT 1;"
)
.fetch_one(pool)
.await?;
Ok(PriceHistory {
timestamp: result.timestamp,
chf: result.chf,
usd: result.usd,
eur: result.eur,
btc: result.btc,
})
}
pub(crate) async fn get_average_price(pool: &DbPool) -> anyhow::Result<PriceHistory> {
// now less 1 day
let earliest_timestamp = Local::now().sub(chrono::Duration::days(1)).timestamp();
let result = sqlx::query!(
"SELECT timestamp, chf, usd, eur, btc FROM price_history WHERE timestamp >= $1;",
earliest_timestamp
)
.fetch_all(pool)
.await?;
let count = result.len() as f64;
let mut price = PriceHistory {
timestamp: Local::now().timestamp(),
chf: 0f64,
usd: 0f64,
eur: 0f64,
btc: 0f64,
};
for p in &result {
price.chf += p.chf;
price.usd += p.usd;
price.eur += p.eur;
price.btc += p.btc;
}
if count > 0f64 {
price.chf /= count;
price.usd /= count;
price.eur /= count;
price.btc /= count;
}
Ok(price)
}
+29
View File
@@ -0,0 +1,29 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
#[allow(unused)]
pub mod vars {
pub const NYX_CHAIN_WATCHER_NO_BANNER_ARG: &str = "NYX_CHAIN_WATCHER_NO_BANNER";
pub const NYX_CHAIN_WATCHER_CONFIG_ENV_FILE_ARG: &str = "NYX_CHAIN_WATCHER_CONFIG_ENV_FILE_ARG";
pub const NYX_CHAIN_WATCHER_DATABASE_PATH: &str = "NYX_CHAIN_WATCHER_DATABASE_PATH";
pub const NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH: &str =
"NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH";
pub const NYXD_SCRAPER_START_HEIGHT: &str = "NYXD_SCRAPER_START_HEIGHT";
pub const NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT: &str =
"NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT";
pub const NYXD_SCRAPER_UNSAFE_NUKE_DB: &str = "NYXD_SCRAPER_UNSAFE_NUKE_DB";
pub const NYX_CHAIN_WATCHER_ID_ARG: &str = "NYX_CHAIN_WATCHER_ID";
pub const NYX_CHAIN_WATCHER_OUTPUT_ARG: &str = "NYX_CHAIN_WATCHER_OUTPUT";
pub const NYX_CHAIN_WATCHER_CONFIG_PATH_ARG: &str = "NYX_CHAIN_WATCHER_CONFIG";
pub const NYX_CHAIN_WATCHER_WATCH_ACCOUNTS: &str = "NYX_CHAIN_WATCHER_WATCH_ACCOUNTS";
pub const NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES: &str =
"NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES";
pub const NYX_CHAIN_WATCHER_WEBHOOK_URL: &str = "NYX_CHAIN_WATCHER_WEBHOOK_URL";
pub const NYX_CHAIN_WATCHER_WEBHOOK_AUTH: &str = "NYX_CHAIN_WATCHER_WEBHOOK_AUTH";
}
+40
View File
@@ -0,0 +1,40 @@
use std::io;
use std::path::PathBuf;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum NyxChainWatcherError {
// #[error("failed to save config file using path '{}'. detailed message: {source}", path.display())]
// ConfigSaveFailure {
// path: PathBuf,
// #[source]
// source: io::Error,
// },
#[error("failed to save config file using path '{}'. detailed message: {source}", path.display())]
UnformattedConfigSaveFailure {
path: PathBuf,
#[source]
source: nym_config::error::NymConfigTomlError,
},
#[error("could not derive path to data directory of this nyx chain watcher")]
DataDirDerivationFailure,
// #[error("could not derive path to config directory of this nyx chain watcher")]
// ConfigDirDerivationFailure,
#[error("failed to load config file using path '{}'. detailed message: {source}", path.display())]
ConfigLoadFailure {
path: PathBuf,
#[source]
source: io::Error,
},
#[error(transparent)]
FileIoFailure(#[from] io::Error),
#[error(transparent)]
AnyhowFailure(#[from] anyhow::Error),
#[error(transparent)]
NymConfigTomlE(#[from] nym_config::error::NymConfigTomlError),
}
@@ -7,8 +7,7 @@ use utoipa_swagger_ui::SwaggerUi;
use crate::http::{api_docs, server::HttpServer, state::AppState};
pub(crate) mod jokes;
pub(crate) mod mixnodes;
pub(crate) mod price;
pub(crate) struct RouterBuilder {
unfinished_router: Router<AppState>,
@@ -25,12 +24,7 @@ impl RouterBuilder {
"/",
axum::routing::get(|| async { Redirect::permanent("/swagger") }),
)
.nest(
"/v1",
Router::new()
.nest("/jokes", jokes::routes())
.nest("/mixnodes", mixnodes::routes()),
);
.nest("/v1", Router::new().nest("/price", price::routes()));
Self {
unfinished_router: router,
+44
View File
@@ -0,0 +1,44 @@
use crate::db::models::PriceHistory;
use crate::db::queries::price::{get_average_price, get_latest_price};
use crate::http::error::Error;
use crate::http::error::HttpResult;
use crate::http::state::AppState;
use axum::{extract::State, Json, Router};
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(price))
.route("/average", axum::routing::get(average_price))
}
#[utoipa::path(
tag = "NYM Price",
get,
path = "/v1/price",
responses(
(status = 200, body = String)
)
)]
/// Fetch the latest price cached by this API
async fn price(State(state): State<AppState>) -> HttpResult<Json<PriceHistory>> {
get_latest_price(state.db_pool())
.await
.map(Json::from)
.map_err(|_| Error::internal())
}
#[utoipa::path(
tag = "NYM Price",
get,
path = "/v1/price/average",
responses(
(status = 200, body = String)
)
)]
/// Fetch the average price cached by this API
async fn average_price(State(state): State<AppState>) -> HttpResult<Json<PriceHistory>> {
get_average_price(state.db_pool())
.await
.map(Json::from)
.map_err(|_| Error::internal())
}
@@ -4,7 +4,7 @@ use utoipauto::utoipauto;
// manually import external structs which are behind feature flags because they
// can't be automatically discovered
// https://github.com/ProbablyClem/utoipauto/issues/13#issuecomment-1974911829
#[utoipauto(paths = "./nym-data-observatory/src")]
#[utoipauto(paths = "./nyx-chain-watcher/src")]
#[derive(OpenApi)]
#[openapi(info(title = "Nym API"), tags(), components(schemas()))]
#[openapi(info(title = "Nyx Chain Watcher API"), tags(), components(schemas()))]
pub(super) struct ApiDoc;
@@ -6,13 +6,6 @@ pub(crate) struct Error {
}
impl Error {
pub(crate) fn not_found(message: String) -> Self {
Self {
message,
status: axum::http::StatusCode::NOT_FOUND,
}
}
pub(crate) fn internal() -> Self {
Self {
message: String::from("Internal server error"),
+35
View File
@@ -0,0 +1,35 @@
use clap::{crate_name, crate_version, Parser};
use nym_bin_common::bin_info_owned;
use nym_bin_common::logging::maybe_print_banner;
use nym_network_defaults::setup_env;
use tracing::info;
mod chain_scraper;
mod cli;
mod config;
mod db;
mod env;
mod error;
mod http;
mod logging;
pub mod models;
mod payment_listener;
mod price_scraper;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = cli::Cli::parse();
setup_env(cli.config_env_file.as_ref());
logging::setup_tracing_logger();
if !cli.no_banner {
maybe_print_banner(crate_name!(), crate_version!());
}
let bin_info = bin_info_owned!();
info!("using the following version: {bin_info}");
cli.execute().await?;
Ok(())
}
+15
View File
@@ -0,0 +1,15 @@
use nym_validator_client::nyxd::CosmWasmCoin;
use rocket::serde::{Deserialize, Serialize};
use schemars::JsonSchema;
use utoipa::ToSchema;
#[derive(Serialize, Deserialize, Clone, JsonSchema, ToSchema)]
pub struct WebhookPayload {
pub transaction_hash: String,
pub message_index: u64,
pub sender_address: String,
pub receiver_address: String,
pub funds: CosmWasmCoin,
pub height: u128,
pub memo: Option<String>,
}
@@ -0,0 +1,108 @@
use crate::config::payments_watcher::HttpAuthenticationOptions;
use crate::config::PaymentWatcherConfig;
use crate::db::queries;
use crate::models::WebhookPayload;
use nym_validator_client::nyxd::Coin;
use reqwest::Client;
use sqlx::SqlitePool;
use std::str::FromStr;
use tokio::time::{self, Duration};
use tracing::{error, info};
pub(crate) async fn run_payment_listener(
payment_watcher_config: PaymentWatcherConfig,
watcher_pool: SqlitePool,
) -> anyhow::Result<()> {
let client = Client::new();
loop {
// 1. get the last height this watcher ran at
let last_checked_height = queries::payments::get_last_checked_height(&watcher_pool).await?;
info!("Last checked height: {}", last_checked_height);
// 2. iterate through watchers
for watcher in &payment_watcher_config.watchers {
if watcher.watch_for_transfer_recipient_accounts.is_some() {
// 3. Query new transactions for this watcher's recipient accounts
let transactions = sqlx::query!(
r#"
SELECT * FROM transactions
WHERE height > ?
ORDER BY height ASC, message_index ASC
"#,
last_checked_height
)
.fetch_all(&watcher_pool)
.await?;
if !transactions.is_empty() {
info!(
"[watcher = {}] Processing {} transactions",
watcher.id,
transactions.len()
);
}
for tx in transactions {
let funds = Coin::from_str(&tx.amount)?;
let amount: f64 = funds.amount as f64 / 1e6f64; // convert to major value, there will be precision loss
// Store transaction hash for later use
let tx_hash = tx.tx_hash.clone();
let message_index = tx.message_index;
queries::payments::insert_payment(
&watcher_pool,
tx.tx_hash,
tx.sender.clone(),
tx.recipient.clone(),
amount,
tx.height,
tx.memo.clone(),
)
.await?;
let webhook_data = WebhookPayload {
transaction_hash: tx_hash.clone(),
message_index: message_index as u64,
sender_address: tx.sender,
receiver_address: tx.recipient,
funds: funds.into(),
height: tx.height as u128,
memo: tx.memo,
};
let mut request_builder = client.post(&watcher.webhook_url).json(&webhook_data);
if let Some(auth) = &watcher.authentication {
match auth {
HttpAuthenticationOptions::AuthorizationBearerToken { token } => {
request_builder = request_builder.bearer_auth(token);
}
}
}
match request_builder.send().await {
Ok(res) => info!(
"[watcher = {}] ✅ Webhook {} {} - tx {}, index {}",
watcher.id,
res.status(),
res.url(),
tx_hash,
message_index,
),
Err(e) => error!(
"[watcher = {}] ❌ Webhook {:?} {:?} error = {}",
watcher.id,
e.status(),
e.url(),
e,
),
}
}
}
}
time::sleep(Duration::from_secs(10)).await;
}
}
@@ -0,0 +1,55 @@
use crate::db::{
models::{CoingeckoPriceResponse, PriceRecord},
queries::price::insert_nym_prices,
};
use core::str;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use crate::db::DbPool;
const REFRESH_DELAY: Duration = Duration::from_secs(300);
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
const COINGECKO_API_URL: &str =
"https://api.coingecko.com/api/v3/simple/price?ids=nym&vs_currencies=chf,usd,eur,btc";
pub(crate) async fn run_price_scraper(db_pool: &DbPool) -> JoinHandle<()> {
loop {
tracing::info!("Running in a loop 🏃");
if let Err(e) = get_coingecko_prices(db_pool).await {
tracing::error!("❌ Failed to get CoinGecko prices: {e}");
tracing::info!("Retrying in {}s...", FAILURE_RETRY_DELAY.as_secs());
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
} else {
tracing::info!("✅ Successfully fetched CoinGecko prices");
tokio::time::sleep(REFRESH_DELAY).await;
}
}
}
async fn get_coingecko_prices(pool: &DbPool) -> anyhow::Result<()> {
tracing::info!("💰 Fetching CoinGecko prices from {}", COINGECKO_API_URL);
let response = reqwest::get(COINGECKO_API_URL)
.await?
.json::<CoingeckoPriceResponse>()
.await;
tracing::info!("Got response {:?}", response);
match response {
Ok(resp) => {
let price_record = PriceRecord {
timestamp: time::OffsetDateTime::now_utc().unix_timestamp(),
nym: resp.nym,
};
insert_nym_prices(pool, price_record).await?;
}
Err(e) => {
//tracing::info!("💰 CoinGecko price response: {:?}", response);
tracing::error!("Error sending request: {}", e);
}
}
Ok(())
}