Chore/more payment watcher debug endpoints (#5608)

* add new endpoints for health and build information

* fixed timestamp serialisation in api responses

* status routes for price scraper

* state for processing bank msg

* clippy
This commit is contained in:
Jędrzej Stuczyński
2025-03-12 12:12:28 +00:00
committed by GitHub
parent f62d8813e0
commit ce124a29a7
15 changed files with 607 additions and 126 deletions
Generated
+1 -3
View File
@@ -7234,7 +7234,7 @@ dependencies = [
[[package]]
name = "nyx-chain-watcher"
version = "0.1.13"
version = "0.1.14"
dependencies = [
"anyhow",
"async-trait",
@@ -7244,14 +7244,12 @@ dependencies = [
"nym-bin-common",
"nym-config",
"nym-network-defaults",
"nym-node-requests",
"nym-task",
"nym-validator-client",
"nyxd-scraper",
"reqwest 0.12.4",
"schemars",
"serde",
"serde_json",
"sqlx",
"thiserror 2.0.12",
"time",
@@ -62,6 +62,7 @@ pub use cw3;
pub use cw4;
pub use cw_controllers;
pub use fee::{gas_price::GasPrice, GasAdjustable, GasAdjustment};
pub use prost::Name;
pub use tendermint_rpc::endpoint::block::Response as BlockResponse;
pub use tendermint_rpc::{
endpoint::{tx::Response as TxResponse, validators::Response as ValidatorResponse},
@@ -182,9 +182,11 @@ impl BlockProcessor {
// the ones concerned with individual messages
for (index, msg) in block_tx.tx.body.messages.iter().enumerate() {
for msg_module in &mut self.msg_modules {
msg_module
.handle_msg(index, msg, &block_tx, &mut tx)
.await?
if msg.type_url == msg_module.type_url() {
msg_module
.handle_msg(index, msg, &block_tx, &mut tx)
.await?
}
}
}
}
+9
View File
@@ -83,6 +83,15 @@ pub enum ScraperError {
source: cosmrs::ErrorReport,
},
#[error("could not parse msg in tx {hash} at index {index} into {type_url}: {source}")]
MsgParseFailure {
hash: Hash,
index: usize,
type_url: String,
#[source]
source: cosmrs::ErrorReport,
},
#[error("received an invalid chain subscription event of kind {kind} while we were waiting for new block data (query: '{query}')")]
InvalidSubscriptionEvent { query: String, kind: String },
@@ -9,6 +9,8 @@ use cosmrs::Any;
#[async_trait]
pub trait MsgModule {
fn type_url(&self) -> String;
async fn handle_msg(
&mut self,
index: usize,
+2 -2
View File
@@ -4784,9 +4784,9 @@ dependencies = [
[[package]]
name = "rs_merkle"
version = "1.4.2"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b241d2e59b74ef9e98d94c78c47623d04c8392abaf82014dfd372a16041128f"
checksum = "bb09b49230ba22e8c676e7b75dfe2887dea8121f18b530ae0ba519ce442d2b21"
dependencies = [
"sha2 0.10.8",
]
+1 -5
View File
@@ -3,7 +3,7 @@
[package]
name = "nyx-chain-watcher"
version = "0.1.13"
version = "0.1.14"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
@@ -23,15 +23,11 @@ nym-config = { path = "../common/config" }
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
nym-network-defaults = { path = "../common/network-defaults" }
nym-task = { path = "../common/task" }
nym-node-requests = { path = "../nym-node/nym-node-requests", features = [
"openapi",
] }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nyxd-scraper = { path = "../common/nyxd-scraper" }
reqwest = { workspace = true, features = ["rustls-tls"] }
schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "time"] }
thiserror = { workspace = true }
time = { workspace = true }
+93 -62
View File
@@ -3,18 +3,21 @@ use crate::env::vars::{
NYXD_SCRAPER_START_HEIGHT, NYXD_SCRAPER_UNSAFE_NUKE_DB,
NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT,
};
use crate::http::state::BankScraperModuleState;
use async_trait::async_trait;
use nym_validator_client::nyxd::{Any, Coin, CosmosCoin, Hash, Msg, MsgSend, Name};
use nyxd_scraper::{
error::ScraperError, storage::StorageTransaction, NyxdScraper, ParsedTransactionResponse,
PruningOptions, TxModule,
error::ScraperError, storage::StorageTransaction, MsgModule, NyxdScraper,
ParsedTransactionResponse, PruningOptions,
};
use sqlx::SqlitePool;
use std::fs;
use tracing::{error, info, warn};
use tracing::{info, warn};
pub(crate) async fn run_chain_scraper(
config: &crate::config::Config,
db_pool: SqlitePool,
shared_state: BankScraperModuleState,
) -> anyhow::Result<NyxdScraper> {
let websocket_url = std::env::var("NYXD_WS").expect("NYXD_WS not defined");
@@ -58,9 +61,10 @@ pub(crate) async fn run_chain_scraper(
use_best_effort_start_height,
},
})
.with_tx_module(EventScraperModule::new(
.with_msg_module(BankScraperModule::new(
db_pool,
config.payment_watcher_config.clone(),
shared_state,
));
let instance = scraper.build_and_start().await?;
@@ -71,16 +75,22 @@ pub(crate) async fn run_chain_scraper(
Ok(instance)
}
pub struct EventScraperModule {
pub struct BankScraperModule {
db_pool: SqlitePool,
payment_config: PaymentWatchersConfig,
shared_state: BankScraperModuleState,
}
impl EventScraperModule {
pub fn new(db_pool: SqlitePool, payment_config: PaymentWatchersConfig) -> Self {
impl BankScraperModule {
pub fn new(
db_pool: SqlitePool,
payment_config: PaymentWatchersConfig,
shared_state: BankScraperModuleState,
) -> Self {
Self {
db_pool,
payment_config,
shared_state,
}
}
@@ -108,23 +118,47 @@ impl EventScraperModule {
amount,
memo
)
.execute(&self.db_pool)
.await?;
.execute(&self.db_pool)
.await?;
Ok(())
}
}
fn get_unym_coin(&self, coins: &[CosmosCoin]) -> Option<Coin> {
coins
.iter()
.find(|coin| coin.denom.as_ref() == "unym")
.map(|c| c.clone().into())
}
// TODO: ideally this should be done by the scraper itself
fn recover_bank_msg(
&self,
tx_hash: Hash,
index: usize,
msg: &Any,
) -> Result<MsgSend, ScraperError> {
MsgSend::from_any(msg).map_err(|source| ScraperError::MsgParseFailure {
hash: tx_hash,
index,
type_url: self.type_url(),
source,
})
}
}
#[async_trait]
impl TxModule for EventScraperModule {
async fn handle_tx(
impl MsgModule for BankScraperModule {
fn type_url(&self) -> String {
<MsgSend as Msg>::Proto::type_url()
}
async fn handle_msg(
&mut self,
index: usize,
msg: &Any,
tx: &ParsedTransactionResponse,
_: &mut StorageTransaction,
_storage_tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
let events = &tx.tx_result.events;
let height = tx.height.value() as i64;
let tx_hash = tx.hash.to_string();
let memo = tx.tx.body.memo.clone();
// Don't process failed transactions
@@ -132,56 +166,53 @@ impl TxModule for EventScraperModule {
return Ok(());
}
if tx.tx.body.messages.len() > 1 {
error!(
"this transaction has more than 1 message in it - payment information will be lost"
);
}
let msg = self.recover_bank_msg(tx.hash, index, msg)?;
// Process each event
for event in events {
// Only process transfer events
if event.kind == "transfer" {
let mut recipient = None;
let mut sender = None;
let mut amount = None;
// TODO: get message index from event
let message_index = 0;
// Check if any watcher is watching this recipient
let is_watched = self
.payment_config
.is_being_watched(msg.to_address.as_ref());
// Extract transfer event attributes
for attr in &event.attributes {
if let (Ok(key), Ok(value)) = (attr.key_str(), attr.value_str()) {
match key {
"recipient" => recipient = Some(value.to_string()),
"sender" => sender = Some(value.to_string()),
"amount" => amount = Some(value.to_string()),
_ => continue,
}
}
}
self.shared_state
.new_bank_msg(tx, index, &msg, is_watched)
.await;
// If we have all required fields, check if recipient is watched and store
if let (Some(recipient), Some(sender), Some(amount)) = (recipient, sender, amount) {
// Check if any watcher is watching this recipient
let is_watched = self.payment_config.is_being_watched(&recipient);
if is_watched {
let Some(unym_coin) = self.get_unym_coin(&msg.amount) else {
let warn = format!(
"{} sent {:?} instead of unym!",
msg.from_address, msg.amount
);
warn!("{warn}");
self.shared_state
.new_rejection(tx.hash.to_string(), tx.height.value(), index as u32, warn)
.await;
if is_watched {
if let Err(e) = self
.store_transfer_event(
&tx_hash,
height,
message_index,
sender,
recipient,
amount,
Some(memo.clone()),
)
.await
{
warn!("Failed to store transfer event: {}", e);
}
}
}
// we don't want to fail the whole processing - this is not a failure in that sense!
return Ok(());
};
if let Err(err) = self
.store_transfer_event(
&tx.hash.to_string(),
tx.height.value() as i64,
index as i64,
msg.from_address.to_string(),
msg.to_address.to_string(),
unym_coin.to_string(),
Some(memo.clone()),
)
.await
{
warn!("Failed to store transfer event: {err}");
self.shared_state
.new_rejection(
tx.hash.to_string(),
tx.height.value(),
index as u32,
format!("storage failure: {err}"),
)
.await;
}
}
+11 -4
View File
@@ -14,9 +14,10 @@ mod config;
use crate::chain_scraper::run_chain_scraper;
use crate::db::DbPool;
use crate::http::state::PaymentListenerState;
use crate::http::state::{BankScraperModuleState, PaymentListenerState, PriceScraperState};
use crate::payment_listener::PaymentListener;
use crate::{db, http, price_scraper};
use crate::price_scraper::PriceScraper;
use crate::{db, http};
pub(crate) use args::Args;
use nym_task::signal::wait_for_signal;
@@ -145,15 +146,18 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa
// construct shared state
let payment_listener_shared_state = PaymentListenerState::new();
let price_scraper_shared_state = PriceScraperState::new();
let bank_scraper_module_shared_state = BankScraperModuleState::new();
// spawn all the tasks
// 1. chain scraper (note: this doesn't really spawn the full scraper on this task, but we don't want to be blocking waiting for its startup)
let scraper_token_handle: JoinHandle<anyhow::Result<CancellationToken>> = tokio::spawn({
let config = config.clone();
let shared_state = bank_scraper_module_shared_state.clone();
async move {
// this only blocks until startup sync is done; it then runs on its own set of tasks
let scraper = run_chain_scraper(&config, scraper_pool).await?;
let scraper = run_chain_scraper(&config, scraper_pool, shared_state).await?;
Ok(scraper.cancel_token())
}
});
@@ -178,12 +182,13 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa
}
// 3. price scraper (note, this task never terminates on its own)
let price_scraper = PriceScraper::new(price_scraper_shared_state.clone(), watcher_pool);
{
let token = cancellation_token.clone();
tasks.spawn(async move {
token
.run_until_cancelled(async move {
price_scraper::run_price_scraper(&watcher_pool).await;
price_scraper.run().await;
Ok(())
})
.await
@@ -196,6 +201,8 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa
&config,
http_port,
payment_listener_shared_state,
price_scraper_shared_state,
bank_scraper_module_shared_state,
)
.await?;
{
+2 -2
View File
@@ -5,7 +5,7 @@ use sqlx::FromRow;
use time::OffsetDateTime;
use utoipa::ToSchema;
#[derive(Clone, Deserialize, Debug, ToSchema)]
#[derive(Clone, Serialize, Deserialize, Debug, ToSchema)]
pub(crate) struct CurrencyPrices {
pub(crate) chf: f32,
pub(crate) usd: f32,
@@ -15,7 +15,7 @@ pub(crate) struct CurrencyPrices {
}
// Struct to hold Coingecko response
#[derive(Clone, Deserialize, Debug, ToSchema)]
#[derive(Clone, Serialize, Deserialize, Debug, ToSchema)]
pub(crate) struct CoingeckoPriceResponse {
pub(crate) nym: CurrencyPrices,
}
+133 -3
View File
@@ -2,19 +2,60 @@
// SPDX-License-Identifier: GPL-3.0-only
use crate::http::models::status::{
ActivePaymentWatchersResponse, PaymentListenerFailureDetails, PaymentListenerStatusResponse,
ProcessedPayment, WatcherFailureDetails, WatcherState,
ActivePaymentWatchersResponse, ApiStatus, BankModuleStatusResponse, BankMsgDetails,
BankMsgRejection, HealthResponse, PaymentListenerFailureDetails, PaymentListenerStatusResponse,
PriceScraperLastError, PriceScraperLastSuccess, PriceScraperStatusResponse, ProcessedPayment,
WatcherFailureDetails, WatcherState,
};
use crate::http::state::{
AppState, BankScraperModuleState, PaymentListenerState, PriceScraperState, StatusState,
};
use crate::http::state::{AppState, PaymentListenerState};
use axum::extract::State;
use axum::routing::get;
use axum::{Json, Router};
use nym_bin_common::build_information::BinaryBuildInformationOwned;
use std::ops::Deref;
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/health", get(health))
.route("/build-information", get(build_information))
.route("/active-payment-watchers", get(active_payment_watchers))
.route("/payment-listener", get(payment_listener_status))
.route("/price-scraper", get(price_scraper_status))
.route("/bank-module-scraper", get(bank_module_status))
}
#[utoipa::path(
tag = "Status",
get,
path = "/build-information",
context_path = "/v1/status",
responses(
(status = 200, body = BinaryBuildInformationOwned)
)
)]
async fn build_information(State(state): State<StatusState>) -> Json<BinaryBuildInformationOwned> {
Json(state.build_information.to_owned())
}
#[utoipa::path(
tag = "Status",
get,
path = "/health",
context_path = "/v1/status",
responses(
(status = 200, body = HealthResponse)
)
)]
async fn health(State(state): State<StatusState>) -> Json<HealthResponse> {
let uptime = state.startup_time.elapsed();
let health = HealthResponse {
status: ApiStatus::Up,
uptime: uptime.as_secs(),
};
Json(health)
}
#[utoipa::path(
@@ -96,3 +137,92 @@ pub(crate) async fn payment_listener_status(
.collect(),
})
}
#[utoipa::path(
tag = "Status",
get,
path = "/price-scraper",
context_path = "/v1/status",
responses(
(status = 200, body = PriceScraperStatusResponse)
)
)]
pub(crate) async fn price_scraper_status(
State(state): State<PriceScraperState>,
) -> Json<PriceScraperStatusResponse> {
let guard = state.inner.read().await;
Json(PriceScraperStatusResponse {
last_success: guard
.last_success
.as_ref()
.map(|s| PriceScraperLastSuccess {
timestamp: s.timestamp,
response: s.response.clone(),
}),
last_failure: guard.last_failure.as_ref().map(|f| PriceScraperLastError {
timestamp: f.timestamp,
message: f.message.clone(),
}),
})
}
#[utoipa::path(
tag = "Status",
get,
path = "/bank-module-scraper",
context_path = "/v1/status",
responses(
(status = 200, body = BankModuleStatusResponse)
)
)]
pub(crate) async fn bank_module_status(
State(state): State<BankScraperModuleState>,
) -> Json<BankModuleStatusResponse> {
let guard = state.inner.read().await;
Json(BankModuleStatusResponse {
processed_bank_msgs_since_startup: guard.processed_bank_msgs_since_startup,
processed_bank_msgs_to_watched_addresses_since_startup: guard
.processed_bank_msgs_to_watched_addresses_since_startup,
rejected_bank_msgs_to_watched_addresses_since_startup: guard
.rejected_bank_msgs_to_watched_addresses_since_startup,
last_seen_bank_msgs: guard
.last_seen_bank_msgs
.iter()
.map(|msg| BankMsgDetails {
processed_at: msg.processed_at,
tx_hash: msg.tx_hash.clone(),
height: msg.height,
index: msg.index,
from: msg.from.clone(),
to: msg.to.clone(),
amount: msg.amount.clone(),
memo: msg.memo.clone(),
})
.collect(),
last_seen_watched_bank_msgs: guard
.last_seen_watched_bank_msgs
.iter()
.map(|msg| BankMsgDetails {
processed_at: msg.processed_at,
tx_hash: msg.tx_hash.clone(),
height: msg.height,
index: msg.index,
from: msg.from.clone(),
to: msg.to.clone(),
amount: msg.amount.clone(),
memo: msg.memo.clone(),
})
.collect(),
last_rejected_watched_bank_msgs: guard
.last_rejected_watched_bank_msgs
.iter()
.map(|r| BankMsgRejection {
rejected_at: r.rejected_at,
tx_hash: r.tx_hash.clone(),
height: r.height,
index: r.index,
error: r.error.clone(),
})
.collect(),
})
}
+70
View File
@@ -5,6 +5,7 @@
pub mod status {
use crate::config::payments_watcher::PaymentWatcherConfig;
use crate::db::models::CoingeckoPriceResponse;
use crate::models::openapi_schema;
use nym_validator_client::nyxd::Coin;
use serde::{Deserialize, Serialize};
@@ -12,6 +13,18 @@ pub mod status {
use time::OffsetDateTime;
use utoipa::ToSchema;
#[derive(Clone, Copy, Debug, Serialize, Deserialize, schemars::JsonSchema, ToSchema)]
#[serde(rename_all = "lowercase")]
pub enum ApiStatus {
Up,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize, schemars::JsonSchema, ToSchema)]
pub struct HealthResponse {
pub status: ApiStatus,
pub uptime: u64,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct ActivePaymentWatchersResponse {
pub watchers: Vec<PaymentWatcher>,
@@ -59,6 +72,7 @@ pub mod status {
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub(crate) struct ProcessedPayment {
#[serde(with = "time::serde::rfc3339")]
pub processed_at: OffsetDateTime,
pub tx_hash: String,
@@ -75,6 +89,7 @@ pub mod status {
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub(crate) struct PaymentListenerFailureDetails {
#[serde(with = "time::serde::rfc3339")]
pub(crate) timestamp: OffsetDateTime,
pub(crate) error: String,
}
@@ -86,7 +101,62 @@ pub mod status {
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub(crate) struct WatcherFailureDetails {
#[serde(with = "time::serde::rfc3339")]
pub(crate) timestamp: OffsetDateTime,
pub(crate) error: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub(crate) struct PriceScraperStatusResponse {
pub(crate) last_success: Option<PriceScraperLastSuccess>,
pub(crate) last_failure: Option<PriceScraperLastError>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub(crate) struct PriceScraperLastSuccess {
#[serde(with = "time::serde::rfc3339")]
pub(crate) timestamp: OffsetDateTime,
pub(crate) response: CoingeckoPriceResponse,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub(crate) struct PriceScraperLastError {
#[serde(with = "time::serde::rfc3339")]
pub(crate) timestamp: OffsetDateTime,
pub(crate) message: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub(crate) struct BankModuleStatusResponse {
pub(crate) processed_bank_msgs_since_startup: usize,
pub(crate) processed_bank_msgs_to_watched_addresses_since_startup: usize,
pub(crate) rejected_bank_msgs_to_watched_addresses_since_startup: usize,
pub(crate) last_seen_bank_msgs: Vec<BankMsgDetails>,
pub(crate) last_seen_watched_bank_msgs: Vec<BankMsgDetails>,
pub(crate) last_rejected_watched_bank_msgs: Vec<BankMsgRejection>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub(crate) struct BankMsgDetails {
#[serde(with = "time::serde::rfc3339")]
pub(crate) processed_at: OffsetDateTime,
pub(crate) tx_hash: String,
pub(crate) height: u64,
pub(crate) index: u32,
pub(crate) from: String,
pub(crate) to: String,
pub(crate) amount: Vec<String>,
pub(crate) memo: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub(crate) struct BankMsgRejection {
#[serde(with = "time::serde::rfc3339")]
pub(crate) rejected_at: OffsetDateTime,
pub(crate) tx_hash: String,
pub(crate) height: u64,
pub(crate) index: u32,
pub(crate) error: String,
}
}
+5 -1
View File
@@ -4,7 +4,7 @@ use tokio::net::TcpListener;
use tokio_util::sync::WaitForCancellationFutureOwned;
use crate::config::Config;
use crate::http::state::PaymentListenerState;
use crate::http::state::{BankScraperModuleState, PaymentListenerState, PriceScraperState};
use crate::{
db::DbPool,
http::{api::RouterBuilder, state::AppState},
@@ -15,6 +15,8 @@ pub(crate) async fn build_http_api(
config: &Config,
http_port: u16,
payment_listener_state: PaymentListenerState,
price_scraper_state: PriceScraperState,
bank_scraper_module_shared_state: BankScraperModuleState,
) -> anyhow::Result<HttpServer> {
let router_builder = RouterBuilder::with_default_routes();
@@ -27,6 +29,8 @@ pub(crate) async fn build_http_api(
.map(Into::into)
.collect(),
payment_listener_state,
price_scraper_state,
bank_scraper_module_shared_state,
);
let router = router_builder.with_state(state);
+216 -7
View File
@@ -1,20 +1,29 @@
use crate::db::models::CoingeckoPriceResponse;
use crate::db::DbPool;
use crate::helpers::RingBuffer;
use crate::http::models::status::PaymentWatcher;
use crate::models::WebhookPayload;
use axum::extract::FromRef;
use nym_validator_client::nyxd::Coin;
use nym_bin_common::bin_info;
use nym_bin_common::build_information::BinaryBuildInformation;
use nym_validator_client::nyxd::{Coin, MsgSend};
use nyxd_scraper::ParsedTransactionResponse;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::RwLock;
use tokio::time::Instant;
#[derive(Debug, Clone)]
pub(crate) struct AppState {
db_pool: DbPool,
pub(crate) registered_payment_watchers: Arc<Vec<PaymentWatcher>>,
pub(crate) payment_listener_state: PaymentListenerState,
pub(crate) status_state: StatusState,
pub(crate) price_scraper_state: PriceScraperState,
pub(crate) bank_scraper_module_state: BankScraperModuleState,
}
impl AppState {
@@ -22,11 +31,16 @@ impl AppState {
db_pool: DbPool,
registered_payment_watchers: Vec<PaymentWatcher>,
payment_listener_state: PaymentListenerState,
price_scraper_state: PriceScraperState,
bank_scraper_module_state: BankScraperModuleState,
) -> Self {
Self {
db_pool,
registered_payment_watchers: Arc::new(registered_payment_watchers),
payment_listener_state,
status_state: Default::default(),
price_scraper_state,
bank_scraper_module_state,
}
}
@@ -43,6 +57,79 @@ impl AppState {
}
}
#[derive(Clone, Debug)]
pub(crate) struct StatusState {
inner: Arc<StatusStateInner>,
}
impl Default for StatusState {
fn default() -> Self {
StatusState {
inner: Arc::new(StatusStateInner {
startup_time: Instant::now(),
build_information: bin_info!(),
}),
}
}
}
impl Deref for StatusState {
type Target = StatusStateInner;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
#[derive(Debug)]
pub(crate) struct StatusStateInner {
pub(crate) startup_time: Instant,
pub(crate) build_information: BinaryBuildInformation,
}
#[derive(Debug, Clone)]
pub(crate) struct PriceScraperState {
pub(crate) inner: Arc<RwLock<PriceScraperStateInner>>,
}
impl PriceScraperState {
pub(crate) fn new() -> Self {
PriceScraperState {
inner: Arc::new(Default::default()),
}
}
pub(crate) async fn new_failure<S: Into<String>>(&self, error: S) {
self.inner.write().await.last_failure = Some(PriceScraperLastError {
timestamp: OffsetDateTime::now_utc(),
message: error.into(),
})
}
pub(crate) async fn new_success(&self, response: CoingeckoPriceResponse) {
self.inner.write().await.last_success = Some(PriceScraperLastSuccess {
timestamp: OffsetDateTime::now_utc(),
response,
})
}
}
#[derive(Debug, Default)]
pub(crate) struct PriceScraperStateInner {
pub(crate) last_success: Option<PriceScraperLastSuccess>,
pub(crate) last_failure: Option<PriceScraperLastError>,
}
#[derive(Debug)]
pub(crate) struct PriceScraperLastSuccess {
pub(crate) timestamp: OffsetDateTime,
pub(crate) response: CoingeckoPriceResponse,
}
#[derive(Debug)]
pub(crate) struct PriceScraperLastError {
pub(crate) timestamp: OffsetDateTime,
pub(crate) message: String,
}
#[derive(Debug, Clone)]
pub(crate) struct PaymentListenerState {
pub(crate) inner: Arc<RwLock<PaymentListenerStateInner>>,
@@ -99,12 +186,6 @@ impl PaymentListenerState {
}
}
impl FromRef<AppState> for PaymentListenerState {
fn from_ref(input: &AppState) -> Self {
input.payment_listener_state.clone()
}
}
#[derive(Debug)]
pub(crate) struct PaymentListenerStateInner {
pub(crate) last_checked: OffsetDateTime,
@@ -181,3 +262,131 @@ impl WatcherFailureDetails {
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct BankScraperModuleState {
pub(crate) inner: Arc<RwLock<BankScraperModuleStateInner>>,
}
impl BankScraperModuleState {
// TODO: make those configurable
const MAX_LAST_BANK_MSGS: usize = 20;
const MAX_LAST_WATCHED_BANK_MSGS: usize = 10;
const MAX_LAST_REJECTED_BANK_MSGS: usize = 25;
pub(crate) fn new() -> Self {
BankScraperModuleState {
inner: Arc::new(RwLock::new(BankScraperModuleStateInner {
processed_bank_msgs_since_startup: 0,
processed_bank_msgs_to_watched_addresses_since_startup: 0,
rejected_bank_msgs_to_watched_addresses_since_startup: 0,
last_seen_bank_msgs: RingBuffer::new(Self::MAX_LAST_BANK_MSGS),
last_seen_watched_bank_msgs: RingBuffer::new(Self::MAX_LAST_WATCHED_BANK_MSGS),
last_rejected_watched_bank_msgs: RingBuffer::new(Self::MAX_LAST_REJECTED_BANK_MSGS),
})),
}
}
pub(crate) async fn new_bank_msg(
&self,
tx: &ParsedTransactionResponse,
index: usize,
msg: &MsgSend,
is_watched: bool,
) {
let mut guard = self.inner.write().await;
guard.processed_bank_msgs_since_startup += 1;
let details = BankMsgDetails {
processed_at: OffsetDateTime::now_utc(),
tx_hash: tx.hash.to_string(),
height: tx.height.value(),
index: index as u32,
from: msg.from_address.to_string(),
to: msg.to_address.to_string(),
amount: msg.amount.iter().map(|c| c.to_string()).collect(),
memo: tx.tx.body.memo.clone(),
};
guard.last_seen_bank_msgs.push(details.clone());
if is_watched {
guard.processed_bank_msgs_to_watched_addresses_since_startup += 1;
guard.last_seen_watched_bank_msgs.push(details.clone());
}
}
pub(crate) async fn new_rejection<S: Into<String>>(
&self,
tx_hash: String,
height: u64,
index: u32,
error: S,
) {
self.inner
.write()
.await
.last_rejected_watched_bank_msgs
.push(BankMsgRejection {
rejected_at: OffsetDateTime::now_utc(),
tx_hash,
height,
index,
error: error.into(),
})
}
}
#[derive(Debug)]
pub(crate) struct BankScraperModuleStateInner {
pub(crate) processed_bank_msgs_since_startup: usize,
pub(crate) processed_bank_msgs_to_watched_addresses_since_startup: usize,
pub(crate) rejected_bank_msgs_to_watched_addresses_since_startup: usize,
pub(crate) last_seen_bank_msgs: RingBuffer<BankMsgDetails>,
pub(crate) last_seen_watched_bank_msgs: RingBuffer<BankMsgDetails>,
pub(crate) last_rejected_watched_bank_msgs: RingBuffer<BankMsgRejection>,
}
#[derive(Debug, Clone)]
pub(crate) struct BankMsgDetails {
pub(crate) processed_at: OffsetDateTime,
pub(crate) tx_hash: String,
pub(crate) height: u64,
pub(crate) index: u32,
pub(crate) from: String,
pub(crate) to: String,
pub(crate) amount: Vec<String>,
pub(crate) memo: String,
}
#[derive(Debug)]
pub(crate) struct BankMsgRejection {
pub(crate) rejected_at: OffsetDateTime,
pub(crate) tx_hash: String,
pub(crate) height: u64,
pub(crate) index: u32,
pub(crate) error: String,
}
impl FromRef<AppState> for PaymentListenerState {
fn from_ref(input: &AppState) -> Self {
input.payment_listener_state.clone()
}
}
impl FromRef<AppState> for StatusState {
fn from_ref(input: &AppState) -> Self {
input.status_state.clone()
}
}
impl FromRef<AppState> for PriceScraperState {
fn from_ref(input: &AppState) -> Self {
input.price_scraper_state.clone()
}
}
impl FromRef<AppState> for BankScraperModuleState {
fn from_ref(input: &AppState) -> Self {
input.bank_scraper_module_state.clone()
}
}
+56 -34
View File
@@ -6,49 +6,71 @@ use core::str;
use tokio::time::Duration;
use crate::db::DbPool;
use crate::http::state::PriceScraperState;
const REFRESH_DELAY: Duration = Duration::from_secs(300);
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
const COINGECKO_API_URL: &str =
"https://api.coingecko.com/api/v3/simple/price?ids=nym&vs_currencies=chf,usd,eur,gbp,btc";
pub(crate) async fn run_price_scraper(db_pool: &DbPool) {
loop {
tracing::info!("Running in a loop 🏃");
if let Err(e) = get_coingecko_prices(db_pool).await {
tracing::error!("❌ Failed to get CoinGecko prices: {e}");
tracing::info!("Retrying in {}s...", FAILURE_RETRY_DELAY.as_secs());
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
} else {
tracing::info!("✅ Successfully fetched CoinGecko prices");
tokio::time::sleep(REFRESH_DELAY).await;
}
}
pub(crate) struct PriceScraper {
shared_state: PriceScraperState,
db_pool: DbPool,
}
async fn get_coingecko_prices(pool: &DbPool) -> anyhow::Result<()> {
tracing::info!("💰 Fetching CoinGecko prices from {}", COINGECKO_API_URL);
let response = reqwest::get(COINGECKO_API_URL)
.await?
.json::<CoingeckoPriceResponse>()
.await;
tracing::info!("Got response {:?}", response);
match response {
Ok(resp) => {
let price_record = PriceRecord {
timestamp: time::OffsetDateTime::now_utc().unix_timestamp(),
nym: resp.nym,
};
insert_nym_prices(pool, price_record).await?;
}
Err(e) => {
//tracing::info!("💰 CoinGecko price response: {:?}", response);
tracing::error!("Error sending request: {}", e);
impl PriceScraper {
pub(crate) fn new(shared_state: PriceScraperState, db_pool: DbPool) -> Self {
PriceScraper {
shared_state,
db_pool,
}
}
Ok(())
async fn get_coingecko_prices(&self) -> anyhow::Result<CoingeckoPriceResponse> {
tracing::info!("💰 Fetching CoinGecko prices from {COINGECKO_API_URL}");
let response = reqwest::get(COINGECKO_API_URL)
.await?
.json::<CoingeckoPriceResponse>()
.await;
tracing::info!("Got response {:?}", response);
match response {
Ok(resp) => {
let price_record = PriceRecord {
timestamp: time::OffsetDateTime::now_utc().unix_timestamp(),
nym: resp.nym.clone(),
};
insert_nym_prices(&self.db_pool, price_record).await?;
Ok(resp)
}
Err(err) => {
//tracing::info!("💰 CoinGecko price response: {:?}", response);
tracing::error!("Error sending request: {err}");
Err(err.into())
}
}
}
pub(crate) async fn run(&self) {
loop {
tracing::info!("Running in a loop 🏃");
match self.get_coingecko_prices().await {
Ok(coingecko_price_response) => {
self.shared_state
.new_success(coingecko_price_response)
.await;
tracing::info!("✅ Successfully fetched CoinGecko prices");
tokio::time::sleep(REFRESH_DELAY).await;
}
Err(err) => {
tracing::error!("❌ Failed to get CoinGecko prices: {err}");
tracing::info!("Retrying in {}s...", FAILURE_RETRY_DELAY.as_secs());
self.shared_state.new_failure(err.to_string()).await;
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
}
}
}
}
}