Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 64091eb175 | |||
| 6204014395 | |||
| d4680382f5 | |||
| 2e9ab39002 | |||
| 4877a93201 | |||
| d38abba9a3 | |||
| 93b0063f73 | |||
| 247a4cefa6 | |||
| ade88af3cb | |||
| 0b99c1f467 | |||
| 46fb4617e8 | |||
| ed821d1a57 | |||
| 23e52653d7 | |||
| 8fba032e26 | |||
| 504b2af388 | |||
| 5a8e91542e |
@@ -0,0 +1,55 @@
|
||||
name: Build and upload Nyx Chain Watcher container to harbor.nymte.ch
|
||||
on:
|
||||
workflow_dispatch:
|
||||
|
||||
env:
|
||||
WORKING_DIRECTORY: "nyx-chain-watcher"
|
||||
CONTAINER_NAME: "nyx-chain-watcher"
|
||||
|
||||
jobs:
|
||||
build-container:
|
||||
runs-on: arc-ubuntu-22.04-dind
|
||||
steps:
|
||||
- name: Login to Harbor
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: harbor.nymte.ch
|
||||
username: ${{ secrets.HARBOR_ROBOT_USERNAME }}
|
||||
password: ${{ secrets.HARBOR_ROBOT_SECRET }}
|
||||
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Configure git identity
|
||||
run: |
|
||||
git config --global user.email "lawrence@nymtech.net"
|
||||
git config --global user.name "Lawrence Stalder"
|
||||
|
||||
- name: Get version from cargo.toml
|
||||
uses: mikefarah/yq@v4.44.5
|
||||
id: get_version
|
||||
with:
|
||||
cmd: yq -oy '.package.version' ${{ env.WORKING_DIRECTORY }}/Cargo.toml
|
||||
|
||||
- name: Check if tag exists
|
||||
run: |
|
||||
if git rev-parse ${{ steps.get_version.outputs.value }} >/dev/null 2>&1; then
|
||||
echo "Tag ${{ steps.get_version.outputs.value }} already exists"
|
||||
fi
|
||||
|
||||
- name: Remove existing tag if exists
|
||||
run: |
|
||||
if git rev-parse ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} >/dev/null 2>&1; then
|
||||
git push --delete origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
git tag -d ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
fi
|
||||
|
||||
- name: Create tag
|
||||
run: |
|
||||
git tag -a ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }} -m "Version ${{ steps.get_version.outputs.result }}"
|
||||
git push origin ${{ env.WORKING_DIRECTORY }}-${{ steps.get_version.outputs.result }}
|
||||
|
||||
- name: BuildAndPushImageOnHarbor
|
||||
run: |
|
||||
docker build -f ${{ env.WORKING_DIRECTORY }}/Dockerfile . -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:${{ steps.get_version.outputs.result }} -t harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }}:latest
|
||||
docker push harbor.nymte.ch/nym/${{ env.CONTAINER_NAME }} --all-tags
|
||||
+4
-1
@@ -51,4 +51,7 @@ ppa-private-key.b64
|
||||
ppa-private-key.asc
|
||||
nym-network-monitor/topology.json
|
||||
nym-network-monitor/__pycache__
|
||||
nym-network-monitor/*.key
|
||||
nym-network-monitor/*.key
|
||||
|
||||
*.sqlite
|
||||
.build
|
||||
Generated
+34
-25
@@ -5064,6 +5064,7 @@ dependencies = [
|
||||
"log",
|
||||
"nym-network-defaults",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"toml 0.8.14",
|
||||
"url",
|
||||
]
|
||||
@@ -5300,31 +5301,6 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nym-data-observatory"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum 0.7.7",
|
||||
"chrono",
|
||||
"clap 4.5.20",
|
||||
"nym-bin-common",
|
||||
"nym-network-defaults",
|
||||
"nym-node-requests",
|
||||
"nym-task",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower-http",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"utoipa",
|
||||
"utoipa-swagger-ui",
|
||||
"utoipauto",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nym-dkg"
|
||||
version = "0.1.0"
|
||||
@@ -6957,6 +6933,39 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nyx-chain-watcher"
|
||||
version = "0.1.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum 0.7.7",
|
||||
"chrono",
|
||||
"clap 4.5.20",
|
||||
"nym-bin-common",
|
||||
"nym-config",
|
||||
"nym-network-defaults",
|
||||
"nym-node-requests",
|
||||
"nym-task",
|
||||
"nym-validator-client",
|
||||
"nyxd-scraper",
|
||||
"reqwest 0.12.4",
|
||||
"rocket",
|
||||
"schemars",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"thiserror",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower-http",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"utoipa",
|
||||
"utoipa-swagger-ui",
|
||||
"utoipauto",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nyxd-scraper"
|
||||
version = "0.1.0"
|
||||
|
||||
+2
-2
@@ -119,8 +119,8 @@ members = [
|
||||
"nym-credential-proxy/nym-credential-proxy",
|
||||
"nym-credential-proxy/nym-credential-proxy-requests",
|
||||
"nym-credential-proxy/vpn-api-lib-wasm",
|
||||
"nym-data-observatory",
|
||||
"nym-network-monitor",
|
||||
"nyx-chain-watcher",
|
||||
"nym-node",
|
||||
"nym-node/nym-node-requests",
|
||||
"nym-node/nym-node-metrics",
|
||||
@@ -157,11 +157,11 @@ default-members = [
|
||||
"explorer-api",
|
||||
"nym-api",
|
||||
"nym-credential-proxy/nym-credential-proxy",
|
||||
"nym-data-observatory",
|
||||
"nym-node",
|
||||
"nym-node-status-api/nym-node-status-agent",
|
||||
"nym-node-status-api/nym-node-status-api",
|
||||
"nym-validator-rewarder",
|
||||
"nyx-chain-watcher",
|
||||
"service-providers/authenticator",
|
||||
"service-providers/ip-packet-router",
|
||||
"service-providers/network-requester",
|
||||
|
||||
@@ -12,7 +12,8 @@ dirs = { workspace = true, optional = true }
|
||||
handlebars = { workspace = true }
|
||||
log = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
toml = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
toml = { workspace = true, features = ["display"] }
|
||||
url = { workspace = true }
|
||||
|
||||
nym-network-defaults = { path = "../network-defaults", features = ["utoipa"] }
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
use std::io;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum NymConfigTomlError {
|
||||
#[error(transparent)]
|
||||
FileIoFailure(#[from] io::Error),
|
||||
#[error(transparent)]
|
||||
TomlSerializeFailure(#[from] toml::ser::Error),
|
||||
}
|
||||
@@ -13,6 +13,7 @@ pub use helpers::{parse_urls, OptionalSet};
|
||||
pub use toml::de::Error as TomlDeError;
|
||||
|
||||
pub mod defaults;
|
||||
pub mod error;
|
||||
pub mod helpers;
|
||||
pub mod legacy_helpers;
|
||||
pub mod serde_helpers;
|
||||
@@ -95,6 +96,42 @@ where
|
||||
config.format_to_writer(file)
|
||||
}
|
||||
|
||||
pub fn save_unformatted_config_to_file<C, P>(
|
||||
config: &C,
|
||||
path: P,
|
||||
) -> Result<(), error::NymConfigTomlError>
|
||||
where
|
||||
C: Serialize + ?Sized,
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let path = path.as_ref();
|
||||
log::info!("saving config file to {}", path.display());
|
||||
|
||||
if let Some(parent) = path.parent() {
|
||||
create_dir_all(parent)?;
|
||||
}
|
||||
|
||||
let mut file = File::create(path)?;
|
||||
|
||||
// TODO: check for whether any of our configs store anything sensitive
|
||||
// and change that to 0o644 instead
|
||||
#[cfg(target_family = "unix")]
|
||||
{
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
let mut perms = fs::metadata(path)?.permissions();
|
||||
perms.set_mode(0o600);
|
||||
fs::set_permissions(path, perms)?;
|
||||
}
|
||||
|
||||
// let serde format the TOML in whatever ugly way it chooses
|
||||
// TODO: in https://docs.rs/toml/latest/toml/fn.to_string_pretty.html it recommends using
|
||||
// https://docs.rs/toml_edit/latest/toml_edit/struct.DocumentMut.html to preserve formatting
|
||||
let toml_string = toml::to_string_pretty(config)?;
|
||||
|
||||
Ok(file.write_all(toml_string.as_bytes())?)
|
||||
}
|
||||
|
||||
pub fn deserialize_config_from_toml_str<C>(raw: &str) -> Result<C, TomlDeError>
|
||||
where
|
||||
C: DeserializeOwned,
|
||||
|
||||
@@ -42,8 +42,43 @@ impl PendingSync {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BlockProcessorConfig {
|
||||
pub pruning_options: PruningOptions,
|
||||
pub store_precommits: bool,
|
||||
pub explicit_starting_block_height: Option<u32>,
|
||||
pub use_best_effort_start_height: bool,
|
||||
}
|
||||
|
||||
impl Default for BlockProcessorConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
pruning_options: PruningOptions::nothing(),
|
||||
store_precommits: true,
|
||||
explicit_starting_block_height: None,
|
||||
use_best_effort_start_height: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockProcessorConfig {
|
||||
pub fn new(
|
||||
pruning_options: PruningOptions,
|
||||
store_precommits: bool,
|
||||
explicit_starting_block_height: Option<u32>,
|
||||
use_best_effort_start_height: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
pruning_options,
|
||||
store_precommits,
|
||||
explicit_starting_block_height,
|
||||
use_best_effort_start_height,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BlockProcessor {
|
||||
pruning_options: PruningOptions,
|
||||
config: BlockProcessorConfig,
|
||||
cancel: CancellationToken,
|
||||
synced: Arc<Notify>,
|
||||
last_processed_height: u32,
|
||||
@@ -65,9 +100,10 @@ pub struct BlockProcessor {
|
||||
msg_modules: Vec<Box<dyn MsgModule + Send>>,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
impl BlockProcessor {
|
||||
pub async fn new(
|
||||
pruning_options: PruningOptions,
|
||||
config: BlockProcessorConfig,
|
||||
cancel: CancellationToken,
|
||||
synced: Arc<Notify>,
|
||||
incoming: UnboundedReceiver<BlockToProcess>,
|
||||
@@ -82,7 +118,7 @@ impl BlockProcessor {
|
||||
let last_pruned_height = last_pruned.try_into().unwrap_or_default();
|
||||
|
||||
Ok(BlockProcessor {
|
||||
pruning_options,
|
||||
config,
|
||||
cancel,
|
||||
synced,
|
||||
last_processed_height,
|
||||
@@ -101,7 +137,7 @@ impl BlockProcessor {
|
||||
}
|
||||
|
||||
pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
|
||||
self.pruning_options = pruning_options;
|
||||
self.config.pruning_options = pruning_options;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -128,7 +164,7 @@ impl BlockProcessor {
|
||||
// we won't end up with a corrupted storage.
|
||||
let mut tx = self.storage.begin_processing_tx().await?;
|
||||
|
||||
persist_block(&full_info, &mut tx).await?;
|
||||
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
|
||||
|
||||
// let the modules do whatever they want
|
||||
// the ones wanting the full block:
|
||||
@@ -241,7 +277,7 @@ impl BlockProcessor {
|
||||
|
||||
#[instrument(skip(self))]
|
||||
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
|
||||
let keep_recent = self.pruning_options.strategy_keep_recent();
|
||||
let keep_recent = self.config.pruning_options.strategy_keep_recent();
|
||||
let last_to_keep = self.last_processed_height - keep_recent;
|
||||
|
||||
info!(
|
||||
@@ -282,12 +318,12 @@ impl BlockProcessor {
|
||||
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
|
||||
debug!("checking for storage pruning");
|
||||
|
||||
if self.pruning_options.strategy.is_nothing() {
|
||||
if self.config.pruning_options.strategy.is_nothing() {
|
||||
trace!("the current pruning strategy is 'nothing'");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let interval = self.pruning_options.strategy_interval();
|
||||
let interval = self.config.pruning_options.strategy_interval();
|
||||
if self.last_pruned_height + interval <= self.last_processed_height {
|
||||
self.prune_storage().await?;
|
||||
}
|
||||
@@ -371,13 +407,49 @@ impl BlockProcessor {
|
||||
if latest_block > self.last_processed_height && self.last_processed_height != 0 {
|
||||
// in case we were offline for a while,
|
||||
// make sure we don't request blocks we'd have to prune anyway
|
||||
let keep_recent = self.pruning_options.strategy_keep_recent();
|
||||
let keep_recent = self.config.pruning_options.strategy_keep_recent();
|
||||
let last_to_keep = latest_block - keep_recent;
|
||||
self.last_processed_height = max(self.last_processed_height, last_to_keep);
|
||||
|
||||
let request_range = self.last_processed_height + 1..latest_block + 1;
|
||||
info!("we need to request {request_range:?} to resync");
|
||||
self.request_missing_blocks(request_range).await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// this is the first time starting up
|
||||
if self.last_processed_height == 0 {
|
||||
let Some(starting_height) = self.config.explicit_starting_block_height else {
|
||||
// nothing to do
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
info!("attempting to start the scraper from block {starting_height}");
|
||||
let earliest_available =
|
||||
self.rpc_client.earliest_available_block_height().await? as u32;
|
||||
info!("earliest available block height: {earliest_available}");
|
||||
|
||||
if earliest_available > starting_height && self.config.use_best_effort_start_height {
|
||||
error!("the earliest available block is higher than the desired starting height");
|
||||
return Err(ScraperError::BlocksUnavailable {
|
||||
height: starting_height,
|
||||
});
|
||||
}
|
||||
|
||||
let starting_height = if earliest_available > starting_height {
|
||||
// add few additional blocks to account for all the startup waiting
|
||||
// because the node might have pruned few blocks since
|
||||
earliest_available + 10
|
||||
} else {
|
||||
starting_height
|
||||
};
|
||||
|
||||
let request_range = starting_height..latest_block + 1;
|
||||
|
||||
info!("going to start the scraper from block {starting_height}");
|
||||
info!("we need to request {request_range:?} before properly starting up");
|
||||
|
||||
self.request_missing_blocks(request_range).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -19,6 +19,9 @@ pub enum ScraperError {
|
||||
#[error("the block scraper is already running")]
|
||||
ScraperAlreadyRunning,
|
||||
|
||||
#[error("block information for height {height} is not available on the provided rpc endpoint")]
|
||||
BlocksUnavailable { height: u32 },
|
||||
|
||||
#[error("failed to establish websocket connection to {url}: {source}")]
|
||||
WebSocketConnectionFailure {
|
||||
url: String,
|
||||
|
||||
@@ -16,5 +16,5 @@ pub mod storage;
|
||||
|
||||
pub use block_processor::pruning::{PruningOptions, PruningStrategy};
|
||||
pub use modules::{BlockModule, MsgModule, TxModule};
|
||||
pub use scraper::{Config, NyxdScraper};
|
||||
pub use scraper::{Config, NyxdScraper, StartingBlockOpts};
|
||||
pub use storage::models;
|
||||
|
||||
@@ -117,6 +117,17 @@ impl RpcClient {
|
||||
Ok(info.last_block_height.value())
|
||||
}
|
||||
|
||||
pub(crate) async fn earliest_available_block_height(&self) -> Result<u64, ScraperError> {
|
||||
debug!("getting earliest available block height");
|
||||
|
||||
let status = self
|
||||
.inner
|
||||
.status()
|
||||
.await
|
||||
.map_err(|source| ScraperError::AbciInfoQueryFailure { source })?;
|
||||
Ok(status.sync_info.earliest_block_height.value())
|
||||
}
|
||||
|
||||
async fn get_transaction_results(
|
||||
&self,
|
||||
raw: &[Vec<u8>],
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use crate::block_processor::types::BlockToProcess;
|
||||
use crate::block_processor::BlockProcessor;
|
||||
use crate::block_processor::{BlockProcessor, BlockProcessorConfig};
|
||||
use crate::block_requester::{BlockRequest, BlockRequester};
|
||||
use crate::error::ScraperError;
|
||||
use crate::modules::{BlockModule, MsgModule, TxModule};
|
||||
@@ -24,6 +24,15 @@ use url::Url;
|
||||
|
||||
mod subscriber;
|
||||
|
||||
#[derive(Default, Clone, Copy)]
|
||||
pub struct StartingBlockOpts {
|
||||
pub start_block_height: Option<u32>,
|
||||
|
||||
/// If the scraper fails to start from the desired height, rather than failing,
|
||||
/// attempt to use the next available height
|
||||
pub use_best_effort_start_height: bool,
|
||||
}
|
||||
|
||||
pub struct Config {
|
||||
/// Url to the websocket endpoint of a validator, for example `wss://rpc.nymtech.net/websocket`
|
||||
pub websocket_url: Url,
|
||||
@@ -34,6 +43,10 @@ pub struct Config {
|
||||
pub database_path: PathBuf,
|
||||
|
||||
pub pruning_options: PruningOptions,
|
||||
|
||||
pub store_precommits: bool,
|
||||
|
||||
pub start_block: StartingBlockOpts,
|
||||
}
|
||||
|
||||
pub struct NyxdScraperBuilder {
|
||||
@@ -60,8 +73,16 @@ impl NyxdScraperBuilder {
|
||||
req_rx,
|
||||
processing_tx.clone(),
|
||||
);
|
||||
let mut block_processor = BlockProcessor::new(
|
||||
|
||||
let block_processor_config = BlockProcessorConfig::new(
|
||||
scraper.config.pruning_options,
|
||||
scraper.config.store_precommits,
|
||||
scraper.config.start_block.start_block_height,
|
||||
scraper.config.start_block.use_best_effort_start_height,
|
||||
);
|
||||
|
||||
let mut block_processor = BlockProcessor::new(
|
||||
block_processor_config,
|
||||
scraper.cancel_token.clone(),
|
||||
scraper.startup_sync.clone(),
|
||||
processing_rx,
|
||||
@@ -158,7 +179,10 @@ impl NyxdScraper {
|
||||
self.task_tracker.close();
|
||||
}
|
||||
|
||||
pub async fn process_single_block(&self, height: u32) -> Result<(), ScraperError> {
|
||||
// DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
|
||||
// AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
|
||||
// YOU WILL BE FIRED IF YOU USE IT : )
|
||||
pub async fn unsafe_process_single_block(&self, height: u32) -> Result<(), ScraperError> {
|
||||
info!(height = height, "attempting to process a single block");
|
||||
if !self.task_tracker.is_empty() {
|
||||
return Err(ScraperError::ScraperAlreadyRunning);
|
||||
@@ -177,7 +201,10 @@ impl NyxdScraper {
|
||||
block_processor.process_block(block.into()).await
|
||||
}
|
||||
|
||||
pub async fn process_block_range(
|
||||
// DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
|
||||
// AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
|
||||
// YOU WILL BE FIRED IF YOU USE IT : )
|
||||
pub async fn unsafe_process_block_range(
|
||||
&self,
|
||||
starting_height: Option<u32>,
|
||||
end_height: Option<u32>,
|
||||
@@ -194,10 +221,10 @@ impl NyxdScraper {
|
||||
.await?
|
||||
.with_pruning(PruningOptions::nothing());
|
||||
|
||||
let current_height = self.rpc_client.current_block_height().await? as u32;
|
||||
let mut current_height = self.rpc_client.current_block_height().await? as u32;
|
||||
let last_processed = block_processor.last_process_height();
|
||||
|
||||
let starting_height = match starting_height {
|
||||
let mut starting_height = match starting_height {
|
||||
// always attempt to use whatever the user has provided
|
||||
Some(explicit) => explicit,
|
||||
None => {
|
||||
@@ -211,7 +238,8 @@ impl NyxdScraper {
|
||||
}
|
||||
};
|
||||
|
||||
let end_height = match end_height {
|
||||
let must_catch_up = end_height.is_none();
|
||||
let mut end_height = match end_height {
|
||||
// always attempt to use whatever the user has provided
|
||||
Some(explicit) => explicit,
|
||||
None => {
|
||||
@@ -226,32 +254,62 @@ impl NyxdScraper {
|
||||
}
|
||||
};
|
||||
|
||||
info!(
|
||||
starting_height = starting_height,
|
||||
end_height = end_height,
|
||||
"attempting to process block range"
|
||||
);
|
||||
let mut last_processed = starting_height;
|
||||
|
||||
let range = (starting_height..=end_height).collect::<Vec<_>>();
|
||||
while last_processed < current_height {
|
||||
info!(
|
||||
starting_height = starting_height,
|
||||
end_height = end_height,
|
||||
"attempting to process block range"
|
||||
);
|
||||
|
||||
// the most likely bottleneck here are going to be the chain queries,
|
||||
// so batch multiple requests
|
||||
for batch in range.chunks(4) {
|
||||
let batch_result = join_all(
|
||||
batch
|
||||
.iter()
|
||||
.map(|height| self.rpc_client.get_basic_block_details(*height)),
|
||||
)
|
||||
.await;
|
||||
for result in batch_result {
|
||||
match result {
|
||||
Ok(block) => block_processor.process_block(block.into()).await?,
|
||||
Err(err) => {
|
||||
error!("failed to retrieve the block: {err}. stopping...");
|
||||
return Err(err);
|
||||
let range = (starting_height..=end_height).collect::<Vec<_>>();
|
||||
|
||||
// the most likely bottleneck here are going to be the chain queries,
|
||||
// so batch multiple requests
|
||||
for batch in range.chunks(4) {
|
||||
let batch_result = join_all(
|
||||
batch
|
||||
.iter()
|
||||
.map(|height| self.rpc_client.get_basic_block_details(*height)),
|
||||
)
|
||||
.await;
|
||||
for result in batch_result {
|
||||
match result {
|
||||
Ok(block) => block_processor.process_block(block.into()).await?,
|
||||
Err(err) => {
|
||||
error!("failed to retrieve the block: {err}. stopping...");
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if we don't need to catch up, return early
|
||||
if !must_catch_up {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// check if we have caught up to the current block height
|
||||
last_processed = end_height;
|
||||
current_height = self.rpc_client.current_block_height().await? as u32;
|
||||
|
||||
info!(
|
||||
last_processed = last_processed,
|
||||
current_height = current_height,
|
||||
"🏃 still need to catch up..."
|
||||
);
|
||||
|
||||
starting_height = last_processed + 1;
|
||||
end_height = current_height;
|
||||
}
|
||||
|
||||
if must_catch_up {
|
||||
info!(
|
||||
last_processed = last_processed,
|
||||
current_height = current_height,
|
||||
"✅ block processing has caught up!"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -275,8 +333,15 @@ impl NyxdScraper {
|
||||
req_tx: Sender<BlockRequest>,
|
||||
processing_rx: UnboundedReceiver<BlockToProcess>,
|
||||
) -> Result<BlockProcessor, ScraperError> {
|
||||
BlockProcessor::new(
|
||||
let block_processor_config = BlockProcessorConfig::new(
|
||||
self.config.pruning_options,
|
||||
self.config.store_precommits,
|
||||
self.config.start_block.start_block_height,
|
||||
self.config.start_block.use_best_effort_start_height,
|
||||
);
|
||||
|
||||
BlockProcessor::new(
|
||||
block_processor_config,
|
||||
self.cancel_token.clone(),
|
||||
self.startup_sync.clone(),
|
||||
processing_rx,
|
||||
|
||||
@@ -237,6 +237,58 @@ impl StorageManager {
|
||||
Ok(-1)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn get_transactions_after_height(
|
||||
&self,
|
||||
min_height: i64,
|
||||
message_type: Option<&str>,
|
||||
) -> Result<Vec<TransactionWithBlock>, sqlx::Error> {
|
||||
match message_type {
|
||||
Some(msg_type) => {
|
||||
sqlx::query_as!(
|
||||
TransactionWithBlock,
|
||||
r#"
|
||||
SELECT t.hash, t.height, t.memo, t.raw_log
|
||||
FROM message m
|
||||
JOIN "transaction" t ON m.transaction_hash = t.hash
|
||||
JOIN block b ON t.height = b.height
|
||||
WHERE t.height > ?
|
||||
AND m.type = ?
|
||||
ORDER BY t.height ASC
|
||||
"#,
|
||||
min_height,
|
||||
msg_type
|
||||
)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await
|
||||
}
|
||||
None => {
|
||||
sqlx::query_as!(
|
||||
TransactionWithBlock,
|
||||
r#"
|
||||
SELECT t.hash, t.height, t.memo, t.raw_log
|
||||
FROM message m
|
||||
JOIN "transaction" t ON m.transaction_hash = t.hash
|
||||
JOIN block b ON t.height = b.height
|
||||
WHERE t.height > ?
|
||||
ORDER BY t.height ASC
|
||||
"#,
|
||||
min_height
|
||||
)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
pub struct TransactionWithBlock {
|
||||
pub hash: String,
|
||||
pub height: i64,
|
||||
pub memo: Option<String>,
|
||||
pub raw_log: Option<String>,
|
||||
}
|
||||
|
||||
// make those generic over executor so that they could be performed over connection pool and a tx
|
||||
|
||||
@@ -13,6 +13,7 @@ use crate::{
|
||||
models::{CommitSignature, Validator},
|
||||
},
|
||||
};
|
||||
use manager::TransactionWithBlock;
|
||||
use sqlx::{types::time::OffsetDateTime, ConnectOptions, Sqlite, Transaction};
|
||||
use std::{fmt::Debug, path::Path};
|
||||
use tendermint::{
|
||||
@@ -207,11 +208,23 @@ impl ScraperStorage {
|
||||
pub async fn get_pruned_height(&self) -> Result<i64, ScraperError> {
|
||||
Ok(self.manager.get_pruned_height().await?)
|
||||
}
|
||||
|
||||
pub async fn get_transactions_after_height(
|
||||
&self,
|
||||
min_height: i64,
|
||||
message_type: Option<&str>,
|
||||
) -> Result<Vec<TransactionWithBlock>, ScraperError> {
|
||||
Ok(self
|
||||
.manager
|
||||
.get_transactions_after_height(min_height, message_type)
|
||||
.await?)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn persist_block(
|
||||
block: &FullBlockInformation,
|
||||
tx: &mut StorageTransaction,
|
||||
store_precommits: bool,
|
||||
) -> Result<(), ScraperError> {
|
||||
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
|
||||
|
||||
@@ -224,11 +237,12 @@ pub async fn persist_block(
|
||||
// persist block data
|
||||
persist_block_data(&block.block, total_gas, tx).await?;
|
||||
|
||||
// persist commits
|
||||
if let Some(commit) = &block.block.last_commit {
|
||||
persist_commits(commit, &block.validators, tx).await?;
|
||||
} else {
|
||||
warn!("no commits for block {}", block.block.header.height)
|
||||
if store_precommits {
|
||||
if let Some(commit) = &block.block.last_commit {
|
||||
persist_commits(commit, &block.validators, tx).await?;
|
||||
} else {
|
||||
warn!("no commits for block {}", block.block.header.height)
|
||||
}
|
||||
}
|
||||
|
||||
// persist txs
|
||||
|
||||
-16
@@ -1,16 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "INSERT INTO responses\n (joke_id, joke, date_created)\n VALUES\n ($1, $2, $3)\n ON CONFLICT(joke_id) DO UPDATE SET\n joke=excluded.joke,\n date_created=excluded.date_created;",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Varchar",
|
||||
"Text",
|
||||
"Int4"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "249faa11b88b749f50342bb5c9cc41d20896db543eed74a6f320c041bcbb723d"
|
||||
}
|
||||
-34
@@ -1,34 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT joke_id, joke, date_created FROM responses WHERE joke_id = $1",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "joke_id",
|
||||
"type_info": "Varchar"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "joke",
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "date_created",
|
||||
"type_info": "Int4"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "aff7fbd06728004d2f2226d20c32f1482df00de2dc1d2b4debbb2e12553d997b"
|
||||
}
|
||||
-32
@@ -1,32 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT joke_id, joke, date_created FROM responses",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "joke_id",
|
||||
"type_info": "Varchar"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "joke",
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "date_created",
|
||||
"type_info": "Int4"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": []
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "e53f479f8cead3dc8aa1875e5d450ad69686cf6a109e37d6c3f0623c3e9f91d0"
|
||||
}
|
||||
@@ -1,27 +0,0 @@
|
||||
FROM rust:latest AS builder
|
||||
|
||||
COPY ./ /usr/src/nym
|
||||
WORKDIR /usr/src/nym/nym-data-observatory
|
||||
|
||||
RUN cargo build --release
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# The following environment variables are required at runtime:
|
||||
#
|
||||
# NYM_DATA_OBSERVATORY_CONNECTION_URL
|
||||
#
|
||||
# And optionally:
|
||||
#
|
||||
# NYM_DATA_OBSERVATORY_HTTP_PORT
|
||||
#
|
||||
# see https://github.com/nymtech/nym/blob/develop/nym-data-observatory/src/main.rs for details
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
FROM ubuntu:24.04
|
||||
|
||||
RUN apt update && apt install -yy curl ca-certificates
|
||||
|
||||
WORKDIR /nym
|
||||
|
||||
COPY --from=builder /usr/src/nym/target/release/nym-data-observatory ./
|
||||
ENTRYPOINT [ "/nym/nym-data-observatory" ]
|
||||
@@ -1,58 +0,0 @@
|
||||
use anyhow::Result;
|
||||
use sqlx::{Connection, PgConnection};
|
||||
use std::io::Write;
|
||||
use std::{collections::HashMap, fs::File};
|
||||
|
||||
const POSTGRES_USER: &str = "nym";
|
||||
const POSTGRES_PASSWORD: &str = "password123";
|
||||
const POSTGRES_DB: &str = "data_obs_db";
|
||||
|
||||
/// if schema changes, rerun `cargo sqlx prepare` with a running DB
|
||||
/// https://github.com/launchbadge/sqlx/blob/main/sqlx-cli/README.md#enable-building-in-offline-mode-with-query
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let db_url =
|
||||
format!("postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@localhost:5432/{POSTGRES_DB}");
|
||||
|
||||
export_db_variables(&db_url)?;
|
||||
// if a live DB is reachable, use that
|
||||
if PgConnection::connect(&db_url).await.is_ok() {
|
||||
println!("cargo::rustc-env=SQLX_OFFLINE=false");
|
||||
run_migrations(&db_url).await?;
|
||||
} else {
|
||||
// by default, run in offline mode
|
||||
println!("cargo::rustc-env=SQLX_OFFLINE=true");
|
||||
}
|
||||
|
||||
rerun_if_changed();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn export_db_variables(db_url: &str) -> Result<()> {
|
||||
let mut map = HashMap::new();
|
||||
map.insert("POSTGRES_USER", POSTGRES_USER);
|
||||
map.insert("POSTGRES_PASSWORD", POSTGRES_PASSWORD);
|
||||
map.insert("POSTGRES_DB", POSTGRES_DB);
|
||||
map.insert("DATABASE_URL", db_url);
|
||||
|
||||
let mut file = File::create(".env")?;
|
||||
for (var, value) in map.iter() {
|
||||
println!("cargo::rustc-env={}={}", var, value);
|
||||
writeln!(file, "{}={}", var, value).expect("Failed to write to dotenv file");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_migrations(db_url: &str) -> Result<()> {
|
||||
let mut conn = PgConnection::connect(db_url).await?;
|
||||
sqlx::migrate!("./migrations").run(&mut conn).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rerun_if_changed() {
|
||||
println!("cargo::rerun-if-changed=migrations");
|
||||
println!("cargo::rerun-if-changed=src/db/queries");
|
||||
}
|
||||
@@ -1,31 +0,0 @@
|
||||
services:
|
||||
postgres:
|
||||
image: postgres:13
|
||||
container_name: nym-data-observatory-pg
|
||||
environment:
|
||||
POSTGRES_PASSWORD: password
|
||||
ports:
|
||||
- "5432:5432"
|
||||
volumes:
|
||||
- pgdata:/var/lib/postgresql/data
|
||||
|
||||
data-observatory:
|
||||
depends_on:
|
||||
- postgres
|
||||
image: nym-data-observatory:latest
|
||||
build:
|
||||
context: ../
|
||||
dockerfile: nym-data-observatory/Dockerfile
|
||||
container_name: nym-data-observatory
|
||||
environment:
|
||||
NYM_DATA_OBSERVATORY_CONNECTION_USERNAME: "postgres"
|
||||
NYM_DATA_OBSERVATORY_CONNECTION_PASSWORD: "password"
|
||||
NYM_DATA_OBSERVATORY_CONNECTION_HOST: "postgres"
|
||||
NYM_DATA_OBSERVATORY_CONNECTION_PORT: "5432"
|
||||
NYM_DATA_OBSERVATORY_CONNECTION_DB: ""
|
||||
NYM_DATA_OBSERVATORY_HTTP_PORT: 8000
|
||||
env_file:
|
||||
- ../envs/qa.env
|
||||
|
||||
volumes:
|
||||
pgdata:
|
||||
@@ -1,6 +0,0 @@
|
||||
CREATE TABLE responses (
|
||||
id SERIAL PRIMARY KEY,
|
||||
joke_id VARCHAR NOT NULL UNIQUE,
|
||||
joke TEXT NOT NULL,
|
||||
date_created INTEGER NOT NULL
|
||||
);
|
||||
@@ -1,13 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# .env is generated in build.rs
|
||||
source .env
|
||||
|
||||
# Launching a container in such a way that it's destroyed after you detach from the terminal:
|
||||
docker compose up
|
||||
|
||||
# docker exec -it nym-data-observatory-pg /bin/bash
|
||||
# psql -U youruser -d yourdb
|
||||
|
||||
echo "Tearing down containers to have a clean slate"
|
||||
docker compose down -v
|
||||
@@ -1,61 +0,0 @@
|
||||
use core::str;
|
||||
use serde::Deserialize;
|
||||
use tokio::process::Command;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
|
||||
use crate::db::{self, DbPool};
|
||||
|
||||
const REFRESH_DELAY: Duration = Duration::from_secs(15);
|
||||
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
|
||||
|
||||
pub(crate) async fn spawn_in_background(db_pool: DbPool) -> JoinHandle<()> {
|
||||
loop {
|
||||
tracing::info!("Running in a loop 🏃");
|
||||
|
||||
if let Err(e) = some_network_action(&db_pool).await {
|
||||
tracing::error!(
|
||||
"❌ Run failed: {e}, retrying in {}s...",
|
||||
FAILURE_RETRY_DELAY.as_secs()
|
||||
);
|
||||
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
|
||||
} else {
|
||||
tracing::info!(
|
||||
"✅ Run successful, sleeping for {}s...",
|
||||
REFRESH_DELAY.as_secs()
|
||||
);
|
||||
tokio::time::sleep(REFRESH_DELAY).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub(crate) struct Response {
|
||||
#[serde(rename(deserialize = "id"))]
|
||||
pub(crate) joke_id: String,
|
||||
pub(crate) joke: String,
|
||||
#[serde(rename(deserialize = "status"))]
|
||||
pub(crate) _status: u16,
|
||||
}
|
||||
|
||||
async fn some_network_action(pool: &DbPool) -> anyhow::Result<()> {
|
||||
// for demonstration purposes only. You should use reqwest if you need it
|
||||
let output = Command::new("curl")
|
||||
.arg("-H")
|
||||
.arg("Accept: application/json")
|
||||
.arg("https://icanhazdadjoke.com/")
|
||||
.output()
|
||||
.await?;
|
||||
|
||||
if !output.status.success() {
|
||||
anyhow::bail!("Curl command failed with status: {}", output.status);
|
||||
}
|
||||
|
||||
let response_str = str::from_utf8(&output.stdout)?;
|
||||
let joke_response: Response = serde_json::from_str(response_str)?;
|
||||
|
||||
tracing::info!("{:?}", joke_response.joke);
|
||||
db::queries::insert_joke(pool, joke_response.into()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,22 +0,0 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use crate::background_task::Response;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, ToSchema)]
|
||||
pub(crate) struct JokeDto {
|
||||
pub(crate) joke_id: String,
|
||||
pub(crate) joke: String,
|
||||
pub(crate) date_created: i32,
|
||||
}
|
||||
|
||||
impl From<Response> for JokeDto {
|
||||
fn from(value: Response) -> Self {
|
||||
Self {
|
||||
joke_id: value.joke_id,
|
||||
joke: value.joke,
|
||||
// casting not smart, can implicitly panic, don't do this in prod
|
||||
date_created: chrono::offset::Utc::now().timestamp() as i32,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,39 +0,0 @@
|
||||
use crate::db::{models::JokeDto, DbPool};
|
||||
|
||||
pub(crate) async fn insert_joke(pool: &DbPool, joke: JokeDto) -> anyhow::Result<()> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
sqlx::query!(
|
||||
"INSERT INTO responses
|
||||
(joke_id, joke, date_created)
|
||||
VALUES
|
||||
($1, $2, $3)
|
||||
ON CONFLICT(joke_id) DO UPDATE SET
|
||||
joke=excluded.joke,
|
||||
date_created=excluded.date_created;",
|
||||
joke.joke_id,
|
||||
joke.joke,
|
||||
joke.date_created as i32,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn select_joke_by_id(pool: &DbPool, joke_id: &str) -> anyhow::Result<JokeDto> {
|
||||
sqlx::query_as!(
|
||||
JokeDto,
|
||||
"SELECT joke_id, joke, date_created FROM responses WHERE joke_id = $1",
|
||||
joke_id
|
||||
)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.map_err(anyhow::Error::from)
|
||||
}
|
||||
|
||||
pub(crate) async fn select_all(pool: &DbPool) -> anyhow::Result<Vec<JokeDto>> {
|
||||
sqlx::query_as!(JokeDto, "SELECT joke_id, joke, date_created FROM responses",)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(anyhow::Error::from)
|
||||
}
|
||||
@@ -1,5 +0,0 @@
|
||||
// group queries in files by theme
|
||||
mod joke;
|
||||
|
||||
// re-exporting allows us to access all queries via `queries::bla``
|
||||
pub(crate) use joke::{insert_joke, select_all, select_joke_by_id};
|
||||
@@ -1,78 +0,0 @@
|
||||
use axum::{
|
||||
extract::{Path, State},
|
||||
Json, Router,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use utoipa::IntoParams;
|
||||
|
||||
use crate::{
|
||||
db::{
|
||||
models::JokeDto,
|
||||
queries::{self, select_joke_by_id},
|
||||
},
|
||||
http::{
|
||||
error::{Error, HttpResult},
|
||||
state::AppState,
|
||||
},
|
||||
};
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/", axum::routing::get(jokes))
|
||||
.route("/:joke_id", axum::routing::get(joke_by_id))
|
||||
.route("/fetch_another", axum::routing::get(fetch_another))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Dad Jokes",
|
||||
get,
|
||||
path = "/v1/jokes",
|
||||
responses(
|
||||
(status = 200, body = Vec<JokeDto>)
|
||||
)
|
||||
)]
|
||||
async fn jokes(State(state): State<AppState>) -> HttpResult<Json<Vec<JokeDto>>> {
|
||||
queries::select_all(state.db_pool())
|
||||
.await
|
||||
.map(Json::from)
|
||||
.map_err(|_| Error::internal())
|
||||
}
|
||||
|
||||
#[derive(Deserialize, IntoParams)]
|
||||
#[into_params(parameter_in = Path)]
|
||||
struct JokeIdParam {
|
||||
joke_id: String,
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Dad Jokes",
|
||||
get,
|
||||
params(
|
||||
JokeIdParam
|
||||
),
|
||||
path = "/v1/jokes/{joke_id}",
|
||||
responses(
|
||||
(status = 200, body = JokeDto)
|
||||
)
|
||||
)]
|
||||
async fn joke_by_id(
|
||||
Path(JokeIdParam { joke_id }): Path<JokeIdParam>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<JokeDto>> {
|
||||
select_joke_by_id(state.db_pool(), &joke_id)
|
||||
.await
|
||||
.map(Json::from)
|
||||
.map_err(|_| Error::not_found(joke_id))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Dad Jokes",
|
||||
get,
|
||||
path = "/v1/jokes/fetch_another",
|
||||
responses(
|
||||
(status = 200, body = String)
|
||||
)
|
||||
)]
|
||||
async fn fetch_another(State(_state): State<AppState>) -> HttpResult<Json<String>> {
|
||||
Ok(Json(String::from("Done boss, check the DB")))
|
||||
}
|
||||
@@ -1,78 +0,0 @@
|
||||
use clap::Parser;
|
||||
use nym_network_defaults::setup_env;
|
||||
use nym_task::signal::wait_for_signal;
|
||||
|
||||
mod background_task;
|
||||
mod db;
|
||||
mod http;
|
||||
mod logging;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version, about, long_about = None)]
|
||||
struct Args {
|
||||
/// Port to listen on
|
||||
#[arg(long, default_value_t = 8000, env = "NYM_DATA_OBSERVATORY_HTTP_PORT")]
|
||||
http_port: u16,
|
||||
|
||||
/// Path to the environment variables file. If you don't provide one, variables for the mainnet will be used.
|
||||
#[arg(short, long, default_value = None, env = "NYM_DATA_OBSERVATORY_ENV_FILE")]
|
||||
env_file: Option<String>,
|
||||
|
||||
/// DB connection username
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_USERNAME")]
|
||||
connection_username: String,
|
||||
|
||||
/// DB connection password
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_PASSWORD")]
|
||||
connection_password: String,
|
||||
|
||||
/// DB connection host
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_HOST")]
|
||||
connection_host: String,
|
||||
|
||||
/// DB connection port
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_PORT")]
|
||||
connection_port: String,
|
||||
|
||||
/// DB connection database name
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_DB")]
|
||||
connection_db: String,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
logging::setup_tracing_logger();
|
||||
|
||||
let args = Args::parse();
|
||||
|
||||
setup_env(args.env_file); // Defaults to mainnet if empty
|
||||
|
||||
let connection_url = format!(
|
||||
"postgres://{}:{}@{}:{}/{}",
|
||||
args.connection_username,
|
||||
args.connection_password,
|
||||
args.connection_host,
|
||||
args.connection_port,
|
||||
args.connection_db
|
||||
);
|
||||
|
||||
let storage = db::Storage::init(connection_url).await?;
|
||||
let db_pool = storage.pool_owned().await;
|
||||
tokio::spawn(async move {
|
||||
background_task::spawn_in_background(db_pool).await;
|
||||
tracing::info!("Started task");
|
||||
});
|
||||
|
||||
let shutdown_handles = http::server::start_http_api(storage.pool_owned().await, args.http_port)
|
||||
.await
|
||||
.expect("Failed to start server");
|
||||
tracing::info!("Started HTTP server on port {}", args.http_port);
|
||||
|
||||
wait_for_signal().await;
|
||||
|
||||
if let Err(err) = shutdown_handles.shutdown().await {
|
||||
tracing::error!("{err}");
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -26,7 +26,7 @@ pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {
|
||||
|
||||
NyxdScraper::new(config.scraper_config())
|
||||
.await?
|
||||
.process_single_block(args.height)
|
||||
.unsafe_process_single_block(args.height)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {
|
||||
|
||||
NyxdScraper::new(config.scraper_config())
|
||||
.await?
|
||||
.process_block_range(args.start_height, args.stop_height)
|
||||
.unsafe_process_block_range(args.start_height, args.stop_height)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -112,6 +112,7 @@ impl Config {
|
||||
nyxd_scraper: NyxdScraper {
|
||||
websocket_url,
|
||||
pruning: Default::default(),
|
||||
store_precommits: true,
|
||||
},
|
||||
base: Base {
|
||||
upstream_nyxd: nyxd_url,
|
||||
@@ -127,6 +128,8 @@ impl Config {
|
||||
rpc_url: self.base.upstream_nyxd.clone(),
|
||||
database_path: self.storage_paths.nyxd_scraper.clone(),
|
||||
pruning_options: self.nyxd_scraper.pruning,
|
||||
store_precommits: self.nyxd_scraper.store_precommits,
|
||||
start_block_height: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -314,7 +317,14 @@ pub struct NyxdScraper {
|
||||
// if the value is missing, use `nothing` pruning as this was the past behaviour
|
||||
#[serde(default = "PruningOptions::nothing")]
|
||||
pub pruning: PruningOptions,
|
||||
// TODO: debug with everything that's currently hardcoded in the scraper
|
||||
|
||||
/// Specifies whether to store pre-commits within the database.
|
||||
#[serde(default = "default_store_precommits")]
|
||||
pub store_precommits: bool,
|
||||
}
|
||||
|
||||
fn default_store_precommits() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
impl NyxdScraper {
|
||||
|
||||
Generated
+1
@@ -3163,6 +3163,7 @@ dependencies = [
|
||||
"log",
|
||||
"nym-network-defaults",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"toml 0.8.19",
|
||||
"url",
|
||||
]
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
[package]
|
||||
name = "nym-data-observatory"
|
||||
version = "0.1.0"
|
||||
name = "nyx-chain-watcher"
|
||||
version = "0.1.2"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
@@ -17,16 +17,24 @@ readme.workspace = true
|
||||
anyhow = { workspace = true }
|
||||
axum = { workspace = true, features = ["tokio"] }
|
||||
chrono = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive", "env"] }
|
||||
nym-bin-common = { path = "../common/bin-common" }
|
||||
clap = { workspace = true, features = ["cargo", "derive", "env"] }
|
||||
nym-config = { path = "../common/config" }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
|
||||
nym-network-defaults = { path = "../common/network-defaults" }
|
||||
nym-task = { path = "../common/task" }
|
||||
nym-node-requests = { path = "../nym-node/nym-node-requests", features = [
|
||||
"openapi",
|
||||
] }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
nyxd-scraper = {path = "../common/nyxd-scraper"}
|
||||
reqwest = {workspace= true, features = ["rustls-tls"]}
|
||||
rocket = { workspace = true }
|
||||
schemars = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "time"] }
|
||||
thiserror = { workspace = true }
|
||||
time = {version = "0.3.36"}
|
||||
tokio = { workspace = true, features = ["process", "rt-multi-thread"] }
|
||||
tokio-util = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
@@ -40,4 +48,4 @@ utoipauto = { workspace = true }
|
||||
[build-dependencies]
|
||||
anyhow = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] }
|
||||
@@ -0,0 +1,32 @@
|
||||
FROM rust:latest AS builder
|
||||
|
||||
COPY ./ /usr/src/nym
|
||||
WORKDIR /usr/src/nym/nyx-chain-watcher
|
||||
|
||||
RUN cargo build --release
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# The following environment variables are required at runtime:
|
||||
#
|
||||
# NYX_CHAIN_WATCHER_DATABASE_PATH = /mnt/nyx-chain-watchter.sqlite
|
||||
# NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH = /mnt/chain-history.sqlite
|
||||
# NYX_CHAIN_WATCHER_WATCH_ACCOUNTS = "n1...,n1...,n1..."
|
||||
#
|
||||
# And optionally:
|
||||
#
|
||||
# NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES = "/cosmos.bank.v1beta1.MsgSend,/ibc.applications.transfer.v1.MsgTransfer"
|
||||
# NYX_CHAIN_WATCHER_CONFIG_ENV_FILE_ARG = /mnt/sandbox.env for sandbox environment
|
||||
#
|
||||
# see https://github.com/nymtech/nym/blob/develop/nyx-chain-watcher/src/cli/commands/run/args.rs for details
|
||||
# and https://github.com/nymtech/nym/blob/develop/nyx-chain-watcher/src/env.rs for env vars
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
FROM ubuntu:24.04
|
||||
|
||||
RUN apt update && apt install -yy curl ca-certificates
|
||||
|
||||
WORKDIR /nym
|
||||
|
||||
COPY --from=builder /usr/src/nym/target/release/nyx-chain-watcher ./
|
||||
ENTRYPOINT [ "/nym/nyx-chain-watcher", "run" ]
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
# Nyx Chain Watcher
|
||||
|
||||
A simple binary to watch addresses on the Nyx chain and to call webhooks when particular message types are in a block.
|
||||
|
||||
Look in [env.rs](./src/env.rs) for the names of environment variables that can be overridden.
|
||||
|
||||
## Running locally
|
||||
|
||||
```
|
||||
NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH=chain_history.sqlite \
|
||||
NYX_CHAIN_WATCHER_DATABASE_PATH=nyx_chain_watcher.sqlite \
|
||||
NYX_CHAIN_WATCHER_WATCH_ACCOUNTS=n1...,n1...,n1... \
|
||||
NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES="/cosmos.bank.v1beta1.MsgSend,/ibc.applications.transfer.v1.MsgTransfer"
|
||||
NYX_CHAIN_WATCHER_WEBHOOK_URL="https://webhook.site" \
|
||||
NYX_CHAIN_WATCHER_WEBHOOK_AUTH=1234 \
|
||||
cargo run -- run
|
||||
```
|
||||
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
use anyhow::Result;
|
||||
use sqlx::{sqlite::SqliteConnectOptions, Connection, SqliteConnection};
|
||||
use std::io::Write;
|
||||
use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let db_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
||||
.join(".build")
|
||||
.join("nyx_chain_watcher.sqlite");
|
||||
|
||||
// Create the database directory if it doesn't exist
|
||||
if let Some(parent) = db_path.parent() {
|
||||
std::fs::create_dir_all(parent)?;
|
||||
}
|
||||
|
||||
let db_url = format!("sqlite:{}", db_path.display());
|
||||
|
||||
// Ensure database file is created with proper permissions
|
||||
let connect_options = SqliteConnectOptions::from_str(&db_url)?
|
||||
.create_if_missing(true)
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.foreign_keys(true);
|
||||
|
||||
// Create initial connection to ensure database exists
|
||||
let mut conn = SqliteConnection::connect_with(&connect_options).await?;
|
||||
|
||||
export_db_variables(&db_url)?;
|
||||
println!("cargo:rustc-env=SQLX_OFFLINE=false");
|
||||
|
||||
// Run migrations after ensuring database exists
|
||||
sqlx::migrate!("./migrations").run(&mut conn).await?;
|
||||
|
||||
// Add rerun-if-changed directives
|
||||
println!("cargo:rerun-if-changed=migrations");
|
||||
println!("cargo:rerun-if-changed=build.rs");
|
||||
println!("cargo:rerun-if-changed=src");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn export_db_variables(db_url: &str) -> Result<()> {
|
||||
let mut map = HashMap::new();
|
||||
map.insert("DATABASE_URL", db_url);
|
||||
|
||||
let mut file = File::create(".env")?;
|
||||
for (var, value) in map.iter() {
|
||||
println!("cargo:rustc-env={}={}", var, value);
|
||||
writeln!(file, "{}={}", var, value)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
CREATE TABLE price_history (
|
||||
timestamp INTEGER PRIMARY KEY,
|
||||
chf REAL NOT NULL,
|
||||
usd REAL NOT NULL,
|
||||
eur REAL NOT NULL,
|
||||
btc REAL NOT NULL
|
||||
);
|
||||
@@ -0,0 +1,10 @@
|
||||
CREATE TABLE payments (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
transaction_hash TEXT NOT NULL UNIQUE,
|
||||
sender_address TEXT NOT NULL,
|
||||
receiver_address TEXT NOT NULL,
|
||||
amount REAL NOT NULL,
|
||||
timestamp INTEGER NOT NULL,
|
||||
height INTEGER NOT NULL,
|
||||
memo TEXT
|
||||
);
|
||||
@@ -0,0 +1,42 @@
|
||||
use crate::env::vars::{NYXD_SCRAPER_START_HEIGHT, NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT};
|
||||
use nyxd_scraper::{storage::ScraperStorage, NyxdScraper, PruningOptions};
|
||||
|
||||
pub(crate) async fn run_chain_scraper(
|
||||
config: &crate::config::Config,
|
||||
) -> anyhow::Result<ScraperStorage> {
|
||||
let websocket_url = std::env::var("NYXD_WS").expect("NYXD_WS not defined");
|
||||
|
||||
let rpc_url = std::env::var("NYXD").expect("NYXD not defined");
|
||||
let websocket_url = reqwest::Url::parse(&websocket_url)?;
|
||||
let rpc_url = reqwest::Url::parse(&rpc_url)?;
|
||||
|
||||
// why are those not part of CLI? : (
|
||||
let start_block_height = match std::env::var(NYXD_SCRAPER_START_HEIGHT).ok() {
|
||||
None => None,
|
||||
// blow up if passed malformed env value
|
||||
Some(raw) => Some(raw.parse()?),
|
||||
};
|
||||
|
||||
let use_best_effort_start_height =
|
||||
match std::env::var(NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT).ok() {
|
||||
None => false,
|
||||
// blow up if passed malformed env value
|
||||
Some(raw) => raw.parse()?,
|
||||
};
|
||||
|
||||
let scraper = NyxdScraper::builder(nyxd_scraper::Config {
|
||||
websocket_url,
|
||||
rpc_url,
|
||||
database_path: config.chain_scraper_database_path().into(),
|
||||
pruning_options: PruningOptions::nothing(),
|
||||
store_precommits: false,
|
||||
start_block: nyxd_scraper::StartingBlockOpts {
|
||||
start_block_height,
|
||||
use_best_effort_start_height,
|
||||
},
|
||||
});
|
||||
|
||||
let instance = scraper.build_and_start().await?;
|
||||
|
||||
Ok(instance.storage)
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::error::NyxChainWatcherError;
|
||||
use nym_bin_common::bin_info_owned;
|
||||
use nym_bin_common::output_format::OutputFormat;
|
||||
|
||||
#[derive(clap::Args, Debug)]
|
||||
pub(crate) struct Args {
|
||||
#[clap(short, long, default_value_t = OutputFormat::default())]
|
||||
output: OutputFormat,
|
||||
}
|
||||
|
||||
pub(crate) fn execute(args: Args) -> Result<(), NyxChainWatcherError> {
|
||||
println!("{}", args.output.format(&bin_info_owned!()));
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::cli::DEFAULT_NYX_CHAIN_WATCHER_ID;
|
||||
use crate::config::payments_watcher::HttpAuthenticationOptions::AuthorizationBearerToken;
|
||||
use crate::config::payments_watcher::PaymentWatcherEntry;
|
||||
use crate::config::{default_config_filepath, Config, ConfigBuilder, PaymentWatcherConfig};
|
||||
use crate::error::NyxChainWatcherError;
|
||||
use nym_config::save_unformatted_config_to_file;
|
||||
use nym_validator_client::nyxd::AccountId;
|
||||
use std::str::FromStr;
|
||||
|
||||
#[derive(clap::Args, Debug)]
|
||||
pub(crate) struct Args {}
|
||||
|
||||
pub(crate) async fn execute(_args: Args) -> Result<(), NyxChainWatcherError> {
|
||||
let config_path = default_config_filepath();
|
||||
let data_dir = Config::default_data_directory(&config_path)?;
|
||||
|
||||
let builder = ConfigBuilder::new(config_path.clone(), data_dir).with_payment_watcher_config(
|
||||
PaymentWatcherConfig {
|
||||
watchers: vec![PaymentWatcherEntry {
|
||||
id: DEFAULT_NYX_CHAIN_WATCHER_ID.to_string(),
|
||||
webhook_url: "https://webhook.site".to_string(),
|
||||
watch_for_transfer_recipient_accounts: Some(vec![AccountId::from_str(
|
||||
"n17g9a2pwwkg8m60wf59pq6mv0c2wusg9ukparkz",
|
||||
)
|
||||
.unwrap()]),
|
||||
authentication: Some(AuthorizationBearerToken {
|
||||
token: "1234".to_string(),
|
||||
}),
|
||||
description: None,
|
||||
watch_for_chain_message_types: Some(vec![
|
||||
"/cosmos.bank.v1beta1.MsgSend".to_string(),
|
||||
"/ibc.applications.transfer.v1.MsgTransfer".to_string(),
|
||||
]),
|
||||
}],
|
||||
},
|
||||
);
|
||||
|
||||
let config = builder.build();
|
||||
|
||||
Ok(save_unformatted_config_to_file(&config, &config_path)?)
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
pub(crate) mod build_info;
|
||||
pub(crate) mod init;
|
||||
pub(crate) mod run;
|
||||
@@ -0,0 +1,101 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::env::vars::*;
|
||||
use nym_validator_client::nyxd::AccountId;
|
||||
|
||||
#[derive(clap::Args, Debug)]
|
||||
pub(crate) struct Args {
|
||||
/// (Override) SQLite database file path for chain watcher
|
||||
#[arg(long, env = NYX_CHAIN_WATCHER_DATABASE_PATH)]
|
||||
pub(crate) chain_watcher_db_path: Option<String>,
|
||||
|
||||
/// (Override) SQLite database file path for chain scraper history
|
||||
#[arg(long, env = NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH)]
|
||||
pub(crate) chain_history_db_path: Option<String>,
|
||||
|
||||
/// (Override) Watch for transfers to these recipient accounts
|
||||
#[clap(
|
||||
long,
|
||||
value_delimiter = ',',
|
||||
env = NYX_CHAIN_WATCHER_WATCH_ACCOUNTS
|
||||
)]
|
||||
pub watch_for_transfer_recipient_accounts: Option<Vec<AccountId>>,
|
||||
|
||||
/// (Override) Watch for chain messages of these types
|
||||
#[clap(
|
||||
long,
|
||||
value_delimiter = ',',
|
||||
env = NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES
|
||||
)]
|
||||
pub watch_for_chain_message_types: Option<Vec<String>>,
|
||||
|
||||
/// (Override) The webhook to call when we find something
|
||||
#[clap(
|
||||
long,
|
||||
env = NYX_CHAIN_WATCHER_WEBHOOK_URL
|
||||
)]
|
||||
pub webhook_url: Option<String>,
|
||||
|
||||
/// (Override) Optionally, authenticate with the webhook
|
||||
#[clap(
|
||||
long,
|
||||
env = NYX_CHAIN_WATCHER_WEBHOOK_AUTH
|
||||
)]
|
||||
pub webhook_auth: Option<String>,
|
||||
}
|
||||
|
||||
/*impl Args {
|
||||
pub(super) fn take_mnemonic(&mut self) -> Option<Zeroizing<bip39::Mnemonic>> {
|
||||
self.entry_gateway.mnemonic.take().map(Zeroizing::new)
|
||||
}
|
||||
}
|
||||
|
||||
impl Args {
|
||||
pub(crate) fn build_config(self) -> Result<Config, NymNodeError> {
|
||||
let config_path = self.config.config_path();
|
||||
let data_dir = Config::default_data_directory(&config_path)?;
|
||||
|
||||
let id = self
|
||||
.config
|
||||
.id()
|
||||
.clone()
|
||||
.ok_or(NymNodeError::MissingInitArg {
|
||||
section: "global".to_string(),
|
||||
name: "id".to_string(),
|
||||
})?;
|
||||
|
||||
let config = ConfigBuilder::new(id, config_path.clone(), data_dir.clone())
|
||||
.with_mode(self.mode.unwrap_or_default())
|
||||
.with_host(self.host.build_config_section())
|
||||
.with_http(self.http.build_config_section())
|
||||
.with_mixnet(self.mixnet.build_config_section())
|
||||
.with_wireguard(self.wireguard.build_config_section(&data_dir))
|
||||
.with_storage_paths(NymNodePaths::new(&data_dir))
|
||||
.with_mixnode(self.mixnode.build_config_section())
|
||||
.with_entry_gateway(self.entry_gateway.build_config_section(&data_dir))
|
||||
.with_exit_gateway(self.exit_gateway.build_config_section(&data_dir))
|
||||
.build();
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
pub(crate) fn override_config(self, mut config: Config) -> Config {
|
||||
if let Some(mode) = self.mode {
|
||||
config.mode = mode;
|
||||
}
|
||||
config.host = self.host.override_config_section(config.host);
|
||||
config.http = self.http.override_config_section(config.http);
|
||||
config.mixnet = self.mixnet.override_config_section(config.mixnet);
|
||||
config.wireguard = self.wireguard.override_config_section(config.wireguard);
|
||||
config.mixnode = self.mixnode.override_config_section(config.mixnode);
|
||||
config.entry_gateway = self
|
||||
.entry_gateway
|
||||
.override_config_section(config.entry_gateway);
|
||||
config.exit_gateway = self
|
||||
.exit_gateway
|
||||
.override_config_section(config.exit_gateway);
|
||||
config
|
||||
}
|
||||
}
|
||||
*/
|
||||
@@ -0,0 +1,84 @@
|
||||
use crate::cli::commands::run::args::Args;
|
||||
use crate::cli::DEFAULT_NYX_CHAIN_WATCHER_ID;
|
||||
use crate::config::payments_watcher::{HttpAuthenticationOptions, PaymentWatcherEntry};
|
||||
use crate::config::{default_config_filepath, Config, ConfigBuilder, PaymentWatcherConfig};
|
||||
use crate::error::NyxChainWatcherError;
|
||||
use tracing::{info, warn};
|
||||
|
||||
pub(crate) fn get_run_config(args: Args) -> Result<Config, NyxChainWatcherError> {
|
||||
info!("{args:#?}");
|
||||
|
||||
let Args {
|
||||
ref watch_for_transfer_recipient_accounts,
|
||||
mut watch_for_chain_message_types,
|
||||
webhook_auth,
|
||||
ref chain_watcher_db_path,
|
||||
ref chain_history_db_path,
|
||||
webhook_url,
|
||||
} = args;
|
||||
|
||||
// if there are no args set, then try load the config
|
||||
if args.watch_for_transfer_recipient_accounts.is_none()
|
||||
&& args.watch_for_transfer_recipient_accounts.is_none()
|
||||
&& args.chain_watcher_db_path.is_none()
|
||||
{
|
||||
info!("Loading default config file...");
|
||||
return Config::read_from_toml_file_in_default_location();
|
||||
}
|
||||
|
||||
// set default messages
|
||||
if watch_for_chain_message_types.is_none() {
|
||||
watch_for_chain_message_types = Some(vec!["/cosmos.bank.v1beta1.MsgSend".to_string()]);
|
||||
}
|
||||
|
||||
// warn if no accounts set
|
||||
if watch_for_transfer_recipient_accounts.is_none() {
|
||||
warn!(
|
||||
"You did not specify any accounts to watch in {}. Only chain data will be stored.",
|
||||
crate::env::vars::NYX_CHAIN_WATCHER_WATCH_ACCOUNTS
|
||||
);
|
||||
}
|
||||
|
||||
let config_path = default_config_filepath();
|
||||
let data_dir = Config::default_data_directory(&config_path)?;
|
||||
|
||||
let mut builder = ConfigBuilder::new(config_path, data_dir);
|
||||
|
||||
if let Some(db_path) = chain_watcher_db_path {
|
||||
info!("Overriding database url with '{db_path}'");
|
||||
builder = builder.with_db_path(db_path.clone());
|
||||
}
|
||||
|
||||
if let Some(db_path) = chain_history_db_path {
|
||||
info!("Overriding chain history database url with '{db_path}'");
|
||||
builder = builder.with_chain_scraper_db_path(db_path.clone());
|
||||
}
|
||||
|
||||
if let Some(webhook_url) = webhook_url {
|
||||
let authentication =
|
||||
webhook_auth.map(|token| HttpAuthenticationOptions::AuthorizationBearerToken { token });
|
||||
|
||||
let watcher_config = PaymentWatcherConfig {
|
||||
watchers: vec![PaymentWatcherEntry {
|
||||
id: DEFAULT_NYX_CHAIN_WATCHER_ID.to_string(),
|
||||
description: None,
|
||||
watch_for_transfer_recipient_accounts: watch_for_transfer_recipient_accounts
|
||||
.clone(),
|
||||
watch_for_chain_message_types,
|
||||
webhook_url,
|
||||
authentication,
|
||||
}],
|
||||
};
|
||||
|
||||
info!("Overriding watcher config with env vars");
|
||||
|
||||
builder = builder.with_payment_watcher_config(watcher_config);
|
||||
} else {
|
||||
warn!(
|
||||
"You did not specify a webhook in {}. Only database items will be stored.",
|
||||
crate::env::vars::NYX_CHAIN_WATCHER_WEBHOOK_URL
|
||||
);
|
||||
}
|
||||
|
||||
Ok(builder.build())
|
||||
}
|
||||
@@ -0,0 +1,92 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::error::NyxChainWatcherError;
|
||||
use tokio::join;
|
||||
use tracing::{error, info, trace};
|
||||
|
||||
mod args;
|
||||
mod config;
|
||||
|
||||
use crate::chain_scraper::run_chain_scraper;
|
||||
use crate::{db, http, payment_listener, price_scraper};
|
||||
pub(crate) use args::Args;
|
||||
use nym_task::signal::wait_for_signal;
|
||||
|
||||
pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWatcherError> {
|
||||
trace!("passed arguments: {args:#?}");
|
||||
|
||||
let config = config::get_run_config(args)?;
|
||||
|
||||
let db_path = config.database_path();
|
||||
|
||||
info!("Config is {config:#?}");
|
||||
info!(
|
||||
"Database path is {:?}",
|
||||
std::path::Path::new(&db_path)
|
||||
.canonicalize()
|
||||
.unwrap_or_default()
|
||||
);
|
||||
info!(
|
||||
"Chain History Database path is {:?}",
|
||||
std::path::Path::new(&config.chain_scraper_database_path())
|
||||
.canonicalize()
|
||||
.unwrap_or_default()
|
||||
);
|
||||
|
||||
// Ensure parent directory exists
|
||||
if let Some(parent) = std::path::Path::new(&db_path).parent() {
|
||||
std::fs::create_dir_all(parent)?;
|
||||
}
|
||||
|
||||
let connection_url = format!("sqlite://{}?mode=rwc", db_path);
|
||||
let storage = db::Storage::init(connection_url).await?;
|
||||
let watcher_pool = storage.pool_owned().await;
|
||||
|
||||
// Spawn the chain scraper and get its storage
|
||||
|
||||
// Spawn the payment listener task
|
||||
let payment_listener_handle = tokio::spawn({
|
||||
let obs_pool = watcher_pool.clone();
|
||||
let chain_storage = run_chain_scraper(&config).await?;
|
||||
let payment_watcher_config = config.payment_watcher_config.unwrap_or_default();
|
||||
|
||||
async move {
|
||||
if let Err(e) = payment_listener::run_payment_listener(
|
||||
payment_watcher_config,
|
||||
obs_pool,
|
||||
chain_storage,
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("Payment listener error: {}", e);
|
||||
}
|
||||
Ok::<_, anyhow::Error>(())
|
||||
}
|
||||
});
|
||||
|
||||
// Clone pool for each task that needs it
|
||||
//let background_pool = db_pool.clone();
|
||||
|
||||
let price_scraper_handle = tokio::spawn(async move {
|
||||
price_scraper::run_price_scraper(&watcher_pool).await;
|
||||
});
|
||||
|
||||
let shutdown_handles = http::server::start_http_api(storage.pool_owned().await, http_port)
|
||||
.await
|
||||
.expect("Failed to start server");
|
||||
|
||||
info!("Started HTTP server on port {}", http_port);
|
||||
|
||||
// Wait for the short-lived tasks to complete
|
||||
let _ = join!(price_scraper_handle, payment_listener_handle);
|
||||
|
||||
// Wait for a signal to terminate the long-running task
|
||||
wait_for_signal().await;
|
||||
|
||||
if let Err(err) = shutdown_handles.shutdown().await {
|
||||
error!("{err}");
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::cli::commands::{build_info, init, run};
|
||||
use crate::env::vars::*;
|
||||
use crate::error::NyxChainWatcherError;
|
||||
use clap::{Parser, Subcommand};
|
||||
use nym_bin_common::bin_info;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
mod commands;
|
||||
|
||||
pub const DEFAULT_NYX_CHAIN_WATCHER_ID: &str = "default-nyx-chain-watcher";
|
||||
|
||||
// Helper for passing LONG_VERSION to clap
|
||||
fn pretty_build_info_static() -> &'static str {
|
||||
static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new();
|
||||
PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print())
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
|
||||
pub(crate) struct Cli {
|
||||
/// Path pointing to an env file that configures the nym-chain-watcher and overrides any preconfigured values.
|
||||
#[clap(
|
||||
short,
|
||||
long,
|
||||
env = NYX_CHAIN_WATCHER_CONFIG_ENV_FILE_ARG
|
||||
)]
|
||||
pub(crate) config_env_file: Option<std::path::PathBuf>,
|
||||
|
||||
/// Flag used for disabling the printed banner in tty.
|
||||
#[clap(
|
||||
long,
|
||||
env = NYX_CHAIN_WATCHER_NO_BANNER_ARG
|
||||
)]
|
||||
pub(crate) no_banner: bool,
|
||||
|
||||
/// Port to listen on
|
||||
#[arg(long, default_value_t = 8000, env = "NYX_CHAIN_WATCHER_HTTP_PORT")]
|
||||
pub(crate) http_port: u16,
|
||||
|
||||
#[clap(subcommand)]
|
||||
command: Commands,
|
||||
}
|
||||
|
||||
impl Cli {
|
||||
pub(crate) async fn execute(self) -> Result<(), NyxChainWatcherError> {
|
||||
match self.command {
|
||||
Commands::BuildInfo(args) => build_info::execute(args),
|
||||
Commands::Run(args) => run::execute(*args, self.http_port).await,
|
||||
Commands::Init(args) => init::execute(args).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Subcommand, Debug)]
|
||||
pub(crate) enum Commands {
|
||||
/// Show build information of this binary
|
||||
BuildInfo(build_info::Args),
|
||||
|
||||
/// Start this nym-chain-watcher
|
||||
Run(Box<run::Args>),
|
||||
|
||||
/// Initialise config
|
||||
Init(init::Args),
|
||||
}
|
||||
@@ -0,0 +1,249 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::config::template::CONFIG_TEMPLATE;
|
||||
use nym_bin_common::logging::LoggingSettings;
|
||||
use nym_config::{
|
||||
must_get_home, read_config_from_toml_file, save_unformatted_config_to_file, NymConfigTemplate,
|
||||
DEFAULT_CONFIG_DIR, DEFAULT_CONFIG_FILENAME, DEFAULT_DATA_DIR, NYM_DIR,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::{Path, PathBuf};
|
||||
use tracing::{debug, error};
|
||||
|
||||
pub(crate) mod payments_watcher;
|
||||
mod template;
|
||||
|
||||
pub use crate::config::payments_watcher::PaymentWatcherConfig;
|
||||
use crate::error::NyxChainWatcherError;
|
||||
|
||||
const DEFAULT_NYM_CHAIN_WATCHER_DIR: &str = "nym-chain-watcher";
|
||||
|
||||
pub(crate) const DEFAULT_NYM_CHAIN_WATCHER_DB_FILENAME: &str = "nyx_chain_watcher.sqlite";
|
||||
pub(crate) const DEFAULT_NYM_CHAIN_SCRAPER_HISTORY_DB_FILENAME: &str = "chain_history.sqlite";
|
||||
|
||||
/// Derive default path to nym-chain-watcher's config directory.
|
||||
/// It should get resolved to `$HOME/.nym/nym-chain-watcher/config`
|
||||
pub fn default_config_directory() -> PathBuf {
|
||||
must_get_home()
|
||||
.join(NYM_DIR)
|
||||
.join(DEFAULT_NYM_CHAIN_WATCHER_DIR)
|
||||
.join(DEFAULT_CONFIG_DIR)
|
||||
}
|
||||
|
||||
/// Derive default path to nym-chain-watcher's config file.
|
||||
/// It should get resolved to `$HOME/.nym/nym-chain-watcher/config/config.toml`
|
||||
pub fn default_config_filepath() -> PathBuf {
|
||||
default_config_directory().join(DEFAULT_CONFIG_FILENAME)
|
||||
}
|
||||
|
||||
pub struct ConfigBuilder {
|
||||
pub config_path: PathBuf,
|
||||
|
||||
pub data_dir: PathBuf,
|
||||
|
||||
pub db_path: Option<String>,
|
||||
|
||||
pub chain_scraper_db_path: Option<String>,
|
||||
|
||||
pub payment_watcher_config: Option<PaymentWatcherConfig>,
|
||||
|
||||
pub logging: Option<LoggingSettings>,
|
||||
}
|
||||
|
||||
impl ConfigBuilder {
|
||||
pub fn new(config_path: PathBuf, data_dir: PathBuf) -> Self {
|
||||
ConfigBuilder {
|
||||
config_path,
|
||||
data_dir,
|
||||
payment_watcher_config: None,
|
||||
logging: None,
|
||||
db_path: None,
|
||||
chain_scraper_db_path: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_db_path(mut self, db_path: String) -> Self {
|
||||
self.db_path = Some(db_path);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_chain_scraper_db_path(mut self, chain_scraper_db_path: String) -> Self {
|
||||
self.chain_scraper_db_path = Some(chain_scraper_db_path);
|
||||
self
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn with_payment_watcher_config(
|
||||
mut self,
|
||||
payment_watcher_config: impl Into<PaymentWatcherConfig>,
|
||||
) -> Self {
|
||||
self.payment_watcher_config = Some(payment_watcher_config.into());
|
||||
self
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn with_logging(mut self, section: impl Into<Option<LoggingSettings>>) -> Self {
|
||||
self.logging = section.into();
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> Config {
|
||||
Config {
|
||||
logging: self.logging.unwrap_or_default(),
|
||||
save_path: Some(self.config_path),
|
||||
payment_watcher_config: self.payment_watcher_config,
|
||||
data_dir: self.data_dir,
|
||||
db_path: self.db_path,
|
||||
chain_scraper_db_path: self.chain_scraper_db_path,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct Config {
|
||||
// additional metadata holding on-disk location of this config file
|
||||
#[serde(skip)]
|
||||
pub(crate) save_path: Option<PathBuf>,
|
||||
|
||||
#[serde(skip)]
|
||||
pub(crate) data_dir: PathBuf,
|
||||
|
||||
#[serde(skip)]
|
||||
db_path: Option<String>,
|
||||
|
||||
#[serde(skip)]
|
||||
chain_scraper_db_path: Option<String>,
|
||||
|
||||
pub payment_watcher_config: Option<PaymentWatcherConfig>,
|
||||
|
||||
#[serde(default)]
|
||||
pub logging: LoggingSettings,
|
||||
}
|
||||
|
||||
impl NymConfigTemplate for Config {
|
||||
fn template(&self) -> &'static str {
|
||||
CONFIG_TEMPLATE
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
#[allow(unused)]
|
||||
pub fn save(&self) -> Result<(), NyxChainWatcherError> {
|
||||
let save_location = self.save_location();
|
||||
debug!(
|
||||
"attempting to save config file to '{}'",
|
||||
save_location.display()
|
||||
);
|
||||
save_unformatted_config_to_file(self, &save_location).map_err(|source| {
|
||||
NyxChainWatcherError::UnformattedConfigSaveFailure {
|
||||
path: save_location,
|
||||
source,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub fn save_location(&self) -> PathBuf {
|
||||
self.save_path
|
||||
.clone()
|
||||
.unwrap_or(self.default_save_location())
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub fn default_save_location(&self) -> PathBuf {
|
||||
default_config_filepath()
|
||||
}
|
||||
|
||||
pub fn default_data_directory<P: AsRef<Path>>(
|
||||
config_path: P,
|
||||
) -> Result<PathBuf, NyxChainWatcherError> {
|
||||
let config_path = config_path.as_ref();
|
||||
|
||||
// we got a proper path to the .toml file
|
||||
let Some(config_dir) = config_path.parent() else {
|
||||
error!(
|
||||
"'{}' does not have a parent directory. Have you pointed to the fs root?",
|
||||
config_path.display()
|
||||
);
|
||||
return Err(NyxChainWatcherError::DataDirDerivationFailure);
|
||||
};
|
||||
|
||||
let Some(config_dir_name) = config_dir.file_name() else {
|
||||
error!(
|
||||
"could not obtain parent directory name of '{}'. Have you used relative paths?",
|
||||
config_path.display()
|
||||
);
|
||||
return Err(NyxChainWatcherError::DataDirDerivationFailure);
|
||||
};
|
||||
|
||||
if config_dir_name != DEFAULT_CONFIG_DIR {
|
||||
error!(
|
||||
"the parent directory of '{}' ({}) is not {DEFAULT_CONFIG_DIR}. currently this is not supported",
|
||||
config_path.display(), config_dir_name.to_str().unwrap_or("UNKNOWN")
|
||||
);
|
||||
return Err(NyxChainWatcherError::DataDirDerivationFailure);
|
||||
}
|
||||
|
||||
let Some(node_dir) = config_dir.parent() else {
|
||||
error!(
|
||||
"'{}' does not have a parent directory. Have you pointed to the fs root?",
|
||||
config_dir.display()
|
||||
);
|
||||
return Err(NyxChainWatcherError::DataDirDerivationFailure);
|
||||
};
|
||||
|
||||
Ok(node_dir.join(DEFAULT_DATA_DIR))
|
||||
}
|
||||
|
||||
pub fn database_path(&self) -> String {
|
||||
self.db_path.clone().unwrap_or_else(|| {
|
||||
let mut path = self.data_dir.clone().to_path_buf();
|
||||
path.push(DEFAULT_NYM_CHAIN_WATCHER_DB_FILENAME);
|
||||
path.to_str()
|
||||
.unwrap_or(DEFAULT_NYM_CHAIN_WATCHER_DB_FILENAME)
|
||||
.to_string()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn chain_scraper_database_path(&self) -> String {
|
||||
self.chain_scraper_db_path.clone().unwrap_or_else(|| {
|
||||
let mut path = self.data_dir.clone().to_path_buf();
|
||||
path.push(DEFAULT_NYM_CHAIN_SCRAPER_HISTORY_DB_FILENAME);
|
||||
path.to_str()
|
||||
.unwrap_or(DEFAULT_NYM_CHAIN_SCRAPER_HISTORY_DB_FILENAME)
|
||||
.to_string()
|
||||
})
|
||||
}
|
||||
|
||||
// simple wrapper that reads config file and assigns path location
|
||||
fn read_from_path<P: AsRef<Path>>(path: P, data_dir: P) -> Result<Self, NyxChainWatcherError> {
|
||||
let path = path.as_ref();
|
||||
let data_dir = data_dir.as_ref();
|
||||
let mut loaded: Config = read_config_from_toml_file(path).map_err(|source| {
|
||||
NyxChainWatcherError::ConfigLoadFailure {
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
}
|
||||
})?;
|
||||
loaded.data_dir = data_dir.to_path_buf();
|
||||
loaded.save_path = Some(path.to_path_buf());
|
||||
debug!("loaded config file from {}", path.display());
|
||||
Ok(loaded)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub fn read_from_toml_file<P: AsRef<Path>>(
|
||||
path: P,
|
||||
data_dir: P,
|
||||
) -> Result<Self, NyxChainWatcherError> {
|
||||
Self::read_from_path(path, data_dir)
|
||||
}
|
||||
|
||||
pub fn read_from_toml_file_in_default_location() -> Result<Self, NyxChainWatcherError> {
|
||||
let config_path = default_config_filepath();
|
||||
let data_dir = Config::default_data_directory(&config_path)?;
|
||||
Self::read_from_path(config_path, data_dir)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
use nym_validator_client::nyxd::AccountId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct PaymentWatcherConfig {
|
||||
pub watchers: Vec<PaymentWatcherEntry>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PaymentWatcherEntry {
|
||||
pub id: String,
|
||||
pub description: Option<String>,
|
||||
pub webhook_url: String,
|
||||
pub watch_for_transfer_recipient_accounts: Option<Vec<AccountId>>,
|
||||
pub watch_for_chain_message_types: Option<Vec<String>>,
|
||||
pub authentication: Option<HttpAuthenticationOptions>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum HttpAuthenticationOptions {
|
||||
AuthorizationBearerToken { token: String },
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
// While using normal toml marshalling would have been way simpler with less overhead,
|
||||
// I think it's useful to have comments attached to the saved config file to explain behaviour of
|
||||
// particular fields.
|
||||
// Note: any changes to the template must be reflected in the appropriate structs.
|
||||
pub(crate) const CONFIG_TEMPLATE: &str = r#"
|
||||
# This is a TOML config file.
|
||||
# For more information, see https://github.com/toml-lang/toml
|
||||
|
||||
[payment_watcher_config]
|
||||
{{#each payment_watcher_config.watchers }}
|
||||
[[watchers]]
|
||||
id={{this.id}}
|
||||
description='{{this.description}}'
|
||||
webhook_url='{{this.webhook_url}}'
|
||||
{{/each}}
|
||||
|
||||
|
||||
|
||||
|
||||
##### logging configuration options #####
|
||||
|
||||
[logging]
|
||||
|
||||
# TODO
|
||||
|
||||
"#;
|
||||
@@ -1,13 +1,16 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use sqlx::{migrate::Migrator, postgres::PgConnectOptions, ConnectOptions, PgPool};
|
||||
use sqlx::{migrate::Migrator, sqlite::SqliteConnectOptions, SqlitePool};
|
||||
use std::str::FromStr;
|
||||
|
||||
pub(crate) mod models;
|
||||
pub(crate) mod queries;
|
||||
pub(crate) mod queries {
|
||||
pub mod payments;
|
||||
pub mod price;
|
||||
}
|
||||
|
||||
static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
|
||||
|
||||
pub(crate) type DbPool = PgPool;
|
||||
pub(crate) type DbPool = SqlitePool;
|
||||
|
||||
pub(crate) struct Storage {
|
||||
pool: DbPool,
|
||||
@@ -16,13 +19,16 @@ pub(crate) struct Storage {
|
||||
impl Storage {
|
||||
pub async fn init(connection_url: String) -> Result<Self> {
|
||||
let connect_options =
|
||||
PgConnectOptions::from_str(&connection_url)?.disable_statement_logging();
|
||||
SqliteConnectOptions::from_str(&connection_url)?.create_if_missing(true);
|
||||
|
||||
let pool = DbPool::connect_with(connect_options)
|
||||
.await
|
||||
.map_err(|err| anyhow!("Failed to connect to {}: {}", &connection_url, err))?;
|
||||
|
||||
MIGRATOR.run(&pool).await?;
|
||||
MIGRATOR
|
||||
.run(&pool)
|
||||
.await
|
||||
.map_err(|err| anyhow!("Failed to run migrations: {}", err))?;
|
||||
|
||||
Ok(Storage { pool })
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utoipa::ToSchema;
|
||||
|
||||
#[derive(Clone, Deserialize, Debug)]
|
||||
pub(crate) struct CurrencyPrices {
|
||||
pub(crate) chf: f32,
|
||||
pub(crate) usd: f32,
|
||||
pub(crate) eur: f32,
|
||||
pub(crate) btc: f32,
|
||||
}
|
||||
|
||||
// Struct to hold Coingecko response
|
||||
#[derive(Clone, Deserialize, Debug, ToSchema)]
|
||||
pub(crate) struct CoingeckoPriceResponse {
|
||||
pub(crate) nym: CurrencyPrices,
|
||||
}
|
||||
|
||||
#[derive(Clone, Deserialize, Debug, ToSchema)]
|
||||
pub(crate) struct PriceRecord {
|
||||
pub(crate) timestamp: i64,
|
||||
pub(crate) nym: CurrencyPrices,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, ToSchema)]
|
||||
pub(crate) struct PriceHistory {
|
||||
pub(crate) timestamp: i64,
|
||||
pub(crate) chf: f32,
|
||||
pub(crate) usd: f32,
|
||||
pub(crate) eur: f32,
|
||||
pub(crate) btc: f32,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, ToSchema)]
|
||||
pub(crate) struct PaymentRecord {
|
||||
pub(crate) transaction_hash: String,
|
||||
pub(crate) sender_address: String,
|
||||
pub(crate) receiver_address: String,
|
||||
pub(crate) amount: f64,
|
||||
pub(crate) timestamp: i64,
|
||||
pub(crate) height: i64,
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
mod payments;
|
||||
mod price;
|
||||
|
||||
// re-exporting allows us to access all queries via `queries::bla``
|
||||
pub(crate) use payments::{get_last_checked_height, insert_payment};
|
||||
pub(crate) use price::{get_latest_price, insert_nym_prices};
|
||||
@@ -0,0 +1,41 @@
|
||||
use crate::db::DbPool;
|
||||
use anyhow::Result;
|
||||
|
||||
pub async fn get_last_checked_height(pool: &DbPool) -> Result<i64> {
|
||||
let result = sqlx::query_scalar!("SELECT MAX(height) FROM payments")
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
Ok(result.unwrap_or(0))
|
||||
}
|
||||
|
||||
pub async fn insert_payment(
|
||||
pool: &DbPool,
|
||||
transaction_hash: String,
|
||||
sender_address: String,
|
||||
receiver_address: String,
|
||||
amount: f64,
|
||||
height: i64,
|
||||
memo: Option<String>,
|
||||
) -> Result<()> {
|
||||
let timestamp = chrono::Utc::now().timestamp();
|
||||
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO payments (
|
||||
transaction_hash, sender_address, receiver_address,
|
||||
amount, height, timestamp, memo
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
"#,
|
||||
transaction_hash,
|
||||
sender_address,
|
||||
receiver_address,
|
||||
amount,
|
||||
height,
|
||||
timestamp,
|
||||
memo,
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
use crate::db::models::{PriceHistory, PriceRecord};
|
||||
use crate::db::DbPool;
|
||||
|
||||
pub(crate) async fn insert_nym_prices(
|
||||
pool: &DbPool,
|
||||
price_data: PriceRecord,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let timestamp = price_data.timestamp;
|
||||
sqlx::query!(
|
||||
"INSERT INTO price_history
|
||||
(timestamp, chf, usd, eur, btc)
|
||||
VALUES
|
||||
($1, $2, $3, $4, $5)
|
||||
ON CONFLICT(timestamp) DO UPDATE SET
|
||||
chf=excluded.chf,
|
||||
usd=excluded.usd,
|
||||
eur=excluded.eur,
|
||||
btc=excluded.btc;",
|
||||
timestamp,
|
||||
price_data.nym.chf,
|
||||
price_data.nym.usd,
|
||||
price_data.nym.eur,
|
||||
price_data.nym.btc,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_latest_price(pool: &DbPool) -> anyhow::Result<PriceHistory> {
|
||||
let result = sqlx::query!(
|
||||
"SELECT timestamp, chf, usd, eur, btc FROM price_history ORDER BY timestamp DESC LIMIT 1;"
|
||||
)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
Ok(PriceHistory {
|
||||
timestamp: result.timestamp,
|
||||
chf: result.chf as f32,
|
||||
usd: result.usd as f32,
|
||||
eur: result.eur as f32,
|
||||
btc: result.btc as f32,
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
#[allow(unused)]
|
||||
pub mod vars {
|
||||
pub const NYX_CHAIN_WATCHER_NO_BANNER_ARG: &str = "NYX_CHAIN_WATCHER_NO_BANNER";
|
||||
pub const NYX_CHAIN_WATCHER_CONFIG_ENV_FILE_ARG: &str = "NYX_CHAIN_WATCHER_CONFIG_ENV_FILE_ARG";
|
||||
|
||||
pub const NYX_CHAIN_WATCHER_DATABASE_PATH: &str = "NYX_CHAIN_WATCHER_DATABASE_PATH";
|
||||
pub const NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH: &str =
|
||||
"NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH";
|
||||
|
||||
pub const NYXD_SCRAPER_START_HEIGHT: &str = "NYXD_SCRAPER_START_HEIGHT";
|
||||
pub const NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT: &str =
|
||||
"NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT";
|
||||
|
||||
pub const NYX_CHAIN_WATCHER_ID_ARG: &str = "NYX_CHAIN_WATCHER_ID";
|
||||
pub const NYX_CHAIN_WATCHER_OUTPUT_ARG: &str = "NYX_CHAIN_WATCHER_OUTPUT";
|
||||
|
||||
pub const NYX_CHAIN_WATCHER_CONFIG_PATH_ARG: &str = "NYX_CHAIN_WATCHER_CONFIG";
|
||||
|
||||
pub const NYX_CHAIN_WATCHER_WATCH_ACCOUNTS: &str = "NYX_CHAIN_WATCHER_WATCH_ACCOUNTS";
|
||||
pub const NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES: &str =
|
||||
"NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES";
|
||||
pub const NYX_CHAIN_WATCHER_WEBHOOK_URL: &str = "NYX_CHAIN_WATCHER_WEBHOOK_URL";
|
||||
pub const NYX_CHAIN_WATCHER_WEBHOOK_AUTH: &str = "NYX_CHAIN_WATCHER_WEBHOOK_AUTH";
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
use std::io;
|
||||
use std::path::PathBuf;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum NyxChainWatcherError {
|
||||
// #[error("failed to save config file using path '{}'. detailed message: {source}", path.display())]
|
||||
// ConfigSaveFailure {
|
||||
// path: PathBuf,
|
||||
// #[source]
|
||||
// source: io::Error,
|
||||
// },
|
||||
#[error("failed to save config file using path '{}'. detailed message: {source}", path.display())]
|
||||
UnformattedConfigSaveFailure {
|
||||
path: PathBuf,
|
||||
#[source]
|
||||
source: nym_config::error::NymConfigTomlError,
|
||||
},
|
||||
|
||||
#[error("could not derive path to data directory of this nyx chain watcher")]
|
||||
DataDirDerivationFailure,
|
||||
|
||||
// #[error("could not derive path to config directory of this nyx chain watcher")]
|
||||
// ConfigDirDerivationFailure,
|
||||
#[error("failed to load config file using path '{}'. detailed message: {source}", path.display())]
|
||||
ConfigLoadFailure {
|
||||
path: PathBuf,
|
||||
#[source]
|
||||
source: io::Error,
|
||||
},
|
||||
|
||||
#[error(transparent)]
|
||||
FileIoFailure(#[from] io::Error),
|
||||
|
||||
#[error(transparent)]
|
||||
AnyhowFailure(#[from] anyhow::Error),
|
||||
|
||||
#[error(transparent)]
|
||||
NymConfigTomlE(#[from] nym_config::error::NymConfigTomlError),
|
||||
}
|
||||
@@ -7,8 +7,8 @@ use utoipa_swagger_ui::SwaggerUi;
|
||||
|
||||
use crate::http::{api_docs, server::HttpServer, state::AppState};
|
||||
|
||||
pub(crate) mod jokes;
|
||||
pub(crate) mod mixnodes;
|
||||
pub(crate) mod price;
|
||||
|
||||
pub(crate) struct RouterBuilder {
|
||||
unfinished_router: Router<AppState>,
|
||||
@@ -28,8 +28,9 @@ impl RouterBuilder {
|
||||
.nest(
|
||||
"/v1",
|
||||
Router::new()
|
||||
.nest("/jokes", jokes::routes())
|
||||
.nest("/mixnodes", mixnodes::routes()),
|
||||
//.nest("/jokes", jokes::routes())
|
||||
.nest("/mixnodes", mixnodes::routes())
|
||||
.nest("/price", price::routes()),
|
||||
);
|
||||
|
||||
Self {
|
||||
@@ -0,0 +1,27 @@
|
||||
use crate::db::models::PriceHistory;
|
||||
use crate::db::queries::price::get_latest_price;
|
||||
use crate::http::error::Error;
|
||||
use crate::http::error::HttpResult;
|
||||
use crate::http::state::AppState;
|
||||
use axum::{extract::State, Json, Router};
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new().route("/", axum::routing::get(price))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Nym Price",
|
||||
get,
|
||||
path = "/v1/price",
|
||||
responses(
|
||||
(status = 200, body = String)
|
||||
)
|
||||
)]
|
||||
|
||||
/// Fetch the latest price cached by this API
|
||||
async fn price(State(state): State<AppState>) -> HttpResult<Json<PriceHistory>> {
|
||||
get_latest_price(state.db_pool())
|
||||
.await
|
||||
.map(Json::from)
|
||||
.map_err(|_| Error::internal())
|
||||
}
|
||||
@@ -4,7 +4,7 @@ use utoipauto::utoipauto;
|
||||
// manually import external structs which are behind feature flags because they
|
||||
// can't be automatically discovered
|
||||
// https://github.com/ProbablyClem/utoipauto/issues/13#issuecomment-1974911829
|
||||
#[utoipauto(paths = "./nym-data-observatory/src")]
|
||||
#[utoipauto(paths = "./nyx-chain-watcher/src")]
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(info(title = "Nym API"), tags(), components(schemas()))]
|
||||
#[openapi(info(title = "Nyx Chain Watcher API"), tags(), components(schemas()))]
|
||||
pub(super) struct ApiDoc;
|
||||
@@ -6,13 +6,6 @@ pub(crate) struct Error {
|
||||
}
|
||||
|
||||
impl Error {
|
||||
pub(crate) fn not_found(message: String) -> Self {
|
||||
Self {
|
||||
message,
|
||||
status: axum::http::StatusCode::NOT_FOUND,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn internal() -> Self {
|
||||
Self {
|
||||
message: String::from("Internal server error"),
|
||||
@@ -0,0 +1,35 @@
|
||||
use clap::{crate_name, crate_version, Parser};
|
||||
use nym_bin_common::bin_info_owned;
|
||||
use nym_bin_common::logging::maybe_print_banner;
|
||||
use nym_network_defaults::setup_env;
|
||||
use tracing::info;
|
||||
|
||||
mod chain_scraper;
|
||||
mod cli;
|
||||
mod config;
|
||||
mod db;
|
||||
mod env;
|
||||
mod error;
|
||||
mod http;
|
||||
mod logging;
|
||||
pub mod models;
|
||||
mod payment_listener;
|
||||
mod price_scraper;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let cli = cli::Cli::parse();
|
||||
setup_env(cli.config_env_file.as_ref());
|
||||
logging::setup_tracing_logger();
|
||||
|
||||
if !cli.no_banner {
|
||||
maybe_print_banner(crate_name!(), crate_version!());
|
||||
}
|
||||
|
||||
let bin_info = bin_info_owned!();
|
||||
info!("using the following version: {bin_info}");
|
||||
|
||||
cli.execute().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
use rocket::serde::{Deserialize, Serialize};
|
||||
use schemars::JsonSchema;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, JsonSchema, ToSchema)]
|
||||
pub struct WebhookPayload {
|
||||
pub transaction_hash: String,
|
||||
pub message_index: u64,
|
||||
pub sender_address: String,
|
||||
pub receiver_address: String,
|
||||
pub amount: String,
|
||||
pub height: u128,
|
||||
pub memo: Option<String>,
|
||||
}
|
||||
@@ -0,0 +1,215 @@
|
||||
use crate::config::payments_watcher::HttpAuthenticationOptions;
|
||||
use crate::config::PaymentWatcherConfig;
|
||||
use crate::db::queries;
|
||||
use crate::models::WebhookPayload;
|
||||
use nym_validator_client::nyxd::AccountId;
|
||||
use nyxd_scraper::storage::ScraperStorage;
|
||||
use reqwest::Client;
|
||||
use rocket::form::validate::Contains;
|
||||
use serde_json::Value;
|
||||
use sqlx::SqlitePool;
|
||||
use std::str::FromStr;
|
||||
use tokio::time::{self, Duration};
|
||||
use tracing::{error, info, trace};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TransferEvent {
|
||||
recipient: AccountId,
|
||||
sender: AccountId,
|
||||
amount: String,
|
||||
message_index: u64,
|
||||
}
|
||||
|
||||
pub(crate) async fn run_payment_listener(
|
||||
payment_watcher_config: PaymentWatcherConfig,
|
||||
watcher_pool: SqlitePool,
|
||||
chain_storage: ScraperStorage,
|
||||
) -> anyhow::Result<()> {
|
||||
let client = Client::new();
|
||||
|
||||
let default_message_types = vec!["/cosmos.bank.v1beta1.MsgSend".to_string()];
|
||||
|
||||
loop {
|
||||
// 1. get the last height this watcher ran at
|
||||
let last_checked_height = queries::payments::get_last_checked_height(&watcher_pool).await?;
|
||||
info!("Last checked height: {}", last_checked_height);
|
||||
|
||||
// 2. iterate through watchers
|
||||
for watcher in &payment_watcher_config.watchers {
|
||||
let watch_for_chain_message_types = watcher
|
||||
.watch_for_chain_message_types
|
||||
.as_ref()
|
||||
.unwrap_or(&default_message_types);
|
||||
|
||||
// 3. build up transactions that match the message types we are looking for
|
||||
let mut transactions = vec![];
|
||||
for message_type in watch_for_chain_message_types {
|
||||
match chain_storage
|
||||
.get_transactions_after_height(
|
||||
last_checked_height,
|
||||
Some(message_type),
|
||||
)
|
||||
.await {
|
||||
Ok(txs) => {
|
||||
for t in txs {
|
||||
transactions.push(t);
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Failed to get transactions (message_type = {message_type}) from scraper database: {e}")
|
||||
}
|
||||
}
|
||||
|
||||
for tx in transactions {
|
||||
if let Some(raw_log) = tx.raw_log.as_deref() {
|
||||
if let Some(watch_for_transfer_recipient_accounts) =
|
||||
&watcher.watch_for_transfer_recipient_accounts
|
||||
{
|
||||
// 4. match recipient accounts we are looking for
|
||||
match parse_transfer_from_raw_log(
|
||||
raw_log,
|
||||
watch_for_transfer_recipient_accounts,
|
||||
) {
|
||||
Ok(transfer_events) => {
|
||||
if !transfer_events.is_empty() {
|
||||
info!(
|
||||
"[watcher = {}] Processing transaction: {} - {} payment events found",
|
||||
watcher.id, tx.hash, transfer_events.len()
|
||||
);
|
||||
}
|
||||
|
||||
for transfer in transfer_events {
|
||||
let amount: f64 = parse_unym_amount(&transfer.amount)?;
|
||||
|
||||
queries::payments::insert_payment(
|
||||
&watcher_pool,
|
||||
tx.hash.clone(),
|
||||
transfer.sender.clone().to_string(),
|
||||
transfer.recipient.clone().to_string(),
|
||||
amount,
|
||||
tx.height,
|
||||
tx.memo.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let webhook_data = WebhookPayload {
|
||||
transaction_hash: tx.hash.clone(),
|
||||
message_index: transfer.message_index,
|
||||
sender_address: transfer.sender.to_string(),
|
||||
receiver_address: transfer.recipient.to_string(),
|
||||
amount: transfer.amount,
|
||||
height: tx.height as u128,
|
||||
memo: tx.memo.clone(),
|
||||
};
|
||||
|
||||
let mut request_builder =
|
||||
client.post(&watcher.webhook_url).json(&webhook_data);
|
||||
|
||||
if let Some(auth) = &watcher.authentication {
|
||||
match auth {
|
||||
HttpAuthenticationOptions::AuthorizationBearerToken { token } => {
|
||||
request_builder = request_builder.bearer_auth(token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match request_builder.send().await {
|
||||
Ok(res) => info!(
|
||||
"[watcher = {}] ✅ Webhook {} {} - tx {}, index {}",
|
||||
watcher.id,
|
||||
res.status(),
|
||||
res.url(),
|
||||
tx.hash,
|
||||
transfer.message_index,
|
||||
),
|
||||
Err(e) => error!(
|
||||
"[watcher = {}] ❌ Webhook {:?} {:?} error = {}",
|
||||
watcher.id,
|
||||
e.status(),
|
||||
e.url(),
|
||||
e,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => error!(
|
||||
"[watcher = {}] ❌ Parse logs for tx {} failed, error = {}",
|
||||
watcher.id, tx.hash, e,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_transfer_from_raw_log(
|
||||
raw_log: &str,
|
||||
watch_for_transfer_recipient_accounts: &Vec<AccountId>,
|
||||
) -> anyhow::Result<Vec<TransferEvent>> {
|
||||
let log_value: Value = serde_json::from_str(raw_log)?;
|
||||
|
||||
let mut transfers: Vec<TransferEvent> = vec![];
|
||||
|
||||
let default_value = vec![];
|
||||
let log_entries: &Vec<Value> = log_value.as_array().unwrap_or(&default_value);
|
||||
|
||||
trace!("contains {} log entries", log_entries.len());
|
||||
|
||||
for log_entry in log_entries {
|
||||
let message_index = log_entry["msg_index"].as_u64().unwrap_or_default();
|
||||
|
||||
trace!("entry - {message_index}...");
|
||||
|
||||
if let Some(events) = log_entry["events"].as_array() {
|
||||
for transfer_event in events.iter().filter(|e| e["type"] == "transfer") {
|
||||
if let Some(attrs) = transfer_event["attributes"].as_array() {
|
||||
let mut recipient: Option<AccountId> = None;
|
||||
let mut sender: Option<AccountId> = None;
|
||||
let mut amount: Option<String> = None;
|
||||
|
||||
for attr in attrs {
|
||||
match attr["key"].as_str() {
|
||||
Some("recipient") => {
|
||||
recipient =
|
||||
AccountId::from_str(attr["value"].as_str().unwrap_or("")).ok();
|
||||
}
|
||||
Some("sender") => {
|
||||
sender =
|
||||
AccountId::from_str(attr["value"].as_str().unwrap_or("")).ok();
|
||||
}
|
||||
Some("amount") => {
|
||||
amount = Some(attr["value"].as_str().unwrap_or("").to_string())
|
||||
}
|
||||
// TODO: parse message index
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
if let (Some(recipient), Some(sender), Some(amount)) =
|
||||
(recipient, sender, amount)
|
||||
{
|
||||
if watch_for_transfer_recipient_accounts.contains(&recipient) {
|
||||
transfers.push(TransferEvent {
|
||||
recipient,
|
||||
sender,
|
||||
amount,
|
||||
message_index,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(transfers)
|
||||
}
|
||||
|
||||
fn parse_unym_amount(amount: &str) -> anyhow::Result<f64> {
|
||||
let amount = amount.trim_end_matches("unym");
|
||||
let parsed: f64 = amount.parse()?;
|
||||
Ok(parsed / 1_000_000.0)
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
use crate::db::{
|
||||
models::{CoingeckoPriceResponse, PriceRecord},
|
||||
queries::price::insert_nym_prices,
|
||||
};
|
||||
use core::str;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
|
||||
use crate::db::DbPool;
|
||||
|
||||
const REFRESH_DELAY: Duration = Duration::from_secs(300);
|
||||
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
|
||||
const COINGECKO_API_URL: &str =
|
||||
"https://api.coingecko.com/api/v3/simple/price?ids=nym&vs_currencies=chf,usd,eur,btc";
|
||||
|
||||
pub(crate) async fn run_price_scraper(db_pool: &DbPool) -> JoinHandle<()> {
|
||||
loop {
|
||||
tracing::info!("Running in a loop 🏃");
|
||||
if let Err(e) = get_coingecko_prices(db_pool).await {
|
||||
tracing::error!("❌ Failed to get CoinGecko prices: {e}");
|
||||
tracing::info!("Retrying in {}s...", FAILURE_RETRY_DELAY.as_secs());
|
||||
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
|
||||
} else {
|
||||
tracing::info!("✅ Successfully fetched CoinGecko prices");
|
||||
tokio::time::sleep(REFRESH_DELAY).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_coingecko_prices(pool: &DbPool) -> anyhow::Result<()> {
|
||||
tracing::info!("💰 Fetching CoinGecko prices from {}", COINGECKO_API_URL);
|
||||
|
||||
let response = reqwest::get(COINGECKO_API_URL)
|
||||
.await?
|
||||
.json::<CoingeckoPriceResponse>()
|
||||
.await;
|
||||
|
||||
tracing::info!("Got response {:?}", response);
|
||||
match response {
|
||||
Ok(resp) => {
|
||||
let price_record = PriceRecord {
|
||||
timestamp: time::OffsetDateTime::now_utc().unix_timestamp(),
|
||||
nym: resp.nym,
|
||||
};
|
||||
|
||||
insert_nym_prices(pool, price_record).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
//tracing::info!("💰 CoinGecko price response: {:?}", response);
|
||||
tracing::error!("Error sending request: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user