Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 73bed4e742 | |||
| 0af371e844 | |||
| 23755e462e | |||
| e52d399fc6 | |||
| 900c44da59 |
Generated
+1
-3
@@ -7233,7 +7233,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "nyx-chain-watcher"
|
||||
version = "0.1.13"
|
||||
version = "0.1.14"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -7243,14 +7243,12 @@ dependencies = [
|
||||
"nym-bin-common",
|
||||
"nym-config",
|
||||
"nym-network-defaults",
|
||||
"nym-node-requests",
|
||||
"nym-task",
|
||||
"nym-validator-client",
|
||||
"nyxd-scraper",
|
||||
"reqwest 0.12.4",
|
||||
"schemars",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
|
||||
@@ -62,6 +62,7 @@ pub use cw3;
|
||||
pub use cw4;
|
||||
pub use cw_controllers;
|
||||
pub use fee::{gas_price::GasPrice, GasAdjustable, GasAdjustment};
|
||||
pub use prost::Name;
|
||||
pub use tendermint_rpc::endpoint::block::Response as BlockResponse;
|
||||
pub use tendermint_rpc::{
|
||||
endpoint::{tx::Response as TxResponse, validators::Response as ValidatorResponse},
|
||||
|
||||
@@ -182,9 +182,11 @@ impl BlockProcessor {
|
||||
// the ones concerned with individual messages
|
||||
for (index, msg) in block_tx.tx.body.messages.iter().enumerate() {
|
||||
for msg_module in &mut self.msg_modules {
|
||||
msg_module
|
||||
.handle_msg(index, msg, &block_tx, &mut tx)
|
||||
.await?
|
||||
if msg.type_url == msg_module.type_url() {
|
||||
msg_module
|
||||
.handle_msg(index, msg, &block_tx, &mut tx)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,6 +83,15 @@ pub enum ScraperError {
|
||||
source: cosmrs::ErrorReport,
|
||||
},
|
||||
|
||||
#[error("could not parse msg in tx {hash} at index {index} into {type_url}: {source}")]
|
||||
MsgParseFailure {
|
||||
hash: Hash,
|
||||
index: usize,
|
||||
type_url: String,
|
||||
#[source]
|
||||
source: cosmrs::ErrorReport,
|
||||
},
|
||||
|
||||
#[error("received an invalid chain subscription event of kind {kind} while we were waiting for new block data (query: '{query}')")]
|
||||
InvalidSubscriptionEvent { query: String, kind: String },
|
||||
|
||||
|
||||
@@ -9,6 +9,8 @@ use cosmrs::Any;
|
||||
|
||||
#[async_trait]
|
||||
pub trait MsgModule {
|
||||
fn type_url(&self) -> String;
|
||||
|
||||
async fn handle_msg(
|
||||
&mut self,
|
||||
index: usize,
|
||||
|
||||
Generated
+38
@@ -1185,6 +1185,7 @@ name = "nym-pemstore"
|
||||
version = "0.3.0"
|
||||
dependencies = [
|
||||
"pem",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1251,6 +1252,12 @@ dependencies = [
|
||||
"regex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-lite"
|
||||
version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
|
||||
|
||||
[[package]]
|
||||
name = "pkcs8"
|
||||
version = "0.9.0"
|
||||
@@ -1840,6 +1847,37 @@ dependencies = [
|
||||
"winnow",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.41"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
|
||||
dependencies = [
|
||||
"pin-project-lite",
|
||||
"tracing-attributes",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-attributes"
|
||||
version = "0.1.28"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.98",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.33"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "typenum"
|
||||
version = "1.18.0"
|
||||
|
||||
Generated
+4
-4
@@ -4783,9 +4783,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rs_merkle"
|
||||
version = "1.4.2"
|
||||
version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b241d2e59b74ef9e98d94c78c47623d04c8392abaf82014dfd372a16041128f"
|
||||
checksum = "bb09b49230ba22e8c676e7b75dfe2887dea8121f18b530ae0ba519ce442d2b21"
|
||||
dependencies = [
|
||||
"sha2 0.10.8",
|
||||
]
|
||||
@@ -6114,9 +6114,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.43.0"
|
||||
version = "1.44.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
|
||||
checksum = "9975ea0f48b5aa3972bf2d888c238182458437cc2a19374b81b25cdf1023fb3a"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"bytes",
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
[package]
|
||||
name = "nyx-chain-watcher"
|
||||
version = "0.1.13"
|
||||
version = "0.1.14"
|
||||
authors.workspace = true
|
||||
repository.workspace = true
|
||||
homepage.workspace = true
|
||||
@@ -23,15 +23,11 @@ nym-config = { path = "../common/config" }
|
||||
nym-bin-common = { path = "../common/bin-common", features = ["output_format"] }
|
||||
nym-network-defaults = { path = "../common/network-defaults" }
|
||||
nym-task = { path = "../common/task" }
|
||||
nym-node-requests = { path = "../nym-node/nym-node-requests", features = [
|
||||
"openapi",
|
||||
] }
|
||||
nym-validator-client = { path = "../common/client-libs/validator-client" }
|
||||
nyxd-scraper = { path = "../common/nyxd-scraper" }
|
||||
reqwest = { workspace = true, features = ["rustls-tls"] }
|
||||
schemars = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "time"] }
|
||||
thiserror = { workspace = true }
|
||||
time = { workspace = true }
|
||||
|
||||
@@ -3,18 +3,21 @@ use crate::env::vars::{
|
||||
NYXD_SCRAPER_START_HEIGHT, NYXD_SCRAPER_UNSAFE_NUKE_DB,
|
||||
NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT,
|
||||
};
|
||||
use crate::http::state::BankScraperModuleState;
|
||||
use async_trait::async_trait;
|
||||
use nym_validator_client::nyxd::{Any, Coin, CosmosCoin, Hash, Msg, MsgSend, Name};
|
||||
use nyxd_scraper::{
|
||||
error::ScraperError, storage::StorageTransaction, NyxdScraper, ParsedTransactionResponse,
|
||||
PruningOptions, TxModule,
|
||||
error::ScraperError, storage::StorageTransaction, MsgModule, NyxdScraper,
|
||||
ParsedTransactionResponse, PruningOptions,
|
||||
};
|
||||
use sqlx::SqlitePool;
|
||||
use std::fs;
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{info, warn};
|
||||
|
||||
pub(crate) async fn run_chain_scraper(
|
||||
config: &crate::config::Config,
|
||||
db_pool: SqlitePool,
|
||||
shared_state: BankScraperModuleState,
|
||||
) -> anyhow::Result<NyxdScraper> {
|
||||
let websocket_url = std::env::var("NYXD_WS").expect("NYXD_WS not defined");
|
||||
|
||||
@@ -58,9 +61,10 @@ pub(crate) async fn run_chain_scraper(
|
||||
use_best_effort_start_height,
|
||||
},
|
||||
})
|
||||
.with_tx_module(EventScraperModule::new(
|
||||
.with_msg_module(BankScraperModule::new(
|
||||
db_pool,
|
||||
config.payment_watcher_config.clone(),
|
||||
shared_state,
|
||||
));
|
||||
|
||||
let instance = scraper.build_and_start().await?;
|
||||
@@ -71,16 +75,22 @@ pub(crate) async fn run_chain_scraper(
|
||||
Ok(instance)
|
||||
}
|
||||
|
||||
pub struct EventScraperModule {
|
||||
pub struct BankScraperModule {
|
||||
db_pool: SqlitePool,
|
||||
payment_config: PaymentWatchersConfig,
|
||||
shared_state: BankScraperModuleState,
|
||||
}
|
||||
|
||||
impl EventScraperModule {
|
||||
pub fn new(db_pool: SqlitePool, payment_config: PaymentWatchersConfig) -> Self {
|
||||
impl BankScraperModule {
|
||||
pub fn new(
|
||||
db_pool: SqlitePool,
|
||||
payment_config: PaymentWatchersConfig,
|
||||
shared_state: BankScraperModuleState,
|
||||
) -> Self {
|
||||
Self {
|
||||
db_pool,
|
||||
payment_config,
|
||||
shared_state,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,23 +118,47 @@ impl EventScraperModule {
|
||||
amount,
|
||||
memo
|
||||
)
|
||||
.execute(&self.db_pool)
|
||||
.await?;
|
||||
.execute(&self.db_pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn get_unym_coin(&self, coins: &[CosmosCoin]) -> Option<Coin> {
|
||||
coins
|
||||
.iter()
|
||||
.find(|coin| coin.denom.as_ref() == "unym")
|
||||
.map(|c| c.clone().into())
|
||||
}
|
||||
|
||||
// TODO: ideally this should be done by the scraper itself
|
||||
fn recover_bank_msg(
|
||||
&self,
|
||||
tx_hash: Hash,
|
||||
index: usize,
|
||||
msg: &Any,
|
||||
) -> Result<MsgSend, ScraperError> {
|
||||
MsgSend::from_any(msg).map_err(|source| ScraperError::MsgParseFailure {
|
||||
hash: tx_hash,
|
||||
index,
|
||||
type_url: self.type_url(),
|
||||
source,
|
||||
})
|
||||
}
|
||||
}
|
||||
#[async_trait]
|
||||
impl TxModule for EventScraperModule {
|
||||
async fn handle_tx(
|
||||
impl MsgModule for BankScraperModule {
|
||||
fn type_url(&self) -> String {
|
||||
<MsgSend as Msg>::Proto::type_url()
|
||||
}
|
||||
|
||||
async fn handle_msg(
|
||||
&mut self,
|
||||
index: usize,
|
||||
msg: &Any,
|
||||
tx: &ParsedTransactionResponse,
|
||||
_: &mut StorageTransaction,
|
||||
_storage_tx: &mut StorageTransaction,
|
||||
) -> Result<(), ScraperError> {
|
||||
let events = &tx.tx_result.events;
|
||||
let height = tx.height.value() as i64;
|
||||
let tx_hash = tx.hash.to_string();
|
||||
let memo = tx.tx.body.memo.clone();
|
||||
|
||||
// Don't process failed transactions
|
||||
@@ -132,56 +166,53 @@ impl TxModule for EventScraperModule {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if tx.tx.body.messages.len() > 1 {
|
||||
error!(
|
||||
"this transaction has more than 1 message in it - payment information will be lost"
|
||||
);
|
||||
}
|
||||
let msg = self.recover_bank_msg(tx.hash, index, msg)?;
|
||||
|
||||
// Process each event
|
||||
for event in events {
|
||||
// Only process transfer events
|
||||
if event.kind == "transfer" {
|
||||
let mut recipient = None;
|
||||
let mut sender = None;
|
||||
let mut amount = None;
|
||||
// TODO: get message index from event
|
||||
let message_index = 0;
|
||||
// Check if any watcher is watching this recipient
|
||||
let is_watched = self
|
||||
.payment_config
|
||||
.is_being_watched(msg.to_address.as_ref());
|
||||
|
||||
// Extract transfer event attributes
|
||||
for attr in &event.attributes {
|
||||
if let (Ok(key), Ok(value)) = (attr.key_str(), attr.value_str()) {
|
||||
match key {
|
||||
"recipient" => recipient = Some(value.to_string()),
|
||||
"sender" => sender = Some(value.to_string()),
|
||||
"amount" => amount = Some(value.to_string()),
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
}
|
||||
self.shared_state
|
||||
.new_bank_msg(tx, index, &msg, is_watched)
|
||||
.await;
|
||||
|
||||
// If we have all required fields, check if recipient is watched and store
|
||||
if let (Some(recipient), Some(sender), Some(amount)) = (recipient, sender, amount) {
|
||||
// Check if any watcher is watching this recipient
|
||||
let is_watched = self.payment_config.is_being_watched(&recipient);
|
||||
if is_watched {
|
||||
let Some(unym_coin) = self.get_unym_coin(&msg.amount) else {
|
||||
let warn = format!(
|
||||
"{} sent {:?} instead of unym!",
|
||||
msg.from_address, msg.amount
|
||||
);
|
||||
warn!("{warn}");
|
||||
self.shared_state
|
||||
.new_rejection(tx.hash.to_string(), tx.height.value(), index as u32, warn)
|
||||
.await;
|
||||
|
||||
if is_watched {
|
||||
if let Err(e) = self
|
||||
.store_transfer_event(
|
||||
&tx_hash,
|
||||
height,
|
||||
message_index,
|
||||
sender,
|
||||
recipient,
|
||||
amount,
|
||||
Some(memo.clone()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("Failed to store transfer event: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
// we don't want to fail the whole processing - this is not a failure in that sense!
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if let Err(err) = self
|
||||
.store_transfer_event(
|
||||
&tx.hash.to_string(),
|
||||
tx.height.value() as i64,
|
||||
index as i64,
|
||||
msg.from_address.to_string(),
|
||||
msg.to_address.to_string(),
|
||||
unym_coin.to_string(),
|
||||
Some(memo.clone()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("Failed to store transfer event: {err}");
|
||||
self.shared_state
|
||||
.new_rejection(
|
||||
tx.hash.to_string(),
|
||||
tx.height.value(),
|
||||
index as u32,
|
||||
format!("storage failure: {err}"),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,9 +14,10 @@ mod config;
|
||||
|
||||
use crate::chain_scraper::run_chain_scraper;
|
||||
use crate::db::DbPool;
|
||||
use crate::http::state::PaymentListenerState;
|
||||
use crate::http::state::{BankScraperModuleState, PaymentListenerState, PriceScraperState};
|
||||
use crate::payment_listener::PaymentListener;
|
||||
use crate::{db, http, price_scraper};
|
||||
use crate::price_scraper::PriceScraper;
|
||||
use crate::{db, http};
|
||||
pub(crate) use args::Args;
|
||||
use nym_task::signal::wait_for_signal;
|
||||
|
||||
@@ -145,15 +146,18 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa
|
||||
|
||||
// construct shared state
|
||||
let payment_listener_shared_state = PaymentListenerState::new();
|
||||
let price_scraper_shared_state = PriceScraperState::new();
|
||||
let bank_scraper_module_shared_state = BankScraperModuleState::new();
|
||||
|
||||
// spawn all the tasks
|
||||
|
||||
// 1. chain scraper (note: this doesn't really spawn the full scraper on this task, but we don't want to be blocking waiting for its startup)
|
||||
let scraper_token_handle: JoinHandle<anyhow::Result<CancellationToken>> = tokio::spawn({
|
||||
let config = config.clone();
|
||||
let shared_state = bank_scraper_module_shared_state.clone();
|
||||
async move {
|
||||
// this only blocks until startup sync is done; it then runs on its own set of tasks
|
||||
let scraper = run_chain_scraper(&config, scraper_pool).await?;
|
||||
let scraper = run_chain_scraper(&config, scraper_pool, shared_state).await?;
|
||||
Ok(scraper.cancel_token())
|
||||
}
|
||||
});
|
||||
@@ -178,12 +182,13 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa
|
||||
}
|
||||
|
||||
// 3. price scraper (note, this task never terminates on its own)
|
||||
let price_scraper = PriceScraper::new(price_scraper_shared_state.clone(), watcher_pool);
|
||||
{
|
||||
let token = cancellation_token.clone();
|
||||
tasks.spawn(async move {
|
||||
token
|
||||
.run_until_cancelled(async move {
|
||||
price_scraper::run_price_scraper(&watcher_pool).await;
|
||||
price_scraper.run().await;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
@@ -196,6 +201,8 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa
|
||||
&config,
|
||||
http_port,
|
||||
payment_listener_shared_state,
|
||||
price_scraper_shared_state,
|
||||
bank_scraper_module_shared_state,
|
||||
)
|
||||
.await?;
|
||||
{
|
||||
|
||||
@@ -5,7 +5,7 @@ use sqlx::FromRow;
|
||||
use time::OffsetDateTime;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
#[derive(Clone, Deserialize, Debug, ToSchema)]
|
||||
#[derive(Clone, Serialize, Deserialize, Debug, ToSchema)]
|
||||
pub(crate) struct CurrencyPrices {
|
||||
pub(crate) chf: f32,
|
||||
pub(crate) usd: f32,
|
||||
@@ -15,7 +15,7 @@ pub(crate) struct CurrencyPrices {
|
||||
}
|
||||
|
||||
// Struct to hold Coingecko response
|
||||
#[derive(Clone, Deserialize, Debug, ToSchema)]
|
||||
#[derive(Clone, Serialize, Deserialize, Debug, ToSchema)]
|
||||
pub(crate) struct CoingeckoPriceResponse {
|
||||
pub(crate) nym: CurrencyPrices,
|
||||
}
|
||||
|
||||
@@ -2,19 +2,60 @@
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use crate::http::models::status::{
|
||||
ActivePaymentWatchersResponse, PaymentListenerFailureDetails, PaymentListenerStatusResponse,
|
||||
ProcessedPayment, WatcherFailureDetails, WatcherState,
|
||||
ActivePaymentWatchersResponse, ApiStatus, BankModuleStatusResponse, BankMsgDetails,
|
||||
BankMsgRejection, HealthResponse, PaymentListenerFailureDetails, PaymentListenerStatusResponse,
|
||||
PriceScraperLastError, PriceScraperLastSuccess, PriceScraperStatusResponse, ProcessedPayment,
|
||||
WatcherFailureDetails, WatcherState,
|
||||
};
|
||||
use crate::http::state::{
|
||||
AppState, BankScraperModuleState, PaymentListenerState, PriceScraperState, StatusState,
|
||||
};
|
||||
use crate::http::state::{AppState, PaymentListenerState};
|
||||
use axum::extract::State;
|
||||
use axum::routing::get;
|
||||
use axum::{Json, Router};
|
||||
use nym_bin_common::build_information::BinaryBuildInformationOwned;
|
||||
use std::ops::Deref;
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/health", get(health))
|
||||
.route("/build-information", get(build_information))
|
||||
.route("/active-payment-watchers", get(active_payment_watchers))
|
||||
.route("/payment-listener", get(payment_listener_status))
|
||||
.route("/price-scraper", get(price_scraper_status))
|
||||
.route("/bank-module-scraper", get(bank_module_status))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Status",
|
||||
get,
|
||||
path = "/build-information",
|
||||
context_path = "/v1/status",
|
||||
responses(
|
||||
(status = 200, body = BinaryBuildInformationOwned)
|
||||
)
|
||||
)]
|
||||
async fn build_information(State(state): State<StatusState>) -> Json<BinaryBuildInformationOwned> {
|
||||
Json(state.build_information.to_owned())
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Status",
|
||||
get,
|
||||
path = "/health",
|
||||
context_path = "/v1/status",
|
||||
responses(
|
||||
(status = 200, body = HealthResponse)
|
||||
)
|
||||
)]
|
||||
async fn health(State(state): State<StatusState>) -> Json<HealthResponse> {
|
||||
let uptime = state.startup_time.elapsed();
|
||||
|
||||
let health = HealthResponse {
|
||||
status: ApiStatus::Up,
|
||||
uptime: uptime.as_secs(),
|
||||
};
|
||||
Json(health)
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
@@ -96,3 +137,92 @@ pub(crate) async fn payment_listener_status(
|
||||
.collect(),
|
||||
})
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Status",
|
||||
get,
|
||||
path = "/price-scraper",
|
||||
context_path = "/v1/status",
|
||||
responses(
|
||||
(status = 200, body = PriceScraperStatusResponse)
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn price_scraper_status(
|
||||
State(state): State<PriceScraperState>,
|
||||
) -> Json<PriceScraperStatusResponse> {
|
||||
let guard = state.inner.read().await;
|
||||
Json(PriceScraperStatusResponse {
|
||||
last_success: guard
|
||||
.last_success
|
||||
.as_ref()
|
||||
.map(|s| PriceScraperLastSuccess {
|
||||
timestamp: s.timestamp,
|
||||
response: s.response.clone(),
|
||||
}),
|
||||
last_failure: guard.last_failure.as_ref().map(|f| PriceScraperLastError {
|
||||
timestamp: f.timestamp,
|
||||
message: f.message.clone(),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Status",
|
||||
get,
|
||||
path = "/bank-module-scraper",
|
||||
context_path = "/v1/status",
|
||||
responses(
|
||||
(status = 200, body = BankModuleStatusResponse)
|
||||
)
|
||||
)]
|
||||
pub(crate) async fn bank_module_status(
|
||||
State(state): State<BankScraperModuleState>,
|
||||
) -> Json<BankModuleStatusResponse> {
|
||||
let guard = state.inner.read().await;
|
||||
Json(BankModuleStatusResponse {
|
||||
processed_bank_msgs_since_startup: guard.processed_bank_msgs_since_startup,
|
||||
processed_bank_msgs_to_watched_addresses_since_startup: guard
|
||||
.processed_bank_msgs_to_watched_addresses_since_startup,
|
||||
rejected_bank_msgs_to_watched_addresses_since_startup: guard
|
||||
.rejected_bank_msgs_to_watched_addresses_since_startup,
|
||||
last_seen_bank_msgs: guard
|
||||
.last_seen_bank_msgs
|
||||
.iter()
|
||||
.map(|msg| BankMsgDetails {
|
||||
processed_at: msg.processed_at,
|
||||
tx_hash: msg.tx_hash.clone(),
|
||||
height: msg.height,
|
||||
index: msg.index,
|
||||
from: msg.from.clone(),
|
||||
to: msg.to.clone(),
|
||||
amount: msg.amount.clone(),
|
||||
memo: msg.memo.clone(),
|
||||
})
|
||||
.collect(),
|
||||
last_seen_watched_bank_msgs: guard
|
||||
.last_seen_watched_bank_msgs
|
||||
.iter()
|
||||
.map(|msg| BankMsgDetails {
|
||||
processed_at: msg.processed_at,
|
||||
tx_hash: msg.tx_hash.clone(),
|
||||
height: msg.height,
|
||||
index: msg.index,
|
||||
from: msg.from.clone(),
|
||||
to: msg.to.clone(),
|
||||
amount: msg.amount.clone(),
|
||||
memo: msg.memo.clone(),
|
||||
})
|
||||
.collect(),
|
||||
last_rejected_watched_bank_msgs: guard
|
||||
.last_rejected_watched_bank_msgs
|
||||
.iter()
|
||||
.map(|r| BankMsgRejection {
|
||||
rejected_at: r.rejected_at,
|
||||
tx_hash: r.tx_hash.clone(),
|
||||
height: r.height,
|
||||
index: r.index,
|
||||
error: r.error.clone(),
|
||||
})
|
||||
.collect(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
pub mod status {
|
||||
use crate::config::payments_watcher::PaymentWatcherConfig;
|
||||
use crate::db::models::CoingeckoPriceResponse;
|
||||
use crate::models::openapi_schema;
|
||||
use nym_validator_client::nyxd::Coin;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -12,6 +13,18 @@ pub mod status {
|
||||
use time::OffsetDateTime;
|
||||
use utoipa::ToSchema;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize, schemars::JsonSchema, ToSchema)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum ApiStatus {
|
||||
Up,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize, schemars::JsonSchema, ToSchema)]
|
||||
pub struct HealthResponse {
|
||||
pub status: ApiStatus,
|
||||
pub uptime: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, ToSchema)]
|
||||
pub struct ActivePaymentWatchersResponse {
|
||||
pub watchers: Vec<PaymentWatcher>,
|
||||
@@ -59,6 +72,7 @@ pub mod status {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub(crate) struct ProcessedPayment {
|
||||
#[serde(with = "time::serde::rfc3339")]
|
||||
pub processed_at: OffsetDateTime,
|
||||
|
||||
pub tx_hash: String,
|
||||
@@ -75,6 +89,7 @@ pub mod status {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub(crate) struct PaymentListenerFailureDetails {
|
||||
#[serde(with = "time::serde::rfc3339")]
|
||||
pub(crate) timestamp: OffsetDateTime,
|
||||
pub(crate) error: String,
|
||||
}
|
||||
@@ -86,7 +101,62 @@ pub mod status {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub(crate) struct WatcherFailureDetails {
|
||||
#[serde(with = "time::serde::rfc3339")]
|
||||
pub(crate) timestamp: OffsetDateTime,
|
||||
pub(crate) error: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub(crate) struct PriceScraperStatusResponse {
|
||||
pub(crate) last_success: Option<PriceScraperLastSuccess>,
|
||||
pub(crate) last_failure: Option<PriceScraperLastError>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub(crate) struct PriceScraperLastSuccess {
|
||||
#[serde(with = "time::serde::rfc3339")]
|
||||
pub(crate) timestamp: OffsetDateTime,
|
||||
pub(crate) response: CoingeckoPriceResponse,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub(crate) struct PriceScraperLastError {
|
||||
#[serde(with = "time::serde::rfc3339")]
|
||||
pub(crate) timestamp: OffsetDateTime,
|
||||
pub(crate) message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub(crate) struct BankModuleStatusResponse {
|
||||
pub(crate) processed_bank_msgs_since_startup: usize,
|
||||
pub(crate) processed_bank_msgs_to_watched_addresses_since_startup: usize,
|
||||
pub(crate) rejected_bank_msgs_to_watched_addresses_since_startup: usize,
|
||||
|
||||
pub(crate) last_seen_bank_msgs: Vec<BankMsgDetails>,
|
||||
pub(crate) last_seen_watched_bank_msgs: Vec<BankMsgDetails>,
|
||||
pub(crate) last_rejected_watched_bank_msgs: Vec<BankMsgRejection>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub(crate) struct BankMsgDetails {
|
||||
#[serde(with = "time::serde::rfc3339")]
|
||||
pub(crate) processed_at: OffsetDateTime,
|
||||
pub(crate) tx_hash: String,
|
||||
pub(crate) height: u64,
|
||||
pub(crate) index: u32,
|
||||
pub(crate) from: String,
|
||||
pub(crate) to: String,
|
||||
pub(crate) amount: Vec<String>,
|
||||
pub(crate) memo: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
|
||||
pub(crate) struct BankMsgRejection {
|
||||
#[serde(with = "time::serde::rfc3339")]
|
||||
pub(crate) rejected_at: OffsetDateTime,
|
||||
pub(crate) tx_hash: String,
|
||||
pub(crate) height: u64,
|
||||
pub(crate) index: u32,
|
||||
pub(crate) error: String,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use tokio::net::TcpListener;
|
||||
use tokio_util::sync::WaitForCancellationFutureOwned;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::http::state::PaymentListenerState;
|
||||
use crate::http::state::{BankScraperModuleState, PaymentListenerState, PriceScraperState};
|
||||
use crate::{
|
||||
db::DbPool,
|
||||
http::{api::RouterBuilder, state::AppState},
|
||||
@@ -15,6 +15,8 @@ pub(crate) async fn build_http_api(
|
||||
config: &Config,
|
||||
http_port: u16,
|
||||
payment_listener_state: PaymentListenerState,
|
||||
price_scraper_state: PriceScraperState,
|
||||
bank_scraper_module_shared_state: BankScraperModuleState,
|
||||
) -> anyhow::Result<HttpServer> {
|
||||
let router_builder = RouterBuilder::with_default_routes();
|
||||
|
||||
@@ -27,6 +29,8 @@ pub(crate) async fn build_http_api(
|
||||
.map(Into::into)
|
||||
.collect(),
|
||||
payment_listener_state,
|
||||
price_scraper_state,
|
||||
bank_scraper_module_shared_state,
|
||||
);
|
||||
let router = router_builder.with_state(state);
|
||||
|
||||
|
||||
@@ -1,20 +1,29 @@
|
||||
use crate::db::models::CoingeckoPriceResponse;
|
||||
use crate::db::DbPool;
|
||||
use crate::helpers::RingBuffer;
|
||||
use crate::http::models::status::PaymentWatcher;
|
||||
use crate::models::WebhookPayload;
|
||||
use axum::extract::FromRef;
|
||||
use nym_validator_client::nyxd::Coin;
|
||||
use nym_bin_common::bin_info;
|
||||
use nym_bin_common::build_information::BinaryBuildInformation;
|
||||
use nym_validator_client::nyxd::{Coin, MsgSend};
|
||||
use nyxd_scraper::ParsedTransactionResponse;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time::Instant;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct AppState {
|
||||
db_pool: DbPool,
|
||||
pub(crate) registered_payment_watchers: Arc<Vec<PaymentWatcher>>,
|
||||
pub(crate) payment_listener_state: PaymentListenerState,
|
||||
pub(crate) status_state: StatusState,
|
||||
pub(crate) price_scraper_state: PriceScraperState,
|
||||
pub(crate) bank_scraper_module_state: BankScraperModuleState,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
@@ -22,11 +31,16 @@ impl AppState {
|
||||
db_pool: DbPool,
|
||||
registered_payment_watchers: Vec<PaymentWatcher>,
|
||||
payment_listener_state: PaymentListenerState,
|
||||
price_scraper_state: PriceScraperState,
|
||||
bank_scraper_module_state: BankScraperModuleState,
|
||||
) -> Self {
|
||||
Self {
|
||||
db_pool,
|
||||
registered_payment_watchers: Arc::new(registered_payment_watchers),
|
||||
payment_listener_state,
|
||||
status_state: Default::default(),
|
||||
price_scraper_state,
|
||||
bank_scraper_module_state,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +57,79 @@ impl AppState {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct StatusState {
|
||||
inner: Arc<StatusStateInner>,
|
||||
}
|
||||
|
||||
impl Default for StatusState {
|
||||
fn default() -> Self {
|
||||
StatusState {
|
||||
inner: Arc::new(StatusStateInner {
|
||||
startup_time: Instant::now(),
|
||||
build_information: bin_info!(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for StatusState {
|
||||
type Target = StatusStateInner;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct StatusStateInner {
|
||||
pub(crate) startup_time: Instant,
|
||||
pub(crate) build_information: BinaryBuildInformation,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct PriceScraperState {
|
||||
pub(crate) inner: Arc<RwLock<PriceScraperStateInner>>,
|
||||
}
|
||||
|
||||
impl PriceScraperState {
|
||||
pub(crate) fn new() -> Self {
|
||||
PriceScraperState {
|
||||
inner: Arc::new(Default::default()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn new_failure<S: Into<String>>(&self, error: S) {
|
||||
self.inner.write().await.last_failure = Some(PriceScraperLastError {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
message: error.into(),
|
||||
})
|
||||
}
|
||||
pub(crate) async fn new_success(&self, response: CoingeckoPriceResponse) {
|
||||
self.inner.write().await.last_success = Some(PriceScraperLastSuccess {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
response,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct PriceScraperStateInner {
|
||||
pub(crate) last_success: Option<PriceScraperLastSuccess>,
|
||||
pub(crate) last_failure: Option<PriceScraperLastError>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct PriceScraperLastSuccess {
|
||||
pub(crate) timestamp: OffsetDateTime,
|
||||
pub(crate) response: CoingeckoPriceResponse,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct PriceScraperLastError {
|
||||
pub(crate) timestamp: OffsetDateTime,
|
||||
pub(crate) message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct PaymentListenerState {
|
||||
pub(crate) inner: Arc<RwLock<PaymentListenerStateInner>>,
|
||||
@@ -99,12 +186,6 @@ impl PaymentListenerState {
|
||||
}
|
||||
}
|
||||
|
||||
impl FromRef<AppState> for PaymentListenerState {
|
||||
fn from_ref(input: &AppState) -> Self {
|
||||
input.payment_listener_state.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct PaymentListenerStateInner {
|
||||
pub(crate) last_checked: OffsetDateTime,
|
||||
@@ -181,3 +262,131 @@ impl WatcherFailureDetails {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct BankScraperModuleState {
|
||||
pub(crate) inner: Arc<RwLock<BankScraperModuleStateInner>>,
|
||||
}
|
||||
|
||||
impl BankScraperModuleState {
|
||||
// TODO: make those configurable
|
||||
const MAX_LAST_BANK_MSGS: usize = 20;
|
||||
const MAX_LAST_WATCHED_BANK_MSGS: usize = 10;
|
||||
const MAX_LAST_REJECTED_BANK_MSGS: usize = 25;
|
||||
|
||||
pub(crate) fn new() -> Self {
|
||||
BankScraperModuleState {
|
||||
inner: Arc::new(RwLock::new(BankScraperModuleStateInner {
|
||||
processed_bank_msgs_since_startup: 0,
|
||||
processed_bank_msgs_to_watched_addresses_since_startup: 0,
|
||||
rejected_bank_msgs_to_watched_addresses_since_startup: 0,
|
||||
last_seen_bank_msgs: RingBuffer::new(Self::MAX_LAST_BANK_MSGS),
|
||||
last_seen_watched_bank_msgs: RingBuffer::new(Self::MAX_LAST_WATCHED_BANK_MSGS),
|
||||
last_rejected_watched_bank_msgs: RingBuffer::new(Self::MAX_LAST_REJECTED_BANK_MSGS),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn new_bank_msg(
|
||||
&self,
|
||||
tx: &ParsedTransactionResponse,
|
||||
index: usize,
|
||||
msg: &MsgSend,
|
||||
is_watched: bool,
|
||||
) {
|
||||
let mut guard = self.inner.write().await;
|
||||
guard.processed_bank_msgs_since_startup += 1;
|
||||
|
||||
let details = BankMsgDetails {
|
||||
processed_at: OffsetDateTime::now_utc(),
|
||||
tx_hash: tx.hash.to_string(),
|
||||
height: tx.height.value(),
|
||||
index: index as u32,
|
||||
from: msg.from_address.to_string(),
|
||||
to: msg.to_address.to_string(),
|
||||
amount: msg.amount.iter().map(|c| c.to_string()).collect(),
|
||||
memo: tx.tx.body.memo.clone(),
|
||||
};
|
||||
guard.last_seen_bank_msgs.push(details.clone());
|
||||
|
||||
if is_watched {
|
||||
guard.processed_bank_msgs_to_watched_addresses_since_startup += 1;
|
||||
guard.last_seen_watched_bank_msgs.push(details.clone());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn new_rejection<S: Into<String>>(
|
||||
&self,
|
||||
tx_hash: String,
|
||||
height: u64,
|
||||
index: u32,
|
||||
error: S,
|
||||
) {
|
||||
self.inner
|
||||
.write()
|
||||
.await
|
||||
.last_rejected_watched_bank_msgs
|
||||
.push(BankMsgRejection {
|
||||
rejected_at: OffsetDateTime::now_utc(),
|
||||
tx_hash,
|
||||
height,
|
||||
index,
|
||||
error: error.into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct BankScraperModuleStateInner {
|
||||
pub(crate) processed_bank_msgs_since_startup: usize,
|
||||
pub(crate) processed_bank_msgs_to_watched_addresses_since_startup: usize,
|
||||
pub(crate) rejected_bank_msgs_to_watched_addresses_since_startup: usize,
|
||||
|
||||
pub(crate) last_seen_bank_msgs: RingBuffer<BankMsgDetails>,
|
||||
pub(crate) last_seen_watched_bank_msgs: RingBuffer<BankMsgDetails>,
|
||||
pub(crate) last_rejected_watched_bank_msgs: RingBuffer<BankMsgRejection>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct BankMsgDetails {
|
||||
pub(crate) processed_at: OffsetDateTime,
|
||||
pub(crate) tx_hash: String,
|
||||
pub(crate) height: u64,
|
||||
pub(crate) index: u32,
|
||||
pub(crate) from: String,
|
||||
pub(crate) to: String,
|
||||
pub(crate) amount: Vec<String>,
|
||||
pub(crate) memo: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct BankMsgRejection {
|
||||
pub(crate) rejected_at: OffsetDateTime,
|
||||
pub(crate) tx_hash: String,
|
||||
pub(crate) height: u64,
|
||||
pub(crate) index: u32,
|
||||
pub(crate) error: String,
|
||||
}
|
||||
|
||||
impl FromRef<AppState> for PaymentListenerState {
|
||||
fn from_ref(input: &AppState) -> Self {
|
||||
input.payment_listener_state.clone()
|
||||
}
|
||||
}
|
||||
impl FromRef<AppState> for StatusState {
|
||||
fn from_ref(input: &AppState) -> Self {
|
||||
input.status_state.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl FromRef<AppState> for PriceScraperState {
|
||||
fn from_ref(input: &AppState) -> Self {
|
||||
input.price_scraper_state.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl FromRef<AppState> for BankScraperModuleState {
|
||||
fn from_ref(input: &AppState) -> Self {
|
||||
input.bank_scraper_module_state.clone()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,49 +6,71 @@ use core::str;
|
||||
use tokio::time::Duration;
|
||||
|
||||
use crate::db::DbPool;
|
||||
use crate::http::state::PriceScraperState;
|
||||
|
||||
const REFRESH_DELAY: Duration = Duration::from_secs(300);
|
||||
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
|
||||
const COINGECKO_API_URL: &str =
|
||||
"https://api.coingecko.com/api/v3/simple/price?ids=nym&vs_currencies=chf,usd,eur,gbp,btc";
|
||||
|
||||
pub(crate) async fn run_price_scraper(db_pool: &DbPool) {
|
||||
loop {
|
||||
tracing::info!("Running in a loop 🏃");
|
||||
if let Err(e) = get_coingecko_prices(db_pool).await {
|
||||
tracing::error!("❌ Failed to get CoinGecko prices: {e}");
|
||||
tracing::info!("Retrying in {}s...", FAILURE_RETRY_DELAY.as_secs());
|
||||
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
|
||||
} else {
|
||||
tracing::info!("✅ Successfully fetched CoinGecko prices");
|
||||
tokio::time::sleep(REFRESH_DELAY).await;
|
||||
}
|
||||
}
|
||||
pub(crate) struct PriceScraper {
|
||||
shared_state: PriceScraperState,
|
||||
db_pool: DbPool,
|
||||
}
|
||||
|
||||
async fn get_coingecko_prices(pool: &DbPool) -> anyhow::Result<()> {
|
||||
tracing::info!("💰 Fetching CoinGecko prices from {}", COINGECKO_API_URL);
|
||||
|
||||
let response = reqwest::get(COINGECKO_API_URL)
|
||||
.await?
|
||||
.json::<CoingeckoPriceResponse>()
|
||||
.await;
|
||||
|
||||
tracing::info!("Got response {:?}", response);
|
||||
match response {
|
||||
Ok(resp) => {
|
||||
let price_record = PriceRecord {
|
||||
timestamp: time::OffsetDateTime::now_utc().unix_timestamp(),
|
||||
nym: resp.nym,
|
||||
};
|
||||
|
||||
insert_nym_prices(pool, price_record).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
//tracing::info!("💰 CoinGecko price response: {:?}", response);
|
||||
tracing::error!("Error sending request: {}", e);
|
||||
impl PriceScraper {
|
||||
pub(crate) fn new(shared_state: PriceScraperState, db_pool: DbPool) -> Self {
|
||||
PriceScraper {
|
||||
shared_state,
|
||||
db_pool,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
async fn get_coingecko_prices(&self) -> anyhow::Result<CoingeckoPriceResponse> {
|
||||
tracing::info!("💰 Fetching CoinGecko prices from {COINGECKO_API_URL}");
|
||||
|
||||
let response = reqwest::get(COINGECKO_API_URL)
|
||||
.await?
|
||||
.json::<CoingeckoPriceResponse>()
|
||||
.await;
|
||||
|
||||
tracing::info!("Got response {:?}", response);
|
||||
match response {
|
||||
Ok(resp) => {
|
||||
let price_record = PriceRecord {
|
||||
timestamp: time::OffsetDateTime::now_utc().unix_timestamp(),
|
||||
nym: resp.nym.clone(),
|
||||
};
|
||||
|
||||
insert_nym_prices(&self.db_pool, price_record).await?;
|
||||
Ok(resp)
|
||||
}
|
||||
Err(err) => {
|
||||
//tracing::info!("💰 CoinGecko price response: {:?}", response);
|
||||
tracing::error!("Error sending request: {err}");
|
||||
Err(err.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run(&self) {
|
||||
loop {
|
||||
tracing::info!("Running in a loop 🏃");
|
||||
match self.get_coingecko_prices().await {
|
||||
Ok(coingecko_price_response) => {
|
||||
self.shared_state
|
||||
.new_success(coingecko_price_response)
|
||||
.await;
|
||||
tracing::info!("✅ Successfully fetched CoinGecko prices");
|
||||
tokio::time::sleep(REFRESH_DELAY).await;
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("❌ Failed to get CoinGecko prices: {err}");
|
||||
tracing::info!("Retrying in {}s...", FAILURE_RETRY_DELAY.as_secs());
|
||||
self.shared_state.new_failure(err.to_string()).await;
|
||||
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user