Compare commits

...

3 Commits

28 changed files with 590 additions and 429 deletions
+63 -25
View File
@@ -47,7 +47,10 @@ pub struct NyxdScraperBuilder {
}
impl NyxdScraperBuilder {
pub async fn build_and_start(self) -> Result<NyxdScraper, ScraperError> {
pub async fn build_and_start(
self,
start_block: Option<u32>,
) -> Result<NyxdScraper, ScraperError> {
let scraper = NyxdScraper::new(self.config).await?;
let (processing_tx, processing_rx) = unbounded_channel();
@@ -90,6 +93,10 @@ impl NyxdScraperBuilder {
)
.await?;
if let Some(height) = start_block {
scraper.process_block_range(Some(height), None).await?;
}
scraper.start_tasks(block_requester, block_processor, chain_subscriber);
Ok(scraper)
@@ -202,10 +209,10 @@ impl NyxdScraper {
.await?
.with_pruning(PruningOptions::nothing());
let current_height = self.rpc_client.current_block_height().await? as u32;
let mut current_height = self.rpc_client.current_block_height().await? as u32;
let last_processed = block_processor.last_process_height();
let starting_height = match starting_height {
let mut starting_height = match starting_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
@@ -219,7 +226,8 @@ impl NyxdScraper {
}
};
let end_height = match end_height {
let must_catch_up = end_height.is_none();
let mut end_height = match end_height {
// always attempt to use whatever the user has provided
Some(explicit) => explicit,
None => {
@@ -234,32 +242,62 @@ impl NyxdScraper {
}
};
info!(
starting_height = starting_height,
end_height = end_height,
"attempting to process block range"
);
let mut last_processed = starting_height;
let range = (starting_height..=end_height).collect::<Vec<_>>();
while last_processed < current_height {
info!(
starting_height = starting_height,
end_height = end_height,
"attempting to process block range"
);
// the most likely bottleneck here are going to be the chain queries,
// so batch multiple requests
for batch in range.chunks(4) {
let batch_result = join_all(
batch
.iter()
.map(|height| self.rpc_client.get_basic_block_details(*height)),
)
.await;
for result in batch_result {
match result {
Ok(block) => block_processor.process_block(block.into()).await?,
Err(err) => {
error!("failed to retrieve the block: {err}. stopping...");
return Err(err);
let range = (starting_height..=end_height).collect::<Vec<_>>();
// the most likely bottleneck here are going to be the chain queries,
// so batch multiple requests
for batch in range.chunks(4) {
let batch_result = join_all(
batch
.iter()
.map(|height| self.rpc_client.get_basic_block_details(*height)),
)
.await;
for result in batch_result {
match result {
Ok(block) => block_processor.process_block(block.into()).await?,
Err(err) => {
error!("failed to retrieve the block: {err}. stopping...");
return Err(err);
}
}
}
}
// if we don't need to catch up, return early
if !must_catch_up {
return Ok(());
}
// check if we have caught up to the current block height
last_processed = end_height;
current_height = self.rpc_client.current_block_height().await? as u32;
info!(
last_processed = last_processed,
current_height = current_height,
"🏃 still need to catch up..."
);
starting_height = last_processed + 1;
end_height = current_height;
}
if must_catch_up {
info!(
last_processed = last_processed,
current_height = current_height,
"✅ block processing has caught up!"
);
}
Ok(())
@@ -237,6 +237,58 @@ impl StorageManager {
Ok(-1)
}
}
#[allow(dead_code)]
pub async fn get_transactions_after_height(
&self,
min_height: i64,
message_type: Option<&str>,
) -> Result<Vec<TransactionWithBlock>, sqlx::Error> {
match message_type {
Some(msg_type) => {
sqlx::query_as!(
TransactionWithBlock,
r#"
SELECT t.hash, t.height, t.memo, t.raw_log
FROM message m
JOIN "transaction" t ON m.transaction_hash = t.hash
JOIN block b ON t.height = b.height
WHERE t.height > ?
AND m.type = ?
ORDER BY t.height ASC
"#,
min_height,
msg_type
)
.fetch_all(&self.connection_pool)
.await
}
None => {
sqlx::query_as!(
TransactionWithBlock,
r#"
SELECT t.hash, t.height, t.memo, t.raw_log
FROM message m
JOIN "transaction" t ON m.transaction_hash = t.hash
JOIN block b ON t.height = b.height
WHERE t.height > ?
ORDER BY t.height ASC
"#,
min_height
)
.fetch_all(&self.connection_pool)
.await
}
}
}
}
#[derive(Debug, sqlx::FromRow)]
pub struct TransactionWithBlock {
pub hash: String,
pub height: i64,
pub memo: Option<String>,
pub raw_log: Option<String>,
}
// make those generic over executor so that they could be performed over connection pool and a tx
+12
View File
@@ -13,6 +13,7 @@ use crate::{
models::{CommitSignature, Validator},
},
};
use manager::TransactionWithBlock;
use sqlx::{types::time::OffsetDateTime, ConnectOptions, Sqlite, Transaction};
use std::{fmt::Debug, path::Path};
use tendermint::{
@@ -207,6 +208,17 @@ impl ScraperStorage {
pub async fn get_pruned_height(&self) -> Result<i64, ScraperError> {
Ok(self.manager.get_pruned_height().await?)
}
pub async fn get_transactions_after_height(
&self,
min_height: i64,
message_type: Option<&str>,
) -> Result<Vec<TransactionWithBlock>, ScraperError> {
Ok(self
.manager
.get_transactions_after_height(min_height, message_type)
.await?)
}
}
pub async fn persist_block(
@@ -1,16 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO responses\n (joke_id, joke, date_created)\n VALUES\n ($1, $2, $3)\n ON CONFLICT(joke_id) DO UPDATE SET\n joke=excluded.joke,\n date_created=excluded.date_created;",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Varchar",
"Text",
"Int4"
]
},
"nullable": []
},
"hash": "249faa11b88b749f50342bb5c9cc41d20896db543eed74a6f320c041bcbb723d"
}
@@ -1,34 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "SELECT joke_id, joke, date_created FROM responses WHERE joke_id = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "joke_id",
"type_info": "Varchar"
},
{
"ordinal": 1,
"name": "joke",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "date_created",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false,
false
]
},
"hash": "aff7fbd06728004d2f2226d20c32f1482df00de2dc1d2b4debbb2e12553d997b"
}
@@ -1,32 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "SELECT joke_id, joke, date_created FROM responses",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "joke_id",
"type_info": "Varchar"
},
{
"ordinal": 1,
"name": "joke",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "date_created",
"type_info": "Int4"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false
]
},
"hash": "e53f479f8cead3dc8aa1875e5d450ad69686cf6a109e37d6c3f0623c3e9f91d0"
}
+6 -2
View File
@@ -24,9 +24,13 @@ nym-task = { path = "../common/task" }
nym-node-requests = { path = "../nym-node/nym-node-requests", features = [
"openapi",
] }
nyxd-scraper = {path = "../common/nyxd-scraper"}
reqwest = {workspace= true, features = ["rustls-tls"]}
rocket = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "time"] }
time = {version = "0.3.36"}
tokio = { workspace = true, features = ["process", "rt-multi-thread"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
@@ -40,4 +44,4 @@ utoipauto = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] }
-27
View File
@@ -1,27 +0,0 @@
FROM rust:latest AS builder
COPY ./ /usr/src/nym
WORKDIR /usr/src/nym/nym-data-observatory
RUN cargo build --release
#-------------------------------------------------------------------
# The following environment variables are required at runtime:
#
# NYM_DATA_OBSERVATORY_CONNECTION_URL
#
# And optionally:
#
# NYM_DATA_OBSERVATORY_HTTP_PORT
#
# see https://github.com/nymtech/nym/blob/develop/nym-data-observatory/src/main.rs for details
#-------------------------------------------------------------------
FROM ubuntu:24.04
RUN apt update && apt install -yy curl ca-certificates
WORKDIR /nym
COPY --from=builder /usr/src/nym/target/release/nym-data-observatory ./
ENTRYPOINT [ "/nym/nym-data-observatory" ]
+29 -36
View File
@@ -1,58 +1,51 @@
use anyhow::Result;
use sqlx::{Connection, PgConnection};
use sqlx::{sqlite::SqliteConnectOptions, Connection, SqliteConnection};
use std::io::Write;
use std::{collections::HashMap, fs::File};
use std::{collections::HashMap, fs::File, path::PathBuf, str::FromStr};
const POSTGRES_USER: &str = "nym";
const POSTGRES_PASSWORD: &str = "password123";
const POSTGRES_DB: &str = "data_obs_db";
/// if schema changes, rerun `cargo sqlx prepare` with a running DB
/// https://github.com/launchbadge/sqlx/blob/main/sqlx-cli/README.md#enable-building-in-offline-mode-with-query
#[tokio::main]
async fn main() -> Result<()> {
let db_url =
format!("postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@localhost:5432/{POSTGRES_DB}");
let db_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("data_observatory.sqlite");
export_db_variables(&db_url)?;
// if a live DB is reachable, use that
if PgConnection::connect(&db_url).await.is_ok() {
println!("cargo::rustc-env=SQLX_OFFLINE=false");
run_migrations(&db_url).await?;
} else {
// by default, run in offline mode
println!("cargo::rustc-env=SQLX_OFFLINE=true");
// Create the database directory if it doesn't exist
if let Some(parent) = db_path.parent() {
std::fs::create_dir_all(parent)?;
}
rerun_if_changed();
let db_url = format!("sqlite:{}", db_path.display());
// Ensure database file is created with proper permissions
let connect_options = SqliteConnectOptions::from_str(&db_url)?
.create_if_missing(true)
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.foreign_keys(true);
// Create initial connection to ensure database exists
let mut conn = SqliteConnection::connect_with(&connect_options).await?;
export_db_variables(&db_url)?;
println!("cargo:rustc-env=SQLX_OFFLINE=false");
// Run migrations after ensuring database exists
sqlx::migrate!("./migrations").run(&mut conn).await?;
// Add rerun-if-changed directives
println!("cargo:rerun-if-changed=migrations");
println!("cargo:rerun-if-changed=build.rs");
println!("cargo:rerun-if-changed=src");
Ok(())
}
fn export_db_variables(db_url: &str) -> Result<()> {
let mut map = HashMap::new();
map.insert("POSTGRES_USER", POSTGRES_USER);
map.insert("POSTGRES_PASSWORD", POSTGRES_PASSWORD);
map.insert("POSTGRES_DB", POSTGRES_DB);
map.insert("DATABASE_URL", db_url);
let mut file = File::create(".env")?;
for (var, value) in map.iter() {
println!("cargo::rustc-env={}={}", var, value);
writeln!(file, "{}={}", var, value).expect("Failed to write to dotenv file");
println!("cargo:rustc-env={}={}", var, value);
writeln!(file, "{}={}", var, value)?;
}
Ok(())
}
async fn run_migrations(db_url: &str) -> Result<()> {
let mut conn = PgConnection::connect(db_url).await?;
sqlx::migrate!("./migrations").run(&mut conn).await?;
Ok(())
}
fn rerun_if_changed() {
println!("cargo::rerun-if-changed=migrations");
println!("cargo::rerun-if-changed=src/db/queries");
}
@@ -0,0 +1,7 @@
CREATE TABLE price_history (
timestamp INTEGER PRIMARY KEY,
chf REAL NOT NULL,
usd REAL NOT NULL,
eur REAL NOT NULL,
btc REAL NOT NULL
);
@@ -0,0 +1,10 @@
CREATE TABLE payments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transaction_hash TEXT NOT NULL UNIQUE,
sender_address TEXT NOT NULL,
receiver_address TEXT NOT NULL,
amount REAL NOT NULL,
timestamp INTEGER NOT NULL,
height INTEGER NOT NULL,
memo TEXT
);
-13
View File
@@ -1,13 +0,0 @@
#!/bin/bash
# .env is generated in build.rs
source .env
# Launching a container in such a way that it's destroyed after you detach from the terminal:
docker compose up
# docker exec -it nym-data-observatory-pg /bin/bash
# psql -U youruser -d yourdb
echo "Tearing down containers to have a clean slate"
docker compose down -v
@@ -1,61 +0,0 @@
use core::str;
use serde::Deserialize;
use tokio::process::Command;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use crate::db::{self, DbPool};
const REFRESH_DELAY: Duration = Duration::from_secs(15);
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
pub(crate) async fn spawn_in_background(db_pool: DbPool) -> JoinHandle<()> {
loop {
tracing::info!("Running in a loop 🏃");
if let Err(e) = some_network_action(&db_pool).await {
tracing::error!(
"❌ Run failed: {e}, retrying in {}s...",
FAILURE_RETRY_DELAY.as_secs()
);
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
} else {
tracing::info!(
"✅ Run successful, sleeping for {}s...",
REFRESH_DELAY.as_secs()
);
tokio::time::sleep(REFRESH_DELAY).await;
}
}
}
#[derive(Deserialize, Debug)]
pub(crate) struct Response {
#[serde(rename(deserialize = "id"))]
pub(crate) joke_id: String,
pub(crate) joke: String,
#[serde(rename(deserialize = "status"))]
pub(crate) _status: u16,
}
async fn some_network_action(pool: &DbPool) -> anyhow::Result<()> {
// for demonstration purposes only. You should use reqwest if you need it
let output = Command::new("curl")
.arg("-H")
.arg("Accept: application/json")
.arg("https://icanhazdadjoke.com/")
.output()
.await?;
if !output.status.success() {
anyhow::bail!("Curl command failed with status: {}", output.status);
}
let response_str = str::from_utf8(&output.stdout)?;
let joke_response: Response = serde_json::from_str(response_str)?;
tracing::info!("{:?}", joke_response.joke);
db::queries::insert_joke(pool, joke_response.into()).await?;
Ok(())
}
@@ -0,0 +1,26 @@
use nyxd_scraper::{storage::ScraperStorage, Config, NyxdScraper, PruningOptions};
pub(crate) async fn run_chain_scraper() -> anyhow::Result<ScraperStorage> {
let websocket_url =
std::env::var("NYXD_WEBSOCKET_URL").expect("NYXD_WEBSOCKET_URL not defined");
let rpc_url = std::env::var("NYXD_RPC_URL").expect("NYXD_RPC_URL not defined");
let websocket_url = reqwest::Url::parse(&websocket_url)?;
let rpc_url = reqwest::Url::parse(&rpc_url)?;
let start_block_height = std::env::var("NYXD_SCRAPER_START_HEIGHT")
.ok()
.and_then(|value| value.parse::<u32>().ok());
let scraper = NyxdScraper::builder(Config {
websocket_url,
rpc_url,
database_path: "chain_history.sqlite".into(),
pruning_options: PruningOptions::nothing(),
store_precommits: false,
});
let instance = scraper.build_and_start(start_block_height).await?;
Ok(instance.storage)
}
+11 -5
View File
@@ -1,13 +1,16 @@
use anyhow::{anyhow, Result};
use sqlx::{migrate::Migrator, postgres::PgConnectOptions, ConnectOptions, PgPool};
use sqlx::{migrate::Migrator, sqlite::SqliteConnectOptions, SqlitePool};
use std::str::FromStr;
pub(crate) mod models;
pub(crate) mod queries;
pub(crate) mod queries {
pub mod payments;
pub mod price;
}
static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
pub(crate) type DbPool = PgPool;
pub(crate) type DbPool = SqlitePool;
pub(crate) struct Storage {
pool: DbPool,
@@ -16,13 +19,16 @@ pub(crate) struct Storage {
impl Storage {
pub async fn init(connection_url: String) -> Result<Self> {
let connect_options =
PgConnectOptions::from_str(&connection_url)?.disable_statement_logging();
SqliteConnectOptions::from_str(&connection_url)?.create_if_missing(true);
let pool = DbPool::connect_with(connect_options)
.await
.map_err(|err| anyhow!("Failed to connect to {}: {}", &connection_url, err))?;
MIGRATOR.run(&pool).await?;
MIGRATOR
.run(&pool)
.await
.map_err(|err| anyhow!("Failed to run migrations: {}", err))?;
Ok(Storage { pool })
}
+33 -14
View File
@@ -1,22 +1,41 @@
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use crate::background_task::Response;
#[derive(Clone, Deserialize, Debug)]
pub(crate) struct CurrencyPrices {
pub(crate) chf: f32,
pub(crate) usd: f32,
pub(crate) eur: f32,
pub(crate) btc: f32,
}
// Struct to hold Coingecko response
#[derive(Clone, Deserialize, Debug, ToSchema)]
pub(crate) struct CoingeckoPriceResponse {
pub(crate) nym: CurrencyPrices,
}
#[derive(Clone, Deserialize, Debug, ToSchema)]
pub(crate) struct PriceRecord {
pub(crate) timestamp: i64,
pub(crate) nym: CurrencyPrices,
}
#[derive(Serialize, Deserialize, Debug, ToSchema)]
pub(crate) struct JokeDto {
pub(crate) joke_id: String,
pub(crate) joke: String,
pub(crate) date_created: i32,
pub(crate) struct PriceHistory {
pub(crate) timestamp: i64,
pub(crate) chf: f32,
pub(crate) usd: f32,
pub(crate) eur: f32,
pub(crate) btc: f32,
}
impl From<Response> for JokeDto {
fn from(value: Response) -> Self {
Self {
joke_id: value.joke_id,
joke: value.joke,
// casting not smart, can implicitly panic, don't do this in prod
date_created: chrono::offset::Utc::now().timestamp() as i32,
}
}
#[derive(Serialize, Deserialize, Debug, ToSchema)]
pub(crate) struct PaymentRecord {
pub(crate) transaction_hash: String,
pub(crate) sender_address: String,
pub(crate) receiver_address: String,
pub(crate) amount: f64,
pub(crate) timestamp: i64,
pub(crate) height: i64,
}
@@ -1,39 +0,0 @@
use crate::db::{models::JokeDto, DbPool};
pub(crate) async fn insert_joke(pool: &DbPool, joke: JokeDto) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
sqlx::query!(
"INSERT INTO responses
(joke_id, joke, date_created)
VALUES
($1, $2, $3)
ON CONFLICT(joke_id) DO UPDATE SET
joke=excluded.joke,
date_created=excluded.date_created;",
joke.joke_id,
joke.joke,
joke.date_created as i32,
)
.execute(&mut *conn)
.await?;
Ok(())
}
pub(crate) async fn select_joke_by_id(pool: &DbPool, joke_id: &str) -> anyhow::Result<JokeDto> {
sqlx::query_as!(
JokeDto,
"SELECT joke_id, joke, date_created FROM responses WHERE joke_id = $1",
joke_id
)
.fetch_one(pool)
.await
.map_err(anyhow::Error::from)
}
pub(crate) async fn select_all(pool: &DbPool) -> anyhow::Result<Vec<JokeDto>> {
sqlx::query_as!(JokeDto, "SELECT joke_id, joke, date_created FROM responses",)
.fetch_all(pool)
.await
.map_err(anyhow::Error::from)
}
+4 -3
View File
@@ -1,5 +1,6 @@
// group queries in files by theme
mod joke;
mod payments;
mod price;
// re-exporting allows us to access all queries via `queries::bla``
pub(crate) use joke::{insert_joke, select_all, select_joke_by_id};
pub(crate) use payments::{get_last_checked_height, insert_payment};
pub(crate) use price::{get_latest_price, insert_nym_prices};
@@ -0,0 +1,41 @@
use crate::db::DbPool;
use anyhow::Result;
pub async fn get_last_checked_height(pool: &DbPool) -> Result<i64> {
let result = sqlx::query_scalar!("SELECT MAX(height) FROM payments")
.fetch_one(pool)
.await?;
Ok(result.unwrap_or(0))
}
pub async fn insert_payment(
pool: &DbPool,
transaction_hash: String,
sender_address: String,
receiver_address: String,
amount: f64,
height: i64,
memo: Option<String>,
) -> Result<()> {
let timestamp = chrono::Utc::now().timestamp();
sqlx::query!(
r#"
INSERT INTO payments (
transaction_hash, sender_address, receiver_address,
amount, height, timestamp, memo
) VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
transaction_hash,
sender_address,
receiver_address,
amount,
height,
timestamp,
memo,
)
.execute(pool)
.await?;
Ok(())
}
@@ -0,0 +1,46 @@
use crate::db::models::{PriceHistory, PriceRecord};
use crate::db::DbPool;
pub(crate) async fn insert_nym_prices(
pool: &DbPool,
price_data: PriceRecord,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await?;
let timestamp = price_data.timestamp;
sqlx::query!(
"INSERT INTO price_history
(timestamp, chf, usd, eur, btc)
VALUES
($1, $2, $3, $4, $5)
ON CONFLICT(timestamp) DO UPDATE SET
chf=excluded.chf,
usd=excluded.usd,
eur=excluded.eur,
btc=excluded.btc;",
timestamp,
price_data.nym.chf,
price_data.nym.usd,
price_data.nym.eur,
price_data.nym.btc,
)
.execute(&mut *conn)
.await?;
Ok(())
}
pub(crate) async fn get_latest_price(pool: &DbPool) -> anyhow::Result<PriceHistory> {
let result = sqlx::query!(
"SELECT timestamp, chf, usd, eur, btc FROM price_history ORDER BY timestamp DESC LIMIT 1;"
)
.fetch_one(pool)
.await?;
Ok(PriceHistory {
timestamp: result.timestamp,
chf: result.chf as f32,
usd: result.usd as f32,
eur: result.eur as f32,
btc: result.btc as f32,
})
}
@@ -1,78 +0,0 @@
use axum::{
extract::{Path, State},
Json, Router,
};
use serde::Deserialize;
use utoipa::IntoParams;
use crate::{
db::{
models::JokeDto,
queries::{self, select_joke_by_id},
},
http::{
error::{Error, HttpResult},
state::AppState,
},
};
pub(crate) fn routes() -> Router<AppState> {
Router::new()
.route("/", axum::routing::get(jokes))
.route("/:joke_id", axum::routing::get(joke_by_id))
.route("/fetch_another", axum::routing::get(fetch_another))
}
#[utoipa::path(
tag = "Dad Jokes",
get,
path = "/v1/jokes",
responses(
(status = 200, body = Vec<JokeDto>)
)
)]
async fn jokes(State(state): State<AppState>) -> HttpResult<Json<Vec<JokeDto>>> {
queries::select_all(state.db_pool())
.await
.map(Json::from)
.map_err(|_| Error::internal())
}
#[derive(Deserialize, IntoParams)]
#[into_params(parameter_in = Path)]
struct JokeIdParam {
joke_id: String,
}
#[utoipa::path(
tag = "Dad Jokes",
get,
params(
JokeIdParam
),
path = "/v1/jokes/{joke_id}",
responses(
(status = 200, body = JokeDto)
)
)]
async fn joke_by_id(
Path(JokeIdParam { joke_id }): Path<JokeIdParam>,
State(state): State<AppState>,
) -> HttpResult<Json<JokeDto>> {
select_joke_by_id(state.db_pool(), &joke_id)
.await
.map(Json::from)
.map_err(|_| Error::not_found(joke_id))
}
#[utoipa::path(
tag = "Dad Jokes",
get,
path = "/v1/jokes/fetch_another",
responses(
(status = 200, body = String)
)
)]
async fn fetch_another(State(_state): State<AppState>) -> HttpResult<Json<String>> {
Ok(Json(String::from("Done boss, check the DB")))
}
+4 -3
View File
@@ -7,8 +7,8 @@ use utoipa_swagger_ui::SwaggerUi;
use crate::http::{api_docs, server::HttpServer, state::AppState};
pub(crate) mod jokes;
pub(crate) mod mixnodes;
pub(crate) mod price;
pub(crate) struct RouterBuilder {
unfinished_router: Router<AppState>,
@@ -28,8 +28,9 @@ impl RouterBuilder {
.nest(
"/v1",
Router::new()
.nest("/jokes", jokes::routes())
.nest("/mixnodes", mixnodes::routes()),
//.nest("/jokes", jokes::routes())
.nest("/mixnodes", mixnodes::routes())
.nest("/price", price::routes()),
);
Self {
@@ -0,0 +1,27 @@
use crate::db::models::PriceHistory;
use crate::db::queries::price::get_latest_price;
use crate::http::error::Error;
use crate::http::error::HttpResult;
use crate::http::state::AppState;
use axum::{extract::State, Json, Router};
pub(crate) fn routes() -> Router<AppState> {
Router::new().route("/", axum::routing::get(price))
}
#[utoipa::path(
tag = "Nym Price",
get,
path = "/v1/price",
responses(
(status = 200, body = String)
)
)]
/// Fetch the latest price cached by the data observatory
async fn price(State(state): State<AppState>) -> HttpResult<Json<PriceHistory>> {
get_latest_price(state.db_pool())
.await
.map(Json::from)
.map_err(|_| Error::internal())
}
+5 -1
View File
@@ -6,5 +6,9 @@ use utoipauto::utoipauto;
// https://github.com/ProbablyClem/utoipauto/issues/13#issuecomment-1974911829
#[utoipauto(paths = "./nym-data-observatory/src")]
#[derive(OpenApi)]
#[openapi(info(title = "Nym API"), tags(), components(schemas()))]
#[openapi(
info(title = "Nym Data Observatory API"),
tags(),
components(schemas())
)]
pub(super) struct ApiDoc;
-7
View File
@@ -6,13 +6,6 @@ pub(crate) struct Error {
}
impl Error {
pub(crate) fn not_found(message: String) -> Self {
Self {
message,
status: axum::http::StatusCode::NOT_FOUND,
}
}
pub(crate) fn internal() -> Self {
Self {
message: String::from("Internal server error"),
+45 -33
View File
@@ -1,11 +1,15 @@
use chain_scraper::run_chain_scraper;
use clap::Parser;
use nym_network_defaults::setup_env;
use nym_task::signal::wait_for_signal;
use tokio::join;
mod background_task;
mod chain_scraper;
mod db;
mod http;
mod logging;
mod payment_listener;
mod price_scraper;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
@@ -18,25 +22,13 @@ struct Args {
#[arg(short, long, default_value = None, env = "NYM_DATA_OBSERVATORY_ENV_FILE")]
env_file: Option<String>,
/// DB connection username
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_USERNAME")]
connection_username: String,
/// DB connection password
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_PASSWORD")]
connection_password: String,
/// DB connection host
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_HOST")]
connection_host: String,
/// DB connection port
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_PORT")]
connection_port: String,
/// DB connection database name
#[arg(long, default_value = None, env = "NYM_DATA_OBSERVATORY_CONNECTION_DB")]
connection_db: String,
/// SQLite database file path
#[arg(
long,
default_value = "data_observatory.sqlite",
env = "NYM_DATA_OBSERVATORY_DB_PATH"
)]
db_path: String,
}
#[tokio::main]
@@ -44,30 +36,50 @@ async fn main() -> anyhow::Result<()> {
logging::setup_tracing_logger();
let args = Args::parse();
setup_env(args.env_file); // Defaults to mainnet if empty
let connection_url = format!(
"postgres://{}:{}@{}:{}/{}",
args.connection_username,
args.connection_password,
args.connection_host,
args.connection_port,
args.connection_db
);
let db_path = args.db_path;
// Ensure parent directory exists
if let Some(parent) = std::path::Path::new(&db_path).parent() {
std::fs::create_dir_all(parent)?;
}
let connection_url = format!("sqlite://{}?mode=rwc", db_path);
let storage = db::Storage::init(connection_url).await?;
let db_pool = storage.pool_owned().await;
tokio::spawn(async move {
background_task::spawn_in_background(db_pool).await;
tracing::info!("Started task");
let observatory_pool = storage.pool_owned().await;
// Spawn the chain scraper and get its storage
// Spawn the payment listener task
let payment_listener_handle = tokio::spawn({
let obs_pool = observatory_pool.clone();
let chain_storage = run_chain_scraper().await?;
async move {
if let Err(e) = payment_listener::run_payment_listener(obs_pool, chain_storage).await {
tracing::error!("Payment listener error: {}", e);
}
Ok::<_, anyhow::Error>(())
}
});
// Clone pool for each task that needs it
//let background_pool = db_pool.clone();
let price_scraper_handle = tokio::spawn(async move {
price_scraper::run_price_scraper(&observatory_pool).await;
});
let shutdown_handles = http::server::start_http_api(storage.pool_owned().await, args.http_port)
.await
.expect("Failed to start server");
tracing::info!("Started HTTP server on port {}", args.http_port);
// Wait for the short-lived tasks to complete
let _ = join!(price_scraper_handle, payment_listener_handle);
// Wait for a signal to terminate the long-running task
wait_for_signal().await;
if let Err(err) = shutdown_handles.shutdown().await {
@@ -0,0 +1,114 @@
use crate::db::queries;
use nyxd_scraper::storage::ScraperStorage;
use reqwest::Client;
use serde_json::{json, Value};
use sqlx::SqlitePool;
use std::env;
use tokio::time::{self, Duration};
#[derive(Debug)]
struct TransferEvent {
recipient: String,
sender: String,
amount: String,
}
pub(crate) async fn run_payment_listener(
observatory_pool: SqlitePool,
chain_storage: ScraperStorage,
) -> anyhow::Result<()> {
let payment_receive_address = env::var("PAYMENT_RECEIVE_ADDRESS").map_err(|_| {
anyhow::anyhow!("Environment variable `PAYMENT_RECEIVE_ADDRESS` not defined")
})?;
let webhook_url = env::var("WEBHOOK_URL")
.map_err(|_| anyhow::anyhow!("Environment variable `WEBHOOK_URL` not defined"))?;
let client = Client::new();
loop {
let last_checked_height =
queries::payments::get_last_checked_height(&observatory_pool).await?;
tracing::info!("Last checked height: {}", last_checked_height);
let transactions = chain_storage
.get_transactions_after_height(
last_checked_height,
Some("/cosmos.bank.v1beta1.MsgSend"),
)
.await?;
for tx in transactions {
tracing::info!("Processing transaction: {}", tx.hash);
if let Some(raw_log) = tx.raw_log.as_deref() {
if let Some(transfer) = parse_transfer_from_raw_log(raw_log)? {
if transfer.recipient == payment_receive_address {
let amount: f64 = parse_unym_amount(&transfer.amount)?;
queries::payments::insert_payment(
&observatory_pool,
tx.hash.clone(),
transfer.sender.clone(),
transfer.recipient.clone(),
amount,
tx.height,
tx.memo.clone(),
)
.await?;
let webhook_data = json!({
"transaction_hash": tx.hash,
"sender_address": transfer.sender,
"receiver_address": transfer.recipient,
"amount": amount,
"height": tx.height,
"memo": tx.memo,
});
let _ = client.post(&webhook_url).json(&webhook_data).send().await;
}
}
}
}
time::sleep(Duration::from_secs(10)).await;
}
}
fn parse_transfer_from_raw_log(raw_log: &str) -> anyhow::Result<Option<TransferEvent>> {
let log_value: Value = serde_json::from_str(raw_log)?;
if let Some(events) = log_value[0]["events"].as_array() {
if let Some(transfer_event) = events.iter().find(|e| e["type"] == "transfer") {
if let Some(attrs) = transfer_event["attributes"].as_array() {
let mut transfer = TransferEvent {
recipient: String::new(),
sender: String::new(),
amount: String::new(),
};
for attr in attrs {
match attr["key"].as_str() {
Some("recipient") => {
transfer.recipient = attr["value"].as_str().unwrap_or("").to_string()
}
Some("sender") => {
transfer.sender = attr["value"].as_str().unwrap_or("").to_string()
}
Some("amount") => {
transfer.amount = attr["value"].as_str().unwrap_or("").to_string()
}
_ => continue,
}
}
return Ok(Some(transfer));
}
}
}
Ok(None)
}
fn parse_unym_amount(amount: &str) -> anyhow::Result<f64> {
let amount = amount.trim_end_matches("unym");
let parsed: f64 = amount.parse()?;
Ok(parsed / 1_000_000.0)
}
@@ -0,0 +1,55 @@
use crate::db::{
models::{CoingeckoPriceResponse, PriceRecord},
queries::price::insert_nym_prices,
};
use core::str;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use crate::db::DbPool;
const REFRESH_DELAY: Duration = Duration::from_secs(300);
const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2);
const COINGECKO_API_URL: &str =
"https://api.coingecko.com/api/v3/simple/price?ids=nym&vs_currencies=chf,usd,eur,btc";
pub(crate) async fn run_price_scraper(db_pool: &DbPool) -> JoinHandle<()> {
loop {
tracing::info!("Running in a loop 🏃");
if let Err(e) = get_coingecko_prices(db_pool).await {
tracing::error!("❌ Failed to get CoinGecko prices: {e}");
tracing::info!("Retrying in {}s...", FAILURE_RETRY_DELAY.as_secs());
tokio::time::sleep(FAILURE_RETRY_DELAY).await;
} else {
tracing::info!("✅ Successfully fetched CoinGecko prices");
tokio::time::sleep(REFRESH_DELAY).await;
}
}
}
async fn get_coingecko_prices(pool: &DbPool) -> anyhow::Result<()> {
tracing::info!("💰 Fetching CoinGecko prices from {}", COINGECKO_API_URL);
let response = reqwest::get(COINGECKO_API_URL)
.await?
.json::<CoingeckoPriceResponse>()
.await;
tracing::info!("Got response {:?}", response);
match response {
Ok(resp) => {
let price_record = PriceRecord {
timestamp: time::OffsetDateTime::now_utc().unix_timestamp(),
nym: resp.nym,
};
insert_nym_prices(pool, price_record).await?;
}
Err(e) => {
//tracing::info!("💰 CoinGecko price response: {:?}", response);
tracing::error!("Error sending request: {}", e);
}
}
Ok(())
}