Compare commits

...

13 Commits

Author SHA1 Message Date
Mark Sinclair e4ad1e2e1b wip 2025-07-06 17:11:10 +01:00
Mark Sinclair 5b39cb9d8c wip 2025-07-06 13:23:53 +01:00
Mark Sinclair c4fec04fb0 wip 2025-07-06 13:19:53 +01:00
Mark Sinclair a6f156df47 wip 2025-07-06 13:13:00 +01:00
Mark Sinclair e8f76aa7ee wip 2025-07-06 13:10:58 +01:00
Jędrzej Stuczyński 83c84bfd2d use postgres in chain scraper 2025-07-04 18:15:24 +01:00
Jędrzej Stuczyński 71090c85c2 initial postgres support - missing some proto -> json parsing 2025-07-04 18:04:37 +01:00
Jędrzej Stuczyński 976961471b psql scaffolding 2025-07-04 12:45:06 +01:00
Jędrzej Stuczyński 655fd421a6 using sqlite instance for rewarder and chain watcher 2025-07-04 12:25:08 +01:00
Jędrzej Stuczyński 81eaf7b1cc implemented traits for sqlite instance 2025-07-04 12:09:45 +01:00
Jędrzej Stuczyński 027bd85200 changed error types to make modules dyn compatible 2025-07-04 11:23:25 +01:00
Jędrzej Stuczyński 08cff312af wip: made storage mostly generic minus modules 2025-07-04 11:08:05 +01:00
Jędrzej Stuczyński 2920e6ff01 rename nyxd-scraper to sqlite 2025-07-03 16:33:32 +01:00
185 changed files with 6343 additions and 558 deletions
@@ -3,7 +3,7 @@ on:
workflow_dispatch:
env:
WORKING_DIRECTORY: "nyx-chain-watcher"
WORKING_DIRECTORY: "nyx-chain-watcher/sqlite"
CONTAINER_NAME: "nyx-chain-watcher"
jobs:
Generated
+62 -4
View File
@@ -7854,7 +7854,7 @@ dependencies = [
"nym-task 0.1.0",
"nym-ticketbooks-merkle 0.1.0",
"nym-validator-client 0.1.0",
"nyxd-scraper",
"nyxd-scraper-sqlite",
"rand 0.8.5",
"rand_chacha 0.3.1",
"serde",
@@ -8061,7 +8061,7 @@ dependencies = [
"nym-network-defaults 0.1.0",
"nym-task 0.1.0",
"nym-validator-client 0.1.0",
"nyxd-scraper",
"nyxd-scraper-sqlite",
"reqwest 0.12.15",
"schemars",
"serde",
@@ -8079,7 +8079,54 @@ dependencies = [
]
[[package]]
name = "nyxd-scraper"
name = "nyx-chain-watcher-pgsql"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"axum 0.7.9",
"chrono",
"clap",
"nym-bin-common 0.6.0",
"nym-config 0.1.0",
"nym-network-defaults 0.1.0",
"nym-task 0.1.0",
"nym-validator-client 0.1.0",
"nyxd-scraper-psql",
"reqwest 0.12.15",
"schemars",
"serde",
"sqlx",
"thiserror 2.0.12",
"time",
"tokio",
"tokio-util",
"tower-http",
"tracing",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
"utoipauto",
]
[[package]]
name = "nyxd-scraper-psql"
version = "0.1.0"
dependencies = [
"async-trait",
"base64 0.22.1",
"cosmrs",
"nyxd-scraper-shared",
"serde",
"serde_json",
"sqlx",
"thiserror 2.0.12",
"tokio",
"tracing",
]
[[package]]
name = "nyxd-scraper-shared"
version = "0.1.0"
dependencies = [
"async-trait",
@@ -8090,7 +8137,6 @@ dependencies = [
"humantime",
"serde",
"sha2 0.10.9",
"sqlx",
"tendermint",
"tendermint-rpc",
"thiserror 2.0.12",
@@ -8102,6 +8148,18 @@ dependencies = [
"url",
]
[[package]]
name = "nyxd-scraper-sqlite"
version = "0.1.0"
dependencies = [
"async-trait",
"nyxd-scraper-shared",
"sqlx",
"thiserror 2.0.12",
"tokio",
"tracing",
]
[[package]]
name = "object"
version = "0.36.7"
+7 -3
View File
@@ -80,7 +80,9 @@ members = [
"common/nymsphinx/params",
"common/nymsphinx/routing",
"common/nymsphinx/types",
"common/nyxd-scraper",
"common/nyxd-scraper-sqlite",
"common/nyxd-scraper-psql",
"common/nyxd-scraper-shared",
"common/pemstore",
"common/serde-helpers",
"common/service-provider-requests-common",
@@ -118,7 +120,8 @@ members = [
"nym-outfox",
"nym-statistics-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"nyx-chain-watcher/pgsql",
"nyx-chain-watcher/sqlite",
"sdk/ffi/cpp",
"sdk/ffi/go",
"sdk/ffi/shared",
@@ -160,7 +163,8 @@ default-members = [
"nym-node-status-api/nym-node-status-api",
"nym-statistics-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"nyx-chain-watcher/sqlite",
"nyx-chain-watcher/pgsql",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO validator (consensus_address, consensus_pubkey)\n VALUES ($1, $2)\n ON CONFLICT DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text"
]
},
"nullable": []
},
"hash": "0d3709efacf763b06bf14803bb803b5ee5b27879b0026bb0480b3f2722318a75"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM pre_commit WHERE height < $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "1c2fb0e9ffceca21ef8dbea19b116422b1f723d0a316314b50c43c8b29f8891d"
}
@@ -0,0 +1,26 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO transaction\n (hash, height, index, success, messages, memo, signatures, signer_infos, fee, gas_wanted, gas_used, raw_log, logs)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)\n ON CONFLICT (hash) DO UPDATE\n SET height = excluded.height,\n index = excluded.index,\n success = excluded.success,\n messages = excluded.messages,\n memo = excluded.memo,\n signatures = excluded.signatures,\n signer_infos = excluded.signer_infos,\n fee = excluded.fee,\n gas_wanted = excluded.gas_wanted,\n gas_used = excluded.gas_used,\n raw_log = excluded.raw_log,\n logs = excluded.logs\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int8",
"Int4",
"Bool",
"Json",
"Text",
"TextArray",
"Jsonb",
"Jsonb",
"Int8",
"Int8",
"Text",
"Jsonb"
]
},
"nullable": []
},
"hash": "1e344c1dff8b98eb0eb2e530e28f3cb2eed5b5d35391fd30a4d5f44f2e2178b7"
}
@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT height\n FROM block\n ORDER BY height ASC\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "height",
"type_info": "Int8"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "2561fb016951ea4cd29e43fb9a4a93e944b0d44ed1f7c1036f306e34372da11c"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE metadata SET last_processed_height = GREATEST(last_processed_height, $1)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int4"
]
},
"nullable": []
},
"hash": "2679cdf11fa66c7920678cde860c57402119ec7c3aae731b0da831327301466f"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE pruning SET last_pruned_height = $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "36ba5941aca6e7b604a10b8b0aba70635028f392fe794d6131827b083e1755e1"
}
@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT last_pruned_height FROM pruning\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "last_pruned_height",
"type_info": "Int8"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "3bdf81a9db6075f6f77224c30553f419a849d4ec45af40b052a4cbf09b44f3ec"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM message WHERE height < $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "52c27143720ddfdfd0f5644b60f5b67fd9281ce1de0653efa53b9d9b93cf335d"
}
@@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO pre_commit (validator_address, height, timestamp, voting_power, proposer_priority)\n VALUES ($1, $2, $3, $4, $5)\n ON CONFLICT (validator_address, timestamp) DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int8",
"Timestamp",
"Int8",
"Int8"
]
},
"nullable": []
},
"hash": "62e14613f5ffe692346a79086857a22f0444fbc679db1c06b651fb8b5538b278"
}
@@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO block (height, hash, num_txs, total_gas, proposer_address, timestamp)\n VALUES ($1, $2, $3, $4, $5, $6)\n ON CONFLICT DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Text",
"Int4",
"Int8",
"Text",
"Timestamp"
]
},
"nullable": []
},
"hash": "64a484fd46d8ec46797f944a4cced56b6e270ce186f0e49528865d1924343b78"
}
@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT height\n FROM block\n WHERE timestamp < $1\n ORDER BY timestamp DESC\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "height",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Timestamp"
]
},
"nullable": [
false
]
},
"hash": "7e82426f5dbcadf1631ba1a806e19cc462d04222fb20ad76de2a40f3f4f8fe15"
}
@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT height\n FROM block\n WHERE timestamp > $1\n ORDER BY timestamp\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "height",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Timestamp"
]
},
"nullable": [
false
]
},
"hash": "9455331f9be5a3be28e2bd399a36b2e2d6a9ad4b225c4c883aafc4e9f0428008"
}
@@ -0,0 +1,24 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT COUNT(*) as count FROM pre_commit\n WHERE\n validator_address = $1\n AND height >= $2\n AND height <= $3\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "count",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Text",
"Int8",
"Int8"
]
},
"nullable": [
null
]
},
"hash": "bc7795e58ce71893c3f32a19db8e77b7bc0a1af315ffd42c3e68156d6e4ace70"
}
@@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT * FROM validator\n WHERE EXISTS (\n SELECT 1 FROM pre_commit\n WHERE height = $1\n AND pre_commit.validator_address = validator.consensus_address\n )\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "consensus_address",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "consensus_pubkey",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": [
false,
false
]
},
"hash": "be43d4873911deca784b7be0531ab7bd82ecd68041aa932a56c8ce09623251e4"
}
@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT last_processed_height FROM metadata\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "last_processed_height",
"type_info": "Int4"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "c88d07fecc3f33deaa6e93db1469ce71582635df47f52dcf3fd1df4e7be6b96d"
}
@@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO message(transaction_hash, index, type, value, involved_accounts_addresses, height)\n VALUES ($1, $2, $3, $4, $5, $6)\n ON CONFLICT (transaction_hash, index) DO UPDATE\n SET height = excluded.height,\n type = excluded.type,\n value = excluded.value,\n involved_accounts_addresses = excluded.involved_accounts_addresses\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int8",
"Text",
"Json",
"TextArray",
"Int8"
]
},
"nullable": []
},
"hash": "cc0ae74082d7d8a89f2d3364676890bbf6150ab394c72783114340d4def5f9ef"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM block WHERE height < $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "cdba9b267f143c8a8c6c3d6ed713cf00236490b86779559d84740ec18bcfa3a9"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM transaction WHERE height < $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "d89558c37c51e8e6b1e6a9d5a2b13d0598fd856aa019a0cbbae12d7cafb4672f"
}
+33
View File
@@ -0,0 +1,33 @@
[package]
name = "nyxd-scraper-psql"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
async-trait = { workspace = true }
base64 = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres", "macros", "migrate", "time"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
nyxd-scraper-shared = { path = "../nyxd-scraper-shared" }
# temp due to cosmrs redefinitions for serde
cosmrs = { workspace = true }
[build-dependencies]
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres", "macros", "migrate"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
[lints]
workspace = true
+102
View File
@@ -0,0 +1,102 @@
# Makefile for nyxd-scraper-psql database management
# --- Configuration ---
TEST_DATABASE_URL := postgres://testuser:testpass@localhost:5433/nyxd_scraper_test
# Docker compose service names
DB_SERVICE_NAME := postgres-test
DB_CONTAINER_NAME := nyxd_scraper_psql_test
# Default target
.PHONY: default
default: help
# --- Main Targets ---
.PHONY: prepare-pg
prepare-pg: test-db-up test-db-wait test-db-migrate test-db-prepare test-db-down ## Setup PostgreSQL and prepare SQLx offline cache
.PHONY: test-db
test-db: test-db-up test-db-wait test-db-migrate test-db-run test-db-down ## Run tests with PostgreSQL database
.PHONY: dev-db
dev-db: test-db-up test-db-wait test-db-migrate ## Start PostgreSQL for development (keeps running)
@echo "PostgreSQL is running on port 5433"
@echo "Connection string: $(TEST_DATABASE_URL)"
# --- Docker Compose Targets ---
.PHONY: test-db-up
test-db-up: ## Start the PostgreSQL test database in the background
@echo "Starting PostgreSQL test database..."
docker compose up -d $(DB_SERVICE_NAME)
.PHONY: test-db-wait
test-db-wait: ## Wait for the PostgreSQL database to be healthy
@echo "Waiting for PostgreSQL database..."
@while ! docker inspect --format='{{.State.Health.Status}}' $(DB_CONTAINER_NAME) 2>/dev/null | grep -q 'healthy'; do \
echo -n "."; \
sleep 1; \
done; \
echo " Database is healthy!"
.PHONY: test-db-down
test-db-down: ## Stop and remove the test database
@echo "Stopping PostgreSQL test database..."
docker compose down
# --- SQLx Targets ---
.PHONY: test-db-migrate
test-db-migrate: ## Run database migrations against PostgreSQL
@echo "Running PostgreSQL migrations..."
DATABASE_URL="$(TEST_DATABASE_URL)" sqlx migrate run --source sql_migrations
.PHONY: test-db-prepare
test-db-prepare: ## Run sqlx prepare for compile-time query verification
@echo "Running sqlx prepare for PostgreSQL..."
DATABASE_URL="$(TEST_DATABASE_URL)" cargo sqlx prepare
# --- Build and Test Targets ---
.PHONY: test-db-run
test-db-run: ## Run tests with PostgreSQL feature
@echo "Running tests with PostgreSQL..."
DATABASE_URL="$(TEST_DATABASE_URL)" cargo test --features pg --no-default-features
.PHONY: build-pg
build-pg: ## Build with PostgreSQL feature
@echo "Building with PostgreSQL feature..."
cargo build
.PHONY: check-pg
check-pg: ## Check code with PostgreSQL feature
@echo "Checking code with PostgreSQL feature..."
cargo check
.PHONY: clippy
clippy: clippy-pg
.PHONY: clippy-pg
clippy-pg: ## Run clippy with PostgreSQL feature
@echo "Running clippy with PostgreSQL feature..."
cargo clippy -- -D warnings
# --- Cleanup Targets ---
.PHONY: clean
clean: ## Clean build artifacts and SQLx cache
cargo clean
rm -rf .sqlx
.PHONY: clean-db
clean-db: test-db-down ## Stop database and clean volumes
docker volume rm -f nyxd_scraper_postgres_test_data 2>/dev/null || true
# --- Utility Targets ---
.PHONY: sqlx-cli
sqlx-cli: ## Install sqlx-cli if not already installed
@command -v sqlx >/dev/null 2>&1 || cargo install sqlx-cli --features postgres
.PHONY: psql
psql: ## Connect to the running PostgreSQL database with psql
@docker exec -it $(DB_CONTAINER_NAME) psql -U testuser -d nyxd_scraper_test
.PHONY: help
help: ## Show help for Makefile targets
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'
+79
View File
@@ -0,0 +1,79 @@
## Quick Start with PostgreSQL
### 1. Install Prerequisites
```bash
# Install sqlx-cli if not already installed
make sqlx-cli
```
### 2. Prepare PostgreSQL for Development
```bash
# This will:
# - Start PostgreSQL in Docker
# - Run migrations
# - Generate SQLx offline query cache
# - Stop the database
make prepare-pg
```
### 3. Build with PostgreSQL
```bash
# Build with PostgreSQL feature
make build-pg
# Or manually:
cargo build
```
### 4. Run with PostgreSQL
```bash
# Start PostgreSQL for development (keeps running)
make dev-db
# In another terminal, run the application
DATABASE_URL=postgres://testuser:testpass@localhost:5433/nym_node_status_api_test \
cargo run
```
## Makefile Targets
```bash
make help # Show all available targets
make prepare-pg # Setup PostgreSQL and prepare SQLx cache
make dev-db # Start PostgreSQL for development
make test-db # Run tests with PostgreSQL
make build-pg # Build with PostgreSQL
make psql # Connect to running PostgreSQL
make clean # Clean build artifacts
make clean-db # Stop database and clean volumes
```
## Environment Variables
See `.env.example` for all configuration options. Key variable:
```bash
# For PostgreSQL:
DATABASE_URL=postgres://testuser:testpass@localhost:5433/nym_node_status_api_test
```
## Troubleshooting
### SQLx Offline Mode
If you see "no cached data for this query" errors:
1. Ensure PostgreSQL is running: `make dev-db`
2. Run: `make test-db-prepare`
### Connection Refused
If you see "Connection refused" errors:
1. Check Docker is running: `docker ps`
2. Check PostgreSQL container: `docker ps | grep nym_node_status_api_postgres_test`
3. Restart database: `make test-db-down && make dev-db`
+8
View File
@@ -0,0 +1,8 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
fn main() {
if let Ok(database_url) = std::env::var("DATABASE_URL") {
println!("cargo::rustc-env=DATABASE_URL={database_url}");
}
}
@@ -0,0 +1,21 @@
services:
postgres-test:
image: postgres:16-alpine
container_name: nyxd_scraper_psql_test
environment:
POSTGRES_DB: nyxd_scraper_test
POSTGRES_USER: testuser
POSTGRES_PASSWORD: testpass
ports:
- '5433:5432' # Map to 5433 to avoid conflicts with default PostgreSQL
healthcheck:
test: [ 'CMD-SHELL', 'pg_isready -U testuser -d nyxd_scraper_test' ]
interval: 5s
timeout: 5s
retries: 5
# Optional: Add volume for persistent data during development
# volumes:
# - nyxd_scraper_postgres_test_data:/var/lib/postgresql/data
# volumes:
# nyxd_scraper_postgres_test_data:
@@ -0,0 +1,127 @@
CREATE TABLE validator
(
consensus_address TEXT NOT NULL PRIMARY KEY, /* Validator consensus address */
consensus_pubkey TEXT NOT NULL UNIQUE /* Validator consensus public key */
);
CREATE TABLE pre_commit
(
validator_address TEXT NOT NULL REFERENCES validator (consensus_address),
height BIGINT NOT NULL,
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL,
voting_power BIGINT NOT NULL,
proposer_priority BIGINT NOT NULL,
UNIQUE (validator_address, timestamp)
);
CREATE INDEX pre_commit_validator_address_index ON pre_commit (validator_address);
CREATE INDEX pre_commit_height_index ON pre_commit (height);
CREATE TABLE block
(
height BIGINT UNIQUE PRIMARY KEY,
hash TEXT NOT NULL UNIQUE,
num_txs INTEGER DEFAULT 0,
total_gas BIGINT DEFAULT 0,
proposer_address TEXT REFERENCES validator (consensus_address),
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL
);
CREATE INDEX block_height_index ON block (height);
CREATE INDEX block_hash_index ON block (hash);
CREATE INDEX block_proposer_address_index ON block (proposer_address);
ALTER TABLE block
SET (
autovacuum_vacuum_scale_factor = 0,
autovacuum_analyze_scale_factor = 0,
autovacuum_vacuum_threshold = 10000,
autovacuum_analyze_threshold = 10000
);
CREATE TABLE transaction
(
hash TEXT NOT NULL,
height BIGINT NOT NULL REFERENCES block (height),
"index" INTEGER NOT NULL, -- <<<=== not present in original bdjuno table, but it's quite useful
success BOOLEAN NOT NULL,
/* Body */
messages JSON NOT NULL DEFAULT '[]'::JSON,
memo TEXT,
signatures TEXT[] NOT NULL,
/* AuthInfo */
signer_infos JSONB NOT NULL DEFAULT '[]'::JSONB,
fee JSONB NOT NULL DEFAULT '{}'::JSONB,
/* Tx response */
gas_wanted BIGINT DEFAULT 0,
gas_used BIGINT DEFAULT 0,
raw_log TEXT,
logs JSONB,
CONSTRAINT unique_tx UNIQUE (hash)
);
CREATE INDEX transaction_hash_index ON transaction (hash);
CREATE INDEX transaction_height_index ON transaction (height);
CREATE TABLE message_type
(
type TEXT NOT NULL UNIQUE,
module TEXT NOT NULL,
label TEXT NOT NULL,
height BIGINT NOT NULL
);
CREATE INDEX message_type_module_index ON message_type (module);
CREATE INDEX message_type_type_index ON message_type (type);
CREATE TABLE message
(
transaction_hash TEXT NOT NULL,
index BIGINT NOT NULL,
type TEXT NOT NULL REFERENCES message_type (type),
value JSON NOT NULL,
involved_accounts_addresses TEXT[] NOT NULL,
height BIGINT NOT NULL,
FOREIGN KEY (transaction_hash) REFERENCES transaction (hash),
CONSTRAINT unique_message_per_tx UNIQUE (transaction_hash, index)
);
CREATE INDEX message_transaction_hash_index ON message (transaction_hash);
CREATE INDEX message_type_index ON message (type);
CREATE INDEX message_involved_accounts_index ON message USING GIN (involved_accounts_addresses);
/**
* This function is used to find all the utils that involve any of the given addresses and have
* type that is one of the specified types.
*/
CREATE FUNCTION messages_by_address(
addresses TEXT[],
types TEXT[],
"limit" BIGINT = 100,
"offset" BIGINT = 0)
RETURNS SETOF message AS
$$
SELECT *
FROM message
WHERE (cardinality(types) = 0 OR type = ANY (types))
AND addresses && involved_accounts_addresses
ORDER BY height DESC
LIMIT "limit" OFFSET "offset"
$$ LANGUAGE sql STABLE;
CREATE FUNCTION messages_by_type(
types text[],
"limit" bigint DEFAULT 100,
"offset" bigint DEFAULT 0)
RETURNS SETOF message AS
$$
SELECT *
FROM message
WHERE (cardinality(types) = 0 OR type = ANY (types))
ORDER BY height DESC
LIMIT "limit" OFFSET "offset"
$$ LANGUAGE sql STABLE;
CREATE TABLE pruning
(
last_pruned_height BIGINT NOT NULL
);
+43
View File
@@ -0,0 +1,43 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nyxd_scraper_shared::helpers::MalformedDataError;
use nyxd_scraper_shared::storage::NyxdScraperStorageError;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum PostgresScraperError {
#[error("experienced internal database error: {0}")]
InternalDatabaseError(#[from] sqlx::error::Error),
#[error("failed to perform startup SQL migration: {0}")]
StartupMigrationFailure(#[from] sqlx::migrate::MigrateError),
#[error("failed to begin storage tx: {source}")]
StorageTxBeginFailure {
#[source]
source: sqlx::error::Error,
},
#[error("failed to commit storage tx: {source}")]
StorageTxCommitFailure {
#[source]
source: sqlx::error::Error,
},
#[error(transparent)]
MalformedData(#[from] MalformedDataError),
// TOOD: add struct name
#[error("json serialisation failure: {source}")]
SerialisationFailure {
#[from]
source: serde_json::Error,
},
}
impl From<PostgresScraperError> for NyxdScraperStorageError {
fn from(err: PostgresScraperError) -> Self {
NyxdScraperStorageError::new(err)
}
}
+21
View File
@@ -0,0 +1,21 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::block_storage::PostgresScraperStorage;
use nyxd_scraper_shared::NyxdScraper;
pub use nyxd_scraper_shared::constants;
pub use nyxd_scraper_shared::error::ScraperError;
pub use nyxd_scraper_shared::{
BlockModule, MsgModule, NyxdScraperTransaction, ParsedTransactionResponse, PruningOptions,
PruningStrategy, StartingBlockOpts, TxModule,
};
pub use storage::models;
pub mod error;
pub mod storage;
pub type PostgresNyxdScraper = NyxdScraper<PostgresScraperStorage>;
// TODO: for now just use exactly the same config
pub use nyxd_scraper_shared::Config;
@@ -0,0 +1,235 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::PostgresScraperError;
use crate::models::{CommitSignature, Validator};
use crate::storage::manager::{
prune_blocks, prune_messages, prune_pre_commits, prune_transactions, update_last_pruned,
StorageManager,
};
use crate::storage::transaction::PostgresStorageTransaction;
use async_trait::async_trait;
use nyxd_scraper_shared::storage::helpers::log_db_operation_time;
use nyxd_scraper_shared::storage::{NyxdScraperStorage, NyxdScraperStorageError};
use sqlx::types::time::{OffsetDateTime, PrimitiveDateTime};
use tokio::time::Instant;
use tracing::{debug, error, info, instrument};
#[derive(Clone)]
pub struct PostgresScraperStorage {
pub(crate) manager: StorageManager,
}
impl PostgresScraperStorage {
#[instrument]
pub async fn init(connection_string: &str) -> Result<Self, PostgresScraperError> {
debug!("initialising scraper database with '{connection_string}'",);
let connection_pool = match sqlx::PgPool::connect(connection_string).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};
if let Err(err) = sqlx::migrate!("./sql_migrations")
.run(&connection_pool)
.await
{
error!("Failed to initialize SQLx database: {err}");
return Err(err.into());
}
info!("Database migration finished!");
let manager = StorageManager { connection_pool };
manager.set_initial_metadata().await?;
let storage = PostgresScraperStorage { manager };
Ok(storage)
}
#[instrument(skip(self))]
pub async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), PostgresScraperError> {
let start = Instant::now();
let mut tx = self.begin_processing_tx().await?;
prune_messages(oldest_to_keep.into(), &mut **tx).await?;
prune_transactions(oldest_to_keep.into(), &mut **tx).await?;
prune_pre_commits(oldest_to_keep.into(), &mut **tx).await?;
prune_blocks(oldest_to_keep.into(), &mut **tx).await?;
update_last_pruned(current_height.into(), &mut **tx).await?;
let commit_start = Instant::now();
tx.0.commit()
.await
.map_err(|source| PostgresScraperError::StorageTxCommitFailure { source })?;
log_db_operation_time("committing pruning tx", commit_start);
log_db_operation_time("pruning storage", start);
Ok(())
}
#[instrument(skip_all)]
pub async fn begin_processing_tx(
&self,
) -> Result<PostgresStorageTransaction, PostgresScraperError> {
debug!("starting storage tx");
self.manager
.connection_pool
.begin()
.await
.map(PostgresStorageTransaction)
.map_err(|source| PostgresScraperError::StorageTxBeginFailure { source })
}
pub async fn lowest_block_height(&self) -> Result<Option<i64>, PostgresScraperError> {
Ok(self.manager.get_lowest_block().await?)
}
pub async fn get_first_block_height_after(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, PostgresScraperError> {
let time = PrimitiveDateTime::new(time.date(), time.time());
Ok(self.manager.get_first_block_height_after(time).await?)
}
pub async fn get_last_block_height_before(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, PostgresScraperError> {
let time = PrimitiveDateTime::new(time.date(), time.time());
Ok(self.manager.get_last_block_height_before(time).await?)
}
pub async fn get_blocks_between(
&self,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, PostgresScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
Ok(block_end - block_start)
}
pub async fn get_signed_between(
&self,
consensus_address: &str,
start_height: i64,
end_height: i64,
) -> Result<i64, PostgresScraperError> {
Ok(self
.manager
.get_signed_between(consensus_address, start_height, end_height)
.await?)
}
pub async fn get_signed_between_times(
&self,
consensus_address: &str,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, PostgresScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
self.get_signed_between(consensus_address, block_start, block_end)
.await
}
pub async fn get_precommit(
&self,
consensus_address: &str,
height: i64,
) -> Result<Option<CommitSignature>, PostgresScraperError> {
Ok(self
.manager
.get_precommit(consensus_address, height)
.await?)
}
pub async fn get_block_signers(
&self,
height: i64,
) -> Result<Vec<Validator>, PostgresScraperError> {
Ok(self.manager.get_block_validators(height).await?)
}
pub async fn get_all_known_validators(&self) -> Result<Vec<Validator>, PostgresScraperError> {
Ok(self.manager.get_validators().await?)
}
pub async fn get_last_processed_height(&self) -> Result<i64, PostgresScraperError> {
Ok(self.manager.get_last_processed_height().await?)
}
pub async fn get_pruned_height(&self) -> Result<i64, PostgresScraperError> {
Ok(self.manager.get_pruned_height().await?)
}
}
#[async_trait]
impl NyxdScraperStorage for PostgresScraperStorage {
type StorageTransaction = PostgresStorageTransaction;
async fn initialise(storage: &str) -> Result<Self, NyxdScraperStorageError> {
PostgresScraperStorage::init(storage)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn begin_processing_tx(
&self,
) -> Result<Self::StorageTransaction, NyxdScraperStorageError> {
self.begin_processing_tx()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError> {
self.get_last_processed_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError> {
self.get_pruned_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError> {
self.lowest_block_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), NyxdScraperStorageError> {
self.prune_storage(oldest_to_keep, current_height)
.await
.map_err(NyxdScraperStorageError::from)
}
}
@@ -0,0 +1,46 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
// use crate::error::PostgresScraperError;
// use nyxd_scraper_shared::Any;
// use sqlx::types::JsonValue;
//
// pub(crate) fn proto_to_json(proto: &Any) -> Result<JsonValue, PostgresScraperError> {
// todo!()
// }
use nyxd_scraper_shared::Any;
use serde::Serialize;
#[derive(Serialize)]
pub(crate) struct PlaceholderMessage {
#[serde(rename = "@type")]
pub(crate) typ: String,
pub(crate) placeholder: String,
}
impl<'a> From<&'a Any> for PlaceholderMessage {
fn from(value: &'a Any) -> Self {
PlaceholderMessage {
typ: value.type_url.to_ascii_lowercase(),
placeholder: "PLACEHOLDER CONTENT - TODO: IMPLEMENT PROPER PROTO -> JSON PARSING"
.to_string(),
}
}
}
#[derive(Serialize)]
pub(crate) struct PlaceholderStruct {
pub(crate) typ: String,
pub(crate) placeholder: String,
}
impl PlaceholderStruct {
pub(crate) fn new<T>(_: T) -> Self {
PlaceholderStruct {
typ: std::any::type_name::<T>().to_string(),
placeholder: "PLACEHOLDER CONTENT - SOMETHING IS MISSING serde DERIVES".to_string(),
}
}
}
@@ -0,0 +1,538 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::models::{CommitSignature, Validator};
use nyxd_scraper_shared::storage::helpers::log_db_operation_time;
use sqlx::types::time::PrimitiveDateTime;
use sqlx::types::JsonValue;
use sqlx::{Executor, Postgres};
use tokio::time::Instant;
use tracing::{instrument, trace};
#[derive(Clone)]
pub(crate) struct StorageManager {
pub(crate) connection_pool: sqlx::Pool<Postgres>,
}
impl StorageManager {
pub(crate) async fn set_initial_metadata(&self) -> Result<(), sqlx::Error> {
if sqlx::query("SELECT * from metadata")
.fetch_optional(&self.connection_pool)
.await?
.is_none()
{
sqlx::query("INSERT INTO metadata (id, last_processed_height) VALUES (0, 0)")
.execute(&self.connection_pool)
.await?;
}
Ok(())
}
pub(crate) async fn get_lowest_block(&self) -> Result<Option<i64>, sqlx::Error> {
trace!("get_lowest_block");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT height
FROM block
ORDER BY height ASC
LIMIT 1
"#,
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_lowest_block", start);
Ok(maybe_record.map(|x| x.height))
}
pub(crate) async fn get_first_block_height_after(
&self,
time: PrimitiveDateTime,
) -> Result<Option<i64>, sqlx::Error> {
trace!("get_first_block_height_after");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT height
FROM block
WHERE timestamp > $1
ORDER BY timestamp
LIMIT 1
"#,
time
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_first_block_height_after", start);
Ok(maybe_record.map(|x| x.height))
}
pub(crate) async fn get_last_block_height_before(
&self,
time: PrimitiveDateTime,
) -> Result<Option<i64>, sqlx::Error> {
trace!("get_last_block_height_before");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT height
FROM block
WHERE timestamp < $1
ORDER BY timestamp DESC
LIMIT 1
"#,
time
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_last_block_height_before", start);
Ok(maybe_record.map(|x| x.height))
}
pub(crate) async fn get_signed_between(
&self,
consensus_address: &str,
start_height: i64,
end_height: i64,
) -> Result<i64, sqlx::Error> {
trace!("get_signed_between");
let start = Instant::now();
let count = sqlx::query!(
r#"
SELECT COUNT(*) as count FROM pre_commit
WHERE
validator_address = $1
AND height >= $2
AND height <= $3
"#,
consensus_address,
start_height,
end_height
)
.fetch_one(&self.connection_pool)
.await?
.count;
log_db_operation_time("get_signed_between", start);
Ok(count.unwrap_or(0))
}
pub(crate) async fn get_precommit(
&self,
consensus_address: &str,
height: i64,
) -> Result<Option<CommitSignature>, sqlx::Error> {
trace!("get_precommit");
let start = Instant::now();
let res = sqlx::query_as(
r#"
SELECT * FROM pre_commit
WHERE validator_address = $1
AND height = $2
"#,
)
.bind(consensus_address)
.bind(height)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_precommit", start);
Ok(res)
}
pub(crate) async fn get_block_validators(
&self,
height: i64,
) -> Result<Vec<Validator>, sqlx::Error> {
trace!("get_block_validators");
let start = Instant::now();
let res = sqlx::query_as!(
Validator,
r#"
SELECT * FROM validator
WHERE EXISTS (
SELECT 1 FROM pre_commit
WHERE height = $1
AND pre_commit.validator_address = validator.consensus_address
)
"#,
height
)
.fetch_all(&self.connection_pool)
.await?;
log_db_operation_time("get_block_validators", start);
Ok(res)
}
pub(crate) async fn get_validators(&self) -> Result<Vec<Validator>, sqlx::Error> {
trace!("get_validators");
let start = Instant::now();
let res = sqlx::query_as("SELECT * FROM validator")
.fetch_all(&self.connection_pool)
.await?;
log_db_operation_time("get_validators", start);
Ok(res)
}
pub(crate) async fn get_last_processed_height(&self) -> Result<i64, sqlx::Error> {
trace!("get_last_processed_height");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT last_processed_height FROM metadata
"#
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_last_processed_height", start);
if let Some(row) = maybe_record {
Ok(row.last_processed_height as i64)
} else {
Ok(-1)
}
}
pub(crate) async fn get_pruned_height(&self) -> Result<i64, sqlx::Error> {
trace!("get_pruned_height");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT last_pruned_height FROM pruning
"#
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_pruned_height", start);
if let Some(row) = maybe_record {
Ok(row.last_pruned_height)
} else {
Ok(-1)
}
}
}
// make those generic over executor so that they could be performed over connection pool and a tx
#[instrument(skip(executor))]
pub(crate) async fn insert_validator<'a, E>(
consensus_address: String,
consensus_pubkey: String,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_validator");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO validator (consensus_address, consensus_pubkey)
VALUES ($1, $2)
ON CONFLICT DO NOTHING
"#,
consensus_address,
consensus_pubkey
)
.execute(executor)
.await?;
log_db_operation_time("insert_validator", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn insert_block<'a, E>(
height: i64,
hash: String,
num_txs: i32,
total_gas: i64,
proposer_address: String,
timestamp: PrimitiveDateTime,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_block");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO block (height, hash, num_txs, total_gas, proposer_address, timestamp)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT DO NOTHING
"#,
height,
hash,
num_txs,
total_gas,
proposer_address,
timestamp
)
.execute(executor)
.await?;
log_db_operation_time("insert_block", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn insert_precommit<'a, E>(
validator_address: String,
height: i64,
timestamp: PrimitiveDateTime,
voting_power: i64,
proposer_priority: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_precommit");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO pre_commit (validator_address, height, timestamp, voting_power, proposer_priority)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (validator_address, timestamp) DO NOTHING
"#,
validator_address,
height,
timestamp,
voting_power,
proposer_priority
)
.execute(executor)
.await?;
log_db_operation_time("insert_precommit", start);
Ok(())
}
#[instrument(skip(executor))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn insert_transaction<'a, E>(
hash: String,
height: i64,
index: i32,
success: bool,
messages: JsonValue,
memo: String,
signatures: Vec<String>,
signer_infos: JsonValue,
fee: JsonValue,
gas_wanted: i64,
gas_used: i64,
raw_log: String,
logs: JsonValue,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_transaction");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO transaction
(hash, height, index, success, messages, memo, signatures, signer_infos, fee, gas_wanted, gas_used, raw_log, logs)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (hash) DO UPDATE
SET height = excluded.height,
index = excluded.index,
success = excluded.success,
messages = excluded.messages,
memo = excluded.memo,
signatures = excluded.signatures,
signer_infos = excluded.signer_infos,
fee = excluded.fee,
gas_wanted = excluded.gas_wanted,
gas_used = excluded.gas_used,
raw_log = excluded.raw_log,
logs = excluded.logs
"#,
hash,
height,
index,
success,
messages,
memo,
&signatures,
signer_infos,
fee,
gas_wanted,
gas_used,
raw_log,
logs,
)
.execute(executor)
.await?;
log_db_operation_time("insert_transaction", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn insert_message<'a, E>(
transaction_hash: String,
index: i64,
typ: String,
value: JsonValue,
involved_account_addresses: Vec<String>,
height: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_message");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO message(transaction_hash, index, type, value, involved_accounts_addresses, height)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (transaction_hash, index) DO UPDATE
SET height = excluded.height,
type = excluded.type,
value = excluded.value,
involved_accounts_addresses = excluded.involved_accounts_addresses
"#,
transaction_hash,
index,
typ,
value,
&involved_account_addresses,
height
)
.execute(executor)
.await?;
log_db_operation_time("insert_message", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn update_last_processed<'a, E>(
height: i32,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("update_last_processed");
let start = Instant::now();
sqlx::query!(
"UPDATE metadata SET last_processed_height = GREATEST(last_processed_height, $1)",
height
)
.execute(executor)
.await?;
log_db_operation_time("update_last_processed", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn update_last_pruned<'a, E>(height: i64, executor: E) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("update_last_pruned");
let start = Instant::now();
sqlx::query!("UPDATE pruning SET last_pruned_height = $1", height)
.execute(executor)
.await?;
log_db_operation_time("update_last_pruned", start);
Ok(())
}
pub(crate) async fn prune_blocks<'a, E>(oldest_to_keep: i64, executor: E) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("prune_blocks");
let start = Instant::now();
sqlx::query!("DELETE FROM block WHERE height < $1", oldest_to_keep)
.execute(executor)
.await?;
log_db_operation_time("prune_blocks", start);
Ok(())
}
pub(crate) async fn prune_pre_commits<'a, E>(
oldest_to_keep: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("prune_pre_commits");
let start = Instant::now();
sqlx::query!("DELETE FROM pre_commit WHERE height < $1", oldest_to_keep)
.execute(executor)
.await?;
log_db_operation_time("prune_pre_commits", start);
Ok(())
}
pub(crate) async fn prune_transactions<'a, E>(
oldest_to_keep: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("prune_transactions");
let start = Instant::now();
sqlx::query!("DELETE FROM transaction WHERE height < $1", oldest_to_keep)
.execute(executor)
.await?;
log_db_operation_time("prune_transactions", start);
Ok(())
}
pub(crate) async fn prune_messages<'a, E>(
oldest_to_keep: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("prune_messages");
let start = Instant::now();
sqlx::query!("DELETE FROM message WHERE height < $1", oldest_to_keep)
.execute(executor)
.await?;
log_db_operation_time("prune_messages", start);
Ok(())
}
@@ -0,0 +1,8 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod block_storage;
mod helpers;
mod manager;
pub mod models;
pub mod transaction;
@@ -0,0 +1,278 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::PostgresScraperError;
use crate::storage::helpers::{PlaceholderMessage, PlaceholderStruct};
use crate::storage::manager::{
insert_block, insert_message, insert_precommit, insert_transaction, insert_validator,
};
use async_trait::async_trait;
use base64::engine::general_purpose;
use base64::Engine as _;
use nyxd_scraper_shared::helpers::{
validator_consensus_address, validator_info, validator_pubkey_to_bech32,
};
use nyxd_scraper_shared::storage::validators::Response;
use nyxd_scraper_shared::storage::{
validators, Block, Commit, CommitSig, NyxdScraperStorageError, NyxdScraperTransaction,
};
use nyxd_scraper_shared::ParsedTransactionResponse;
use serde_json::json;
use sqlx::types::time::{OffsetDateTime, PrimitiveDateTime};
use sqlx::{Postgres, Transaction};
use std::ops::{Deref, DerefMut};
use tracing::{debug, trace, warn};
pub struct PostgresStorageTransaction(pub(crate) Transaction<'static, Postgres>);
impl Deref for PostgresStorageTransaction {
type Target = Transaction<'static, Postgres>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for PostgresStorageTransaction {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl PostgresStorageTransaction {
async fn persist_validators(
&mut self,
validators: &validators::Response,
) -> Result<(), PostgresScraperError> {
debug!("persisting {} validators", validators.total);
for validator in &validators.validators {
let consensus_address = validator_consensus_address(validator.address)?;
let consensus_pubkey = validator_pubkey_to_bech32(validator.pub_key)?;
insert_validator(
consensus_address.to_string(),
consensus_pubkey.to_string(),
self.0.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), PostgresScraperError> {
let proposer_address =
validator_consensus_address(block.header.proposer_address)?.to_string();
let offset_datetime: OffsetDateTime = block.header.time.into();
let time = PrimitiveDateTime::new(offset_datetime.date(), offset_datetime.time());
insert_block(
block.header.height.into(),
block.header.hash().to_string(),
block.data.len() as i32,
total_gas,
proposer_address,
time,
self.0.as_mut(),
)
.await?;
Ok(())
}
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &validators::Response,
) -> Result<(), PostgresScraperError> {
debug!("persisting up to {} commits", commits.signatures.len());
let height: i64 = commits.height.into();
for commit_sig in &commits.signatures {
let (validator_id, timestamp, signature) = match commit_sig {
CommitSig::BlockIdFlagAbsent => {
trace!("absent signature");
continue;
}
CommitSig::BlockIdFlagCommit {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
CommitSig::BlockIdFlagNil {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
};
let validator = validator_info(*validator_id, validators)?;
let validator_address = validator_consensus_address(*validator_id)?;
if signature.is_none() {
warn!("empty signature for {validator_address} at height {height}");
continue;
}
let offset_datetime: OffsetDateTime = (*timestamp).into();
let time = PrimitiveDateTime::new(offset_datetime.date(), offset_datetime.time());
insert_precommit(
validator_address.to_string(),
height,
time,
validator.power.into(),
validator.proposer_priority.value(),
self.0.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), PostgresScraperError> {
debug!("persisting {} txs", txs.len());
for chain_tx in txs {
// bdjuno style, base64 encode them
let signatures = chain_tx
.tx
.signatures
.iter()
.map(|sig| general_purpose::STANDARD.encode(sig))
.collect();
// TODO: uncover the secrets of juno's usage of `jsonpb` and how they're recovering
// field names from proto data
let messages = chain_tx
.tx
.body
.messages
.iter()
.map(|msg| PlaceholderMessage::from(msg))
.collect::<Vec<_>>();
// TODO: missing cosmrs' derives
let signer_infos = chain_tx
.tx
.auth_info
.signer_infos
.iter()
.map(PlaceholderStruct::new)
.collect::<Vec<_>>();
insert_transaction(
chain_tx.hash.to_string(),
chain_tx.height.into(),
chain_tx.index as i32,
chain_tx.tx_result.code.is_ok(),
serde_json::to_value(messages)?,
chain_tx.tx.body.memo.clone(),
signatures,
serde_json::to_value(signer_infos)?,
serde_json::to_value(&chain_tx.tx.auth_info.fee)?,
chain_tx.tx_result.gas_wanted,
chain_tx.tx_result.gas_used,
chain_tx.tx_result.log.clone(),
json!({ "value": "yep, another todo. on first glance corresponding field doesn't exist in rust" }),
self.0.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), PostgresScraperError> {
debug!("persisting messages");
for chain_tx in txs {
for (index, msg) in chain_tx.tx.body.messages.iter().enumerate() {
insert_message(
chain_tx.hash.to_string(),
index as i64,
msg.type_url.clone(),
serde_json::to_value(PlaceholderMessage::from(msg))?,
vec!["PLACEHOLDER".to_owned()],
chain_tx.height.into(),
self.0.as_mut(),
)
.await?
}
}
Ok(())
}
}
#[async_trait]
impl NyxdScraperTransaction for PostgresStorageTransaction {
async fn commit(self) -> Result<(), NyxdScraperStorageError> {
self.0
.commit()
.await
.map_err(PostgresScraperError::from)
.map_err(NyxdScraperStorageError::from)
}
async fn persist_validators(
&mut self,
validators: &Response,
) -> Result<(), NyxdScraperStorageError> {
self.persist_validators(validators)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), NyxdScraperStorageError> {
self.persist_block_data(block, total_gas)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &Response,
) -> Result<(), NyxdScraperStorageError> {
self.persist_commits(commits, validators)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
self.persist_txs(txs)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
self.persist_messages(txs)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError> {
self.update_last_processed(height).await
}
}
@@ -1,5 +1,5 @@
[package]
name = "nyxd-scraper"
name = "nyxd-scraper-shared"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
@@ -7,8 +7,8 @@ homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
rust-version.workspace = true
readme.workspace = true
[dependencies]
async-trait.workspace = true
@@ -19,7 +19,6 @@ futures.workspace = true
humantime = { workspace = true }
sha2 = { workspace = true }
serde = { workspace = true, features = ["derive"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"] }
tendermint.workspace = true
tendermint-rpc = { workspace = true, features = ["websocket-client", "http-client"] }
thiserror.workspace = true
@@ -30,11 +29,5 @@ tokio-util = { workspace = true, features = ["rt"] }
tracing.workspace = true
url.workspace = true
# TEMP
#nym-bin-common = { path = "../bin-common", features = ["basic_tracing"]}
[build-dependencies]
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
[lints]
workspace = true
@@ -7,7 +7,7 @@ use crate::block_requester::BlockRequest;
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::storage::{persist_block, ScraperStorage};
use crate::storage::{persist_block, NyxdScraperStorage, NyxdScraperTransaction};
use crate::PruningOptions;
use futures::StreamExt;
use std::cmp::max;
@@ -77,7 +77,7 @@ impl BlockProcessorConfig {
}
}
pub struct BlockProcessor {
pub struct BlockProcessor<S> {
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
@@ -90,7 +90,7 @@ pub struct BlockProcessor {
rpc_client: RpcClient,
incoming: UnboundedReceiverStream<BlockToProcess>,
block_requester: Sender<BlockRequest>,
storage: ScraperStorage,
storage: S,
// future work: rather than sending each msg to every msg module,
// let them subscribe based on `type_url` inside the message itself
@@ -101,14 +101,17 @@ pub struct BlockProcessor {
}
#[allow(clippy::too_many_arguments)]
impl BlockProcessor {
impl<S> BlockProcessor<S>
where
S: NyxdScraperStorage,
{
pub async fn new(
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
block_requester: Sender<BlockRequest>,
storage: ScraperStorage,
storage: S,
rpc_client: RpcClient,
) -> Result<Self, ScraperError> {
let last_processed = storage.get_last_processed_height().await?;
@@ -164,7 +167,11 @@ impl BlockProcessor {
// process the entire block as a transaction so that if anything fails,
// we won't end up with a corrupted storage.
let mut tx = self.storage.begin_processing_tx().await?;
let mut tx = self
.storage
.begin_processing_tx()
.await
.map_err(ScraperError::tx_begin_failure)?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
@@ -192,10 +199,8 @@ impl BlockProcessor {
}
let commit_start = Instant::now();
tx.commit()
.await
.map_err(|source| ScraperError::StorageTxCommitFailure { source })?;
crate::storage::log_db_operation_time("committing processing tx", commit_start);
tx.commit().await.map_err(ScraperError::tx_commit_failure)?;
crate::storage::helpers::log_db_operation_time("committing processing tx", commit_start);
self.last_processed_height = full_info.block.header.height.value() as u32;
self.last_processed_at = Instant::now();
@@ -4,17 +4,16 @@
use crate::block_processor::pruning::{
EVERYTHING_PRUNING_INTERVAL, EVERYTHING_PRUNING_KEEP_RECENT,
};
use crate::helpers::MalformedDataError;
use crate::storage::NyxdScraperStorageError;
use tendermint::Hash;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
#[derive(Debug, Error)]
pub enum ScraperError {
#[error("experienced internal database error: {0}")]
InternalDatabaseError(#[from] sqlx::Error),
#[error("failed to perform startup SQL migration: {0}")]
StartupMigrationFailure(#[from] sqlx::migrate::MigrateError),
#[error("storage error: {0}")]
StorageError(#[from] NyxdScraperStorageError),
#[error("the block scraper is already running")]
ScraperAlreadyRunning,
@@ -104,40 +103,26 @@ pub enum ScraperError {
#[error("failed to begin storage tx: {source}")]
StorageTxBeginFailure {
#[source]
source: sqlx::Error,
source: NyxdScraperStorageError,
},
#[error("failed to commit storage tx: {source}")]
StorageTxCommitFailure {
#[source]
source: sqlx::Error,
source: NyxdScraperStorageError,
},
#[error("failed to send on a closed channel")]
ClosedChannelError,
#[error("failed to parse validator's address: {source}")]
MalformedValidatorAddress {
#[source]
source: eyre::Report,
},
#[error("failed to parse validator's address: {source}")]
MalformedValidatorPubkey {
#[source]
source: eyre::Report,
},
#[error(transparent)]
MalformedData(#[from] MalformedDataError),
#[error(
"could not find the block proposer ('{proposer}') for height {height} in the validator set"
)]
BlockProposerNotInValidatorSet { height: u32, proposer: String },
#[error(
"could not find validator information for {address}; the validator has signed a commit"
)]
MissingValidatorInfoCommitted { address: String },
#[error("pruning.interval must not be set to 0. If you want to disable pruning, select pruning.strategy = \"nothing\"")]
ZeroPruningInterval,
@@ -148,6 +133,18 @@ pub enum ScraperError {
TooSmallKeepRecent { keep_recent: u32 },
}
impl ScraperError {
pub fn tx_begin_failure(source: NyxdScraperStorageError) -> ScraperError
where {
ScraperError::StorageTxBeginFailure { source }
}
pub fn tx_commit_failure(source: NyxdScraperStorageError) -> ScraperError
where {
ScraperError::StorageTxCommitFailure { source }
}
}
impl<T> From<SendError<T>> for ScraperError {
fn from(_: SendError<T>) -> Self {
ScraperError::ClosedChannelError
+66
View File
@@ -0,0 +1,66 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::ParsedTransactionResponse;
use crate::constants::{BECH32_CONESNSUS_PUBKEY_PREFIX, BECH32_CONSENSUS_ADDRESS_PREFIX};
use cosmrs::AccountId;
use sha2::{Digest, Sha256};
use tendermint::{account, PublicKey};
use tendermint::{validator, Hash};
use tendermint_rpc::endpoint::validators;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum MalformedDataError {
#[error("failed to parse validator's address: {source}")]
MalformedValidatorAddress {
#[source]
source: eyre::Report,
},
#[error("failed to parse validator's address: {source}")]
MalformedValidatorPubkey {
#[source]
source: eyre::Report,
},
#[error(
"could not find validator information for {address}; the validator has signed a commit"
)]
MissingValidatorInfoCommitted { address: String },
}
pub fn tx_hash<M: AsRef<[u8]>>(raw_tx: M) -> Hash {
Hash::Sha256(Sha256::digest(raw_tx).into())
}
pub fn validator_pubkey_to_bech32(pubkey: PublicKey) -> Result<AccountId, MalformedDataError> {
// TODO: this one seem to attach additional prefix to they pubkeys, is that what we want instead maybe?
// Ok(pubkey.to_bech32(BECH32_CONESNSUS_PUBKEY_PREFIX))
AccountId::new(BECH32_CONESNSUS_PUBKEY_PREFIX, &pubkey.to_bytes())
.map_err(|source| MalformedDataError::MalformedValidatorPubkey { source })
}
pub fn validator_consensus_address(id: account::Id) -> Result<AccountId, MalformedDataError> {
AccountId::new(BECH32_CONSENSUS_ADDRESS_PREFIX, id.as_ref())
.map_err(|source| MalformedDataError::MalformedValidatorAddress { source })
}
pub fn tx_gas_sum(txs: &[ParsedTransactionResponse]) -> i64 {
txs.iter().map(|tx| tx.tx_result.gas_used).sum()
}
pub fn validator_info(
id: account::Id,
validators: &validators::Response,
) -> Result<&validator::Info, MalformedDataError> {
match validators.validators.iter().find(|v| v.address == id) {
Some(info) => Ok(info),
None => {
let addr = validator_consensus_address(id)?;
Err(MalformedDataError::MissingValidatorInfoCommitted {
address: addr.to_string(),
})
}
}
}
@@ -1,14 +1,11 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![warn(clippy::expect_used)]
#![warn(clippy::unwrap_used)]
pub(crate) mod block_processor;
pub(crate) mod block_requester;
pub mod constants;
pub mod error;
pub(crate) mod helpers;
pub mod helpers;
pub mod modules;
pub(crate) mod rpc_client;
pub(crate) mod scraper;
@@ -16,6 +13,7 @@ pub mod storage;
pub use block_processor::pruning::{PruningOptions, PruningStrategy};
pub use block_processor::types::ParsedTransactionResponse;
pub use cosmrs::Any;
pub use modules::{BlockModule, MsgModule, TxModule};
pub use scraper::{Config, NyxdScraper, StartingBlockOpts};
pub use storage::models;
pub use storage::{NyxdScraperStorage, NyxdScraperTransaction};
@@ -3,7 +3,7 @@
use crate::block_processor::types::FullBlockInformation;
use crate::error::ScraperError;
use crate::storage::StorageTransaction;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
#[async_trait]
@@ -11,6 +11,6 @@ pub trait BlockModule {
async fn handle_block(
&mut self,
block: &FullBlockInformation,
storage_tx: &mut StorageTransaction,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -3,7 +3,7 @@
use crate::block_processor::types::ParsedTransactionResponse;
use crate::error::ScraperError;
use crate::storage::StorageTransaction;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
use cosmrs::Any;
@@ -16,6 +16,6 @@ pub trait MsgModule {
index: usize,
msg: &Any,
tx: &ParsedTransactionResponse,
storage_tx: &mut StorageTransaction,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -3,7 +3,7 @@
use crate::block_processor::types::ParsedTransactionResponse;
use crate::error::ScraperError;
use crate::storage::StorageTransaction;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
#[async_trait]
@@ -11,6 +11,6 @@ pub trait TxModule {
async fn handle_tx(
&mut self,
tx: &ParsedTransactionResponse,
storage_tx: &mut StorageTransaction,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -8,10 +8,10 @@ use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::scraper::subscriber::ChainSubscriber;
use crate::storage::ScraperStorage;
use crate::storage::NyxdScraperStorage;
use crate::PruningOptions;
use futures::future::join_all;
use std::path::PathBuf;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
@@ -40,7 +40,8 @@ pub struct Config {
/// Url to the rpc endpoint of a validator, for example `https://rpc.nymtech.net/`
pub rpc_url: Url,
pub database_path: PathBuf,
/// Points to either underlying file (sqlite) or connection string (postgres)
pub database_storage: String,
pub pruning_options: PruningOptions,
@@ -49,7 +50,8 @@ pub struct Config {
pub start_block: StartingBlockOpts,
}
pub struct NyxdScraperBuilder {
pub struct NyxdScraperBuilder<S> {
_storage: PhantomData<S>,
config: Config,
block_modules: Vec<Box<dyn BlockModule + Send>>,
@@ -57,9 +59,13 @@ pub struct NyxdScraperBuilder {
msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
impl NyxdScraperBuilder {
pub async fn build_and_start(self) -> Result<NyxdScraper, ScraperError> {
let scraper = NyxdScraper::new(self.config).await?;
impl<S> NyxdScraperBuilder<S>
where
S: NyxdScraperStorage + Send + Sync + 'static,
S::StorageTransaction: Send + Sync + 'static,
{
pub async fn build_and_start(self) -> Result<NyxdScraper<S>, ScraperError> {
let scraper = NyxdScraper::<S>::new(self.config).await?;
let (processing_tx, processing_rx) = unbounded_channel();
let (req_tx, req_rx) = channel(5);
@@ -110,6 +116,7 @@ impl NyxdScraperBuilder {
pub fn new(config: Config) -> Self {
NyxdScraperBuilder {
_storage: PhantomData,
config,
block_modules: vec![],
tx_modules: vec![],
@@ -133,24 +140,28 @@ impl NyxdScraperBuilder {
}
}
pub struct NyxdScraper {
pub struct NyxdScraper<S> {
config: Config,
task_tracker: TaskTracker,
cancel_token: CancellationToken,
startup_sync: Arc<Notify>,
storage: ScraperStorage,
storage: S,
rpc_client: RpcClient,
}
impl NyxdScraper {
pub fn builder(config: Config) -> NyxdScraperBuilder {
impl<S> NyxdScraper<S>
where
S: NyxdScraperStorage + Send + Sync + 'static,
S::StorageTransaction: Send + Sync + 'static,
{
pub fn builder(config: Config) -> NyxdScraperBuilder<S> {
NyxdScraperBuilder::new(config)
}
pub async fn new(config: Config) -> Result<Self, ScraperError> {
config.pruning_options.validate()?;
let storage = ScraperStorage::init(&config.database_path).await?;
let storage = S::initialise(&config.database_storage).await?;
let rpc_client = RpcClient::new(&config.rpc_url)?;
Ok(NyxdScraper {
@@ -163,14 +174,14 @@ impl NyxdScraper {
})
}
pub fn storage(&self) -> ScraperStorage {
self.storage.clone()
pub fn storage(&self) -> &S {
&self.storage
}
fn start_tasks(
&self,
mut block_requester: BlockRequester,
mut block_processor: BlockProcessor,
mut block_processor: BlockProcessor<S>,
mut chain_subscriber: ChainSubscriber,
) {
self.task_tracker
@@ -336,7 +347,7 @@ impl NyxdScraper {
&self,
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> Result<BlockProcessor, ScraperError> {
) -> Result<BlockProcessor<S>, ScraperError> {
let block_processor_config = BlockProcessorConfig::new(
self.config.pruning_options,
self.config.store_precommits,
@@ -344,7 +355,7 @@ impl NyxdScraper {
self.config.start_block.use_best_effort_start_height,
);
BlockProcessor::new(
BlockProcessor::<S>::new(
block_processor_config,
self.cancel_token.clone(),
self.startup_sync.clone(),
@@ -0,0 +1,18 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use tokio::time::Instant;
use tracing::{debug, error, info, trace, warn};
pub fn log_db_operation_time(op_name: &str, start_time: Instant) {
let elapsed = start_time.elapsed();
let formatted = humantime::format_duration(elapsed);
match elapsed.as_millis() {
v if v > 10000 => error!("{op_name} took {formatted} to execute"),
v if v > 1000 => warn!("{op_name} took {formatted} to execute"),
v if v > 100 => info!("{op_name} took {formatted} to execute"),
v if v > 10 => debug!("{op_name} took {formatted} to execute"),
_ => trace!("{op_name} took {formatted} to execute"),
}
}
@@ -0,0 +1,124 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::ScraperError;
use async_trait::async_trait;
use thiserror::Error;
use tracing::warn;
pub use crate::block_processor::types::FullBlockInformation;
pub use crate::ParsedTransactionResponse;
pub use tendermint::block::{Commit, CommitSig};
pub use tendermint::Block;
pub use tendermint_rpc::endpoint::validators;
pub mod helpers;
// a workaround for needing associated type (which is a no-no in dynamic dispatch)
#[derive(Error, Debug)]
#[error(transparent)]
pub struct NyxdScraperStorageError(Box<dyn std::error::Error + Send + Sync>);
impl NyxdScraperStorageError {
pub fn new<E>(error: E) -> Self
where
E: std::error::Error + Send + Sync + 'static,
{
NyxdScraperStorageError(Box::new(error))
}
}
#[async_trait]
pub trait NyxdScraperStorage: Clone + Sized {
type StorageTransaction: NyxdScraperTransaction;
/// Either connection string (postgres) or storage path (sqlite)
async fn initialise(storage: &str) -> Result<Self, NyxdScraperStorageError>;
async fn begin_processing_tx(
&self,
) -> Result<Self::StorageTransaction, NyxdScraperStorageError>;
async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError>;
async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError>;
async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError>;
async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), NyxdScraperStorageError>;
}
#[async_trait]
pub trait NyxdScraperTransaction {
async fn commit(mut self) -> Result<(), NyxdScraperStorageError>;
async fn persist_validators(
&mut self,
validators: &validators::Response,
) -> Result<(), NyxdScraperStorageError>;
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), NyxdScraperStorageError>;
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &validators::Response,
) -> Result<(), NyxdScraperStorageError>;
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError>;
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError>;
async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError>;
}
pub async fn persist_block<Tx>(
block: &FullBlockInformation,
tx: &mut Tx,
store_precommits: bool,
) -> Result<(), ScraperError>
where
Tx: NyxdScraperTransaction,
{
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
// SANITY CHECK: make sure the block proposer is present in the validator set
block.ensure_proposer()?;
tx.persist_validators(&block.validators).await?;
tx.persist_block_data(&block.block, total_gas).await?;
if store_precommits {
if let Some(commit) = &block.block.last_commit {
tx.persist_commits(commit, &block.validators).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
// persist txs
tx.persist_txs(&block.transactions).await?;
// persist messages (inside the transactions)
tx.persist_messages(&block.transactions).await?;
tx.update_last_processed(block.block.header.height.into())
.await?;
Ok(())
}
+28
View File
@@ -0,0 +1,28 @@
[package]
name = "nyxd-scraper-sqlite"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
nyxd-scraper-shared = { path = "../nyxd-scraper-shared" }
[build-dependencies]
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
[lints]
workspace = true
@@ -0,0 +1,10 @@
/*
* Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
CREATE TABLE METADATA
(
id INTEGER PRIMARY KEY CHECK (id = 0),
last_processed_height INTEGER NOT NULL
);
+36
View File
@@ -0,0 +1,36 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nyxd_scraper_shared::helpers::MalformedDataError;
use nyxd_scraper_shared::storage::NyxdScraperStorageError;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SqliteScraperError {
#[error("experienced internal database error: {0}")]
InternalDatabaseError(#[from] sqlx::error::Error),
#[error("failed to perform startup SQL migration: {0}")]
StartupMigrationFailure(#[from] sqlx::migrate::MigrateError),
#[error("failed to begin storage tx: {source}")]
StorageTxBeginFailure {
#[source]
source: sqlx::error::Error,
},
#[error("failed to commit storage tx: {source}")]
StorageTxCommitFailure {
#[source]
source: sqlx::error::Error,
},
#[error(transparent)]
MalformedData(#[from] MalformedDataError),
}
impl From<SqliteScraperError> for NyxdScraperStorageError {
fn from(err: SqliteScraperError) -> Self {
NyxdScraperStorageError::new(err)
}
}
+21
View File
@@ -0,0 +1,21 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::block_storage::SqliteScraperStorage;
use nyxd_scraper_shared::NyxdScraper;
pub use nyxd_scraper_shared::constants;
pub use nyxd_scraper_shared::error::ScraperError;
pub use nyxd_scraper_shared::{
BlockModule, MsgModule, NyxdScraperTransaction, ParsedTransactionResponse, PruningOptions,
PruningStrategy, StartingBlockOpts, TxModule,
};
pub use storage::models;
pub mod error;
pub mod storage;
pub type SqliteNyxdScraper = NyxdScraper<SqliteScraperStorage>;
// TODO: for now just use exactly the same config
pub use nyxd_scraper_shared::Config;
@@ -0,0 +1,251 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::SqliteScraperError;
use crate::models::{CommitSignature, Validator};
use crate::storage::manager::{
prune_blocks, prune_messages, prune_pre_commits, prune_transactions, update_last_pruned,
StorageManager,
};
use crate::storage::transaction::SqliteStorageTransaction;
use async_trait::async_trait;
use nyxd_scraper_shared::storage::helpers::log_db_operation_time;
use nyxd_scraper_shared::storage::{NyxdScraperStorage, NyxdScraperStorageError};
use sqlx::sqlite::{SqliteAutoVacuum, SqliteSynchronous};
use sqlx::types::time::OffsetDateTime;
use sqlx::ConnectOptions;
use std::fmt::Debug;
use std::path::Path;
use tokio::time::Instant;
use tracing::{debug, error, info, instrument};
#[derive(Clone)]
pub struct SqliteScraperStorage {
pub(crate) manager: StorageManager,
}
impl SqliteScraperStorage {
#[instrument]
pub async fn init<P: AsRef<Path> + Debug>(
database_path: P,
) -> Result<Self, SqliteScraperError> {
let database_path = database_path.as_ref();
debug!(
"initialising scraper database path to '{}'",
database_path.display()
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
// TODO: do we want auto_vacuum ?
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};
if let Err(err) = sqlx::migrate!("./sql_migrations")
.run(&connection_pool)
.await
{
error!("Failed to initialize SQLx database: {err}");
return Err(err.into());
}
info!("Database migration finished!");
let manager = StorageManager { connection_pool };
manager.set_initial_metadata().await?;
let storage = SqliteScraperStorage { manager };
Ok(storage)
}
#[instrument(skip(self))]
pub async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), SqliteScraperError> {
let start = Instant::now();
let mut tx = self.begin_processing_tx().await?;
prune_messages(oldest_to_keep.into(), &mut **tx).await?;
prune_transactions(oldest_to_keep.into(), &mut **tx).await?;
prune_pre_commits(oldest_to_keep.into(), &mut **tx).await?;
prune_blocks(oldest_to_keep.into(), &mut **tx).await?;
update_last_pruned(current_height.into(), &mut **tx).await?;
let commit_start = Instant::now();
tx.0.commit()
.await
.map_err(|source| SqliteScraperError::StorageTxCommitFailure { source })?;
log_db_operation_time("committing pruning tx", commit_start);
log_db_operation_time("pruning storage", start);
Ok(())
}
#[instrument(skip_all)]
pub async fn begin_processing_tx(
&self,
) -> Result<SqliteStorageTransaction, SqliteScraperError> {
debug!("starting storage tx");
self.manager
.connection_pool
.begin()
.await
.map(SqliteStorageTransaction)
.map_err(|source| SqliteScraperError::StorageTxBeginFailure { source })
}
pub async fn lowest_block_height(&self) -> Result<Option<i64>, SqliteScraperError> {
Ok(self.manager.get_lowest_block().await?)
}
pub async fn get_first_block_height_after(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, SqliteScraperError> {
Ok(self.manager.get_first_block_height_after(time).await?)
}
pub async fn get_last_block_height_before(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, SqliteScraperError> {
Ok(self.manager.get_last_block_height_before(time).await?)
}
pub async fn get_blocks_between(
&self,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, SqliteScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
Ok(block_end - block_start)
}
pub async fn get_signed_between(
&self,
consensus_address: &str,
start_height: i64,
end_height: i64,
) -> Result<i64, SqliteScraperError> {
Ok(self
.manager
.get_signed_between(consensus_address, start_height, end_height)
.await?)
}
pub async fn get_signed_between_times(
&self,
consensus_address: &str,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, SqliteScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
self.get_signed_between(consensus_address, block_start, block_end)
.await
}
pub async fn get_precommit(
&self,
consensus_address: &str,
height: i64,
) -> Result<Option<CommitSignature>, SqliteScraperError> {
Ok(self
.manager
.get_precommit(consensus_address, height)
.await?)
}
pub async fn get_block_signers(
&self,
height: i64,
) -> Result<Vec<Validator>, SqliteScraperError> {
Ok(self.manager.get_block_validators(height).await?)
}
pub async fn get_all_known_validators(&self) -> Result<Vec<Validator>, SqliteScraperError> {
Ok(self.manager.get_validators().await?)
}
pub async fn get_last_processed_height(&self) -> Result<i64, SqliteScraperError> {
Ok(self.manager.get_last_processed_height().await?)
}
pub async fn get_pruned_height(&self) -> Result<i64, SqliteScraperError> {
Ok(self.manager.get_pruned_height().await?)
}
}
#[async_trait]
impl NyxdScraperStorage for SqliteScraperStorage {
type StorageTransaction = SqliteStorageTransaction;
async fn initialise(storage: &str) -> Result<Self, NyxdScraperStorageError> {
SqliteScraperStorage::init(storage)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn begin_processing_tx(
&self,
) -> Result<Self::StorageTransaction, NyxdScraperStorageError> {
self.begin_processing_tx()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError> {
self.get_last_processed_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError> {
self.get_pruned_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError> {
self.lowest_block_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), NyxdScraperStorageError> {
self.prune_storage(oldest_to_keep, current_height)
.await
.map_err(NyxdScraperStorageError::from)
}
}
@@ -1,8 +1,8 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::log_db_operation_time;
use crate::storage::models::{CommitSignature, Validator};
use nyxd_scraper_shared::storage::helpers::log_db_operation_time;
use sqlx::types::time::OffsetDateTime;
use sqlx::{Executor, Sqlite};
use tokio::time::Instant;
@@ -1,2 +1,7 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod block_storage;
mod manager;
pub mod models;
pub mod transaction;
@@ -0,0 +1,30 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use sqlx::types::time::OffsetDateTime;
use sqlx::FromRow;
#[derive(Debug, Clone, Eq, PartialEq, Hash, FromRow)]
pub struct Validator {
pub consensus_address: String,
pub consensus_pubkey: String,
}
#[derive(Debug, Clone, FromRow)]
pub struct Block {
pub height: i64,
pub hash: String,
pub num_txs: u32,
pub total_gas: i64,
pub proposer_address: String,
pub timestamp: OffsetDateTime,
}
#[derive(Debug, Clone, FromRow)]
pub struct CommitSignature {
pub height: i64,
pub validator_address: String,
pub voting_power: i64,
pub proposer_priority: i64,
pub timestamp: OffsetDateTime,
}
@@ -0,0 +1,236 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::SqliteScraperError;
use crate::storage::manager::{
insert_block, insert_message, insert_precommit, insert_transaction, insert_validator,
};
use async_trait::async_trait;
use nyxd_scraper_shared::helpers::{
validator_consensus_address, validator_info, validator_pubkey_to_bech32,
};
use nyxd_scraper_shared::storage::validators::Response;
use nyxd_scraper_shared::storage::{
validators, Block, Commit, CommitSig, NyxdScraperStorageError, NyxdScraperTransaction,
};
use nyxd_scraper_shared::ParsedTransactionResponse;
use sqlx::{Sqlite, Transaction};
use std::ops::{Deref, DerefMut};
use tracing::{debug, trace, warn};
pub struct SqliteStorageTransaction(pub(crate) Transaction<'static, Sqlite>);
impl Deref for SqliteStorageTransaction {
type Target = Transaction<'static, Sqlite>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for SqliteStorageTransaction {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl SqliteStorageTransaction {
async fn persist_validators(
&mut self,
validators: &validators::Response,
) -> Result<(), SqliteScraperError> {
debug!("persisting {} validators", validators.total);
for validator in &validators.validators {
let consensus_address = validator_consensus_address(validator.address)?;
let consensus_pubkey = validator_pubkey_to_bech32(validator.pub_key)?;
insert_validator(
consensus_address.to_string(),
consensus_pubkey.to_string(),
self.0.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), SqliteScraperError> {
let proposer_address =
validator_consensus_address(block.header.proposer_address)?.to_string();
insert_block(
block.header.height.into(),
block.header.hash().to_string(),
block.data.len() as u32,
total_gas,
proposer_address,
block.header.time.into(),
self.0.as_mut(),
)
.await?;
Ok(())
}
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &validators::Response,
) -> Result<(), SqliteScraperError> {
debug!("persisting up to {} commits", commits.signatures.len());
let height: i64 = commits.height.into();
for commit_sig in &commits.signatures {
let (validator_id, timestamp, signature) = match commit_sig {
CommitSig::BlockIdFlagAbsent => {
trace!("absent signature");
continue;
}
CommitSig::BlockIdFlagCommit {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
CommitSig::BlockIdFlagNil {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
};
let validator = validator_info(*validator_id, validators)?;
let validator_address = validator_consensus_address(*validator_id)?;
if signature.is_none() {
warn!("empty signature for {validator_address} at height {height}");
continue;
}
insert_precommit(
validator_address.to_string(),
height,
(*timestamp).into(),
validator.power.into(),
validator.proposer_priority.value(),
self.0.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), SqliteScraperError> {
debug!("persisting {} txs", txs.len());
for chain_tx in txs {
insert_transaction(
chain_tx.hash.to_string(),
chain_tx.height.into(),
chain_tx.index as i64,
chain_tx.tx_result.code.is_ok(),
chain_tx.tx.body.messages.len() as i64,
chain_tx.tx.body.memo.clone(),
chain_tx.tx_result.gas_wanted,
chain_tx.tx_result.gas_used,
chain_tx.tx_result.log.clone(),
self.0.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), SqliteScraperError> {
debug!("persisting messages");
for chain_tx in txs {
for (index, msg) in chain_tx.tx.body.messages.iter().enumerate() {
insert_message(
chain_tx.hash.to_string(),
index as i64,
msg.type_url.clone(),
chain_tx.height.into(),
self.0.as_mut(),
)
.await?
}
}
Ok(())
}
}
#[async_trait]
impl NyxdScraperTransaction for SqliteStorageTransaction {
async fn commit(self) -> Result<(), NyxdScraperStorageError> {
self.0
.commit()
.await
.map_err(SqliteScraperError::from)
.map_err(NyxdScraperStorageError::from)
}
async fn persist_validators(
&mut self,
validators: &Response,
) -> Result<(), NyxdScraperStorageError> {
self.persist_validators(validators)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), NyxdScraperStorageError> {
self.persist_block_data(block, total_gas)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &Response,
) -> Result<(), NyxdScraperStorageError> {
self.persist_commits(commits, validators)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
self.persist_txs(txs)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
self.persist_messages(txs)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError> {
self.update_last_processed(height)
.await
.map_err(NyxdScraperStorageError::from)
}
}
-46
View File
@@ -1,46 +0,0 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::ParsedTransactionResponse;
use crate::constants::{BECH32_CONESNSUS_PUBKEY_PREFIX, BECH32_CONSENSUS_ADDRESS_PREFIX};
use crate::error::ScraperError;
use cosmrs::AccountId;
use sha2::{Digest, Sha256};
use tendermint::{account, PublicKey};
use tendermint::{validator, Hash};
use tendermint_rpc::endpoint::validators;
pub(crate) fn tx_hash<M: AsRef<[u8]>>(raw_tx: M) -> Hash {
Hash::Sha256(Sha256::digest(raw_tx).into())
}
pub(crate) fn validator_pubkey_to_bech32(pubkey: PublicKey) -> Result<AccountId, ScraperError> {
// TODO: this one seem to attach additional prefix to they pubkeys, is that what we want instead maybe?
// Ok(pubkey.to_bech32(BECH32_CONESNSUS_PUBKEY_PREFIX))
AccountId::new(BECH32_CONESNSUS_PUBKEY_PREFIX, &pubkey.to_bytes())
.map_err(|source| ScraperError::MalformedValidatorPubkey { source })
}
pub(crate) fn validator_consensus_address(id: account::Id) -> Result<AccountId, ScraperError> {
AccountId::new(BECH32_CONSENSUS_ADDRESS_PREFIX, id.as_ref())
.map_err(|source| ScraperError::MalformedValidatorAddress { source })
}
pub(crate) fn tx_gas_sum(txs: &[ParsedTransactionResponse]) -> i64 {
txs.iter().map(|tx| tx.tx_result.gas_used).sum()
}
pub(crate) fn validator_info(
id: account::Id,
validators: &validators::Response,
) -> Result<&validator::Info, ScraperError> {
match validators.validators.iter().find(|v| v.address == id) {
Some(info) => Ok(info),
None => {
let addr = validator_consensus_address(id)?;
Err(ScraperError::MissingValidatorInfoCommitted {
address: addr.to_string(),
})
}
}
}
-394
View File
@@ -1,394 +0,0 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{
block_processor::types::{FullBlockInformation, ParsedTransactionResponse},
error::ScraperError,
storage::{
manager::{
insert_block, insert_message, insert_precommit, insert_transaction, insert_validator,
prune_blocks, prune_messages, prune_pre_commits, prune_transactions,
update_last_processed, update_last_pruned, StorageManager,
},
models::{CommitSignature, Validator},
},
};
use sqlx::{
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
types::time::OffsetDateTime,
ConnectOptions, Sqlite, Transaction,
};
use std::{fmt::Debug, path::Path};
use tendermint::{
block::{Commit, CommitSig},
Block,
};
use tendermint_rpc::endpoint::validators;
use tokio::time::Instant;
use tracing::{debug, error, info, instrument, trace, warn};
mod helpers;
mod manager;
pub mod models;
pub type StorageTransaction = Transaction<'static, Sqlite>;
#[derive(Clone)]
pub struct ScraperStorage {
pub(crate) manager: StorageManager,
}
pub(crate) fn log_db_operation_time(op_name: &str, start_time: Instant) {
let elapsed = start_time.elapsed();
let formatted = humantime::format_duration(elapsed);
match elapsed.as_millis() {
v if v > 10000 => error!("{op_name} took {formatted} to execute"),
v if v > 1000 => warn!("{op_name} took {formatted} to execute"),
v if v > 100 => info!("{op_name} took {formatted} to execute"),
v if v > 10 => debug!("{op_name} took {formatted} to execute"),
_ => trace!("{op_name} took {formatted} to execute"),
}
}
impl ScraperStorage {
#[instrument]
pub async fn init<P: AsRef<Path> + Debug>(database_path: P) -> Result<Self, ScraperError> {
let database_path = database_path.as_ref();
debug!(
"initialising scraper database path to '{}'",
database_path.display()
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
// TODO: do we want auto_vacuum ?
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};
if let Err(err) = sqlx::migrate!("./sql_migrations")
.run(&connection_pool)
.await
{
error!("Failed to initialize SQLx database: {err}");
return Err(err.into());
}
info!("Database migration finished!");
let manager = StorageManager { connection_pool };
manager.set_initial_metadata().await?;
let storage = ScraperStorage { manager };
Ok(storage)
}
#[instrument(skip(self))]
pub async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), ScraperError> {
let start = Instant::now();
let mut tx = self.begin_processing_tx().await?;
prune_messages(oldest_to_keep.into(), &mut *tx).await?;
prune_transactions(oldest_to_keep.into(), &mut *tx).await?;
prune_pre_commits(oldest_to_keep.into(), &mut *tx).await?;
prune_blocks(oldest_to_keep.into(), &mut *tx).await?;
update_last_pruned(current_height.into(), &mut *tx).await?;
let commit_start = Instant::now();
tx.commit()
.await
.map_err(|source| ScraperError::StorageTxCommitFailure { source })?;
log_db_operation_time("committing pruning tx", commit_start);
log_db_operation_time("pruning storage", start);
Ok(())
}
#[instrument(skip_all)]
pub async fn begin_processing_tx(&self) -> Result<StorageTransaction, ScraperError> {
debug!("starting storage tx");
self.manager
.connection_pool
.begin()
.await
.map_err(|source| ScraperError::StorageTxBeginFailure { source })
}
pub async fn lowest_block_height(&self) -> Result<Option<i64>, ScraperError> {
Ok(self.manager.get_lowest_block().await?)
}
pub async fn get_first_block_height_after(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, ScraperError> {
Ok(self.manager.get_first_block_height_after(time).await?)
}
pub async fn get_last_block_height_before(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, ScraperError> {
Ok(self.manager.get_last_block_height_before(time).await?)
}
pub async fn get_blocks_between(
&self,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, ScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
Ok(block_end - block_start)
}
pub async fn get_signed_between(
&self,
consensus_address: &str,
start_height: i64,
end_height: i64,
) -> Result<i64, ScraperError> {
Ok(self
.manager
.get_signed_between(consensus_address, start_height, end_height)
.await?)
}
pub async fn get_signed_between_times(
&self,
consensus_address: &str,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, ScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
self.get_signed_between(consensus_address, block_start, block_end)
.await
}
pub async fn get_precommit(
&self,
consensus_address: &str,
height: i64,
) -> Result<Option<CommitSignature>, ScraperError> {
Ok(self
.manager
.get_precommit(consensus_address, height)
.await?)
}
pub async fn get_block_signers(&self, height: i64) -> Result<Vec<Validator>, ScraperError> {
Ok(self.manager.get_block_validators(height).await?)
}
pub async fn get_all_known_validators(&self) -> Result<Vec<Validator>, ScraperError> {
Ok(self.manager.get_validators().await?)
}
pub async fn get_last_processed_height(&self) -> Result<i64, ScraperError> {
Ok(self.manager.get_last_processed_height().await?)
}
pub async fn get_pruned_height(&self) -> Result<i64, ScraperError> {
Ok(self.manager.get_pruned_height().await?)
}
}
pub async fn persist_block(
block: &FullBlockInformation,
tx: &mut StorageTransaction,
store_precommits: bool,
) -> Result<(), ScraperError> {
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
// SANITY CHECK: make sure the block proposer is present in the validator set
block.ensure_proposer()?;
// persist validators
persist_validators(&block.validators, tx).await?;
// persist block data
persist_block_data(&block.block, total_gas, tx).await?;
if store_precommits {
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
// persist txs
persist_txs(&block.transactions, tx).await?;
// persist messages (inside the transactions)
persist_messages(&block.transactions, tx).await?;
update_last_processed(block.block.header.height.into(), tx.as_mut()).await?;
Ok(())
}
async fn persist_validators(
validators: &validators::Response,
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
debug!("persisting {} validators", validators.total);
for validator in &validators.validators {
let consensus_address = crate::helpers::validator_consensus_address(validator.address)?;
let consensus_pubkey = crate::helpers::validator_pubkey_to_bech32(validator.pub_key)?;
insert_validator(
consensus_address.to_string(),
consensus_pubkey.to_string(),
tx.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_block_data(
block: &Block,
total_gas: i64,
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
let proposer_address =
crate::helpers::validator_consensus_address(block.header.proposer_address)?.to_string();
insert_block(
block.header.height.into(),
block.header.hash().to_string(),
block.data.len() as u32,
total_gas,
proposer_address,
block.header.time.into(),
tx.as_mut(),
)
.await?;
Ok(())
}
async fn persist_commits(
commits: &Commit,
validators: &validators::Response,
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
debug!("persisting up to {} commits", commits.signatures.len());
let height: i64 = commits.height.into();
for commit_sig in &commits.signatures {
let (validator_id, timestamp, signature) = match commit_sig {
CommitSig::BlockIdFlagAbsent => {
trace!("absent signature");
continue;
}
CommitSig::BlockIdFlagCommit {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
CommitSig::BlockIdFlagNil {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
};
let validator = crate::helpers::validator_info(*validator_id, validators)?;
let validator_address = crate::helpers::validator_consensus_address(*validator_id)?;
if signature.is_none() {
warn!("empty signature for {validator_address} at height {height}");
continue;
}
insert_precommit(
validator_address.to_string(),
height,
(*timestamp).into(),
validator.power.into(),
validator.proposer_priority.value(),
tx.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_txs(
txs: &[ParsedTransactionResponse],
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
debug!("persisting {} txs", txs.len());
for chain_tx in txs {
insert_transaction(
chain_tx.hash.to_string(),
chain_tx.height.into(),
chain_tx.index as i64,
chain_tx.tx_result.code.is_ok(),
chain_tx.tx.body.messages.len() as i64,
chain_tx.tx.body.memo.clone(),
chain_tx.tx_result.gas_wanted,
chain_tx.tx_result.gas_used,
chain_tx.tx_result.log.clone(),
tx.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_messages(
txs: &[ParsedTransactionResponse],
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
debug!("persisting messages");
for chain_tx in txs {
for (index, msg) in chain_tx.tx.body.messages.iter().enumerate() {
insert_message(
chain_tx.hash.to_string(),
index as i64,
msg.type_url.clone(),
chain_tx.height.into(),
tx.as_mut(),
)
.await?
}
}
Ok(())
}
+1 -1
View File
@@ -43,7 +43,7 @@ nym-network-defaults = { path = "../common/network-defaults" }
nym-task = { path = "../common/task" }
nym-validator-client = { path = "../common/client-libs/validator-client" }
nym-coconut-dkg-common = { path = "../common/cosmwasm-smart-contracts/coconut-dkg" }
nyxd-scraper = { path = "../common/nyxd-scraper" }
nyxd-scraper-sqlite = { path = "../common/nyxd-scraper-sqlite" }
nym-ticketbooks-merkle = { path = "../common/ticketbooks-merkle" }
nym-serde-helpers = { path = "../common/serde-helpers", features = ["base64"] }
nym-pemstore = { path = "../common/pemstore" }
@@ -3,7 +3,7 @@
use crate::cli::{try_load_current_config, ConfigOverridableArgs};
use crate::error::NymRewarderError;
use nyxd_scraper::NyxdScraper;
use nyxd_scraper_sqlite::SqliteNyxdScraper;
use std::path::PathBuf;
#[derive(Debug, clap::Args)]
@@ -24,7 +24,7 @@ pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {
let config =
try_load_current_config(&args.custom_config_path)?.with_override(args.config_override);
NyxdScraper::new(config.scraper_config())
SqliteNyxdScraper::new(config.scraper_config())
.await?
.unsafe_process_single_block(args.height)
.await?;
@@ -3,7 +3,7 @@
use crate::cli::{try_load_current_config, ConfigOverridableArgs};
use crate::error::NymRewarderError;
use nyxd_scraper::NyxdScraper;
use nyxd_scraper_sqlite::SqliteNyxdScraper;
use std::path::PathBuf;
#[derive(Debug, clap::Args)]
@@ -37,7 +37,7 @@ pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {
let config =
try_load_current_config(&args.custom_config_path)?.with_override(args.config_override);
NyxdScraper::new(config.scraper_config())
SqliteNyxdScraper::new(config.scraper_config())
.await?
.unsafe_process_block_range(args.start_height, args.stop_height)
.await?;
+4 -4
View File
@@ -12,7 +12,7 @@ use nym_config::{
DEFAULT_CONFIG_DIR, DEFAULT_CONFIG_FILENAME, DEFAULT_DATA_DIR, NYM_DIR,
};
use nym_validator_client::nyxd::{AccountId, Coin};
use nyxd_scraper::{PruningOptions, StartingBlockOpts};
use nyxd_scraper_sqlite::{PruningOptions, StartingBlockOpts};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::io;
@@ -119,11 +119,11 @@ impl Config {
}
}
pub fn scraper_config(&self) -> nyxd_scraper::Config {
nyxd_scraper::Config {
pub fn scraper_config(&self) -> nyxd_scraper_sqlite::Config {
nyxd_scraper_sqlite::Config {
websocket_url: self.nyxd_scraper.websocket_url.clone(),
rpc_url: self.base.upstream_nyxd.clone(),
database_path: self.storage_paths.nyxd_scraper.clone(),
database_storage: self.storage_paths.nyxd_scraper.clone(),
pruning_options: self.nyxd_scraper.pruning,
store_precommits: self.nyxd_scraper.store_precommits,
start_block: StartingBlockOpts {
@@ -15,7 +15,7 @@ pub const DEFAULT_ED25519_PUBLIC_IDENTITY_KEY_FILENAME: &str = "ed25519_identity
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ValidatorRewarderPaths {
pub nyxd_scraper: PathBuf,
pub nyxd_scraper: String,
pub reward_history: PathBuf,
@@ -46,7 +46,10 @@ impl ValidatorRewarderPaths {
impl Default for ValidatorRewarderPaths {
fn default() -> Self {
ValidatorRewarderPaths {
nyxd_scraper: default_data_directory().join(DEFAULT_SCRAPER_DB_FILENAME),
// validator rewarder uses sqlite
nyxd_scraper: (default_data_directory().join(DEFAULT_SCRAPER_DB_FILENAME))
.to_string_lossy()
.to_string(),
reward_history: default_data_directory().join(DEFAULT_REWARD_HISTORY_DB_FILENAME),
private_ed25519_identity_key_file: default_data_directory()
.join(DEFAULT_ED25519_PRIVATE_IDENTITY_KEY_FILENAME),
+8 -1
View File
@@ -9,6 +9,7 @@ use nym_validator_client::nym_api::error::NymAPIError;
use nym_validator_client::nyxd::error::NyxdError;
use nym_validator_client::nyxd::tx::ErrorReport;
use nym_validator_client::nyxd::{AccountId, Coin};
use nyxd_scraper_sqlite::error::SqliteScraperError;
use std::io;
use std::path::PathBuf;
use thiserror::Error;
@@ -78,7 +79,13 @@ pub enum NymRewarderError {
#[error("chain scraping failure: {source}")]
ScraperFailure {
#[from]
source: nyxd_scraper::error::ScraperError,
source: nyxd_scraper_sqlite::ScraperError,
},
#[error("chain scraper storage failure: {source}")]
ScraperStorageFailure {
#[from]
source: SqliteScraperError,
},
// this should never happen but unwrapping everywhere was more cumbersome than just propagating the error
@@ -7,7 +7,7 @@ use crate::rewarder::epoch::Epoch;
use crate::rewarder::nyxd_client::NyxdClient;
use nym_validator_client::nyxd::module_traits::staking;
use nym_validator_client::nyxd::{AccountId, PageRequest};
use nyxd_scraper::NyxdScraper;
use nyxd_scraper_sqlite::SqliteNyxdScraper;
use std::cmp::min;
use std::collections::HashMap;
use std::ops::Range;
@@ -17,7 +17,7 @@ pub(crate) mod types;
pub struct EpochSigning {
pub(crate) nyxd_client: NyxdClient,
pub(crate) nyxd_scraper: NyxdScraper,
pub(crate) nyxd_scraper: SqliteNyxdScraper,
pub(crate) whitelist: Vec<AccountId>,
}
@@ -7,7 +7,7 @@ use crate::{
};
use cosmwasm_std::{Decimal, Uint128};
use nym_validator_client::nyxd::{module_traits::staking, AccountId, Coin};
use nyxd_scraper::models;
use nyxd_scraper_sqlite::models;
use std::collections::HashMap;
use tracing::info;
@@ -3,7 +3,7 @@
use crate::error::NymRewarderError;
use nym_validator_client::nyxd::{AccountId, PublicKey};
use nyxd_scraper::constants::{BECH32_CONSENSUS_ADDRESS_PREFIX, BECH32_PREFIX};
use nyxd_scraper_sqlite::constants::{BECH32_CONSENSUS_ADDRESS_PREFIX, BECH32_PREFIX};
use sha2::{Digest, Sha256};
pub(crate) fn consensus_pubkey_to_address(
+2 -2
View File
@@ -16,7 +16,7 @@ use nym_crypto::asymmetric::ed25519;
use nym_ecash_time::{ecash_today, ecash_today_date, EcashTime};
use nym_task::TaskManager;
use nym_validator_client::nyxd::{AccountId, Coin, Hash};
use nyxd_scraper::NyxdScraper;
use nyxd_scraper_sqlite::SqliteNyxdScraper;
use std::sync::Arc;
use time::Date;
use tokio::pin;
@@ -188,7 +188,7 @@ impl Rewarder {
info!("the block signing rewarding is running in monitor only mode");
}
let nyxd_scraper = NyxdScraper::new(config.scraper_config()).await?;
let nyxd_scraper = SqliteNyxdScraper::new(config.scraper_config()).await?;
Some(EpochSigning {
nyxd_scraper,
+2
View File
@@ -0,0 +1,2 @@
0001_metadata.sql
0002_cosmos.sql
@@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO price_history\n (timestamp, chf, usd, eur, gbp, btc)\n VALUES\n ($1, $2, $3, $4, $5, $6)\n ON CONFLICT(timestamp) DO UPDATE SET\n chf=excluded.chf,\n usd=excluded.usd,\n eur=excluded.eur,\n gbp=excluded.gbp,\n btc=excluded.btc;",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Float8",
"Float8",
"Float8",
"Float8",
"Float8"
]
},
"nullable": []
},
"hash": "140df23f816ff5d7501128682ce378d582b7da78c45bc0de934f92c1abe14bda"
}
@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO transactions (tx_hash, height, message_index, sender, recipient, amount, memo)\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int8",
"Int8",
"Text",
"Text",
"Text",
"Text"
]
},
"nullable": []
},
"hash": "34d0109e12e76621181b846bff9f8701f275d416a8d41686148fdb6a876de4c5"
}
@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO payments (\n transaction_hash, sender_address, receiver_address,\n amount, height, timestamp, memo\n ) VALUES ($1, $2, $3, $4, $5, $6, $7)\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text",
"Text",
"Float8",
"Int8",
"Int8",
"Text"
]
},
"nullable": []
},
"hash": "454925e9a2f21cb370d90b3e105925542f62a5e1fc1e2cdba3df0e47e47b8c9f"
}
@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "SELECT MAX(height) FROM payments",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "max",
"type_info": "Int8"
}
],
"parameters": {
"Left": []
},
"nullable": [
null
]
},
"hash": "83df8a7c5fb24ba4d89f1feb300a8f6d4ac14e7e9b7b482bd57fae568ebd96ba"
}
@@ -0,0 +1,52 @@
{
"db_name": "PostgreSQL",
"query": "SELECT timestamp, chf, usd, eur, gbp, btc FROM price_history WHERE timestamp >= $1;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "timestamp",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "chf",
"type_info": "Float8"
},
{
"ordinal": 2,
"name": "usd",
"type_info": "Float8"
},
{
"ordinal": 3,
"name": "eur",
"type_info": "Float8"
},
{
"ordinal": 4,
"name": "gbp",
"type_info": "Float8"
},
{
"ordinal": 5,
"name": "btc",
"type_info": "Float8"
}
],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": [
false,
false,
false,
false,
false,
false
]
},
"hash": "a57b74a049b33aee36b72741056d60df8ad35a747808d5d1d3d525a76bbf0618"
}
@@ -0,0 +1,70 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT id, tx_hash, height, message_index, sender, recipient, amount, memo, created_at as \"created_at: ::time::OffsetDateTime\"\n FROM transactions\n WHERE height > $1\n ORDER BY height, message_index\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Int4"
},
{
"ordinal": 1,
"name": "tx_hash",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "height",
"type_info": "Int8"
},
{
"ordinal": 3,
"name": "message_index",
"type_info": "Int8"
},
{
"ordinal": 4,
"name": "sender",
"type_info": "Text"
},
{
"ordinal": 5,
"name": "recipient",
"type_info": "Text"
},
{
"ordinal": 6,
"name": "amount",
"type_info": "Text"
},
{
"ordinal": 7,
"name": "memo",
"type_info": "Text"
},
{
"ordinal": 8,
"name": "created_at: ::time::OffsetDateTime",
"type_info": "Date"
}
],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": [
false,
false,
false,
false,
false,
false,
false,
true,
true
]
},
"hash": "e64101f92363902aae3fe2e70fdb0f041f4bca756a12abe300fe91637c33b4ed"
}
@@ -0,0 +1,50 @@
{
"db_name": "PostgreSQL",
"query": "SELECT timestamp, chf, usd, eur, gbp, btc FROM price_history ORDER BY timestamp DESC LIMIT 1;",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "timestamp",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "chf",
"type_info": "Float8"
},
{
"ordinal": 2,
"name": "usd",
"type_info": "Float8"
},
{
"ordinal": 3,
"name": "eur",
"type_info": "Float8"
},
{
"ordinal": 4,
"name": "gbp",
"type_info": "Float8"
},
{
"ordinal": 5,
"name": "btc",
"type_info": "Float8"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false,
false,
false,
false
]
},
"hash": "f81a3275a1c7cbeefb3fdf7904c677d46a284e0446b96a2fc5bd77630c62d4b8"
}
@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO watcher_execution(start_ts, end_ts, error_message)\n VALUES ($1, $2, $3)\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz",
"Timestamptz",
"Text"
]
},
"nullable": []
},
"hash": "fbf7dc2d779476fffcefafaa0a1731dfc6affe6c672df121140a5c7141f71c63"
}
+47
View File
@@ -0,0 +1,47 @@
# Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
# SPDX-License-Identifier: GPL-3.0-only
[package]
name = "nyx-chain-watcher-pgsql"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
anyhow = { workspace = true }
async-trait.workspace = true
axum = { workspace = true, features = ["tokio"] }
chrono = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive", "env"] }
nym-config = { path = "../../common/config" }
nym-bin-common = { path = "../../common/bin-common", features = ["output_format"] }
nym-network-defaults = { path = "../../common/network-defaults" }
nym-task = { path = "../../common/task" }
nym-validator-client = { path = "../../common/client-libs/validator-client" }
nyxd-scraper-psql = { path = "../../common/nyxd-scraper-psql" }
reqwest = { workspace = true, features = ["rustls-tls"] }
schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres", "time"] }
thiserror = { workspace = true }
time = { workspace = true }
tokio = { workspace = true, features = ["process", "rt-multi-thread"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tower-http = { workspace = true, features = ["cors", "trace"] }
utoipa = { workspace = true, features = ["axum_extras", "time"] }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
utoipauto = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
@@ -2,7 +2,7 @@
FROM harbor.nymte.ch/dockerhub/rust:latest AS builder
COPY ./ /usr/src/nym
WORKDIR /usr/src/nym/nyx-chain-watcher
WORKDIR /usr/src/nym/nyx-chain-watcher/pgsql
RUN cargo build --release
@@ -28,6 +28,6 @@ RUN apt update && apt install -yy curl ca-certificates
WORKDIR /nym
COPY --from=builder /usr/src/nym/target/release/nyx-chain-watcher ./
COPY --from=builder /usr/src/nym/target/release/nyx-chain-watcher/pgsql ./
ENTRYPOINT [ "/nym/nyx-chain-watcher", "run" ]
+104
View File
@@ -0,0 +1,104 @@
# Makefile for nyx_chain_watcher database management
# --- Configuration ---
TEST_DATABASE_URL := postgres://testuser:testpass@localhost:5433/nyx_chain_watcher_test
# Docker compose service names
DB_SERVICE_NAME := postgres-test
DB_CONTAINER_NAME := nyx_chain_watcher_postgres_test
# Default target
.PHONY: default
default: help
# --- Main Targets ---
.PHONY: prepare-pg
prepare-pg: test-db-up test-db-wait test-db-migrate test-db-prepare test-db-down ## Setup PostgreSQL and prepare SQLx offline cache
.PHONY: test-db
test-db: test-db-up test-db-wait test-db-migrate test-db-run test-db-down ## Run tests with PostgreSQL database
.PHONY: dev-db
dev-db: test-db-up test-db-wait test-db-migrate ## Start PostgreSQL for development (keeps running)
@echo "PostgreSQL is running on port 5433"
@echo "Connection string: $(TEST_DATABASE_URL)"
# --- Docker Compose Targets ---
.PHONY: test-db-up
test-db-up: ## Start the PostgreSQL test database in the background
@echo "Starting PostgreSQL test database..."
docker compose up -d $(DB_SERVICE_NAME)
.PHONY: test-db-wait
test-db-wait: ## Wait for the PostgreSQL database to be healthy
@echo "Waiting for PostgreSQL database..."
@while ! docker inspect --format='{{.State.Health.Status}}' $(DB_CONTAINER_NAME) 2>/dev/null | grep -q 'healthy'; do \
echo -n "."; \
sleep 1; \
done; \
echo " Database is healthy!"
.PHONY: test-db-down
test-db-down: ## Stop and remove the test database
@echo "Stopping PostgreSQL test database..."
docker compose down
# --- SQLx Targets ---
.PHONY: test-db-migrate
test-db-migrate: ## Run database migrations against PostgreSQL
@echo "Copying common PostgreSQL migrations..."
cp ../../common/nyxd-scraper-psql/sql_migrations/* migrations
@echo "Running watcher PostgreSQL migrations..."
RUST_LOG=debug DATABASE_URL="$(TEST_DATABASE_URL)" sqlx migrate run --source migrations
.PHONY: test-db-prepare
test-db-prepare: ## Run sqlx prepare for compile-time query verification
@echo "Running sqlx prepare for PostgreSQL..."
DATABASE_URL="$(TEST_DATABASE_URL)" cargo sqlx prepare --
# --- Build and Test Targets ---
.PHONY: test-db-run
test-db-run: ## Run tests with PostgreSQL feature
@echo "Running tests with PostgreSQL..."
DATABASE_URL="$(TEST_DATABASE_URL)" cargo test --no-default-features
.PHONY: build-pg
build-pg: ## Build with PostgreSQL feature
@echo "Building with PostgreSQL feature..."
cargo build --no-default-features
.PHONY: check-pg
check-pg: ## Check code with PostgreSQL feature
@echo "Checking code with PostgreSQL feature..."
cargo check --no-default-features
.PHONY: clippy
clippy: clippy-pg
.PHONY: clippy-pg
clippy-pg: ## Run clippy with PostgreSQL feature
@echo "Running clippy with PostgreSQL feature..."
cargo clippy --no-default-features -- -D warnings
# --- Cleanup Targets ---
.PHONY: clean
clean: ## Clean build artifacts and SQLx cache
cargo clean
rm -rf .sqlx
.PHONY: clean-db
clean-db: test-db-down ## Stop database and clean volumes
docker volume rm -f nyx_chain_watcher_postgres_test_data 2>/dev/null || true
# --- Utility Targets ---
.PHONY: sqlx-cli
sqlx-cli: ## Install sqlx-cli if not already installed
@command -v sqlx >/dev/null 2>&1 || cargo install sqlx-cli --features postgres
.PHONY: psql
psql: ## Connect to the running PostgreSQL database with psql
@docker exec -it $(DB_CONTAINER_NAME) psql -U testuser -d nyx_chain_watcher_test
.PHONY: help
help: ## Show help for Makefile targets
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'
+37
View File
@@ -0,0 +1,37 @@
# Nyx Chain Watcher
A simple binary to watch addresses on the Nyx chain and to call webhooks when particular message types are in a block.
Look in [env.rs](./src/env.rs) for the names of environment variables that can be overridden.
## Running locally
Connect with `psql` to your local database:
```sql
CREATE USER nyx_chain_scraper WITH PASSWORD 'scrapymcscrapeface';
CREATE DATABASE nyx_chain_scraper_data;
GRANT ALL ON DATABASE nyx_chain_scraper_data TO nyx_chain_scraper;
```
Then run:
```
cargo run -- init --chain-history-db-connection-string postgres://nyx_chain_scraper:scrapymcscrapeface@localhost/nyx_chain_scraper_data
```
```
NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH=postgres://nyx_chain_scraper:scrapymcscrapeface@localhost/nyx_chain_scraper_data \
NYX_CHAIN_WATCHER_WATCH_ACCOUNTS=n1...,n1...,n1... \
NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES="/cosmos.bank.v1beta1.MsgSend,/ibc.applications.transfer.v1.MsgTransfer"
NYX_CHAIN_WATCHER_WEBHOOK_URL="https://webhook.site" \
NYX_CHAIN_WATCHER_WEBHOOK_AUTH=1234 \
cargo run -- run
```
## sqlx
If you have issues with `sqlx` please see [README_SQLX.md](../README_SQLX.md).
+8
View File
@@ -0,0 +1,8 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
fn main() {
if let Ok(database_url) = std::env::var("DATABASE_URL") {
println!("cargo::rustc-env=DATABASE_URL={database_url}");
}
}
@@ -0,0 +1,21 @@
services:
postgres-test:
image: postgres:16-alpine
container_name: nyx_chain_watcher_postgres_test
environment:
POSTGRES_DB: nyx_chain_watcher_test
POSTGRES_USER: testuser
POSTGRES_PASSWORD: testpass
ports:
- '5433:5432' # Map to 5433 to avoid conflicts with default PostgreSQL
healthcheck:
test: ['CMD-SHELL', 'pg_isready -U testuser -d nyx_chain_watcher_test']
interval: 5s
timeout: 5s
retries: 5
# Optional: Add volume for persistent data during development
# volumes:
# - nyx_chain_watcher_postgres_test_data:/var/lib/postgresql/data
#volumes:
# nyx_chain_watcher_postgres_test_data:
@@ -0,0 +1,8 @@
CREATE TABLE price_history (
timestamp bigint PRIMARY KEY,
chf double precision NOT NULL,
usd double precision NOT NULL,
eur double precision NOT NULL,
btc double precision NOT NULL,
gbp double precision NOT NULL
);
@@ -0,0 +1,10 @@
CREATE TABLE payments (
id INTEGER PRIMARY KEY,
transaction_hash TEXT NOT NULL UNIQUE,
sender_address TEXT NOT NULL,
receiver_address TEXT NOT NULL,
amount double precision NOT NULL,
timestamp bigint NOT NULL,
height bigint NOT NULL,
memo TEXT
);
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS transactions (
id INTEGER PRIMARY KEY,
tx_hash TEXT NOT NULL,
height BIGINT NOT NULL,
message_index BIGINT NOT NULL,
sender TEXT NOT NULL,
recipient TEXT NOT NULL,
amount TEXT NOT NULL,
memo TEXT,
created_at DATE DEFAULT CURRENT_TIMESTAMP,
UNIQUE(tx_hash, message_index)
);
@@ -0,0 +1,11 @@
/*
* Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: GPL-3.0-only
*/
CREATE TABLE watcher_execution
(
start_ts TIMESTAMPTZ NOT NULL,
end_ts TIMESTAMPTZ NOT NULL,
error_message TEXT
);
@@ -0,0 +1,221 @@
use crate::config::PaymentWatchersConfig;
use crate::db::DbPool;
use crate::env::vars::{
NYXD_SCRAPER_START_HEIGHT, NYXD_SCRAPER_UNSAFE_NUKE_DB,
NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT,
};
use crate::http::state::BankScraperModuleState;
use async_trait::async_trait;
use nym_validator_client::nyxd::{Any, Coin, CosmosCoin, Hash, Msg, MsgSend, Name};
use nyxd_scraper_psql::{
MsgModule, NyxdScraperTransaction, ParsedTransactionResponse, PostgresNyxdScraper,
PruningOptions, ScraperError,
};
use std::fs;
use tracing::{info, warn};
pub(crate) async fn run_chain_scraper(
config: &crate::config::Config,
connection_pool: DbPool,
shared_state: BankScraperModuleState,
) -> anyhow::Result<PostgresNyxdScraper> {
let websocket_url = std::env::var("NYXD_WS").expect("NYXD_WS not defined");
let rpc_url = std::env::var("NYXD").expect("NYXD not defined");
let websocket_url = reqwest::Url::parse(&websocket_url)?;
let rpc_url = reqwest::Url::parse(&rpc_url)?;
// why are those not part of CLI? : (
let start_block_height = match std::env::var(NYXD_SCRAPER_START_HEIGHT).ok() {
None => None,
// blow up if passed malformed env value
Some(raw) => Some(raw.parse()?),
};
let use_best_effort_start_height =
match std::env::var(NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT).ok() {
None => false,
// blow up if passed malformed env value
Some(raw) => raw.parse()?,
};
let nuke_db: bool = match std::env::var(NYXD_SCRAPER_UNSAFE_NUKE_DB).ok() {
None => false,
// blow up if passed malformed env value
Some(raw) => raw.parse()?,
};
if nuke_db {
warn!("☢️☢️☢️ NUKING THE SCRAPER DATABASE");
fs::remove_file(config.chain_scraper_connection_string())?;
}
let scraper = PostgresNyxdScraper::builder(nyxd_scraper_psql::Config {
websocket_url,
rpc_url,
database_storage: config.chain_scraper_connection_string.clone(),
pruning_options: PruningOptions::nothing(),
store_precommits: false,
start_block: nyxd_scraper_psql::StartingBlockOpts {
start_block_height,
use_best_effort_start_height,
},
})
.with_msg_module(BankScraperModule::new(
connection_pool,
config.payment_watcher_config.clone(),
shared_state,
));
let instance = scraper.build_and_start().await?;
info!("🚧 blocking until the chain has caught up...");
instance.wait_for_startup_sync().await;
Ok(instance)
}
pub struct BankScraperModule {
connection_pool: DbPool,
payment_config: PaymentWatchersConfig,
shared_state: BankScraperModuleState,
}
impl BankScraperModule {
pub fn new(
connection_pool: DbPool,
payment_config: PaymentWatchersConfig,
shared_state: BankScraperModuleState,
) -> Self {
Self {
connection_pool,
payment_config,
shared_state,
}
}
#[allow(clippy::too_many_arguments)]
async fn store_transfer_event(
&self,
tx_hash: &str,
height: i64,
message_index: i64,
sender: String,
recipient: String,
amount: String,
memo: Option<String>,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO transactions (tx_hash, height, message_index, sender, recipient, amount, memo)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
tx_hash,
height,
message_index,
sender,
recipient,
amount,
memo
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
fn get_unym_coin(&self, coins: &[CosmosCoin]) -> Option<Coin> {
coins
.iter()
.find(|coin| coin.denom.as_ref() == "unym")
.map(|c| c.clone().into())
}
// TODO: ideally this should be done by the scraper itself
fn recover_bank_msg(
&self,
tx_hash: Hash,
index: usize,
msg: &Any,
) -> Result<MsgSend, ScraperError> {
MsgSend::from_any(msg).map_err(|source| ScraperError::MsgParseFailure {
hash: tx_hash,
index,
type_url: self.type_url(),
source,
})
}
}
#[async_trait]
impl MsgModule for BankScraperModule {
fn type_url(&self) -> String {
<MsgSend as Msg>::Proto::type_url()
}
async fn handle_msg(
&mut self,
index: usize,
msg: &Any,
tx: &ParsedTransactionResponse,
_storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError> {
let memo = tx.tx.body.memo.clone();
// Don't process failed transactions
if !tx.tx_result.code.is_ok() {
return Ok(());
}
let msg = self.recover_bank_msg(tx.hash, index, msg)?;
// Check if any watcher is watching this recipient
let is_watched = self
.payment_config
.is_being_watched(msg.to_address.as_ref());
self.shared_state
.new_bank_msg(tx, index, &msg, is_watched)
.await;
if is_watched {
let Some(unym_coin) = self.get_unym_coin(&msg.amount) else {
let warn = format!(
"{} sent {:?} instead of unym!",
msg.from_address, msg.amount
);
warn!("{warn}");
self.shared_state
.new_rejection(tx.hash.to_string(), tx.height.value(), index as u32, warn)
.await;
// we don't want to fail the whole processing - this is not a failure in that sense!
return Ok(());
};
if let Err(err) = self
.store_transfer_event(
&tx.hash.to_string(),
tx.height.value() as i64,
index as i64,
msg.from_address.to_string(),
msg.to_address.to_string(),
unym_coin.to_string(),
Some(memo.clone()),
)
.await
{
warn!("Failed to store transfer event: {err}");
self.shared_state
.new_rejection(
tx.hash.to_string(),
tx.height.value(),
index as u32,
format!("storage failure: {err}"),
)
.await;
}
}
Ok(())
}
}

Some files were not shown because too many files have changed in this diff Show More