Compare commits

...

16 Commits

Author SHA1 Message Date
Mark Sinclair 64091eb175 Bump version 2024-12-09 15:07:43 +00:00
Jędrzej Stuczyński 6204014395 implemented starting block logic inside the chain scraper itself 2024-12-09 13:43:49 +00:00
Mark Sinclair d4680382f5 Fix docker entry point and bump version 2024-12-06 21:42:29 +00:00
Mark Sinclair 2e9ab39002 Add Dockerfile and workflow to build 2024-12-06 20:57:49 +00:00
Mark Sinclair 4877a93201 parse message index and process all log entries 2024-12-06 20:11:45 +00:00
Mark Sinclair d38abba9a3 init saves example config 2024-12-06 20:11:14 +00:00
Mark Sinclair 93b0063f73 Add example to README 2024-12-06 20:10:46 +00:00
Mark Sinclair 247a4cefa6 Remove migration from seed app 2024-12-06 20:10:36 +00:00
Mark Sinclair ade88af3cb Add config, overrides and CLI 2024-12-06 18:12:45 +00:00
Mark Sinclair 0b99c1f467 Move nym-data-observatory (v0) to nyx-chain-watcher 2024-12-06 17:32:21 +00:00
Mark Sinclair 46fb4617e8 data-observatory: renamed transactions to payments because there is already transaction in the base scraper schema 2024-12-06 17:32:21 +00:00
Mark Sinclair ed821d1a57 nyxd-scraper: add optional starting height parameter to scrape before listening for new blocks 2024-12-06 17:32:21 +00:00
Sachin Kamath 23e52653d7 observatory 0.1 2024-12-06 17:32:21 +00:00
Sachin Kamath 8fba032e26 fix review comments 2024-12-06 17:32:21 +00:00
Sachin Kamath 504b2af388 clippy 2024-12-06 17:32:21 +00:00
Sachin Kamath 5a8e91542e nyxd-scraper: add config to make pre-commit storage optional 2024-12-06 17:32:21 +00:00
71 changed files with 1870 additions and 600 deletions
@@ -0,0 +1,55 @@
name: Build and upload Nyx Chain Watcher container to harbor.nymte.ch
on:
workflow_dispatch:
env:
WORKING_DIRECTORY: "nyx-chain-watcher"
CONTAINER_NAME: "nyx-chain-watcher"
jobs:
build-container:
runs-on: arc-ubuntu-22.04-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
with:
registry: harbor.nymte.ch
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
- name: Checkout repo
uses: actions/checkout@v4
- name: Configure git identity
run: |
git config --global user.email "lawrence@nymtech.net"
git config --global user.name "Lawrence Stalder"
- name: Get version from cargo.toml
uses: mikefarah/yq@v4.44.5
id: get_version
with:
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
- name: Check if tag exists
run: |
if git rev-parse ${{ steps.get_version.outputs.value }} >/dev/null 2>&1; then
echo "Tag ${{ steps.get_version.outputs.value }} already exists"
fi
- name: Remove existing tag if exists
run: |
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
fi
- name: Create tag
run: |
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
- name: BuildAndPushImageOnHarbor
run: |
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
+4 -1
View File
@@ -51,4 +51,7 @@ ppa-private-key.b64
ppa-private-key.asc
nym-network-monitor/topology.json
nym-network-monitor/__pycache__
nym-network-monitor/*.key
nym-network-monitor/*.key
*.sqlite
.build
Generated
+34 -25
View File
@@ -5064,6 +5064,7 @@ dependencies = [
"log",
"nym-network-defaults",
"serde",
"thiserror",
"toml 0.8.14",
"url",
]
@@ -5300,31 +5301,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "nym-data-observatory"
version = "0.1.0"
dependencies = [
"anyhow",
"axum 0.7.7",
"chrono",
"clap 4.5.20",
"nym-bin-common",
"nym-network-defaults",
"nym-node-requests",
"nym-task",
"serde",
"serde_json",
"sqlx",
"tokio",
"tokio-util",
"tower-http",
"tracing",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
"utoipauto",
]
[[package]]
name = "nym-dkg"
version = "0.1.0"
@@ -6957,6 +6933,39 @@ dependencies = [
"url",
]
[[package]]
name = "nyx-chain-watcher"
version = "0.1.2"
dependencies = [
"anyhow",
"axum 0.7.7",
"chrono",
"clap 4.5.20",
"nym-bin-common",
"nym-config",
"nym-network-defaults",
"nym-node-requests",
"nym-task",
"nym-validator-client",
"nyxd-scraper",
"reqwest 0.12.4",
"rocket",
"schemars",
"serde",
"serde_json",
"sqlx",
"thiserror",
"time",
"tokio",
"tokio-util",
"tower-http",
"tracing",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
"utoipauto",
]
[[package]]
name = "nyxd-scraper"
version = "0.1.0"
+2 -2
View File
@@ -119,8 +119,8 @@ members = [
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-data-observatory",
"nym-network-monitor",
"nyx-chain-watcher",
"nym-node",
"nym-node/nym-node-requests",
"nym-node/nym-node-metrics",
@@ -157,11 +157,11 @@ default-members = [
"explorer-api",
"nym-api",
"nym-credential-proxy/nym-credential-proxy",
"nym-data-observatory",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
+2 -1
View File
@@ -12,7 +12,8 @@ dirs = { workspace = true, optional = true }
handlebars = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["derive"] }
toml = { workspace = true }
thiserror = { workspace = true }
toml = { workspace = true, features = ["display"] }
url = { workspace = true }
nym-network-defaults = { path = "../network-defaults", features = ["utoipa"] }
+10
View File
@@ -0,0 +1,10 @@
use std::io;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum NymConfigTomlError {
#[error(transparent)]
FileIoFailure(#[from] io::Error),
#[error(transparent)]
TomlSerializeFailure(#[from] toml::ser::Error),
}
+37
View File
@@ -13,6 +13,7 @@ pub use helpers::{parse_urls, OptionalSet};
pub use toml::de::Error as TomlDeError;
pub mod defaults;
pub mod error;
pub mod helpers;
pub mod legacy_helpers;
pub mod serde_helpers;
@@ -95,6 +96,42 @@ where
config.format_to_writer(file)
}
pub fn save_unformatted_config_to_file<C, P>(
config: &C,
path: P,
) -> Result<(), error::NymConfigTomlError>
where
C: Serialize + ?Sized,
P: AsRef<Path>,
{
let path = path.as_ref();
log::info!("saving config file to {}", path.display());
if let Some(parent) = path.parent() {
create_dir_all(parent)?;
}
let mut file = File::create(path)?;
// TODO: check for whether any of our configs store anything sensitive
// and change that to 0o644 instead
#[cfg(target_family = "unix")]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(path)?.permissions();
perms.set_mode(0o600);
fs::set_permissions(path, perms)?;
}
// let serde format the TOML in whatever ugly way it chooses
// TODO: in https://docs.rs/toml/latest/toml/fn.to_string_pretty.html it recommends using
// https://docs.rs/toml_edit/latest/toml_edit/struct.DocumentMut.html to preserve formatting
let toml_string = toml::to_string_pretty(config)?;
Ok(file.write_all(toml_string.as_bytes())?)
}
pub fn deserialize_config_from_toml_str<C>(raw: &str) -> Result<C, TomlDeError>
where
C: DeserializeOwned,
+81 -9
View File
@@ -42,8 +42,43 @@ impl PendingSync {
}
}
#[derive(Debug, Clone)]
pub struct BlockProcessorConfig {
pub pruning_options: PruningOptions,
pub store_precommits: bool,
pub explicit_starting_block_height: Option<u32>,
pub use_best_effort_start_height: bool,
}
impl Default for BlockProcessorConfig {
fn default() -> Self {
Self {
pruning_options: PruningOptions::nothing(),
store_precommits: true,
explicit_starting_block_height: None,
use_best_effort_start_height: false,
}
}
}
impl BlockProcessorConfig {
pub fn new(
pruning_options: PruningOptions,
store_precommits: bool,
explicit_starting_block_height: Option<u32>,
use_best_effort_start_height: bool,
) -> Self {
Self {
pruning_options,
store_precommits,
explicit_starting_block_height,
use_best_effort_start_height,
}
}
}
pub struct BlockProcessor {
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
last_processed_height: u32,
@@ -65,9 +100,10 @@ pub struct BlockProcessor {
msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
#[allow(clippy::too_many_arguments)]
impl BlockProcessor {
pub async fn new(
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
@@ -82,7 +118,7 @@ impl BlockProcessor {
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
Ok(BlockProcessor {
pruning_options,
config,
cancel,
synced,
last_processed_height,
@@ -101,7 +137,7 @@ impl BlockProcessor {
}
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.pruning_options = pruning_options;
self.config.pruning_options = pruning_options;
self
}
@@ -128,7 +164,7 @@ impl BlockProcessor {
// we won't end up with a corrupted storage.
let mut tx = self.storage.begin_processing_tx().await?;
persist_block(&full_info, &mut tx).await?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
// let the modules do whatever they want
// the ones wanting the full block:
@@ -241,7 +277,7 @@ impl BlockProcessor {
#[instrument(skip(self))]
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = self.last_processed_height - keep_recent;
info!(
@@ -282,12 +318,12 @@ impl BlockProcessor {
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
debug!("checking for storage pruning");
if self.pruning_options.strategy.is_nothing() {
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}
let interval = self.pruning_options.strategy_interval();
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= self.last_processed_height {
self.prune_storage().await?;
}
@@ -371,13 +407,49 @@ impl BlockProcessor {
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
// in case we were offline for a while,
// make sure we don't request blocks we'd have to prune anyway
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = latest_block - keep_recent;
self.last_processed_height = max(self.last_processed_height, last_to_keep);
let request_range = self.last_processed_height + 1..latest_block + 1;
info!("we need to request {request_range:?} to resync");
self.request_missing_blocks(request_range).await?;
return Ok(());
}
// this is the first time starting up
if self.last_processed_height == 0 {
let Some(starting_height) = self.config.explicit_starting_block_height else {
// nothing to do
return Ok(());
};
info!("attempting to start the scraper from block {starting_height}");
let earliest_available =
self.rpc_client.earliest_available_block_height().await? as u32;
info!("earliest available block height: {earliest_available}");
if earliest_available > starting_height && self.config.use_best_effort_start_height {
error!("the earliest available block is higher than the desired starting height");
return Err(ScraperError::BlocksUnavailable {
height: starting_height,
});
}
let starting_height = if earliest_available > starting_height {
// add few additional blocks to account for all the startup waiting
// because the node might have pruned few blocks since
earliest_available + 10
} else {
starting_height
};
let request_range = starting_height..latest_block + 1;
info!("going to start the scraper from block {starting_height}");
info!("we need to request {request_range:?} before properly starting up");
self.request_missing_blocks(request_range).await?;
}
Ok(())
+3
View File
@@ -19,6 +19,9 @@ pub enum ScraperError {
#[error("the block scraper is already running")]
ScraperAlreadyRunning,
#[error("block information for height {height} is not available on the provided rpc endpoint")]
BlocksUnavailable { height: u32 },
#[error("failed to establish websocket connection to {url}: {source}")]
WebSocketConnectionFailure {
url: String,
+1 -1
View File
@@ -16,5 +16,5 @@ pub mod storage;
pub use block_processor::pruning::{PruningOptions, PruningStrategy};
pub use modules::{BlockModule, MsgModule, TxModule};
pub use scraper::{Config, NyxdScraper};
pub use scraper::{Config, NyxdScraper, StartingBlockOpts};
pub use storage::models;
+11
View File
@@ -117,6 +117,17 @@ impl RpcClient {
Ok(info.last_block_height.value())
}
pub(crate) async fn earliest_available_block_height(&self) -> Result<u64, ScraperError> {
debug!("getting earliest available block height");
let status = self
.inner
.status()
.await
.map_err(|source| ScraperError::AbciInfoQueryFailure { source })?;
Ok(status.sync_info.earliest_block_height.value())
}
async fn get_transaction_results(
&self,
raw: &[Vec<u8>],
+94 -29
View File
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::BlockToProcess;
use crate::block_processor::BlockProcessor;
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
use crate::block_requester::{BlockRequest, BlockRequester};
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
@@ -24,6 +24,15 @@ use url::Url;
mod subscriber;
#[derive(Default, Clone, Copy)]
pub struct StartingBlockOpts {
pub start_block_height: Option<u32>,
/// If the scraper fails to start from the desired height, rather than failing,
/// attempt to use the next available height
pub use_best_effort_start_height: bool,
}
pub struct Config {
/// Url to the websocket endpoint of a validator, for example `wss://rpc.nymtech.net/websocket`
pub websocket_url: Url,
@@ -34,6 +43,10 @@ pub struct Config {
pub database_path: PathBuf,
pub pruning_options: PruningOptions,
pub store_precommits: bool,
pub start_block: StartingBlockOpts,
}
pub struct NyxdScraperBuilder {
@@ -60,8 +73,16 @@ impl NyxdScraperBuilder {
req_rx,
processing_tx.clone(),
);
let mut block_processor = BlockProcessor::new(
let block_processor_config = BlockProcessorConfig::new(
scraper.config.pruning_options,
scraper.config.store_precommits,
scraper.config.start_block.start_block_height,
scraper.config.start_block.use_best_effort_start_height,
);
let mut block_processor = BlockProcessor::new(
block_processor_config,
scraper.cancel_token.clone(),
scraper.startup_sync.clone(),
processing_rx,
@@ -158,7 +179,10 @@ impl NyxdScraper {
self.task_tracker.close();
}
pub async fn process_single_block(&self, height: u32) -> Result<(), ScraperError> {
// DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
// AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
// YOU WILL BE FIRED IF YOU USE IT : )
pub async fn unsafe_process_single_block(&self, height: u32) -> Result<(), ScraperError> {
info!(height = height, "attempting to process a single block");
if !self.task_tracker.is_empty() {
return Err(ScraperError::ScraperAlreadyRunning);
@@ -177,7 +201,10 @@ impl NyxdScraper {
block_processor.process_block(block.into()).await
}
pub async fn process_block_range(
// DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
// AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
// YOU WILL BE FIRED IF YOU USE IT : )
pub async fn unsafe_process_block_range(
&self,
starting_height: Option<u32>,
end_height: Option<u32>,
@@ -194,10 +221,10 @@ impl NyxdScraper {
.await?
.with_pruning(PruningOptions::nothing());
let current_height = self.rpc_client.current_block_height().await? as u32;
let mut current_height = self.rpc_client.current_block_height().await? as u32;
let last_processed = block_processor.last_process_height();
let starting_height = match starting_height {
let mut starting_height = match starting_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
@@ -211,7 +238,8 @@ impl NyxdScraper {
}
};
let end_height = match end_height {
let must_catch_up = end_height.is_none();
let mut end_height = match end_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
@@ -226,32 +254,62 @@ impl NyxdScraper {
}
};
info!(
starting_height = starting_height,
end_height = end_height,
"attempting to process block range"
);
let mut last_processed = starting_height;
let range = (starting_height..=end_height).collect::<Vec<_>>();
while last_processed < current_height {
info!(
starting_height = starting_height,
end_height = end_height,
"attempting to process block range"
);
// the most likely bottleneck here are going to be the chain queries,
// so batch multiple requests
for batch in range.chunks(4) {
let batch_result = join_all(
batch
.iter()
.map(|height| self.rpc_client.get_basic_block_details(*height)),
)
.await;
for result in batch_result {
match result {
Ok(block) => block_processor.process_block(block.into()).await?,
Err(err) => {
error!("failed to retrieve the block: {err}. stopping...");
return Err(err);
let range = (starting_height..=end_height).collect::<Vec<_>>();
// the most likely bottleneck here are going to be the chain queries,
// so batch multiple requests
for batch in range.chunks(4) {
let batch_result = join_all(
batch
.iter()
.map(|height| self.rpc_client.get_basic_block_details(*height)),
)
.await;
for result in batch_result {
match result {
Ok(block) => block_processor.process_block(block.into()).await?,
Err(err) => {
error!("failed to retrieve the block: {err}. stopping...");
return Err(err);
}
}
}
}
// if we don't need to catch up, return early
if !must_catch_up {
return Ok(());
}
// check if we have caught up to the current block height
last_processed = end_height;
current_height = self.rpc_client.current_block_height().await? as u32;
info!(
last_processed = last_processed,
current_height = current_height,
"🏃 still need to catch up..."
);
starting_height = last_processed + 1;
end_height = current_height;
}
if must_catch_up {
info!(
last_processed = last_processed,
current_height = current_height,
"✅ block processing has caught up!"
);
}
Ok(())
@@ -275,8 +333,15 @@ impl NyxdScraper {
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> Result<BlockProcessor, ScraperError> {
BlockProcessor::new(
let block_processor_config = BlockProcessorConfig::new(
self.config.pruning_options,
self.config.store_precommits,
self.config.start_block.start_block_height,
self.config.start_block.use_best_effort_start_height,
);
BlockProcessor::new(
block_processor_config,
self.cancel_token.clone(),
self.startup_sync.clone(),
processing_rx,
@@ -237,6 +237,58 @@ impl StorageManager {
Ok(-1)
}
}
#[allow(dead_code)]
pub async fn get_transactions_after_height(
&self,
min_height: i64,
message_type: Option<&str>,
) -> Result<Vec<TransactionWithBlock>, sqlx::Error> {
match message_type {
Some(msg_type) => {
sqlx::query_as!(
TransactionWithBlock,
r#"
SELECT t.hash, t.height, t.memo, t.raw_log
FROM message m
JOIN "transaction" t ON m.transaction_hash = t.hash
JOIN block b ON t.height = b.height
WHERE t.height > ?
AND m.type = ?
ORDER BY t.height ASC
"#,
min_height,
msg_type
)
.fetch_all(&self.connection_pool)
.await
}
None => {
sqlx::query_as!(
TransactionWithBlock,
r#"
SELECT t.hash, t.height, t.memo, t.raw_log
FROM message m
JOIN "transaction" t ON m.transaction_hash = t.hash
JOIN block b ON t.height = b.height
WHERE t.height > ?
ORDER BY t.height ASC
"#,
min_height
)
.fetch_all(&self.connection_pool)
.await
}
}
}
}
#[derive(Debug, sqlx::FromRow)]
pub struct TransactionWithBlock {
pub hash: String,
pub height: i64,
pub memo: Option<String>,
pub raw_log: Option<String>,
}
// make those generic over executor so that they could be performed over connection pool and a tx
+19 -5
View File
@@ -13,6 +13,7 @@ use crate::{
models::{CommitSignature, Validator},
},
};
use manager::TransactionWithBlock;
use sqlx::{types::time::OffsetDateTime, ConnectOptions, Sqlite, Transaction};
use std::{fmt::Debug, path::Path};
use tendermint::{
@@ -207,11 +208,23 @@ impl ScraperStorage {
pub async fn get_pruned_height(&self) -> Result<i64, ScraperError> {
Ok(self.manager.get_pruned_height().await?)
}
pub async fn get_transactions_after_height(
&self,
min_height: i64,
message_type: Option<&str>,
) -> Result<Vec<TransactionWithBlock>, ScraperError> {
Ok(self
.manager
.get_transactions_after_height(min_height, message_type)
.await?)
}
}
pub async fn persist_block(
block: &FullBlockInformation,
tx: &mut StorageTransaction,
store_precommits: bool,
) -> Result<(), ScraperError> {
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
@@ -224,11 +237,12 @@ pub async fn persist_block(
// persist block data
persist_block_data(&block.block, total_gas, tx).await?;
// persist commits
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
if store_precommits {
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
// persist txs
@@ -1,16 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO responses\n (joke_id, joke, date_created)\n VALUES\n ($1, $2, $3)\n ON CONFLICT(joke_id) DO UPDATE SET\n joke=excluded.joke,\n date_created=excluded.date_created;",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Varchar",
"Text",
"Int4"
]
},
"nullable": []
},
"hash": "249faa11b88b749f50342bb5c9cc41d20896db543eed74a6f320c041bcbb723d"
}
@@ -1,34 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "SELECT joke_id, joke, date_created FROM responses WHERE joke_id = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "joke_id",
"type_info": "Varchar"
},
{
"ordinal": 1,
"name": "joke",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "date_created",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false,
false
]
},
"hash": "aff7fbd06728004d2f2226d20c32f1482df00de2dc1d2b4debbb2e12553d997b"
}
@@ -1,32 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "SELECT joke_id, joke, date_created FROM responses",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "joke_id",
"type_info": "Varchar"
},
{
"ordinal": 1,
"name": "joke",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "date_created",
"type_info": "Int4"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false
]
},
"hash": "e53f479f8cead3dc8aa1875e5d450ad69686cf6a109e37d6c3f0623c3e9f91d0"
}
-27
View File
@@ -1,27 +0,0 @@
FROM rust:latest AS builder
COPY ./ /usr/src/nym
WORKDIR /usr/src/nym/nym-data-observatory
RUN cargo build --release
#-------------------------------------------------------------------
# The following environment variables are required at runtime:
#
# NYM_DATA_OBSERVATORY_CONNECTION_URL
#
# And optionally:
#
# NYM_DATA_OBSERVATORY_HTTP_PORT
#
# see https://github.com/nymtech/nym/blob/develop/nym-data-observatory/src/main.rs for details
#-------------------------------------------------------------------
FROM ubuntu:24.04
RUN apt update && apt install -yy curl ca-certificates
WORKDIR /nym
COPY --from=builder /usr/src/nym/target/release/nym-data-observatory ./
ENTRYPOINT [ "/nym/nym-data-observatory" ]
-58
View File
@@ -1,58 +0,0 @@
use anyhow::Result;
use sqlx::{Connection, PgConnection};
use std::io::Write;
use std::{collections::HashMap, fs::File};
const POSTGRES_USER: &str = "nym";
const POSTGRES_PASSWORD: &str = "password123";
const POSTGRES_DB: &str = "data_obs_db";
/// if schema changes, rerun `cargo sqlx prepare` with a running DB
/// https://github.com/launchbadge/sqlx/blob/main/sqlx-cli/README.md#enable-building-in-offline-mode-with-query
#[tokio::main]
async fn main() -> Result<()> {
let db_url =
format!("postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@localhost:5432/{POSTGRES_DB}");
export_db_variables(&db_url)?;
// if a live DB is reachable, use that
if PgConnection::connect(&db_url).await.is_ok() {
println!("cargo::rustc-env=SQLX_OFFLINE=false");
run_migrations(&db_url).await?;
} else {
// by default, run in offline mode
println!("cargo::rustc-env=SQLX_OFFLINE=true");
}
rerun_if_changed();
Ok(())
}
fn export_db_variables(db_url: &str) -> Result<()> {
let mut map = HashMap::new();
map.insert("POSTGRES_USER", POSTGRES_USER);
map.insert("POSTGRES_PASSWORD", POSTGRES_PASSWORD);
map.insert("POSTGRES_DB", POSTGRES_DB);
map.insert("DATABASE_URL", db_url);
let mut file = File::create(".env")?;
for (var, value) in map.iter() {
println!("cargo::rustc-env={}={}", var, value);
writeln!(file, "{}={}", var, value).expect("Failed to write to dotenv file");
}
Ok(())
}
async fn run_migrations(db_url: &str) -> Result<()> {
let mut conn = PgConnection::connect(db_url).await?;
sqlx::migrate!("./migrations").run(&mut conn).await?;
Ok(())
}
fn rerun_if_changed() {
println!("cargo::rerun-if-changed=migrations");
println!("cargo::rerun-if-changed=src/db/queries");
}
-31
View File
@@ -1,31 +0,0 @@
services:
postgres:
image: postgres:13
container_name: nym-data-observatory-pg
environment:
POSTGRES_PASSWORD: password
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
data-observatory:
depends_on:
- postgres
image: nym-data-observatory:latest
build:
context: ../
dockerfile: nym-data-observatory/Dockerfile
container_name: nym-data-observatory
environment:
NYM_DATA_OBSERVATORY_CONNECTION_USERNAME: "postgres"
NYM_DATA_OBSERVATORY_CONNECTION_PASSWORD: "password"
NYM_DATA_OBSERVATORY_CONNECTION_HOST: "postgres"
NYM_DATA_OBSERVATORY_CONNECTION_PORT: "5432"
NYM_DATA_OBSERVATORY_CONNECTION_DB: ""
NYM_DATA_OBSERVATORY_HTTP_PORT: 8000
env_file:
- ../envs/qa.env
volumes:
pgdata:
@@ -1,6 +0,0 @@
CREATE TABLE responses (
id SERIAL PRIMARY KEY,
joke_id VARCHAR NOT NULL UNIQUE,
joke TEXT NOT NULL,
date_created INTEGER NOT NULL
);
-13
View File
@@ -1,13 +0,0 @@
#!/bin/bash
# .env is generated in build.rs
source .env
# Launching a container in such a way that it's destroyed after you detach from the terminal:
docker compose up
# docker exec -it nym-data-observatory-pg /bin/bash
# psql -U youruser -d yourdb
echo "Tearing down containers to have a clean slate"
docker compose down -v
@@ -1,61 +0,0 @@
use core::str;
use serde::Deserialize;
use tokio::process::Command;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use crate::db::{self, DbPool};
const REFRESH_DELAY: Duration = Duration::from_secs(15);
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
pub(crate) async fn spawn_in_background(db_pool: DbPool) -> JoinHandle<()> {
loop {
tracing::info!("Running in a loop 🏃");
if let Err(e) = some_network_action(&db_pool).await {
tracing::error!(
"❌ Run failed: {e}, retrying in {}s...",
FAILURE_RETRY_DELAY.as_secs()
);
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
} else {
tracing::info!(
"✅ Run successful, sleeping for {}s...",
REFRESH_DELAY.as_secs()
);
tokio::time::sleep(REFRESH_DELAY).await;
}
}
}
#[derive(Deserialize, Debug)]
pub(crate) struct Response {
#[serde(rename(deserialize = "id"))]
pub(crate) joke_id: String,
pub(crate) joke: String,
#[serde(rename(deserialize = "status"))]
pub(crate) _status: u16,
}
async fn some_network_action(pool: &DbPool) -> anyhow::Result<()> {
// for demonstration purposes only. You should use reqwest if you need it
let output = Command::new("curl")
.arg("-H")
.arg("Accept: application/json")
.arg("https://icanhazdadjoke.com/")
.output()
.await?;
if !output.status.success() {
anyhow::bail!("Curl command failed with status: {}", output.status);
}
let response_str = str::from_utf8(&output.stdout)?;
let joke_response: Response = serde_json::from_str(response_str)?;
tracing::info!("{:?}", joke_response.joke);
db::queries::insert_joke(pool, joke_response.into()).await?;
Ok(())
}
-22
View File
@@ -1,22 +0,0 @@
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use crate::background_task::Response;
#[derive(Serialize, Deserialize, Debug, ToSchema)]
pub(crate) struct JokeDto {
pub(crate) joke_id: String,
pub(crate) joke: String,
pub(crate) date_created: i32,
}
impl From<Response> for JokeDto {
fn from(value: Response) -> Self {
Self {
joke_id: value.joke_id,
joke: value.joke,
// casting not smart, can implicitly panic, don't do this in prod
date_created: chrono::offset::Utc::now().timestamp() as i32,
}
}
}
@@ -1,39 +0,0 @@
use crate::db::{models::JokeDto, DbPool};
pub(crate) async fn insert_joke(pool: &DbPool, joke: JokeDto) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
sqlx::query!(
"INSERT INTO responses
(joke_id, joke, date_created)
VALUES
($1, $2, $3)
ON CONFLICT(joke_id) DO UPDATE SET
joke=excluded.joke,
date_created=excluded.date_created;",
joke.joke_id,
joke.joke,
joke.date_created as i32,
)
.execute(&mut *conn)
.await?;
Ok(())
}
pub(crate) async fn select_joke_by_id(pool: &DbPool, joke_id: &str) -> anyhow::Result<JokeDto> {
sqlx::query_as!(
JokeDto,
"SELECT joke_id, joke, date_created FROM responses WHERE joke_id = $1",
joke_id
)
.fetch_one(pool)
.await
.map_err(anyhow::Error::from)
}
pub(crate) async fn select_all(pool: &DbPool) -> anyhow::Result<Vec<JokeDto>> {
sqlx::query_as!(JokeDto, "SELECT joke_id, joke, date_created FROM responses",)
.fetch_all(pool)
.await
.map_err(anyhow::Error::from)
}
@@ -1,5 +0,0 @@
// group queries in files by theme
mod joke;
// re-exporting allows us to access all queries via `queries::bla``
pub(crate) use joke::{insert_joke, select_all, select_joke_by_id};
@@ -1,78 +0,0 @@
use axum::{
extract::{Path, State},
Json, Router,
};
use serde::Deserialize;
use utoipa::IntoParams;
use crate::{
db::{
models::JokeDto,
queries::{self, select_joke_by_id},
},
http::{
error::{Error, HttpResult},
state::AppState,
},
};
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(jokes))
.route("/:joke_id", axum::routing::get(joke_by_id))
.route("/fetch_another", axum::routing::get(fetch_another))
}
#[utoipa::path(
tag = "Dad Jokes",
get,
path = "/v1/jokes",
responses(
(status = 200, body = Vec<JokeDto>)
)
)]
async fn jokes(State(state): State<AppState>) -> HttpResult<Json<Vec<JokeDto>>> {
queries::select_all(state.db_pool())
.await
.map(Json::from)
.map_err(|_| Error::internal())
}
#[derive(Deserialize, IntoParams)]
#[into_params(parameter_in = Path)]
struct JokeIdParam {
joke_id: String,
}
#[utoipa::path(
tag = "Dad Jokes",
get,
params(
JokeIdParam
),
path = "/v1/jokes/{joke_id}",
responses(
(status = 200, body = JokeDto)
)
)]
async fn joke_by_id(
Path(JokeIdParam { joke_id }): Path<JokeIdParam>,
State(state): State<AppState>,
) -> HttpResult<Json<JokeDto>> {
select_joke_by_id(state.db_pool(), &joke_id)
.await
.map(Json::from)
.map_err(|_| Error::not_found(joke_id))
}
#[utoipa::path(
tag = "Dad Jokes",
get,
path = "/v1/jokes/fetch_another",
responses(
(status = 200, body = String)
)
)]
async fn fetch_another(State(_state): State<AppState>) -> HttpResult<Json<String>> {
Ok(Json(String::from("Done boss, check the DB")))
}
-78
View File
@@ -1,78 +0,0 @@
use clap::Parser;
use nym_network_defaults::setup_env;
use nym_task::signal::wait_for_signal;
mod background_task;
mod db;
mod http;
mod logging;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// Port to listen on
#[arg(long, default_value_t = 8000, env = "NYM_DATA_OBSERVATORY_HTTP_PORT")]
http_port: u16,
/// Path to the environment variables file. If you don't provide one, variables for the mainnet will be used.
#[arg(short, long, default_value = None, env = "NYM_DATA_OBSERVATORY_ENV_FILE")]
env_file: Option<String>,
/// DB connection username
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_USERNAME")]
connection_username: String,
/// DB connection password
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_PASSWORD")]
connection_password: String,
/// DB connection host
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_HOST")]
connection_host: String,
/// DB connection port
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_PORT")]
connection_port: String,
/// DB connection database name
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_DB")]
connection_db: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::setup_tracing_logger();
let args = Args::parse();
setup_env(args.env_file); // Defaults to mainnet if empty
let connection_url = format!(
"postgres://{}:{}@{}:{}/{}",
args.connection_username,
args.connection_password,
args.connection_host,
args.connection_port,
args.connection_db
);
let storage = db::Storage::init(connection_url).await?;
let db_pool = storage.pool_owned().await;
tokio::spawn(async move {
background_task::spawn_in_background(db_pool).await;
tracing::info!("Started task");
});
let shutdown_handles = http::server::start_http_api(storage.pool_owned().await, args.http_port)
.await
.expect("Failed to start server");
tracing::info!("Started HTTP server on port {}", args.http_port);
wait_for_signal().await;
if let Err(err) = shutdown_handles.shutdown().await {
tracing::error!("{err}");
};
Ok(())
}
@@ -26,7 +26,7 @@ pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {
NyxdScraper::new(config.scraper_config())
.await?
.process_single_block(args.height)
.unsafe_process_single_block(args.height)
.await?;
Ok(())
}
@@ -39,7 +39,7 @@ pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {
NyxdScraper::new(config.scraper_config())
.await?
.process_block_range(args.start_height, args.stop_height)
.unsafe_process_block_range(args.start_height, args.stop_height)
.await?;
Ok(())
}
+11 -1
View File
@@ -112,6 +112,7 @@ impl Config {
nyxd_scraper: NyxdScraper {
websocket_url,
pruning: Default::default(),
store_precommits: true,
},
base: Base {
upstream_nyxd: nyxd_url,
@@ -127,6 +128,8 @@ impl Config {
rpc_url: self.base.upstream_nyxd.clone(),
database_path: self.storage_paths.nyxd_scraper.clone(),
pruning_options: self.nyxd_scraper.pruning,
store_precommits: self.nyxd_scraper.store_precommits,
start_block_height: None,
}
}
@@ -314,7 +317,14 @@ pub struct NyxdScraper {
// if the value is missing, use `nothing` pruning as this was the past behaviour
#[serde(default = "PruningOptions::nothing")]
pub pruning: PruningOptions,
// TODO: debug with everything that's currently hardcoded in the scraper
/// Specifies whether to store pre-commits within the database.
#[serde(default = "default_store_precommits")]
pub store_precommits: bool,
}
fn default_store_precommits() -> bool {
true
}
impl NyxdScraper {
+1
View File
@@ -3163,6 +3163,7 @@ dependencies = [
"log",
"nym-network-defaults",
"serde",
"thiserror",
"toml 0.8.19",
"url",
]
@@ -1,9 +1,9 @@
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
# SPDX-License-Identifier: Apache-2.0
# SPDX-License-Identifier: GPL-3.0-only
[package]
name = "nym-data-observatory"
version = "0.1.0"
name = "nyx-chain-watcher"
version = "0.1.2"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -17,16 +17,24 @@ readme.workspace = true
anyhow = { workspace = true }
axum = { workspace = true, features = ["tokio"] }
chrono = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
nym-bin-common = { path = "../common/bin-common" }
clap = { workspace = true, features = ["cargo", "derive", "env"] }
nym-config = { path = "../common/config" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
nym-network-defaults = { path = "../common/network-defaults" }
nym-task = { path = "../common/task" }
nym-node-requests = { path = "../nym-node/nym-node-requests", features = [
"openapi",
] }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nyxd-scraper = {path = "../common/nyxd-scraper"}
reqwest = {workspace= true, features = ["rustls-tls"]}
rocket = { workspace = true }
schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "time"] }
thiserror = { workspace = true }
time = {version = "0.3.36"}
tokio = { workspace = true, features = ["process", "rt-multi-thread"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
@@ -40,4 +48,4 @@ utoipauto = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] }
+32
View File
@@ -0,0 +1,32 @@
FROM rust:latest AS builder
COPY ./ /usr/src/nym
WORKDIR /usr/src/nym/nyx-chain-watcher
RUN cargo build --release
#-------------------------------------------------------------------
# The following environment variables are required at runtime:
#
# NYX_CHAIN_WATCHER_DATABASE_PATH = /mnt/nyx-chain-watchter.sqlite
# NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH = /mnt/chain-history.sqlite
# NYX_CHAIN_WATCHER_WATCH_ACCOUNTS = "n1...,n1...,n1..."
#
# And optionally:
#
# NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES = "/cosmos.bank.v1beta1.MsgSend,/ibc.applications.transfer.v1.MsgTransfer"
# NYX_CHAIN_WATCHER_CONFIG_ENV_FILE_ARG = /mnt/sandbox.env for sandbox environment
#
# see https://github.com/nymtech/nym/blob/develop/nyx-chain-watcher/src/cli/commands/run/args.rs for details
# and https://github.com/nymtech/nym/blob/develop/nyx-chain-watcher/src/env.rs for env vars
#-------------------------------------------------------------------
FROM ubuntu:24.04
RUN apt update && apt install -yy curl ca-certificates
WORKDIR /nym
COPY --from=builder /usr/src/nym/target/release/nyx-chain-watcher ./
ENTRYPOINT [ "/nym/nyx-chain-watcher", "run" ]
+19
View File
@@ -0,0 +1,19 @@
# Nyx Chain Watcher
A simple binary to watch addresses on the Nyx chain and to call webhooks when particular message types are in a block.
Look in [env.rs](./src/env.rs) for the names of environment variables that can be overridden.
## Running locally
```
NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH=chain_history.sqlite \
NYX_CHAIN_WATCHER_DATABASE_PATH=nyx_chain_watcher.sqlite \
NYX_CHAIN_WATCHER_WATCH_ACCOUNTS=n1...,n1...,n1... \
NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES="/cosmos.bank.v1beta1.MsgSend,/ibc.applications.transfer.v1.MsgTransfer"
NYX_CHAIN_WATCHER_WEBHOOK_URL="https://webhook.site" \
NYX_CHAIN_WATCHER_WEBHOOK_AUTH=1234 \
cargo run -- run
```
+53
View File
@@ -0,0 +1,53 @@
use anyhow::Result;
use sqlx::{sqlite::SqliteConnectOptions, Connection, SqliteConnection};
use std::io::Write;
use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr};
#[tokio::main]
async fn main() -> Result<()> {
let db_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join(".build")
.join("nyx_chain_watcher.sqlite");
// Create the database directory if it doesn't exist
if let Some(parent) = db_path.parent() {
std::fs::create_dir_all(parent)?;
}
let db_url = format!("sqlite:{}", db_path.display());
// Ensure database file is created with proper permissions
let connect_options = SqliteConnectOptions::from_str(&db_url)?
.create_if_missing(true)
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.foreign_keys(true);
// Create initial connection to ensure database exists
let mut conn = SqliteConnection::connect_with(&connect_options).await?;
export_db_variables(&db_url)?;
println!("cargo:rustc-env=SQLX_OFFLINE=false");
// Run migrations after ensuring database exists
sqlx::migrate!("./migrations").run(&mut conn).await?;
// Add rerun-if-changed directives
println!("cargo:rerun-if-changed=migrations");
println!("cargo:rerun-if-changed=build.rs");
println!("cargo:rerun-if-changed=src");
Ok(())
}
fn export_db_variables(db_url: &str) -> Result<()> {
let mut map = HashMap::new();
map.insert("DATABASE_URL", db_url);
let mut file = File::create(".env")?;
for (var, value) in map.iter() {
println!("cargo:rustc-env={}={}", var, value);
writeln!(file, "{}={}", var, value)?;
}
Ok(())
}
@@ -0,0 +1,7 @@
CREATE TABLE price_history (
timestamp INTEGER PRIMARY KEY,
chf REAL NOT NULL,
usd REAL NOT NULL,
eur REAL NOT NULL,
btc REAL NOT NULL
);
@@ -0,0 +1,10 @@
CREATE TABLE payments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transaction_hash TEXT NOT NULL UNIQUE,
sender_address TEXT NOT NULL,
receiver_address TEXT NOT NULL,
amount REAL NOT NULL,
timestamp INTEGER NOT NULL,
height INTEGER NOT NULL,
memo TEXT
);
@@ -0,0 +1,42 @@
use crate::env::vars::{NYXD_SCRAPER_START_HEIGHT, NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT};
use nyxd_scraper::{storage::ScraperStorage, NyxdScraper, PruningOptions};
pub(crate) async fn run_chain_scraper(
config: &crate::config::Config,
) -> anyhow::Result<ScraperStorage> {
let websocket_url = std::env::var("NYXD_WS").expect("NYXD_WS not defined");
let rpc_url = std::env::var("NYXD").expect("NYXD not defined");
let websocket_url = reqwest::Url::parse(&websocket_url)?;
let rpc_url = reqwest::Url::parse(&rpc_url)?;
// why are those not part of CLI? : (
let start_block_height = match std::env::var(NYXD_SCRAPER_START_HEIGHT).ok() {
None => None,
// blow up if passed malformed env value
Some(raw) => Some(raw.parse()?),
};
let use_best_effort_start_height =
match std::env::var(NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT).ok() {
None => false,
// blow up if passed malformed env value
Some(raw) => raw.parse()?,
};
let scraper = NyxdScraper::builder(nyxd_scraper::Config {
websocket_url,
rpc_url,
database_path: config.chain_scraper_database_path().into(),
pruning_options: PruningOptions::nothing(),
store_precommits: false,
start_block: nyxd_scraper::StartingBlockOpts {
start_block_height,
use_best_effort_start_height,
},
});
let instance = scraper.build_and_start().await?;
Ok(instance.storage)
}
@@ -0,0 +1,17 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::NyxChainWatcherError;
use nym_bin_common::bin_info_owned;
use nym_bin_common::output_format::OutputFormat;
#[derive(clap::Args, Debug)]
pub(crate) struct Args {
#[clap(short, long, default_value_t = OutputFormat::default())]
output: OutputFormat,
}
pub(crate) fn execute(args: Args) -> Result<(), NyxChainWatcherError> {
println!("{}", args.output.format(&bin_info_owned!()));
Ok(())
}
@@ -0,0 +1,44 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::cli::DEFAULT_NYX_CHAIN_WATCHER_ID;
use crate::config::payments_watcher::HttpAuthenticationOptions::AuthorizationBearerToken;
use crate::config::payments_watcher::PaymentWatcherEntry;
use crate::config::{default_config_filepath, Config, ConfigBuilder, PaymentWatcherConfig};
use crate::error::NyxChainWatcherError;
use nym_config::save_unformatted_config_to_file;
use nym_validator_client::nyxd::AccountId;
use std::str::FromStr;
#[derive(clap::Args, Debug)]
pub(crate) struct Args {}
pub(crate) async fn execute(_args: Args) -> Result<(), NyxChainWatcherError> {
let config_path = default_config_filepath();
let data_dir = Config::default_data_directory(&config_path)?;
let builder = ConfigBuilder::new(config_path.clone(), data_dir).with_payment_watcher_config(
PaymentWatcherConfig {
watchers: vec![PaymentWatcherEntry {
id: DEFAULT_NYX_CHAIN_WATCHER_ID.to_string(),
webhook_url: "https://webhook.site".to_string(),
watch_for_transfer_recipient_accounts: Some(vec![AccountId::from_str(
"n17g9a2pwwkg8m60wf59pq6mv0c2wusg9ukparkz",
)
.unwrap()]),
authentication: Some(AuthorizationBearerToken {
token: "1234".to_string(),
}),
description: None,
watch_for_chain_message_types: Some(vec![
"/cosmos.bank.v1beta1.MsgSend".to_string(),
"/ibc.applications.transfer.v1.MsgTransfer".to_string(),
]),
}],
},
);
let config = builder.build();
Ok(save_unformatted_config_to_file(&config, &config_path)?)
}
@@ -0,0 +1,3 @@
pub(crate) mod build_info;
pub(crate) mod init;
pub(crate) mod run;
@@ -0,0 +1,101 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::env::vars::*;
use nym_validator_client::nyxd::AccountId;
#[derive(clap::Args, Debug)]
pub(crate) struct Args {
/// (Override) SQLite database file path for chain watcher
#[arg(long, env = NYX_CHAIN_WATCHER_DATABASE_PATH)]
pub(crate) chain_watcher_db_path: Option<String>,
/// (Override) SQLite database file path for chain scraper history
#[arg(long, env = NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH)]
pub(crate) chain_history_db_path: Option<String>,
/// (Override) Watch for transfers to these recipient accounts
#[clap(
long,
value_delimiter = ',',
env = NYX_CHAIN_WATCHER_WATCH_ACCOUNTS
)]
pub watch_for_transfer_recipient_accounts: Option<Vec<AccountId>>,
/// (Override) Watch for chain messages of these types
#[clap(
long,
value_delimiter = ',',
env = NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES
)]
pub watch_for_chain_message_types: Option<Vec<String>>,
/// (Override) The webhook to call when we find something
#[clap(
long,
env = NYX_CHAIN_WATCHER_WEBHOOK_URL
)]
pub webhook_url: Option<String>,
/// (Override) Optionally, authenticate with the webhook
#[clap(
long,
env = NYX_CHAIN_WATCHER_WEBHOOK_AUTH
)]
pub webhook_auth: Option<String>,
}
/*impl Args {
pub(super) fn take_mnemonic(&mut self) -> Option<Zeroizing<bip39::Mnemonic>> {
self.entry_gateway.mnemonic.take().map(Zeroizing::new)
}
}
impl Args {
pub(crate) fn build_config(self) -> Result<Config, NymNodeError> {
let config_path = self.config.config_path();
let data_dir = Config::default_data_directory(&config_path)?;
let id = self
.config
.id()
.clone()
.ok_or(NymNodeError::MissingInitArg {
section: "global".to_string(),
name: "id".to_string(),
})?;
let config = ConfigBuilder::new(id, config_path.clone(), data_dir.clone())
.with_mode(self.mode.unwrap_or_default())
.with_host(self.host.build_config_section())
.with_http(self.http.build_config_section())
.with_mixnet(self.mixnet.build_config_section())
.with_wireguard(self.wireguard.build_config_section(&data_dir))
.with_storage_paths(NymNodePaths::new(&data_dir))
.with_mixnode(self.mixnode.build_config_section())
.with_entry_gateway(self.entry_gateway.build_config_section(&data_dir))
.with_exit_gateway(self.exit_gateway.build_config_section(&data_dir))
.build();
Ok(config)
}
pub(crate) fn override_config(self, mut config: Config) -> Config {
if let Some(mode) = self.mode {
config.mode = mode;
}
config.host = self.host.override_config_section(config.host);
config.http = self.http.override_config_section(config.http);
config.mixnet = self.mixnet.override_config_section(config.mixnet);
config.wireguard = self.wireguard.override_config_section(config.wireguard);
config.mixnode = self.mixnode.override_config_section(config.mixnode);
config.entry_gateway = self
.entry_gateway
.override_config_section(config.entry_gateway);
config.exit_gateway = self
.exit_gateway
.override_config_section(config.exit_gateway);
config
}
}
*/
@@ -0,0 +1,84 @@
use crate::cli::commands::run::args::Args;
use crate::cli::DEFAULT_NYX_CHAIN_WATCHER_ID;
use crate::config::payments_watcher::{HttpAuthenticationOptions, PaymentWatcherEntry};
use crate::config::{default_config_filepath, Config, ConfigBuilder, PaymentWatcherConfig};
use crate::error::NyxChainWatcherError;
use tracing::{info, warn};
pub(crate) fn get_run_config(args: Args) -> Result<Config, NyxChainWatcherError> {
info!("{args:#?}");
let Args {
ref watch_for_transfer_recipient_accounts,
mut watch_for_chain_message_types,
webhook_auth,
ref chain_watcher_db_path,
ref chain_history_db_path,
webhook_url,
} = args;
// if there are no args set, then try load the config
if args.watch_for_transfer_recipient_accounts.is_none()
&& args.watch_for_transfer_recipient_accounts.is_none()
&& args.chain_watcher_db_path.is_none()
{
info!("Loading default config file...");
return Config::read_from_toml_file_in_default_location();
}
// set default messages
if watch_for_chain_message_types.is_none() {
watch_for_chain_message_types = Some(vec!["/cosmos.bank.v1beta1.MsgSend".to_string()]);
}
// warn if no accounts set
if watch_for_transfer_recipient_accounts.is_none() {
warn!(
"You did not specify any accounts to watch in {}. Only chain data will be stored.",
crate::env::vars::NYX_CHAIN_WATCHER_WATCH_ACCOUNTS
);
}
let config_path = default_config_filepath();
let data_dir = Config::default_data_directory(&config_path)?;
let mut builder = ConfigBuilder::new(config_path, data_dir);
if let Some(db_path) = chain_watcher_db_path {
info!("Overriding database url with '{db_path}'");
builder = builder.with_db_path(db_path.clone());
}
if let Some(db_path) = chain_history_db_path {
info!("Overriding chain history database url with '{db_path}'");
builder = builder.with_chain_scraper_db_path(db_path.clone());
}
if let Some(webhook_url) = webhook_url {
let authentication =
webhook_auth.map(|token| HttpAuthenticationOptions::AuthorizationBearerToken { token });
let watcher_config = PaymentWatcherConfig {
watchers: vec![PaymentWatcherEntry {
id: DEFAULT_NYX_CHAIN_WATCHER_ID.to_string(),
description: None,
watch_for_transfer_recipient_accounts: watch_for_transfer_recipient_accounts
.clone(),
watch_for_chain_message_types,
webhook_url,
authentication,
}],
};
info!("Overriding watcher config with env vars");
builder = builder.with_payment_watcher_config(watcher_config);
} else {
warn!(
"You did not specify a webhook in {}. Only database items will be stored.",
crate::env::vars::NYX_CHAIN_WATCHER_WEBHOOK_URL
);
}
Ok(builder.build())
}
@@ -0,0 +1,92 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::NyxChainWatcherError;
use tokio::join;
use tracing::{error, info, trace};
mod args;
mod config;
use crate::chain_scraper::run_chain_scraper;
use crate::{db, http, payment_listener, price_scraper};
pub(crate) use args::Args;
use nym_task::signal::wait_for_signal;
pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWatcherError> {
trace!("passed arguments: {args:#?}");
let config = config::get_run_config(args)?;
let db_path = config.database_path();
info!("Config is {config:#?}");
info!(
"Database path is {:?}",
std::path::Path::new(&db_path)
.canonicalize()
.unwrap_or_default()
);
info!(
"Chain History Database path is {:?}",
std::path::Path::new(&config.chain_scraper_database_path())
.canonicalize()
.unwrap_or_default()
);
// Ensure parent directory exists
if let Some(parent) = std::path::Path::new(&db_path).parent() {
std::fs::create_dir_all(parent)?;
}
let connection_url = format!("sqlite://{}?mode=rwc", db_path);
let storage = db::Storage::init(connection_url).await?;
let watcher_pool = storage.pool_owned().await;
// Spawn the chain scraper and get its storage
// Spawn the payment listener task
let payment_listener_handle = tokio::spawn({
let obs_pool = watcher_pool.clone();
let chain_storage = run_chain_scraper(&config).await?;
let payment_watcher_config = config.payment_watcher_config.unwrap_or_default();
async move {
if let Err(e) = payment_listener::run_payment_listener(
payment_watcher_config,
obs_pool,
chain_storage,
)
.await
{
error!("Payment listener error: {}", e);
}
Ok::<_, anyhow::Error>(())
}
});
// Clone pool for each task that needs it
//let background_pool = db_pool.clone();
let price_scraper_handle = tokio::spawn(async move {
price_scraper::run_price_scraper(&watcher_pool).await;
});
let shutdown_handles = http::server::start_http_api(storage.pool_owned().await, http_port)
.await
.expect("Failed to start server");
info!("Started HTTP server on port {}", http_port);
// Wait for the short-lived tasks to complete
let _ = join!(price_scraper_handle, payment_listener_handle);
// Wait for a signal to terminate the long-running task
wait_for_signal().await;
if let Err(err) = shutdown_handles.shutdown().await {
error!("{err}");
};
Ok(())
}
+67
View File
@@ -0,0 +1,67 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::cli::commands::{build_info, init, run};
use crate::env::vars::*;
use crate::error::NyxChainWatcherError;
use clap::{Parser, Subcommand};
use nym_bin_common::bin_info;
use std::sync::OnceLock;
mod commands;
pub const DEFAULT_NYX_CHAIN_WATCHER_ID: &str = "default-nyx-chain-watcher";
// Helper for passing LONG_VERSION to clap
fn pretty_build_info_static() -> &'static str {
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
}
#[derive(Parser, Debug)]
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
pub(crate) struct Cli {
/// Path pointing to an env file that configures the nym-chain-watcher and overrides any preconfigured values.
#[clap(
short,
long,
env = NYX_CHAIN_WATCHER_CONFIG_ENV_FILE_ARG
)]
pub(crate) config_env_file: Option<std::path::PathBuf>,
/// Flag used for disabling the printed banner in tty.
#[clap(
long,
env = NYX_CHAIN_WATCHER_NO_BANNER_ARG
)]
pub(crate) no_banner: bool,
/// Port to listen on
#[arg(long, default_value_t = 8000, env = "NYX_CHAIN_WATCHER_HTTP_PORT")]
pub(crate) http_port: u16,
#[clap(subcommand)]
command: Commands,
}
impl Cli {
pub(crate) async fn execute(self) -> Result<(), NyxChainWatcherError> {
match self.command {
Commands::BuildInfo(args) => build_info::execute(args),
Commands::Run(args) => run::execute(*args, self.http_port).await,
Commands::Init(args) => init::execute(args).await,
}
}
}
#[derive(Subcommand, Debug)]
pub(crate) enum Commands {
/// Show build information of this binary
BuildInfo(build_info::Args),
/// Start this nym-chain-watcher
Run(Box<run::Args>),
/// Initialise config
Init(init::Args),
}
+249
View File
@@ -0,0 +1,249 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::config::template::CONFIG_TEMPLATE;
use nym_bin_common::logging::LoggingSettings;
use nym_config::{
must_get_home, read_config_from_toml_file, save_unformatted_config_to_file, NymConfigTemplate,
DEFAULT_CONFIG_DIR, DEFAULT_CONFIG_FILENAME, DEFAULT_DATA_DIR, NYM_DIR,
};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use tracing::{debug, error};
pub(crate) mod payments_watcher;
mod template;
pub use crate::config::payments_watcher::PaymentWatcherConfig;
use crate::error::NyxChainWatcherError;
const DEFAULT_NYM_CHAIN_WATCHER_DIR: &str = "nym-chain-watcher";
pub(crate) const DEFAULT_NYM_CHAIN_WATCHER_DB_FILENAME: &str = "nyx_chain_watcher.sqlite";
pub(crate) const DEFAULT_NYM_CHAIN_SCRAPER_HISTORY_DB_FILENAME: &str = "chain_history.sqlite";
/// Derive default path to nym-chain-watcher's config directory.
/// It should get resolved to `$HOME/.nym/nym-chain-watcher/config`
pub fn default_config_directory() -> PathBuf {
must_get_home()
.join(NYM_DIR)
.join(DEFAULT_NYM_CHAIN_WATCHER_DIR)
.join(DEFAULT_CONFIG_DIR)
}
/// Derive default path to nym-chain-watcher's config file.
/// It should get resolved to `$HOME/.nym/nym-chain-watcher/config/config.toml`
pub fn default_config_filepath() -> PathBuf {
default_config_directory().join(DEFAULT_CONFIG_FILENAME)
}
pub struct ConfigBuilder {
pub config_path: PathBuf,
pub data_dir: PathBuf,
pub db_path: Option<String>,
pub chain_scraper_db_path: Option<String>,
pub payment_watcher_config: Option<PaymentWatcherConfig>,
pub logging: Option<LoggingSettings>,
}
impl ConfigBuilder {
pub fn new(config_path: PathBuf, data_dir: PathBuf) -> Self {
ConfigBuilder {
config_path,
data_dir,
payment_watcher_config: None,
logging: None,
db_path: None,
chain_scraper_db_path: None,
}
}
pub fn with_db_path(mut self, db_path: String) -> Self {
self.db_path = Some(db_path);
self
}
pub fn with_chain_scraper_db_path(mut self, chain_scraper_db_path: String) -> Self {
self.chain_scraper_db_path = Some(chain_scraper_db_path);
self
}
#[allow(dead_code)]
pub fn with_payment_watcher_config(
mut self,
payment_watcher_config: impl Into<PaymentWatcherConfig>,
) -> Self {
self.payment_watcher_config = Some(payment_watcher_config.into());
self
}
#[allow(dead_code)]
pub fn with_logging(mut self, section: impl Into<Option<LoggingSettings>>) -> Self {
self.logging = section.into();
self
}
pub fn build(self) -> Config {
Config {
logging: self.logging.unwrap_or_default(),
save_path: Some(self.config_path),
payment_watcher_config: self.payment_watcher_config,
data_dir: self.data_dir,
db_path: self.db_path,
chain_scraper_db_path: self.chain_scraper_db_path,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
// additional metadata holding on-disk location of this config file
#[serde(skip)]
pub(crate) save_path: Option<PathBuf>,
#[serde(skip)]
pub(crate) data_dir: PathBuf,
#[serde(skip)]
db_path: Option<String>,
#[serde(skip)]
chain_scraper_db_path: Option<String>,
pub payment_watcher_config: Option<PaymentWatcherConfig>,
#[serde(default)]
pub logging: LoggingSettings,
}
impl NymConfigTemplate for Config {
fn template(&self) -> &'static str {
CONFIG_TEMPLATE
}
}
impl Config {
#[allow(unused)]
pub fn save(&self) -> Result<(), NyxChainWatcherError> {
let save_location = self.save_location();
debug!(
"attempting to save config file to '{}'",
save_location.display()
);
save_unformatted_config_to_file(self, &save_location).map_err(|source| {
NyxChainWatcherError::UnformattedConfigSaveFailure {
path: save_location,
source,
}
})
}
#[allow(unused)]
pub fn save_location(&self) -> PathBuf {
self.save_path
.clone()
.unwrap_or(self.default_save_location())
}
#[allow(unused)]
pub fn default_save_location(&self) -> PathBuf {
default_config_filepath()
}
pub fn default_data_directory<P: AsRef<Path>>(
config_path: P,
) -> Result<PathBuf, NyxChainWatcherError> {
let config_path = config_path.as_ref();
// we got a proper path to the .toml file
let Some(config_dir) = config_path.parent() else {
error!(
"'{}' does not have a parent directory. Have you pointed to the fs root?",
config_path.display()
);
return Err(NyxChainWatcherError::DataDirDerivationFailure);
};
let Some(config_dir_name) = config_dir.file_name() else {
error!(
"could not obtain parent directory name of '{}'. Have you used relative paths?",
config_path.display()
);
return Err(NyxChainWatcherError::DataDirDerivationFailure);
};
if config_dir_name != DEFAULT_CONFIG_DIR {
error!(
"the parent directory of '{}' ({}) is not {DEFAULT_CONFIG_DIR}. currently this is not supported",
config_path.display(), config_dir_name.to_str().unwrap_or("UNKNOWN")
);
return Err(NyxChainWatcherError::DataDirDerivationFailure);
}
let Some(node_dir) = config_dir.parent() else {
error!(
"'{}' does not have a parent directory. Have you pointed to the fs root?",
config_dir.display()
);
return Err(NyxChainWatcherError::DataDirDerivationFailure);
};
Ok(node_dir.join(DEFAULT_DATA_DIR))
}
pub fn database_path(&self) -> String {
self.db_path.clone().unwrap_or_else(|| {
let mut path = self.data_dir.clone().to_path_buf();
path.push(DEFAULT_NYM_CHAIN_WATCHER_DB_FILENAME);
path.to_str()
.unwrap_or(DEFAULT_NYM_CHAIN_WATCHER_DB_FILENAME)
.to_string()
})
}
pub fn chain_scraper_database_path(&self) -> String {
self.chain_scraper_db_path.clone().unwrap_or_else(|| {
let mut path = self.data_dir.clone().to_path_buf();
path.push(DEFAULT_NYM_CHAIN_SCRAPER_HISTORY_DB_FILENAME);
path.to_str()
.unwrap_or(DEFAULT_NYM_CHAIN_SCRAPER_HISTORY_DB_FILENAME)
.to_string()
})
}
// simple wrapper that reads config file and assigns path location
fn read_from_path<P: AsRef<Path>>(path: P, data_dir: P) -> Result<Self, NyxChainWatcherError> {
let path = path.as_ref();
let data_dir = data_dir.as_ref();
let mut loaded: Config = read_config_from_toml_file(path).map_err(|source| {
NyxChainWatcherError::ConfigLoadFailure {
path: path.to_path_buf(),
source,
}
})?;
loaded.data_dir = data_dir.to_path_buf();
loaded.save_path = Some(path.to_path_buf());
debug!("loaded config file from {}", path.display());
Ok(loaded)
}
#[allow(unused)]
pub fn read_from_toml_file<P: AsRef<Path>>(
path: P,
data_dir: P,
) -> Result<Self, NyxChainWatcherError> {
Self::read_from_path(path, data_dir)
}
pub fn read_from_toml_file_in_default_location() -> Result<Self, NyxChainWatcherError> {
let config_path = default_config_filepath();
let data_dir = Config::default_data_directory(&config_path)?;
Self::read_from_path(config_path, data_dir)
}
}
@@ -0,0 +1,23 @@
use nym_validator_client::nyxd::AccountId;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PaymentWatcherConfig {
pub watchers: Vec<PaymentWatcherEntry>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaymentWatcherEntry {
pub id: String,
pub description: Option<String>,
pub webhook_url: String,
pub watch_for_transfer_recipient_accounts: Option<Vec<AccountId>>,
pub watch_for_chain_message_types: Option<Vec<String>>,
pub authentication: Option<HttpAuthenticationOptions>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HttpAuthenticationOptions {
AuthorizationBearerToken { token: String },
}
+29
View File
@@ -0,0 +1,29 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
// While using normal toml marshalling would have been way simpler with less overhead,
// I think it's useful to have comments attached to the saved config file to explain behaviour of
// particular fields.
// Note: any changes to the template must be reflected in the appropriate structs.
pub(crate) const CONFIG_TEMPLATE: &str = r#"
# This is a TOML config file.
# For more information, see https://github.com/toml-lang/toml
[payment_watcher_config]
{{#each payment_watcher_config.watchers }}
[[watchers]]
id={{this.id}}
description='{{this.description}}'
webhook_url='{{this.webhook_url}}'
{{/each}}
##### logging configuration options #####
[logging]
# TODO
"#;
@@ -1,13 +1,16 @@
use anyhow::{anyhow, Result};
use sqlx::{migrate::Migrator, postgres::PgConnectOptions, ConnectOptions, PgPool};
use sqlx::{migrate::Migrator, sqlite::SqliteConnectOptions, SqlitePool};
use std::str::FromStr;
pub(crate) mod models;
pub(crate) mod queries;
pub(crate) mod queries {
pub mod payments;
pub mod price;
}
static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
pub(crate) type DbPool = PgPool;
pub(crate) type DbPool = SqlitePool;
pub(crate) struct Storage {
pool: DbPool,
@@ -16,13 +19,16 @@ pub(crate) struct Storage {
impl Storage {
pub async fn init(connection_url: String) -> Result<Self> {
let connect_options =
PgConnectOptions::from_str(&connection_url)?.disable_statement_logging();
SqliteConnectOptions::from_str(&connection_url)?.create_if_missing(true);
let pool = DbPool::connect_with(connect_options)
.await
.map_err(|err| anyhow!("Failed to connect to {}: {}", &connection_url, err))?;
MIGRATOR.run(&pool).await?;
MIGRATOR
.run(&pool)
.await
.map_err(|err| anyhow!("Failed to run migrations: {}", err))?;
Ok(Storage { pool })
}
+41
View File
@@ -0,0 +1,41 @@
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
#[derive(Clone, Deserialize, Debug)]
pub(crate) struct CurrencyPrices {
pub(crate) chf: f32,
pub(crate) usd: f32,
pub(crate) eur: f32,
pub(crate) btc: f32,
}
// Struct to hold Coingecko response
#[derive(Clone, Deserialize, Debug, ToSchema)]
pub(crate) struct CoingeckoPriceResponse {
pub(crate) nym: CurrencyPrices,
}
#[derive(Clone, Deserialize, Debug, ToSchema)]
pub(crate) struct PriceRecord {
pub(crate) timestamp: i64,
pub(crate) nym: CurrencyPrices,
}
#[derive(Serialize, Deserialize, Debug, ToSchema)]
pub(crate) struct PriceHistory {
pub(crate) timestamp: i64,
pub(crate) chf: f32,
pub(crate) usd: f32,
pub(crate) eur: f32,
pub(crate) btc: f32,
}
#[derive(Serialize, Deserialize, Debug, ToSchema)]
pub(crate) struct PaymentRecord {
pub(crate) transaction_hash: String,
pub(crate) sender_address: String,
pub(crate) receiver_address: String,
pub(crate) amount: f64,
pub(crate) timestamp: i64,
pub(crate) height: i64,
}
+6
View File
@@ -0,0 +1,6 @@
mod payments;
mod price;
// re-exporting allows us to access all queries via `queries::bla``
pub(crate) use payments::{get_last_checked_height, insert_payment};
pub(crate) use price::{get_latest_price, insert_nym_prices};
@@ -0,0 +1,41 @@
use crate::db::DbPool;
use anyhow::Result;
pub async fn get_last_checked_height(pool: &DbPool) -> Result<i64> {
let result = sqlx::query_scalar!("SELECT MAX(height) FROM payments")
.fetch_one(pool)
.await?;
Ok(result.unwrap_or(0))
}
pub async fn insert_payment(
pool: &DbPool,
transaction_hash: String,
sender_address: String,
receiver_address: String,
amount: f64,
height: i64,
memo: Option<String>,
) -> Result<()> {
let timestamp = chrono::Utc::now().timestamp();
sqlx::query!(
r#"
INSERT INTO payments (
transaction_hash, sender_address, receiver_address,
amount, height, timestamp, memo
) VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
transaction_hash,
sender_address,
receiver_address,
amount,
height,
timestamp,
memo,
)
.execute(pool)
.await?;
Ok(())
}
+46
View File
@@ -0,0 +1,46 @@
use crate::db::models::{PriceHistory, PriceRecord};
use crate::db::DbPool;
pub(crate) async fn insert_nym_prices(
pool: &DbPool,
price_data: PriceRecord,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
let timestamp = price_data.timestamp;
sqlx::query!(
"INSERT INTO price_history
(timestamp, chf, usd, eur, btc)
VALUES
($1, $2, $3, $4, $5)
ON CONFLICT(timestamp) DO UPDATE SET
chf=excluded.chf,
usd=excluded.usd,
eur=excluded.eur,
btc=excluded.btc;",
timestamp,
price_data.nym.chf,
price_data.nym.usd,
price_data.nym.eur,
price_data.nym.btc,
)
.execute(&mut *conn)
.await?;
Ok(())
}
pub(crate) async fn get_latest_price(pool: &DbPool) -> anyhow::Result<PriceHistory> {
let result = sqlx::query!(
"SELECT timestamp, chf, usd, eur, btc FROM price_history ORDER BY timestamp DESC LIMIT 1;"
)
.fetch_one(pool)
.await?;
Ok(PriceHistory {
timestamp: result.timestamp,
chf: result.chf as f32,
usd: result.usd as f32,
eur: result.eur as f32,
btc: result.btc as f32,
})
}
+27
View File
@@ -0,0 +1,27 @@
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
#[allow(unused)]
pub mod vars {
pub const NYX_CHAIN_WATCHER_NO_BANNER_ARG: &str = "NYX_CHAIN_WATCHER_NO_BANNER";
pub const NYX_CHAIN_WATCHER_CONFIG_ENV_FILE_ARG: &str = "NYX_CHAIN_WATCHER_CONFIG_ENV_FILE_ARG";
pub const NYX_CHAIN_WATCHER_DATABASE_PATH: &str = "NYX_CHAIN_WATCHER_DATABASE_PATH";
pub const NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH: &str =
"NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH";
pub const NYXD_SCRAPER_START_HEIGHT: &str = "NYXD_SCRAPER_START_HEIGHT";
pub const NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT: &str =
"NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT";
pub const NYX_CHAIN_WATCHER_ID_ARG: &str = "NYX_CHAIN_WATCHER_ID";
pub const NYX_CHAIN_WATCHER_OUTPUT_ARG: &str = "NYX_CHAIN_WATCHER_OUTPUT";
pub const NYX_CHAIN_WATCHER_CONFIG_PATH_ARG: &str = "NYX_CHAIN_WATCHER_CONFIG";
pub const NYX_CHAIN_WATCHER_WATCH_ACCOUNTS: &str = "NYX_CHAIN_WATCHER_WATCH_ACCOUNTS";
pub const NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES: &str =
"NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES";
pub const NYX_CHAIN_WATCHER_WEBHOOK_URL: &str = "NYX_CHAIN_WATCHER_WEBHOOK_URL";
pub const NYX_CHAIN_WATCHER_WEBHOOK_AUTH: &str = "NYX_CHAIN_WATCHER_WEBHOOK_AUTH";
}
+40
View File
@@ -0,0 +1,40 @@
use std::io;
use std::path::PathBuf;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum NyxChainWatcherError {
// #[error("failed to save config file using path '{}'. detailed message: {source}", path.display())]
// ConfigSaveFailure {
// path: PathBuf,
// #[source]
// source: io::Error,
// },
#[error("failed to save config file using path '{}'. detailed message: {source}", path.display())]
UnformattedConfigSaveFailure {
path: PathBuf,
#[source]
source: nym_config::error::NymConfigTomlError,
},
#[error("could not derive path to data directory of this nyx chain watcher")]
DataDirDerivationFailure,
// #[error("could not derive path to config directory of this nyx chain watcher")]
// ConfigDirDerivationFailure,
#[error("failed to load config file using path '{}'. detailed message: {source}", path.display())]
ConfigLoadFailure {
path: PathBuf,
#[source]
source: io::Error,
},
#[error(transparent)]
FileIoFailure(#[from] io::Error),
#[error(transparent)]
AnyhowFailure(#[from] anyhow::Error),
#[error(transparent)]
NymConfigTomlE(#[from] nym_config::error::NymConfigTomlError),
}
@@ -7,8 +7,8 @@ use utoipa_swagger_ui::SwaggerUi;
use crate::http::{api_docs, server::HttpServer, state::AppState};
pub(crate) mod jokes;
pub(crate) mod mixnodes;
pub(crate) mod price;
pub(crate) struct RouterBuilder {
unfinished_router: Router<AppState>,
@@ -28,8 +28,9 @@ impl RouterBuilder {
.nest(
"/v1",
Router::new()
.nest("/jokes", jokes::routes())
.nest("/mixnodes", mixnodes::routes()),
//.nest("/jokes", jokes::routes())
.nest("/mixnodes", mixnodes::routes())
.nest("/price", price::routes()),
);
Self {
+27
View File
@@ -0,0 +1,27 @@
use crate::db::models::PriceHistory;
use crate::db::queries::price::get_latest_price;
use crate::http::error::Error;
use crate::http::error::HttpResult;
use crate::http::state::AppState;
use axum::{extract::State, Json, Router};
pub(crate) fn routes() -> Router<AppState> {
Router::new().route("/", axum::routing::get(price))
}
#[utoipa::path(
tag = "Nym Price",
get,
path = "/v1/price",
responses(
(status = 200, body = String)
)
)]
/// Fetch the latest price cached by this API
async fn price(State(state): State<AppState>) -> HttpResult<Json<PriceHistory>> {
get_latest_price(state.db_pool())
.await
.map(Json::from)
.map_err(|_| Error::internal())
}
@@ -4,7 +4,7 @@ use utoipauto::utoipauto;
// manually import external structs which are behind feature flags because they
// can't be automatically discovered
// https://github.com/ProbablyClem/utoipauto/issues/13#issuecomment-1974911829
#[utoipauto(paths = "./nym-data-observatory/src")]
#[utoipauto(paths = "./nyx-chain-watcher/src")]
#[derive(OpenApi)]
#[openapi(info(title = "Nym API"), tags(), components(schemas()))]
#[openapi(info(title = "Nyx Chain Watcher API"), tags(), components(schemas()))]
pub(super) struct ApiDoc;
@@ -6,13 +6,6 @@ pub(crate) struct Error {
}
impl Error {
pub(crate) fn not_found(message: String) -> Self {
Self {
message,
status: axum::http::StatusCode::NOT_FOUND,
}
}
pub(crate) fn internal() -> Self {
Self {
message: String::from("Internal server error"),
+35
View File
@@ -0,0 +1,35 @@
use clap::{crate_name, crate_version, Parser};
use nym_bin_common::bin_info_owned;
use nym_bin_common::logging::maybe_print_banner;
use nym_network_defaults::setup_env;
use tracing::info;
mod chain_scraper;
mod cli;
mod config;
mod db;
mod env;
mod error;
mod http;
mod logging;
pub mod models;
mod payment_listener;
mod price_scraper;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = cli::Cli::parse();
setup_env(cli.config_env_file.as_ref());
logging::setup_tracing_logger();
if !cli.no_banner {
maybe_print_banner(crate_name!(), crate_version!());
}
let bin_info = bin_info_owned!();
info!("using the following version: {bin_info}");
cli.execute().await?;
Ok(())
}
+14
View File
@@ -0,0 +1,14 @@
use rocket::serde::{Deserialize, Serialize};
use schemars::JsonSchema;
use utoipa::ToSchema;
#[derive(Serialize, Deserialize, Clone, JsonSchema, ToSchema)]
pub struct WebhookPayload {
pub transaction_hash: String,
pub message_index: u64,
pub sender_address: String,
pub receiver_address: String,
pub amount: String,
pub height: u128,
pub memo: Option<String>,
}
@@ -0,0 +1,215 @@
use crate::config::payments_watcher::HttpAuthenticationOptions;
use crate::config::PaymentWatcherConfig;
use crate::db::queries;
use crate::models::WebhookPayload;
use nym_validator_client::nyxd::AccountId;
use nyxd_scraper::storage::ScraperStorage;
use reqwest::Client;
use rocket::form::validate::Contains;
use serde_json::Value;
use sqlx::SqlitePool;
use std::str::FromStr;
use tokio::time::{self, Duration};
use tracing::{error, info, trace};
#[derive(Debug)]
struct TransferEvent {
recipient: AccountId,
sender: AccountId,
amount: String,
message_index: u64,
}
pub(crate) async fn run_payment_listener(
payment_watcher_config: PaymentWatcherConfig,
watcher_pool: SqlitePool,
chain_storage: ScraperStorage,
) -> anyhow::Result<()> {
let client = Client::new();
let default_message_types = vec!["/cosmos.bank.v1beta1.MsgSend".to_string()];
loop {
// 1. get the last height this watcher ran at
let last_checked_height = queries::payments::get_last_checked_height(&watcher_pool).await?;
info!("Last checked height: {}", last_checked_height);
// 2. iterate through watchers
for watcher in &payment_watcher_config.watchers {
let watch_for_chain_message_types = watcher
.watch_for_chain_message_types
.as_ref()
.unwrap_or(&default_message_types);
// 3. build up transactions that match the message types we are looking for
let mut transactions = vec![];
for message_type in watch_for_chain_message_types {
match chain_storage
.get_transactions_after_height(
last_checked_height,
Some(message_type),
)
.await {
Ok(txs) => {
for t in txs {
transactions.push(t);
}
}
Err(e) => error!("Failed to get transactions (message_type = {message_type}) from scraper database: {e}")
}
}
for tx in transactions {
if let Some(raw_log) = tx.raw_log.as_deref() {
if let Some(watch_for_transfer_recipient_accounts) =
&watcher.watch_for_transfer_recipient_accounts
{
// 4. match recipient accounts we are looking for
match parse_transfer_from_raw_log(
raw_log,
watch_for_transfer_recipient_accounts,
) {
Ok(transfer_events) => {
if !transfer_events.is_empty() {
info!(
"[watcher = {}] Processing transaction: {} - {} payment events found",
watcher.id, tx.hash, transfer_events.len()
);
}
for transfer in transfer_events {
let amount: f64 = parse_unym_amount(&transfer.amount)?;
queries::payments::insert_payment(
&watcher_pool,
tx.hash.clone(),
transfer.sender.clone().to_string(),
transfer.recipient.clone().to_string(),
amount,
tx.height,
tx.memo.clone(),
)
.await?;
let webhook_data = WebhookPayload {
transaction_hash: tx.hash.clone(),
message_index: transfer.message_index,
sender_address: transfer.sender.to_string(),
receiver_address: transfer.recipient.to_string(),
amount: transfer.amount,
height: tx.height as u128,
memo: tx.memo.clone(),
};
let mut request_builder =
client.post(&watcher.webhook_url).json(&webhook_data);
if let Some(auth) = &watcher.authentication {
match auth {
HttpAuthenticationOptions::AuthorizationBearerToken { token } => {
request_builder = request_builder.bearer_auth(token);
}
}
}
match request_builder.send().await {
Ok(res) => info!(
"[watcher = {}] ✅ Webhook {} {} - tx {}, index {}",
watcher.id,
res.status(),
res.url(),
tx.hash,
transfer.message_index,
),
Err(e) => error!(
"[watcher = {}] ❌ Webhook {:?} {:?} error = {}",
watcher.id,
e.status(),
e.url(),
e,
),
}
}
}
Err(e) => error!(
"[watcher = {}] ❌ Parse logs for tx {} failed, error = {}",
watcher.id, tx.hash, e,
),
}
}
}
}
}
time::sleep(Duration::from_secs(10)).await;
}
}
fn parse_transfer_from_raw_log(
raw_log: &str,
watch_for_transfer_recipient_accounts: &Vec<AccountId>,
) -> anyhow::Result<Vec<TransferEvent>> {
let log_value: Value = serde_json::from_str(raw_log)?;
let mut transfers: Vec<TransferEvent> = vec![];
let default_value = vec![];
let log_entries: &Vec<Value> = log_value.as_array().unwrap_or(&default_value);
trace!("contains {} log entries", log_entries.len());
for log_entry in log_entries {
let message_index = log_entry["msg_index"].as_u64().unwrap_or_default();
trace!("entry - {message_index}...");
if let Some(events) = log_entry["events"].as_array() {
for transfer_event in events.iter().filter(|e| e["type"] == "transfer") {
if let Some(attrs) = transfer_event["attributes"].as_array() {
let mut recipient: Option<AccountId> = None;
let mut sender: Option<AccountId> = None;
let mut amount: Option<String> = None;
for attr in attrs {
match attr["key"].as_str() {
Some("recipient") => {
recipient =
AccountId::from_str(attr["value"].as_str().unwrap_or("")).ok();
}
Some("sender") => {
sender =
AccountId::from_str(attr["value"].as_str().unwrap_or("")).ok();
}
Some("amount") => {
amount = Some(attr["value"].as_str().unwrap_or("").to_string())
}
// TODO: parse message index
_ => continue,
}
}
if let (Some(recipient), Some(sender), Some(amount)) =
(recipient, sender, amount)
{
if watch_for_transfer_recipient_accounts.contains(&recipient) {
transfers.push(TransferEvent {
recipient,
sender,
amount,
message_index,
});
}
}
}
}
}
}
Ok(transfers)
}
fn parse_unym_amount(amount: &str) -> anyhow::Result<f64> {
let amount = amount.trim_end_matches("unym");
let parsed: f64 = amount.parse()?;
Ok(parsed / 1_000_000.0)
}
@@ -0,0 +1,55 @@
use crate::db::{
models::{CoingeckoPriceResponse, PriceRecord},
queries::price::insert_nym_prices,
};
use core::str;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use crate::db::DbPool;
const REFRESH_DELAY: Duration = Duration::from_secs(300);
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
const COINGECKO_API_URL: &str =
"https://api.coingecko.com/api/v3/simple/price?ids=nym&vs_currencies=chf,usd,eur,btc";
pub(crate) async fn run_price_scraper(db_pool: &DbPool) -> JoinHandle<()> {
loop {
tracing::info!("Running in a loop 🏃");
if let Err(e) = get_coingecko_prices(db_pool).await {
tracing::error!("❌ Failed to get CoinGecko prices: {e}");
tracing::info!("Retrying in {}s...", FAILURE_RETRY_DELAY.as_secs());
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
} else {
tracing::info!("✅ Successfully fetched CoinGecko prices");
tokio::time::sleep(REFRESH_DELAY).await;
}
}
}
async fn get_coingecko_prices(pool: &DbPool) -> anyhow::Result<()> {
tracing::info!("💰 Fetching CoinGecko prices from {}", COINGECKO_API_URL);
let response = reqwest::get(COINGECKO_API_URL)
.await?
.json::<CoingeckoPriceResponse>()
.await;
tracing::info!("Got response {:?}", response);
match response {
Ok(resp) => {
let price_record = PriceRecord {
timestamp: time::OffsetDateTime::now_utc().unix_timestamp(),
nym: resp.nym,
};
insert_nym_prices(pool, price_record).await?;
}
Err(e) => {
//tracing::info!("💰 CoinGecko price response: {:?}", response);
tracing::error!("Error sending request: {}", e);
}
}
Ok(())
}