Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e8dd72f8f8 | |||
| 80173805c7 | |||
| 6174481a1d |
@@ -47,7 +47,10 @@ pub struct NyxdScraperBuilder {
|
||||
}
|
||||
|
||||
impl NyxdScraperBuilder {
|
||||
pub async fn build_and_start(self) -> Result<NyxdScraper, ScraperError> {
|
||||
pub async fn build_and_start(
|
||||
self,
|
||||
start_block: Option<u32>,
|
||||
) -> Result<NyxdScraper, ScraperError> {
|
||||
let scraper = NyxdScraper::new(self.config).await?;
|
||||
|
||||
let (processing_tx, processing_rx) = unbounded_channel();
|
||||
@@ -90,6 +93,10 @@ impl NyxdScraperBuilder {
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(height) = start_block {
|
||||
scraper.process_block_range(Some(height), None).await?;
|
||||
}
|
||||
|
||||
scraper.start_tasks(block_requester, block_processor, chain_subscriber);
|
||||
|
||||
Ok(scraper)
|
||||
@@ -202,10 +209,10 @@ impl NyxdScraper {
|
||||
.await?
|
||||
.with_pruning(PruningOptions::nothing());
|
||||
|
||||
let current_height = self.rpc_client.current_block_height().await? as u32;
|
||||
let mut current_height = self.rpc_client.current_block_height().await? as u32;
|
||||
let last_processed = block_processor.last_process_height();
|
||||
|
||||
let starting_height = match starting_height {
|
||||
let mut starting_height = match starting_height {
|
||||
// always attempt to use whatever the user has provided
|
||||
Some(explicit) => explicit,
|
||||
None => {
|
||||
@@ -219,7 +226,8 @@ impl NyxdScraper {
|
||||
}
|
||||
};
|
||||
|
||||
let end_height = match end_height {
|
||||
let must_catch_up = end_height.is_none();
|
||||
let mut end_height = match end_height {
|
||||
// always attempt to use whatever the user has provided
|
||||
Some(explicit) => explicit,
|
||||
None => {
|
||||
@@ -234,32 +242,62 @@ impl NyxdScraper {
|
||||
}
|
||||
};
|
||||
|
||||
info!(
|
||||
starting_height = starting_height,
|
||||
end_height = end_height,
|
||||
"attempting to process block range"
|
||||
);
|
||||
let mut last_processed = starting_height;
|
||||
|
||||
let range = (starting_height..=end_height).collect::<Vec<_>>();
|
||||
while last_processed < current_height {
|
||||
info!(
|
||||
starting_height = starting_height,
|
||||
end_height = end_height,
|
||||
"attempting to process block range"
|
||||
);
|
||||
|
||||
// the most likely bottleneck here are going to be the chain queries,
|
||||
// so batch multiple requests
|
||||
for batch in range.chunks(4) {
|
||||
let batch_result = join_all(
|
||||
batch
|
||||
.iter()
|
||||
.map(|height| self.rpc_client.get_basic_block_details(*height)),
|
||||
)
|
||||
.await;
|
||||
for result in batch_result {
|
||||
match result {
|
||||
Ok(block) => block_processor.process_block(block.into()).await?,
|
||||
Err(err) => {
|
||||
error!("failed to retrieve the block: {err}. stopping...");
|
||||
return Err(err);
|
||||
let range = (starting_height..=end_height).collect::<Vec<_>>();
|
||||
|
||||
// the most likely bottleneck here are going to be the chain queries,
|
||||
// so batch multiple requests
|
||||
for batch in range.chunks(4) {
|
||||
let batch_result = join_all(
|
||||
batch
|
||||
.iter()
|
||||
.map(|height| self.rpc_client.get_basic_block_details(*height)),
|
||||
)
|
||||
.await;
|
||||
for result in batch_result {
|
||||
match result {
|
||||
Ok(block) => block_processor.process_block(block.into()).await?,
|
||||
Err(err) => {
|
||||
error!("failed to retrieve the block: {err}. stopping...");
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if we don't need to catch up, return early
|
||||
if !must_catch_up {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// check if we have caught up to the current block height
|
||||
last_processed = end_height;
|
||||
current_height = self.rpc_client.current_block_height().await? as u32;
|
||||
|
||||
info!(
|
||||
last_processed = last_processed,
|
||||
current_height = current_height,
|
||||
"🏃 still need to catch up..."
|
||||
);
|
||||
|
||||
starting_height = last_processed + 1;
|
||||
end_height = current_height;
|
||||
}
|
||||
|
||||
if must_catch_up {
|
||||
info!(
|
||||
last_processed = last_processed,
|
||||
current_height = current_height,
|
||||
"✅ block processing has caught up!"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -237,6 +237,58 @@ impl StorageManager {
|
||||
Ok(-1)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn get_transactions_after_height(
|
||||
&self,
|
||||
min_height: i64,
|
||||
message_type: Option<&str>,
|
||||
) -> Result<Vec<TransactionWithBlock>, sqlx::Error> {
|
||||
match message_type {
|
||||
Some(msg_type) => {
|
||||
sqlx::query_as!(
|
||||
TransactionWithBlock,
|
||||
r#"
|
||||
SELECT t.hash, t.height, t.memo, t.raw_log
|
||||
FROM message m
|
||||
JOIN "transaction" t ON m.transaction_hash = t.hash
|
||||
JOIN block b ON t.height = b.height
|
||||
WHERE t.height > ?
|
||||
AND m.type = ?
|
||||
ORDER BY t.height ASC
|
||||
"#,
|
||||
min_height,
|
||||
msg_type
|
||||
)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await
|
||||
}
|
||||
None => {
|
||||
sqlx::query_as!(
|
||||
TransactionWithBlock,
|
||||
r#"
|
||||
SELECT t.hash, t.height, t.memo, t.raw_log
|
||||
FROM message m
|
||||
JOIN "transaction" t ON m.transaction_hash = t.hash
|
||||
JOIN block b ON t.height = b.height
|
||||
WHERE t.height > ?
|
||||
ORDER BY t.height ASC
|
||||
"#,
|
||||
min_height
|
||||
)
|
||||
.fetch_all(&self.connection_pool)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
pub struct TransactionWithBlock {
|
||||
pub hash: String,
|
||||
pub height: i64,
|
||||
pub memo: Option<String>,
|
||||
pub raw_log: Option<String>,
|
||||
}
|
||||
|
||||
// make those generic over executor so that they could be performed over connection pool and a tx
|
||||
|
||||
@@ -13,6 +13,7 @@ use crate::{
|
||||
models::{CommitSignature, Validator},
|
||||
},
|
||||
};
|
||||
use manager::TransactionWithBlock;
|
||||
use sqlx::{types::time::OffsetDateTime, ConnectOptions, Sqlite, Transaction};
|
||||
use std::{fmt::Debug, path::Path};
|
||||
use tendermint::{
|
||||
@@ -207,6 +208,17 @@ impl ScraperStorage {
|
||||
pub async fn get_pruned_height(&self) -> Result<i64, ScraperError> {
|
||||
Ok(self.manager.get_pruned_height().await?)
|
||||
}
|
||||
|
||||
pub async fn get_transactions_after_height(
|
||||
&self,
|
||||
min_height: i64,
|
||||
message_type: Option<&str>,
|
||||
) -> Result<Vec<TransactionWithBlock>, ScraperError> {
|
||||
Ok(self
|
||||
.manager
|
||||
.get_transactions_after_height(min_height, message_type)
|
||||
.await?)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn persist_block(
|
||||
|
||||
-16
@@ -1,16 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "INSERT INTO responses\n (joke_id, joke, date_created)\n VALUES\n ($1, $2, $3)\n ON CONFLICT(joke_id) DO UPDATE SET\n joke=excluded.joke,\n date_created=excluded.date_created;",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Varchar",
|
||||
"Text",
|
||||
"Int4"
|
||||
]
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "249faa11b88b749f50342bb5c9cc41d20896db543eed74a6f320c041bcbb723d"
|
||||
}
|
||||
-34
@@ -1,34 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT joke_id, joke, date_created FROM responses WHERE joke_id = $1",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "joke_id",
|
||||
"type_info": "Varchar"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "joke",
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "date_created",
|
||||
"type_info": "Int4"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "aff7fbd06728004d2f2226d20c32f1482df00de2dc1d2b4debbb2e12553d997b"
|
||||
}
|
||||
-32
@@ -1,32 +0,0 @@
|
||||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "SELECT joke_id, joke, date_created FROM responses",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "joke_id",
|
||||
"type_info": "Varchar"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "joke",
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "date_created",
|
||||
"type_info": "Int4"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": []
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "e53f479f8cead3dc8aa1875e5d450ad69686cf6a109e37d6c3f0623c3e9f91d0"
|
||||
}
|
||||
@@ -24,9 +24,13 @@ nym-task = { path = "../common/task" }
|
||||
nym-node-requests = { path = "../nym-node/nym-node-requests", features = [
|
||||
"openapi",
|
||||
] }
|
||||
nyxd-scraper = {path = "../common/nyxd-scraper"}
|
||||
reqwest = {workspace= true, features = ["rustls-tls"]}
|
||||
rocket = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "time"] }
|
||||
time = {version = "0.3.36"}
|
||||
tokio = { workspace = true, features = ["process", "rt-multi-thread"] }
|
||||
tokio-util = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
@@ -40,4 +44,4 @@ utoipauto = { workspace = true }
|
||||
[build-dependencies]
|
||||
anyhow = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
|
||||
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] }
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
FROM rust:latest AS builder
|
||||
|
||||
COPY ./ /usr/src/nym
|
||||
WORKDIR /usr/src/nym/nym-data-observatory
|
||||
|
||||
RUN cargo build --release
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# The following environment variables are required at runtime:
|
||||
#
|
||||
# NYM_DATA_OBSERVATORY_CONNECTION_URL
|
||||
#
|
||||
# And optionally:
|
||||
#
|
||||
# NYM_DATA_OBSERVATORY_HTTP_PORT
|
||||
#
|
||||
# see https://github.com/nymtech/nym/blob/develop/nym-data-observatory/src/main.rs for details
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
FROM ubuntu:24.04
|
||||
|
||||
RUN apt update && apt install -yy curl ca-certificates
|
||||
|
||||
WORKDIR /nym
|
||||
|
||||
COPY --from=builder /usr/src/nym/target/release/nym-data-observatory ./
|
||||
ENTRYPOINT [ "/nym/nym-data-observatory" ]
|
||||
@@ -1,58 +1,51 @@
|
||||
use anyhow::Result;
|
||||
use sqlx::{Connection, PgConnection};
|
||||
use sqlx::{sqlite::SqliteConnectOptions, Connection, SqliteConnection};
|
||||
use std::io::Write;
|
||||
use std::{collections::HashMap, fs::File};
|
||||
use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr};
|
||||
|
||||
const POSTGRES_USER: &str = "nym";
|
||||
const POSTGRES_PASSWORD: &str = "password123";
|
||||
const POSTGRES_DB: &str = "data_obs_db";
|
||||
|
||||
/// if schema changes, rerun `cargo sqlx prepare` with a running DB
|
||||
/// https://github.com/launchbadge/sqlx/blob/main/sqlx-cli/README.md#enable-building-in-offline-mode-with-query
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let db_url =
|
||||
format!("postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@localhost:5432/{POSTGRES_DB}");
|
||||
let db_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("data_observatory.sqlite");
|
||||
|
||||
export_db_variables(&db_url)?;
|
||||
// if a live DB is reachable, use that
|
||||
if PgConnection::connect(&db_url).await.is_ok() {
|
||||
println!("cargo::rustc-env=SQLX_OFFLINE=false");
|
||||
run_migrations(&db_url).await?;
|
||||
} else {
|
||||
// by default, run in offline mode
|
||||
println!("cargo::rustc-env=SQLX_OFFLINE=true");
|
||||
// Create the database directory if it doesn't exist
|
||||
if let Some(parent) = db_path.parent() {
|
||||
std::fs::create_dir_all(parent)?;
|
||||
}
|
||||
|
||||
rerun_if_changed();
|
||||
let db_url = format!("sqlite:{}", db_path.display());
|
||||
|
||||
// Ensure database file is created with proper permissions
|
||||
let connect_options = SqliteConnectOptions::from_str(&db_url)?
|
||||
.create_if_missing(true)
|
||||
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
|
||||
.foreign_keys(true);
|
||||
|
||||
// Create initial connection to ensure database exists
|
||||
let mut conn = SqliteConnection::connect_with(&connect_options).await?;
|
||||
|
||||
export_db_variables(&db_url)?;
|
||||
println!("cargo:rustc-env=SQLX_OFFLINE=false");
|
||||
|
||||
// Run migrations after ensuring database exists
|
||||
sqlx::migrate!("./migrations").run(&mut conn).await?;
|
||||
|
||||
// Add rerun-if-changed directives
|
||||
println!("cargo:rerun-if-changed=migrations");
|
||||
println!("cargo:rerun-if-changed=build.rs");
|
||||
println!("cargo:rerun-if-changed=src");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn export_db_variables(db_url: &str) -> Result<()> {
|
||||
let mut map = HashMap::new();
|
||||
map.insert("POSTGRES_USER", POSTGRES_USER);
|
||||
map.insert("POSTGRES_PASSWORD", POSTGRES_PASSWORD);
|
||||
map.insert("POSTGRES_DB", POSTGRES_DB);
|
||||
map.insert("DATABASE_URL", db_url);
|
||||
|
||||
let mut file = File::create(".env")?;
|
||||
for (var, value) in map.iter() {
|
||||
println!("cargo::rustc-env={}={}", var, value);
|
||||
writeln!(file, "{}={}", var, value).expect("Failed to write to dotenv file");
|
||||
println!("cargo:rustc-env={}={}", var, value);
|
||||
writeln!(file, "{}={}", var, value)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_migrations(db_url: &str) -> Result<()> {
|
||||
let mut conn = PgConnection::connect(db_url).await?;
|
||||
sqlx::migrate!("./migrations").run(&mut conn).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rerun_if_changed() {
|
||||
println!("cargo::rerun-if-changed=migrations");
|
||||
println!("cargo::rerun-if-changed=src/db/queries");
|
||||
}
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
CREATE TABLE price_history (
|
||||
timestamp INTEGER PRIMARY KEY,
|
||||
chf REAL NOT NULL,
|
||||
usd REAL NOT NULL,
|
||||
eur REAL NOT NULL,
|
||||
btc REAL NOT NULL
|
||||
);
|
||||
@@ -0,0 +1,10 @@
|
||||
CREATE TABLE payments (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
transaction_hash TEXT NOT NULL UNIQUE,
|
||||
sender_address TEXT NOT NULL,
|
||||
receiver_address TEXT NOT NULL,
|
||||
amount REAL NOT NULL,
|
||||
timestamp INTEGER NOT NULL,
|
||||
height INTEGER NOT NULL,
|
||||
memo TEXT
|
||||
);
|
||||
@@ -1,13 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# .env is generated in build.rs
|
||||
source .env
|
||||
|
||||
# Launching a container in such a way that it's destroyed after you detach from the terminal:
|
||||
docker compose up
|
||||
|
||||
# docker exec -it nym-data-observatory-pg /bin/bash
|
||||
# psql -U youruser -d yourdb
|
||||
|
||||
echo "Tearing down containers to have a clean slate"
|
||||
docker compose down -v
|
||||
@@ -1,61 +0,0 @@
|
||||
use core::str;
|
||||
use serde::Deserialize;
|
||||
use tokio::process::Command;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
|
||||
use crate::db::{self, DbPool};
|
||||
|
||||
const REFRESH_DELAY: Duration = Duration::from_secs(15);
|
||||
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
|
||||
|
||||
pub(crate) async fn spawn_in_background(db_pool: DbPool) -> JoinHandle<()> {
|
||||
loop {
|
||||
tracing::info!("Running in a loop 🏃");
|
||||
|
||||
if let Err(e) = some_network_action(&db_pool).await {
|
||||
tracing::error!(
|
||||
"❌ Run failed: {e}, retrying in {}s...",
|
||||
FAILURE_RETRY_DELAY.as_secs()
|
||||
);
|
||||
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
|
||||
} else {
|
||||
tracing::info!(
|
||||
"✅ Run successful, sleeping for {}s...",
|
||||
REFRESH_DELAY.as_secs()
|
||||
);
|
||||
tokio::time::sleep(REFRESH_DELAY).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub(crate) struct Response {
|
||||
#[serde(rename(deserialize = "id"))]
|
||||
pub(crate) joke_id: String,
|
||||
pub(crate) joke: String,
|
||||
#[serde(rename(deserialize = "status"))]
|
||||
pub(crate) _status: u16,
|
||||
}
|
||||
|
||||
async fn some_network_action(pool: &DbPool) -> anyhow::Result<()> {
|
||||
// for demonstration purposes only. You should use reqwest if you need it
|
||||
let output = Command::new("curl")
|
||||
.arg("-H")
|
||||
.arg("Accept: application/json")
|
||||
.arg("https://icanhazdadjoke.com/")
|
||||
.output()
|
||||
.await?;
|
||||
|
||||
if !output.status.success() {
|
||||
anyhow::bail!("Curl command failed with status: {}", output.status);
|
||||
}
|
||||
|
||||
let response_str = str::from_utf8(&output.stdout)?;
|
||||
let joke_response: Response = serde_json::from_str(response_str)?;
|
||||
|
||||
tracing::info!("{:?}", joke_response.joke);
|
||||
db::queries::insert_joke(pool, joke_response.into()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
use nyxd_scraper::{storage::ScraperStorage, Config, NyxdScraper, PruningOptions};
|
||||
|
||||
pub(crate) async fn run_chain_scraper() -> anyhow::Result<ScraperStorage> {
|
||||
let websocket_url =
|
||||
std::env::var("NYXD_WEBSOCKET_URL").expect("NYXD_WEBSOCKET_URL not defined");
|
||||
|
||||
let rpc_url = std::env::var("NYXD_RPC_URL").expect("NYXD_RPC_URL not defined");
|
||||
let websocket_url = reqwest::Url::parse(&websocket_url)?;
|
||||
let rpc_url = reqwest::Url::parse(&rpc_url)?;
|
||||
|
||||
let start_block_height = std::env::var("NYXD_SCRAPER_START_HEIGHT")
|
||||
.ok()
|
||||
.and_then(|value| value.parse::<u32>().ok());
|
||||
|
||||
let scraper = NyxdScraper::builder(Config {
|
||||
websocket_url,
|
||||
rpc_url,
|
||||
database_path: "chain_history.sqlite".into(),
|
||||
pruning_options: PruningOptions::nothing(),
|
||||
store_precommits: false,
|
||||
});
|
||||
|
||||
let instance = scraper.build_and_start(start_block_height).await?;
|
||||
|
||||
Ok(instance.storage)
|
||||
}
|
||||
@@ -1,13 +1,16 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use sqlx::{migrate::Migrator, postgres::PgConnectOptions, ConnectOptions, PgPool};
|
||||
use sqlx::{migrate::Migrator, sqlite::SqliteConnectOptions, SqlitePool};
|
||||
use std::str::FromStr;
|
||||
|
||||
pub(crate) mod models;
|
||||
pub(crate) mod queries;
|
||||
pub(crate) mod queries {
|
||||
pub mod payments;
|
||||
pub mod price;
|
||||
}
|
||||
|
||||
static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
|
||||
|
||||
pub(crate) type DbPool = PgPool;
|
||||
pub(crate) type DbPool = SqlitePool;
|
||||
|
||||
pub(crate) struct Storage {
|
||||
pool: DbPool,
|
||||
@@ -16,13 +19,16 @@ pub(crate) struct Storage {
|
||||
impl Storage {
|
||||
pub async fn init(connection_url: String) -> Result<Self> {
|
||||
let connect_options =
|
||||
PgConnectOptions::from_str(&connection_url)?.disable_statement_logging();
|
||||
SqliteConnectOptions::from_str(&connection_url)?.create_if_missing(true);
|
||||
|
||||
let pool = DbPool::connect_with(connect_options)
|
||||
.await
|
||||
.map_err(|err| anyhow!("Failed to connect to {}: {}", &connection_url, err))?;
|
||||
|
||||
MIGRATOR.run(&pool).await?;
|
||||
MIGRATOR
|
||||
.run(&pool)
|
||||
.await
|
||||
.map_err(|err| anyhow!("Failed to run migrations: {}", err))?;
|
||||
|
||||
Ok(Storage { pool })
|
||||
}
|
||||
|
||||
@@ -1,22 +1,41 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utoipa::ToSchema;
|
||||
|
||||
use crate::background_task::Response;
|
||||
#[derive(Clone, Deserialize, Debug)]
|
||||
pub(crate) struct CurrencyPrices {
|
||||
pub(crate) chf: f32,
|
||||
pub(crate) usd: f32,
|
||||
pub(crate) eur: f32,
|
||||
pub(crate) btc: f32,
|
||||
}
|
||||
|
||||
// Struct to hold Coingecko response
|
||||
#[derive(Clone, Deserialize, Debug, ToSchema)]
|
||||
pub(crate) struct CoingeckoPriceResponse {
|
||||
pub(crate) nym: CurrencyPrices,
|
||||
}
|
||||
|
||||
#[derive(Clone, Deserialize, Debug, ToSchema)]
|
||||
pub(crate) struct PriceRecord {
|
||||
pub(crate) timestamp: i64,
|
||||
pub(crate) nym: CurrencyPrices,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, ToSchema)]
|
||||
pub(crate) struct JokeDto {
|
||||
pub(crate) joke_id: String,
|
||||
pub(crate) joke: String,
|
||||
pub(crate) date_created: i32,
|
||||
pub(crate) struct PriceHistory {
|
||||
pub(crate) timestamp: i64,
|
||||
pub(crate) chf: f32,
|
||||
pub(crate) usd: f32,
|
||||
pub(crate) eur: f32,
|
||||
pub(crate) btc: f32,
|
||||
}
|
||||
|
||||
impl From<Response> for JokeDto {
|
||||
fn from(value: Response) -> Self {
|
||||
Self {
|
||||
joke_id: value.joke_id,
|
||||
joke: value.joke,
|
||||
// casting not smart, can implicitly panic, don't do this in prod
|
||||
date_created: chrono::offset::Utc::now().timestamp() as i32,
|
||||
}
|
||||
}
|
||||
#[derive(Serialize, Deserialize, Debug, ToSchema)]
|
||||
pub(crate) struct PaymentRecord {
|
||||
pub(crate) transaction_hash: String,
|
||||
pub(crate) sender_address: String,
|
||||
pub(crate) receiver_address: String,
|
||||
pub(crate) amount: f64,
|
||||
pub(crate) timestamp: i64,
|
||||
pub(crate) height: i64,
|
||||
}
|
||||
|
||||
@@ -1,39 +0,0 @@
|
||||
use crate::db::{models::JokeDto, DbPool};
|
||||
|
||||
pub(crate) async fn insert_joke(pool: &DbPool, joke: JokeDto) -> anyhow::Result<()> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
sqlx::query!(
|
||||
"INSERT INTO responses
|
||||
(joke_id, joke, date_created)
|
||||
VALUES
|
||||
($1, $2, $3)
|
||||
ON CONFLICT(joke_id) DO UPDATE SET
|
||||
joke=excluded.joke,
|
||||
date_created=excluded.date_created;",
|
||||
joke.joke_id,
|
||||
joke.joke,
|
||||
joke.date_created as i32,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn select_joke_by_id(pool: &DbPool, joke_id: &str) -> anyhow::Result<JokeDto> {
|
||||
sqlx::query_as!(
|
||||
JokeDto,
|
||||
"SELECT joke_id, joke, date_created FROM responses WHERE joke_id = $1",
|
||||
joke_id
|
||||
)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.map_err(anyhow::Error::from)
|
||||
}
|
||||
|
||||
pub(crate) async fn select_all(pool: &DbPool) -> anyhow::Result<Vec<JokeDto>> {
|
||||
sqlx::query_as!(JokeDto, "SELECT joke_id, joke, date_created FROM responses",)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err(anyhow::Error::from)
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
// group queries in files by theme
|
||||
mod joke;
|
||||
mod payments;
|
||||
mod price;
|
||||
|
||||
// re-exporting allows us to access all queries via `queries::bla``
|
||||
pub(crate) use joke::{insert_joke, select_all, select_joke_by_id};
|
||||
pub(crate) use payments::{get_last_checked_height, insert_payment};
|
||||
pub(crate) use price::{get_latest_price, insert_nym_prices};
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
use crate::db::DbPool;
|
||||
use anyhow::Result;
|
||||
|
||||
pub async fn get_last_checked_height(pool: &DbPool) -> Result<i64> {
|
||||
let result = sqlx::query_scalar!("SELECT MAX(height) FROM payments")
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
Ok(result.unwrap_or(0))
|
||||
}
|
||||
|
||||
pub async fn insert_payment(
|
||||
pool: &DbPool,
|
||||
transaction_hash: String,
|
||||
sender_address: String,
|
||||
receiver_address: String,
|
||||
amount: f64,
|
||||
height: i64,
|
||||
memo: Option<String>,
|
||||
) -> Result<()> {
|
||||
let timestamp = chrono::Utc::now().timestamp();
|
||||
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO payments (
|
||||
transaction_hash, sender_address, receiver_address,
|
||||
amount, height, timestamp, memo
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
"#,
|
||||
transaction_hash,
|
||||
sender_address,
|
||||
receiver_address,
|
||||
amount,
|
||||
height,
|
||||
timestamp,
|
||||
memo,
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
use crate::db::models::{PriceHistory, PriceRecord};
|
||||
use crate::db::DbPool;
|
||||
|
||||
pub(crate) async fn insert_nym_prices(
|
||||
pool: &DbPool,
|
||||
price_data: PriceRecord,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
let timestamp = price_data.timestamp;
|
||||
sqlx::query!(
|
||||
"INSERT INTO price_history
|
||||
(timestamp, chf, usd, eur, btc)
|
||||
VALUES
|
||||
($1, $2, $3, $4, $5)
|
||||
ON CONFLICT(timestamp) DO UPDATE SET
|
||||
chf=excluded.chf,
|
||||
usd=excluded.usd,
|
||||
eur=excluded.eur,
|
||||
btc=excluded.btc;",
|
||||
timestamp,
|
||||
price_data.nym.chf,
|
||||
price_data.nym.usd,
|
||||
price_data.nym.eur,
|
||||
price_data.nym.btc,
|
||||
)
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_latest_price(pool: &DbPool) -> anyhow::Result<PriceHistory> {
|
||||
let result = sqlx::query!(
|
||||
"SELECT timestamp, chf, usd, eur, btc FROM price_history ORDER BY timestamp DESC LIMIT 1;"
|
||||
)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
Ok(PriceHistory {
|
||||
timestamp: result.timestamp,
|
||||
chf: result.chf as f32,
|
||||
usd: result.usd as f32,
|
||||
eur: result.eur as f32,
|
||||
btc: result.btc as f32,
|
||||
})
|
||||
}
|
||||
@@ -1,78 +0,0 @@
|
||||
use axum::{
|
||||
extract::{Path, State},
|
||||
Json, Router,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use utoipa::IntoParams;
|
||||
|
||||
use crate::{
|
||||
db::{
|
||||
models::JokeDto,
|
||||
queries::{self, select_joke_by_id},
|
||||
},
|
||||
http::{
|
||||
error::{Error, HttpResult},
|
||||
state::AppState,
|
||||
},
|
||||
};
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/", axum::routing::get(jokes))
|
||||
.route("/:joke_id", axum::routing::get(joke_by_id))
|
||||
.route("/fetch_another", axum::routing::get(fetch_another))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Dad Jokes",
|
||||
get,
|
||||
path = "/v1/jokes",
|
||||
responses(
|
||||
(status = 200, body = Vec<JokeDto>)
|
||||
)
|
||||
)]
|
||||
async fn jokes(State(state): State<AppState>) -> HttpResult<Json<Vec<JokeDto>>> {
|
||||
queries::select_all(state.db_pool())
|
||||
.await
|
||||
.map(Json::from)
|
||||
.map_err(|_| Error::internal())
|
||||
}
|
||||
|
||||
#[derive(Deserialize, IntoParams)]
|
||||
#[into_params(parameter_in = Path)]
|
||||
struct JokeIdParam {
|
||||
joke_id: String,
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Dad Jokes",
|
||||
get,
|
||||
params(
|
||||
JokeIdParam
|
||||
),
|
||||
path = "/v1/jokes/{joke_id}",
|
||||
responses(
|
||||
(status = 200, body = JokeDto)
|
||||
)
|
||||
)]
|
||||
async fn joke_by_id(
|
||||
Path(JokeIdParam { joke_id }): Path<JokeIdParam>,
|
||||
State(state): State<AppState>,
|
||||
) -> HttpResult<Json<JokeDto>> {
|
||||
select_joke_by_id(state.db_pool(), &joke_id)
|
||||
.await
|
||||
.map(Json::from)
|
||||
.map_err(|_| Error::not_found(joke_id))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Dad Jokes",
|
||||
get,
|
||||
path = "/v1/jokes/fetch_another",
|
||||
responses(
|
||||
(status = 200, body = String)
|
||||
)
|
||||
)]
|
||||
async fn fetch_another(State(_state): State<AppState>) -> HttpResult<Json<String>> {
|
||||
Ok(Json(String::from("Done boss, check the DB")))
|
||||
}
|
||||
@@ -7,8 +7,8 @@ use utoipa_swagger_ui::SwaggerUi;
|
||||
|
||||
use crate::http::{api_docs, server::HttpServer, state::AppState};
|
||||
|
||||
pub(crate) mod jokes;
|
||||
pub(crate) mod mixnodes;
|
||||
pub(crate) mod price;
|
||||
|
||||
pub(crate) struct RouterBuilder {
|
||||
unfinished_router: Router<AppState>,
|
||||
@@ -28,8 +28,9 @@ impl RouterBuilder {
|
||||
.nest(
|
||||
"/v1",
|
||||
Router::new()
|
||||
.nest("/jokes", jokes::routes())
|
||||
.nest("/mixnodes", mixnodes::routes()),
|
||||
//.nest("/jokes", jokes::routes())
|
||||
.nest("/mixnodes", mixnodes::routes())
|
||||
.nest("/price", price::routes()),
|
||||
);
|
||||
|
||||
Self {
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
use crate::db::models::PriceHistory;
|
||||
use crate::db::queries::price::get_latest_price;
|
||||
use crate::http::error::Error;
|
||||
use crate::http::error::HttpResult;
|
||||
use crate::http::state::AppState;
|
||||
use axum::{extract::State, Json, Router};
|
||||
|
||||
pub(crate) fn routes() -> Router<AppState> {
|
||||
Router::new().route("/", axum::routing::get(price))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
tag = "Nym Price",
|
||||
get,
|
||||
path = "/v1/price",
|
||||
responses(
|
||||
(status = 200, body = String)
|
||||
)
|
||||
)]
|
||||
|
||||
/// Fetch the latest price cached by the data observatory
|
||||
async fn price(State(state): State<AppState>) -> HttpResult<Json<PriceHistory>> {
|
||||
get_latest_price(state.db_pool())
|
||||
.await
|
||||
.map(Json::from)
|
||||
.map_err(|_| Error::internal())
|
||||
}
|
||||
@@ -6,5 +6,9 @@ use utoipauto::utoipauto;
|
||||
// https://github.com/ProbablyClem/utoipauto/issues/13#issuecomment-1974911829
|
||||
#[utoipauto(paths = "./nym-data-observatory/src")]
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(info(title = "Nym API"), tags(), components(schemas()))]
|
||||
#[openapi(
|
||||
info(title = "Nym Data Observatory API"),
|
||||
tags(),
|
||||
components(schemas())
|
||||
)]
|
||||
pub(super) struct ApiDoc;
|
||||
|
||||
@@ -6,13 +6,6 @@ pub(crate) struct Error {
|
||||
}
|
||||
|
||||
impl Error {
|
||||
pub(crate) fn not_found(message: String) -> Self {
|
||||
Self {
|
||||
message,
|
||||
status: axum::http::StatusCode::NOT_FOUND,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn internal() -> Self {
|
||||
Self {
|
||||
message: String::from("Internal server error"),
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
use chain_scraper::run_chain_scraper;
|
||||
use clap::Parser;
|
||||
use nym_network_defaults::setup_env;
|
||||
use nym_task::signal::wait_for_signal;
|
||||
use tokio::join;
|
||||
|
||||
mod background_task;
|
||||
mod chain_scraper;
|
||||
mod db;
|
||||
mod http;
|
||||
mod logging;
|
||||
mod payment_listener;
|
||||
mod price_scraper;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version, about, long_about = None)]
|
||||
@@ -18,25 +22,13 @@ struct Args {
|
||||
#[arg(short, long, default_value = None, env = "NYM_DATA_OBSERVATORY_ENV_FILE")]
|
||||
env_file: Option<String>,
|
||||
|
||||
/// DB connection username
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_USERNAME")]
|
||||
connection_username: String,
|
||||
|
||||
/// DB connection password
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_PASSWORD")]
|
||||
connection_password: String,
|
||||
|
||||
/// DB connection host
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_HOST")]
|
||||
connection_host: String,
|
||||
|
||||
/// DB connection port
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_PORT")]
|
||||
connection_port: String,
|
||||
|
||||
/// DB connection database name
|
||||
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_DB")]
|
||||
connection_db: String,
|
||||
/// SQLite database file path
|
||||
#[arg(
|
||||
long,
|
||||
default_value = "data_observatory.sqlite",
|
||||
env = "NYM_DATA_OBSERVATORY_DB_PATH"
|
||||
)]
|
||||
db_path: String,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@@ -44,30 +36,50 @@ async fn main() -> anyhow::Result<()> {
|
||||
logging::setup_tracing_logger();
|
||||
|
||||
let args = Args::parse();
|
||||
|
||||
setup_env(args.env_file); // Defaults to mainnet if empty
|
||||
|
||||
let connection_url = format!(
|
||||
"postgres://{}:{}@{}:{}/{}",
|
||||
args.connection_username,
|
||||
args.connection_password,
|
||||
args.connection_host,
|
||||
args.connection_port,
|
||||
args.connection_db
|
||||
);
|
||||
let db_path = args.db_path;
|
||||
// Ensure parent directory exists
|
||||
if let Some(parent) = std::path::Path::new(&db_path).parent() {
|
||||
std::fs::create_dir_all(parent)?;
|
||||
}
|
||||
|
||||
let connection_url = format!("sqlite://{}?mode=rwc", db_path);
|
||||
let storage = db::Storage::init(connection_url).await?;
|
||||
let db_pool = storage.pool_owned().await;
|
||||
tokio::spawn(async move {
|
||||
background_task::spawn_in_background(db_pool).await;
|
||||
tracing::info!("Started task");
|
||||
let observatory_pool = storage.pool_owned().await;
|
||||
|
||||
// Spawn the chain scraper and get its storage
|
||||
|
||||
// Spawn the payment listener task
|
||||
let payment_listener_handle = tokio::spawn({
|
||||
let obs_pool = observatory_pool.clone();
|
||||
let chain_storage = run_chain_scraper().await?;
|
||||
|
||||
async move {
|
||||
if let Err(e) = payment_listener::run_payment_listener(obs_pool, chain_storage).await {
|
||||
tracing::error!("Payment listener error: {}", e);
|
||||
}
|
||||
Ok::<_, anyhow::Error>(())
|
||||
}
|
||||
});
|
||||
|
||||
// Clone pool for each task that needs it
|
||||
//let background_pool = db_pool.clone();
|
||||
|
||||
let price_scraper_handle = tokio::spawn(async move {
|
||||
price_scraper::run_price_scraper(&observatory_pool).await;
|
||||
});
|
||||
|
||||
let shutdown_handles = http::server::start_http_api(storage.pool_owned().await, args.http_port)
|
||||
.await
|
||||
.expect("Failed to start server");
|
||||
|
||||
tracing::info!("Started HTTP server on port {}", args.http_port);
|
||||
|
||||
// Wait for the short-lived tasks to complete
|
||||
let _ = join!(price_scraper_handle, payment_listener_handle);
|
||||
|
||||
// Wait for a signal to terminate the long-running task
|
||||
wait_for_signal().await;
|
||||
|
||||
if let Err(err) = shutdown_handles.shutdown().await {
|
||||
|
||||
@@ -0,0 +1,114 @@
|
||||
use crate::db::queries;
|
||||
use nyxd_scraper::storage::ScraperStorage;
|
||||
use reqwest::Client;
|
||||
use serde_json::{json, Value};
|
||||
use sqlx::SqlitePool;
|
||||
use std::env;
|
||||
use tokio::time::{self, Duration};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TransferEvent {
|
||||
recipient: String,
|
||||
sender: String,
|
||||
amount: String,
|
||||
}
|
||||
|
||||
pub(crate) async fn run_payment_listener(
|
||||
observatory_pool: SqlitePool,
|
||||
chain_storage: ScraperStorage,
|
||||
) -> anyhow::Result<()> {
|
||||
let payment_receive_address = env::var("PAYMENT_RECEIVE_ADDRESS").map_err(|_| {
|
||||
anyhow::anyhow!("Environment variable `PAYMENT_RECEIVE_ADDRESS` not defined")
|
||||
})?;
|
||||
let webhook_url = env::var("WEBHOOK_URL")
|
||||
.map_err(|_| anyhow::anyhow!("Environment variable `WEBHOOK_URL` not defined"))?;
|
||||
|
||||
let client = Client::new();
|
||||
loop {
|
||||
let last_checked_height =
|
||||
queries::payments::get_last_checked_height(&observatory_pool).await?;
|
||||
tracing::info!("Last checked height: {}", last_checked_height);
|
||||
|
||||
let transactions = chain_storage
|
||||
.get_transactions_after_height(
|
||||
last_checked_height,
|
||||
Some("/cosmos.bank.v1beta1.MsgSend"),
|
||||
)
|
||||
.await?;
|
||||
|
||||
for tx in transactions {
|
||||
tracing::info!("Processing transaction: {}", tx.hash);
|
||||
if let Some(raw_log) = tx.raw_log.as_deref() {
|
||||
if let Some(transfer) = parse_transfer_from_raw_log(raw_log)? {
|
||||
if transfer.recipient == payment_receive_address {
|
||||
let amount: f64 = parse_unym_amount(&transfer.amount)?;
|
||||
|
||||
queries::payments::insert_payment(
|
||||
&observatory_pool,
|
||||
tx.hash.clone(),
|
||||
transfer.sender.clone(),
|
||||
transfer.recipient.clone(),
|
||||
amount,
|
||||
tx.height,
|
||||
tx.memo.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let webhook_data = json!({
|
||||
"transaction_hash": tx.hash,
|
||||
"sender_address": transfer.sender,
|
||||
"receiver_address": transfer.recipient,
|
||||
"amount": amount,
|
||||
"height": tx.height,
|
||||
"memo": tx.memo,
|
||||
});
|
||||
let _ = client.post(&webhook_url).json(&webhook_data).send().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_transfer_from_raw_log(raw_log: &str) -> anyhow::Result<Option<TransferEvent>> {
|
||||
let log_value: Value = serde_json::from_str(raw_log)?;
|
||||
|
||||
if let Some(events) = log_value[0]["events"].as_array() {
|
||||
if let Some(transfer_event) = events.iter().find(|e| e["type"] == "transfer") {
|
||||
if let Some(attrs) = transfer_event["attributes"].as_array() {
|
||||
let mut transfer = TransferEvent {
|
||||
recipient: String::new(),
|
||||
sender: String::new(),
|
||||
amount: String::new(),
|
||||
};
|
||||
|
||||
for attr in attrs {
|
||||
match attr["key"].as_str() {
|
||||
Some("recipient") => {
|
||||
transfer.recipient = attr["value"].as_str().unwrap_or("").to_string()
|
||||
}
|
||||
Some("sender") => {
|
||||
transfer.sender = attr["value"].as_str().unwrap_or("").to_string()
|
||||
}
|
||||
Some("amount") => {
|
||||
transfer.amount = attr["value"].as_str().unwrap_or("").to_string()
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(Some(transfer));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn parse_unym_amount(amount: &str) -> anyhow::Result<f64> {
|
||||
let amount = amount.trim_end_matches("unym");
|
||||
let parsed: f64 = amount.parse()?;
|
||||
Ok(parsed / 1_000_000.0)
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
use crate::db::{
|
||||
models::{CoingeckoPriceResponse, PriceRecord},
|
||||
queries::price::insert_nym_prices,
|
||||
};
|
||||
use core::str;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
|
||||
use crate::db::DbPool;
|
||||
|
||||
const REFRESH_DELAY: Duration = Duration::from_secs(300);
|
||||
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
|
||||
const COINGECKO_API_URL: &str =
|
||||
"https://api.coingecko.com/api/v3/simple/price?ids=nym&vs_currencies=chf,usd,eur,btc";
|
||||
|
||||
pub(crate) async fn run_price_scraper(db_pool: &DbPool) -> JoinHandle<()> {
|
||||
loop {
|
||||
tracing::info!("Running in a loop 🏃");
|
||||
if let Err(e) = get_coingecko_prices(db_pool).await {
|
||||
tracing::error!("❌ Failed to get CoinGecko prices: {e}");
|
||||
tracing::info!("Retrying in {}s...", FAILURE_RETRY_DELAY.as_secs());
|
||||
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
|
||||
} else {
|
||||
tracing::info!("✅ Successfully fetched CoinGecko prices");
|
||||
tokio::time::sleep(REFRESH_DELAY).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_coingecko_prices(pool: &DbPool) -> anyhow::Result<()> {
|
||||
tracing::info!("💰 Fetching CoinGecko prices from {}", COINGECKO_API_URL);
|
||||
|
||||
let response = reqwest::get(COINGECKO_API_URL)
|
||||
.await?
|
||||
.json::<CoingeckoPriceResponse>()
|
||||
.await;
|
||||
|
||||
tracing::info!("Got response {:?}", response);
|
||||
match response {
|
||||
Ok(resp) => {
|
||||
let price_record = PriceRecord {
|
||||
timestamp: time::OffsetDateTime::now_utc().unix_timestamp(),
|
||||
nym: resp.nym,
|
||||
};
|
||||
|
||||
insert_nym_prices(pool, price_record).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
//tracing::info!("💰 CoinGecko price response: {:?}", response);
|
||||
tracing::error!("Error sending request: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user