Compare commits

...

27 Commits

Author SHA1 Message Date
Mark Sinclair d2e9f1cc1a update offline queries 2025-12-02 12:52:36 +00:00
Mark Sinclair 2c989071ef migrations - remove unnecessary columns 2025-12-02 12:47:50 +00:00
Mark Sinclair e05d1f50c7 process events and logs 2025-12-02 12:47:43 +00:00
Mark Sinclair bce7871c41 add reset target to make file 2025-12-02 12:47:36 +00:00
benedettadavico 3ea31075fe update runner 2025-12-02 12:42:52 +00:00
Mark Sinclair d455620789 build.rs checks data observatory migrations have content of all shared scraper migrations and errors on changes or new files 2025-12-02 12:42:52 +00:00
Mark Sinclair 67b69f655f add check for manual migration sync that will fail on cargo build in CI 2025-12-02 12:42:52 +00:00
Mark Sinclair c2e82a0ebf duplicate shared migrations to keep things simple 2025-12-02 12:42:52 +00:00
Mark Sinclair dc78f979e1 remove copying from shared migrations 2025-12-02 12:42:52 +00:00
Mark Sinclair 9bef69d967 rename migration files 2025-12-02 12:42:52 +00:00
Mark Sinclair f53ac2b08f add glob to workspace dependencies 2025-12-02 12:42:52 +00:00
Jędrzej Stuczyński c9676de462 updated cosmrs version used by the nym wallet 2025-12-02 12:42:52 +00:00
Mark Sinclair 0196465da1 add missing things and make clippy happy 2025-12-02 12:42:52 +00:00
Mark Sinclair 94c85f4dbf ignore profiler output 2025-12-02 12:42:52 +00:00
Mark Sinclair b31c8b5cc6 change webhook module from msg to tx handler 2025-12-02 12:42:52 +00:00
Mark Sinclair 82d3b61d7a lock file 2025-12-02 12:42:52 +00:00
Mark Sinclair 9dcc1b41a6 tidy up validator rewarder 2025-12-02 12:42:52 +00:00
Mark Sinclair 0334bdc816 tidy up README, startup info, typos 2025-12-02 12:42:51 +00:00
Mark Sinclair 9517beecfa change to clap args and use url::Url to parse args 2025-12-02 12:42:51 +00:00
Mark Sinclair 51b6741f3e update offline queries 2025-12-02 12:42:51 +00:00
Mark Sinclair c3a6ce8150 copy shared migrations and add comments to ignore file to explain 2025-12-02 12:42:51 +00:00
Mark Sinclair 5526c5bffd formatting and clippy 2025-12-02 12:42:51 +00:00
Mark Sinclair 3fc6f6f2c9 handle wasm messages in a module 2025-12-02 12:42:51 +00:00
Mark Sinclair bfb762128d move message parsing and change webhook 2025-12-02 12:42:51 +00:00
Mark Sinclair b7c99f802d add the data observatory 2025-12-02 12:42:49 +00:00
Mark Sinclair 188fb5b970 add the data observatory 2025-12-02 12:41:37 +00:00
Jędrzej Stuczyński d4ace76273 rename nyxd-scraper to sqlite
wip: made storage mostly generic minus modules

changed error types to make modules dyn compatible

implemented traits for sqlite instance

using sqlite instance for rewarder and chain watcher

psql scaffolding

initial postgres support - missing some proto -> json parsing

use postgres in chain scraper

added message registry to block processor

message content parsing in psql

involved addresses

adding null value for logs

Revert "use postgres in chain scraper"

This reverts commit 83c84bfd2d.

using SignerInfo proto definitions for db serialisation

added ibc messages to MessageRegistry
2025-12-02 12:41:37 +00:00
186 changed files with 7325 additions and 897 deletions
+1 -1
View File
@@ -8,7 +8,7 @@ env:
jobs:
build-container:
runs-on: arc-ubuntu-22.04-dind
runs-on: arc-linux-latest-dind
steps:
- name: Login to Harbor
uses: docker/login-action@v3
+2
View File
@@ -63,3 +63,5 @@ nym-api/redocly/formatted-openapi.json
**/settings.sql
**/enter_db.sh
*.profraw
Generated
+466 -322
View File
File diff suppressed because it is too large Load Diff
+8 -2
View File
@@ -87,7 +87,9 @@ members = [
"common/nymsphinx/params",
"common/nymsphinx/routing",
"common/nymsphinx/types",
"common/nyxd-scraper",
"common/nyxd-scraper-sqlite",
"common/nyxd-scraper-psql",
"common/nyxd-scraper-shared",
"common/pemstore",
"common/registration",
"common/serde-helpers",
@@ -124,6 +126,7 @@ members = [
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-data-observatory",
"nym-ip-packet-client",
"nym-network-monitor",
"nym-node",
@@ -263,6 +266,7 @@ futures = "0.3.31"
futures-util = "0.3"
generic-array = "0.14.7"
getrandom = "0.2.10"
glob = "0.3"
handlebars = "3.5.5"
hex = "0.4.3"
hickory-resolver = "0.25"
@@ -398,7 +402,9 @@ cw-multi-test = "=2.3.2"
bip32 = { version = "0.5.3", default-features = false }
cosmrs = { version = "0.21.1" }
cosmrs = { version = "0.22.0" }
cosmos-sdk-proto = { version = "0.27.0" }
ibc-proto = { version = "0.52.0" }
tendermint = "0.40.4"
tendermint-rpc = "0.40.4"
prost = { version = "0.13", default-features = false }
@@ -0,0 +1,27 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO transaction\n (hash, height, index, success, messages, memo, signatures, signer_infos, fee, gas_wanted, gas_used, raw_log, logs, events)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)\n ON CONFLICT (hash) DO UPDATE\n SET height = excluded.height,\n index = excluded.index,\n success = excluded.success,\n messages = excluded.messages,\n memo = excluded.memo,\n signatures = excluded.signatures,\n signer_infos = excluded.signer_infos,\n fee = excluded.fee,\n gas_wanted = excluded.gas_wanted,\n gas_used = excluded.gas_used,\n raw_log = excluded.raw_log,\n logs = excluded.logs,\n events = excluded.events\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int8",
"Int4",
"Bool",
"Jsonb",
"Text",
"TextArray",
"Jsonb",
"Jsonb",
"Int8",
"Int8",
"Text",
"Jsonb",
"Jsonb"
]
},
"nullable": []
},
"hash": "08f4e54ac24fccd54f4208797b3749e457f8cd4ba3d7d906a7ab3bf5b4e7dc9c"
}
@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO validator (consensus_address, consensus_pubkey)\n VALUES ($1, $2)\n ON CONFLICT DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text"
]
},
"nullable": []
},
"hash": "0d3709efacf763b06bf14803bb803b5ee5b27879b0026bb0480b3f2722318a75"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM pre_commit WHERE height < $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "1c2fb0e9ffceca21ef8dbea19b116422b1f723d0a316314b50c43c8b29f8891d"
}
@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT height\n FROM block\n ORDER BY height ASC\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "height",
"type_info": "Int8"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "2561fb016951ea4cd29e43fb9a4a93e944b0d44ed1f7c1036f306e34372da11c"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE metadata SET last_processed_height = GREATEST(last_processed_height, $1)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "2679cdf11fa66c7920678cde860c57402119ec7c3aae731b0da831327301466f"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE pruning SET last_pruned_height = $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "36ba5941aca6e7b604a10b8b0aba70635028f392fe794d6131827b083e1755e1"
}
@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT last_pruned_height FROM pruning\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "last_pruned_height",
"type_info": "Int8"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "3bdf81a9db6075f6f77224c30553f419a849d4ec45af40b052a4cbf09b44f3ec"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM message WHERE height < $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "52c27143720ddfdfd0f5644b60f5b67fd9281ce1de0653efa53b9d9b93cf335d"
}
@@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO pre_commit (validator_address, height, timestamp, voting_power, proposer_priority)\n VALUES ($1, $2, $3, $4, $5)\n ON CONFLICT (validator_address, timestamp) DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int8",
"Timestamp",
"Int8",
"Int8"
]
},
"nullable": []
},
"hash": "62e14613f5ffe692346a79086857a22f0444fbc679db1c06b651fb8b5538b278"
}
@@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO block (height, hash, num_txs, total_gas, proposer_address, timestamp)\n VALUES ($1, $2, $3, $4, $5, $6)\n ON CONFLICT DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8",
"Text",
"Int4",
"Int8",
"Text",
"Timestamp"
]
},
"nullable": []
},
"hash": "64a484fd46d8ec46797f944a4cced56b6e270ce186f0e49528865d1924343b78"
}
@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT height\n FROM block\n WHERE timestamp < $1\n ORDER BY timestamp DESC\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "height",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Timestamp"
]
},
"nullable": [
false
]
},
"hash": "7e82426f5dbcadf1631ba1a806e19cc462d04222fb20ad76de2a40f3f4f8fe15"
}
@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT height\n FROM block\n WHERE timestamp > $1\n ORDER BY timestamp\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "height",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Timestamp"
]
},
"nullable": [
false
]
},
"hash": "9455331f9be5a3be28e2bd399a36b2e2d6a9ad4b225c4c883aafc4e9f0428008"
}
@@ -0,0 +1,24 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT COUNT(*) as count FROM pre_commit\n WHERE\n validator_address = $1\n AND height >= $2\n AND height <= $3\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "count",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Text",
"Int8",
"Int8"
]
},
"nullable": [
null
]
},
"hash": "bc7795e58ce71893c3f32a19db8e77b7bc0a1af315ffd42c3e68156d6e4ace70"
}
@@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT * FROM validator\n WHERE EXISTS (\n SELECT 1 FROM pre_commit\n WHERE height = $1\n AND pre_commit.validator_address = validator.consensus_address\n )\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "consensus_address",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "consensus_pubkey",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": [
false,
false
]
},
"hash": "be43d4873911deca784b7be0531ab7bd82ecd68041aa932a56c8ce09623251e4"
}
@@ -0,0 +1,20 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT last_processed_height FROM metadata\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "last_processed_height",
"type_info": "Int8"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
},
"hash": "c88d07fecc3f33deaa6e93db1469ce71582635df47f52dcf3fd1df4e7be6b96d"
}
@@ -0,0 +1,19 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO message(transaction_hash, index, type, value, involved_accounts_addresses, height)\n VALUES ($1, $2, $3, $4, $5, $6)\n ON CONFLICT (transaction_hash, index) DO UPDATE\n SET height = excluded.height,\n type = excluded.type,\n value = excluded.value,\n involved_accounts_addresses = excluded.involved_accounts_addresses\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int8",
"Text",
"Jsonb",
"TextArray",
"Int8"
]
},
"nullable": []
},
"hash": "cc0ae74082d7d8a89f2d3364676890bbf6150ab394c72783114340d4def5f9ef"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM block WHERE height < $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "cdba9b267f143c8a8c6c3d6ed713cf00236490b86779559d84740ec18bcfa3a9"
}
@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM transaction WHERE height < $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": []
},
"hash": "d89558c37c51e8e6b1e6a9d5a2b13d0598fd856aa019a0cbbae12d7cafb4672f"
}
+34
View File
@@ -0,0 +1,34 @@
[package]
name = "nyxd-scraper-psql"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
readme.workspace = true
[dependencies]
async-trait = { workspace = true }
base64 = { workspace = true }
itertools = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres", "macros", "migrate", "time"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
nyxd-scraper-shared = { path = "../nyxd-scraper-shared" }
# temp due to cosmrs redefinitions for serde
cosmrs = { workspace = true }
[build-dependencies]
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres", "macros", "migrate"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
[lints]
workspace = true
+105
View File
@@ -0,0 +1,105 @@
# Makefile for nyxd-scraper-psql database management
# --- Configuration ---
TEST_DATABASE_URL := postgres://testuser:testpass@localhost:5433/nyxd_scraper_test
# Docker compose service names
DB_SERVICE_NAME := postgres-test
DB_CONTAINER_NAME := nyxd_scraper_psql_test
# Default target
.PHONY: default
default: help
# --- Main Targets ---
.PHONY: prepare-pg
prepare-pg: test-db-up test-db-wait test-db-migrate test-db-prepare test-db-down ## Setup PostgreSQL and prepare SQLx offline cache
.PHONY: test-db
test-db: test-db-up test-db-wait test-db-migrate test-db-run test-db-down ## Run tests with PostgreSQL database
.PHONY: dev-db
dev-db: test-db-up test-db-wait test-db-migrate ## Start PostgreSQL for development (keeps running)
@echo "PostgreSQL is running on port 5433"
@echo "Connection string: $(TEST_DATABASE_URL)"
.PHONY: dev-db-restart
dev-db-restart: clean-db dev-db
# --- Docker Compose Targets ---
.PHONY: test-db-up
test-db-up: ## Start the PostgreSQL test database in the background
@echo "Starting PostgreSQL test database..."
docker compose up -d $(DB_SERVICE_NAME)
.PHONY: test-db-wait
test-db-wait: ## Wait for the PostgreSQL database to be healthy
@echo "Waiting for PostgreSQL database..."
@while ! docker inspect --format='{{.State.Health.Status}}' $(DB_CONTAINER_NAME) 2>/dev/null | grep -q 'healthy'; do \
echo -n "."; \
sleep 1; \
done; \
echo " Database is healthy!"
.PHONY: test-db-down
test-db-down: ## Stop and remove the test database
@echo "Stopping PostgreSQL test database..."
docker compose down
# --- SQLx Targets ---
.PHONY: test-db-migrate
test-db-migrate: ## Run database migrations against PostgreSQL
@echo "Running PostgreSQL migrations..."
DATABASE_URL="$(TEST_DATABASE_URL)" sqlx migrate run --source sql_migrations
.PHONY: test-db-prepare
test-db-prepare: ## Run sqlx prepare for compile-time query verification
@echo "Running sqlx prepare for PostgreSQL..."
DATABASE_URL="$(TEST_DATABASE_URL)" cargo sqlx prepare
# --- Build and Test Targets ---
.PHONY: test-db-run
test-db-run: ## Run tests with PostgreSQL feature
@echo "Running tests with PostgreSQL..."
DATABASE_URL="$(TEST_DATABASE_URL)" cargo test --features pg --no-default-features
.PHONY: build-pg
build-pg: ## Build with PostgreSQL feature
@echo "Building with PostgreSQL feature..."
cargo build
.PHONY: check-pg
check-pg: ## Check code with PostgreSQL feature
@echo "Checking code with PostgreSQL feature..."
cargo check
.PHONY: clippy
clippy: clippy-pg
.PHONY: clippy-pg
clippy-pg: ## Run clippy with PostgreSQL feature
@echo "Running clippy with PostgreSQL feature..."
cargo clippy -- -D warnings
# --- Cleanup Targets ---
.PHONY: clean
clean: ## Clean build artifacts and SQLx cache
cargo clean
rm -rf .sqlx
.PHONY: clean-db
clean-db: test-db-down ## Stop database and clean volumes
docker volume rm -f nym-node-status-api_postgres_test_data 2>/dev/null || true
# --- Utility Targets ---
.PHONY: sqlx-cli
sqlx-cli: ## Install sqlx-cli if not already installed
@command -v sqlx >/dev/null 2>&1 || cargo install sqlx-cli --features postgres
.PHONY: psql
psql: ## Connect to the running PostgreSQL database with psql
@docker exec -it $(DB_CONTAINER_NAME) psql -U testuser -d nyxd_scraper_test
.PHONY: help
help: ## Show help for Makefile targets
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'
+80
View File
@@ -0,0 +1,80 @@
## Quick Start with PostgreSQL
### 1. Install Prerequisites
```bash
# Install sqlx-cli if not already installed
make sqlx-cli
```
### 2. Prepare PostgreSQL for Development
```bash
# This will:
# - Start PostgreSQL in Docker
# - Run migrations
# - Generate SQLx offline query cache
# - Stop the database
make prepare-pg
```
### 3. Build with PostgreSQL
```bash
# Build with PostgreSQL feature
make build-pg
# Or manually:
cargo build
```
### 4. Run with PostgreSQL
```bash
# Start PostgreSQL for development (keeps running)
make dev-db
# In another terminal, run the application
DATABASE_URL=postgres://testuser:testpass@localhost:5433/nym_node_status_api_test \
cargo run
```
## Makefile Targets
```bash
make help # Show all available targets
make prepare-pg # Setup PostgreSQL and prepare SQLx cache
make dev-db # Start PostgreSQL for development
make test-db # Run tests with PostgreSQL
make build-pg # Build with PostgreSQL
make psql # Connect to running PostgreSQL
make clean # Clean build artifacts
make clean-db # Stop database and clean volumes
make dev-db-restart # Stop database, clean volumes, rebuild test database and restart
```
## Environment Variables
See `.env.example` for all configuration options. Key variable:
```bash
# For PostgreSQL:
DATABASE_URL=postgres://testuser:testpass@localhost:5433/nym_node_status_api_test
```
## Troubleshooting
### SQLx Offline Mode
If you see "no cached data for this query" errors:
1. Ensure PostgreSQL is running: `make dev-db`
2. Run: `make test-db-prepare`
### Connection Refused
If you see "Connection refused" errors:
1. Check Docker is running: `docker ps`
2. Check PostgreSQL container: `docker ps | grep nym_node_status_api_postgres_test`
3. Restart database: `make test-db-down && make dev-db`
+8
View File
@@ -0,0 +1,8 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
fn main() {
if let Ok(database_url) = std::env::var("DATABASE_URL") {
println!("cargo::rustc-env=DATABASE_URL={database_url}");
}
}
@@ -0,0 +1,21 @@
services:
postgres-test:
image: postgres:16-alpine
container_name: nyxd_scraper_psql_test
environment:
POSTGRES_DB: nyxd_scraper_test
POSTGRES_USER: testuser
POSTGRES_PASSWORD: testpass
ports:
- '5433:5432' # Map to 5433 to avoid conflicts with default PostgreSQL
healthcheck:
test: [ 'CMD-SHELL', 'pg_isready -U testuser -d nyxd_scraper_test' ]
interval: 5s
timeout: 5s
retries: 5
# Optional: Add volume for persistent data during development
# volumes:
# - postgres_test_data:/var/lib/postgresql/data
# volumes:
# postgres_test_data:
@@ -0,0 +1,10 @@
/*
* Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
* SPDX-License-Identifier: Apache-2.0
*/
CREATE TABLE METADATA
(
id INTEGER PRIMARY KEY CHECK (id = 0),
last_processed_height BIGINT NOT NULL
);
@@ -0,0 +1,127 @@
CREATE TABLE validator
(
consensus_address TEXT NOT NULL PRIMARY KEY, /* Validator consensus address */
consensus_pubkey TEXT NOT NULL UNIQUE /* Validator consensus public key */
);
CREATE TABLE pre_commit
(
validator_address TEXT NOT NULL REFERENCES validator (consensus_address),
height BIGINT NOT NULL,
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL,
voting_power BIGINT NOT NULL,
proposer_priority BIGINT NOT NULL,
UNIQUE (validator_address, timestamp)
);
CREATE INDEX pre_commit_validator_address_index ON pre_commit (validator_address);
CREATE INDEX pre_commit_height_index ON pre_commit (height);
CREATE TABLE block
(
height BIGINT UNIQUE PRIMARY KEY,
hash TEXT NOT NULL UNIQUE,
num_txs INTEGER DEFAULT 0,
total_gas BIGINT DEFAULT 0,
proposer_address TEXT REFERENCES validator (consensus_address),
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL
);
CREATE INDEX block_height_index ON block (height);
CREATE INDEX block_hash_index ON block (hash);
CREATE INDEX block_proposer_address_index ON block (proposer_address);
ALTER TABLE block
SET (
autovacuum_vacuum_scale_factor = 0,
autovacuum_analyze_scale_factor = 0,
autovacuum_vacuum_threshold = 10000,
autovacuum_analyze_threshold = 10000
);
CREATE TABLE transaction
(
hash TEXT NOT NULL,
height BIGINT NOT NULL REFERENCES block (height),
"index" INTEGER NOT NULL, -- <<<=== not present in original bdjuno table, but it's quite useful
success BOOLEAN NOT NULL,
/* Body */
messages JSONB NOT NULL DEFAULT '[]'::JSONB,
memo TEXT,
signatures TEXT[] NOT NULL,
/* AuthInfo */
signer_infos JSONB NOT NULL DEFAULT '[]'::JSONB,
fee JSONB NOT NULL DEFAULT '{}'::JSONB,
/* Tx response */
gas_wanted BIGINT DEFAULT 0,
gas_used BIGINT DEFAULT 0,
raw_log TEXT,
logs JSONB,
events JSONB,
CONSTRAINT unique_tx UNIQUE (hash)
);
CREATE INDEX transaction_hash_index ON transaction (hash);
CREATE INDEX transaction_height_index ON transaction (height);
CREATE TYPE COIN AS
(
denom TEXT,
amount TEXT
);
CREATE TABLE message
(
transaction_hash TEXT NOT NULL,
index BIGINT NOT NULL,
type TEXT NOT NULL,
value JSONB NOT NULL,
involved_accounts_addresses TEXT[] NOT NULL,
height BIGINT NOT NULL,
funds COIN[] DEFAULT '{}',
FOREIGN KEY (transaction_hash) REFERENCES transaction (hash),
CONSTRAINT unique_message_per_tx UNIQUE (transaction_hash, index)
);
CREATE INDEX message_transaction_hash_index ON message (transaction_hash);
CREATE INDEX message_type_index ON message (type);
CREATE INDEX message_involved_accounts_index ON message USING GIN (involved_accounts_addresses);
/**
* This function is used to find all the utils that involve any of the given addresses and have
* type that is one of the specified types.
*/
CREATE FUNCTION messages_by_address(
addresses TEXT[],
types TEXT[],
"limit" BIGINT = 100,
"offset" BIGINT = 0)
RETURNS SETOF message AS
$$
SELECT *
FROM message
WHERE (cardinality(types) = 0 OR type = ANY (types))
AND addresses && involved_accounts_addresses
ORDER BY height DESC
LIMIT "limit" OFFSET "offset"
$$ LANGUAGE sql STABLE;
CREATE FUNCTION messages_by_type(
types text[],
"limit" bigint DEFAULT 100,
"offset" bigint DEFAULT 0)
RETURNS SETOF message AS
$$
SELECT *
FROM message
WHERE (cardinality(types) = 0 OR type = ANY (types))
ORDER BY height DESC
LIMIT "limit" OFFSET "offset"
$$ LANGUAGE sql STABLE;
CREATE TABLE pruning
(
last_pruned_height BIGINT NOT NULL
);
+43
View File
@@ -0,0 +1,43 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nyxd_scraper_shared::helpers::MalformedDataError;
use nyxd_scraper_shared::storage::NyxdScraperStorageError;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum PostgresScraperError {
#[error("experienced internal database error: {0}")]
InternalDatabaseError(#[from] sqlx::error::Error),
#[error("failed to perform startup SQL migration: {0}")]
StartupMigrationFailure(#[from] sqlx::migrate::MigrateError),
#[error("failed to begin storage tx: {source}")]
StorageTxBeginFailure {
#[source]
source: sqlx::error::Error,
},
#[error("failed to commit storage tx: {source}")]
StorageTxCommitFailure {
#[source]
source: sqlx::error::Error,
},
#[error(transparent)]
MalformedData(#[from] MalformedDataError),
// TOOD: add struct name
#[error("json serialisation failure: {source}")]
SerialisationFailure {
#[from]
source: serde_json::Error,
},
}
impl From<PostgresScraperError> for NyxdScraperStorageError {
fn from(err: PostgresScraperError) -> Self {
NyxdScraperStorageError::new(err)
}
}
+21
View File
@@ -0,0 +1,21 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::block_storage::PostgresScraperStorage;
use nyxd_scraper_shared::NyxdScraper;
pub use nyxd_scraper_shared::constants;
pub use nyxd_scraper_shared::error::ScraperError;
pub use nyxd_scraper_shared::{
BlockModule, MsgModule, NyxdScraperTransaction, ParsedTransactionResponse, PruningOptions,
PruningStrategy, StartingBlockOpts, TxModule,
};
pub use storage::models;
pub mod error;
pub mod storage;
pub type PostgresNyxdScraper = NyxdScraper<PostgresScraperStorage>;
// TODO: for now just use exactly the same config
pub use nyxd_scraper_shared::Config;
@@ -0,0 +1,236 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::PostgresScraperError;
use crate::models::{CommitSignature, Validator};
use crate::storage::manager::{
StorageManager, prune_blocks, prune_messages, prune_pre_commits, prune_transactions,
update_last_pruned,
};
use crate::storage::transaction::PostgresStorageTransaction;
use async_trait::async_trait;
use nyxd_scraper_shared::storage::helpers::log_db_operation_time;
use nyxd_scraper_shared::storage::{NyxdScraperStorage, NyxdScraperStorageError};
use sqlx::types::time::{OffsetDateTime, PrimitiveDateTime};
use tokio::time::Instant;
use tracing::{debug, error, info, instrument, warn};
#[derive(Clone)]
pub struct PostgresScraperStorage {
pub(crate) manager: StorageManager,
}
impl PostgresScraperStorage {
#[instrument]
pub async fn init(connection_string: &str) -> Result<Self, PostgresScraperError> {
debug!("initialising scraper database with '{connection_string}'",);
let connection_pool = match sqlx::PgPool::connect(connection_string).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};
if let Err(err) = sqlx::migrate!("./sql_migrations")
.run(&connection_pool)
.await
{
warn!("Failed to initialize SQLx database: {err}");
// return Err(err.into());
}
info!("Database migration finished!");
let manager = StorageManager { connection_pool };
manager.set_initial_metadata().await?;
let storage = PostgresScraperStorage { manager };
Ok(storage)
}
#[instrument(skip(self))]
pub async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), PostgresScraperError> {
let start = Instant::now();
let mut tx = self.begin_processing_tx().await?;
prune_messages(oldest_to_keep.into(), &mut **tx).await?;
prune_transactions(oldest_to_keep.into(), &mut **tx).await?;
prune_pre_commits(oldest_to_keep.into(), &mut **tx).await?;
prune_blocks(oldest_to_keep.into(), &mut **tx).await?;
update_last_pruned(current_height.into(), &mut **tx).await?;
let commit_start = Instant::now();
tx.inner
.commit()
.await
.map_err(|source| PostgresScraperError::StorageTxCommitFailure { source })?;
log_db_operation_time("committing pruning tx", commit_start);
log_db_operation_time("pruning storage", start);
Ok(())
}
#[instrument(skip_all)]
pub async fn begin_processing_tx(
&self,
) -> Result<PostgresStorageTransaction, PostgresScraperError> {
debug!("starting storage tx");
self.manager
.connection_pool
.begin()
.await
.map(|inner| PostgresStorageTransaction { inner })
.map_err(|source| PostgresScraperError::StorageTxBeginFailure { source })
}
pub async fn lowest_block_height(&self) -> Result<Option<i64>, PostgresScraperError> {
Ok(self.manager.get_lowest_block().await?)
}
pub async fn get_first_block_height_after(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, PostgresScraperError> {
let time = PrimitiveDateTime::new(time.date(), time.time());
Ok(self.manager.get_first_block_height_after(time).await?)
}
pub async fn get_last_block_height_before(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, PostgresScraperError> {
let time = PrimitiveDateTime::new(time.date(), time.time());
Ok(self.manager.get_last_block_height_before(time).await?)
}
pub async fn get_blocks_between(
&self,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, PostgresScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
Ok(block_end - block_start)
}
pub async fn get_signed_between(
&self,
consensus_address: &str,
start_height: i64,
end_height: i64,
) -> Result<i64, PostgresScraperError> {
Ok(self
.manager
.get_signed_between(consensus_address, start_height, end_height)
.await?)
}
pub async fn get_signed_between_times(
&self,
consensus_address: &str,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, PostgresScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
self.get_signed_between(consensus_address, block_start, block_end)
.await
}
pub async fn get_precommit(
&self,
consensus_address: &str,
height: i64,
) -> Result<Option<CommitSignature>, PostgresScraperError> {
Ok(self
.manager
.get_precommit(consensus_address, height)
.await?)
}
pub async fn get_block_signers(
&self,
height: i64,
) -> Result<Vec<Validator>, PostgresScraperError> {
Ok(self.manager.get_block_validators(height).await?)
}
pub async fn get_all_known_validators(&self) -> Result<Vec<Validator>, PostgresScraperError> {
Ok(self.manager.get_validators().await?)
}
pub async fn get_last_processed_height(&self) -> Result<i64, PostgresScraperError> {
Ok(self.manager.get_last_processed_height().await?)
}
pub async fn get_pruned_height(&self) -> Result<i64, PostgresScraperError> {
Ok(self.manager.get_pruned_height().await?)
}
}
#[async_trait]
impl NyxdScraperStorage for PostgresScraperStorage {
type StorageTransaction = PostgresStorageTransaction;
async fn initialise(storage: &str) -> Result<Self, NyxdScraperStorageError> {
PostgresScraperStorage::init(storage)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn begin_processing_tx(
&self,
) -> Result<Self::StorageTransaction, NyxdScraperStorageError> {
self.begin_processing_tx()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError> {
self.get_last_processed_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError> {
self.get_pruned_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError> {
self.lowest_block_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), NyxdScraperStorageError> {
self.prune_storage(oldest_to_keep, current_height)
.await
.map_err(NyxdScraperStorageError::from)
}
}
@@ -0,0 +1,25 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use cosmrs::AccountId;
use itertools::Itertools;
use nyxd_scraper_shared::ParsedTransactionResponse;
use std::str::FromStr;
// replicate behaviour of `CosmosMessageAddressesParser` from juno
pub(crate) fn parse_addresses_from_events(tx: &ParsedTransactionResponse) -> Vec<String> {
let mut addresses: Vec<String> = Vec::new();
for event in &tx.tx_result.events {
for attribute in &event.attributes {
let Ok(value) = attribute.value_str() else {
continue;
};
// Try parsing the address as an account address
if let Ok(address) = AccountId::from_str(value) {
addresses.push(address.to_string());
}
}
}
addresses.into_iter().unique().collect()
}
@@ -0,0 +1,543 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::models::{CommitSignature, Validator};
use nyxd_scraper_shared::storage::helpers::log_db_operation_time;
use sqlx::types::JsonValue;
use sqlx::types::time::PrimitiveDateTime;
use sqlx::{Executor, Postgres};
use tokio::time::Instant;
use tracing::{instrument, trace};
#[derive(Clone)]
pub(crate) struct StorageManager {
pub(crate) connection_pool: sqlx::Pool<Postgres>,
}
impl StorageManager {
pub(crate) async fn set_initial_metadata(&self) -> Result<(), sqlx::Error> {
if sqlx::query("SELECT * from metadata")
.fetch_optional(&self.connection_pool)
.await?
.is_none()
{
sqlx::query("INSERT INTO metadata (id, last_processed_height) VALUES (0, 0)")
.execute(&self.connection_pool)
.await?;
}
Ok(())
}
pub(crate) async fn get_lowest_block(&self) -> Result<Option<i64>, sqlx::Error> {
trace!("get_lowest_block");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT height
FROM block
ORDER BY height ASC
LIMIT 1
"#,
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_lowest_block", start);
Ok(maybe_record.map(|x| x.height))
}
pub(crate) async fn get_first_block_height_after(
&self,
time: PrimitiveDateTime,
) -> Result<Option<i64>, sqlx::Error> {
trace!("get_first_block_height_after");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT height
FROM block
WHERE timestamp > $1
ORDER BY timestamp
LIMIT 1
"#,
time
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_first_block_height_after", start);
Ok(maybe_record.map(|x| x.height))
}
pub(crate) async fn get_last_block_height_before(
&self,
time: PrimitiveDateTime,
) -> Result<Option<i64>, sqlx::Error> {
trace!("get_last_block_height_before");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT height
FROM block
WHERE timestamp < $1
ORDER BY timestamp DESC
LIMIT 1
"#,
time
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_last_block_height_before", start);
Ok(maybe_record.map(|x| x.height))
}
pub(crate) async fn get_signed_between(
&self,
consensus_address: &str,
start_height: i64,
end_height: i64,
) -> Result<i64, sqlx::Error> {
trace!("get_signed_between");
let start = Instant::now();
let count = sqlx::query!(
r#"
SELECT COUNT(*) as count FROM pre_commit
WHERE
validator_address = $1
AND height >= $2
AND height <= $3
"#,
consensus_address,
start_height,
end_height
)
.fetch_one(&self.connection_pool)
.await?
.count;
log_db_operation_time("get_signed_between", start);
Ok(count.unwrap_or(0))
}
pub(crate) async fn get_precommit(
&self,
consensus_address: &str,
height: i64,
) -> Result<Option<CommitSignature>, sqlx::Error> {
trace!("get_precommit");
let start = Instant::now();
let res = sqlx::query_as(
r#"
SELECT * FROM pre_commit
WHERE validator_address = $1
AND height = $2
"#,
)
.bind(consensus_address)
.bind(height)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_precommit", start);
Ok(res)
}
pub(crate) async fn get_block_validators(
&self,
height: i64,
) -> Result<Vec<Validator>, sqlx::Error> {
trace!("get_block_validators");
let start = Instant::now();
let res = sqlx::query_as!(
Validator,
r#"
SELECT * FROM validator
WHERE EXISTS (
SELECT 1 FROM pre_commit
WHERE height = $1
AND pre_commit.validator_address = validator.consensus_address
)
"#,
height
)
.fetch_all(&self.connection_pool)
.await?;
log_db_operation_time("get_block_validators", start);
Ok(res)
}
pub(crate) async fn get_validators(&self) -> Result<Vec<Validator>, sqlx::Error> {
trace!("get_validators");
let start = Instant::now();
let res = sqlx::query_as("SELECT * FROM validator")
.fetch_all(&self.connection_pool)
.await?;
log_db_operation_time("get_validators", start);
Ok(res)
}
pub(crate) async fn get_last_processed_height(&self) -> Result<i64, sqlx::Error> {
trace!("get_last_processed_height");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT last_processed_height FROM metadata
"#
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_last_processed_height", start);
if let Some(row) = maybe_record {
#[allow(clippy::useless_conversion)]
Ok(row.last_processed_height.into())
} else {
Ok(-1)
}
}
pub(crate) async fn get_pruned_height(&self) -> Result<i64, sqlx::Error> {
trace!("get_pruned_height");
let start = Instant::now();
let maybe_record = sqlx::query!(
r#"
SELECT last_pruned_height FROM pruning
"#
)
.fetch_optional(&self.connection_pool)
.await?;
log_db_operation_time("get_pruned_height", start);
if let Some(row) = maybe_record {
Ok(row.last_pruned_height)
} else {
Ok(-1)
}
}
}
// make those generic over executor so that they could be performed over connection pool and a tx
#[instrument(skip(executor))]
pub(crate) async fn insert_validator<'a, E>(
consensus_address: String,
consensus_pubkey: String,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_validator");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO validator (consensus_address, consensus_pubkey)
VALUES ($1, $2)
ON CONFLICT DO NOTHING
"#,
consensus_address,
consensus_pubkey
)
.execute(executor)
.await?;
log_db_operation_time("insert_validator", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn insert_block<'a, E>(
height: i64,
hash: String,
num_txs: i32,
total_gas: i64,
proposer_address: String,
timestamp: PrimitiveDateTime,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_block");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO block (height, hash, num_txs, total_gas, proposer_address, timestamp)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT DO NOTHING
"#,
height,
hash,
num_txs,
total_gas,
proposer_address,
timestamp
)
.execute(executor)
.await?;
log_db_operation_time("insert_block", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn insert_precommit<'a, E>(
validator_address: String,
height: i64,
timestamp: PrimitiveDateTime,
voting_power: i64,
proposer_priority: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_precommit");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO pre_commit (validator_address, height, timestamp, voting_power, proposer_priority)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (validator_address, timestamp) DO NOTHING
"#,
validator_address,
height,
timestamp,
voting_power,
proposer_priority
)
.execute(executor)
.await?;
log_db_operation_time("insert_precommit", start);
Ok(())
}
#[instrument(skip(executor))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn insert_transaction<'a, E>(
hash: String,
height: i64,
index: i32,
success: bool,
messages: JsonValue,
memo: String,
signatures: Vec<String>,
signer_infos: JsonValue,
fee: JsonValue,
gas_wanted: i64,
gas_used: i64,
raw_log: String,
logs: JsonValue,
events: JsonValue,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_transaction");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO transaction
(hash, height, index, success, messages, memo, signatures, signer_infos, fee, gas_wanted, gas_used, raw_log, logs, events)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT (hash) DO UPDATE
SET height = excluded.height,
index = excluded.index,
success = excluded.success,
messages = excluded.messages,
memo = excluded.memo,
signatures = excluded.signatures,
signer_infos = excluded.signer_infos,
fee = excluded.fee,
gas_wanted = excluded.gas_wanted,
gas_used = excluded.gas_used,
raw_log = excluded.raw_log,
logs = excluded.logs,
events = excluded.events
"#,
hash,
height,
index,
success,
messages,
memo,
&signatures,
signer_infos,
fee,
gas_wanted,
gas_used,
raw_log,
logs,
events,
)
.execute(executor)
.await?;
log_db_operation_time("insert_transaction", start);
Ok(())
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip(executor))]
pub(crate) async fn insert_message<'a, E>(
transaction_hash: String,
index: i64,
typ: String,
value: JsonValue,
involved_account_addresses: Vec<String>,
height: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("insert_message");
let start = Instant::now();
sqlx::query!(
r#"
INSERT INTO message(transaction_hash, index, type, value, involved_accounts_addresses, height)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (transaction_hash, index) DO UPDATE
SET height = excluded.height,
type = excluded.type,
value = excluded.value,
involved_accounts_addresses = excluded.involved_accounts_addresses
"#,
transaction_hash,
index,
typ,
value,
&involved_account_addresses,
height,
)
.execute(executor)
.await?;
log_db_operation_time("insert_message", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn update_last_processed<'a, E>(
height: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("update_last_processed");
let start = Instant::now();
sqlx::query!(
"UPDATE metadata SET last_processed_height = GREATEST(last_processed_height, $1)",
height as i32
)
.execute(executor)
.await?;
log_db_operation_time("update_last_processed", start);
Ok(())
}
#[instrument(skip(executor))]
pub(crate) async fn update_last_pruned<'a, E>(height: i64, executor: E) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("update_last_pruned");
let start = Instant::now();
sqlx::query!("UPDATE pruning SET last_pruned_height = $1", height)
.execute(executor)
.await?;
log_db_operation_time("update_last_pruned", start);
Ok(())
}
pub(crate) async fn prune_blocks<'a, E>(oldest_to_keep: i64, executor: E) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("prune_blocks");
let start = Instant::now();
sqlx::query!("DELETE FROM block WHERE height < $1", oldest_to_keep)
.execute(executor)
.await?;
log_db_operation_time("prune_blocks", start);
Ok(())
}
pub(crate) async fn prune_pre_commits<'a, E>(
oldest_to_keep: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("prune_pre_commits");
let start = Instant::now();
sqlx::query!("DELETE FROM pre_commit WHERE height < $1", oldest_to_keep)
.execute(executor)
.await?;
log_db_operation_time("prune_pre_commits", start);
Ok(())
}
pub(crate) async fn prune_transactions<'a, E>(
oldest_to_keep: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("prune_transactions");
let start = Instant::now();
sqlx::query!("DELETE FROM transaction WHERE height < $1", oldest_to_keep)
.execute(executor)
.await?;
log_db_operation_time("prune_transactions", start);
Ok(())
}
pub(crate) async fn prune_messages<'a, E>(
oldest_to_keep: i64,
executor: E,
) -> Result<(), sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
trace!("prune_messages");
let start = Instant::now();
sqlx::query!("DELETE FROM message WHERE height < $1", oldest_to_keep)
.execute(executor)
.await?;
log_db_operation_time("prune_messages", start);
Ok(())
}
@@ -0,0 +1,8 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod block_storage;
mod helpers;
mod manager;
pub mod models;
pub mod transaction;
@@ -0,0 +1,47 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use sqlx::types::time::OffsetDateTime;
#[derive(Debug, Clone, Eq, PartialEq, Hash, FromRow)]
pub struct Validator {
pub consensus_address: String,
pub consensus_pubkey: String,
}
#[derive(Debug, Clone, FromRow)]
pub struct Block {
pub height: i64,
pub hash: String,
pub num_txs: u32,
pub total_gas: i64,
pub proposer_address: String,
pub timestamp: OffsetDateTime,
}
#[derive(Debug, Clone, FromRow)]
pub struct CommitSignature {
pub height: i64,
pub validator_address: String,
pub voting_power: i64,
pub proposer_priority: i64,
pub timestamp: OffsetDateTime,
}
#[derive(Debug, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "coin")]
pub struct DbCoin {
pub amount: String,
pub denom: String,
}
impl From<cosmrs::proto::cosmos::base::v1beta1::Coin> for DbCoin {
fn from(coin: cosmrs::proto::cosmos::base::v1beta1::Coin) -> Self {
Self {
amount: coin.amount,
denom: coin.denom,
}
}
}
@@ -0,0 +1,297 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::PostgresScraperError;
use crate::storage::helpers::parse_addresses_from_events;
use crate::storage::manager::{
insert_block, insert_message, insert_precommit, insert_transaction, insert_validator,
update_last_processed,
};
use async_trait::async_trait;
use base64::Engine as _;
use base64::engine::general_purpose;
use cosmrs::proto;
use nyxd_scraper_shared::ParsedTransactionResponse;
use nyxd_scraper_shared::helpers::{
validator_consensus_address, validator_info, validator_pubkey_to_bech32,
};
use nyxd_scraper_shared::storage::validators::Response;
use nyxd_scraper_shared::storage::{
Block, Commit, CommitSig, NyxdScraperStorageError, NyxdScraperTransaction, validators,
};
use serde_json::json;
use sqlx::types::time::{OffsetDateTime, PrimitiveDateTime};
use sqlx::{Postgres, Transaction};
use std::ops::{Deref, DerefMut};
use tracing::{debug, error, trace, warn};
pub struct PostgresStorageTransaction {
pub(super) inner: Transaction<'static, Postgres>,
}
impl Deref for PostgresStorageTransaction {
type Target = Transaction<'static, Postgres>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for PostgresStorageTransaction {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl PostgresStorageTransaction {
async fn persist_validators(
&mut self,
validators: &validators::Response,
) -> Result<(), PostgresScraperError> {
debug!("persisting {} validators", validators.total);
for validator in &validators.validators {
let consensus_address = validator_consensus_address(validator.address)?;
let consensus_pubkey = validator_pubkey_to_bech32(validator.pub_key)?;
insert_validator(
consensus_address.to_string(),
consensus_pubkey.to_string(),
self.inner.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), PostgresScraperError> {
let proposer_address =
validator_consensus_address(block.header.proposer_address)?.to_string();
let offset_datetime: OffsetDateTime = block.header.time.into();
let time = PrimitiveDateTime::new(offset_datetime.date(), offset_datetime.time());
insert_block(
block.header.height.into(),
block.header.hash().to_string(),
block.data.len() as i32,
total_gas,
proposer_address,
time,
self.inner.as_mut(),
)
.await?;
Ok(())
}
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &validators::Response,
) -> Result<(), PostgresScraperError> {
debug!("persisting up to {} commits", commits.signatures.len());
let height: i64 = commits.height.into();
for commit_sig in &commits.signatures {
let (validator_id, timestamp, signature) = match commit_sig {
CommitSig::BlockIdFlagAbsent => {
trace!("absent signature");
continue;
}
CommitSig::BlockIdFlagCommit {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
CommitSig::BlockIdFlagNil {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
};
let validator = validator_info(*validator_id, validators)?;
let validator_address = validator_consensus_address(*validator_id)?;
if signature.is_none() {
warn!("empty signature for {validator_address} at height {height}");
continue;
}
let offset_datetime: OffsetDateTime = (*timestamp).into();
let time = PrimitiveDateTime::new(offset_datetime.date(), offset_datetime.time());
insert_precommit(
validator_address.to_string(),
height,
time,
validator.power.into(),
validator.proposer_priority.value(),
self.inner.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), PostgresScraperError> {
debug!("persisting {} txs", txs.len());
for chain_tx in txs {
// bdjuno style, base64 encode them
let signatures = chain_tx
.tx
.signatures
.iter()
.map(|sig| general_purpose::STANDARD.encode(sig))
.collect();
let messages = chain_tx
.parsed_messages
.values()
.cloned()
.collect::<Vec<_>>();
let signer_infos = chain_tx
.tx
.auth_info
.signer_infos
.iter()
.map(|info| proto::cosmos::tx::v1beta1::SignerInfo::from(info.clone()))
.collect::<Vec<_>>();
let hash = chain_tx.hash.to_string();
let height = chain_tx.height.into();
let index = chain_tx.index as i32;
let log = serde_json::to_value(chain_tx.tx_result.log.clone()).map_err(|e| error!(hash, height, index, "Failed to parse logs: {e}")).unwrap_or_default();
let events = &chain_tx.tx_result.events;
insert_transaction(
hash,
height,
index,
chain_tx.tx_result.code.is_ok(),
serde_json::Value::Array(messages),
chain_tx.tx.body.memo.clone(),
signatures,
serde_json::to_value(signer_infos)?,
serde_json::to_value(&chain_tx.tx.auth_info.fee)?,
chain_tx.tx_result.gas_wanted,
chain_tx.tx_result.gas_used,
chain_tx.tx_result.log.clone(),
json!(log),
json!(events),
self.inner.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), PostgresScraperError> {
debug!("persisting messages");
for chain_tx in txs {
let involved_addresses = parse_addresses_from_events(chain_tx);
for (index, msg) in chain_tx.tx.body.messages.iter().enumerate() {
let parsed_message = chain_tx.parsed_messages.get(&index);
let value = serde_json::to_value(parsed_message)?;
insert_message(
chain_tx.hash.to_string(),
index as i64,
msg.type_url.clone(),
value,
involved_addresses.clone(),
chain_tx.height.into(),
self.inner.as_mut(),
)
.await?
}
}
Ok(())
}
async fn update_last_processed(&mut self, height: i64) -> Result<(), PostgresScraperError> {
debug!("update_last_processed");
update_last_processed(height, self.inner.as_mut()).await?;
Ok(())
}
}
#[async_trait]
impl NyxdScraperTransaction for PostgresStorageTransaction {
async fn commit(self) -> Result<(), NyxdScraperStorageError> {
self.inner
.commit()
.await
.map_err(PostgresScraperError::from)
.map_err(NyxdScraperStorageError::from)
}
async fn persist_validators(
&mut self,
validators: &Response,
) -> Result<(), NyxdScraperStorageError> {
self.persist_validators(validators)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), NyxdScraperStorageError> {
self.persist_block_data(block, total_gas)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &Response,
) -> Result<(), NyxdScraperStorageError> {
self.persist_commits(commits, validators)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
self.persist_txs(txs)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
self.persist_messages(txs)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError> {
self.update_last_processed(height)
.await
.map_err(NyxdScraperStorageError::from)
}
}
@@ -1,5 +1,5 @@
[package]
name = "nyxd-scraper"
name = "nyxd-scraper-shared"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
@@ -8,19 +8,22 @@ documentation.workspace = true
edition.workspace = true
license.workspace = true
rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
readme.workspace = true
[dependencies]
async-trait.workspace = true
base64.workspace = true
const_format = { workspace = true }
cosmrs.workspace = true
cosmos-sdk-proto = { workspace = true, features = ["serde", "cosmwasm"] } # we need to explicitly include serde feature
eyre = { workspace = true }
futures.workspace = true
humantime = { workspace = true }
ibc-proto = { workspace = true, features = ["serde"] }
prost = { workspace = true }
sha2 = { workspace = true }
serde = { workspace = true, features = ["derive"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"] }
serde_json = { workspace = true }
tendermint.workspace = true
tendermint-rpc = { workspace = true, features = ["websocket-client", "http-client"] }
thiserror.workspace = true
@@ -32,11 +35,5 @@ tracing.workspace = true
url.workspace = true
# TEMP
#nym-bin-common = { path = "../bin-common", features = ["basic_tracing"]}
[build-dependencies]
anyhow = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
[lints]
workspace = true
@@ -8,7 +8,7 @@ use crate::block_requester::BlockRequest;
use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::storage::{ScraperStorage, persist_block};
use crate::storage::{NyxdScraperStorage, NyxdScraperTransaction, persist_block};
use futures::StreamExt;
use std::cmp::max;
use std::collections::{BTreeMap, HashSet, VecDeque};
@@ -77,7 +77,7 @@ impl BlockProcessorConfig {
}
}
pub struct BlockProcessor {
pub struct BlockProcessor<S> {
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
@@ -90,7 +90,7 @@ pub struct BlockProcessor {
rpc_client: RpcClient,
incoming: UnboundedReceiverStream<BlockToProcess>,
block_requester: Sender<BlockRequest>,
storage: ScraperStorage,
storage: S,
// future work: rather than sending each msg to every msg module,
// let them subscribe based on `type_url` inside the message itself
@@ -101,14 +101,17 @@ pub struct BlockProcessor {
}
#[allow(clippy::too_many_arguments)]
impl BlockProcessor {
impl<S> BlockProcessor<S>
where
S: NyxdScraperStorage,
{
pub async fn new(
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
block_requester: Sender<BlockRequest>,
storage: ScraperStorage,
storage: S,
rpc_client: RpcClient,
) -> Result<Self, ScraperError> {
let last_processed = storage.get_last_processed_height().await?;
@@ -164,7 +167,11 @@ impl BlockProcessor {
// process the entire block as a transaction so that if anything fails,
// we won't end up with a corrupted storage.
let mut tx = self.storage.begin_processing_tx().await?;
let mut tx = self
.storage
.begin_processing_tx()
.await
.map_err(ScraperError::tx_begin_failure)?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;
@@ -192,10 +199,8 @@ impl BlockProcessor {
}
let commit_start = Instant::now();
tx.commit()
.await
.map_err(|source| ScraperError::StorageTxCommitFailure { source })?;
crate::storage::log_db_operation_time("committing processing tx", commit_start);
tx.commit().await.map_err(ScraperError::tx_commit_failure)?;
crate::storage::helpers::log_db_operation_time("committing processing tx", commit_start);
self.last_processed_height = full_info.block.header.height.value() as u32;
self.last_processed_at = Instant::now();
@@ -3,6 +3,7 @@
use crate::error::ScraperError;
use crate::helpers;
use std::collections::HashMap;
use tendermint::{Block, Hash, abci, block, tx};
use tendermint_rpc::endpoint::{block as block_endpoint, block_results, validators};
use tendermint_rpc::event::{Event, EventData};
@@ -26,6 +27,12 @@ pub struct ParsedTransactionResponse {
pub tx: cosmrs::tx::Tx,
pub proof: Option<tx::Proof>,
pub parsed_messages: HashMap<usize, serde_json::Value>,
pub parsed_message_urls: HashMap<usize, String>,
pub block: Block,
}
#[derive(Debug)]
@@ -0,0 +1,146 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::modules::auth::Auth;
use crate::cosmos_module::modules::authz::Authz;
use crate::cosmos_module::modules::bank::Bank;
use crate::cosmos_module::modules::capability::Capability;
use crate::cosmos_module::modules::consensus::Consensus;
use crate::cosmos_module::modules::crisis::Crisis;
use crate::cosmos_module::modules::distribution::Distribution;
use crate::cosmos_module::modules::evidence::Evidence;
use crate::cosmos_module::modules::feegrant::Feegrant;
use crate::cosmos_module::modules::gov_v1::GovV1;
use crate::cosmos_module::modules::gov_v1beta1::GovV1Beta1;
use crate::cosmos_module::modules::group::Group;
use crate::cosmos_module::modules::ibc_core::IbcCore;
use crate::cosmos_module::modules::ibc_fee::IbcFee;
use crate::cosmos_module::modules::ibc_interchain_accounts_controller::IbcInterchainAccountsController;
use crate::cosmos_module::modules::ibc_transfer_v1::IbcTransferV1;
use crate::cosmos_module::modules::ibc_transfer_v2::IbcTransferV2;
use crate::cosmos_module::modules::mint::Mint;
use crate::cosmos_module::modules::nft::Nft;
use crate::cosmos_module::modules::params::Params;
use crate::cosmos_module::modules::slashing::Slashing;
use crate::cosmos_module::modules::staking::Staking;
use crate::cosmos_module::modules::upgrade::Upgrade;
use crate::cosmos_module::modules::vesting::Vesting;
use crate::cosmos_module::modules::wasm::Wasm;
use crate::error::ScraperError;
use cosmrs::Any;
use cosmrs::proto::prost::Name;
use cosmrs::proto::traits::Message;
use serde::Serialize;
use std::collections::HashMap;
pub(crate) fn default_proto_to_json<T: Message + Default + Serialize>(
msg: &Any,
) -> Result<serde_json::Value, ScraperError> {
let proto = <T as Message>::decode(msg.value.as_slice()).map_err(|error| {
ScraperError::InvalidProtoRepresentation {
type_url: msg.type_url.clone(),
error,
}
})?;
let mut base_serde =
serde_json::to_value(&proto).map_err(|error| ScraperError::JsonSerialisationFailure {
type_url: msg.type_url.clone(),
error,
})?;
// in bdjuno's output we also had @type field with the type_url
let obj = base_serde.as_object_mut().ok_or_else(|| {
ScraperError::JsonSerialisationFailureNotObject {
type_url: msg.type_url.clone(),
}
})?;
obj.insert(
"@type".to_string(),
serde_json::Value::String(msg.type_url.clone()),
);
Ok(base_serde)
}
type ConvertFn = fn(&Any) -> Result<serde_json::Value, ScraperError>;
#[derive(Default, Clone)]
pub struct MessageRegistry {
// type url to function converting bytes to proto and finally to json
registered_types: HashMap<String, ConvertFn>,
}
impl MessageRegistry {
pub fn new() -> Self {
MessageRegistry {
registered_types: Default::default(),
}
}
pub fn register<T>(&mut self)
where
T: Message + Default + Name + Serialize + 'static,
{
self.register_with_custom_fn::<T>(default_proto_to_json::<T>)
}
#[allow(clippy::panic)]
pub fn register_with_custom_fn<T>(&mut self, convert_fn: ConvertFn)
where
T: Message + Default + Name + Serialize + 'static,
{
if self
.registered_types
.insert(<T as Name>::type_url(), convert_fn)
.is_some()
{
// don't allow duplicate registration because it most likely implies bug in the code
panic!("duplicate registration of type {}", <T as Name>::type_url());
}
}
pub fn try_decode(&self, raw: &Any) -> Result<serde_json::Value, ScraperError> {
self.registered_types.get(&raw.type_url).ok_or(
ScraperError::MissingTypeUrlRegistration {
type_url: raw.type_url.clone(),
},
)?(raw)
}
}
pub fn default_message_registry() -> MessageRegistry {
let mut registry = MessageRegistry::new();
let modules: Vec<Box<dyn CosmosModule>> = vec![
Box::new(Auth),
Box::new(Authz),
Box::new(Bank),
Box::new(Capability),
Box::new(Consensus),
Box::new(Wasm),
Box::new(Crisis),
Box::new(Distribution),
Box::new(Evidence),
Box::new(Feegrant),
Box::new(GovV1),
Box::new(GovV1Beta1),
Box::new(Group),
Box::new(IbcCore),
Box::new(IbcFee),
Box::new(IbcTransferV1),
Box::new(IbcTransferV2),
Box::new(IbcInterchainAccountsController),
Box::new(Mint),
Box::new(Nft),
Box::new(Params),
Box::new(Slashing),
Box::new(Staking),
Box::new(Upgrade),
Box::new(Vesting),
];
for module in modules {
module.register_messages(&mut registry)
}
registry
}
@@ -0,0 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::message_registry::MessageRegistry;
pub mod message_registry;
mod modules;
pub trait CosmosModule {
fn register_messages(&self, registry: &mut MessageRegistry);
}
@@ -0,0 +1,14 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::auth::v1beta1::MsgUpdateParams;
pub(crate) struct Auth;
impl CosmosModule for Auth {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgUpdateParams>()
}
}
@@ -0,0 +1,16 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::authz::v1beta1::{MsgExec, MsgGrant, MsgRevoke};
pub(crate) struct Authz;
impl CosmosModule for Authz {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgGrant>();
registry.register::<MsgExec>();
registry.register::<MsgRevoke>();
}
}
@@ -0,0 +1,19 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::bank::v1beta1::{
MsgMultiSend, MsgSend, MsgSetSendEnabled, MsgUpdateParams,
};
pub(crate) struct Bank;
impl CosmosModule for Bank {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgSend>();
registry.register::<MsgMultiSend>();
registry.register::<MsgUpdateParams>();
registry.register::<MsgSetSendEnabled>();
}
}
@@ -0,0 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
pub(crate) struct Capability;
impl CosmosModule for Capability {
fn register_messages(&self, _registry: &mut MessageRegistry) {}
}
@@ -0,0 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
pub(crate) struct Consensus;
impl CosmosModule for Consensus {
fn register_messages(&self, _registry: &mut MessageRegistry) {}
}
@@ -0,0 +1,15 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::crisis::v1beta1::{MsgUpdateParams, MsgVerifyInvariant};
pub(crate) struct Crisis;
impl CosmosModule for Crisis {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgVerifyInvariant>();
registry.register::<MsgUpdateParams>();
}
}
@@ -0,0 +1,22 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::distribution::v1beta1::{
MsgCommunityPoolSpend, MsgFundCommunityPool, MsgSetWithdrawAddress, MsgUpdateParams,
MsgWithdrawDelegatorReward, MsgWithdrawValidatorCommission,
};
pub(crate) struct Distribution;
impl CosmosModule for Distribution {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgWithdrawDelegatorReward>();
registry.register::<MsgWithdrawValidatorCommission>();
registry.register::<MsgSetWithdrawAddress>();
registry.register::<MsgFundCommunityPool>();
registry.register::<MsgUpdateParams>();
registry.register::<MsgCommunityPoolSpend>();
}
}
@@ -0,0 +1,14 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::evidence::v1beta1::MsgSubmitEvidence;
pub(crate) struct Evidence;
impl CosmosModule for Evidence {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgSubmitEvidence>()
}
}
@@ -0,0 +1,18 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::feegrant::v1beta1::{
MsgGrantAllowance, MsgPruneAllowances, MsgRevokeAllowance,
};
pub(crate) struct Feegrant;
impl CosmosModule for Feegrant {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgGrantAllowance>();
registry.register::<MsgRevokeAllowance>();
registry.register::<MsgPruneAllowances>();
}
}
@@ -0,0 +1,21 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::gov::v1::{
MsgDeposit, MsgExecLegacyContent, MsgSubmitProposal, MsgUpdateParams, MsgVote, MsgVoteWeighted,
};
pub(crate) struct GovV1;
impl CosmosModule for GovV1 {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgSubmitProposal>();
registry.register::<MsgDeposit>();
registry.register::<MsgVote>();
registry.register::<MsgVoteWeighted>();
registry.register::<MsgExecLegacyContent>();
registry.register::<MsgUpdateParams>();
}
}
@@ -0,0 +1,19 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::gov::v1beta1::{
MsgDeposit, MsgSubmitProposal, MsgVote, MsgVoteWeighted,
};
pub(crate) struct GovV1Beta1;
impl CosmosModule for GovV1Beta1 {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgSubmitProposal>();
registry.register::<MsgDeposit>();
registry.register::<MsgVote>();
registry.register::<MsgVoteWeighted>();
}
}
@@ -0,0 +1,29 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use tracing::warn;
pub(crate) struct Group;
impl CosmosModule for Group {
fn register_messages(&self, _registry: &mut MessageRegistry) {
warn!("missing cosmos-sdk-proto definition for 'group::MsgCreateGroup'");
warn!("missing cosmos-sdk-proto definition for 'group::MsgUpdateGroupMembers'");
warn!("missing cosmos-sdk-proto definition for 'group::MsgUpdateGroupAdmin'");
warn!("missing cosmos-sdk-proto definition for 'group::MsgUpdateGroupMetadata'");
warn!("missing cosmos-sdk-proto definition for 'group::MsgCreateGroupWithPolicy'");
warn!("missing cosmos-sdk-proto definition for 'group::MsgCreateGroupPolicy'");
warn!("missing cosmos-sdk-proto definition for 'group::MsgUpdateGroupPolicyAdmin'");
warn!(
"missing cosmos-sdk-proto definition for 'group::MsgUpdateGroupPolicyDecisionPolicy'"
);
warn!("missing cosmos-sdk-proto definition for 'group::MsgUpdateGroupPolicyMetadata'");
warn!("missing cosmos-sdk-proto definition for 'group::MsgSubmitProposal'");
warn!("missing cosmos-sdk-proto definition for 'group::MsgWithdrawProposal'");
warn!("missing cosmos-sdk-proto definition for 'group::MsgVote'");
warn!("missing cosmos-sdk-proto definition for 'group::MsgExec'");
warn!("missing cosmos-sdk-proto definition for 'group::MsgLeaveGroup'");
}
}
@@ -0,0 +1,70 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{CosmosModule, MessageRegistry};
use ibc_proto::ibc::core::channel::{
self,
v1::{
MsgAcknowledgement, MsgChannelCloseConfirm, MsgChannelCloseInit, MsgChannelOpenAck,
MsgChannelOpenConfirm, MsgChannelOpenInit, MsgChannelOpenTry, MsgChannelUpgradeAck,
MsgChannelUpgradeCancel, MsgChannelUpgradeConfirm, MsgChannelUpgradeInit,
MsgChannelUpgradeOpen, MsgChannelUpgradeTimeout, MsgChannelUpgradeTry,
MsgPruneAcknowledgements, MsgRecvPacket, MsgTimeout, MsgTimeoutOnClose,
},
};
use ibc_proto::ibc::core::client::{
self,
v1::{
MsgCreateClient, MsgIbcSoftwareUpgrade, MsgRecoverClient, MsgSubmitMisbehaviour,
MsgUpdateClient, MsgUpgradeClient,
},
};
use ibc_proto::ibc::core::connection::{
self,
v1::{
MsgConnectionOpenAck, MsgConnectionOpenConfirm, MsgConnectionOpenInit, MsgConnectionOpenTry,
},
};
pub(crate) struct IbcCore;
impl CosmosModule for IbcCore {
fn register_messages(&self, registry: &mut MessageRegistry) {
// channel
registry.register::<MsgChannelOpenInit>();
registry.register::<MsgChannelOpenTry>();
registry.register::<MsgChannelOpenAck>();
registry.register::<MsgChannelOpenConfirm>();
registry.register::<MsgChannelCloseInit>();
registry.register::<MsgChannelCloseConfirm>();
registry.register::<MsgRecvPacket>();
registry.register::<MsgTimeout>();
registry.register::<MsgTimeoutOnClose>();
registry.register::<MsgAcknowledgement>();
registry.register::<MsgChannelUpgradeInit>();
registry.register::<MsgChannelUpgradeTry>();
registry.register::<MsgChannelUpgradeAck>();
registry.register::<MsgChannelUpgradeConfirm>();
registry.register::<MsgChannelUpgradeOpen>();
registry.register::<MsgChannelUpgradeTimeout>();
registry.register::<MsgChannelUpgradeCancel>();
registry.register::<channel::v1::MsgUpdateParams>();
registry.register::<MsgPruneAcknowledgements>();
// client
registry.register::<MsgCreateClient>();
registry.register::<MsgUpdateClient>();
registry.register::<MsgUpgradeClient>();
registry.register::<MsgSubmitMisbehaviour>();
registry.register::<MsgRecoverClient>();
registry.register::<MsgIbcSoftwareUpgrade>();
registry.register::<client::v1::MsgUpdateParams>();
// connection
registry.register::<MsgConnectionOpenInit>();
registry.register::<MsgConnectionOpenTry>();
registry.register::<MsgConnectionOpenAck>();
registry.register::<MsgConnectionOpenConfirm>();
registry.register::<connection::v1::MsgUpdateParams>();
}
}
@@ -0,0 +1,18 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{CosmosModule, MessageRegistry};
use ibc_proto::ibc::applications::fee::v1::{
MsgPayPacketFee, MsgPayPacketFeeAsync, MsgRegisterPayee, RegisteredCounterpartyPayee,
};
pub(crate) struct IbcFee;
impl CosmosModule for IbcFee {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgRegisterPayee>();
registry.register::<RegisteredCounterpartyPayee>();
registry.register::<MsgPayPacketFee>();
registry.register::<MsgPayPacketFeeAsync>();
}
}
@@ -0,0 +1,17 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{CosmosModule, MessageRegistry};
use ibc_proto::ibc::applications::interchain_accounts::controller::v1::{
MsgRegisterInterchainAccount, MsgSendTx, MsgUpdateParams,
};
pub(crate) struct IbcInterchainAccountsController;
impl CosmosModule for IbcInterchainAccountsController {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgRegisterInterchainAccount>();
registry.register::<MsgSendTx>();
registry.register::<MsgUpdateParams>();
}
}
@@ -0,0 +1,14 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{CosmosModule, MessageRegistry};
use ibc_proto::ibc::applications::transfer::v1::{MsgTransfer, MsgUpdateParams};
pub(crate) struct IbcTransferV1;
impl CosmosModule for IbcTransferV1 {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgTransfer>();
registry.register::<MsgUpdateParams>();
}
}
@@ -0,0 +1,10 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{CosmosModule, MessageRegistry};
pub(crate) struct IbcTransferV2;
impl CosmosModule for IbcTransferV2 {
fn register_messages(&self, _registry: &mut MessageRegistry) {}
}
@@ -0,0 +1,14 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::mint::v1beta1::MsgUpdateParams;
pub(crate) struct Mint;
impl CosmosModule for Mint {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgUpdateParams>()
}
}
@@ -0,0 +1,28 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub(crate) mod auth;
pub(crate) mod authz;
pub(crate) mod bank;
pub(crate) mod capability;
pub(crate) mod consensus;
pub(crate) mod crisis;
pub(crate) mod distribution;
pub(crate) mod evidence;
pub(crate) mod feegrant;
pub(crate) mod gov_v1;
pub(crate) mod gov_v1beta1;
pub(crate) mod group;
pub(crate) mod ibc_core;
pub(crate) mod ibc_fee;
pub(crate) mod ibc_interchain_accounts_controller;
pub(crate) mod ibc_transfer_v1;
pub(crate) mod ibc_transfer_v2;
pub(crate) mod mint;
pub(crate) mod nft;
pub(crate) mod params;
pub(crate) mod slashing;
pub(crate) mod staking;
pub(crate) mod upgrade;
pub(crate) mod vesting;
pub(crate) mod wasm;
@@ -0,0 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
pub(crate) struct Nft;
impl CosmosModule for Nft {
fn register_messages(&self, _registry: &mut MessageRegistry) {}
}
@@ -0,0 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
pub(crate) struct Params;
impl CosmosModule for Params {
fn register_messages(&self, _registry: &mut MessageRegistry) {}
}
@@ -0,0 +1,11 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
pub(crate) struct Slashing;
impl CosmosModule for Slashing {
fn register_messages(&self, _registry: &mut MessageRegistry) {}
}
@@ -0,0 +1,23 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::staking::v1beta1::{
MsgBeginRedelegate, MsgCancelUnbondingDelegation, MsgCreateValidator, MsgDelegate,
MsgEditValidator, MsgUndelegate, MsgUpdateParams,
};
pub(crate) struct Staking;
impl CosmosModule for Staking {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgCreateValidator>();
registry.register::<MsgEditValidator>();
registry.register::<MsgDelegate>();
registry.register::<MsgUndelegate>();
registry.register::<MsgBeginRedelegate>();
registry.register::<MsgCancelUnbondingDelegation>();
registry.register::<MsgUpdateParams>();
}
}
@@ -0,0 +1,15 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::upgrade::v1beta1::{MsgCancelUpgrade, MsgSoftwareUpgrade};
pub(crate) struct Upgrade;
impl CosmosModule for Upgrade {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgSoftwareUpgrade>();
registry.register::<MsgCancelUpgrade>();
}
}
@@ -0,0 +1,18 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::MessageRegistry;
use cosmos_sdk_proto::cosmos::vesting::v1beta1::{
MsgCreatePeriodicVestingAccount, MsgCreatePermanentLockedAccount, MsgCreateVestingAccount,
};
pub(crate) struct Vesting;
impl CosmosModule for Vesting {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgCreateVestingAccount>();
registry.register::<MsgCreatePermanentLockedAccount>();
registry.register::<MsgCreatePeriodicVestingAccount>();
}
}
@@ -0,0 +1,104 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::cosmos_module::CosmosModule;
use crate::cosmos_module::message_registry::{MessageRegistry, default_proto_to_json};
use crate::error::ScraperError;
use base64::Engine;
use base64::engine::general_purpose::STANDARD;
use cosmos_sdk_proto::cosmwasm::wasm::v1::{
MsgAddCodeUploadParamsAddresses, MsgClearAdmin, MsgExecuteContract, MsgIbcCloseChannel,
MsgIbcSend, MsgInstantiateContract, MsgInstantiateContract2, MsgMigrateContract, MsgPinCodes,
MsgRemoveCodeUploadParamsAddresses, MsgStoreAndInstantiateContract, MsgStoreAndMigrateContract,
MsgStoreCode, MsgSudoContract, MsgUnpinCodes, MsgUpdateAdmin, MsgUpdateContractLabel,
MsgUpdateInstantiateConfig, MsgUpdateParams,
};
use cosmrs::Any;
use prost::Message;
use serde::Serialize;
use tracing::warn;
pub(crate) struct Wasm;
fn decode_wasm_message<T: Message + Default + Serialize>(
msg: &Any,
) -> Result<serde_json::Value, ScraperError> {
let field = "msg";
// 1. perform basic decoding
let mut base = default_proto_to_json::<T>(msg)?;
let Some(encoded_field) = base.get_mut(field) else {
warn!(
"missing field 'msg' in wasm message of type {} - can't perform additional decoding",
msg.type_url
);
return Ok(base);
};
// 2. decode 'msg' field
let as_str =
encoded_field
.as_str()
.ok_or(ScraperError::JsonWasmSerialisationFailureNotString {
field: field.to_string(),
type_url: msg.type_url.clone(),
})?;
let decoded = STANDARD.decode(as_str).map_err(|error| {
ScraperError::JsonWasmSerialisationFailureInvalidBase64Encoding {
field: field.to_string(),
type_url: msg.type_url.clone(),
error,
}
})?;
// 3. replace original 'msg' with the new json
let re_decoded: serde_json::Value = serde_json::from_slice(&decoded).map_err(|error| {
ScraperError::JsonSerialisationFailure {
type_url: format!("{}.{field}", msg.type_url),
error,
}
})?;
*encoded_field = re_decoded;
Ok(base)
}
impl CosmosModule for Wasm {
fn register_messages(&self, registry: &mut MessageRegistry) {
registry.register::<MsgIbcSend>();
registry.register::<MsgIbcCloseChannel>();
registry.register::<MsgStoreCode>();
registry.register_with_custom_fn::<MsgInstantiateContract>(|msg| {
decode_wasm_message::<MsgInstantiateContract>(msg)
});
registry.register_with_custom_fn::<MsgInstantiateContract2>(|msg| {
decode_wasm_message::<MsgInstantiateContract2>(msg)
});
registry.register_with_custom_fn::<MsgExecuteContract>(|msg| {
decode_wasm_message::<MsgExecuteContract>(msg)
});
registry.register_with_custom_fn::<MsgMigrateContract>(|msg| {
decode_wasm_message::<MsgMigrateContract>(msg)
});
registry.register_with_custom_fn::<MsgSudoContract>(|msg| {
decode_wasm_message::<MsgSudoContract>(msg)
});
registry.register_with_custom_fn::<MsgStoreAndInstantiateContract>(|msg| {
decode_wasm_message::<MsgStoreAndInstantiateContract>(msg)
});
registry.register_with_custom_fn::<MsgStoreAndMigrateContract>(|msg| {
decode_wasm_message::<MsgStoreAndMigrateContract>(msg)
});
registry.register::<MsgUpdateAdmin>();
registry.register::<MsgClearAdmin>();
registry.register::<MsgUpdateInstantiateConfig>();
registry.register::<MsgUpdateParams>();
registry.register::<MsgPinCodes>();
registry.register::<MsgUnpinCodes>();
registry.register::<MsgAddCodeUploadParamsAddresses>();
registry.register::<MsgRemoveCodeUploadParamsAddresses>();
registry.register::<MsgUpdateContractLabel>();
}
}
@@ -4,17 +4,16 @@
use crate::block_processor::pruning::{
EVERYTHING_PRUNING_INTERVAL, EVERYTHING_PRUNING_KEEP_RECENT,
};
use crate::helpers::MalformedDataError;
use crate::storage::NyxdScraperStorageError;
use tendermint::Hash;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
#[derive(Debug, Error)]
pub enum ScraperError {
#[error("experienced internal database error: {0}")]
InternalDatabaseError(#[from] sqlx::Error),
#[error("failed to perform startup SQL migration: {0}")]
StartupMigrationFailure(#[from] sqlx::migrate::MigrateError),
#[error("storage error: {0}")]
StorageError(#[from] NyxdScraperStorageError),
#[error("the block scraper is already running")]
ScraperAlreadyRunning,
@@ -106,40 +105,26 @@ pub enum ScraperError {
#[error("failed to begin storage tx: {source}")]
StorageTxBeginFailure {
#[source]
source: sqlx::Error,
source: NyxdScraperStorageError,
},
#[error("failed to commit storage tx: {source}")]
StorageTxCommitFailure {
#[source]
source: sqlx::Error,
source: NyxdScraperStorageError,
},
#[error("failed to send on a closed channel")]
ClosedChannelError,
#[error("failed to parse validator's address: {source}")]
MalformedValidatorAddress {
#[source]
source: eyre::Report,
},
#[error("failed to parse validator's address: {source}")]
MalformedValidatorPubkey {
#[source]
source: eyre::Report,
},
#[error(transparent)]
MalformedData(#[from] MalformedDataError),
#[error(
"could not find the block proposer ('{proposer}') for height {height} in the validator set"
)]
BlockProposerNotInValidatorSet { height: u32, proposer: String },
#[error(
"could not find validator information for {address}; the validator has signed a commit"
)]
MissingValidatorInfoCommitted { address: String },
#[error(
"pruning.interval must not be set to 0. If you want to disable pruning, select pruning.strategy = \"nothing\""
)]
@@ -156,6 +141,49 @@ pub enum ScraperError {
EVERYTHING_PRUNING_KEEP_RECENT
)]
TooSmallKeepRecent { keep_recent: u32 },
#[error("'{type_url}' is not registered in the message registry")]
MissingTypeUrlRegistration { type_url: String },
#[error("failed to decode message of type '{type_url}': {error}")]
InvalidProtoRepresentation {
type_url: String,
#[source]
error: prost::DecodeError,
},
#[error("failed to encode message of type '{type_url}' to json: '{error}'")]
JsonSerialisationFailure {
type_url: String,
#[source]
error: serde_json::Error,
},
#[error("serialisation of message of type '{type_url}' didn't result in an object!")]
JsonSerialisationFailureNotObject { type_url: String },
#[error("field '{field}' in '{type_url}' is not a string")]
JsonWasmSerialisationFailureNotString { field: String, type_url: String },
#[error("field '{field}' in '{type_url}' has invalid base64 encoding: {error}")]
JsonWasmSerialisationFailureInvalidBase64Encoding {
field: String,
type_url: String,
#[source]
error: base64::DecodeError,
},
}
impl ScraperError {
pub fn tx_begin_failure(source: NyxdScraperStorageError) -> ScraperError
where {
ScraperError::StorageTxBeginFailure { source }
}
pub fn tx_commit_failure(source: NyxdScraperStorageError) -> ScraperError
where {
ScraperError::StorageTxCommitFailure { source }
}
}
impl<T> From<SendError<T>> for ScraperError {
+66
View File
@@ -0,0 +1,66 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::ParsedTransactionResponse;
use crate::constants::{BECH32_CONESNSUS_PUBKEY_PREFIX, BECH32_CONSENSUS_ADDRESS_PREFIX};
use cosmrs::AccountId;
use sha2::{Digest, Sha256};
use tendermint::{Hash, validator};
use tendermint::{PublicKey, account};
use tendermint_rpc::endpoint::validators;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum MalformedDataError {
#[error("failed to parse validator's address: {source}")]
MalformedValidatorAddress {
#[source]
source: eyre::Report,
},
#[error("failed to parse validator's address: {source}")]
MalformedValidatorPubkey {
#[source]
source: eyre::Report,
},
#[error(
"could not find validator information for {address}; the validator has signed a commit"
)]
MissingValidatorInfoCommitted { address: String },
}
pub fn tx_hash<M: AsRef<[u8]>>(raw_tx: M) -> Hash {
Hash::Sha256(Sha256::digest(raw_tx).into())
}
pub fn validator_pubkey_to_bech32(pubkey: PublicKey) -> Result<AccountId, MalformedDataError> {
// TODO: this one seem to attach additional prefix to they pubkeys, is that what we want instead maybe?
// Ok(pubkey.to_bech32(BECH32_CONESNSUS_PUBKEY_PREFIX))
AccountId::new(BECH32_CONESNSUS_PUBKEY_PREFIX, &pubkey.to_bytes())
.map_err(|source| MalformedDataError::MalformedValidatorPubkey { source })
}
pub fn validator_consensus_address(id: account::Id) -> Result<AccountId, MalformedDataError> {
AccountId::new(BECH32_CONSENSUS_ADDRESS_PREFIX, id.as_ref())
.map_err(|source| MalformedDataError::MalformedValidatorAddress { source })
}
pub fn tx_gas_sum(txs: &[ParsedTransactionResponse]) -> i64 {
txs.iter().map(|tx| tx.tx_result.gas_used).sum()
}
pub fn validator_info(
id: account::Id,
validators: &validators::Response,
) -> Result<&validator::Info, MalformedDataError> {
match validators.validators.iter().find(|v| v.address == id) {
Some(info) => Ok(info),
None => {
let addr = validator_consensus_address(id)?;
Err(MalformedDataError::MissingValidatorInfoCommitted {
address: addr.to_string(),
})
}
}
}
@@ -1,14 +1,12 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
#![warn(clippy::expect_used)]
#![warn(clippy::unwrap_used)]
pub(crate) mod block_processor;
pub(crate) mod block_requester;
pub mod constants;
mod cosmos_module;
pub mod error;
pub(crate) mod helpers;
pub mod helpers;
pub mod modules;
pub(crate) mod rpc_client;
pub(crate) mod scraper;
@@ -16,6 +14,11 @@ pub mod storage;
pub use block_processor::pruning::{PruningOptions, PruningStrategy};
pub use block_processor::types::ParsedTransactionResponse;
pub use cosmos_module::{
CosmosModule,
message_registry::{MessageRegistry, default_message_registry},
};
pub use cosmrs::Any;
pub use modules::{BlockModule, MsgModule, TxModule};
pub use scraper::{Config, NyxdScraper, StartingBlockOpts};
pub use storage::models;
pub use storage::{NyxdScraperStorage, NyxdScraperTransaction};
@@ -3,7 +3,7 @@
use crate::block_processor::types::FullBlockInformation;
use crate::error::ScraperError;
use crate::storage::StorageTransaction;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
#[async_trait]
@@ -11,6 +11,6 @@ pub trait BlockModule {
async fn handle_block(
&mut self,
block: &FullBlockInformation,
storage_tx: &mut StorageTransaction,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -3,7 +3,7 @@
use crate::block_processor::types::ParsedTransactionResponse;
use crate::error::ScraperError;
use crate::storage::StorageTransaction;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
use cosmrs::Any;
@@ -16,6 +16,6 @@ pub trait MsgModule {
index: usize,
msg: &Any,
tx: &ParsedTransactionResponse,
storage_tx: &mut StorageTransaction,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -3,7 +3,7 @@
use crate::block_processor::types::ParsedTransactionResponse;
use crate::error::ScraperError;
use crate::storage::StorageTransaction;
use crate::storage::NyxdScraperTransaction;
use async_trait::async_trait;
#[async_trait]
@@ -11,6 +11,6 @@ pub trait TxModule {
async fn handle_tx(
&mut self,
tx: &ParsedTransactionResponse,
storage_tx: &mut StorageTransaction,
storage_tx: &mut dyn NyxdScraperTransaction,
) -> Result<(), ScraperError>;
}
@@ -6,15 +6,16 @@ use crate::block_processor::types::{
};
use crate::error::ScraperError;
use crate::helpers::tx_hash;
use crate::{Any, MessageRegistry, default_message_registry};
use futures::StreamExt;
use futures::future::join3;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use tendermint::Hash;
use tendermint_rpc::endpoint::{block, block_results, tx, validators};
use tendermint_rpc::{Client, HttpClient, Paging};
use tokio::sync::Mutex;
use tracing::{debug, instrument};
use tracing::{debug, instrument, warn};
use url::Url;
#[derive(Clone)]
@@ -22,6 +23,9 @@ pub struct RpcClient {
// right now I don't care about anything nym specific, so a simple http client is sufficient,
// once this is inadequate, we can switch to a NyxdClient
inner: Arc<HttpClient>,
// kinda like very limited cosmos sdk codec
pub(crate) message_registry: MessageRegistry,
}
impl RpcClient {
@@ -35,9 +39,20 @@ impl RpcClient {
Ok(RpcClient {
inner: Arc::new(http_client),
message_registry: default_message_registry(),
})
}
fn decode_or_skip(&self, msg: &Any) -> Option<serde_json::Value> {
match self.message_registry.try_decode(msg) {
Ok(decoded) => Some(decoded),
Err(err) => {
warn!("Failed to decode raw message: {err}");
None
}
}
}
#[instrument(skip(self, block), fields(height = block.height))]
pub async fn try_get_full_details(
&self,
@@ -56,19 +71,33 @@ impl RpcClient {
let raw_transactions = raw_transactions?;
let mut transactions = Vec::with_capacity(raw_transactions.len());
for tx in raw_transactions {
for raw_tx in raw_transactions {
let mut parsed_messages = HashMap::new();
let mut parsed_message_urls = HashMap::new();
let tx = cosmrs::Tx::from_bytes(&raw_tx.tx).map_err(|source| {
ScraperError::TxParseFailure {
hash: raw_tx.hash,
source,
}
})?;
for (index, msg) in tx.body.messages.iter().enumerate() {
if let Some(value) = self.decode_or_skip(msg) {
parsed_messages.insert(index, value);
parsed_message_urls.insert(index, msg.type_url.clone());
}
}
transactions.push(ParsedTransactionResponse {
hash: tx.hash,
height: tx.height,
index: tx.index,
tx_result: tx.tx_result,
tx: cosmrs::Tx::from_bytes(&tx.tx).map_err(|source| {
ScraperError::TxParseFailure {
hash: tx.hash,
source,
}
})?,
proof: tx.proof,
hash: raw_tx.hash,
height: raw_tx.height,
index: raw_tx.index,
tx_result: raw_tx.tx_result,
tx,
proof: raw_tx.proof,
parsed_messages,
parsed_message_urls,
block: block.block.clone(),
})
}
@@ -9,9 +9,9 @@ use crate::error::ScraperError;
use crate::modules::{BlockModule, MsgModule, TxModule};
use crate::rpc_client::RpcClient;
use crate::scraper::subscriber::ChainSubscriber;
use crate::storage::ScraperStorage;
use crate::storage::NyxdScraperStorage;
use futures::future::join_all;
use std::path::PathBuf;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::sync::mpsc::{
@@ -40,7 +40,8 @@ pub struct Config {
/// Url to the rpc endpoint of a validator, for example `https://rpc.nymtech.net/`
pub rpc_url: Url,
pub database_path: PathBuf,
/// Points to either underlying file (sqlite) or connection string (postgres)
pub database_storage: String,
pub pruning_options: PruningOptions,
@@ -49,7 +50,8 @@ pub struct Config {
pub start_block: StartingBlockOpts,
}
pub struct NyxdScraperBuilder {
pub struct NyxdScraperBuilder<S> {
_storage: PhantomData<S>,
config: Config,
block_modules: Vec<Box<dyn BlockModule + Send>>,
@@ -57,9 +59,13 @@ pub struct NyxdScraperBuilder {
msg_modules: Vec<Box<dyn MsgModule + Send>>,
}
impl NyxdScraperBuilder {
pub async fn build_and_start(self) -> Result<NyxdScraper, ScraperError> {
let scraper = NyxdScraper::new(self.config).await?;
impl<S> NyxdScraperBuilder<S>
where
S: NyxdScraperStorage + Send + Sync + 'static,
S::StorageTransaction: Send + Sync + 'static,
{
pub async fn build_and_start(self) -> Result<NyxdScraper<S>, ScraperError> {
let scraper = NyxdScraper::<S>::new(self.config).await?;
let (processing_tx, processing_rx) = unbounded_channel();
let (req_tx, req_rx) = channel(5);
@@ -110,6 +116,7 @@ impl NyxdScraperBuilder {
pub fn new(config: Config) -> Self {
NyxdScraperBuilder {
_storage: PhantomData,
config,
block_modules: vec![],
tx_modules: vec![],
@@ -133,24 +140,28 @@ impl NyxdScraperBuilder {
}
}
pub struct NyxdScraper {
pub struct NyxdScraper<S> {
config: Config,
task_tracker: TaskTracker,
cancel_token: CancellationToken,
startup_sync: Arc<Notify>,
storage: ScraperStorage,
storage: S,
rpc_client: RpcClient,
}
impl NyxdScraper {
pub fn builder(config: Config) -> NyxdScraperBuilder {
impl<S> NyxdScraper<S>
where
S: NyxdScraperStorage + Send + Sync + 'static,
S::StorageTransaction: Send + Sync + 'static,
{
pub fn builder(config: Config) -> NyxdScraperBuilder<S> {
NyxdScraperBuilder::new(config)
}
pub async fn new(config: Config) -> Result<Self, ScraperError> {
config.pruning_options.validate()?;
let storage = ScraperStorage::init(&config.database_path).await?;
let storage = S::initialise(&config.database_storage).await?;
let rpc_client = RpcClient::new(&config.rpc_url)?;
Ok(NyxdScraper {
@@ -163,14 +174,14 @@ impl NyxdScraper {
})
}
pub fn storage(&self) -> ScraperStorage {
self.storage.clone()
pub fn storage(&self) -> &S {
&self.storage
}
fn start_tasks(
&self,
mut block_requester: BlockRequester,
mut block_processor: BlockProcessor,
mut block_processor: BlockProcessor<S>,
mut chain_subscriber: ChainSubscriber,
) {
self.task_tracker
@@ -336,7 +347,7 @@ impl NyxdScraper {
&self,
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> Result<BlockProcessor, ScraperError> {
) -> Result<BlockProcessor<S>, ScraperError> {
let block_processor_config = BlockProcessorConfig::new(
self.config.pruning_options,
self.config.store_precommits,
@@ -344,7 +355,7 @@ impl NyxdScraper {
self.config.start_block.use_best_effort_start_height,
);
BlockProcessor::new(
BlockProcessor::<S>::new(
block_processor_config,
self.cancel_token.clone(),
self.startup_sync.clone(),
@@ -0,0 +1,18 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use tokio::time::Instant;
use tracing::{debug, error, info, trace, warn};
pub fn log_db_operation_time(op_name: &str, start_time: Instant) {
let elapsed = start_time.elapsed();
let formatted = humantime::format_duration(elapsed);
match elapsed.as_millis() {
v if v > 10000 => error!("{op_name} took {formatted} to execute"),
v if v > 1000 => warn!("{op_name} took {formatted} to execute"),
v if v > 100 => info!("{op_name} took {formatted} to execute"),
v if v > 10 => debug!("{op_name} took {formatted} to execute"),
_ => trace!("{op_name} took {formatted} to execute"),
}
}
@@ -0,0 +1,124 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::ScraperError;
use async_trait::async_trait;
use thiserror::Error;
use tracing::warn;
pub use crate::ParsedTransactionResponse;
pub use crate::block_processor::types::FullBlockInformation;
pub use tendermint::Block;
pub use tendermint::block::{Commit, CommitSig};
pub use tendermint_rpc::endpoint::validators;
pub mod helpers;
// a workaround for needing associated type (which is a no-no in dynamic dispatch)
#[derive(Error, Debug)]
#[error(transparent)]
pub struct NyxdScraperStorageError(Box<dyn std::error::Error + Send + Sync>);
impl NyxdScraperStorageError {
pub fn new<E>(error: E) -> Self
where
E: std::error::Error + Send + Sync + 'static,
{
NyxdScraperStorageError(Box::new(error))
}
}
#[async_trait]
pub trait NyxdScraperStorage: Clone + Sized {
type StorageTransaction: NyxdScraperTransaction;
/// Either connection string (postgres) or storage path (sqlite)
async fn initialise(storage: &str) -> Result<Self, NyxdScraperStorageError>;
async fn begin_processing_tx(
&self,
) -> Result<Self::StorageTransaction, NyxdScraperStorageError>;
async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError>;
async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError>;
async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError>;
async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), NyxdScraperStorageError>;
}
#[async_trait]
pub trait NyxdScraperTransaction {
async fn commit(mut self) -> Result<(), NyxdScraperStorageError>;
async fn persist_validators(
&mut self,
validators: &validators::Response,
) -> Result<(), NyxdScraperStorageError>;
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), NyxdScraperStorageError>;
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &validators::Response,
) -> Result<(), NyxdScraperStorageError>;
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError>;
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError>;
async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError>;
}
pub async fn persist_block<Tx>(
block: &FullBlockInformation,
tx: &mut Tx,
store_precommits: bool,
) -> Result<(), ScraperError>
where
Tx: NyxdScraperTransaction,
{
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
// SANITY CHECK: make sure the block proposer is present in the validator set
block.ensure_proposer()?;
tx.persist_validators(&block.validators).await?;
tx.persist_block_data(&block.block, total_gas).await?;
if store_precommits {
if let Some(commit) = &block.block.last_commit {
tx.persist_commits(commit, &block.validators).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
// persist txs
tx.persist_txs(&block.transactions).await?;
// persist messages (inside the transactions)
tx.persist_messages(&block.transactions).await?;
tx.update_last_processed(block.block.header.height.into())
.await?;
Ok(())
}
+29
View File
@@ -0,0 +1,29 @@
[package]
name = "nyxd-scraper-sqlite"
version = "0.1.0"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
documentation.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate", "time"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
nyxd-scraper-shared = { path = "../nyxd-scraper-shared" }
[build-dependencies]
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
anyhow.workspace = true
[lints]
workspace = true
+36
View File
@@ -0,0 +1,36 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use nyxd_scraper_shared::helpers::MalformedDataError;
use nyxd_scraper_shared::storage::NyxdScraperStorageError;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SqliteScraperError {
#[error("experienced internal database error: {0}")]
InternalDatabaseError(#[from] sqlx::error::Error),
#[error("failed to perform startup SQL migration: {0}")]
StartupMigrationFailure(#[from] sqlx::migrate::MigrateError),
#[error("failed to begin storage tx: {source}")]
StorageTxBeginFailure {
#[source]
source: sqlx::error::Error,
},
#[error("failed to commit storage tx: {source}")]
StorageTxCommitFailure {
#[source]
source: sqlx::error::Error,
},
#[error(transparent)]
MalformedData(#[from] MalformedDataError),
}
impl From<SqliteScraperError> for NyxdScraperStorageError {
fn from(err: SqliteScraperError) -> Self {
NyxdScraperStorageError::new(err)
}
}
+21
View File
@@ -0,0 +1,21 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::block_storage::SqliteScraperStorage;
use nyxd_scraper_shared::NyxdScraper;
pub use nyxd_scraper_shared::constants;
pub use nyxd_scraper_shared::error::ScraperError;
pub use nyxd_scraper_shared::{
BlockModule, MsgModule, NyxdScraperTransaction, ParsedTransactionResponse, PruningOptions,
PruningStrategy, StartingBlockOpts, TxModule,
};
pub use storage::models;
pub mod error;
pub mod storage;
pub type SqliteNyxdScraper = NyxdScraper<SqliteScraperStorage>;
// TODO: for now just use exactly the same config
pub use nyxd_scraper_shared::Config;
@@ -0,0 +1,251 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::SqliteScraperError;
use crate::models::{CommitSignature, Validator};
use crate::storage::manager::{
StorageManager, prune_blocks, prune_messages, prune_pre_commits, prune_transactions,
update_last_pruned,
};
use crate::storage::transaction::SqliteStorageTransaction;
use async_trait::async_trait;
use nyxd_scraper_shared::storage::helpers::log_db_operation_time;
use nyxd_scraper_shared::storage::{NyxdScraperStorage, NyxdScraperStorageError};
use sqlx::ConnectOptions;
use sqlx::sqlite::{SqliteAutoVacuum, SqliteSynchronous};
use sqlx::types::time::OffsetDateTime;
use std::fmt::Debug;
use std::path::Path;
use tokio::time::Instant;
use tracing::{debug, error, info, instrument};
#[derive(Clone)]
pub struct SqliteScraperStorage {
pub(crate) manager: StorageManager,
}
impl SqliteScraperStorage {
#[instrument]
pub async fn init<P: AsRef<Path> + Debug>(
database_path: P,
) -> Result<Self, SqliteScraperError> {
let database_path = database_path.as_ref();
debug!(
"initialising scraper database path to '{}'",
database_path.display()
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
// TODO: do we want auto_vacuum ?
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};
if let Err(err) = sqlx::migrate!("./sql_migrations")
.run(&connection_pool)
.await
{
error!("Failed to initialize SQLx database: {err}");
return Err(err.into());
}
info!("Database migration finished!");
let manager = StorageManager { connection_pool };
manager.set_initial_metadata().await?;
let storage = SqliteScraperStorage { manager };
Ok(storage)
}
#[instrument(skip(self))]
pub async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), SqliteScraperError> {
let start = Instant::now();
let mut tx = self.begin_processing_tx().await?;
prune_messages(oldest_to_keep.into(), &mut **tx).await?;
prune_transactions(oldest_to_keep.into(), &mut **tx).await?;
prune_pre_commits(oldest_to_keep.into(), &mut **tx).await?;
prune_blocks(oldest_to_keep.into(), &mut **tx).await?;
update_last_pruned(current_height.into(), &mut **tx).await?;
let commit_start = Instant::now();
tx.0.commit()
.await
.map_err(|source| SqliteScraperError::StorageTxCommitFailure { source })?;
log_db_operation_time("committing pruning tx", commit_start);
log_db_operation_time("pruning storage", start);
Ok(())
}
#[instrument(skip_all)]
pub async fn begin_processing_tx(
&self,
) -> Result<SqliteStorageTransaction, SqliteScraperError> {
debug!("starting storage tx");
self.manager
.connection_pool
.begin()
.await
.map(SqliteStorageTransaction)
.map_err(|source| SqliteScraperError::StorageTxBeginFailure { source })
}
pub async fn lowest_block_height(&self) -> Result<Option<i64>, SqliteScraperError> {
Ok(self.manager.get_lowest_block().await?)
}
pub async fn get_first_block_height_after(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, SqliteScraperError> {
Ok(self.manager.get_first_block_height_after(time).await?)
}
pub async fn get_last_block_height_before(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, SqliteScraperError> {
Ok(self.manager.get_last_block_height_before(time).await?)
}
pub async fn get_blocks_between(
&self,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, SqliteScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
Ok(block_end - block_start)
}
pub async fn get_signed_between(
&self,
consensus_address: &str,
start_height: i64,
end_height: i64,
) -> Result<i64, SqliteScraperError> {
Ok(self
.manager
.get_signed_between(consensus_address, start_height, end_height)
.await?)
}
pub async fn get_signed_between_times(
&self,
consensus_address: &str,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, SqliteScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
self.get_signed_between(consensus_address, block_start, block_end)
.await
}
pub async fn get_precommit(
&self,
consensus_address: &str,
height: i64,
) -> Result<Option<CommitSignature>, SqliteScraperError> {
Ok(self
.manager
.get_precommit(consensus_address, height)
.await?)
}
pub async fn get_block_signers(
&self,
height: i64,
) -> Result<Vec<Validator>, SqliteScraperError> {
Ok(self.manager.get_block_validators(height).await?)
}
pub async fn get_all_known_validators(&self) -> Result<Vec<Validator>, SqliteScraperError> {
Ok(self.manager.get_validators().await?)
}
pub async fn get_last_processed_height(&self) -> Result<i64, SqliteScraperError> {
Ok(self.manager.get_last_processed_height().await?)
}
pub async fn get_pruned_height(&self) -> Result<i64, SqliteScraperError> {
Ok(self.manager.get_pruned_height().await?)
}
}
#[async_trait]
impl NyxdScraperStorage for SqliteScraperStorage {
type StorageTransaction = SqliteStorageTransaction;
async fn initialise(storage: &str) -> Result<Self, NyxdScraperStorageError> {
SqliteScraperStorage::init(storage)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn begin_processing_tx(
&self,
) -> Result<Self::StorageTransaction, NyxdScraperStorageError> {
self.begin_processing_tx()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn get_last_processed_height(&self) -> Result<i64, NyxdScraperStorageError> {
self.get_last_processed_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn get_pruned_height(&self) -> Result<i64, NyxdScraperStorageError> {
self.get_pruned_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn lowest_block_height(&self) -> Result<Option<i64>, NyxdScraperStorageError> {
self.lowest_block_height()
.await
.map_err(NyxdScraperStorageError::from)
}
async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), NyxdScraperStorageError> {
self.prune_storage(oldest_to_keep, current_height)
.await
.map_err(NyxdScraperStorageError::from)
}
}
@@ -1,8 +1,8 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::storage::log_db_operation_time;
use crate::storage::models::{CommitSignature, Validator};
use nyxd_scraper_shared::storage::helpers::log_db_operation_time;
use sqlx::types::time::OffsetDateTime;
use sqlx::{Executor, Sqlite};
use tokio::time::Instant;
@@ -1,2 +1,7 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
pub mod block_storage;
mod manager;
pub mod models;
pub mod transaction;
@@ -0,0 +1,243 @@
// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::error::SqliteScraperError;
use crate::storage::manager::{
insert_block, insert_message, insert_precommit, insert_transaction, insert_validator,
update_last_processed,
};
use async_trait::async_trait;
use nyxd_scraper_shared::ParsedTransactionResponse;
use nyxd_scraper_shared::helpers::{
validator_consensus_address, validator_info, validator_pubkey_to_bech32,
};
use nyxd_scraper_shared::storage::validators::Response;
use nyxd_scraper_shared::storage::{
Block, Commit, CommitSig, NyxdScraperStorageError, NyxdScraperTransaction, validators,
};
use sqlx::{Sqlite, Transaction};
use std::ops::{Deref, DerefMut};
use tracing::{debug, trace, warn};
pub struct SqliteStorageTransaction(pub(crate) Transaction<'static, Sqlite>);
impl Deref for SqliteStorageTransaction {
type Target = Transaction<'static, Sqlite>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for SqliteStorageTransaction {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl SqliteStorageTransaction {
async fn persist_validators(
&mut self,
validators: &validators::Response,
) -> Result<(), SqliteScraperError> {
debug!("persisting {} validators", validators.total);
for validator in &validators.validators {
let consensus_address = validator_consensus_address(validator.address)?;
let consensus_pubkey = validator_pubkey_to_bech32(validator.pub_key)?;
insert_validator(
consensus_address.to_string(),
consensus_pubkey.to_string(),
self.0.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), SqliteScraperError> {
let proposer_address =
validator_consensus_address(block.header.proposer_address)?.to_string();
insert_block(
block.header.height.into(),
block.header.hash().to_string(),
block.data.len() as u32,
total_gas,
proposer_address,
block.header.time.into(),
self.0.as_mut(),
)
.await?;
Ok(())
}
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &validators::Response,
) -> Result<(), SqliteScraperError> {
debug!("persisting up to {} commits", commits.signatures.len());
let height: i64 = commits.height.into();
for commit_sig in &commits.signatures {
let (validator_id, timestamp, signature) = match commit_sig {
CommitSig::BlockIdFlagAbsent => {
trace!("absent signature");
continue;
}
CommitSig::BlockIdFlagCommit {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
CommitSig::BlockIdFlagNil {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
};
let validator = validator_info(*validator_id, validators)?;
let validator_address = validator_consensus_address(*validator_id)?;
if signature.is_none() {
warn!("empty signature for {validator_address} at height {height}");
continue;
}
insert_precommit(
validator_address.to_string(),
height,
(*timestamp).into(),
validator.power.into(),
validator.proposer_priority.value(),
self.0.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), SqliteScraperError> {
debug!("persisting {} txs", txs.len());
for chain_tx in txs {
insert_transaction(
chain_tx.hash.to_string(),
chain_tx.height.into(),
chain_tx.index as i64,
chain_tx.tx_result.code.is_ok(),
chain_tx.tx.body.messages.len() as i64,
chain_tx.tx.body.memo.clone(),
chain_tx.tx_result.gas_wanted,
chain_tx.tx_result.gas_used,
chain_tx.tx_result.log.clone(),
self.0.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), SqliteScraperError> {
debug!("persisting messages");
for chain_tx in txs {
for (index, msg) in chain_tx.tx.body.messages.iter().enumerate() {
insert_message(
chain_tx.hash.to_string(),
index as i64,
msg.type_url.clone(),
chain_tx.height.into(),
self.0.as_mut(),
)
.await?
}
}
Ok(())
}
async fn update_last_processed(&mut self, height: i64) -> Result<(), SqliteScraperError> {
debug!("update_last_processed");
update_last_processed(height, self.0.as_mut()).await?;
Ok(())
}
}
#[async_trait]
impl NyxdScraperTransaction for SqliteStorageTransaction {
async fn commit(self) -> Result<(), NyxdScraperStorageError> {
self.0
.commit()
.await
.map_err(SqliteScraperError::from)
.map_err(NyxdScraperStorageError::from)
}
async fn persist_validators(
&mut self,
validators: &Response,
) -> Result<(), NyxdScraperStorageError> {
self.persist_validators(validators)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_block_data(
&mut self,
block: &Block,
total_gas: i64,
) -> Result<(), NyxdScraperStorageError> {
self.persist_block_data(block, total_gas)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_commits(
&mut self,
commits: &Commit,
validators: &Response,
) -> Result<(), NyxdScraperStorageError> {
self.persist_commits(commits, validators)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_txs(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
self.persist_txs(txs)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn persist_messages(
&mut self,
txs: &[ParsedTransactionResponse],
) -> Result<(), NyxdScraperStorageError> {
self.persist_messages(txs)
.await
.map_err(NyxdScraperStorageError::from)
}
async fn update_last_processed(&mut self, height: i64) -> Result<(), NyxdScraperStorageError> {
self.update_last_processed(height)
.await
.map_err(NyxdScraperStorageError::from)
}
}
-46
View File
@@ -1,46 +0,0 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::block_processor::types::ParsedTransactionResponse;
use crate::constants::{BECH32_CONESNSUS_PUBKEY_PREFIX, BECH32_CONSENSUS_ADDRESS_PREFIX};
use crate::error::ScraperError;
use cosmrs::AccountId;
use sha2::{Digest, Sha256};
use tendermint::{Hash, validator};
use tendermint::{PublicKey, account};
use tendermint_rpc::endpoint::validators;
pub(crate) fn tx_hash<M: AsRef<[u8]>>(raw_tx: M) -> Hash {
Hash::Sha256(Sha256::digest(raw_tx).into())
}
pub(crate) fn validator_pubkey_to_bech32(pubkey: PublicKey) -> Result<AccountId, ScraperError> {
// TODO: this one seem to attach additional prefix to they pubkeys, is that what we want instead maybe?
// Ok(pubkey.to_bech32(BECH32_CONESNSUS_PUBKEY_PREFIX))
AccountId::new(BECH32_CONESNSUS_PUBKEY_PREFIX, &pubkey.to_bytes())
.map_err(|source| ScraperError::MalformedValidatorPubkey { source })
}
pub(crate) fn validator_consensus_address(id: account::Id) -> Result<AccountId, ScraperError> {
AccountId::new(BECH32_CONSENSUS_ADDRESS_PREFIX, id.as_ref())
.map_err(|source| ScraperError::MalformedValidatorAddress { source })
}
pub(crate) fn tx_gas_sum(txs: &[ParsedTransactionResponse]) -> i64 {
txs.iter().map(|tx| tx.tx_result.gas_used).sum()
}
pub(crate) fn validator_info(
id: account::Id,
validators: &validators::Response,
) -> Result<&validator::Info, ScraperError> {
match validators.validators.iter().find(|v| v.address == id) {
Some(info) => Ok(info),
None => {
let addr = validator_consensus_address(id)?;
Err(ScraperError::MissingValidatorInfoCommitted {
address: addr.to_string(),
})
}
}
}
-400
View File
@@ -1,400 +0,0 @@
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::{
block_processor::types::{FullBlockInformation, ParsedTransactionResponse},
error::ScraperError,
storage::{
manager::{
StorageManager, insert_block, insert_message, insert_precommit, insert_transaction,
insert_validator, prune_blocks, prune_messages, prune_pre_commits, prune_transactions,
update_last_processed, update_last_pruned,
},
models::{CommitSignature, Validator},
},
};
use sqlx::{
ConnectOptions, Sqlite, Transaction,
sqlite::{SqliteAutoVacuum, SqliteSynchronous},
types::time::OffsetDateTime,
};
use std::{fmt::Debug, path::Path};
use tendermint::{
Block,
block::{Commit, CommitSig},
};
use tendermint_rpc::endpoint::validators;
use tokio::time::Instant;
use tracing::{debug, error, info, instrument, trace, warn};
mod helpers;
mod manager;
pub mod models;
pub type StorageTransaction = Transaction<'static, Sqlite>;
#[derive(Clone)]
pub struct ScraperStorage {
pub(crate) manager: StorageManager,
}
pub(crate) fn log_db_operation_time(op_name: &str, start_time: Instant) {
let elapsed = start_time.elapsed();
let formatted = humantime::format_duration(elapsed);
match elapsed.as_millis() {
v if v > 10000 => error!("{op_name} took {formatted} to execute"),
v if v > 1000 => warn!("{op_name} took {formatted} to execute"),
v if v > 100 => info!("{op_name} took {formatted} to execute"),
v if v > 10 => debug!("{op_name} took {formatted} to execute"),
_ => trace!("{op_name} took {formatted} to execute"),
}
}
impl ScraperStorage {
#[instrument]
pub async fn init<P: AsRef<Path> + Debug>(database_path: P) -> Result<Self, ScraperError> {
let database_path = database_path.as_ref();
debug!(
"initialising scraper database path to '{}'",
database_path.display()
);
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.auto_vacuum(SqliteAutoVacuum::Incremental)
.filename(database_path)
.create_if_missing(true)
.disable_statement_logging();
// TODO: do we want auto_vacuum ?
let connection_pool = match sqlx::SqlitePool::connect_with(opts).await {
Ok(db) => db,
Err(err) => {
error!("Failed to connect to SQLx database: {err}");
return Err(err.into());
}
};
if let Err(err) = sqlx::migrate!("./sql_migrations")
.run(&connection_pool)
.await
{
error!("Failed to initialize SQLx database: {err}");
return Err(err.into());
}
info!("Database migration finished!");
let manager = StorageManager { connection_pool };
manager.set_initial_metadata().await?;
let storage = ScraperStorage { manager };
Ok(storage)
}
#[instrument(skip(self))]
pub async fn prune_storage(
&self,
oldest_to_keep: u32,
current_height: u32,
) -> Result<(), ScraperError> {
let start = Instant::now();
let mut tx = self.begin_processing_tx().await?;
prune_messages(oldest_to_keep.into(), &mut *tx).await?;
prune_transactions(oldest_to_keep.into(), &mut *tx).await?;
prune_pre_commits(oldest_to_keep.into(), &mut *tx).await?;
prune_blocks(oldest_to_keep.into(), &mut *tx).await?;
update_last_pruned(current_height.into(), &mut *tx).await?;
let commit_start = Instant::now();
tx.commit()
.await
.map_err(|source| ScraperError::StorageTxCommitFailure { source })?;
log_db_operation_time("committing pruning tx", commit_start);
log_db_operation_time("pruning storage", start);
Ok(())
}
#[instrument(skip_all)]
pub async fn begin_processing_tx(&self) -> Result<StorageTransaction, ScraperError> {
debug!("starting storage tx");
self.manager
.connection_pool
.begin()
.await
.map_err(|source| ScraperError::StorageTxBeginFailure { source })
}
pub async fn lowest_block_height(&self) -> Result<Option<i64>, ScraperError> {
Ok(self.manager.get_lowest_block().await?)
}
pub async fn get_first_block_height_after(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, ScraperError> {
Ok(self.manager.get_first_block_height_after(time).await?)
}
pub async fn get_last_block_height_before(
&self,
time: OffsetDateTime,
) -> Result<Option<i64>, ScraperError> {
Ok(self.manager.get_last_block_height_before(time).await?)
}
pub async fn get_blocks_between(
&self,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, ScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
Ok(block_end - block_start)
}
pub async fn get_signed_between(
&self,
consensus_address: &str,
start_height: i64,
end_height: i64,
) -> Result<i64, ScraperError> {
Ok(self
.manager
.get_signed_between(consensus_address, start_height, end_height)
.await?)
}
pub async fn get_signed_between_times(
&self,
consensus_address: &str,
start_time: OffsetDateTime,
end_time: OffsetDateTime,
) -> Result<i64, ScraperError> {
let Some(block_start) = self.get_first_block_height_after(start_time).await? else {
return Ok(0);
};
let Some(block_end) = self.get_last_block_height_before(end_time).await? else {
return Ok(0);
};
self.get_signed_between(consensus_address, block_start, block_end)
.await
}
pub async fn get_precommit(
&self,
consensus_address: &str,
height: i64,
) -> Result<Option<CommitSignature>, ScraperError> {
Ok(self
.manager
.get_precommit(consensus_address, height)
.await?)
}
pub async fn get_block_signers(&self, height: i64) -> Result<Vec<Validator>, ScraperError> {
Ok(self.manager.get_block_validators(height).await?)
}
pub async fn get_all_known_validators(&self) -> Result<Vec<Validator>, ScraperError> {
Ok(self.manager.get_validators().await?)
}
pub async fn get_last_processed_height(&self) -> Result<i64, ScraperError> {
Ok(self.manager.get_last_processed_height().await?)
}
pub async fn get_pruned_height(&self) -> Result<i64, ScraperError> {
Ok(self.manager.get_pruned_height().await?)
}
}
pub async fn persist_block(
block: &FullBlockInformation,
tx: &mut StorageTransaction,
store_precommits: bool,
) -> Result<(), ScraperError> {
let total_gas = crate::helpers::tx_gas_sum(&block.transactions);
// SANITY CHECK: make sure the block proposer is present in the validator set
block.ensure_proposer()?;
// persist validators
persist_validators(&block.validators, tx).await?;
// persist block data
persist_block_data(&block.block, total_gas, tx).await?;
if store_precommits {
if let Some(commit) = &block.block.last_commit {
persist_commits(commit, &block.validators, tx).await?;
} else {
warn!("no commits for block {}", block.block.header.height)
}
}
// persist txs
persist_txs(&block.transactions, tx).await?;
// persist messages (inside the transactions)
persist_messages(&block.transactions, tx).await?;
update_last_processed(block.block.header.height.into(), tx.as_mut()).await?;
Ok(())
}
async fn persist_validators(
validators: &validators::Response,
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
debug!("persisting {} validators", validators.total);
for validator in &validators.validators {
let consensus_address = crate::helpers::validator_consensus_address(validator.address)?;
let consensus_pubkey = crate::helpers::validator_pubkey_to_bech32(validator.pub_key)?;
insert_validator(
consensus_address.to_string(),
consensus_pubkey.to_string(),
tx.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_block_data(
block: &Block,
total_gas: i64,
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
let proposer_address =
crate::helpers::validator_consensus_address(block.header.proposer_address)?.to_string();
insert_block(
block.header.height.into(),
block.header.hash().to_string(),
block.data.len() as u32,
total_gas,
proposer_address,
block.header.time.into(),
tx.as_mut(),
)
.await?;
Ok(())
}
async fn persist_commits(
commits: &Commit,
validators: &validators::Response,
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
debug!("persisting up to {} commits", commits.signatures.len());
let height: i64 = commits.height.into();
for commit_sig in &commits.signatures {
let (validator_id, timestamp, signature) = match commit_sig {
CommitSig::BlockIdFlagAbsent => {
trace!("absent signature");
continue;
}
CommitSig::BlockIdFlagCommit {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
CommitSig::BlockIdFlagNil {
validator_address,
timestamp,
signature,
} => (validator_address, timestamp, signature),
};
let validator = match crate::helpers::validator_info(*validator_id, validators) {
Ok(validator_info) => validator_info,
Err(err) => {
error!("{err}");
continue;
}
};
let validator_address = crate::helpers::validator_consensus_address(*validator_id)?;
if signature.is_none() {
warn!("empty signature for {validator_address} at height {height}");
continue;
}
insert_precommit(
validator_address.to_string(),
height,
(*timestamp).into(),
validator.power.into(),
validator.proposer_priority.value(),
tx.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_txs(
txs: &[ParsedTransactionResponse],
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
debug!("persisting {} txs", txs.len());
for chain_tx in txs {
insert_transaction(
chain_tx.hash.to_string(),
chain_tx.height.into(),
chain_tx.index as i64,
chain_tx.tx_result.code.is_ok(),
chain_tx.tx.body.messages.len() as i64,
chain_tx.tx.body.memo.clone(),
chain_tx.tx_result.gas_wanted,
chain_tx.tx_result.gas_used,
chain_tx.tx_result.log.clone(),
tx.as_mut(),
)
.await?;
}
Ok(())
}
async fn persist_messages(
txs: &[ParsedTransactionResponse],
tx: &mut StorageTransaction,
) -> Result<(), ScraperError> {
debug!("persisting messages");
for chain_tx in txs {
for (index, msg) in chain_tx.tx.body.messages.iter().enumerate() {
insert_message(
chain_tx.hash.to_string(),
index as i64,
msg.type_url.clone(),
chain_tx.height.into(),
tx.as_mut(),
)
.await?
}
}
Ok(())
}
@@ -0,0 +1,27 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO transaction\n (hash, height, index, success, messages, memo, signatures, signer_infos, fee, gas_wanted, gas_used, raw_log, logs, events)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)\n ON CONFLICT (hash) DO UPDATE\n SET height = excluded.height,\n index = excluded.index,\n success = excluded.success,\n messages = excluded.messages,\n memo = excluded.memo,\n signatures = excluded.signatures,\n signer_infos = excluded.signer_infos,\n fee = excluded.fee,\n gas_wanted = excluded.gas_wanted,\n gas_used = excluded.gas_used,\n raw_log = excluded.raw_log,\n logs = excluded.logs,\n events = excluded.events\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int8",
"Int4",
"Bool",
"Jsonb",
"Text",
"TextArray",
"Jsonb",
"Jsonb",
"Int8",
"Int8",
"Text",
"Jsonb",
"Jsonb"
]
},
"nullable": []
},
"hash": "08f4e54ac24fccd54f4208797b3749e457f8cd4ba3d7d906a7ab3bf5b4e7dc9c"
}
@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO validator (consensus_address, consensus_pubkey)\n VALUES ($1, $2)\n ON CONFLICT DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text"
]
},
"nullable": []
},
"hash": "0d3709efacf763b06bf14803bb803b5ee5b27879b0026bb0480b3f2722318a75"
}

Some files were not shown because too many files have changed in this diff Show More